go连接kafka基本操作

发布于:2025-03-13 ⋅ 阅读:(15) ⋅ 点赞:(0)

本博文源于笔者对kafka的学习,先遵循着对kafka的简单学习,然后go操作一下kafka,包括发送消息、消费消息、列出所有topic,与创建topic。内容都已经由笔者亲自测试过。

1.kafka的学习

1.1 启动kafka与zookeeper

kafka与zookeeper是相关联的

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

1.2 创建topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server 主机名:9092

1.3 生产消息

bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic hello

运行后可以发送多条,ctrl+c退出

1.4 消费之前的消息

bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --from-beginning --topic hello

1.5 指定偏移量消费

bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --partition 0 --offset 1 --topic hello

1.4 消费最新的信息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

2 go操作

2.1 发送消息

// Kafka 配置
const (
	KafkaBroker = "u8sMaster:9092" // 替换为你的 Kafka Broker 地址
	KafkaTopic  = "k8s-version"          // Kafka 主题
)

func main() {
	sendMesgKafka()
}

func sendMesgKafka() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{KafkaBroker},
		Topic:    KafkaTopic,
		Balancer: &kafka.LeastBytes{},
	})

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("one!"),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte("two!"),
		},
		kafka.Message{
			Key:   []byte("Key-C"),
			Value: []byte("three!"),
		},
	)

	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}

	fmt.Println("Message sent successfully")

}

2.2 消费消息

// to consume messages
topic := "test"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    n, err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(string(b[:n]))
}

if err := batch.Close(); err != nil {
    log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close connection:", err)
}

2.3 列出所有topic

func main() {
	conn, err := kafka.Dial("tcp", "u8sMaster:9092")
	if err != nil {
	    panic(err.Error())
	}
	defer conn.Close()
	
	partitions, err := conn.ReadPartitions()
	if err != nil {
	    panic(err.Error())
	}
	
	m := map[string]struct{}{}
	
	for _, p := range partitions {
	    m[p.Topic] = struct{}{}
	}
	for k := range m {
	    fmt.Println(k)
	}
}

2.4 创建topic

func main() {
		conn, err := kafka.DialLeader(context.Background(), "tcp", "u9sMaster:9092", "topic2", 0)
		if err != nil {
		    panic(err.Error())
		}
}

精准地创建topic

func main() {
	conn, err := kafka.Dial("tcp", "u8sMaster:9092")
	if err != nil {
	    panic(err.Error())
	}
	defer conn.Close()
	controller, err := conn.Controller()
	if err != nil {
	    panic(err.Error())
	}
	var connLeader *kafka.Conn
	connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
	    panic(err.Error())
	}
	defer connLeader.Close()
}

这里省略了kafka集群的配置,未来有机会补充

参考文档

参考文档一


网站公告

今日签到

点亮在社区的每一天
去签到