关于负载均衡策略的快速介绍。使用 Golang IBM/sarama 在 Kafka 主题上消费新添加的分区中的事件。
译自Do not miss messages when Kafka Topic Partitioned,作者 Emre Savcı。
为了测试我们的消费者和生产者,我们将使用 Docker 快速启动一个 Kafka 代理(broker)。将以下 docker compose 配置复制并粘贴到 docker-compose.yml 文件中:
# Single-node Kafka + ZooKeeper stack for local testing.
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.4.4
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      # Quoted so YAML never interprets HOST:CONTAINER as a number.
      - "22181:2181"
  kafka-1:
    image: confluentinc/cp-kafka:7.4.4
    depends_on:
      - zookeeper-1
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      # Only zookeeper-1 is defined in this file, so it is the only entry here.
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      # NOTE(review): the listener hosts were missing in the source text.
      # localhost is used so that clients connecting to localhost:9092 (both
      # from the host through the published port and from inside the
      # container) are handed an address they can reach - confirm against
      # the original article.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
> docker-compose up -d WARN[0000] /Users/emre.savci/Desktop/my-projects/kafka/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion WARN[0000] Found orphan containers ([kafka-kafka-2-1 kafka-zookeeper-2-1]) for this project. If you removed or renamed this service in your compose file, you can run this command with the --remove-orphans flag to clean it up. [+] Running 2/2 ✔ Container kafka-zookeeper-1-1 Started 0.3s ✔ Container kafka-kafka-1-1 Started
M0: [t0p0, t0p2, t1p1] M1: [t0p1, t1p0, t1p2]
M1: {T1: [0, 1, 2], T2: [0, 1, 2]} M2: {T1: [3, 4, 5], T2: [3, 4, 5]}
M1: {T: [0, 2, 4]} M2: {T: [1, 3, 5]}
M1: {T: [0, 2]} M2: {T: [1, 3]} M3: {T: [4, 5]}
type ExampleConsumer struct { Ready chan bool } // Setup is run before the consumer starts consuming, providing an opportunity to setup things. func (consumer *ExampleConsumer) Setup(session sarama.ConsumerGroupSession) error { // Mark the consumer as ready close(consumer.Ready) fmt.Println("Setup ", "memberid ", session.MemberID(), "sessionid ", session.GenerationID(), "claims ", session.Claims()) return nil } // Cleanup is run at the end of a session, to cleanup resources. func (consumer *ExampleConsumer) Cleanup(session sarama.ConsumerGroupSession) error { fmt.Println("Cleanup ", "memberid ", session.MemberID(), "sessionid ", session.GenerationID(), "claims ", session.Claims()) return nil } // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (consumer *ExampleConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { fmt.Println("ConsumeClaim ", "memberid ", session.MemberID(), "sessionid ", session.GenerationID(), "claims ", session.Claims()) for message := range claim.Messages() { fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s, partition = %d\n", string(message.Value), message.Timestamp, message.Topic, message.Partition) session.MarkMessage(message, "") } return nil }
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/IBM/sarama"
)

// main runs the example consumer and exits with a non-zero status on failure.
func main() {
	if err := run(); err != nil {
		// run's errors are worded so that the "Error " prefix yields the
		// messages "Error creating consumer group: ..." and
		// "Error in consumer group: ...".
		log.Fatalf("Error %v", err)
	}
}

