SpringBoot + MyBatis 实现号段模式的分布式ID~

科技   2024-11-14 09:08   江苏  

号段模式是一种常见的 ID 生成策略,在高并发场景中广泛应用。其核心思想是,发号服务每次从数据库获取一批 ID,并将这些 ID 缓存到本地。业务系统每次请求 ID 时,首先会判断本地缓存是否有可用的 ID。如果有,则直接分配给请求方;如果没有,则重新从数据库中批量获取 ID。通过这种方式,减少了频繁访问数据库的压力。

1. 优缺点分析

1.1 优点

  • • 减少数据库访问压力:在号段模式下,发号服务不需要每次请求都访问数据库,这大大减少了数据库的负载,提高了系统的性能。

  • • 提高系统可用性与可靠性:由于发号服务能缓存 ID,当数据库出现短时故障时,系统仍能正常运行,增强了系统的容错性。

  • • 扩展性强:随着业务的拓展和系统的增长,号段模式便于实现分库分表,支持更多的 ID 生成需求。例如,在处理高并发订单号生成时,可以根据业务需求快速扩展生成规则。

  • • 易于调整:随着业务的变化,可以灵活地调整号段的大小或增加号段的数量,适应不同的需求变化。

1.2. 缺点

  • • 依赖数据库:号段模式依赖数据库来管理 ID 的分配,数据库本身的性能和稳定性会直接影响 ID 生成服务的可用性。如果数据库负载过重或发生故障,可能会导致 ID 生成服务的瓶颈。

  • • ID 无业务含义:虽然号段生成的 ID 高效且简洁,但其值通常是纯数字,无法携带任何业务信息,因此对于某些业务场景(如订单号、用户编号等),其可读性和业务关联性较差。

2. 应用场景

号段模式特别适用于中等并发量的场景,尤其是在不想引入额外中间件(如 Redis)时。它常用于生成订单号、支付流水号等需要高效生成且不频繁变更的 ID。例如,在订单管理系统中,当系统订单量大幅增长时,号段模式可以通过简单的横向扩展来满足业务需求,而无需大规模修改系统架构。

3. 代码实现示例

以下是基于号段模式生成 ID 的具体实现。该方案可以集成在微服务中,数据表和代码逻辑简单易懂,适合用于生成订单号等高并发需求场景。

3.1 数据库设计

首先,在数据库中创建一张 code_seq 表,用于存储每个号段的基本信息。

drop TABLE IF EXISTS `code_seq`;

