号段模式是一种常见的 ID 生成策略,在高并发场景中广泛应用。其核心思想是,发号服务每次从数据库获取一批 ID,并将这些 ID 缓存到本地。业务系统每次请求 ID 时,首先会判断本地缓存是否有可用的 ID。如果有,则直接分配给请求方;如果没有,则重新从数据库中批量获取 ID。通过这种方式,减少了频繁访问数据库的压力。
1. 优缺点分析
1.1 优点
• 减少数据库访问压力:在号段模式下,发号服务不需要每次请求都访问数据库,这大大减少了数据库的负载,提高了系统的性能。
• 提高系统可用性与可靠性:由于发号服务能缓存 ID,当数据库出现短时故障时,系统仍能正常运行,增强了系统的容错性。
• 扩展性强:随着业务的拓展和系统的增长,号段模式便于实现分库分表,支持更多的 ID 生成需求。例如,在处理高并发订单号生成时,可以根据业务需求快速扩展生成规则。
• 易于调整:随着业务的变化,可以灵活地调整号段的大小或增加号段的数量,适应不同的需求变化。
1.2. 缺点
• 依赖数据库:号段模式依赖数据库来管理 ID 的分配,数据库本身的性能和稳定性会直接影响 ID 生成服务的可用性。如果数据库负载过重或发生故障,可能会导致 ID 生成服务的瓶颈。
• ID 无业务含义:虽然号段生成的 ID 高效且简洁,但其值通常是纯数字,无法携带任何业务信息,因此对于某些业务场景(如订单号、用户编号等),其可读性和业务关联性较差。
2. 应用场景
号段模式特别适用于中等并发量的场景,尤其是在不想引入额外中间件(如 Redis)时。它常用于生成订单号、支付流水号等需要高效生成且不频繁变更的 ID。例如,在订单管理系统中,当系统订单量大幅增长时,号段模式可以通过简单的横向扩展来满足业务需求,而无需大规模修改系统架构。
3. 代码实现示例
以下是基于号段模式生成 ID 的具体实现。该方案可以集成在微服务中,数据表和代码逻辑简单易懂,适合用于生成订单号等高并发需求场景。
3.1 数据库设计
首先,在数据库中创建一张 code_seq 表,用于存储每个号段的基本信息。
drop TABLE IF EXISTS `code_seq`;
create TABLE `code_seq` (
`env` VARCHAR(10) NOT NULL COMMENT '环境编号',
`prefix` INT(10) NOT NULL COMMENT 'code前缀',
`seq` INT(10) NOT NULL COMMENT '当前序列',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY(`prefix`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT 'CODE编号表';
通过向 code_seq 表中插入多条数据,我们可以为每个环境(如 T1)定义多个不同的 prefix,这些 prefix 用于生成不同的 ID。
INSERT INTO code_seq (env, prefix, seq)
VALUES
('T1', 1001, 0),
('T1', 1002, 0),
('T1', 1003, 0),
('T1', 1004, 0),
('T1', 1005, 0),
('T1', 1006, 0),
('T1', 1007, 0),
('T1', 1008, 0),
('T1', 1009, 0),
('T1', 1010, 0);
3.2 代码设计
整体UML类图设计如下
•
CodeHandler
:主类,提供初始化和生成Code方法。主要采用并发原子类减少并发依赖,从数据库可以捞取一批codePrefixSet,在code缓存不到UPDATE_THRESHOLD比例时,会预加载下一批code到缓存中,其代码如下:
public class CodeHandler {
private static final double UPDATE_THRESHOLD = 0.9f;
private final String name;
private final Set<Integer> codePrefixSet = new ConcurrentSkipListSet<>();
private final BlockingQueue<CodeSegment> codeSegmentBlockingQueue = new LinkedBlockingQueue<>(1);
private final AtomicBoolean needLoading = new AtomicBoolean(false);
private final AtomicBoolean statusHealthy = new AtomicBoolean(false);
private final String idc = "T1";
private CodeSeqService codeSeqService;
private CodeTpsMonitor codeTpsMonitor;
private Executor loadingExecutor;
public CodeHandler(String name) {
this.name = name;
}
public void setCodeSeqService(CodeSeqService codeSeqService) {
this.codeSeqService = codeSeqService;
}
public void init() {
this.loadingExecutor = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder().namingPattern("order-handle-" + name).daemon(true).priority(Thread.MAX_PRIORITY).build());
this.codeTpsMonitor = new CodeTpsMonitor(name);
this.codeTpsMonitor.start();
initQueue();
}
private void initQueue() {
CodeSeqRet codeSeqRet = loadCodeAlloc();
try {
codeSegmentBlockingQueue.put(new CodeSegment(codeSeqRet.getPrefix(), codeSeqRet.getEnd(), codeSeqRet.getStart()));
} catch (InterruptedException e) {
log.error("initQueue put error", e);
}
}
private CodeSeqRet loadCodeAlloc() {
CodeSeqRet result = null;
try {
loadCodePrefix();
int retryCount = 0;
while (Objects.isNull(result) && !codePrefixSet.isEmpty() && retryCount <= 2) {
int index = SecureRandomUtil.getInstance().nextInt(codePrefixSet.size());
Integer prefix = (codePrefixSet.toArray(new Integer[codePrefixSet.size()]))[index];
CodeSeqRet codeSeqRet = codeSeqService.generateCodeByPrefix(idc, prefix, codeTpsMonitor.getStep().get());
int retCode = Objects.isNull(codeSeqRet) ? 1 : codeSeqRet.getResult();
if (retCode == 0) {
result = codeSeqRet;
} else if (retCode == 1) {
this.codePrefixSet.remove(prefix);
if (this.codePrefixSet.isEmpty()) {
loadCodePrefix();
}
} else {
retryCount++;
}
}
} finally {
this.statusHealthy.set(result != null);
}
if (Objects.isNull(result)) {
throw new IllegalStateException("load code from db error!");
}
return result;
}
private void loadCodePrefix() {
if (!codePrefixSet.isEmpty()) {
return;
}
codePrefixSet.addAll(codeSeqService.selectPrefixByEnv(idc));
if (codePrefixSet.isEmpty()) {
throw new IllegalStateException("no code prefix,plz check db config.");
}
}
public String getCode() {
this.codeTpsMonitor.increase();
String code = null;
int retryNum = 0;
while (Objects.isNull(code)) {
CodeSegment curSegment = this.codeSegmentBlockingQueue.peek();
if (Objects.nonNull(curSegment)) {
if (curSegment.getIdle() <= UPDATE_THRESHOLD * codeTpsMonitor.getStep().get() && this.needLoading.compareAndSet(false, true)) {
this.loadingExecutor.execute(new CodeLoader());
}
code = curSegment.getCode();
if (Objects.isNull(code)) {
this.codeSegmentBlockingQueue.poll();
}
} else {
if (!this.statusHealthy.get() || retryNum > 2) {
throw new IllegalStateException("create code failed,no available codes.");
}
}
retryNum++;
if (Objects.isNull(code)) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
}
}
return code;
}
private class CodeLoader implements Runnable {
@Override
public void run() {
CodeSeqRet codeSeqRet = null;
while (Objects.isNull(codeSeqRet)) {
try {
codeSeqRet = loadCodeAlloc();
} catch (Exception e) {
log.error("load code error.", e);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}
}
try {
CodeHandler.this.codeSegmentBlockingQueue.put(new CodeSegment(codeSeqRet.getPrefix(), codeSeqRet.getEnd(), codeSeqRet.getStart()));
} catch (InterruptedException e) {
log.error("CodeLoader put queue error", e);
}
CodeHandler.this.needLoading.set(false);
}
}
}
•
CodeSegment
:封装了当前号段的信息,提供了获取下一个 ID 的功能。
public class CodeSegment {
private final int prefix;
private final int maxSeq;
private AtomicLong curSequence;
public CodeSegment(int prefix,int maxSeq,long start) {
this.prefix = prefix;
this.maxSeq = maxSeq;
curSequence = new AtomicLong(start);
}
public String getCode() {
long value = curSequence.getAndIncrement();
if (value <= maxSeq) {
return prefix + String.format(Locale.ENGLISH,"%07d",value);
} else {
return null;
}
}
public long getIdle() {
return maxSeq - curSequence.get() + 1;
}
}
•
CodeTpsMonitor
:监控当前的 TPS,可以根据CODE生成值动态调整预获取CODE的批量值
public class CodeTpsMonitor implements Runnable {
public static final int INITIAL_BATCH_COUNT = 100;
private static final int MAX_BATCH_COUNT = 1000;
private final AtomicInteger step = new AtomicInteger(INITIAL_BATCH_COUNT);
private final String name;
private AtomicInteger count = new AtomicInteger(0);
private long startTime;
private ScheduledExecutorService scheduledExecutorService;
public CodeTpsMonitor(String name) {
this.name = name;
}
public void start() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder()
.namingPattern("check-" + name + "-order-thread").daemon(true).build());
scheduledExecutorService.scheduleWithFixedDelay(this, 1, 5, TimeUnit.SECONDS);
}
public void increase() {
count.incrementAndGet();
}
public AtomicInteger getStep() {
return step;
}
@Override
public void run() {
//重置count和时间
long start = startTime;
int reqNum = count.getAndSet(0);
startTime = System.currentTimeMillis();
long timeCost = startTime - start;
final long tps = reqNum * 1000 / timeCost;
int newBatchCount;
if (tps < INITIAL_BATCH_COUNT) {
newBatchCount = INITIAL_BATCH_COUNT;
} else if (tps > MAX_BATCH_COUNT) {
newBatchCount = MAX_BATCH_COUNT;
} else {
newBatchCount = (int) tps;
}
step.set(newBatchCount);
}
}
•
CodeSeqService
:与数据库交互,负责从数据库中获取和更新 ID。
public class CodeSeqService {
private static final int MAX_SEQ = 999999999;
@Resource
private CodeSeqMapper codeSeqMapper;
public List<Integer> selectPrefixByEnv(String env) {
return codeSeqMapper.selectPrefixByEnv(env, MAX_SEQ);
}
@Transactional(timeout = 300, isolation = Isolation.REPEATABLE_READ, rollbackFor = Throwable.class)
public CodeSeqRet generateCodeByPrefix(String env, Integer prefix, Integer step) {
log.info("generateCodeByPrefix begin env={},prefix={},step={}", env, prefix, step);
//加上行锁
CodeSeq codeSeq = codeSeqMapper.selectCodeSeqByPrefix(env, prefix);
CodeSeqRet codeSeqRet = new CodeSeqRet();
codeSeqRet.setResult(0);
codeSeqRet.setPrefix(prefix);
if (Objects.isNull(codeSeq) || Objects.isNull(codeSeq.getSequence())) {
codeSeqRet.setResult(1);
return codeSeqRet;
}
if (codeSeq.getSequence() > MAX_SEQ) {
codeSeqRet.setResult(1);
return codeSeqRet;
}
if (MAX_SEQ - codeSeq.getSequence() + 1 < step) {
codeSeqRet.setStart(codeSeq.getSequence());
codeSeqRet.setEnd(MAX_SEQ);
} else {
codeSeqRet.setStart(codeSeq.getSequence());
codeSeqRet.setEnd(codeSeq.getSequence() + step - 1);
}
codeSeq.setSequence(codeSeqRet.getEnd() + 1);
int ret = codeSeqMapper.updateCodeSeqByPrefix(codeSeq);
if (ret <= 0) {
log.info("update error,ret={},codeSeq={}", ret, codeSeq);
codeSeqRet.setResult(2);
return codeSeqRet;
}
return codeSeqRet;
}
}
•
CodeSeqRet
: 实体
public class CodeSeqRet {
private int prefix;
/**
* 0:正常 1:无可用 2:失败
*/
private Integer result;
private Integer start;
private Integer end;
}
•
CodeSeqMapper
: Mybaits接口
public interface CodeSeqMapper {
List<Integer> selectPrefixByEnv(@Param("env") String env,@Param("maxSeq") Integer maxSeq);
CodeSeq selectCodeSeqByPrefix(@Param("env") String env, @Param("prefix") Integer prefix);
int updateCodeSeqByPrefix(CodeSeq codeSeq);
}
•
CodeSeqMapper
:Mybatis XML文件
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.toby.dynamic.data.source.db.dao.config.CodeSeqMapper">
<resultMap id="paramConfig" type="com.toby.dynamic.data.source.db.model.CodeSeq">
<result column="env" property="env" jdbcType="VARCHAR"/>
<result column="prefix" property="prefix" jdbcType="INTEGER"/>
<result column="seq" property="sequence" jdbcType="INTEGER"/>
</resultMap>
<select id="selectPrefixByEnv" resultType="java.lang.Integer">
select
`prefix`
from code_seq where `env`=#{env} and `seq` <= #{maxSeq};
</select>
<select id="selectCodeSeqByPrefix" resultMap="paramConfig">
select `env`,`prefix`,`seq` from code_seq where `env` = #{env} and `prefix` = #{prefix} for update;
</select>
<update id="updateCodeSeqByPrefix" parameterType="com.toby.dynamic.data.source.db.model.CodeSeq" >
update code_seq set `seq` = #{sequence} where `env` = #{env} and `prefix` = #{prefix};
</update>
</mapper>
最终TEST用例调用如下:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class CodeTest {
@Autowired
private CodeSeqService codeSeqService;
@Test
public void createCodeTestCase01() {
log.info("createCodeTestCase01 begin.");
CodeHandler codeHandler = new CodeHandler("code-create");
codeHandler.setCodeSeqService(codeSeqService);
codeHandler.init();
ExecutorService executorService = Executors.newFixedThreadPool(5, new ThreadFactory() {
private volatile int index = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "t" + (index++));
}
});
CountDownLatch countDownLatch = new CountDownLatch(5);
executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
// LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("countDownLatch.await() error", e);
}
executorService.shutdown();
}
private static class CreateCodeJob implements Runnable {
private CodeHandler codeHandler;
private int times;
private CountDownLatch countDownLatch;
public CreateCodeJob(CodeHandler codeHandler, int times, CountDownLatch countDownLatch) {
this.codeHandler = codeHandler;
this.times = times;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
while (times > 0) {
String code = codeHandler.getCode();
log.info("threadName:{},code={}", Thread.currentThread().getName(), code);
times--;
}
countDownLatch.countDown();
}
}}
如喜欢本文,请点击右上角,把文章分享到朋友圈
如有想了解学习的技术点,请留言给若飞安排分享
因公众号更改推送规则,请点“在看”并加“星标”第一时间获取精彩技术分享
·END·
相关阅读:
作者:技术驿站
来源:juejin.cn/post/7315846728732688422
版权申明:内容来源网络,仅供学习研究,版权归原创者所有。如有侵权烦请告知,我们会立即删除并表示歉意。谢谢!
我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:admin@137x.com