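A quick introduction to load-balancing strategies, and how to consume events from newly added partitions of a Kafka topic using Golang and IBM/sarama.
Translated from Do not miss messages when Kafka Topic Partitioned, by Emre Savcı.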
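Introduction
In the era of event-driven communication, Kafka is one of the de facto standard message brokers, built around the concepts of topics and consumer groups.
In Kafka, a topic can have multiple partitions, which increases the parallelism of message processing. Within a consumer group each partition is read by at most one member, so partitions are what let us scale a consumer application out to multiple instances.
While working with Kafka, you may need to add new partitions to a topic. If the consumer is not configured correctly, it can miss messages written to the new partitions, so setting it up properly matters.
In this article, I'll show you how to run a Kafka broker locally and then configure a consumer to read messages from a topic. While the topic is being consumed, we'll create new partitions and watch the consumer automatically start receiving messages from them.
We'll use Golang as the programming language and IBM/sarama as the Kafka library.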
https://kajalrawal.medium.com/kafka-broker-kafka-topic-consumer-and-record-flow-in-kafka-ec55104977b8
Running Kafka in Docker
To test our consumer and producer, we'll use Docker to quickly spin up a Kafka broker. Copy and paste the following Docker Compose configuration into docker-compose.yml:
```yaml
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.4.4
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka-1:
    image: confluentinc/cp-kafka:7.4.4
    depends_on:
      - zookeeper-1
    ports:
      - 29092:29092
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      # Only zookeeper-1 is defined in this file, so it is the only ZooKeeper node listed.
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092,PLAINTEXT_HOST://127.0.0.1:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
Run the containers:
```
> docker-compose up -d
WARN[0000] /Users/emre.savci/Desktop/my-projects/kafka/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion
WARN[0000] Found orphan containers ([kafka-kafka-2-1 kafka-zookeeper-2-1]) for this project. If you removed or renamed this service in your compose file, you can run this command with the --remove-orphans flag to clean it up.
[+] Running 2/2
 ✔ Container kafka-zookeeper-1-1 Started 0.3s
 ✔ Container kafka-kafka-1-1 Started
```
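Load-balancing strategies
How partitions are distributed among the members of a consumer group depends on the balance strategy, and we can choose one to suit the use case.
I'll use the comments from Sarama's source code to explain each strategy.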
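Round robin
This returns a round-robin balance strategy, which assigns partitions to members in alternating order.
For example, with two topics (t0, t1) and two consumers (M0, M1), where each topic has three partitions (p0, p1, p2):
```
M0: [t0p0, t0p2, t1p1]
M1: [t0p1, t1p0, t1p2]
```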
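Range
This returns a range balance strategy, which is the default. It assigns partitions to consumer group members as ranges.
This follows the same logic as https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html.
Example with two topics T1 and T2, each with six partitions (0..5), and two members (M1, M2):
```
M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
M2: {T1: [3, 4, 5], T2: [3, 4, 5]}
```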
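Sticky
This returns a sticky balance strategy, which tries to preserve previous assignments while keeping the partition assignment balanced.
Example with topic T with six partitions (0..5) and two members (M1, M2):
```
M1: {T: [0, 2, 4]}
M2: {T: [1, 3, 5]}
```
After a reassignment in which an additional consumer (M3) joins, you might get an assignment plan like the following, where M1 and M2 keep most of their partitions and only partitions 4 and 5 move:
```
M1: {T: [0, 2]}
M2: {T: [1, 3]}
M3: {T: [4, 5]}
```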
Demo
Example consumer
```go
// ExampleConsumer implements sarama.ConsumerGroupHandler.
type ExampleConsumer struct {
	Ready chan bool
}

// Setup is run before the consumer starts consuming, providing an opportunity to set things up.
func (consumer *ExampleConsumer) Setup(session sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.Ready)
	// The generation ID is printed under the label "sessionid"; it changes on every rebalance.
	fmt.Println("Setup ", "memberid ", session.MemberID(), "sessionid ", session.GenerationID(), "claims ", session.Claims())
	return nil
}

// Cleanup is run at the end of a session, to clean up resources.
func (consumer *ExampleConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
	fmt.Println("Cleanup ", "memberid ", session.MemberID(), "sessionid ", session.GenerationID(), "claims ", session.Claims())
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Sarama calls it once per claimed partition, each in its own goroutine.
func (consumer *ExampleConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	fmt.Println("ConsumeClaim ", "memberid ", session.MemberID(), "sessionid ", session.GenerationID(), "claims ", session.Claims())
	for message := range claim.Messages() {
		fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s, partition = %d\n",
			string(message.Value), message.Timestamp, message.Topic, message.Partition)
		// Mark the message as processed so its offset gets committed.
		session.MarkMessage(message, "")
	}
	return nil
}
```
Configure and run
Now let's configure and run the consumer:
```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	brokers := []string{"localhost:9092"}
	topic := "example-topic"
	groupID := "example-group"

	config := sarama.NewConfig()
	config.Version = sarama.V3_3_0_0 // Set Kafka version
	config.Consumer.Group.Heartbeat.Interval = 1 * time.Second
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Metadata.RefreshFrequency = time.Minute * 1
	config.Metadata.Full = false

	// Set up group strategies
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.NewBalanceStrategyRoundRobin(),
	}

	// Create a new consumer group
	consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumerGroup.Close()

	ctx := context.Background()
	consumer := &ExampleConsumer{Ready: make(chan bool)}

	// Start consumer group session
	go func() {
		for {
			// Consume blocks for the lifetime of one session and returns when a
			// rebalance happens, so it has to be called in a loop.
			if err := consumerGroup.Consume(ctx, []string{topic}, consumer); err != nil {
				log.Fatalf("Error in consumer group: %v", err)
			}
			// Check if context was canceled, signaling the application to stop
			if ctx.Err() != nil {
				fmt.Println("Context was canceled")
				return
			}
			// Reset consumer ready channel
			consumer.Ready = make(chan bool)
		}
	}()

	// Wait for consumer to be ready
	<-consumer.Ready
	log.Println("Consumer group is ready")

	// Block forever
	<-make(chan struct{})
}
```
Note that we chose round robin as our balance strategy.
Producer code
Next, we'll run a producer that keeps sending messages to the topic, cycling through all of its partitions.
```go
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	brokers := []string{"localhost:9092"}
	topic := "example-topic"

	config := sarama.NewConfig()
	config.Version = sarama.V3_3_0_0 // Set Kafka version
	// The producer picks up new partitions when it refreshes its metadata.
	config.Metadata.RefreshFrequency = 1 * time.Minute
	config.Producer.Return.Successes = true // required by SyncProducer
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 3
	// Send each message to the next partition in turn.
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		fmt.Println("Error creating producer: ", err)
		return
	}
	defer producer.Close()

	for {
		partition, _, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder("Hello, World!"),
		})
		if err != nil {
			fmt.Println("Error producing message: ", err)
		} else {
			fmt.Println("Produced message to partition ", partition)
		}
		time.Sleep(2 * time.Second)
	}
}
```
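With the producer running, we can now see our consumer at work.
Putting it all together
I'll start the consumer and producer shown above. After waiting a few seconds, we'll add a second partition to the Kafka topic, and then a third. The consumer logs below show how the consumer handles these changes.
Running the code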
```
.../consumer > go run main.go
.../producer > go run main.go
```
Creating partitions
While the consumer and producer are running, open a shell in the Kafka container and run the following command:
```
> docker exec -it <container-id> /bin/sh
> ./bin/kafka-topics --bootstrap-server localhost:9092 \
    --alter --topic example-topic \
    --partitions 2
```
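Watch the logs and you'll see the consumer start listening to the new partition. After a few minutes, let's create another one by raising the partition count to 3: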
```
> ./bin/kafka-topics --bootstrap-server localhost:9092 \
    --alter --topic example-topic \
    --partitions 3
```
Again, the consumer picks up the new partition and starts consuming from it.
Here are the consumer logs:
```
Setup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 1 claims map[example-topic:[0]]
2024/12/13 22:25:21 Consumer group is ready
ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 1 claims map[example-topic:[0]]
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Cleanup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 1 claims map[example-topic:[0]]
Setup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 2 claims map[example-topic:[0 1]]
ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 2 claims map[example-topic:[0 1]]
ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 2 claims map[example-topic:[0 1]]
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Cleanup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 2 claims map[example-topic:[0 1]]
Setup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 3 claims map[example-topic:[0 1 2]]
ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 3 claims map[example-topic:[0 1 2]]
ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 3 claims map[example-topic:[0 1 2]]
ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 3 claims map[example-topic:[0 1 2]]
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 2
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 2
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
```
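The logs show the same pattern on each change: the current session ends (Cleanup), a new generation starts (Setup with sessionid 2, then 3) whose claims include the new partition, one ConsumeClaim call runs per claimed partition, and messages from the new partition start arriving.
Conclusion
✅ Add a suitable balance strategy to your consumer group configuration.
✅ Set an acceptable metadata refresh frequency in the configuration: it determines how often the topic's partition count is re-checked, and therefore how long a new partition can go unnoticed.
✅ Choose the oldest or newest initial offset according to your needs. A newly added partition has no committed offset for the group yet, so with OffsetNewest any messages written to it before the rebalance are skipped, while OffsetOldest reads them from the beginning.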
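The effect of the initial offset on a new partition can be modeled with plain integers. The hypothetical `startOffset` function below (no Kafka involved) encodes the rule that a committed offset always wins and that, without one, the initial-offset policy decides; the `oldest` and `newest` constants stand in for `sarama.OffsetOldest` and `sarama.OffsetNewest`. The scenario assumes the producer has already written 5 messages to the new partition by the time the rebalance hands it to the consumer.

```go
package main

import "fmt"

// initialOffset is a hypothetical enum mirroring the two choices for
// Consumer.Offsets.Initial (sarama.OffsetOldest / sarama.OffsetNewest).
type initialOffset int

const (
	oldest initialOffset = iota
	newest
)

// startOffset picks where a consumer begins reading a partition.
// logStart is the first available offset; logEnd is the next offset to be written.
func startOffset(committed int64, hasCommitted bool, policy initialOffset, logStart, logEnd int64) int64 {
	if hasCommitted {
		return committed // a committed offset always takes precedence
	}
	if policy == oldest {
		return logStart
	}
	return logEnd
}

func main() {
	// A freshly added partition already holds offsets 0..4 when the consumer
	// claims it, and the group has not committed anything for it yet.
	const logStart, logEnd = 0, 5
	for _, policy := range []initialOffset{oldest, newest} {
		start := startOffset(0, false, policy, logStart, logEnd)
		fmt.Printf("policy=%d start=%d skipped=%d\n", policy, start, start-logStart)
	}
}
```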
Thanks for reading. I hope you found this article engaging.