mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-29 11:19:56 +00:00
Compare commits
6 Commits
netmap_pro
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b434cda062 | ||
|
|
0b594c639a | ||
|
|
deff8af59f | ||
|
|
5711f0e38c | ||
|
|
1409a1325a | ||
|
|
4400372f37 |
@@ -33,7 +33,7 @@
|
||||
<br/>
|
||||
<br/>
|
||||
<strong>
|
||||
🚀 <a href="https://careers.netbird.io">We are hiring! Join us at careers.netbird.io</a>
|
||||
🚀 <a href="https://netbird.io/careers">We are hiring! Join us at https://netbird.io/careers</a>
|
||||
</strong>
|
||||
</p>
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"errors"
|
||||
"net"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
@@ -167,7 +168,10 @@ func getRcodeForNotFound(ctx context.Context, r resolver, domain string, origina
|
||||
case dns.TypeA:
|
||||
alternativeNetwork = "ip6"
|
||||
default:
|
||||
return dns.RcodeNameError
|
||||
// Non-address types reach LookupIP only unexpectedly; without an
|
||||
// address pair to probe we cannot prove the name is absent, so answer
|
||||
// NODATA rather than a poisoning NXDOMAIN.
|
||||
return dns.RcodeSuccess
|
||||
}
|
||||
|
||||
if _, err := r.LookupNetIP(ctx, alternativeNetwork, domain); err != nil {
|
||||
@@ -184,6 +188,230 @@ func getRcodeForNotFound(ctx context.Context, r resolver, domain string, origina
|
||||
return dns.RcodeSuccess
|
||||
}
|
||||
|
||||
// RecordResolver is the host resolver surface used to forward non-address
|
||||
// record queries. net.DefaultResolver satisfies it.
|
||||
type RecordResolver interface {
|
||||
LookupMX(ctx context.Context, name string) ([]*net.MX, error)
|
||||
LookupTXT(ctx context.Context, name string) ([]string, error)
|
||||
LookupNS(ctx context.Context, name string) ([]*net.NS, error)
|
||||
LookupSRV(ctx context.Context, service, proto, name string) (string, []*net.SRV, error)
|
||||
LookupCNAME(ctx context.Context, host string) (string, error)
|
||||
LookupAddr(ctx context.Context, addr string) ([]string, error)
|
||||
}
|
||||
|
||||
// LookupRecords resolves a non-address DNS record type through the host
|
||||
// resolver and returns the resource records and the DNS rcode. Types the host
|
||||
// resolver cannot answer (anything not covered by the net.Resolver Lookup*
|
||||
// methods) yield NODATA so that a routed name is never poisoned with NXDOMAIN
|
||||
// for an unsupported type.
|
||||
func LookupRecords(ctx context.Context, r RecordResolver, name string, qtype uint16, ttl uint32) ([]dns.RR, int) {
|
||||
fqdn := dns.Fqdn(name)
|
||||
|
||||
switch qtype {
|
||||
case dns.TypeMX:
|
||||
return lookupMX(ctx, r, name, fqdn, ttl)
|
||||
case dns.TypeTXT:
|
||||
return lookupTXT(ctx, r, name, fqdn, ttl)
|
||||
case dns.TypeNS:
|
||||
return lookupNS(ctx, r, name, fqdn, ttl)
|
||||
case dns.TypeSRV:
|
||||
return lookupSRV(ctx, r, name, fqdn, ttl)
|
||||
case dns.TypeCNAME:
|
||||
return lookupCNAME(ctx, r, name, fqdn, ttl)
|
||||
case dns.TypePTR:
|
||||
return lookupPTR(ctx, r, name, fqdn, ttl)
|
||||
default:
|
||||
return nil, dns.RcodeSuccess
|
||||
}
|
||||
}
|
||||
|
||||
func recordHeader(fqdn string, rrtype uint16, ttl uint32) dns.RR_Header {
|
||||
return dns.RR_Header{Name: fqdn, Rrtype: rrtype, Class: dns.ClassINET, Ttl: ttl}
|
||||
}
|
||||
|
||||
func lookupMX(ctx context.Context, r RecordResolver, name, fqdn string, ttl uint32) ([]dns.RR, int) {
|
||||
recs, err := r.LookupMX(ctx, name)
|
||||
if err != nil {
|
||||
return nil, rcodeForRecordError(err)
|
||||
}
|
||||
rrs := make([]dns.RR, 0, len(recs))
|
||||
for _, mx := range recs {
|
||||
rrs = append(rrs, &dns.MX{
|
||||
Hdr: recordHeader(fqdn, dns.TypeMX, ttl),
|
||||
Preference: mx.Pref,
|
||||
Mx: dns.Fqdn(mx.Host),
|
||||
})
|
||||
}
|
||||
return rrs, dns.RcodeSuccess
|
||||
}
|
||||
|
||||
func lookupTXT(ctx context.Context, r RecordResolver, name, fqdn string, ttl uint32) ([]dns.RR, int) {
|
||||
recs, err := r.LookupTXT(ctx, name)
|
||||
if err != nil {
|
||||
return nil, rcodeForRecordError(err)
|
||||
}
|
||||
rrs := make([]dns.RR, 0, len(recs))
|
||||
for _, txt := range recs {
|
||||
rrs = append(rrs, &dns.TXT{
|
||||
Hdr: recordHeader(fqdn, dns.TypeTXT, ttl),
|
||||
Txt: chunkTXT(txt),
|
||||
})
|
||||
}
|
||||
return rrs, dns.RcodeSuccess
|
||||
}
|
||||
|
||||
func lookupNS(ctx context.Context, r RecordResolver, name, fqdn string, ttl uint32) ([]dns.RR, int) {
|
||||
recs, err := r.LookupNS(ctx, name)
|
||||
if err != nil {
|
||||
return nil, rcodeForRecordError(err)
|
||||
}
|
||||
rrs := make([]dns.RR, 0, len(recs))
|
||||
for _, ns := range recs {
|
||||
rrs = append(rrs, &dns.NS{
|
||||
Hdr: recordHeader(fqdn, dns.TypeNS, ttl),
|
||||
Ns: dns.Fqdn(ns.Host),
|
||||
})
|
||||
}
|
||||
return rrs, dns.RcodeSuccess
|
||||
}
|
||||
|
||||
func lookupSRV(ctx context.Context, r RecordResolver, name, fqdn string, ttl uint32) ([]dns.RR, int) {
|
||||
_, recs, err := r.LookupSRV(ctx, "", "", name)
|
||||
if err != nil {
|
||||
return nil, rcodeForRecordError(err)
|
||||
}
|
||||
rrs := make([]dns.RR, 0, len(recs))
|
||||
for _, srv := range recs {
|
||||
rrs = append(rrs, &dns.SRV{
|
||||
Hdr: recordHeader(fqdn, dns.TypeSRV, ttl),
|
||||
Priority: srv.Priority,
|
||||
Weight: srv.Weight,
|
||||
Port: srv.Port,
|
||||
Target: dns.Fqdn(srv.Target),
|
||||
})
|
||||
}
|
||||
return rrs, dns.RcodeSuccess
|
||||
}
|
||||
|
||||
func lookupCNAME(ctx context.Context, r RecordResolver, name, fqdn string, ttl uint32) ([]dns.RR, int) {
|
||||
cname, err := r.LookupCNAME(ctx, name)
|
||||
if err != nil {
|
||||
return nil, rcodeForRecordError(err)
|
||||
}
|
||||
// LookupCNAME returns the queried name itself when the name resolves but
|
||||
// has no CNAME record; that is a NODATA result, not a CNAME.
|
||||
if strings.EqualFold(dns.Fqdn(cname), fqdn) {
|
||||
return nil, dns.RcodeSuccess
|
||||
}
|
||||
return []dns.RR{&dns.CNAME{
|
||||
Hdr: recordHeader(fqdn, dns.TypeCNAME, ttl),
|
||||
Target: dns.Fqdn(cname),
|
||||
}}, dns.RcodeSuccess
|
||||
}
|
||||
|
||||
func lookupPTR(ctx context.Context, r RecordResolver, name, fqdn string, ttl uint32) ([]dns.RR, int) {
|
||||
addr, ok := ptrQueryAddr(name)
|
||||
if !ok {
|
||||
return nil, dns.RcodeSuccess
|
||||
}
|
||||
names, err := r.LookupAddr(ctx, addr)
|
||||
if err != nil {
|
||||
return nil, rcodeForRecordError(err)
|
||||
}
|
||||
rrs := make([]dns.RR, 0, len(names))
|
||||
for _, n := range names {
|
||||
rrs = append(rrs, &dns.PTR{
|
||||
Hdr: recordHeader(fqdn, dns.TypePTR, ttl),
|
||||
Ptr: dns.Fqdn(n),
|
||||
})
|
||||
}
|
||||
return rrs, dns.RcodeSuccess
|
||||
}
|
||||
|
||||
// ptrQueryAddr converts a reverse-DNS query name (in-addr.arpa or ip6.arpa)
|
||||
// into the address string expected by net.Resolver.LookupAddr. It reports false
|
||||
// when the name is not a well-formed reverse name.
|
||||
func ptrQueryAddr(qname string) (string, bool) {
|
||||
name := strings.TrimSuffix(strings.ToLower(dns.Fqdn(qname)), ".")
|
||||
|
||||
switch {
|
||||
case strings.HasSuffix(name, ".in-addr.arpa"):
|
||||
return parseInAddrArpa(strings.TrimSuffix(name, ".in-addr.arpa"))
|
||||
case strings.HasSuffix(name, ".ip6.arpa"):
|
||||
return parseIP6Arpa(strings.TrimSuffix(name, ".ip6.arpa"))
|
||||
default:
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
|
||||
// parseInAddrArpa turns the label portion of an in-addr.arpa name into an IPv4
|
||||
// address string, reporting false when it is not a well-formed reverse name.
|
||||
func parseInAddrArpa(labelPart string) (string, bool) {
|
||||
labels := strings.Split(labelPart, ".")
|
||||
if len(labels) != 4 {
|
||||
return "", false
|
||||
}
|
||||
slices.Reverse(labels)
|
||||
addr, err := netip.ParseAddr(strings.Join(labels, "."))
|
||||
if err != nil || !addr.Is4() {
|
||||
return "", false
|
||||
}
|
||||
return addr.String(), true
|
||||
}
|
||||
|
||||
// parseIP6Arpa turns the nibble portion of an ip6.arpa name into an IPv6
|
||||
// address string, reporting false when it is not a well-formed reverse name.
|
||||
func parseIP6Arpa(nibblePart string) (string, bool) {
|
||||
nibbles := strings.Split(nibblePart, ".")
|
||||
if len(nibbles) != 32 {
|
||||
return "", false
|
||||
}
|
||||
slices.Reverse(nibbles)
|
||||
var sb strings.Builder
|
||||
for i, n := range nibbles {
|
||||
if i > 0 && i%4 == 0 {
|
||||
sb.WriteByte(':')
|
||||
}
|
||||
sb.WriteString(n)
|
||||
}
|
||||
addr, err := netip.ParseAddr(sb.String())
|
||||
if err != nil || !addr.Is6() {
|
||||
return "", false
|
||||
}
|
||||
return addr.String(), true
|
||||
}
|
||||
|
||||
// rcodeForRecordError maps a non-address lookup error to a DNS rcode. A
|
||||
// not-found result becomes NODATA rather than NXDOMAIN: net.DNSError.IsNotFound
|
||||
// does not distinguish a missing name from a name that exists only with records
|
||||
// of other types, so the name cannot be proven absent and must not be poisoned.
|
||||
func rcodeForRecordError(err error) int {
|
||||
var dnsErr *net.DNSError
|
||||
if errors.As(err, &dnsErr) && dnsErr.IsNotFound {
|
||||
return dns.RcodeSuccess
|
||||
}
|
||||
return dns.RcodeServerFailure
|
||||
}
|
||||
|
||||
// chunkTXT splits a TXT string into character-strings no longer than 255 bytes
|
||||
// so the record can be packed. The chunks form one TXT resource record.
|
||||
func chunkTXT(s string) []string {
|
||||
const maxLen = 255
|
||||
if len(s) <= maxLen {
|
||||
return []string{s}
|
||||
}
|
||||
|
||||
var chunks []string
|
||||
for len(s) > maxLen {
|
||||
chunks = append(chunks, s[:maxLen])
|
||||
s = s[maxLen:]
|
||||
}
|
||||
if len(s) > 0 {
|
||||
chunks = append(chunks, s)
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
// FormatAnswers formats DNS resource records for logging.
|
||||
func FormatAnswers(answers []dns.RR) string {
|
||||
if len(answers) == 0 {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"net"
|
||||
"net/netip"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
@@ -121,6 +122,164 @@ func TestLookupIP_DNSErrorNotIsNotFound(t *testing.T) {
|
||||
assert.Equal(t, dns.RcodeServerFailure, result.Rcode, "upstream failure should map to SERVFAIL")
|
||||
}
|
||||
|
||||
func TestPtrQueryAddr(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
qname string
|
||||
want string
|
||||
wantOK bool
|
||||
}{
|
||||
{name: "ipv4", qname: "4.3.2.1.in-addr.arpa.", want: "1.2.3.4", wantOK: true},
|
||||
{name: "ipv4 no trailing dot", qname: "1.0.0.127.in-addr.arpa", want: "127.0.0.1", wantOK: true},
|
||||
{
|
||||
name: "ipv6",
|
||||
qname: "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.",
|
||||
want: "2001:db8::1",
|
||||
wantOK: true,
|
||||
},
|
||||
{name: "ipv4 wrong label count", qname: "2.1.in-addr.arpa.", wantOK: false},
|
||||
{name: "ipv6 wrong nibble count", qname: "1.0.ip6.arpa.", wantOK: false},
|
||||
{name: "not a reverse name", qname: "example.com.", wantOK: false},
|
||||
{name: "ipv4 bad octet", qname: "4.3.2.999.in-addr.arpa.", wantOK: false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, ok := ptrQueryAddr(tt.qname)
|
||||
assert.Equal(t, tt.wantOK, ok, "parse success mismatch")
|
||||
if tt.wantOK {
|
||||
assert.Equal(t, tt.want, got, "parsed address mismatch")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type mockRecordResolver struct {
|
||||
mx []*net.MX
|
||||
txt []string
|
||||
ns []*net.NS
|
||||
srv []*net.SRV
|
||||
cname string
|
||||
ptr []string
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *mockRecordResolver) LookupMX(context.Context, string) ([]*net.MX, error) {
|
||||
return m.mx, m.err
|
||||
}
|
||||
func (m *mockRecordResolver) LookupTXT(context.Context, string) ([]string, error) {
|
||||
return m.txt, m.err
|
||||
}
|
||||
func (m *mockRecordResolver) LookupNS(context.Context, string) ([]*net.NS, error) {
|
||||
return m.ns, m.err
|
||||
}
|
||||
func (m *mockRecordResolver) LookupSRV(context.Context, string, string, string) (string, []*net.SRV, error) {
|
||||
return "", m.srv, m.err
|
||||
}
|
||||
func (m *mockRecordResolver) LookupCNAME(context.Context, string) (string, error) {
|
||||
return m.cname, m.err
|
||||
}
|
||||
func (m *mockRecordResolver) LookupAddr(context.Context, string) ([]string, error) {
|
||||
return m.ptr, m.err
|
||||
}
|
||||
|
||||
func TestLookupRecords(t *testing.T) {
|
||||
notFound := &net.DNSError{IsNotFound: true, Name: "example.com."}
|
||||
|
||||
t.Run("MX success", func(t *testing.T) {
|
||||
r := &mockRecordResolver{mx: []*net.MX{{Host: "mail.example.com.", Pref: 10}}}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "example.com.", dns.TypeMX, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
require.Len(t, rrs, 1)
|
||||
assert.Equal(t, "mail.example.com.", rrs[0].(*dns.MX).Mx)
|
||||
})
|
||||
|
||||
t.Run("TXT short string is one character-string", func(t *testing.T) {
|
||||
r := &mockRecordResolver{txt: []string{"v=spf1 -all"}}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "example.com.", dns.TypeTXT, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
require.Len(t, rrs, 1)
|
||||
assert.Equal(t, []string{"v=spf1 -all"}, rrs[0].(*dns.TXT).Txt)
|
||||
})
|
||||
|
||||
t.Run("TXT chunks long strings", func(t *testing.T) {
|
||||
long := strings.Repeat("a", 300)
|
||||
r := &mockRecordResolver{txt: []string{long}}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "example.com.", dns.TypeTXT, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
require.Len(t, rrs, 1)
|
||||
txt := rrs[0].(*dns.TXT).Txt
|
||||
require.Len(t, txt, 2, "300-byte string should split into two character-strings")
|
||||
assert.Equal(t, 255, len(txt[0]))
|
||||
assert.Equal(t, 45, len(txt[1]))
|
||||
})
|
||||
|
||||
t.Run("NS success", func(t *testing.T) {
|
||||
r := &mockRecordResolver{ns: []*net.NS{{Host: "ns1.example.com."}}}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "example.com.", dns.TypeNS, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
require.Len(t, rrs, 1)
|
||||
assert.Equal(t, "ns1.example.com.", rrs[0].(*dns.NS).Ns)
|
||||
})
|
||||
|
||||
t.Run("SRV success", func(t *testing.T) {
|
||||
r := &mockRecordResolver{srv: []*net.SRV{{Target: "sip.example.com.", Port: 5060}}}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "_sip._tcp.example.com.", dns.TypeSRV, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
require.Len(t, rrs, 1)
|
||||
assert.Equal(t, uint16(5060), rrs[0].(*dns.SRV).Port)
|
||||
})
|
||||
|
||||
t.Run("CNAME success", func(t *testing.T) {
|
||||
r := &mockRecordResolver{cname: "target.example.com."}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "www.example.com.", dns.TypeCNAME, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
require.Len(t, rrs, 1)
|
||||
assert.Equal(t, "target.example.com.", rrs[0].(*dns.CNAME).Target)
|
||||
})
|
||||
|
||||
t.Run("CNAME equal to name is NODATA", func(t *testing.T) {
|
||||
r := &mockRecordResolver{cname: "example.com."}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "example.com.", dns.TypeCNAME, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
assert.Empty(t, rrs, "self-referential CNAME is NODATA")
|
||||
})
|
||||
|
||||
t.Run("PTR success", func(t *testing.T) {
|
||||
r := &mockRecordResolver{ptr: []string{"host.example.com."}}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "4.3.2.1.in-addr.arpa.", dns.TypePTR, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
require.Len(t, rrs, 1)
|
||||
assert.Equal(t, "host.example.com.", rrs[0].(*dns.PTR).Ptr)
|
||||
})
|
||||
|
||||
t.Run("PTR malformed name is NODATA", func(t *testing.T) {
|
||||
r := &mockRecordResolver{}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "example.com.", dns.TypePTR, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
assert.Empty(t, rrs)
|
||||
})
|
||||
|
||||
t.Run("not found is NODATA never NXDOMAIN", func(t *testing.T) {
|
||||
r := &mockRecordResolver{err: notFound}
|
||||
_, rcode := LookupRecords(context.Background(), r, "example.com.", dns.TypeMX, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode, "missing record must not poison the name")
|
||||
})
|
||||
|
||||
t.Run("server failure maps to SERVFAIL", func(t *testing.T) {
|
||||
r := &mockRecordResolver{err: &net.DNSError{Err: "server misbehaving", IsTemporary: true}}
|
||||
_, rcode := LookupRecords(context.Background(), r, "example.com.", dns.TypeMX, 300)
|
||||
assert.Equal(t, dns.RcodeServerFailure, rcode)
|
||||
})
|
||||
|
||||
t.Run("unsupported type is NODATA", func(t *testing.T) {
|
||||
r := &mockRecordResolver{}
|
||||
rrs, rcode := LookupRecords(context.Background(), r, "example.com.", dns.TypeCAA, 300)
|
||||
assert.Equal(t, dns.RcodeSuccess, rcode)
|
||||
assert.Empty(t, rrs)
|
||||
})
|
||||
}
|
||||
|
||||
func TestStripOPT(t *testing.T) {
|
||||
rm := &dns.Msg{
|
||||
Extra: []dns.RR{
|
||||
|
||||
@@ -37,6 +37,12 @@ const (
|
||||
|
||||
type resolver interface {
|
||||
LookupNetIP(ctx context.Context, network, host string) ([]netip.Addr, error)
|
||||
LookupMX(ctx context.Context, name string) ([]*net.MX, error)
|
||||
LookupTXT(ctx context.Context, name string) ([]string, error)
|
||||
LookupNS(ctx context.Context, name string) ([]*net.NS, error)
|
||||
LookupSRV(ctx context.Context, service, proto, name string) (string, []*net.SRV, error)
|
||||
LookupCNAME(ctx context.Context, host string) (string, error)
|
||||
LookupAddr(ctx context.Context, addr string) ([]string, error)
|
||||
}
|
||||
|
||||
type firewaller interface {
|
||||
@@ -210,12 +216,6 @@ func (f *DNSForwarder) handleDNSQuery(logger *log.Entry, w dns.ResponseWriter, q
|
||||
qname, dns.TypeToString[question.Qtype], dns.ClassToString[question.Qclass])
|
||||
|
||||
resp := query.SetReply(query)
|
||||
network := resutil.NetworkForQtype(question.Qtype)
|
||||
if network == "" {
|
||||
resp.Rcode = dns.RcodeNotImplemented
|
||||
f.writeResponse(logger, w, resp, qname, startTime)
|
||||
return
|
||||
}
|
||||
|
||||
mostSpecificResId, matchingEntries := f.getMatchingEntries(strings.TrimSuffix(qname, "."))
|
||||
if mostSpecificResId == "" {
|
||||
@@ -227,9 +227,46 @@ func (f *DNSForwarder) handleDNSQuery(logger *log.Entry, w dns.ResponseWriter, q
|
||||
ctx, cancel := context.WithTimeout(context.Background(), upstreamTimeout)
|
||||
defer cancel()
|
||||
|
||||
reqHasEdns := query.IsEdns0() != nil
|
||||
|
||||
switch question.Qtype {
|
||||
case dns.TypeA, dns.TypeAAAA:
|
||||
f.handleAddressQuery(ctx, logger, w, resp, mostSpecificResId, matchingEntries, reqHasEdns, startTime)
|
||||
case dns.TypeMX, dns.TypeTXT, dns.TypeNS, dns.TypeSRV, dns.TypeCNAME, dns.TypePTR:
|
||||
f.handleRecordQuery(ctx, logger, w, resp, startTime)
|
||||
default:
|
||||
// The domain is routed here, so any other type is answered NODATA
|
||||
// (NOERROR, empty answer) rather than falling back to a resolver that
|
||||
// would poison the name with NXDOMAIN. The Extended DNS Error lets a
|
||||
// client tell this capability-driven NODATA apart from an
|
||||
// authoritative one. The OPT pseudo-record must not appear unless the
|
||||
// query advertised EDNS0.
|
||||
if reqHasEdns {
|
||||
attachEDE(resp, dns.ExtendedErrorCodeNotSupported, "netbird forwarder: unsupported query type")
|
||||
}
|
||||
f.writeResponse(logger, w, resp, qname, startTime)
|
||||
}
|
||||
}
|
||||
|
||||
// handleAddressQuery resolves A/AAAA queries, programs the firewall sets and
|
||||
// resolved-IP state, and caches the answer for resilience on upstream failure.
|
||||
func (f *DNSForwarder) handleAddressQuery(
|
||||
ctx context.Context,
|
||||
logger *log.Entry,
|
||||
w dns.ResponseWriter,
|
||||
resp *dns.Msg,
|
||||
mostSpecificResId route.ResID,
|
||||
matchingEntries []*ForwarderEntry,
|
||||
reqHasEdns bool,
|
||||
startTime time.Time,
|
||||
) {
|
||||
question := resp.Question[0]
|
||||
qname := strings.ToLower(question.Name)
|
||||
|
||||
network := resutil.NetworkForQtype(question.Qtype)
|
||||
result := resutil.LookupIP(ctx, f.resolver, network, qname, question.Qtype)
|
||||
if result.Err != nil {
|
||||
f.handleDNSError(ctx, logger, w, question, resp, qname, result, query.IsEdns0() != nil, startTime)
|
||||
f.handleDNSError(ctx, logger, w, question, resp, qname, result, reqHasEdns, startTime)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -240,6 +277,25 @@ func (f *DNSForwarder) handleDNSQuery(logger *log.Entry, w dns.ResponseWriter, q
|
||||
f.writeResponse(logger, w, resp, qname, startTime)
|
||||
}
|
||||
|
||||
// handleRecordQuery resolves non-address record types (MX, TXT, NS, SRV,
|
||||
// CNAME, PTR) through the host resolver. Missing records are answered NODATA so
|
||||
// the routed name is never poisoned with NXDOMAIN.
|
||||
func (f *DNSForwarder) handleRecordQuery(
|
||||
ctx context.Context,
|
||||
logger *log.Entry,
|
||||
w dns.ResponseWriter,
|
||||
resp *dns.Msg,
|
||||
startTime time.Time,
|
||||
) {
|
||||
question := resp.Question[0]
|
||||
qname := strings.ToLower(question.Name)
|
||||
|
||||
records, rcode := resutil.LookupRecords(ctx, f.resolver, qname, question.Qtype, f.ttl)
|
||||
resp.Rcode = rcode
|
||||
resp.Answer = append(resp.Answer, records...)
|
||||
f.writeResponse(logger, w, resp, qname, startTime)
|
||||
}
|
||||
|
||||
func (f *DNSForwarder) writeResponse(logger *log.Entry, w dns.ResponseWriter, resp *dns.Msg, qname string, startTime time.Time) {
|
||||
if err := w.WriteMsg(resp); err != nil {
|
||||
logger.Errorf("failed to write DNS response: %v", err)
|
||||
|
||||
@@ -133,6 +133,41 @@ func (m *MockResolver) LookupNetIP(ctx context.Context, network, host string) ([
|
||||
return args.Get(0).([]netip.Addr), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockResolver) LookupMX(ctx context.Context, name string) ([]*net.MX, error) {
|
||||
args := m.Called(ctx, name)
|
||||
recs, _ := args.Get(0).([]*net.MX)
|
||||
return recs, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockResolver) LookupTXT(ctx context.Context, name string) ([]string, error) {
|
||||
args := m.Called(ctx, name)
|
||||
recs, _ := args.Get(0).([]string)
|
||||
return recs, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockResolver) LookupNS(ctx context.Context, name string) ([]*net.NS, error) {
|
||||
args := m.Called(ctx, name)
|
||||
recs, _ := args.Get(0).([]*net.NS)
|
||||
return recs, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockResolver) LookupSRV(ctx context.Context, service, proto, name string) (string, []*net.SRV, error) {
|
||||
args := m.Called(ctx, service, proto, name)
|
||||
recs, _ := args.Get(1).([]*net.SRV)
|
||||
return args.String(0), recs, args.Error(2)
|
||||
}
|
||||
|
||||
func (m *MockResolver) LookupCNAME(ctx context.Context, host string) (string, error) {
|
||||
args := m.Called(ctx, host)
|
||||
return args.String(0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockResolver) LookupAddr(ctx context.Context, addr string) ([]string, error) {
|
||||
args := m.Called(ctx, addr)
|
||||
recs, _ := args.Get(0).([]string)
|
||||
return recs, args.Error(1)
|
||||
}
|
||||
|
||||
func TestDNSForwarder_SubdomainAccessLogic(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
@@ -545,12 +580,15 @@ func TestDNSForwarder_MultipleIPsInSingleUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDNSForwarder_ResponseCodes(t *testing.T) {
|
||||
// A type with no net.Resolver Lookup method (CAA) must answer NODATA
|
||||
// (NOERROR, empty) rather than NXDOMAIN/NOTIMP to avoid poisoning the name.
|
||||
tests := []struct {
|
||||
name string
|
||||
queryType uint16
|
||||
queryDomain string
|
||||
configured string
|
||||
expectedCode int
|
||||
expectEDE bool
|
||||
description string
|
||||
}{
|
||||
{
|
||||
@@ -562,28 +600,13 @@ func TestDNSForwarder_ResponseCodes(t *testing.T) {
|
||||
description: "RFC compliant REFUSED for unauthorized queries",
|
||||
},
|
||||
{
|
||||
name: "unsupported query type returns NOTIMP",
|
||||
queryType: dns.TypeMX,
|
||||
name: "unsupported query type returns NODATA",
|
||||
queryType: dns.TypeCAA,
|
||||
queryDomain: "example.com",
|
||||
configured: "example.com",
|
||||
expectedCode: dns.RcodeNotImplemented,
|
||||
description: "RFC compliant NOTIMP for unsupported types",
|
||||
},
|
||||
{
|
||||
name: "CNAME query returns NOTIMP",
|
||||
queryType: dns.TypeCNAME,
|
||||
queryDomain: "example.com",
|
||||
configured: "example.com",
|
||||
expectedCode: dns.RcodeNotImplemented,
|
||||
description: "CNAME queries not supported",
|
||||
},
|
||||
{
|
||||
name: "TXT query returns NOTIMP",
|
||||
queryType: dns.TypeTXT,
|
||||
queryDomain: "example.com",
|
||||
configured: "example.com",
|
||||
expectedCode: dns.RcodeNotImplemented,
|
||||
description: "TXT queries not supported",
|
||||
expectedCode: dns.RcodeSuccess,
|
||||
expectEDE: true,
|
||||
description: "Unsupported types answer NODATA, not NXDOMAIN/NOTIMP",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -599,6 +622,7 @@ func TestDNSForwarder_ResponseCodes(t *testing.T) {
|
||||
|
||||
query := &dns.Msg{}
|
||||
query.SetQuestion(dns.Fqdn(tt.queryDomain), tt.queryType)
|
||||
query.SetEdns0(dns.DefaultMsgSize, false)
|
||||
|
||||
// Capture the written response
|
||||
var writtenResp *dns.Msg
|
||||
@@ -614,10 +638,213 @@ func TestDNSForwarder_ResponseCodes(t *testing.T) {
|
||||
// Check the response written to the writer
|
||||
require.NotNil(t, writtenResp, "Expected response to be written")
|
||||
assert.Equal(t, tt.expectedCode, writtenResp.Rcode, tt.description)
|
||||
assert.Empty(t, writtenResp.Answer, "Non-address response should carry no answers")
|
||||
|
||||
if tt.expectEDE {
|
||||
require.NotNil(t, writtenResp.IsEdns0(), "EDNS0 client should get an OPT in the reply")
|
||||
assert.True(t, hasEDE(writtenResp, dns.ExtendedErrorCodeNotSupported),
|
||||
"unsupported type NODATA should carry EDE Not Supported")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func hasEDE(m *dns.Msg, code uint16) bool {
|
||||
opt := m.IsEdns0()
|
||||
if opt == nil {
|
||||
return false
|
||||
}
|
||||
for _, o := range opt.Option {
|
||||
if ede, ok := o.(*dns.EDNS0_EDE); ok && ede.InfoCode == code {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func TestDNSForwarder_RecordQueries(t *testing.T) {
|
||||
notFound := &net.DNSError{IsNotFound: true, Name: "example.com"}
|
||||
|
||||
t.Run("MX records are forwarded", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "example.com")
|
||||
|
||||
mockResolver.On("LookupMX", mock.Anything, "example.com.").
|
||||
Return([]*net.MX{{Host: "mail.example.com.", Pref: 10}}, nil).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "example.com", dns.TypeMX)
|
||||
require.Equal(t, dns.RcodeSuccess, resp.Rcode)
|
||||
require.Len(t, resp.Answer, 1)
|
||||
mx, ok := resp.Answer[0].(*dns.MX)
|
||||
require.True(t, ok, "answer should be an MX record")
|
||||
assert.Equal(t, uint16(10), mx.Preference)
|
||||
assert.Equal(t, "mail.example.com.", mx.Mx)
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("missing MX is NODATA not NXDOMAIN", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "example.com")
|
||||
|
||||
// A not-found cannot prove the name is absent (it may exist with only
|
||||
// other record types), so it must answer NODATA, never NXDOMAIN.
|
||||
mockResolver.On("LookupMX", mock.Anything, "example.com.").
|
||||
Return(nil, notFound).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "example.com", dns.TypeMX)
|
||||
assert.Equal(t, dns.RcodeSuccess, resp.Rcode, "missing record must be NODATA")
|
||||
assert.Empty(t, resp.Answer)
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("NS records are forwarded", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "example.com")
|
||||
|
||||
mockResolver.On("LookupNS", mock.Anything, "example.com.").
|
||||
Return([]*net.NS{{Host: "ns1.example.com."}}, nil).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "example.com", dns.TypeNS)
|
||||
require.Equal(t, dns.RcodeSuccess, resp.Rcode)
|
||||
require.Len(t, resp.Answer, 1)
|
||||
ns, ok := resp.Answer[0].(*dns.NS)
|
||||
require.True(t, ok, "answer should be an NS record")
|
||||
assert.Equal(t, "ns1.example.com.", ns.Ns)
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("missing NS is NODATA", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "example.com")
|
||||
|
||||
mockResolver.On("LookupNS", mock.Anything, "example.com.").
|
||||
Return(nil, notFound).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "example.com", dns.TypeNS)
|
||||
assert.Equal(t, dns.RcodeSuccess, resp.Rcode)
|
||||
assert.Empty(t, resp.Answer)
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("SRV records are forwarded", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "_sip._tcp.example.com")
|
||||
|
||||
mockResolver.On("LookupSRV", mock.Anything, "", "", "_sip._tcp.example.com.").
|
||||
Return("", []*net.SRV{{Target: "sip.example.com.", Port: 5060, Priority: 10, Weight: 5}}, nil).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "_sip._tcp.example.com", dns.TypeSRV)
|
||||
require.Equal(t, dns.RcodeSuccess, resp.Rcode)
|
||||
require.Len(t, resp.Answer, 1)
|
||||
srv, ok := resp.Answer[0].(*dns.SRV)
|
||||
require.True(t, ok, "answer should be an SRV record")
|
||||
assert.Equal(t, "sip.example.com.", srv.Target)
|
||||
assert.Equal(t, uint16(5060), srv.Port)
|
||||
assert.Equal(t, uint16(10), srv.Priority)
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("missing SRV is NODATA", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "_sip._tcp.example.com")
|
||||
|
||||
mockResolver.On("LookupSRV", mock.Anything, "", "", "_sip._tcp.example.com.").
|
||||
Return("", nil, notFound).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "_sip._tcp.example.com", dns.TypeSRV)
|
||||
assert.Equal(t, dns.RcodeSuccess, resp.Rcode)
|
||||
assert.Empty(t, resp.Answer)
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("TXT records are forwarded", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "example.com")
|
||||
|
||||
mockResolver.On("LookupTXT", mock.Anything, "example.com.").
|
||||
Return([]string{"v=spf1 -all"}, nil).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "example.com", dns.TypeTXT)
|
||||
require.Equal(t, dns.RcodeSuccess, resp.Rcode)
|
||||
require.Len(t, resp.Answer, 1)
|
||||
txt, ok := resp.Answer[0].(*dns.TXT)
|
||||
require.True(t, ok, "answer should be a TXT record")
|
||||
assert.Equal(t, []string{"v=spf1 -all"}, txt.Txt)
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("CNAME record is forwarded", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "www.example.com")
|
||||
|
||||
mockResolver.On("LookupCNAME", mock.Anything, "www.example.com.").
|
||||
Return("target.example.com.", nil).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "www.example.com", dns.TypeCNAME)
|
||||
require.Equal(t, dns.RcodeSuccess, resp.Rcode)
|
||||
require.Len(t, resp.Answer, 1)
|
||||
cname, ok := resp.Answer[0].(*dns.CNAME)
|
||||
require.True(t, ok, "answer should be a CNAME record")
|
||||
assert.Equal(t, "target.example.com.", cname.Target)
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("CNAME equal to the name is NODATA", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "example.com")
|
||||
|
||||
// No CNAME exists: LookupCNAME echoes the queried name back.
|
||||
mockResolver.On("LookupCNAME", mock.Anything, "example.com.").
|
||||
Return("example.com.", nil).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "example.com", dns.TypeCNAME)
|
||||
assert.Equal(t, dns.RcodeSuccess, resp.Rcode)
|
||||
assert.Empty(t, resp.Answer, "self-referential CNAME means no CNAME record")
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("PTR record is forwarded", func(t *testing.T) {
|
||||
mockResolver := &MockResolver{}
|
||||
forwarder := newRecordTestForwarder(t, mockResolver, "*.in-addr.arpa")
|
||||
|
||||
// The reverse name is parsed back to the address LookupAddr expects.
|
||||
mockResolver.On("LookupAddr", mock.Anything, "1.2.3.4").
|
||||
Return([]string{"host.example.com."}, nil).Once()
|
||||
|
||||
resp := runRecordQuery(t, forwarder, "4.3.2.1.in-addr.arpa", dns.TypePTR)
|
||||
require.Equal(t, dns.RcodeSuccess, resp.Rcode)
|
||||
require.Len(t, resp.Answer, 1)
|
||||
ptr, ok := resp.Answer[0].(*dns.PTR)
|
||||
require.True(t, ok, "answer should be a PTR record")
|
||||
assert.Equal(t, "host.example.com.", ptr.Ptr)
|
||||
mockResolver.AssertExpectations(t)
|
||||
})
|
||||
}
|
||||
|
||||
func newRecordTestForwarder(t *testing.T, r resolver, configured string) *DNSForwarder {
|
||||
t.Helper()
|
||||
forwarder := NewDNSForwarder(netip.MustParseAddrPort("127.0.0.1:0"), 300, nil, &peer.Status{}, nil)
|
||||
forwarder.resolver = r
|
||||
|
||||
d, err := domain.FromString(configured)
|
||||
require.NoError(t, err)
|
||||
forwarder.UpdateDomains([]*ForwarderEntry{{Domain: d, ResID: "test-res"}})
|
||||
return forwarder
|
||||
}
|
||||
|
||||
func runRecordQuery(t *testing.T, forwarder *DNSForwarder, qname string, qtype uint16) *dns.Msg {
|
||||
t.Helper()
|
||||
query := &dns.Msg{}
|
||||
query.SetQuestion(dns.Fqdn(qname), qtype)
|
||||
|
||||
mockWriter := &test.MockResponseWriter{}
|
||||
forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query, time.Now())
|
||||
|
||||
resp := mockWriter.GetLastResponse()
|
||||
require.NotNil(t, resp, "expected response to be written")
|
||||
return resp
|
||||
}
|
||||
|
||||
func TestDNSForwarder_UpstreamFailureEDE(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
@@ -895,6 +895,16 @@ func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdate
|
||||
e.updateManager.SetVersion(autoUpdateSettings.Version, autoUpdateSettings.AlwaysUpdate)
|
||||
}
|
||||
|
||||
// phase times a sync sub-phase: it returns a function that records the elapsed
|
||||
// duration when called. Starting the timer at the call site keeps inter-phase
|
||||
// glue code out of the measurement.
|
||||
func (e *Engine) phase(name string) func() {
|
||||
start := time.Now()
|
||||
return func() {
|
||||
e.clientMetrics.RecordSyncPhase(e.ctx, name, time.Since(start))
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
@@ -914,7 +924,10 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
|
||||
}
|
||||
|
||||
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
|
||||
done := e.phase("netbird_config")
|
||||
err := e.updateNetbirdConfig(update.GetNetbirdConfig())
|
||||
done()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -928,11 +941,16 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := e.updateChecksIfNew(update.Checks); err != nil {
|
||||
done = e.phase("checks")
|
||||
err = e.updateChecksIfNew(update.Checks)
|
||||
done()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
done = e.phase("persist")
|
||||
e.persistSyncResponse(update)
|
||||
done()
|
||||
|
||||
// only apply new changes and ignore old ones
|
||||
if err := e.updateNetworkMap(nm); err != nil {
|
||||
@@ -1371,13 +1389,16 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
|
||||
dnsConfig := toDNSConfig(protoDNSConfig, e.wgInterface.Address())
|
||||
|
||||
done := e.phase("dns_server")
|
||||
if err := e.dnsServer.UpdateDNSServer(serial, dnsConfig); err != nil {
|
||||
log.Errorf("failed to update dns server, err: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
e.routeManager.SetDNSForwarderPort(dnsConfig.ForwarderPort)
|
||||
|
||||
// apply routes first, route related actions might depend on routing being enabled
|
||||
done = e.phase("routes_classify")
|
||||
routes := toRoutes(networkMap.GetRoutes())
|
||||
serverRoutes, clientRoutes := e.routeManager.ClassifyRoutes(routes)
|
||||
|
||||
@@ -1386,29 +1407,60 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
e.connMgr.UpdateRouteHAMap(clientRoutes)
|
||||
log.Debugf("updated lazy connection manager with %d HA groups", len(clientRoutes))
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("routes_apply")
|
||||
dnsRouteFeatureFlag := toDNSFeatureFlag(networkMap)
|
||||
if err := e.routeManager.UpdateRoutes(serial, serverRoutes, clientRoutes, dnsRouteFeatureFlag); err != nil {
|
||||
log.Errorf("failed to update routes: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("filtering")
|
||||
if e.acl != nil {
|
||||
e.acl.ApplyFiltering(networkMap, dnsRouteFeatureFlag)
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("dns_forwarder")
|
||||
fwdEntries := toRouteDomains(e.config.WgPrivateKey.PublicKey().String(), routes)
|
||||
e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
|
||||
done()
|
||||
|
||||
// Ingress forward rules
|
||||
done = e.phase("forward_rules")
|
||||
forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
|
||||
if err != nil {
|
||||
log.Errorf("failed to update forward rules, err: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
||||
|
||||
done = e.phase("offline_peers")
|
||||
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
||||
done()
|
||||
|
||||
remotePeers, err := e.reconcilePeers(networkMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
|
||||
done = e.phase("lazy_exclude")
|
||||
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
|
||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||
done()
|
||||
|
||||
e.networkSerial = serial
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// reconcilePeers applies the remote peer list from the network map (removing,
|
||||
// modifying and adding peers, then updating SSH config) and returns the remote
|
||||
// peers with our own peer filtered out, for use by later sync steps.
|
||||
func (e *Engine) reconcilePeers(networkMap *mgmProto.NetworkMap) ([]*mgmProto.RemotePeerConfig, error) {
|
||||
// Filter out own peer from the remote peers list
|
||||
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
||||
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
|
||||
@@ -1423,42 +1475,43 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
err := e.removeAllPeers()
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
err := e.removePeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = e.modifyPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = e.addNewPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
|
||||
e.updatePeerSSHHostKeys(remotePeers)
|
||||
|
||||
if err := e.updateSSHClientConfig(remotePeers); err != nil {
|
||||
log.Warnf("failed to update SSH client config: %v", err)
|
||||
}
|
||||
|
||||
e.updateSSHServerAuth(networkMap.GetSshAuth())
|
||||
return remotePeers, nil
|
||||
}
|
||||
|
||||
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
|
||||
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
|
||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||
done := e.phase("removed_peers")
|
||||
err := e.removePeers(remotePeers)
|
||||
done()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.networkSerial = serial
|
||||
done = e.phase("modified_peers")
|
||||
err = e.modifyPeers(remotePeers)
|
||||
done()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
done = e.phase("added_peers")
|
||||
err = e.addNewPeers(remotePeers)
|
||||
done()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
|
||||
e.updatePeerSSHHostKeys(remotePeers)
|
||||
|
||||
if err := e.updateSSHClientConfig(remotePeers); err != nil {
|
||||
log.Warnf("failed to update SSH client config: %v", err)
|
||||
}
|
||||
|
||||
e.updateSSHServerAuth(networkMap.GetSshAuth())
|
||||
|
||||
return remotePeers, nil
|
||||
}
|
||||
|
||||
func toDNSFeatureFlag(networkMap *mgmProto.NetworkMap) bool {
|
||||
|
||||
@@ -120,6 +120,30 @@ func (m *influxDBMetrics) RecordSyncDuration(_ context.Context, agentInfo AgentI
|
||||
m.trimLocked()
|
||||
}
|
||||
|
||||
func (m *influxDBMetrics) RecordSyncPhase(_ context.Context, agentInfo AgentInfo, phase string, duration time.Duration) {
|
||||
tags := fmt.Sprintf("deployment_type=%s,version=%s,os=%s,arch=%s,peer_id=%s,phase=%s",
|
||||
agentInfo.DeploymentType.String(),
|
||||
agentInfo.Version,
|
||||
agentInfo.OS,
|
||||
agentInfo.Arch,
|
||||
agentInfo.peerID,
|
||||
phase,
|
||||
)
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.samples = append(m.samples, influxSample{
|
||||
measurement: "netbird_sync_phase",
|
||||
tags: tags,
|
||||
fields: map[string]float64{
|
||||
"duration_seconds": duration.Seconds(),
|
||||
},
|
||||
timestamp: time.Now(),
|
||||
})
|
||||
m.trimLocked()
|
||||
}
|
||||
|
||||
func (m *influxDBMetrics) RecordLoginDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration, success bool) {
|
||||
result := "success"
|
||||
if !success {
|
||||
|
||||
@@ -78,6 +78,25 @@ Tags:
|
||||
- `os`: Operating system (linux, darwin, windows, android, ios, etc.)
|
||||
- `arch`: CPU architecture (amd64, arm64, etc.)
|
||||
|
||||
### Sync Phase Timing
|
||||
|
||||
Measurement: `netbird_sync_phase`
|
||||
|
||||
Breaks down where time goes inside a single sync, so the total `netbird_sync` duration can be attributed to the sub-step that dominates.
|
||||
|
||||
| Field | Description |
|
||||
|-------|-------------|
|
||||
| `duration_seconds` | Time spent in one sub-phase of sync processing |
|
||||
|
||||
Tags:
|
||||
- `phase`: the sub-phase — `netbird_config`, `checks`, `persist`, `dns_server`, `routes_classify`, `routes_apply`, `filtering`, `dns_forwarder`, `forward_rules`, `offline_peers`, `removed_peers`, `modified_peers`, `added_peers`, `lazy_exclude`
|
||||
- `deployment_type`: "cloud" | "selfhosted" | "unknown"
|
||||
- `version`: NetBird version string
|
||||
- `os`: Operating system (linux, darwin, windows, android, ios, etc.)
|
||||
- `arch`: CPU architecture (amd64, arm64, etc.)
|
||||
|
||||
**Note:** this is wall-time per phase — it includes both CPU work and time spent waiting on locks. A slow phase points to *where* the time goes, not *why*; pair it with lock-wait metrics to tell contention apart from real work.
|
||||
|
||||
### Login Duration
|
||||
|
||||
Measurement: `netbird_login`
|
||||
@@ -191,4 +210,52 @@ docker compose exec influxdb influx query \
|
||||
|
||||
# Check ingest server health
|
||||
curl http://localhost:8087/health
|
||||
```
|
||||
```
|
||||
|
||||
## Analyzing a Debug Bundle
|
||||
|
||||
Metrics collection is always on, so every debug bundle ships a `metrics.txt` in InfluxDB line protocol — a timestamped time series of all recorded events (sync durations, sync phases, connection stages, login). You can replay it into the local stack and graph it, without a running client.
|
||||
|
||||
The bundle's `metrics.txt` is a rolling window (capped at 5 days / ~20k samples, see [Buffer Limits](#buffer-limits)). For a connection incident the relevant window is short (connection setup is seconds), so a bundle captured during the issue is enough.
|
||||
|
||||
### 1. Start the stack
|
||||
|
||||
```bash
|
||||
# From this directory (client/internal/metrics/infra)
|
||||
INFLUXDB_ADMIN_TOKEN=admin123 INFLUXDB_ADMIN_PASSWORD=admin123 GRAFANA_ADMIN_PASSWORD=admin123 \
|
||||
docker compose up -d
|
||||
```
|
||||
|
||||
(`admin123` are throwaway local credentials — fine for offline analysis.)
|
||||
|
||||
### 2. Clear any previous data
|
||||
|
||||
So you only see this bundle:
|
||||
|
||||
```bash
|
||||
docker exec influxdb influx delete --org netbird --bucket metrics --token admin123 \
|
||||
--start 1970-01-01T00:00:00Z --stop 2100-01-01T00:00:00Z
|
||||
```
|
||||
|
||||
### 3. Import the bundle's metrics.txt
|
||||
|
||||
InfluxDB is not exposed on the host, so import inside the container:
|
||||
|
||||
```bash
|
||||
docker cp /path/to/bundle/metrics.txt influxdb:/tmp/m.txt
|
||||
docker exec influxdb influx write --org netbird --bucket metrics --precision ns \
|
||||
--token admin123 --file /tmp/m.txt
|
||||
```
|
||||
|
||||
Re-importing the same file is idempotent (same measurement+tags+timestamp overwrites).
|
||||
|
||||
### 4. View the dashboards
|
||||
|
||||
Grafana on http://localhost:3001 (login `admin` / `admin123`), datasource pre-provisioned:
|
||||
|
||||
- **Where sync time goes:** http://localhost:3001/d/netbird-sync-phases/netbird-sync-phases-where-time-goes
|
||||
- **General client metrics:** http://localhost:3001/d/netbird-influxdb-metrics
|
||||
|
||||
**Set the time range** to cover the bundle's timestamps (e.g. "Last 7 days" or an absolute range matching when the bundle was taken) — with the default short range the panels look empty.
|
||||
|
||||
Bundles are distinguishable by the `version` tag; add a tag at import time (e.g. `sed 's/^netbird_\([a-z_]*\),/netbird_\1,bundle=mycase,/' metrics.txt`) if you want to compare several side by side.
|
||||
@@ -0,0 +1,259 @@
|
||||
{
|
||||
"annotations": {
|
||||
"list": []
|
||||
},
|
||||
"editable": true,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 1,
|
||||
"links": [],
|
||||
"refresh": "",
|
||||
"schemaVersion": 39,
|
||||
"tags": [
|
||||
"netbird",
|
||||
"sync"
|
||||
],
|
||||
"templating": {
|
||||
"list": [
|
||||
{
|
||||
"current": {
|
||||
"text": "All",
|
||||
"value": "$__all"
|
||||
},
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"definition": "import \"influxdata/influxdb/schema\"\nschema.tagValues(bucket: \"metrics\", tag: \"version\")",
|
||||
"includeAll": true,
|
||||
"label": "version",
|
||||
"multi": true,
|
||||
"name": "version",
|
||||
"query": "import \"influxdata/influxdb/schema\"\nschema.tagValues(bucket: \"metrics\", tag: \"version\")",
|
||||
"refresh": 2,
|
||||
"type": "query",
|
||||
"allValue": ".*"
|
||||
}
|
||||
]
|
||||
},
|
||||
"time": {
|
||||
"from": "now-2d",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "NetBird Sync Phases (where time goes)",
|
||||
"uid": "netbird-sync-phases",
|
||||
"version": 1,
|
||||
"panels": [
|
||||
{
|
||||
"id": 1,
|
||||
"title": "Time per phase over time (stacked, ms)",
|
||||
"type": "timeseries",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 10,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms",
|
||||
"custom": {
|
||||
"drawStyle": "bars",
|
||||
"stacking": {
|
||||
"mode": "normal",
|
||||
"group": "A"
|
||||
},
|
||||
"fillOpacity": 80,
|
||||
"lineWidth": 0
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": {
|
||||
"displayMode": "table",
|
||||
"placement": "right",
|
||||
"calcs": [
|
||||
"max",
|
||||
"mean"
|
||||
]
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "multi",
|
||||
"sort": "desc"
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> keep(columns: [\"_time\", \"_value\", \"phase\"])\n |> group(columns: [\"phase\"])"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"title": "p95 per phase (ms)",
|
||||
"type": "bargauge",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 11,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 10
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms",
|
||||
"color": {
|
||||
"mode": "continuous-GrYlRd"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"displayMode": "gradient",
|
||||
"orientation": "horizontal",
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"fields": "",
|
||||
"values": false
|
||||
},
|
||||
"showUnfilled": true
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> quantile(q: 0.95)\n |> group()\n |> sort(columns: [\"_value\"], desc: true)"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"title": "Per-phase stats (ms): mean / p95 / max",
|
||||
"type": "table",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 11,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 10
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"showHeader": true,
|
||||
"sortBy": [
|
||||
{
|
||||
"displayName": "max",
|
||||
"desc": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"transformations": [
|
||||
{
|
||||
"id": "merge",
|
||||
"options": {}
|
||||
}
|
||||
],
|
||||
"targets": [
|
||||
{
|
||||
"refId": "mean",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> mean()\n |> group()\n |> keep(columns: [\"phase\", \"_value\"])\n |> rename(columns: {_value: \"mean\"})"
|
||||
},
|
||||
{
|
||||
"refId": "p95",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> quantile(q: 0.95)\n |> group()\n |> keep(columns: [\"phase\", \"_value\"])\n |> rename(columns: {_value: \"p95\"})"
|
||||
},
|
||||
{
|
||||
"refId": "max",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> max()\n |> group()\n |> keep(columns: [\"phase\", \"_value\"])\n |> rename(columns: {_value: \"max\"})"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 4,
|
||||
"title": "Total sync duration (netbird_sync, ms) \u2014 reference",
|
||||
"type": "timeseries",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 21
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms",
|
||||
"custom": {
|
||||
"drawStyle": "points",
|
||||
"pointSize": 5
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": {
|
||||
"displayMode": "table",
|
||||
"placement": "right",
|
||||
"calcs": [
|
||||
"max",
|
||||
"mean"
|
||||
]
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single"
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> keep(columns: [\"_time\", \"_value\", \"version\"])\n |> group(columns: [\"version\"])"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -59,6 +59,19 @@ var allowedMeasurements = map[string]measurementSpec{
|
||||
"peer_id": true,
|
||||
},
|
||||
},
|
||||
"netbird_sync_phase": {
|
||||
allowedFields: map[string]bool{
|
||||
"duration_seconds": true,
|
||||
},
|
||||
allowedTags: map[string]bool{
|
||||
"deployment_type": true,
|
||||
"version": true,
|
||||
"os": true,
|
||||
"arch": true,
|
||||
"peer_id": true,
|
||||
"phase": true,
|
||||
},
|
||||
},
|
||||
"netbird_login": {
|
||||
allowedFields: map[string]bool{
|
||||
"duration_seconds": true,
|
||||
|
||||
@@ -56,6 +56,9 @@ type metricsImplementation interface {
|
||||
// RecordSyncDuration records how long it took to process a sync message
|
||||
RecordSyncDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration)
|
||||
|
||||
// RecordSyncPhase records how long a single sub-phase of sync processing took
|
||||
RecordSyncPhase(ctx context.Context, agentInfo AgentInfo, phase string, duration time.Duration)
|
||||
|
||||
// RecordLoginDuration records how long the login to management took
|
||||
RecordLoginDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration, success bool)
|
||||
|
||||
@@ -127,6 +130,18 @@ func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Du
|
||||
c.impl.RecordSyncDuration(ctx, agentInfo, duration)
|
||||
}
|
||||
|
||||
// RecordSyncPhase records the duration of a single sub-phase of sync processing
|
||||
func (c *ClientMetrics) RecordSyncPhase(ctx context.Context, phase string, duration time.Duration) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.RLock()
|
||||
agentInfo := c.agentInfo
|
||||
c.mu.RUnlock()
|
||||
|
||||
c.impl.RecordSyncPhase(ctx, agentInfo, phase, duration)
|
||||
}
|
||||
|
||||
// RecordLoginDuration records how long the login to management server took
|
||||
func (c *ClientMetrics) RecordLoginDuration(ctx context.Context, duration time.Duration, success bool) {
|
||||
if c == nil {
|
||||
|
||||
@@ -70,6 +70,9 @@ func (m *mockMetrics) RecordConnectionStages(_ context.Context, _ AgentInfo, _ s
|
||||
func (m *mockMetrics) RecordSyncDuration(_ context.Context, _ AgentInfo, _ time.Duration) {
|
||||
}
|
||||
|
||||
func (m *mockMetrics) RecordSyncPhase(_ context.Context, _ AgentInfo, _ string, _ time.Duration) {
|
||||
}
|
||||
|
||||
func (m *mockMetrics) RecordLoginDuration(_ context.Context, _ AgentInfo, _ time.Duration, _ bool) {
|
||||
}
|
||||
|
||||
|
||||
@@ -226,12 +226,11 @@ func (d *DnsInterceptor) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
|
||||
return
|
||||
}
|
||||
|
||||
// pass if non A/AAAA query
|
||||
if r.Question[0].Qtype != dns.TypeA && r.Question[0].Qtype != dns.TypeAAAA {
|
||||
d.continueToNextHandler(w, r, logger, "non A/AAAA query")
|
||||
return
|
||||
}
|
||||
|
||||
// All query types for an intercepted domain are forwarded to the peer's
|
||||
// DNS forwarder, which owns the name. Falling through to the system
|
||||
// resolver would let it answer NXDOMAIN for a name it isn't authoritative
|
||||
// for, poisoning the whole name (including the A/AAAA records the route
|
||||
// does serve). The forwarder answers NODATA for types it cannot resolve.
|
||||
d.mu.RLock()
|
||||
peerKey := d.currentPeerKey
|
||||
d.mu.RUnlock()
|
||||
@@ -293,19 +292,6 @@ func (d *DnsInterceptor) writeDNSError(w dns.ResponseWriter, r *dns.Msg, logger
|
||||
}
|
||||
}
|
||||
|
||||
// continueToNextHandler signals the handler chain to try the next handler
|
||||
func (d *DnsInterceptor) continueToNextHandler(w dns.ResponseWriter, r *dns.Msg, logger *log.Entry, reason string) {
|
||||
logger.Tracef("continuing to next handler for domain=%s reason=%s", r.Question[0].Name, reason)
|
||||
|
||||
resp := new(dns.Msg)
|
||||
resp.SetRcode(r, dns.RcodeNameError)
|
||||
// Set Zero bit to signal handler chain to continue
|
||||
resp.MsgHdr.Zero = true
|
||||
if err := w.WriteMsg(resp); err != nil {
|
||||
logger.Errorf("failed writing DNS continue response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DnsInterceptor) getUpstreamIP(peerKey string) (netip.Addr, error) {
|
||||
peerAllowedIP, exists := d.peerStore.AllowedIP(peerKey)
|
||||
if !exists {
|
||||
|
||||
@@ -55,6 +55,14 @@ type GrpcClient struct {
|
||||
connStateCallback ConnStateNotifier
|
||||
connStateCallbackLock sync.RWMutex
|
||||
serverURL string
|
||||
|
||||
// syncStreamErr holds the last Sync stream error, or nil while the stream
|
||||
// is established and healthy. GetServerKey succeeds even when the peer
|
||||
// cannot sync (e.g. the server returns "settings not found"), so the
|
||||
// health probe must consult this to avoid reporting a healthy management
|
||||
// connection while the Sync stream keeps failing.
|
||||
syncStreamMu sync.RWMutex
|
||||
syncStreamErr error
|
||||
}
|
||||
|
||||
type ExposeRequest struct {
|
||||
@@ -364,6 +372,8 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.
|
||||
stream, err := c.connectToSyncStream(ctx, serverPubKey, sysInfo)
|
||||
if err != nil {
|
||||
log.Debugf("failed to open Management Service stream: %s", err)
|
||||
c.notifyDisconnected(err)
|
||||
c.setSyncStreamDisconnected(err)
|
||||
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
|
||||
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
|
||||
}
|
||||
@@ -372,11 +382,13 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.
|
||||
|
||||
log.Infof("connected to the Management Service stream")
|
||||
c.notifyConnected()
|
||||
c.setSyncStreamConnected()
|
||||
|
||||
// blocking until error
|
||||
err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
|
||||
if err != nil {
|
||||
c.notifyDisconnected(err)
|
||||
c.setSyncStreamDisconnected(err)
|
||||
if ctx.Err() != nil {
|
||||
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
|
||||
return nil
|
||||
@@ -530,6 +542,13 @@ func (c *GrpcClient) IsHealthy() bool {
|
||||
log.Warnf("health check returned: %s", err)
|
||||
return false
|
||||
}
|
||||
|
||||
if syncErr := c.syncStreamError(); syncErr != nil {
|
||||
c.notifyDisconnected(syncErr)
|
||||
log.Warnf("management transport is up but the Sync stream is unhealthy: %s", syncErr)
|
||||
return false
|
||||
}
|
||||
|
||||
c.notifyConnected()
|
||||
return true
|
||||
}
|
||||
@@ -771,6 +790,24 @@ func (c *GrpcClient) SyncMeta(sysInfo *system.Info) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *GrpcClient) setSyncStreamConnected() {
|
||||
c.syncStreamMu.Lock()
|
||||
defer c.syncStreamMu.Unlock()
|
||||
c.syncStreamErr = nil
|
||||
}
|
||||
|
||||
func (c *GrpcClient) setSyncStreamDisconnected(err error) {
|
||||
c.syncStreamMu.Lock()
|
||||
defer c.syncStreamMu.Unlock()
|
||||
c.syncStreamErr = err
|
||||
}
|
||||
|
||||
func (c *GrpcClient) syncStreamError() error {
|
||||
c.syncStreamMu.RLock()
|
||||
defer c.syncStreamMu.RUnlock()
|
||||
return c.syncStreamErr
|
||||
}
|
||||
|
||||
func (c *GrpcClient) notifyDisconnected(err error) {
|
||||
c.connStateCallbackLock.RLock()
|
||||
defer c.connStateCallbackLock.RUnlock()
|
||||
|
||||
@@ -85,6 +85,7 @@ type GrpcClient struct {
|
||||
// receive backpressure as a dead stream: reconnecting cannot help, since the
|
||||
// new stream feeds the same worker, and only triggers a reconnect storm.
|
||||
receiveHandoffBlocked atomic.Bool
|
||||
watchdogWg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewClient creates a new Signal client
|
||||
@@ -200,10 +201,18 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes
|
||||
// Guard the receive direction: the transport can stay healthy while the
|
||||
// server stops delivering messages. The watchdog reconnects via cancelStream.
|
||||
c.markReceived()
|
||||
go c.watchReceiveStream(streamCtx, cancelStream)
|
||||
c.watchdogWg.Add(1)
|
||||
go func() {
|
||||
defer c.watchdogWg.Done()
|
||||
c.watchReceiveStream(streamCtx, cancelStream)
|
||||
}()
|
||||
|
||||
// start receiving messages from the Signal stream (from other peers through signal)
|
||||
err = c.receive(stream)
|
||||
|
||||
cancelStream()
|
||||
c.watchdogWg.Wait()
|
||||
|
||||
if err != nil {
|
||||
// Check the parent context, not streamCtx: a watchdog-triggered
|
||||
// cancelStream must reconnect, only a parent cancel is shutdown.
|
||||
@@ -400,7 +409,12 @@ func (c *GrpcClient) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage
|
||||
|
||||
// Send sends a message to the remote Peer through the Signal Exchange.
|
||||
func (c *GrpcClient) Send(msg *proto.Message) error {
|
||||
return c.send(c.ctx, msg)
|
||||
}
|
||||
|
||||
// send delivers a message deriving per-attempt timeouts from parentCtx, so a
|
||||
// caller can abort an in-flight send by cancelling that context.
|
||||
func (c *GrpcClient) send(parentCtx context.Context, msg *proto.Message) error {
|
||||
if !c.Ready() {
|
||||
return fmt.Errorf("no connection to signal")
|
||||
}
|
||||
@@ -416,7 +430,7 @@ func (c *GrpcClient) Send(msg *proto.Message) error {
|
||||
if attempt > 1 {
|
||||
attemptTimeout = time.Duration(attempt) * 5 * time.Second
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(c.ctx, attemptTimeout)
|
||||
ctx, cancel := context.WithTimeout(parentCtx, attemptTimeout)
|
||||
|
||||
_, err = c.realClient.Send(ctx, encryptedMessage)
|
||||
|
||||
@@ -486,7 +500,7 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
|
||||
}
|
||||
|
||||
if probeSentAt.IsZero() {
|
||||
if err := c.sendReceiveProbe(); err != nil {
|
||||
if err := c.sendReceiveProbe(ctx); err != nil {
|
||||
log.Debugf("failed to send signal receive probe: %v", err)
|
||||
}
|
||||
probeSentAt = time.Now()
|
||||
@@ -495,11 +509,13 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
|
||||
}
|
||||
}
|
||||
|
||||
// sendReceiveProbe sends a self-addressed heartbeat. The Signal server routes it
|
||||
// back to this client, exercising the exact receive path the watchdog guards.
|
||||
func (c *GrpcClient) sendReceiveProbe() error {
|
||||
// sendReceiveProbe sends a self-addressed heartbeat bound to ctx, so cancelStream
|
||||
// aborts an in-flight probe instead of leaving the watchdog blocked on send timeouts.
|
||||
// The Signal server routes it back to this client, exercising the exact receive
|
||||
// path the watchdog guards.
|
||||
func (c *GrpcClient) sendReceiveProbe(ctx context.Context) error {
|
||||
self := c.key.PublicKey().String()
|
||||
return c.Send(&proto.Message{
|
||||
return c.send(ctx, &proto.Message{
|
||||
Key: self,
|
||||
RemoteKey: self,
|
||||
Body: &proto.Body{Type: proto.Body_HEARTBEAT},
|
||||
@@ -541,6 +557,9 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er
|
||||
if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
|
||||
log.Errorf("failed to add message to decryption worker: %v", err)
|
||||
}
|
||||
// Refresh liveness before clearing the flag so the window between here and
|
||||
// the next Recv does not read a stale timestamp as a dead stream.
|
||||
c.markReceived()
|
||||
c.receiveHandoffBlocked.Store(false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -74,7 +75,7 @@ func TestReceiveProbeRoundTrips(t *testing.T) {
|
||||
t.Fatal("signal stream did not connect within timeout")
|
||||
}
|
||||
|
||||
require.NoError(t, client.sendReceiveProbe())
|
||||
require.NoError(t, client.sendReceiveProbe(ctx))
|
||||
|
||||
select {
|
||||
case <-received:
|
||||
@@ -106,3 +107,72 @@ func TestReceiveAliveTreatsHandoffBlockAsLiveness(t *testing.T) {
|
||||
c.markReceived()
|
||||
require.True(t, c.receiveAlive(), "a freshly received frame must keep the stream alive")
|
||||
}
|
||||
|
||||
// fakeRecvStream feeds the receive loop frames from a channel and reports EOF
|
||||
// once the channel is closed. Only Recv is exercised by the loop.
|
||||
type fakeRecvStream struct {
|
||||
sigProto.SignalExchange_ConnectStreamClient
|
||||
frames chan *sigProto.EncryptedMessage
|
||||
}
|
||||
|
||||
func (s *fakeRecvStream) Recv() (*sigProto.EncryptedMessage, error) {
|
||||
msg, ok := <-s.frames
|
||||
if !ok {
|
||||
return nil, io.EOF
|
||||
}
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
// TestReceiveLoopRefreshesLivenessAfterBlockedHandoff drives the real receive
|
||||
// loop into a handoff that blocks past the inactivity threshold, then checks the
|
||||
// window after the handoff drains but before the next Recv. The loop must have
|
||||
// refreshed the timestamp on unblocking, otherwise that window reads the stale
|
||||
// pre-handoff timestamp as a dead stream and the watchdog tears down a healthy
|
||||
// connection.
|
||||
func TestReceiveLoopRefreshesLivenessAfterBlockedHandoff(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
c := &GrpcClient{ctx: ctx}
|
||||
|
||||
handling := make(chan struct{}, 8)
|
||||
gate := make(chan struct{})
|
||||
decrypt := func(*sigProto.EncryptedMessage) (*sigProto.Message, error) { return &sigProto.Message{}, nil }
|
||||
handler := func(*sigProto.Message) error {
|
||||
handling <- struct{}{}
|
||||
<-gate
|
||||
return nil
|
||||
}
|
||||
c.decryptionWorker = NewWorker(decrypt, handler)
|
||||
workerCtx, workerCancel := context.WithCancel(context.Background())
|
||||
go c.decryptionWorker.Work(workerCtx)
|
||||
t.Cleanup(workerCancel)
|
||||
|
||||
frames := make(chan *sigProto.EncryptedMessage)
|
||||
t.Cleanup(func() { close(frames) })
|
||||
go func() { _ = c.receive(&fakeRecvStream{frames: frames}) }()
|
||||
|
||||
// First frame: the worker drains it and parks in the blocking handler.
|
||||
frames <- &sigProto.EncryptedMessage{}
|
||||
<-handling
|
||||
// Second frame fills the worker's single-slot pool.
|
||||
frames <- &sigProto.EncryptedMessage{}
|
||||
// Third frame: the pool is full, so the loop parks on the handoff.
|
||||
frames <- &sigProto.EncryptedMessage{}
|
||||
|
||||
require.Eventually(t, c.receiveHandoffBlocked.Load, time.Second, time.Millisecond,
|
||||
"receive loop should park on the worker handoff")
|
||||
|
||||
// Simulate the handoff having blocked past the inactivity threshold.
|
||||
c.lastReceived.Store(time.Now().Add(-2 * receiveInactivityThreshold).UnixNano())
|
||||
require.True(t, c.receiveAlive(), "a loop parked on the handoff must stay alive")
|
||||
|
||||
// Drain the worker so the handoff returns and the loop resumes reading.
|
||||
close(gate)
|
||||
|
||||
// Once the handoff clears, the loop is parked on the next Recv with no frame
|
||||
// pending. The stream must still read as alive in that window.
|
||||
require.Eventually(t, func() bool { return !c.receiveHandoffBlocked.Load() }, time.Second, time.Millisecond,
|
||||
"handoff should drain once the worker is released")
|
||||
require.True(t, c.receiveAlive(),
|
||||
"the loop must refresh liveness when the handoff drains, before the next Recv")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user