From ca1722ed10607ad6c6378f945930f74b0d710c9e Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Wed, 28 Jun 2023 02:08:09 +0200 Subject: [PATCH] Handle the stream sending in thread safe way --- keepalive/keep_alive.go | 1 + keepalive/monitor.go | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/keepalive/keep_alive.go b/keepalive/keep_alive.go index c9b3b62aa..b69aa53ab 100644 --- a/keepalive/keep_alive.go +++ b/keepalive/keep_alive.go @@ -44,6 +44,7 @@ func (k *KeepAlive) StreamInterceptor() grpc.StreamServerInterceptor { } m := &ioMonitor{ + sync.Mutex{}, sync.Mutex{}, stream, time.Now(), diff --git a/keepalive/monitor.go b/keepalive/monitor.go index 3fe50c6f6..493773808 100644 --- a/keepalive/monitor.go +++ b/keepalive/monitor.go @@ -8,13 +8,16 @@ import ( ) type ioMonitor struct { - mu sync.Mutex + mu sync.Mutex + streamLock sync.Mutex grpc.ServerStream lastSeen time.Time } func (l *ioMonitor) sendMsg(m interface{}) error { l.updateLastSeen() + l.streamLock.Lock() + defer l.streamLock.Unlock() return l.ServerStream.SendMsg(m) }