From 1587991edad74c11bc4b3f83db52677dab7863ae Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 30 Apr 2026 07:49:12 +0200 Subject: [PATCH] App-Hang Fix Umbau auf Baseline und Event Buckets --- main.go | 399 +++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 281 insertions(+), 118 deletions(-) diff --git a/main.go b/main.go index 72ac893..db10937 100644 --- a/main.go +++ b/main.go @@ -1479,6 +1479,17 @@ type RawEventInsert struct { SHA256 string } +type EventCountBucketAgg struct { + BucketStart time.Time + BucketEnd time.Time + Hostname string + Channel string + EventID uint32 + Count uint64 + FirstTS time.Time + LastTS time.Time +} + var ( httpRequestsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{Name: "eventcollector_http_requests_total", Help: "Total HTTP requests."}, @@ -1688,8 +1699,9 @@ func main() { s.templates = tmpl go s.runSOCLoop() - go s.runBaselineLoop() + go s.runDetectionLoop() + go s.runBaselineLoop() mux := http.NewServeMux() mux.HandleFunc("/healthz", s.handleHealthz) @@ -1746,6 +1758,72 @@ func main() { } } +func bucketStart(t time.Time, size time.Duration) time.Time { + t = t.UTC() + + if size <= 0 { + size = 5 * time.Minute + } + + unix := t.Unix() + bucketSeconds := int64(size.Seconds()) + start := unix - (unix % bucketSeconds) + + return time.Unix(start, 0).UTC() +} + +func upsertEventCountBucketsTx(ctx context.Context, tx *sql.Tx, buckets map[string]*EventCountBucketAgg) error { + if len(buckets) == 0 { + return nil + } + + var sb strings.Builder + args := make([]any, 0, len(buckets)*8) + + sb.WriteString(` +INSERT INTO event_count_buckets +(bucket_start, bucket_end, hostname, channel_name, event_id, cnt, first_event_ts, last_event_ts) +VALUES +`) + + i := 0 + for _, b := range buckets { + if i > 0 { + sb.WriteString(",") + } + i++ + + sb.WriteString("(?,?,?,?,?,?,?,?)") + + args = append(args, + b.BucketStart.UTC(), + b.BucketEnd.UTC(), + b.Hostname, + b.Channel, + b.EventID, + b.Count, + b.FirstTS.UTC(), + b.LastTS.UTC(), + ) + } + + sb.WriteString(` +ON DUPLICATE KEY UPDATE + cnt = cnt + VALUES(cnt), + first_event_ts = LEAST(first_event_ts, VALUES(first_event_ts)), + last_event_ts = GREATEST(last_event_ts, VALUES(last_event_ts)), + finalized = 0, + updated_at = UTC_TIMESTAMP(6) +`) + + _, err := tx.ExecContext(ctx, sb.String(), args...) + if err != nil { + return fmt.Errorf("upsert event_count_buckets: %w", err) + } + + return nil +} + func (s *server) runSOCLoop() { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() @@ -1778,6 +1856,7 @@ func (s *server) runBaselineOnce() { timeout time.Duration fn func(context.Context) error }{ + {"baseline_finalize_buckets", 30 * time.Second, s.detector.finalizeEventCountBuckets}, {"baseline_anomaly", 120 * time.Second, s.detector.runBaselineAnomalyRule}, {"baseline_update", 120 * time.Second, s.detector.runBaselineUpdate}, } @@ -3749,6 +3828,7 @@ INSERT INTO event_logs ( ) VALUES `) + bucketAgg := make(map[string]*EventCountBucketAgg) rawEvents := make([]RawEventInsert, 0, len(batch)) for i, item := range batch { @@ -3769,6 +3849,46 @@ INSERT INTO event_logs ( targetUser := normalizeUsername(norm.TargetUser) subjectUser := normalizeUsername(norm.SubjectUser) + bucketSize := s.cfg.BaselineWindow + if bucketSize <= 0 { + bucketSize = 5 * time.Minute + } + + bs := bucketStart(item.Time, bucketSize) + be := bs.Add(bucketSize) + + bucketKey := fmt.Sprintf( + "%s|%s|%s|%d", + bs.Format(time.RFC3339Nano), + realHost, + item.Channel, + item.EventID, + ) + + agg := bucketAgg[bucketKey] + if agg == nil { + agg = &EventCountBucketAgg{ + BucketStart: bs, + BucketEnd: be, + Hostname: realHost, + Channel: item.Channel, + EventID: item.EventID, + FirstTS: item.Time.UTC(), + LastTS: item.Time.UTC(), + } + bucketAgg[bucketKey] = agg + } + + agg.Count++ + + eventTS := item.Time.UTC() + if eventTS.Before(agg.FirstTS) { + agg.FirstTS = eventTS + } + if eventTS.After(agg.LastTS) { + agg.LastTS = eventTS + } + // Erfolgreicher Logon if item.Channel == "Security" && item.EventID == 4624 && targetUser != "" { priv, err := s.detector.isPrivilegedUser(ctx, targetUser) @@ -3853,6 +3973,10 @@ INSERT INTO event_logs ( return err } + if err := upsertEventCountBucketsTx(ctx, tx, bucketAgg); err != nil { + return err + } + if err := tx.Commit(); err != nil { return err } @@ -3863,6 +3987,43 @@ INSERT INTO event_logs ( return nil } +func (d *detector) finalizeEventCountBuckets(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, ` +UPDATE event_count_buckets +SET finalized = 1, + updated_at = UTC_TIMESTAMP(6) +WHERE finalized = 0 + AND bucket_end < UTC_TIMESTAMP(6) - INTERVAL 2 MINUTE +`) + return err +} + +func (d *detector) markBucketAnomalyChecked(ctx context.Context, bucketStart time.Time, hostname, channel string, eventID uint32) error { + _, err := d.db.ExecContext(ctx, ` +UPDATE event_count_buckets +SET anomaly_checked_at = UTC_TIMESTAMP(6) +WHERE bucket_start = ? + AND hostname = ? + AND channel_name = ? + AND event_id = ? +`, bucketStart.UTC(), hostname, channel, eventID) + + return err +} + +func (d *detector) markBucketLearned(ctx context.Context, bucketStart time.Time, hostname, channel string, eventID uint32) error { + _, err := d.db.ExecContext(ctx, ` +UPDATE event_count_buckets +SET learned_at = UTC_TIMESTAMP(6) +WHERE bucket_start = ? + AND hostname = ? + AND channel_name = ? + AND event_id = ? +`, bucketStart.UTC(), hostname, channel, eventID) + + return err +} + func insertRawEventsTx(ctx context.Context, tx *sql.Tx, firstEventID uint64, rawEvents []RawEventInsert) error { if len(rawEvents) == 0 { return nil @@ -3986,30 +4147,33 @@ func (d *detector) runBaselineUpdate(ctx context.Context) error { return nil } - windowEnd := time.Now().UTC() - windowStart := windowEnd.Add(-d.cfg.BaselineWindow) - - hour := windowEnd.Hour() - dayOfWeek := int(windowEnd.Weekday()+6) % 7 // Go: Sonntag=0, MySQL WEEKDAY: Montag=0 - rows, err := d.db.QueryContext(ctx, ` SELECT + bucket_start, + bucket_end, hostname, channel_name, event_id, - COUNT(*) AS cnt -FROM event_logs -WHERE ts >= ? AND ts < ? -GROUP BY hostname, channel_name, event_id -`, windowStart, windowEnd) + cnt +FROM event_count_buckets +WHERE finalized = 1 + AND learned_at IS NULL +ORDER BY bucket_start ASC +LIMIT 5000 +`) if err != nil { return err } defer rows.Close() for rows.Next() { + var bucketStart time.Time + var bucketEnd time.Time var b BaselineBucket + if err := rows.Scan( + &bucketStart, + &bucketEnd, &b.Hostname, &b.Channel, &b.EventID, @@ -4018,26 +4182,31 @@ GROUP BY hostname, channel_name, event_id return err } - b.Hour = hour - b.DayOfWeek = dayOfWeek + bucketStart = bucketStart.UTC() + bucketEnd = bucketEnd.UTC() + + b.Hour = bucketStart.Hour() + b.DayOfWeek = mysqlWeekday(bucketStart) excluded, err := d.isBaselineExcluded(ctx, b.Hostname, b.Channel, b.EventID) if err != nil { return err } - if excluded { - continue + + if !excluded { + incident, err := d.hasConfirmedIncidentInWindow(ctx, b.Hostname, b.Channel, b.EventID, bucketStart, bucketEnd) + if err != nil { + return err + } + + if !incident { + if err := d.updateBaselineBucket(ctx, b); err != nil { + return err + } + } } - incident, err := d.hasConfirmedIncidentInWindow(ctx, b.Hostname, b.Channel, b.EventID, windowStart, windowEnd) - if err != nil { - return err - } - if incident { - continue - } - - if err := d.updateBaselineBucket(ctx, b); err != nil { + if err := d.markBucketLearned(ctx, bucketStart, b.Hostname, b.Channel, b.EventID); err != nil { return err } } @@ -4182,43 +4351,37 @@ func (d *detector) runBaselineAnomalyRule(ctx context.Context) error { return nil } - windowEnd := time.Now().UTC() - windowStart := windowEnd.Add(-d.cfg.BaselineWindow) - - hour := windowEnd.Hour() - dayOfWeek := mysqlWeekday(windowEnd) - rows, err := d.db.QueryContext(ctx, ` SELECT + e.bucket_start, + e.bucket_end, e.hostname, e.channel_name, e.event_id, - COUNT(*) AS cnt, + e.cnt, COALESCE(b.avg_count, 0), COALESCE(b.stddev_count, 0), COALESCE(b.sample_count, 0) -FROM event_logs e +FROM event_count_buckets e LEFT JOIN baseline_event_stats b ON b.hostname = e.hostname AND b.channel_name = e.channel_name AND b.event_id = e.event_id - AND b.hour_of_day = ? - AND b.day_of_week = ? -WHERE e.ts >= ? AND e.ts < ? -GROUP BY - e.hostname, - e.channel_name, - e.event_id, - b.avg_count, - b.stddev_count, - b.sample_count -`, hour, dayOfWeek, windowStart, windowEnd) + AND b.hour_of_day = HOUR(e.bucket_start) + AND b.day_of_week = WEEKDAY(e.bucket_start) +WHERE e.finalized = 1 + AND e.anomaly_checked_at IS NULL +ORDER BY e.bucket_start ASC +LIMIT 5000 +`) if err != nil { return err } defer rows.Close() for rows.Next() { + var bucketStart time.Time + var bucketEnd time.Time var host string var channel string var eventID uint32 @@ -4228,6 +4391,8 @@ GROUP BY var samples int if err := rows.Scan( + &bucketStart, + &bucketEnd, &host, &channel, &eventID, @@ -4239,6 +4404,9 @@ GROUP BY return err } + bucketStart = bucketStart.UTC() + bucketEnd = bucketEnd.UTC() + eventIDStr := strconv.Itoa(int(eventID)) d.baselineCurrentCountGauge.WithLabelValues(host, channel, eventIDStr).Set(float64(count)) @@ -4246,84 +4414,79 @@ GROUP BY d.baselineStddevGauge.WithLabelValues(host, channel, eventIDStr).Set(stddev) d.baselineSamplesGauge.WithLabelValues(host, channel, eventIDStr).Set(float64(samples)) - if samples < d.cfg.BaselineMinSamples { - continue + if samples >= d.cfg.BaselineMinSamples && + count >= d.cfg.BaselineMinCount && + stddev > 0 { + + z := (float64(count) - avg) / stddev + + if z >= d.cfg.BaselineMediumZScore { + severity := "medium" + if z >= d.cfg.BaselineHighZScore { + severity = "high" + } + + suppressed, err := d.isBaselineSuppressed(ctx, host, channel, eventID, bucketEnd) + if err != nil { + return err + } + + if !suppressed { + created, err := d.insertDetection(ctx, Detection{ + RuleName: "baseline_event_rate_anomaly", + Severity: severity, + Hostname: host, + Channel: channel, + EventID: eventID, + Score: z, + WindowStart: bucketStart, + WindowEnd: bucketEnd, + Summary: fmt.Sprintf( + "Baseline-Anomalie auf %s: %s EventID %d kam %d-mal im Bucket %s bis %s, normal %.2f ± %.2f, z=%.2f", + host, + channel, + eventID, + count, + bucketStart.Format(time.RFC3339), + bucketEnd.Format(time.RFC3339), + avg, + stddev, + z, + ), + Details: mustJSON(map[string]any{ + "hostname": host, + "channel": channel, + "event_id": eventID, + "count": count, + "avg_count": avg, + "stddev_count": stddev, + "z_score": z, + "sample_count": samples, + "hour_of_day": bucketStart.Hour(), + "day_of_week": mysqlWeekday(bucketStart), + "bucket_start": bucketStart.Format(time.RFC3339Nano), + "bucket_end": bucketEnd.Format(time.RFC3339Nano), + "window_minutes": int(d.cfg.BaselineWindow.Minutes()), + "min_samples": d.cfg.BaselineMinSamples, + "medium_z": d.cfg.BaselineMediumZScore, + "high_z": d.cfg.BaselineHighZScore, + }), + }) + if err != nil { + return err + } + + if created { + d.detectionHitsTotal.WithLabelValues("baseline_event_rate_anomaly", severity).Inc() + d.anomalyScoreGauge.WithLabelValues(host, "baseline_event_rate_anomaly").Set(z) + } + } + } } - if count < d.cfg.BaselineMinCount { - continue - } - - if stddev <= 0 { - continue - } - - z := (float64(count) - avg) / stddev - - if z < d.cfg.BaselineMediumZScore { - continue - } - - severity := "medium" - if z >= d.cfg.BaselineHighZScore { - severity = "high" - } - - suppressed, err := d.isBaselineSuppressed(ctx, host, channel, eventID, windowEnd) - if err != nil { + if err := d.markBucketAnomalyChecked(ctx, bucketStart, host, channel, eventID); err != nil { return err } - if suppressed { - continue - } - - score := z - - created, err := d.insertDetection(ctx, Detection{ - RuleName: "baseline_event_rate_anomaly", - Severity: severity, - Hostname: host, - Channel: channel, - EventID: eventID, - Score: score, - WindowStart: windowStart, - WindowEnd: windowEnd, - Summary: fmt.Sprintf( - "Baseline-Anomalie auf %s: %s EventID %d kam %d-mal in %d Minuten, normal %.2f ± %.2f, z=%.2f", - host, - channel, - eventID, - count, - int(d.cfg.BaselineWindow.Minutes()), - avg, - stddev, - z, - ), - Details: mustJSON(map[string]any{ - "hostname": host, - "channel": channel, - "event_id": eventID, - "count": count, - "avg_count": avg, - "stddev_count": stddev, - "z_score": z, - "sample_count": samples, - "hour_of_day": hour, - "day_of_week": dayOfWeek, - "window_minutes": int(d.cfg.BaselineWindow.Minutes()), - "min_samples": d.cfg.BaselineMinSamples, - "medium_z": d.cfg.BaselineMediumZScore, - "high_z": d.cfg.BaselineHighZScore, - }), - }) - if err != nil { - return err - } - - if created { - d.detectionHitsTotal.WithLabelValues("baseline_event_rate_anomaly", severity).Inc() - d.anomalyScoreGauge.WithLabelValues(host, "baseline_event_rate_anomaly").Set(score) - } } return rows.Err()