mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
Improve rego policy processing performance
This commit is contained in:
@@ -295,6 +295,8 @@ func (a *Account) GetPeerNetworkMap(peerID, dnsDomain string) *NetworkMap {
|
|||||||
}
|
}
|
||||||
peersToConnect = append(peersToConnect, p)
|
peersToConnect = append(peersToConnect, p)
|
||||||
}
|
}
|
||||||
|
log.Tracef("sync for peer with pubKey %s have %d to connect and %d expired peers from %d aclPeers",
|
||||||
|
a.Peers[peerID].Key, len(peersToConnect), len(expiredPeers), len(aclPeers))
|
||||||
// Please mind, that the returned route.Route objects will contain Peer.Key instead of Peer.ID.
|
// Please mind, that the returned route.Route objects will contain Peer.Key instead of Peer.ID.
|
||||||
routesUpdate := a.getRoutesToSync(peerID, peersToConnect)
|
routesUpdate := a.getRoutesToSync(peerID, peersToConnect)
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"html/template"
|
"html/template"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/server/activity"
|
"github.com/netbirdio/netbird/management/server/activity"
|
||||||
"github.com/netbirdio/netbird/management/server/status"
|
"github.com/netbirdio/netbird/management/server/status"
|
||||||
@@ -112,6 +113,8 @@ type Policy struct {
|
|||||||
|
|
||||||
// Rules of the policy
|
// Rules of the policy
|
||||||
Rules []*PolicyRule
|
Rules []*PolicyRule
|
||||||
|
|
||||||
|
preparedEvalQuery *rego.PreparedEvalQuery
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy returns a copy of the policy.
|
// Copy returns a copy of the policy.
|
||||||
@@ -123,6 +126,10 @@ func (p *Policy) Copy() *Policy {
|
|||||||
Enabled: p.Enabled,
|
Enabled: p.Enabled,
|
||||||
Query: p.Query,
|
Query: p.Query,
|
||||||
}
|
}
|
||||||
|
if p.preparedEvalQuery != nil {
|
||||||
|
preparedEvalQuery := *p.preparedEvalQuery
|
||||||
|
c.preparedEvalQuery = &preparedEvalQuery
|
||||||
|
}
|
||||||
for _, r := range p.Rules {
|
for _, r := range p.Rules {
|
||||||
c.Rules = append(c.Rules, r.Copy())
|
c.Rules = append(c.Rules, r.Copy())
|
||||||
}
|
}
|
||||||
@@ -159,6 +166,19 @@ func (p *Policy) UpdateQueryFromRules() error {
|
|||||||
queries = append(queries, buff.String())
|
queries = append(queries, buff.String())
|
||||||
}
|
}
|
||||||
p.Query = strings.Join(queries, "\n")
|
p.Query = strings.Join(queries, "\n")
|
||||||
|
|
||||||
|
stmt, err := rego.New(
|
||||||
|
rego.Query("data.netbird.all"),
|
||||||
|
rego.Module("netbird", defaultPolicyModule),
|
||||||
|
rego.Module("netbird-query", p.Query),
|
||||||
|
).PrepareForEval(context.TODO())
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("prepare rego statement for eval")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
p.preparedEvalQuery = &stmt
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -231,24 +251,14 @@ func (f *FirewallRule) parseFromRegoResult(value interface{}) error {
|
|||||||
func (a *Account) queryPeersAndFwRulesByRego(
|
func (a *Account) queryPeersAndFwRulesByRego(
|
||||||
peerID string,
|
peerID string,
|
||||||
queryNumber int,
|
queryNumber int,
|
||||||
query string,
|
stmt *rego.PreparedEvalQuery,
|
||||||
) ([]*Peer, []*FirewallRule) {
|
) ([]*Peer, []*FirewallRule) {
|
||||||
input := map[string]interface{}{
|
input := map[string]interface{}{
|
||||||
"peer_id": peerID,
|
"peer_id": peerID,
|
||||||
"peers": a.Peers,
|
"peers": a.Peers,
|
||||||
"groups": a.Groups,
|
"groups": a.Groups,
|
||||||
}
|
}
|
||||||
|
start := time.Now()
|
||||||
stmt, err := rego.New(
|
|
||||||
rego.Query("data.netbird.all"),
|
|
||||||
rego.Module("netbird", defaultPolicyModule),
|
|
||||||
rego.Module(fmt.Sprintf("netbird-%d", queryNumber), query),
|
|
||||||
).PrepareForEval(context.TODO())
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("get Rego query")
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
evalResult, err := stmt.Eval(
|
evalResult, err := stmt.Eval(
|
||||||
context.TODO(),
|
context.TODO(),
|
||||||
rego.EvalInput(input),
|
rego.EvalInput(input),
|
||||||
@@ -257,6 +267,7 @@ func (a *Account) queryPeersAndFwRulesByRego(
|
|||||||
log.WithError(err).Error("eval Rego query")
|
log.WithError(err).Error("eval Rego query")
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
log.Debugf("time evaluating policy for peer %s is: %s", peerID, time.Now().Sub(start))
|
||||||
|
|
||||||
if len(evalResult) == 0 || len(evalResult[0].Expressions) == 0 {
|
if len(evalResult) == 0 || len(evalResult[0].Expressions) == 0 {
|
||||||
log.Trace("empty Rego query eval result")
|
log.Trace("empty Rego query eval result")
|
||||||
@@ -271,12 +282,16 @@ func (a *Account) queryPeersAndFwRulesByRego(
|
|||||||
src := make(map[string]struct{})
|
src := make(map[string]struct{})
|
||||||
peers := make([]*Peer, 0, len(expressions))
|
peers := make([]*Peer, 0, len(expressions))
|
||||||
rules := make([]*FirewallRule, 0, len(expressions))
|
rules := make([]*FirewallRule, 0, len(expressions))
|
||||||
|
|
||||||
|
expTime := make([]time.Duration, 0)
|
||||||
for _, v := range expressions {
|
for _, v := range expressions {
|
||||||
rule := &FirewallRule{}
|
rule := &FirewallRule{}
|
||||||
|
start = time.Now()
|
||||||
if err := rule.parseFromRegoResult(v); err != nil {
|
if err := rule.parseFromRegoResult(v); err != nil {
|
||||||
log.WithError(err).Error("parse Rego query eval result")
|
log.WithError(err).Error("parse Rego query eval result")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
expTime = append(expTime, time.Now().Sub(start))
|
||||||
rules = append(rules, rule)
|
rules = append(rules, rule)
|
||||||
switch rule.Direction {
|
switch rule.Direction {
|
||||||
case "dst":
|
case "dst":
|
||||||
@@ -294,7 +309,15 @@ func (a *Account) queryPeersAndFwRulesByRego(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(expTime) > 0 {
|
||||||
|
n := time.Duration(0)
|
||||||
|
for _, d := range expTime {
|
||||||
|
n = n + d
|
||||||
|
}
|
||||||
|
|
||||||
|
avg := time.Duration(int(n) / len(expTime))
|
||||||
|
log.Debugf("time evaluating policy expressions for peer %s is: %s for %d expressions", peerID, avg, len(expTime))
|
||||||
|
}
|
||||||
added := make(map[string]struct{})
|
added := make(map[string]struct{})
|
||||||
if _, ok := src[peerID]; ok {
|
if _, ok := src[peerID]; ok {
|
||||||
for id := range dst {
|
for id := range dst {
|
||||||
@@ -325,7 +348,16 @@ func (a *Account) getPeersByPolicy(peerID string) (peers []*Peer, rules []*Firew
|
|||||||
if !policy.Enabled {
|
if !policy.Enabled {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
p, r := a.queryPeersAndFwRulesByRego(peerID, i, policy.Query)
|
if policy.preparedEvalQuery == nil {
|
||||||
|
log.Debugf("generating a new statement for policy %s", policy.ID)
|
||||||
|
err := policy.UpdateQueryFromRules()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("unable to update query from rules, skiping policy %s, error: %s", policy.ID, err)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
p, r := a.queryPeersAndFwRulesByRego(peerID, i, policy.preparedEvalQuery)
|
||||||
for _, peer := range p {
|
for _, peer := range p {
|
||||||
if _, ok := peersSeen[peer.ID]; ok {
|
if _, ok := peersSeen[peer.ID]; ok {
|
||||||
continue
|
continue
|
||||||
|
|||||||
Reference in New Issue
Block a user