package accesslog

import (
	"context"
	"net/netip"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/timestamppb"

	"github.com/netbirdio/netbird/proxy/auth"
	"github.com/netbirdio/netbird/shared/management/proto"
)

const (
	requestThreshold    = 10000              // Log every 10k requests
	bytesThreshold      = 1024 * 1024 * 1024 // Log every 1GB
	usageCleanupPeriod  = 1 * time.Hour      // Clean up stale counters every hour
	usageInactiveWindow = 24 * time.Hour     // Consider domain inactive if no traffic for 24 hours
)

// domainUsage accumulates per-domain traffic counters between threshold logs.
type domainUsage struct {
	requestCount     int64
	requestStartTime time.Time
	bytesTransferred int64
	bytesStartTime   time.Time
	lastActivity     time.Time // Track last activity for cleanup
}

// gRPCClient is the subset of the management gRPC client used for shipping access logs.
type gRPCClient interface {
	SendAccessLog(ctx context.Context, in *proto.SendAccessLogRequest, opts ...grpc.CallOption) (*proto.SendAccessLogResponse, error)
}

// Logger sends access log entries to the management server via gRPC.
type Logger struct {
	client         gRPCClient
	logger         *log.Logger
	trustedProxies []netip.Prefix

	usageMux      sync.Mutex
	domainUsage   map[string]*domainUsage
	cleanupCancel context.CancelFunc
}

// NewLogger creates a new access log Logger. The trustedProxies parameter
// configures which upstream proxy IP ranges are trusted for extracting
// the real client IP from X-Forwarded-For headers.
func NewLogger(client gRPCClient, logger *log.Logger, trustedProxies []netip.Prefix) *Logger {
	if logger == nil {
		logger = log.StandardLogger()
	}

	cleanupCtx, stopCleanup := context.WithCancel(context.Background())
	al := &Logger{
		client:         client,
		logger:         logger,
		trustedProxies: trustedProxies,
		domainUsage:    make(map[string]*domainUsage),
		cleanupCancel:  stopCleanup,
	}

	// Start background cleanup routine
	go al.cleanupStaleUsage(cleanupCtx)

	return al
}

// Close stops the cleanup routine. Should be called during graceful shutdown.
func (l *Logger) Close() { if l.cleanupCancel != nil { l.cleanupCancel() } } type logEntry struct { ID string AccountID string ServiceId string Host string Path string DurationMs int64 Method string ResponseCode int32 SourceIp string AuthMechanism string UserId string AuthSuccess bool BytesUpload int64 BytesDownload int64 } func (l *Logger) log(ctx context.Context, entry logEntry) { // Fire off the log request in a separate routine. // This increases the possibility of losing a log message // (although it should still get logged in the event of an error), // but it will reduce latency returning the request in the // middleware. // There is also a chance that log messages will arrive at // the server out of order; however, the timestamp should // allow for resolving that on the server. now := timestamppb.Now() // Grab the timestamp before launching the goroutine to try to prevent weird timing issues. This is probably unnecessary. go func() { logCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if entry.AuthMechanism != auth.MethodOIDC.String() { entry.UserId = "" } if _, err := l.client.SendAccessLog(logCtx, &proto.SendAccessLogRequest{ Log: &proto.AccessLog{ LogId: entry.ID, AccountId: entry.AccountID, Timestamp: now, ServiceId: entry.ServiceId, Host: entry.Host, Path: entry.Path, DurationMs: entry.DurationMs, Method: entry.Method, ResponseCode: entry.ResponseCode, SourceIp: entry.SourceIp, AuthMechanism: entry.AuthMechanism, UserId: entry.UserId, AuthSuccess: entry.AuthSuccess, BytesUpload: entry.BytesUpload, BytesDownload: entry.BytesDownload, }, }); err != nil { // If it fails to send on the gRPC connection, then at least log it to the error log. 
l.logger.WithFields(log.Fields{ "service_id": entry.ServiceId, "host": entry.Host, "path": entry.Path, "duration": entry.DurationMs, "method": entry.Method, "response_code": entry.ResponseCode, "source_ip": entry.SourceIp, "auth_mechanism": entry.AuthMechanism, "user_id": entry.UserId, "auth_success": entry.AuthSuccess, "error": err, }).Error("Error sending access log on gRPC connection") } }() } // trackUsage records request and byte counts per domain, logging when thresholds are hit. func (l *Logger) trackUsage(domain string, bytesTransferred int64) { if domain == "" { return } l.usageMux.Lock() defer l.usageMux.Unlock() now := time.Now() usage, exists := l.domainUsage[domain] if !exists { usage = &domainUsage{ requestStartTime: now, bytesStartTime: now, lastActivity: now, } l.domainUsage[domain] = usage } usage.lastActivity = now usage.requestCount++ if usage.requestCount >= requestThreshold { elapsed := time.Since(usage.requestStartTime) l.logger.WithFields(log.Fields{ "domain": domain, "requests": usage.requestCount, "duration": elapsed.String(), }).Infof("domain %s had %d requests over %s", domain, usage.requestCount, elapsed) usage.requestCount = 0 usage.requestStartTime = now } usage.bytesTransferred += bytesTransferred if usage.bytesTransferred >= bytesThreshold { elapsed := time.Since(usage.bytesStartTime) bytesInGB := float64(usage.bytesTransferred) / (1024 * 1024 * 1024) l.logger.WithFields(log.Fields{ "domain": domain, "bytes": usage.bytesTransferred, "bytes_gb": bytesInGB, "duration": elapsed.String(), }).Infof("domain %s transferred %.2f GB over %s", domain, bytesInGB, elapsed) usage.bytesTransferred = 0 usage.bytesStartTime = now } } // cleanupStaleUsage removes usage entries for domains that have been inactive. 
func (l *Logger) cleanupStaleUsage(ctx context.Context) { ticker := time.NewTicker(usageCleanupPeriod) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: l.usageMux.Lock() now := time.Now() removed := 0 for domain, usage := range l.domainUsage { if now.Sub(usage.lastActivity) > usageInactiveWindow { delete(l.domainUsage, domain) removed++ } } l.usageMux.Unlock() if removed > 0 { l.logger.Debugf("cleaned up %d stale domain usage entries", removed) } } } }