springboot第69集:字节跳动后端二面经,一文让你走出微服务迷雾架构周刊

科技   其他   2024-04-07 21:07   广东  

1.  简介

1.1  消息队列简介

1.1.1  什么是消息队列

消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。来看一下下面的代码:

// 1. 创建一个保存字符串的队列
QueuestringQueue = new LinkedList();

// 2. 往消息队列中放入消息
stringQueue.offer( "hello" );

// 3. 从消息队列中取出消息并打印
System.out.println(stringQueue.poll());

上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。

我们可以简单理解消息队列就是将需要传输的数据存放在队列中

1.1.1  消息队列中间件

消息队列中间件就是用来存储消息的软件(组件)。举个例子来理解,为了分析网站的用户行为,我们需要记录用户的访问日志。这些一条条的日志,可以看成是一条条的消息,我们可以将它们保存到消息队列中。将来有一些应用程序需要处理这些日志,就可以随时将这些消息取出来处理。


目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。

消息队列的应用场景

电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。

image.png
image.png

1.1.1.1  日志处理(大数据领域常见)

大型电商网站(淘宝、京东、国美、苏宁...)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。

image.png
image.png
image.png
image.png
image.png

点对点模式特点:

每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)

发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;

接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
image.png
image.png

Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的。

Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:

1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统

2. 以容错的持久化方式存储数据流

处理数据流

1. Publish and subscribe:发布与订阅

2. Store:存储

3. Process:处理

我们通常将Apache Kafka用在两类程序:

1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据

2. 构建实时流应用程序,以转换或响应数据流

image.png

上图,我们可以看到:

1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。

2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。

3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到

数据库中。

4. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。

Kafka比ActiveMQ牛逼得多

特性ActiveMQRabbitMQKafkaRocketMQ
所属社区/公司ApacheMozilla Public LicenseApacheApache/Ali
成熟度成熟成熟成熟比较成熟
生产者-消费者模式支持支持支持支持
发布-订阅支持支持支持支持
REQUEST-REPLY支持支持-支持
API完备性低(静态配置)
多语言支持支持JAVA优先语言无关支持,JAVA优先支持
单机呑吐量万级(最差)万级十万级十万级(最高)
消息延迟-微秒级毫秒级-
可用性高(主从)高(主从)非常高(分布式)
消息丢失-理论上不会丢失-
消息重复-可控制理论上会有重复-
事务支持不支持支持支持
文档的完备性
提供快速入门
首次部署难度-

可以注意到Kafka的版本号为:kafka_2.12-2.4.1,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。http://kafka.apache.org/downloads可以查看到每个版本的发布时间。

image.png
image.png
image.png
image.png
image.png
目录名称说明
binKafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等
configKafka的所有配置文件
libs运行Kafka所需要的所有JAR包
logsKafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息
site-docsKafka的网站帮助文件
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png

基于1个分区1个副本的基准测试

测试步骤:

1. 启动Kafka集群

2. 创建一个1个分区1个副本的topic: benchmark

3. 同时运行生产者、消费者基准测试程序

4. 观察结果