// run joins the consumer group and consumes until the process receives
// SIGINT/SIGTERM, the group is closed, or Consume fails. Keeping this logic out
// of main lets the deferred Close run before the process exits on error.
func run() error {
	brokers := []string{"localhost:9092"}
	topic := "example-topic"
	groupID := "example-group"

	config := sarama.NewConfig()
	config.Version = sarama.V3_3_0_0 // Set Kafka version
	config.Consumer.Group.Heartbeat.Interval = 1 * time.Second
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Metadata.RefreshFrequency = time.Minute * 1
	config.Metadata.Full = false

	// Set up group strategies
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.NewBalanceStrategyRoundRobin(),
	}

	// Create a new consumer group
	consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		return fmt.Errorf("creating consumer group: %w", err)
	}
	defer func() {
		if err := consumerGroup.Close(); err != nil {
			log.Printf("Error closing consumer group: %v", err)
		}
	}()

	// Cancel the context on Ctrl-C / SIGTERM so the session ends cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	consumer := &ExampleConsumer{Ready: make(chan bool)}
	// Capture the first ready channel before the goroutine starts: the loop
	// below replaces consumer.Ready after every session, and reading the field
	// from this goroutine afterwards would be a data race.
	ready := consumer.Ready

	// done receives exactly one value when the consume loop stops.
	done := make(chan error, 1)

	// Start consumer group session
	go func() {
		for {
			// Consume returns whenever a session ends (e.g. on a rebalance),
			// so it is called in a loop to join the next session.
			err := consumerGroup.Consume(ctx, []string{topic}, consumer)
			switch {
			case ctx.Err() != nil:
				// Check if context was canceled, signaling the application to stop
				fmt.Println("Context was canceled")
				done <- nil
				return
			case errors.Is(err, sarama.ErrClosedConsumerGroup):
				done <- nil
				return
			case err != nil:
				done <- fmt.Errorf("in consumer group: %w", err)
				return
			}
			// Reset consumer ready channel
			consumer.Ready = make(chan bool)
		}
	}()

	// Wait for consumer to be ready, or for the loop to stop before that.
	select {
	case <-ready:
		log.Println("Consumer group is ready")
	case err := <-done:
		return err
	}

	// Block until the consume loop stops.
	return <-done
}
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/IBM/sarama"
)

// main sends "Hello, World!" to example-topic every two seconds using a
// round-robin partitioner, and stops on SIGINT/SIGTERM so that the deferred
// producer Close actually runs.
func main() {
	brokers := []string{"localhost:9092"}
	topic := "example-topic"

	config := sarama.NewConfig()
	config.Version = sarama.V3_3_0_0 // Set Kafka version
	config.Metadata.RefreshFrequency = 1 * time.Minute
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 3
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		fmt.Println("Error creating producer: ", err)
		return
	}
	defer func() {
		if err := producer.Close(); err != nil {
			fmt.Println("Error closing producer: ", err)
		}
	}()

	// Cancel the context on Ctrl-C / SIGTERM to leave the send loop.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	for {
		partition, _, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder("Hello, World!"),
		})
		if err != nil {
			fmt.Println("Error producing message: ", err)
		} else {
			fmt.Println("Produced message to partition ", partition)
		}

		// Pause between messages, but wake up immediately on shutdown.
		select {
		case <-ctx.Done():
			return
		case <-time.After(2 * time.Second):
		}
	}
}
.../consumer > go run main.go .../producer > go run main.go
> docker exec -it <container-id> /bin/sh > ./bin/kafka-topics --bootstrap-server localhost:9092 \ --alter --topic example-topic \ --partitions 2
> ./bin/kafka-topics --bootstrap-server localhost:9092 \ --alter --topic example-topic \ --partitions 3
Setup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 1 claims map[example-topic:[0]] 2024/12/13 22:25:21 Consumer group is ready ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 1 claims map[example-topic:[0]] Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Cleanup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 1 claims map[example-topic:[0]] Setup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 2 claims map[example-topic:[0 1]] ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 2 claims map[example-topic:[0 1]] ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 2 claims map[example-topic:[0 1]] Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, 
partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Cleanup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 2 claims map[example-topic:[0 1]] Setup memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 3 claims map[example-topic:[0 1 2]] ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 3 claims map[example-topic:[0 1 2]] ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 3 claims map[example-topic:[0 1 2]] ConsumeClaim memberid sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid 3 claims map[example-topic:[0 1 2]] Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, example-topic, partition: 2 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1 Hello, World!, 
example-topic, partition: 2 Hello, World!, example-topic, partition: 0 Hello, World!, example-topic, partition: 1
✅ 为您的消费者组配置添加合适的负载均衡策略
✅ 在配置中设置可接受的元数据刷新频率
✅ 根据您的需要选择最早或最新的初始偏移量