Suppress repeated signal packages

This commit is contained in:
Zoltán Papp
2025-09-12 12:14:21 +02:00
parent 0c6f671a7c
commit 5e4473310c
2 changed files with 299 additions and 0 deletions

View File

@@ -0,0 +1,156 @@
package suppressor
import (
"fmt"
"time"
)
const (
// DefaultRepetitionThreshold determines after how many repetitions it will be suppressed. It is a counter
DefaultRepetitionThreshold = 90 // If the peer repeats the packets every 10 seconds, suppress them after 15 minutes
minRepetitionThreshold = 3
// minTimeBetweenPackages below this period do not check the repetitions
minTimeBetweenPackages = 7 * time.Second
toleranceRange = 1 * time.Second
)
type PeerID string
type packageStat struct {
lastSeen time.Time // last packet timestamp
lastDelta *time.Duration // time between same size of packages
lastSize int
repetitionTimes int
}
type Opts struct {
RepetitionThreshold int
}
// Suppressor filters repeated packages from peers to prevent spam or abuse.
//
// It works by keeping track of the timing and size of packages received
// from each peer. For each peer, it stores the last package size, the
// timestamp when it was seen, the time difference (delta) between consecutive
// packages of the same size, and a repetition counter.
//
// The suppressor uses the following rules:
//
// 1. **Short intervals**: If a package arrives sooner than minTimeBetweenPackages
// since the last package, it is accepted without repetition checks. This
// allows bursts or backoff recovery to pass through.
//
// 2. **Clock skew / negative delta**: If the system clock goes backward
// and produces a negative delta, the package is accepted and the state
// is reset to prevent exploitation.
//
// 3. **Size changes**: If the new package size differs from the previous
// one, the package is accepted and the repetition counter is reset.
//
// 4. **Tolerance-based repetition detection**: If a package arrives with a
// delta close to the previous delta (within the toleranceRange), it is
// considered a repeated pattern and the repetition counter is incremented.
//
// 5. **Suppression**: Once the repetition counter exceeds repetitionThreshold,
// further packages with the same timing pattern are suppressed.
//
// This design ensures that repeated or spammy traffic patterns are filtered
// while allowing legitimate variations due to network jitter or bursty traffic.
type Suppressor struct {
repetitionThreshold int
peers map[PeerID]*packageStat
}
func NewSuppressor(opts *Opts) (*Suppressor, error) {
threshold := DefaultRepetitionThreshold
if opts != nil {
if opts.RepetitionThreshold < minRepetitionThreshold {
return nil, fmt.Errorf("invalid repetition threshold")
}
threshold = opts.RepetitionThreshold
}
return &Suppressor{
repetitionThreshold: threshold,
peers: make(map[PeerID]*packageStat),
}, nil
}
// PackageReceived handles a newly received package from a peer.
//
// Parameters:
// - destination: the PeerID of the peer that sent the package
// - size: the size of the package
// - arrivedTime: the timestamp when the package arrived
//
// Returns:
// - true if the package is accepted (not suppressed)
// - false if the package is considered a repeated package and suppressed
func (s *Suppressor) PackageReceived(destination PeerID, size int, arrivedTime time.Time) bool {
p, ok := s.peers[destination]
if !ok {
s.peers[destination] = &packageStat{
lastSeen: arrivedTime,
lastSize: size,
}
return true
}
if p.lastSize != size {
p.lastSeen = arrivedTime
p.lastSize = size
p.lastDelta = nil
p.repetitionTimes = 0
return true
}
// Calculate delta
delta := arrivedTime.Sub(p.lastSeen)
// Clock went backwards - don't reset state to prevent exploitation
// Just update timestamp and continue with existing state
if delta < 0 {
p.lastSeen = arrivedTime
p.lastDelta = nil
p.repetitionTimes = 0
return true
}
// if it is below the threshold we want to allow because the backoff ticker is active
if delta < minTimeBetweenPackages {
p.lastSeen = arrivedTime
p.lastDelta = nil
p.repetitionTimes = 0
return true
}
// case when we have only one package in the history
if p.lastDelta == nil {
p.lastSeen = arrivedTime
p.lastDelta = &delta
return true
}
if abs(delta-*p.lastDelta) > toleranceRange {
p.lastSeen = arrivedTime
p.lastDelta = &delta
p.repetitionTimes = 0
return true
}
p.lastSeen = arrivedTime
p.lastDelta = &delta
p.repetitionTimes++
return p.repetitionTimes < s.repetitionThreshold
}
func abs(d time.Duration) time.Duration {
if d < 0 {
return -d
}
return d
}

View File

@@ -0,0 +1,143 @@
package suppressor
import (
"testing"
"time"
)
func TestSuppressor_PackageReceived(t *testing.T) {
destID := PeerID("remote")
s, _ := NewSuppressor(&Opts{RepetitionThreshold: 3})
// Define sequence with base deltas (s ±10% tolerance)
deltas := []time.Duration{
800 * time.Millisecond,
1600 * time.Millisecond,
3200 * time.Millisecond,
6400 * time.Millisecond,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second, // should be suppressed
10 * time.Second,
10 * time.Second,
}
sizes := []int{
100,
100,
100,
100,
100,
100,
100,
100,
100,
100,
}
expected := []bool{
true,
true,
true,
true,
true,
true,
true,
false,
false,
false,
}
// Apply ±10% tolerance
times := make([]time.Time, len(deltas)+1)
times[0] = time.Now()
for i, d := range deltas {
// ±10% randomization
offset := d / 10
times[i+1] = times[i].Add(d + offset) // for deterministic test, using +10%
}
for i, arrival := range times[1:] {
allowed := s.PackageReceived(destID, sizes[i], arrival)
if allowed != expected[i] {
t.Errorf("Packet %d at %v: expected allowed=%v, got %v", i+1, arrival.Sub(times[0]), expected[i], allowed)
}
t.Logf("Packet %d at %v allowed: %v", i+1, arrival.Sub(times[0]), allowed)
}
}
func TestSuppressor_PackageReceivedReset(t *testing.T) {
destID := PeerID("remote")
s, _ := NewSuppressor(&Opts{RepetitionThreshold: 5})
// Define sequence with base deltas (s ±10% tolerance)
deltas := []time.Duration{
800 * time.Millisecond,
1600 * time.Millisecond,
3200 * time.Millisecond,
6400 * time.Millisecond,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
}
sizes := []int{
100,
100,
100,
100,
100,
100,
100,
100,
100,
100,
100,
50,
100,
100,
100,
}
expected := []bool{
true,
true,
true,
true,
true,
true,
true,
true,
true,
false,
false,
true,
true,
true,
true,
}
// Apply ±10% tolerance
times := make([]time.Time, len(deltas)+1)
times[0] = time.Now()
for i, d := range deltas {
// ±10% randomization
offset := d / 10
times[i+1] = times[i].Add(d + offset) // for deterministic test, using +10%
}
for i, arrival := range times[1:] {
allowed := s.PackageReceived(destID, sizes[i], arrival)
if allowed != expected[i] {
t.Errorf("Packet %d at %v: expected allowed=%v, got %v", i+1, arrival.Sub(times[0]), expected[i], allowed)
}
t.Logf("Packet %d at %v allowed: %v", i+1, arrival.Sub(times[0]), allowed)
}
}