SpringBoot集成zipkin和kafak

文摘   2024-03-11 11:00   江苏  
  1. 背景

    团队在组建的时忙于业务上线,很多基础设施一直都没弄,之后一直在新业务和历史坑中往返。至今,业务相对稳定,老板说研究下分布式系统的链路追踪,提高团队解决问题时的效率。


  2. 现状

    当前的服务已经有几十个了,每次生产反应有什么问题都是先A排查,确定不是自己的问题再说下游B,B确认自己负责的没问题再给C……这样前面的时间都浪费了。

    其实后面团队来过一个新同学,老板让研究下,结果这一下就过去了一年多,直到今天我负责这块

    目前最大的问题有:a.日志没有traceId,b.生产的机器又不能直接登录 c.看日志又是grafana,挤在一起眼都要瞎了找关键字


  3. 调研

    其实就方案来说,其实就是自研或找一些组件来代替。先问了下同学以前研究skywalking为什么放下了?遇到了哪些问题?大致是因为日志存储上需要较大空间,又需要额外的ES作为检索工具,所以运维也觉得麻烦,业务没用ES,日志定位上用感觉不划算,花那钱干啥,所以搁置了。

    自研的话其实这个网上蛮多的,基于MDC写一个Filter就OK, 我在自己的业务其实悄悄写了一个(为什么写?因为我从360金融出来刚到这边时查日志蛮shi不fen习tong惯ku):

    // https://blog.csdn.net/yangkai100522208/article/details/125390138@Order(Ordered.HIGHEST_PRECEDENCE)@Slf4jpublic class TraceFilter extends OncePerRequestFilter {    private static final String TRACE_ID_KEY = "traceId";    private static final String REQUEST_TRACE_ID = "Xpilot-Trace-Request";    private static final String RESPONSE_TRACE_ID = "Xpilot-Trace-Response";    private static final String ACTUATOR = "actuator";
    @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { String apiUrl = request.getRequestURI(); if (apiUrl.contains(ACTUATOR)){ //过滤检活机制 actuator/health filterChain.doFilter(request, response); }else { StopWatch stopWatch = new StopWatch(); stopWatch.start(); try { String responseTraceId; String requestTraceId = request.getHeader(REQUEST_TRACE_ID); if (StringUtils.isEmpty(requestTraceId)) { responseTraceId = traceStart(); } else { responseTraceId = traceStart(requestTraceId); } response.addHeader(RESPONSE_TRACE_ID, responseTraceId); filterChain.doFilter(request, response); } finally { stopWatch.stop(); log.info("web api [{}] execute finished,totally cost {} ms!", apiUrl, stopWatch.getTotalTimeMillis()); traceEnd(); } } }
    public static String traceStart() { String traceId = generateTraceId(); return traceStart(traceId); }
    public static String traceStart(String traceId) { MDC.put(TRACE_ID_KEY, traceId); return traceId; }
    public static void traceEnd() { MDC.clear(); }
    /** * 生成跟踪ID * * @return */ private static String generateTraceId() { return RandomStringUtils.randomAlphanumeric(11); }}<properties>     <!-- 在log配置文件中将traceId打印出来-->   <property name="PATTERN">%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceId} ] -%5p [%thread] %level %logger - %msg%n</property> <Property name="FILE_PATH">/logs/${server_name}¥{}</Property> </properties>

    上面的代码其实抽出来写到common-tools,然后其他服务升级以下版本也是可以的。但推动他们做太难了。而且有2个缺点:

    a.如果业务内部用多线程或异步框架,线程号不一样 b.他们都引入的话每个服务生成的都是各自的,无法串起来,解决方案其实可以写一个Interceptor:

    //因为我们用的是feign,其他组件类似@Componentpublic class FeignClientInterceptor implements RequestInterceptor {    private final Logger log = LoggerFactory.getLogger(FeignClientInterceptor.class);    @Override    public void apply(RequestTemplate template) {        //todo 这里从header取值,有则用,没有则重新生成        template.header("requestId", UUID.randomUUID().toString());        log.info("feign header:{}", JSON.toJSONString(template.headers()));    }}

    组件方面其实类似的产品除了skywalking,还有如Cat(360金融用的就是这个)、Pinpoint、zipkin等。

    通过查阅资料:https://zhuanlan.zhihu.com/p/60436915、https://www.jianshu.com/p/9bb6601908,对比之下,挑了一个相对来说轻量级,业务方适配的话改起来也不麻烦的组件——zipkin。

    zipkin是啥:

    Zipkin 是一个分布式跟踪系统,它采集一些微服务延迟数据或其它需要采集的数据,有助于更好的解定位、解决问题。


    译:tracer位于你的应用程序中,并记录发生的操作序和元数据,tracer通常来说是一个类库工具,对用户使用透明。举例来说,使用tracer应用服务器会记录接收请求的时间和响应时间。收集到的跟踪数据称为 Span。

    整体架构图如下:


    zipkin使用traceId表示一次请求,一次请求可能需要n服务进行处理,每个服务的处理是一个span(理解为最小的工作单元-跨度Id),它包含:id,parentId,name,timestamp,duration,annotations等,trace就是span集合。

    微服务通常按业务区分(类似垂直分库)服务,通过对外暴露的一个接口,供客户端调用,一个业务可能需要很多个服务协同才能完成这个功能,如果链路上任何一个服务出现问题或者网络超时,都会形成导致接口调用失败。随着业务的不断扩张,服务之间互相调用会越来越复杂。就像这样:



    搞不好也有可能更复杂,看得让人虎躯一震,说排查问题非死即残也不为过



    Spring Cloud Sleuth 主要功能就是在分布式系统中提供追踪解决方案,并且兼容支持了 zipkin,你只需要在pom文件中引入相应的依赖(spring-cloud-sleuth-zipkin)即可,方便集成zipkin实现(指的是Zipkin Client,而不是Zipkin服务器)。

  4. zipkin踩坑

    有了这个关键字随便一搜全是,由于太过心急,想着跟资料一步步走,总不会错吧,直接从:https://repo1.maven.org/maven2/io/zipkin/zipkin-server/  这里下了个最新的包zipkin-server-2.24.3.jar,然后第一跪就来了



    莫名其妙,这么打击我真的好么,后来闲逛无意打开另一个人家说下载有讲究:


    故地重游一看,还真有2个jar,大意了!重新下载xxx.exec.jar:

    执行以下命令(官网有详细的说明:https://zipkin.io/pages/quickstart.html):

    //模仿的注意路径java -jar C:/xxx/xxx/Downloads/zipkin-server-2.24.3-exec.jar

    zipkin提供的默认端口是9411,图中最下面也有,本机启动的直接浏览器访问:http://localhost:9411 就可以看到组件对应的UI(每个版本可能不一样)

    目前来说成功的基础有了,下面就是如何和服务关联起来?这个资料也蛮多的。

    首先在配置文件中增加如下配置:

    spring.zipkin.base-url = http://localhost:9411/# 将采样比例设置为 1.0,也就是全部都需要。默认是0.1也就是10%,spring.sleuth.sampler.probability = 1.0#设置限速采集则probability失效# spring.sleuth.sampler.rate = 100

    我们目前采用的是apollo(properties)管理配置文件,所以写法和nacos(yaml)不一样,如果需要可以将.换成: 缩近,或者使用工具(https://toyaml.com/index.html)转换:



    其次:在应用中增加maven依赖,实测下来有两种方式:

    <!--链路追踪 Sleuth--><!--方法一:分别引入下面2个依赖--><!-- <dependency><groupId>org.springframework.cloud</groupId>  <artifactId>spring-cloud-starter-zipkin</artifactId>  </dependency><dependency>  <groupId>org.springframework.cloud</groupId>  <artifactId>spring-cloud-starter-sleuth</artifactId></dependency>-->
    <!--方法一:引入集成好的一个 --><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-sleuth-zipkin</artifactId></dependency>

    这里需要注意,有的资料说要在启动类上加@EnableZipkinServer解,我怀疑跟版本有关,反正我不加也可以跑起来:


    到这里为止,启动服务可以看到已经有数据在zipkin中了


    现在有一个问题就是这些数据不是外部触发的http请求,是我写的定时任务监控JVM中的使用情况:

    这样的频率一天跑下来有很多记录,统计这些没意义也浪费资源,所以就想着能不能过滤掉这些,然后就第二跪如期而遇,查询了N个资料,总结下来就是改几个地方:

    a. 配置skipPattern 不生效又试过skip_patternspring.sleuth.web.skipPattern = show_jvm_bytes,showJvmBytesb. 配置addtionalSkipPattern,不生效又试过:additional_skip_patternspring.sleuth.web.additionalSkipPattern = show_jvm_bytes,showJvmBytesc. 配置 async.enabled 不生效spring.sleuth.async.enabled = false  d.配置 scheduled.enabled 不生效spring.sleuth.scheduled.enabled=false  e.配置 quartz.enabled 不生效spring.sleuth.quartz.enabled=false


    看我apollo的配置文件,删了加,加了删,来来回回,人都麻了,最后使出了杀手锏——穷举,既然资料显示就这些配置,那我就把这些全部加上,然后再一个个注释掉,应用启了停,停了起,多少次记不得了,但总算试出来了,需要同时配置以下2个配置:

    spring.sleuth.async.enabled = falsespring.sleuth.scheduled.enabled = false

    其中任何一个关闭马上又会被zipkin采集到,真的太不容易了!感觉生活又充满了希望。接下来还有一个问题——简称第三硊


    上面介绍了,zipkin以traceId表示一次请求,spanId表示其中一个小单元,这2个ID如何出现在应用的日志中,帮助开发同学定准问题呢?资料显示的人家把上面的做完默认就有了,我这边每走一步都累,又查(hua)了好久,大致就是Spring默认是logback,而我们目前用的是log4j2,所以需要额外搞一下,熟悉的味道又来了

    a. 配置instrumentation-type 没用spring.sleuth.reactor.instrumentation-type=decorate_on_eachb. 配置增加 打印traceId & spanId 没用logging.pattern.level=%5p [${spring.zipkin.service.name:${spring.application.name:}},%X{traceId},%X{spanId}]c. 关掉sl4j日志 没用spring.sleuth.log.slf4j.enabled =false

    上面的排列组合我又穷举了一下,不能说一点没用,只能说毫无起色,这回运气没站在我这边。不卖关子,这里的正招是在现在的配置文件中增加[%X{X-B3-TraceId}-%X{X-B3-SpanId}-%X{X-B3-ParentSpanId}]

    <properties>  <!-- 注意观察pattern 我去掉了一个不需要的 -->  <property name="PATTERN">[%X{X-B3-TraceId} %X{X-B3-SpanId}] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %level %logger - %msg%n</property>  <Property name="FILE_PATH">/logs/${server_name}</Property></properties>

    服务重启后可以看到启动成功时对应的traceId和spanId已在日志中显示 ,后续开发同学就可以通过zipkin中观察存疑的请求,拿到对应的Id去日志文件grep,一个请求的完整上下文就出来了。


  5. 持久化

    上面的步骤完成了应用日志向zipkin的发送与展示,存在一个问题就是zipkin默认是将数据存储在内存中,服务重启就会消失。所以想着能把这些存起来,方便追查历史问题。zipkin扩展提供了数据存储到ES、Mysql

    上面说了ES这玩意儿运维怕浪费钱,所以我选择熟悉的mysql,首先我们需要搞一个库来装这些数据,DDL的话我找的时候百度出来的资料推荐的github地址已经报404:

    然后我退到根目录去一层层找storage模块,发布地址已经变成:https://github.com/openzipkin/zipkin/blob/master/zipkin-storage/mysql-v1/src/main/resources/mysql.sql,如果你看到这里想自己试试的话,发现类似的问题不要慌,试试我说的办法,也许对你有帮助

    接下来就是找个机器执行里面的DDL,成功会刷新下,如果看到库和表都有的话,证明就可以了。



    然后启动的命令就要改一下了,指定对应的存储方式和机器信息:

    //模仿的注意路径 和${}中的变量 ,请自行更换java -jar C:/xxx/xxx/zipkin-server-2.24.3-exec.jar --STORAGE_TYPE=mysql --MYSQL_USER=${user} --MYSQL_PASS=${pwd} --MYSQL_HOST=${ip} --MYSQL_TCP_PORT=${port}--MYSQL_DB=${db}

    不出意外的话你会看到这个东东,证明你数据源指定成功,后续zipkin中的数据就会自动写到对应的表中,通过SQL语句查一下就可以看到了,祝你好运,提前给你庆祝先



    上图可知,数据持久化到DB后,traceId和spanId都是数字(DDL中定义的是BIGINT),但UI界面展示出来的是字符串,那如果要用traceId或spanId作为where条件查询DB又该如何做呢?我翻遍所有资料,最后在:https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/mysql-v1 找到了答案:

    -- 参数前+xselect * from zipkin_spans where trace_id = x'7cf94d650b00d531';
    -- 调用hex函数SELECT lower(concat(CASE trace_id_high WHEN '0' THEN '' ELSE hex(trace_id_high) END,hex(trace_id))) AS trace_id, lower(hex(parent_id)) as parent_id, lower(hex(id)) as span_id, name, from_unixtime(start_ts/1000000) as timestampFROM zipkin_spanswhere (start_ts/1000000) > UNIX_TIMESTAMP(now()) - 5 * 60;

    以些类推,该方法应该适合所有像这样存储数据的设计方式;既然查询可以这样写,那么清理必要的日志提升效率,删除也可以


  6. 扩展

    我懂大家,很多同学开发机都是windows,部署在Linux,那在Linux上如何弄呢?我也试着搞了一下:

    //step 1 .拉取zipkin,不指定版本为latestdocker pull openzipkin/zipkin:2.24.3

    然后按官网说明,启动docker:

    这里遇到一个问题,我启动后用机器外网访问: http://ip:9411 失败,询问运维同学得知我们这边机器对外的端口只有8000~8300,所以我们启动时指定的对外端口要调整到上面的范围内(也不能是机器上其他应用已占用的端口):

    // 因为端口限制,该命令失败 docker run --name zipkin-server -d --restart=always -p 9411:9411 openzipkin/zipkin:2.24.3 --STORAGE_TYPE=mysql --MYSQL_USER=${user} --MYSQL_PASS=${pwd} --MYSQL_HOST=${ip} --MYSQL_TCP_PORT=${port} // 调整端口启动成功 docker run --name zipkin-server -d --restart=always -p 8030:9411 openzipkin/zipkin:2.24.3 --STORAGE_TYPE=mysql --MYSQL_USER=${user} --MYSQL_PASS=${pwd} --MYSQL_HOST=${ip} --MYSQL_TCP_PORT=${port}



  7. UI说明

    每个版本的UI不一样,我看资料时网上有的UI还是老的,我这版本很多都合并了,大致解释如下:

    Annotation:

    用于定位一个request的开始和结束,记录了时间点,当这个annotation被记录了,则认为一个基本单元工作完成。cs/sr/ss/cr意义如下:

    cs:Client Start,表示客户端发起请求;一个span的开始

    ss:Server Start,表示服务端收到请求

    sf:Server Finish,表示服务端完成处理,并将结果发送给客户端

    cf:Client Finish,表示客户端获取到服务端返回信息;一个span的结束


    它们之间还存在如下判定关系,可以自行计算,用于定位是逻辑处理的性能问题还是网络延迟引起的性能问题:

    ss-cs:网络延迟

    sf-ss:逻辑处理时间

    cf-cs:整个流程时间

    目前测试环境上线后通过zipkin的依赖界面可以看到服务间的调用关系:


  8. 集成kafka

    上面是简单的整合,每一个服务产生消息后都要发往zipkin,由zipkin写DB。消息默认是以http的方式发送(所以网上很多资料才说spring.zipkin.sender.type = web可以不写)。配置该值应用部署大致如下:



    查看源码可知,目前zipkin支持以下4中方式收集产生的消息,每一个类上面都配置了特定的条件,用来路由配置对应的Sender:


    源码入口org.springframework.cloud.sleuth.zipkin2.sender.ZipkinSenderConfigurationImportSelector


    以web的方式存在一个问题:当zipkin服务挂掉了(亲测,不会阻塞业务),那后续产生的消息就会丢失,如果丢失的正好是对排查问题有关键作用的,那就完犊子了。所以可以借助MQ,应用产生消息先写MQ,zipkin按自己的“能力”去消费即可。调整后的部署结构图大致如下:

    这个过程踩了太多太多太多,网上的hello world都是集成的rabbit,我们应用正好使用的是kafka,自然就选择kafka作为消息中间件。

    修改对应的配置:

    spring.zipkin.sender.type = kafkaspring.kafka.bootstrap-servers = ip:port

    奇怪的事情发生了,触发请求后,zipkin UI没有对应的记录,查看DB也没有对应的记录,说明要么配置没生效,要么没消息产生。先排查kafka工作目录下的topic,打开数据文件(二进制需要装插件),大致可以看到有traceId相关的数据,那问题就剩下为什么没写库了

    梳理流程,发现这个时候角色应该是业务应用发生生产者,还差一个消费者,谁是消费者?想到应该不会让使用者还要实现什么接口来消费,当配置定为kafka时,zipkin自身应该是一个消费者,所以需要在启动zipkin的命令中同时指定kafka。如何指定呢?按照官网说明要增加参数:KAFKA_BOOTSTRAP_SERVERS,如何增加呢?

    windows还好:

    //模仿注意路径java -jar C:/xxx/xxx/zipkin-server-2.24.3-exec.jar --KAFKA_BOOTSTRAP_SERVERS=localhost:9091

    因为是测试集成kafka,为避免来回发版到测试环境,我本地搭得有kafka环境,所以指向的是localhost,你如果也是本地,那一定要记得先启动zookeeper、再启动kafka(关于2个环境如何搭建请看ZooKeeper 集群环境搭建与底层原理kafka 集群环境搭建踩过的坑这2篇)


    Linux下增加参数就比较难过了

    可能是我太菜,我理解很简单,在上面持久化命令成功的基础上,加一个对应的key不久行了么?实测结果是我放在前、放在后、加-d、加-e都试过了,全部失败,全部失败,全部失败,整的没脾气了,心想要不就用默认的http得了,反正也不影响业务,好在最后坚持下来了:

    docker run --name zipkin-server -e KAFKA_BOOTSTRAP_SERVERS=ip:port -d --restart=always -p 8030:9411 openzipkin/zipkin:2.24.3 --STORAGE_TYPE=mysql --MYSQL_USER=${user}--MYSQL_PASS=${owd} --MYSQL_HOST=${ip} --MYSQL_TCP_PORT=${port} --MYSQL_DB=zipkin


    反复试验,最终看到了久违的zipkinUI查询出数据了


  9. kafka扩展

    和运维沟通准备按这个模式部署生产,但反应有一个问题,当前服务是无论因为业务隔离还是因为性能我们搭了多个kafka实例,不是每一个服务配置的kafka地址都一样,但zipkin服务只能监听一个kafka,该如何是好呢?

    思路1:如果能独立指定zipkin kafka配置的key,那问题很好解决。查看源码得知kafka配置的key是固定的:spring.kafka.bootstrp.servers

    想想为什么不能指定义呢?

    站在作者的角度(猜一波:一个服务有可能使用很多spring cloud的组件,sleuth作为组件之一,统一用spring.kafka.bootstrap.server这个key没有问题,不然,凡是依赖kafka的组件都需要自定义一个kafka_key,但value又全部一样,暂且不说配置冗余、浪费用户时间,这种模式会不会很怪。所以,问题出现在使用方的结构上,无论什么原因致使业务方搭了多个kafka实例(集群),就应该都有对应的消费端。同理,zipkin若也搭成集群就可以解决——这是思路2。

    但现实生活是,集群需要钱买服务器,运维预算不允许,还给了我思路三

    反正spring都是开源的,那就修改源码,把里面的key换一个重新打成jar放到自己的私服上。我不知道啥时候给他的错觉我有这本事,道理是这个道理,但我自己几斤几两天天称,心里还是有点B数的,这个方案肯定不行

    思路4:操作spring中的bean,通过获取容器中的Bean拿到zipkin kafka对应类型的Bean对象,通过反射拿到对应的属性值,修改指定kafka地址地方为自定义key指向的地方。该方法逻辑成立的,但最终没采用,因为看源码有有新的办法,这里只给出伪代码:

    @ConditionalOnProperty(value = "spring.zipkin.sender.type", havingValue = "kafka")@AutoConfigureAfter(ZipkinKafkaSenderConfiguration.class)public class SwaggerConfig {    @Autowired    void init(KafkaSender sender){        Class<KafkaSender> kafkaSenderClass = KafkaSender.class;        Field propertiesFeild = kafkaSenderClass.getField("properties");        propertiesFeild.setAccessible(true);        Properties properties = (Properties)propertiesFeild.get(sender);        properties.put("bootstrap.servers", "localhost:9092");    }

    思路5分析源码(org.springframework.cloud.sleuth.zipkin2.ZipkinAutoConfiguration)发现,在构建zipkin 上报消息的地方有一个Sender,这个Sender的类型是前面讲的4个类型之一,该Bean创建的条件是当容器中不存在对应的bean才创建(ConditionalOnMissingBean),所以当配置的值为kafka时,我们先往容器中注册一个对应类型的Bean会导致源码中的这里不会在创建,程序运行时就会使用我们创建的Bean完成相关逻辑。


    综上我们按照些处逻辑创建一个zipkin kafka类型的Bean放置到容器上下文中, 为了不让最终zipkin的业务方都做这个事,所以写到common模块中,让common依赖引用zipkin相关maven,最终业务方适配zipkin只需要升级maven common模块的版本, 不用另行引入其它依赖:

    import lombok.extern.slf4j.Slf4j;import org.apache.kafka.common.serialization.ByteArraySerializer;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.autoconfigure.AutoConfigureAfter;import org.springframework.boot.autoconfigure.AutoConfigureBefore;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.boot.autoconfigure.kafka.KafkaProperties;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;import org.springframework.cloud.sleuth.zipkin2.ZipkinProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import zipkin2.CheckResult;import zipkin2.Span;import zipkin2.reporter.AsyncReporter;import zipkin2.reporter.Reporter;import zipkin2.reporter.ReporterMetrics;import zipkin2.reporter.Sender;import zipkin2.reporter.kafka.KafkaSender;
    import java.util.Map;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;
    /** * @Title: * @Package * @Description: * @author: zhangzj5@xiaopeng.com * @date: * @version: V1.0 */@Slf4j@Configuration@ConditionalOnProperty(value = { "spring.sleuth.enabled", "spring.zipkin.enabled","spring.zipkin.kafka.bootstrap.servers"},matchIfMissing = true)@AutoConfigureBefore(TraceAutoConfiguration.class)@AutoConfigureAfter(name = "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration")@EnableConfigurationProperties({KafkaProperties.class,ZipkinProperties.class})public class ZipkinKafkaConfig {
    //包在内部类中是因为上面ConditionalOnProperty指定的其它条件的判断,为了不影响继续配置web的正常逻辑 //如果确定zipkin一定用kafka来做逻辑,则可以不包在该类中,直接注册一个kafka bean到窗口中 @ConditionalOnProperty( value = {"spring.zipkin.sender.type"}, havingValue = "kafka" ) public static class EnableKafkaSender{ @Value("${spring.zipkin.kafka.topic:zipkin}") private String topic; @Value("${spring.zipkin.kafka.bootstrap.servers:}") private String BOOTSTRAP_SERVERS; @Bean("zipkinReporter")//模仿org.springframework.cloud.sleuth.zipkin2.ZipkinAutoConfiguration下的reporter public Reporter<Span> reporter(ReporterMetrics reporterMetrics, ZipkinProperties zipkin, @Qualifier("kafkaZipKinSender") KafkaSender kafkaSender) { CheckResult checkResult = checkResult(kafkaSender, 1_000L); logCheckResult(kafkaSender, checkResult);
    // historical constraint. Note: AsyncReporter supports memory bounds AsyncReporter<Span> asyncReporter = AsyncReporter.builder(kafkaSender) .queuedMaxSpans(1000) .messageTimeout(zipkin.getMessageTimeout(), TimeUnit.SECONDS) .metrics(reporterMetrics).build(zipkin.getEncoder());
    return asyncReporter; } //为了编译通过 抄org.springframework.cloud.sleuth.zipkin2.ZipkinAutoConfiguration下的logCheckResult private void logCheckResult(Sender sender, CheckResult checkResult) { if (log.isDebugEnabled() && checkResult != null && checkResult.ok()) { log.debug("Check result of the [" + sender.toString() + "] is [" + checkResult + "]"); } else if (checkResult != null && !checkResult.ok()) { log.warn("Check result of the [" + sender.toString() + "] contains an error [" + checkResult + "]"); } } //为了编译通过 抄org.springframework.cloud.sleuth.zipkin2.ZipkinAutoConfiguration下的checkResult /** Limits {@link Sender#check()} to {@code deadlineMillis}. */ static CheckResult checkResult(Sender sender, long deadlineMillis) { CheckResult[] outcome = new CheckResult[1]; Thread thread = new Thread(sender + " check()") { @Override public void run() { try { outcome[0] = sender.check(); } catch (Throwable e) { outcome[0] = CheckResult.failed(e); } } }; thread.start(); try { thread.join(deadlineMillis); if (outcome[0] != null) { return outcome[0]; } thread.interrupt(); return CheckResult.failed(new TimeoutException( thread.getName() + " timed out after " + deadlineMillis + "ms")); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return CheckResult.failed(e); } } //自定义zipkin kafka key //模仿ZipkinKafkaSenderConfiguration 下的 kafkaSender @Bean("kafkaZipKinSender") KafkaSender kafkaSender(KafkaProperties config) { Map<String, Object> properties = config.buildProducerProperties(); properties.put("key.serializer", ByteArraySerializer.class.getName()); properties.put("value.serializer", ByteArraySerializer.class.getName()); // Kafka expects the input to be a String, but KafkaProperties returns a list //Object bootstrapServers = properties.get(BOOTSTRAP_SERVERS);a,b properties.put("bootstrap.servers", BOOTSTRAP_SERVERS); return KafkaSender.newBuilder().topic(this.topic).overrides(properties) .build(); } }}
    <!--common 依赖zipkin--><dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-sleuth-zipkin</artifactId>    <version>2.2.8.RELEASE</version></dependency>

    通过上面的代码可知,业务方适配zipkin时可指定zipkin 指向的kafka地址,key为:spring.zipkin.kafka.bootstrap.servers,实现了和默认的spring.kafka.bootstrap.servers区分。消息的生产和消费可通过监控地址查看:

    巨坑提醒:使用kafka时,测试环境下topic和consumer group都是可以自动创建的,几乎就没管这块。但上了生产后发现没消费,经排查是没有group,生产的kafka实例不会自动创建,提工单创建后group后消息开始消费落库了。

    想看zipkin server端的日志或优化性能的话可以增加参数:

    # 更多的参数说明可通过源码查看zipkin-server-shared.yml文件,里面有详细说明docker run --name zipkin-server -e KAFKA_BOOTSTRAP_SERVERS=${ip}:${port}-e KAFKA_STREAMS=10 -e LOGGING_LEVEL_ZIPKIN=DEBUG -e LOGGING_LEVEL_ZIPKIN2=DEBUG -e LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_WEB=DEBUG -d --restart=always -p 8030:9411 openzipkin/zipkin:2.24.3 --STORAGE_TYPE=mysql --MYSQL_USER=${user}--MYSQL_PASS=${pwd} --MYSQL_HOST=${host} --MYSQL_TCP_PORT=${port}--MYSQL_DB=zipkin --MYSQL_MAX_CONNECTIONS=50


  10. 整合

    如果原来的项目已经有traceId了,那再集成zipkin,又会重新生成一个到日志,看起来就有点怪怪的了。

    可替换原有traceId生成的地方,也变成zipkin结构下的traceId(参考:https://zhuanlan.zhihu.com/p/366790978):


    import brave.Tracer;@Autowired private Tracer tracer;//put in methodString traceId = tracer.currentSpan().context().traceIdString();


    写到这里一看有约8000字数,怕太多客官看得太累,本来准备zipkin的集成准备分成2篇:基础集成和kafka集成,转念一想,不看的无论分多细也不看,看的无论多长也会看,所以对看到这里的人说一声:谢谢


    划水专用:

    https://zipkin.io/pages/quickstart.html

    http://www.hzhcontrols.com/new-1613274.html

    https://cloud.spring.io/spring-cloud-sleuth/reference/html/

    https://zhuanlan.zhihu.com/p/535826598

    https://github.com/openzipkin-contrib/zipkin-storage-kafka/blob/master/module/src/main/resources/zipkin-server-storage-kafka-only-dependencies.yml

    https://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/1.2.1.RELEASE/#_adding_to_the_project

    https://www.cnblogs.com/huigui-mint/p/17464495.html

    https://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/1.2.1.RELEASE/#_adding_to_the_project


看到的同学觉得有用,点赞、关注、转发搞一个阔以不

少年易老学难成,一寸光阴不可轻。
——朱熹《偶成》

晚霞程序员
一位需要不断学习的30+程序员……