Handle backlog better

This commit is contained in:
Owen
2026-03-31 13:47:32 -07:00
parent d155d7e31b
commit a1e9396999
3 changed files with 175 additions and 12 deletions

View File

@@ -77,6 +77,17 @@ const BACKOFF_SCHEDULE_MS = [
30 * 60_000 // 30 min (failure 5+)
];
/**
* If a destination has been continuously unreachable for this long, its
* cursors are advanced to the current max row id and the backlog is silently
* discarded. This prevents unbounded queue growth when a webhook endpoint is
* down for an extended period. A prominent warning is logged so operators are
* aware logs were dropped.
*
* Default: 24 hours.
*/
const MAX_BACKLOG_DURATION_MS = 24 * 60 * 60_000;
// ---------------------------------------------------------------------------
// LogStreamingManager
// ---------------------------------------------------------------------------
@@ -96,6 +107,10 @@ const BACKOFF_SCHEDULE_MS = [
* catch-up batches to avoid hammering the remote endpoint.
* - **Back-off**: consecutive send failures trigger exponential back-off
* (tracked in-memory per destination). Successful sends reset the counter.
* - **Backlog abandonment**: if a destination remains unreachable for longer
* than `MAX_BACKLOG_DURATION_MS`, all cursors for that destination are
* advanced to the current max id so the backlog is discarded and streaming
* resumes from the present moment on recovery.
*/
export class LogStreamingManager {
private pollTimer: ReturnType<typeof setTimeout> | null = null;
@@ -116,6 +131,53 @@ export class LogStreamingManager {
this.schedulePoll(POLL_INTERVAL_MS);
}
// -------------------------------------------------------------------------
// Cursor initialisation (call this when a destination is first created)
// -------------------------------------------------------------------------
/**
* Eagerly seed cursors for every log type at the **current** max row id of
* each table, scoped to the destination's org.
*
* Call this immediately after inserting a new row into
* `eventStreamingDestinations` so the destination only receives events
* that were written *after* it was created. If a cursor row already exists
* (e.g. the method is called twice) it is left untouched.
*
* The manager also has a lazy fallback inside `getOrCreateCursor` for
* destinations that existed before this method was introduced.
*/
async initializeCursorsForDestination(
destinationId: number,
orgId: string
): Promise<void> {
for (const logType of LOG_TYPES) {
const currentMaxId = await this.getCurrentMaxId(logType, orgId);
try {
await db
.insert(eventStreamingCursors)
.values({
destinationId,
logType,
lastSentId: currentMaxId,
lastSentAt: null
})
.onConflictDoNothing();
} catch (err) {
logger.warn(
`LogStreamingManager: could not initialise cursor for ` +
`destination ${destinationId} logType="${logType}"`,
err
);
}
}
logger.debug(
`LogStreamingManager: cursors initialised for destination ${destinationId} ` +
`(org=${orgId})`
);
}
async shutdown(): Promise<void> {
this.isRunning = false;
if (this.pollTimer !== null) {
@@ -188,8 +250,21 @@ export class LogStreamingManager {
private async processDestination(
dest: EventStreamingDestination
): Promise<void> {
// Check back-off
const failState = this.failures.get(dest.destinationId);
// Check whether this destination has been unreachable long enough that
// we should give up on the accumulated backlog.
if (failState) {
const failingForMs = Date.now() - failState.firstFailedAt;
if (failingForMs >= MAX_BACKLOG_DURATION_MS) {
await this.abandonBacklog(dest, failState);
this.failures.delete(dest.destinationId);
// Cursors now point to the current head retry on next poll.
return;
}
}
// Check regular exponential back-off window
if (failState && Date.now() < failState.nextRetryAt) {
logger.debug(
`LogStreamingManager: destination ${dest.destinationId} in back-off, skipping`
@@ -255,6 +330,69 @@ export class LogStreamingManager {
}
}
/**
* Advance every cursor for the destination to the current max row id,
* effectively discarding the accumulated backlog. Called when the
* destination has been unreachable for longer than MAX_BACKLOG_DURATION_MS.
*/
private async abandonBacklog(
dest: EventStreamingDestination,
failState: DestinationFailureState
): Promise<void> {
const failingForHours = (
(Date.now() - failState.firstFailedAt) /
3_600_000
).toFixed(1);
let totalDropped = 0;
for (const logType of LOG_TYPES) {
try {
const currentMaxId = await this.getCurrentMaxId(
logType,
dest.orgId
);
// Find out how many rows are being skipped for this type
const cursor = await db
.select({ lastSentId: eventStreamingCursors.lastSentId })
.from(eventStreamingCursors)
.where(
and(
eq(eventStreamingCursors.destinationId, dest.destinationId),
eq(eventStreamingCursors.logType, logType)
)
)
.limit(1);
const prevId = cursor[0]?.lastSentId ?? currentMaxId;
totalDropped += Math.max(0, currentMaxId - prevId);
await this.updateCursor(
dest.destinationId,
logType,
currentMaxId
);
} catch (err) {
logger.error(
`LogStreamingManager: failed to advance cursor for ` +
`destination ${dest.destinationId} logType="${logType}" ` +
`during backlog abandonment`,
err
);
}
}
logger.warn(
`LogStreamingManager: destination ${dest.destinationId} has been ` +
`unreachable for ${failingForHours}h ` +
`(${failState.consecutiveFailures} consecutive failures). ` +
`Discarding backlog of ~${totalDropped} log event(s) and ` +
`resuming from the current position. ` +
`Verify the destination URL and credentials.`
);
}
/**
* Forward all pending log records of a specific type for a destination.
*
@@ -342,20 +480,27 @@ export class LogStreamingManager {
return { lastSentId: existing[0].lastSentId };
}
// No cursor yet initialise at the current maximum id of the log
// table for this org so we only forward *new* events going forward.
// No cursor yet this destination pre-dates the eager initialisation
// path (initializeCursorsForDestination). Seed at the current max id
// so we do not replay historical logs.
const initialId = await this.getCurrentMaxId(logType, orgId);
await db.insert(eventStreamingCursors).values({
destinationId,
logType,
lastSentId: initialId,
lastSentAt: null
});
// Use onConflictDoNothing in case of a rare race between two poll
// cycles both hitting this branch simultaneously.
await db
.insert(eventStreamingCursors)
.values({
destinationId,
logType,
lastSentId: initialId,
lastSentAt: null
})
.onConflictDoNothing();
logger.debug(
`LogStreamingManager: initialised cursor for destination ${destinationId} ` +
`logType="${logType}" at id=${initialId}`
`LogStreamingManager: lazily initialised cursor for destination ${destinationId} ` +
`logType="${logType}" at id=${initialId} ` +
`(prefer initializeCursorsForDestination at creation time)`
);
return { lastSentId: initialId };
@@ -574,7 +719,9 @@ export class LogStreamingManager {
private recordFailure(destinationId: number): void {
const current = this.failures.get(destinationId) ?? {
consecutiveFailures: 0,
nextRetryAt: 0
nextRetryAt: 0,
// Stamp the very first failure so we can measure total outage duration
firstFailedAt: Date.now()
};
current.consecutiveFailures += 1;

View File

@@ -112,4 +112,6 @@ export interface DestinationFailureState {
consecutiveFailures: number;
/** Date.now() value after which the destination may be retried */
nextRetryAt: number;
/** Date.now() value of the very first failure in the current streak */
firstFailedAt: number;
}