springboot第79集:各种锁,线程池企业级高并发亿级数据处理

科技   2024-10-21 22:54   广东  

个人简介:我是哪吒

全栈架构师Java | 微服务集群方向

一个人可能走得更快,但一群人一定会走得更远

对于 MySQL 单表数据超过亿条记录的优化,以下是一些建议:

1. 索引优化

  • 创建合适的索引:确保对常用查询条件、排序字段和连接字段建立索引。
  • 使用复合索引:对于多个条件的查询,考虑使用复合索引,以减少索引的数量和提升查询效率。

2. 分区表

  • 数据分区:根据某个字段(如时间、ID范围等)进行分区,能有效地减少单个表的大小,从而提升查询性能。

3. 表结构优化

  • 字段类型选择:合理选择字段数据类型,避免使用过大的类型,如尽量使用 INT 替代 BIGINT
  • 避免 NULL:尽量避免在可以用默认值替代的情况下使用 NULL,这样可以减少存储空间。

4. 查询优化

  • **避免 SELECT ***:只查询需要的字段,减少数据传输量。
  • 使用 EXPLAIN:分析查询执行计划,找出慢查询并优化。

5. 批量操作

  • 批量插入/更新:对于大量数据的插入或更新,使用批量操作,减少事务开销。
  • 使用事务:在批量操作时使用事务,提高操作的原子性和一致性。

6. 数据归档

  • 历史数据归档:将不常用的数据迁移到其他表或数据库中,保持主表数据较小,提高查询性能。

7. 配置优化

  • 调整 MySQL 配置:根据硬件资源调整 MySQL 的配置,如增加 innodb_buffer_pool_size、调整 max_connections 等。

8. 定期维护

  • 表维护:定期进行表的 OPTIMIZE TABLE 操作,清理碎片,提高存储效率。

9. 使用缓存

  • 应用层缓存:使用 Redis 或 Memcached 等缓存技术,减少对数据库的直接访问,提高性能。

⼀般情况下,⾮公平锁能提升⼀定的效率。但是⾮公平锁可能会发⽣线程饥饿(有

⼀些线程⻓时间得不到锁)的情况。所以要根据实际的需求来选择⾮公平锁和公平

锁。

ReentrantLock⽀持⾮公平锁和公平锁两种。

  • 空指针检查:防止 NullPointerException

  • 索引范围检查:防止 ArrayIndexOutOfBoundsExceptionIndexOutOfBoundsException

  • 类型检查:防止 ClassCastException

  • 数字格式检查:防止 NumberFormatException

  • 除零检查:防止 ArithmeticException

  • 根据RPC调用方式选择操作路径

    • HTTP:如果 rpc 的值为 "http",则使用 httpInvokeService 发起HTTP请求,调用日志服务。
    • Feign:如果 rpc 的值为 "feign",则通过 actionLogFeignService 进行远程服务调用。
    • 代码中首先根据 rpc 变量的值选择不同的日志写入方式:

  • HTTP调用方式

    • 500:可能是用于控制最大重试次数(如在网络不稳定时使用),但该参数值在当前代码未明确其作用。
    • 1000 * 10:表示超时时间设置为10秒(单位为毫秒)。
    • 使用 httpInvokeService.invoke() 发送HTTP请求,向日志服务发送日志数据。调用过程中,将 actionLog 对象通过 GsonUtils.getJsonFromObject() 方法转换为 JSON 格式,然后进行传递。

    • 该方法的两个参数:

  • Feign调用方式

    • 如果 rpc 值为 "feign",则使用 actionLogFeignServiceadd 方法来直接调用日志服务。
    • Feign 是一种声明式的HTTP客户端,它简化了与远程服务的交互,通过接口调用远程服务。
  • 异常处理

    • try-catch 结构中,捕获调用过程中可能发生的异常。如果异常发生,记录错误日志,并返回一个标识失败的 RestRet 对象。
    • 通过 logger.info() 方法记录失败的日志信息,便于后续排查问题。
  • 返回结果

    • 无论是HTTP调用还是Feign调用,成功时返回远程服务的响应(RestRet 对象)。失败时则返回一个失败的 RestRet.createFAIL() 对象。

