| 主要特点
高吞吐量和低延迟:Kafka能够处理非常高的消息吞吐量,适用于大规模数据处理和实时数据流。同时,它具有较低的消息传递延迟,能够提供快速的消息传递服务。
可扩展性:Kafka可以水平扩展,通过增加更多的节点来扩展处理能力和存储容量,保证系统的可靠性和性能。
持久性和可靠性:Kafka使用磁盘存储消息,确保消息的持久性和可靠性。同时,它支持消息的批量处理,并通过副本机制保证消息的可靠性,即使某些节点发生故障,也不会丢失消息。
分区和并发:Kafka的消息被分成多个分区,每个分区可以在不同的服务器上进行写入和读取,提高了并发性能。这有助于处理大量数据并满足各种需求场景,如基于Hadoop的批处理系统、低延迟的实时系统、流式处理引擎等。
支持流处理:Kafka提供了强大的流处理功能,可以进行实时数据处理、转换和分析。
| 应用场景
日志收集:Kafka可以收集各种服务的日志,并通过统一接口服务的方式开放给各种消费者,如Hadoop、HBase、Solr等。
消息系统:Kafka可以作为消息队列使用,实现生产者和消费者之间的解耦,并缓存消息等。
用户活动跟踪:Kafka经常被用来记录Web用户或APP用户的各种活动,如浏览网页、搜索、点击等。这些活动信息被各个服务器发布到Kafka的主题中,然后订阅者通过订阅这些主题来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘。
运营指标:Kafka也用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,如报警和报告。
数据集成和数据管道:Kafka可以用作数据集成和数据管道的中间件,在不同系统之间传递数据,实现数据的异步传输和解耦。
| 关键术语
Producer:消息生产者,即消息的源头,负责生成消息并发送到Kafka服务器。
Consumer:消息消费者,即消息的使用方,负责从Kafka服务器上消费消息。
Topic:主题,由用户定义并配置在Kafka服务器上,用于建立生产者和消费者之间的订阅关系。生产者发送消息到指定的主题下,消费者从这个主题下消费消息。
Partition:分区,一个主题下会分为多个分区,每个分区是一个有序的队列。分区中的每条消息都会被分配一个有序的ID(Offset)。
Broker:Kafka的服务器,用于存储消息。Kafka集群中的一台或多台服务器统称为Broker。
Group:消费者分组,用于归组同类消费者。在Kafka中,多个消费者可以共同消费一个主题下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,也称为消费者集群。
如需了解更多信息,可以访问其官方网站或查阅相关的技术文档。
Docker镜像 https://github.com/apache/kafka
GitHub地址 https://github.com/apache/kafka
安装&使用
关于Kafka的Docker镜像有很多,大多数都是旧的搭建模式:Zookeeper + kafka,新版kafka已经简化去掉Zookeeper,采用Apache Kafka Raft模式,在部署上更为方便。
秉承使用官方版本为原则,我们使用官方的Docker镜像进行部署,虽然官方文档有部署教程,但部署下来发现无法支持外网访问,在服务器内使用是正常的。
经过一番折腾排查才发现是配置问题,废话不多说,直接看教程吧。
Docker部署操作步骤:安装Docker、编写配置文件、启动容器。操作系统为腾讯云服务器Ubuntu。
| 安装docker
sudo apt update
sudo apt install -y docker.io docker-compose
| 编写配置文件docker-compose.yml
在home文件夹编写docker-compose.yml
version: '3'
services:
kafka-kraft:
image: apache/kafka:latest
hostname: kafka-kraft
container_name: kafka-kraft
ports:
- '9092:9092'
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://43.130.118.124:9092,PLAINTEXT://kafka-kraft:29092'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-kraft:29093'
KAFKA_LISTENERS: 'CONTROLLER://kafka-kraft:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT://kafka-kraft:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
在Docker镜像官方镜像有部署教程,但均不支持外网访问。如果需要支持外网访问,配置属性PLAINTEXT_HOST必须以公网IP:端口形式表示,不能使用0.0.0.0:9092或//:9092,否则都会无法支持外网访问。
| 启动容器
sudo docker-compose up -d
| 外网访问
使用Python连接kafka实现生产者和消费者功能。
# 安装模块
pip install kafka-python
生产者示例代码:
import json
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
# 创建主题
try:
admin_client = KafkaAdminClient(bootstrap_servers='43.130.118.124:9092')
new_topic = NewTopic(
name='test_topic',
num_partitions=1,
replication_factor=1 # 副本因子,应小于或等于Kafka集群中的broker数量
)
# 创建 topic
admin_client.create_topics([new_topic])
except: pass
# 发送消息
producer = KafkaProducer(bootstrap_servers=['43.130.118.124:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('test_topic', 'aaa')
producer.flush()
消费者示例代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test_topic',
bootstrap_servers='43.130.118.124:9092')
for message in consumer:
print(message.value.decode())
首先运行消费者代码,再运行生产者代码,最后在消费者运行窗口查看数据。
更多内容也可看笔者出版图书!