Docker搭建一款开源的分布式消息系统

文摘   2024-12-06 14:51   广东  
系统介绍
Kafka是一个功能强大且灵活的分布式消息系统,适用于各种数据处理和实时数据流场景。通过利用其高吞吐量、低延迟、可扩展性、持久性和可靠性等特点,可以高效地处理大量数据并满足各种业务需求。

| 主要特点

  • 高吞吐量和低延迟:Kafka能够处理非常高的消息吞吐量,适用于大规模数据处理和实时数据流。同时,它具有较低的消息传递延迟,能够提供快速的消息传递服务。

  • 可扩展性:Kafka可以水平扩展,通过增加更多的节点来扩展处理能力和存储容量,保证系统的可靠性和性能。

  • 持久性和可靠性:Kafka使用磁盘存储消息,确保消息的持久性和可靠性。同时,它支持消息的批量处理,并通过副本机制保证消息的可靠性,即使某些节点发生故障,也不会丢失消息。

  • 分区和并发:Kafka的消息被分成多个分区,每个分区可以在不同的服务器上进行写入和读取,提高了并发性能。这有助于处理大量数据并满足各种需求场景,如基于Hadoop的批处理系统、低延迟的实时系统、流式处理引擎等。

  • 支持流处理:Kafka提供了强大的流处理功能,可以进行实时数据处理、转换和分析。

| 应用场景

  • 日志收集:Kafka可以收集各种服务的日志,并通过统一接口服务的方式开放给各种消费者,如Hadoop、HBase、Solr等。

  • 消息系统:Kafka可以作为消息队列使用,实现生产者和消费者之间的解耦,并缓存消息等。

  • 用户活动跟踪:Kafka经常被用来记录Web用户或APP用户的各种活动,如浏览网页、搜索、点击等。这些活动信息被各个服务器发布到Kafka的主题中,然后订阅者通过订阅这些主题来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘。

  • 运营指标:Kafka也用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,如报警和报告。

  • 数据集成和数据管道:Kafka可以用作数据集成和数据管道的中间件,在不同系统之间传递数据,实现数据的异步传输和解耦。

| 关键术语

  • Producer:消息生产者,即消息的源头,负责生成消息并发送到Kafka服务器。

  • Consumer:消息消费者,即消息的使用方,负责从Kafka服务器上消费消息。

  • Topic:主题,由用户定义并配置在Kafka服务器上,用于建立生产者和消费者之间的订阅关系。生产者发送消息到指定的主题下,消费者从这个主题下消费消息。

  • Partition:分区,一个主题下会分为多个分区,每个分区是一个有序的队列。分区中的每条消息都会被分配一个有序的ID(Offset)。

  • Broker:Kafka的服务器,用于存储消息。Kafka集群中的一台或多台服务器统称为Broker。

  • Group:消费者分组,用于归组同类消费者。在Kafka中,多个消费者可以共同消费一个主题下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,也称为消费者集群。

如需了解更多信息,可以访问其官方网站或查阅相关的技术文档。

Docker镜像
https://github.com/apache/kafka
GitHub地址
https://github.com/apache/kafka

安装&使用

———

关于Kafka的Docker镜像有很多,大多数都是旧的搭建模式:Zookeeper + kafka,新版kafka已经简化去掉Zookeeper,采用Apache Kafka Raft模式,在部署上更为方便。

秉承使用官方版本为原则,我们使用官方的Docker镜像进行部署,虽然官方文档有部署教程,但部署下来发现无法支持外网访问,在服务器内使用是正常的。

经过一番折腾排查才发现是配置问题,废话不多说,直接看教程吧。

Docker部署操作步骤:安装Docker、编写配置文件、启动容器。操作系统为腾讯云服务器Ubuntu。

| 安装docker

sudo apt updatesudo apt install -y docker.io docker-compose

| 编写配置文件docker-compose.yml

在home文件夹编写docker-compose.yml

version: '3'services:  kafka-kraft:    image: apache/kafka:latest    hostname: kafka-kraft    container_name: kafka-kraft    ports:      - '9092:9092'    environment:      KAFKA_NODE_ID: 1      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://43.130.118.124:9092,PLAINTEXT://kafka-kraft:29092'      KAFKA_PROCESS_ROLES: 'broker,controller'      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-kraft:29093'      KAFKA_LISTENERS: 'CONTROLLER://kafka-kraft:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT://kafka-kraft:29092'      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

在Docker镜像官方镜像有部署教程,但均不支持外网访问。如果需要支持外网访问,配置属性PLAINTEXT_HOST必须以公网IP:端口形式表示,不能使用0.0.0.0:9092或//:9092,否则都会无法支持外网访问。

| 启动容器

sudo docker-compose up -d

| 外网访问

使用Python连接kafka实现生产者和消费者功能。

# 安装模块pip install kafka-python

生产者示例代码:

import jsonfrom kafka import KafkaProducerfrom kafka.admin import KafkaAdminClient, NewTopic# 创建主题try:    admin_client = KafkaAdminClient(bootstrap_servers='43.130.118.124:9092')    new_topic = NewTopic(        name='test_topic',        num_partitions=1,        replication_factor=1  # 副本因子,应小于或等于Kafka集群中的broker数量    )    # 创建 topic    admin_client.create_topics([new_topic])except: pass# 发送消息producer = KafkaProducer(bootstrap_servers=['43.130.118.124:9092'],                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))producer.send('test_topic', 'aaa')producer.flush()

消费者示例代码:

from kafka import KafkaConsumerconsumer = KafkaConsumer('test_topic',                         bootstrap_servers='43.130.118.124:9092')for message in consumer:    print(message.value.decode())

首先运行消费者代码,再运行生产者代码,最后在消费者运行窗口查看数据。

关注公众号,为你推荐更多原创干货!

更多内容也可看笔者出版图书

—————————

幼稚猿
分享各类技术资讯和教程,出版多本IT图书《Django+Vue系统架构设计与实现》、《Golang+Vue.js商城项目实战》等
 最新文章