From d2f987e77a49524e99a3faf7dd96c705e182cdd8 Mon Sep 17 00:00:00 2001 From: gunli Date: Fri, 7 Mar 2025 12:37:12 +0800 Subject: [PATCH 1/4] rebase to master to merge #1343 --- pulsar/internal/connection.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index a04fa2a270..57fc724190 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -411,26 +411,16 @@ func (c *connection) run() { c.log.Debugf("Connection run starting with request capacity=%d queued=%d", cap(c.incomingRequestsCh), len(c.incomingRequestsCh)) - go func() { - for { - select { - case <-c.closeCh: - c.failLeftRequestsWhenClose() - return - - case req := <-c.incomingRequestsCh: - if req == nil { - return // TODO: this never gonna be happen - } - c.internalSendRequest(req) - } - } - }() - for { select { case <-c.closeCh: + c.failLeftRequestsWhenClose() return + case req := <-c.incomingRequestsCh: + if req == nil { + return // TODO: this never gonna be happen + } + c.internalSendRequest(req) case data := <-c.writeRequestsCh: if data == nil { return From ed211807f801824aca19e200392cfe3fc00ced17 Mon Sep 17 00:00:00 2001 From: gunli Date: Fri, 7 Mar 2025 13:16:57 +0800 Subject: [PATCH 2/4] make consumer is not nil --- pulsar/consumer_test.go | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index edef44045d..4266c9863d 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -44,14 +44,15 @@ import ( "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" - "github.com/apache/pulsar-client-go/pulsar/crypto" - "github.com/apache/pulsar-client-go/pulsar/internal" - pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" - plog "github.com/apache/pulsar-client-go/pulsar/log" "github.com/google/uuid" "github.com/pierrec/lz4/v4" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" + + "github.com/apache/pulsar-client-go/pulsar/crypto" + "github.com/apache/pulsar-client-go/pulsar/internal" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + plog "github.com/apache/pulsar-client-go/pulsar/log" ) var ( @@ -564,17 +565,22 @@ func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) { assert.Equal(t, topic+"-partition-1", topics[1]) assert.Equal(t, topic+"-partition-2", topics[2]) - consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topic, - SubscriptionName: "my-sub", - Type: Exclusive, - ReceiverQueueSize: 10, - Decryption: &MessageDecryptionInfo{ - KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", - "crypto/testdata/pri_key_rsa.pem"), - ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail, - }, - }) + var consumer Consumer + // create consumer, make sure it's not nil + require.Eventually(t, func() bool { + consumer, err = client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + ReceiverQueueSize: 10, + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail, + }, + }) + return err == nil + }, 15*time.Second, 1*time.Second) assert.Nil(t, err) defer consumer.Close() From 281984678d50fcfd5708f1849522f5779f68c6dd Mon Sep 17 00:00:00 2001 From: gunli Date: Fri, 7 Mar 2025 13:38:56 +0800 Subject: [PATCH 3/4] revert test case --- pulsar/consumer_test.go | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 4266c9863d..bc9d4fecd3 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -565,22 +565,17 @@ func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) { assert.Equal(t, topic+"-partition-1", topics[1]) assert.Equal(t, topic+"-partition-2", topics[2]) - var consumer Consumer - // create consumer, make sure it's not nil - require.Eventually(t, func() bool { - consumer, err = client.Subscribe(ConsumerOptions{ - Topic: topic, - SubscriptionName: "my-sub", - Type: Exclusive, - ReceiverQueueSize: 10, - Decryption: &MessageDecryptionInfo{ - KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", - "crypto/testdata/pri_key_rsa.pem"), - ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail, - }, - }) - return err == nil - }, 15*time.Second, 1*time.Second) + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + ReceiverQueueSize: 10, + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail, + }, + }) assert.Nil(t, err) defer consumer.Close() From 276d4618de25c33621a3fed54a17caf67d5d3790 Mon Sep 17 00:00:00 2001 From: gunli Date: Fri, 7 Mar 2025 13:41:04 +0800 Subject: [PATCH 4/4] revert test case --- pulsar/consumer_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index bc9d4fecd3..edef44045d 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -44,15 +44,14 @@ import ( "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" - "github.com/google/uuid" - "github.com/pierrec/lz4/v4" - "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/proto" - "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" plog "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/google/uuid" + "github.com/pierrec/lz4/v4" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" ) var (