From a1e9396999657856c22a972e6fcd18ffc7ec9aa7 Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 31 Mar 2026 13:47:32 -0700 Subject: [PATCH] Handle backlog better --- .../lib/logStreaming/LogStreamingManager.ts | 171 ++++++++++++++++-- server/private/lib/logStreaming/types.ts | 2 + .../createEventStreamingDestination.ts | 14 ++ 3 files changed, 175 insertions(+), 12 deletions(-) diff --git a/server/private/lib/logStreaming/LogStreamingManager.ts b/server/private/lib/logStreaming/LogStreamingManager.ts index 131cc8de0..04e35ad00 100644 --- a/server/private/lib/logStreaming/LogStreamingManager.ts +++ b/server/private/lib/logStreaming/LogStreamingManager.ts @@ -77,6 +77,17 @@ const BACKOFF_SCHEDULE_MS = [ 30 * 60_000 // 30 min (failure 5+) ]; +/** + * If a destination has been continuously unreachable for this long, its + * cursors are advanced to the current max row id and the backlog is silently + * discarded. This prevents unbounded queue growth when a webhook endpoint is + * down for an extended period. A prominent warning is logged so operators are + * aware logs were dropped. + * + * Default: 24 hours. + */ +const MAX_BACKLOG_DURATION_MS = 24 * 60 * 60_000; + // --------------------------------------------------------------------------- // LogStreamingManager // --------------------------------------------------------------------------- @@ -96,6 +107,10 @@ const BACKOFF_SCHEDULE_MS = [ * catch-up batches to avoid hammering the remote endpoint. * - **Back-off**: consecutive send failures trigger exponential back-off * (tracked in-memory per destination). Successful sends reset the counter. + * - **Backlog abandonment**: if a destination remains unreachable for longer + * than `MAX_BACKLOG_DURATION_MS`, all cursors for that destination are + * advanced to the current max id so the backlog is discarded and streaming + * resumes from the present moment on recovery. 
*/ export class LogStreamingManager { private pollTimer: ReturnType<typeof setTimeout> | null = null; @@ -116,6 +131,53 @@ export class LogStreamingManager { this.schedulePoll(POLL_INTERVAL_MS); } + // ------------------------------------------------------------------------- + // Cursor initialisation (call this when a destination is first created) + // ------------------------------------------------------------------------- + + /** + * Eagerly seed cursors for every log type at the **current** max row id of + * each table, scoped to the destination's org. + * + * Call this immediately after inserting a new row into + * `eventStreamingDestinations` so the destination only receives events + * that were written *after* it was created. If a cursor row already exists + * (e.g. the method is called twice) it is left untouched. + * + * The manager also has a lazy fallback inside `getOrCreateCursor` for + * destinations that existed before this method was introduced. + */ + async initializeCursorsForDestination( + destinationId: number, + orgId: string + ): Promise<void> { + for (const logType of LOG_TYPES) { + const currentMaxId = await this.getCurrentMaxId(logType, orgId); + try { + await db + .insert(eventStreamingCursors) + .values({ + destinationId, + logType, + lastSentId: currentMaxId, + lastSentAt: null + }) + .onConflictDoNothing(); + } catch (err) { + logger.warn( + `LogStreamingManager: could not initialise cursor for ` + + `destination ${destinationId} logType="${logType}"`, + err + ); + } + } + + logger.debug( + `LogStreamingManager: cursors initialised for destination ${destinationId} ` + + `(org=${orgId})` + ); + } + async shutdown(): Promise<void> { this.isRunning = false; if (this.pollTimer !== null) { @@ -188,8 +250,21 @@ private async processDestination( dest: EventStreamingDestination ): Promise<void> { - // Check back-off const failState = this.failures.get(dest.destinationId); + + // Check whether this destination has been unreachable long enough that 
+ // we should give up on the accumulated backlog. + if (failState) { + const failingForMs = Date.now() - failState.firstFailedAt; + if (failingForMs >= MAX_BACKLOG_DURATION_MS) { + await this.abandonBacklog(dest, failState); + this.failures.delete(dest.destinationId); + // Cursors now point to the current head – retry on next poll. + return; + } + } + + // Check regular exponential back-off window if (failState && Date.now() < failState.nextRetryAt) { logger.debug( `LogStreamingManager: destination ${dest.destinationId} in back-off, skipping` ); @@ -255,6 +330,69 @@ } } + /** + * Advance every cursor for the destination to the current max row id, + * effectively discarding the accumulated backlog. Called when the + * destination has been unreachable for longer than MAX_BACKLOG_DURATION_MS. + */ + private async abandonBacklog( + dest: EventStreamingDestination, + failState: DestinationFailureState + ): Promise<void> { + const failingForHours = ( + (Date.now() - failState.firstFailedAt) / + 3_600_000 + ).toFixed(1); + + let totalDropped = 0; + + for (const logType of LOG_TYPES) { + try { + const currentMaxId = await this.getCurrentMaxId( + logType, + dest.orgId + ); + + // Find out how many rows are being skipped for this type + const cursor = await db + .select({ lastSentId: eventStreamingCursors.lastSentId }) + .from(eventStreamingCursors) + .where( + and( + eq(eventStreamingCursors.destinationId, dest.destinationId), + eq(eventStreamingCursors.logType, logType) + ) + ) + .limit(1); + + const prevId = cursor[0]?.lastSentId ?? 
currentMaxId; + totalDropped += Math.max(0, currentMaxId - prevId); + + await this.updateCursor( + dest.destinationId, + logType, + currentMaxId + ); + } catch (err) { + logger.error( + `LogStreamingManager: failed to advance cursor for ` + + `destination ${dest.destinationId} logType="${logType}" ` + + `during backlog abandonment`, + err + ); + } + } + + logger.warn( + `LogStreamingManager: destination ${dest.destinationId} has been ` + + `unreachable for ${failingForHours}h ` + + `(${failState.consecutiveFailures} consecutive failures). ` + + `Discarding backlog of ~${totalDropped} log event(s) and ` + + `resuming from the current position. ` + + `Verify the destination URL and credentials.` + ); + } + /** * Forward all pending log records of a specific type for a destination. * @@ -342,20 +480,27 @@ export class LogStreamingManager { return { lastSentId: existing[0].lastSentId }; } - // No cursor yet – initialise at the current maximum id of the log - // table for this org so we only forward *new* events going forward. + // No cursor yet – this destination pre-dates the eager initialisation + // path (initializeCursorsForDestination). Seed at the current max id + // so we do not replay historical logs. const initialId = await this.getCurrentMaxId(logType, orgId); - await db.insert(eventStreamingCursors).values({ - destinationId, - logType, - lastSentId: initialId, - lastSentAt: null - }); + // Use onConflictDoNothing in case of a rare race between two poll + // cycles both hitting this branch simultaneously. 
+ await db + .insert(eventStreamingCursors) + .values({ + destinationId, + logType, + lastSentId: initialId, + lastSentAt: null + }) + .onConflictDoNothing(); logger.debug( - `LogStreamingManager: initialised cursor for destination ${destinationId} ` + - `logType="${logType}" at id=${initialId}` + `LogStreamingManager: lazily initialised cursor for destination ${destinationId} ` + + `logType="${logType}" at id=${initialId} ` + + `(prefer initializeCursorsForDestination at creation time)` ); return { lastSentId: initialId }; @@ -574,7 +719,9 @@ export class LogStreamingManager { private recordFailure(destinationId: number): void { const current = this.failures.get(destinationId) ?? { consecutiveFailures: 0, - nextRetryAt: 0 + nextRetryAt: 0, + // Stamp the very first failure so we can measure total outage duration + firstFailedAt: Date.now() }; current.consecutiveFailures += 1; diff --git a/server/private/lib/logStreaming/types.ts b/server/private/lib/logStreaming/types.ts index 2bd25418d..585adb2a2 100644 --- a/server/private/lib/logStreaming/types.ts +++ b/server/private/lib/logStreaming/types.ts @@ -112,4 +112,6 @@ export interface DestinationFailureState { consecutiveFailures: number; /** Date.now() value after which the destination may be retried */ nextRetryAt: number; + /** Date.now() value of the very first failure in the current streak */ + firstFailedAt: number; } \ No newline at end of file diff --git a/server/private/routers/eventStreamingDestination/createEventStreamingDestination.ts b/server/private/routers/eventStreamingDestination/createEventStreamingDestination.ts index 1c9de788a..623f2d9e0 100644 --- a/server/private/routers/eventStreamingDestination/createEventStreamingDestination.ts +++ b/server/private/routers/eventStreamingDestination/createEventStreamingDestination.ts @@ -15,6 +15,7 @@ import { Request, Response, NextFunction } from "express"; import { z } from "zod"; import { db } from "@server/db"; import { eventStreamingDestinations } from 
"@server/db"; +import { logStreamingManager } from "#private/lib/logStreaming"; import response from "@server/lib/response"; import HttpCode from "@server/types/HttpCode"; import createHttpError from "http-errors"; @@ -106,6 +107,19 @@ export async function createEventStreamingDestination( }) .returning(); + // Seed cursors at the current max row id for every log type so this + // destination only receives events written *after* it was created. + // Fire-and-forget: a failure here is non-fatal; the manager has a lazy + // fallback that will seed at the next poll if these rows are missing. + logStreamingManager + .initializeCursorsForDestination(destination.destinationId, orgId) + .catch((err) => + logger.error( + "createEventStreamingDestination: failed to initialise streaming cursors", + err + ) + ); + return response(res, { data: { destinationId: destination.destinationId