From 01375ad67eb1c71c4cb7764e4fce8b80783c9116 Mon Sep 17 00:00:00 2001 From: Masih Yeganeh Date: Wed, 17 Apr 2019 18:32:26 +0430 Subject: [PATCH 1/5] Adds protocol support to brokers I needed to extract EXTERNAL://ip:port from zookeeper messages. It can now do that --- kazoo.go | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/kazoo.go b/kazoo.go index 4465b45..99d4e4d 100644 --- a/kazoo.go +++ b/kazoo.go @@ -135,6 +135,93 @@ func (kz *Kazoo) Brokers() (map[int32]string, error) { return result, nil } +// Brokers returns a map of all the brokers that make part of the +// Kafka cluster that is registered in Zookeeper and have specific protocol (PLAINTEXT, EXTERNAL, ...). +func (kz *Kazoo) SpecificBrokers(protocol string) (map[int32]string, error) { + root := fmt.Sprintf("%s/brokers/ids", kz.conf.Chroot) + children, _, err := kz.conn.Children(root) + if err != nil { + return nil, err + } + + type brokerEntry struct { + ListenerSecurityProtocolMap struct { + PLAINTEXT string `json:"PLAINTEXT"` + EXTERNAL string `json:"EXTERNAL"` + } `json:"listener_security_protocol_map"` + Endpoints []string `json:"endpoints"` + JmxPort int `json:"jmx_port"` + Host string `json:"host"` + Timestamp string `json:"timestamp"` + Port int `json:"port"` + Version int `json:"version"` + } + + result := make(map[int32]string) + children: + for _, child := range children { + brokerID, err := strconv.ParseInt(child, 10, 32) + if err != nil { + return nil, err + } + + value, _, err := kz.conn.Get(path.Join(root, child)) + if err != nil { + return nil, err + } + + var brokerNode brokerEntry + if err := json.Unmarshal(value, &brokerNode); err != nil { + return nil, err + } + + for _, endpoint := range brokerNode.Endpoints { + if strings.Index(endpoint, strings.ToUpper(protocol)) == 0 { + result[int32(brokerID)] = strings.Replace(endpoint, strings.ToUpper(protocol)+"://", "", 1) + continue children + } + } + + result[int32(brokerID)] = fmt.Sprintf("%s:%d", brokerNode.Host, brokerNode.Port) + } + + return result, nil +} + +// BrokerList returns a slice of broker addresses that can be used to connect to +// the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`. +func (kz *Kazoo) SpecificBrokerList(protocol string) ([]string, error) { + brokers, err := kz.SpecificBrokers(protocol) + if err != nil { + return nil, err + } + + result := make([]string, 0, len(brokers)) + for _, broker := range brokers { + result = append(result, broker) + } + + return result, nil +} + +// BrokerIDList returns a sorted slice of broker ids that can be used for manipulating topics and partitions.`. +func (kz *Kazoo) SpecificbrokerIDList(protocol string) ([]int32, error) { + brokers, err := kz.SpecificBrokers(protocol) + if err != nil { + return nil, err + } + + result := make([]int32, 0, len(brokers)) + for id := range brokers { + result = append(result, id) + } + + // return sorted list to match the offical kafka sdks + sort.Sort(int32Slice(result)) + + return result, nil +} + // BrokerList returns a slice of broker addresses that can be used to connect to // the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`. func (kz *Kazoo) BrokerList() ([]string, error) { @@ -195,9 +282,9 @@ func (kz *Kazoo) Close() error { return nil } -//////////////////////////////////////////////////////////////////////// +// ////////////////////////////////////////////////////////////////////// // Util methods -//////////////////////////////////////////////////////////////////////// +// ////////////////////////////////////////////////////////////////////// // Exists checks existence of a node func (kz *Kazoo) exists(node string) (ok bool, err error) { From 994691394225301b7e876e9fdf79b3972478fc45 Mon Sep 17 00:00:00 2001 From: Masih Yeganeh Date: Thu, 18 Apr 2019 11:42:04 +0430 Subject: [PATCH 2/5] Update kazoo.go --- kazoo.go | 87 +++++++------------------------------------------------- 1 file changed, 11 insertions(+), 76 deletions(-) diff --git a/kazoo.go b/kazoo.go index 99d4e4d..917dafd 100644 --- a/kazoo.go +++ b/kazoo.go @@ -52,6 +52,9 @@ type Config struct { // The chroot the Kafka installation is registerde under. Defaults to "". Chroot string + // The protocol (PLAINTEXT, EXTERNAL, ...). Defaults to "". + Protocol string + // The amount of time the Zookeeper client can be disconnected from the Zookeeper cluster // before the cluster will get rid of watches and ephemeral nodes. Defaults to 1 second. Timeout time.Duration @@ -63,6 +66,7 @@ type Config struct { // NewConfig instantiates a new Config struct with sane defaults. func NewConfig() *Config { return &Config{ + Protocol: "", Timeout: 1 * time.Second, Logger: zk.DefaultLogger, } @@ -107,43 +111,6 @@ func (kz *Kazoo) Brokers() (map[int32]string, error) { return nil, err } - type brokerEntry struct { - Host string `json:"host"` - Port int `json:"port"` - } - - result := make(map[int32]string) - for _, child := range children { - brokerID, err := strconv.ParseInt(child, 10, 32) - if err != nil { - return nil, err - } - - value, _, err := kz.conn.Get(path.Join(root, child)) - if err != nil { - return nil, err - } - - var brokerNode brokerEntry - if err := json.Unmarshal(value, &brokerNode); err != nil { - return nil, err - } - - result[int32(brokerID)] = fmt.Sprintf("%s:%d", brokerNode.Host, brokerNode.Port) - } - - return result, nil -} - -// Brokers returns a map of all the brokers that make part of the -// Kafka cluster that is registered in Zookeeper and have specific protocol (PLAINTEXT, EXTERNAL, ...). -func (kz *Kazoo) SpecificBrokers(protocol string) (map[int32]string, error) { - root := fmt.Sprintf("%s/brokers/ids", kz.conf.Chroot) - children, _, err := kz.conn.Children(root) - if err != nil { - return nil, err - } - type brokerEntry struct { ListenerSecurityProtocolMap struct { PLAINTEXT string `json:"PLAINTEXT"` @@ -158,7 +125,6 @@ func (kz *Kazoo) SpecificBrokers(protocol string) (map[int32]string, error) { } result := make(map[int32]string) - children: for _, child := range children { brokerID, err := strconv.ParseInt(child, 10, 32) if err != nil { @@ -175,10 +141,13 @@ func (kz *Kazoo) SpecificBrokers(protocol string) (map[int32]string, error) { return nil, err } - for _, endpoint := range brokerNode.Endpoints { - if strings.Index(endpoint, strings.ToUpper(protocol)) == 0 { - result[int32(brokerID)] = strings.Replace(endpoint, strings.ToUpper(protocol)+"://", "", 1) - continue children + if kz.conf.Protocol != "" { + protocol := strings.ToUpper(kz.conf.Protocol) + for _, endpoint := range brokerNode.Endpoints { + if strings.Index(endpoint, protocol) == 0 { + result[int32(brokerID)] = strings.Replace(endpoint, protocol+"://", "", 1) + continue children + } } } @@ -188,40 +157,6 @@ func (kz *Kazoo) SpecificBrokers(protocol string) (map[int32]string, error) { return result, nil } -// BrokerList returns a slice of broker addresses that can be used to connect to -// the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`. -func (kz *Kazoo) SpecificBrokerList(protocol string) ([]string, error) { - brokers, err := kz.SpecificBrokers(protocol) - if err != nil { - return nil, err - } - - result := make([]string, 0, len(brokers)) - for _, broker := range brokers { - result = append(result, broker) - } - - return result, nil -} - -// BrokerIDList returns a sorted slice of broker ids that can be used for manipulating topics and partitions.`. -func (kz *Kazoo) SpecificbrokerIDList(protocol string) ([]int32, error) { - brokers, err := kz.SpecificBrokers(protocol) - if err != nil { - return nil, err - } - - result := make([]int32, 0, len(brokers)) - for id := range brokers { - result = append(result, id) - } - - // return sorted list to match the offical kafka sdks - sort.Sort(int32Slice(result)) - - return result, nil -} - // BrokerList returns a slice of broker addresses that can be used to connect to // the Kafka cluster, e.g. using `sarama.NewAsyncProducer()`. func (kz *Kazoo) BrokerList() ([]string, error) { From de9f0e50d029c113431887b6782f8ddf3c0a3857 Mon Sep 17 00:00:00 2001 From: Masih Yeganeh Date: Thu, 18 Apr 2019 11:44:32 +0430 Subject: [PATCH 3/5] Update kazoo.go --- kazoo.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/kazoo.go b/kazoo.go index 917dafd..5723f43 100644 --- a/kazoo.go +++ b/kazoo.go @@ -112,16 +112,13 @@ func (kz *Kazoo) Brokers() (map[int32]string, error) { } type brokerEntry struct { - ListenerSecurityProtocolMap struct { - PLAINTEXT string `json:"PLAINTEXT"` - EXTERNAL string `json:"EXTERNAL"` - } `json:"listener_security_protocol_map"` - Endpoints []string `json:"endpoints"` - JmxPort int `json:"jmx_port"` - Host string `json:"host"` - Timestamp string `json:"timestamp"` - Port int `json:"port"` - Version int `json:"version"` + ListenerSecurityProtocolMap map[string]string `json:"listener_security_protocol_map"` + Endpoints []string `json:"endpoints"` + JmxPort int `json:"jmx_port"` + Host string `json:"host"` + Timestamp string `json:"timestamp"` + Port int `json:"port"` + Version int `json:"version"` } result := make(map[int32]string) From 0aa7399a8c2ccd925b10c16e098c6397ec3ae428 Mon Sep 17 00:00:00 2001 From: Masih Yeganeh Date: Thu, 18 Apr 2019 11:46:05 +0430 Subject: [PATCH 4/5] Update kazoo.go --- kazoo.go | 1 + 1 file changed, 1 insertion(+) diff --git a/kazoo.go b/kazoo.go index 5723f43..0061306 100644 --- a/kazoo.go +++ b/kazoo.go @@ -122,6 +122,7 @@ func (kz *Kazoo) Brokers() (map[int32]string, error) { } result := make(map[int32]string) + children: for _, child := range children { brokerID, err := strconv.ParseInt(child, 10, 32) if err != nil { From 9a5b2c10a9419ebcf7791a5f786e5deafe379195 Mon Sep 17 00:00:00 2001 From: Masih Yeganeh Date: Thu, 18 Apr 2019 11:46:39 +0430 Subject: [PATCH 5/5] Update kazoo.go --- kazoo.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kazoo.go b/kazoo.go index 0061306..18c18e8 100644 --- a/kazoo.go +++ b/kazoo.go @@ -215,9 +215,9 @@ func (kz *Kazoo) Close() error { return nil } -// ////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////// // Util methods -// ////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////// // Exists checks existence of a node func (kz *Kazoo) exists(node string) (ok bool, err error) {