From 2b08d518ad10d554d336ac8db2aaceffb6fa8deb Mon Sep 17 00:00:00 2001 From: jbergner Date: Thu, 23 Apr 2026 21:56:44 +0200 Subject: [PATCH] init --- .env | 38 + .gitea/workflows/registry.yml | 51 + Dockerfile | 32 + compose.yml | 118 + .../provisioning/dashboards/dashboards.yml | 12 + .../dashboards/siem-overview.json | 76 + .../provisioning/datasources/datasource.yml | 10 + deploy/mariadb/init/001-schema.sql | 88 + deploy/prometheus/prometheus.yml | 18 + deploy/prometheus/rules/siem-alerts.yml | 38 + go.mod | 22 + go.sum | 25 + main.go | 2194 +++++++++++++++++ schema.sql | 101 + 14 files changed, 2823 insertions(+) create mode 100644 .env create mode 100644 .gitea/workflows/registry.yml create mode 100644 Dockerfile create mode 100644 compose.yml create mode 100644 deploy/grafana/provisioning/dashboards/dashboards.yml create mode 100644 deploy/grafana/provisioning/dashboards/siem-overview.json create mode 100644 deploy/grafana/provisioning/datasources/datasource.yml create mode 100644 deploy/mariadb/init/001-schema.sql create mode 100644 deploy/prometheus/prometheus.yml create mode 100644 deploy/prometheus/rules/siem-alerts.yml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 schema.sql diff --git a/.env b/.env new file mode 100644 index 0000000..3941672 --- /dev/null +++ b/.env @@ -0,0 +1,38 @@ +TZ=Europe/Berlin + +LISTEN_ADDR=:8080 +DB_DSN=eventuser:DEINPASSWORT@tcp(mariadb:3306)/eventcollector?parseTime=true&charset=utf8mb4,utf8&collation=utf8mb4_unicode_ci&loc=UTC + +DB_MAX_OPEN_CONNS=50 +DB_MAX_IDLE_CONNS=25 +DB_CONN_MAX_LIFETIME=3m +DB_CONN_MAX_IDLE_TIME=1m + +MAX_BODY_BYTES=10485760 +HTTP_READ_TIMEOUT=15s +HTTP_WRITE_TIMEOUT=30s +HTTP_IDLE_TIMEOUT=60s + +DETECTION_INTERVAL=1m +OFFLINE_AFTER=10m +FAILED_LOGON_WINDOW=5m +FAILED_LOGON_THRESHOLD=25 +REBOOT_WINDOW=15m +REBOOT_THRESHOLD=3 +PASSWORD_SPRAY_WINDOW=5m +PASSWORD_SPRAY_MIN_USERS=5 +PASSWORD_SPRAY_MIN_ATTEMPTS=15 +SUCCESS_AFTER_FAILURE_WINDOW=10m +NEW_SOURCE_IP_LOOKBACK=720h +NEW_SOURCE_IP_WINDOW=10m +DETECTIONS_LIMIT=100 + +MARIADB_DATABASE=eventcollector +MARIADB_USER=eventuser +MARIADB_PASSWORD=DEINPASSWORT +MARIADB_ROOT_PASSWORD=ROOTPASSWORT + +GRAFANA_ADMIN_USER=admin +GRAFANA_ADMIN_PASSWORD=BITTE_AENDERN + +ENROLLMENT_KEY=BITTE_SEHR_LANG_UND_ZUFAELLIG \ No newline at end of file diff --git a/.gitea/workflows/registry.yml b/.gitea/workflows/registry.yml new file mode 100644 index 0000000..cfe785d --- /dev/null +++ b/.gitea/workflows/registry.yml @@ -0,0 +1,51 @@ +name: release-tag +on: + push: + branches: + - 'main' +jobs: + release-image: + runs-on: ubuntu-latest + env: + DOCKER_ORG: sendnrw + DOCKER_LATEST: latest + RUNNER_TOOL_CACHE: /toolcache + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + + - name: Set up Docker BuildX + uses: docker/setup-buildx-action@v2 + with: # replace it with your local IP + config-inline: | + [registry."git.send.nrw"] + http = true + insecure = true + + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + registry: git.send.nrw # replace it with your local IP + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Get Meta + id: meta + run: | + echo REPO_NAME=$(echo ${GITHUB_REPOSITORY} | awk -F"/" '{print $2}') >> $GITHUB_OUTPUT + echo REPO_VERSION=$(git describe --tags --always | sed 's/^v//') >> $GITHUB_OUTPUT + + - name: Build and push + uses: docker/build-push-action@v4 + with: + context: . + file: ./Dockerfile + platforms: | + linux/amd64 + push: true + tags: | # replace it with your local IP and tags + git.send.nrw/${{ env.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}:${{ steps.meta.outputs.REPO_VERSION }} + git.send.nrw/${{ env.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}:${{ env.DOCKER_LATEST }} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..cbf7396 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +FROM golang:1.24 AS builder + +WORKDIR /src + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build -trimpath -ldflags="-s -w" -o /out/eventcollector . + +FROM debian:trixie-slim + +ENV LISTEN_ADDR=:8080 +ENV TZ=Europe/Berlin + +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates tzdata wget \ + && rm -rf /var/lib/apt/lists/* \ + && groupadd --system --gid 10001 app \ + && useradd --system --uid 10001 --gid 10001 --no-create-home --home-dir /nonexistent --shell /usr/sbin/nologin app + +WORKDIR /app + +COPY --from=builder /out/eventcollector /app/eventcollector + +USER 10001:10001 + +EXPOSE 8080 + +ENTRYPOINT ["/app/eventcollector"] \ No newline at end of file diff --git a/compose.yml b/compose.yml new file mode 100644 index 0000000..0c6b2a4 --- /dev/null +++ b/compose.yml @@ -0,0 +1,118 @@ +services: + mariadb: + image: mariadb:11.8 + container_name: siem-mariadb + restart: unless-stopped + env_file: + - .env + environment: + MARIADB_DATABASE: ${MARIADB_DATABASE} + MARIADB_USER: ${MARIADB_USER} + MARIADB_PASSWORD: ${MARIADB_PASSWORD} + MARIADB_ROOT_PASSWORD: ${MARIADB_ROOT_PASSWORD} + TZ: ${TZ} + command: + - --character-set-server=utf8mb4 + - --collation-server=utf8mb4_unicode_ci + - --innodb-buffer-pool-size=512M + - --max-connections=300 + volumes: + - mariadb_data:/var/lib/mysql + - ./deploy/mariadb/init:/docker-entrypoint-initdb.d:ro + healthcheck: + test: ["CMD-SHELL", "mariadb-admin ping -h 127.0.0.1 -u root -p$$MARIADB_ROOT_PASSWORD --silent"] + interval: 20s + timeout: 5s + retries: 10 + start_period: 30s + + siem-backend: + build: + context: . + dockerfile: Dockerfile + image: siem-backend:latest + container_name: siem-backend + restart: unless-stopped + env_file: + - .env + environment: + LISTEN_ADDR: ${LISTEN_ADDR} + DB_DSN: ${DB_DSN} + DB_MAX_OPEN_CONNS: ${DB_MAX_OPEN_CONNS} + DB_MAX_IDLE_CONNS: ${DB_MAX_IDLE_CONNS} + DB_CONN_MAX_LIFETIME: ${DB_CONN_MAX_LIFETIME} + DB_CONN_MAX_IDLE_TIME: ${DB_CONN_MAX_IDLE_TIME} + MAX_BODY_BYTES: ${MAX_BODY_BYTES} + HTTP_READ_TIMEOUT: ${HTTP_READ_TIMEOUT} + HTTP_WRITE_TIMEOUT: ${HTTP_WRITE_TIMEOUT} + HTTP_IDLE_TIMEOUT: ${HTTP_IDLE_TIMEOUT} + DETECTION_INTERVAL: ${DETECTION_INTERVAL} + OFFLINE_AFTER: ${OFFLINE_AFTER} + FAILED_LOGON_WINDOW: ${FAILED_LOGON_WINDOW} + FAILED_LOGON_THRESHOLD: ${FAILED_LOGON_THRESHOLD} + REBOOT_WINDOW: ${REBOOT_WINDOW} + REBOOT_THRESHOLD: ${REBOOT_THRESHOLD} + PASSWORD_SPRAY_WINDOW: ${PASSWORD_SPRAY_WINDOW} + PASSWORD_SPRAY_MIN_USERS: ${PASSWORD_SPRAY_MIN_USERS} + PASSWORD_SPRAY_MIN_ATTEMPTS: ${PASSWORD_SPRAY_MIN_ATTEMPTS} + SUCCESS_AFTER_FAILURE_WINDOW: ${SUCCESS_AFTER_FAILURE_WINDOW} + NEW_SOURCE_IP_LOOKBACK: ${NEW_SOURCE_IP_LOOKBACK} + NEW_SOURCE_IP_WINDOW: ${NEW_SOURCE_IP_WINDOW} + DETECTIONS_LIMIT: ${DETECTIONS_LIMIT} + TZ: ${TZ} + depends_on: + mariadb: + condition: service_healthy + ports: + - "8080:8080" + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://127.0.0.1:8080/healthz >/dev/null 2>&1 || exit 1"] + interval: 30s + timeout: 5s + retries: 5 + start_period: 20s + + prometheus: + image: prom/prometheus:latest + container_name: siem-prometheus + restart: unless-stopped + command: + - --config.file=/etc/prometheus/prometheus.yml + - --storage.tsdb.path=/prometheus + - --storage.tsdb.retention.time=30d + - --web.enable-lifecycle + depends_on: + siem-backend: + condition: service_healthy + volumes: + - ./deploy/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - ./deploy/prometheus/rules:/etc/prometheus/rules:ro + - prometheus_data:/prometheus + ports: + - "9090:9090" + + grafana: + image: grafana/grafana:latest + container_name: siem-grafana + restart: unless-stopped + env_file: + - .env + environment: + GF_SECURITY_ADMIN_USER: ${GRAFANA_ADMIN_USER} + GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_ADMIN_PASSWORD} + GF_USERS_ALLOW_SIGN_UP: "false" + GF_SERVER_ROOT_URL: http://localhost:3000 + TZ: ${TZ} + depends_on: + - prometheus + volumes: + - grafana_data:/var/lib/grafana + - ./deploy/grafana/provisioning:/etc/grafana/provisioning:ro + - ./deploy/grafana/dashboards:/var/lib/grafana/dashboards:ro + ports: + - "3000:3000" + +volumes: + mariadb_data: + prometheus_data: + grafana_data: \ No newline at end of file diff --git a/deploy/grafana/provisioning/dashboards/dashboards.yml b/deploy/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 0000000..e95f332 --- /dev/null +++ b/deploy/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,12 @@ +apiVersion: 1 + +providers: + - name: SIEM Dashboards + orgId: 1 + folder: SIEM + type: file + disableDeletion: false + editable: true + updateIntervalSeconds: 30 + options: + path: /var/lib/grafana/dashboards \ No newline at end of file diff --git a/deploy/grafana/provisioning/dashboards/siem-overview.json b/deploy/grafana/provisioning/dashboards/siem-overview.json new file mode 100644 index 0000000..433b4f5 --- /dev/null +++ b/deploy/grafana/provisioning/dashboards/siem-overview.json @@ -0,0 +1,76 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "panels": [ + { + "type": "stat", + "title": "Active Agents", + "gridPos": { "h": 4, "w": 6, "x": 0, "y": 0 }, + "targets": [ + { + "expr": "eventcollector_active_agents", + "refId": "A" + } + ] + }, + { + "type": "stat", + "title": "High Detections (5m)", + "gridPos": { "h": 4, "w": 6, "x": 6, "y": 0 }, + "targets": [ + { + "expr": "increase(eventcollector_detection_hits_total{severity=\"high\"}[5m])", + "refId": "A" + } + ] + }, + { + "type": "timeseries", + "title": "HTTP Requests", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 }, + "targets": [ + { + "expr": "rate(eventcollector_http_requests_total[5m])", + "legendFormat": "{{path}} {{status}}", + "refId": "A" + } + ] + }, + { + "type": "timeseries", + "title": "Detection Hits", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 }, + "targets": [ + { + "expr": "increase(eventcollector_detection_hits_total[5m])", + "legendFormat": "{{rule}} {{severity}}", + "refId": "A" + } + ] + }, + { + "type": "timeseries", + "title": "Ingested Events", + "gridPos": { "h": 8, "w": 24, "x": 0, "y": 12 }, + "targets": [ + { + "expr": "rate(eventcollector_ingest_events_total[5m])", + "legendFormat": "{{channel}} {{event_id}}", + "refId": "A" + } + ] + } + ], + "schemaVersion": 39, + "style": "dark", + "tags": ["siem"], + "templating": { "list": [] }, + "time": { + "from": "now-6h", + "to": "now" + }, + "title": "SIEM Overview", + "version": 1 +} \ No newline at end of file diff --git a/deploy/grafana/provisioning/datasources/datasource.yml b/deploy/grafana/provisioning/datasources/datasource.yml new file mode 100644 index 0000000..fe7fa20 --- /dev/null +++ b/deploy/grafana/provisioning/datasources/datasource.yml @@ -0,0 +1,10 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + uid: prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: true \ No newline at end of file diff --git a/deploy/mariadb/init/001-schema.sql b/deploy/mariadb/init/001-schema.sql new file mode 100644 index 0000000..2789d7e --- /dev/null +++ b/deploy/mariadb/init/001-schema.sql @@ -0,0 +1,88 @@ +CREATE DATABASE IF NOT EXISTS eventcollector + CHARACTER SET utf8mb4 + COLLATE utf8mb4_unicode_ci; + +USE eventcollector; + +CREATE TABLE IF NOT EXISTS agents ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + hostname VARCHAR(255) NOT NULL, + api_key_hash CHAR(64) NOT NULL, + first_seen DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + last_seen DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + last_ip VARCHAR(64) NOT NULL DEFAULT '', + is_enabled TINYINT(1) NOT NULL DEFAULT 1, + PRIMARY KEY (id), + UNIQUE KEY ux_agents_hostname (hostname), + KEY ix_agents_last_seen (last_seen) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS event_logs ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + agent_id BIGINT UNSIGNED NOT NULL, + hostname VARCHAR(255) NOT NULL, + channel_name VARCHAR(128) NOT NULL, + event_id INT UNSIGNED NOT NULL, + source VARCHAR(255) NOT NULL, + computer VARCHAR(255) NOT NULL DEFAULT '', + provider_name VARCHAR(255) NOT NULL DEFAULT '', + level_value INT UNSIGNED NOT NULL DEFAULT 0, + task_value INT UNSIGNED NOT NULL DEFAULT 0, + opcode_value INT UNSIGNED NOT NULL DEFAULT 0, + keywords VARCHAR(255) NOT NULL DEFAULT '', + target_user VARCHAR(255) NOT NULL DEFAULT '', + target_domain VARCHAR(255) NOT NULL DEFAULT '', + subject_user VARCHAR(255) NOT NULL DEFAULT '', + subject_domain VARCHAR(255) NOT NULL DEFAULT '', + workstation VARCHAR(255) NOT NULL DEFAULT '', + src_ip VARCHAR(64) NOT NULL DEFAULT '', + src_port VARCHAR(32) NOT NULL DEFAULT '', + logon_type VARCHAR(32) NOT NULL DEFAULT '', + process_name VARCHAR(512) NOT NULL DEFAULT '', + authentication_package VARCHAR(128) NOT NULL DEFAULT '', + logon_process VARCHAR(128) NOT NULL DEFAULT '', + status_text VARCHAR(64) NOT NULL DEFAULT '', + sub_status_text VARCHAR(64) NOT NULL DEFAULT '', + failure_reason VARCHAR(512) NOT NULL DEFAULT '', + ts DATETIME(6) NOT NULL, + received_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + msg LONGTEXT NOT NULL, + msg_sha256 CHAR(64) NOT NULL, + PRIMARY KEY (id), + KEY ix_event_logs_ts (ts), + KEY ix_event_logs_received_at (received_at), + KEY ix_event_logs_agent_ts (agent_id, ts), + KEY ix_event_logs_eventid_ts (event_id, ts), + KEY ix_event_logs_hostname_ts (hostname, ts), + KEY ix_event_logs_channel_event_ts (channel_name, event_id, ts), + KEY ix_event_logs_target_user_ts (target_user, ts), + KEY ix_event_logs_src_ip_ts (src_ip, ts), + KEY ix_event_logs_target_user_src_ip_ts (target_user, src_ip, ts), + KEY ix_event_logs_eventid_srcip_ts (event_id, src_ip, ts), + KEY ix_event_logs_eventid_targetuser_ts (event_id, target_user, ts), + KEY ix_event_logs_eventid_logontype_ts (event_id, logon_type, ts), + CONSTRAINT fk_event_logs_agent + FOREIGN KEY (agent_id) REFERENCES agents(id) + ON DELETE RESTRICT + ON UPDATE RESTRICT +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS detections ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + rule_name VARCHAR(128) NOT NULL, + severity VARCHAR(32) NOT NULL, + hostname VARCHAR(255) NOT NULL, + channel_name VARCHAR(128) NOT NULL DEFAULT '', + event_id INT UNSIGNED NOT NULL DEFAULT 0, + score DOUBLE NOT NULL DEFAULT 0, + window_start DATETIME(6) NOT NULL, + window_end DATETIME(6) NOT NULL, + summary VARCHAR(512) NOT NULL, + details_json JSON NOT NULL, + created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + PRIMARY KEY (id), + UNIQUE KEY ux_detection_dedupe (rule_name, hostname, channel_name, event_id, window_start, window_end), + KEY ix_detections_created (created_at), + KEY ix_detections_rule_host_time (rule_name, hostname, created_at), + KEY ix_detections_severity_time (severity, created_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; \ No newline at end of file diff --git a/deploy/prometheus/prometheus.yml b/deploy/prometheus/prometheus.yml new file mode 100644 index 0000000..8f80b65 --- /dev/null +++ b/deploy/prometheus/prometheus.yml @@ -0,0 +1,18 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +rule_files: + - /etc/prometheus/rules/*.yml + +scrape_configs: + - job_name: siem-backend + metrics_path: /metrics + static_configs: + - targets: + - siem-backend:8080 + + - job_name: prometheus + static_configs: + - targets: + - localhost:9090 \ No newline at end of file diff --git a/deploy/prometheus/rules/siem-alerts.yml b/deploy/prometheus/rules/siem-alerts.yml new file mode 100644 index 0000000..f9c7fa6 --- /dev/null +++ b/deploy/prometheus/rules/siem-alerts.yml @@ -0,0 +1,38 @@ +groups: + - name: siem-backend + rules: + - alert: SiemBackendDown + expr: up{job="siem-backend"} == 0 + for: 2m + labels: + severity: critical + annotations: + summary: "SIEM backend nicht erreichbar" + description: "Prometheus kann das SIEM-Backend seit mindestens 2 Minuten nicht scrapen." + + - alert: SiemHighDetections + expr: increase(eventcollector_detection_hits_total{severity="high"}[5m]) > 0 + for: 1m + labels: + severity: high + annotations: + summary: "Neue High-Severity Detection" + description: "Es wurde mindestens eine neue High-Severity-Detection in den letzten 5 Minuten erzeugt." + + - alert: SiemRuleErrors + expr: increase(eventcollector_rule_errors_total[5m]) > 0 + for: 1m + labels: + severity: warning + annotations: + summary: "Fehler in Detection-Regeln" + description: "Mindestens eine Detection-Regel hat in den letzten 5 Minuten einen Fehler erzeugt." + + - alert: SiemTooFewActiveAgents + expr: eventcollector_active_agents < 1 + for: 5m + labels: + severity: warning + annotations: + summary: "Zu wenige aktive Agents" + description: "Es wurden weniger aktive Agents erkannt als erwartet." \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1a2f53c --- /dev/null +++ b/go.mod @@ -0,0 +1,22 @@ +module git.send.nrw/sendnrw/siem-backend + +go 1.26.1 + +require github.com/go-sql-driver/mysql v1.9.3 + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect +) + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/prometheus/client_golang v1.23.2 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2163bbe --- /dev/null +++ b/go.sum @@ -0,0 +1,25 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= +github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/main.go b/main.go new file mode 100644 index 0000000..5ec8b66 --- /dev/null +++ b/main.go @@ -0,0 +1,2194 @@ +package main + +import ( + "context" + "crypto/sha256" + "database/sql" + "encoding/hex" + "encoding/json" + "encoding/xml" + "errors" + "fmt" + "html/template" + "io" + "log" + "math" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const uiTemplates = ` +{{define "header"}} + + + + + + {{.Title}} + + + +
+
SIEM-lite
+ +
+
+{{end}} + +{{define "footer"}} +
+ + +{{end}} + +{{define "dashboard"}} +{{template "header" .}} +

