From 5df06bddd5d77738749724918d9420477a5484ff Mon Sep 17 00:00:00 2001 From: gunli Date: Mon, 24 Feb 2025 16:44:33 +0800 Subject: [PATCH 1/7] fix: potential data race --- pulsar/consumer_multitopic_test.go | 6 +++-- pulsar/internal/connection.go | 13 +++++++--- pulsar/producer_partition.go | 41 ++++++++++++++++++------------ 3 files changed, 38 insertions(+), 22 deletions(-) diff --git a/pulsar/consumer_multitopic_test.go b/pulsar/consumer_multitopic_test.go index cd236ecc22..30ae5ccd17 100644 --- a/pulsar/consumer_multitopic_test.go +++ b/pulsar/consumer_multitopic_test.go @@ -18,18 +18,20 @@ package pulsar import ( + "context" "errors" "fmt" "strings" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" - "github.com/stretchr/testify/assert" ) func TestMultiTopicConsumerReceive(t *testing.T) { @@ -330,7 +332,7 @@ func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error { return nil } -func (dummyConnection) WriteData(_ internal.Buffer) { +func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) { } func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error { diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 57fc724190..9c00f87e81 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -18,6 +18,7 @@ package internal import ( + "context" "crypto/tls" "crypto/x509" "errors" @@ -78,7 +79,7 @@ type ConnectionListener interface { type Connection interface { SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error)) SendRequestNoWait(req *pb.BaseCommand) error - WriteData(data Buffer) + WriteData(ctx context.Context, data Buffer) RegisterListener(id uint64, listener ConnectionListener) error UnregisterListener(id uint64) AddConsumeHandler(id uint64, handler ConsumerHandler) error @@ -450,12 +451,14 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) { } } -func (c *connection) WriteData(data Buffer) { +func (c *connection) WriteData(ctx context.Context, data Buffer) { select { case c.writeRequestsCh <- data: // Channel is not full return - + case <-ctx.Done(): + c.log.Debug("Write data context cancelled") + return default: // Channel full, fallback to probe if connection is closed } @@ -465,7 +468,9 @@ func (c *connection) WriteData(data Buffer) { case c.writeRequestsCh <- data: // Successfully wrote on the channel return - + case <-ctx.Done(): + c.log.Debug("Write data context cancelled") + return case <-time.After(100 * time.Millisecond): // The channel is either: // 1. blocked, in which case we need to wait until we have space diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 448f780cfd..b9436fd548 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -394,7 +394,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { pi.sentAt = time.Now() pi.Unlock() p.pendingQueue.Put(pi) - p._getConn().WriteData(pi.buffer) + p._getConn().WriteData(pi.ctx, pi.buffer) if pi == lastViewItem { break @@ -837,6 +837,8 @@ func (p *partitionProducer) internalSingleSend( type pendingItem struct { sync.Mutex + ctx context.Context + cancel context.CancelFunc buffer internal.Buffer sequenceID uint64 createdAt time.Time @@ -846,6 +848,21 @@ type pendingItem struct { flushCallback func(err error) } +func (i *pendingItem) done(err error) { + if i.isDone { + return + } + i.isDone = true + buffersPool.Put(i.buffer) + if i.flushCallback != nil { + i.flushCallback(err) + } + + if i.cancel != nil { + i.cancel() + } +} + func (p *partitionProducer) internalFlushCurrentBatch() { if p.batchBuilder == nil { // batch is not enabled @@ -895,14 +912,17 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, return default: now := time.Now() + ctx, cancel := context.WithCancel(context.Background()) p.pendingQueue.Put(&pendingItem{ + ctx: ctx, + cancel: cancel, createdAt: now, sentAt: now, buffer: buffer, sequenceID: sequenceID, sendRequests: callbacks, }) - p._getConn().WriteData(buffer) + p._getConn().WriteData(ctx, buffer) } } @@ -1579,14 +1599,14 @@ type sendRequest struct { uuid string chunkRecorder *chunkRecorder - /// resource management + // resource management memLimit internal.MemoryLimitController reservedMem int64 semaphore internal.Semaphore reservedSemaphore int - /// convey settable state + // convey settable state sendAsBatch bool transaction *transaction @@ -1659,7 +1679,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) { } func (p *partitionProducer) blockIfQueueFull() bool { - //DisableBlockIfQueueFull == false means enable block + // DisableBlockIfQueueFull == false means enable block return !p.options.DisableBlockIfQueueFull } @@ -1732,17 +1752,6 @@ type flushRequest struct { err error } -func (i *pendingItem) done(err error) { - if i.isDone { - return - } - i.isDone = true - buffersPool.Put(i.buffer) - if i.flushCallback != nil { - i.flushCallback(err) - } -} - // _setConn sets the internal connection field of this partition producer atomically. // Note: should only be called by this partition producer when a new connection is available. func (p *partitionProducer) _setConn(conn internal.Connection) { From 6267bc3cfb6a6da7b8dbe3120e81a2ae4a99f39a Mon Sep 17 00:00:00 2001 From: gunli Date: Mon, 24 Feb 2025 17:00:11 +0800 Subject: [PATCH 2/7] stop writing if ctx is done --- pulsar/internal/connection.go | 43 +++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 9c00f87e81..67e445f92f 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -130,6 +130,11 @@ type request struct { callback func(command *pb.BaseCommand, err error) } +type dataRequest struct { + ctx context.Context + data Buffer +} + type connection struct { started int32 connectionTimeout time.Duration @@ -158,7 +163,7 @@ type connection struct { incomingRequestsCh chan *request closeCh chan struct{} readyCh chan struct{} - writeRequestsCh chan Buffer + writeRequestsCh chan *dataRequest pendingLock sync.Mutex pendingReqs map[uint64]*request @@ -210,7 +215,7 @@ func newConnection(opts connectionOptions) *connection { // partition produces writing on a single connection. In general it's // good to keep this above the number of partition producers assigned // to a single connection. - writeRequestsCh: make(chan Buffer, 256), + writeRequestsCh: make(chan *dataRequest, 256), listeners: make(map[uint64]ConnectionListener), consumerHandlers: make(map[uint64]ConsumerHandler), metrics: opts.metrics, @@ -422,11 +427,11 @@ func (c *connection) run() { return // TODO: this never gonna be happen } c.internalSendRequest(req) - case data := <-c.writeRequestsCh: - if data == nil { + case req := <-c.writeRequestsCh: + if req == nil { return } - c.internalWriteData(data) + c.internalWriteData(req.ctx, req.data) case <-pingSendTicker.C: c.sendPing() @@ -453,7 +458,7 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) { func (c *connection) WriteData(ctx context.Context, data Buffer) { select { - case c.writeRequestsCh <- data: + case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}: // Channel is not full return case <-ctx.Done(): @@ -465,7 +470,7 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) { for { select { - case c.writeRequestsCh <- data: + case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}: // Successfully wrote on the channel return case <-ctx.Done(): @@ -486,11 +491,25 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) { } -func (c *connection) internalWriteData(data Buffer) { +func (c *connection) internalWriteData(ctx context.Context, data Buffer) { c.log.Debug("Write data: ", data.ReadableBytes()) - if _, err := c.cnx.Write(data.ReadableSlice()); err != nil { - c.log.WithError(err).Warn("Failed to write on connection") - c.Close() + if ctx == nil { + if _, err := c.cnx.Write(data.ReadableSlice()); err != nil { + c.log.WithError(err).Warn("Failed to write on connection") + c.Close() + } + + return + } + + select { + case <-ctx.Done(): + return + default: + if _, err := c.cnx.Write(data.ReadableSlice()); err != nil { + c.log.WithError(err).Warn("Failed to write on connection") + c.Close() + } } } @@ -515,7 +534,7 @@ func (c *connection) writeCommand(cmd *pb.BaseCommand) { } c.writeBuffer.WrittenBytes(cmdSize) - c.internalWriteData(c.writeBuffer) + c.internalWriteData(nil, c.writeBuffer) } func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) { From 8aa2ec4253323275e07bba53e5aebd25944c3751 Mon Sep 17 00:00:00 2001 From: gunli Date: Wed, 26 Feb 2025 15:10:44 +0800 Subject: [PATCH 3/7] pass a not nil context --- pulsar/internal/connection.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 67e445f92f..2cbfc81831 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -493,15 +493,6 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) { func (c *connection) internalWriteData(ctx context.Context, data Buffer) { c.log.Debug("Write data: ", data.ReadableBytes()) - if ctx == nil { - if _, err := c.cnx.Write(data.ReadableSlice()); err != nil { - c.log.WithError(err).Warn("Failed to write on connection") - c.Close() - } - - return - } - select { case <-ctx.Done(): return @@ -534,7 +525,7 @@ func (c *connection) writeCommand(cmd *pb.BaseCommand) { } c.writeBuffer.WrittenBytes(cmdSize) - c.internalWriteData(nil, c.writeBuffer) + c.internalWriteData(context.Background(), c.writeBuffer) } func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) { From d7c6e3df305ff2ca80599e2521bf23251e0790b5 Mon Sep 17 00:00:00 2001 From: gunli Date: Wed, 26 Feb 2025 16:45:56 +0800 Subject: [PATCH 4/7] check nil ctx --- pulsar/internal/connection.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 2cbfc81831..e437789986 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -493,6 +493,10 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) { func (c *connection) internalWriteData(ctx context.Context, data Buffer) { c.log.Debug("Write data: ", data.ReadableBytes()) + if ctx == nil { + return + } + select { case <-ctx.Done(): return From 4a390e71f369715aa708000b82799e85aaf5107e Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 27 Feb 2025 12:38:05 +0800 Subject: [PATCH 5/7] revert --- pulsar/internal/connection.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index e437789986..8a01b721ac 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -494,6 +494,11 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) { func (c *connection) internalWriteData(ctx context.Context, data Buffer) { c.log.Debug("Write data: ", data.ReadableBytes()) if ctx == nil { + if _, err := c.cnx.Write(data.ReadableSlice()); err != nil { + c.log.WithError(err).Warn("Failed to write on connection") + c.Close() + } + return } From d0057524c555aed39495c21ea223676a45c98413 Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 27 Feb 2025 16:37:39 +0800 Subject: [PATCH 6/7] delete ctx nil check --- pulsar/internal/connection.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 8a01b721ac..2ad1acb5fd 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -493,14 +493,6 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) { func (c *connection) internalWriteData(ctx context.Context, data Buffer) { c.log.Debug("Write data: ", data.ReadableBytes()) - if ctx == nil { - if _, err := c.cnx.Write(data.ReadableSlice()); err != nil { - c.log.WithError(err).Warn("Failed to write on connection") - c.Close() - } - - return - } select { case <-ctx.Done(): From 2e7753b6e509a27c6f9024e5073b4b41134e91ca Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 27 Feb 2025 16:50:13 +0800 Subject: [PATCH 7/7] revert pendingItem.done() to its old position --- pulsar/producer_partition.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b9436fd548..f6523124c0 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -848,21 +848,6 @@ type pendingItem struct { flushCallback func(err error) } -func (i *pendingItem) done(err error) { - if i.isDone { - return - } - i.isDone = true - buffersPool.Put(i.buffer) - if i.flushCallback != nil { - i.flushCallback(err) - } - - if i.cancel != nil { - i.cancel() - } -} - func (p *partitionProducer) internalFlushCurrentBatch() { if p.batchBuilder == nil { // batch is not enabled @@ -1752,6 +1737,21 @@ type flushRequest struct { err error } +func (i *pendingItem) done(err error) { + if i.isDone { + return + } + i.isDone = true + buffersPool.Put(i.buffer) + if i.flushCallback != nil { + i.flushCallback(err) + } + + if i.cancel != nil { + i.cancel() + } +} + // _setConn sets the internal connection field of this partition producer atomically. // Note: should only be called by this partition producer when a new connection is available. func (p *partitionProducer) _setConn(conn internal.Connection) {