Process log to form sessions

This commit is contained in:
Owen
2026-03-24 17:26:44 -07:00
parent 0f57985b6f
commit 69019d5655
2 changed files with 980 additions and 10 deletions

View File

@@ -7,6 +7,8 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"net"
"sort"
"sync"
"time"
@@ -19,6 +21,15 @@ const (
// maxBufferedSessions is the max number of completed sessions to buffer before forcing a flush
maxBufferedSessions = 100
// sessionGapThreshold is the maximum gap between the end of one connection
// and the start of the next for them to be considered part of the same session.
// If the gap exceeds this, a new consolidated session is created.
sessionGapThreshold = 5 * time.Second
// minConnectionsToConsolidate is the minimum number of connections in a group
// before we bother consolidating. Groups smaller than this are sent as-is.
minConnectionsToConsolidate = 2
)
// SendFunc is a callback that sends compressed access log data to the server.
@@ -27,15 +38,16 @@ type SendFunc func(data string) error
// AccessSession represents a tracked access session through the proxy
type AccessSession struct {
SessionID string `json:"sessionId"`
ResourceID int `json:"resourceId"`
SourceAddr string `json:"sourceAddr"`
DestAddr string `json:"destAddr"`
Protocol string `json:"protocol"`
StartedAt time.Time `json:"startedAt"`
EndedAt time.Time `json:"endedAt,omitempty"`
BytesTx int64 `json:"bytesTx"`
BytesRx int64 `json:"bytesRx"`
SessionID string `json:"sessionId"`
ResourceID int `json:"resourceId"`
SourceAddr string `json:"sourceAddr"`
DestAddr string `json:"destAddr"`
Protocol string `json:"protocol"`
StartedAt time.Time `json:"startedAt"`
EndedAt time.Time `json:"endedAt,omitempty"`
BytesTx int64 `json:"bytesTx"`
BytesRx int64 `json:"bytesRx"`
ConnectionCount int `json:"connectionCount,omitempty"` // number of raw connections merged into this session (0 or 1 = single)
}
// udpSessionKey identifies a unique UDP "session" by src -> dst
@@ -45,6 +57,16 @@ type udpSessionKey struct {
protocol string
}
// consolidationKey groups connections that may be part of the same logical session.
// Source port is intentionally excluded so that many ephemeral-port connections
// from the same source IP to the same destination are grouped together.
type consolidationKey struct {
sourceIP string // IP only, no port
destAddr string // full host:port of the destination
protocol string
resourceID int
}
// AccessLogger tracks access sessions for resources and periodically
// flushes completed sessions to the server via a configurable SendFunc.
type AccessLogger struct {
@@ -251,7 +273,137 @@ func (al *AccessLogger) reapStaleSessions() {
}
}
// flush drains the completed sessions buffer, compresses with zlib, and sends via the SendFunc.
// extractIP strips the port from an address string and returns just the IP.
// If the address has no port component it is returned as-is.
func extractIP(addr string) string {
host, _, err := net.SplitHostPort(addr)
if err != nil {
// Might already be a bare IP
return addr
}
return host
}
// consolidateSessions takes a slice of completed sessions and merges bursts of
// short-lived connections from the same source IP to the same destination into
// single higher-level session entries.
//
// The algorithm:
// 1. Group sessions by (sourceIP, destAddr, protocol, resourceID).
// 2. Within each group, sort by StartedAt.
// 3. Walk through the sorted list and merge consecutive sessions whose gap
// (previous EndedAt → next StartedAt) is ≤ sessionGapThreshold.
// 4. For merged sessions the earliest StartedAt and latest EndedAt are kept,
// bytes are summed, and ConnectionCount records how many raw connections
// were folded in. If the merged connections used more than one source port,
// SourceAddr is set to just the IP (port omitted).
// 5. Groups with fewer than minConnectionsToConsolidate members are passed
// through unmodified.
func consolidateSessions(sessions []*AccessSession) []*AccessSession {
if len(sessions) <= 1 {
return sessions
}
// Group sessions by consolidation key
groups := make(map[consolidationKey][]*AccessSession)
for _, s := range sessions {
key := consolidationKey{
sourceIP: extractIP(s.SourceAddr),
destAddr: s.DestAddr,
protocol: s.Protocol,
resourceID: s.ResourceID,
}
groups[key] = append(groups[key], s)
}
result := make([]*AccessSession, 0, len(sessions))
for key, group := range groups {
// Small groups don't need consolidation
if len(group) < minConnectionsToConsolidate {
result = append(result, group...)
continue
}
// Sort the group by start time so we can detect gaps
sort.Slice(group, func(i, j int) bool {
return group[i].StartedAt.Before(group[j].StartedAt)
})
// Walk through and merge runs that are within the gap threshold
var merged []*AccessSession
cur := cloneSession(group[0])
cur.ConnectionCount = 1
sourcePorts := make(map[string]struct{})
sourcePorts[cur.SourceAddr] = struct{}{}
for i := 1; i < len(group); i++ {
s := group[i]
// Determine the gap: from the latest end time we've seen so far to the
// start of the next connection.
gapRef := cur.EndedAt
if gapRef.IsZero() {
gapRef = cur.StartedAt
}
gap := s.StartedAt.Sub(gapRef)
if gap <= sessionGapThreshold {
// Merge into the current consolidated session
cur.ConnectionCount++
cur.BytesTx += s.BytesTx
cur.BytesRx += s.BytesRx
sourcePorts[s.SourceAddr] = struct{}{}
// Extend EndedAt to the latest time
endTime := s.EndedAt
if endTime.IsZero() {
endTime = s.StartedAt
}
if endTime.After(cur.EndedAt) {
cur.EndedAt = endTime
}
} else {
// Gap exceeded — finalize the current session and start a new one
finalizeMergedSourceAddr(cur, key.sourceIP, sourcePorts)
merged = append(merged, cur)
cur = cloneSession(s)
cur.ConnectionCount = 1
sourcePorts = make(map[string]struct{})
sourcePorts[s.SourceAddr] = struct{}{}
}
}
// Finalize the last accumulated session
finalizeMergedSourceAddr(cur, key.sourceIP, sourcePorts)
merged = append(merged, cur)
result = append(result, merged...)
}
return result
}
// cloneSession creates a shallow copy of an AccessSession.
func cloneSession(s *AccessSession) *AccessSession {
cp := *s
return &cp
}
// finalizeMergedSourceAddr sets the SourceAddr on a consolidated session.
// If multiple distinct source addresses (ports) were seen, the port is
// stripped and only the IP is kept so the log isn't misleading.
func finalizeMergedSourceAddr(s *AccessSession, sourceIP string, ports map[string]struct{}) {
if len(ports) > 1 {
// Multiple source ports — just report the IP
s.SourceAddr = sourceIP
}
// Otherwise keep the original SourceAddr which already has ip:port
}
// flush drains the completed sessions buffer, consolidates bursts of
// short-lived connections, compresses with zlib, and sends via the SendFunc.
func (al *AccessLogger) flush() {
al.mu.Lock()
if len(al.completedSessions) == 0 {
@@ -268,6 +420,13 @@ func (al *AccessLogger) flush() {
return
}
// Consolidate bursts of short-lived connections into higher-level sessions
originalCount := len(batch)
batch = consolidateSessions(batch)
if len(batch) != originalCount {
logger.Info("Access logger: consolidated %d raw connections into %d sessions", originalCount, len(batch))
}
compressed, err := compressSessions(batch)
if err != nil {
logger.Error("Access logger: failed to compress %d sessions: %v", len(batch), err)