mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 15:26:40 +00:00
480 lines
12 KiB
Go
480 lines
12 KiB
Go
package inspect
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"net/textproto"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Protocol constants and tunables used by the ICAP client.
const (
	// icapVersion is the protocol token that ends every ICAP request line.
	icapVersion = "ICAP/1.0"
	// icapDefaultPort is used when the service URL host carries no port.
	icapDefaultPort = "1344"

	// icapConnTimeout bounds dialing a new TCP connection.
	icapConnTimeout = 30 * time.Second
	// icapRWTimeout is the deadline set before each request write and each
	// response read.
	icapRWTimeout = time.Minute
	// icapIdleTimeout is how long a pooled connection may sit unused before
	// it is closed instead of being reused.
	icapIdleTimeout = time.Minute

	// icapMaxPoolSize is the pool capacity used when the configuration does
	// not supply a positive value.
	icapMaxPoolSize = 8
	// icapMaxRespSize caps the chunked response body, and any single chunk,
	// at 4 MiB.
	icapMaxRespSize = 4 << 20
)
|
|
|
|
// ICAPClient implements an ICAP (RFC 3507) client with persistent connection pooling.
|
|
type ICAPClient struct {
|
|
reqModURL *url.URL
|
|
respModURL *url.URL
|
|
pool chan *icapConn
|
|
mu sync.Mutex
|
|
log *log.Entry
|
|
maxPool int
|
|
}
|
|
|
|
// icapConn is one connection to the ICAP server together with the buffered
// reader used to parse responses arriving on it.
type icapConn struct {
	// conn is the underlying network connection.
	conn net.Conn
	// reader buffers reads from conn.
	reader *bufio.Reader
	// lastUse is set when the connection is dialed and again each time it is
	// returned to the pool; it drives idle expiry.
	lastUse time.Time
}
|
|
|
|
// NewICAPClient creates an ICAP client. Either or both URLs may be nil
|
|
// to disable that mode.
|
|
func NewICAPClient(logger *log.Entry, cfg *ICAPConfig) *ICAPClient {
|
|
maxPool := cfg.MaxConnections
|
|
if maxPool <= 0 {
|
|
maxPool = icapMaxPoolSize
|
|
}
|
|
|
|
return &ICAPClient{
|
|
reqModURL: cfg.ReqModURL,
|
|
respModURL: cfg.RespModURL,
|
|
pool: make(chan *icapConn, maxPool),
|
|
log: logger,
|
|
maxPool: maxPool,
|
|
}
|
|
}
|
|
|
|
// ReqMod sends an HTTP request to the ICAP REQMOD service for inspection.
|
|
// Returns the (possibly modified) request, or the original if ICAP returns 204.
|
|
// Returns nil, nil if REQMOD is not configured.
|
|
func (c *ICAPClient) ReqMod(req *http.Request) (*http.Request, error) {
|
|
if c.reqModURL == nil {
|
|
return req, nil
|
|
}
|
|
|
|
var reqBuf bytes.Buffer
|
|
if err := req.Write(&reqBuf); err != nil {
|
|
return nil, fmt.Errorf("serialize request: %w", err)
|
|
}
|
|
|
|
respBody, err := c.send("REQMOD", c.reqModURL, reqBuf.Bytes(), nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if respBody == nil {
|
|
return req, nil
|
|
}
|
|
|
|
modified, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(respBody)))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse ICAP modified request: %w", err)
|
|
}
|
|
return modified, nil
|
|
}
|
|
|
|
// RespMod sends an HTTP response to the ICAP RESPMOD service for inspection.
|
|
// Returns the (possibly modified) response, or the original if ICAP returns 204.
|
|
// Returns nil, nil if RESPMOD is not configured.
|
|
func (c *ICAPClient) RespMod(req *http.Request, resp *http.Response) (*http.Response, error) {
|
|
if c.respModURL == nil {
|
|
return resp, nil
|
|
}
|
|
|
|
var reqBuf bytes.Buffer
|
|
if err := req.Write(&reqBuf); err != nil {
|
|
return nil, fmt.Errorf("serialize request: %w", err)
|
|
}
|
|
|
|
var respBuf bytes.Buffer
|
|
if err := resp.Write(&respBuf); err != nil {
|
|
return nil, fmt.Errorf("serialize response: %w", err)
|
|
}
|
|
|
|
respBody, err := c.send("RESPMOD", c.respModURL, reqBuf.Bytes(), respBuf.Bytes())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if respBody == nil {
|
|
// 204 No Content: ICAP server didn't modify the response.
|
|
// Reconstruct from the buffered copy since resp.Body was consumed by Write.
|
|
reconstructed, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(respBuf.Bytes())), req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("reconstruct response after ICAP 204: %w", err)
|
|
}
|
|
return reconstructed, nil
|
|
}
|
|
|
|
modified, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(respBody)), req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse ICAP modified response: %w", err)
|
|
}
|
|
return modified, nil
|
|
}
|
|
|
|
// Close drains and closes all pooled connections.
|
|
func (c *ICAPClient) Close() {
|
|
close(c.pool)
|
|
for ic := range c.pool {
|
|
if err := ic.conn.Close(); err != nil {
|
|
c.log.Debugf("close ICAP connection: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// send executes an ICAP request and returns the encapsulated body from the response.
|
|
// Returns nil body for 204 No Content (no modification).
|
|
// Retries once on stale pooled connection (EOF on read).
|
|
func (c *ICAPClient) send(method string, serviceURL *url.URL, reqData, respData []byte) ([]byte, error) {
|
|
statusCode, headers, body, err := c.trySend(method, serviceURL, reqData, respData)
|
|
if err != nil && isStaleConnErr(err) {
|
|
// Retry once with a fresh connection (stale pool entry).
|
|
c.log.Debugf("ICAP %s: retrying after stale connection: %v", method, err)
|
|
statusCode, headers, body, err = c.trySend(method, serviceURL, reqData, respData)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch statusCode {
|
|
case 204:
|
|
return nil, nil
|
|
case 200:
|
|
return body, nil
|
|
default:
|
|
c.log.Debugf("ICAP %s returned status %d, headers: %v", method, statusCode, headers)
|
|
return nil, fmt.Errorf("ICAP %s: status %d", method, statusCode)
|
|
}
|
|
}
|
|
|
|
func (c *ICAPClient) trySend(method string, serviceURL *url.URL, reqData, respData []byte) (int, textproto.MIMEHeader, []byte, error) {
|
|
ic, err := c.getConn(serviceURL)
|
|
if err != nil {
|
|
return 0, nil, nil, fmt.Errorf("get ICAP connection: %w", err)
|
|
}
|
|
|
|
if err := c.writeRequest(ic, method, serviceURL, reqData, respData); err != nil {
|
|
if closeErr := ic.conn.Close(); closeErr != nil {
|
|
c.log.Debugf("close ICAP conn after write error: %v", closeErr)
|
|
}
|
|
return 0, nil, nil, fmt.Errorf("write ICAP %s: %w", method, err)
|
|
}
|
|
|
|
statusCode, headers, body, err := c.readResponse(ic)
|
|
if err != nil {
|
|
if closeErr := ic.conn.Close(); closeErr != nil {
|
|
c.log.Debugf("close ICAP conn after read error: %v", closeErr)
|
|
}
|
|
return 0, nil, nil, fmt.Errorf("read ICAP response: %w", err)
|
|
}
|
|
|
|
c.putConn(ic)
|
|
return statusCode, headers, body, nil
|
|
}
|
|
|
|
// isStaleConnErr reports whether err reads like a connection that the peer
// had already closed. It matches on the error text, looking for "EOF",
// "broken pipe" or "connection reset"; a nil error is never stale.
func isStaleConnErr(err error) bool {
	if err == nil {
		return false
	}

	msg := err.Error()
	for _, marker := range []string{"EOF", "broken pipe", "connection reset"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
|
|
|
|
func (c *ICAPClient) writeRequest(ic *icapConn, method string, serviceURL *url.URL, reqData, respData []byte) error {
|
|
if err := ic.conn.SetWriteDeadline(time.Now().Add(icapRWTimeout)); err != nil {
|
|
return fmt.Errorf("set write deadline: %w", err)
|
|
}
|
|
|
|
// For RESPMOD, split the serialized HTTP response into headers and body.
|
|
// The body must be sent chunked per RFC 3507.
|
|
var respHdr, respBody []byte
|
|
if respData != nil {
|
|
if idx := bytes.Index(respData, []byte("\r\n\r\n")); idx >= 0 {
|
|
respHdr = respData[:idx+4] // include the \r\n\r\n separator
|
|
respBody = respData[idx+4:]
|
|
} else {
|
|
respHdr = respData
|
|
}
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
|
|
// Request line
|
|
fmt.Fprintf(&buf, "%s %s %s\r\n", method, serviceURL.String(), icapVersion)
|
|
|
|
// Headers
|
|
host := serviceURL.Host
|
|
fmt.Fprintf(&buf, "Host: %s\r\n", host)
|
|
fmt.Fprintf(&buf, "Connection: keep-alive\r\n")
|
|
fmt.Fprintf(&buf, "Allow: 204\r\n")
|
|
|
|
// Build Encapsulated header
|
|
offset := 0
|
|
var encapParts []string
|
|
if reqData != nil {
|
|
encapParts = append(encapParts, fmt.Sprintf("req-hdr=%d", offset))
|
|
offset += len(reqData)
|
|
}
|
|
if respHdr != nil {
|
|
encapParts = append(encapParts, fmt.Sprintf("res-hdr=%d", offset))
|
|
offset += len(respHdr)
|
|
}
|
|
if len(respBody) > 0 {
|
|
encapParts = append(encapParts, fmt.Sprintf("res-body=%d", offset))
|
|
} else {
|
|
encapParts = append(encapParts, fmt.Sprintf("null-body=%d", offset))
|
|
}
|
|
fmt.Fprintf(&buf, "Encapsulated: %s\r\n", strings.Join(encapParts, ", "))
|
|
fmt.Fprintf(&buf, "\r\n")
|
|
|
|
// Encapsulated sections
|
|
if reqData != nil {
|
|
buf.Write(reqData)
|
|
}
|
|
if respHdr != nil {
|
|
buf.Write(respHdr)
|
|
}
|
|
// Body in chunked encoding (only when there is an actual body section).
|
|
// Per RFC 3507 Section 4.4.1, null-body must not include any entity data.
|
|
if len(respBody) > 0 {
|
|
fmt.Fprintf(&buf, "%x\r\n", len(respBody))
|
|
buf.Write(respBody)
|
|
buf.WriteString("\r\n")
|
|
buf.WriteString("0\r\n\r\n")
|
|
}
|
|
|
|
_, err := ic.conn.Write(buf.Bytes())
|
|
return err
|
|
}
|
|
|
|
func (c *ICAPClient) readResponse(ic *icapConn) (int, textproto.MIMEHeader, []byte, error) {
|
|
if err := ic.conn.SetReadDeadline(time.Now().Add(icapRWTimeout)); err != nil {
|
|
return 0, nil, nil, fmt.Errorf("set read deadline: %w", err)
|
|
}
|
|
|
|
tp := textproto.NewReader(ic.reader)
|
|
|
|
// Status line: "ICAP/1.0 200 OK"
|
|
statusLine, err := tp.ReadLine()
|
|
if err != nil {
|
|
return 0, nil, nil, fmt.Errorf("read status line: %w", err)
|
|
}
|
|
|
|
statusCode, err := parseICAPStatus(statusLine)
|
|
if err != nil {
|
|
return 0, nil, nil, err
|
|
}
|
|
|
|
// Headers
|
|
headers, err := tp.ReadMIMEHeader()
|
|
if err != nil {
|
|
return statusCode, nil, nil, fmt.Errorf("read ICAP headers: %w", err)
|
|
}
|
|
|
|
if statusCode == 204 {
|
|
return statusCode, headers, nil, nil
|
|
}
|
|
|
|
// Read encapsulated body based on Encapsulated header
|
|
body, err := c.readEncapsulatedBody(ic.reader, headers)
|
|
if err != nil {
|
|
return statusCode, headers, nil, fmt.Errorf("read encapsulated body: %w", err)
|
|
}
|
|
|
|
return statusCode, headers, body, nil
|
|
}
|
|
|
|
func (c *ICAPClient) readEncapsulatedBody(r *bufio.Reader, headers textproto.MIMEHeader) ([]byte, error) {
|
|
encap := headers.Get("Encapsulated")
|
|
if encap == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
// Find the body offset from the Encapsulated header.
|
|
// The last section with a non-zero offset is the body.
|
|
// Read everything from the reader as the encapsulated content.
|
|
var totalSize int
|
|
parts := strings.Split(encap, ",")
|
|
for _, part := range parts {
|
|
part = strings.TrimSpace(part)
|
|
eqIdx := strings.Index(part, "=")
|
|
if eqIdx < 0 {
|
|
continue
|
|
}
|
|
offset, err := strconv.Atoi(strings.TrimSpace(part[eqIdx+1:]))
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if offset > totalSize {
|
|
totalSize = offset
|
|
}
|
|
}
|
|
|
|
// Read all available encapsulated data (headers + body)
|
|
// The body section uses chunked encoding per RFC 3507
|
|
var buf bytes.Buffer
|
|
if totalSize > 0 {
|
|
// Read the header sections (everything before the body offset)
|
|
headerBytes := make([]byte, totalSize)
|
|
if _, err := io.ReadFull(r, headerBytes); err != nil {
|
|
return nil, fmt.Errorf("read encapsulated headers: %w", err)
|
|
}
|
|
buf.Write(headerBytes)
|
|
}
|
|
|
|
// Read chunked body
|
|
chunked := newChunkedReader(r)
|
|
body, err := io.ReadAll(io.LimitReader(chunked, icapMaxRespSize))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read chunked body: %w", err)
|
|
}
|
|
buf.Write(body)
|
|
|
|
return buf.Bytes(), nil
|
|
}
|
|
|
|
func (c *ICAPClient) getConn(serviceURL *url.URL) (*icapConn, error) {
|
|
// Try to get a pooled connection
|
|
for {
|
|
select {
|
|
case ic := <-c.pool:
|
|
if time.Since(ic.lastUse) > icapIdleTimeout {
|
|
if err := ic.conn.Close(); err != nil {
|
|
c.log.Debugf("close idle ICAP connection: %v", err)
|
|
}
|
|
continue
|
|
}
|
|
return ic, nil
|
|
default:
|
|
return c.dialConn(serviceURL)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *ICAPClient) putConn(ic *icapConn) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
ic.lastUse = time.Now()
|
|
select {
|
|
case c.pool <- ic:
|
|
default:
|
|
// Pool full, close connection.
|
|
if err := ic.conn.Close(); err != nil {
|
|
c.log.Debugf("close excess ICAP connection: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *ICAPClient) dialConn(serviceURL *url.URL) (*icapConn, error) {
|
|
host := serviceURL.Host
|
|
if _, _, err := net.SplitHostPort(host); err != nil {
|
|
host = net.JoinHostPort(host, icapDefaultPort)
|
|
}
|
|
|
|
conn, err := net.DialTimeout("tcp", host, icapConnTimeout)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dial ICAP %s: %w", host, err)
|
|
}
|
|
|
|
return &icapConn{
|
|
conn: conn,
|
|
reader: bufio.NewReader(conn),
|
|
lastUse: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// parseICAPStatus extracts the numeric status code from an ICAP status line
// such as "ICAP/1.0 200 OK". The code is the text between the first and the
// second space, or everything after the first space when there is no second
// one. A line without any space, or with a non-numeric code, is an error.
func parseICAPStatus(line string) (int, error) {
	sep := strings.IndexByte(line, ' ')
	if sep < 0 {
		return 0, fmt.Errorf("malformed ICAP status line: %q", line)
	}

	field := line[sep+1:]
	if end := strings.IndexByte(field, ' '); end >= 0 {
		// Drop the reason phrase.
		field = field[:end]
	}

	code, err := strconv.Atoi(field)
	if err != nil {
		return 0, fmt.Errorf("parse ICAP status code %q: %w", field, err)
	}
	return code, nil
}
|
|
|
|
// chunkedReader reads ICAP chunked encoding (same as HTTP chunked, terminated by "0\r\n\r\n").
|
|
type chunkedReader struct {
|
|
r *bufio.Reader
|
|
remaining int
|
|
done bool
|
|
}
|
|
|
|
func newChunkedReader(r *bufio.Reader) *chunkedReader {
|
|
return &chunkedReader{r: r}
|
|
}
|
|
|
|
func (cr *chunkedReader) Read(p []byte) (int, error) {
|
|
if cr.done {
|
|
return 0, io.EOF
|
|
}
|
|
|
|
if cr.remaining == 0 {
|
|
// Read chunk size line
|
|
line, err := cr.r.ReadString('\n')
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
line = strings.TrimSpace(line)
|
|
|
|
// Strip any chunk extensions
|
|
if idx := strings.Index(line, ";"); idx >= 0 {
|
|
line = line[:idx]
|
|
}
|
|
|
|
size, err := strconv.ParseInt(line, 16, 64)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("parse chunk size %q: %w", line, err)
|
|
}
|
|
|
|
if size == 0 {
|
|
cr.done = true
|
|
// Consume trailing \r\n
|
|
_, _ = cr.r.ReadString('\n')
|
|
return 0, io.EOF
|
|
}
|
|
|
|
if size < 0 || size > icapMaxRespSize {
|
|
return 0, fmt.Errorf("chunk size %d out of range (max %d)", size, icapMaxRespSize)
|
|
}
|
|
|
|
cr.remaining = int(size)
|
|
}
|
|
|
|
toRead := len(p)
|
|
if toRead > cr.remaining {
|
|
toRead = cr.remaining
|
|
}
|
|
|
|
n, err := cr.r.Read(p[:toRead])
|
|
cr.remaining -= n
|
|
|
|
if cr.remaining == 0 {
|
|
// Consume chunk-terminating \r\n
|
|
_, _ = cr.r.ReadString('\n')
|
|
}
|
|
|
|
return n, err
|
|
}
|