package main
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"html/template"
"io"
"log"
"math"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// uiTemplates holds the html/template definitions for the web UI. It defines
// the "header" and "footer" partials plus the "dashboard", "detections",
// "events" and "event_detail" pages; renderTemplate executes them by name.
// The pages call the "fmtTime" and "short" helpers registered in main.
// NOTE(review): the markup looks stripped (no HTML tags, and the "anzeigen" /
// "öffnen" cells carry no link targets) — confirm this is the intended UI.
const uiTemplates = `
{{define "header"}}
{{.Title}}
{{end}}
{{define "footer"}}
{{end}}
{{define "dashboard"}}
{{template "header" .}}
{{.Title}}
Stand: {{fmtTime .Now}}
Agents gesamt
{{.Stats.AgentsTotal}}
Agents aktiv
{{.Stats.AgentsActive}}
Events 24h
{{.Stats.Events24h}}
Detections 24h
{{.Stats.Detections24h}}
High Detections 24h
{{.Stats.HighDetections24h}}
Neueste Detections
| Zeit | Rule | Severity | Host | Zusammenfassung |
{{range .RecentDetections}}
| {{fmtTime .CreatedAt}} |
{{.RuleName}} |
{{.Severity}} |
{{.Hostname}} |
{{.Summary}} |
{{end}}
Neueste Events
| Zeit | Host | Channel | EventID | User | IP | Nachricht |
{{range .RecentEvents}}
| {{fmtTime .Time}} |
{{.Hostname}} |
{{.Channel}} |
{{.EventID}} |
{{if .TargetUser}}{{.TargetUser}}{{else}}{{.SubjectUser}}{{end}} |
{{.SrcIP}} |
{{short .Message 120}} |
{{end}}
{{template "footer" .}}
{{end}}
{{define "detections"}}
{{template "header" .}}
{{.Title}}
| Zeit | Rule | Severity | Host | Score | Summary | Events |
{{range .Detections}}
| {{fmtTime .CreatedAt}} |
{{.RuleName}} |
{{.Severity}} |
{{.Hostname}} |
{{printf "%.2f" .Score}} |
{{.Summary}} |
anzeigen |
{{end}}
{{template "footer" .}}
{{end}}
{{define "events"}}
{{template "header" .}}
{{.Title}}
| Zeit | Host | Channel | EventID | Target User | Subject User | IP | Workstation | Detail |
{{range .Events}}
| {{fmtTime .Time}} |
{{.Hostname}} |
{{.Channel}} |
{{.EventID}} |
{{.TargetUser}} |
{{.SubjectUser}} |
{{.SrcIP}} |
{{.Workstation}} |
öffnen |
{{end}}
{{template "footer" .}}
{{end}}
{{define "event_detail"}}
{{template "header" .}}
{{.Title}}
Channel{{.Event.Channel}}
EventID{{.Event.EventID}}
Zeit{{fmtTime .Event.Time}}
Target User{{.Event.TargetUser}}
Subject User{{.Event.SubjectUser}}
Source IP{{.Event.SrcIP}}
Workstation{{.Event.Workstation}}
Logon Type{{.Event.LogonType}}
Process{{.Event.ProcessName}}
Status{{.Event.StatusText}}
SubStatus{{.Event.SubStatusText}}
Rohes Event XML
{{.Event.Message}}
{{template "footer" .}}
{{end}}
`
// Config holds all runtime settings. loadConfig fills it from environment
// variables: DB_DSN and ENROLLMENT_KEY through mustGetenv, everything else
// through the getenv* helpers with built-in defaults.
type Config struct {
// ListenAddr is the HTTP listen address (LISTEN_ADDR, default ":8080").
ListenAddr string
// DBDSN is the MySQL data source name handed to sql.Open (DB_DSN).
DBDSN string
// MaxBodyBytes caps the /ingest request body via http.MaxBytesReader.
MaxBodyBytes int64
// HTTP server timeouts, copied onto http.Server in main.
HTTPReadTimeout time.Duration
HTTPWriteTimeout time.Duration
HTTPIdleTimeout time.Duration
// Connection-pool settings applied to the *sql.DB in main.
DBMaxOpenConns int
DBMaxIdleConns int
DBConnMaxLifetime time.Duration
DBConnMaxIdleTime time.Duration
// DetectionInterval is the detection loop's tick period. It is also the
// timeout of each detection run.
DetectionInterval time.Duration
// OfflineAfter is how long after last_seen an agent still counts as active
// (dashboard counter, active-agents gauge, agent_offline rule).
OfflineAfter time.Duration
// Tuning knobs for the detection rules. Most of the rules that read them
// are defined outside this view, so see those rules for exact semantics.
FailedLogonWindow time.Duration
FailedLogonThreshold int
RebootWindow time.Duration
RebootThreshold int
PasswordSprayWindow time.Duration
PasswordSprayMinUsers int
PasswordSprayMinAttempts int
SuccessAfterFailureWindow time.Duration
NewSourceIPLookback time.Duration
NewSourceIPWindow time.Duration
// DetectionsLimit is the default row limit for detection listings when the
// request does not supply a valid "limit".
DetectionsLimit int
// EnrollmentKey is the shared secret an unknown host must present in
// X-Enrollment-Key to be auto-enrolled on first ingest.
EnrollmentKey string
}
// LogPayload is one event as sent by an agent. handleIngest decodes a JSON
// array of these (unknown fields are rejected) and insertBatch stores them.
type LogPayload struct {
Hostname string `json:"host"`
Channel string `json:"channel"`
EventID uint32 `json:"id"`
Source string `json:"source"`
Time time.Time `json:"ts"`
// Message is handed to NormalizeEventXML on insert, so it is presumably the
// raw event XML — TODO confirm against the agent.
Message string `json:"msg"`
}
// NormalizedEvent holds the fields NormalizeEventXML (defined elsewhere)
// extracts from an event message. insertBatch writes each field into the
// event_logs column of the same meaning. Fields may be empty when the source
// event does not carry them.
type NormalizedEvent struct {
Computer string
ProviderName string
LevelValue uint32
TaskValue uint32
OpcodeValue uint32
Keywords string
TargetUser string
TargetDomain string
SubjectUser string
SubjectDomain string
Workstation string
SrcIP string
SrcPort string
LogonType string
ProcessName string
AuthenticationPackage string
LogonProcess string
StatusText string
SubStatusText string
FailureReason string
}
// Detection is one row of the detections table. /detections returns it as
// JSON and the UI renders it.
type Detection struct {
ID uint64 `json:"id"`
RuleName string `json:"rule_name"`
Severity string `json:"severity"`
Hostname string `json:"hostname"`
Channel string `json:"channel"`
EventID uint32 `json:"event_id"`
Score float64 `json:"score"`
// WindowStart/WindowEnd bound the events the detection covers. listEvents
// uses them to match events to detections of the same host.
WindowStart time.Time `json:"window_start"`
WindowEnd time.Time `json:"window_end"`
Summary string `json:"summary"`
// Details is the details_json column passed through verbatim.
Details json.RawMessage `json:"details_json"`
CreatedAt time.Time `json:"created_at"`
}
// ingestResponse is the 202 body of /ingest: the number of events stored.
type ingestResponse struct {
Accepted int `json:"accepted"`
}
// server bundles the dependencies shared by the HTTP handlers and the
// detection loop. main builds it once.
type server struct {
db *sql.DB
logger *log.Logger
cfg Config
// registry is the private Prometheus registry served on /metrics.
registry *prometheus.Registry
detector *detector
// startTime is used by /healthz to report uptime.
startTime time.Time
// templates holds the parsed uiTemplates and their helper funcs.
templates *template.Template
}
// detector runs the detection rules and owns their Prometheus metrics. All
// collectors are created and registered in main.
type detector struct {
db *sql.DB
cfg Config
logger *log.Logger
// lastSeenGauge: per-host unix time of last contact (label "host").
lastSeenGauge *prometheus.GaugeVec
// activeAgentsGauge: enabled agents seen within cfg.OfflineAfter.
activeAgentsGauge prometheus.Gauge
anomalyScoreGauge *prometheus.GaugeVec
detectionHitsTotal *prometheus.CounterVec
// Per-rule bookkeeping updated by runDetectionsOnce (label "rule").
ruleLastRunGauge *prometheus.GaugeVec
ruleRuntimeHist *prometheus.HistogramVec
ruleErrorsTotal *prometheus.CounterVec
}
// EventRow is one event_logs row as read by listEvents and getEventByID.
// Channel maps to channel_name, Time to ts, ReceivedAt to received_at and
// Message to msg. The remaining fields map to the column of the same name.
type EventRow struct {
ID uint64
Hostname string
Channel string
EventID uint32
Source string
Computer string
ProviderName string
TargetUser string
TargetDomain string
SubjectUser string
SubjectDomain string
Workstation string
SrcIP string
SrcPort string
LogonType string
ProcessName string
AuthenticationPackage string
LogonProcess string
StatusText string
SubStatusText string
FailureReason string
Time time.Time
ReceivedAt time.Time
Message string
}
// DashboardStats holds the counters shown on the dashboard. getDashboardStats
// fills it.
type DashboardStats struct {
AgentsTotal int
AgentsActive int
Events24h int64
Detections24h int64
HighDetections24h int64
}
// DashboardPageData is the data for the "dashboard" template.
type DashboardPageData struct {
Title string
Now time.Time
Stats DashboardStats
RecentDetections []Detection
RecentEvents []EventRow
}
// DetectionListPageData is the data for the "detections" template. Filters
// echoes the trimmed query parameters back to the page.
type DetectionListPageData struct {
Title string
Now time.Time
Filters map[string]string
Detections []Detection
}
// EventListPageData is the data for the "events" template. Filters echoes
// the effective filter values back to the page.
type EventListPageData struct {
Title string
Now time.Time
Filters map[string]string
Events []EventRow
}
// EventDetailPageData is the data for the "event_detail" template.
type EventDetailPageData struct {
Title string
Now time.Time
Event EventRow
}
// Package-level collectors. main registers all of them on the private
// registry exposed at /metrics.
var (
// HTTP request count/latency by path, method and status. Presumably
// recorded by metricsMiddleware (not visible here) — TODO confirm.
httpRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "eventcollector_http_requests_total", Help: "Total HTTP requests."},
[]string{"path", "method", "status"},
)
httpRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{Name: "eventcollector_http_request_duration_seconds", Help: "HTTP request latency.", Buckets: prometheus.DefBuckets},
[]string{"path", "method", "status"},
)
// Successful /ingest batches and their events (per channel and event ID).
ingestBatchesTotal = prometheus.NewCounter(
prometheus.CounterOpts{Name: "eventcollector_ingest_batches_total", Help: "Total ingested batches."},
)
ingestEventsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "eventcollector_ingest_events_total", Help: "Total ingested events."},
[]string{"channel", "event_id"},
)
// ingestRejectedTotal counts /ingest rejections by reason
// (e.g. "invalid_json", "unauthorized").
ingestRejectedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "eventcollector_ingest_rejected_total", Help: "Rejected ingest requests."},
[]string{"reason"},
)
// Database write metrics. insertBatch updates the inserts, batch size and
// tx duration only after a successful commit. handleIngest bumps the
// failure counter when insertBatch returns an error.
dbInsertEventsTotal = prometheus.NewCounter(
prometheus.CounterOpts{Name: "eventcollector_db_insert_events_total", Help: "Total inserted events into database."},
)
dbInsertFailuresTotal = prometheus.NewCounter(
prometheus.CounterOpts{Name: "eventcollector_db_insert_failures_total", Help: "Failed database insert operations."},
)
dbBatchSizeHist = prometheus.NewHistogram(
prometheus.HistogramOpts{Name: "eventcollector_db_batch_size", Help: "Batch sizes written to database.", Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000}},
)
dbTxDurationHist = prometheus.NewHistogram(
prometheus.HistogramOpts{Name: "eventcollector_db_tx_duration_seconds", Help: "Database transaction duration.", Buckets: prometheus.DefBuckets},
)
)
// main wires the collector together. It:
//   - loads configuration from the environment;
//   - opens and verifies the MySQL pool;
//   - registers the Prometheus metrics on a private registry;
//   - parses the UI templates;
//   - starts the periodic detection loop;
//   - serves HTTP until SIGINT or SIGTERM.
//
// On shutdown it gives in-flight requests up to 15s to finish, then closes
// the database.
func main() {
	cfg := loadConfig()
	logger := log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds|log.LUTC)

	db, err := sql.Open("mysql", cfg.DBDSN)
	if err != nil {
		logger.Fatalf("sql.Open: %v", err)
	}
	db.SetMaxOpenConns(cfg.DBMaxOpenConns)
	db.SetMaxIdleConns(cfg.DBMaxIdleConns)
	db.SetConnMaxLifetime(cfg.DBConnMaxLifetime)
	db.SetConnMaxIdleTime(cfg.DBConnMaxIdleTime)

	// sql.Open may not connect at all; ping once so a bad DSN or unreachable
	// server fails at startup. Release the timeout context right away instead
	// of holding it for the life of main.
	pingCtx, pingCancel := context.WithTimeout(context.Background(), 10*time.Second)
	err = db.PingContext(pingCtx)
	pingCancel()
	if err != nil {
		logger.Fatalf("db.PingContext: %v", err)
	}

	reg := prometheus.NewRegistry()
	reg.MustRegister(
		httpRequestsTotal,
		httpRequestDuration,
		ingestBatchesTotal,
		ingestEventsTotal,
		ingestRejectedTotal,
		dbInsertEventsTotal,
		dbInsertFailuresTotal,
		dbBatchSizeHist,
		dbTxDurationHist,
	)

	d := &detector{
		db:     db,
		cfg:    cfg,
		logger: logger,
		lastSeenGauge: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{Name: "eventcollector_agent_last_seen_unixtime", Help: "Unix time when agent was last seen."},
			[]string{"host"},
		),
		activeAgentsGauge: prometheus.NewGauge(
			prometheus.GaugeOpts{Name: "eventcollector_active_agents", Help: "Number of active agents seen within offline threshold."},
		),
		anomalyScoreGauge: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{Name: "eventcollector_anomaly_score", Help: "Current anomaly score per host and rule."},
			[]string{"host", "rule"},
		),
		detectionHitsTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: "eventcollector_detection_hits_total", Help: "Total number of detections per rule and severity."},
			[]string{"rule", "severity"},
		),
		ruleLastRunGauge: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{Name: "eventcollector_rule_last_run_unixtime", Help: "Unix time of the last successful rule run."},
			[]string{"rule"},
		),
		ruleRuntimeHist: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{Name: "eventcollector_rule_runtime_seconds", Help: "Rule runtime duration.", Buckets: prometheus.DefBuckets},
			[]string{"rule"},
		),
		ruleErrorsTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: "eventcollector_rule_errors_total", Help: "Rule execution errors."},
			[]string{"rule"},
		),
	}
	reg.MustRegister(
		d.lastSeenGauge,
		d.activeAgentsGauge,
		d.anomalyScoreGauge,
		d.detectionHitsTotal,
		d.ruleLastRunGauge,
		d.ruleRuntimeHist,
		d.ruleErrorsTotal,
	)

	s := &server{
		db:        db,
		logger:    logger,
		cfg:       cfg,
		registry:  reg,
		detector:  d,
		startTime: time.Now().UTC(),
	}
	s.templates = template.Must(template.New("ui").Funcs(template.FuncMap{
		"q": url.QueryEscape,
		"fmtTime": func(t time.Time) string {
			if t.IsZero() {
				return ""
			}
			return t.Local().Format("2006-01-02 15:04:05")
		},
		// short truncates str to at most n characters (runes) and appends "...".
		// Counting runes rather than bytes keeps multi-byte UTF-8 text (umlauts,
		// non-Latin event messages) intact. Byte slicing could cut a character
		// in half and emit invalid UTF-8. A negative n is treated as 0.
		"short": func(str string, n int) string {
			if n < 0 {
				n = 0
			}
			seen := 0
			for byteIdx := range str {
				if seen == n {
					return str[:byteIdx] + "..."
				}
				seen++
			}
			return str
		},
	}).Parse(uiTemplates))

	// NOTE(review): the detection goroutine has no stop signal; it runs until
	// the process exits, including while the database is being closed below.
	go s.runDetectionLoop()

	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", s.handleHealthz)
	mux.HandleFunc("/readyz", s.handleReadyz)
	mux.HandleFunc("/ingest", s.handleIngest)
	mux.HandleFunc("/detections", s.handleDetections)
	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	mux.HandleFunc("/ui", s.handleUIIndex)
	mux.HandleFunc("/ui/detections", s.handleUIDetections)
	mux.HandleFunc("/ui/events", s.handleUIEvents)
	mux.HandleFunc("/ui/event", s.handleUIEventDetail)

	httpSrv := &http.Server{
		Addr:         cfg.ListenAddr,
		Handler:      metricsMiddleware(logger, recoveryMiddleware(mux)),
		ReadTimeout:  cfg.HTTPReadTimeout,
		WriteTimeout: cfg.HTTPWriteTimeout,
		IdleTimeout:  cfg.HTTPIdleTimeout,
	}
	go func() {
		logger.Printf("listening on %s", cfg.ListenAddr)
		if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			logger.Fatalf("ListenAndServe: %v", err)
		}
	}()

	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	<-stop
	logger.Println("shutdown requested")

	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer shutdownCancel()
	if err := httpSrv.Shutdown(shutdownCtx); err != nil {
		logger.Printf("http shutdown error: %v", err)
	}
	if err := db.Close(); err != nil {
		logger.Printf("db close error: %v", err)
	}
}
// handleUIIndex renders the dashboard. It shows the agent/event/detection
// counters plus the 20 newest detections and events. Only GET is allowed.
// Any backing query failure is logged and answered with a generic 500.
func (s *server) handleUIIndex(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	// fail logs err under the given format and sends an opaque 500.
	fail := func(format string, err error) {
		s.logger.Printf(format, err)
		writeError(w, http.StatusInternalServerError, "internal error")
	}

	page := DashboardPageData{Title: "SIEM-lite Dashboard"}
	var err error
	if page.Stats, err = s.getDashboardStats(ctx); err != nil {
		fail("dashboard stats: %v", err)
		return
	}
	if page.RecentDetections, err = s.listDetections(ctx, "", "", "", 20); err != nil {
		fail("dashboard detections: %v", err)
		return
	}
	if page.RecentEvents, err = s.listEvents(ctx, EventFilter{Limit: 20}); err != nil {
		fail("dashboard events: %v", err)
		return
	}
	// Timestamp the page once all data has been fetched.
	page.Now = time.Now()

	s.renderTemplate(w, "dashboard", page)
}
// handleUIDetections renders the detection list page. It accepts optional
// host, rule and severity filters and a limit (1-500). An invalid or missing
// limit falls back to cfg.DetectionsLimit. The trimmed raw parameters are
// echoed to the template as Filters.
func (s *server) handleUIDetections(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	query := r.URL.Query()
	filters := make(map[string]string, 4)
	for _, key := range []string{"host", "rule", "severity", "limit"} {
		filters[key] = strings.TrimSpace(query.Get(key))
	}

	limit := s.cfg.DetectionsLimit
	if raw := filters["limit"]; raw != "" {
		n, convErr := strconv.Atoi(raw)
		if convErr == nil && n > 0 && n <= 500 {
			limit = n
		}
	}

	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	items, err := s.listDetections(ctx, filters["host"], filters["rule"], filters["severity"], limit)
	if err != nil {
		s.logger.Printf("ui detections: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	s.renderTemplate(w, "detections", DetectionListPageData{
		Title:      "Detections",
		Now:        time.Now(),
		Filters:    filters,
		Detections: items,
	})
}
// handleUIEvents renders the event search page. It maps these query
// parameters onto an EventFilter: host, channel, rule, user, src_ip,
// severity, from and to (RFC 3339), event_id and limit (1-1000, default 100).
// Values that do not parse are ignored rather than rejected. The effective
// filter values are echoed back to the template.
func (s *server) handleUIEvents(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	query := r.URL.Query()
	param := func(key string) string { return strings.TrimSpace(query.Get(key)) }

	filter := EventFilter{
		Host:     param("host"),
		Channel:  param("channel"),
		Rule:     param("rule"),
		User:     param("user"),
		SrcIP:    param("src_ip"),
		Severity: param("severity"),
		TimeFrom: param("from"),
		TimeTo:   param("to"),
		Limit:    100,
	}
	if v := param("event_id"); v != "" {
		// ParseUint with bitSize 32 rejects values that do not fit in uint32.
		// Atoi followed by uint32(n) would silently wrap them
		// (e.g. 4294967297 became event ID 1).
		if n, err := strconv.ParseUint(v, 10, 32); err == nil && n > 0 {
			filter.EventID = uint32(n)
		}
	}
	if v := param("limit"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 1000 {
			filter.Limit = n
		}
	}

	ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
	defer cancel()

	events, err := s.listEvents(ctx, filter)
	if err != nil {
		s.logger.Printf("ui events: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	eventID := ""
	if filter.EventID != 0 {
		eventID = strconv.FormatUint(uint64(filter.EventID), 10)
	}
	filters := map[string]string{
		"host":     filter.Host,
		"channel":  filter.Channel,
		"rule":     filter.Rule,
		"user":     filter.User,
		"src_ip":   filter.SrcIP,
		"severity": filter.Severity,
		"from":     filter.TimeFrom,
		"to":       filter.TimeTo,
		"limit":    strconv.Itoa(filter.Limit),
		"event_id": eventID,
	}

	s.renderTemplate(w, "events", EventListPageData{
		Title:   "Events",
		Now:     time.Now(),
		Filters: filters,
		Events:  events,
	})
}
// handleUIEventDetail renders a single event selected by the numeric "id"
// query parameter. Responses:
//   - 400 when id is missing or not an unsigned integer;
//   - 404 when no such row exists;
//   - 500 on any other lookup error.
func (s *server) handleUIEventDetail(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	rawID := strings.TrimSpace(r.URL.Query().Get("id"))
	if rawID == "" {
		writeError(w, http.StatusBadRequest, "missing id")
		return
	}
	id, parseErr := strconv.ParseUint(rawID, 10, 64)
	if parseErr != nil {
		writeError(w, http.StatusBadRequest, "invalid id")
		return
	}

	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	ev, err := s.getEventByID(ctx, id)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		http.NotFound(w, r)
		return
	case err != nil:
		s.logger.Printf("ui event detail: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	s.renderTemplate(w, "event_detail", EventDetailPageData{
		Title: "Event Detail",
		Now:   time.Now(),
		Event: ev,
	})
}
// renderTemplate executes the named UI template with data and sends the
// result as text/html. The template is rendered into a buffer first. Writing
// straight to w would commit a 200 status and partial markup before a
// mid-template error surfaced, after which http.Error could only append to
// the broken page. Buffering lets a failed render become a clean 500.
func (s *server) renderTemplate(w http.ResponseWriter, name string, data any) {
	var buf strings.Builder
	if err := s.templates.ExecuteTemplate(&buf, name, data); err != nil {
		s.logger.Printf("render template %s: %v", name, err)
		http.Error(w, "template error", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	if _, err := io.WriteString(w, buf.String()); err != nil {
		// Headers are already sent (typically the client went away), so
		// logging is the only remaining option.
		s.logger.Printf("write template %s: %v", name, err)
	}
}
// EventFilter narrows listEvents. Empty/zero fields are not applied.
type EventFilter struct {
Host string
Channel string
// Rule and Severity keep only events that fall inside the time window of a
// detection on the same host with that rule name / severity.
Rule string
// User matches either target_user or subject_user.
User string
SrcIP string
Severity string
// TimeFrom/TimeTo are RFC 3339 strings bounding ts (inclusive). Values that
// fail to parse are silently ignored by listEvents.
TimeFrom string
TimeTo string
EventID uint32
// Limit is clamped by listEvents to 100 when <= 0 or > 1000.
Limit int
}
// getDashboardStats collects the dashboard counters:
//   - enabled agents, and those seen within cfg.OfflineAfter;
//   - events in the last 24h;
//   - detections in the last 24h, and the high-severity ones among them.
//
// All windows are computed from a single time snapshot so the counters
// describe the same moment. On error the returned stats are zero and the
// error names the query that failed.
func (s *server) getDashboardStats(ctx context.Context) (DashboardStats, error) {
	var stats DashboardStats
	now := time.Now().UTC()
	dayAgo := now.Add(-24 * time.Hour)

	counters := []struct {
		label string
		query string
		args  []any
		dest  any
	}{
		{"count agents", `SELECT COUNT(*) FROM agents WHERE is_enabled = 1`, nil, &stats.AgentsTotal},
		{"count active agents", `SELECT COUNT(*) FROM agents WHERE is_enabled = 1 AND last_seen >= ?`, []any{now.Add(-s.cfg.OfflineAfter)}, &stats.AgentsActive},
		{"count events 24h", `SELECT COUNT(*) FROM event_logs WHERE ts >= ?`, []any{dayAgo}, &stats.Events24h},
		{"count detections 24h", `SELECT COUNT(*) FROM detections WHERE created_at >= ?`, []any{dayAgo}, &stats.Detections24h},
		{"count high detections 24h", `SELECT COUNT(*) FROM detections WHERE created_at >= ? AND severity = 'high'`, []any{dayAgo}, &stats.HighDetections24h},
	}
	for _, c := range counters {
		if err := s.db.QueryRowContext(ctx, c.query, c.args...).Scan(c.dest); err != nil {
			return DashboardStats{}, fmt.Errorf("%s: %w", c.label, err)
		}
	}
	return stats, nil
}
// listEvents returns event_logs rows matching f, newest first. The limit is
// f.Limit, or 100 when that is out of the 1-1000 range. All user values are
// bound as query parameters; only fixed SQL fragments are concatenated.
// Unparseable TimeFrom/TimeTo values are skipped. Rule/Severity restrict
// results to events inside a matching detection's window on the same host.
func (s *server) listEvents(ctx context.Context, f EventFilter) ([]EventRow, error) {
	limit := f.Limit
	if limit <= 0 || limit > 1000 {
		limit = 100
	}

	var qb strings.Builder
	qb.WriteString(`
SELECT id, hostname, channel_name, event_id, source, computer, provider_name,
target_user, target_domain, subject_user, subject_domain,
workstation, src_ip, src_port, logon_type, process_name,
authentication_package, logon_process, status_text, sub_status_text,
failure_reason, ts, received_at, msg
FROM event_logs
WHERE 1=1
`)
	args := make([]any, 0, 16)
	// where appends a fixed SQL fragment together with its bound values.
	where := func(clause string, vals ...any) {
		qb.WriteString(clause)
		args = append(args, vals...)
	}

	if f.Host != "" {
		where(` AND hostname = ?`, f.Host)
	}
	if f.Channel != "" {
		where(` AND channel_name = ?`, f.Channel)
	}
	if f.EventID != 0 {
		where(` AND event_id = ?`, f.EventID)
	}
	if f.User != "" {
		where(` AND (target_user = ? OR subject_user = ?)`, f.User, f.User)
	}
	if f.SrcIP != "" {
		where(` AND src_ip = ?`, f.SrcIP)
	}
	if f.TimeFrom != "" {
		if from, err := parseUIRFC3339(f.TimeFrom); err == nil {
			where(` AND ts >= ?`, from)
		}
	}
	if f.TimeTo != "" {
		if to, err := parseUIRFC3339(f.TimeTo); err == nil {
			where(` AND ts <= ?`, to)
		}
	}
	if f.Rule != "" || f.Severity != "" {
		qb.WriteString(` AND EXISTS (
SELECT 1
FROM detections d
WHERE d.hostname = event_logs.hostname
AND event_logs.ts >= d.window_start
AND event_logs.ts <= d.window_end
`)
		if f.Rule != "" {
			where(` AND d.rule_name = ?`, f.Rule)
		}
		if f.Severity != "" {
			where(` AND d.severity = ?`, f.Severity)
		}
		qb.WriteString(` )`)
	}
	where(` ORDER BY ts DESC LIMIT ?`, limit)

	rows, err := s.db.QueryContext(ctx, qb.String(), args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var events []EventRow
	for rows.Next() {
		var ev EventRow
		scanErr := rows.Scan(
			&ev.ID, &ev.Hostname, &ev.Channel, &ev.EventID, &ev.Source,
			&ev.Computer, &ev.ProviderName,
			&ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain,
			&ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName,
			&ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText,
			&ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		events = append(events, ev)
	}
	return events, rows.Err()
}
// getEventByID loads one event_logs row by primary key. It returns
// sql.ErrNoRows (from Row.Scan) when the id does not exist.
func (s *server) getEventByID(ctx context.Context, id uint64) (EventRow, error) {
	var ev EventRow
	dest := []any{
		&ev.ID, &ev.Hostname, &ev.Channel, &ev.EventID, &ev.Source,
		&ev.Computer, &ev.ProviderName,
		&ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain,
		&ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName,
		&ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText,
		&ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message,
	}
	row := s.db.QueryRowContext(ctx, `
SELECT id, hostname, channel_name, event_id, source, computer, provider_name,
target_user, target_domain, subject_user, subject_domain,
workstation, src_ip, src_port, logon_type, process_name,
authentication_package, logon_process, status_text, sub_status_text,
failure_reason, ts, received_at, msg
FROM event_logs
WHERE id = ?
LIMIT 1
`, id)
	err := row.Scan(dest...)
	return ev, err
}
// parseUIRFC3339 parses a timestamp typed into the UI, such as
// "2024-05-01T12:00:00Z". Surrounding whitespace is ignored; anything that
// is not valid RFC 3339 yields time.Parse's error.
func parseUIRFC3339(v string) (time.Time, error) {
	layout := time.RFC3339
	cleaned := strings.TrimSpace(v)
	parsed, err := time.Parse(layout, cleaned)
	return parsed, err
}
// loadConfig builds the Config from environment variables. DB_DSN and
// ENROLLMENT_KEY are read via mustGetenv; every other setting falls back to
// the default shown. Variables are read in field order.
func loadConfig() Config {
	var cfg Config

	// HTTP server.
	cfg.ListenAddr = getenv("LISTEN_ADDR", ":8080")
	cfg.DBDSN = mustGetenv("DB_DSN")
	cfg.MaxBodyBytes = getenvInt64("MAX_BODY_BYTES", 10*1024*1024)
	cfg.HTTPReadTimeout = getenvDuration("HTTP_READ_TIMEOUT", 15*time.Second)
	cfg.HTTPWriteTimeout = getenvDuration("HTTP_WRITE_TIMEOUT", 30*time.Second)
	cfg.HTTPIdleTimeout = getenvDuration("HTTP_IDLE_TIMEOUT", 60*time.Second)

	// Database pool.
	cfg.DBMaxOpenConns = getenvInt("DB_MAX_OPEN_CONNS", 50)
	cfg.DBMaxIdleConns = getenvInt("DB_MAX_IDLE_CONNS", 25)
	cfg.DBConnMaxLifetime = getenvDuration("DB_CONN_MAX_LIFETIME", 3*time.Minute)
	cfg.DBConnMaxIdleTime = getenvDuration("DB_CONN_MAX_IDLE_TIME", 1*time.Minute)

	// Detection engine.
	cfg.DetectionInterval = getenvDuration("DETECTION_INTERVAL", 1*time.Minute)
	cfg.OfflineAfter = getenvDuration("OFFLINE_AFTER", 10*time.Minute)
	cfg.FailedLogonWindow = getenvDuration("FAILED_LOGON_WINDOW", 5*time.Minute)
	cfg.FailedLogonThreshold = getenvInt("FAILED_LOGON_THRESHOLD", 25)
	cfg.RebootWindow = getenvDuration("REBOOT_WINDOW", 15*time.Minute)
	cfg.RebootThreshold = getenvInt("REBOOT_THRESHOLD", 3)
	cfg.PasswordSprayWindow = getenvDuration("PASSWORD_SPRAY_WINDOW", 5*time.Minute)
	cfg.PasswordSprayMinUsers = getenvInt("PASSWORD_SPRAY_MIN_USERS", 5)
	cfg.PasswordSprayMinAttempts = getenvInt("PASSWORD_SPRAY_MIN_ATTEMPTS", 15)
	cfg.SuccessAfterFailureWindow = getenvDuration("SUCCESS_AFTER_FAILURE_WINDOW", 10*time.Minute)
	cfg.NewSourceIPLookback = getenvDuration("NEW_SOURCE_IP_LOOKBACK", 30*24*time.Hour)
	cfg.NewSourceIPWindow = getenvDuration("NEW_SOURCE_IP_WINDOW", 10*time.Minute)

	// API / enrollment.
	cfg.DetectionsLimit = getenvInt("DETECTIONS_LIMIT", 100)
	cfg.EnrollmentKey = mustGetenv("ENROLLMENT_KEY")

	return cfg
}
// handleHealthz is the liveness probe. It answers GET with {"status":"ok"}
// plus the process uptime in whole seconds, without touching the database.
// Other methods get 405.
//
// ingestRejectedTotal is deliberately not incremented here. That counter
// describes /ingest rejections, and health-check traffic would inflate its
// "method_not_allowed" series. This matches handleReadyz.
func (s *server) handleHealthz(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"status":     "ok",
		"uptime_sec": int(time.Since(s.startTime).Seconds()),
	})
}
// handleReadyz is the readiness probe. For GET it pings the database with a
// 3s budget and returns 200 {"status":"ready"} on success, or 503 when the
// ping fails. Other methods get 405.
func (s *server) handleReadyz(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	pingCtx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
	defer cancel()

	if pingErr := s.db.PingContext(pingCtx); pingErr != nil {
		writeError(w, http.StatusServiceUnavailable, "database not ready")
		return
	}
	writeJSON(w, http.StatusOK, map[string]string{"status": "ready"})
}
// handleIngest accepts a JSON array of LogPayload from an agent (POST only).
// It validates the batch in these steps:
//   - the X-API-Key header must be present;
//   - the body must be within cfg.MaxBodyBytes and contain no unknown fields;
//   - the batch must hold 1-1000 events;
//   - every event must pass validatePayload;
//   - all events must share one hostname.
//
// It then authenticates the agent, or auto-enrolls it when X-Enrollment-Key
// is valid, and stores the batch. Each rejection increments
// ingestRejectedTotal with a reason label. Success replies 202 with the
// accepted count.
func (s *server) handleIngest(w http.ResponseWriter, r *http.Request) {
	// reject records the rejection reason and sends the client an error.
	reject := func(reason string, status int, msg string) {
		ingestRejectedTotal.WithLabelValues(reason).Inc()
		writeError(w, status, msg)
	}

	if r.Method != http.MethodPost {
		reject("method_not_allowed", http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	apiKey := strings.TrimSpace(r.Header.Get("X-API-Key"))
	enrollmentKey := strings.TrimSpace(r.Header.Get("X-Enrollment-Key"))
	if apiKey == "" {
		reject("missing_api_key", http.StatusUnauthorized, "missing api key")
		return
	}

	r.Body = http.MaxBytesReader(w, r.Body, s.cfg.MaxBodyBytes)
	defer r.Body.Close()

	decoder := json.NewDecoder(r.Body)
	decoder.DisallowUnknownFields()
	var batch []LogPayload
	if decodeErr := decoder.Decode(&batch); decodeErr != nil {
		reject("invalid_json", http.StatusBadRequest, "invalid json")
		return
	}

	switch n := len(batch); {
	case n == 0:
		reject("empty_batch", http.StatusBadRequest, "empty batch")
		return
	case n > 1000:
		reject("batch_too_large", http.StatusBadRequest, "batch too large")
		return
	}

	for idx := range batch {
		if vErr := validatePayload(&batch[idx]); vErr != nil {
			reject("invalid_payload", http.StatusBadRequest, fmt.Sprintf("invalid payload at index %d: %v", idx, vErr))
			return
		}
	}

	// One API key authenticates one host, so a batch may not mix hostnames.
	hostname := batch[0].Hostname
	for _, item := range batch[1:] {
		if item.Hostname != hostname {
			reject("mixed_hostnames", http.StatusBadRequest, "all events in a batch must use the same hostname")
			return
		}
	}

	ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
	defer cancel()

	agentID, err := s.authenticateTouchOrEnrollAgent(ctx, hostname, apiKey, enrollmentKey, clientIP(r.RemoteAddr))
	switch {
	case errors.Is(err, errUnauthorized):
		reject("unauthorized", http.StatusUnauthorized, "invalid api key or hostname")
		return
	case err != nil:
		ingestRejectedTotal.WithLabelValues("auth_error").Inc()
		s.logger.Printf("authenticate agent: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	if insErr := s.insertBatch(ctx, agentID, batch); insErr != nil {
		dbInsertFailuresTotal.Inc()
		s.logger.Printf("insert batch: %v", insErr)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	ingestBatchesTotal.Inc()
	for idx := range batch {
		eventID := strconv.FormatUint(uint64(batch[idx].EventID), 10)
		ingestEventsTotal.WithLabelValues(batch[idx].Channel, eventID).Inc()
	}
	writeJSON(w, http.StatusAccepted, ingestResponse{Accepted: len(batch)})
}
// handleDetections serves GET /detections as a JSON array. It accepts
// optional host, rule and severity filters and a limit (1-500). An invalid
// or missing limit falls back to cfg.DetectionsLimit. The response is always
// a JSON array, even when nothing matches.
func (s *server) handleDetections(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	query := r.URL.Query()
	limit := s.cfg.DetectionsLimit
	if v := strings.TrimSpace(query.Get("limit")); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 500 {
			limit = n
		}
	}
	host := strings.TrimSpace(query.Get("host"))
	rule := strings.TrimSpace(query.Get("rule"))
	severity := strings.TrimSpace(query.Get("severity"))

	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	items, err := s.listDetections(ctx, host, rule, severity, limit)
	if err != nil {
		s.logger.Printf("list detections: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}
	if items == nil {
		// listDetections yields a nil slice when no rows match. encoding/json
		// renders nil as null, which breaks clients that expect an array.
		items = []Detection{}
	}
	writeJSON(w, http.StatusOK, items)
}
// authenticateTouchOrEnrollAgent resolves hostname to an agent ID.
//   - Known host: the agent must be enabled and sha256(apiKey) must match
//     the stored hash (compared case-insensitively on the stored side).
//     On success last_seen/last_ip are refreshed.
//   - Unknown host: the caller must present the configured enrollment key,
//     in which case the agent is enrolled via enrollNewAgent.
//
// Authentication failures return errUnauthorized. Database failures are
// returned unchanged.
func (s *server) authenticateTouchOrEnrollAgent(ctx context.Context, hostname, apiKey, enrollmentKey, remoteIP string) (uint64, error) {
	const lookup = `
SELECT id, api_key_hash, is_enabled
FROM agents
WHERE hostname = ?
LIMIT 1
`
	var (
		agentID   uint64
		keyHash   string
		isEnabled bool
	)
	err := s.db.QueryRowContext(ctx, lookup, hostname).Scan(&agentID, &keyHash, &isEnabled)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// Unknown hostname: only a holder of the enrollment key may register it.
		if enrollmentKey == "" || !constantTimeEqual(sha256Hex(enrollmentKey), sha256Hex(s.cfg.EnrollmentKey)) {
			return 0, errUnauthorized
		}
		return s.enrollNewAgent(ctx, hostname, apiKey, remoteIP)
	case err != nil:
		return 0, err
	}

	if !isEnabled || !constantTimeEqual(strings.ToLower(keyHash), sha256Hex(apiKey)) {
		return 0, errUnauthorized
	}

	const touch = `
UPDATE agents
SET last_seen = CURRENT_TIMESTAMP(6), last_ip = ?
WHERE id = ?
`
	if _, execErr := s.db.ExecContext(ctx, touch, remoteIP, agentID); execErr != nil {
		return 0, execErr
	}
	return agentID, nil
}
// enrollNewAgent inserts a new enabled agent row for hostname, storing
// sha256(apiKey) and the caller's IP, and returns the new agent ID. Blank
// hostnames or API keys are refused with errUnauthorized. The insert runs in
// its own transaction; the deferred Rollback is a no-op after Commit.
func (s *server) enrollNewAgent(ctx context.Context, hostname, apiKey, remoteIP string) (uint64, error) {
	if strings.TrimSpace(hostname) == "" || strings.TrimSpace(apiKey) == "" {
		return 0, errUnauthorized
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return 0, err
	}
	defer func() { _ = tx.Rollback() }()

	const insertAgent = `
INSERT INTO agents (hostname, api_key_hash, first_seen, last_seen, last_ip, is_enabled)
VALUES (?, ?, CURRENT_TIMESTAMP(6), CURRENT_TIMESTAMP(6), ?, 1)
`
	result, err := tx.ExecContext(ctx, insertAgent, hostname, sha256Hex(apiKey), remoteIP)
	if err != nil {
		return 0, err
	}
	newID, err := result.LastInsertId()
	if err != nil {
		return 0, err
	}
	if err = tx.Commit(); err != nil {
		return 0, err
	}

	s.logger.Printf("agent auto-enrolled: host=%s ip=%s", hostname, remoteIP)
	return uint64(newID), nil
}
// errUnauthorized is returned by the agent authentication/enrollment helpers
// when credentials are missing, wrong, or the agent is disabled. handleIngest
// maps it to HTTP 401.
var errUnauthorized = errors.New("unauthorized")
// insertBatch writes all events of batch into event_logs in one multi-row
// INSERT inside a single transaction, tagging them with agentID. Each event
// message is passed through NormalizeEventXML to fill the extracted columns.
// The provider falls back to the payload's Source when the XML has none.
// Timestamps are stored in UTC together with the message's SHA-256. The
// batch-size, insert-count and tx-duration metrics are recorded only after a
// successful commit.
func (s *server) insertBatch(ctx context.Context, agentID uint64, batch []LogPayload) error {
	began := time.Now()

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }()

	// One "?" per column in the INSERT column list below.
	const columnsPerRow = 28
	rowPlaceholder := "(" + strings.Repeat("?,", columnsPerRow-1) + "?)"

	var stmt strings.Builder
	stmt.WriteString(`
INSERT INTO event_logs (
agent_id, hostname, channel_name, event_id, source, computer, provider_name,
level_value, task_value, opcode_value, keywords,
target_user, target_domain, subject_user, subject_domain,
workstation, src_ip, src_port, logon_type, process_name,
authentication_package, logon_process, status_text, sub_status_text,
failure_reason, ts, msg, msg_sha256
) VALUES
`)
	args := make([]any, 0, len(batch)*columnsPerRow)
	for idx := range batch {
		item := &batch[idx]
		if idx != 0 {
			stmt.WriteString(",")
		}
		stmt.WriteString(rowPlaceholder)

		norm := NormalizeEventXML(item.Message)
		args = append(args,
			agentID, item.Hostname, item.Channel, item.EventID, item.Source,
			norm.Computer, firstNonEmpty(norm.ProviderName, item.Source),
			norm.LevelValue, norm.TaskValue, norm.OpcodeValue, norm.Keywords,
			norm.TargetUser, norm.TargetDomain, norm.SubjectUser, norm.SubjectDomain,
			norm.Workstation, norm.SrcIP, norm.SrcPort, norm.LogonType, norm.ProcessName,
			norm.AuthenticationPackage, norm.LogonProcess, norm.StatusText, norm.SubStatusText,
			norm.FailureReason, item.Time.UTC(), item.Message, sha256Hex(item.Message),
		)
	}

	if _, execErr := tx.ExecContext(ctx, stmt.String(), args...); execErr != nil {
		return execErr
	}
	if commitErr := tx.Commit(); commitErr != nil {
		return commitErr
	}

	size := float64(len(batch))
	dbBatchSizeHist.Observe(size)
	dbInsertEventsTotal.Add(size)
	dbTxDurationHist.Observe(time.Since(began).Seconds())
	return nil
}
// listDetections returns up to limit detections, newest first (by
// created_at). Each of host, rule and severity filters by exact match when
// non-empty. Filter values are bound as parameters, never concatenated.
func (s *server) listDetections(ctx context.Context, host, rule, severity string, limit int) ([]Detection, error) {
	query := `
SELECT id, rule_name, severity, hostname, channel_name, event_id, score,
window_start, window_end, summary, details_json, created_at
FROM detections
WHERE 1=1
`
	args := make([]any, 0, 4)
	optional := []struct {
		clause string
		value  string
	}{
		{" AND hostname = ?", host},
		{" AND rule_name = ?", rule},
		{" AND severity = ?", severity},
	}
	for _, opt := range optional {
		if opt.value == "" {
			continue
		}
		query += opt.clause
		args = append(args, opt.value)
	}
	query += " ORDER BY created_at DESC LIMIT ?"
	args = append(args, limit)

	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var dets []Detection
	for rows.Next() {
		var det Detection
		scanErr := rows.Scan(
			&det.ID, &det.RuleName, &det.Severity, &det.Hostname, &det.Channel,
			&det.EventID, &det.Score, &det.WindowStart, &det.WindowEnd,
			&det.Summary, &det.Details, &det.CreatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		dets = append(dets, det)
	}
	return dets, rows.Err()
}
// runDetectionLoop runs all detection rules immediately and then once per
// cfg.DetectionInterval, forever. It never returns.
func (s *server) runDetectionLoop() {
	ticker := time.NewTicker(s.cfg.DetectionInterval)
	defer ticker.Stop()

	// The post statement waits for the next tick, so the first pass runs
	// without delay.
	for ; ; <-ticker.C {
		s.runDetectionsOnce()
	}
}
// runDetectionsOnce refreshes the agent gauges and then executes every
// detection rule in a fixed order. All of this shares one context bounded
// by cfg.DetectionInterval. A failing rule is logged and counted in
// ruleErrorsTotal without stopping the others. Successful rules update
// their last-run gauge and runtime histogram.
func (s *server) runDetectionsOnce() {
	ctx, cancel := context.WithTimeout(context.Background(), s.cfg.DetectionInterval)
	defer cancel()

	det := s.detector
	if err := det.updateAgentMetrics(ctx); err != nil {
		s.logger.Printf("update agent metrics: %v", err)
	}

	type namedRule struct {
		name string
		run  func(context.Context) error
	}
	pipeline := []namedRule{
		{"agent_offline", det.runAgentOfflineRule},
		{"failed_logon_spike", det.runFailedLogonSpikeRule},
		{"reboot_spike", det.runRebootSpikeRule},
		{"new_event_id", det.runNewEventIDRule},
		{"password_spray", det.runPasswordSprayRule},
		{"success_after_failures", det.runSuccessAfterFailuresRule},
		{"new_source_ip_for_user", det.runNewSourceIPForUserRule},
	}

	for _, rule := range pipeline {
		began := time.Now()
		if err := rule.run(ctx); err != nil {
			s.logger.Printf("rule %s error: %v", rule.name, err)
			det.ruleErrorsTotal.WithLabelValues(rule.name).Inc()
			continue
		}
		det.ruleLastRunGauge.WithLabelValues(rule.name).Set(float64(time.Now().Unix()))
		det.ruleRuntimeHist.WithLabelValues(rule.name).Observe(time.Since(began).Seconds())
	}
}
// updateAgentMetrics publishes last-seen gauges for every enabled agent and
// sets activeAgentsGauge to the number of agents seen within cfg.OfflineAfter.
//
// Agents whose last_seen is NULL get no gauge update and are not counted as
// active. last_seen is read as a Unix timestamp via UNIX_TIMESTAMP().
func (d *detector) updateAgentMetrics(ctx context.Context) error {
	const q = `
SELECT hostname, UNIX_TIMESTAMP(last_seen)
FROM agents
WHERE is_enabled = 1
`
	rows, err := d.db.QueryContext(ctx, q)
	if err != nil {
		return err
	}
	defer rows.Close()

	var (
		recent int
		ref    = time.Now().UTC()
	)
	for rows.Next() {
		var (
			hostname string
			seenUnix sql.NullInt64
		)
		if err := rows.Scan(&hostname, &seenUnix); err != nil {
			return err
		}
		if !seenUnix.Valid {
			continue
		}
		d.lastSeenGauge.WithLabelValues(hostname).Set(float64(seenUnix.Int64))
		if age := ref.Sub(time.Unix(seenUnix.Int64, 0).UTC()); age <= d.cfg.OfflineAfter {
			recent++
		}
	}
	if err := rows.Err(); err != nil {
		return err
	}

	d.activeAgentsGauge.Set(float64(recent))
	return nil
}
// runAgentOfflineRule raises an "agent_offline" detection for every enabled
// agent whose last_seen is older than cfg.OfflineAfter.
//
// The score is the ratio of the observed offline time to cfg.OfflineAfter,
// floored at 1. Severity comes from severityFromScore (medium >= 1.5,
// high >= 3.0). The ratio is computed from durations rather than from
// truncated whole minutes. With the old minute math, a sub-minute OfflineAfter
// truncated to 0 and the division produced +Inf or NaN. A NaN score makes
// severityFromScore fall through to "low".
func (d *detector) runAgentOfflineRule(ctx context.Context) error {
	windowEnd := time.Now().UTC()
	windowStart := windowEnd.Add(-d.cfg.OfflineAfter)
	const q = `
SELECT hostname, last_seen
FROM agents
WHERE is_enabled = 1
AND last_seen < ?
`
	rows, err := d.db.QueryContext(ctx, q, windowStart)
	if err != nil {
		return err
	}
	defer rows.Close()

	offlineAfterMin := int(d.cfg.OfflineAfter.Minutes())
	for rows.Next() {
		var host string
		var lastSeen time.Time
		if err := rows.Scan(&host, &lastSeen); err != nil {
			return err
		}
		offlineFor := windowEnd.Sub(lastSeen)
		minutes := int(offlineFor.Minutes())

		// Guard the division: a non-positive OfflineAfter would otherwise
		// yield an Inf/NaN score.
		score := 1.0
		if d.cfg.OfflineAfter > 0 {
			score = math.Max(1, float64(offlineFor)/float64(d.cfg.OfflineAfter))
		}
		severity := severityFromScore(score, 1.5, 3.0)
		d.anomalyScoreGauge.WithLabelValues(host, "agent_offline").Set(score)

		created, err := d.insertDetection(ctx, Detection{
			RuleName:    "agent_offline",
			Severity:    severity,
			Hostname:    host,
			Score:       score,
			WindowStart: windowStart,
			WindowEnd:   windowEnd,
			Summary:     fmt.Sprintf("Agent %s ist seit %d Minuten offline", host, minutes),
			Details: mustJSON(map[string]any{
				"last_seen":         lastSeen.UTC().Format(time.RFC3339Nano),
				"offline_minutes":   minutes,
				"offline_after_min": offlineAfterMin,
			}),
		})
		if err != nil {
			return err
		}
		if created {
			d.detectionHitsTotal.WithLabelValues("agent_offline", severity).Inc()
		}
	}
	return rows.Err()
}
// runFailedLogonSpikeRule flags hosts whose count of Security/4625 (failed
// logon) events in the last cfg.FailedLogonWindow reaches
// cfg.FailedLogonThreshold.
//
// The score is count/threshold. Severity is medium at >= 2.0 and high at >= 5.0.
func (d *detector) runFailedLogonSpikeRule(ctx context.Context) error {
	const rule = "failed_logon_spike"
	const q = `
SELECT hostname, COUNT(*) AS cnt
FROM event_logs
WHERE channel_name = 'Security'
AND event_id = 4625
AND ts >= ? AND ts < ?
GROUP BY hostname
HAVING COUNT(*) >= ?
`
	until := time.Now().UTC()
	since := until.Add(-d.cfg.FailedLogonWindow)

	rows, err := d.db.QueryContext(ctx, q, since, until, d.cfg.FailedLogonThreshold)
	if err != nil {
		return err
	}
	defer rows.Close()

	windowMin := int(d.cfg.FailedLogonWindow.Minutes())
	for rows.Next() {
		var (
			hostname string
			failures int
		)
		if err := rows.Scan(&hostname, &failures); err != nil {
			return err
		}
		score := float64(failures) / float64(d.cfg.FailedLogonThreshold)
		sev := severityFromScore(score, 2.0, 5.0)
		d.anomalyScoreGauge.WithLabelValues(hostname, rule).Set(score)

		det := Detection{
			RuleName:    rule,
			Severity:    sev,
			Hostname:    hostname,
			Channel:     "Security",
			EventID:     4625,
			Score:       score,
			WindowStart: since,
			WindowEnd:   until,
			Summary:     fmt.Sprintf("Host %s hatte %d fehlgeschlagene Logons in %d Minuten", hostname, failures, windowMin),
			Details: mustJSON(map[string]any{
				"count":          failures,
				"threshold":      d.cfg.FailedLogonThreshold,
				"window_minutes": windowMin,
				"event_id":       4625,
			}),
		}
		inserted, err := d.insertDetection(ctx, det)
		if err != nil {
			return err
		}
		if inserted {
			d.detectionHitsTotal.WithLabelValues(rule, sev).Inc()
		}
	}
	return rows.Err()
}
// runRebootSpikeRule flags hosts whose count of System reboot/shutdown events
// (IDs 1074, 6005, 6006) in the last cfg.RebootWindow reaches
// cfg.RebootThreshold.
//
// The score is count/threshold. Severity is medium at >= 2.0 and high at >= 4.0.
// The detection carries no single EventID because several IDs are aggregated.
func (d *detector) runRebootSpikeRule(ctx context.Context) error {
	const rule = "reboot_spike"
	const q = `
SELECT hostname, COUNT(*) AS cnt
FROM event_logs
WHERE channel_name = 'System'
AND event_id IN (1074, 6005, 6006)
AND ts >= ? AND ts < ?
GROUP BY hostname
HAVING COUNT(*) >= ?
`
	until := time.Now().UTC()
	since := until.Add(-d.cfg.RebootWindow)

	rows, err := d.db.QueryContext(ctx, q, since, until, d.cfg.RebootThreshold)
	if err != nil {
		return err
	}
	defer rows.Close()

	windowMin := int(d.cfg.RebootWindow.Minutes())
	for rows.Next() {
		var (
			hostname string
			reboots  int
		)
		if err := rows.Scan(&hostname, &reboots); err != nil {
			return err
		}
		score := float64(reboots) / float64(d.cfg.RebootThreshold)
		sev := severityFromScore(score, 2.0, 4.0)
		d.anomalyScoreGauge.WithLabelValues(hostname, rule).Set(score)

		det := Detection{
			RuleName:    rule,
			Severity:    sev,
			Hostname:    hostname,
			Channel:     "System",
			Score:       score,
			WindowStart: since,
			WindowEnd:   until,
			Summary:     fmt.Sprintf("Host %s hatte %d Reboot-/Shutdown-Events in %d Minuten", hostname, reboots, windowMin),
			Details: mustJSON(map[string]any{
				"count":          reboots,
				"threshold":      d.cfg.RebootThreshold,
				"window_minutes": windowMin,
				"event_ids":      []int{1074, 6005, 6006},
			}),
		}
		inserted, err := d.insertDetection(ctx, det)
		if err != nil {
			return err
		}
		if inserted {
			d.detectionHitsTotal.WithLabelValues(rule, sev).Inc()
		}
	}
	return rows.Err()
}
// runNewEventIDRule flags (host, channel, event ID) combinations that occur in
// the last cfg.DetectionInterval but never occurred before that window.
//
// The score is 1 + log10(count+1). Severity is high when the combination was
// seen at least 10 times in the window, otherwise medium.
func (d *detector) runNewEventIDRule(ctx context.Context) error {
	const rule = "new_event_id"
	const q = `
SELECT e.hostname, e.channel_name, e.event_id, COUNT(*) AS cnt
FROM event_logs e
WHERE e.ts >= ? AND e.ts < ?
AND NOT EXISTS (
SELECT 1
FROM event_logs old
WHERE old.hostname = e.hostname
AND old.channel_name = e.channel_name
AND old.event_id = e.event_id
AND old.ts < ?
)
GROUP BY e.hostname, e.channel_name, e.event_id
`
	until := time.Now().UTC()
	since := until.Add(-d.cfg.DetectionInterval)

	rows, err := d.db.QueryContext(ctx, q, since, until, since)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			hostname, channelName string
			id                    uint32
			hits                  int
		)
		if err := rows.Scan(&hostname, &channelName, &id, &hits); err != nil {
			return err
		}
		score := 1.0 + math.Log10(float64(hits)+1)
		var sev string
		switch {
		case hits >= 10:
			sev = "high"
		default:
			sev = "medium"
		}
		d.anomalyScoreGauge.WithLabelValues(hostname, rule).Set(score)

		det := Detection{
			RuleName:    rule,
			Severity:    sev,
			Hostname:    hostname,
			Channel:     channelName,
			EventID:     id,
			Score:       score,
			WindowStart: since,
			WindowEnd:   until,
			Summary:     fmt.Sprintf("Host %s sendet erstmals Event-ID %d im Channel %s", hostname, id, channelName),
			Details: mustJSON(map[string]any{
				"count":    hits,
				"channel":  channelName,
				"event_id": id,
			}),
		}
		inserted, err := d.insertDetection(ctx, det)
		if err != nil {
			return err
		}
		if inserted {
			d.detectionHitsTotal.WithLabelValues(rule, sev).Inc()
		}
	}
	return rows.Err()
}
// runPasswordSprayRule flags (host, source IP) pairs with many failed logons
// (Security/4625) against many distinct target users in the last
// cfg.PasswordSprayWindow.
//
// Loopback and empty/"-" source IPs and empty/"-" users are excluded. The
// score is the larger of attempts/MinAttempts and users/MinUsers. Severity is
// medium at >= 1.5 and high at >= 3.0.
func (d *detector) runPasswordSprayRule(ctx context.Context) error {
	const rule = "password_spray"
	const q = `
SELECT hostname, src_ip, COUNT(*) AS attempts, COUNT(DISTINCT target_user) AS users
FROM event_logs
WHERE channel_name = 'Security'
AND event_id = 4625
AND ts >= ? AND ts < ?
AND src_ip <> '' AND src_ip <> '-' AND src_ip <> '::1' AND src_ip <> '127.0.0.1'
AND target_user <> '' AND target_user <> '-'
GROUP BY hostname, src_ip
HAVING COUNT(*) >= ? AND COUNT(DISTINCT target_user) >= ?
`
	until := time.Now().UTC()
	since := until.Add(-d.cfg.PasswordSprayWindow)

	rows, err := d.db.QueryContext(ctx, q, since, until, d.cfg.PasswordSprayMinAttempts, d.cfg.PasswordSprayMinUsers)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			hostname, source string
			tries, accounts  int
		)
		if err := rows.Scan(&hostname, &source, &tries, &accounts); err != nil {
			return err
		}
		byAttempts := float64(tries) / float64(d.cfg.PasswordSprayMinAttempts)
		byUsers := float64(accounts) / float64(d.cfg.PasswordSprayMinUsers)
		score := math.Max(byAttempts, byUsers)
		sev := severityFromScore(score, 1.5, 3.0)
		d.anomalyScoreGauge.WithLabelValues(hostname, rule).Set(score)

		det := Detection{
			RuleName:    rule,
			Severity:    sev,
			Hostname:    hostname,
			Channel:     "Security",
			EventID:     4625,
			Score:       score,
			WindowStart: since,
			WindowEnd:   until,
			Summary:     fmt.Sprintf("Möglicher Password-Spray auf %s von %s: %d Fehlversuche gegen %d Benutzer", hostname, source, tries, accounts),
			Details: mustJSON(map[string]any{
				"src_ip":         source,
				"attempts":       tries,
				"distinct_users": accounts,
				"window_minutes": int(d.cfg.PasswordSprayWindow.Minutes()),
				"event_id":       4625,
			}),
		}
		inserted, err := d.insertDetection(ctx, det)
		if err != nil {
			return err
		}
		if inserted {
			d.detectionHitsTotal.WithLabelValues(rule, sev).Inc()
		}
	}
	return rows.Err()
}
// runSuccessAfterFailuresRule flags successful logons (Security/4624) in the
// last cfg.SuccessAfterFailureWindow that were preceded, within that same
// window length, by at least one failed logon (Security/4625) for the same
// host and target user.
//
// Source IPs must match. When either side has an empty or "-" source IP, the
// failure matches regardless of IP. Every hit is reported with severity
// "high" and score 2 + log10(successCount+1).
func (d *detector) runSuccessAfterFailuresRule(ctx context.Context) error {
	const rule = "success_after_failures"
	const q = `
SELECT s.hostname, s.target_user, s.src_ip, COUNT(*) AS success_count
FROM event_logs s
WHERE s.channel_name = 'Security'
AND s.event_id = 4624
AND s.ts >= ? AND s.ts < ?
AND s.target_user <> '' AND s.target_user <> '-'
AND EXISTS (
SELECT 1
FROM event_logs f
WHERE f.hostname = s.hostname
AND f.channel_name = 'Security'
AND f.event_id = 4625
AND f.target_user = s.target_user
AND (
(f.src_ip = s.src_ip AND s.src_ip <> '' AND s.src_ip <> '-')
OR (s.src_ip = '' OR s.src_ip = '-' OR f.src_ip = '' OR f.src_ip = '-')
)
AND f.ts >= DATE_SUB(s.ts, INTERVAL ? SECOND)
AND f.ts < s.ts
)
GROUP BY s.hostname, s.target_user, s.src_ip
`
	until := time.Now().UTC()
	since := until.Add(-d.cfg.SuccessAfterFailureWindow)
	lookbackSec := int(d.cfg.SuccessAfterFailureWindow.Seconds())

	rows, err := d.db.QueryContext(ctx, q, since, until, lookbackSec)
	if err != nil {
		return err
	}
	defer rows.Close()

	const sev = "high"
	for rows.Next() {
		var (
			hostname, account, source string
			successes                 int
		)
		if err := rows.Scan(&hostname, &account, &source, &successes); err != nil {
			return err
		}
		score := 2.0 + math.Log10(float64(successes)+1)
		d.anomalyScoreGauge.WithLabelValues(hostname, rule).Set(score)

		det := Detection{
			RuleName:    rule,
			Severity:    sev,
			Hostname:    hostname,
			Channel:     "Security",
			EventID:     4624,
			Score:       score,
			WindowStart: since,
			WindowEnd:   until,
			Summary:     fmt.Sprintf("Erfolgreicher Logon nach Fehlversuchen auf %s für Benutzer %s", hostname, account),
			Details: mustJSON(map[string]any{
				"user":           account,
				"src_ip":         source,
				"success_count":  successes,
				"window_minutes": int(d.cfg.SuccessAfterFailureWindow.Minutes()),
				"success_event":  4624,
				"failure_event":  4625,
			}),
		}
		inserted, err := d.insertDetection(ctx, det)
		if err != nil {
			return err
		}
		if inserted {
			d.detectionHitsTotal.WithLabelValues(rule, sev).Inc()
		}
	}
	return rows.Err()
}
// runNewSourceIPForUserRule flags successful logons (Security/4624) in the
// last cfg.NewSourceIPWindow where the (host, user, source IP) triple did not
// appear during the preceding cfg.NewSourceIPLookback period.
//
// Loopback and empty/"-" source IPs and empty/"-" users are excluded. The
// score is 1.5 + log10(count+1). Severity is high at >= 5 logons, otherwise
// medium.
func (d *detector) runNewSourceIPForUserRule(ctx context.Context) error {
	const rule = "new_source_ip_for_user"
	const q = `
SELECT e.hostname, e.target_user, e.src_ip, COUNT(*) AS cnt
FROM event_logs e
WHERE e.channel_name = 'Security'
AND e.event_id = 4624
AND e.ts >= ? AND e.ts < ?
AND e.target_user <> '' AND e.target_user <> '-'
AND e.src_ip <> '' AND e.src_ip <> '-' AND e.src_ip <> '::1' AND e.src_ip <> '127.0.0.1'
AND NOT EXISTS (
SELECT 1
FROM event_logs old
WHERE old.hostname = e.hostname
AND old.channel_name = 'Security'
AND old.event_id = 4624
AND old.target_user = e.target_user
AND old.src_ip = e.src_ip
AND old.ts >= ? AND old.ts < ?
)
GROUP BY e.hostname, e.target_user, e.src_ip
`
	until := time.Now().UTC()
	since := until.Add(-d.cfg.NewSourceIPWindow)
	historyFrom := since.Add(-d.cfg.NewSourceIPLookback)

	rows, err := d.db.QueryContext(ctx, q, since, until, historyFrom, since)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			hostname, account, source string
			logons                    int
		)
		if err := rows.Scan(&hostname, &account, &source, &logons); err != nil {
			return err
		}
		score := 1.5 + math.Log10(float64(logons)+1)
		sev := "medium"
		if logons >= 5 {
			sev = "high"
		}
		d.anomalyScoreGauge.WithLabelValues(hostname, rule).Set(score)

		det := Detection{
			RuleName:    rule,
			Severity:    sev,
			Hostname:    hostname,
			Channel:     "Security",
			EventID:     4624,
			Score:       score,
			WindowStart: since,
			WindowEnd:   until,
			Summary:     fmt.Sprintf("Benutzer %s meldet sich auf %s von neuer Quell-IP %s an", account, hostname, source),
			Details: mustJSON(map[string]any{
				"user":           account,
				"src_ip":         source,
				"count":          logons,
				"window_minutes": int(d.cfg.NewSourceIPWindow.Minutes()),
				"lookback_hours": int(d.cfg.NewSourceIPLookback.Hours()),
				"event_id":       4624,
			}),
		}
		inserted, err := d.insertDetection(ctx, det)
		if err != nil {
			return err
		}
		if inserted {
			d.detectionHitsTotal.WithLabelValues(rule, sev).Inc()
		}
	}
	return rows.Err()
}
// insertDetection stores det using INSERT IGNORE and reports whether a new row
// was actually written.
//
// The result is false, with a nil error, when MySQL ignored the insert (for
// example because it collided with a unique key). Window timestamps are
// converted to UTC before binding.
func (d *detector) insertDetection(ctx context.Context, det Detection) (bool, error) {
	const q = `
INSERT IGNORE INTO detections
(rule_name, severity, hostname, channel_name, event_id, score, window_start, window_end, summary, details_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	bind := []any{
		det.RuleName,
		det.Severity,
		det.Hostname,
		det.Channel,
		det.EventID,
		det.Score,
		det.WindowStart.UTC(),
		det.WindowEnd.UTC(),
		det.Summary,
		string(det.Details),
	}
	result, execErr := d.db.ExecContext(ctx, q, bind...)
	if execErr != nil {
		return false, execErr
	}
	n, raErr := result.RowsAffected()
	if raErr != nil {
		return false, raErr
	}
	return n > 0, nil
}
// NormalizeEventXML extracts commonly used fields from a Windows event XML
// document into a NormalizedEvent.
//
// It reads System/Computer, Level, Task, Opcode and Keywords, the Provider
// Name attribute, and a fixed set of named <Data Name="..."> values
// (TargetUserName, IpAddress, LogonType, ...). Numeric System fields that do
// not parse as uint32 become 0. Every string field is passed through
// cleanupWinField, so "-" placeholders and surrounding whitespace become "".
//
// Empty or whitespace-only input yields a zero NormalizedEvent. Malformed XML
// is not an error: parsing stops at the first tokenizer error and the fields
// extracted so far are returned. They still get the same cleanup a complete
// parse gets; previously this early-return path skipped normalization.
func NormalizeEventXML(xmlStr string) NormalizedEvent {
	var out NormalizedEvent
	if strings.TrimSpace(xmlStr) == "" {
		return out
	}
	dec := xml.NewDecoder(strings.NewReader(xmlStr))
	var path []string           // stack of currently open element names
	var currentDataName string  // Name attribute of the enclosing <Data>, if any
	for {
		tok, err := dec.Token()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			// Malformed XML: stop here, but fall through to the cleanup below
			// instead of returning un-normalized partial fields.
			break
		}
		switch t := tok.(type) {
		case xml.StartElement:
			path = append(path, t.Name.Local)
			switch t.Name.Local {
			case "Provider":
				for _, a := range t.Attr {
					if a.Name.Local == "Name" {
						out.ProviderName = strings.TrimSpace(a.Value)
					}
				}
			case "Data":
				currentDataName = ""
				for _, a := range t.Attr {
					if a.Name.Local == "Name" {
						currentDataName = strings.TrimSpace(a.Value)
						break
					}
				}
			}
		case xml.EndElement:
			if len(path) > 0 {
				path = path[:len(path)-1]
			}
			if t.Name.Local == "Data" {
				currentDataName = ""
			}
		case xml.CharData:
			v := strings.TrimSpace(string(t))
			if v == "" {
				continue
			}
			switch {
			case endsWithPath(path, "System", "Computer"):
				out.Computer = v
			case endsWithPath(path, "System", "Level"):
				out.LevelValue = parseUint32(v)
			case endsWithPath(path, "System", "Task"):
				out.TaskValue = parseUint32(v)
			case endsWithPath(path, "System", "Opcode"):
				out.OpcodeValue = parseUint32(v)
			case endsWithPath(path, "System", "Keywords"):
				out.Keywords = v
			default:
				switch currentDataName {
				case "TargetUserName":
					out.TargetUser = v
				case "TargetDomainName":
					out.TargetDomain = v
				case "SubjectUserName":
					out.SubjectUser = v
				case "SubjectDomainName":
					out.SubjectDomain = v
				case "WorkstationName":
					out.Workstation = v
				case "IpAddress":
					out.SrcIP = v
				case "IpPort":
					out.SrcPort = v
				case "LogonType":
					out.LogonType = v
				case "ProcessName":
					out.ProcessName = v
				case "AuthenticationPackageName":
					out.AuthenticationPackage = v
				case "LogonProcessName":
					out.LogonProcess = v
				case "Status":
					out.StatusText = v
				case "SubStatus":
					out.SubStatusText = v
				case "FailureReason":
					out.FailureReason = v
				}
			}
		}
	}

	// Normalize every extracted string field the same way, whether or not the
	// document parsed completely.
	for _, field := range []*string{
		&out.TargetUser,
		&out.TargetDomain,
		&out.SubjectUser,
		&out.SubjectDomain,
		&out.Workstation,
		&out.SrcIP,
		&out.SrcPort,
		&out.LogonType,
		&out.ProcessName,
		&out.AuthenticationPackage,
		&out.LogonProcess,
		&out.StatusText,
		&out.SubStatusText,
		&out.FailureReason,
		&out.Computer,
		&out.ProviderName,
	} {
		*field = cleanupWinField(*field)
	}
	return out
}
// endsWithPath reports whether the element stack path ends with the given
// sequence of element names. An empty parts list always matches.
func endsWithPath(path []string, parts ...string) bool {
	offset := len(path) - len(parts)
	if offset < 0 {
		return false
	}
	for i, want := range parts {
		if path[offset+i] != want {
			return false
		}
	}
	return true
}
// parseUint32 parses a base-10 unsigned integer that fits in 32 bits,
// ignoring surrounding whitespace. Any syntax or range error yields 0.
func parseUint32(v string) uint32 {
	if n, err := strconv.ParseUint(strings.TrimSpace(v), 10, 32); err == nil {
		return uint32(n)
	}
	return 0
}
// cleanupWinField trims whitespace and maps the Windows "-" placeholder (and
// blank values) to the empty string.
func cleanupWinField(v string) string {
	switch trimmed := strings.TrimSpace(v); trimmed {
	case "", "-":
		return ""
	default:
		return trimmed
	}
}
// firstNonEmpty returns the first argument that is not blank, with
// surrounding whitespace removed, or "" when every argument is blank.
func firstNonEmpty(v ...string) string {
	for i := range v {
		if trimmed := strings.TrimSpace(v[i]); trimmed != "" {
			return trimmed
		}
	}
	return ""
}
// validatePayload trims Hostname, Channel and Source in place and then checks
// the payload's required fields and size limits.
//
// Checks run in a fixed order and the first failure is returned. Lengths are
// measured in bytes. Message is not trimmed, so a whitespace-only message is
// accepted.
func validatePayload(p *LogPayload) error {
	const maxMessageBytes = 2 * 1024 * 1024

	p.Hostname = strings.TrimSpace(p.Hostname)
	p.Channel = strings.TrimSpace(p.Channel)
	p.Source = strings.TrimSpace(p.Source)

	var problem string
	switch {
	case p.Hostname == "":
		problem = "host is required"
	case len(p.Hostname) > 255:
		problem = "host too long"
	case p.Channel == "":
		problem = "channel is required"
	case len(p.Channel) > 128:
		problem = "channel too long"
	case p.Source == "":
		problem = "source is required"
	case len(p.Source) > 255:
		problem = "source too long"
	case p.EventID == 0:
		problem = "event id is required"
	case p.Message == "":
		problem = "msg is required"
	case len(p.Message) > maxMessageBytes:
		problem = "msg too large"
	case p.Time.IsZero():
		problem = "ts is required"
	default:
		return nil
	}
	return errors.New(problem)
}
// severityFromScore maps score to "high" when it reaches the high threshold,
// "medium" when it reaches the medium threshold, and "low" otherwise.
// A NaN score compares false against both thresholds and therefore maps to "low".
func severityFromScore(score, medium, high float64) string {
	if score >= high {
		return "high"
	}
	if score >= medium {
		return "medium"
	}
	return "low"
}
// mustJSON marshals v to JSON and always returns a valid JSON document.
//
// If v cannot be marshalled (for example it contains NaN/Inf floats, channels
// or functions), the result is {"marshal_error": "<reason>"}. The previous
// behavior was to ignore the error and return nil. The result is stored as
// details_json by insertDetection via string(det.Details), and nil would
// become an empty string there.
// NOTE(review): assumes details_json must hold valid JSON — confirm against schema.
func mustJSON(v any) json.RawMessage {
	b, err := json.Marshal(v)
	if err == nil {
		return b
	}
	// Marshalling a map[string]string cannot fail, so this error is safe to ignore.
	fallback, _ := json.Marshal(map[string]string{"marshal_error": err.Error()})
	return fallback
}
// metricsMiddleware records Prometheus request metrics (httpRequestsTotal,
// httpRequestDuration) and writes one access-log line per request handled by
// next.
//
// The response status is captured through statusRecorder, which defaults to
// 200 when the handler never calls WriteHeader. Elapsed time is measured once,
// so the histogram observation and the log line report the same duration.
//
// NOTE(review): the raw URL path is used as a metric label. If any route embeds
// IDs in the path, this creates unbounded label cardinality; consider mapping
// requests to a fixed route name before labelling.
func metricsMiddleware(logger *log.Logger, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		rw := &statusRecorder{ResponseWriter: w, status: 200}
		start := time.Now()
		next.ServeHTTP(rw, r)
		elapsed := time.Since(start)

		status := strconv.Itoa(rw.status)
		path := r.URL.Path
		httpRequestsTotal.WithLabelValues(path, r.Method, status).Inc()
		httpRequestDuration.WithLabelValues(path, r.Method, status).Observe(elapsed.Seconds())
		logger.Printf("%s %s remote=%s status=%s dur=%s", r.Method, path, r.RemoteAddr, status, elapsed)
	})
}
// statusRecorder wraps an http.ResponseWriter to remember the final status
// code that was actually sent. Callers initialize status to the implicit
// default (200).
type statusRecorder struct {
	http.ResponseWriter
	status      int
	wroteHeader bool // true once the final status has been committed
}

// WriteHeader records the first final status code and forwards the call.
//
// net/http ignores WriteHeader calls made after the header was committed
// (by an earlier WriteHeader or Write). Recording only the first final status
// keeps the metric consistent with what the client received. 1xx
// informational codes (other than 101 Switching Protocols) do not commit the
// final status.
func (r *statusRecorder) WriteHeader(status int) {
	informational := status >= 100 && status <= 199 && status != http.StatusSwitchingProtocols
	if !r.wroteHeader && !informational {
		r.status = status
		r.wroteHeader = true
	}
	r.ResponseWriter.WriteHeader(status)
}

// Write forwards body bytes. A Write before any WriteHeader implicitly commits
// the current status (the 200 default), so later WriteHeader calls are not
// recorded.
func (r *statusRecorder) Write(b []byte) (int, error) {
	r.wroteHeader = true
	return r.ResponseWriter.Write(b)
}

// Unwrap exposes the underlying writer so http.ResponseController can reach
// optional interfaces (Flush, deadlines) hidden by this wrapper.
func (r *statusRecorder) Unwrap() http.ResponseWriter {
	return r.ResponseWriter
}
// recoveryMiddleware converts a panic in next into a JSON 500 response.
//
// The recovered value is logged with the request method and path instead of
// being swallowed silently. A panic with http.ErrAbortHandler is re-raised,
// because net/http uses it as a sentinel to abort the response without
// logging a stack trace.
// NOTE(review): if the handler already wrote headers before panicking, the
// 500 status cannot be delivered; only the body may be appended.
func recoveryMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			rec := recover()
			if rec == nil {
				return
			}
			if err, ok := rec.(error); ok && errors.Is(err, http.ErrAbortHandler) {
				panic(rec)
			}
			log.Printf("panic serving %s %s: %v", r.Method, r.URL.Path, rec)
			writeError(w, http.StatusInternalServerError, "internal error")
		}()
		next.ServeHTTP(w, r)
	})
}
// writeJSON sends v as a JSON body with the given status code and a UTF-8 JSON
// content type. Once the header is written an encoding failure can no longer
// be reported to the client, so the Encode error is deliberately discarded.
func writeJSON(w http.ResponseWriter, status int, v any) {
	hdr := w.Header()
	hdr.Set("Content-Type", "application/json; charset=utf-8")
	w.WriteHeader(status)
	enc := json.NewEncoder(w)
	_ = enc.Encode(v)
}
// writeError sends {"error": msg} as a JSON response with the given status.
func writeError(w http.ResponseWriter, status int, msg string) {
	body := map[string]string{"error": msg}
	writeJSON(w, status, body)
}
// sha256Hex returns the lowercase hex-encoded SHA-256 digest of s.
func sha256Hex(s string) string {
	h := sha256.New()
	h.Write([]byte(s)) // hash.Hash.Write never returns an error
	return hex.EncodeToString(h.Sum(nil))
}
// constantTimeEqual reports whether a and b are equal. The running time does
// not depend on where the inputs differ, nor on whether their lengths differ.
//
// Both inputs are first reduced to fixed-size SHA-256 digests, and the digests
// are compared with a branch-free XOR accumulation. The previous version
// returned early on a length mismatch, which leaked the length of the secret
// through timing.
func constantTimeEqual(a, b string) bool {
	da := sha256.Sum256([]byte(a))
	db := sha256.Sum256([]byte(b))
	var diff byte
	for i := range da {
		diff |= da[i] ^ db[i]
	}
	return diff == 0
}
// clientIP strips the port from a "host:port" remote address. If the address
// cannot be split, it is returned unchanged.
func clientIP(remoteAddr string) string {
	if host, _, err := net.SplitHostPort(remoteAddr); err == nil {
		return host
	}
	return remoteAddr
}
// getenv returns the trimmed value of environment variable key, or def when
// the variable is unset or blank.
func getenv(key, def string) string {
	if val := strings.TrimSpace(os.Getenv(key)); val != "" {
		return val
	}
	return def
}
// mustGetenv returns the trimmed value of environment variable key. If the
// variable is unset or blank, the process exits via log.Fatalf.
func mustGetenv(key string) string {
	val := strings.TrimSpace(os.Getenv(key))
	if val != "" {
		return val
	}
	log.Fatalf("missing required environment variable: %s", key)
	return val // unreachable: log.Fatalf exits the process
}
// getenvInt returns environment variable key parsed as a base-10 int, or def
// when it is unset or blank. An unparsable value terminates the process via
// log.Fatalf.
func getenvInt(key string, def int) int {
	raw := strings.TrimSpace(os.Getenv(key))
	switch {
	case raw == "":
		return def
	default:
		parsed, err := strconv.Atoi(raw)
		if err != nil {
			log.Fatalf("invalid int for %s: %v", key, err)
		}
		return parsed
	}
}
// getenvInt64 returns environment variable key parsed as a base-10 int64, or
// def when it is unset or blank. An unparsable value terminates the process
// via log.Fatalf.
func getenvInt64(key string, def int64) int64 {
	raw := strings.TrimSpace(os.Getenv(key))
	switch {
	case raw == "":
		return def
	default:
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("invalid int64 for %s: %v", key, err)
		}
		return parsed
	}
}
// getenvDuration returns environment variable key parsed with
// time.ParseDuration (e.g. "90s", "5m"), or def when it is unset or blank.
// An unparsable value terminates the process via log.Fatalf.
func getenvDuration(key string, def time.Duration) time.Duration {
	raw := strings.TrimSpace(os.Getenv(key))
	switch {
	case raw == "":
		return def
	default:
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("invalid duration for %s: %v", key, err)
		}
		return parsed
	}
}