{{.Title}}

+

Stand: {{fmtTime .Now}}

+
+
Agents gesamt
{{.Stats.AgentsTotal}}
+
Agents aktiv
{{.Stats.AgentsActive}}
+
Events 24h
{{.Stats.Events24h}}
+
Detections 24h
{{.Stats.Detections24h}}
+
High Detections 24h
{{.Stats.HighDetections24h}}
+
+ +

Neueste Detections

+ + + {{range .RecentDetections}} + + + + + + + + {{end}} +
ZeitRuleSeverityHostZusammenfassung
{{fmtTime .CreatedAt}}{{.RuleName}}{{.Severity}}{{.Hostname}}{{.Summary}}
+ +

Neueste Events

+ + + {{range .RecentEvents}} + + + + + + + + + + {{end}} +
ZeitHostChannelEventIDUserIPNachricht
{{fmtTime .Time}}{{.Hostname}}{{.Channel}}{{.EventID}}{{if .TargetUser}}{{.TargetUser}}{{else}}{{.SubjectUser}}{{end}}{{.SrcIP}}{{short .Message 120}}
+{{template "footer" .}} +{{end}} + +{{define "detections"}} +{{template "header" .}} +

{{.Title}}

+
+
+
+
+
+
+
+ +
+ + + {{range .Detections}} + + + + + + + + + + {{end}} +
ZeitRuleSeverityHostScoreSummaryEvents
{{fmtTime .CreatedAt}}{{.RuleName}}{{.Severity}}{{.Hostname}}{{printf "%.2f" .Score}}{{.Summary}}anzeigen
+{{template "footer" .}} +{{end}} + +{{define "events"}} +{{template "header" .}} +

