diff --git a/docker/client.go b/docker/client.go index 2a42023..adcc15c 100644 --- a/docker/client.go +++ b/docker/client.go @@ -10,6 +10,7 @@ import ( "time" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/fosrl/newt/logger" @@ -321,3 +322,128 @@ func getHostContainer(dockerContext context.Context, dockerClient *client.Client return &hostContainer, nil } + +// EventCallback defines the function signature for handling Docker events +type EventCallback func(containers []Container) + +// EventMonitor handles Docker event monitoring +type EventMonitor struct { + client *client.Client + ctx context.Context + cancel context.CancelFunc + callback EventCallback + socketPath string + enforceNetworkValidation bool +} + +// NewEventMonitor creates a new Docker event monitor +func NewEventMonitor(socketPath string, enforceNetworkValidation bool, callback EventCallback) (*EventMonitor, error) { + if socketPath == "" { + socketPath = "unix:///var/run/docker.sock" + } + + if !strings.Contains(socketPath, "://") { + socketPath = "unix://" + socketPath + } + + cli, err := client.NewClientWithOpts( + client.WithHost(socketPath), + client.WithAPIVersionNegotiation(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create Docker client: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + return &EventMonitor{ + client: cli, + ctx: ctx, + cancel: cancel, + callback: callback, + socketPath: socketPath, + enforceNetworkValidation: enforceNetworkValidation, + }, nil +} + +// Start begins monitoring Docker events +func (em *EventMonitor) Start() error { + logger.Info("Starting Docker event monitoring") + + // Filter for container events we care about + eventFilters := filters.NewArgs() + eventFilters.Add("type", "container") + eventFilters.Add("event", "create") + eventFilters.Add("event", "start") + eventFilters.Add("event", "stop") + eventFilters.Add("event", "destroy") + eventFilters.Add("event", "die") + eventFilters.Add("event", "pause") + eventFilters.Add("event", "unpause") + + // Start listening for events + eventCh, errCh := em.client.Events(em.ctx, events.ListOptions{ + Filters: eventFilters, + }) + + go func() { + defer func() { + if err := em.client.Close(); err != nil { + logger.Error("Error closing Docker client: %v", err) + } + }() + + for { + select { + case event := <-eventCh: + logger.Debug("Docker event received: %s %s for container %s", event.Action, event.Type, event.Actor.ID[:12]) + + // Fetch updated container list and trigger callback + go em.handleEvent(event) + + case err := <-errCh: + if err != nil && err != context.Canceled { + logger.Error("Docker event stream error: %v", err) + // Try to reconnect after a brief delay + time.Sleep(5 * time.Second) + if em.ctx.Err() == nil { + logger.Info("Attempting to reconnect to Docker event stream") + eventCh, errCh = em.client.Events(em.ctx, events.ListOptions{ + Filters: eventFilters, + }) + } + } + return + + case <-em.ctx.Done(): + logger.Info("Docker event monitoring stopped") + return + } + } + }() + + return nil +} + +// handleEvent processes a Docker event and triggers the callback with updated container list +func (em *EventMonitor) handleEvent(event events.Message) { + // Add a small delay to ensure Docker has fully processed the event + time.Sleep(100 * time.Millisecond) + + containers, err := ListContainers(em.socketPath, em.enforceNetworkValidation) + if err != nil { + logger.Error("Failed to list containers after Docker event %s: %v", event.Action, err) + return + } + + logger.Debug("Triggering callback with %d containers after Docker event %s", len(containers), event.Action) + em.callback(containers) +} + +// Stop stops the event monitoring +func (em *EventMonitor) Stop() { + logger.Info("Stopping Docker event monitoring") + if em.cancel != nil { + em.cancel() + } +} diff --git a/main.go b/main.go index 33b55a5..54c4985 100644 --- a/main.go +++ b/main.go @@ -413,6 +413,7 @@ func main() { var pm *proxy.ProxyManager var connected bool var wgData WgData + var dockerEventMonitor *docker.EventMonitor if acceptClients { setupClients(client) @@ -1265,6 +1266,34 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub } defer client.Close() + // Initialize Docker event monitoring if Docker socket is available and monitoring is enabled + if dockerSocket != "" { + logger.Info("Initializing Docker event monitoring") + dockerEventMonitor, err = docker.NewEventMonitor(dockerSocket, dockerEnforceNetworkValidationBool, func(containers []docker.Container) { + // Send updated container list via websocket when Docker events occur + logger.Debug("Docker event detected, sending updated container list (%d containers)", len(containers)) + err := client.SendMessage("newt/socket/containers", map[string]interface{}{ + "containers": containers, + }) + if err != nil { + logger.Error("Failed to send updated container list after Docker event: %v", err) + } else { + logger.Debug("Updated container list sent successfully") + } + }) + + if err != nil { + logger.Error("Failed to create Docker event monitor: %v", err) + } else { + err = dockerEventMonitor.Start() + if err != nil { + logger.Error("Failed to start Docker event monitoring: %v", err) + } else { + logger.Info("Docker event monitoring started successfully") + } + } + } + // Wait for interrupt signal sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) @@ -1273,6 +1302,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub // Close clients first (including WGTester) closeClients() + if dockerEventMonitor != nil { + dockerEventMonitor.Stop() + } + if healthMonitor != nil { healthMonitor.Stop() }