diff --git a/kazoo.go b/kazoo.go index 4465b45..18c18e8 100644 --- a/kazoo.go +++ b/kazoo.go @@ -52,6 +52,9 @@ type Config struct { // The chroot the Kafka installation is registerde under. Defaults to "". Chroot string + // The protocol (PLAINTEXT, EXTERNAL, ...). Defaults to "". + Protocol string + // The amount of time the Zookeeper client can be disconnected from the Zookeeper cluster // before the cluster will get rid of watches and ephemeral nodes. Defaults to 1 second. Timeout time.Duration @@ -63,6 +66,7 @@ type Config struct { // NewConfig instantiates a new Config struct with sane defaults. func NewConfig() *Config { return &Config{ + Protocol: "", Timeout: 1 * time.Second, Logger: zk.DefaultLogger, } @@ -108,11 +112,17 @@ func (kz *Kazoo) Brokers() (map[int32]string, error) { } type brokerEntry struct { - Host string `json:"host"` - Port int `json:"port"` + ListenerSecurityProtocolMap map[string]string `json:"listener_security_protocol_map"` + Endpoints []string `json:"endpoints"` + JmxPort int `json:"jmx_port"` + Host string `json:"host"` + Timestamp string `json:"timestamp"` + Port int `json:"port"` + Version int `json:"version"` } result := make(map[int32]string) + children: for _, child := range children { brokerID, err := strconv.ParseInt(child, 10, 32) if err != nil { @@ -129,6 +139,16 @@ func (kz *Kazoo) Brokers() (map[int32]string, error) { return nil, err } + if kz.conf.Protocol != "" { + protocol := strings.ToUpper(kz.conf.Protocol) + for _, endpoint := range brokerNode.Endpoints { + if strings.Index(endpoint, protocol) == 0 { + result[int32(brokerID)] = strings.Replace(endpoint, protocol+"://", "", 1) + continue children + } + } + } + result[int32(brokerID)] = fmt.Sprintf("%s:%d", brokerNode.Host, brokerNode.Port) }