Add parallel processing

This commit is contained in:
Zoltan Papp
2023-04-17 16:44:45 +02:00
parent d0e51dfc11
commit c80cb22cc0
2 changed files with 80 additions and 16 deletions

View File

@@ -39,10 +39,11 @@ type ICEBind struct {
// NetBird related variables
transportNet transport.Net
udpMux *UniversalUDPMuxDefault
worker *worker
}
func NewICEBind(transportNet transport.Net) *ICEBind {
return &ICEBind{
b := &ICEBind{
batchSize: wgConn.DefaultBatchSize,
udpAddrPool: sync.Pool{
@@ -76,6 +77,8 @@ func NewICEBind(transportNet transport.Net) *ICEBind {
},
transportNet: transportNet,
}
b.worker = newWorker(b.handlePkgs)
return b
}
type StdNetEndpoint struct {
@@ -209,22 +212,8 @@ func (s *ICEBind) receiveIPv4(buffs [][]byte, sizes []int, eps []wgConn.Endpoint
if err != nil {
return 0, err
}
for i := 0; i < numMsgs; i++ {
msg := &(*msgs)[i]
// todo: handle err
ok, _ := s.filterOutStunMessages(msg.Buffers, msg.N, msg.Addr)
if ok {
sizes[i] = 0
} else {
sizes[i] = msg.N
}
addrPort := msg.Addr.(*net.UDPAddr).AddrPort()
ep := asEndpoint(addrPort)
getSrcFromControl(msg.OOB, ep)
eps[i] = ep
}
s.worker.doWork((*msgs)[:numMsgs], sizes, eps)
return numMsgs, nil
}
@@ -389,6 +378,20 @@ func (s *ICEBind) filterOutStunMessages(buffers [][]byte, n int, addr net.Addr)
return false, nil
}
func (s *ICEBind) handlePkgs(msg *ipv4.Message) (int, *StdNetEndpoint) {
// todo: handle err
size := 0
ok, _ := s.filterOutStunMessages(msg.Buffers, msg.N, msg.Addr)
if !ok {
size = msg.N
}
addrPort := msg.Addr.(*net.UDPAddr).AddrPort()
ep := asEndpoint(addrPort)
getSrcFromControl(msg.OOB, ep)
return size, ep
}
// endpointPool contains a re-usable set of mapping from netip.AddrPort to Endpoint.
// This exists to reduce allocations: Putting a netip.AddrPort in an Endpoint allocates,
// but Endpoints are immutable, so we can re-use them.

61
iface/bind/worker.go Normal file
View File

@@ -0,0 +1,61 @@
package bind
import (
"runtime"
"sync"
"golang.org/x/net/ipv4"
wgConn "golang.zx2c4.com/wireguard/conn"
)
// todo: add close function
type worker struct {
jobOffer chan int
numOfWorker int
wg sync.WaitGroup
jobFn func(msg *ipv4.Message) (int, *StdNetEndpoint)
messages []ipv4.Message
sizes []int
eps []wgConn.Endpoint
}
func newWorker(jobFn func(msg *ipv4.Message) (int, *StdNetEndpoint)) *worker {
w := &worker{
jobOffer: make(chan int),
numOfWorker: runtime.NumCPU(),
jobFn: jobFn,
}
w.populateWorkers()
return w
}
func (w *worker) doWork(messages []ipv4.Message, sizes []int, eps []wgConn.Endpoint) {
w.messages = messages
w.sizes = sizes
w.eps = eps
w.wg.Add(w.numOfWorker)
for i := 0; i < len(messages); i++ {
w.jobOffer <- i
}
w.wg.Wait()
}
func (w *worker) populateWorkers() {
for i := 0; i < w.numOfWorker; i++ {
go w.loop()
}
}
func (w *worker) loop() {
for {
select {
case msgPos := <-w.jobOffer:
w.sizes[msgPos], w.eps[msgPos] = w.jobFn(&w.messages[msgPos])
w.wg.Done()
}
}
}