This commit is contained in:
pascal
2026-01-16 12:01:52 +01:00
parent 3b832d1f21
commit 183619d1e1
20 changed files with 34 additions and 525 deletions

View File

@@ -77,7 +77,6 @@ func (s *Server) Start() error {
return fmt.Errorf("failed to listen: %w", err)
}
// Configure gRPC server with keepalive
s.grpcServer = grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 30 * time.Second,
@@ -114,7 +113,6 @@ func (s *Server) Stop(ctx context.Context) error {
log.Info("Stopping gRPC server...")
// Cancel all active streams
s.mu.Lock()
for _, streamCtx := range s.streams {
streamCtx.cancel()
@@ -123,7 +121,6 @@ func (s *Server) Stop(ctx context.Context) error {
s.streams = make(map[string]*StreamContext)
s.mu.Unlock()
// Graceful stop with timeout
stopped := make(chan struct{})
go func() {
s.grpcServer.GracefulStop()
@@ -154,7 +151,6 @@ func (s *Server) Stream(stream pb.ProxyService_StreamServer) error {
controlID := fmt.Sprintf("control-%d", time.Now().Unix())
// Create stream context
streamCtx := &StreamContext{
stream: stream,
sendChan: make(chan *pb.ProxyMessage, 100),
@@ -163,22 +159,18 @@ func (s *Server) Stream(stream pb.ProxyService_StreamServer) error {
controlID: controlID,
}
// Register stream
s.mu.Lock()
s.streams[controlID] = streamCtx
s.mu.Unlock()
log.Infof("Control service connected: %s", controlID)
// Start goroutine to send ProxyMessages to control service
sendDone := make(chan error, 1)
go s.sendLoop(streamCtx, sendDone)
// Start goroutine to receive ControlMessages from control service
recvDone := make(chan error, 1)
go s.receiveLoop(streamCtx, recvDone)
// Wait for either send or receive to complete
select {
case err := <-sendDone:
log.Infof("Control service %s send loop ended: %v", controlID, err)
@@ -202,7 +194,6 @@ func (s *Server) sendLoop(streamCtx *StreamContext, done chan<- error) {
return
}
// Send ProxyMessage to control service
if err := streamCtx.stream.Send(msg); err != nil {
log.Errorf("Failed to send message to control service: %v", err)
done <- err
@@ -219,7 +210,6 @@ func (s *Server) sendLoop(streamCtx *StreamContext, done chan<- error) {
// receiveLoop handles receiving ControlMessages from the control service
func (s *Server) receiveLoop(streamCtx *StreamContext, done chan<- error) {
for {
// Receive ControlMessage from control service (client)
controlMsg, err := streamCtx.stream.Recv()
if err != nil {
log.Debugf("Stream receive error: %v", err)
@@ -227,7 +217,6 @@ func (s *Server) receiveLoop(streamCtx *StreamContext, done chan<- error) {
return
}
// Handle different ControlMessage types
switch m := controlMsg.Message.(type) {
case *pb.ControlMessage_Event:
if s.handler != nil {
@@ -271,7 +260,6 @@ func (s *Server) SendProxyMessage(msg *pb.ProxyMessage) {
for _, streamCtx := range s.streams {
select {
case streamCtx.sendChan <- msg:
// Message queued successfully
default:
log.Warn("Send channel full, dropping message")
}