Exiting the middle device works now?

This commit is contained in:
Owen
2025-12-31 11:33:00 -05:00
parent c85fcc434b
commit d76b3c366f

View File

@@ -163,6 +163,13 @@ func (d *MiddleDevice) pump(dev *closeAwareDevice) {
batchSize := dev.BatchSize() batchSize := dev.BatchSize()
logger.Debug("MiddleDevice: pump started for device") logger.Debug("MiddleDevice: pump started for device")
// Recover from panic if readCh is closed while we're trying to send
defer func() {
if r := recover(); r != nil {
logger.Debug("MiddleDevice: pump recovered from panic (channel closed)")
}
}()
for { for {
// Check if this device is closed // Check if this device is closed
if dev.IsClosed() { if dev.IsClosed() {
@@ -197,7 +204,12 @@ func (d *MiddleDevice) pump(dev *closeAwareDevice) {
return return
} }
// Try to send the result // Try to send the result - check closed state first to avoid sending on closed channel
if d.closed.Load() {
logger.Debug("MiddleDevice: pump exiting, device closed before send")
return
}
select { select {
case d.readCh <- readResult{bufs: bufs, sizes: sizes, offset: defaultOffset, n: n, err: err}: case d.readCh <- readResult{bufs: bufs, sizes: sizes, offset: defaultOffset, n: n, err: err}:
default: default:
@@ -225,6 +237,13 @@ func (d *MiddleDevice) InjectOutbound(packet []byte) {
if d.closed.Load() { if d.closed.Load() {
return return
} }
// Use defer/recover to handle panic from sending on closed channel
// This can happen during shutdown race conditions
defer func() {
if r := recover(); r != nil {
logger.Debug("MiddleDevice: InjectOutbound recovered from panic (channel closed)")
}
}()
select { select {
case d.injectCh <- packet: case d.injectCh <- packet:
default: default:
@@ -268,6 +287,8 @@ func (d *MiddleDevice) Close() error {
d.cond.Broadcast() d.cond.Broadcast()
d.mu.Unlock() d.mu.Unlock()
// Close underlying devices first - this causes the pump goroutines to exit
// when their read operations return errors
var lastErr error var lastErr error
logger.Debug("MiddleDevice: Closing %d devices", len(devices)) logger.Debug("MiddleDevice: Closing %d devices", len(devices))
for _, device := range devices { for _, device := range devices {
@@ -277,7 +298,12 @@ func (d *MiddleDevice) Close() error {
} }
} }
// Now close channels to unblock any remaining readers
// The pump should have exited by now, but close channels to be safe
close(d.readCh)
close(d.injectCh)
close(d.events) close(d.events)
return lastErr return lastErr
} }
@@ -416,7 +442,11 @@ func (d *MiddleDevice) Read(bufs [][]byte, sizes []int, offset int) (n int, err
// Now block waiting for data from readCh or injectCh // Now block waiting for data from readCh or injectCh
select { select {
case res := <-d.readCh: case res, ok := <-d.readCh:
if !ok {
// Channel closed, device is shutting down
return 0, io.EOF
}
if res.err != nil { if res.err != nil {
// Check if device was swapped // Check if device was swapped
if dev.IsClosed() { if dev.IsClosed() {
@@ -446,7 +476,11 @@ func (d *MiddleDevice) Read(bufs [][]byte, sizes []int, offset int) (n int, err
} }
n = count n = count
case pkt := <-d.injectCh: case pkt, ok := <-d.injectCh:
if !ok {
// Channel closed, device is shutting down
return 0, io.EOF
}
if len(bufs) == 0 { if len(bufs) == 0 {
return 0, nil return 0, nil
} }