Performance-Optimierung, da das System mit 450k Datensätzen nach 12 Stunden zu langsam wurde!
All checks were successful
release-tag / release-image (push) Successful in 2m14s

This commit is contained in:
2026-04-27 22:16:26 +02:00
parent 32322ff2cc
commit 4cc14cae76
2 changed files with 296 additions and 26 deletions

View File

@@ -1809,4 +1809,108 @@ CREATE TABLE IF NOT EXISTS user_source_ip_seen (
last_seen DATETIME(6) NOT NULL,
seen_count BIGINT NOT NULL DEFAULT 1,
PRIMARY KEY (username, src_ip, hostname)
);
);
###
-- Covering indexes for the detections hot paths: the dashboards filter by one
-- of status/severity/hostname/rule_name and sort by created_at, so each index
-- ends in created_at to serve the ORDER BY without a filesort.
ALTER TABLE detections
ADD INDEX idx_detections_created_at (created_at),
ADD INDEX idx_detections_status_created (status, created_at),
ADD INDEX idx_detections_severity_created (severity, created_at),
ADD INDEX idx_detections_host_created (hostname, created_at),
ADD INDEX idx_detections_rule_created (rule_name, created_at),
ADD INDEX idx_detections_host_rule_created (hostname, rule_name, created_at),
ADD INDEX idx_detections_host_status_created (hostname, status, created_at),
-- NOTE(review): idx_detections_created_at is a left prefix of
-- idx_detections_risk_window and is likely redundant; nine secondary indexes
-- also tax every INSERT — confirm each one is used (EXPLAIN) before shipping.
ADD INDEX idx_detections_risk_window (created_at, status, hostname, severity),
-- Supports the correlated EXISTS in the event-list filter
-- (d.hostname = event_logs.hostname AND ts BETWEEN window_start AND window_end).
ADD INDEX idx_detections_window_lookup (hostname, window_start, window_end);
-- UEBA per-user baseline: one row per (user, host, source IP, workstation)
-- context for upserts, plus last_seen indexes for aging/lookup by user or host.
ALTER TABLE ueba_user_baseline
ADD UNIQUE KEY uq_ueba_user_context (username, hostname, src_ip, workstation),
ADD INDEX idx_ueba_user_last_seen (username, last_seen),
ADD INDEX idx_ueba_host_last_seen (hostname, last_seen);
-- user_source_ip_seen already declares PRIMARY KEY (username, src_ip, hostname)
-- in its CREATE TABLE above; the previously proposed uq_user_source_ip_host
-- UNIQUE KEY duplicated that primary key column-for-column and would only add
-- write overhead, so it is dropped. The last_seen index remains for
-- "when was this user/IP last seen" lookups.
ALTER TABLE user_source_ip_seen
ADD INDEX idx_user_source_ip_last_seen (username, src_ip, last_seen);
-- One privilege-baseline row per user; enables upserts keyed on username.
-- NOTE(review): if username is already the table's primary key this unique key
-- is redundant — confirm against the CREATE TABLE.
ALTER TABLE user_privilege_baseline
ADD UNIQUE KEY uq_user_privilege_username (username);
-- One stats bucket per host/channel/event and hour-of-day/day-of-week slot;
-- the unique key lets baseline writers use INSERT ... ON DUPLICATE KEY UPDATE.
ALTER TABLE baseline_event_stats
ADD UNIQUE KEY uq_baseline_bucket (
hostname,
channel_name,
event_id,
hour_of_day,
day_of_week
);
-- Suppression lookup index, shaped to the expected WHERE clause
-- (enabled flag + rule/host/channel/event equality + expires_at range).
-- NOTE(review): 'enabled' is low-cardinality as the leading column; fine if
-- most rows are enabled, otherwise consider leading with rule_name.
ALTER TABLE detection_suppressions
ADD INDEX idx_detection_suppressions_lookup (
enabled,
rule_name,
hostname,
channel_name,
event_id,
expires_at
);
-- Baseline-exclusion lookup: same shape as the suppression index above but
-- without rule_name, since exclusions apply per host/channel/event.
ALTER TABLE baseline_exclusions
ADD INDEX idx_baseline_exclusions_lookup (
enabled,
hostname,
channel_name,
event_id,
expires_at
);
-- NOTE(review): removed idx_detections_status_created2 and
-- idx_detections_severity_created2 — they duplicated
-- idx_detections_status_created / idx_detections_severity_created (identical
-- columns in identical order) created earlier in this migration. MySQL allows
-- same-definition indexes under different names (with only a warning), but they
-- cost extra disk space and write time for zero read benefit.
###
-- Raw event XML is split out of event_logs into a 1:1 side table so the hot
-- list queries on event_logs no longer drag MEDIUMTEXT payloads through the
-- buffer pool; the detail view fetches the XML here by event_log_id.
CREATE TABLE IF NOT EXISTS event_log_raw (
event_log_id BIGINT UNSIGNED NOT NULL,
msg MEDIUMTEXT NOT NULL,
msg_sha256 CHAR(64) NOT NULL,
-- NOTE(review): expression DEFAULT (UTC_TIMESTAMP(6)) requires MySQL 8.0.13+.
created_at DATETIME(6) NOT NULL DEFAULT (UTC_TIMESTAMP(6)),
PRIMARY KEY (event_log_id),
INDEX idx_event_log_raw_sha256 (msg_sha256),
INDEX idx_event_log_raw_created_at (created_at)
);
-- Keep raw payloads in lockstep with their event rows: deleting an event_logs
-- row cascades to its raw XML.
ALTER TABLE event_log_raw
ADD CONSTRAINT fk_event_log_raw_event
FOREIGN KEY (event_log_id)
REFERENCES event_logs(id)
ON DELETE CASCADE;
######################## MIGRATION ############################
-- Backfill: copy all existing raw XML into event_log_raw. Idempotent via
-- ON DUPLICATE KEY UPDATE, so the migration can be re-run safely.
-- NOTE(review): with ~450k MEDIUMTEXT rows this is one huge statement/txn;
-- consider batching by id range if it locks event_logs for too long.
-- NOTE(review): VALUES() inside ON DUPLICATE KEY UPDATE is deprecated from
-- MySQL 8.0.20 (row-alias syntax replaces it) — fine on older versions.
INSERT INTO event_log_raw (event_log_id, msg, msg_sha256, created_at)
SELECT id, msg, msg_sha256, COALESCE(received_at, UTC_TIMESTAMP(6))
FROM event_logs
WHERE msg IS NOT NULL
AND msg <> ''
ON DUPLICATE KEY UPDATE
msg = VALUES(msg),
msg_sha256 = VALUES(msg_sha256);
######################## MIGRATION ############################
######################## TEST #################################
-- Verification: both counts must match before the originals are cleared below.
SELECT COUNT(*) AS raw_rows FROM event_log_raw;
SELECT COUNT(*) AS event_rows_with_msg FROM event_logs WHERE msg IS NOT NULL AND msg <> '';
######################## TEST #################################
######################## MIGRATION ############################
-- Clear the copied XML out of event_logs in batches of 10k to keep undo-log
-- growth and replication lag bounded. Re-run this statement until it reports
-- 0 rows affected.
-- NOTE(review): UPDATE ... LIMIT without ORDER BY is non-deterministic under
-- statement-based replication — confirm row-based binlog format is in use.
UPDATE event_logs
SET msg = ''
WHERE msg IS NOT NULL
AND msg <> ''
LIMIT 10000;
######################## MIGRATION ############################
######################## TEST #################################
-- Must return 0 once every batch of the UPDATE above has run.
SELECT COUNT(*) FROM event_logs WHERE msg IS NOT NULL AND msg <> '';
######################## TEST #################################

