diff --git a/deploy/mariadb/init/001-schema.sql b/deploy/mariadb/init/001-schema.sql index 0a706a9..b623b82 100644 --- a/deploy/mariadb/init/001-schema.sql +++ b/deploy/mariadb/init/001-schema.sql @@ -1809,4 +1809,108 @@ CREATE TABLE IF NOT EXISTS user_source_ip_seen ( last_seen DATETIME(6) NOT NULL, seen_count BIGINT NOT NULL DEFAULT 1, PRIMARY KEY (username, src_ip, hostname) -); \ No newline at end of file +); + +### + +ALTER TABLE detections + ADD INDEX idx_detections_created_at (created_at), + ADD INDEX idx_detections_status_created (status, created_at), + ADD INDEX idx_detections_severity_created (severity, created_at), + ADD INDEX idx_detections_host_created (hostname, created_at), + ADD INDEX idx_detections_rule_created (rule_name, created_at), + ADD INDEX idx_detections_host_rule_created (hostname, rule_name, created_at), + ADD INDEX idx_detections_host_status_created (hostname, status, created_at), + ADD INDEX idx_detections_risk_window (created_at, status, hostname, severity), + ADD INDEX idx_detections_window_lookup (hostname, window_start, window_end); + +ALTER TABLE ueba_user_baseline + ADD UNIQUE KEY uq_ueba_user_context (username, hostname, src_ip, workstation), + ADD INDEX idx_ueba_user_last_seen (username, last_seen), + ADD INDEX idx_ueba_host_last_seen (hostname, last_seen); + +ALTER TABLE user_source_ip_seen + ADD UNIQUE KEY uq_user_source_ip_host (username, src_ip, hostname), + ADD INDEX idx_user_source_ip_last_seen (username, src_ip, last_seen); + +ALTER TABLE user_privilege_baseline + ADD UNIQUE KEY uq_user_privilege_username (username); + +ALTER TABLE baseline_event_stats + ADD UNIQUE KEY uq_baseline_bucket ( + hostname, + channel_name, + event_id, + hour_of_day, + day_of_week + ); + +ALTER TABLE detection_suppressions + ADD INDEX idx_detection_suppressions_lookup ( + enabled, + rule_name, + hostname, + channel_name, + event_id, + expires_at + ); + +ALTER TABLE baseline_exclusions + ADD INDEX idx_baseline_exclusions_lookup ( + enabled, + hostname, + channel_name, + event_id, + expires_at + ); + +ALTER TABLE detections + ADD INDEX idx_detections_status_created2 (status, created_at), + ADD INDEX idx_detections_severity_created2 (severity, created_at); + +### + + CREATE TABLE IF NOT EXISTS event_log_raw ( + event_log_id BIGINT UNSIGNED NOT NULL, + msg MEDIUMTEXT NOT NULL, + msg_sha256 CHAR(64) NOT NULL, + created_at DATETIME(6) NOT NULL DEFAULT (UTC_TIMESTAMP(6)), + PRIMARY KEY (event_log_id), + INDEX idx_event_log_raw_sha256 (msg_sha256), + INDEX idx_event_log_raw_created_at (created_at) +); + +ALTER TABLE event_log_raw + ADD CONSTRAINT fk_event_log_raw_event + FOREIGN KEY (event_log_id) + REFERENCES event_logs(id) + ON DELETE CASCADE; + +######################## MIGRATION ############################ +INSERT INTO event_log_raw (event_log_id, msg, msg_sha256, created_at) +SELECT id, msg, msg_sha256, COALESCE(received_at, UTC_TIMESTAMP(6)) +FROM event_logs +WHERE msg IS NOT NULL + AND msg <> '' +ON DUPLICATE KEY UPDATE + msg = VALUES(msg), + msg_sha256 = VALUES(msg_sha256); +######################## MIGRATION ############################ + + +######################## TEST ################################# +SELECT COUNT(*) AS raw_rows FROM event_log_raw; +SELECT COUNT(*) AS event_rows_with_msg FROM event_logs WHERE msg IS NOT NULL AND msg <> ''; +######################## TEST ################################# + +######################## MIGRATION ############################ +UPDATE event_logs +SET msg = '' +WHERE msg IS NOT NULL + AND msg <> '' +LIMIT 10000; +######################## MIGRATION ############################ + +######################## TEST ################################# +SELECT COUNT(*) FROM event_logs WHERE msg IS NOT NULL AND msg <> ''; +######################## TEST ################################# \ No newline at end of file diff --git a/main.go b/main.go index 7047b7d..47cfb77 100644 --- a/main.go +++ b/main.go @@ -1474,6 +1474,11 @@ type PrivilegedUsersPageData struct { Users []PrivilegedUserRow } +type RawEventInsert struct { + Message string + SHA256 string +} + var ( httpRequestsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{Name: "eventcollector_http_requests_total", Help: "Total HTTP requests."}, @@ -2062,15 +2067,85 @@ LIMIT ? return out, rows.Err() } +func (s *server) getRawEventXMLByID(ctx context.Context, id uint64) (string, error) { + var msg string + + err := s.db.QueryRowContext(ctx, ` +SELECT msg +FROM event_log_raw +WHERE event_log_id = ? +LIMIT 1 +`, id).Scan(&msg) + + if err == nil { + return msg, nil + } + + if !errors.Is(err, sql.ErrNoRows) { + return "", err + } + + // Fallback für Alt-Daten, falls event_log_raw noch nicht vollständig befüllt ist. + err = s.db.QueryRowContext(ctx, ` +SELECT COALESCE(msg, '') +FROM event_logs +WHERE id = ? +LIMIT 1 +`, id).Scan(&msg) + + if err != nil { + return "", err + } + + return msg, nil +} + +func eventListSummary(ev EventRow) string { + user := firstNonEmpty(ev.TargetUser, ev.SubjectUser) + parts := []string{ + fmt.Sprintf("%s EventID %d", ev.Channel, ev.EventID), + } + + if user != "" { + parts = append(parts, "User="+user) + } + if ev.SrcIP != "" { + parts = append(parts, "IP="+ev.SrcIP) + } + if ev.Workstation != "" { + parts = append(parts, "Workstation="+ev.Workstation) + } + if ev.ProcessName != "" { + parts = append(parts, "Process="+ev.ProcessName) + } + if ev.FailureReason != "" { + parts = append(parts, "Reason="+ev.FailureReason) + } + + return strings.Join(parts, " | ") +} + func (s *server) listSOCRecentIncidents(ctx context.Context, limit int) ([]SOCRecentIncidentRow, error) { rows, err := s.db.QueryContext(ctx, ` SELECT id, created_at, rule_name, severity, status, hostname, summary -FROM detections -WHERE status IN ('open', 'investigating', 'confirmed_incident') - OR severity IN ('high', 'critical') +FROM ( + SELECT id, created_at, rule_name, severity, status, hostname, summary + FROM detections + WHERE status IN ('open', 'investigating', 'confirmed_incident') + ORDER BY created_at DESC + LIMIT ? + + UNION DISTINCT + + SELECT id, created_at, rule_name, severity, status, hostname, summary + FROM detections + WHERE severity IN ('high', 'critical') + ORDER BY created_at DESC + LIMIT ? +) x ORDER BY created_at DESC LIMIT ? -`, limit) +`, limit, limit, limit) if err != nil { return nil, err } @@ -2090,8 +2165,10 @@ LIMIT ? ); err != nil { return nil, err } + r.CreatedAt = normalizeTime(r.CreatedAt) out = append(out, r) } + return out, rows.Err() } @@ -2921,6 +2998,13 @@ func (s *server) handleUIEvents(w http.ResponseWriter, r *http.Request) { return } + timeFrom := strings.TrimSpace(r.URL.Query().Get("from")) + timeTo := strings.TrimSpace(r.URL.Query().Get("to")) + + if timeFrom == "" && timeTo == "" { + timeFrom = time.Now().UTC().Add(-24 * time.Hour).Format(time.RFC3339) + } + filter := EventFilter{ Host: strings.TrimSpace(r.URL.Query().Get("host")), Channel: strings.TrimSpace(r.URL.Query().Get("channel")), @@ -2928,8 +3012,8 @@ func (s *server) handleUIEvents(w http.ResponseWriter, r *http.Request) { User: strings.TrimSpace(r.URL.Query().Get("user")), SrcIP: strings.TrimSpace(r.URL.Query().Get("src_ip")), Severity: strings.TrimSpace(r.URL.Query().Get("severity")), - TimeFrom: strings.TrimSpace(r.URL.Query().Get("from")), - TimeTo: strings.TrimSpace(r.URL.Query().Get("to")), + TimeFrom: timeFrom, + TimeTo: timeTo, Limit: 100, } @@ -3109,7 +3193,7 @@ SELECT id, hostname, channel_name, event_id, source, computer, provider_name, target_user, target_domain, subject_user, subject_domain, workstation, src_ip, src_port, logon_type, process_name, authentication_package, logon_process, status_text, sub_status_text, - failure_reason, ts, received_at, msg + failure_reason, ts, received_at FROM event_logs WHERE 1=1 ` @@ -3151,12 +3235,13 @@ WHERE 1=1 if f.Rule != "" || f.Severity != "" { query += ` AND EXISTS ( - SELECT 1 - FROM detections d - WHERE d.hostname = event_logs.hostname - AND event_logs.ts >= d.window_start - AND event_logs.ts <= d.window_end - ` + SELECT 1 + FROM detections d + WHERE d.hostname = event_logs.hostname + AND event_logs.ts >= d.window_start + AND event_logs.ts <= d.window_end + AND d.created_at >= DATE_SUB(event_logs.ts, INTERVAL 1 DAY) + ` if f.Rule != "" { query += ` AND d.rule_name = ?` args = append(args, f.Rule) @@ -3186,10 +3271,15 @@ WHERE 1=1 &ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain, &ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName, &ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText, - &ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message, + &ev.FailureReason, &ev.Time, &ev.ReceivedAt, ); err != nil { return nil, err } + + ev.Time = normalizeTime(ev.Time) + ev.ReceivedAt = normalizeTime(ev.ReceivedAt) + ev.Message = eventListSummary(ev) + out = append(out, ev) } @@ -3202,7 +3292,7 @@ SELECT id, hostname, channel_name, event_id, source, computer, provider_name, target_user, target_domain, subject_user, subject_domain, workstation, src_ip, src_port, logon_type, process_name, authentication_package, logon_process, status_text, sub_status_text, - failure_reason, ts, received_at, msg + failure_reason, ts, received_at FROM event_logs WHERE id = ? LIMIT 1 @@ -3211,13 +3301,26 @@ LIMIT 1 var ev EventRow err := s.db.QueryRowContext(ctx, q, id).Scan( &ev.ID, &ev.Hostname, &ev.Channel, &ev.EventID, &ev.Source, - &ev.Computer, &ev.ProviderName, - &ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain, - &ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName, - &ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText, - &ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message, + &ev.Computer, &ev.ProviderName, &ev.TargetUser, &ev.TargetDomain, + &ev.SubjectUser, &ev.SubjectDomain, &ev.Workstation, &ev.SrcIP, + &ev.SrcPort, &ev.LogonType, &ev.ProcessName, &ev.AuthenticationPackage, + &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText, + &ev.FailureReason, &ev.Time, &ev.ReceivedAt, ) - return ev, err + if err != nil { + return ev, err + } + + ev.Time = normalizeTime(ev.Time) + ev.ReceivedAt = normalizeTime(ev.ReceivedAt) + + raw, err := s.getRawEventXMLByID(ctx, id) + if err != nil { + return ev, err + } + + ev.Message = raw + return ev, nil } func parseUIRFC3339(v string) (time.Time, error) { @@ -3557,11 +3660,19 @@ INSERT INTO event_logs ( ) VALUES `) + rawEvents := make([]RawEventInsert, 0, len(batch)) + for i, item := range batch { if i > 0 { sb.WriteString(",") } - sb.WriteString("(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,UTC_TIMESTAMP(6),?,?)") + sb.WriteString("(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,UTC_TIMESTAMP(6),'',?)") + + msgHash := sha256Hex(item.Message) + rawEvents = append(rawEvents, RawEventInsert{ + Message: item.Message, + SHA256: msgHash, + }) norm := NormalizeEventXML(item.Message) @@ -3626,14 +3737,33 @@ INSERT INTO event_logs ( norm.SubStatusText, norm.FailureReason, item.Time.UTC(), - item.Message, - sha256Hex(item.Message), + msgHash, ) } - if _, err := tx.ExecContext(ctx, sb.String(), args...); err != nil { + res, err := tx.ExecContext(ctx, sb.String(), args...) + if err != nil { return err } + + firstID, err := res.LastInsertId() + if err != nil { + return fmt.Errorf("event_logs LastInsertId: %w", err) + } + + affected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("event_logs RowsAffected: %w", err) + } + + if affected != int64(len(batch)) { + return fmt.Errorf("event_logs insert affected %d rows, expected %d", affected, len(batch)) + } + + if err := insertRawEventsTx(ctx, tx, uint64(firstID), rawEvents); err != nil { + return err + } + if err := tx.Commit(); err != nil { return err } @@ -3644,6 +3774,42 @@ INSERT INTO event_logs ( return nil } +func insertRawEventsTx(ctx context.Context, tx *sql.Tx, firstEventID uint64, rawEvents []RawEventInsert) error { + if len(rawEvents) == 0 { + return nil + } + + var sb strings.Builder + args := make([]any, 0, len(rawEvents)*3) + + sb.WriteString(` +INSERT INTO event_log_raw +(event_log_id, msg, msg_sha256) +VALUES +`) + + for i, raw := range rawEvents { + if i > 0 { + sb.WriteString(",") + } + + sb.WriteString("(?,?,?)") + + args = append(args, + firstEventID+uint64(i), + raw.Message, + raw.SHA256, + ) + } + + _, err := tx.ExecContext(ctx, sb.String(), args...) + if err != nil { + return fmt.Errorf("insert event_log_raw: %w", err) + } + + return nil +} + func (s *server) listDetections(ctx context.Context, host, rule, severity, status string, limit int) ([]Detection, error) { base := ` SELECT id, rule_name, severity, hostname, channel_name, event_id, score,