diff --git a/signal/suppressor/suppressor.go b/signal/suppressor/suppressor.go new file mode 100644 index 000000000..7ce2244c4 --- /dev/null +++ b/signal/suppressor/suppressor.go @@ -0,0 +1,156 @@ +package suppressor + +import ( + "fmt" + "time" +) + +const ( + + // DefaultRepetitionThreshold determines after how many repetitions it will be suppressed. It is a counter + DefaultRepetitionThreshold = 90 // If the peer repeats the packets every 10 seconds, suppress them after 15 minutes + minRepetitionThreshold = 3 + + // minTimeBetweenPackages below this period do not check the repetitions + minTimeBetweenPackages = 7 * time.Second + toleranceRange = 1 * time.Second +) + +type PeerID string + +type packageStat struct { + lastSeen time.Time // last packet timestamp + lastDelta *time.Duration // time between same size of packages + lastSize int + repetitionTimes int +} + +type Opts struct { + RepetitionThreshold int +} + +// Suppressor filters repeated packages from peers to prevent spam or abuse. +// +// It works by keeping track of the timing and size of packages received +// from each peer. For each peer, it stores the last package size, the +// timestamp when it was seen, the time difference (delta) between consecutive +// packages of the same size, and a repetition counter. +// +// The suppressor uses the following rules: +// +// 1. **Short intervals**: If a package arrives sooner than minTimeBetweenPackages +// since the last package, it is accepted without repetition checks. This +// allows bursts or backoff recovery to pass through. +// +// 2. **Clock skew / negative delta**: If the system clock goes backward +// and produces a negative delta, the package is accepted and the state +// is reset to prevent exploitation. +// +// 3. **Size changes**: If the new package size differs from the previous +// one, the package is accepted and the repetition counter is reset. +// +// 4. **Tolerance-based repetition detection**: If a package arrives with a +// delta close to the previous delta (within the toleranceRange), it is +// considered a repeated pattern and the repetition counter is incremented. +// +// 5. **Suppression**: Once the repetition counter exceeds repetitionThreshold, +// further packages with the same timing pattern are suppressed. +// +// This design ensures that repeated or spammy traffic patterns are filtered +// while allowing legitimate variations due to network jitter or bursty traffic. +type Suppressor struct { + repetitionThreshold int + peers map[PeerID]*packageStat +} + +func NewSuppressor(opts *Opts) (*Suppressor, error) { + threshold := DefaultRepetitionThreshold + if opts != nil { + if opts.RepetitionThreshold < minRepetitionThreshold { + return nil, fmt.Errorf("invalid repetition threshold") + } + + threshold = opts.RepetitionThreshold + } + + return &Suppressor{ + repetitionThreshold: threshold, + peers: make(map[PeerID]*packageStat), + }, nil +} + +// PackageReceived handles a newly received package from a peer. +// +// Parameters: +// - destination: the PeerID of the peer that sent the package +// - size: the size of the package +// - arrivedTime: the timestamp when the package arrived +// +// Returns: +// - true if the package is accepted (not suppressed) +// - false if the package is considered a repeated package and suppressed +func (s *Suppressor) PackageReceived(destination PeerID, size int, arrivedTime time.Time) bool { + p, ok := s.peers[destination] + if !ok { + s.peers[destination] = &packageStat{ + lastSeen: arrivedTime, + lastSize: size, + } + return true + } + + if p.lastSize != size { + p.lastSeen = arrivedTime + p.lastSize = size + p.lastDelta = nil + p.repetitionTimes = 0 + return true + } + + // Calculate delta + delta := arrivedTime.Sub(p.lastSeen) + + // Clock went backwards - don't reset state to prevent exploitation + // Just update timestamp and continue with existing state + if delta < 0 { + p.lastSeen = arrivedTime + p.lastDelta = nil + p.repetitionTimes = 0 + return true + } + + // if it is below the threshold we want to allow because the backoff ticker is active + if delta < minTimeBetweenPackages { + p.lastSeen = arrivedTime + p.lastDelta = nil + p.repetitionTimes = 0 + return true + } + + // case when we have only one package in the history + if p.lastDelta == nil { + p.lastSeen = arrivedTime + p.lastDelta = &delta + return true + } + + if abs(delta-*p.lastDelta) > toleranceRange { + p.lastSeen = arrivedTime + p.lastDelta = &delta + p.repetitionTimes = 0 + return true + + } + p.lastSeen = arrivedTime + p.lastDelta = &delta + p.repetitionTimes++ + + return p.repetitionTimes < s.repetitionThreshold +} + +func abs(d time.Duration) time.Duration { + if d < 0 { + return -d + } + return d +} diff --git a/signal/suppressor/suppressor_test.go b/signal/suppressor/suppressor_test.go new file mode 100644 index 000000000..2e96f88bd --- /dev/null +++ b/signal/suppressor/suppressor_test.go @@ -0,0 +1,143 @@ +package suppressor + +import ( + "testing" + "time" +) + +func TestSuppressor_PackageReceived(t *testing.T) { + destID := PeerID("remote") + s, _ := NewSuppressor(&Opts{RepetitionThreshold: 3}) + + // Define sequence with base deltas (s ±10% tolerance) + deltas := []time.Duration{ + 800 * time.Millisecond, + 1600 * time.Millisecond, + 3200 * time.Millisecond, + 6400 * time.Millisecond, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, // should be suppressed + 10 * time.Second, + 10 * time.Second, + } + sizes := []int{ + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100, + } + + expected := []bool{ + true, + true, + true, + true, + true, + true, + true, + false, + false, + false, + } + + // Apply ±10% tolerance + times := make([]time.Time, len(deltas)+1) + times[0] = time.Now() + for i, d := range deltas { + // ±10% randomization + offset := d / 10 + times[i+1] = times[i].Add(d + offset) // for deterministic test, using +10% + } + + for i, arrival := range times[1:] { + allowed := s.PackageReceived(destID, sizes[i], arrival) + if allowed != expected[i] { + t.Errorf("Packet %d at %v: expected allowed=%v, got %v", i+1, arrival.Sub(times[0]), expected[i], allowed) + } + t.Logf("Packet %d at %v allowed: %v", i+1, arrival.Sub(times[0]), allowed) + } +} + +func TestSuppressor_PackageReceivedReset(t *testing.T) { + destID := PeerID("remote") + s, _ := NewSuppressor(&Opts{RepetitionThreshold: 5}) + + // Define sequence with base deltas (s ±10% tolerance) + deltas := []time.Duration{ + 800 * time.Millisecond, + 1600 * time.Millisecond, + 3200 * time.Millisecond, + 6400 * time.Millisecond, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + } + sizes := []int{ + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 50, + 100, + 100, + 100, + } + + expected := []bool{ + true, + true, + true, + true, + true, + true, + true, + true, + true, + false, + false, + true, + true, + true, + true, + } + + // Apply ±10% tolerance + times := make([]time.Time, len(deltas)+1) + times[0] = time.Now() + for i, d := range deltas { + // ±10% randomization + offset := d / 10 + times[i+1] = times[i].Add(d + offset) // for deterministic test, using +10% + } + + for i, arrival := range times[1:] { + allowed := s.PackageReceived(destID, sizes[i], arrival) + if allowed != expected[i] { + t.Errorf("Packet %d at %v: expected allowed=%v, got %v", i+1, arrival.Sub(times[0]), expected[i], allowed) + } + t.Logf("Packet %d at %v allowed: %v", i+1, arrival.Sub(times[0]), allowed) + } +}