线程池的懒加载与资源管理

  • 当前的线程池 execvolatile 的,避免了多线程并发下的可见性问题,但未使用线程池时会持有不必要的资源。建议在需要时才初始化线程池,并在任务执行完毕后进行合理的资源释放。

并行执行效率的提升

  • 当前代码在所有任务提交后,通过 Future.get() 阻塞等待所有任务完成。虽然可以确保所有任务执行完成,但这是串行等待的过程。建议使用 CompletableFuture.allOf() 来并行等待所有任务,避免一个任务阻塞主线程,提高执行效率。

通过 CompletableFuture.allOf(),可以同时等待所有任务完成,避免任务间的阻塞等待。这样可以最大化利用线程池的并行处理能力。

任务异常处理的优化

  • 当前代码中每个任务的异常会在阻塞等待时捕获并处理,但如果某个任务在执行过程中抛出了异常,其他任务仍会继续执行。建议在异步任务内部处理异常,保证每个任务在执行过程中都能安全结束。

这样可以确保任务在执行过程中出现异常时,不会影响其他任务的执行。

线程池使用的优化

  • 线程池 exec 是全局的且未初始化。如果线程池没有初始化,代码会抛出 NullPointerException。建议在方法执行之前检查线程池是否已初始化,或者使用 getExecutorService() 方法懒加载线程池。

可配置化

  • 当前同步任务数是固定的。如果未来需要动态调整同步任务或扩展同步类型,建议将任务类型和数量通过配置文件或参数管理,避免硬编码。

从远程服务获取换电订单数据,进行同步处理并批量插入数据库。同步过程是分页执行的,首先获取上次同步的时间段,然后分页拉取远程数据,并进行批量插入。同步完成后更新同步时间。

并发处理

  • 如果数据量非常大,分页处理时间较长,考虑将同步任务拆分为多个并发任务执行。例如,可以按时间段进行分区,或者并发处理不同的数据集。

数据去重处理

  • 如果同步过程可能会遇到重复数据,可以在插入数据库时增加数据去重逻辑,防止同一条数据多次插入。

使用了双重检查锁(Double-Checked Locking)来初始化线程池 exec。这是一个典型的线程安全的懒加载(Lazy Initialization)模式。PostConstruct 注解确保这个方法在对象创建后自动调用,初始化线程池以用于数据同步任务。

尽管双重检查锁模式在性能上已经有所优化,但如果初始化逻辑复杂,仍可能导致潜在的性能问题。在现代 JVM 中,volatile 关键字已经可以确保单例的安全初始化

根据任务类型动态调整核心线程数和最大线程数

  • 如果同步任务的执行时间差异较大(某些任务耗时长,某些任务耗时短),可以考虑通过配置文件或根据实际负载动态调整核心线程数和最大线程数。

避免 SQL 性能瓶颈

  • DISTINCT(id) 在大型数据集上可能会导致性能问题,因为去重操作增加了查询的复杂度。如果有索引,可以考虑先筛选后去重或者查询非重复的主键。

  • selectListOptional 的结合

    • selectList 会返回一个列表,而在这里只需要获取一条数据。可以使用 selectOne 方法来直接获取结果,避免额外的 Optional 操作,从而简化代码。
  • Optional 的冗余

    • selectOne 返回 null 时,自然会返回 null,不需要额外使用 Optional 进行包装和过滤,可以简化逻辑。
  • 性能与优化

    • limit 1 的使用是合理的,可以限制结果集的大小,提升查询效率。
    • 建议对 updateT 字段建立索引,以加快排序操作。

获取前一天的日期DateUtil.offsetDay(new Date(), -1) 计算出当前日期的前一天,并格式化为字符串(如 "2023-10-10")。

optimizeTable 函数用于对指定的数据库表执行优化操作。优化表的目的是通过整理碎片、重建索引等方式提升数据库的查询性能和存储效率。

