React to docker events

This commit is contained in:
Owen
2025-09-21 11:19:52 -04:00
parent 92cedd00b3
commit 60873f0a4f
2 changed files with 159 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/fosrl/newt/logger"
@@ -321,3 +322,128 @@ func getHostContainer(dockerContext context.Context, dockerClient *client.Client
return &hostContainer, nil
}
// EventCallback defines the function signature for handling Docker events
type EventCallback func(containers []Container)
// EventMonitor handles Docker event monitoring
type EventMonitor struct {
client *client.Client
ctx context.Context
cancel context.CancelFunc
callback EventCallback
socketPath string
enforceNetworkValidation bool
}
// NewEventMonitor creates a new Docker event monitor
func NewEventMonitor(socketPath string, enforceNetworkValidation bool, callback EventCallback) (*EventMonitor, error) {
if socketPath == "" {
socketPath = "unix:///var/run/docker.sock"
}
if !strings.Contains(socketPath, "://") {
socketPath = "unix://" + socketPath
}
cli, err := client.NewClientWithOpts(
client.WithHost(socketPath),
client.WithAPIVersionNegotiation(),
)
if err != nil {
return nil, fmt.Errorf("failed to create Docker client: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
return &EventMonitor{
client: cli,
ctx: ctx,
cancel: cancel,
callback: callback,
socketPath: socketPath,
enforceNetworkValidation: enforceNetworkValidation,
}, nil
}
// Start begins monitoring Docker events
func (em *EventMonitor) Start() error {
logger.Info("Starting Docker event monitoring")
// Filter for container events we care about
eventFilters := filters.NewArgs()
eventFilters.Add("type", "container")
eventFilters.Add("event", "create")
eventFilters.Add("event", "start")
eventFilters.Add("event", "stop")
eventFilters.Add("event", "destroy")
eventFilters.Add("event", "die")
eventFilters.Add("event", "pause")
eventFilters.Add("event", "unpause")
// Start listening for events
eventCh, errCh := em.client.Events(em.ctx, events.ListOptions{
Filters: eventFilters,
})
go func() {
defer func() {
if err := em.client.Close(); err != nil {
logger.Error("Error closing Docker client: %v", err)
}
}()
for {
select {
case event := <-eventCh:
logger.Debug("Docker event received: %s %s for container %s", event.Action, event.Type, event.Actor.ID[:12])
// Fetch updated container list and trigger callback
go em.handleEvent(event)
case err := <-errCh:
if err != nil && err != context.Canceled {
logger.Error("Docker event stream error: %v", err)
// Try to reconnect after a brief delay
time.Sleep(5 * time.Second)
if em.ctx.Err() == nil {
logger.Info("Attempting to reconnect to Docker event stream")
eventCh, errCh = em.client.Events(em.ctx, events.ListOptions{
Filters: eventFilters,
})
}
}
return
case <-em.ctx.Done():
logger.Info("Docker event monitoring stopped")
return
}
}
}()
return nil
}
// handleEvent processes a Docker event and triggers the callback with updated container list
func (em *EventMonitor) handleEvent(event events.Message) {
// Add a small delay to ensure Docker has fully processed the event
time.Sleep(100 * time.Millisecond)
containers, err := ListContainers(em.socketPath, em.enforceNetworkValidation)
if err != nil {
logger.Error("Failed to list containers after Docker event %s: %v", event.Action, err)
return
}
logger.Debug("Triggering callback with %d containers after Docker event %s", len(containers), event.Action)
em.callback(containers)
}
// Stop stops the event monitoring
func (em *EventMonitor) Stop() {
logger.Info("Stopping Docker event monitoring")
if em.cancel != nil {
em.cancel()
}
}

33
main.go
View File

@@ -413,6 +413,7 @@ func main() {
var pm *proxy.ProxyManager
var connected bool
var wgData WgData
var dockerEventMonitor *docker.EventMonitor
if acceptClients {
setupClients(client)
@@ -1265,6 +1266,34 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
}
defer client.Close()
// Initialize Docker event monitoring if Docker socket is available and monitoring is enabled
if dockerSocket != "" {
logger.Info("Initializing Docker event monitoring")
dockerEventMonitor, err = docker.NewEventMonitor(dockerSocket, dockerEnforceNetworkValidationBool, func(containers []docker.Container) {
// Send updated container list via websocket when Docker events occur
logger.Debug("Docker event detected, sending updated container list (%d containers)", len(containers))
err := client.SendMessage("newt/socket/containers", map[string]interface{}{
"containers": containers,
})
if err != nil {
logger.Error("Failed to send updated container list after Docker event: %v", err)
} else {
logger.Debug("Updated container list sent successfully")
}
})
if err != nil {
logger.Error("Failed to create Docker event monitor: %v", err)
} else {
err = dockerEventMonitor.Start()
if err != nil {
logger.Error("Failed to start Docker event monitoring: %v", err)
} else {
logger.Info("Docker event monitoring started successfully")
}
}
}
// Wait for interrupt signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
@@ -1273,6 +1302,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
// Close clients first (including WGTester)
closeClients()
if dockerEventMonitor != nil {
dockerEventMonitor.Stop()
}
if healthMonitor != nil {
healthMonitor.Stop()
}