diff --git a/iface/bind/bind.go b/iface/bind/bind.go index 4f02bf076..b9dc1ba44 100644 --- a/iface/bind/bind.go +++ b/iface/bind/bind.go @@ -39,10 +39,11 @@ type ICEBind struct { // NetBird related variables transportNet transport.Net udpMux *UniversalUDPMuxDefault + worker *worker } func NewICEBind(transportNet transport.Net) *ICEBind { - return &ICEBind{ + b := &ICEBind{ batchSize: wgConn.DefaultBatchSize, udpAddrPool: sync.Pool{ @@ -76,6 +77,8 @@ func NewICEBind(transportNet transport.Net) *ICEBind { }, transportNet: transportNet, } + b.worker = newWorker(b.handlePkgs) + return b } type StdNetEndpoint struct { @@ -209,22 +212,8 @@ func (s *ICEBind) receiveIPv4(buffs [][]byte, sizes []int, eps []wgConn.Endpoint if err != nil { return 0, err } - for i := 0; i < numMsgs; i++ { - msg := &(*msgs)[i] - // todo: handle err - ok, _ := s.filterOutStunMessages(msg.Buffers, msg.N, msg.Addr) - if ok { - sizes[i] = 0 - } else { - sizes[i] = msg.N - } - - addrPort := msg.Addr.(*net.UDPAddr).AddrPort() - ep := asEndpoint(addrPort) - getSrcFromControl(msg.OOB, ep) - eps[i] = ep - } + s.worker.doWork((*msgs)[:numMsgs], sizes, eps) return numMsgs, nil } @@ -389,6 +378,20 @@ func (s *ICEBind) filterOutStunMessages(buffers [][]byte, n int, addr net.Addr) return false, nil } +func (s *ICEBind) handlePkgs(msg *ipv4.Message) (int, *StdNetEndpoint) { + // todo: handle err + size := 0 + ok, _ := s.filterOutStunMessages(msg.Buffers, msg.N, msg.Addr) + if !ok { + size = msg.N + } + + addrPort := msg.Addr.(*net.UDPAddr).AddrPort() + ep := asEndpoint(addrPort) + getSrcFromControl(msg.OOB, ep) + return size, ep +} + // endpointPool contains a re-usable set of mapping from netip.AddrPort to Endpoint. // This exists to reduce allocations: Putting a netip.AddrPort in an Endpoint allocates, // but Endpoints are immutable, so we can re-use them. diff --git a/iface/bind/worker.go b/iface/bind/worker.go new file mode 100644 index 000000000..3d3ed0b47 --- /dev/null +++ b/iface/bind/worker.go @@ -0,0 +1,61 @@ +package bind + +import ( + "runtime" + "sync" + + "golang.org/x/net/ipv4" + wgConn "golang.zx2c4.com/wireguard/conn" +) + +// todo: add close function +type worker struct { + jobOffer chan int + numOfWorker int + wg sync.WaitGroup + + jobFn func(msg *ipv4.Message) (int, *StdNetEndpoint) + + messages []ipv4.Message + sizes []int + eps []wgConn.Endpoint +} + +func newWorker(jobFn func(msg *ipv4.Message) (int, *StdNetEndpoint)) *worker { + w := &worker{ + jobOffer: make(chan int), + numOfWorker: runtime.NumCPU(), + jobFn: jobFn, + } + + w.populateWorkers() + return w +} + +func (w *worker) doWork(messages []ipv4.Message, sizes []int, eps []wgConn.Endpoint) { + w.messages = messages + w.sizes = sizes + w.eps = eps + + w.wg.Add(w.numOfWorker) + for i := 0; i < len(messages); i++ { + w.jobOffer <- i + } + w.wg.Wait() +} + +func (w *worker) populateWorkers() { + for i := 0; i < w.numOfWorker; i++ { + go w.loop() + } +} + +func (w *worker) loop() { + for { + select { + case msgPos := <-w.jobOffer: + w.sizes[msgPos], w.eps[msgPos] = w.jobFn(&w.messages[msgPos]) + w.wg.Done() + } + } +}