Exiting the middle device works now?

Former-commit-id: d76b3c366f
This commit is contained in:
Owen
2025-12-31 11:33:00 -05:00
parent f08b17c7bd
commit aeb908b68c

View File

@@ -163,6 +163,13 @@ func (d *MiddleDevice) pump(dev *closeAwareDevice) {
batchSize := dev.BatchSize()
logger.Debug("MiddleDevice: pump started for device")
// Recover from panic if readCh is closed while we're trying to send
defer func() {
if r := recover(); r != nil {
logger.Debug("MiddleDevice: pump recovered from panic (channel closed)")
}
}()
for {
// Check if this device is closed
if dev.IsClosed() {
@@ -197,7 +204,12 @@ func (d *MiddleDevice) pump(dev *closeAwareDevice) {
return
}
// Try to send the result
// Try to send the result - check closed state first to avoid sending on closed channel
if d.closed.Load() {
logger.Debug("MiddleDevice: pump exiting, device closed before send")
return
}
select {
case d.readCh <- readResult{bufs: bufs, sizes: sizes, offset: defaultOffset, n: n, err: err}:
default:
@@ -225,6 +237,13 @@ func (d *MiddleDevice) InjectOutbound(packet []byte) {
if d.closed.Load() {
return
}
// Use defer/recover to handle panic from sending on closed channel
// This can happen during shutdown race conditions
defer func() {
if r := recover(); r != nil {
logger.Debug("MiddleDevice: InjectOutbound recovered from panic (channel closed)")
}
}()
select {
case d.injectCh <- packet:
default:
@@ -268,6 +287,8 @@ func (d *MiddleDevice) Close() error {
d.cond.Broadcast()
d.mu.Unlock()
// Close underlying devices first - this causes the pump goroutines to exit
// when their read operations return errors
var lastErr error
logger.Debug("MiddleDevice: Closing %d devices", len(devices))
for _, device := range devices {
@@ -277,7 +298,12 @@ func (d *MiddleDevice) Close() error {
}
}
// Now close channels to unblock any remaining readers
// The pump should have exited by now, but close channels to be safe
close(d.readCh)
close(d.injectCh)
close(d.events)
return lastErr
}
@@ -416,7 +442,11 @@ func (d *MiddleDevice) Read(bufs [][]byte, sizes []int, offset int) (n int, err
// Now block waiting for data from readCh or injectCh
select {
case res := <-d.readCh:
case res, ok := <-d.readCh:
if !ok {
// Channel closed, device is shutting down
return 0, io.EOF
}
if res.err != nil {
// Check if device was swapped
if dev.IsClosed() {
@@ -446,7 +476,11 @@ func (d *MiddleDevice) Read(bufs [][]byte, sizes []int, offset int) (n int, err
}
n = count
case pkt := <-d.injectCh:
case pkt, ok := <-d.injectCh:
if !ok {
// Channel closed, device is shutting down
return 0, io.EOF
}
if len(bufs) == 0 {
return 0, nil
}