App-Hang Fix Umbau auf Baseline und Event Buckets
All checks were successful
release-tag / release-image (push) Successful in 2m7s

This commit is contained in:
2026-04-30 07:49:12 +02:00
parent d93151746c
commit 1587991eda

399
main.go
View File

@@ -1479,6 +1479,17 @@ type RawEventInsert struct {
SHA256 string
}
// EventCountBucketAgg accumulates the per-(bucket, host, channel, event-id)
// event count while an ingest batch is processed, before being upserted into
// the event_count_buckets table in the same transaction as the batch.
type EventCountBucketAgg struct {
	BucketStart time.Time // inclusive start of the time bucket (UTC)
	BucketEnd   time.Time // exclusive end of the time bucket (UTC)
	Hostname    string
	Channel     string
	EventID     uint32
	Count       uint64    // number of events observed for this key in the bucket
	FirstTS     time.Time // earliest event timestamp seen in the bucket (UTC)
	LastTS      time.Time // latest event timestamp seen in the bucket (UTC)
}
var (
httpRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "eventcollector_http_requests_total", Help: "Total HTTP requests."},
@@ -1688,8 +1699,9 @@ func main() {
s.templates = tmpl
go s.runSOCLoop()
go s.runBaselineLoop()
go s.runDetectionLoop()
go s.runBaselineLoop()
mux := http.NewServeMux()
mux.HandleFunc("/healthz", s.handleHealthz)
@@ -1746,6 +1758,72 @@ func main() {
}
}
func bucketStart(t time.Time, size time.Duration) time.Time {
t = t.UTC()
if size <= 0 {
size = 5 * time.Minute
}
unix := t.Unix()
bucketSeconds := int64(size.Seconds())
start := unix - (unix % bucketSeconds)
return time.Unix(start, 0).UTC()
}
// upsertEventCountBucketsTx writes the aggregated per-bucket event counts into
// event_count_buckets inside the given transaction. Existing rows have their
// count incremented, their first/last timestamps widened, and are re-opened
// (finalized = 0) so the baseline loop re-finalizes them after late arrivals.
// A nil or empty aggregation map is a no-op.
func upsertEventCountBucketsTx(ctx context.Context, tx *sql.Tx, buckets map[string]*EventCountBucketAgg) error {
	if len(buckets) == 0 {
		return nil
	}
	// One placeholder tuple and eight bound args per bucket; map iteration
	// order is irrelevant because tuples and args are appended in lockstep.
	tuples := make([]string, 0, len(buckets))
	args := make([]any, 0, len(buckets)*8)
	for _, agg := range buckets {
		tuples = append(tuples, "(?,?,?,?,?,?,?,?)")
		args = append(args,
			agg.BucketStart.UTC(),
			agg.BucketEnd.UTC(),
			agg.Hostname,
			agg.Channel,
			agg.EventID,
			agg.Count,
			agg.FirstTS.UTC(),
			agg.LastTS.UTC(),
		)
	}
	query := `
INSERT INTO event_count_buckets
(bucket_start, bucket_end, hostname, channel_name, event_id, cnt, first_event_ts, last_event_ts)
VALUES
` + strings.Join(tuples, ",") + `
ON DUPLICATE KEY UPDATE
cnt = cnt + VALUES(cnt),
first_event_ts = LEAST(first_event_ts, VALUES(first_event_ts)),
last_event_ts = GREATEST(last_event_ts, VALUES(last_event_ts)),
finalized = 0,
updated_at = UTC_TIMESTAMP(6)
`
	if _, err := tx.ExecContext(ctx, query, args...); err != nil {
		return fmt.Errorf("upsert event_count_buckets: %w", err)
	}
	return nil
}
func (s *server) runSOCLoop() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
@@ -1778,6 +1856,7 @@ func (s *server) runBaselineOnce() {
timeout time.Duration
fn func(context.Context) error
}{
{"baseline_finalize_buckets", 30 * time.Second, s.detector.finalizeEventCountBuckets},
{"baseline_anomaly", 120 * time.Second, s.detector.runBaselineAnomalyRule},
{"baseline_update", 120 * time.Second, s.detector.runBaselineUpdate},
}
@@ -3749,6 +3828,7 @@ INSERT INTO event_logs (
) VALUES
`)
bucketAgg := make(map[string]*EventCountBucketAgg)
rawEvents := make([]RawEventInsert, 0, len(batch))
for i, item := range batch {
@@ -3769,6 +3849,46 @@ INSERT INTO event_logs (
targetUser := normalizeUsername(norm.TargetUser)
subjectUser := normalizeUsername(norm.SubjectUser)
bucketSize := s.cfg.BaselineWindow
if bucketSize <= 0 {
bucketSize = 5 * time.Minute
}
bs := bucketStart(item.Time, bucketSize)
be := bs.Add(bucketSize)
bucketKey := fmt.Sprintf(
"%s|%s|%s|%d",
bs.Format(time.RFC3339Nano),
realHost,
item.Channel,
item.EventID,
)
agg := bucketAgg[bucketKey]
if agg == nil {
agg = &EventCountBucketAgg{
BucketStart: bs,
BucketEnd: be,
Hostname: realHost,
Channel: item.Channel,
EventID: item.EventID,
FirstTS: item.Time.UTC(),
LastTS: item.Time.UTC(),
}
bucketAgg[bucketKey] = agg
}
agg.Count++
eventTS := item.Time.UTC()
if eventTS.Before(agg.FirstTS) {
agg.FirstTS = eventTS
}
if eventTS.After(agg.LastTS) {
agg.LastTS = eventTS
}
// Erfolgreicher Logon
if item.Channel == "Security" && item.EventID == 4624 && targetUser != "" {
priv, err := s.detector.isPrivilegedUser(ctx, targetUser)
@@ -3853,6 +3973,10 @@ INSERT INTO event_logs (
return err
}
if err := upsertEventCountBucketsTx(ctx, tx, bucketAgg); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
@@ -3863,6 +3987,43 @@ INSERT INTO event_logs (
return nil
}
// finalizeEventCountBuckets marks every still-open bucket whose window ended
// more than two minutes ago as finalized so the learning and anomaly loops can
// pick it up. The two-minute grace period leaves room for late-arriving events
// to be merged in before a bucket is frozen.
func (d *detector) finalizeEventCountBuckets(ctx context.Context) error {
	const query = `
