代码拉取完成,页面将自动刷新
package utils
import (
"crypto/tls"
"fmt"
"io"
"net/url"
"github.com/pkg/errors"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/rancher/types/config/dialer"
kafka "github.com/segmentio/kafka-go"
)
// kafkaTestData is the single Kafka message written to a broker when a
// reachability test is asked to also send a test log. Its payload is the
// package-level testMessage.
var kafkaTestData = kafka.Message{
	Value: []byte(testMessage),
}
// kafkaTestWrap embeds a *v3.KafkaConfig so that the Kafka connectivity-test
// methods in this file (TestReachable, sendData2Kafka, kafkaConn) can be
// attached to it while reading the config fields directly.
type kafkaTestWrap struct {
*v3.KafkaConfig
}
// TestReachable checks whether the Kafka deployment described by the embedded
// config can be reached through dial.
//
// The check performed depends on the configuration:
//   - If both SaslUsername and SaslPassword are set, nothing is tested and nil
//     is returned (see the TODO below).
//   - If ZookeeperEndpoint is set, only that endpoint is dialed and the
//     connection is closed again; broker endpoints are not consulted.
//   - Otherwise every entry of BrokerEndpoints is checked in order. When
//     includeSendTestLog is true a test message is written to the broker;
//     when it is false the broker is dialed and the connection closed, so an
//     unreachable broker is reported instead of being silently accepted.
//
// The first error encountered is returned; nil means every check passed.
func (w *kafkaTestWrap) TestReachable(dial dialer.Dialer, includeSendTestLog bool) error {
	if w.SaslUsername != "" && w.SaslPassword != "" {
		// TODO: there is currently no out-of-the-box Kafka Go client that both
		// supports SASL and accepts a caller-supplied connection. kafka-go has
		// a pending PR for SASL that is not merged yet because the maintainer
		// also wants negotiation and Kerberos support; add a SASL test here
		// once SASL support in kafka-go is stable.
		return nil
	}

	if w.ZookeeperEndpoint != "" {
		host, tlsConfig, err := w.endpointTarget(w.ZookeeperEndpoint)
		if err != nil {
			return err
		}
		return probeKafkaEndpoint(dial, host, tlsConfig)
	}

	for _, endpoint := range w.BrokerEndpoints {
		host, tlsConfig, err := w.endpointTarget(endpoint)
		if err != nil {
			return err
		}
		if includeSendTestLog {
			err = w.sendData2Kafka(host, dial, tlsConfig)
		} else {
			// Without a test message there is still something to verify:
			// that the broker accepts a connection at all.
			err = probeKafkaEndpoint(dial, host, tlsConfig)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

// endpointTarget parses endpoint as a URL and returns its host (host[:port])
// together with the TLS config to dial it with. The TLS config is nil unless
// the URL scheme is "https", in which case it is built from the config's
// Certificate, ClientCert and ClientKey for the URL's hostname.
func (w *kafkaTestWrap) endpointTarget(endpoint string) (string, *tls.Config, error) {
	parsed, err := url.Parse(endpoint)
	if err != nil {
		return "", nil, errors.Wrapf(err, "couldn't parse url %s", endpoint)
	}

	var tlsConfig *tls.Config
	if parsed.Scheme == "https" {
		tlsConfig, err = buildTLSConfig(w.Certificate, w.ClientCert, w.ClientKey, "", "", parsed.Hostname(), true)
		if err != nil {
			return "", nil, err
		}
	}
	return parsed.Host, tlsConfig, nil
}

// probeKafkaEndpoint opens a connection to host via newTCPConn and closes it
// again, returning the dial error if there is one. It passes true as the last
// newTCPConn argument, exactly as the ZooKeeper probe always has.
func probeKafkaEndpoint(dial dialer.Dialer, host string, tlsConfig *tls.Config) error {
	conn, err := newTCPConn(dial, host, tlsConfig, true)
	if err != nil {
		return err
	}
	// The connection only proves reachability; a close error is not
	// interesting to the caller.
	conn.Close()
	return nil
}
// sendData2Kafka writes the package test message (kafkaTestData) to the Kafka
// connection that kafkaConn returns for smartHost.
//
// smartHost is the broker address to start from, dial is used to open the
// underlying connections, and tlsConfig (may be nil) is handed through to
// kafkaConn. The connection is closed before returning. A connection error is
// returned as is; a write error is wrapped with a short description.
func (w *kafkaTestWrap) sendData2Kafka(smartHost string, dial dialer.Dialer, tlsConfig *tls.Config) error {
	conn, connErr := w.kafkaConn(dial, tlsConfig, smartHost)
	if connErr != nil {
		return connErr
	}
	defer conn.Close()

	_, writeErr := conn.WriteMessages(kafkaTestData)
	if writeErr == nil {
		return nil
	}
	return errors.Wrap(writeErr, "couldn't write test message to kafka")
}
// kafkaConn returns a kafka-go connection to the leader of partition 0 of
// w.Topic, ready for writing.
//
// It first connects to smartHost, asks that broker to create the topic (one
// partition, replication factor one) and reads the topic's partitions to find
// the leader of partition 0. If the leader's "host:port" equals smartHost the
// initial connection is returned; otherwise it is closed and a new connection
// to the leader is opened with the same dialer and TLS config.
//
// On any error no connection is left open and a nil *kafka.Conn is returned.
// The caller owns the returned connection and must close it.
func (w *kafkaTestWrap) kafkaConn(dial dialer.Dialer, config *tls.Config, smartHost string) (*kafka.Conn, error) {
	defaultPartition := 0

	conn, err := newTCPConn(dial, smartHost, config, false)
	if err != nil {
		return nil, err
	}
	kafkaConn := kafka.NewConn(conn, w.Topic, defaultPartition)

	topicConf := kafka.TopicConfig{
		Topic:             w.Topic,
		NumPartitions:     1,
		ReplicationFactor: 1,
	}
	if err := kafkaConn.CreateTopics(topicConf); err != nil {
		kafkaConn.Close()
		return nil, errors.Wrapf(wrapErrEOF(err), "couldn't create kafka topic %s", w.Topic)
	}

	partitions, err := kafkaConn.ReadPartitions(w.Topic)
	if err != nil {
		kafkaConn.Close()
		return nil, errors.Wrap(wrapErrEOF(err), "couldn't read kafka partitions")
	}

	var (
		leader      kafka.Broker
		leaderFound bool
	)
	for _, p := range partitions {
		if p.ID == defaultPartition {
			leader = p.Leader
			leaderFound = true
			break
		}
	}
	if !leaderFound {
		// Without this guard the zero-value broker would be formatted as ":0"
		// and dialed, hiding the real problem behind a confusing dial error.
		kafkaConn.Close()
		return nil, errors.Errorf("couldn't find leader of partition %d for kafka topic %s", defaultPartition, w.Topic)
	}

	leaderSmartHost := fmt.Sprintf("%s:%d", leader.Host, leader.Port)
	if leaderSmartHost == smartHost {
		// Already talking to the leader; reuse the connection.
		return kafkaConn, nil
	}

	kafkaConn.Close()
	leaderConn, err := newTCPConn(dial, leaderSmartHost, config, false)
	if err != nil {
		return nil, errors.Wrapf(err, "couldn't connect to kafka partition leader %s", leaderSmartHost)
	}
	return kafka.NewConn(leaderConn, w.Topic, defaultPartition), nil
}
// wrapErrEOF replaces a bare io.EOF with a more descriptive error saying the
// remote server closed the connection. Any other value, including nil, is
// returned unchanged.
func wrapErrEOF(err error) error {
	if err != io.EOF {
		return err
	}
	return errors.New("unexpected EOF, connection closed by remote server")
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。