Compare commits

..

2 Commits

Author SHA1 Message Date
Viktor Liu
a6a8d139c7 Fix tests 2025-10-09 14:31:01 +02:00
Viktor Liu
c987cddc85 Fix rule squashing with unsymmetrical proto ALL rules 2025-10-09 14:18:36 +02:00
35 changed files with 387 additions and 1058 deletions

View File

@@ -57,7 +57,7 @@ func startSignal(t *testing.T) (*grpc.Server, net.Listener) {
t.Fatal(err)
}
s := grpc.NewServer()
srv, err := sig.NewServer(context.Background(), otel.Meter(""), nil)
srv, err := sig.NewServer(context.Background(), otel.Meter(""))
require.NoError(t, err)
sigProto.RegisterSignalExchangeServer(s, srv)

View File

@@ -388,7 +388,8 @@ func (d *DefaultManager) squashAcceptRules(
// trace which type of protocols was squashed
squashedRules := []*mgmProto.FirewallRule{}
squashedProtocols := map[mgmProto.RuleProtocol]struct{}{}
squashedProtocolsIn := map[mgmProto.RuleProtocol]struct{}{}
squashedProtocolsOut := map[mgmProto.RuleProtocol]struct{}{}
// this function we use to do calculation, can we squash the rules by protocol or not.
// We summ amount of Peers IP for given protocol we found in original rules list.
@@ -397,7 +398,7 @@ func (d *DefaultManager) squashAcceptRules(
// 2. Any of rule contains Port.
//
// We zeroed this to notify squash function that this protocol can't be squashed.
addRuleToCalculationMap := func(i int, r *mgmProto.FirewallRule, protocols map[mgmProto.RuleProtocol]*protoMatch) {
addRuleToCalculationMap := func(i int, r *mgmProto.FirewallRule, protocols map[mgmProto.RuleProtocol]*protoMatch, squashedProtocols map[mgmProto.RuleProtocol]struct{}) {
hasPortRestrictions := r.Action == mgmProto.RuleAction_DROP ||
r.Port != "" || !portInfoEmpty(r.PortInfo)
@@ -435,9 +436,9 @@ func (d *DefaultManager) squashAcceptRules(
for i, r := range networkMap.FirewallRules {
// calculate squash for different directions
if r.Direction == mgmProto.RuleDirection_IN {
addRuleToCalculationMap(i, r, in)
addRuleToCalculationMap(i, r, in, squashedProtocolsIn)
} else {
addRuleToCalculationMap(i, r, out)
addRuleToCalculationMap(i, r, out, squashedProtocolsOut)
}
}
@@ -450,7 +451,7 @@ func (d *DefaultManager) squashAcceptRules(
mgmProto.RuleProtocol_UDP,
}
squash := func(matches map[mgmProto.RuleProtocol]*protoMatch, direction mgmProto.RuleDirection) {
squash := func(matches map[mgmProto.RuleProtocol]*protoMatch, direction mgmProto.RuleDirection, squashedProtocols map[mgmProto.RuleProtocol]struct{}) {
for _, protocol := range protocolOrders {
match, ok := matches[protocol]
if !ok || len(match.ips) != totalIPs || len(match.ips) < 2 {
@@ -478,11 +479,22 @@ func (d *DefaultManager) squashAcceptRules(
}
}
squash(in, mgmProto.RuleDirection_IN)
squash(out, mgmProto.RuleDirection_OUT)
squash(in, mgmProto.RuleDirection_IN, squashedProtocolsIn)
squash(out, mgmProto.RuleDirection_OUT, squashedProtocolsOut)
// if all protocol was squashed everything is allow and we can ignore all other rules
if _, ok := squashedProtocols[mgmProto.RuleProtocol_ALL]; ok {
_, inSquashed := squashedProtocolsIn[mgmProto.RuleProtocol_ALL]
_, outSquashed := squashedProtocolsOut[mgmProto.RuleProtocol_ALL]
squashedProtocols := make(map[mgmProto.RuleProtocol]struct{})
for k := range squashedProtocolsIn {
squashedProtocols[k] = struct{}{}
}
for k := range squashedProtocolsOut {
squashedProtocols[k] = struct{}{}
}
if inSquashed && outSquashed {
return squashedRules, squashedProtocols
}
@@ -494,10 +506,15 @@ func (d *DefaultManager) squashAcceptRules(
// filter out rules which was squashed from final list
// if we also have other not squashed rules.
for i, r := range networkMap.FirewallRules {
squashedProtocols := squashedProtocolsIn
protocols := in
if r.Direction == mgmProto.RuleDirection_OUT {
squashedProtocols = squashedProtocolsOut
protocols = out
}
if _, ok := squashedProtocols[r.Protocol]; ok {
if m, ok := in[r.Protocol]; ok && m.ips[r.PeerIP] == i {
continue
} else if m, ok := out[r.Protocol]; ok && m.ips[r.PeerIP] == i {
if m, ok := protocols[r.Protocol]; ok && m.ips[r.PeerIP] == i {
continue
}
}

View File

@@ -758,6 +758,129 @@ func TestPortInfoEmpty(t *testing.T) {
}
}
func TestDefaultManagerSquashRulesMixed(t *testing.T) {
networkMap := &mgmProto.NetworkMap{
RemotePeers: []*mgmProto.RemotePeerConfig{
{AllowedIps: []string{"100.66.152.160"}},
},
FirewallRules: []*mgmProto.FirewallRule{
{
PeerIP: "0.0.0.0",
Direction: mgmProto.RuleDirection_OUT,
Action: mgmProto.RuleAction_ACCEPT,
Protocol: mgmProto.RuleProtocol_ALL,
},
{
PeerIP: "100.66.152.160",
Direction: mgmProto.RuleDirection_IN,
Action: mgmProto.RuleAction_ACCEPT,
Protocol: mgmProto.RuleProtocol_ALL,
},
},
}
manager := &DefaultManager{}
rules, _ := manager.squashAcceptRules(networkMap)
assert.Equal(t, 2, len(rules))
var inRules, outRules []*mgmProto.FirewallRule
for _, r := range rules {
if r.Direction == mgmProto.RuleDirection_IN {
inRules = append(inRules, r)
} else {
outRules = append(outRules, r)
}
}
assert.Equal(t, 1, len(inRules))
assert.Equal(t, 1, len(outRules))
assert.Equal(t, "0.0.0.0", outRules[0].PeerIP)
assert.Equal(t, "100.66.152.160", inRules[0].PeerIP)
}
func TestDefaultManagerSquashRulesBothOptimized(t *testing.T) {
networkMap := &mgmProto.NetworkMap{
RemotePeers: []*mgmProto.RemotePeerConfig{
{AllowedIps: []string{"100.66.152.160"}},
},
FirewallRules: []*mgmProto.FirewallRule{
{
PeerIP: "0.0.0.0",
Direction: mgmProto.RuleDirection_OUT,
Action: mgmProto.RuleAction_ACCEPT,
Protocol: mgmProto.RuleProtocol_ALL,
},
{
PeerIP: "0.0.0.0",
Direction: mgmProto.RuleDirection_IN,
Action: mgmProto.RuleAction_ACCEPT,
Protocol: mgmProto.RuleProtocol_ALL,
},
},
}
manager := &DefaultManager{}
rules, _ := manager.squashAcceptRules(networkMap)
assert.Equal(t, 2, len(rules))
var inRules, outRules []*mgmProto.FirewallRule
for _, r := range rules {
if r.Direction == mgmProto.RuleDirection_IN {
inRules = append(inRules, r)
} else {
outRules = append(outRules, r)
}
}
assert.Equal(t, 1, len(inRules))
assert.Equal(t, 1, len(outRules))
assert.Equal(t, "0.0.0.0", outRules[0].PeerIP)
assert.Equal(t, "0.0.0.0", inRules[0].PeerIP)
}
func TestDefaultManagerSquashRulesBothSpecific(t *testing.T) {
networkMap := &mgmProto.NetworkMap{
RemotePeers: []*mgmProto.RemotePeerConfig{
{AllowedIps: []string{"100.66.152.160"}},
},
FirewallRules: []*mgmProto.FirewallRule{
{
PeerIP: "100.66.152.160",
Direction: mgmProto.RuleDirection_OUT,
Action: mgmProto.RuleAction_ACCEPT,
Protocol: mgmProto.RuleProtocol_ALL,
},
{
PeerIP: "100.66.152.160",
Direction: mgmProto.RuleDirection_IN,
Action: mgmProto.RuleAction_ACCEPT,
Protocol: mgmProto.RuleProtocol_ALL,
},
},
}
manager := &DefaultManager{}
rules, _ := manager.squashAcceptRules(networkMap)
assert.Equal(t, 2, len(rules))
var inRules, outRules []*mgmProto.FirewallRule
for _, r := range rules {
if r.Direction == mgmProto.RuleDirection_IN {
inRules = append(inRules, r)
} else {
outRules = append(outRules, r)
}
}
assert.Equal(t, 1, len(inRules))
assert.Equal(t, 1, len(outRules))
assert.Equal(t, "100.66.152.160", outRules[0].PeerIP)
assert.Equal(t, "100.66.152.160", inRules[0].PeerIP)
}
func TestDefaultManagerEnableSSHRules(t *testing.T) {
networkMap := &mgmProto.NetworkMap{
PeerConfig: &mgmProto.PeerConfig{

View File

@@ -1,78 +0,0 @@
package dnsfwd
import (
"net/netip"
"slices"
"strings"
"sync"
"github.com/miekg/dns"
)
type cache struct {
mu sync.RWMutex
records map[string]*cacheEntry
}
type cacheEntry struct {
ip4Addrs []netip.Addr
ip6Addrs []netip.Addr
}
func newCache() *cache {
return &cache{
records: make(map[string]*cacheEntry),
}
}
func (c *cache) get(domain string, reqType uint16) ([]netip.Addr, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
entry, exists := c.records[normalizeDomain(domain)]
if !exists {
return nil, false
}
switch reqType {
case dns.TypeA:
return slices.Clone(entry.ip4Addrs), true
case dns.TypeAAAA:
return slices.Clone(entry.ip6Addrs), true
default:
return nil, false
}
}
func (c *cache) set(domain string, reqType uint16, addrs []netip.Addr) {
c.mu.Lock()
defer c.mu.Unlock()
norm := normalizeDomain(domain)
entry, exists := c.records[norm]
if !exists {
entry = &cacheEntry{}
c.records[norm] = entry
}
switch reqType {
case dns.TypeA:
entry.ip4Addrs = slices.Clone(addrs)
case dns.TypeAAAA:
entry.ip6Addrs = slices.Clone(addrs)
}
}
// unset removes cached entries for the given domain and request type.
func (c *cache) unset(domain string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.records, normalizeDomain(domain))
}
// normalizeDomain converts an input domain into a canonical form used as cache key:
// lowercase and fully-qualified (with trailing dot).
func normalizeDomain(domain string) string {
// dns.Fqdn ensures trailing dot; ToLower for consistent casing
return dns.Fqdn(strings.ToLower(domain))
}

View File

@@ -1,86 +0,0 @@
package dnsfwd
import (
"net/netip"
"testing"
)
func mustAddr(t *testing.T, s string) netip.Addr {
t.Helper()
a, err := netip.ParseAddr(s)
if err != nil {
t.Fatalf("parse addr %s: %v", s, err)
}
return a
}
func TestCacheNormalization(t *testing.T) {
c := newCache()
// Mixed case, without trailing dot
domainInput := "ExAmPlE.CoM"
ipv4 := []netip.Addr{mustAddr(t, "1.2.3.4")}
c.set(domainInput, 1 /* dns.TypeA */, ipv4)
// Lookup with lower, with trailing dot
if got, ok := c.get("example.com.", 1); !ok || len(got) != 1 || got[0].String() != "1.2.3.4" {
t.Fatalf("expected cached IPv4 result via normalized key, got=%v ok=%v", got, ok)
}
// Lookup with different casing again
if got, ok := c.get("EXAMPLE.COM", 1); !ok || len(got) != 1 || got[0].String() != "1.2.3.4" {
t.Fatalf("expected cached IPv4 result via different casing, got=%v ok=%v", got, ok)
}
}
func TestCacheSeparateTypes(t *testing.T) {
c := newCache()
domain := "test.local"
ipv4 := []netip.Addr{mustAddr(t, "10.0.0.1")}
ipv6 := []netip.Addr{mustAddr(t, "2001:db8::1")}
c.set(domain, 1 /* A */, ipv4)
c.set(domain, 28 /* AAAA */, ipv6)
got4, ok4 := c.get(domain, 1)
if !ok4 || len(got4) != 1 || got4[0] != ipv4[0] {
t.Fatalf("expected A record from cache, got=%v ok=%v", got4, ok4)
}
got6, ok6 := c.get(domain, 28)
if !ok6 || len(got6) != 1 || got6[0] != ipv6[0] {
t.Fatalf("expected AAAA record from cache, got=%v ok=%v", got6, ok6)
}
}
func TestCacheCloneOnGetAndSet(t *testing.T) {
c := newCache()
domain := "clone.test"
src := []netip.Addr{mustAddr(t, "8.8.8.8")}
c.set(domain, 1, src)
// Mutate source slice; cache should be unaffected
src[0] = mustAddr(t, "9.9.9.9")
got, ok := c.get(domain, 1)
if !ok || len(got) != 1 || got[0].String() != "8.8.8.8" {
t.Fatalf("expected cached value to be independent of source slice, got=%v ok=%v", got, ok)
}
// Mutate returned slice; internal cache should remain unchanged
got[0] = mustAddr(t, "4.4.4.4")
got2, ok2 := c.get(domain, 1)
if !ok2 || len(got2) != 1 || got2[0].String() != "8.8.8.8" {
t.Fatalf("expected returned slice to be a clone, got=%v ok=%v", got2, ok2)
}
}
func TestCacheMiss(t *testing.T) {
c := newCache()
if got, ok := c.get("missing.example", 1); ok || got != nil {
t.Fatalf("expected cache miss, got=%v ok=%v", got, ok)
}
}

View File

@@ -46,7 +46,6 @@ type DNSForwarder struct {
fwdEntries []*ForwarderEntry
firewall firewaller
resolver resolver
cache *cache
}
func NewDNSForwarder(listenAddress string, ttl uint32, firewall firewaller, statusRecorder *peer.Status) *DNSForwarder {
@@ -57,7 +56,6 @@ func NewDNSForwarder(listenAddress string, ttl uint32, firewall firewaller, stat
firewall: firewall,
statusRecorder: statusRecorder,
resolver: net.DefaultResolver,
cache: newCache(),
}
}
@@ -105,39 +103,10 @@ func (f *DNSForwarder) UpdateDomains(entries []*ForwarderEntry) {
f.mutex.Lock()
defer f.mutex.Unlock()
// remove cache entries for domains that no longer appear
f.removeStaleCacheEntries(f.fwdEntries, entries)
f.fwdEntries = entries
log.Debugf("Updated DNS forwarder with %d domains", len(entries))
}
// removeStaleCacheEntries unsets cache items for domains that were present
// in the old list but not present in the new list.
func (f *DNSForwarder) removeStaleCacheEntries(oldEntries, newEntries []*ForwarderEntry) {
if f.cache == nil {
return
}
newSet := make(map[string]struct{}, len(newEntries))
for _, e := range newEntries {
if e == nil {
continue
}
newSet[e.Domain.PunycodeString()] = struct{}{}
}
for _, e := range oldEntries {
if e == nil {
continue
}
pattern := e.Domain.PunycodeString()
if _, ok := newSet[pattern]; !ok {
f.cache.unset(pattern)
}
}
}
func (f *DNSForwarder) Close(ctx context.Context) error {
var result *multierror.Error
@@ -202,7 +171,6 @@ func (f *DNSForwarder) handleDNSQuery(w dns.ResponseWriter, query *dns.Msg) *dns
f.updateInternalState(ips, mostSpecificResId, matchingEntries)
f.addIPsToResponse(resp, domain, ips)
f.cache.set(domain, question.Qtype, ips)
return resp
}
@@ -314,69 +282,29 @@ func (f *DNSForwarder) setResponseCodeForNotFound(ctx context.Context, resp *dns
resp.Rcode = dns.RcodeSuccess
}
// handleDNSError processes DNS lookup errors and sends an appropriate error response.
func (f *DNSForwarder) handleDNSError(
ctx context.Context,
w dns.ResponseWriter,
question dns.Question,
resp *dns.Msg,
domain string,
err error,
) {
// Default to SERVFAIL; override below when appropriate.
resp.Rcode = dns.RcodeServerFailure
qType := question.Qtype
qTypeName := dns.TypeToString[qType]
// Prefer typed DNS errors; fall back to generic logging otherwise.
// handleDNSError processes DNS lookup errors and sends an appropriate error response
func (f *DNSForwarder) handleDNSError(ctx context.Context, w dns.ResponseWriter, question dns.Question, resp *dns.Msg, domain string, err error) {
var dnsErr *net.DNSError
if !errors.As(err, &dnsErr) {
log.Warnf(errResolveFailed, domain, err)
if writeErr := w.WriteMsg(resp); writeErr != nil {
log.Errorf("failed to write failure DNS response: %v", writeErr)
}
return
}
// NotFound: set NXDOMAIN / appropriate code via helper.
if dnsErr.IsNotFound {
f.setResponseCodeForNotFound(ctx, resp, domain, qType)
if writeErr := w.WriteMsg(resp); writeErr != nil {
log.Errorf("failed to write failure DNS response: %v", writeErr)
switch {
case errors.As(err, &dnsErr):
resp.Rcode = dns.RcodeServerFailure
if dnsErr.IsNotFound {
f.setResponseCodeForNotFound(ctx, resp, domain, question.Qtype)
}
f.cache.set(domain, question.Qtype, nil)
return
}
// Upstream failed but we might have a cached answer—serve it if present.
if ips, ok := f.cache.get(domain, qType); ok {
if len(ips) > 0 {
log.Debugf("serving cached DNS response after upstream failure: domain=%s type=%s", domain, qTypeName)
f.addIPsToResponse(resp, domain, ips)
resp.Rcode = dns.RcodeSuccess
if writeErr := w.WriteMsg(resp); writeErr != nil {
log.Errorf("failed to write cached DNS response: %v", writeErr)
}
} else { // send NXDOMAIN / appropriate code if cache is empty
f.setResponseCodeForNotFound(ctx, resp, domain, qType)
if writeErr := w.WriteMsg(resp); writeErr != nil {
log.Errorf("failed to write failure DNS response: %v", writeErr)
}
if dnsErr.Server != "" {
log.Warnf("failed to resolve query for type=%s domain=%s server=%s: %v", dns.TypeToString[question.Qtype], domain, dnsErr.Server, err)
} else {
log.Warnf(errResolveFailed, domain, err)
}
return
}
// No cache. Log with or without the server field for more context.
if dnsErr.Server != "" {
log.Warnf("failed to resolve: type=%s domain=%s server=%s: %v", qTypeName, domain, dnsErr.Server, err)
} else {
default:
resp.Rcode = dns.RcodeServerFailure
log.Warnf(errResolveFailed, domain, err)
}
// Write final failure response.
if writeErr := w.WriteMsg(resp); writeErr != nil {
log.Errorf("failed to write failure DNS response: %v", writeErr)
if err := w.WriteMsg(resp); err != nil {
log.Errorf("failed to write failure DNS response: %v", err)
}
}

View File

@@ -648,95 +648,6 @@ func TestDNSForwarder_TCPTruncation(t *testing.T) {
assert.LessOrEqual(t, writtenResp.Len(), dns.MinMsgSize, "Response should fit in minimum UDP size")
}
// Ensures that when the first query succeeds and populates the cache,
// a subsequent upstream failure still returns a successful response from cache.
func TestDNSForwarder_ServeFromCacheOnUpstreamFailure(t *testing.T) {
mockResolver := &MockResolver{}
forwarder := NewDNSForwarder("127.0.0.1:0", 300, nil, &peer.Status{})
forwarder.resolver = mockResolver
d, err := domain.FromString("example.com")
require.NoError(t, err)
entries := []*ForwarderEntry{{Domain: d, ResID: "res-cache"}}
forwarder.UpdateDomains(entries)
ip := netip.MustParseAddr("1.2.3.4")
// First call resolves successfully and populates cache
mockResolver.On("LookupNetIP", mock.Anything, "ip4", dns.Fqdn("example.com")).
Return([]netip.Addr{ip}, nil).Once()
// Second call fails upstream; forwarder should serve from cache
mockResolver.On("LookupNetIP", mock.Anything, "ip4", dns.Fqdn("example.com")).
Return([]netip.Addr{}, &net.DNSError{Err: "temporary failure"}).Once()
// First query: populate cache
q1 := &dns.Msg{}
q1.SetQuestion(dns.Fqdn("example.com"), dns.TypeA)
w1 := &test.MockResponseWriter{}
resp1 := forwarder.handleDNSQuery(w1, q1)
require.NotNil(t, resp1)
require.Equal(t, dns.RcodeSuccess, resp1.Rcode)
require.Len(t, resp1.Answer, 1)
// Second query: serve from cache after upstream failure
q2 := &dns.Msg{}
q2.SetQuestion(dns.Fqdn("example.com"), dns.TypeA)
var writtenResp *dns.Msg
w2 := &test.MockResponseWriter{WriteMsgFunc: func(m *dns.Msg) error { writtenResp = m; return nil }}
_ = forwarder.handleDNSQuery(w2, q2)
require.NotNil(t, writtenResp, "expected response to be written")
require.Equal(t, dns.RcodeSuccess, writtenResp.Rcode)
require.Len(t, writtenResp.Answer, 1)
mockResolver.AssertExpectations(t)
}
// Verifies that cache normalization works across casing and trailing dot variations.
func TestDNSForwarder_CacheNormalizationCasingAndDot(t *testing.T) {
mockResolver := &MockResolver{}
forwarder := NewDNSForwarder("127.0.0.1:0", 300, nil, &peer.Status{})
forwarder.resolver = mockResolver
d, err := domain.FromString("ExAmPlE.CoM")
require.NoError(t, err)
entries := []*ForwarderEntry{{Domain: d, ResID: "res-norm"}}
forwarder.UpdateDomains(entries)
ip := netip.MustParseAddr("9.8.7.6")
// Initial resolution with mixed case to populate cache
mixedQuery := "ExAmPlE.CoM"
mockResolver.On("LookupNetIP", mock.Anything, "ip4", dns.Fqdn(strings.ToLower(mixedQuery))).
Return([]netip.Addr{ip}, nil).Once()
q1 := &dns.Msg{}
q1.SetQuestion(mixedQuery+".", dns.TypeA)
w1 := &test.MockResponseWriter{}
resp1 := forwarder.handleDNSQuery(w1, q1)
require.NotNil(t, resp1)
require.Equal(t, dns.RcodeSuccess, resp1.Rcode)
require.Len(t, resp1.Answer, 1)
// Subsequent query without dot and upper case should hit cache even if upstream fails
// Forwarder lowercases and uses the question name as-is (no trailing dot here)
mockResolver.On("LookupNetIP", mock.Anything, "ip4", strings.ToLower("EXAMPLE.COM")).
Return([]netip.Addr{}, &net.DNSError{Err: "temporary failure"}).Once()
q2 := &dns.Msg{}
q2.SetQuestion("EXAMPLE.COM", dns.TypeA)
var writtenResp *dns.Msg
w2 := &test.MockResponseWriter{WriteMsgFunc: func(m *dns.Msg) error { writtenResp = m; return nil }}
_ = forwarder.handleDNSQuery(w2, q2)
require.NotNil(t, writtenResp)
require.Equal(t, dns.RcodeSuccess, writtenResp.Rcode)
require.Len(t, writtenResp.Answer, 1)
mockResolver.AssertExpectations(t)
}
func TestDNSForwarder_MultipleOverlappingPatterns(t *testing.T) {
// Test complex overlapping pattern scenarios
mockFirewall := &MockFirewall{}

View File

@@ -1512,7 +1512,7 @@ func startSignal(t *testing.T) (*grpc.Server, string, error) {
log.Fatalf("failed to listen: %v", err)
}
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""), nil)
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""))
require.NoError(t, err)
proto.RegisterSignalExchangeServer(s, srv)

View File

@@ -107,14 +107,10 @@ type Conn struct {
wgProxyRelay wgproxy.Proxy
handshaker *Handshaker
guard Guard
guard *guard.Guard
semaphore *semaphoregroup.SemaphoreGroup
wg sync.WaitGroup
// used for replace the new guard with the old one in a thread-safe way
guardCtxCancel context.CancelFunc
wgGuard sync.WaitGroup
// debug purpose
dumpState *stateDump
@@ -175,9 +171,9 @@ func (conn *Conn) Open(engineCtx context.Context) error {
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer)
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
if !isForceRelayed() {
conn.handshaker.AddICEListener(conn.workerICE.OnNewOffer)
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
}
conn.guard = guard.NewGuard(conn.Log, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
@@ -200,34 +196,14 @@ func (conn *Conn) Open(engineCtx context.Context) error {
}
conn.wg.Add(1)
conn.wgGuard.Add(1)
guardCtx, cancel := context.WithCancel(conn.ctx)
conn.guardCtxCancel = cancel
go func() {
defer conn.wg.Done()
defer conn.wgGuard.Done()
defer cancel()
conn.waitInitialRandomSleepTime(conn.ctx)
conn.semaphore.Done(conn.ctx)
conn.guard.Start(guardCtx, conn.onGuardEvent)
conn.guard.Start(conn.ctx, conn.onGuardEvent)
}()
// both peer send offer
if err := conn.handshaker.SendOffer(); err != nil {
switch err {
case ErrPeerNotAvailable:
conn.Log.Warnf("failed to deliver offer to peer. Peer is not available")
case ErrSignalNotSupportDeliveryCheck:
conn.Log.Infof("signal delivery check is not supported, switch guard to retry mode")
conn.switchGuard()
default:
conn.Log.Errorf("failed to deliver offer to peer: %v", err)
conn.guard.FailedToSendOffer()
}
}
conn.opened = true
return nil
}
@@ -580,17 +556,7 @@ func (conn *Conn) onRelayDisconnected() {
func (conn *Conn) onGuardEvent() {
conn.dumpState.SendOffer()
if err := conn.handshaker.SendOffer(); err != nil {
switch err {
case ErrPeerNotAvailable:
conn.Log.Warnf("failed to deliver offer to peer. Peer is not available")
case ErrSignalNotSupportDeliveryCheck:
conn.Log.Infof("signal delivery check is not supported, switch guard to retry mode")
// must run on a separate goroutine to prevent deadlock while close the old guard
go conn.switchGuard()
default:
conn.Log.Errorf("failed to deliver offer to peer: %v", err)
conn.guard.FailedToSendOffer()
}
conn.Log.Errorf("failed to send offer: %v", err)
}
}
@@ -812,22 +778,6 @@ func (conn *Conn) rosenpassDetermKey() (*wgtypes.Key, error) {
return &key, nil
}
func (conn *Conn) switchGuard() {
if conn.guardCtxCancel == nil {
return
}
conn.guardCtxCancel()
conn.guardCtxCancel = nil
conn.wgGuard.Wait()
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
conn.guard = guard.NewRetryGuard(conn.Log, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
conn.guard.Start(conn.ctx, conn.onGuardEvent)
}()
}
func isController(config ConnConfig) bool {
return config.LocalKey > config.Key
}

View File

@@ -79,10 +79,10 @@ func TestConn_OnRemoteOffer(t *testing.T) {
return
}
onNewOfferChan := make(chan struct{})
onNewOffeChan := make(chan struct{})
conn.handshaker.AddRelayListener(func(remoteOfferAnswer *OfferAnswer) {
onNewOfferChan <- struct{}{}
conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) {
onNewOffeChan <- struct{}{}
})
conn.OnRemoteOffer(OfferAnswer{
@@ -98,7 +98,7 @@ func TestConn_OnRemoteOffer(t *testing.T) {
defer cancel()
select {
case <-onNewOfferChan:
case <-onNewOffeChan:
// success
case <-ctx.Done():
t.Error("expected to receive a new offer notification, but timed out")
@@ -118,10 +118,10 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
return
}
onNewOfferChan := make(chan struct{})
onNewOffeChan := make(chan struct{})
conn.handshaker.AddRelayListener(func(remoteOfferAnswer *OfferAnswer) {
onNewOfferChan <- struct{}{}
conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) {
onNewOffeChan <- struct{}{}
})
conn.OnRemoteAnswer(OfferAnswer{
@@ -136,7 +136,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
defer cancel()
select {
case <-onNewOfferChan:
case <-onNewOffeChan:
// success
case <-ctx.Done():
t.Error("expected to receive a new offer notification, but timed out")

View File

@@ -1,10 +0,0 @@
package peer
import "context"
type Guard interface {
Start(ctx context.Context, eventCallback func())
SetRelayedConnDisconnected()
SetICEConnDisconnected()
FailedToSendOffer()
}

View File

@@ -1,20 +0,0 @@
package guard
import (
"os"
"strconv"
"time"
)
const (
envICEMonitorPeriod = "NB_ICE_MONITOR_PERIOD"
)
func GetICEMonitorPeriod() time.Duration {
if envVal := os.Getenv(envICEMonitorPeriod); envVal != "" {
if seconds, err := strconv.Atoi(envVal); err == nil && seconds > 0 {
return time.Duration(seconds) * time.Second
}
}
return defaultCandidatesMonitorPeriod
}

View File

@@ -4,26 +4,20 @@ import (
"context"
"time"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
)
const (
offerResendPeriod = 2 * time.Second
)
type isConnectedFunc func() bool
// Guard is responsible for the reconnection logic.
// It will trigger to send an offer to the peer then has connection issues.
// Only the offer error will start the timer to resend offer periodically.
//
// Watch these events:
// - Relay client reconnected to home server
// - Signal server connection state changed
// - ICE connection disconnected
// - Relayed connection disconnected
// - ICE candidate changes
// - Failed to send offer to remote peer
type Guard struct {
log *log.Entry
isConnectedOnAllWay isConnectedFunc
@@ -31,7 +25,6 @@ type Guard struct {
srWatcher *SRWatcher
relayedConnDisconnected chan struct{}
iCEConnDisconnected chan struct{}
offerError chan struct{}
}
func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
@@ -42,7 +35,6 @@ func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Durati
srWatcher: srWatcher,
relayedConnDisconnected: make(chan struct{}, 1),
iCEConnDisconnected: make(chan struct{}, 1),
offerError: make(chan struct{}, 1),
}
}
@@ -65,54 +57,81 @@ func (g *Guard) SetICEConnDisconnected() {
}
}
func (g *Guard) FailedToSendOffer() {
select {
case g.offerError <- struct{}{}:
default:
}
}
// reconnectLoopWithRetry periodically check the connection status.
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
srReconnectedChan := g.srWatcher.NewListener()
defer g.srWatcher.RemoveListener(srReconnectedChan)
offerResendTimer := time.NewTimer(0)
offerResendTimer.Stop()
defer offerResendTimer.Stop()
ticker := g.initialTicker(ctx)
defer ticker.Stop()
tickerChannel := ticker.C
for {
select {
case t := <-tickerChannel:
if t.IsZero() {
g.log.Infof("retry timed out, stop periodic offer sending")
// after backoff timeout the ticker.C will be closed. We need to a dummy channel to avoid loop
tickerChannel = make(<-chan time.Time)
continue
}
if !g.isConnectedOnAllWay() {
callback()
}
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, retry connection")
offerResendTimer.Stop()
if !g.isConnectedOnAllWay() {
callback()
}
g.log.Debugf("Relay connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-g.iCEConnDisconnected:
g.log.Debugf("ICE connection changed, retry connection")
offerResendTimer.Stop()
if !g.isConnectedOnAllWay() {
callback()
}
g.log.Debugf("ICE connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-srReconnectedChan:
g.log.Debugf("has network changes, retry connection")
offerResendTimer.Stop()
if !g.isConnectedOnAllWay() {
callback()
}
case <-g.offerError:
g.log.Debugf("failed to send offer, reset reconnection ticker")
offerResendTimer.Reset(offerResendPeriod)
continue
case <-offerResendTimer.C:
if !g.isConnectedOnAllWay() {
callback()
}
g.log.Debugf("has network changes, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-ctx.Done():
g.log.Debugf("context is done, stop reconnect loop")
return
}
}
}
// initialTicker give chance to the peer to establish the initial connection.
func (g *Guard) initialTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 3 * time.Second,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: g.timeout,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
return backoff.NewTicker(bo)
}
func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: g.timeout,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
ticker := backoff.NewTicker(bo)
<-ticker.C // consume the initial tick what is happening right after the ticker has been created
return ticker
}

View File

@@ -1,139 +0,0 @@
package guard
import (
"context"
"time"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
)
// RetryGuard is responsible for the reconnection logic.
// It will trigger to send an offer to the peer then has connection issues.
// Watch these events:
// - Relay client reconnected to home server
// - Signal server connection state changed
// - ICE connection disconnected
// - Relayed connection disconnected
// - ICE candidate changes
type RetryGuard struct {
log *log.Entry
isConnectedOnAllWay isConnectedFunc
timeout time.Duration
srWatcher *SRWatcher
relayedConnDisconnected chan struct{}
iCEConnDisconnected chan struct{}
}
func (g *RetryGuard) FailedToSendOffer() {
log.Errorf("FailedToSendOffer is not implemented in GuardRetry")
}
func NewRetryGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *RetryGuard {
return &RetryGuard{
log: log,
isConnectedOnAllWay: isConnectedFn,
timeout: timeout,
srWatcher: srWatcher,
relayedConnDisconnected: make(chan struct{}, 1),
iCEConnDisconnected: make(chan struct{}, 1),
}
}
func (g *RetryGuard) Start(ctx context.Context, eventCallback func()) {
g.log.Infof("starting guard for reconnection with MaxInterval: %s", g.timeout)
g.reconnectLoopWithRetry(ctx, eventCallback)
}
func (g *RetryGuard) SetRelayedConnDisconnected() {
select {
case g.relayedConnDisconnected <- struct{}{}:
default:
}
}
func (g *RetryGuard) SetICEConnDisconnected() {
select {
case g.iCEConnDisconnected <- struct{}{}:
default:
}
}
// reconnectLoopWithRetry periodically check the connection status.
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
func (g *RetryGuard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
srReconnectedChan := g.srWatcher.NewListener()
defer g.srWatcher.RemoveListener(srReconnectedChan)
ticker := g.initialTicker(ctx)
defer ticker.Stop()
tickerChannel := ticker.C
for {
select {
case t := <-tickerChannel:
if t.IsZero() {
g.log.Infof("retry timed out, stop periodic offer sending")
// after backoff timeout the ticker.C will be closed. We need to a dummy channel to avoid loop
tickerChannel = make(<-chan time.Time)
continue
}
if !g.isConnectedOnAllWay() {
callback()
}
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-g.iCEConnDisconnected:
g.log.Debugf("ICE connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-srReconnectedChan:
g.log.Debugf("has network changes, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-ctx.Done():
g.log.Debugf("context is done, stop reconnect loop")
return
}
}
}
// initialTicker give chance to the peer to establish the initial connection.
func (g *RetryGuard) initialTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 3 * time.Second,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: g.timeout,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
return backoff.NewTicker(bo)
}
func (g *RetryGuard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: g.timeout,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
ticker := backoff.NewTicker(bo)
<-ticker.C // consume the initial tick what is happening right after the ticker has been created
return ticker
}

View File

@@ -16,8 +16,8 @@ import (
)
const (
defaultCandidatesMonitorPeriod = 5 * time.Minute
candidateGatheringTimeout = 5 * time.Second
candidatesMonitorPeriod = 5 * time.Minute
candidateGatheringTimeout = 5 * time.Second
)
type ICEMonitor struct {
@@ -25,19 +25,16 @@ type ICEMonitor struct {
iFaceDiscover stdnet.ExternalIFaceDiscover
iceConfig icemaker.Config
tickerPeriod time.Duration
currentCandidatesAddress []string
candidatesMu sync.Mutex
}
func NewICEMonitor(iFaceDiscover stdnet.ExternalIFaceDiscover, config icemaker.Config, period time.Duration) *ICEMonitor {
log.Debugf("prepare ICE monitor with period: %s", period)
func NewICEMonitor(iFaceDiscover stdnet.ExternalIFaceDiscover, config icemaker.Config) *ICEMonitor {
cm := &ICEMonitor{
ReconnectCh: make(chan struct{}, 1),
iFaceDiscover: iFaceDiscover,
iceConfig: config,
tickerPeriod: period,
}
return cm
}
@@ -49,12 +46,7 @@ func (cm *ICEMonitor) Start(ctx context.Context, onChanged func()) {
return
}
// Initial check to populate the candidates for later comparison
if _, err := cm.handleCandidateTick(ctx, ufrag, pwd); err != nil {
log.Warnf("Failed to check initial ICE candidates: %v", err)
}
ticker := time.NewTicker(cm.tickerPeriod)
ticker := time.NewTicker(candidatesMonitorPeriod)
defer ticker.Stop()
for {

View File

@@ -51,7 +51,7 @@ func (w *SRWatcher) Start() {
ctx, cancel := context.WithCancel(context.Background())
w.cancelIceMonitor = cancel
iceMonitor := NewICEMonitor(w.iFaceDiscover, w.iceConfig, GetICEMonitorPeriod())
iceMonitor := NewICEMonitor(w.iFaceDiscover, w.iceConfig)
go iceMonitor.Start(ctx, w.onICEChanged)
w.signalClient.SetOnReconnectedListener(w.onReconnected)
w.relayManager.SetOnReconnectedListener(w.onReconnected)

View File

@@ -44,19 +44,13 @@ type OfferAnswer struct {
}
type Handshaker struct {
mu sync.Mutex
log *log.Entry
config ConnConfig
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
// relayListener is not blocking because the listener is using a goroutine to process the messages
// and it will only keep the latest message if multiple offers are received in a short time
// this is to avoid blocking the handshaker if the listener is doing some heavy processing
// and also to avoid processing old offers if multiple offers are received in a short time
// the listener will always process the latest offer
relayListener *AsyncOfferListener
iceListener func(remoteOfferAnswer *OfferAnswer)
mu sync.Mutex
log *log.Entry
config ConnConfig
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
onNewOfferListeners []*OfferListener
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
remoteOffersCh chan OfferAnswer
@@ -76,39 +70,28 @@ func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *W
}
}
func (h *Handshaker) AddRelayListener(offer func(remoteOfferAnswer *OfferAnswer)) {
h.relayListener = NewAsyncOfferListener(offer)
}
func (h *Handshaker) AddICEListener(offer func(remoteOfferAnswer *OfferAnswer)) {
h.iceListener = offer
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) {
l := NewOfferListener(offer)
h.onNewOfferListeners = append(h.onNewOfferListeners, l)
}
func (h *Handshaker) Listen(ctx context.Context) {
for {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
if h.relayListener != nil {
h.relayListener.Notify(&remoteOfferAnswer)
}
if h.iceListener != nil {
h.iceListener(&remoteOfferAnswer)
}
// received confirmation from the remote peer -> ready to proceed
if err := h.sendAnswer(); err != nil {
h.log.Errorf("failed to send remote offer confirmation: %s", err)
continue
}
for _, listener := range h.onNewOfferListeners {
listener.Notify(&remoteOfferAnswer)
}
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
case remoteOfferAnswer := <-h.remoteAnswerCh:
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
if h.relayListener != nil {
h.relayListener.Notify(&remoteOfferAnswer)
}
if h.iceListener != nil {
h.iceListener(&remoteOfferAnswer)
for _, listener := range h.onNewOfferListeners {
listener.Notify(&remoteOfferAnswer)
}
case <-ctx.Done():
h.log.Infof("stop listening for remote offers and answers")

View File

@@ -13,20 +13,20 @@ func (oa *OfferAnswer) SessionIDString() string {
return oa.SessionID.String()
}
type AsyncOfferListener struct {
type OfferListener struct {
fn callbackFunc
running bool
latest *OfferAnswer
mu sync.Mutex
}
func NewAsyncOfferListener(fn callbackFunc) *AsyncOfferListener {
return &AsyncOfferListener{
func NewOfferListener(fn callbackFunc) *OfferListener {
return &OfferListener{
fn: fn,
}
}
func (o *AsyncOfferListener) Notify(remoteOfferAnswer *OfferAnswer) {
func (o *OfferListener) Notify(remoteOfferAnswer *OfferAnswer) {
o.mu.Lock()
defer o.mu.Unlock()

View File

@@ -14,7 +14,7 @@ func Test_newOfferListener(t *testing.T) {
runChan <- struct{}{}
}
hl := NewAsyncOfferListener(longRunningFn)
hl := NewOfferListener(longRunningFn)
hl.Notify(dummyOfferAnswer)
hl.Notify(dummyOfferAnswer)

View File

@@ -1,9 +1,6 @@
package peer
import (
"errors"
"sync/atomic"
"github.com/pion/ice/v4"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@@ -12,16 +9,9 @@ import (
sProto "github.com/netbirdio/netbird/shared/signal/proto"
)
var (
ErrPeerNotAvailable = signal.ErrPeerNotAvailable
ErrSignalNotSupportDeliveryCheck = errors.New("the signal client does not support SendWithDeliveryCheck")
)
type Signaler struct {
signal signal.Client
wgPrivateKey wgtypes.Key
deliveryCheckNotSupported atomic.Bool
}
func NewSignaler(signal signal.Client, wgPrivateKey wgtypes.Key) *Signaler {
@@ -77,21 +67,10 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string,
return err
}
if s.deliveryCheckNotSupported.Load() {
return s.signal.Send(msg)
if err = s.signal.Send(msg); err != nil {
return err
}
if err = s.signal.SendWithDeliveryCheck(msg); err != nil {
switch {
case errors.Is(err, signal.ErrPeerNotAvailable):
return ErrPeerNotAvailable
case errors.Is(err, signal.ErrUnimplementedMethod):
s.handleUnimplementedMethod(msg)
return ErrSignalNotSupportDeliveryCheck
default:
return err
}
}
return nil
}
@@ -104,15 +83,3 @@ func (s *Signaler) SignalIdle(remoteKey string) error {
},
})
}
func (s *Signaler) handleUnimplementedMethod(msg *sProto.Message) {
// print out the warning only once
if !s.deliveryCheckNotSupported.Load() {
log.Warnf("signal client does not support delivery check, falling back to Send method and resend")
}
s.deliveryCheckNotSupported.Store(true)
if err := s.signal.Send(msg); err != nil {
log.Warnf("failed to send signal msg to remote peer: %v", err)
}
}

View File

@@ -92,16 +92,23 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *
func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.log.Debugf("OnNewOffer for ICE, serial: %s", remoteOfferAnswer.SessionIDString())
w.muxAgent.Lock()
defer w.muxAgent.Unlock()
if w.agent != nil || w.agentConnecting {
if w.agentConnecting {
w.log.Debugf("agent connection is in progress, skipping the offer")
w.muxAgent.Unlock()
return
}
if w.agent != nil {
// backward compatibility with old clients that do not send session ID
if remoteOfferAnswer.SessionID == nil {
w.log.Debugf("agent already exists, skipping the offer")
w.muxAgent.Unlock()
return
}
if w.remoteSessionID == *remoteOfferAnswer.SessionID {
w.log.Debugf("agent already exists and session ID matches, skipping the offer: %s", remoteOfferAnswer.SessionIDString())
w.muxAgent.Unlock()
return
}
w.log.Debugf("agent already exists, recreate the connection")
@@ -109,12 +116,6 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
}
w.sessionID = sessionID
w.agent = nil
}
@@ -125,23 +126,18 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
preferredCandidateTypes = icemaker.CandidateTypes()
}
if remoteOfferAnswer.SessionID != nil {
w.log.Debugf("recreate ICE agent: %s / %s", w.sessionID, *remoteOfferAnswer.SessionID)
}
w.log.Debugf("recreate ICE agent")
dialerCtx, dialerCancel := context.WithCancel(w.ctx)
agent, err := w.reCreateAgent(dialerCancel, preferredCandidateTypes)
if err != nil {
w.log.Errorf("failed to recreate ICE Agent: %s", err)
w.muxAgent.Unlock()
return
}
w.agent = agent
w.agentDialerCancel = dialerCancel
w.agentConnecting = true
if remoteOfferAnswer.SessionID != nil {
w.remoteSessionID = *remoteOfferAnswer.SessionID
} else {
w.remoteSessionID = ""
}
w.muxAgent.Unlock()
go w.connect(dialerCtx, agent, remoteOfferAnswer)
}
@@ -297,6 +293,9 @@ func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent
w.muxAgent.Lock()
w.agentConnecting = false
w.lastSuccess = time.Now()
if remoteOfferAnswer.SessionID != nil {
w.remoteSessionID = *remoteOfferAnswer.SessionID
}
w.muxAgent.Unlock()
// todo: the potential problem is a race between the onConnectionStateChange
@@ -310,17 +309,16 @@ func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.C
}
w.muxAgent.Lock()
// todo review does it make sense to generate new session ID all the time when w.agent==agent
sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
}
w.sessionID = sessionID
if w.agent == agent {
// consider to remove from here and move to the OnNewOffer
sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
}
w.sessionID = sessionID
w.agent = nil
w.agentConnecting = false
w.remoteSessionID = ""
}
w.muxAgent.Unlock()
}
@@ -397,12 +395,11 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia
// ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to
// notify the conn.onICEStateDisconnected changes to update the current used priority
w.closeAgent(agent, dialerCancel)
if w.lastKnownState == ice.ConnectionStateConnected {
w.lastKnownState = ice.ConnectionStateDisconnected
w.conn.onICEStateDisconnected()
}
w.closeAgent(agent, dialerCancel)
default:
return
}

View File

@@ -345,7 +345,7 @@ func startSignal(t *testing.T) (*grpc.Server, string, error) {
log.Fatalf("failed to listen: %v", err)
}
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""), nil)
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""))
require.NoError(t, err)
proto.RegisterSignalExchangeServer(s, srv)

View File

@@ -1354,13 +1354,7 @@ func (s *serviceClient) updateConfig() error {
}
// showLoginURL creates a borderless window styled like a pop-up in the top-right corner using s.wLoginURL.
// It also starts a background goroutine that periodically checks if the client is already connected
// and closes the window if so. The goroutine can be cancelled by the returned CancelFunc, and it is
// also cancelled when the window is closed.
func (s *serviceClient) showLoginURL() context.CancelFunc {
// create a cancellable context for the background check goroutine
ctx, cancel := context.WithCancel(s.ctx)
func (s *serviceClient) showLoginURL() {
resIcon := fyne.NewStaticResource("netbird.png", iconAbout)
@@ -1369,8 +1363,6 @@ func (s *serviceClient) showLoginURL() context.CancelFunc {
s.wLoginURL.Resize(fyne.NewSize(400, 200))
s.wLoginURL.SetIcon(resIcon)
}
// ensure goroutine is cancelled when the window is closed
s.wLoginURL.SetOnClosed(func() { cancel() })
// add a description label
label := widget.NewLabel("Your NetBird session has expired.\nPlease re-authenticate to continue using NetBird.")
@@ -1451,39 +1443,7 @@ func (s *serviceClient) showLoginURL() context.CancelFunc {
)
s.wLoginURL.SetContent(container.NewCenter(content))
// start a goroutine to check connection status and close the window if connected
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
conn, err := s.getSrvClient(failFastTimeout)
if err != nil {
return
}
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
status, err := conn.Status(s.ctx, &proto.StatusRequest{})
if err != nil {
continue
}
if status.Status == string(internal.StatusConnected) {
if s.wLoginURL != nil {
s.wLoginURL.Close()
}
return
}
}
}
}()
s.wLoginURL.Show()
// return cancel func so callers can stop the background goroutine if desired
return cancel
}
func openURL(url string) error {

2
go.mod
View File

@@ -63,7 +63,7 @@ require (
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/nadoo/ipset v0.5.0
github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-f87a07690ba0
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250930114722-bab681dd3a96
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45
github.com/okta/okta-sdk-golang/v2 v2.18.0
github.com/oschwald/maxminddb-golang v1.12.0
github.com/patrickmn/go-cache v2.1.0+incompatible

4
go.sum
View File

@@ -507,8 +507,8 @@ github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-
github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-f87a07690ba0/go.mod h1:v0nUbbHbuQnqR7yKIYnKzsLBCswLtp2JctmKYmGgVhc=
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502 h1:3tHlFmhTdX9axERMVN63dqyFqnvuD+EMJHzM7mNGON8=
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250930114722-bab681dd3a96 h1:rPu2HVjtRZs/GloIz6h4tcNr/9Fq55SBBAlz6uOExt8=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250930114722-bab681dd3a96/go.mod h1:ap83I3Rs3SLND/58j9X+a3ixA9d9ztainwW01ExEw0w=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45 h1:ujgviVYmx243Ksy7NdSwrdGPSRNE3pb8kEDSpH0QuAQ=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45/go.mod h1:5/sjFmLb8O96B5737VCqhHyGRzNFIaN/Bu7ZodXc3qQ=
github.com/netbirdio/wireguard-go v0.0.0-20241230120307-6a676aebaaf6 h1:X5h5QgP7uHAv78FWgHV8+WYLjHxK9v3ilkVXT1cpCrQ=
github.com/netbirdio/wireguard-go v0.0.0-20241230120307-6a676aebaaf6/go.mod h1:tkCQ4FQXmpAgYVh++1cq16/dH4QJtmvpRv19DWGAHSA=
github.com/nicksnyder/go-i18n/v2 v2.4.0 h1:3IcvPOAvnCKwNm0TB0dLDTuawWEj+ax/RERNC+diLMM=

View File

@@ -35,7 +35,6 @@ type Client interface {
WaitStreamConnected()
SendToStream(msg *proto.EncryptedMessage) error
Send(msg *proto.Message) error
SendWithDeliveryCheck(msg *proto.Message) error
SetOnReconnectedListener(func())
}

View File

@@ -199,7 +199,7 @@ func startSignal() (*grpc.Server, net.Listener) {
panic(err)
}
s := grpc.NewServer()
srv, err := server.NewServer(context.Background(), otel.Meter(""), nil)
srv, err := server.NewServer(context.Background(), otel.Meter(""))
if err != nil {
panic(err)
}

View File

@@ -2,7 +2,6 @@ package client
import (
"context"
"errors"
"fmt"
"io"
"sync"
@@ -24,11 +23,6 @@ import (
"github.com/netbirdio/netbird/util/wsproxy"
)
var (
ErrPeerNotAvailable = errors.New("peer not available")
ErrUnimplementedMethod = errors.New("the signal client does not support SendWithDeliveryCheck")
)
// ConnStateNotifier is a wrapper interface of the status recorder
type ConnStateNotifier interface {
MarkSignalDisconnected(error)
@@ -403,36 +397,6 @@ func (c *GrpcClient) Send(msg *proto.Message) error {
return err
}
func (c *GrpcClient) SendWithDeliveryCheck(msg *proto.Message) error {
if !c.Ready() {
return fmt.Errorf("no connection to signal")
}
encryptedMessage, err := c.encryptMessage(msg)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(c.ctx, client.ConnectTimeout)
defer cancel()
_, err = c.realClient.SendWithDeliveryCheck(ctx, encryptedMessage)
if err != nil {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.NotFound:
return ErrPeerNotAvailable
case codes.Unimplemented:
return ErrUnimplementedMethod
default:
return fmt.Errorf("grpc error %s: %w", st.Code(), err)
}
}
return err // Not a gRPC status error
}
return err
}
// receive receives messages from other peers coming through the Signal Exchange
// and distributes them to worker threads for processing
func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) error {

View File

@@ -16,7 +16,6 @@ type MockClient struct {
SendToStreamFunc func(msg *proto.EncryptedMessage) error
SendFunc func(msg *proto.Message) error
SetOnReconnectedListenerFunc func(f func())
SendWithDeliveryCheckFn func(msg *proto.Message) error
}
// SetOnReconnectedListener sets the function to be called when the client reconnects.
@@ -83,10 +82,3 @@ func (sm *MockClient) Send(msg *proto.Message) error {
}
return sm.SendFunc(msg)
}
func (sm *MockClient) SendWithDeliveryCheck(msg *proto.Message) error {
if sm.SendWithDeliveryCheckFn == nil {
return nil
}
return sm.SendWithDeliveryCheck(msg)
}

View File

@@ -10,7 +10,6 @@ import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
_ "google.golang.org/protobuf/types/descriptorpb"
emptypb "google.golang.org/protobuf/types/known/emptypb"
reflect "reflect"
sync "sync"
)
@@ -440,79 +439,72 @@ var file_signalexchange_proto_rawDesc = []byte{
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x1a, 0x20, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x10, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74,
0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x72,
0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09,
0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64,
0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0x63, 0x0a,
0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65,
0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72,
0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79,
0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65,
0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52, 0x04, 0x62, 0x6f,
0x64, 0x79, 0x22, 0xe4, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, 0x0a, 0x04, 0x74,
0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x2e,
0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79,
0x6c, 0x6f, 0x61, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x77, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e,
0x50, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x77, 0x67, 0x4c, 0x69,
0x73, 0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x6e, 0x65, 0x74, 0x42,
0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0e, 0x6e, 0x65, 0x74, 0x42, 0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e,
0x12, 0x28, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14,
0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e,
0x4d, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x2c, 0x0a, 0x11, 0x66, 0x65,
0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x18,
0x06, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x11, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x53,
0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x12, 0x49, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65,
0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
0x67, 0x65, 0x2e, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66,
0x69, 0x67, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e,
0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76,
0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52,
0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72,
0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64,
0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09,
0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53,
0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41,
0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x12, 0x0b,
0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x0c, 0x0a, 0x0a, 0x5f,
0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x04, 0x4d, 0x6f, 0x64,
0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28,
0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01, 0x42, 0x09,
0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52, 0x6f, 0x73,
0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, 0x0a, 0x0f,
0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65,
0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0x8e, 0x02, 0x0a, 0x0e, 0x53, 0x69, 0x67,
0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a, 0x04, 0x53,
0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68,
0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x53, 0x0a, 0x15, 0x53, 0x65, 0x6e,
0x64, 0x57, 0x69, 0x74, 0x68, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x43, 0x68, 0x65,
0x63, 0x6b, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61,
0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x59,
0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12,
0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x10, 0x45, 0x6e, 0x63, 0x72,
0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03,
0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c,
0x0a, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04,
0x62, 0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79,
0x22, 0x63, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b,
0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a,
0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62,
0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52,
0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xe4, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f,
0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x77, 0x67, 0x4c, 0x69, 0x73,
0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x77,
0x67, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x6e,
0x65, 0x74, 0x42, 0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0e, 0x6e, 0x65, 0x74, 0x42, 0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73,
0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
0x67, 0x65, 0x2e, 0x4d, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x2c, 0x0a,
0x11, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74,
0x65, 0x64, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x11, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72,
0x65, 0x73, 0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x12, 0x49, 0x0a, 0x0f, 0x72,
0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63,
0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53,
0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01,
0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x49, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x09, 0x73, 0x65, 0x73,
0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70,
0x65, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06,
0x41, 0x4e, 0x53, 0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44,
0x49, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10,
0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x0c,
0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x04,
0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01,
0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01,
0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f,
0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b,
0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73,
0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73,
0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e,
0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c,
0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65,
0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65,
0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70,
0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d,
0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e,
0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45,
0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a,
0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -536,7 +528,6 @@ var file_signalexchange_proto_goTypes = []interface{}{
(*Body)(nil), // 3: signalexchange.Body
(*Mode)(nil), // 4: signalexchange.Mode
(*RosenpassConfig)(nil), // 5: signalexchange.RosenpassConfig
(*emptypb.Empty)(nil), // 6: google.protobuf.Empty
}
var file_signalexchange_proto_depIdxs = []int32{
3, // 0: signalexchange.Message.body:type_name -> signalexchange.Body
@@ -544,13 +535,11 @@ var file_signalexchange_proto_depIdxs = []int32{
4, // 2: signalexchange.Body.mode:type_name -> signalexchange.Mode
5, // 3: signalexchange.Body.rosenpassConfig:type_name -> signalexchange.RosenpassConfig
1, // 4: signalexchange.SignalExchange.Send:input_type -> signalexchange.EncryptedMessage
1, // 5: signalexchange.SignalExchange.SendWithDeliveryCheck:input_type -> signalexchange.EncryptedMessage
1, // 6: signalexchange.SignalExchange.ConnectStream:input_type -> signalexchange.EncryptedMessage
1, // 7: signalexchange.SignalExchange.Send:output_type -> signalexchange.EncryptedMessage
6, // 8: signalexchange.SignalExchange.SendWithDeliveryCheck:output_type -> google.protobuf.Empty
1, // 9: signalexchange.SignalExchange.ConnectStream:output_type -> signalexchange.EncryptedMessage
7, // [7:10] is the sub-list for method output_type
4, // [4:7] is the sub-list for method input_type
1, // 5: signalexchange.SignalExchange.ConnectStream:input_type -> signalexchange.EncryptedMessage
1, // 6: signalexchange.SignalExchange.Send:output_type -> signalexchange.EncryptedMessage
1, // 7: signalexchange.SignalExchange.ConnectStream:output_type -> signalexchange.EncryptedMessage
6, // [6:8] is the sub-list for method output_type
4, // [4:6] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name

View File

@@ -1,7 +1,6 @@
syntax = "proto3";
import "google/protobuf/descriptor.proto";
import "google/protobuf/empty.proto";
option go_package = "/proto";
@@ -10,7 +9,6 @@ package signalexchange;
service SignalExchange {
// Synchronously connect to the Signal Exchange service offering connection candidates and waiting for connection candidates from the other party (remote peer)
rpc Send(EncryptedMessage) returns (EncryptedMessage) {}
rpc SendWithDeliveryCheck(EncryptedMessage) returns (google.protobuf.Empty) {}
// Connect to the Signal Exchange service offering connection candidates and maintain a channel for receiving candidates from the other party (remote peer)
rpc ConnectStream(stream EncryptedMessage) returns (stream EncryptedMessage) {}
}

View File

@@ -7,7 +7,6 @@ import (
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
// This is a compile-time assertion to ensure that this generated file
@@ -21,7 +20,6 @@ const _ = grpc.SupportPackageIsVersion7
type SignalExchangeClient interface {
// Synchronously connect to the Signal Exchange service offering connection candidates and waiting for connection candidates from the other party (remote peer)
Send(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error)
SendWithDeliveryCheck(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*emptypb.Empty, error)
// Connect to the Signal Exchange service offering connection candidates and maintain a channel for receiving candidates from the other party (remote peer)
ConnectStream(ctx context.Context, opts ...grpc.CallOption) (SignalExchange_ConnectStreamClient, error)
}
@@ -43,15 +41,6 @@ func (c *signalExchangeClient) Send(ctx context.Context, in *EncryptedMessage, o
return out, nil
}
func (c *signalExchangeClient) SendWithDeliveryCheck(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/signalexchange.SignalExchange/SendWithDeliveryCheck", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *signalExchangeClient) ConnectStream(ctx context.Context, opts ...grpc.CallOption) (SignalExchange_ConnectStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &SignalExchange_ServiceDesc.Streams[0], "/signalexchange.SignalExchange/ConnectStream", opts...)
if err != nil {
@@ -89,7 +78,6 @@ func (x *signalExchangeConnectStreamClient) Recv() (*EncryptedMessage, error) {
type SignalExchangeServer interface {
// Synchronously connect to the Signal Exchange service offering connection candidates and waiting for connection candidates from the other party (remote peer)
Send(context.Context, *EncryptedMessage) (*EncryptedMessage, error)
SendWithDeliveryCheck(context.Context, *EncryptedMessage) (*emptypb.Empty, error)
// Connect to the Signal Exchange service offering connection candidates and maintain a channel for receiving candidates from the other party (remote peer)
ConnectStream(SignalExchange_ConnectStreamServer) error
mustEmbedUnimplementedSignalExchangeServer()
@@ -102,9 +90,6 @@ type UnimplementedSignalExchangeServer struct {
func (UnimplementedSignalExchangeServer) Send(context.Context, *EncryptedMessage) (*EncryptedMessage, error) {
return nil, status.Errorf(codes.Unimplemented, "method Send not implemented")
}
func (UnimplementedSignalExchangeServer) SendWithDeliveryCheck(context.Context, *EncryptedMessage) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method SendWithDeliveryCheck not implemented")
}
func (UnimplementedSignalExchangeServer) ConnectStream(SignalExchange_ConnectStreamServer) error {
return status.Errorf(codes.Unimplemented, "method ConnectStream not implemented")
}
@@ -139,24 +124,6 @@ func _SignalExchange_Send_Handler(srv interface{}, ctx context.Context, dec func
return interceptor(ctx, in, info, handler)
}
func _SignalExchange_SendWithDeliveryCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EncryptedMessage)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SignalExchangeServer).SendWithDeliveryCheck(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/signalexchange.SignalExchange/SendWithDeliveryCheck",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SignalExchangeServer).SendWithDeliveryCheck(ctx, req.(*EncryptedMessage))
}
return interceptor(ctx, in, info, handler)
}
func _SignalExchange_ConnectStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(SignalExchangeServer).ConnectStream(&signalExchangeConnectStreamServer{stream})
}
@@ -194,10 +161,6 @@ var SignalExchange_ServiceDesc = grpc.ServiceDesc{
MethodName: "Send",
Handler: _SignalExchange_Send_Handler,
},
{
MethodName: "SendWithDeliveryCheck",
Handler: _SignalExchange_SendWithDeliveryCheck_Handler,
},
},
Streams: []grpc.StreamDesc{
{

View File

@@ -2,7 +2,6 @@ package cmd
import (
"os"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
@@ -10,19 +9,6 @@ import (
"github.com/spf13/pflag"
)
func EnvDisableSendWithDeliveryCheck() bool {
envVar := "NB_DISABLE_SEND_WITH_DELIVERY_CHECK"
value, present := os.LookupEnv(envVar)
if !present {
return false
}
if parsed, err := strconv.ParseBool(value); err == nil {
return parsed
}
return false
}
// setFlagsFromEnvVars reads and updates flag values from environment variables with prefix NB_
func setFlagsFromEnvVars(cmd *cobra.Command) {
flags := cmd.PersistentFlags()

View File

@@ -10,7 +10,6 @@ import (
"net/http"
// nolint:gosec
_ "net/http/pprof"
"os"
"time"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@@ -93,9 +92,7 @@ var (
RunE: func(cmd *cobra.Command, args []string) error {
flag.Parse()
if os.Getenv("NB_PPROF_ENABLE") == "true" {
startPprof()
}
startPprof()
opts, certManager, err := getTLSConfigurations()
if err != nil {
@@ -117,11 +114,7 @@ var (
}
}()
optsSignal := &server.Options{
DisableSendWithDeliveryCheck: EnvDisableSendWithDeliveryCheck(),
}
srv, err := server.NewServer(cmd.Context(), metricsServer.Meter, optsSignal)
srv, err := server.NewServer(cmd.Context(), metricsServer.Meter)
if err != nil {
return fmt.Errorf("creating signal server: %v", err)
}
@@ -157,7 +150,7 @@ var (
serveHTTP(httpListener, grpcRootHandler)
}
if signalPort != legacyGRPCPort && os.Getenv("NB_DISABLE_FALLBACK_GRPC") != "true" {
if signalPort != legacyGRPCPort {
// The Signal gRPC server was running on port 10000 previously. Old agents that are already connected to Signal
// are using port 10000. For compatibility purposes we keep running a 2nd gRPC server on port 10000.
compatListener, err = serveGRPC(grpcServer, legacyGRPCPort)

View File

@@ -14,7 +14,6 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
gproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/netbirdio/signal-dispatcher/dispatcher"
@@ -23,8 +22,6 @@ import (
"github.com/netbirdio/netbird/signal/peer"
)
var ErrPeerNotConnected = errors.New("peer not connected")
const (
labelType = "type"
labelTypeError = "error"
@@ -52,32 +49,20 @@ var (
ErrPeerRegisteredAgain = errors.New("peer registered again")
)
type Options struct {
// Disable SendWithDeliveryCheck method
DisableSendWithDeliveryCheck bool
}
// Server an instance of a Signal server
type Server struct {
metrics *metrics.AppMetrics
disableSendWithDeliveryCheck bool
registry *peer.Registry
proto.UnimplementedSignalExchangeServer
dispatcher *dispatcher.Dispatcher
metrics *metrics.AppMetrics
successHeader metadata.MD
sendTimeout time.Duration
directSendDisabled bool
sendTimeout time.Duration
}
// NewServer creates a new Signal server
func NewServer(ctx context.Context, meter metric.Meter, opts *Options) (*Server, error) {
if opts == nil {
opts = &Options{}
}
func NewServer(ctx context.Context, meter metric.Meter) (*Server, error) {
appMetrics, err := metrics.NewAppMetrics(meter)
if err != nil {
return nil, fmt.Errorf("creating app metrics: %v", err)
@@ -96,21 +81,11 @@ func NewServer(ctx context.Context, meter metric.Meter, opts *Options) (*Server,
}
s := &Server{
dispatcher: d,
registry: peer.NewRegistry(appMetrics),
metrics: appMetrics,
successHeader: metadata.Pairs(proto.HeaderRegistered, "1"),
sendTimeout: sTimeout,
disableSendWithDeliveryCheck: opts.DisableSendWithDeliveryCheck,
}
if directSendDisabled := os.Getenv("NB_SIGNAL_DIRECT_SEND_DISABLED"); directSendDisabled == "true" {
s.directSendDisabled = true
log.Warn("direct send to connected peers is disabled")
}
if opts.DisableSendWithDeliveryCheck {
log.Warn("SendWithDeliveryCheck method is disabled")
dispatcher: d,
registry: peer.NewRegistry(appMetrics),
metrics: appMetrics,
successHeader: metadata.Pairs(proto.HeaderRegistered, "1"),
sendTimeout: sTimeout,
}
return s, nil
@@ -120,51 +95,12 @@ func NewServer(ctx context.Context, meter metric.Meter, opts *Options) (*Server,
func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
log.Tracef("received a new message to send from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
if _, found := s.registry.Get(msg.RemoteKey); found && !s.directSendDisabled {
_ = s.forwardMessageToPeer(ctx, msg)
if _, found := s.registry.Get(msg.RemoteKey); found {
s.forwardMessageToPeer(ctx, msg)
return &proto.EncryptedMessage{}, nil
}
if _, err := s.dispatcher.SendMessage(ctx, msg, false); err != nil {
log.Errorf("error sending message via dispatcher: %v", err)
}
return &proto.EncryptedMessage{}, nil
}
// SendWithDeliveryCheck forwards a message to the signal peer with error handling
// When the remote peer is not connected it returns codes.NotFound error, otherwise it returns other types of errors
// that can be retried. In case codes.NotFound is returned the caller should not retry sending the message. The remote
// peer should send a new offer to re-establish the connection when it comes back online.
// Todo: double check the thread safe registry management. When both peer come online at the same time then both peers
// might not be registered yet when the first message is sent.
func (s *Server) SendWithDeliveryCheck(ctx context.Context, msg *proto.EncryptedMessage) (*emptypb.Empty, error) {
if s.disableSendWithDeliveryCheck {
log.Tracef("SendWithDeliveryCheck is disabled")
return nil, status.Errorf(codes.Unimplemented, "SendWithDeliveryCheck method is disabled")
}
log.Tracef("received a new message to send from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
if _, found := s.registry.Get(msg.RemoteKey); found && !s.directSendDisabled {
if err := s.forwardMessageToPeer(ctx, msg); err != nil {
if errors.Is(err, ErrPeerNotConnected) {
log.Tracef("remote peer [%s] not connected", msg.RemoteKey)
return nil, status.Errorf(codes.NotFound, "remote peer not connected")
}
log.Errorf("error sending message with delivery check to peer [%s]: %v", msg.RemoteKey, err)
return nil, status.Errorf(codes.Internal, "error forwarding message to peer: %v", err)
}
return &emptypb.Empty{}, nil
}
if _, err := s.dispatcher.SendMessage(ctx, msg, true); err != nil {
if errors.Is(err, dispatcher.ErrPeerNotConnected) {
log.Tracef("remote peer [%s] doesn't have a listener", msg.RemoteKey)
return nil, status.Errorf(codes.NotFound, "remote peer not connected")
}
return nil, err
}
return &emptypb.Empty{}, nil
return s.dispatcher.SendMessage(ctx, msg)
}
// ConnectStream connects to the exchange stream
@@ -172,7 +108,6 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
ctx, cancel := context.WithCancel(context.Background())
p, err := s.RegisterPeer(stream, cancel)
if err != nil {
log.Errorf("error registering peer: %v", err)
return err
}
@@ -182,7 +117,6 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
err = stream.SendHeader(s.successHeader)
if err != nil {
s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorFailedHeader)))
log.Errorf("error sending registration header to peer [%s] [streamID %d] : %v", p.Id, p.StreamID, err)
return err
}
@@ -207,7 +141,7 @@ func (s *Server) RegisterPeer(stream proto.SignalExchange_ConnectStreamServer, c
p := peer.NewPeer(id[0], stream, cancel)
if err := s.registry.Register(p); err != nil {
return nil, fmt.Errorf("error adding peer to registry peer: %w", err)
return nil, err
}
err := s.dispatcher.ListenForMessages(stream.Context(), p.Id, s.forwardMessageToPeer)
if err != nil {
@@ -224,7 +158,7 @@ func (s *Server) DeregisterPeer(p *peer.Peer) {
s.registry.Deregister(p)
}
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) error {
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) {
log.Tracef("forwarding a new message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
getRegistrationStart := time.Now()
@@ -236,7 +170,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
log.Tracef("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
// todo respond to the sender?
return ErrPeerNotConnected
return
}
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
@@ -257,7 +191,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
if err != nil {
log.Tracef("error while forwarding message from peer [%s] to peer [%s]: %v", msg.Key, msg.RemoteKey, err)
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
return fmt.Errorf("error sending message to peer: %v", err)
return
}
s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream)))
s.metrics.MessagesForwarded.Add(ctx, 1)
@@ -266,13 +200,10 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
case <-dstPeer.Stream.Context().Done():
log.Tracef("failed to forward message from peer [%s] to peer [%s]: destination peer disconnected", msg.Key, msg.RemoteKey)
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeDisconnected)))
return fmt.Errorf("destination peer disconnected")
case <-time.After(s.sendTimeout):
dstPeer.Cancel() // cancel the peer context to trigger deregistration
log.Tracef("failed to forward message from peer [%s] to peer [%s]: send timeout", msg.Key, msg.RemoteKey)
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeTimeout)))
return fmt.Errorf("sending message to peer timeout")
}
return nil
}