mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-21 09:46:40 +00:00
Compare commits
7 Commits
coderabbit
...
relay/2800
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
95ad62ea85 | ||
|
|
5b4aed781f | ||
|
|
276b6126c1 | ||
|
|
b23169de63 | ||
|
|
b82b4a07fc | ||
|
|
812b08c473 | ||
|
|
8f5cefaf3a |
@@ -230,7 +230,8 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
|
|||||||
c.statusRecorder.MarkSignalConnected()
|
c.statusRecorder.MarkSignalConnected()
|
||||||
|
|
||||||
relayURLs, token := parseRelayInfo(loginResp)
|
relayURLs, token := parseRelayInfo(loginResp)
|
||||||
relayManager := relayClient.NewManager(engineCtx, relayURLs, myPrivateKey.PublicKey().String())
|
relayManager := relayClient.NewManager(engineCtx, []string{"rels://temp-relay-quic.relay.netbird.io:443"}, myPrivateKey.PublicKey().String())
|
||||||
|
c.statusRecorder.SetRelayMgr(relayManager)
|
||||||
if len(relayURLs) > 0 {
|
if len(relayURLs) > 0 {
|
||||||
if token != nil {
|
if token != nil {
|
||||||
if err := relayManager.UpdateToken(token); err != nil {
|
if err := relayManager.UpdateToken(token); err != nil {
|
||||||
@@ -241,9 +242,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
|
|||||||
log.Infof("connecting to the Relay service(s): %s", strings.Join(relayURLs, ", "))
|
log.Infof("connecting to the Relay service(s): %s", strings.Join(relayURLs, ", "))
|
||||||
if err = relayManager.Serve(); err != nil {
|
if err = relayManager.Serve(); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return wrapErr(err)
|
|
||||||
}
|
}
|
||||||
c.statusRecorder.SetRelayMgr(relayManager)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
peerConfig := loginResp.GetPeerConfig()
|
peerConfig := loginResp.GetPeerConfig()
|
||||||
|
|||||||
@@ -537,6 +537,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
|||||||
|
|
||||||
relayMsg := wCfg.GetRelay()
|
relayMsg := wCfg.GetRelay()
|
||||||
if relayMsg != nil {
|
if relayMsg != nil {
|
||||||
|
// when we receive token we expect valid address list too
|
||||||
c := &auth.Token{
|
c := &auth.Token{
|
||||||
Payload: relayMsg.GetTokenPayload(),
|
Payload: relayMsg.GetTokenPayload(),
|
||||||
Signature: relayMsg.GetTokenSignature(),
|
Signature: relayMsg.GetTokenSignature(),
|
||||||
@@ -545,9 +546,16 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
|||||||
log.Errorf("failed to update relay token: %v", err)
|
log.Errorf("failed to update relay token: %v", err)
|
||||||
return fmt.Errorf("update relay token: %w", err)
|
return fmt.Errorf("update relay token: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.relayManager.UpdateServerURLs([]string{"rels://temp-relay-quic.relay.netbird.io:443"})
|
||||||
|
|
||||||
|
// Just in case the agent started with an MGM server where the relay was disabled but was later enabled.
|
||||||
|
// We can ignore all errors because the guard will manage the reconnection retries.
|
||||||
|
_ = e.relayManager.Serve()
|
||||||
|
} else {
|
||||||
|
e.relayManager.UpdateServerURLs(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo update relay address in the relay manager
|
|
||||||
// todo update signal
|
// todo update signal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -676,25 +676,23 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
|
|||||||
// extend the list of stun, turn servers with relay address
|
// extend the list of stun, turn servers with relay address
|
||||||
relayStates := slices.Clone(d.relayStates)
|
relayStates := slices.Clone(d.relayStates)
|
||||||
|
|
||||||
var relayState relay.ProbeResult
|
|
||||||
|
|
||||||
// if the server connection is not established then we will use the general address
|
// if the server connection is not established then we will use the general address
|
||||||
// in case of connection we will use the instance specific address
|
// in case of connection we will use the instance specific address
|
||||||
instanceAddr, err := d.relayMgr.RelayInstanceAddress()
|
instanceAddr, err := d.relayMgr.RelayInstanceAddress()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO add their status
|
// TODO add their status
|
||||||
if errors.Is(err, relayClient.ErrRelayClientNotConnected) {
|
|
||||||
for _, r := range d.relayMgr.ServerURLs() {
|
for _, r := range d.relayMgr.ServerURLs() {
|
||||||
relayStates = append(relayStates, relay.ProbeResult{
|
relayStates = append(relayStates, relay.ProbeResult{
|
||||||
URI: r,
|
URI: r,
|
||||||
|
Err: err,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return relayStates
|
return relayStates
|
||||||
}
|
}
|
||||||
relayState.Err = err
|
|
||||||
}
|
|
||||||
|
|
||||||
relayState.URI = instanceAddr
|
relayState := relay.ProbeResult{
|
||||||
|
URI: instanceAddr,
|
||||||
|
}
|
||||||
return append(relayStates, relayState)
|
return append(relayStates, relayState)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -46,8 +46,6 @@ type WorkerICE struct {
|
|||||||
hasRelayOnLocally bool
|
hasRelayOnLocally bool
|
||||||
conn WorkerICECallbacks
|
conn WorkerICECallbacks
|
||||||
|
|
||||||
selectedPriority ConnPriority
|
|
||||||
|
|
||||||
agent *ice.Agent
|
agent *ice.Agent
|
||||||
muxAgent sync.Mutex
|
muxAgent sync.Mutex
|
||||||
|
|
||||||
@@ -95,10 +93,8 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
|
|
||||||
var preferredCandidateTypes []ice.CandidateType
|
var preferredCandidateTypes []ice.CandidateType
|
||||||
if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" {
|
if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" {
|
||||||
w.selectedPriority = connPriorityICEP2P
|
|
||||||
preferredCandidateTypes = icemaker.CandidateTypesP2P()
|
preferredCandidateTypes = icemaker.CandidateTypesP2P()
|
||||||
} else {
|
} else {
|
||||||
w.selectedPriority = connPriorityICETurn
|
|
||||||
preferredCandidateTypes = icemaker.CandidateTypes()
|
preferredCandidateTypes = icemaker.CandidateTypes()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -159,7 +155,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
RelayedOnLocal: isRelayCandidate(pair.Local),
|
RelayedOnLocal: isRelayCandidate(pair.Local),
|
||||||
}
|
}
|
||||||
w.log.Debugf("on ICE conn read to use ready")
|
w.log.Debugf("on ICE conn read to use ready")
|
||||||
go w.conn.OnConnReady(w.selectedPriority, ci)
|
go w.conn.OnConnReady(selectedPriority(pair), ci)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
||||||
@@ -394,3 +390,11 @@ func isRelayed(pair *ice.CandidatePair) bool {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func selectedPriority(pair *ice.CandidatePair) ConnPriority {
|
||||||
|
if isRelayed(pair) {
|
||||||
|
return connPriorityICETurn
|
||||||
|
} else {
|
||||||
|
return connPriorityICEP2P
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
4
go.mod
4
go.mod
@@ -71,6 +71,7 @@ require (
|
|||||||
github.com/pion/transport/v3 v3.0.1
|
github.com/pion/transport/v3 v3.0.1
|
||||||
github.com/pion/turn/v3 v3.0.1
|
github.com/pion/turn/v3 v3.0.1
|
||||||
github.com/prometheus/client_golang v1.19.1
|
github.com/prometheus/client_golang v1.19.1
|
||||||
|
github.com/quic-go/quic-go v0.48.1
|
||||||
github.com/rs/xid v1.3.0
|
github.com/rs/xid v1.3.0
|
||||||
github.com/shirou/gopsutil/v3 v3.24.4
|
github.com/shirou/gopsutil/v3 v3.24.4
|
||||||
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
|
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
|
||||||
@@ -151,11 +152,13 @@ require (
|
|||||||
github.com/go-logr/stdr v1.2.2 // indirect
|
github.com/go-logr/stdr v1.2.2 // indirect
|
||||||
github.com/go-ole/go-ole v1.3.0 // indirect
|
github.com/go-ole/go-ole v1.3.0 // indirect
|
||||||
github.com/go-redis/redis/v8 v8.11.5 // indirect
|
github.com/go-redis/redis/v8 v8.11.5 // indirect
|
||||||
|
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
|
||||||
github.com/go-text/render v0.1.0 // indirect
|
github.com/go-text/render v0.1.0 // indirect
|
||||||
github.com/go-text/typesetting v0.1.0 // indirect
|
github.com/go-text/typesetting v0.1.0 // indirect
|
||||||
github.com/gogo/protobuf v1.3.2 // indirect
|
github.com/gogo/protobuf v1.3.2 // indirect
|
||||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||||
github.com/google/btree v1.1.2 // indirect
|
github.com/google/btree v1.1.2 // indirect
|
||||||
|
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
|
||||||
github.com/google/s2a-go v0.1.7 // indirect
|
github.com/google/s2a-go v0.1.7 // indirect
|
||||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
|
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
|
||||||
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
|
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
|
||||||
@@ -216,6 +219,7 @@ require (
|
|||||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
|
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
|
||||||
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
|
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
|
||||||
go.opentelemetry.io/otel/trace v1.26.0 // indirect
|
go.opentelemetry.io/otel/trace v1.26.0 // indirect
|
||||||
|
go.uber.org/mock v0.4.0 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
golang.org/x/image v0.18.0 // indirect
|
golang.org/x/image v0.18.0 // indirect
|
||||||
golang.org/x/mod v0.17.0 // indirect
|
golang.org/x/mod v0.17.0 // indirect
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -400,6 +400,7 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
|
|||||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||||
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||||
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||||
|
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
|
||||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||||
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
|
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
|
||||||
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
||||||
@@ -605,6 +606,8 @@ github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+a
|
|||||||
github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U=
|
github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U=
|
||||||
github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek=
|
github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek=
|
||||||
github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk=
|
github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk=
|
||||||
|
github.com/quic-go/quic-go v0.48.1 h1:y/8xmfWI9qmGTc+lBr4jKRUWLGSlSigv847ULJ4hYXA=
|
||||||
|
github.com/quic-go/quic-go v0.48.1/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs=
|
||||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||||
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
|
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
|
||||||
@@ -753,6 +756,8 @@ go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v8
|
|||||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||||
|
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
|
||||||
|
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
|
||||||
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||||
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
|
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
|
||||||
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||||
@@ -963,6 +968,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||||||
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
auth "github.com/netbirdio/netbird/relay/auth/hmac"
|
auth "github.com/netbirdio/netbird/relay/auth/hmac"
|
||||||
|
"github.com/netbirdio/netbird/relay/client/dialer/quic"
|
||||||
"github.com/netbirdio/netbird/relay/client/dialer/ws"
|
"github.com/netbirdio/netbird/relay/client/dialer/ws"
|
||||||
"github.com/netbirdio/netbird/relay/healthcheck"
|
"github.com/netbirdio/netbird/relay/healthcheck"
|
||||||
"github.com/netbirdio/netbird/relay/messages"
|
"github.com/netbirdio/netbird/relay/messages"
|
||||||
@@ -93,10 +94,6 @@ func (cc *connContainer) writeMsg(msg Msg) {
|
|||||||
case cc.messages <- msg:
|
case cc.messages <- msg:
|
||||||
case <-cc.ctx.Done():
|
case <-cc.ctx.Done():
|
||||||
msg.Free()
|
msg.Free()
|
||||||
default:
|
|
||||||
msg.Free()
|
|
||||||
cc.log.Infof("message queue is full")
|
|
||||||
// todo consider to close the connection
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -140,7 +137,7 @@ type Client struct {
|
|||||||
instanceURL *RelayAddr
|
instanceURL *RelayAddr
|
||||||
muInstanceURL sync.Mutex
|
muInstanceURL sync.Mutex
|
||||||
|
|
||||||
onDisconnectListener func()
|
onDisconnectListener func(string)
|
||||||
onConnectedListener func()
|
onConnectedListener func()
|
||||||
listenerMutex sync.Mutex
|
listenerMutex sync.Mutex
|
||||||
}
|
}
|
||||||
@@ -179,8 +176,7 @@ func (c *Client) Connect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.connect()
|
if err := c.connect(); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -233,7 +229,7 @@ func (c *Client) ServerInstanceURL() (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed.
|
// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed.
|
||||||
func (c *Client) SetOnDisconnectListener(fn func()) {
|
func (c *Client) SetOnDisconnectListener(fn func(string)) {
|
||||||
c.listenerMutex.Lock()
|
c.listenerMutex.Lock()
|
||||||
defer c.listenerMutex.Unlock()
|
defer c.listenerMutex.Unlock()
|
||||||
c.onDisconnectListener = fn
|
c.onDisconnectListener = fn
|
||||||
@@ -264,14 +260,20 @@ func (c *Client) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) connect() error {
|
func (c *Client) connect() error {
|
||||||
conn, err := ws.Dial(c.connectionURL)
|
var conn net.Conn
|
||||||
|
var err error
|
||||||
|
if c.connectionURL == "rels://temp-relay-quic.relay.netbird.io:443" {
|
||||||
|
log.Infof("connecting to relay server %s using quic protocol", c.connectionURL)
|
||||||
|
conn, err = quic.Dial(c.connectionURL)
|
||||||
|
} else {
|
||||||
|
conn, err = ws.Dial(c.connectionURL)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.relayConn = conn
|
c.relayConn = conn
|
||||||
|
|
||||||
err = c.handShake()
|
if err = c.handShake(); err != nil {
|
||||||
if err != nil {
|
|
||||||
cErr := conn.Close()
|
cErr := conn.Close()
|
||||||
if cErr != nil {
|
if cErr != nil {
|
||||||
c.log.Errorf("failed to close connection: %s", cErr)
|
c.log.Errorf("failed to close connection: %s", cErr)
|
||||||
@@ -345,7 +347,7 @@ func (c *Client) readLoop(relayConn net.Conn) {
|
|||||||
c.log.Infof("start to Relay read loop exit")
|
c.log.Infof("start to Relay read loop exit")
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
if c.serviceIsRunning && !internallyStoppedFlag.isSet() {
|
if c.serviceIsRunning && !internallyStoppedFlag.isSet() {
|
||||||
c.log.Debugf("failed to read message from relay server: %s", errExit)
|
c.log.Errorf("failed to read message from relay server: %s", errExit)
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
break
|
break
|
||||||
@@ -554,7 +556,7 @@ func (c *Client) notifyDisconnected() {
|
|||||||
if c.onDisconnectListener == nil {
|
if c.onDisconnectListener == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
go c.onDisconnectListener()
|
go c.onDisconnectListener(c.connectionURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) notifyConnected() {
|
func (c *Client) notifyConnected() {
|
||||||
|
|||||||
@@ -551,7 +551,7 @@ func TestCloseByServer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
disconnected := make(chan struct{})
|
disconnected := make(chan struct{})
|
||||||
relayClient.SetOnDisconnectListener(func() {
|
relayClient.SetOnDisconnectListener(func(_ string) {
|
||||||
log.Infof("client disconnected")
|
log.Infof("client disconnected")
|
||||||
close(disconnected)
|
close(disconnected)
|
||||||
})
|
})
|
||||||
|
|||||||
87
relay/client/dialer/quic/conn.go
Normal file
87
relay/client/dialer/quic/conn.go
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
package quic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/quic-go/quic-go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type QuicAddr struct {
|
||||||
|
addr string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a QuicAddr) Network() string {
|
||||||
|
return "quic"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a QuicAddr) String() string {
|
||||||
|
return a.addr
|
||||||
|
}
|
||||||
|
|
||||||
|
type Conn struct {
|
||||||
|
session quic.Connection
|
||||||
|
remoteAddr QuicAddr
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConn(session quic.Connection, serverAddress string) net.Conn {
|
||||||
|
return &Conn{
|
||||||
|
session: session,
|
||||||
|
remoteAddr: QuicAddr{addr: serverAddress},
|
||||||
|
ctx: context.Background(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Read(b []byte) (n int, err error) {
|
||||||
|
dgram, err := c.session.ReceiveDatagram(c.ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to read from QUIC session: %v", err)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
n = copy(b, dgram)
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Write(b []byte) (int, error) {
|
||||||
|
err := c.session.SendDatagram(b)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to write to QUIC stream: %v", err)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return len(b), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) RemoteAddr() net.Addr {
|
||||||
|
if c.session != nil {
|
||||||
|
return c.session.RemoteAddr()
|
||||||
|
}
|
||||||
|
return c.remoteAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) LocalAddr() net.Addr {
|
||||||
|
if c.session != nil {
|
||||||
|
return c.session.LocalAddr()
|
||||||
|
}
|
||||||
|
return QuicAddr{addr: "unknown"}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) SetReadDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) SetWriteDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) SetDeadline(t time.Time) error {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Close() error {
|
||||||
|
return c.session.CloseWithError(0, "normal closure")
|
||||||
|
}
|
||||||
60
relay/client/dialer/quic/quic.go
Normal file
60
relay/client/dialer/quic/quic.go
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
package quic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/quic-go/quic-go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
dialTimeout = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
func Dial(address string) (net.Conn, error) {
|
||||||
|
quicURL, err := prepareURL(address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
tlsConf := &tls.Config{
|
||||||
|
InsecureSkipVerify: true, // Set to true only for testing
|
||||||
|
NextProtos: []string{"h2"}, // Ensure this matches the server's ALPN
|
||||||
|
}
|
||||||
|
|
||||||
|
quicConfig := &quic.Config{
|
||||||
|
KeepAlivePeriod: 15 * time.Second,
|
||||||
|
MaxIdleTimeout: 60 * time.Second,
|
||||||
|
EnableDatagrams: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo add support for custom dialer
|
||||||
|
|
||||||
|
session, err := quic.DialAddr(ctx, quicURL, tlsConf, quicConfig)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to dial to Relay server via QUIC '%s': %s", quicURL, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn := NewConn(session, address)
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareURL(address string) (string, error) {
|
||||||
|
if !strings.HasPrefix(address, "rel://") && !strings.HasPrefix(address, "rels://") {
|
||||||
|
return "", fmt.Errorf("unsupported scheme: %s", address)
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.HasPrefix(address, "rels://") {
|
||||||
|
return address[7:], nil
|
||||||
|
}
|
||||||
|
return address[6:], nil
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ package ws
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -59,6 +60,10 @@ func httpClientNbDialer() *http.Client {
|
|||||||
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
return customDialer.DialContext(ctx, network, addr)
|
return customDialer.DialContext(ctx, network, addr)
|
||||||
},
|
},
|
||||||
|
// Set up a TLS configuration that skips certificate verification
|
||||||
|
TLSClientConfig: &tls.Config{
|
||||||
|
InsecureSkipVerify: true, // This accepts invalid TLS certificates
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return &http.Client{
|
return &http.Client{
|
||||||
|
|||||||
@@ -4,65 +4,120 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/cenkalti/backoff/v4"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
reconnectingTimeout = 5 * time.Second
|
reconnectingTimeout = 60 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// Guard manage the reconnection tries to the Relay server in case of disconnection event.
|
// Guard manage the reconnection tries to the Relay server in case of disconnection event.
|
||||||
type Guard struct {
|
type Guard struct {
|
||||||
ctx context.Context
|
// OnNewRelayClient is a channel that is used to notify the relay client about a new relay client instance.
|
||||||
relayClient *Client
|
OnNewRelayClient chan *Client
|
||||||
|
serverPicker *ServerPicker
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGuard creates a new guard for the relay client.
|
// NewGuard creates a new guard for the relay client.
|
||||||
func NewGuard(context context.Context, relayClient *Client) *Guard {
|
func NewGuard(sp *ServerPicker) *Guard {
|
||||||
g := &Guard{
|
g := &Guard{
|
||||||
ctx: context,
|
OnNewRelayClient: make(chan *Client, 1),
|
||||||
relayClient: relayClient,
|
serverPicker: sp,
|
||||||
}
|
}
|
||||||
return g
|
return g
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnDisconnected is called when the relay client is disconnected from the relay server. It will trigger the reconnection
|
// StartReconnectTrys is called when the relay client is disconnected from the relay server.
|
||||||
|
// It attempts to reconnect to the relay server. The function first tries a quick reconnect
|
||||||
|
// to the same server that was used before, if the server URL is still valid. If the quick
|
||||||
|
// reconnect fails, it starts a ticker to periodically attempt server picking until it
|
||||||
|
// succeeds or the context is done.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - ctx: The context to control the lifecycle of the reconnection attempts.
|
||||||
|
// - relayClient: The relay client instance that was disconnected.
|
||||||
// todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent
|
// todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent
|
||||||
func (g *Guard) OnDisconnected() {
|
func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
|
||||||
if g.quickReconnect() {
|
if relayClient == nil {
|
||||||
|
goto RETRY
|
||||||
|
}
|
||||||
|
if g.isServerURLStillValid(relayClient) && g.quickReconnect(ctx, relayClient) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(reconnectingTimeout)
|
RETRY:
|
||||||
|
ticker := exponentTicker(ctx)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
err := g.relayClient.Connect()
|
if err := g.retry(ctx); err != nil {
|
||||||
if err != nil {
|
log.Errorf("failed to pick new Relay server: %s", err)
|
||||||
log.Errorf("failed to reconnect to relay server: %s", err)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
case <-g.ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Guard) quickReconnect() bool {
|
func (g *Guard) retry(ctx context.Context) error {
|
||||||
ctx, cancel := context.WithTimeout(g.ctx, 1500*time.Millisecond)
|
log.Infof("try to pick up a new Relay server")
|
||||||
|
relayClient, err := g.serverPicker.PickServer(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// prevent to work with a deprecated Relay client instance
|
||||||
|
g.drainRelayClientChan()
|
||||||
|
|
||||||
|
g.OnNewRelayClient <- relayClient
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool {
|
||||||
|
ctx, cancel := context.WithTimeout(parentCtx, 1500*time.Millisecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
|
||||||
if g.ctx.Err() != nil {
|
if parentCtx.Err() != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
log.Infof("try to reconnect to Relay server: %s", rc.connectionURL)
|
||||||
|
|
||||||
if err := g.relayClient.Connect(); err != nil {
|
if err := rc.Connect(); err != nil {
|
||||||
log.Errorf("failed to reconnect to relay server: %s", err)
|
log.Errorf("failed to reconnect to relay server: %s", err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *Guard) drainRelayClientChan() {
|
||||||
|
select {
|
||||||
|
case <-g.OnNewRelayClient:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *Guard) isServerURLStillValid(rc *Client) bool {
|
||||||
|
for _, url := range g.serverPicker.ServerURLs.Load().([]string) {
|
||||||
|
if url == rc.connectionURL {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func exponentTicker(ctx context.Context) *backoff.Ticker {
|
||||||
|
bo := backoff.WithContext(&backoff.ExponentialBackOff{
|
||||||
|
InitialInterval: 2 * time.Second,
|
||||||
|
Multiplier: 2,
|
||||||
|
MaxInterval: reconnectingTimeout,
|
||||||
|
Clock: backoff.SystemClock,
|
||||||
|
}, ctx)
|
||||||
|
|
||||||
|
return backoff.NewTicker(bo)
|
||||||
|
}
|
||||||
|
|||||||
@@ -58,11 +58,14 @@ type ManagerService interface {
|
|||||||
// unused relay connection and close it.
|
// unused relay connection and close it.
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
serverURLs []string
|
|
||||||
peerID string
|
peerID string
|
||||||
|
running bool
|
||||||
tokenStore *relayAuth.TokenStore
|
tokenStore *relayAuth.TokenStore
|
||||||
|
serverPicker *ServerPicker
|
||||||
|
|
||||||
relayClient *Client
|
relayClient *Client
|
||||||
|
// the guard logic can overwrite the relayClient variable, this mutex protect the usage of the variable
|
||||||
|
relayClientMu sync.Mutex
|
||||||
reconnectGuard *Guard
|
reconnectGuard *Guard
|
||||||
|
|
||||||
relayClients map[string]*RelayTrack
|
relayClients map[string]*RelayTrack
|
||||||
@@ -76,48 +79,54 @@ type Manager struct {
|
|||||||
// NewManager creates a new manager instance.
|
// NewManager creates a new manager instance.
|
||||||
// The serverURL address can be empty. In this case, the manager will not serve.
|
// The serverURL address can be empty. In this case, the manager will not serve.
|
||||||
func NewManager(ctx context.Context, serverURLs []string, peerID string) *Manager {
|
func NewManager(ctx context.Context, serverURLs []string, peerID string) *Manager {
|
||||||
return &Manager{
|
tokenStore := &relayAuth.TokenStore{}
|
||||||
|
|
||||||
|
m := &Manager{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
serverURLs: serverURLs,
|
|
||||||
peerID: peerID,
|
peerID: peerID,
|
||||||
tokenStore: &relayAuth.TokenStore{},
|
tokenStore: tokenStore,
|
||||||
|
serverPicker: &ServerPicker{
|
||||||
|
TokenStore: tokenStore,
|
||||||
|
PeerID: peerID,
|
||||||
|
},
|
||||||
relayClients: make(map[string]*RelayTrack),
|
relayClients: make(map[string]*RelayTrack),
|
||||||
onDisconnectedListeners: make(map[string]*list.List),
|
onDisconnectedListeners: make(map[string]*list.List),
|
||||||
}
|
}
|
||||||
|
m.serverPicker.ServerURLs.Store(serverURLs)
|
||||||
|
m.reconnectGuard = NewGuard(m.serverPicker)
|
||||||
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serve starts the manager. It will establish a connection to the relay server and start the relay cleanup loop for
|
// Serve starts the manager, attempting to establish a connection with the relay server.
|
||||||
// the unused relay connections. The manager will automatically reconnect to the relay server in case of disconnection.
|
// If the connection fails, it will keep trying to reconnect in the background.
|
||||||
|
// Additionally, it starts a cleanup loop to remove unused relay connections.
|
||||||
|
// The manager will automatically reconnect to the relay server in case of disconnection.
|
||||||
func (m *Manager) Serve() error {
|
func (m *Manager) Serve() error {
|
||||||
if m.relayClient != nil {
|
if m.running {
|
||||||
return fmt.Errorf("manager already serving")
|
return fmt.Errorf("manager already serving")
|
||||||
}
|
}
|
||||||
log.Debugf("starting relay client manager with %v relay servers", m.serverURLs)
|
m.running = true
|
||||||
|
log.Debugf("starting relay client manager with %v relay servers", m.serverPicker.ServerURLs.Load())
|
||||||
|
|
||||||
sp := ServerPicker{
|
client, err := m.serverPicker.PickServer(m.ctx)
|
||||||
TokenStore: m.tokenStore,
|
|
||||||
PeerID: m.peerID,
|
|
||||||
}
|
|
||||||
|
|
||||||
client, err := sp.PickServer(m.ctx, m.serverURLs)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
go m.reconnectGuard.StartReconnectTrys(m.ctx, nil)
|
||||||
|
} else {
|
||||||
|
m.storeClient(client)
|
||||||
}
|
}
|
||||||
m.relayClient = client
|
|
||||||
|
|
||||||
m.reconnectGuard = NewGuard(m.ctx, m.relayClient)
|
go m.listenGuardEvent(m.ctx)
|
||||||
m.relayClient.SetOnConnectedListener(m.onServerConnected)
|
go m.startCleanupLoop()
|
||||||
m.relayClient.SetOnDisconnectListener(func() {
|
return err
|
||||||
m.onServerDisconnected(client.connectionURL)
|
|
||||||
})
|
|
||||||
m.startCleanupLoop()
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
|
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
|
||||||
// established via the relay server. If the peer is on a different relay server, the manager will establish a new
|
// established via the relay server. If the peer is on a different relay server, the manager will establish a new
|
||||||
// connection to the relay server. It returns back with a net.Conn what represent the remote peer connection.
|
// connection to the relay server. It returns back with a net.Conn what represent the remote peer connection.
|
||||||
func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
|
func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
|
||||||
|
m.relayClientMu.Lock()
|
||||||
|
defer m.relayClientMu.Unlock()
|
||||||
|
|
||||||
if m.relayClient == nil {
|
if m.relayClient == nil {
|
||||||
return nil, ErrRelayClientNotConnected
|
return nil, ErrRelayClientNotConnected
|
||||||
}
|
}
|
||||||
@@ -146,6 +155,9 @@ func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) {
|
|||||||
|
|
||||||
// Ready returns true if the home Relay client is connected to the relay server.
|
// Ready returns true if the home Relay client is connected to the relay server.
|
||||||
func (m *Manager) Ready() bool {
|
func (m *Manager) Ready() bool {
|
||||||
|
m.relayClientMu.Lock()
|
||||||
|
defer m.relayClientMu.Unlock()
|
||||||
|
|
||||||
if m.relayClient == nil {
|
if m.relayClient == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -159,6 +171,13 @@ func (m *Manager) SetOnReconnectedListener(f func()) {
|
|||||||
// AddCloseListener adds a listener to the given server instance address. The listener will be called if the connection
|
// AddCloseListener adds a listener to the given server instance address. The listener will be called if the connection
|
||||||
// closed.
|
// closed.
|
||||||
func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error {
|
func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error {
|
||||||
|
m.relayClientMu.Lock()
|
||||||
|
defer m.relayClientMu.Unlock()
|
||||||
|
|
||||||
|
if m.relayClient == nil {
|
||||||
|
return ErrRelayClientNotConnected
|
||||||
|
}
|
||||||
|
|
||||||
foreign, err := m.isForeignServer(serverAddress)
|
foreign, err := m.isForeignServer(serverAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -177,6 +196,9 @@ func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServ
|
|||||||
// RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is
|
// RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is
|
||||||
// lost. This address will be sent to the target peer to choose the common relay server for the communication.
|
// lost. This address will be sent to the target peer to choose the common relay server for the communication.
|
||||||
func (m *Manager) RelayInstanceAddress() (string, error) {
|
func (m *Manager) RelayInstanceAddress() (string, error) {
|
||||||
|
m.relayClientMu.Lock()
|
||||||
|
defer m.relayClientMu.Unlock()
|
||||||
|
|
||||||
if m.relayClient == nil {
|
if m.relayClient == nil {
|
||||||
return "", ErrRelayClientNotConnected
|
return "", ErrRelayClientNotConnected
|
||||||
}
|
}
|
||||||
@@ -185,13 +207,18 @@ func (m *Manager) RelayInstanceAddress() (string, error) {
|
|||||||
|
|
||||||
// ServerURLs returns the addresses of the relay servers.
|
// ServerURLs returns the addresses of the relay servers.
|
||||||
func (m *Manager) ServerURLs() []string {
|
func (m *Manager) ServerURLs() []string {
|
||||||
return m.serverURLs
|
return m.serverPicker.ServerURLs.Load().([]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with
|
// HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with
|
||||||
// Relay service.
|
// Relay service.
|
||||||
func (m *Manager) HasRelayAddress() bool {
|
func (m *Manager) HasRelayAddress() bool {
|
||||||
return len(m.serverURLs) > 0
|
return len(m.serverPicker.ServerURLs.Load().([]string)) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) UpdateServerURLs(serverURLs []string) {
|
||||||
|
log.Infof("update relay server URLs: %v", serverURLs)
|
||||||
|
m.serverPicker.ServerURLs.Store(serverURLs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateToken updates the token in the token store.
|
// UpdateToken updates the token in the token store.
|
||||||
@@ -245,9 +272,7 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// if connection closed then delete the relay client from the list
|
// if connection closed then delete the relay client from the list
|
||||||
relayClient.SetOnDisconnectListener(func() {
|
relayClient.SetOnDisconnectListener(m.onServerDisconnected)
|
||||||
m.onServerDisconnected(serverAddress)
|
|
||||||
})
|
|
||||||
rt.relayClient = relayClient
|
rt.relayClient = relayClient
|
||||||
rt.Unlock()
|
rt.Unlock()
|
||||||
|
|
||||||
@@ -265,14 +290,37 @@ func (m *Manager) onServerConnected() {
|
|||||||
go m.onReconnectedListenerFn()
|
go m.onReconnectedListenerFn()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// onServerDisconnected start to reconnection for home server only
|
||||||
func (m *Manager) onServerDisconnected(serverAddress string) {
|
func (m *Manager) onServerDisconnected(serverAddress string) {
|
||||||
|
m.relayClientMu.Lock()
|
||||||
if serverAddress == m.relayClient.connectionURL {
|
if serverAddress == m.relayClient.connectionURL {
|
||||||
go m.reconnectGuard.OnDisconnected()
|
go m.reconnectGuard.StartReconnectTrys(m.ctx, m.relayClient)
|
||||||
}
|
}
|
||||||
|
m.relayClientMu.Unlock()
|
||||||
|
|
||||||
m.notifyOnDisconnectListeners(serverAddress)
|
m.notifyOnDisconnectListeners(serverAddress)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Manager) listenGuardEvent(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case rc := <-m.reconnectGuard.OnNewRelayClient:
|
||||||
|
m.storeClient(rc)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) storeClient(client *Client) {
|
||||||
|
m.relayClientMu.Lock()
|
||||||
|
defer m.relayClientMu.Unlock()
|
||||||
|
|
||||||
|
m.relayClient = client
|
||||||
|
m.relayClient.SetOnConnectedListener(m.onServerConnected)
|
||||||
|
m.relayClient.SetOnDisconnectListener(m.onServerDisconnected)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Manager) isForeignServer(address string) (bool, error) {
|
func (m *Manager) isForeignServer(address string) (bool, error) {
|
||||||
rAddr, err := m.relayClient.ServerInstanceURL()
|
rAddr, err := m.relayClient.ServerInstanceURL()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -282,12 +330,7 @@ func (m *Manager) isForeignServer(address string) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) startCleanupLoop() {
|
func (m *Manager) startCleanupLoop() {
|
||||||
if m.ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ticker := time.NewTicker(relayCleanupInterval)
|
ticker := time.NewTicker(relayCleanupInterval)
|
||||||
go func() {
|
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@@ -297,7 +340,6 @@ func (m *Manager) startCleanupLoop() {
|
|||||||
m.cleanUpUnusedRelays()
|
m.cleanUpUnusedRelays()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) cleanUpUnusedRelays() {
|
func (m *Manager) cleanUpUnusedRelays() {
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@@ -12,10 +13,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
connectionTimeout = 30 * time.Second
|
|
||||||
maxConcurrentServers = 7
|
maxConcurrentServers = 7
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
connectionTimeout = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
type connResult struct {
|
type connResult struct {
|
||||||
RelayClient *Client
|
RelayClient *Client
|
||||||
Url string
|
Url string
|
||||||
@@ -24,20 +28,22 @@ type connResult struct {
|
|||||||
|
|
||||||
type ServerPicker struct {
|
type ServerPicker struct {
|
||||||
TokenStore *auth.TokenStore
|
TokenStore *auth.TokenStore
|
||||||
|
ServerURLs atomic.Value
|
||||||
PeerID string
|
PeerID string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sp *ServerPicker) PickServer(parentCtx context.Context, urls []string) (*Client, error) {
|
func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
|
||||||
ctx, cancel := context.WithTimeout(parentCtx, connectionTimeout)
|
ctx, cancel := context.WithTimeout(parentCtx, connectionTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
totalServers := len(urls)
|
totalServers := len(sp.ServerURLs.Load().([]string))
|
||||||
|
|
||||||
connResultChan := make(chan connResult, totalServers)
|
connResultChan := make(chan connResult, totalServers)
|
||||||
successChan := make(chan connResult, 1)
|
successChan := make(chan connResult, 1)
|
||||||
concurrentLimiter := make(chan struct{}, maxConcurrentServers)
|
concurrentLimiter := make(chan struct{}, maxConcurrentServers)
|
||||||
|
|
||||||
for _, url := range urls {
|
log.Debugf("pick server from list: %v", sp.ServerURLs.Load().([]string))
|
||||||
|
for _, url := range sp.ServerURLs.Load().([]string) {
|
||||||
// todo check if we have a successful connection so we do not need to connect to other servers
|
// todo check if we have a successful connection so we do not need to connect to other servers
|
||||||
concurrentLimiter <- struct{}{}
|
concurrentLimiter <- struct{}{}
|
||||||
go func(url string) {
|
go func(url string) {
|
||||||
@@ -78,7 +84,7 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh
|
|||||||
for numOfResults := 0; numOfResults < cap(resultChan); numOfResults++ {
|
for numOfResults := 0; numOfResults < cap(resultChan); numOfResults++ {
|
||||||
cr := <-resultChan
|
cr := <-resultChan
|
||||||
if cr.Err != nil {
|
if cr.Err != nil {
|
||||||
log.Debugf("failed to connect to Relay server: %s: %v", cr.Url, cr.Err)
|
log.Tracef("failed to connect to Relay server: %s: %v", cr.Url, cr.Err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Infof("connected to Relay server: %s", cr.Url)
|
log.Infof("connected to Relay server: %s", cr.Url)
|
||||||
|
|||||||
@@ -7,16 +7,19 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestServerPicker_UnavailableServers(t *testing.T) {
|
func TestServerPicker_UnavailableServers(t *testing.T) {
|
||||||
|
connectionTimeout = 5 * time.Second
|
||||||
|
|
||||||
sp := ServerPicker{
|
sp := ServerPicker{
|
||||||
TokenStore: nil,
|
TokenStore: nil,
|
||||||
PeerID: "test",
|
PeerID: "test",
|
||||||
}
|
}
|
||||||
|
sp.ServerURLs.Store([]string{"rel://dummy1", "rel://dummy2"})
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout+1)
|
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout+1)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
_, err := sp.PickServer(ctx, []string{"rel://dummy1", "rel://dummy2"})
|
_, err := sp.PickServer(ctx)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|||||||
112
relay/server/listener/quic/conn.go
Normal file
112
relay/server/listener/quic/conn.go
Normal file
@@ -0,0 +1,112 @@
|
|||||||
|
package quic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/quic-go/quic-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
writeTimeout = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
type Conn struct {
|
||||||
|
session quic.Connection
|
||||||
|
closed bool
|
||||||
|
closedMu sync.Mutex
|
||||||
|
ctx context.Context
|
||||||
|
ctxCancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConn(session quic.Connection) *Conn {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
return &Conn{
|
||||||
|
session: session,
|
||||||
|
ctx: ctx,
|
||||||
|
ctxCancel: cancel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Read(b []byte) (n int, err error) {
|
||||||
|
if c.isClosed() {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
dgram, err := c.session.ReceiveDatagram(c.ctx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, c.ioErrHandling(err)
|
||||||
|
}
|
||||||
|
// Copy data to b, ensuring we don’t exceed the size of b
|
||||||
|
n = copy(b, dgram)
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Write(b []byte) (int, error) {
|
||||||
|
err := c.session.SendDatagram(b)
|
||||||
|
return len(b), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) LocalAddr() net.Addr {
|
||||||
|
return c.session.LocalAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) RemoteAddr() net.Addr {
|
||||||
|
return c.session.RemoteAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) SetReadDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) SetWriteDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) SetDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Close() error {
|
||||||
|
c.closedMu.Lock()
|
||||||
|
if c.closed {
|
||||||
|
c.closedMu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
c.closed = true
|
||||||
|
c.closedMu.Unlock()
|
||||||
|
|
||||||
|
c.ctxCancel() // Cancel the context
|
||||||
|
|
||||||
|
sessionErr := c.session.CloseWithError(0, "normal closure")
|
||||||
|
return sessionErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) isClosed() bool {
|
||||||
|
c.closedMu.Lock()
|
||||||
|
defer c.closedMu.Unlock()
|
||||||
|
return c.closed
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) ioErrHandling(err error) error {
|
||||||
|
if c.isClosed() {
|
||||||
|
return io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle QUIC-specific errors
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the connection was closed remotely
|
||||||
|
var appErr *quic.ApplicationError
|
||||||
|
if errors.As(err, &appErr) && appErr.ErrorCode == 0 { // 0 is normal closure
|
||||||
|
return io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
66
relay/server/listener/quic/listener.go
Normal file
66
relay/server/listener/quic/listener.go
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
package quic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"github.com/quic-go/quic-go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Listener struct {
|
||||||
|
// Address is the address to listen on
|
||||||
|
Address string
|
||||||
|
// TLSConfig is the TLS configuration for the server
|
||||||
|
TLSConfig *tls.Config
|
||||||
|
|
||||||
|
listener *quic.Listener
|
||||||
|
acceptFn func(conn net.Conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Listener) Listen(acceptFn func(conn net.Conn)) error {
|
||||||
|
l.acceptFn = acceptFn
|
||||||
|
|
||||||
|
quicCfg := &quic.Config{
|
||||||
|
EnableDatagrams: true,
|
||||||
|
}
|
||||||
|
listener, err := quic.ListenAddr(l.Address, l.TLSConfig, quicCfg)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create QUIC listener: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
l.listener = listener
|
||||||
|
log.Infof("QUIC server listening on address: %s", l.Address)
|
||||||
|
|
||||||
|
for {
|
||||||
|
session, err := listener.Accept(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, quic.ErrServerClosed) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Errorf("Failed to accept QUIC session: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("QUIC client connected from: %s", session.RemoteAddr())
|
||||||
|
conn := NewConn(session)
|
||||||
|
l.acceptFn(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Listener) Shutdown(ctx context.Context) error {
|
||||||
|
if l.listener == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("stopping QUIC listener")
|
||||||
|
if err := l.listener.Close(); err != nil {
|
||||||
|
return fmt.Errorf("listener shutdown failed: %v", err)
|
||||||
|
}
|
||||||
|
log.Infof("QUIC listener stopped")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -23,6 +23,7 @@ type Listener struct {
|
|||||||
|
|
||||||
server *http.Server
|
server *http.Server
|
||||||
acceptFn func(conn net.Conn)
|
acceptFn func(conn net.Conn)
|
||||||
|
log *log.Entry
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Listener) Listen(acceptFn func(conn net.Conn)) error {
|
func (l *Listener) Listen(acceptFn func(conn net.Conn)) error {
|
||||||
@@ -88,6 +89,8 @@ func (l *Listener) onAccept(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Infof("WS client connected from: %s", rAddr)
|
||||||
|
|
||||||
conn := NewConn(wsConn, lAddr, rAddr)
|
conn := NewConn(wsConn, lAddr, rAddr)
|
||||||
l.acceptFn(conn)
|
l.acceptFn(conn)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,13 +2,25 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/rsa"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"crypto/x509/pkix"
|
||||||
|
"encoding/pem"
|
||||||
|
"math/big"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"go.opentelemetry.io/otel/metric"
|
"go.opentelemetry.io/otel/metric"
|
||||||
|
|
||||||
|
nberrors "github.com/netbirdio/netbird/client/errors"
|
||||||
"github.com/netbirdio/netbird/relay/auth"
|
"github.com/netbirdio/netbird/relay/auth"
|
||||||
"github.com/netbirdio/netbird/relay/server/listener"
|
"github.com/netbirdio/netbird/relay/server/listener"
|
||||||
|
"github.com/netbirdio/netbird/relay/server/listener/quic"
|
||||||
"github.com/netbirdio/netbird/relay/server/listener/ws"
|
"github.com/netbirdio/netbird/relay/server/listener/ws"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -25,7 +37,7 @@ type ListenerConfig struct {
|
|||||||
// In a new HTTP connection, the server will accept the connection and pass it to the Relay server via the Accept method.
|
// In a new HTTP connection, the server will accept the connection and pass it to the Relay server via the Accept method.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
relay *Relay
|
relay *Relay
|
||||||
wSListener listener.Listener
|
listeners []listener.Listener
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new relay server instance.
|
// NewServer creates a new relay server instance.
|
||||||
@@ -40,37 +52,119 @@ func NewServer(meter metric.Meter, exposedAddress string, tlsSupport bool, authV
|
|||||||
}
|
}
|
||||||
return &Server{
|
return &Server{
|
||||||
relay: relay,
|
relay: relay,
|
||||||
|
listeners: make([]listener.Listener, 0, 2),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listen starts the relay server.
|
// Listen starts the relay server.
|
||||||
func (r *Server) Listen(cfg ListenerConfig) error {
|
func (r *Server) Listen(cfg ListenerConfig) error {
|
||||||
r.wSListener = &ws.Listener{
|
wSListener := &ws.Listener{
|
||||||
Address: cfg.Address,
|
Address: cfg.Address,
|
||||||
TLSConfig: cfg.TLSConfig,
|
TLSConfig: cfg.TLSConfig,
|
||||||
}
|
}
|
||||||
|
r.listeners = append(r.listeners, wSListener)
|
||||||
|
|
||||||
wslErr := r.wSListener.Listen(r.relay.Accept)
|
quicListener := &quic.Listener{
|
||||||
if wslErr != nil {
|
Address: cfg.Address,
|
||||||
log.Errorf("failed to bind ws server: %s", wslErr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return wslErr
|
if cfg.TLSConfig != nil {
|
||||||
|
quicListener.TLSConfig = cfg.TLSConfig
|
||||||
|
} else {
|
||||||
|
tlsConfig, err := generateTestTLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
quicListener.TLSConfig = tlsConfig
|
||||||
|
}
|
||||||
|
r.listeners = append(r.listeners, quicListener)
|
||||||
|
|
||||||
|
errChan := make(chan error, len(r.listeners))
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for _, l := range r.listeners {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(listener listener.Listener) {
|
||||||
|
defer wg.Done()
|
||||||
|
errChan <- listener.Listen(r.relay.Accept)
|
||||||
|
}(l)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
close(errChan)
|
||||||
|
var multiErr *multierror.Error
|
||||||
|
for err := range errChan {
|
||||||
|
multiErr = multierror.Append(multiErr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nberrors.FormatErrorOrNil(multiErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown stops the relay server. If there are active connections, they will be closed gracefully. In case of a context,
|
// Shutdown stops the relay server. If there are active connections, they will be closed gracefully. In case of a context,
|
||||||
// the connections will be forcefully closed.
|
// the connections will be forcefully closed.
|
||||||
func (r *Server) Shutdown(ctx context.Context) (err error) {
|
func (r *Server) Shutdown(ctx context.Context) error {
|
||||||
// stop service new connections
|
var multiErr *multierror.Error
|
||||||
if r.wSListener != nil {
|
for _, l := range r.listeners {
|
||||||
err = r.wSListener.Shutdown(ctx)
|
if err := l.Shutdown(ctx); err != nil {
|
||||||
|
multiErr = multierror.Append(multiErr, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
r.relay.Shutdown(ctx)
|
r.relay.Shutdown(ctx)
|
||||||
return
|
return nberrors.FormatErrorOrNil(multiErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// InstanceURL returns the instance URL of the relay server.
|
// InstanceURL returns the instance URL of the relay server.
|
||||||
func (r *Server) InstanceURL() string {
|
func (r *Server) InstanceURL() string {
|
||||||
return r.relay.instanceURL
|
return r.relay.instanceURL
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GenerateTestTLSConfig creates a self-signed certificate for testing
|
||||||
|
func generateTestTLSConfig() (*tls.Config, error) {
|
||||||
|
log.Infof("generating test TLS config")
|
||||||
|
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
template := x509.Certificate{
|
||||||
|
SerialNumber: big.NewInt(1),
|
||||||
|
Subject: pkix.Name{
|
||||||
|
Organization: []string{"Test Organization"},
|
||||||
|
},
|
||||||
|
NotBefore: time.Now(),
|
||||||
|
NotAfter: time.Now().Add(time.Hour * 24 * 180), // Valid for 180 days
|
||||||
|
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
|
||||||
|
ExtKeyUsage: []x509.ExtKeyUsage{
|
||||||
|
x509.ExtKeyUsageServerAuth,
|
||||||
|
},
|
||||||
|
BasicConstraintsValid: true,
|
||||||
|
DNSNames: []string{"localhost"},
|
||||||
|
IPAddresses: []net.IP{net.ParseIP("192.168.0.10")},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create certificate
|
||||||
|
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
certPEM := pem.EncodeToMemory(&pem.Block{
|
||||||
|
Type: "CERTIFICATE",
|
||||||
|
Bytes: certDER,
|
||||||
|
})
|
||||||
|
|
||||||
|
privateKeyPEM := pem.EncodeToMemory(&pem.Block{
|
||||||
|
Type: "RSA PRIVATE KEY",
|
||||||
|
Bytes: x509.MarshalPKCS1PrivateKey(privateKey),
|
||||||
|
})
|
||||||
|
|
||||||
|
tlsCert, err := tls.X509KeyPair(certPEM, privateKeyPEM)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &tls.Config{
|
||||||
|
Certificates: []tls.Certificate{tlsCert},
|
||||||
|
NextProtos: []string{"netbird-relay"},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user