UPDATE event_count_buckets
SET finalized = 1,
updated_at = UTC_TIMESTAMP(6)
WHERE finalized = 0
AND bucket_end < UTC_TIMESTAMP(6) - INTERVAL 2 MINUTE
`
	_, err := d.db.ExecContext(ctx, query)
	return err
}
// markBucketAnomalyChecked stamps the event_count_buckets row identified by
// its full key (bucket start, host, channel, event id) as having been
// evaluated by the baseline anomaly rule, so it is not re-scanned.
//
// The first time parameter was renamed from bucketStart to start: it shadowed
// the package-level bucketStart helper inside this function.
func (d *detector) markBucketAnomalyChecked(ctx context.Context, start time.Time, hostname, channel string, eventID uint32) error {
	_, err := d.db.ExecContext(ctx, `
UPDATE event_count_buckets
SET anomaly_checked_at = UTC_TIMESTAMP(6)
WHERE bucket_start = ?
AND hostname = ?
AND channel_name = ?
AND event_id = ?
`, start.UTC(), hostname, channel, eventID)
	return err
}
// markBucketLearned stamps the event_count_buckets row identified by its full
// key (bucket start, host, channel, event id) as having been folded into the
// baseline statistics, so the learning loop does not consume it again.
//
// The first time parameter was renamed from bucketStart to start: it shadowed
// the package-level bucketStart helper inside this function.
func (d *detector) markBucketLearned(ctx context.Context, start time.Time, hostname, channel string, eventID uint32) error {
	_, err := d.db.ExecContext(ctx, `
