消息中间件在异步通信中⽤的最多,很多业务流程中,如果所有步骤都同步进⾏可能会导致核⼼流程耗时⾮常⻓,更重
要的是所有步骤都同步进⾏⼀旦⾮核⼼步骤失败会导致核⼼流程整体失败,因此在很多业务流程中Kafka就充当了异步
通信⻆⾊。
⼤规模分布式系统中的机器⾮常多⽽且分散在不同机房中,分布式系统带来的⼀个明显问题就是业务⽇志的查看、追踪
和分析等⾏为变得⼗分困难,对于集群规模在百台以上的系统,查询线上⽇志很恐怖。
为了应对这种场景统⼀⽇志系统应运⽽⽣,⽇志数据都是海量数据,通常为了不给系统带来额外负担⼀般会采⽤异步上
报,这⾥Kafka以其⾼吞吐量在⽇志处理中得到了很好的应⽤。
随着据量的增加,离线的计算会越来越慢,难以满⾜⽤户在某些场景下的实时性要求,因此很多解决⽅案中引⼊了实时
计算。
很多时候,即使是海量数据,我们也希望即时去查看⼀些数据指标,实时流计算应运⽽⽣。
实时流计算有两个特点,⼀个是实时,随时可以看数据;另⼀个是流。实时流计算有两个特点,⼀个是实时,随时可以看数据;另⼀个是流。
push模式由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。
消息系统都致⼒于让consumer以最⼤的速率最快速的消费消息,push模式下,当broker推送的速率远⼤于consumer消息系统都致⼒于让consumer以最⼤的速率最快速的消费消息,push模式下,当broker推送的速率远⼤于consumer
消费的速率时,consumer恐怕就要崩溃了。
例如消息发送设置了重试机制,并且异步发送,消息A和B设置相同的key,业务上A先发,B后发,由于⽹络或者其他
原因A发送失败,B发送成功;A由于发送失败就会重试且重试成功,这时候消息顺序B在前A在后,与业务发送顺序不
⼀致,如果需要解决这个问题,需要设置参数 max.in.flight.requests.per.connection=1 ,其含义是限制客户
端在单个连接上能够发送的未响应请求的个数,设置此值是1表示kafka broker在响应请求之前client不能再向同⼀个
broker发送请求,这个参数默认值是5
kafka的消息是不断追加到⽂件中的,这个特性使 kafka 可以充分利⽤磁盘的顺序读写性能
顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写
Kafka 可以配置异步刷盘,不开启同步刷盘,异步刷盘不需要等写⼊磁盘后返回消息投递的 ACK,所以它提⾼了消息发
送的吞吐量,降低了请求的延时
传统的 IO 流程,需要先把数据拷⻉到内核缓冲区,再从内核缓冲拷⻉到⽤户空间,应⽤程序处理完成以后,再拷⻉回
内核缓冲区内核缓冲区
这个过程中发⽣了多次数据拷⻉
将数据保存到内存中的 Map
(或其他数据结构)后,服务重启时这些数据不会自动保留,因为内存中的数据在进程结束时会被清除。要保留数据,可以考虑以下几种方法:
1. 持久化存储
将数据从 Map
持久化到文件或数据库中,以便在服务重启时能够恢复。
a. 使用文件存储
可以将 Map
的数据序列化并写入文件,例如使用 JSON 格式:
import java.io.*;
import java.util.HashMap;
import java.util.Map;
public class DataStorage {
private Map<String, String> dataMap = new HashMap<>();
// 保存数据到文件
public void saveToFile() throws IOException {
try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("data.ser"))) {
oos.writeObject(dataMap);
}
}
// 从文件加载数据
public void loadFromFile() throws IOException, ClassNotFoundException {
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream("data.ser"))) {
dataMap = (Map<String, String>) ois.readObject();
}
}
}
// 使用 Redis 的 Java 客户端(如 Jedis)存储 Map 数据
import redis.clients.jedis.Jedis;
// 保存数据到 Redis
public void saveToRedis() {
try (Jedis jedis = new Jedis("localhost")) {
for (Map.Entry<String, String> entry : dataMap.entrySet()) {
jedis.set(entry.getKey(), entry.getValue());
}
}
}
// 从 Redis 加载数据
public void loadFromRedis() {
try (Jedis jedis = new Jedis("localhost")) {
for (String key : jedis.keys("*")) {
String value = jedis.get(key);
dataMap.put(key, value);
}
}
}
定期将 Map
中的数据快照保存到文件或数据库中,以便在服务重启时快速恢复。
在服务启动时,添加加载数据的逻辑,从持久化存储中读取数据并填充 Map
。
内存中的数据(如
Map
)在重启时不会保留。通过文件存储、数据库、或内存数据库等方法将数据持久化,可以在服务重启时恢复数据。
举例:单次拉取11条消息,每条消息耗时30s,11条消息耗时5分钟30秒,由于 max.poll.interval.ms 默认值5分
钟,所以消费者⽆法在5分钟内消费完,consumer会离开组,导致rebalance。
在消费完11条消息后,consumer会重新连接broker,再次rebalance,因为上次消费的offset未提交,再次拉取的消
息是之前消费过的消息,造成重复消费。
1、提⾼消费能⼒,提⾼单条消息的处理速度;根据实际场景可讲 max.poll.interval.ms 值设置⼤⼀点,避免不必
要的rebalance;可适当减⼩ max.poll.records 的值,默认值是500,可根据实际消息速率适当调⼩。
2、⽣成消息时,可加⼊唯⼀标识符如消息id,在消费端,保存最近的1000条消息id存⼊到redis或mysql中,消费的消
息时通过前置去重。
消费者内部根据线程数量创建等量的内存队列,对于需要顺序的⼀系列业务数据,根据key或者业务数据,放到同⼀个
内存队列中,然后线程从对应的内存队列中取出并操作
org.apache.kafka.clients.producer.ProducerInterceptor 接⼝。
该接⼝是Kafka提供的,⾥⾯有两个核⼼的⽅法。
onSend:该⽅法会在消息发送之前被调⽤。
onAcknowledgement:该⽅法会在消息成功提交或发送失败之后被调⽤。onAcknowledgement的调⽤要早于
callback的调⽤。值得注意的是,这个⽅法和onSend不是在同⼀个线程中被调⽤的,因此如果你在这两个⽅法中
调⽤了某个共享可变对象,⼀定要保证线程安全
在服务器上,进程和线程的产生通常与应用的运行模式和用户请求有关。以下是一些常见情况:
1. 进程
服务启动时:当后端服务(如 web 服务器、数据库等)启动时,操作系统会为其创建一个进程。
进程间通信:如果需要通过进程间通信(IPC)来处理任务,可能会启动新的进程。
2. 线程
多线程服务:在多线程应用中,服务会在启动时创建多个线程,准备处理并发请求。
用户请求处理:当用户请求接口时,后端服务通常会为每个请求分配一个线程来处理,从而提高响应能力。
3. 请求接口时的具体行为
单线程模式:在单线程服务中,所有请求可能会在同一个线程中排队处理。
多线程/进程模式:在多线程或多进程模式下,当一个请求到达时,可能会立即生成一个新的线程或进程,或者从线程池中借用一个现有的线程。
4. 异步处理
一些现代框架采用异步处理方式,通过事件循环和回调机制来处理请求,减少线程或进程的使用。
在服务器中启动多个服务时,通常会产生多个进程。以下是一些具体情况:
1. 每个服务一个进程
独立服务:大多数后端服务(如 web 服务器、数据库等)会以独立进程的形式运行。每个服务启动时,操作系统会为其分配一个新的进程。
资源隔离:进程之间相互独立,具有自己的内存空间,有助于提高安全性和稳定性。
2. 多线程服务
服务内部的多线程:一些服务可能内部实现为多线程结构,服务的每个进程可以包含多个线程来处理并发请求。这种情况下,服务的进程数量和线程数量是分开的。
线程池:在一些高性能服务中,可能会使用线程池来管理线程,以便复用已有的线程来处理请求,减少线程创建和销毁的开销。
3. 容器化服务
容器化:在使用 Docker 等容器技术时,通常每个服务在不同的容器中运行,每个容器也是一个独立的进程。这种方式可以方便地管理和扩展服务。
总结
启动多个服务时,通常是产生多个进程。
每个服务进程内部可以使用多线程来处理并发请求。
确保 replication.factor > min.insync.replicas ,如果两者相等,那么只要有⼀个副本挂机,整个分区就
⽆法正常⼯作了,我们不仅要改善消息的持久性,防⽌数据丢失,还要在不降低可⽤性的基础上完成,推荐设置
成 replication.factor = min.insync.replicas + 1 。
加群联系作者vx:xiaoda0423
仓库地址:github.com/webVueBlog/…