diff --git a/.env b/.env
new file mode 100644
index 0000000..3941672
--- /dev/null
+++ b/.env
@@ -0,0 +1,38 @@
+TZ=Europe/Berlin
+
+LISTEN_ADDR=:8080
+DB_DSN=eventuser:DEINPASSWORT@tcp(mariadb:3306)/eventcollector?parseTime=true&charset=utf8mb4,utf8&collation=utf8mb4_unicode_ci&loc=UTC
+
+DB_MAX_OPEN_CONNS=50
+DB_MAX_IDLE_CONNS=25
+DB_CONN_MAX_LIFETIME=3m
+DB_CONN_MAX_IDLE_TIME=1m
+
+MAX_BODY_BYTES=10485760
+HTTP_READ_TIMEOUT=15s
+HTTP_WRITE_TIMEOUT=30s
+HTTP_IDLE_TIMEOUT=60s
+
+DETECTION_INTERVAL=1m
+OFFLINE_AFTER=10m
+FAILED_LOGON_WINDOW=5m
+FAILED_LOGON_THRESHOLD=25
+REBOOT_WINDOW=15m
+REBOOT_THRESHOLD=3
+PASSWORD_SPRAY_WINDOW=5m
+PASSWORD_SPRAY_MIN_USERS=5
+PASSWORD_SPRAY_MIN_ATTEMPTS=15
+SUCCESS_AFTER_FAILURE_WINDOW=10m
+NEW_SOURCE_IP_LOOKBACK=720h
+NEW_SOURCE_IP_WINDOW=10m
+DETECTIONS_LIMIT=100
+
+MARIADB_DATABASE=eventcollector
+MARIADB_USER=eventuser
+MARIADB_PASSWORD=DEINPASSWORT
+MARIADB_ROOT_PASSWORD=ROOTPASSWORT
+
+GRAFANA_ADMIN_USER=admin
+GRAFANA_ADMIN_PASSWORD=BITTE_AENDERN
+
+ENROLLMENT_KEY=BITTE_SEHR_LANG_UND_ZUFAELLIG
\ No newline at end of file
diff --git a/.gitea/workflows/registry.yml b/.gitea/workflows/registry.yml
new file mode 100644
index 0000000..cfe785d
--- /dev/null
+++ b/.gitea/workflows/registry.yml
@@ -0,0 +1,51 @@
+name: release-image
+on:
+ push:
+ branches:
+ - 'main'
+jobs:
+ release-image:
+ runs-on: ubuntu-latest
+ env:
+ DOCKER_ORG: sendnrw
+ DOCKER_LATEST: latest
+ RUNNER_TOOL_CACHE: /toolcache
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+
+ - name: Set up QEMU
+ uses: docker/setup-qemu-action@v2
+
+ - name: Set up Docker BuildX
+ uses: docker/setup-buildx-action@v2
+      with: # allow plain-HTTP access to the internal Gitea registry
+ config-inline: |
+ [registry."git.send.nrw"]
+ http = true
+ insecure = true
+
+ - name: Login to DockerHub
+ uses: docker/login-action@v2
+ with:
+        registry: git.send.nrw # internal Gitea container registry
+ username: ${{ secrets.DOCKER_USERNAME }}
+ password: ${{ secrets.DOCKER_PASSWORD }}
+
+ - name: Get Meta
+ id: meta
+ run: |
+ echo REPO_NAME=$(echo ${GITHUB_REPOSITORY} | awk -F"/" '{print $2}') >> $GITHUB_OUTPUT
+ echo REPO_VERSION=$(git describe --tags --always | sed 's/^v//') >> $GITHUB_OUTPUT
+
+ - name: Build and push
+ uses: docker/build-push-action@v4
+ with:
+ context: .
+ file: ./Dockerfile
+ platforms: |
+ linux/amd64
+ push: true
+        tags: | # versioned and latest tags on the internal Gitea registry
+ git.send.nrw/${{ env.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}:${{ steps.meta.outputs.REPO_VERSION }}
+ git.send.nrw/${{ env.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}:${{ env.DOCKER_LATEST }}
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..cbf7396
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,32 @@
+FROM golang:1.24 AS builder
+
+WORKDIR /src
+
+COPY go.mod go.sum ./
+RUN go mod download
+
+COPY . .
+
+RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
+ go build -trimpath -ldflags="-s -w" -o /out/eventcollector .
+
+FROM debian:trixie-slim
+
+ENV LISTEN_ADDR=:8080
+ENV TZ=Europe/Berlin
+
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends ca-certificates tzdata wget \
+ && rm -rf /var/lib/apt/lists/* \
+ && groupadd --system --gid 10001 app \
+ && useradd --system --uid 10001 --gid 10001 --no-create-home --home-dir /nonexistent --shell /usr/sbin/nologin app
+
+WORKDIR /app
+
+COPY --from=builder /out/eventcollector /app/eventcollector
+
+USER 10001:10001
+
+EXPOSE 8080
+
+ENTRYPOINT ["/app/eventcollector"]
\ No newline at end of file
diff --git a/compose.yml b/compose.yml
new file mode 100644
index 0000000..0c6b2a4
--- /dev/null
+++ b/compose.yml
@@ -0,0 +1,118 @@
+services:
+ mariadb:
+ image: mariadb:11.8
+ container_name: siem-mariadb
+ restart: unless-stopped
+ env_file:
+ - .env
+ environment:
+ MARIADB_DATABASE: ${MARIADB_DATABASE}
+ MARIADB_USER: ${MARIADB_USER}
+ MARIADB_PASSWORD: ${MARIADB_PASSWORD}
+ MARIADB_ROOT_PASSWORD: ${MARIADB_ROOT_PASSWORD}
+ TZ: ${TZ}
+ command:
+ - --character-set-server=utf8mb4
+ - --collation-server=utf8mb4_unicode_ci
+ - --innodb-buffer-pool-size=512M
+ - --max-connections=300
+ volumes:
+ - mariadb_data:/var/lib/mysql
+ - ./deploy/mariadb/init:/docker-entrypoint-initdb.d:ro
+ healthcheck:
+ test: ["CMD-SHELL", "mariadb-admin ping -h 127.0.0.1 -u root -p$$MARIADB_ROOT_PASSWORD --silent"]
+ interval: 20s
+ timeout: 5s
+ retries: 10
+ start_period: 30s
+
+ siem-backend:
+ build:
+ context: .
+ dockerfile: Dockerfile
+ image: siem-backend:latest
+ container_name: siem-backend
+ restart: unless-stopped
+ env_file:
+ - .env
+ environment:
+ LISTEN_ADDR: ${LISTEN_ADDR}
+ DB_DSN: ${DB_DSN}
+ DB_MAX_OPEN_CONNS: ${DB_MAX_OPEN_CONNS}
+ DB_MAX_IDLE_CONNS: ${DB_MAX_IDLE_CONNS}
+ DB_CONN_MAX_LIFETIME: ${DB_CONN_MAX_LIFETIME}
+ DB_CONN_MAX_IDLE_TIME: ${DB_CONN_MAX_IDLE_TIME}
+ MAX_BODY_BYTES: ${MAX_BODY_BYTES}
+ HTTP_READ_TIMEOUT: ${HTTP_READ_TIMEOUT}
+ HTTP_WRITE_TIMEOUT: ${HTTP_WRITE_TIMEOUT}
+ HTTP_IDLE_TIMEOUT: ${HTTP_IDLE_TIMEOUT}
+ DETECTION_INTERVAL: ${DETECTION_INTERVAL}
+ OFFLINE_AFTER: ${OFFLINE_AFTER}
+ FAILED_LOGON_WINDOW: ${FAILED_LOGON_WINDOW}
+ FAILED_LOGON_THRESHOLD: ${FAILED_LOGON_THRESHOLD}
+ REBOOT_WINDOW: ${REBOOT_WINDOW}
+ REBOOT_THRESHOLD: ${REBOOT_THRESHOLD}
+ PASSWORD_SPRAY_WINDOW: ${PASSWORD_SPRAY_WINDOW}
+ PASSWORD_SPRAY_MIN_USERS: ${PASSWORD_SPRAY_MIN_USERS}
+ PASSWORD_SPRAY_MIN_ATTEMPTS: ${PASSWORD_SPRAY_MIN_ATTEMPTS}
+ SUCCESS_AFTER_FAILURE_WINDOW: ${SUCCESS_AFTER_FAILURE_WINDOW}
+ NEW_SOURCE_IP_LOOKBACK: ${NEW_SOURCE_IP_LOOKBACK}
+ NEW_SOURCE_IP_WINDOW: ${NEW_SOURCE_IP_WINDOW}
+ DETECTIONS_LIMIT: ${DETECTIONS_LIMIT}
+ TZ: ${TZ}
+ depends_on:
+ mariadb:
+ condition: service_healthy
+ ports:
+ - "8080:8080"
+ healthcheck:
+ test: ["CMD-SHELL", "wget -qO- http://127.0.0.1:8080/healthz >/dev/null 2>&1 || exit 1"]
+ interval: 30s
+ timeout: 5s
+ retries: 5
+ start_period: 20s
+
+ prometheus:
+ image: prom/prometheus:latest
+ container_name: siem-prometheus
+ restart: unless-stopped
+ command:
+ - --config.file=/etc/prometheus/prometheus.yml
+ - --storage.tsdb.path=/prometheus
+ - --storage.tsdb.retention.time=30d
+ - --web.enable-lifecycle
+ depends_on:
+ siem-backend:
+ condition: service_healthy
+ volumes:
+ - ./deploy/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro
+ - ./deploy/prometheus/rules:/etc/prometheus/rules:ro
+ - prometheus_data:/prometheus
+ ports:
+ - "9090:9090"
+
+ grafana:
+ image: grafana/grafana:latest
+ container_name: siem-grafana
+ restart: unless-stopped
+ env_file:
+ - .env
+ environment:
+ GF_SECURITY_ADMIN_USER: ${GRAFANA_ADMIN_USER}
+ GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_ADMIN_PASSWORD}
+ GF_USERS_ALLOW_SIGN_UP: "false"
+ GF_SERVER_ROOT_URL: http://localhost:3000
+ TZ: ${TZ}
+ depends_on:
+ - prometheus
+ volumes:
+ - grafana_data:/var/lib/grafana
+ - ./deploy/grafana/provisioning:/etc/grafana/provisioning:ro
+ - ./deploy/grafana/dashboards:/var/lib/grafana/dashboards:ro
+ ports:
+ - "3000:3000"
+
+volumes:
+ mariadb_data:
+ prometheus_data:
+ grafana_data:
\ No newline at end of file
diff --git a/deploy/grafana/provisioning/dashboards/dashboards.yml b/deploy/grafana/provisioning/dashboards/dashboards.yml
new file mode 100644
index 0000000..e95f332
--- /dev/null
+++ b/deploy/grafana/provisioning/dashboards/dashboards.yml
@@ -0,0 +1,12 @@
+apiVersion: 1
+
+providers:
+ - name: SIEM Dashboards
+ orgId: 1
+ folder: SIEM
+ type: file
+ disableDeletion: false
+ editable: true
+ updateIntervalSeconds: 30
+ options:
+ path: /var/lib/grafana/dashboards
\ No newline at end of file
diff --git a/deploy/grafana/provisioning/dashboards/siem-overview.json b/deploy/grafana/provisioning/dashboards/siem-overview.json
new file mode 100644
index 0000000..433b4f5
--- /dev/null
+++ b/deploy/grafana/provisioning/dashboards/siem-overview.json
@@ -0,0 +1,76 @@
+{
+ "annotations": {
+ "list": []
+ },
+ "editable": true,
+ "panels": [
+ {
+ "type": "stat",
+ "title": "Active Agents",
+ "gridPos": { "h": 4, "w": 6, "x": 0, "y": 0 },
+ "targets": [
+ {
+ "expr": "eventcollector_active_agents",
+ "refId": "A"
+ }
+ ]
+ },
+ {
+ "type": "stat",
+ "title": "High Detections (5m)",
+ "gridPos": { "h": 4, "w": 6, "x": 6, "y": 0 },
+ "targets": [
+ {
+ "expr": "increase(eventcollector_detection_hits_total{severity=\"high\"}[5m])",
+ "refId": "A"
+ }
+ ]
+ },
+ {
+ "type": "timeseries",
+ "title": "HTTP Requests",
+ "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 },
+ "targets": [
+ {
+ "expr": "rate(eventcollector_http_requests_total[5m])",
+ "legendFormat": "{{path}} {{status}}",
+ "refId": "A"
+ }
+ ]
+ },
+ {
+ "type": "timeseries",
+ "title": "Detection Hits",
+ "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 },
+ "targets": [
+ {
+ "expr": "increase(eventcollector_detection_hits_total[5m])",
+ "legendFormat": "{{rule}} {{severity}}",
+ "refId": "A"
+ }
+ ]
+ },
+ {
+ "type": "timeseries",
+ "title": "Ingested Events",
+ "gridPos": { "h": 8, "w": 24, "x": 0, "y": 12 },
+ "targets": [
+ {
+ "expr": "rate(eventcollector_ingest_events_total[5m])",
+ "legendFormat": "{{channel}} {{event_id}}",
+ "refId": "A"
+ }
+ ]
+ }
+ ],
+ "schemaVersion": 39,
+ "style": "dark",
+ "tags": ["siem"],
+ "templating": { "list": [] },
+ "time": {
+ "from": "now-6h",
+ "to": "now"
+ },
+ "title": "SIEM Overview",
+ "version": 1
+}
\ No newline at end of file
diff --git a/deploy/grafana/provisioning/datasources/datasource.yml b/deploy/grafana/provisioning/datasources/datasource.yml
new file mode 100644
index 0000000..fe7fa20
--- /dev/null
+++ b/deploy/grafana/provisioning/datasources/datasource.yml
@@ -0,0 +1,10 @@
+apiVersion: 1
+
+datasources:
+ - name: Prometheus
+ uid: prometheus
+ type: prometheus
+ access: proxy
+ url: http://prometheus:9090
+ isDefault: true
+ editable: true
\ No newline at end of file
diff --git a/deploy/mariadb/init/001-schema.sql b/deploy/mariadb/init/001-schema.sql
new file mode 100644
index 0000000..2789d7e
--- /dev/null
+++ b/deploy/mariadb/init/001-schema.sql
@@ -0,0 +1,88 @@
+CREATE DATABASE IF NOT EXISTS eventcollector
+ CHARACTER SET utf8mb4
+ COLLATE utf8mb4_unicode_ci;
+
+USE eventcollector;
+
+CREATE TABLE IF NOT EXISTS agents (
+ id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
+ hostname VARCHAR(255) NOT NULL,
+ api_key_hash CHAR(64) NOT NULL,
+ first_seen DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+ last_seen DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+ last_ip VARCHAR(64) NOT NULL DEFAULT '',
+ is_enabled TINYINT(1) NOT NULL DEFAULT 1,
+ PRIMARY KEY (id),
+ UNIQUE KEY ux_agents_hostname (hostname),
+ KEY ix_agents_last_seen (last_seen)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS event_logs (
+ id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
+ agent_id BIGINT UNSIGNED NOT NULL,
+ hostname VARCHAR(255) NOT NULL,
+ channel_name VARCHAR(128) NOT NULL,
+ event_id INT UNSIGNED NOT NULL,
+ source VARCHAR(255) NOT NULL,
+ computer VARCHAR(255) NOT NULL DEFAULT '',
+ provider_name VARCHAR(255) NOT NULL DEFAULT '',
+ level_value INT UNSIGNED NOT NULL DEFAULT 0,
+ task_value INT UNSIGNED NOT NULL DEFAULT 0,
+ opcode_value INT UNSIGNED NOT NULL DEFAULT 0,
+ keywords VARCHAR(255) NOT NULL DEFAULT '',
+ target_user VARCHAR(255) NOT NULL DEFAULT '',
+ target_domain VARCHAR(255) NOT NULL DEFAULT '',
+ subject_user VARCHAR(255) NOT NULL DEFAULT '',
+ subject_domain VARCHAR(255) NOT NULL DEFAULT '',
+ workstation VARCHAR(255) NOT NULL DEFAULT '',
+ src_ip VARCHAR(64) NOT NULL DEFAULT '',
+ src_port VARCHAR(32) NOT NULL DEFAULT '',
+ logon_type VARCHAR(32) NOT NULL DEFAULT '',
+ process_name VARCHAR(512) NOT NULL DEFAULT '',
+ authentication_package VARCHAR(128) NOT NULL DEFAULT '',
+ logon_process VARCHAR(128) NOT NULL DEFAULT '',
+ status_text VARCHAR(64) NOT NULL DEFAULT '',
+ sub_status_text VARCHAR(64) NOT NULL DEFAULT '',
+ failure_reason VARCHAR(512) NOT NULL DEFAULT '',
+ ts DATETIME(6) NOT NULL,
+ received_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+ msg LONGTEXT NOT NULL,
+ msg_sha256 CHAR(64) NOT NULL,
+ PRIMARY KEY (id),
+ KEY ix_event_logs_ts (ts),
+ KEY ix_event_logs_received_at (received_at),
+ KEY ix_event_logs_agent_ts (agent_id, ts),
+ KEY ix_event_logs_eventid_ts (event_id, ts),
+ KEY ix_event_logs_hostname_ts (hostname, ts),
+ KEY ix_event_logs_channel_event_ts (channel_name, event_id, ts),
+ KEY ix_event_logs_target_user_ts (target_user, ts),
+ KEY ix_event_logs_src_ip_ts (src_ip, ts),
+ KEY ix_event_logs_target_user_src_ip_ts (target_user, src_ip, ts),
+ KEY ix_event_logs_eventid_srcip_ts (event_id, src_ip, ts),
+ KEY ix_event_logs_eventid_targetuser_ts (event_id, target_user, ts),
+ KEY ix_event_logs_eventid_logontype_ts (event_id, logon_type, ts),
+ CONSTRAINT fk_event_logs_agent
+ FOREIGN KEY (agent_id) REFERENCES agents(id)
+ ON DELETE RESTRICT
+ ON UPDATE RESTRICT
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS detections (
+ id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
+ rule_name VARCHAR(128) NOT NULL,
+ severity VARCHAR(32) NOT NULL,
+ hostname VARCHAR(255) NOT NULL,
+ channel_name VARCHAR(128) NOT NULL DEFAULT '',
+ event_id INT UNSIGNED NOT NULL DEFAULT 0,
+ score DOUBLE NOT NULL DEFAULT 0,
+ window_start DATETIME(6) NOT NULL,
+ window_end DATETIME(6) NOT NULL,
+ summary VARCHAR(512) NOT NULL,
+ details_json JSON NOT NULL,
+ created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+ PRIMARY KEY (id),
+ UNIQUE KEY ux_detection_dedupe (rule_name, hostname, channel_name, event_id, window_start, window_end),
+ KEY ix_detections_created (created_at),
+ KEY ix_detections_rule_host_time (rule_name, hostname, created_at),
+ KEY ix_detections_severity_time (severity, created_at)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
\ No newline at end of file
diff --git a/deploy/prometheus/prometheus.yml b/deploy/prometheus/prometheus.yml
new file mode 100644
index 0000000..8f80b65
--- /dev/null
+++ b/deploy/prometheus/prometheus.yml
@@ -0,0 +1,18 @@
+global:
+ scrape_interval: 15s
+ evaluation_interval: 15s
+
+rule_files:
+ - /etc/prometheus/rules/*.yml
+
+scrape_configs:
+ - job_name: siem-backend
+ metrics_path: /metrics
+ static_configs:
+ - targets:
+ - siem-backend:8080
+
+ - job_name: prometheus
+ static_configs:
+ - targets:
+ - localhost:9090
\ No newline at end of file
diff --git a/deploy/prometheus/rules/siem-alerts.yml b/deploy/prometheus/rules/siem-alerts.yml
new file mode 100644
index 0000000..f9c7fa6
--- /dev/null
+++ b/deploy/prometheus/rules/siem-alerts.yml
@@ -0,0 +1,38 @@
+groups:
+ - name: siem-backend
+ rules:
+ - alert: SiemBackendDown
+ expr: up{job="siem-backend"} == 0
+ for: 2m
+ labels:
+ severity: critical
+ annotations:
+ summary: "SIEM backend nicht erreichbar"
+ description: "Prometheus kann das SIEM-Backend seit mindestens 2 Minuten nicht scrapen."
+
+ - alert: SiemHighDetections
+ expr: increase(eventcollector_detection_hits_total{severity="high"}[5m]) > 0
+ for: 1m
+ labels:
+ severity: high
+ annotations:
+ summary: "Neue High-Severity Detection"
+ description: "Es wurde mindestens eine neue High-Severity-Detection in den letzten 5 Minuten erzeugt."
+
+ - alert: SiemRuleErrors
+ expr: increase(eventcollector_rule_errors_total[5m]) > 0
+ for: 1m
+ labels:
+ severity: warning
+ annotations:
+ summary: "Fehler in Detection-Regeln"
+ description: "Mindestens eine Detection-Regel hat in den letzten 5 Minuten einen Fehler erzeugt."
+
+ - alert: SiemTooFewActiveAgents
+ expr: eventcollector_active_agents < 1
+ for: 5m
+ labels:
+ severity: warning
+ annotations:
+ summary: "Zu wenige aktive Agents"
+ description: "Es wurden weniger aktive Agents erkannt als erwartet."
\ No newline at end of file
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..1a2f53c
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,22 @@
+module git.send.nrw/sendnrw/siem-backend
+
+go 1.24.0
+
+require github.com/go-sql-driver/mysql v1.9.3
+
+require (
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/cespare/xxhash/v2 v2.3.0 // indirect
+ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+ github.com/prometheus/client_model v0.6.2 // indirect
+ github.com/prometheus/common v0.66.1 // indirect
+ github.com/prometheus/procfs v0.16.1 // indirect
+ go.yaml.in/yaml/v2 v2.4.2 // indirect
+ golang.org/x/sys v0.35.0 // indirect
+ google.golang.org/protobuf v1.36.8 // indirect
+)
+
+require (
+ filippo.io/edwards25519 v1.1.0 // indirect
+ github.com/prometheus/client_golang v1.23.2
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..2163bbe
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,25 @@
+filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
+filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
+github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
+github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
+github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
+github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
+github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
+github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
+github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
+github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
+go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
+go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
+golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
+golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
+google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..5ec8b66
--- /dev/null
+++ b/main.go
@@ -0,0 +1,2194 @@
+package main
+
+import (
+ "context"
+ "crypto/sha256"
+ "database/sql"
+ "encoding/hex"
+ "encoding/json"
+ "encoding/xml"
+ "errors"
+ "fmt"
+ "html/template"
+ "io"
+ "log"
+ "math"
+ "net"
+ "net/http"
+ "net/url"
+ "os"
+ "os/signal"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ _ "github.com/go-sql-driver/mysql"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+const uiTemplates = `
+{{define "header"}}
+
+
+
+
+
+ {{.Title}}
+
+
+
+
+
+{{end}}
+
+{{define "footer"}}
+
+
+
+{{end}}
+
+{{define "dashboard"}}
+{{template "header" .}}
+{{.Title}}
+Stand: {{fmtTime .Now}}
+
+
Agents gesamt
{{.Stats.AgentsTotal}}
+
Agents aktiv
{{.Stats.AgentsActive}}
+
Events 24h
{{.Stats.Events24h}}
+
Detections 24h
{{.Stats.Detections24h}}
+
High Detections 24h
{{.Stats.HighDetections24h}}
+
+
+Neueste Detections
+
+ | Zeit | Rule | Severity | Host | Zusammenfassung |
+ {{range .RecentDetections}}
+
+ | {{fmtTime .CreatedAt}} |
+ {{.RuleName}} |
+ {{.Severity}} |
+ {{.Hostname}} |
+ {{.Summary}} |
+
+ {{end}}
+
+
+Neueste Events
+
+ | Zeit | Host | Channel | EventID | User | IP | Nachricht |
+ {{range .RecentEvents}}
+
+ | {{fmtTime .Time}} |
+ {{.Hostname}} |
+ {{.Channel}} |
+ {{.EventID}} |
+ {{if .TargetUser}}{{.TargetUser}}{{else}}{{.SubjectUser}}{{end}} |
+ {{.SrcIP}} |
+ {{short .Message 120}} |
+
+ {{end}}
+
+{{template "footer" .}}
+{{end}}
+
+{{define "detections"}}
+{{template "header" .}}
+{{.Title}}
+
+
+ | Zeit | Rule | Severity | Host | Score | Summary | Events |
+ {{range .Detections}}
+
+ | {{fmtTime .CreatedAt}} |
+ {{.RuleName}} |
+ {{.Severity}} |
+ {{.Hostname}} |
+ {{printf "%.2f" .Score}} |
+ {{.Summary}} |
+ anzeigen |
+
+ {{end}}
+
+{{template "footer" .}}
+{{end}}
+
+{{define "events"}}
+{{template "header" .}}
+{{.Title}}
+
+
+
+ | Zeit | Host | Channel | EventID | Target User | Subject User | IP | Workstation | Detail |
+
+ {{range .Events}}
+
+ | {{fmtTime .Time}} |
+ {{.Hostname}} |
+ {{.Channel}} |
+ {{.EventID}} |
+ {{.TargetUser}} |
+ {{.SubjectUser}} |
+ {{.SrcIP}} |
+ {{.Workstation}} |
+ öffnen |
+
+ {{end}}
+
+{{template "footer" .}}
+{{end}}
+
+{{define "event_detail"}}
+{{template "header" .}}
+{{.Title}}
+
+
+
Channel{{.Event.Channel}}
+
EventID{{.Event.EventID}}
+
Zeit{{fmtTime .Event.Time}}
+
Target User{{.Event.TargetUser}}
+
Subject User{{.Event.SubjectUser}}
+
Source IP{{.Event.SrcIP}}
+
Workstation{{.Event.Workstation}}
+
Logon Type{{.Event.LogonType}}
+
Process{{.Event.ProcessName}}
+
Status{{.Event.StatusText}}
+
SubStatus{{.Event.SubStatusText}}
+
+
+Rohes Event XML
+{{.Event.Message}}
+{{template "footer" .}}
+{{end}}
+`
+
+type Config struct {
+ ListenAddr string
+ DBDSN string
+ MaxBodyBytes int64
+ HTTPReadTimeout time.Duration
+ HTTPWriteTimeout time.Duration
+ HTTPIdleTimeout time.Duration
+ DBMaxOpenConns int
+ DBMaxIdleConns int
+ DBConnMaxLifetime time.Duration
+ DBConnMaxIdleTime time.Duration
+ DetectionInterval time.Duration
+ OfflineAfter time.Duration
+ FailedLogonWindow time.Duration
+ FailedLogonThreshold int
+ RebootWindow time.Duration
+ RebootThreshold int
+ PasswordSprayWindow time.Duration
+ PasswordSprayMinUsers int
+ PasswordSprayMinAttempts int
+ SuccessAfterFailureWindow time.Duration
+ NewSourceIPLookback time.Duration
+ NewSourceIPWindow time.Duration
+ DetectionsLimit int
+
+ EnrollmentKey string
+}
+
+type LogPayload struct {
+ Hostname string `json:"host"`
+ Channel string `json:"channel"`
+ EventID uint32 `json:"id"`
+ Source string `json:"source"`
+ Time time.Time `json:"ts"`
+ Message string `json:"msg"`
+}
+
+type NormalizedEvent struct {
+ Computer string
+ ProviderName string
+ LevelValue uint32
+ TaskValue uint32
+ OpcodeValue uint32
+ Keywords string
+ TargetUser string
+ TargetDomain string
+ SubjectUser string
+ SubjectDomain string
+ Workstation string
+ SrcIP string
+ SrcPort string
+ LogonType string
+ ProcessName string
+ AuthenticationPackage string
+ LogonProcess string
+ StatusText string
+ SubStatusText string
+ FailureReason string
+}
+
+type Detection struct {
+ ID uint64 `json:"id"`
+ RuleName string `json:"rule_name"`
+ Severity string `json:"severity"`
+ Hostname string `json:"hostname"`
+ Channel string `json:"channel"`
+ EventID uint32 `json:"event_id"`
+ Score float64 `json:"score"`
+ WindowStart time.Time `json:"window_start"`
+ WindowEnd time.Time `json:"window_end"`
+ Summary string `json:"summary"`
+ Details json.RawMessage `json:"details_json"`
+ CreatedAt time.Time `json:"created_at"`
+}
+
+type ingestResponse struct {
+ Accepted int `json:"accepted"`
+}
+
+type server struct {
+ db *sql.DB
+ logger *log.Logger
+ cfg Config
+ registry *prometheus.Registry
+ detector *detector
+ startTime time.Time
+ templates *template.Template
+}
+
+type detector struct {
+ db *sql.DB
+ cfg Config
+ logger *log.Logger
+
+ lastSeenGauge *prometheus.GaugeVec
+ activeAgentsGauge prometheus.Gauge
+ anomalyScoreGauge *prometheus.GaugeVec
+ detectionHitsTotal *prometheus.CounterVec
+ ruleLastRunGauge *prometheus.GaugeVec
+ ruleRuntimeHist *prometheus.HistogramVec
+ ruleErrorsTotal *prometheus.CounterVec
+}
+
+type EventRow struct {
+ ID uint64
+ Hostname string
+ Channel string
+ EventID uint32
+ Source string
+ Computer string
+ ProviderName string
+ TargetUser string
+ TargetDomain string
+ SubjectUser string
+ SubjectDomain string
+ Workstation string
+ SrcIP string
+ SrcPort string
+ LogonType string
+ ProcessName string
+ AuthenticationPackage string
+ LogonProcess string
+ StatusText string
+ SubStatusText string
+ FailureReason string
+ Time time.Time
+ ReceivedAt time.Time
+ Message string
+}
+
+type DashboardStats struct {
+ AgentsTotal int
+ AgentsActive int
+ Events24h int64
+ Detections24h int64
+ HighDetections24h int64
+}
+
+type DashboardPageData struct {
+ Title string
+ Now time.Time
+ Stats DashboardStats
+ RecentDetections []Detection
+ RecentEvents []EventRow
+}
+
+type DetectionListPageData struct {
+ Title string
+ Now time.Time
+ Filters map[string]string
+ Detections []Detection
+}
+
+type EventListPageData struct {
+ Title string
+ Now time.Time
+ Filters map[string]string
+ Events []EventRow
+}
+
+type EventDetailPageData struct {
+ Title string
+ Now time.Time
+ Event EventRow
+}
+
+var (
+ httpRequestsTotal = prometheus.NewCounterVec(
+ prometheus.CounterOpts{Name: "eventcollector_http_requests_total", Help: "Total HTTP requests."},
+ []string{"path", "method", "status"},
+ )
+ httpRequestDuration = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{Name: "eventcollector_http_request_duration_seconds", Help: "HTTP request latency.", Buckets: prometheus.DefBuckets},
+ []string{"path", "method", "status"},
+ )
+ ingestBatchesTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{Name: "eventcollector_ingest_batches_total", Help: "Total ingested batches."},
+ )
+ ingestEventsTotal = prometheus.NewCounterVec(
+ prometheus.CounterOpts{Name: "eventcollector_ingest_events_total", Help: "Total ingested events."},
+ []string{"channel", "event_id"},
+ )
+ ingestRejectedTotal = prometheus.NewCounterVec(
+ prometheus.CounterOpts{Name: "eventcollector_ingest_rejected_total", Help: "Rejected ingest requests."},
+ []string{"reason"},
+ )
+ dbInsertEventsTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{Name: "eventcollector_db_insert_events_total", Help: "Total inserted events into database."},
+ )
+ dbInsertFailuresTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{Name: "eventcollector_db_insert_failures_total", Help: "Failed database insert operations."},
+ )
+ dbBatchSizeHist = prometheus.NewHistogram(
+ prometheus.HistogramOpts{Name: "eventcollector_db_batch_size", Help: "Batch sizes written to database.", Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000}},
+ )
+ dbTxDurationHist = prometheus.NewHistogram(
+ prometheus.HistogramOpts{Name: "eventcollector_db_tx_duration_seconds", Help: "Database transaction duration.", Buckets: prometheus.DefBuckets},
+ )
+)
+
+func main() {
+ cfg := loadConfig()
+ logger := log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds|log.LUTC)
+
+ db, err := sql.Open("mysql", cfg.DBDSN)
+ if err != nil {
+ logger.Fatalf("sql.Open: %v", err)
+ }
+ db.SetMaxOpenConns(cfg.DBMaxOpenConns)
+ db.SetMaxIdleConns(cfg.DBMaxIdleConns)
+ db.SetConnMaxLifetime(cfg.DBConnMaxLifetime)
+ db.SetConnMaxIdleTime(cfg.DBConnMaxIdleTime)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ if err := db.PingContext(ctx); err != nil {
+ logger.Fatalf("db.PingContext: %v", err)
+ }
+
+ reg := prometheus.NewRegistry()
+ reg.MustRegister(
+ httpRequestsTotal,
+ httpRequestDuration,
+ ingestBatchesTotal,
+ ingestEventsTotal,
+ ingestRejectedTotal,
+ dbInsertEventsTotal,
+ dbInsertFailuresTotal,
+ dbBatchSizeHist,
+ dbTxDurationHist,
+ )
+
+ d := &detector{
+ db: db,
+ cfg: cfg,
+ logger: logger,
+ lastSeenGauge: prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{Name: "eventcollector_agent_last_seen_unixtime", Help: "Unix time when agent was last seen."},
+ []string{"host"},
+ ),
+ activeAgentsGauge: prometheus.NewGauge(
+ prometheus.GaugeOpts{Name: "eventcollector_active_agents", Help: "Number of active agents seen within offline threshold."},
+ ),
+ anomalyScoreGauge: prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{Name: "eventcollector_anomaly_score", Help: "Current anomaly score per host and rule."},
+ []string{"host", "rule"},
+ ),
+ detectionHitsTotal: prometheus.NewCounterVec(
+ prometheus.CounterOpts{Name: "eventcollector_detection_hits_total", Help: "Total number of detections per rule and severity."},
+ []string{"rule", "severity"},
+ ),
+ ruleLastRunGauge: prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{Name: "eventcollector_rule_last_run_unixtime", Help: "Unix time of the last successful rule run."},
+ []string{"rule"},
+ ),
+ ruleRuntimeHist: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{Name: "eventcollector_rule_runtime_seconds", Help: "Rule runtime duration.", Buckets: prometheus.DefBuckets},
+ []string{"rule"},
+ ),
+ ruleErrorsTotal: prometheus.NewCounterVec(
+ prometheus.CounterOpts{Name: "eventcollector_rule_errors_total", Help: "Rule execution errors."},
+ []string{"rule"},
+ ),
+ }
+ reg.MustRegister(
+ d.lastSeenGauge,
+ d.activeAgentsGauge,
+ d.anomalyScoreGauge,
+ d.detectionHitsTotal,
+ d.ruleLastRunGauge,
+ d.ruleRuntimeHist,
+ d.ruleErrorsTotal,
+ )
+
+ s := &server{
+ db: db,
+ logger: logger,
+ cfg: cfg,
+ registry: reg,
+ detector: d,
+ startTime: time.Now().UTC(),
+ }
+
+ tmpl := template.Must(template.New("ui").Funcs(template.FuncMap{
+ "q": url.QueryEscape,
+ "fmtTime": func(t time.Time) string {
+ if t.IsZero() {
+ return ""
+ }
+ return t.Local().Format("2006-01-02 15:04:05")
+ },
+ "short": func(s string, n int) string {
+ if len(s) <= n {
+ return s
+ }
+ return s[:n] + "..."
+ },
+ }).Parse(uiTemplates))
+
+ s.templates = tmpl
+
+ go s.runDetectionLoop()
+
+ mux := http.NewServeMux()
+ mux.HandleFunc("/healthz", s.handleHealthz)
+ mux.HandleFunc("/readyz", s.handleReadyz)
+ mux.HandleFunc("/ingest", s.handleIngest)
+ mux.HandleFunc("/detections", s.handleDetections)
+ mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
+ mux.HandleFunc("/ui", s.handleUIIndex)
+ mux.HandleFunc("/ui/detections", s.handleUIDetections)
+ mux.HandleFunc("/ui/events", s.handleUIEvents)
+ mux.HandleFunc("/ui/event", s.handleUIEventDetail)
+
+ httpSrv := &http.Server{
+ Addr: cfg.ListenAddr,
+ Handler: metricsMiddleware(logger, recoveryMiddleware(mux)),
+ ReadTimeout: cfg.HTTPReadTimeout,
+ WriteTimeout: cfg.HTTPWriteTimeout,
+ IdleTimeout: cfg.HTTPIdleTimeout,
+ }
+
+ go func() {
+ logger.Printf("listening on %s", cfg.ListenAddr)
+ if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ logger.Fatalf("ListenAndServe: %v", err)
+ }
+ }()
+
+ stop := make(chan os.Signal, 1)
+ signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
+ <-stop
+
+ logger.Println("shutdown requested")
+
+ shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
+ defer shutdownCancel()
+
+ if err := httpSrv.Shutdown(shutdownCtx); err != nil {
+ logger.Printf("http shutdown error: %v", err)
+ }
+ if err := db.Close(); err != nil {
+ logger.Printf("db close error: %v", err)
+ }
+}
+
// handleUIIndex renders the HTML dashboard page: headline stats plus the 20
// most recent detections and the 20 most recent events. GET only.
func (s *server) handleUIIndex(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	// One shared 5s deadline covers all three queries backing the page.
	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	stats, err := s.getDashboardStats(ctx)
	if err != nil {
		s.logger.Printf("dashboard stats: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	// Empty host/rule/severity filters: newest 20 detections overall.
	dets, err := s.listDetections(ctx, "", "", "", 20)
	if err != nil {
		s.logger.Printf("dashboard detections: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	// Zero-valued filter fields mean "no filter"; only the limit is set.
	events, err := s.listEvents(ctx, EventFilter{Limit: 20})
	if err != nil {
		s.logger.Printf("dashboard events: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	data := DashboardPageData{
		Title:            "SIEM-lite Dashboard",
		Now:              time.Now(),
		Stats:            stats,
		RecentDetections: dets,
		RecentEvents:     events,
	}

	s.renderTemplate(w, "dashboard", data)
}
+
// handleUIDetections renders the detections list page, filtered by the
// optional host/rule/severity/limit query parameters. GET only.
func (s *server) handleUIDetections(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	// Raw values are kept in a map because the template echoes them back
	// into the filter form as-is.
	filters := map[string]string{
		"host":     strings.TrimSpace(r.URL.Query().Get("host")),
		"rule":     strings.TrimSpace(r.URL.Query().Get("rule")),
		"severity": strings.TrimSpace(r.URL.Query().Get("severity")),
		"limit":    strings.TrimSpace(r.URL.Query().Get("limit")),
	}

	// Configured default; the user may override it within (0, 500].
	// An unparseable or out-of-range value is silently ignored.
	limit := s.cfg.DetectionsLimit
	if filters["limit"] != "" {
		if n, err := strconv.Atoi(filters["limit"]); err == nil && n > 0 && n <= 500 {
			limit = n
		}
	}

	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	items, err := s.listDetections(ctx, filters["host"], filters["rule"], filters["severity"], limit)
	if err != nil {
		s.logger.Printf("ui detections: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	data := DetectionListPageData{
		Title:      "Detections",
		Now:        time.Now(),
		Filters:    filters,
		Detections: items,
	}

	s.renderTemplate(w, "detections", data)
}
+
+func (s *server) handleUIEvents(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ return
+ }
+
+ filter := EventFilter{
+ Host: strings.TrimSpace(r.URL.Query().Get("host")),
+ Channel: strings.TrimSpace(r.URL.Query().Get("channel")),
+ Rule: strings.TrimSpace(r.URL.Query().Get("rule")),
+ User: strings.TrimSpace(r.URL.Query().Get("user")),
+ SrcIP: strings.TrimSpace(r.URL.Query().Get("src_ip")),
+ Severity: strings.TrimSpace(r.URL.Query().Get("severity")),
+ TimeFrom: strings.TrimSpace(r.URL.Query().Get("from")),
+ TimeTo: strings.TrimSpace(r.URL.Query().Get("to")),
+ Limit: 100,
+ }
+
+ if v := strings.TrimSpace(r.URL.Query().Get("event_id")); v != "" {
+ if n, err := strconv.Atoi(v); err == nil && n > 0 {
+ filter.EventID = uint32(n)
+ }
+ }
+
+ if v := strings.TrimSpace(r.URL.Query().Get("limit")); v != "" {
+ if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 1000 {
+ filter.Limit = n
+ }
+ }
+
+ ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
+ defer cancel()
+
+ events, err := s.listEvents(ctx, filter)
+ if err != nil {
+ s.logger.Printf("ui events: %v", err)
+ writeError(w, http.StatusInternalServerError, "internal error")
+ return
+ }
+
+ filters := map[string]string{
+ "host": filter.Host,
+ "channel": filter.Channel,
+ "rule": filter.Rule,
+ "user": filter.User,
+ "src_ip": filter.SrcIP,
+ "severity": filter.Severity,
+ "from": filter.TimeFrom,
+ "to": filter.TimeTo,
+ "limit": strconv.Itoa(filter.Limit),
+ "event_id": func() string {
+ if filter.EventID == 0 {
+ return ""
+ }
+ return strconv.Itoa(int(filter.EventID))
+ }(),
+ }
+
+ data := EventListPageData{
+ Title: "Events",
+ Now: time.Now(),
+ Filters: filters,
+ Events: events,
+ }
+
+ s.renderTemplate(w, "events", data)
+}
+
// handleUIEventDetail renders the detail page for one event, addressed by
// its numeric ?id= query parameter. GET only; 404 when the id is unknown,
// 400 when it is missing or not an unsigned integer.
func (s *server) handleUIEventDetail(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	idStr := strings.TrimSpace(r.URL.Query().Get("id"))
	if idStr == "" {
		writeError(w, http.StatusBadRequest, "missing id")
		return
	}

	id, err := strconv.ParseUint(idStr, 10, 64)
	if err != nil {
		writeError(w, http.StatusBadRequest, "invalid id")
		return
	}

	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	ev, err := s.getEventByID(ctx, id)
	if err != nil {
		// No row for this id: a plain 404, not an internal error.
		if errors.Is(err, sql.ErrNoRows) {
			http.NotFound(w, r)
			return
		}
		s.logger.Printf("ui event detail: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	data := EventDetailPageData{
		Title: "Event Detail",
		Now:   time.Now(),
		Event: ev,
	}
	s.renderTemplate(w, "event_detail", data)
}
+
+func (s *server) renderTemplate(w http.ResponseWriter, name string, data any) {
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ if err := s.templates.ExecuteTemplate(w, name, data); err != nil {
+ s.logger.Printf("render template %s: %v", name, err)
+ http.Error(w, "template error", http.StatusInternalServerError)
+ }
+}
+
// EventFilter narrows the event_logs query built by listEvents. The zero
// value of every field means "no filter"; Limit is clamped to (0, 1000]
// by listEvents itself.
type EventFilter struct {
	Host     string // exact hostname match
	Channel  string // exact channel_name match
	Rule     string // only events overlapping detections with this rule_name
	User     string // matches target_user OR subject_user
	SrcIP    string // exact src_ip match
	Severity string // only events overlapping detections with this severity
	TimeFrom string // RFC3339 lower bound on ts; ignored if unparseable
	TimeTo   string // RFC3339 upper bound on ts; ignored if unparseable
	EventID  uint32 // Windows event id; 0 = no filter
	Limit    int    // max rows returned; defaulted when out of range
}
+
+func (s *server) getDashboardStats(ctx context.Context) (DashboardStats, error) {
+ var stats DashboardStats
+
+ if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM agents WHERE is_enabled = 1`).Scan(&stats.AgentsTotal); err != nil {
+ return stats, err
+ }
+ if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM agents WHERE is_enabled = 1 AND last_seen >= ?`, time.Now().UTC().Add(-s.cfg.OfflineAfter)).Scan(&stats.AgentsActive); err != nil {
+ return stats, err
+ }
+ if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM event_logs WHERE ts >= ?`, time.Now().UTC().Add(-24*time.Hour)).Scan(&stats.Events24h); err != nil {
+ return stats, err
+ }
+ if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM detections WHERE created_at >= ?`, time.Now().UTC().Add(-24*time.Hour)).Scan(&stats.Detections24h); err != nil {
+ return stats, err
+ }
+ if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM detections WHERE created_at >= ? AND severity = 'high'`, time.Now().UTC().Add(-24*time.Hour)).Scan(&stats.HighDetections24h); err != nil {
+ return stats, err
+ }
+
+ return stats, nil
+}
+
// listEvents queries event_logs with the dynamic filters in f and returns the
// newest matching rows. The WHERE clause is assembled incrementally; every
// appended fragment uses ? placeholders, and args is grown in lockstep, so
// the clause/argument order below must never diverge.
func (s *server) listEvents(ctx context.Context, f EventFilter) ([]EventRow, error) {
	// Defensive clamp: callers may pass a zero-value filter.
	if f.Limit <= 0 || f.Limit > 1000 {
		f.Limit = 100
	}

	// Column order here must match the rows.Scan order below exactly.
	query := `
SELECT id, hostname, channel_name, event_id, source, computer, provider_name,
       target_user, target_domain, subject_user, subject_domain,
       workstation, src_ip, src_port, logon_type, process_name,
       authentication_package, logon_process, status_text, sub_status_text,
       failure_reason, ts, received_at, msg
FROM event_logs
WHERE 1=1
`

	args := make([]any, 0, 16)

	if f.Host != "" {
		query += ` AND hostname = ?`
		args = append(args, f.Host)
	}
	if f.Channel != "" {
		query += ` AND channel_name = ?`
		args = append(args, f.Channel)
	}
	if f.EventID != 0 {
		query += ` AND event_id = ?`
		args = append(args, f.EventID)
	}
	// A user filter matches either side of the logon (target or subject).
	if f.User != "" {
		query += ` AND (target_user = ? OR subject_user = ?)`
		args = append(args, f.User, f.User)
	}
	if f.SrcIP != "" {
		query += ` AND src_ip = ?`
		args = append(args, f.SrcIP)
	}
	// Time bounds are applied only when they parse as RFC3339; bad input
	// is silently dropped rather than surfaced as an error.
	if f.TimeFrom != "" {
		if t, err := parseUIRFC3339(f.TimeFrom); err == nil {
			query += ` AND ts >= ?`
			args = append(args, t)
		}
	}
	if f.TimeTo != "" {
		if t, err := parseUIRFC3339(f.TimeTo); err == nil {
			query += ` AND ts <= ?`
			args = append(args, t)
		}
	}

	// Rule/severity filters are defined on detections, not events: keep an
	// event only if some detection on the same host has a time window that
	// contains the event's timestamp.
	if f.Rule != "" || f.Severity != "" {
		query += ` AND EXISTS (
    SELECT 1
    FROM detections d
    WHERE d.hostname = event_logs.hostname
      AND event_logs.ts >= d.window_start
      AND event_logs.ts <= d.window_end
`
		if f.Rule != "" {
			query += ` AND d.rule_name = ?`
			args = append(args, f.Rule)
		}
		if f.Severity != "" {
			query += ` AND d.severity = ?`
			args = append(args, f.Severity)
		}
		query += ` )`
	}

	query += ` ORDER BY ts DESC LIMIT ?`
	args = append(args, f.Limit)

	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []EventRow
	for rows.Next() {
		var ev EventRow
		if err := rows.Scan(
			&ev.ID, &ev.Hostname, &ev.Channel, &ev.EventID, &ev.Source,
			&ev.Computer, &ev.ProviderName,
			&ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain,
			&ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName,
			&ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText,
			&ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message,
		); err != nil {
			return nil, err
		}
		out = append(out, ev)
	}

	// rows.Err surfaces iteration errors that Next() swallowed.
	return out, rows.Err()
}
+
// getEventByID loads a single event_logs row by primary key. On a missing id
// it returns sql.ErrNoRows unchanged so callers can map it to a 404.
// The Scan destination order must match the SELECT column list exactly.
func (s *server) getEventByID(ctx context.Context, id uint64) (EventRow, error) {
	const q = `
SELECT id, hostname, channel_name, event_id, source, computer, provider_name,
       target_user, target_domain, subject_user, subject_domain,
       workstation, src_ip, src_port, logon_type, process_name,
       authentication_package, logon_process, status_text, sub_status_text,
       failure_reason, ts, received_at, msg
FROM event_logs
WHERE id = ?
LIMIT 1
`

	var ev EventRow
	err := s.db.QueryRowContext(ctx, q, id).Scan(
		&ev.ID, &ev.Hostname, &ev.Channel, &ev.EventID, &ev.Source,
		&ev.Computer, &ev.ProviderName,
		&ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain,
		&ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName,
		&ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText,
		&ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message,
	)
	return ev, err
}
+
+func parseUIRFC3339(v string) (time.Time, error) {
+ return time.Parse(time.RFC3339, strings.TrimSpace(v))
+}
+
// loadConfig assembles the process configuration from environment variables,
// falling back to the defaults below when a variable is unset. DB_DSN and
// ENROLLMENT_KEY have no default — mustGetenv presumably aborts startup when
// they are missing (TODO confirm against its definition).
func loadConfig() Config {
	return Config{
		ListenAddr:       getenv("LISTEN_ADDR", ":8080"),
		DBDSN:            mustGetenv("DB_DSN"),
		MaxBodyBytes:     getenvInt64("MAX_BODY_BYTES", 10*1024*1024), // 10 MiB ingest cap
		HTTPReadTimeout:  getenvDuration("HTTP_READ_TIMEOUT", 15*time.Second),
		HTTPWriteTimeout: getenvDuration("HTTP_WRITE_TIMEOUT", 30*time.Second),
		HTTPIdleTimeout:  getenvDuration("HTTP_IDLE_TIMEOUT", 60*time.Second),
		// database/sql pool sizing
		DBMaxOpenConns:    getenvInt("DB_MAX_OPEN_CONNS", 50),
		DBMaxIdleConns:    getenvInt("DB_MAX_IDLE_CONNS", 25),
		DBConnMaxLifetime: getenvDuration("DB_CONN_MAX_LIFETIME", 3*time.Minute),
		DBConnMaxIdleTime: getenvDuration("DB_CONN_MAX_IDLE_TIME", 1*time.Minute),
		// detection engine cadence and per-rule thresholds/windows
		DetectionInterval:         getenvDuration("DETECTION_INTERVAL", 1*time.Minute),
		OfflineAfter:              getenvDuration("OFFLINE_AFTER", 10*time.Minute),
		FailedLogonWindow:         getenvDuration("FAILED_LOGON_WINDOW", 5*time.Minute),
		FailedLogonThreshold:      getenvInt("FAILED_LOGON_THRESHOLD", 25),
		RebootWindow:              getenvDuration("REBOOT_WINDOW", 15*time.Minute),
		RebootThreshold:           getenvInt("REBOOT_THRESHOLD", 3),
		PasswordSprayWindow:       getenvDuration("PASSWORD_SPRAY_WINDOW", 5*time.Minute),
		PasswordSprayMinUsers:     getenvInt("PASSWORD_SPRAY_MIN_USERS", 5),
		PasswordSprayMinAttempts:  getenvInt("PASSWORD_SPRAY_MIN_ATTEMPTS", 15),
		SuccessAfterFailureWindow: getenvDuration("SUCCESS_AFTER_FAILURE_WINDOW", 10*time.Minute),
		NewSourceIPLookback:       getenvDuration("NEW_SOURCE_IP_LOOKBACK", 30*24*time.Hour), // 720h
		NewSourceIPWindow:         getenvDuration("NEW_SOURCE_IP_WINDOW", 10*time.Minute),
		DetectionsLimit:           getenvInt("DETECTIONS_LIMIT", 100),

		// Shared secret that allows unknown agents to self-enroll.
		EnrollmentKey: mustGetenv("ENROLLMENT_KEY"),
	}
}
+
+func (s *server) handleHealthz(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ ingestRejectedTotal.WithLabelValues("method_not_allowed").Inc()
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ return
+ }
+ writeJSON(w, http.StatusOK, map[string]any{
+ "status": "ok",
+ "uptime_sec": int(time.Since(s.startTime).Seconds()),
+ })
+}
+
+func (s *server) handleReadyz(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ return
+ }
+ ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
+ defer cancel()
+ if err := s.db.PingContext(ctx); err != nil {
+ writeError(w, http.StatusServiceUnavailable, "database not ready")
+ return
+ }
+ writeJSON(w, http.StatusOK, map[string]string{"status": "ready"})
+}
+
// handleIngest accepts a POSTed JSON array of LogPayload from an agent,
// authenticates (or auto-enrolls) the agent, and stores the batch. Every
// rejection path increments ingestRejectedTotal with a reason label before
// answering, so the labels below are part of the metric contract.
func (s *server) handleIngest(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		ingestRejectedTotal.WithLabelValues("method_not_allowed").Inc()
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	// X-API-Key identifies a known agent; X-Enrollment-Key optionally
	// authorizes first-time self-enrollment of an unknown hostname.
	apiKey := strings.TrimSpace(r.Header.Get("X-API-Key"))
	enrollmentKey := strings.TrimSpace(r.Header.Get("X-Enrollment-Key"))
	if apiKey == "" {
		ingestRejectedTotal.WithLabelValues("missing_api_key").Inc()
		writeError(w, http.StatusUnauthorized, "missing api key")
		return
	}

	// Cap the request body before decoding to bound memory use.
	r.Body = http.MaxBytesReader(w, r.Body, s.cfg.MaxBodyBytes)
	defer r.Body.Close()

	dec := json.NewDecoder(r.Body)
	// Reject payloads with fields we don't know about instead of dropping them.
	dec.DisallowUnknownFields()

	var batch []LogPayload
	if err := dec.Decode(&batch); err != nil {
		ingestRejectedTotal.WithLabelValues("invalid_json").Inc()
		writeError(w, http.StatusBadRequest, "invalid json")
		return
	}

	// Batch size constraints: non-empty and at most 1000 events.
	if len(batch) == 0 {
		ingestRejectedTotal.WithLabelValues("empty_batch").Inc()
		writeError(w, http.StatusBadRequest, "empty batch")
		return
	}
	if len(batch) > 1000 {
		ingestRejectedTotal.WithLabelValues("batch_too_large").Inc()
		writeError(w, http.StatusBadRequest, "batch too large")
		return
	}

	// Per-event validation; the index in the error helps agents debug.
	for i := range batch {
		if err := validatePayload(&batch[i]); err != nil {
			ingestRejectedTotal.WithLabelValues("invalid_payload").Inc()
			writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid payload at index %d: %v", i, err))
			return
		}
	}

	// A batch is attributed to exactly one agent, so all events must share
	// the hostname used for authentication below.
	hostname := batch[0].Hostname
	for i := 1; i < len(batch); i++ {
		if batch[i].Hostname != hostname {
			ingestRejectedTotal.WithLabelValues("mixed_hostnames").Inc()
			writeError(w, http.StatusBadRequest, "all events in a batch must use the same hostname")
			return
		}
	}

	ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
	defer cancel()

	agentID, err := s.authenticateTouchOrEnrollAgent(
		ctx,
		hostname,
		apiKey,
		enrollmentKey,
		clientIP(r.RemoteAddr),
	)
	if err != nil {
		if errors.Is(err, errUnauthorized) {
			ingestRejectedTotal.WithLabelValues("unauthorized").Inc()
			writeError(w, http.StatusUnauthorized, "invalid api key or hostname")
			return
		}
		ingestRejectedTotal.WithLabelValues("auth_error").Inc()
		s.logger.Printf("authenticate agent: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	if err := s.insertBatch(ctx, agentID, batch); err != nil {
		dbInsertFailuresTotal.Inc()
		s.logger.Printf("insert batch: %v", err)
		writeError(w, http.StatusInternalServerError, "internal error")
		return
	}

	// Success accounting: one batch counter tick plus one per-event tick
	// labeled by channel and event id.
	ingestBatchesTotal.Inc()
	for _, item := range batch {
		ingestEventsTotal.WithLabelValues(item.Channel, strconv.FormatUint(uint64(item.EventID), 10)).Inc()
	}

	// 202: the events are persisted; detection happens asynchronously.
	writeJSON(w, http.StatusAccepted, ingestResponse{Accepted: len(batch)})
}
+
+func (s *server) handleDetections(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ writeError(w, http.StatusMethodNotAllowed, "method not allowed")
+ return
+ }
+
+ limit := s.cfg.DetectionsLimit
+ if v := strings.TrimSpace(r.URL.Query().Get("limit")); v != "" {
+ if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 500 {
+ limit = n
+ }
+ }
+
+ host := strings.TrimSpace(r.URL.Query().Get("host"))
+ rule := strings.TrimSpace(r.URL.Query().Get("rule"))
+ severity := strings.TrimSpace(r.URL.Query().Get("severity"))
+
+ ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
+ defer cancel()
+
+ items, err := s.listDetections(ctx, host, rule, severity, limit)
+ if err != nil {
+ s.logger.Printf("list detections: %v", err)
+ writeError(w, http.StatusInternalServerError, "internal error")
+ return
+ }
+
+ writeJSON(w, http.StatusOK, items)
+}
+
// authenticateTouchOrEnrollAgent resolves a hostname + api key to an agent id.
// Known, enabled agents with a matching key hash get their last_seen/last_ip
// "touched". Unknown hostnames may self-enroll when the request carries the
// correct enrollment key. All auth failures collapse to errUnauthorized so
// callers cannot distinguish (and therefore cannot leak) the reason.
func (s *server) authenticateTouchOrEnrollAgent(ctx context.Context, hostname, apiKey, enrollmentKey, remoteIP string) (uint64, error) {
	const q = `
SELECT id, api_key_hash, is_enabled
FROM agents
WHERE hostname = ?
LIMIT 1
`

	var id uint64
	var storedHash string
	var enabled bool

	err := s.db.QueryRowContext(ctx, q, hostname).Scan(&id, &storedHash, &enabled)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			// Unknown hostname: only the enrollment path remains.
			if enrollmentKey == "" {
				return 0, errUnauthorized
			}
			// Both sides are hashed before the constant-time compare, so the
			// comparison length is fixed regardless of key lengths.
			if !constantTimeEqual(sha256Hex(enrollmentKey), sha256Hex(s.cfg.EnrollmentKey)) {
				return 0, errUnauthorized
			}
			return s.enrollNewAgent(ctx, hostname, apiKey, remoteIP)
		}
		return 0, err
	}

	// A disabled agent is rejected even with a valid key.
	if !enabled {
		return 0, errUnauthorized
	}

	// ToLower on the stored value suggests api_key_hash is lowercase hex
	// sha256 but may have been stored in mixed case — TODO confirm schema.
	if !constantTimeEqual(strings.ToLower(storedHash), sha256Hex(apiKey)) {
		return 0, errUnauthorized
	}

	// Successful auth doubles as a heartbeat: record when and from where.
	const upd = `
UPDATE agents
SET last_seen = CURRENT_TIMESTAMP(6), last_ip = ?
WHERE id = ?
`
	if _, err := s.db.ExecContext(ctx, upd, remoteIP, id); err != nil {
		return 0, err
	}

	return id, nil
}
+
// enrollNewAgent registers a previously unknown hostname, storing only the
// sha256 of its api key, and returns the new agent id. Callers have already
// verified the enrollment key; blank hostname/apiKey are still rejected as
// unauthorized rather than as a server error.
func (s *server) enrollNewAgent(ctx context.Context, hostname, apiKey, remoteIP string) (uint64, error) {
	if strings.TrimSpace(hostname) == "" {
		return 0, errUnauthorized
	}
	if strings.TrimSpace(apiKey) == "" {
		return 0, errUnauthorized
	}

	// NOTE(review): the transaction wraps a single INSERT, so it adds no
	// atomicity here; concurrent enrollments of the same hostname presumably
	// rely on a unique index on agents.hostname — confirm against the schema.
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return 0, err
	}
	// Rollback is a no-op after a successful Commit.
	defer func() { _ = tx.Rollback() }()

	const ins = `
INSERT INTO agents (hostname, api_key_hash, first_seen, last_seen, last_ip, is_enabled)
VALUES (?, ?, CURRENT_TIMESTAMP(6), CURRENT_TIMESTAMP(6), ?, 1)
`
	res, err := tx.ExecContext(ctx, ins, hostname, sha256Hex(apiKey), remoteIP)
	if err != nil {
		return 0, err
	}

	id, err := res.LastInsertId()
	if err != nil {
		return 0, err
	}

	if err := tx.Commit(); err != nil {
		return 0, err
	}

	s.logger.Printf("agent auto-enrolled: host=%s ip=%s", hostname, remoteIP)
	return uint64(id), nil
}
+
// errUnauthorized is the sentinel for every agent auth/enrollment failure;
// handlers map it to HTTP 401 without revealing which check failed.
var errUnauthorized = errors.New("unauthorized")
+
// insertBatch writes a validated batch of events for one agent as a single
// multi-row INSERT inside one transaction, then records batch-size, row-count,
// and duration metrics. The 28 placeholders per row must stay in lockstep
// with both the column list and the args appended below.
func (s *server) insertBatch(ctx context.Context, agentID uint64, batch []LogPayload) error {
	start := time.Now()

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// No-op after Commit; undoes the insert on any earlier return.
	defer func() { _ = tx.Rollback() }()

	var sb strings.Builder
	args := make([]any, 0, len(batch)*28)

	sb.WriteString(`
INSERT INTO event_logs (
  agent_id, hostname, channel_name, event_id, source, computer, provider_name,
  level_value, task_value, opcode_value, keywords,
  target_user, target_domain, subject_user, subject_domain,
  workstation, src_ip, src_port, logon_type, process_name,
  authentication_package, logon_process, status_text, sub_status_text,
  failure_reason, ts, msg, msg_sha256
) VALUES
`)

	for i, item := range batch {
		if i > 0 {
			sb.WriteString(",")
		}
		// One tuple of 28 placeholders per event.
		sb.WriteString("(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")

		// Structured fields are extracted from the raw event XML; the raw
		// message itself is stored alongside them plus its sha256.
		norm := NormalizeEventXML(item.Message)

		args = append(args,
			agentID,
			item.Hostname,
			item.Channel,
			item.EventID,
			item.Source,
			norm.Computer,
			// Fall back to the payload's source when the XML has no provider.
			firstNonEmpty(norm.ProviderName, item.Source),
			norm.LevelValue,
			norm.TaskValue,
			norm.OpcodeValue,
			norm.Keywords,
			norm.TargetUser,
			norm.TargetDomain,
			norm.SubjectUser,
			norm.SubjectDomain,
			norm.Workstation,
			norm.SrcIP,
			norm.SrcPort,
			norm.LogonType,
			norm.ProcessName,
			norm.AuthenticationPackage,
			norm.LogonProcess,
			norm.StatusText,
			norm.SubStatusText,
			norm.FailureReason,
			// Timestamps are normalized to UTC before storage.
			item.Time.UTC(),
			item.Message,
			sha256Hex(item.Message),
		)
	}

	if _, err := tx.ExecContext(ctx, sb.String(), args...); err != nil {
		return err
	}
	if err := tx.Commit(); err != nil {
		return err
	}

	// Metrics are only recorded for committed batches.
	dbBatchSizeHist.Observe(float64(len(batch)))
	dbInsertEventsTotal.Add(float64(len(batch)))
	dbTxDurationHist.Observe(time.Since(start).Seconds())
	return nil
}
+
+func (s *server) listDetections(ctx context.Context, host, rule, severity string, limit int) ([]Detection, error) {
+ base := `
+SELECT id, rule_name, severity, hostname, channel_name, event_id, score,
+ window_start, window_end, summary, details_json, created_at
+FROM detections
+WHERE 1=1
+`
+ args := make([]any, 0, 4)
+
+ if host != "" {
+ base += " AND hostname = ?"
+ args = append(args, host)
+ }
+ if rule != "" {
+ base += " AND rule_name = ?"
+ args = append(args, rule)
+ }
+ if severity != "" {
+ base += " AND severity = ?"
+ args = append(args, severity)
+ }
+
+ base += " ORDER BY created_at DESC LIMIT ?"
+ args = append(args, limit)
+
+ rows, err := s.db.QueryContext(ctx, base, args...)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+
+ var out []Detection
+ for rows.Next() {
+ var d Detection
+ if err := rows.Scan(
+ &d.ID, &d.RuleName, &d.Severity, &d.Hostname, &d.Channel,
+ &d.EventID, &d.Score, &d.WindowStart, &d.WindowEnd,
+ &d.Summary, &d.Details, &d.CreatedAt,
+ ); err != nil {
+ return nil, err
+ }
+ out = append(out, d)
+ }
+ return out, rows.Err()
+}
+
+func (s *server) runDetectionLoop() {
+ ticker := time.NewTicker(s.cfg.DetectionInterval)
+ defer ticker.Stop()
+
+ s.runDetectionsOnce()
+
+ for range ticker.C {
+ s.runDetectionsOnce()
+ }
+}
+
// runDetectionsOnce executes one full detection pass: refresh agent gauges,
// then run every rule in order. The whole pass shares one context whose
// timeout equals the detection interval, so a slow pass is cut off before
// the next one starts. Rule failures are logged and counted but do not stop
// the remaining rules.
func (s *server) runDetectionsOnce() {
	ctx, cancel := context.WithTimeout(context.Background(), s.cfg.DetectionInterval)
	defer cancel()

	if err := s.detector.updateAgentMetrics(ctx); err != nil {
		s.logger.Printf("update agent metrics: %v", err)
	}

	// Rule names double as metric label values; keep them stable.
	rules := []struct {
		name string
		fn   func(context.Context) error
	}{
		{"agent_offline", s.detector.runAgentOfflineRule},
		{"failed_logon_spike", s.detector.runFailedLogonSpikeRule},
		{"reboot_spike", s.detector.runRebootSpikeRule},
		{"new_event_id", s.detector.runNewEventIDRule},
		{"password_spray", s.detector.runPasswordSprayRule},
		{"success_after_failures", s.detector.runSuccessAfterFailuresRule},
		{"new_source_ip_for_user", s.detector.runNewSourceIPForUserRule},
	}

	for _, rule := range rules {
		start := time.Now()
		if err := rule.fn(ctx); err != nil {
			s.logger.Printf("rule %s error: %v", rule.name, err)
			s.detector.ruleErrorsTotal.WithLabelValues(rule.name).Inc()
			continue
		}
		// last-run timestamp and runtime are only recorded on success.
		s.detector.ruleLastRunGauge.WithLabelValues(rule.name).Set(float64(time.Now().Unix()))
		s.detector.ruleRuntimeHist.WithLabelValues(rule.name).Observe(time.Since(start).Seconds())
	}
}
+
// updateAgentMetrics refreshes the per-host last-seen gauge and the count of
// "active" agents (enabled agents whose last_seen is within OfflineAfter of
// now). Agents that have never reported (NULL last_seen) are skipped for
// both the gauge and the active count.
func (d *detector) updateAgentMetrics(ctx context.Context) error {
	const q = `
SELECT hostname, UNIX_TIMESTAMP(last_seen)
FROM agents
WHERE is_enabled = 1
`
	rows, err := d.db.QueryContext(ctx, q)
	if err != nil {
		return err
	}
	defer rows.Close()

	active := 0
	now := time.Now().UTC()

	for rows.Next() {
		var host string
		// NullInt64 because UNIX_TIMESTAMP(NULL) is NULL for never-seen agents.
		var lastSeen sql.NullInt64
		if err := rows.Scan(&host, &lastSeen); err != nil {
			return err
		}
		if lastSeen.Valid {
			d.lastSeenGauge.WithLabelValues(host).Set(float64(lastSeen.Int64))
			if now.Sub(time.Unix(lastSeen.Int64, 0).UTC()) <= d.cfg.OfflineAfter {
				active++
			}
		}
	}
	if err := rows.Err(); err != nil {
		return err
	}
	// Only publish the gauge when iteration completed without error.
	d.activeAgentsGauge.Set(float64(active))
	return nil
}
+
// runAgentOfflineRule raises a detection for every enabled agent whose
// last_seen is older than OfflineAfter. The score grows linearly with how
// many multiples of OfflineAfter the agent has been silent, floored at 1.
func (d *detector) runAgentOfflineRule(ctx context.Context) error {
	windowEnd := time.Now().UTC()
	windowStart := windowEnd.Add(-d.cfg.OfflineAfter)

	const q = `
SELECT hostname, last_seen
FROM agents
WHERE is_enabled = 1
  AND last_seen < ?
`
	rows, err := d.db.QueryContext(ctx, q, windowStart)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var host string
		var lastSeen time.Time
		if err := rows.Scan(&host, &lastSeen); err != nil {
			return err
		}
		minutes := int(windowEnd.Sub(lastSeen).Minutes())
		// NOTE(review): if OfflineAfter were under one minute the divisor
		// truncates to 0 and the score becomes +Inf — confirm config
		// validation forbids sub-minute OFFLINE_AFTER.
		score := math.Max(1, float64(minutes)/float64(int(d.cfg.OfflineAfter.Minutes())))
		severity := severityFromScore(score, 1.5, 3.0)

		d.anomalyScoreGauge.WithLabelValues(host, "agent_offline").Set(score)

		// insertDetection reports whether a new row was created, so repeated
		// passes over the same outage don't re-count metric hits.
		created, err := d.insertDetection(ctx, Detection{
			RuleName:    "agent_offline",
			Severity:    severity,
			Hostname:    host,
			Score:       score,
			WindowStart: windowStart,
			WindowEnd:   windowEnd,
			Summary:     fmt.Sprintf("Agent %s ist seit %d Minuten offline", host, minutes),
			Details: mustJSON(map[string]any{
				"last_seen":         lastSeen.UTC().Format(time.RFC3339Nano),
				"offline_minutes":   minutes,
				"offline_after_min": int(d.cfg.OfflineAfter.Minutes()),
			}),
		})
		if err != nil {
			return err
		}
		if created {
			d.detectionHitsTotal.WithLabelValues("agent_offline", severity).Inc()
		}
	}
	return rows.Err()
}
+
// runFailedLogonSpikeRule flags hosts whose count of Security/4625 (failed
// logon) events within FailedLogonWindow reaches FailedLogonThreshold.
// Score = count / threshold, so 1.0 is exactly at the threshold.
func (d *detector) runFailedLogonSpikeRule(ctx context.Context) error {
	windowEnd := time.Now().UTC()
	windowStart := windowEnd.Add(-d.cfg.FailedLogonWindow)

	// Half-open window [start, end): events exactly at windowEnd belong to
	// the next pass.
	const q = `
SELECT hostname, COUNT(*) AS cnt
FROM event_logs
WHERE channel_name = 'Security'
  AND event_id = 4625
  AND ts >= ? AND ts < ?
GROUP BY hostname
HAVING COUNT(*) >= ?
`
	rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, d.cfg.FailedLogonThreshold)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var host string
		var count int
		if err := rows.Scan(&host, &count); err != nil {
			return err
		}

		score := float64(count) / float64(d.cfg.FailedLogonThreshold)
		severity := severityFromScore(score, 2.0, 5.0)
		d.anomalyScoreGauge.WithLabelValues(host, "failed_logon_spike").Set(score)

		created, err := d.insertDetection(ctx, Detection{
			RuleName:    "failed_logon_spike",
			Severity:    severity,
			Hostname:    host,
			Channel:     "Security",
			EventID:     4625,
			Score:       score,
			WindowStart: windowStart,
			WindowEnd:   windowEnd,
			Summary:     fmt.Sprintf("Host %s hatte %d fehlgeschlagene Logons in %d Minuten", host, count, int(d.cfg.FailedLogonWindow.Minutes())),
			Details: mustJSON(map[string]any{
				"count":          count,
				"threshold":      d.cfg.FailedLogonThreshold,
				"window_minutes": int(d.cfg.FailedLogonWindow.Minutes()),
				"event_id":       4625,
			}),
		})
		if err != nil {
			return err
		}
		// Count a metric hit only for newly created detections.
		if created {
			d.detectionHitsTotal.WithLabelValues("failed_logon_spike", severity).Inc()
		}
	}
	return rows.Err()
}
+
// runRebootSpikeRule flags hosts with RebootThreshold or more System-channel
// reboot/shutdown events (ids 1074, 6005, 6006) inside RebootWindow.
// Score = count / threshold.
func (d *detector) runRebootSpikeRule(ctx context.Context) error {
	windowEnd := time.Now().UTC()
	windowStart := windowEnd.Add(-d.cfg.RebootWindow)

	const q = `
SELECT hostname, COUNT(*) AS cnt
FROM event_logs
WHERE channel_name = 'System'
  AND event_id IN (1074, 6005, 6006)
  AND ts >= ? AND ts < ?
GROUP BY hostname
HAVING COUNT(*) >= ?
`
	rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, d.cfg.RebootThreshold)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var host string
		var count int
		if err := rows.Scan(&host, &count); err != nil {
			return err
		}

		score := float64(count) / float64(d.cfg.RebootThreshold)
		severity := severityFromScore(score, 2.0, 4.0)
		d.anomalyScoreGauge.WithLabelValues(host, "reboot_spike").Set(score)

		created, err := d.insertDetection(ctx, Detection{
			RuleName:    "reboot_spike",
			Severity:    severity,
			Hostname:    host,
			Channel:     "System",
			Score:       score,
			WindowStart: windowStart,
			WindowEnd:   windowEnd,
			Summary:     fmt.Sprintf("Host %s hatte %d Reboot-/Shutdown-Events in %d Minuten", host, count, int(d.cfg.RebootWindow.Minutes())),
			Details: mustJSON(map[string]any{
				"count":          count,
				"threshold":      d.cfg.RebootThreshold,
				"window_minutes": int(d.cfg.RebootWindow.Minutes()),
				"event_ids":      []int{1074, 6005, 6006},
			}),
		})
		if err != nil {
			return err
		}
		// Metric hit only for newly created detections.
		if created {
			d.detectionHitsTotal.WithLabelValues("reboot_spike", severity).Inc()
		}
	}
	return rows.Err()
}
+
// runNewEventIDRule flags (host, channel, event_id) combinations that appear
// for the first time ever within the last DetectionInterval: the NOT EXISTS
// subquery checks for any older occurrence of the same combination.
// Score grows logarithmically with the occurrence count; 10+ occurrences of
// a brand-new id escalate to "high".
func (d *detector) runNewEventIDRule(ctx context.Context) error {
	windowEnd := time.Now().UTC()
	windowStart := windowEnd.Add(-d.cfg.DetectionInterval)

	// NOTE(review): the anti-join scans event_logs history per candidate;
	// presumably covered by an index on (hostname, channel_name, event_id, ts)
	// — confirm, as this grows with table size.
	const q = `
SELECT e.hostname, e.channel_name, e.event_id, COUNT(*) AS cnt
FROM event_logs e
WHERE e.ts >= ? AND e.ts < ?
  AND NOT EXISTS (
    SELECT 1
    FROM event_logs old
    WHERE old.hostname = e.hostname
      AND old.channel_name = e.channel_name
      AND old.event_id = e.event_id
      AND old.ts < ?
  )
GROUP BY e.hostname, e.channel_name, e.event_id
`
	rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, windowStart)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var host, channel string
		var eventID uint32
		var count int
		if err := rows.Scan(&host, &channel, &eventID, &count); err != nil {
			return err
		}

		// 1.0 baseline plus log10(count+1): novelty matters more than volume.
		score := 1.0 + math.Log10(float64(count)+1)
		severity := "medium"
		if count >= 10 {
			severity = "high"
		}
		d.anomalyScoreGauge.WithLabelValues(host, "new_event_id").Set(score)

		created, err := d.insertDetection(ctx, Detection{
			RuleName:    "new_event_id",
			Severity:    severity,
			Hostname:    host,
			Channel:     channel,
			EventID:     eventID,
			Score:       score,
			WindowStart: windowStart,
			WindowEnd:   windowEnd,
			Summary:     fmt.Sprintf("Host %s sendet erstmals Event-ID %d im Channel %s", host, eventID, channel),
			Details: mustJSON(map[string]any{
				"count":    count,
				"channel":  channel,
				"event_id": eventID,
			}),
		})
		if err != nil {
			return err
		}
		if created {
			d.detectionHitsTotal.WithLabelValues("new_event_id", severity).Inc()
		}
	}
	return rows.Err()
}
+
// runPasswordSprayRule flags source IPs that, within PasswordSprayWindow,
// produced at least PasswordSprayMinAttempts failed logons (Security/4625)
// against at least PasswordSprayMinUsers distinct target users on one host.
// Loopback and placeholder ('', '-') source IPs/users are excluded because
// they cannot identify an external sprayer.
func (d *detector) runPasswordSprayRule(ctx context.Context) error {
	windowEnd := time.Now().UTC()
	windowStart := windowEnd.Add(-d.cfg.PasswordSprayWindow)

	const q = `
SELECT hostname, src_ip, COUNT(*) AS attempts, COUNT(DISTINCT target_user) AS users
FROM event_logs
WHERE channel_name = 'Security'
  AND event_id = 4625
  AND ts >= ? AND ts < ?
  AND src_ip <> '' AND src_ip <> '-' AND src_ip <> '::1' AND src_ip <> '127.0.0.1'
  AND target_user <> '' AND target_user <> '-'
GROUP BY hostname, src_ip
HAVING COUNT(*) >= ? AND COUNT(DISTINCT target_user) >= ?
`
	rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, d.cfg.PasswordSprayMinAttempts, d.cfg.PasswordSprayMinUsers)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var host, srcIP string
		var attempts, users int
		if err := rows.Scan(&host, &srcIP, &attempts, &users); err != nil {
			return err
		}

		// Score: whichever ratio (attempts or breadth of users) exceeds its
		// minimum by more dominates.
		score := math.Max(float64(attempts)/float64(d.cfg.PasswordSprayMinAttempts), float64(users)/float64(d.cfg.PasswordSprayMinUsers))
		severity := severityFromScore(score, 1.5, 3.0)
		d.anomalyScoreGauge.WithLabelValues(host, "password_spray").Set(score)

		created, err := d.insertDetection(ctx, Detection{
			RuleName:    "password_spray",
			Severity:    severity,
			Hostname:    host,
			Channel:     "Security",
			EventID:     4625,
			Score:       score,
			WindowStart: windowStart,
			WindowEnd:   windowEnd,
			Summary:     fmt.Sprintf("Möglicher Password-Spray auf %s von %s: %d Fehlversuche gegen %d Benutzer", host, srcIP, attempts, users),
			Details: mustJSON(map[string]any{
				"src_ip":         srcIP,
				"attempts":       attempts,
				"distinct_users": users,
				"window_minutes": int(d.cfg.PasswordSprayWindow.Minutes()),
				"event_id":       4625,
			}),
		})
		if err != nil {
			return err
		}
		if created {
			d.detectionHitsTotal.WithLabelValues("password_spray", severity).Inc()
		}
	}
	return rows.Err()
}
+
// runSuccessAfterFailuresRule flags successful logons (Security/4624) within
// SuccessAfterFailureWindow that were preceded — on the same host, for the
// same user, within the same window length before the success — by at least
// one failed logon (4625). Always severity "high": this is the classic
// brute-force-then-success pattern.
func (d *detector) runSuccessAfterFailuresRule(ctx context.Context) error {
	windowEnd := time.Now().UTC()
	windowStart := windowEnd.Add(-d.cfg.SuccessAfterFailureWindow)

	// The src_ip predicate pairs failure and success either on an exact IP
	// match, or leniently whenever either side has no usable IP ('' / '-'),
	// so local/console logons are not excluded from the correlation.
	const q = `
SELECT s.hostname, s.target_user, s.src_ip, COUNT(*) AS success_count
FROM event_logs s
WHERE s.channel_name = 'Security'
  AND s.event_id = 4624
  AND s.ts >= ? AND s.ts < ?
  AND s.target_user <> '' AND s.target_user <> '-'
  AND EXISTS (
    SELECT 1
    FROM event_logs f
    WHERE f.hostname = s.hostname
      AND f.channel_name = 'Security'
      AND f.event_id = 4625
      AND f.target_user = s.target_user
      AND (
        (f.src_ip = s.src_ip AND s.src_ip <> '' AND s.src_ip <> '-')
        OR (s.src_ip = '' OR s.src_ip = '-' OR f.src_ip = '' OR f.src_ip = '-')
      )
      AND f.ts >= DATE_SUB(s.ts, INTERVAL ? SECOND)
      AND f.ts < s.ts
  )
GROUP BY s.hostname, s.target_user, s.src_ip
`
	rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, int(d.cfg.SuccessAfterFailureWindow.Seconds()))
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var host, user, srcIP string
		var successCount int
		if err := rows.Scan(&host, &user, &srcIP, &successCount); err != nil {
			return err
		}

		// Base score 2.0 (always notable) plus a small volume term.
		score := 2.0 + math.Log10(float64(successCount)+1)
		severity := "high"
		d.anomalyScoreGauge.WithLabelValues(host, "success_after_failures").Set(score)

		created, err := d.insertDetection(ctx, Detection{
			RuleName:    "success_after_failures",
			Severity:    severity,
			Hostname:    host,
			Channel:     "Security",
			EventID:     4624,
			Score:       score,
			WindowStart: windowStart,
			WindowEnd:   windowEnd,
			Summary:     fmt.Sprintf("Erfolgreicher Logon nach Fehlversuchen auf %s für Benutzer %s", host, user),
			Details: mustJSON(map[string]any{
				"user":           user,
				"src_ip":         srcIP,
				"success_count":  successCount,
				"window_minutes": int(d.cfg.SuccessAfterFailureWindow.Minutes()),
				"success_event":  4624,
				"failure_event":  4625,
			}),
		})
		if err != nil {
			return err
		}
		if created {
			d.detectionHitsTotal.WithLabelValues("success_after_failures", severity).Inc()
		}
	}
	return rows.Err()
}
+
+func (d *detector) runNewSourceIPForUserRule(ctx context.Context) error {
+ windowEnd := time.Now().UTC()
+ windowStart := windowEnd.Add(-d.cfg.NewSourceIPWindow)
+ lookbackStart := windowStart.Add(-d.cfg.NewSourceIPLookback)
+
+ const q = `
+SELECT e.hostname, e.target_user, e.src_ip, COUNT(*) AS cnt
+FROM event_logs e
+WHERE e.channel_name = 'Security'
+ AND e.event_id = 4624
+ AND e.ts >= ? AND e.ts < ?
+ AND e.target_user <> '' AND e.target_user <> '-'
+ AND e.src_ip <> '' AND e.src_ip <> '-' AND e.src_ip <> '::1' AND e.src_ip <> '127.0.0.1'
+ AND NOT EXISTS (
+ SELECT 1
+ FROM event_logs old
+ WHERE old.hostname = e.hostname
+ AND old.channel_name = 'Security'
+ AND old.event_id = 4624
+ AND old.target_user = e.target_user
+ AND old.src_ip = e.src_ip
+ AND old.ts >= ? AND old.ts < ?
+ )
+GROUP BY e.hostname, e.target_user, e.src_ip
+`
+ rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, lookbackStart, windowStart)
+ if err != nil {
+ return err
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var host, user, srcIP string
+ var cnt int
+ if err := rows.Scan(&host, &user, &srcIP, &cnt); err != nil {
+ return err
+ }
+
+ score := 1.5 + math.Log10(float64(cnt)+1)
+ severity := "medium"
+ if cnt >= 5 {
+ severity = "high"
+ }
+ d.anomalyScoreGauge.WithLabelValues(host, "new_source_ip_for_user").Set(score)
+
+ created, err := d.insertDetection(ctx, Detection{
+ RuleName: "new_source_ip_for_user",
+ Severity: severity,
+ Hostname: host,
+ Channel: "Security",
+ EventID: 4624,
+ Score: score,
+ WindowStart: windowStart,
+ WindowEnd: windowEnd,
+ Summary: fmt.Sprintf("Benutzer %s meldet sich auf %s von neuer Quell-IP %s an", user, host, srcIP),
+ Details: mustJSON(map[string]any{
+ "user": user,
+ "src_ip": srcIP,
+ "count": cnt,
+ "window_minutes": int(d.cfg.NewSourceIPWindow.Minutes()),
+ "lookback_hours": int(d.cfg.NewSourceIPLookback.Hours()),
+ "event_id": 4624,
+ }),
+ })
+ if err != nil {
+ return err
+ }
+ if created {
+ d.detectionHitsTotal.WithLabelValues("new_source_ip_for_user", severity).Inc()
+ }
+ }
+ return rows.Err()
+}
+
+func (d *detector) insertDetection(ctx context.Context, det Detection) (bool, error) {
+ const q = `
+INSERT IGNORE INTO detections
+(rule_name, severity, hostname, channel_name, event_id, score, window_start, window_end, summary, details_json)
+VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+`
+ res, err := d.db.ExecContext(ctx, q,
+ det.RuleName,
+ det.Severity,
+ det.Hostname,
+ det.Channel,
+ det.EventID,
+ det.Score,
+ det.WindowStart.UTC(),
+ det.WindowEnd.UTC(),
+ det.Summary,
+ string(det.Details),
+ )
+ if err != nil {
+ return false, err
+ }
+ affected, err := res.RowsAffected()
+ if err != nil {
+ return false, err
+ }
+ return affected > 0, nil
+}
+
// NormalizeEventXML extracts commonly-used fields from a Windows
// event's XML rendering (the <Event> document: System header plus
// EventData <Data Name="..."> entries) into a flat NormalizedEvent.
// Malformed XML is handled best-effort: on a decode error the fields
// gathered so far are returned as-is.
//
// NOTE(review): encoding/xml may deliver a single text node as several
// CharData chunks (e.g. around entity references such as &amp;). Each
// chunk overwrites the previously stored value here instead of being
// appended — verify behavior for field values containing entities.
func NormalizeEventXML(xmlStr string) NormalizedEvent {
	var out NormalizedEvent
	if strings.TrimSpace(xmlStr) == "" {
		return out
	}

	dec := xml.NewDecoder(strings.NewReader(xmlStr))

	// path tracks the element ancestry (local names only);
	// currentDataName holds the Name attribute of the innermost
	// <Data> element currently open, if any.
	var path []string
	var currentDataName string

	for {
		tok, err := dec.Token()
		if err != nil {
			if err == io.EOF {
				break
			}
			// Best-effort: return whatever was parsed before the error.
			return out
		}

		switch t := tok.(type) {
		case xml.StartElement:
			path = append(path, t.Name.Local)

			switch t.Name.Local {
			case "Provider":
				// <Provider Name="..."> in the System header.
				for _, a := range t.Attr {
					if a.Name.Local == "Name" {
						out.ProviderName = strings.TrimSpace(a.Value)
					}
				}
			case "Data":
				// <Data Name="..."> in EventData; the element text that
				// follows is routed by this name in the CharData case.
				currentDataName = ""
				for _, a := range t.Attr {
					if a.Name.Local == "Name" {
						currentDataName = strings.TrimSpace(a.Value)
						break
					}
				}
			}

		case xml.EndElement:
			// Pop the ancestry stack; reset Data routing on </Data>.
			if len(path) > 0 {
				path = path[:len(path)-1]
			}
			if t.Name.Local == "Data" {
				currentDataName = ""
			}

		case xml.CharData:
			v := strings.TrimSpace(string(t))
			if v == "" {
				continue
			}

			// System-header scalar fields, matched by element path suffix.
			if endsWithPath(path, "System", "Computer") {
				out.Computer = v
				continue
			}
			if endsWithPath(path, "System", "Level") {
				out.LevelValue = parseUint32(v)
				continue
			}
			if endsWithPath(path, "System", "Task") {
				out.TaskValue = parseUint32(v)
				continue
			}
			if endsWithPath(path, "System", "Opcode") {
				out.OpcodeValue = parseUint32(v)
				continue
			}
			if endsWithPath(path, "System", "Keywords") {
				out.Keywords = v
				continue
			}

			// EventData values, routed by the enclosing <Data Name="...">.
			if currentDataName != "" {
				switch currentDataName {
				case "TargetUserName":
					out.TargetUser = v
				case "TargetDomainName":
					out.TargetDomain = v
				case "SubjectUserName":
					out.SubjectUser = v
				case "SubjectDomainName":
					out.SubjectDomain = v
				case "WorkstationName":
					out.Workstation = v
				case "IpAddress":
					out.SrcIP = v
				case "IpPort":
					out.SrcPort = v
				case "LogonType":
					out.LogonType = v
				case "ProcessName":
					out.ProcessName = v
				case "AuthenticationPackageName":
					out.AuthenticationPackage = v
				case "LogonProcessName":
					out.LogonProcess = v
				case "Status":
					out.StatusText = v
				case "SubStatus":
					out.SubStatusText = v
				case "FailureReason":
					out.FailureReason = v
				}
			}
		}
	}

	// Final normalization: trim whitespace and map the Windows "-"
	// placeholder to the empty string on every captured field.
	out.TargetUser = cleanupWinField(out.TargetUser)
	out.TargetDomain = cleanupWinField(out.TargetDomain)
	out.SubjectUser = cleanupWinField(out.SubjectUser)
	out.SubjectDomain = cleanupWinField(out.SubjectDomain)
	out.Workstation = cleanupWinField(out.Workstation)
	out.SrcIP = cleanupWinField(out.SrcIP)
	out.SrcPort = cleanupWinField(out.SrcPort)
	out.LogonType = cleanupWinField(out.LogonType)
	out.ProcessName = cleanupWinField(out.ProcessName)
	out.AuthenticationPackage = cleanupWinField(out.AuthenticationPackage)
	out.LogonProcess = cleanupWinField(out.LogonProcess)
	out.StatusText = cleanupWinField(out.StatusText)
	out.SubStatusText = cleanupWinField(out.SubStatusText)
	out.FailureReason = cleanupWinField(out.FailureReason)
	out.Computer = cleanupWinField(out.Computer)
	out.ProviderName = cleanupWinField(out.ProviderName)

	return out
}
+
// endsWithPath reports whether the final elements of path equal parts,
// in order. With no parts it is vacuously true.
func endsWithPath(path []string, parts ...string) bool {
	n := len(parts)
	if n > len(path) {
		return false
	}
	tail := path[len(path)-n:]
	for i, want := range parts {
		if tail[i] != want {
			return false
		}
	}
	return true
}
+
// parseUint32 parses a trimmed decimal string into a uint32, returning
// 0 for anything unparsable or out of range (matching the permissive
// handling the XML normalizer needs).
func parseUint32(v string) uint32 {
	if n, err := strconv.ParseUint(strings.TrimSpace(v), 10, 32); err == nil {
		return uint32(n)
	}
	return 0
}
+
// cleanupWinField normalizes a Windows event field: surrounding
// whitespace is stripped and the conventional "-" placeholder is
// mapped to the empty string.
func cleanupWinField(v string) string {
	trimmed := strings.TrimSpace(v)
	if trimmed == "-" {
		return ""
	}
	return trimmed
}
+
// firstNonEmpty returns the first argument that is non-blank after
// trimming, already trimmed; "" when every argument is blank.
func firstNonEmpty(v ...string) string {
	for _, s := range v {
		if trimmed := strings.TrimSpace(s); trimmed != "" {
			return trimmed
		}
	}
	return ""
}
+
+func validatePayload(p *LogPayload) error {
+ p.Hostname = strings.TrimSpace(p.Hostname)
+ p.Channel = strings.TrimSpace(p.Channel)
+ p.Source = strings.TrimSpace(p.Source)
+
+ if p.Hostname == "" {
+ return errors.New("host is required")
+ }
+ if len(p.Hostname) > 255 {
+ return errors.New("host too long")
+ }
+ if p.Channel == "" {
+ return errors.New("channel is required")
+ }
+ if len(p.Channel) > 128 {
+ return errors.New("channel too long")
+ }
+ if p.Source == "" {
+ return errors.New("source is required")
+ }
+ if len(p.Source) > 255 {
+ return errors.New("source too long")
+ }
+ if p.EventID == 0 {
+ return errors.New("event id is required")
+ }
+ if p.Message == "" {
+ return errors.New("msg is required")
+ }
+ if len(p.Message) > 2*1024*1024 {
+ return errors.New("msg too large")
+ }
+ if p.Time.IsZero() {
+ return errors.New("ts is required")
+ }
+ return nil
+}
+
// severityFromScore buckets a numeric anomaly score against the two
// inclusive thresholds: >= high -> "high", >= medium -> "medium",
// otherwise "low".
func severityFromScore(score, medium, high float64) string {
	if score >= high {
		return "high"
	}
	if score >= medium {
		return "medium"
	}
	return "low"
}
+
// mustJSON marshals v to JSON for storage in the detections
// details_json column. On a marshal failure it returns the empty
// object "{}" instead of a nil RawMessage — the previous behavior
// would have handed invalid (empty) JSON to a JSON NOT NULL column.
func mustJSON(v any) json.RawMessage {
	b, err := json.Marshal(v)
	if err != nil {
		return json.RawMessage(`{}`)
	}
	return b
}
+
+func metricsMiddleware(logger *log.Logger, next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ rw := &statusRecorder{ResponseWriter: w, status: 200}
+ start := time.Now()
+ next.ServeHTTP(rw, r)
+
+ status := strconv.Itoa(rw.status)
+ path := r.URL.Path
+
+ httpRequestsTotal.WithLabelValues(path, r.Method, status).Inc()
+ httpRequestDuration.WithLabelValues(path, r.Method, status).Observe(time.Since(start).Seconds())
+ logger.Printf("%s %s remote=%s status=%s dur=%s", r.Method, r.URL.Path, r.RemoteAddr, status, time.Since(start))
+ })
+}
+
// statusRecorder wraps an http.ResponseWriter to capture the status
// code a handler writes, so middleware can report it after ServeHTTP
// returns.
// NOTE(review): optional interfaces (http.Flusher, http.Hijacker) are
// not forwarded — confirm no handler depends on them.
type statusRecorder struct {
	http.ResponseWriter
	status int // last status passed to WriteHeader; callers pre-seed 200
}
+
// WriteHeader records the status code, then delegates to the wrapped
// ResponseWriter. Implicit 200s (handlers that only call Write) never
// reach this method, which is why callers initialize status to 200.
func (r *statusRecorder) WriteHeader(status int) {
	r.status = status
	r.ResponseWriter.WriteHeader(status)
}
+
+func recoveryMiddleware(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ defer func() {
+ if recover() != nil {
+ writeError(w, http.StatusInternalServerError, "internal error")
+ }
+ }()
+ next.ServeHTTP(w, r)
+ })
+}
+
+func writeJSON(w http.ResponseWriter, status int, v any) {
+ w.Header().Set("Content-Type", "application/json; charset=utf-8")
+ w.WriteHeader(status)
+ _ = json.NewEncoder(w).Encode(v)
+}
+
// writeError sends a JSON error envelope {"error": msg} with the given
// HTTP status.
func writeError(w http.ResponseWriter, status int, msg string) {
	writeJSON(w, status, map[string]string{"error": msg})
}
+
// sha256Hex returns the lowercase hex-encoded SHA-256 digest of s
// (used for API-key hashing against agents.api_key_hash).
func sha256Hex(s string) string {
	h := sha256.New()
	_, _ = h.Write([]byte(s))
	return hex.EncodeToString(h.Sum(nil))
}
+
// constantTimeEqual compares two equal-length strings in time
// independent of where they differ; the early return on length
// mismatch reveals only the length.
// NOTE(review): crypto/subtle.ConstantTimeCompare is the stdlib
// equivalent and could replace this helper.
func constantTimeEqual(a, b string) bool {
	if len(a) != len(b) {
		return false
	}
	bb := []byte(b)
	var diff byte
	for i, ab := range []byte(a) {
		diff |= ab ^ bb[i]
	}
	return diff == 0
}
+
// clientIP extracts the host portion of a "host:port" remote address,
// falling back to the raw string when it has no port component.
func clientIP(remoteAddr string) string {
	if host, _, err := net.SplitHostPort(remoteAddr); err == nil {
		return host
	}
	return remoteAddr
}
+
+func getenv(key, def string) string {
+ v := strings.TrimSpace(os.Getenv(key))
+ if v == "" {
+ return def
+ }
+ return v
+}
+
+func mustGetenv(key string) string {
+ v := strings.TrimSpace(os.Getenv(key))
+ if v == "" {
+ log.Fatalf("missing required environment variable: %s", key)
+ }
+ return v
+}
+
+func getenvInt(key string, def int) int {
+ v := strings.TrimSpace(os.Getenv(key))
+ if v == "" {
+ return def
+ }
+ n, err := strconv.Atoi(v)
+ if err != nil {
+ log.Fatalf("invalid int for %s: %v", key, err)
+ }
+ return n
+}
+
+func getenvInt64(key string, def int64) int64 {
+ v := strings.TrimSpace(os.Getenv(key))
+ if v == "" {
+ return def
+ }
+ n, err := strconv.ParseInt(v, 10, 64)
+ if err != nil {
+ log.Fatalf("invalid int64 for %s: %v", key, err)
+ }
+ return n
+}
+
+func getenvDuration(key string, def time.Duration) time.Duration {
+ v := strings.TrimSpace(os.Getenv(key))
+ if v == "" {
+ return def
+ }
+ d, err := time.ParseDuration(v)
+ if err != nil {
+ log.Fatalf("invalid duration for %s: %v", key, err)
+ }
+ return d
+}
diff --git a/schema.sql b/schema.sql
new file mode 100644
index 0000000..64cba70
--- /dev/null
+++ b/schema.sql
@@ -0,0 +1,101 @@
-- Event-collector schema (MariaDB/MySQL).
-- utf8mb4 + utf8mb4_unicode_ci matches the application DSN's
-- charset/collation settings, so no conversion happens on insert.
CREATE DATABASE IF NOT EXISTS eventcollector
  CHARACTER SET utf8mb4
  COLLATE utf8mb4_unicode_ci;

USE eventcollector;
+
-- Registered forwarding agents, one row per hostname.
-- api_key_hash holds the SHA-256 hex digest of the agent's API key;
-- the plaintext key is never stored.
CREATE TABLE IF NOT EXISTS agents (
  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  hostname VARCHAR(255) NOT NULL,
  api_key_hash CHAR(64) NOT NULL,
  first_seen DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
  last_seen DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
  last_ip VARCHAR(64) NOT NULL DEFAULT '',
  is_enabled TINYINT(1) NOT NULL DEFAULT 1,
  PRIMARY KEY (id),
  UNIQUE KEY ux_agents_hostname (hostname),
  KEY ix_agents_last_seen (last_seen)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
+
-- Raw forwarded Windows event-log entries.
-- ts is the event's own timestamp; received_at is server arrival time.
-- msg_sha256 fingerprints the message body — presumably for
-- deduplication/integrity; confirm against the ingest handler.
-- NOTE(review): the (…, ts) composite keys serve the detector's
-- windowed queries; each index adds write cost, so re-check them
-- against actual query plans as the table grows.
CREATE TABLE IF NOT EXISTS event_logs (
  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  agent_id BIGINT UNSIGNED NOT NULL,
  hostname VARCHAR(255) NOT NULL,
  channel_name VARCHAR(128) NOT NULL,
  event_id INT UNSIGNED NOT NULL,
  source VARCHAR(255) NOT NULL,
  ts DATETIME(6) NOT NULL,
  received_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
  msg LONGTEXT NOT NULL,
  msg_sha256 CHAR(64) NOT NULL,
  PRIMARY KEY (id),
  KEY ix_event_logs_ts (ts),
  KEY ix_event_logs_received_at (received_at),
  KEY ix_event_logs_agent_ts (agent_id, ts),
  KEY ix_event_logs_eventid_ts (event_id, ts),
  KEY ix_event_logs_hostname_ts (hostname, ts),
  KEY ix_event_logs_channel_event_ts (channel_name, event_id, ts),
  -- RESTRICT keeps history intact: an agent row cannot be removed
  -- while events still reference it.
  CONSTRAINT fk_event_logs_agent
    FOREIGN KEY (agent_id) REFERENCES agents(id)
    ON DELETE RESTRICT
    ON UPDATE RESTRICT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
+
-- Rule hits produced by the detector.
-- ux_detection_dedupe suppresses duplicate alerts for the same rule,
-- host, event and time window; the application inserts with
-- INSERT IGNORE so repeats are silently dropped.
-- NOTE(review): window_start/window_end are derived from time.Now()
-- on each detector run, so dedupe only catches exact-window repeats —
-- confirm that granularity is intended.
CREATE TABLE IF NOT EXISTS detections (
  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  rule_name VARCHAR(128) NOT NULL,
  severity VARCHAR(32) NOT NULL,
  hostname VARCHAR(255) NOT NULL,
  channel_name VARCHAR(128) NOT NULL DEFAULT '',
  event_id INT UNSIGNED NOT NULL DEFAULT 0,
  score DOUBLE NOT NULL DEFAULT 0,
  window_start DATETIME(6) NOT NULL,
  window_end DATETIME(6) NOT NULL,
  summary VARCHAR(512) NOT NULL,
  details_json JSON NOT NULL,
  created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
  PRIMARY KEY (id),
  UNIQUE KEY ux_detection_dedupe (rule_name, hostname, channel_name, event_id, window_start, window_end),
  KEY ix_detections_created (created_at),
  KEY ix_detections_rule_host_time (rule_name, hostname, created_at),
  KEY ix_detections_severity_time (severity, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
+
-- Seed two example agents. (The duplicate "USE eventcollector;" that
-- stood here was redundant — the database is already selected at the
-- top of this script.)
-- INSERT IGNORE keeps the script re-runnable: rows whose hostname
-- already exists (ux_agents_hostname) are skipped instead of aborting,
-- matching the IF NOT EXISTS style of the DDL above.
-- NOTE(review): these keys are placeholders — replace before use.
INSERT IGNORE INTO agents (hostname, api_key_hash)
VALUES
  ('client01.domain.local', SHA2('SUPER-LANGER-AGENT-KEY-01', 256)),
  ('client02.domain.local', SHA2('SUPER-LANGER-AGENT-KEY-02', 256));
+
-- V2 migration: promote fields parsed out of the event XML into
-- dedicated columns so the detection rules can filter and index them.
-- IF NOT EXISTS (MariaDB extension to ALTER TABLE) makes the
-- migration re-runnable — previously a second run aborted on the
-- first duplicate column, while the rest of this script uses
-- IF NOT EXISTS throughout.
ALTER TABLE event_logs
  ADD COLUMN IF NOT EXISTS computer VARCHAR(255) NOT NULL DEFAULT '' AFTER source,
  ADD COLUMN IF NOT EXISTS provider_name VARCHAR(255) NOT NULL DEFAULT '' AFTER computer,
  ADD COLUMN IF NOT EXISTS level_value INT UNSIGNED NOT NULL DEFAULT 0 AFTER provider_name,
  ADD COLUMN IF NOT EXISTS task_value INT UNSIGNED NOT NULL DEFAULT 0 AFTER level_value,
  ADD COLUMN IF NOT EXISTS opcode_value INT UNSIGNED NOT NULL DEFAULT 0 AFTER task_value,
  ADD COLUMN IF NOT EXISTS keywords VARCHAR(255) NOT NULL DEFAULT '' AFTER opcode_value,
  ADD COLUMN IF NOT EXISTS target_user VARCHAR(255) NOT NULL DEFAULT '' AFTER keywords,
  ADD COLUMN IF NOT EXISTS target_domain VARCHAR(255) NOT NULL DEFAULT '' AFTER target_user,
  ADD COLUMN IF NOT EXISTS subject_user VARCHAR(255) NOT NULL DEFAULT '' AFTER target_domain,
  ADD COLUMN IF NOT EXISTS subject_domain VARCHAR(255) NOT NULL DEFAULT '' AFTER subject_user,
  ADD COLUMN IF NOT EXISTS workstation VARCHAR(255) NOT NULL DEFAULT '' AFTER subject_domain,
  ADD COLUMN IF NOT EXISTS src_ip VARCHAR(64) NOT NULL DEFAULT '' AFTER workstation,
  ADD COLUMN IF NOT EXISTS src_port VARCHAR(32) NOT NULL DEFAULT '' AFTER src_ip,
  ADD COLUMN IF NOT EXISTS logon_type VARCHAR(32) NOT NULL DEFAULT '' AFTER src_port,
  ADD COLUMN IF NOT EXISTS process_name VARCHAR(512) NOT NULL DEFAULT '' AFTER logon_type,
  ADD COLUMN IF NOT EXISTS authentication_package VARCHAR(128) NOT NULL DEFAULT '' AFTER process_name,
  ADD COLUMN IF NOT EXISTS logon_process VARCHAR(128) NOT NULL DEFAULT '' AFTER authentication_package,
  ADD COLUMN IF NOT EXISTS status_text VARCHAR(64) NOT NULL DEFAULT '' AFTER logon_process,
  ADD COLUMN IF NOT EXISTS sub_status_text VARCHAR(64) NOT NULL DEFAULT '' AFTER status_text,
  ADD COLUMN IF NOT EXISTS failure_reason VARCHAR(512) NOT NULL DEFAULT '' AFTER sub_status_text;
+
-- Indexes supporting the detector's windowed lookups on the V2
-- columns. IF NOT EXISTS (MariaDB) keeps re-runs idempotent —
-- previously a second run failed on the first duplicate key name.
ALTER TABLE event_logs
  ADD KEY IF NOT EXISTS ix_event_logs_target_user_ts (target_user, ts),
  ADD KEY IF NOT EXISTS ix_event_logs_src_ip_ts (src_ip, ts),
  ADD KEY IF NOT EXISTS ix_event_logs_target_user_src_ip_ts (target_user, src_ip, ts),
  ADD KEY IF NOT EXISTS ix_event_logs_eventid_srcip_ts (event_id, src_ip, ts),
  ADD KEY IF NOT EXISTS ix_event_logs_eventid_targetuser_ts (event_id, target_user, ts),
  ADD KEY IF NOT EXISTS ix_event_logs_eventid_logontype_ts (event_id, logon_type, ts);