Compare commits

...

2 Commits

Author SHA1 Message Date
braginini
ab579f5de0 chore: [management] - fix golint 2022-01-01 14:53:49 +01:00
braginini
09eeb71af2 chore: [management] - replace proactive peer updates with periodic updates 2022-01-01 14:47:57 +01:00
3 changed files with 48 additions and 9 deletions

View File

@@ -3,6 +3,7 @@ package server
import ( import (
"context" "context"
"fmt" "fmt"
"math/rand"
"time" "time"
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
@@ -95,6 +96,8 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
if s.config.TURNConfig.TimeBasedCredentials { if s.config.TURNConfig.TimeBasedCredentials {
s.turnCredentialsManager.SetupRefresh(peerKey.String()) s.turnCredentialsManager.SetupRefresh(peerKey.String())
} }
s.schedulePeerUpdates(srv.Context(), peerKey.String(), peer)
// keep a connection to the peer and send updates when available // keep a connection to the peer and send updates when available
for { for {
select { select {
@@ -135,6 +138,39 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
} }
} }
func (s *Server) schedulePeerUpdates(context context.Context, peerKey string, peer *Peer) {
//todo: introduce the following logic:
// add a ModificationId to the Account entity (ModificationId increments by 1 if there was a change to the account network map)
// periodically fetch changes of the Account providing ModificationId
// if ModificationId is < then the one of the Account, then send changes
// Client has to handle modification id as well
go func() {
for {
select {
case <-context.Done():
log.Debugf("peer update cancelled %s", peerKey)
return
default:
maxSleep := 6
minSleep := 3
sleep := rand.Intn(maxSleep-minSleep) + minSleep
time.Sleep(time.Duration(sleep) * time.Second)
peers, err := s.accountManager.GetPeersForAPeer(peerKey)
if err != nil {
continue
}
update := toSyncResponse(s.config, peer, peers, nil)
err = s.peersUpdateManager.SendUpdate(peerKey, &UpdateMessage{Update: update})
if err != nil {
continue
}
}
}
}()
}
func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Peer, error) { func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Peer, error) {
meta := req.GetMeta() meta := req.GetMeta()
@@ -158,12 +194,13 @@ func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Pe
return nil, status.Errorf(codes.NotFound, "provided setup key doesn't exists") return nil, status.Errorf(codes.NotFound, "provided setup key doesn't exists")
} }
peers, err := s.accountManager.GetPeersForAPeer(peer.Key) // notify other peers of our registration - uncomment if you want to bring back peer update logic
/*peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, "internal server error") return nil, status.Error(codes.Internal, "internal server error")
} }
// notify other peers of our registration
for _, remotePeer := range peers { for _, remotePeer := range peers {
// exclude notified peer and add ourselves // exclude notified peer and add ourselves
peersToSend := []*Peer{peer} peersToSend := []*Peer{peer}
@@ -178,7 +215,7 @@ func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Pe
// todo rethink if we should keep this return // todo rethink if we should keep this return
return nil, err return nil, err
} }
} }*/
return peer, nil return peer, nil
} }

View File

@@ -321,9 +321,9 @@ var _ = Describe("Management service", func() {
}) })
}) })
Context("when there are 50 peers registered under one account", func() { Context("when there are 30 peers registered under one account", func() {
Context("when there are 10 more peers registered under the same account", func() { Context("when there are 10 more peers registered under the same account", func() {
Specify("all of the 50 peers will get updates of 10 newly registered peers", func() { Specify("all of the 20 peers will have 29 peer to connect to (total 30-1 itself)", func() {
initialPeers := 20 initialPeers := 20
additionalPeers := 10 additionalPeers := 10
@@ -336,7 +336,7 @@ var _ = Describe("Management service", func() {
} }
wg := sync2.WaitGroup{} wg := sync2.WaitGroup{}
wg.Add(initialPeers + initialPeers*additionalPeers) wg.Add(initialPeers)
var clients []mgmtProto.ManagementService_SyncClient var clients []mgmtProto.ManagementService_SyncClient
for _, peer := range peers { for _, peer := range peers {
@@ -368,9 +368,10 @@ var _ = Describe("Management service", func() {
resp := &mgmtProto.SyncResponse{} resp := &mgmtProto.SyncResponse{}
err = pb.Unmarshal(decryptedBytes, resp) err = pb.Unmarshal(decryptedBytes, resp)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if len(resp.GetRemotePeers()) > 0 { if len(resp.GetRemotePeers()) == 29 {
//only consider peer updates //only consider peer updates
wg.Done() wg.Done()
return
} }
} }
}() }()

View File

@@ -123,6 +123,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
return nil, err return nil, err
} }
// notify peer itself
err = am.peersUpdateManager.SendUpdate(peerKey, err = am.peersUpdateManager.SendUpdate(peerKey,
&UpdateMessage{ &UpdateMessage{
Update: &proto.SyncResponse{ Update: &proto.SyncResponse{
@@ -134,7 +135,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
} }
//notify other peers of the change //notify other peers of the change
peers, err := am.Store.GetAccountPeers(accountId) /*peers, err := am.Store.GetAccountPeers(accountId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -156,7 +157,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
} } */
am.peersUpdateManager.CloseChannel(peerKey) am.peersUpdateManager.CloseChannel(peerKey)
return peer, nil return peer, nil