216
main.go
View File

@@ -1474,6 +1474,11 @@ type PrivilegedUsersPageData struct {
Users []PrivilegedUserRow
}
// RawEventInsert is the per-event payload queued during a batch insert so the
// raw XML can be written to event_log_raw once the event_logs rows exist and
// their IDs are known.
type RawEventInsert struct {
	Message string // original event XML as received
	SHA256  string // hex-encoded SHA-256 of Message (mirrors event_logs.msg_sha256)
}
var (
httpRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "eventcollector_http_requests_total", Help: "Total HTTP requests."},
@@ -2062,15 +2067,85 @@ LIMIT ?
return out, rows.Err()
}
// getRawEventXMLByID returns the raw event XML for the given event ID. It
// prefers the dedicated event_log_raw table and, when no row exists there,
// falls back to the legacy msg column on event_logs for rows inserted before
// the raw table was backfilled.
func (s *server) getRawEventXMLByID(ctx context.Context, id uint64) (string, error) {
	var msg string
	err := s.db.QueryRowContext(ctx, `
SELECT msg
FROM event_log_raw
WHERE event_log_id = ?
LIMIT 1
`, id).Scan(&msg)
	switch {
	case err == nil:
		return msg, nil
	case errors.Is(err, sql.ErrNoRows):
		// Fallback for legacy data in case event_log_raw has not been fully
		// backfilled yet.
		if err = s.db.QueryRowContext(ctx, `
SELECT COALESCE(msg, '')
FROM event_logs
WHERE id = ?
LIMIT 1
`, id).Scan(&msg); err != nil {
			return "", err
		}
		return msg, nil
	default:
		return "", err
	}
}
// eventListSummary builds the compact one-line summary shown in the event list
// now that the list query no longer loads the raw XML: channel and event ID
// first, then any non-empty key fields as "Label=value" pairs joined by " | ".
func eventListSummary(ev EventRow) string {
	summary := []string{fmt.Sprintf("%s EventID %d", ev.Channel, ev.EventID)}
	fields := []struct {
		label string
		value string
	}{
		{"User", firstNonEmpty(ev.TargetUser, ev.SubjectUser)},
		{"IP", ev.SrcIP},
		{"Workstation", ev.Workstation},
		{"Process", ev.ProcessName},
		{"Reason", ev.FailureReason},
	}
	for _, f := range fields {
		if f.value == "" {
			continue
		}
		summary = append(summary, f.label+"="+f.value)
	}
	return strings.Join(summary, " | ")
}
func (s *server) listSOCRecentIncidents(ctx context.Context, limit int) ([]SOCRecentIncidentRow, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, created_at, rule_name, severity, status, hostname, summary
FROM detections
WHERE status IN ('open', 'investigating', 'confirmed_incident')
OR severity IN ('high', 'critical')
FROM (
SELECT id, created_at, rule_name, severity, status, hostname, summary
FROM detections
WHERE status IN ('open', 'investigating', 'confirmed_incident')
ORDER BY created_at DESC
LIMIT ?
UNION DISTINCT
SELECT id, created_at, rule_name, severity, status, hostname, summary
FROM detections
WHERE severity IN ('high', 'critical')
ORDER BY created_at DESC
LIMIT ?
) x
ORDER BY created_at DESC
LIMIT ?
`, limit)
`, limit, limit, limit)
if err != nil {
return nil, err
}
@@ -2090,8 +2165,10 @@ LIMIT ?
); err != nil {
return nil, err
}
r.CreatedAt = normalizeTime(r.CreatedAt)
out = append(out, r)
}
return out, rows.Err()
}
@@ -2921,6 +2998,13 @@ func (s *server) handleUIEvents(w http.ResponseWriter, r *http.Request) {
return
}
timeFrom := strings.TrimSpace(r.URL.Query().Get("from"))
timeTo := strings.TrimSpace(r.URL.Query().Get("to"))
if timeFrom == "" && timeTo == "" {
timeFrom = time.Now().UTC().Add(-24 * time.Hour).Format(time.RFC3339)
}
filter := EventFilter{
Host: strings.TrimSpace(r.URL.Query().Get("host")),
Channel: strings.TrimSpace(r.URL.Query().Get("channel")),
@@ -2928,8 +3012,8 @@ func (s *server) handleUIEvents(w http.ResponseWriter, r *http.Request) {
User: strings.TrimSpace(r.URL.Query().Get("user")),
SrcIP: strings.TrimSpace(r.URL.Query().Get("src_ip")),
Severity: strings.TrimSpace(r.URL.Query().Get("severity")),
TimeFrom: strings.TrimSpace(r.URL.Query().Get("from")),
TimeTo: strings.TrimSpace(r.URL.Query().Get("to")),
TimeFrom: timeFrom,
TimeTo: timeTo,
Limit: 100,
}
@@ -3109,7 +3193,7 @@ SELECT id, hostname, channel_name, event_id, source, computer, provider_name,
target_user, target_domain, subject_user, subject_domain,
workstation, src_ip, src_port, logon_type, process_name,
authentication_package, logon_process, status_text, sub_status_text,
failure_reason, ts, received_at, msg
failure_reason, ts, received_at
FROM event_logs
WHERE 1=1
`
@@ -3151,12 +3235,13 @@ WHERE 1=1
if f.Rule != "" || f.Severity != "" {
query += ` AND EXISTS (
SELECT 1
FROM detections d
WHERE d.hostname = event_logs.hostname
AND event_logs.ts >= d.window_start
AND event_logs.ts <= d.window_end
`
SELECT 1
FROM detections d
WHERE d.hostname = event_logs.hostname
AND event_logs.ts >= d.window_start
AND event_logs.ts <= d.window_end
AND d.created_at >= DATE_SUB(event_logs.ts, INTERVAL 1 DAY)
`
if f.Rule != "" {
query += ` AND d.rule_name = ?`
args = append(args, f.Rule)
@@ -3186,10 +3271,15 @@ WHERE 1=1
&ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain,
&ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName,
&ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText,
&ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message,
&ev.FailureReason, &ev.Time, &ev.ReceivedAt,
); err != nil {
return nil, err
}
ev.Time = normalizeTime(ev.Time)
ev.ReceivedAt = normalizeTime(ev.ReceivedAt)
ev.Message = eventListSummary(ev)
out = append(out, ev)
}
@@ -3202,7 +3292,7 @@ SELECT id, hostname, channel_name, event_id, source, computer, provider_name,
target_user, target_domain, subject_user, subject_domain,
workstation, src_ip, src_port, logon_type, process_name,
authentication_package, logon_process, status_text, sub_status_text,
failure_reason, ts, received_at, msg
failure_reason, ts, received_at
FROM event_logs
WHERE id = ?
LIMIT 1
@@ -3211,13 +3301,26 @@ LIMIT 1
var ev EventRow
err := s.db.QueryRowContext(ctx, q, id).Scan(
&ev.ID, &ev.Hostname, &ev.Channel, &ev.EventID, &ev.Source,
&ev.Computer, &ev.ProviderName,
&ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain,
&ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName,
&ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText,
&ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message,
&ev.Computer, &ev.ProviderName, &ev.TargetUser, &ev.TargetDomain,
&ev.SubjectUser, &ev.SubjectDomain, &ev.Workstation, &ev.SrcIP,
&ev.SrcPort, &ev.LogonType, &ev.ProcessName, &ev.AuthenticationPackage,
&ev.LogonProcess, &ev.StatusText, &ev.SubStatusText,
&ev.FailureReason, &ev.Time, &ev.ReceivedAt,
)
return ev, err
if err != nil {
return ev, err
}
ev.Time = normalizeTime(ev.Time)
ev.ReceivedAt = normalizeTime(ev.ReceivedAt)
raw, err := s.getRawEventXMLByID(ctx, id)
if err != nil {
return ev, err
}
ev.Message = raw
return ev, nil
}
func parseUIRFC3339(v string) (time.Time, error) {
@@ -3557,11 +3660,19 @@ INSERT INTO event_logs (
) VALUES
`)
rawEvents := make([]RawEventInsert, 0, len(batch))
for i, item := range batch {
if i > 0 {
sb.WriteString(",")
}
sb.WriteString("(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,UTC_TIMESTAMP(6),?,?)")
sb.WriteString("(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,UTC_TIMESTAMP(6),'',?)")
msgHash := sha256Hex(item.Message)
rawEvents = append(rawEvents, RawEventInsert{
Message: item.Message,
SHA256: msgHash,
})
norm := NormalizeEventXML(item.Message)
@@ -3626,14 +3737,33 @@ INSERT INTO event_logs (
norm.SubStatusText,
norm.FailureReason,
item.Time.UTC(),
item.Message,
sha256Hex(item.Message),
msgHash,
)
}
if _, err := tx.ExecContext(ctx, sb.String(), args...); err != nil {
res, err := tx.ExecContext(ctx, sb.String(), args...)
if err != nil {
return err
}
firstID, err := res.LastInsertId()
if err != nil {
return fmt.Errorf("event_logs LastInsertId: %w", err)
}
affected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("event_logs RowsAffected: %w", err)
}
if affected != int64(len(batch)) {
return fmt.Errorf("event_logs insert affected %d rows, expected %d", affected, len(batch))
}
if err := insertRawEventsTx(ctx, tx, uint64(firstID), rawEvents); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
@@ -3644,6 +3774,42 @@ INSERT INTO event_logs (
return nil
}
// insertRawEventsTx bulk-inserts the raw XML payloads for a batch of events
// into event_log_raw within the same transaction as the event_logs insert.
// IDs are assigned as firstEventID+i, i.e. it relies on the preceding batch
// insert having received a contiguous auto-increment range.
// NOTE(review): contiguous IDs hold for a single multi-row INSERT with
// innodb_autoinc_lock_mode <= 1 — confirm the server setting.
func insertRawEventsTx(ctx context.Context, tx *sql.Tx, firstEventID uint64, rawEvents []RawEventInsert) error {
	if len(rawEvents) == 0 {
		return nil
	}
	// One "(?,?,?)" tuple per raw event, comma separated.
	placeholders := strings.TrimSuffix(strings.Repeat("(?,?,?),", len(rawEvents)), ",")
	query := `
INSERT INTO event_log_raw
(event_log_id, msg, msg_sha256)
VALUES
` + placeholders
	args := make([]any, 0, len(rawEvents)*3)
	for i := range rawEvents {
		args = append(args,
			firstEventID+uint64(i),
			rawEvents[i].Message,
			rawEvents[i].SHA256,
		)
	}
	if _, err := tx.ExecContext(ctx, query, args...); err != nil {
		return fmt.Errorf("insert event_log_raw: %w", err)
	}
	return nil
}
func (s *server) listDetections(ctx context.Context, host, rule, severity, status string, limit int) ([]Detection, error) {
base := `
SELECT id, rule_name, severity, hostname, channel_name, event_id, score,