image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
<**repositories**><!-- 代码库 -->  
    <**repository**>  
        <**id**>central</**id**>  
        <**url**>http://maven.aliyun.com/nexus/content/groups/public//</**url**>  
        <**releases**>  
            <**enabled**>true</**enabled**>  
        </**releases**>  
        <**snapshots**>  
            <**enabled**>true</**enabled**>  
            <**updatePolicy**>always</**updatePolicy**>  
            <**checksumPolicy**>fail</**checksumPolicy**>  
        </**snapshots**>  
    </**repository**>  
</**repositories**>  
  
<**dependencies**>  
    <!-- kafka客户端工具 -->  
    <**dependency**>  
        <**groupId**>org.apache.kafka</**groupId**>  
        <**artifactId**>kafka-clients</**artifactId**>  
        <**version**>2.4.1</**version**>  
    </**dependency**>  
  
    <!-- 工具类 -->  
    <**dependency**>  
        <**groupId**>org.apache.commons</**groupId**>  
        <**artifactId**>commons-io</**artifactId**>  
        <**version**>1.3.2</**version**>  
    </**dependency**>  
  
    <!-- SLF桥接LOG4J日志 -->  
    <**dependency**>  
        <**groupId**>org.slf4j</**groupId**>  
        <**artifactId**>slf4j-log4j12</**artifactId**>  
        <**version**>1.7.6</**version**>  
    </**dependency**>  
  
    <!-- SLOG4J日志 -->  
    <**dependency**>  
        <**groupId**>log4j</**groupId**>  
        <**artifactId**>log4j</**artifactId**>  
        <**version**>1.2.16</**version**>  
    </**dependency**>  
</**dependencies**>  
  
<**build**>  
    <**plugins**>  
        <**plugin**>  
            <**groupId**>org.apache.maven.plugins</**groupId**>  
            <**artifactId**>maven-compiler-plugin</**artifactId**>  
            <**version**>3.7.0</**version**>  
            <**configuration**>  
                <**source**>1.8</**source**>  
                <**target**>1.8</**target**>  
            </**configuration**>  
        </**plugin**>  
    </**plugins**>  
</**build**> 

将log4j.properties配置文件放入到resources文件夹中

**log4j.rootLogger**=**INFO,stdout****  
****log4j.appender.stdout**=**org.apache.log4j.ConsoleAppender****  
****log4j.appender.stdout.layout**=**org.apache.log4j.PatternLayout****  
****log4j.appender.stdout.layout.ConversionPattern**= **%5p - %m%n**
image.png
**public class** KafkaProducerTest {  
    **public static void** main(String[] args) {  
        // 1. 创建用于连接Kafka的Properties配置  
        Properties props = **new** Properties();  
        props.put( **"bootstrap.servers"** , **"192.168.88.100:9092"** );  
        props.put( **"acks"** , **"all"** );  
        props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
        props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
  
        // 2. 创建一个生产者对象KafkaProducer  
        KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);  
  
        // 3. 调用send发送1-100消息到指定Topic test  
        **for**(**int** i = 0; i < 100; ++i) {  
            **try** {  
                // 获取返回值Future,该对象封装了返回值  
                Future<RecordMetadata> future = producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ));  
                // 调用一个Future.get()方法等待响应  
                future.get();  
            } **catch** (InterruptedException e) {  
                e.printStackTrace();  
            } **catch** (ExecutionException e) {  
                e.printStackTrace();  
            }  
        }  
  
        // 5. 关闭生产者  
        producer.close();  
    }  
}
image.png
image.png
**public class** KafkaProducerTest {  
    **public static void** main(String[] args) {  
        // 1. 创建用于连接Kafka的Properties配置  
        Properties props = **new** Properties();  
        props.put( **"bootstrap.servers"** , **"node1.itcast.cn:9092"** );  
        props.put( **"acks"** , **"all"** );  
        props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
        props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
  
        // 2. 创建一个生产者对象KafkaProducer  
        KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);  
  
        // 3. 调用send发送1-100消息到指定Topic test  
        **for**(**int** i = 0; i < 100; ++i) {  
            **try** {  
                // 获取返回值Future,该对象封装了返回值  
                Future<RecordMetadata> future = producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ));  
                // 调用一个Future.get()方法等待响应  
                future.get();  
            } **catch** (InterruptedException e) {  
                e.printStackTrace();  
            } **catch** (ExecutionException e) {  
                e.printStackTrace();  
            }  
        }  
  
        // 5. 关闭生产者  
        producer.close();  
    }  
}
**public class** KafkaProducerTest {  
    **public static void** main(String[] args) {  
        // 1. 创建用于连接Kafka的Properties配置  
        Properties props = **new** Properties();  
        props.put( **"bootstrap.servers"** , **"node1.itcast.cn:9092"** );  
        props.put( **"acks"** , **"all"** );  
        props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
        props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );  
  
        // 2. 创建一个生产者对象KafkaProducer  
        KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);  
  
        // 3. 调用send发送1-100消息到指定Topic test  
        **for**(**int** i = 0; i < 100; ++i) {  
            // 一、同步方式  
            // 获取返回值Future,该对象封装了返回值  
            // Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));  
            // 调用一个Future.get()方法等待响应  
            // future.get();  
  
            // 二、带回调函数异步方式  
            producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ), **new** Callback() {  
                @Override  
                **public void** onCompletion(RecordMetadata metadata, Exception exception) {  
                    **if**(exception != **null**) {  
                        System.***out***.println( **"** **发送消息出现异常** **"** );  
                    }  
                    **else** {  
                        String topic = metadata.topic();  
                        **int** partition = metadata.partition();  
                        **long** offset = metadata.offset();  
  
                        System.***out***.println( **"** **发送消息到** **Kafka** **中的名字为** **"** + topic + **"** **的主题,第** **"** + partition + **"** **分区,第** **"** + offset + **"** **条数据成功** **!"** );  
                    }  
                }  
            });  
        }  
  
        // 5. 关闭生产者  
        producer.close();  
    }  
}
image.png
image.png
image.png

ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)

ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。

Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据

1.1.1  producer(生产者)

生产者负责将数据推送给broker的topic

1.1.2  consumer(消费者)

消费者负责从broker的topic中拉取数据,并自己进行处理

1.1.3  consumer group(消费者组)

image.png
image.png
image.png
image.png
image.png
image.png
// 3. 发送1-100数字到Kafka的test主题中  
**while**(**true**) {  
    **for** (**int** i = 1; i <= 100; ++i) {  
        // 注意:send方法是一个异步方法,它会将要发送的数据放入到一个buffer中,然后立即返回  
        // 这样可以让消息发送变得更高效  
        producer.send(**new** ProducerRecord<>( **"test"** , i + **""** ));  
    }  
    Thread.*sleep*(3000);  
}
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
*// 1. 创建消费者*

    **public** **static** Consumer **<** String, String **>**  createConsumer() {

         *// 1. 创建Kafka消费者配置*

        Properties **props** **=** **new** Properties();

        **props**.setProperty( **"** **bootstrap.servers** **"** ,  **"** **node1.itcast.cn:9092** **"** );

        **props**.setProperty( **"** **group.id** **"** ,  **"** **ods_user** **"** );

        **props**.put( **"** **isolation.level** **"** , **"** **read_committed** **"** );

        **props**.setProperty( **"** **enable.auto.commit** **"** ,  **"** **false** **"** );

        **props**.setProperty( **"** **key.deserializer** **"** ,  **"** **org.apache.kafka.common.serialization.StringDeserializer** **"** );

        **props**.setProperty( **"** **value.deserializer** **"** ,  **"** **org.apache.kafka.common.serialization.StringDeserializer** **"** );

 

         *// 2. 创建Kafka消费者*

        KafkaConsumer<String, String> **consumer** **=** **new** KafkaConsumer<>(props);

 

         *// 3. 订阅要消费的主题*

        **consumer**.subscribe(**Arrays**.asList( **"** **ods_user** **"** ));

        

        **return** consumer;

}

