diff --git a/management/cmd/management.go b/management/cmd/management.go index d6735f955..03c97fd16 100644 --- a/management/cmd/management.go +++ b/management/cmd/management.go @@ -176,7 +176,7 @@ var ( if disableSingleAccMode { mgmtSingleAccModeDomain = "" } - eventStore, key, err := integrations.InitEventStore(ctx, config.Datadir, config.DataStoreEncryptionKey) + eventStore, key, err := integrations.InitEventStore(ctx, config.Datadir, config.DataStoreEncryptionKey, appMetrics) if err != nil { return fmt.Errorf("failed to initialize database: %s", err) } diff --git a/management/server/telemetry/app_metrics.go b/management/server/telemetry/app_metrics.go index 09deb8127..155e751c9 100644 --- a/management/server/telemetry/app_metrics.go +++ b/management/server/telemetry/app_metrics.go @@ -29,6 +29,7 @@ type MockAppMetrics struct { StoreMetricsFunc func() *StoreMetrics UpdateChannelMetricsFunc func() *UpdateChannelMetrics AddAccountManagerMetricsFunc func() *AccountManagerMetrics + EventStreamingMetricsFunc func() *EventStreamingMetrics } // GetMeter mocks the GetMeter function of the AppMetrics interface @@ -103,6 +104,14 @@ func (mock *MockAppMetrics) AccountManagerMetrics() *AccountManagerMetrics { return nil } +// EventStreamingMetrics mocks the EventStreamingMetrics function of the AppMetrics interface +func (mock *MockAppMetrics) EventStreamingMetrics() *EventStreamingMetrics { + if mock.EventStreamingMetricsFunc != nil { + return mock.EventStreamingMetricsFunc() + } + return nil +} + // AppMetrics is metrics interface type AppMetrics interface { GetMeter() metric2.Meter @@ -114,6 +123,7 @@ type AppMetrics interface { StoreMetrics() *StoreMetrics UpdateChannelMetrics() *UpdateChannelMetrics AccountManagerMetrics() *AccountManagerMetrics + EventStreamingMetrics() *EventStreamingMetrics } // defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/ @@ -128,6 +138,7 @@ type defaultAppMetrics struct { storeMetrics *StoreMetrics updateChannelMetrics *UpdateChannelMetrics accountManagerMetrics *AccountManagerMetrics + eventStreamingMetrics *EventStreamingMetrics } // IDPMetrics returns metrics for the idp package @@ -160,6 +171,11 @@ func (appMetrics *defaultAppMetrics) AccountManagerMetrics() *AccountManagerMetr return appMetrics.accountManagerMetrics } +// EventStreamingMetrics returns metrics for the event streaming module +func (appMetrics *defaultAppMetrics) EventStreamingMetrics() *EventStreamingMetrics { + return appMetrics.eventStreamingMetrics +} + // Close stop application metrics HTTP handler and closes listener. func (appMetrics *defaultAppMetrics) Close() error { if appMetrics.listener == nil { @@ -184,10 +200,10 @@ func (appMetrics *defaultAppMetrics) Expose(ctx context.Context, port int, endpo } appMetrics.listener = listener go func() { - err := http.Serve(listener, rootRouter) - if err != nil { - return + if err := http.Serve(listener, rootRouter); err != nil && err != http.ErrServerClosed { + log.WithContext(ctx).Errorf("metrics server error: %v", err) } + log.WithContext(ctx).Info("metrics server stopped") }() log.WithContext(ctx).Infof("enabled application metrics and exposing on http://%s", listener.Addr().String()) @@ -204,7 +220,7 @@ func (appMetrics *defaultAppMetrics) GetMeter() metric2.Meter { func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) { exporter, err := prometheus.New() if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create prometheus exporter: %w", err) } provider := metric.NewMeterProvider(metric.WithReader(exporter)) @@ -213,32 +229,37 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) { idpMetrics, err := NewIDPMetrics(ctx, meter) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to initialize IDP metrics: %w", err) } middleware, err := NewMetricsMiddleware(ctx, meter) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to initialize HTTP middleware metrics: %w", err) } grpcMetrics, err := NewGRPCMetrics(ctx, meter) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to initialize gRPC metrics: %w", err) } storeMetrics, err := NewStoreMetrics(ctx, meter) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to initialize store metrics: %w", err) } updateChannelMetrics, err := NewUpdateChannelMetrics(ctx, meter) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to initialize update channel metrics: %w", err) } accountManagerMetrics, err := NewAccountManagerMetrics(ctx, meter) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err) + } + + eventStreamingMetrics, err := NewEventStreamingMetrics(ctx, meter) + if err != nil { + return nil, fmt.Errorf("failed to initialize event streaming metrics: %w", err) } return &defaultAppMetrics{ @@ -250,5 +271,6 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) { storeMetrics: storeMetrics, updateChannelMetrics: updateChannelMetrics, accountManagerMetrics: accountManagerMetrics, + eventStreamingMetrics: eventStreamingMetrics, }, nil } diff --git a/management/server/telemetry/eventstreaming_metrics.go b/management/server/telemetry/eventstreaming_metrics.go new file mode 100644 index 000000000..9a9d49059 --- /dev/null +++ b/management/server/telemetry/eventstreaming_metrics.go @@ -0,0 +1,207 @@ +package telemetry + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + labelTargetPlatform = attribute.Key("target_platform") + labelAccountID = attribute.Key("account_id") + labelHTTPStatusCode = attribute.Key("http_status_code") + labelFailureReason = attribute.Key("failure_reason") + labelManagerAction = attribute.Key("manager_action") + labelManagerActionResult = attribute.Key("manager_action_result") + labelEventStreamStatus = attribute.Key("event_stream_status") +) + +type EventStreamingMetrics struct { + ctx context.Context + + eventsSentTotal metric.Int64Counter + eventsFailedTotal metric.Int64Counter + deliveryAttemptsTotal metric.Int64Counter + eventQueueSize metric.Int64UpDownCounter + activeWorkers metric.Int64UpDownCounter + deliveryDurationMilliseconds metric.Float64Histogram + managerActionTotal metric.Int64Counter + integrationEventsProcessedTotal metric.Int64Counter +} + +func NewEventStreamingMetrics(ctx context.Context, meter metric.Meter) (*EventStreamingMetrics, error) { + var err error + metrics := &EventStreamingMetrics{ctx: ctx} + + metrics.eventsSentTotal, err = meter.Int64Counter( + "eventstreaming.events.sent.total", + metric.WithDescription("Total number of events successfully sent."), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + metrics.eventsFailedTotal, err = meter.Int64Counter( + "eventstreaming.events.failed.total", + metric.WithDescription("Total number of events that failed to be sent after all retries."), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + metrics.deliveryAttemptsTotal, err = meter.Int64Counter( + "eventstreaming.delivery.attempts.total", + metric.WithDescription("Total number of delivery attempts for events."), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + metrics.eventQueueSize, err = meter.Int64UpDownCounter( + "eventstreaming.event.queue.size", + metric.WithDescription("Current number of events in the Generic HTTP client's delivery queue."), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + metrics.activeWorkers, err = meter.Int64UpDownCounter( + "eventstreaming.active.workers", + metric.WithDescription("Current number of active workers in the Generic HTTP client."), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + metrics.deliveryDurationMilliseconds, err = meter.Float64Histogram( + "eventstreaming.delivery.duration.ms", + metric.WithDescription("Duration from when an event is picked from the queue until it's successfully sent or finally fails for Generic HTTP."), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 30000), + ) + if err != nil { + return nil, err + } + + metrics.managerActionTotal, err = meter.Int64Counter( + "eventstreaming.manager.action.total", + metric.WithDescription("Total number of integration management actions (create, update, delete, test)."), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + metrics.integrationEventsProcessedTotal, err = meter.Int64Counter( + "eventstreaming.integration.events.processed.total", + metric.WithDescription("Total number of events processed by the streaming store, indicating outcome."), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + return metrics, nil +} + +func (m *EventStreamingMetrics) RecordGenericHTTPSentEvent(accountID, platform, statusCode string) { + m.eventsSentTotal.Add(m.ctx, 1, + metric.WithAttributes( + labelAccountID.String(accountID), + labelTargetPlatform.String(platform), + labelHTTPStatusCode.String(statusCode), + ), + ) +} + +func (m *EventStreamingMetrics) RecordGenericHTTPFailedEvent(accountID, platform, reason string) { + m.eventsFailedTotal.Add(m.ctx, 1, + metric.WithAttributes( + labelAccountID.String(accountID), + labelTargetPlatform.String(platform), + labelFailureReason.String(reason), + ), + ) +} + +func (m *EventStreamingMetrics) RecordGenericHTTPDeliveryAttempt(accountID, platform string) { + m.deliveryAttemptsTotal.Add(m.ctx, 1, + metric.WithAttributes( + labelAccountID.String(accountID), + labelTargetPlatform.String(platform), + ), + ) +} + +func (m *EventStreamingMetrics) UpdateGenericHTTPEventQueueSize(accountID, platform string, count int64, add bool) { + if add { + m.eventQueueSize.Add(m.ctx, count, + metric.WithAttributes( + labelAccountID.String(accountID), + labelTargetPlatform.String(platform), + ), + ) + } else { + m.eventQueueSize.Add(m.ctx, -count, + metric.WithAttributes( + labelAccountID.String(accountID), + labelTargetPlatform.String(platform), + ), + ) + } +} + +func (m *EventStreamingMetrics) AddActiveWorker(accountID, platform string) { + m.activeWorkers.Add(m.ctx, 1, + metric.WithAttributes( + labelAccountID.String(accountID), + labelTargetPlatform.String(platform), + ), + ) +} + +func (m *EventStreamingMetrics) RemoveActiveWorker(accountID, platform string) { + m.activeWorkers.Add(m.ctx, -1, + metric.WithAttributes( + labelAccountID.String(accountID), + labelTargetPlatform.String(platform), + ), + ) +} + +func (m *EventStreamingMetrics) RecordGenericHTTPDeliveryDuration(accountID, platform string, duration time.Duration) { + m.deliveryDurationMilliseconds.Record(m.ctx, float64(duration.Milliseconds()), + metric.WithAttributes( + labelAccountID.String(accountID), + labelTargetPlatform.String(platform), + ), + ) +} + +func (m *EventStreamingMetrics) RecordManagerAction(accountID, action, platform, resultStatus string) { + m.managerActionTotal.Add(m.ctx, 1, + metric.WithAttributes( + labelAccountID.String(accountID), + labelManagerAction.String(action), + labelTargetPlatform.String(platform), + labelManagerActionResult.String(resultStatus), + ), + ) +} + +func (m *EventStreamingMetrics) RecordIntegrationEventProcessed(accountID, platform, statusValue string) { + m.integrationEventsProcessedTotal.Add(m.ctx, 1, + metric.WithAttributes( + labelAccountID.String(accountID), + labelTargetPlatform.String(platform), + labelEventStreamStatus.String(statusValue), + ), + ) +}