mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
Compare commits
4 Commits
feat/migra
...
test/grpc-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
39822808b3 | ||
|
|
694a54d196 | ||
|
|
118ca450a6 | ||
|
|
a749e4fe73 |
2
go.mod
2
go.mod
@@ -106,6 +106,7 @@ require (
|
|||||||
golang.org/x/oauth2 v0.24.0
|
golang.org/x/oauth2 v0.24.0
|
||||||
golang.org/x/sync v0.13.0
|
golang.org/x/sync v0.13.0
|
||||||
golang.org/x/term v0.31.0
|
golang.org/x/term v0.31.0
|
||||||
|
golang.org/x/time v0.5.0
|
||||||
google.golang.org/api v0.177.0
|
google.golang.org/api v0.177.0
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
gorm.io/driver/mysql v1.5.7
|
gorm.io/driver/mysql v1.5.7
|
||||||
@@ -240,7 +241,6 @@ require (
|
|||||||
golang.org/x/image v0.18.0 // indirect
|
golang.org/x/image v0.18.0 // indirect
|
||||||
golang.org/x/mod v0.17.0 // indirect
|
golang.org/x/mod v0.17.0 // indirect
|
||||||
golang.org/x/text v0.24.0 // indirect
|
golang.org/x/text v0.24.0 // indirect
|
||||||
golang.org/x/time v0.5.0 // indirect
|
|
||||||
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
|
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
|
||||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect
|
||||||
|
|||||||
@@ -3,8 +3,11 @@ package server
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand/v2"
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -13,6 +16,7 @@ import (
|
|||||||
"github.com/golang/protobuf/ptypes/timestamp"
|
"github.com/golang/protobuf/ptypes/timestamp"
|
||||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
@@ -47,6 +51,10 @@ type GRPCServer struct {
|
|||||||
ephemeralManager *EphemeralManager
|
ephemeralManager *EphemeralManager
|
||||||
peerLocks sync.Map
|
peerLocks sync.Map
|
||||||
authManager auth.Manager
|
authManager auth.Manager
|
||||||
|
|
||||||
|
syncLimiterStore sync.Map
|
||||||
|
syncRate rate.Limit
|
||||||
|
syncBurst int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new Management server
|
// NewServer creates a new Management server
|
||||||
@@ -76,6 +84,24 @@ func NewServer(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
syncTokenPerInterval, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
|
||||||
|
if syncTokenPerInterval == 0 || err != nil {
|
||||||
|
syncTokenPerInterval = 200
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncTokenPerInterval)
|
||||||
|
|
||||||
|
syncTokenInterval, err := time.ParseDuration(os.Getenv("NB_SYNC_RATE_INTERVAL"))
|
||||||
|
if syncTokenInterval == 0 || err != nil {
|
||||||
|
syncTokenInterval = time.Minute
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("sync rate interval set to %s", syncTokenInterval)
|
||||||
|
|
||||||
|
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
|
||||||
|
if syncBurst == 0 || err != nil {
|
||||||
|
syncBurst = 200
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
|
||||||
|
|
||||||
return &GRPCServer{
|
return &GRPCServer{
|
||||||
wgKey: key,
|
wgKey: key,
|
||||||
// peerKey -> event channel
|
// peerKey -> event channel
|
||||||
@@ -87,6 +113,8 @@ func NewServer(
|
|||||||
authManager: authManager,
|
authManager: authManager,
|
||||||
appMetrics: appMetrics,
|
appMetrics: appMetrics,
|
||||||
ephemeralManager: ephemeralManager,
|
ephemeralManager: ephemeralManager,
|
||||||
|
syncRate: rate.Every(syncTokenInterval / time.Duration(syncTokenPerInterval)),
|
||||||
|
syncBurst: syncBurst,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -162,6 +190,29 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if accountID == "cvlkjjbl0ubs73clbdr0" {
|
||||||
|
limiterIface, ok := s.syncLimiterStore.Load(req.WgPubKey)
|
||||||
|
if !ok {
|
||||||
|
// Create new limiter for this peer
|
||||||
|
newLimiter := rate.NewLimiter(s.syncRate, s.syncBurst)
|
||||||
|
s.syncLimiterStore.Store(req.WgPubKey, newLimiter)
|
||||||
|
|
||||||
|
if !newLimiter.Allow() {
|
||||||
|
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||||
|
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||||
|
return fmt.Errorf("sync rate limit reached for this peer")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Use existing limiter for this peer
|
||||||
|
limiter := limiterIface.(*rate.Limiter)
|
||||||
|
if !limiter.Allow() {
|
||||||
|
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||||
|
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||||
|
return fmt.Errorf("sync rate limit reached for this peer")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// nolint:staticcheck
|
// nolint:staticcheck
|
||||||
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
|
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user