create TABLE `code_seq` (
    `env` VARCHAR(10NOT NULL COMMENT '环境编号',
    `prefix` INT(10NOT NULL COMMENT 'code前缀',
    `seq` INT(10NOT NULL COMMENT '当前序列',
    `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY(`prefix`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT 'CODE编号表';

通过向 code_seq 表中插入多条数据,我们可以为每个环境(如 T1)定义多个不同的 prefix,这些 prefix 用于生成不同的 ID。

INSERT INTO code_seq (env, prefix, seq)
VALUES
    ('T1'10010),
    ('T1'10020),
    ('T1'10030),
    ('T1'10040),
    ('T1'10050),
    ('T1'10060),
    ('T1'10070),
    ('T1'10080),
    ('T1'10090),
    ('T1'10100);

3.2 代码设计

整体UML类图设计如下

  • • CodeHandler:主类,提供初始化和生成Code方法。主要采用并发原子类减少并发依赖,从数据库可以捞取一批codePrefixSet,在code缓存不到UPDATE_THRESHOLD比例时,会预加载下一批code到缓存中,其代码如下:

public class CodeHandler {
    private static final double UPDATE_THRESHOLD = 0.9f;

    private final String name;

    private final Set<Integer> codePrefixSet = new ConcurrentSkipListSet<>();

    private final BlockingQueue<CodeSegment> codeSegmentBlockingQueue = new LinkedBlockingQueue<>(1);

    private final AtomicBoolean needLoading = new AtomicBoolean(false);

    private final AtomicBoolean statusHealthy = new AtomicBoolean(false);

    private final String idc = "T1";

    private CodeSeqService codeSeqService;

    private CodeTpsMonitor codeTpsMonitor;

    private Executor loadingExecutor;

    public CodeHandler(String name) {
        this.name = name;
    }

    public void setCodeSeqService(CodeSeqService codeSeqService) {
        this.codeSeqService = codeSeqService;
    }

    public void init() {
        this.loadingExecutor = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder().namingPattern("order-handle-" + name).daemon(true).priority(Thread.MAX_PRIORITY).build());
        this.codeTpsMonitor = new CodeTpsMonitor(name);
        this.codeTpsMonitor.start();
        initQueue();
    }

    private void initQueue() {
        CodeSeqRet codeSeqRet = loadCodeAlloc();
        try {
            codeSegmentBlockingQueue.put(new CodeSegment(codeSeqRet.getPrefix(), codeSeqRet.getEnd(), codeSeqRet.getStart()));
        } catch (InterruptedException e) {
            log.error("initQueue put error", e);
        }
    }


    private CodeSeqRet loadCodeAlloc() {
        CodeSeqRet result = null;
        try {
            loadCodePrefix();
            int retryCount = 0;
            while (Objects.isNull(result) && !codePrefixSet.isEmpty() && retryCount <= 2) {
                int index = SecureRandomUtil.getInstance().nextInt(codePrefixSet.size());
                Integer prefix = (codePrefixSet.toArray(new Integer[codePrefixSet.size()]))[index];
                CodeSeqRet codeSeqRet = codeSeqService.generateCodeByPrefix(idc, prefix, codeTpsMonitor.getStep().get());
                int retCode = Objects.isNull(codeSeqRet) ? 1 : codeSeqRet.getResult();
                if (retCode == 0) {
                    result = codeSeqRet;
                } else if (retCode == 1) {
                    this.codePrefixSet.remove(prefix);
                    if (this.codePrefixSet.isEmpty()) {
                        loadCodePrefix();
                    }
                } else {
                    retryCount++;
                }
            }
        } finally {
            this.statusHealthy.set(result != null);
        }
        if (Objects.isNull(result)) {
            throw new IllegalStateException("load code from db error!");
        }
        return result;
    }

    private void loadCodePrefix() {
        if (!codePrefixSet.isEmpty()) {
            return;
        }
        codePrefixSet.addAll(codeSeqService.selectPrefixByEnv(idc));
        if (codePrefixSet.isEmpty()) {
            throw new IllegalStateException("no code prefix,plz check db config.");
        }
    }

    public String getCode() {
        this.codeTpsMonitor.increase();
        String code = null;
        int retryNum = 0;
        while (Objects.isNull(code)) {
            CodeSegment curSegment = this.codeSegmentBlockingQueue.peek();
            if (Objects.nonNull(curSegment)) {
                if (curSegment.getIdle() <= UPDATE_THRESHOLD * codeTpsMonitor.getStep().get() && this.needLoading.compareAndSet(falsetrue)) {
                    this.loadingExecutor.execute(new CodeLoader());
                }
                code = curSegment.getCode();
                if (Objects.isNull(code)) {
                    this.codeSegmentBlockingQueue.poll();
                }
            } else {
                if (!this.statusHealthy.get() || retryNum > 2) {
                    throw new IllegalStateException("create code failed,no available codes.");
                }
            }
            retryNum++;
            if (Objects.isNull(code)) {
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
            }
        }
        return code;
    }

    private class CodeLoader implements Runnable {

        @Override
        public void run() {
            CodeSeqRet codeSeqRet = null;
            while (Objects.isNull(codeSeqRet)) {
                try {
                    codeSeqRet = loadCodeAlloc();
                } catch (Exception e) {
                    log.error("load code error.", e);

                    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                }
            }
            try {
                CodeHandler.this.codeSegmentBlockingQueue.put(new CodeSegment(codeSeqRet.getPrefix(), codeSeqRet.getEnd(), codeSeqRet.getStart()));
            } catch (InterruptedException e) {
                log.error("CodeLoader put queue error", e);
            }

            CodeHandler.this.needLoading.set(false);
        }
    }
}
  • • CodeSegment:封装了当前号段的信息,提供了获取下一个 ID 的功能。

public class CodeSegment {
    private final int prefix;

    private final int maxSeq;

    private AtomicLong curSequence;

    public CodeSegment(int prefix,int maxSeq,long start) {
        this.prefix = prefix;
        this.maxSeq = maxSeq;
        curSequence = new AtomicLong(start);
    }

    public String getCode() {
        long value = curSequence.getAndIncrement();
        if (value <= maxSeq) {
            return prefix + String.format(Locale.ENGLISH,"%07d",value);
        } else {
            return null;
        }
    }

    public long getIdle() {
        return maxSeq - curSequence.get() + 1;
    }
}
  • • CodeTpsMonitor:监控当前的 TPS,可以根据CODE生成值动态调整预获取CODE的批量值

public class CodeTpsMonitor implements Runnable {
    public static final int INITIAL_BATCH_COUNT = 100;

    private static final int MAX_BATCH_COUNT = 1000;

    private final AtomicInteger step = new AtomicInteger(INITIAL_BATCH_COUNT);

    private final String name;

    private AtomicInteger count = new AtomicInteger(0);

    private long startTime;

    private ScheduledExecutorService scheduledExecutorService;

    public CodeTpsMonitor(String name) {
        this.name = name;
    }

    public void start() {
        scheduledExecutorService = new ScheduledThreadPoolExecutor(1new BasicThreadFactory.Builder()
                .namingPattern("check-" + name + "-order-thread").daemon(true).build());
        scheduledExecutorService.scheduleWithFixedDelay(this15, TimeUnit.SECONDS);
    }

    public void increase() {
        count.incrementAndGet();
    }

    public AtomicInteger getStep() {
        return step;
    }

    @Override
    public void run() {
        //重置count和时间
        long start = startTime;
        int reqNum = count.getAndSet(0);
        startTime = System.currentTimeMillis();

        long timeCost = startTime - start;
        final long tps = reqNum * 1000 / timeCost;
        int newBatchCount;
        if (tps < INITIAL_BATCH_COUNT) {
            newBatchCount = INITIAL_BATCH_COUNT;
        } else if (tps > MAX_BATCH_COUNT) {
            newBatchCount = MAX_BATCH_COUNT;
        } else {
            newBatchCount = (int) tps;
        }
        step.set(newBatchCount);
    }
}
  • • CodeSeqService:与数据库交互,负责从数据库中获取和更新 ID。

public class CodeSeqService {
    private static final int MAX_SEQ = 999999999;

    @Resource
    private CodeSeqMapper codeSeqMapper;

    public List<Integer> selectPrefixByEnv(String env) {
        return codeSeqMapper.selectPrefixByEnv(env, MAX_SEQ);
    }

    @Transactional(timeout = 300, isolation = Isolation.REPEATABLE_READ, rollbackFor = Throwable.class)
    public CodeSeqRet generateCodeByPrefix(String env, Integer prefix, Integer step) {
        log.info("generateCodeByPrefix begin env={},prefix={},step={}", env, prefix, step);
        //加上行锁
        CodeSeq codeSeq = codeSeqMapper.selectCodeSeqByPrefix(env, prefix);
        CodeSeqRet codeSeqRet = new CodeSeqRet();
        codeSeqRet.setResult(0);
        codeSeqRet.setPrefix(prefix);

        if (Objects.isNull(codeSeq) || Objects.isNull(codeSeq.getSequence())) {
            codeSeqRet.setResult(1);
            return codeSeqRet;
        }
        if (codeSeq.getSequence() > MAX_SEQ) {
            codeSeqRet.setResult(1);
            return codeSeqRet;
        }

        if (MAX_SEQ - codeSeq.getSequence() + 1 < step) {
            codeSeqRet.setStart(codeSeq.getSequence());
            codeSeqRet.setEnd(MAX_SEQ);
        } else {
            codeSeqRet.setStart(codeSeq.getSequence());
            codeSeqRet.setEnd(codeSeq.getSequence() + step - 1);
        }
        codeSeq.setSequence(codeSeqRet.getEnd() + 1);
        int ret = codeSeqMapper.updateCodeSeqByPrefix(codeSeq);
        if (ret <= 0) {
            log.info("update error,ret={},codeSeq={}", ret, codeSeq);
            codeSeqRet.setResult(2);
            return codeSeqRet;
        }
        return codeSeqRet;
    }
}    
  • • CodeSeqRet : 实体

public class CodeSeqRet {
    private int prefix;

    /**
     * 0:正常 1:无可用 2:失败
     */

    private Integer result;

    private Integer start;

    private Integer end;
}
  • • CodeSeqMapper : Mybaits接口

public interface CodeSeqMapper {

    List<Integer> selectPrefixByEnv(@Param("env") String env,@Param("maxSeq") Integer maxSeq);

    CodeSeq selectCodeSeqByPrefix(@Param("env") String env, @Param("prefix") Integer prefix);

    int updateCodeSeqByPrefix(CodeSeq codeSeq);
}
  • • CodeSeqMapper:Mybatis XML文件

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.toby.dynamic.data.source.db.dao.config.CodeSeqMapper">
    <resultMap id="paramConfig" type="com.toby.dynamic.data.source.db.model.CodeSeq">
        <result column="env" property="env" jdbcType="VARCHAR"/>
        <result column="prefix" property="prefix" jdbcType="INTEGER"/>
        <result column="seq" property="sequence" jdbcType="INTEGER"/>
    </resultMap>
    <select id="selectPrefixByEnv" resultType="java.lang.Integer">
        select
        `prefix`
        from code_seq where `env`=#{env} and `seq` &lt;= #{maxSeq};
    </select>

    <select id="selectCodeSeqByPrefix" resultMap="paramConfig">
        select `env`,`prefix`,`seq` from code_seq where `env` = #{env} and `prefix` = #{prefix} for update;
    </select>
    <update id="updateCodeSeqByPrefix" parameterType="com.toby.dynamic.data.source.db.model.CodeSeq" >
        update code_seq set `seq` = #{sequence} where `env` = #{env} and `prefix` = #{prefix};
    </update>
</mapper>

最终TEST用例调用如下:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class CodeTest {
    @Autowired
    private CodeSeqService codeSeqService;

    @Test
    public void createCodeTestCase01() {
        log.info("createCodeTestCase01 begin.");
        CodeHandler codeHandler = new CodeHandler("code-create");
        codeHandler.setCodeSeqService(codeSeqService);
        codeHandler.init();
        ExecutorService executorService = Executors.newFixedThreadPool(5new ThreadFactory() {
            private volatile int index = 0;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "t" + (index++));
            }
        });
        CountDownLatch countDownLatch = new CountDownLatch(5);
        executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
        executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
        executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
        executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
        executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
//        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            log.error("countDownLatch.await() error", e);
        }
        executorService.shutdown();
    }

    private static class CreateCodeJob implements Runnable {
        private CodeHandler codeHandler;
        private int times;
        private CountDownLatch countDownLatch;

        public CreateCodeJob(CodeHandler codeHandler, int times, CountDownLatch countDownLatch) {
            this.codeHandler = codeHandler;
            this.times = times;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            while (times > 0) {
                String code = codeHandler.getCode();
                log.info("threadName:{},code={}", Thread.currentThread().getName(), code);
                times--;
            }
            countDownLatch.countDown();
        }
    }
}


友情提示:点赞+在看,Bug少一半!

JAVA日知录
写代码的架构师,做架构的程序员! 实战、源码、数据库、架构...只要你来,你想了解的这里都有!
 最新文章