diff --git a/device/middle_device.go b/device/middle_device.go index 2a5d9b9..7dfbec8 100644 --- a/device/middle_device.go +++ b/device/middle_device.go @@ -163,6 +163,13 @@ func (d *MiddleDevice) pump(dev *closeAwareDevice) { batchSize := dev.BatchSize() logger.Debug("MiddleDevice: pump started for device") + // Recover from panic if readCh is closed while we're trying to send + defer func() { + if r := recover(); r != nil { + logger.Debug("MiddleDevice: pump recovered from panic (channel closed)") + } + }() + for { // Check if this device is closed if dev.IsClosed() { @@ -197,7 +204,12 @@ func (d *MiddleDevice) pump(dev *closeAwareDevice) { return } - // Try to send the result + // Try to send the result - check closed state first to avoid sending on closed channel + if d.closed.Load() { + logger.Debug("MiddleDevice: pump exiting, device closed before send") + return + } + select { case d.readCh <- readResult{bufs: bufs, sizes: sizes, offset: defaultOffset, n: n, err: err}: default: @@ -225,6 +237,13 @@ func (d *MiddleDevice) InjectOutbound(packet []byte) { if d.closed.Load() { return } + // Use defer/recover to handle panic from sending on closed channel + // This can happen during shutdown race conditions + defer func() { + if r := recover(); r != nil { + logger.Debug("MiddleDevice: InjectOutbound recovered from panic (channel closed)") + } + }() select { case d.injectCh <- packet: default: @@ -268,6 +287,8 @@ func (d *MiddleDevice) Close() error { d.cond.Broadcast() d.mu.Unlock() + // Close underlying devices first - this causes the pump goroutines to exit + // when their read operations return errors var lastErr error logger.Debug("MiddleDevice: Closing %d devices", len(devices)) for _, device := range devices { @@ -277,7 +298,12 @@ func (d *MiddleDevice) Close() error { } } + // Now close channels to unblock any remaining readers + // The pump should have exited by now, but close channels to be safe + close(d.readCh) + close(d.injectCh) close(d.events) + return lastErr } @@ -416,7 +442,11 @@ func (d *MiddleDevice) Read(bufs [][]byte, sizes []int, offset int) (n int, err // Now block waiting for data from readCh or injectCh select { - case res := <-d.readCh: + case res, ok := <-d.readCh: + if !ok { + // Channel closed, device is shutting down + return 0, io.EOF + } if res.err != nil { // Check if device was swapped if dev.IsClosed() { @@ -446,7 +476,11 @@ func (d *MiddleDevice) Read(bufs [][]byte, sizes []int, offset int) (n int, err } n = count - case pkt := <-d.injectCh: + case pkt, ok := <-d.injectCh: + if !ok { + // Channel closed, device is shutting down + return 0, io.EOF + } if len(bufs) == 0 { return 0, nil }