编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。

image.png
image.png
1.1.1.1  编写代码消费并生产数据

实现步骤:

1. 调用之前实现的方法,创建消费者、生产者对象

2. 生产者调用initTransactions初始化事务

3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic

(1) 生产者开启事务

(2) 消费者拉取消息

(3) 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)

(4) 生产消息到dwd_user topic中

(5) 提交偏移量到事务中

(6) 提交事务

(7) 捕获异常,如果出现异常,则取消事务

**public** **static** void main(String[] args) {

        Consumer<String, String> **consumer** **=** createConsumer();

        Producer<String, String> **producer** **=** createProducer();

         *// 初始化事务*

        **producer**.initTransactions();

 

        **while**(true) {

            **try** {

                 *// 1. 开启事务*

                **producer**.beginTransaction();

                 *// 2. 定义Map结构,用于保存分区对应的offset*

                Map<TopicPartition, OffsetAndMetadata> **offsetCommits** **=** **new** HashMap<>();

                 *// 2. 拉取消息*

                ConsumerRecords<String, String> **records** **=** **consumer**.poll(**Duration**.ofSeconds(2));

                **for** (ConsumerRecord<String, String> **record**  **:**  records) {

                     *// 3. 保存偏移量*

                    **offsetCommits**.put(**new** TopicPartition(**record**.topic(), **record**.partition()),

                            **new** OffsetAndMetadata(**record**.offset() + 1));

                     *// 4. 进行转换处理*

                    String[] **fields** **=** **record**.value().split( **"** **,** **"** );

                    fields[1] **=** fields[1].equalsIgnoreCase( **"** **1** **"** )  **?**   **"** **男** **"** **:** **"** **女** **"** ;

                    String **message** **=** fields[0]  **+**   **"** **,** **"**   **+**  fields[1]  **+**   **"** **,** **"**   **+**  fields[2];

                     *// 5. 生产消息到dwd_user*

                    **producer**.send(**new** ProducerRecord<>( **"** **dwd_user** **"** , message));

                }

                 *// 6. 提交偏移量到事务*

                **producer**.sendOffsetsToTransaction(offsetCommits,  **"** **ods_user** **"** );

                 *// 7. 提交事务*

                **producer**.commitTransaction();

            } **catch** (Exception **e**) {

                 *// 8. 放弃事务*

                **producer**.abortTransaction();

            }

        }

    }
image.png
image.png

1.  分区和副本机制

1.1  生产者分区写入策略

生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中

1. 轮询分区策略

2. 随机分区策略

3. 按key分区分配策略

4. 自定义分区策略

1.1.1  轮询策略

image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
指标意义
Brokers Spreadbroker使用率
Brokers Skew分区是否倾斜
Brokers Leader Skewleader partition是否存在倾斜
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
指标****单分区单副本(ack=0)****单分区单副本(ack=1)****单分区单副本(ack=-1/all)****
吞吐量165875.991109/s每秒16.5W条记录93092.533979/s每秒9.3W条记录73586.766156 /s每秒7.3W调记录
吞吐速率158.19 MB/sec88.78 MB/sec70.18 MB
平均延迟时间192.43 ms346.62 ms438.77 ms
最大延迟时间670.00 ms1003.00 ms1884.00 ms
image.png
image.png
指标****单分区单副本(ack=0)****单分区单副本(ack=1)****
吞吐量165875.991109 records/sec每秒16.5W条记录93092.533979 records/sec每秒9.3W条记录
吞吐速率158.19 MB/sec每秒约160MB数据88.78 MB/sec每秒约89MB数据
平均延迟时间192.43 ms avg latency346.62 ms avg latency
最大延迟时间670.00 ms max latency1003.00 ms max latency
image.png

加群联系作者vx:xiaoda0423

仓库地址:https://github.com/webVueBlog/JavaGuideInterview

算法猫叔
程序员:进一寸有一寸的欢喜
 最新文章