调用数据层操作:通过 mapper.optimizeTable(tableName),将优化请求发送到数据库对应的 Mapper 层,执行底层 SQL 操作。mapper 是一个数据访问对象(DAO),负责与数据库交互。optimizeTable 方法在 Mapper 中可能执行的是 SQL 中的 OPTIMIZE TABLE 语句,该语句用于整理表中由于删除或更新记录而产生的碎片。

  • 整理碎片:在数据频繁更新或删除时,表可能会产生内部碎片,导致性能下降。优化表可以重组数据,提升查询效率。

  • 重建索引:优化过程会重建表的索引,有助于改善检索速度。

  • 释放空间:有些数据库引擎在优化表后可以释放已被删除数据占用的空间。

优化数据库表尤其是在大数据量和频繁更新的表中十分常见,它能提高性能,减少碎片,提高查询的效率。

  1. @Configuration:

  • 作用:标注该类为 Spring 的配置类,使 Spring 容器能够将其识别为配置类。
  • 优化理由:无需优化,保持原有标注即可。
  • @Bean(name = "mybatisPlusInterceptor") :

    • 作用:将方法返回的对象注册为 Spring 容器中的一个 Bean,并指定 Bean 的名称为 "mybatisPlusInterceptor"
    • 优化理由:Bean 名称可以根据项目需求进行修改或保持不变。
  • public MybatisPlusInterceptor mybatisPlusInterceptor() :

    • 作用:该方法负责创建 MybatisPlusInterceptor 对象,并为其添加分页功能。
    • 优化建议:可以添加一些额外的拦截器(例如性能分析、数据权限控制等),根据业务需求扩展功能。
  • MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); :

    • 作用:创建 MybatisPlusInterceptor 拦截器实例,用于添加和管理多个 Mybatis 拦截器。
    • 优化建议:创建拦截器对象是必要的,无需更改。
  • interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.CLICK_HOUSE)); :

    • 作用:为 MybatisPlusInterceptor 添加一个内部的分页拦截器,指定数据库类型为 ClickHouse。这意味着分页查询时会根据 ClickHouse 的数据库特性生成相应的 SQL 语句。
    • 优化建议:当前数据库类型为 ClickHouse,如果数据库类型更改为 MySQL 或其他数据库,需根据实际情况设置相应的 DbType
  • return interceptor; :

    • 作用:将配置好的拦截器返回,以便 Spring 容器管理和注入到 Mybatis 中。
    • 优化建议:逻辑无误,保持原样。

    逻辑优化(针对其他可能的需求):

    • 多数据库支持:如果项目中使用了多个数据库,可以根据不同的条件动态设置数据库类型。
    • 其他拦截器的支持:可以考虑添加如性能分析、乐观锁等其他拦截器,以增强 Mybatis 的功能。
    xxl:
      job:
        admin:
          address: http://xxl-job-admin
        accessToken: yourAccessToken
        executor:
          appname: yourAppName
          address: yourAddress
          ip: yourIp
          port: 9999
          logpath: /data/logs/xxl-job
          logretentiondays: 30

    CONTINUE(100, "继续"),
    SWITCHING_PROTOCOLS(101, "切换协议"),
    PROCESSING(102, "处理中"),
    CHECKPOINT(103, "检查点"),
    OK(200, "成功"),
    CREATED(201, "已创建"),
    ACCEPTED(202, "已接受"),
    NON_AUTHORITATIVE_INFORMATION(203, "非权威信息"),
    NO_CONTENT(204, "无内容"),
    RESET_CONTENT(205, "重置内容"),
    PARTIAL_CONTENT(206, "部分内容"),
    MULTI_STATUS(207, "多状态"),
    ALREADY_REPORTED(208, "已报告"),
    IM_USED(226, "IM已使用"),
    MULTIPLE_CHOICES(300, "多种选择"),
    MOVED_PERMANENTLY(301, "永久移动"),
    FOUND(302, "找到"),
    MOVED_TEMPORARILY(302, "临时移动"),
    SEE_OTHER(303, "查看其他"),
    NOT_MODIFIED(304, "未修改"),
    USE_PROXY(305, "使用代理"),
    TEMPORARY_REDIRECT(307, "临时重定向"),
    PERMANENT_REDIRECT(308, "永久重定向"),
    BAD_REQUEST(400, "错误请求"),
    UNAUTHORIZED(401, "未经授权"),
    PAYMENT_REQUIRED(402, "需要支付"),
    FORBIDDEN(403, "禁止"),
    NOT_FOUND(404, "未找到"),
    METHOD_NOT_ALLOWED(405, "方法不允许"),
    NOT_ACCEPTABLE(406, "不可接受"),
    PROXY_AUTHENTICATION_REQUIRED(407, "需要代理身份验证"),
    REQUEST_TIMEOUT(408, "请求超时"),
    CONFLICT(409, "冲突"),
    GONE(410, "已删除"),
    LENGTH_REQUIRED(411, "需要长度"),
    PRECONDITION_FAILED(412, "前置条件失败"),
    PAYLOAD_TOO_LARGE(413, "负载过大"),
    URI_TOO_LONG(414, "URI过长"),
    UNSUPPORTED_MEDIA_TYPE(415, "不支持的媒体类型"),
    REQUESTED_RANGE_NOT_SATISFIABLE(416, "请求的范围无法满足"),
    EXPECTATION_FAILED(417, "期望失败"),
    I_AM_A_TEAPOT(418, "我是个茶壶"),
    UNPROCESSABLE_ENTITY(422, "不可处理的实体"),
    LOCKED(423, "锁定"),
    FAILED_DEPENDENCY(424, "依赖失败"),
    TOO_EARLY(425, "过早"),
    UPGRADE_REQUIRED(426, "需要升级"),
    PRECONDITION_REQUIRED(428, "需要前置条件"),
    TOO_MANY_REQUESTS(429, "请求过多"),
    REQUEST_HEADER_FIELDS_TOO_LARGE(431, "请求头字段过大"),
    UNAVAILABLE_FOR_LEGAL_REASONS(451, "因法律原因不可用"),
    INTERNAL_SERVER_ERROR(500, "内部服务器错误"),
    NOT_IMPLEMENTED(501, "未实现"),
    BAD_GATEWAY(502, "错误网关"),
    SERVICE_UNAVAILABLE(503, "服务不可用"),
    GATEWAY_TIMEOUT(504, "网关超时"),
    HTTP_VERSION_NOT_SUPPORTED(505, "HTTP版本不受支持"),
    VARIANT_ALSO_NEGOTIATES(506, "变体也协商"),
    INSUFFICIENT_STORAGE(507, "存储不足"),
    LOOP_DETECTED(508, "检测到循环"),
    BANDWIDTH_LIMIT_EXCEEDED(509, "带宽限制超出"),
    NOT_EXTENDED(510, "未扩展"),
    NETWORK_AUTHENTICATION_REQUIRED(511, "需要网络身份验证");

    private LocalDateTime time;  // 使用 LocalDateTime 来表示时间,便于后续时间运算和格式化


    @NotNull(message = "时间不能为空")
    private String time;

    @Min(value = 0, message = "总数不能为负数")
    private Integer total;

    • @Data:

      • 作用:由 Lombok 自动生成该类的 gettersetterequalshashCodetoString 方法。减少手动编写这些常见方法的代码量,保持代码简洁。
      • 优化理由:该注解极大简化了 Java Bean 类中的冗余代码,便于维护。
    • @AllArgsConstructor:

      • 作用:由 Lombok 自动生成带有所有字段的构造函数,便于在创建对象时同时初始化所有字段。
      • 优化理由:通常用于在实例化对象时需要直接传递所有参数的场景,减少手动编写构造函数。
    • @NoArgsConstructor:

      • 作用:由 Lombok 自动生成无参构造函数,便于在某些场景下需要无参构造器时使用(如使用框架时需要对象的无参构造函数)。
      • 优化理由:当对象初始化不需要传递参数时(如框架反射创建对象),此构造器可以有效简化代码。

    公平锁与⾮公平锁

    这⾥的“公平”,其实通俗意义来说就是“先来后到”,也就是FIFO。如果对⼀个锁来

    说,先对锁获取请求的线程⼀定会先被满⾜,后对锁获取请求的线程后被满⾜,那

    这个锁就是公平的。反之,那就是不公平的。

    ⼀般情况下,⾮公平锁能提升⼀定的效率。但是⾮公平锁可能会发⽣线程饥饿(有

    ⼀些线程⻓时间得不到锁)的情况。所以要根据实际的需求来选择⾮公平锁和公平

    锁。

    ReentrantLock⽀持⾮公平锁和公平锁两种。

    读写锁和排它锁

    synchronized⽤的锁和ReentrantLock,其实都是“排它锁”。也就是

    说,这些锁在同⼀时刻只允许⼀个线程进⾏访问。

    ⽽读写锁可以再同⼀时刻允许多个读线程访问。Java提供了

    ReentrantReadWriteLock类作为读写锁的默认实现,内部维护了两个锁:⼀个读

    锁,⼀个写锁。通过分离读锁和写锁,使得在“读多写少”的环境下,⼤⼤地提⾼了

    性能。

    注意,即使⽤读写锁,在写线程访问时,所有的读线程和其它写线程均被阻

    塞。

    可⻅,只是synchronized是远远不能满⾜多样化的业务对锁的要求的。

    众所周知,JDK中关于并发的类⼤多都在 java.util.concurrent (以下简称juc)包

    下。⽽juc.locks包看名字就知道,是提供了⼀些并发锁的⼯具类的。

    AQS(AbstractQueuedSynchronizer)就是在这个包下。

    抽象类AQS/AQLS/AOS

    这三个抽象类有⼀定的关系,所以这⾥放到⼀起讲。

    ⾸先我们看AQS(AbstractQueuedSynchronizer),

    它是在JDK 1.5 发布的,提供了⼀个“队列同步器”的基本功能实现。⽽AQS⾥⾯的

    “资源”是⽤⼀个 int 类型的数据来表示的,有时候我们的业务需求资源的数量超出

    了 int 的范围,所以在JDK 1.6 中,多了⼀个

    AQLS(AbstractQueuedLongSynchronizer)。它的代码跟AQS⼏乎⼀样,只是把

    资源的类型变成了 long 类型。

    AQS和AQLS都继承了⼀个类叫AOS(AbstractOwnableSynchronizer)。这个类

    也是在JDK 1.6 中出现的。这个类只有⼏⾏简单的代码。从源码类上的注释可以知

    道,它是⽤于表示锁与持有者之间的关系(独占模式)。可以看⼀下它的主要⽅

    接⼝Condition/Lock/ReadWriteLock

    juc.locks包下共有三个接⼝: Condition 、 Lock 、 ReadWriteLock 。其中,Lock

    和ReadWriteLock从名字就可以看得出来,分别是锁和读写锁的意思。Lock接⼝⾥

    ⾯有⼀些获取锁和释放锁的⽅法声明,⽽ReadWriteLock⾥⾯只有两个⽅法,分别

    返回“读锁”和“写锁”:

    那为什么既然有Object的监视器⽅法了,还要⽤Condition呢?

    Condition和Object的wait/notify基本相似。其中,Condition的await⽅法对应的是

    Object的wait⽅法,⽽Condition的signal/signalAll⽅法则对应Object的

    notify/notifyAll()。但Condition类似于Object的等待/通知机制的加强版。

    ReentrantLock

    ReentrantLock是⼀个⾮抽象类,它是Lock接⼝的JDK默认实现,实现了锁的基本

    功能。从名字上看,它是⼀个”可重⼊“锁,从源码上看,它内部有⼀个抽象

    类 Sync ,是继承了AQS,⾃⼰实现的⼀个同步器。同时,ReentrantLock内部有

    两个⾮抽象类 NonfairSync 和 FairSync ,它们都继承了Sync。从名字上看得出,

    分别是”⾮公平同步器“和”公平同步器“的意思。这意味着ReentrantLock可以⽀持”公

    平锁“和”⾮公平锁“。

    通过看着两个同步器的源码可以发现,它们的实现都是”独占“的。都调⽤了AOS

    的 setExclusiveOwnerThread ⽅法,所以ReentrantLock的锁的”独占“的,也就是

    说,它的锁都是”排他锁“,不能共享。

    在ReentrantLock的构造⽅法⾥,可以传⼊⼀个 boolean 类型的参数,来指定它是

    否是⼀个公平锁,默认情况下是⾮公平的。这个参数⼀旦实例化后就不能修改,只

    能通过 isFair() ⽅法来查看

    订单超时未支付关闭方案

    jdk队列, 保存订单,订单放入队列,订单超时,修改订单状态,到数据库,服务重启,同步数据

    DelayQueue
    orderid=123 timeout=10
    orderid=456 timeout=15
    orderid=789 timeout=16

    适用场景:系统本身是单体应用,建议采用这种,简单

    优点:1.简单,jdk自带的队列,2.最上手,成本低
    缺点:订单最大的时候占用内存多,任务只能由本机器自己消费,无法被其他机器来协助消费

    RocketMQ

    1.保存数据,2.发送延迟消息到rocketmq,3达到延迟时间,处理业务,4.修改订单状态,到数据库

    适用场景:日均量百万,千万级别且订单延迟精度要求高的场景。

    提升接口性能的方法

    索引优化:通过优化数据库索引,减少查询时间,提高接口响应速度,索引优化是提升数据库查询性能的关键步骤

    sql优化:改进sql,避免复杂的查询和不必要的数据处理,精简高效sql是提升性能的基础,一般就是从是否使用上索引上考虑

    远程调用并行化,异步处理

    避免大事务:合理控制事务大小,避免长事物影响数据库性能,细化锁力度,减少锁竞争,提升系统并发能性能

    分页处理:对大量数据进行分页,避免一次性加在过多的数据,实施分页加载,减轻单次请求,提高响应速度

    缓存策略:合理使用缓存,减少数据库访问频率,加快数据读取速度,缓存常用数据,减少数据库访问,快速响应用户请求

    ReentrantReadWriteLock

    这个类也是⼀个⾮抽象类,它是ReadWriteLock接⼝的JDK默认实现。它与

    ReentrantLock的功能类似,同样是可重⼊的,⽀持⾮公平锁和公平锁。不同的

    是,它还⽀持”读写锁“。

    ReentrantReadWriteLock内部的结构⼤概是这样:

    它同样是内部维护了两个同步器。且维护了两个Lock的实现类

    ReadLock和WriteLock。从源码可以发现,这两个内部类⽤的是外部类的同步器。

    ReentrantReadWriteLock实现了读写锁,但它有⼀个⼩弊端,就是在“写”操作的时

    候,其它线程不能写也不能读。我们称这种现象为“写饥饿”

    乐观读锁的意思就是先假定在这个锁获取期间,共享变量不会被改变,既然

    假定不会被改变,那就不需要上锁。在获取乐观读锁之后进⾏了⼀些操作,

    然后⼜调⽤了validate⽅法,这个⽅法就是⽤来验证tryOptimisticRead之后,

    是否有写操作执⾏过,如果有,则获取⼀个悲观读锁,这⾥的悲观读锁和

    ReentrantReadWriteLock中的读锁类似,也是个共享锁。

    StampedLock⽤这个long类型的变量的前7位(LG_READERS)来表示读锁,每获

    取⼀个悲观读锁,就加1(RUNIT),每释放⼀个悲观读锁,就减1。⽽悲观读锁最

    多只能装128个(7位限制),很容易溢出,所以⽤⼀个int类型的变量来存储溢出的

    悲观读锁。

    写锁⽤state变量剩下的位来表示,每次获取⼀个写锁,就加0000 1000

    0000(WBIT)。需要注意的是,写锁在释放的时候,并不是减WBIT,⽽是再加

    WBIT。这是为了让每次写锁都留下痕迹,解决CAS中的ABA问题,也为乐观锁检

    查变化validate⽅法提供基础。

    乐观读锁就⽐较简单了,并没有真正改变state的值,⽽是在获取锁的时候记录

    state的写状态,在操作完成后去检查state的写状态部分是否发⽣变化,上⽂提到

    了,每次写锁都会留下痕迹,也是为了这⾥乐观锁检查变化提供⽅便。

    总的来说,StampedLock的性能是⾮常优异的,基本上可以取代

    ReentrantReadWriteLock的作⽤。

    RocketMQ

    消费组中包含多个消费者,同⼀个组内的消费者是竞争消费的关系,每个消费者负责消费组内的⼀部分消息。默认

    情况,如果⼀条消息被消费者Consumer1消费了,那同组的其他消费者就不会再收到这条消息。

    Message Queue(消息队列),⼀个 Topic 下可以设置多个消息队列,Topic 包括多个 Message Queue ,如果

    ⼀个 Consumer 需要获取 Topic下所有的消息,就要遍历所有的 Message Queue。

    RocketMQ还有⼀些其它的Queue——例如ConsumerQueue。

    消息消费模式有两种:Clustering(集群消费)和Broadcasting(⼴播消费)。

    默认情况下就是集群消费,这种模式下 ⼀个消费者组共同消费⼀个主题的多个队列,⼀个队列只会被⼀个消费者消费 ,

    如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

    ⽽⼴播消费消息会发给消费者组中的每⼀个消费者进⾏消费。

    NameServer

    NameServer 是⼀个⽆状态的服务器,⻆⾊类似于 Kafka使⽤的 Zookeeper,但⽐ Zookeeper 更轻量。

    特点:

    每个 NameServer 结点之间是相互独⽴,彼此没有任何信息交互。

    Nameserver 被设计成⼏乎是⽆状态的,通过部署多个结点来标识⾃⼰是⼀个伪集群,Producer 在发送消息

    前从 NameServer 中获取 Topic 的路由信息也就是发往哪个 Broker,Consumer 也会定时从 NameServer

    获取 Topic 的路由信息,Broker 在启动时会向 NameServer 注册,并定时进⾏⼼跳连接,且定时同步维护的

    Topic 到 NameServer。

    功能主要有两个:

    1、和Broker 结点保持⻓连接。

    2、维护 Topic 的路由信息。

    Broker

    消息存储和中转⻆⾊,负责存储和转发消息。

    Broker 内部维护着⼀个个 Consumer Queue,⽤来存储消息的索引,真正存储消息的地⽅是

    CommitLog(⽇志⽂件)。

    单个 Broker 与所有的 Nameserver 保持着⻓连接和⼼跳,并会定时将 Topic 信息同步到 NameServer,和

    NameServer 的通信底层是通过 Netty 实现的。

    Producer

    消息⽣产者,业务端负责发送消息,由⽤户⾃⾏实现和分布式部署。

    Producer由⽤户进⾏分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延

    时,⽀持快速失败。

    RocketMQ 提供了三种⽅式发送消息:同步、异步和单向

    同步发送:同步发送指消息发送⽅发出数据后会在收到接收⽅发回响应之后才发下⼀个数据包。⼀般⽤于重

    要通知消息,例如重要通知邮件、营销短信。

    异步发送:异步发送指发送⽅发出数据后,不等接收⽅发回响应,接着发送下个数据包,⼀般⽤于可能链路

    耗时较⻓⽽对响应时间敏感的业务场景,例如⽤户视频上传后通知启动转码服务。

    单向发送:单向发送是指只负责发送消息⽽不等待服务器回应且没有回调函数触发,适⽤于某些耗时⾮常短

    但对可靠性要求并不⾼的场景,例如⽇志收集。

    ⽣产

    在⽣产阶段,主要通过请求确认机制,来保证消息的可靠传递。

    1、同步发送的时候,要注意处理响应结果和异常。如果返回响应OK,表示消息成功发送到了Broker,如果

    响应失败,或者发⽣其它异常,都应该重试。

    2、异步发送的时候,应该在回调⽅法⾥检查,如果发送失败或者异常,都应该进⾏重试。

    3、如果发⽣超时的情况,也可以通过查询⽇志的API,来检查是否在Broker存储成功。

    存储

    存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该

    使⽤同步。

    1、消息只要持久化到CommitLog(⽇志⽂件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。

    2、Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息⼀定存储在pagecache中(内存

    中),但是同步刷盘更可靠,它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。

    3、Broker通过主从模式来保证⾼可⽤,Broker⽀持Master和Slave同步复制、Master和Slave异步复制模

    式,⽣产者的消息都是发送给Master,但是消费既可以从Master消费,也可以从Slave消费。同步复制模式

    可以保证即使Master宕机,消息肯定在Slave中有备份,保证了消息不会丢失。

    消费

    从Consumer⻆度分析,如何保证消息被成功消费?

    Consumer保证消息成功消费的关键在于确认的时机,不要在收到消息后就⽴即发送消费确认,⽽是应该在执

    ⾏完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执⾏失败了,没有确

    认,再去队列拉取消息,就还是之前的⼀条。

    如何保证RocketMQ的⾼可⽤?

    NameServer因为是⽆状态,且不相互通信的,所以只要集群部署就可以保证⾼可⽤

    RocketMQ的⾼可⽤主要是在体现在Broker的读和写的⾼可⽤,Broker的⾼可⽤是通过 集群 和 主从 实现的。

    简单来说,RocketMQ是⼀个分布式消息队列,也就是 消息队列 + 分布式系统 。

    作为消息队列,它是 发 - 存 - 收 的⼀个模型,对应的就是Producer、Broker、Cosumer;作为分布式系统,它要

    有服务端、客户端、注册中⼼,对应的就是Broker、Producer/Consumer、NameServer

    所以我们看⼀下它主要的⼯作流程:RocketMQ由NameServer注册中⼼集群、Producer⽣产者集群、Consumer

    消费者集群和若⼲Broker(RocketMQ进程)组成:

    1. Broker在启动的时候去向所有的NameServer注册,并保持⻓连接,每30s发送⼀次⼼跳

    2. Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择⼀台服务器来发

    送消息

    1. Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费

    Broker是怎么保存数据的呢?

    RocketMQ主要的存储⽂件包括CommitLog⽂件、ConsumeQueue⽂件、Indexfile⽂件。

    CommitLog:消息主体以及元数据的存储主体,存储Producer端写⼊的消息主体内容,消息内容不是定⻓

    的。单个⽂件⼤⼩默认1G, ⽂件名⻓度为20位,左边补零,剩余为起始偏移量,⽐如

    00000000000000000000代表了第⼀个⽂件,起始偏移量为0,⽂件⼤⼩为1G=1073741824;当第⼀个⽂件

    写满了,第⼆个⽂件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序

    写⼊⽇志⽂件,当⽂件满了,写⼊下⼀个⽂件。

    CommitLog⽂件保存于${Rocket_Home}/store/commitlog⽬录中,从图中我们可以明显看出来⽂件名的偏移

    量,每个⽂件默认1G,写满后⾃动⽣成⼀个新的⽂件

    ConsumeQueue:消息消费队列,引⼊的⽬的主要是提⾼消息消费的性能,由于RocketMQ是基于主题

    topic的订阅模式,消息消费是针对主题进⾏的,如果要遍历commitlog⽂件中根据topic检索消息是⾮常低效

    的。

    Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消

    息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息⼤⼩size和消息Tag的

    HashCode值。

    ConsumeQueue⽂件可以看成是基于Topic的CommitLog索引⽂件,故ConsumeQueue⽂件夹的组织⽅式如下:

    topic/queue/file三层组织结构,具体存储路径为:

    $HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样ConsumeQueue⽂件采取定⻓设计,每⼀

    个条⽬共20个字节,分别为8字节的CommitLog物理偏移量、4字节的消息⻓度、8字节tag hashcode,单个⽂件

    由30W个条⽬组成,可以像数组⼀样随机访问每⼀个条⽬,每个ConsumeQueue⽂件⼤⼩约5.72M;

    IndexFile:IndexFile(索引⽂件)提供了⼀种可以通过key或时间区间来查询消息的⽅法。Index⽂件的存

    储位置是: {fileName},⽂件名fileName是以创建时的时间戳命名的,固定的单个IndexFile⽂件⼤⼩约为

    400M,⼀个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在⽂件系统中实现HashMap结

    构,故RocketMQ的索引⽂件其底层实现为hash索引。

    可以通过零拷⻉的⽅式,减少⽤户态与内核态的上下⽂切换和内存拷⻉的次数,⽤来提升I/O的性能。零拷

    ⻉⽐较常⻅的实现⽅式是mmap,这种机制在Java中是通过MappedByteBuffer实现的。

    加群联系作者vx:xiaoda0423

    仓库地址:https://github.com/webVueBlog/JavaGuideInterview

    算法猫叔
    程序员:进一寸有一寸的欢喜
     最新文章