{{.Title}}

+
+
+
+
+
+
+
+
+
+
+
+
+
+ +
+ + + + + {{range .Events}} + + + + + + + + + + + + {{end}} +
ZeitHostChannelEventIDTarget UserSubject UserIPWorkstationDetail
{{fmtTime .Time}}{{.Hostname}}{{.Channel}}{{.EventID}}{{.TargetUser}}{{.SubjectUser}}{{.SrcIP}}{{.Workstation}}öffnen
+{{template "footer" .}} +{{end}} + +{{define "event_detail"}} +{{template "header" .}} +

{{.Title}}

+
+
Host
{{.Event.Hostname}}
+
Channel
{{.Event.Channel}}
+
EventID
{{.Event.EventID}}
+
Zeit
{{fmtTime .Event.Time}}
+
Target User
{{.Event.TargetUser}}
+
Subject User
{{.Event.SubjectUser}}
+
Source IP
{{.Event.SrcIP}}
+
Workstation
{{.Event.Workstation}}
+
Logon Type
{{.Event.LogonType}}
+
Process
{{.Event.ProcessName}}
+
Status
{{.Event.StatusText}}
+
SubStatus
{{.Event.SubStatusText}}
+
+ +

Rohes Event XML

+
{{.Event.Message}}
+{{template "footer" .}} +{{end}} +` + +type Config struct { + ListenAddr string + DBDSN string + MaxBodyBytes int64 + HTTPReadTimeout time.Duration + HTTPWriteTimeout time.Duration + HTTPIdleTimeout time.Duration + DBMaxOpenConns int + DBMaxIdleConns int + DBConnMaxLifetime time.Duration + DBConnMaxIdleTime time.Duration + DetectionInterval time.Duration + OfflineAfter time.Duration + FailedLogonWindow time.Duration + FailedLogonThreshold int + RebootWindow time.Duration + RebootThreshold int + PasswordSprayWindow time.Duration + PasswordSprayMinUsers int + PasswordSprayMinAttempts int + SuccessAfterFailureWindow time.Duration + NewSourceIPLookback time.Duration + NewSourceIPWindow time.Duration + DetectionsLimit int + + EnrollmentKey string +} + +type LogPayload struct { + Hostname string `json:"host"` + Channel string `json:"channel"` + EventID uint32 `json:"id"` + Source string `json:"source"` + Time time.Time `json:"ts"` + Message string `json:"msg"` +} + +type NormalizedEvent struct { + Computer string + ProviderName string + LevelValue uint32 + TaskValue uint32 + OpcodeValue uint32 + Keywords string + TargetUser string + TargetDomain string + SubjectUser string + SubjectDomain string + Workstation string + SrcIP string + SrcPort string + LogonType string + ProcessName string + AuthenticationPackage string + LogonProcess string + StatusText string + SubStatusText string + FailureReason string +} + +type Detection struct { + ID uint64 `json:"id"` + RuleName string `json:"rule_name"` + Severity string `json:"severity"` + Hostname string `json:"hostname"` + Channel string `json:"channel"` + EventID uint32 `json:"event_id"` + Score float64 `json:"score"` + WindowStart time.Time `json:"window_start"` + WindowEnd time.Time `json:"window_end"` + Summary string `json:"summary"` + Details json.RawMessage `json:"details_json"` + CreatedAt time.Time `json:"created_at"` +} + +type ingestResponse struct { + Accepted int `json:"accepted"` +} + +type server struct { + db *sql.DB + logger *log.Logger + cfg Config + registry *prometheus.Registry + detector *detector + startTime time.Time + templates *template.Template +} + +type detector struct { + db *sql.DB + cfg Config + logger *log.Logger + + lastSeenGauge *prometheus.GaugeVec + activeAgentsGauge prometheus.Gauge + anomalyScoreGauge *prometheus.GaugeVec + detectionHitsTotal *prometheus.CounterVec + ruleLastRunGauge *prometheus.GaugeVec + ruleRuntimeHist *prometheus.HistogramVec + ruleErrorsTotal *prometheus.CounterVec +} + +type EventRow struct { + ID uint64 + Hostname string + Channel string + EventID uint32 + Source string + Computer string + ProviderName string + TargetUser string + TargetDomain string + SubjectUser string + SubjectDomain string + Workstation string + SrcIP string + SrcPort string + LogonType string + ProcessName string + AuthenticationPackage string + LogonProcess string + StatusText string + SubStatusText string + FailureReason string + Time time.Time + ReceivedAt time.Time + Message string +} + +type DashboardStats struct { + AgentsTotal int + AgentsActive int + Events24h int64 + Detections24h int64 + HighDetections24h int64 +} + +type DashboardPageData struct { + Title string + Now time.Time + Stats DashboardStats + RecentDetections []Detection + RecentEvents []EventRow +} + +type DetectionListPageData struct { + Title string + Now time.Time + Filters map[string]string + Detections []Detection +} + +type EventListPageData struct { + Title string + Now time.Time + Filters map[string]string + Events []EventRow +} + +type EventDetailPageData struct { + Title string + Now time.Time + Event EventRow +} + +var ( + httpRequestsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{Name: "eventcollector_http_requests_total", Help: "Total HTTP requests."}, + []string{"path", "method", "status"}, + ) + httpRequestDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{Name: "eventcollector_http_request_duration_seconds", Help: "HTTP request latency.", Buckets: prometheus.DefBuckets}, + []string{"path", "method", "status"}, + ) + ingestBatchesTotal = prometheus.NewCounter( + prometheus.CounterOpts{Name: "eventcollector_ingest_batches_total", Help: "Total ingested batches."}, + ) + ingestEventsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{Name: "eventcollector_ingest_events_total", Help: "Total ingested events."}, + []string{"channel", "event_id"}, + ) + ingestRejectedTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{Name: "eventcollector_ingest_rejected_total", Help: "Rejected ingest requests."}, + []string{"reason"}, + ) + dbInsertEventsTotal = prometheus.NewCounter( + prometheus.CounterOpts{Name: "eventcollector_db_insert_events_total", Help: "Total inserted events into database."}, + ) + dbInsertFailuresTotal = prometheus.NewCounter( + prometheus.CounterOpts{Name: "eventcollector_db_insert_failures_total", Help: "Failed database insert operations."}, + ) + dbBatchSizeHist = prometheus.NewHistogram( + prometheus.HistogramOpts{Name: "eventcollector_db_batch_size", Help: "Batch sizes written to database.", Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000}}, + ) + dbTxDurationHist = prometheus.NewHistogram( + prometheus.HistogramOpts{Name: "eventcollector_db_tx_duration_seconds", Help: "Database transaction duration.", Buckets: prometheus.DefBuckets}, + ) +) + +func main() { + cfg := loadConfig() + logger := log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds|log.LUTC) + + db, err := sql.Open("mysql", cfg.DBDSN) + if err != nil { + logger.Fatalf("sql.Open: %v", err) + } + db.SetMaxOpenConns(cfg.DBMaxOpenConns) + db.SetMaxIdleConns(cfg.DBMaxIdleConns) + db.SetConnMaxLifetime(cfg.DBConnMaxLifetime) + db.SetConnMaxIdleTime(cfg.DBConnMaxIdleTime) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := db.PingContext(ctx); err != nil { + logger.Fatalf("db.PingContext: %v", err) + } + + reg := prometheus.NewRegistry() + reg.MustRegister( + httpRequestsTotal, + httpRequestDuration, + ingestBatchesTotal, + ingestEventsTotal, + ingestRejectedTotal, + dbInsertEventsTotal, + dbInsertFailuresTotal, + dbBatchSizeHist, + dbTxDurationHist, + ) + + d := &detector{ + db: db, + cfg: cfg, + logger: logger, + lastSeenGauge: prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: "eventcollector_agent_last_seen_unixtime", Help: "Unix time when agent was last seen."}, + []string{"host"}, + ), + activeAgentsGauge: prometheus.NewGauge( + prometheus.GaugeOpts{Name: "eventcollector_active_agents", Help: "Number of active agents seen within offline threshold."}, + ), + anomalyScoreGauge: prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: "eventcollector_anomaly_score", Help: "Current anomaly score per host and rule."}, + []string{"host", "rule"}, + ), + detectionHitsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{Name: "eventcollector_detection_hits_total", Help: "Total number of detections per rule and severity."}, + []string{"rule", "severity"}, + ), + ruleLastRunGauge: prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: "eventcollector_rule_last_run_unixtime", Help: "Unix time of the last successful rule run."}, + []string{"rule"}, + ), + ruleRuntimeHist: prometheus.NewHistogramVec( + prometheus.HistogramOpts{Name: "eventcollector_rule_runtime_seconds", Help: "Rule runtime duration.", Buckets: prometheus.DefBuckets}, + []string{"rule"}, + ), + ruleErrorsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{Name: "eventcollector_rule_errors_total", Help: "Rule execution errors."}, + []string{"rule"}, + ), + } + reg.MustRegister( + d.lastSeenGauge, + d.activeAgentsGauge, + d.anomalyScoreGauge, + d.detectionHitsTotal, + d.ruleLastRunGauge, + d.ruleRuntimeHist, + d.ruleErrorsTotal, + ) + + s := &server{ + db: db, + logger: logger, + cfg: cfg, + registry: reg, + detector: d, + startTime: time.Now().UTC(), + } + + tmpl := template.Must(template.New("ui").Funcs(template.FuncMap{ + "q": url.QueryEscape, + "fmtTime": func(t time.Time) string { + if t.IsZero() { + return "" + } + return t.Local().Format("2006-01-02 15:04:05") + }, + "short": func(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] + "..." + }, + }).Parse(uiTemplates)) + + s.templates = tmpl + + go s.runDetectionLoop() + + mux := http.NewServeMux() + mux.HandleFunc("/healthz", s.handleHealthz) + mux.HandleFunc("/readyz", s.handleReadyz) + mux.HandleFunc("/ingest", s.handleIngest) + mux.HandleFunc("/detections", s.handleDetections) + mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + mux.HandleFunc("/ui", s.handleUIIndex) + mux.HandleFunc("/ui/detections", s.handleUIDetections) + mux.HandleFunc("/ui/events", s.handleUIEvents) + mux.HandleFunc("/ui/event", s.handleUIEventDetail) + + httpSrv := &http.Server{ + Addr: cfg.ListenAddr, + Handler: metricsMiddleware(logger, recoveryMiddleware(mux)), + ReadTimeout: cfg.HTTPReadTimeout, + WriteTimeout: cfg.HTTPWriteTimeout, + IdleTimeout: cfg.HTTPIdleTimeout, + } + + go func() { + logger.Printf("listening on %s", cfg.ListenAddr) + if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Fatalf("ListenAndServe: %v", err) + } + }() + + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + <-stop + + logger.Println("shutdown requested") + + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) + defer shutdownCancel() + + if err := httpSrv.Shutdown(shutdownCtx); err != nil { + logger.Printf("http shutdown error: %v", err) + } + if err := db.Close(); err != nil { + logger.Printf("db close error: %v", err) + } +} + +func (s *server) handleUIIndex(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + stats, err := s.getDashboardStats(ctx) + if err != nil { + s.logger.Printf("dashboard stats: %v", err) + writeError(w, http.StatusInternalServerError, "internal error") + return + } + + dets, err := s.listDetections(ctx, "", "", "", 20) + if err != nil { + s.logger.Printf("dashboard detections: %v", err) + writeError(w, http.StatusInternalServerError, "internal error") + return + } + + events, err := s.listEvents(ctx, EventFilter{Limit: 20}) + if err != nil { + s.logger.Printf("dashboard events: %v", err) + writeError(w, http.StatusInternalServerError, "internal error") + return + } + + data := DashboardPageData{ + Title: "SIEM-lite Dashboard", + Now: time.Now(), + Stats: stats, + RecentDetections: dets, + RecentEvents: events, + } + + s.renderTemplate(w, "dashboard", data) +} + +func (s *server) handleUIDetections(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + filters := map[string]string{ + "host": strings.TrimSpace(r.URL.Query().Get("host")), + "rule": strings.TrimSpace(r.URL.Query().Get("rule")), + "severity": strings.TrimSpace(r.URL.Query().Get("severity")), + "limit": strings.TrimSpace(r.URL.Query().Get("limit")), + } + + limit := s.cfg.DetectionsLimit + if filters["limit"] != "" { + if n, err := strconv.Atoi(filters["limit"]); err == nil && n > 0 && n <= 500 { + limit = n + } + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + items, err := s.listDetections(ctx, filters["host"], filters["rule"], filters["severity"], limit) + if err != nil { + s.logger.Printf("ui detections: %v", err) + writeError(w, http.StatusInternalServerError, "internal error") + return + } + + data := DetectionListPageData{ + Title: "Detections", + Now: time.Now(), + Filters: filters, + Detections: items, + } + + s.renderTemplate(w, "detections", data) +} + +func (s *server) handleUIEvents(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + filter := EventFilter{ + Host: strings.TrimSpace(r.URL.Query().Get("host")), + Channel: strings.TrimSpace(r.URL.Query().Get("channel")), + Rule: strings.TrimSpace(r.URL.Query().Get("rule")), + User: strings.TrimSpace(r.URL.Query().Get("user")), + SrcIP: strings.TrimSpace(r.URL.Query().Get("src_ip")), + Severity: strings.TrimSpace(r.URL.Query().Get("severity")), + TimeFrom: strings.TrimSpace(r.URL.Query().Get("from")), + TimeTo: strings.TrimSpace(r.URL.Query().Get("to")), + Limit: 100, + } + + if v := strings.TrimSpace(r.URL.Query().Get("event_id")); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + filter.EventID = uint32(n) + } + } + + if v := strings.TrimSpace(r.URL.Query().Get("limit")); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 1000 { + filter.Limit = n + } + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + events, err := s.listEvents(ctx, filter) + if err != nil { + s.logger.Printf("ui events: %v", err) + writeError(w, http.StatusInternalServerError, "internal error") + return + } + + filters := map[string]string{ + "host": filter.Host, + "channel": filter.Channel, + "rule": filter.Rule, + "user": filter.User, + "src_ip": filter.SrcIP, + "severity": filter.Severity, + "from": filter.TimeFrom, + "to": filter.TimeTo, + "limit": strconv.Itoa(filter.Limit), + "event_id": func() string { + if filter.EventID == 0 { + return "" + } + return strconv.Itoa(int(filter.EventID)) + }(), + } + + data := EventListPageData{ + Title: "Events", + Now: time.Now(), + Filters: filters, + Events: events, + } + + s.renderTemplate(w, "events", data) +} + +func (s *server) handleUIEventDetail(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + idStr := strings.TrimSpace(r.URL.Query().Get("id")) + if idStr == "" { + writeError(w, http.StatusBadRequest, "missing id") + return + } + + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid id") + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + ev, err := s.getEventByID(ctx, id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + http.NotFound(w, r) + return + } + s.logger.Printf("ui event detail: %v", err) + writeError(w, http.StatusInternalServerError, "internal error") + return + } + + data := EventDetailPageData{ + Title: "Event Detail", + Now: time.Now(), + Event: ev, + } + s.renderTemplate(w, "event_detail", data) +} + +func (s *server) renderTemplate(w http.ResponseWriter, name string, data any) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + if err := s.templates.ExecuteTemplate(w, name, data); err != nil { + s.logger.Printf("render template %s: %v", name, err) + http.Error(w, "template error", http.StatusInternalServerError) + } +} + +type EventFilter struct { + Host string + Channel string + Rule string + User string + SrcIP string + Severity string + TimeFrom string + TimeTo string + EventID uint32 + Limit int +} + +func (s *server) getDashboardStats(ctx context.Context) (DashboardStats, error) { + var stats DashboardStats + + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM agents WHERE is_enabled = 1`).Scan(&stats.AgentsTotal); err != nil { + return stats, err + } + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM agents WHERE is_enabled = 1 AND last_seen >= ?`, time.Now().UTC().Add(-s.cfg.OfflineAfter)).Scan(&stats.AgentsActive); err != nil { + return stats, err + } + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM event_logs WHERE ts >= ?`, time.Now().UTC().Add(-24*time.Hour)).Scan(&stats.Events24h); err != nil { + return stats, err + } + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM detections WHERE created_at >= ?`, time.Now().UTC().Add(-24*time.Hour)).Scan(&stats.Detections24h); err != nil { + return stats, err + } + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM detections WHERE created_at >= ? AND severity = 'high'`, time.Now().UTC().Add(-24*time.Hour)).Scan(&stats.HighDetections24h); err != nil { + return stats, err + } + + return stats, nil +} + +func (s *server) listEvents(ctx context.Context, f EventFilter) ([]EventRow, error) { + if f.Limit <= 0 || f.Limit > 1000 { + f.Limit = 100 + } + + query := ` +SELECT id, hostname, channel_name, event_id, source, computer, provider_name, + target_user, target_domain, subject_user, subject_domain, + workstation, src_ip, src_port, logon_type, process_name, + authentication_package, logon_process, status_text, sub_status_text, + failure_reason, ts, received_at, msg +FROM event_logs +WHERE 1=1 +` + + args := make([]any, 0, 16) + + if f.Host != "" { + query += ` AND hostname = ?` + args = append(args, f.Host) + } + if f.Channel != "" { + query += ` AND channel_name = ?` + args = append(args, f.Channel) + } + if f.EventID != 0 { + query += ` AND event_id = ?` + args = append(args, f.EventID) + } + if f.User != "" { + query += ` AND (target_user = ? OR subject_user = ?)` + args = append(args, f.User, f.User) + } + if f.SrcIP != "" { + query += ` AND src_ip = ?` + args = append(args, f.SrcIP) + } + if f.TimeFrom != "" { + if t, err := parseUIRFC3339(f.TimeFrom); err == nil { + query += ` AND ts >= ?` + args = append(args, t) + } + } + if f.TimeTo != "" { + if t, err := parseUIRFC3339(f.TimeTo); err == nil { + query += ` AND ts <= ?` + args = append(args, t) + } + } + + if f.Rule != "" || f.Severity != "" { + query += ` AND EXISTS ( + SELECT 1 + FROM detections d + WHERE d.hostname = event_logs.hostname + AND event_logs.ts >= d.window_start + AND event_logs.ts <= d.window_end + ` + if f.Rule != "" { + query += ` AND d.rule_name = ?` + args = append(args, f.Rule) + } + if f.Severity != "" { + query += ` AND d.severity = ?` + args = append(args, f.Severity) + } + query += ` )` + } + + query += ` ORDER BY ts DESC LIMIT ?` + args = append(args, f.Limit) + + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []EventRow + for rows.Next() { + var ev EventRow + if err := rows.Scan( + &ev.ID, &ev.Hostname, &ev.Channel, &ev.EventID, &ev.Source, + &ev.Computer, &ev.ProviderName, + &ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain, + &ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName, + &ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText, + &ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message, + ); err != nil { + return nil, err + } + out = append(out, ev) + } + + return out, rows.Err() +} + +func (s *server) getEventByID(ctx context.Context, id uint64) (EventRow, error) { + const q = ` +SELECT id, hostname, channel_name, event_id, source, computer, provider_name, + target_user, target_domain, subject_user, subject_domain, + workstation, src_ip, src_port, logon_type, process_name, + authentication_package, logon_process, status_text, sub_status_text, + failure_reason, ts, received_at, msg +FROM event_logs +WHERE id = ? +LIMIT 1 +` + + var ev EventRow + err := s.db.QueryRowContext(ctx, q, id).Scan( + &ev.ID, &ev.Hostname, &ev.Channel, &ev.EventID, &ev.Source, + &ev.Computer, &ev.ProviderName, + &ev.TargetUser, &ev.TargetDomain, &ev.SubjectUser, &ev.SubjectDomain, + &ev.Workstation, &ev.SrcIP, &ev.SrcPort, &ev.LogonType, &ev.ProcessName, + &ev.AuthenticationPackage, &ev.LogonProcess, &ev.StatusText, &ev.SubStatusText, + &ev.FailureReason, &ev.Time, &ev.ReceivedAt, &ev.Message, + ) + return ev, err +} + +func parseUIRFC3339(v string) (time.Time, error) { + return time.Parse(time.RFC3339, strings.TrimSpace(v)) +} + +func loadConfig() Config { + return Config{ + ListenAddr: getenv("LISTEN_ADDR", ":8080"), + DBDSN: mustGetenv("DB_DSN"), + MaxBodyBytes: getenvInt64("MAX_BODY_BYTES", 10*1024*1024), + HTTPReadTimeout: getenvDuration("HTTP_READ_TIMEOUT", 15*time.Second), + HTTPWriteTimeout: getenvDuration("HTTP_WRITE_TIMEOUT", 30*time.Second), + HTTPIdleTimeout: getenvDuration("HTTP_IDLE_TIMEOUT", 60*time.Second), + DBMaxOpenConns: getenvInt("DB_MAX_OPEN_CONNS", 50), + DBMaxIdleConns: getenvInt("DB_MAX_IDLE_CONNS", 25), + DBConnMaxLifetime: getenvDuration("DB_CONN_MAX_LIFETIME", 3*time.Minute), + DBConnMaxIdleTime: getenvDuration("DB_CONN_MAX_IDLE_TIME", 1*time.Minute), + DetectionInterval: getenvDuration("DETECTION_INTERVAL", 1*time.Minute), + OfflineAfter: getenvDuration("OFFLINE_AFTER", 10*time.Minute), + FailedLogonWindow: getenvDuration("FAILED_LOGON_WINDOW", 5*time.Minute), + FailedLogonThreshold: getenvInt("FAILED_LOGON_THRESHOLD", 25), + RebootWindow: getenvDuration("REBOOT_WINDOW", 15*time.Minute), + RebootThreshold: getenvInt("REBOOT_THRESHOLD", 3), + PasswordSprayWindow: getenvDuration("PASSWORD_SPRAY_WINDOW", 5*time.Minute), + PasswordSprayMinUsers: getenvInt("PASSWORD_SPRAY_MIN_USERS", 5), + PasswordSprayMinAttempts: getenvInt("PASSWORD_SPRAY_MIN_ATTEMPTS", 15), + SuccessAfterFailureWindow: getenvDuration("SUCCESS_AFTER_FAILURE_WINDOW", 10*time.Minute), + NewSourceIPLookback: getenvDuration("NEW_SOURCE_IP_LOOKBACK", 30*24*time.Hour), + NewSourceIPWindow: getenvDuration("NEW_SOURCE_IP_WINDOW", 10*time.Minute), + DetectionsLimit: getenvInt("DETECTIONS_LIMIT", 100), + + EnrollmentKey: mustGetenv("ENROLLMENT_KEY"), + } +} + +func (s *server) handleHealthz(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + ingestRejectedTotal.WithLabelValues("method_not_allowed").Inc() + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "status": "ok", + "uptime_sec": int(time.Since(s.startTime).Seconds()), + }) +} + +func (s *server) handleReadyz(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) + defer cancel() + if err := s.db.PingContext(ctx); err != nil { + writeError(w, http.StatusServiceUnavailable, "database not ready") + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) +} + +func (s *server) handleIngest(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + ingestRejectedTotal.WithLabelValues("method_not_allowed").Inc() + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + apiKey := strings.TrimSpace(r.Header.Get("X-API-Key")) + enrollmentKey := strings.TrimSpace(r.Header.Get("X-Enrollment-Key")) + if apiKey == "" { + ingestRejectedTotal.WithLabelValues("missing_api_key").Inc() + writeError(w, http.StatusUnauthorized, "missing api key") + return + } + + r.Body = http.MaxBytesReader(w, r.Body, s.cfg.MaxBodyBytes) + defer r.Body.Close() + + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + + var batch []LogPayload + if err := dec.Decode(&batch); err != nil { + ingestRejectedTotal.WithLabelValues("invalid_json").Inc() + writeError(w, http.StatusBadRequest, "invalid json") + return + } + + if len(batch) == 0 { + ingestRejectedTotal.WithLabelValues("empty_batch").Inc() + writeError(w, http.StatusBadRequest, "empty batch") + return + } + if len(batch) > 1000 { + ingestRejectedTotal.WithLabelValues("batch_too_large").Inc() + writeError(w, http.StatusBadRequest, "batch too large") + return + } + + for i := range batch { + if err := validatePayload(&batch[i]); err != nil { + ingestRejectedTotal.WithLabelValues("invalid_payload").Inc() + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid payload at index %d: %v", i, err)) + return + } + } + + hostname := batch[0].Hostname + for i := 1; i < len(batch); i++ { + if batch[i].Hostname != hostname { + ingestRejectedTotal.WithLabelValues("mixed_hostnames").Inc() + writeError(w, http.StatusBadRequest, "all events in a batch must use the same hostname") + return + } + } + + ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second) + defer cancel() + + agentID, err := s.authenticateTouchOrEnrollAgent( + ctx, + hostname, + apiKey, + enrollmentKey, + clientIP(r.RemoteAddr), + ) + if err != nil { + if errors.Is(err, errUnauthorized) { + ingestRejectedTotal.WithLabelValues("unauthorized").Inc() + writeError(w, http.StatusUnauthorized, "invalid api key or hostname") + return + } + ingestRejectedTotal.WithLabelValues("auth_error").Inc() + s.logger.Printf("authenticate agent: %v", err) + writeError(w, http.StatusInternalServerError, "internal error") + return + } + + if err := s.insertBatch(ctx, agentID, batch); err != nil { + dbInsertFailuresTotal.Inc() + s.logger.Printf("insert batch: %v", err) + writeError(w, http.StatusInternalServerError, "internal error") + return + } + + ingestBatchesTotal.Inc() + for _, item := range batch { + ingestEventsTotal.WithLabelValues(item.Channel, strconv.FormatUint(uint64(item.EventID), 10)).Inc() + } + + writeJSON(w, http.StatusAccepted, ingestResponse{Accepted: len(batch)}) +} + +func (s *server) handleDetections(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + limit := s.cfg.DetectionsLimit + if v := strings.TrimSpace(r.URL.Query().Get("limit")); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 500 { + limit = n + } + } + + host := strings.TrimSpace(r.URL.Query().Get("host")) + rule := strings.TrimSpace(r.URL.Query().Get("rule")) + severity := strings.TrimSpace(r.URL.Query().Get("severity")) + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + items, err := s.listDetections(ctx, host, rule, severity, limit) + if err != nil { + s.logger.Printf("list detections: %v", err) + writeError(w, http.StatusInternalServerError, "internal error") + return + } + + writeJSON(w, http.StatusOK, items) +} + +func (s *server) authenticateTouchOrEnrollAgent(ctx context.Context, hostname, apiKey, enrollmentKey, remoteIP string) (uint64, error) { + const q = ` +SELECT id, api_key_hash, is_enabled +FROM agents +WHERE hostname = ? +LIMIT 1 +` + + var id uint64 + var storedHash string + var enabled bool + + err := s.db.QueryRowContext(ctx, q, hostname).Scan(&id, &storedHash, &enabled) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + if enrollmentKey == "" { + return 0, errUnauthorized + } + if !constantTimeEqual(sha256Hex(enrollmentKey), sha256Hex(s.cfg.EnrollmentKey)) { + return 0, errUnauthorized + } + return s.enrollNewAgent(ctx, hostname, apiKey, remoteIP) + } + return 0, err + } + + if !enabled { + return 0, errUnauthorized + } + + if !constantTimeEqual(strings.ToLower(storedHash), sha256Hex(apiKey)) { + return 0, errUnauthorized + } + + const upd = ` +UPDATE agents +SET last_seen = CURRENT_TIMESTAMP(6), last_ip = ? +WHERE id = ? +` + if _, err := s.db.ExecContext(ctx, upd, remoteIP, id); err != nil { + return 0, err + } + + return id, nil +} + +func (s *server) enrollNewAgent(ctx context.Context, hostname, apiKey, remoteIP string) (uint64, error) { + if strings.TrimSpace(hostname) == "" { + return 0, errUnauthorized + } + if strings.TrimSpace(apiKey) == "" { + return 0, errUnauthorized + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return 0, err + } + defer func() { _ = tx.Rollback() }() + + const ins = ` +INSERT INTO agents (hostname, api_key_hash, first_seen, last_seen, last_ip, is_enabled) +VALUES (?, ?, CURRENT_TIMESTAMP(6), CURRENT_TIMESTAMP(6), ?, 1) +` + res, err := tx.ExecContext(ctx, ins, hostname, sha256Hex(apiKey), remoteIP) + if err != nil { + return 0, err + } + + id, err := res.LastInsertId() + if err != nil { + return 0, err + } + + if err := tx.Commit(); err != nil { + return 0, err + } + + s.logger.Printf("agent auto-enrolled: host=%s ip=%s", hostname, remoteIP) + return uint64(id), nil +} + +var errUnauthorized = errors.New("unauthorized") + +func (s *server) insertBatch(ctx context.Context, agentID uint64, batch []LogPayload) error { + start := time.Now() + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + + var sb strings.Builder + args := make([]any, 0, len(batch)*28) + + sb.WriteString(` +INSERT INTO event_logs ( + agent_id, hostname, channel_name, event_id, source, computer, provider_name, + level_value, task_value, opcode_value, keywords, + target_user, target_domain, subject_user, subject_domain, + workstation, src_ip, src_port, logon_type, process_name, + authentication_package, logon_process, status_text, sub_status_text, + failure_reason, ts, msg, msg_sha256 +) VALUES +`) + + for i, item := range batch { + if i > 0 { + sb.WriteString(",") + } + sb.WriteString("(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + + norm := NormalizeEventXML(item.Message) + + args = append(args, + agentID, + item.Hostname, + item.Channel, + item.EventID, + item.Source, + norm.Computer, + firstNonEmpty(norm.ProviderName, item.Source), + norm.LevelValue, + norm.TaskValue, + norm.OpcodeValue, + norm.Keywords, + norm.TargetUser, + norm.TargetDomain, + norm.SubjectUser, + norm.SubjectDomain, + norm.Workstation, + norm.SrcIP, + norm.SrcPort, + norm.LogonType, + norm.ProcessName, + norm.AuthenticationPackage, + norm.LogonProcess, + norm.StatusText, + norm.SubStatusText, + norm.FailureReason, + item.Time.UTC(), + item.Message, + sha256Hex(item.Message), + ) + } + + if _, err := tx.ExecContext(ctx, sb.String(), args...); err != nil { + return err + } + if err := tx.Commit(); err != nil { + return err + } + + dbBatchSizeHist.Observe(float64(len(batch))) + dbInsertEventsTotal.Add(float64(len(batch))) + dbTxDurationHist.Observe(time.Since(start).Seconds()) + return nil +} + +func (s *server) listDetections(ctx context.Context, host, rule, severity string, limit int) ([]Detection, error) { + base := ` +SELECT id, rule_name, severity, hostname, channel_name, event_id, score, + window_start, window_end, summary, details_json, created_at +FROM detections +WHERE 1=1 +` + args := make([]any, 0, 4) + + if host != "" { + base += " AND hostname = ?" + args = append(args, host) + } + if rule != "" { + base += " AND rule_name = ?" + args = append(args, rule) + } + if severity != "" { + base += " AND severity = ?" + args = append(args, severity) + } + + base += " ORDER BY created_at DESC LIMIT ?" + args = append(args, limit) + + rows, err := s.db.QueryContext(ctx, base, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []Detection + for rows.Next() { + var d Detection + if err := rows.Scan( + &d.ID, &d.RuleName, &d.Severity, &d.Hostname, &d.Channel, + &d.EventID, &d.Score, &d.WindowStart, &d.WindowEnd, + &d.Summary, &d.Details, &d.CreatedAt, + ); err != nil { + return nil, err + } + out = append(out, d) + } + return out, rows.Err() +} + +func (s *server) runDetectionLoop() { + ticker := time.NewTicker(s.cfg.DetectionInterval) + defer ticker.Stop() + + s.runDetectionsOnce() + + for range ticker.C { + s.runDetectionsOnce() + } +} + +func (s *server) runDetectionsOnce() { + ctx, cancel := context.WithTimeout(context.Background(), s.cfg.DetectionInterval) + defer cancel() + + if err := s.detector.updateAgentMetrics(ctx); err != nil { + s.logger.Printf("update agent metrics: %v", err) + } + + rules := []struct { + name string + fn func(context.Context) error + }{ + {"agent_offline", s.detector.runAgentOfflineRule}, + {"failed_logon_spike", s.detector.runFailedLogonSpikeRule}, + {"reboot_spike", s.detector.runRebootSpikeRule}, + {"new_event_id", s.detector.runNewEventIDRule}, + {"password_spray", s.detector.runPasswordSprayRule}, + {"success_after_failures", s.detector.runSuccessAfterFailuresRule}, + {"new_source_ip_for_user", s.detector.runNewSourceIPForUserRule}, + } + + for _, rule := range rules { + start := time.Now() + if err := rule.fn(ctx); err != nil { + s.logger.Printf("rule %s error: %v", rule.name, err) + s.detector.ruleErrorsTotal.WithLabelValues(rule.name).Inc() + continue + } + s.detector.ruleLastRunGauge.WithLabelValues(rule.name).Set(float64(time.Now().Unix())) + s.detector.ruleRuntimeHist.WithLabelValues(rule.name).Observe(time.Since(start).Seconds()) + } +} + +func (d *detector) updateAgentMetrics(ctx context.Context) error { + const q = ` +SELECT hostname, UNIX_TIMESTAMP(last_seen) +FROM agents +WHERE is_enabled = 1 +` + rows, err := d.db.QueryContext(ctx, q) + if err != nil { + return err + } + defer rows.Close() + + active := 0 + now := time.Now().UTC() + + for rows.Next() { + var host string + var lastSeen sql.NullInt64 + if err := rows.Scan(&host, &lastSeen); err != nil { + return err + } + if lastSeen.Valid { + d.lastSeenGauge.WithLabelValues(host).Set(float64(lastSeen.Int64)) + if now.Sub(time.Unix(lastSeen.Int64, 0).UTC()) <= d.cfg.OfflineAfter { + active++ + } + } + } + if err := rows.Err(); err != nil { + return err + } + d.activeAgentsGauge.Set(float64(active)) + return nil +} + +func (d *detector) runAgentOfflineRule(ctx context.Context) error { + windowEnd := time.Now().UTC() + windowStart := windowEnd.Add(-d.cfg.OfflineAfter) + + const q = ` +SELECT hostname, last_seen +FROM agents +WHERE is_enabled = 1 + AND last_seen < ? +` + rows, err := d.db.QueryContext(ctx, q, windowStart) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var host string + var lastSeen time.Time + if err := rows.Scan(&host, &lastSeen); err != nil { + return err + } + minutes := int(windowEnd.Sub(lastSeen).Minutes()) + score := math.Max(1, float64(minutes)/float64(int(d.cfg.OfflineAfter.Minutes()))) + severity := severityFromScore(score, 1.5, 3.0) + + d.anomalyScoreGauge.WithLabelValues(host, "agent_offline").Set(score) + + created, err := d.insertDetection(ctx, Detection{ + RuleName: "agent_offline", + Severity: severity, + Hostname: host, + Score: score, + WindowStart: windowStart, + WindowEnd: windowEnd, + Summary: fmt.Sprintf("Agent %s ist seit %d Minuten offline", host, minutes), + Details: mustJSON(map[string]any{ + "last_seen": lastSeen.UTC().Format(time.RFC3339Nano), + "offline_minutes": minutes, + "offline_after_min": int(d.cfg.OfflineAfter.Minutes()), + }), + }) + if err != nil { + return err + } + if created { + d.detectionHitsTotal.WithLabelValues("agent_offline", severity).Inc() + } + } + return rows.Err() +} + +func (d *detector) runFailedLogonSpikeRule(ctx context.Context) error { + windowEnd := time.Now().UTC() + windowStart := windowEnd.Add(-d.cfg.FailedLogonWindow) + + const q = ` +SELECT hostname, COUNT(*) AS cnt +FROM event_logs +WHERE channel_name = 'Security' + AND event_id = 4625 + AND ts >= ? AND ts < ? +GROUP BY hostname +HAVING COUNT(*) >= ? +` + rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, d.cfg.FailedLogonThreshold) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var host string + var count int + if err := rows.Scan(&host, &count); err != nil { + return err + } + + score := float64(count) / float64(d.cfg.FailedLogonThreshold) + severity := severityFromScore(score, 2.0, 5.0) + d.anomalyScoreGauge.WithLabelValues(host, "failed_logon_spike").Set(score) + + created, err := d.insertDetection(ctx, Detection{ + RuleName: "failed_logon_spike", + Severity: severity, + Hostname: host, + Channel: "Security", + EventID: 4625, + Score: score, + WindowStart: windowStart, + WindowEnd: windowEnd, + Summary: fmt.Sprintf("Host %s hatte %d fehlgeschlagene Logons in %d Minuten", host, count, int(d.cfg.FailedLogonWindow.Minutes())), + Details: mustJSON(map[string]any{ + "count": count, + "threshold": d.cfg.FailedLogonThreshold, + "window_minutes": int(d.cfg.FailedLogonWindow.Minutes()), + "event_id": 4625, + }), + }) + if err != nil { + return err + } + if created { + d.detectionHitsTotal.WithLabelValues("failed_logon_spike", severity).Inc() + } + } + return rows.Err() +} + +func (d *detector) runRebootSpikeRule(ctx context.Context) error { + windowEnd := time.Now().UTC() + windowStart := windowEnd.Add(-d.cfg.RebootWindow) + + const q = ` +SELECT hostname, COUNT(*) AS cnt +FROM event_logs +WHERE channel_name = 'System' + AND event_id IN (1074, 6005, 6006) + AND ts >= ? AND ts < ? +GROUP BY hostname +HAVING COUNT(*) >= ? +` + rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, d.cfg.RebootThreshold) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var host string + var count int + if err := rows.Scan(&host, &count); err != nil { + return err + } + + score := float64(count) / float64(d.cfg.RebootThreshold) + severity := severityFromScore(score, 2.0, 4.0) + d.anomalyScoreGauge.WithLabelValues(host, "reboot_spike").Set(score) + + created, err := d.insertDetection(ctx, Detection{ + RuleName: "reboot_spike", + Severity: severity, + Hostname: host, + Channel: "System", + Score: score, + WindowStart: windowStart, + WindowEnd: windowEnd, + Summary: fmt.Sprintf("Host %s hatte %d Reboot-/Shutdown-Events in %d Minuten", host, count, int(d.cfg.RebootWindow.Minutes())), + Details: mustJSON(map[string]any{ + "count": count, + "threshold": d.cfg.RebootThreshold, + "window_minutes": int(d.cfg.RebootWindow.Minutes()), + "event_ids": []int{1074, 6005, 6006}, + }), + }) + if err != nil { + return err + } + if created { + d.detectionHitsTotal.WithLabelValues("reboot_spike", severity).Inc() + } + } + return rows.Err() +} + +func (d *detector) runNewEventIDRule(ctx context.Context) error { + windowEnd := time.Now().UTC() + windowStart := windowEnd.Add(-d.cfg.DetectionInterval) + + const q = ` +SELECT e.hostname, e.channel_name, e.event_id, COUNT(*) AS cnt +FROM event_logs e +WHERE e.ts >= ? AND e.ts < ? + AND NOT EXISTS ( + SELECT 1 + FROM event_logs old + WHERE old.hostname = e.hostname + AND old.channel_name = e.channel_name + AND old.event_id = e.event_id + AND old.ts < ? + ) +GROUP BY e.hostname, e.channel_name, e.event_id +` + rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, windowStart) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var host, channel string + var eventID uint32 + var count int + if err := rows.Scan(&host, &channel, &eventID, &count); err != nil { + return err + } + + score := 1.0 + math.Log10(float64(count)+1) + severity := "medium" + if count >= 10 { + severity = "high" + } + d.anomalyScoreGauge.WithLabelValues(host, "new_event_id").Set(score) + + created, err := d.insertDetection(ctx, Detection{ + RuleName: "new_event_id", + Severity: severity, + Hostname: host, + Channel: channel, + EventID: eventID, + Score: score, + WindowStart: windowStart, + WindowEnd: windowEnd, + Summary: fmt.Sprintf("Host %s sendet erstmals Event-ID %d im Channel %s", host, eventID, channel), + Details: mustJSON(map[string]any{ + "count": count, + "channel": channel, + "event_id": eventID, + }), + }) + if err != nil { + return err + } + if created { + d.detectionHitsTotal.WithLabelValues("new_event_id", severity).Inc() + } + } + return rows.Err() +} + +func (d *detector) runPasswordSprayRule(ctx context.Context) error { + windowEnd := time.Now().UTC() + windowStart := windowEnd.Add(-d.cfg.PasswordSprayWindow) + + const q = ` +SELECT hostname, src_ip, COUNT(*) AS attempts, COUNT(DISTINCT target_user) AS users +FROM event_logs +WHERE channel_name = 'Security' + AND event_id = 4625 + AND ts >= ? AND ts < ? + AND src_ip <> '' AND src_ip <> '-' AND src_ip <> '::1' AND src_ip <> '127.0.0.1' + AND target_user <> '' AND target_user <> '-' +GROUP BY hostname, src_ip +HAVING COUNT(*) >= ? AND COUNT(DISTINCT target_user) >= ? +` + rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, d.cfg.PasswordSprayMinAttempts, d.cfg.PasswordSprayMinUsers) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var host, srcIP string + var attempts, users int + if err := rows.Scan(&host, &srcIP, &attempts, &users); err != nil { + return err + } + + score := math.Max(float64(attempts)/float64(d.cfg.PasswordSprayMinAttempts), float64(users)/float64(d.cfg.PasswordSprayMinUsers)) + severity := severityFromScore(score, 1.5, 3.0) + d.anomalyScoreGauge.WithLabelValues(host, "password_spray").Set(score) + + created, err := d.insertDetection(ctx, Detection{ + RuleName: "password_spray", + Severity: severity, + Hostname: host, + Channel: "Security", + EventID: 4625, + Score: score, + WindowStart: windowStart, + WindowEnd: windowEnd, + Summary: fmt.Sprintf("Möglicher Password-Spray auf %s von %s: %d Fehlversuche gegen %d Benutzer", host, srcIP, attempts, users), + Details: mustJSON(map[string]any{ + "src_ip": srcIP, + "attempts": attempts, + "distinct_users": users, + "window_minutes": int(d.cfg.PasswordSprayWindow.Minutes()), + "event_id": 4625, + }), + }) + if err != nil { + return err + } + if created { + d.detectionHitsTotal.WithLabelValues("password_spray", severity).Inc() + } + } + return rows.Err() +} + +func (d *detector) runSuccessAfterFailuresRule(ctx context.Context) error { + windowEnd := time.Now().UTC() + windowStart := windowEnd.Add(-d.cfg.SuccessAfterFailureWindow) + + const q = ` +SELECT s.hostname, s.target_user, s.src_ip, COUNT(*) AS success_count +FROM event_logs s +WHERE s.channel_name = 'Security' + AND s.event_id = 4624 + AND s.ts >= ? AND s.ts < ? + AND s.target_user <> '' AND s.target_user <> '-' + AND EXISTS ( + SELECT 1 + FROM event_logs f + WHERE f.hostname = s.hostname + AND f.channel_name = 'Security' + AND f.event_id = 4625 + AND f.target_user = s.target_user + AND ( + (f.src_ip = s.src_ip AND s.src_ip <> '' AND s.src_ip <> '-') + OR (s.src_ip = '' OR s.src_ip = '-' OR f.src_ip = '' OR f.src_ip = '-') + ) + AND f.ts >= DATE_SUB(s.ts, INTERVAL ? SECOND) + AND f.ts < s.ts + ) +GROUP BY s.hostname, s.target_user, s.src_ip +` + rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, int(d.cfg.SuccessAfterFailureWindow.Seconds())) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var host, user, srcIP string + var successCount int + if err := rows.Scan(&host, &user, &srcIP, &successCount); err != nil { + return err + } + + score := 2.0 + math.Log10(float64(successCount)+1) + severity := "high" + d.anomalyScoreGauge.WithLabelValues(host, "success_after_failures").Set(score) + + created, err := d.insertDetection(ctx, Detection{ + RuleName: "success_after_failures", + Severity: severity, + Hostname: host, + Channel: "Security", + EventID: 4624, + Score: score, + WindowStart: windowStart, + WindowEnd: windowEnd, + Summary: fmt.Sprintf("Erfolgreicher Logon nach Fehlversuchen auf %s für Benutzer %s", host, user), + Details: mustJSON(map[string]any{ + "user": user, + "src_ip": srcIP, + "success_count": successCount, + "window_minutes": int(d.cfg.SuccessAfterFailureWindow.Minutes()), + "success_event": 4624, + "failure_event": 4625, + }), + }) + if err != nil { + return err + } + if created { + d.detectionHitsTotal.WithLabelValues("success_after_failures", severity).Inc() + } + } + return rows.Err() +} + +func (d *detector) runNewSourceIPForUserRule(ctx context.Context) error { + windowEnd := time.Now().UTC() + windowStart := windowEnd.Add(-d.cfg.NewSourceIPWindow) + lookbackStart := windowStart.Add(-d.cfg.NewSourceIPLookback) + + const q = ` +SELECT e.hostname, e.target_user, e.src_ip, COUNT(*) AS cnt +FROM event_logs e +WHERE e.channel_name = 'Security' + AND e.event_id = 4624 + AND e.ts >= ? AND e.ts < ? + AND e.target_user <> '' AND e.target_user <> '-' + AND e.src_ip <> '' AND e.src_ip <> '-' AND e.src_ip <> '::1' AND e.src_ip <> '127.0.0.1' + AND NOT EXISTS ( + SELECT 1 + FROM event_logs old + WHERE old.hostname = e.hostname + AND old.channel_name = 'Security' + AND old.event_id = 4624 + AND old.target_user = e.target_user + AND old.src_ip = e.src_ip + AND old.ts >= ? AND old.ts < ? + ) +GROUP BY e.hostname, e.target_user, e.src_ip +` + rows, err := d.db.QueryContext(ctx, q, windowStart, windowEnd, lookbackStart, windowStart) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var host, user, srcIP string + var cnt int + if err := rows.Scan(&host, &user, &srcIP, &cnt); err != nil { + return err + } + + score := 1.5 + math.Log10(float64(cnt)+1) + severity := "medium" + if cnt >= 5 { + severity = "high" + } + d.anomalyScoreGauge.WithLabelValues(host, "new_source_ip_for_user").Set(score) + + created, err := d.insertDetection(ctx, Detection{ + RuleName: "new_source_ip_for_user", + Severity: severity, + Hostname: host, + Channel: "Security", + EventID: 4624, + Score: score, + WindowStart: windowStart, + WindowEnd: windowEnd, + Summary: fmt.Sprintf("Benutzer %s meldet sich auf %s von neuer Quell-IP %s an", user, host, srcIP), + Details: mustJSON(map[string]any{ + "user": user, + "src_ip": srcIP, + "count": cnt, + "window_minutes": int(d.cfg.NewSourceIPWindow.Minutes()), + "lookback_hours": int(d.cfg.NewSourceIPLookback.Hours()), + "event_id": 4624, + }), + }) + if err != nil { + return err + } + if created { + d.detectionHitsTotal.WithLabelValues("new_source_ip_for_user", severity).Inc() + } + } + return rows.Err() +} + +func (d *detector) insertDetection(ctx context.Context, det Detection) (bool, error) { + const q = ` +INSERT IGNORE INTO detections +(rule_name, severity, hostname, channel_name, event_id, score, window_start, window_end, summary, details_json) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +` + res, err := d.db.ExecContext(ctx, q, + det.RuleName, + det.Severity, + det.Hostname, + det.Channel, + det.EventID, + det.Score, + det.WindowStart.UTC(), + det.WindowEnd.UTC(), + det.Summary, + string(det.Details), + ) + if err != nil { + return false, err + } + affected, err := res.RowsAffected() + if err != nil { + return false, err + } + return affected > 0, nil +} + +func NormalizeEventXML(xmlStr string) NormalizedEvent { + var out NormalizedEvent + if strings.TrimSpace(xmlStr) == "" { + return out + } + + dec := xml.NewDecoder(strings.NewReader(xmlStr)) + + var path []string + var currentDataName string + + for { + tok, err := dec.Token() + if err != nil { + if err == io.EOF { + break + } + return out + } + + switch t := tok.(type) { + case xml.StartElement: + path = append(path, t.Name.Local) + + switch t.Name.Local { + case "Provider": + for _, a := range t.Attr { + if a.Name.Local == "Name" { + out.ProviderName = strings.TrimSpace(a.Value) + } + } + case "Data": + currentDataName = "" + for _, a := range t.Attr { + if a.Name.Local == "Name" { + currentDataName = strings.TrimSpace(a.Value) + break + } + } + } + + case xml.EndElement: + if len(path) > 0 { + path = path[:len(path)-1] + } + if t.Name.Local == "Data" { + currentDataName = "" + } + + case xml.CharData: + v := strings.TrimSpace(string(t)) + if v == "" { + continue + } + + if endsWithPath(path, "System", "Computer") { + out.Computer = v + continue + } + if endsWithPath(path, "System", "Level") { + out.LevelValue = parseUint32(v) + continue + } + if endsWithPath(path, "System", "Task") { + out.TaskValue = parseUint32(v) + continue + } + if endsWithPath(path, "System", "Opcode") { + out.OpcodeValue = parseUint32(v) + continue + } + if endsWithPath(path, "System", "Keywords") { + out.Keywords = v + continue + } + + if currentDataName != "" { + switch currentDataName { + case "TargetUserName": + out.TargetUser = v + case "TargetDomainName": + out.TargetDomain = v + case "SubjectUserName": + out.SubjectUser = v + case "SubjectDomainName": + out.SubjectDomain = v + case "WorkstationName": + out.Workstation = v + case "IpAddress": + out.SrcIP = v + case "IpPort": + out.SrcPort = v + case "LogonType": + out.LogonType = v + case "ProcessName": + out.ProcessName = v + case "AuthenticationPackageName": + out.AuthenticationPackage = v + case "LogonProcessName": + out.LogonProcess = v + case "Status": + out.StatusText = v + case "SubStatus": + out.SubStatusText = v + case "FailureReason": + out.FailureReason = v + } + } + } + } + + out.TargetUser = cleanupWinField(out.TargetUser) + out.TargetDomain = cleanupWinField(out.TargetDomain) + out.SubjectUser = cleanupWinField(out.SubjectUser) + out.SubjectDomain = cleanupWinField(out.SubjectDomain) + out.Workstation = cleanupWinField(out.Workstation) + out.SrcIP = cleanupWinField(out.SrcIP) + out.SrcPort = cleanupWinField(out.SrcPort) + out.LogonType = cleanupWinField(out.LogonType) + out.ProcessName = cleanupWinField(out.ProcessName) + out.AuthenticationPackage = cleanupWinField(out.AuthenticationPackage) + out.LogonProcess = cleanupWinField(out.LogonProcess) + out.StatusText = cleanupWinField(out.StatusText) + out.SubStatusText = cleanupWinField(out.SubStatusText) + out.FailureReason = cleanupWinField(out.FailureReason) + out.Computer = cleanupWinField(out.Computer) + out.ProviderName = cleanupWinField(out.ProviderName) + + return out +} + +func endsWithPath(path []string, parts ...string) bool { + if len(path) < len(parts) { + return false + } + start := len(path) - len(parts) + for i := range parts { + if path[start+i] != parts[i] { + return false + } + } + return true +} + +func parseUint32(v string) uint32 { + n, err := strconv.ParseUint(strings.TrimSpace(v), 10, 32) + if err != nil { + return 0 + } + return uint32(n) +} + +func cleanupWinField(v string) string { + v = strings.TrimSpace(v) + if v == "" { + return "" + } + if v == "-" { + return "" + } + return v +} + +func firstNonEmpty(v ...string) string { + for _, s := range v { + if strings.TrimSpace(s) != "" { + return strings.TrimSpace(s) + } + } + return "" +} + +func validatePayload(p *LogPayload) error { + p.Hostname = strings.TrimSpace(p.Hostname) + p.Channel = strings.TrimSpace(p.Channel) + p.Source = strings.TrimSpace(p.Source) + + if p.Hostname == "" { + return errors.New("host is required") + } + if len(p.Hostname) > 255 { + return errors.New("host too long") + } + if p.Channel == "" { + return errors.New("channel is required") + } + if len(p.Channel) > 128 { + return errors.New("channel too long") + } + if p.Source == "" { + return errors.New("source is required") + } + if len(p.Source) > 255 { + return errors.New("source too long") + } + if p.EventID == 0 { + return errors.New("event id is required") + } + if p.Message == "" { + return errors.New("msg is required") + } + if len(p.Message) > 2*1024*1024 { + return errors.New("msg too large") + } + if p.Time.IsZero() { + return errors.New("ts is required") + } + return nil +} + +func severityFromScore(score, medium, high float64) string { + switch { + case score >= high: + return "high" + case score >= medium: + return "medium" + default: + return "low" + } +} + +func mustJSON(v any) json.RawMessage { + b, _ := json.Marshal(v) + return b +} + +func metricsMiddleware(logger *log.Logger, next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rw := &statusRecorder{ResponseWriter: w, status: 200} + start := time.Now() + next.ServeHTTP(rw, r) + + status := strconv.Itoa(rw.status) + path := r.URL.Path + + httpRequestsTotal.WithLabelValues(path, r.Method, status).Inc() + httpRequestDuration.WithLabelValues(path, r.Method, status).Observe(time.Since(start).Seconds()) + logger.Printf("%s %s remote=%s status=%s dur=%s", r.Method, r.URL.Path, r.RemoteAddr, status, time.Since(start)) + }) +} + +type statusRecorder struct { + http.ResponseWriter + status int +} + +func (r *statusRecorder) WriteHeader(status int) { + r.status = status + r.ResponseWriter.WriteHeader(status) +} + +func recoveryMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + if recover() != nil { + writeError(w, http.StatusInternalServerError, "internal error") + } + }() + next.ServeHTTP(w, r) + }) +} + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} + +func writeError(w http.ResponseWriter, status int, msg string) { + writeJSON(w, status, map[string]string{"error": msg}) +} + +func sha256Hex(s string) string { + sum := sha256.Sum256([]byte(s)) + return hex.EncodeToString(sum[:]) +} + +func constantTimeEqual(a, b string) bool { + if len(a) != len(b) { + return false + } + var v byte + for i := 0; i < len(a); i++ { + v |= a[i] ^ b[i] + } + return v == 0 +} + +func clientIP(remoteAddr string) string { + host, _, err := net.SplitHostPort(remoteAddr) + if err != nil { + return remoteAddr + } + return host +} + +func getenv(key, def string) string { + v := strings.TrimSpace(os.Getenv(key)) + if v == "" { + return def + } + return v +} + +func mustGetenv(key string) string { + v := strings.TrimSpace(os.Getenv(key)) + if v == "" { + log.Fatalf("missing required environment variable: %s", key) + } + return v +} + +func getenvInt(key string, def int) int { + v := strings.TrimSpace(os.Getenv(key)) + if v == "" { + return def + } + n, err := strconv.Atoi(v) + if err != nil { + log.Fatalf("invalid int for %s: %v", key, err) + } + return n +} + +func getenvInt64(key string, def int64) int64 { + v := strings.TrimSpace(os.Getenv(key)) + if v == "" { + return def + } + n, err := strconv.ParseInt(v, 10, 64) + if err != nil { + log.Fatalf("invalid int64 for %s: %v", key, err) + } + return n +} + +func getenvDuration(key string, def time.Duration) time.Duration { + v := strings.TrimSpace(os.Getenv(key)) + if v == "" { + return def + } + d, err := time.ParseDuration(v) + if err != nil { + log.Fatalf("invalid duration for %s: %v", key, err) + } + return d +} diff --git a/schema.sql b/schema.sql new file mode 100644 index 0000000..64cba70 --- /dev/null +++ b/schema.sql @@ -0,0 +1,101 @@ +CREATE DATABASE IF NOT EXISTS eventcollector + CHARACTER SET utf8mb4 + COLLATE utf8mb4_unicode_ci; + +USE eventcollector; + +CREATE TABLE IF NOT EXISTS agents ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + hostname VARCHAR(255) NOT NULL, + api_key_hash CHAR(64) NOT NULL, + first_seen DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + last_seen DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + last_ip VARCHAR(64) NOT NULL DEFAULT '', + is_enabled TINYINT(1) NOT NULL DEFAULT 1, + PRIMARY KEY (id), + UNIQUE KEY ux_agents_hostname (hostname), + KEY ix_agents_last_seen (last_seen) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS event_logs ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + agent_id BIGINT UNSIGNED NOT NULL, + hostname VARCHAR(255) NOT NULL, + channel_name VARCHAR(128) NOT NULL, + event_id INT UNSIGNED NOT NULL, + source VARCHAR(255) NOT NULL, + ts DATETIME(6) NOT NULL, + received_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + msg LONGTEXT NOT NULL, + msg_sha256 CHAR(64) NOT NULL, + PRIMARY KEY (id), + KEY ix_event_logs_ts (ts), + KEY ix_event_logs_received_at (received_at), + KEY ix_event_logs_agent_ts (agent_id, ts), + KEY ix_event_logs_eventid_ts (event_id, ts), + KEY ix_event_logs_hostname_ts (hostname, ts), + KEY ix_event_logs_channel_event_ts (channel_name, event_id, ts), + CONSTRAINT fk_event_logs_agent + FOREIGN KEY (agent_id) REFERENCES agents(id) + ON DELETE RESTRICT + ON UPDATE RESTRICT +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS detections ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + rule_name VARCHAR(128) NOT NULL, + severity VARCHAR(32) NOT NULL, + hostname VARCHAR(255) NOT NULL, + channel_name VARCHAR(128) NOT NULL DEFAULT '', + event_id INT UNSIGNED NOT NULL DEFAULT 0, + score DOUBLE NOT NULL DEFAULT 0, + window_start DATETIME(6) NOT NULL, + window_end DATETIME(6) NOT NULL, + summary VARCHAR(512) NOT NULL, + details_json JSON NOT NULL, + created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + PRIMARY KEY (id), + UNIQUE KEY ux_detection_dedupe (rule_name, hostname, channel_name, event_id, window_start, window_end), + KEY ix_detections_created (created_at), + KEY ix_detections_rule_host_time (rule_name, hostname, created_at), + KEY ix_detections_severity_time (severity, created_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +USE eventcollector; + +INSERT INTO agents (hostname, api_key_hash) +VALUES + ('client01.domain.local', SHA2('SUPER-LANGER-AGENT-KEY-01', 256)), + ('client02.domain.local', SHA2('SUPER-LANGER-AGENT-KEY-02', 256)); + + #V2 + + ALTER TABLE event_logs + ADD COLUMN computer VARCHAR(255) NOT NULL DEFAULT '' AFTER source, + ADD COLUMN provider_name VARCHAR(255) NOT NULL DEFAULT '' AFTER computer, + ADD COLUMN level_value INT UNSIGNED NOT NULL DEFAULT 0 AFTER provider_name, + ADD COLUMN task_value INT UNSIGNED NOT NULL DEFAULT 0 AFTER level_value, + ADD COLUMN opcode_value INT UNSIGNED NOT NULL DEFAULT 0 AFTER task_value, + ADD COLUMN keywords VARCHAR(255) NOT NULL DEFAULT '' AFTER opcode_value, + ADD COLUMN target_user VARCHAR(255) NOT NULL DEFAULT '' AFTER keywords, + ADD COLUMN target_domain VARCHAR(255) NOT NULL DEFAULT '' AFTER target_user, + ADD COLUMN subject_user VARCHAR(255) NOT NULL DEFAULT '' AFTER target_domain, + ADD COLUMN subject_domain VARCHAR(255) NOT NULL DEFAULT '' AFTER subject_user, + ADD COLUMN workstation VARCHAR(255) NOT NULL DEFAULT '' AFTER subject_domain, + ADD COLUMN src_ip VARCHAR(64) NOT NULL DEFAULT '' AFTER workstation, + ADD COLUMN src_port VARCHAR(32) NOT NULL DEFAULT '' AFTER src_ip, + ADD COLUMN logon_type VARCHAR(32) NOT NULL DEFAULT '' AFTER src_port, + ADD COLUMN process_name VARCHAR(512) NOT NULL DEFAULT '' AFTER logon_type, + ADD COLUMN authentication_package VARCHAR(128) NOT NULL DEFAULT '' AFTER process_name, + ADD COLUMN logon_process VARCHAR(128) NOT NULL DEFAULT '' AFTER authentication_package, + ADD COLUMN status_text VARCHAR(64) NOT NULL DEFAULT '' AFTER logon_process, + ADD COLUMN sub_status_text VARCHAR(64) NOT NULL DEFAULT '' AFTER status_text, + ADD COLUMN failure_reason VARCHAR(512) NOT NULL DEFAULT '' AFTER sub_status_text; + +ALTER TABLE event_logs + ADD KEY ix_event_logs_target_user_ts (target_user, ts), + ADD KEY ix_event_logs_src_ip_ts (src_ip, ts), + ADD KEY ix_event_logs_target_user_src_ip_ts (target_user, src_ip, ts), + ADD KEY ix_event_logs_eventid_srcip_ts (event_id, src_ip, ts), + ADD KEY ix_event_logs_eventid_targetuser_ts (event_id, target_user, ts), + ADD KEY ix_event_logs_eventid_logontype_ts (event_id, logon_type, ts); \ No newline at end of file