背景
团队在组建的时忙于业务上线,很多基础设施一直都没弄,之后一直在新业务和历史坑中往返。至今,业务相对稳定,老板说研究下分布式系统的链路追踪,提高团队解决问题时的效率。
现状
当前的服务已经有几十个了,每次生产反应有什么问题都是先A排查,确定不是自己的问题再说下游B,B确认自己负责的没问题再给C……这样前面的时间都浪费了。
其实后面团队来过一个新同学,老板让研究下,结果这一下就过去了一年多,直到今天我负责这块。
目前最大的问题有:a.日志没有traceId,b.生产的机器又不能直接登录 c.看日志又是grafana,挤在一起眼都要瞎了找关键字。
调研
其实就方案来说,其实就是自研或找一些组件来代替。先问了下同学以前研究skywalking为什么放下了?遇到了哪些问题?大致是因为日志存储上需要较大空间,又需要额外的ES作为检索工具,所以运维也觉得麻烦,业务没用ES,日志定位上用感觉不划算,花那钱干啥,所以搁置了。
自研的话其实这个网上蛮多的,基于MDC写一个Filter就OK, 我在自己的业务其实悄悄写了一个(为什么写?因为我从360金融出来刚到这边时查日志蛮shi不fen习tong惯ku):
// https://blog.csdn.net/yangkai100522208/article/details/125390138
@Order(Ordered.HIGHEST_PRECEDENCE)
@Slf4j
public class TraceFilter extends OncePerRequestFilter {
private static final String TRACE_ID_KEY = "traceId";
private static final String REQUEST_TRACE_ID = "Xpilot-Trace-Request";
private static final String RESPONSE_TRACE_ID = "Xpilot-Trace-Response";
private static final String ACTUATOR = "actuator";
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
String apiUrl = request.getRequestURI();
if (apiUrl.contains(ACTUATOR)){
//过滤检活机制 actuator/health
filterChain.doFilter(request, response);
}else {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
String responseTraceId;
String requestTraceId = request.getHeader(REQUEST_TRACE_ID);
if (StringUtils.isEmpty(requestTraceId)) {
responseTraceId = traceStart();
} else {
responseTraceId = traceStart(requestTraceId);
}
response.addHeader(RESPONSE_TRACE_ID, responseTraceId);
filterChain.doFilter(request, response);
} finally {
stopWatch.stop();
log.info("web api [{}] execute finished,totally cost {} ms!", apiUrl, stopWatch.getTotalTimeMillis());
traceEnd();
}
}
}
public static String traceStart() {
String traceId = generateTraceId();
return traceStart(traceId);
}
public static String traceStart(String traceId) {
MDC.put(TRACE_ID_KEY, traceId);
return traceId;
}
public static void traceEnd() {
MDC.clear();
}
/**
* 生成跟踪ID
*
* @return
*/
private static String generateTraceId() {
return RandomStringUtils.randomAlphanumeric(11);
}
}
<properties>
<!-- 在log配置文件中将traceId打印出来-->
<property name="PATTERN">%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceId} ] -%5p [%thread] %level %logger - %msg%n</property>
<Property name="FILE_PATH">/logs/${server_name}¥{ }</Property>
</properties>
上面的代码其实抽出来写到common-tools,然后其他服务升级以下版本也是可以的。但推动他们做太难了。而且有2个缺点:
a.如果业务内部用多线程或异步框架,线程号不一样 b.他们都引入的话每个服务生成的都是各自的,无法串起来,解决方案其实可以写一个Interceptor:
//因为我们用的是feign,其他组件类似
@Component
public class FeignClientInterceptor implements RequestInterceptor {
private final Logger log = LoggerFactory.getLogger(FeignClientInterceptor.class);
@Override
public void apply(RequestTemplate template) {
//todo 这里从header取值,有则用,没有则重新生成
template.header("requestId", UUID.randomUUID().toString());
log.info("feign header:{}", JSON.toJSONString(template.headers()));
}
}
组件方面其实类似的产品除了skywalking,还有如Cat(360金融用的就是这个)、Pinpoint、zipkin等。
通过查阅资料:https://zhuanlan.zhihu.com/p/60436915、https://www.jianshu.com/p/9bb6601908,对比之下,挑了一个相对来说轻量级,业务方适配的话改起来也不麻烦的组件——zipkin。
zipkin是啥:
译:Zipkin 是一个分布式跟踪系统,它采集一些微服务延迟数据或其它需要采集的数据,有助于更好的解定位、解决问题。
译:tracer位于你的应用程序中,并记录发生的操作序和元数据,tracer通常来说是一个类库工具,对用户使用透明。举例来说,使用tracer应用服务器会记录接收请求的时间和响应时间。收集到的跟踪数据称为 Span。
整体架构图如下:
zipkin使用traceId表示一次请求,一次请求可能需要n服务进行处理,每个服务的处理是一个span(理解为最小的工作单元-跨度Id),它包含:id,parentId,name,timestamp,duration,annotations等,trace就是span集合。
微服务通常按业务区分(类似垂直分库)服务,通过对外暴露的一个接口,供客户端调用,一个业务可能需要很多个服务协同才能完成这个功能,如果链路上任何一个服务出现问题或者网络超时,都会形成导致接口调用失败。随着业务的不断扩张,服务之间互相调用会越来越复杂。就像这样:
搞不好也有可能更复杂,看得让人虎躯一震,说排查问题非死即残也不为过:
Spring Cloud Sleuth 主要功能就是在分布式系统中提供追踪解决方案,并且兼容支持了 zipkin,你只需要在pom文件中引入相应的依赖(spring-cloud-sleuth-zipkin)即可,方便集成zipkin实现(指的是Zipkin Client,而不是Zipkin服务器)。
zipkin踩坑
有了这个关键字随便一搜全是,由于太过心急,想着跟资料一步步走,总不会错吧,直接从:https://repo1.maven.org/maven2/io/zipkin/zipkin-server/ 这里下了个最新的包zipkin-server-2.24.3.jar,然后第一跪就来了:
莫名其妙,这么打击我真的好么,后来闲逛无意打开另一个人家说下载有讲究:
故地重游一看,还真有2个jar,大意了!重新下载xxx.exec.jar:
执行以下命令(官网有详细的说明:https://zipkin.io/pages/quickstart.html):
//模仿的注意路径
java -jar C:/xxx/xxx/Downloads/zipkin-server-2.24.3-exec.jar
zipkin提供的默认端口是9411,图中最下面也有,本机启动的直接浏览器访问:http://localhost:9411 就可以看到组件对应的UI(每个版本可能不一样)
目前来说成功的基础有了,下面就是如何和服务关联起来?这个资料也蛮多的。
首先:在配置文件中增加如下配置:
spring.zipkin.base-url = http://localhost:9411/
# 将采样比例设置为 1.0,也就是全部都需要。默认是0.1也就是10%,
spring.sleuth.sampler.probability = 1.0
#设置限速采集则probability失效
# spring.sleuth.sampler.rate = 100
我们目前采用的是apollo(properties)管理配置文件,所以写法和nacos(yaml)不一样,如果需要可以将.换成: 缩近,或者使用工具(https://toyaml.com/index.html)转换:
其次:在应用中增加maven依赖,实测下来有两种方式:
<!--链路追踪 Sleuth-->
<!--方法一:分别引入下面2个依赖-->
<!-- <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>-->
<!--方法一:引入集成好的一个 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
</dependency>
这里需要注意,有的资料说要在启动类上加@EnableZipkinServer注解,我怀疑跟版本有关,反正我不加也可以跑起来:
到这里为止,启动服务可以看到已经有数据在zipkin中了
现在有一个问题就是这些数据不是外部触发的http请求,是我写的定时任务监控JVM中的使用情况:
这样的频率一天跑下来有很多记录,统计这些没意义也浪费资源,所以就想着能不能过滤掉这些,然后就第二跪如期而遇,查询了N个资料,总结下来就是改几个地方:
a. 配置skipPattern 不生效又试过skip_pattern
spring.sleuth.web.skipPattern = show_jvm_bytes,showJvmBytes
b. 配置addtionalSkipPattern,不生效又试过:additional_skip_pattern
spring.sleuth.web.additionalSkipPattern = show_jvm_bytes,showJvmBytes
c. 配置 async.enabled 不生效
spring.sleuth.async.enabled = false
d.配置 scheduled.enabled 不生效
spring.sleuth.scheduled.enabled=false
e.配置 quartz.enabled 不生效
spring.sleuth.quartz.enabled=false
看我apollo的配置文件,删了加,加了删,来来回回,人都麻了,最后使出了杀手锏——穷举,既然资料显示就这些配置,那我就把这些全部加上,然后再一个个注释掉,应用启了停,停了起,多少次记不得了,但总算试出来了,需要同时配置以下2个配置:
spring.sleuth.async.enabled = false
spring.sleuth.scheduled.enabled = false
其中任何一个关闭马上又会被zipkin采集到,真的太不容易了!感觉生活又充满了希望。接下来还有一个问题——简称第三硊:
上面介绍了,zipkin以traceId表示一次请求,spanId表示其中一个小单元,这2个ID如何出现在应用的日志中,帮助开发同学定准问题呢?资料显示的人家把上面的做完默认就有了,我这边每走一步都累,又查(hua)了好久,大致就是Spring默认是logback,而我们目前用的是log4j2,所以需要额外搞一下,熟悉的味道又来了:
a. 配置instrumentation-type 没用
spring.sleuth.reactor.instrumentation-type=decorate_on_each
b. 配置增加 打印traceId & spanId 没用
logging.pattern.level=%5p [${spring.zipkin.service.name:${spring.application.name:}},%X{traceId},%X{spanId}]
c. 关掉sl4j日志 没用
spring.sleuth.log.slf4j.enabled =false
上面的排列组合我又穷举了一下,不能说一点没用,只能说毫无起色,这回运气没站在我这边。不卖关子,这里的正招是在现在的配置文件中增加:[%X{X-B3-TraceId}-%X{X-B3-SpanId}-%X{X-B3-ParentSpanId}]
<properties>
<!-- 注意观察pattern 我去掉了一个不需要的 -->
<property name="PATTERN">[%X{X-B3-TraceId} %X{X-B3-SpanId}] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %level %logger - %msg%n</property>
<Property name="FILE_PATH">/logs/${server_name}</Property>
</properties>
服务重启后可以看到启动成功时对应的traceId和spanId已在日志中显示 ,后续开发同学就可以通过zipkin中观察存疑的请求,拿到对应的Id去日志文件grep,一个请求的完整上下文就出来了。
持久化
上面的步骤完成了应用日志向zipkin的发送与展示,存在一个问题就是zipkin默认是将数据存储在内存中,服务重启就会消失。所以想着能把这些存起来,方便追查历史问题。zipkin扩展提供了数据存储到ES、Mysql
上面说了ES这玩意儿运维怕浪费钱,所以我选择熟悉的mysql,首先我们需要搞一个库来装这些数据,DDL的话我找的时候百度出来的资料推荐的github地址已经报404:
然后我退到根目录去一层层找storage模块,发布地址已经变成:https://github.com/openzipkin/zipkin/blob/master/zipkin-storage/mysql-v1/src/main/resources/mysql.sql,如果你看到这里想自己试试的话,发现类似的问题不要慌,试试我说的办法,也许对你有帮助。
接下来就是找个机器执行里面的DDL,成功会刷新下,如果看到库和表都有的话,证明就可以了。
然后启动的命令就要改一下了,指定对应的存储方式和机器信息:
//模仿的注意路径 和${}中的变量 ,请自行更换
java -jar C:/xxx/xxx/zipkin-server-2.24.3-exec.jar --STORAGE_TYPE=mysql --MYSQL_USER=${user} --MYSQL_PASS=${pwd} --MYSQL_HOST=${ip} --MYSQL_TCP_PORT=${port}--MYSQL_DB=${db}
不出意外的话你会看到这个东东,证明你数据源指定成功,后续zipkin中的数据就会自动写到对应的表中,通过SQL语句查一下就可以看到了,祝你好运,提前给你庆祝先。
上图可知,数据持久化到DB后,traceId和spanId都是数字(DDL中定义的是BIGINT),但UI界面展示出来的是字符串,那如果要用traceId或spanId作为where条件查询DB又该如何做呢?我翻遍所有资料,最后在:https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/mysql-v1 找到了答案:
-- 参数前+x
select * from zipkin_spans where trace_id = x'7cf94d650b00d531';
-- 调用hex函数
SELECT lower(concat(CASE trace_id_high
WHEN '0' THEN ''
ELSE hex(trace_id_high)
END,hex(trace_id))) AS trace_id,
lower(hex(parent_id)) as parent_id,
lower(hex(id)) as span_id,
name,
from_unixtime(start_ts/1000000) as timestamp
FROM zipkin_spans
where (start_ts/1000000) > UNIX_TIMESTAMP(now()) - 5 * 60;
以些类推,该方法应该适合所有像这样存储数据的设计方式;既然查询可以这样写,那么清理必要的日志提升效率,删除也可以。
扩展
我懂大家,很多同学开发机都是windows,部署在Linux,那在Linux上如何弄呢?我也试着搞了一下:
//step 1 .拉取zipkin,不指定版本为latest
docker pull openzipkin/zipkin:2.24.3
然后按官网说明,启动docker:
这里遇到一个问题,我启动后用机器外网访问: http://ip:9411 失败,询问运维同学得知我们这边机器对外的端口只有8000~8300,所以我们启动时指定的对外端口要调整到上面的范围内(也不能是机器上其他应用已占用的端口):
// 因为端口限制,该命令失败
docker run --name zipkin-server -d --restart=always -p 9411:9411 openzipkin/zipkin:2.24.3 --STORAGE_TYPE=mysql --MYSQL_USER=${user} --MYSQL_PASS=${pwd} --MYSQL_HOST=${ip} --MYSQL_TCP_PORT=${port}
// 调整端口启动成功
docker run --name zipkin-server -d --restart=always -p 8030:9411 openzipkin/zipkin:2.24.3 --STORAGE_TYPE=mysql --MYSQL_USER=${user} --MYSQL_PASS=${pwd} --MYSQL_HOST=${ip} --MYSQL_TCP_PORT=${port}
UI说明
每个版本的UI不一样,我看资料时网上有的UI还是老的,我这版本很多都合并了,大致解释如下:
Annotation:
用于定位一个request的开始和结束,记录了时间点,当这个annotation被记录了,则认为一个基本单元工作完成。cs/sr/ss/cr意义如下:
cs:Client Start,表示客户端发起请求;一个span的开始
ss:Server Start,表示服务端收到请求
sf:Server Finish,表示服务端完成处理,并将结果发送给客户端
cf:Client Finish,表示客户端获取到服务端返回信息;一个span的结束
它们之间还存在如下判定关系,可以自行计算,用于定位是逻辑处理的性能问题还是网络延迟引起的性能问题:
ss-cs:网络延迟
sf-ss:逻辑处理时间
cf-cs:整个流程时间
目前测试环境上线后通过zipkin的依赖界面可以看到服务间的调用关系:
集成kafka
上面是简单的整合,每一个服务产生消息后都要发往zipkin,由zipkin写DB。消息默认是以http的方式发送(所以网上很多资料才说spring.zipkin.sender.type = web可以不写)。配置该值应用部署大致如下:
查看源码可知,目前zipkin支持以下4中方式收集产生的消息,每一个类上面都配置了特定的条件,用来路由配置对应的Sender:
源码入口:org.springframework.cloud.sleuth.zipkin2.sender.ZipkinSenderConfigurationImportSelector
以web的方式存在一个问题:当zipkin服务挂掉了(亲测,不会阻塞业务),那后续产生的消息就会丢失,如果丢失的正好是对排查问题有关键作用的,那就完犊子了。所以可以借助MQ,应用产生消息先写MQ,zipkin按自己的“能力”去消费即可。调整后的部署结构图大致如下:
这个过程踩了太多太多太多坑,网上的hello world都是集成的rabbit,我们应用正好使用的是kafka,自然就选择kafka作为消息中间件。
修改对应的配置:
spring.zipkin.sender.type = kafka
spring.kafka.bootstrap-servers = ip:port
奇怪的事情发生了,触发请求后,zipkin UI没有对应的记录,查看DB也没有对应的记录,说明要么配置没生效,要么没消息产生。先排查kafka工作目录下的topic,打开数据文件(二进制需要装插件),大致可以看到有traceId相关的数据,那问题就剩下为什么没写库了。
梳理流程,发现这个时候角色应该是业务应用发生生产者,还差一个消费者,谁是消费者?想到应该不会让使用者还要实现什么接口来消费,当配置定为kafka时,zipkin自身应该是一个消费者,所以需要在启动zipkin的命令中同时指定kafka。如何指定呢?按照官网说明要增加参数:KAFKA_BOOTSTRAP_SERVERS,如何增加呢?
windows还好:
//模仿注意路径
java -jar C:/xxx/xxx/zipkin-server-2.24.3-exec.jar --KAFKA_BOOTSTRAP_SERVERS=localhost:9091
因为是测试集成kafka,为避免来回发版到测试环境,我本地搭得有kafka环境,所以指向的是localhost,你如果也是本地,那一定要记得先启动zookeeper、再启动kafka(关于2个环境如何搭建请看《 》,《kafka 集群环境搭建踩过的坑》这2篇)
Linux下增加参数就比较难过了:
可能是我太菜,我理解很简单,在上面持久化命令成功的基础上,加一个对应的key不久行了么?实测结果是我放在前、放在后、加-d、加-e都试过了,全部失败,全部失败,全部失败,整的没脾气了,心想要不就用默认的http得了,反正也不影响业务,好在最后坚持下来了:
docker run --name zipkin-server -e KAFKA_BOOTSTRAP_SERVERS=ip:port -d --restart=always -p 8030:9411 openzipkin/zipkin:2.24.3 --STORAGE_TYPE=mysql --MYSQL_USER=${user}--MYSQL_PASS=${owd} --MYSQL_HOST=${ip} --MYSQL_TCP_PORT=${port} --MYSQL_DB=zipkin
反复试验,最终看到了久违的zipkinUI查询出数据了
kafka扩展
和运维沟通准备按这个模式部署生产,但反应有一个问题,当前服务是无论因为业务隔离还是因为性能我们搭了多个kafka实例,不是每一个服务配置的kafka地址都一样,但zipkin服务只能监听一个kafka,该如何是好呢?
思路1:如果能独立指定zipkin kafka配置的key,那问题很好解决。查看源码得知kafka配置的key是固定的:spring.kafka.bootstrp.servers
想想为什么不能指定义呢?
站在作者的角度(猜一波):一个服务有可能使用很多spring cloud的组件,sleuth作为组件之一,统一用spring.kafka.bootstrap.server这个key没有问题,不然,凡是依赖kafka的组件都需要自定义一个kafka_key,但value又全部一样,暂且不说配置冗余、浪费用户时间,这种模式会不会很怪。所以,问题出现在使用方的结构上,无论什么原因致使业务方搭了多个kafka实例(集群),就应该都有对应的消费端。同理,zipkin若也搭成集群就可以解决——这是思路2。
但现实生活是,集群需要钱买服务器,运维预算不允许,还给了我思路三:
反正spring都是开源的,那就修改源码,把里面的key换一个重新打成jar放到自己的私服上。我不知道啥时候给他的错觉我有这本事,道理是这个道理,但我自己几斤几两天天称,心里还是有点B数的,这个方案肯定不行。
思路4:操作spring中的bean,通过获取容器中的Bean拿到zipkin kafka对应类型的Bean对象,通过反射拿到对应的属性值,修改指定kafka地址地方为自定义key指向的地方。该方法逻辑成立的,但最终没采用,因为看源码有有新的办法,这里只给出伪代码:
@ConditionalOnProperty(value = "spring.zipkin.sender.type", havingValue = "kafka")
@AutoConfigureAfter(ZipkinKafkaSenderConfiguration.class)
public class SwaggerConfig {
@Autowired
void init(KafkaSender sender){
Class<KafkaSender> kafkaSenderClass = KafkaSender.class;
Field propertiesFeild = kafkaSenderClass.getField("properties");
propertiesFeild.setAccessible(true);
Properties properties = (Properties)propertiesFeild.get(sender);
properties.put("bootstrap.servers", "localhost:9092");
}
思路5:分析源码(org.springframework.cloud.sleuth.zipkin2.ZipkinAutoConfiguration)发现,在构建zipkin 上报消息的地方有一个Sender,这个Sender的类型是前面讲的4个类型之一,该Bean创建的条件是当容器中不存在对应的bean才创建(ConditionalOnMissingBean),所以当配置的值为kafka时,我们先往容器中注册一个对应类型的Bean会导致源码中的这里不会在创建,程序运行时就会使用我们创建的Bean完成相关逻辑。
综上我们按照些处逻辑创建一个zipkin kafka类型的Bean放置到容器上下文中, 为了不让最终zipkin的业务方都做这个事,所以写到common模块中,让common依赖引用zipkin相关maven,最终业务方适配zipkin只需要升级maven common模块的版本, 不用另行引入其它依赖:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
import org.springframework.cloud.sleuth.zipkin2.ZipkinProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zipkin2.CheckResult;
import zipkin2.Span;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Reporter;
import zipkin2.reporter.ReporterMetrics;
import zipkin2.reporter.Sender;
import zipkin2.reporter.kafka.KafkaSender;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @Title:
* @Package
* @Description:
* @author: zhangzj5@xiaopeng.com
* @date:
* @version: V1.0
*/
public class ZipkinKafkaConfig {
//包在内部类中是因为上面ConditionalOnProperty指定的其它条件的判断,为了不影响继续配置web的正常逻辑
//如果确定zipkin一定用kafka来做逻辑,则可以不包在该类中,直接注册一个kafka bean到窗口中
value = { },
havingValue =
)
public static class EnableKafkaSender{
private String topic;
private String BOOTSTRAP_SERVERS;
//模仿org.springframework.cloud.sleuth.zipkin2.ZipkinAutoConfiguration下的reporter
public Reporter<Span> reporter(ReporterMetrics reporterMetrics,
ZipkinProperties zipkin, KafkaSender kafkaSender) {
CheckResult checkResult = checkResult(kafkaSender, 1_000L);
logCheckResult(kafkaSender, checkResult);
// historical constraint. Note: AsyncReporter supports memory bounds
AsyncReporter<Span> asyncReporter = AsyncReporter.builder(kafkaSender)
.queuedMaxSpans(1000)
.messageTimeout(zipkin.getMessageTimeout(), TimeUnit.SECONDS)
.metrics(reporterMetrics).build(zipkin.getEncoder());
return asyncReporter;
}
//为了编译通过 抄org.springframework.cloud.sleuth.zipkin2.ZipkinAutoConfiguration下的logCheckResult
private void logCheckResult(Sender sender, CheckResult checkResult) {
if (log.isDebugEnabled() && checkResult != null && checkResult.ok()) {
log.debug("Check result of the [" + sender.toString() + "] is [" + checkResult
+ "]");
}
else if (checkResult != null && !checkResult.ok()) {
log.warn("Check result of the [" + sender.toString() + "] contains an error ["
+ checkResult + "]");
}
}
//为了编译通过 抄org.springframework.cloud.sleuth.zipkin2.ZipkinAutoConfiguration下的checkResult
/** Limits {@link Sender#check()} to {@code deadlineMillis}. */
static CheckResult checkResult(Sender sender, long deadlineMillis) {
CheckResult[] outcome = new CheckResult[1];
Thread thread = new Thread(sender + " check()") {
public void run() {
try {
outcome[0] = sender.check();
}
catch (Throwable e) {
outcome[0] = CheckResult.failed(e);
}
}
};
thread.start();
try {
thread.join(deadlineMillis);
if (outcome[0] != null) {
return outcome[0];
}
thread.interrupt();
return CheckResult.failed(new TimeoutException(
thread.getName() + " timed out after " + deadlineMillis + "ms"));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CheckResult.failed(e);
}
}
//自定义zipkin kafka key
//模仿ZipkinKafkaSenderConfiguration 下的 kafkaSender
KafkaSender kafkaSender(KafkaProperties config) {
Map<String, Object> properties = config.buildProducerProperties();
properties.put("key.serializer", ByteArraySerializer.class.getName());
properties.put("value.serializer", ByteArraySerializer.class.getName());
// Kafka expects the input to be a String, but KafkaProperties returns a list
//Object bootstrapServers = properties.get(BOOTSTRAP_SERVERS);a,b
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
return KafkaSender.newBuilder().topic(this.topic).overrides(properties)
.build();
}
}
}
<!--common 依赖zipkin-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
通过上面的代码可知,业务方适配zipkin时可指定zipkin 指向的kafka地址,key为:spring.zipkin.kafka.bootstrap.servers,实现了和默认的spring.kafka.bootstrap.servers区分。消息的生产和消费可通过监控地址查看:
巨坑提醒:使用kafka时,测试环境下topic和consumer group都是可以自动创建的,几乎就没管这块。但上了生产后发现没消费,经排查是没有group,生产的kafka实例不会自动创建,提工单创建后group后消息开始消费落库了。
想看zipkin server端的日志或优化性能的话可以增加参数:
# 更多的参数说明可通过源码查看zipkin-server-shared.yml文件,里面有详细说明
docker run --name zipkin-server -e KAFKA_BOOTSTRAP_SERVERS=${ip}:${port}-e KAFKA_STREAMS=10 -e LOGGING_LEVEL_ZIPKIN=DEBUG -e LOGGING_LEVEL_ZIPKIN2=DEBUG -e LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_WEB=DEBUG -d --restart=always -p 8030:9411 openzipkin/zipkin:2.24.3 --STORAGE_TYPE=mysql --MYSQL_USER=${user}--MYSQL_PASS=${pwd} --MYSQL_HOST=${host} --MYSQL_TCP_PORT=${port}--MYSQL_DB=zipkin --MYSQL_MAX_CONNECTIONS=50
整合
如果原来的项目已经有traceId了,那再集成zipkin,又会重新生成一个到日志,看起来就有点怪怪的了。
可替换原有traceId生成的地方,也变成zipkin结构下的traceId(参考:https://zhuanlan.zhihu.com/p/366790978):
import brave.Tracer;
private Tracer tracer;
//put in method
String traceId = tracer.currentSpan().context().traceIdString();
写到这里一看有约8000字数,怕太多客官看得太累,本来准备zipkin的集成准备分成2篇:基础集成和kafka集成,转念一想,不看的无论分多细也不看,看的无论多长也会看,所以对看到这里的人说一声:谢谢。
划水专用:
https://zipkin.io/pages/quickstart.html
http://www.hzhcontrols.com/new-1613274.html
https://cloud.spring.io/spring-cloud-sleuth/reference/html/
https://zhuanlan.zhihu.com/p/535826598
https://github.com/openzipkin-contrib/zipkin-storage-kafka/blob/master/module/src/main/resources/zipkin-server-storage-kafka-only-dependencies.yml
https://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/1.2.1.RELEASE/#_adding_to_the_project
https://www.cnblogs.com/huigui-mint/p/17464495.html
https://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/1.2.1.RELEASE/#_adding_to_the_project
看到的同学觉得有用,点赞、关注、转发搞一个阔以不