UPDATE event_count_buckets
SET learned_at = UTC_TIMESTAMP(6)
WHERE bucket_start = ?
AND hostname = ?
AND channel_name = ?
AND event_id = ?
`, start.UTC(), hostname, channel, eventID)
	return err
}
func insertRawEventsTx(ctx context.Context, tx *sql.Tx, firstEventID uint64, rawEvents []RawEventInsert) error {
if len(rawEvents) == 0 {
return nil
@@ -3986,30 +4147,33 @@ func (d *detector) runBaselineUpdate(ctx context.Context) error {
return nil
}
windowEnd := time.Now().UTC()
windowStart := windowEnd.Add(-d.cfg.BaselineWindow)
hour := windowEnd.Hour()
dayOfWeek := int(windowEnd.Weekday()+6) % 7 // Go: Sonntag=0, MySQL WEEKDAY: Montag=0
rows, err := d.db.QueryContext(ctx, `
SELECT
bucket_start,
bucket_end,
hostname,
channel_name,
event_id,
COUNT(*) AS cnt
FROM event_logs
WHERE ts >= ? AND ts < ?
GROUP BY hostname, channel_name, event_id
`, windowStart, windowEnd)
cnt
FROM event_count_buckets
WHERE finalized = 1
AND learned_at IS NULL
ORDER BY bucket_start ASC
LIMIT 5000
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var bucketStart time.Time
var bucketEnd time.Time
var b BaselineBucket
if err := rows.Scan(
&bucketStart,
&bucketEnd,
&b.Hostname,
&b.Channel,
&b.EventID,
@@ -4018,26 +4182,31 @@ GROUP BY hostname, channel_name, event_id
return err
}
b.Hour = hour
b.DayOfWeek = dayOfWeek
bucketStart = bucketStart.UTC()
bucketEnd = bucketEnd.UTC()
b.Hour = bucketStart.Hour()
b.DayOfWeek = mysqlWeekday(bucketStart)
excluded, err := d.isBaselineExcluded(ctx, b.Hostname, b.Channel, b.EventID)
if err != nil {
return err
}
if excluded {
continue
if !excluded {
incident, err := d.hasConfirmedIncidentInWindow(ctx, b.Hostname, b.Channel, b.EventID, bucketStart, bucketEnd)
if err != nil {
return err
}
if !incident {
if err := d.updateBaselineBucket(ctx, b); err != nil {
return err
}
}
}
incident, err := d.hasConfirmedIncidentInWindow(ctx, b.Hostname, b.Channel, b.EventID, windowStart, windowEnd)
if err != nil {
return err
}
if incident {
continue
}
if err := d.updateBaselineBucket(ctx, b); err != nil {
if err := d.markBucketLearned(ctx, bucketStart, b.Hostname, b.Channel, b.EventID); err != nil {
return err
}
}
@@ -4182,43 +4351,37 @@ func (d *detector) runBaselineAnomalyRule(ctx context.Context) error {
return nil
}
windowEnd := time.Now().UTC()
windowStart := windowEnd.Add(-d.cfg.BaselineWindow)
hour := windowEnd.Hour()
dayOfWeek := mysqlWeekday(windowEnd)
rows, err := d.db.QueryContext(ctx, `
SELECT
e.bucket_start,
e.bucket_end,
e.hostname,
e.channel_name,
e.event_id,
COUNT(*) AS cnt,
e.cnt,
COALESCE(b.avg_count, 0),
COALESCE(b.stddev_count, 0),
COALESCE(b.sample_count, 0)
FROM event_logs e
FROM event_count_buckets e
LEFT JOIN baseline_event_stats b
ON b.hostname = e.hostname
AND b.channel_name = e.channel_name
AND b.event_id = e.event_id
AND b.hour_of_day = ?
AND b.day_of_week = ?
WHERE e.ts >= ? AND e.ts < ?
GROUP BY
e.hostname,
e.channel_name,
e.event_id,
b.avg_count,
b.stddev_count,
b.sample_count
`, hour, dayOfWeek, windowStart, windowEnd)
AND b.hour_of_day = HOUR(e.bucket_start)
AND b.day_of_week = WEEKDAY(e.bucket_start)
WHERE e.finalized = 1
AND e.anomaly_checked_at IS NULL
ORDER BY e.bucket_start ASC
LIMIT 5000
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var bucketStart time.Time
var bucketEnd time.Time
var host string
var channel string
var eventID uint32
@@ -4228,6 +4391,8 @@ GROUP BY
var samples int
if err := rows.Scan(
&bucketStart,
&bucketEnd,
&host,
&channel,
&eventID,
@@ -4239,6 +4404,9 @@ GROUP BY
return err
}
bucketStart = bucketStart.UTC()
bucketEnd = bucketEnd.UTC()
eventIDStr := strconv.Itoa(int(eventID))
d.baselineCurrentCountGauge.WithLabelValues(host, channel, eventIDStr).Set(float64(count))
@@ -4246,84 +4414,79 @@ GROUP BY
d.baselineStddevGauge.WithLabelValues(host, channel, eventIDStr).Set(stddev)
d.baselineSamplesGauge.WithLabelValues(host, channel, eventIDStr).Set(float64(samples))
if samples < d.cfg.BaselineMinSamples {
continue
if samples >= d.cfg.BaselineMinSamples &&
count >= d.cfg.BaselineMinCount &&
stddev > 0 {
z := (float64(count) - avg) / stddev
if z >= d.cfg.BaselineMediumZScore {
severity := "medium"
if z >= d.cfg.BaselineHighZScore {
severity = "high"
}
suppressed, err := d.isBaselineSuppressed(ctx, host, channel, eventID, bucketEnd)
if err != nil {
return err
}
if !suppressed {
created, err := d.insertDetection(ctx, Detection{
RuleName: "baseline_event_rate_anomaly",
Severity: severity,
Hostname: host,
Channel: channel,
EventID: eventID,
Score: z,
WindowStart: bucketStart,
WindowEnd: bucketEnd,
Summary: fmt.Sprintf(
"Baseline-Anomalie auf %s: %s EventID %d kam %d-mal im Bucket %s bis %s, normal %.2f ± %.2f, z=%.2f",
host,
channel,
eventID,
count,
bucketStart.Format(time.RFC3339),
bucketEnd.Format(time.RFC3339),
avg,
stddev,
z,
),
Details: mustJSON(map[string]any{
"hostname": host,
"channel": channel,
"event_id": eventID,
"count": count,
"avg_count": avg,
"stddev_count": stddev,
"z_score": z,
"sample_count": samples,
"hour_of_day": bucketStart.Hour(),
"day_of_week": mysqlWeekday(bucketStart),
"bucket_start": bucketStart.Format(time.RFC3339Nano),
"bucket_end": bucketEnd.Format(time.RFC3339Nano),
"window_minutes": int(d.cfg.BaselineWindow.Minutes()),
"min_samples": d.cfg.BaselineMinSamples,
"medium_z": d.cfg.BaselineMediumZScore,
"high_z": d.cfg.BaselineHighZScore,
}),
})
if err != nil {
return err
}
if created {
d.detectionHitsTotal.WithLabelValues("baseline_event_rate_anomaly", severity).Inc()
d.anomalyScoreGauge.WithLabelValues(host, "baseline_event_rate_anomaly").Set(z)
}
}
}
}
if count < d.cfg.BaselineMinCount {
continue
}
if stddev <= 0 {
continue
}
z := (float64(count) - avg) / stddev
if z < d.cfg.BaselineMediumZScore {
continue
}
severity := "medium"
if z >= d.cfg.BaselineHighZScore {
severity = "high"
}
suppressed, err := d.isBaselineSuppressed(ctx, host, channel, eventID, windowEnd)
if err != nil {
if err := d.markBucketAnomalyChecked(ctx, bucketStart, host, channel, eventID); err != nil {
return err
}
if suppressed {
continue
}
score := z
created, err := d.insertDetection(ctx, Detection{
RuleName: "baseline_event_rate_anomaly",
Severity: severity,
Hostname: host,
Channel: channel,
EventID: eventID,
Score: score,
WindowStart: windowStart,
WindowEnd: windowEnd,
Summary: fmt.Sprintf(
"Baseline-Anomalie auf %s: %s EventID %d kam %d-mal in %d Minuten, normal %.2f ± %.2f, z=%.2f",
host,
channel,
eventID,
count,
int(d.cfg.BaselineWindow.Minutes()),
avg,
stddev,
z,
),
Details: mustJSON(map[string]any{
"hostname": host,
"channel": channel,
"event_id": eventID,
"count": count,
"avg_count": avg,
"stddev_count": stddev,
"z_score": z,
"sample_count": samples,
"hour_of_day": hour,
"day_of_week": dayOfWeek,
"window_minutes": int(d.cfg.BaselineWindow.Minutes()),
"min_samples": d.cfg.BaselineMinSamples,
"medium_z": d.cfg.BaselineMediumZScore,
"high_z": d.cfg.BaselineHighZScore,
}),
})
if err != nil {
return err
}
if created {
d.detectionHitsTotal.WithLabelValues("baseline_event_rate_anomaly", severity).Inc()
d.anomalyScoreGauge.WithLabelValues(host, "baseline_event_rate_anomaly").Set(score)
}
}
return rows.Err()