Spring Boot整合Kafka+SSE实现数据实时展示

科技   2024-09-22 11:01   河北  


为什么使用Kafka

不选用RabbitMQ或RocketMQ的主要原因在于Kafka作为Hadoop生态系统中的一个核心组件,其对于大数据环境具有出色的适应性和集成度。鉴于当前业务场景并未涉及对死信队列的需求,Kafka成为了一个理想的选择。然而,值得注意的是,当面对数据更新频率较低的情况时,Kafka的数据拉取速度可能会相应减缓,因此,在对消息处理的实时性有极高要求的场景下,可能需要考虑其他消息队列系统作为替代方案。采用消息队列的关键在于其提供的实时性保障以及作为广播机制进行高效消息分发的能力。

为什么使用SSE

在利用Websocket进行信息传输时,数据通常会被编码为二进制格式,这一过程可能会引入一定的时间开销。相比之下,Server-Sent Events (SSE) 直接以文本形式传输数据,从而避免了二进制编码带来的额外时间损耗。由于Websocket支持双向通信,当应用于日志读取等场景时,如果多个客户端连接至Websocket日志流,可能会因发送大量异常信息而给服务端和日志系统带来不必要的负担,甚至影响性能。而SSE作为单向通信协议,则无需担心此类问题,自然提升了系统的稳定性和安全性。

此外,SSE具备自动断线重连的功能,这一特性简化了客户端在网络波动或短暂断开时的恢复流程。相反,Websocket协议标准本身并不直接包含心跳机制,这意味着在长时间无数据交换的情况下,连接可能会被服务器或网络设备视为空闲而断开。因此,在使用Websocket时,通常需要开发者自行实现心跳机制来保持连接的活跃状态。

鉴于SSE的这些特点,它也可以作为Websocket的一个有效替代方案,特别是在对实时性要求不是极端严格的应用场景中。例如,对于普通项目而言,如果不需要实现复杂的双向交互或极低的延迟响应,那么使用SSE进行数据传输和状态更新就已经足够,同时还能享受到其简单易用和自动断线重连等优势。此外,通过巧妙利用SSE的特性和超时控制机制,还可以实现如扫码登录等特定功能,进一步提升用户体验和系统的灵活性。

代码示例

pom.xml引入SSE和Kafka

<!-- SSE,一般springboot开发web应用的都有 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
<!-- kafka,最主要的是第一个,剩下两个是测试用的 -->
       <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency
            <groupId>
org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

application.properties增加Kafka配置信息

# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

配置Kafka信息

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

配置controller,通过web方式开启效果

@RestController
@RequestMapping(path = "sse")
public class KafkaSSEController {

    private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Resource
    private SseEmitter sseEmitter;

    /**
     * @param message
     * @apiNote 发送信息到Kafka主题中
     */

    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }

    /**
     * 监听Kafka数据
     *
     * @param message
     */

    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
        //使用接口建立起sse连接后,监听到kafka消息则会发送给对应链接
        SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { sseEmitter.send(content); }
    }

    /**
     * 连接sse服务
     *
     * @param id
     * @return
     * @throws IOException
     */

    @GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter push(@RequestParam("id") String id) throws IOException {
        // 超时时间设置为5分钟,用于演示客户端自动重连
        SseEmitter sseEmitter = new SseEmitter(5_60_000L);
        // 设置前端的重试时间为1s
        // send(): 发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到 data 中
        sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
        sseCache.put(id, sseEmitter);
        System.out.println("add " + id);
        sseEmitter.send("你好", MediaType.APPLICATION_JSON);
        SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("哈哈");
        sseEmitter.send(data);
        // onTimeout(): 超时回调触发
        sseEmitter.onTimeout(() -> {
            System.out.println(id + "超时");
            sseCache.remove(id);
        });
        // onCompletion(): 结束之后的回调触发
        sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
        return sseEmitter;
    }
    /**
     * http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa
     * @param id
     * @param content
     * @return
     * @throws IOException
     */

    @ResponseBody
    @GetMapping(path = "push")
    public String push(String id, String content) throws IOException {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            sseEmitter.send(content);
        }
        return "over";
    }

    @ResponseBody
    @GetMapping(path = "over")
    public String over(String id) {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            // complete(): 表示执行完毕,会断开连接
            sseEmitter.complete();
            sseCache.remove(id);
        }
        return "over";
    }

}

前端方式

<html>
  <head>
    <script>
      console.log('start')
      const clientId = "your_client_id_x"; // 设置客户端ID
      const eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 订阅服务器端的SSE

      eventSource.onmessage = event => {
        console.log(event.data)
        const message = JSON.parse(event.data);
        console.log(`Received message from server: ${message}`);
      };

      // 发送消息给服务器端 可通过 postman 调用,所以下面 sendMessage() 调用被注释掉了
      function sendMessage() {
        const message = "hello sse";
        fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify(message)
        });
        console.log('dddd'+JSON.stringify(message))
      }
      // sendMessage()
    
</script>
  </head>
</html>


Java技术前沿
专注分享Java技术,包括但不限于 SpringBoot,SpringCloud,Docker,消息中间件等。
 最新文章