使用 RisingWave、Upstash 和 Metabase 进行实时航班跟踪

文摘   科技   2024-05-28 16:01   北京  

在物流和航空领域,实时数据处理、数据可视化和综合看板至关重要。它们能够提升供应链性能、优化运输并提高效率。特别是对商务航空来说,实时分析可以提高效率、可持续性和客户关注度,从而改善决策并节约成本。它还帮助确保安全运行、优化空域使用并能协助空中交通管制员。总体而言,实时数据处理可以帮助航空业提升客户体验并改善业务运营。

在本文中,我们使用 RisingWave、Upstash 和 Metabase 建立了一个实时航班跟踪系统。我们利用 Aviationstack API 实时获取航班数据,然后将这些数据传输到 Upstash 的一个 Kafka 主题中。随后,我们将这些数据流摄取到 RisingWave 中,继而创建物化视图 (MV) 以进行深入的航班数据分析。物化视图随时提供最新数据,并可以即时查询。我们还使用 Metabase 创建图表、表格和综合的数据看板,用于实时航班跟踪。

使用 RisingWave、Upstash 和 Metabase 实现的实时航班跟踪

1在 Upstash 中设置 Kafka

Upstash 是一个无服务器平台,提供 Redis、Kafka 和 Qstash 服务,具有可扩展性、高级安全选项和专门支持的优势。Upstash Kafka 使用 Apache Kafka 进行部署,并提供一个无服务器的 Kafka 平台,配备连接器、模式注册表和监控,为有高级需求的客户提供各种计划。

注册 Upstash 账户

注册免费的 Upstash 云账户,以访问 Kafka 服务。要创建账户,请访问 Upstash Cloud Account[1]

Upstash:账户注册和登录流程

创建 Kafka 集群

登录后,使用以下信息创建 Kafka 集群:

  • Name: 给您的 Kafka 集群一个唯一名称以便识别。
  • Region: 选择您的 Kafka 集群托管的区域。
  • Type: 选择适合您需求的集群类型。

Upstash:创建 Kafka 集群

设置 Kafka 主题

创建 Kafka 集群后,设置 Kafka 主题。Upstash Kafka 提供一些默认配置,包括分区数量和保留策略,简化了设置过程。

Upstash:创建 Kafka 主题

创建 Kafka 集群和 Kafka 主题后,我们就可以利用 Upstash Kafka 和 RisingWave 的功能来构建流处理应用程序和管道。如果您还想了解更多有关以上流程的信息,请参阅 Upstash Kafka 文档[2]

我们摄取到 Upstash Kafka 主题的示例数据包含来自航空 API 的实时数据,包括机场名称、航班状态、航班位置等信息。

{
  "flight_date""2024-05-16",
  "flight_status""scheduled",
  "departure_airport""Auckland International",
  "departure_timezone""Pacific/Auckland",
  "departure_iata""AKL",
  "departure_icao""NZAA",
  "departure_terminal""D",
  "departure_gate""28",
  "departure_delay"null,
  "departure_scheduled""2024-05-16T06:30:00+00:00",
  "departure_estimated""2024-05-16T06:30:00+00:00",
  "departure_actual"null,
  "departure_estimated_runway"null,
  "departure_actual_runway"null,
  "arrival_airport""Wellington International",
  "arrival_timezone""Pacific/Auckland",
  "arrival_iata""WLG",
  "arrival_icao""NZWN",
  "arrival_terminal"null,
  "arrival_gate""15",
  "arrival_baggage"null,
  "arrival_delay"null,
  "arrival_scheduled""2024-05-16T07:40:00+00:00",
  "arrival_estimated""2024-05-16T07:40:00+00:00",
  "arrival_actual"null,
  "arrival_estimated_runway"null,
  "arrival_actual_runway"null,
  "airline_name""Singapore Airlines",
  "airline_iata""SQ",
  "airline_icao""SIA",
  "flight_number""SQ4438",
  "flight_iata""SQ4438",
  "flight_icao""SIA4438",
  "codeshared_airline_name""air new zealand",
  "codeshared_airline_iata""nz",
  "codeshared_airline_icao""anz",
  "codeshared_flight_number""401",
  "codeshared_flight_iata""nz401",
  "flight_info""Singapore Airlines flight SQ4438 is currently in the air, flying from Auckland International (AKL) to Wellington International (WLG)"
}

2将数据从 Upstash Kafka 摄取到 RisingWave

对于摄取和处理流数据,我们有两种可用方法:开源的 RisingWave 和托管服务 RisingWave Cloud。在本文中,我们将重点使用 RisingWave Cloud,它的用户体验更加便捷友好,简化了管理和利用 RisingWave 进行航班跟踪解决方案的操作。

创建 RisingWave 集群

要在 RisingWave Cloud[3] 中创建 RisingWave 集群并探索各种功能,您可以注册 Free Plan 来免费测试 RisingWave 的功能。有关如何创建 RisingWave 集群并开始使用的详细说明,请参阅官方 RisingWave 文档[4]。它将为您提供设置和探索 RisingWave 功能的分步指导。如果您需要额外的帮助,请加入我们的 Slack 社区[5]

RisingWave Cloud:账户注册和登录流程

将数据流摄取到 RisingWave

现在我们已经在 Upstash 中(以 JSON 格式)设置了 Kafka 数据流,我们可以使用以下 SQL 语句连接到这些数据流。有关更多信息,请参阅在 RisingWave 中从 Upstash Kafka 摄取数据[6]

CREATE SOURCE flight_tracking_source(
    flight_date VARCHAR,
    flight_status VARCHAR,

    departure_airport VARCHAR,
    departure_timezone VARCHAR,
    departure_iata VARCHAR,
    departure_icao VARCHAR,
    departure_terminal VARCHAR,
    departure_gate VARCHAR,
    departure_delay INTERVAL,
    departure_scheduled TIMESTAMP WITH TIME ZONE,
    departure_estimated TIMESTAMP WITH TIME ZONE,
    departure_actual TIMESTAMP WITH TIME ZONE,
    departure_estimated_runway TIMESTAMP WITH TIME ZONE,
    departure_actual_runway TIMESTAMP WITH TIME ZONE,

    arrival_airport VARCHAR,
    arrival_timezone VARCHAR,
    arrival_iata VARCHAR,
    arrival_icao VARCHAR,
    arrival_terminal VARCHAR,
    arrival_gate VARCHAR,
    arrival_baggage VARCHAR,
    arrival_delay INTERVAL,
    arrival_scheduled TIMESTAMP WITH TIME ZONE,
    arrival_estimated TIMESTAMP WITH TIME ZONE,
    arrival_actual TIMESTAMP WITH TIME ZONE,
    arrival_estimated_runway TIMESTAMP WITH TIME ZONE,
    arrival_actual_runway TIMESTAMP WITH TIME ZONE,

    airline_name VARCHAR,
    airline_iata VARCHAR,
    airline_icao VARCHAR,

    flight_number VARCHAR,
    flight_iata VARCHAR,
    flight_icao VARCHAR,

    codeshared_airline_name VARCHAR,
    codeshared_airline_i

ata VARCHAR,
    codeshared_airline_icao VARCHAR,
    codeshared_flight_number VARCHAR,
    codeshared_flight_iata VARCHAR,
    flight_info VARCHAR
)
WITH(
connector='kafka',
topic ='flights_tracking',
properties.bootstrap.server ='delicate-herring-9260-us1-kafka.upstash.io:9092',
properties.sasl.mechanism = 'SCRAM-SHA-256',
properties.security.protocol = 'SASL_SSL',
properties.sasl.username = 'xxxxxx',
properties.sasl.password = 'xxxxxx',
scan.startup.mode ='earliest'
)FORMAT PLAIN ENCODE JSON;

通过 CREATE SOURCE 语句,RisingWave 已连接到数据流,但尚未开始消费数据。为了增量处理和存储数据,我们需要创建物化视图。创建物化视图后,RisingWave 将从指定的偏移量开始消费数据。

设置物化视图以分析航班数据

我们将创建不同的物化视图,这些视图跟踪并提取与 flight_tracking_source 相关的各种航班信息。这些信息包括航班日期、状态、出发和到达详细信息(机场、时区、IATA 码、ICAO 码、计划和估计时间)、航空公司信息(名称、IATA 码、ICAO 码)、航班号和标识符(IATA 和 ICAO 码)以及一般航班信息。

使用物化视图是因为它们始终提供最新数据。例如,以下查询创建了一个名为 Airline_Flight_Counts 的物化视图,该视图按小时间隔计算每个航空公司的航班数量。它使用了上文创建的 flight_tracking_source ,并按航空公司名称和一小时的时间窗口对数据进行分组。

CREATE MATERIALIZED VIEW Airline_Flight_Counts
SELECT airline_name,
COUNT(airline_name) AS total_flights,
window_start, window_end
FROM TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
GROUP BY airline_name,window_start, window_end
ORDER BY total_flights desc;

以下查询创建了一个名为 Airport_Summary 的物化视图,该视图按小时间隔计算每个机场的到达和出发航班总数。计算结果按机场和一小时的时间窗口进行分组,并按航班总数降序排列。

CREATE MATERIALIZED VIEW Airport_Summary
WITH ArrivalCounts AS (
    SELECT
        arrival_airport,
        COUNT(arrival_airport) AS total_flights_arrival,
        window_start,
        window_end
    FROM
        TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
    GROUP BY
        arrival_airport,
        window_start,
        window_end
),
DepartureCounts AS (
    SELECT
        departure_airport,
        COUNT(departure_airport) AS total_flights_departure,
        window_start,
        window_end
    FROM
        TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
    GROUP BY
        departure_airport,
        window_start,
        window_end
)
SELECT
    ArrivalCounts.arrival_airport,
    ArrivalCounts.total_flights_arrival,
    DepartureCounts.departure_airport,
    DepartureCounts.total_flights_departure,
    ArrivalCounts.window_start,
    ArrivalCounts.window_end
FROM
    ArrivalCounts
INNER JOIN
    DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
    AND ArrivalCounts.window_end = DepartureCounts.window_end
    AND ArrivalCounts.arrival_airport = DepartureCounts.departure_airport
ORDER BY
    ArrivalCounts.total_flights_arrival DESC,
    DepartureCounts.total_flights_departure DESC;

以下查询创建了一个名为 Timezone_Summary 的物化视图,该视图按小时间隔计算每个时区的到达和出发航班总数。它使用数据源 flight_tracking_source,按时区和一小时的时间窗口对数据进行分组,然后按航班总数降序排列结果。

CREATE MATERIALIZED VIEW Timezone_Summary
WITH ArrivalCounts AS (
    SELECT
        arrival_timezone,
        COUNT(arrival_timezone) AS total_flights_arrival,
        window_start,
        window_end
    FROM
        TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
    GROUP BY
        arrival_timezone,
        window_start,
        window_end
),
DepartureCounts AS (
    SELECT
        departure_timezone,
        COUNT(departure_timezone) AS total_flights_departure,
        window_start,
        window_end
    FROM
        TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
    GROUP BY
        departure_timezone,
        window_start,
        window_end
)
SELECT
    ArrivalCounts.arrival_timezone,
    ArrivalCounts.total_flights_arrival,
    DepartureCounts.departure_timezone,
    DepartureCounts.total_flights_departure,
    ArrivalCounts.window_start,
    ArrivalCounts.window_end
FROM
    ArrivalCounts
INNER JOIN
    DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
    AND ArrivalCounts.window_end = DepartureCounts.window_end
    AND ArrivalCounts.arrival_timezone = DepartureCounts.departure_timezone
ORDER BY
    ArrivalCounts.total_flights_arrival DESC,
    DepartureCounts.total_flights_departure DESC;

3使用 Metabase 可视化

Metabase 是一个开源的商业智能工具,可以将数据可视化并共享数据见解。它让您能用简单的方法,基于数据库数据,创建各种图表、看板和指标。

将 RisingWave 连接到 Metabase

由于 RisingWave 兼容 PostgreSQL,您可以将 Metabase 连接到 RisingWave 作为数据源,并在流数据上构建分析。

您可以在 Metabase 中使用 RisingWave 作为数据源,使用 RisingWave 中的表和物化视图创建可视化图表和综合看板。要了解具体步骤,请参阅配置 Metabase 以读取 RisingWave 数据[7]

成功将 RisingWave 连接到 Metabase 后,我们将 RisingWave 中的物化视图作为数据源添加,以创建表格、各种图表和综合看板。

使用 Metabase 可视化数据

我们使用在 RisingWave 中的物化视图和源(如 flight_tracking_sourceAirline_Flight_CountsAirport_SummaryTimezone_Summary)创建这些表格、图表和综合看板。

航班数据概览表

按航空公司跟踪航班

按机场跟踪航班

按时区跟踪航班

以下是一个综合看板,展示了一系列用于实时航班跟踪的图表。它提供了航班操作的整体视图,提供按航空公司、机场和时区分类的航班总数的洞察。此外,它还提供当前航班的详细信息,让用户得以全面监控并提出见解。

实时航班跟踪的综合看板

4总结

在本文中,我们使用 Upstash、RisingWave 和 Metabase 开发了一个实时航班跟踪系统。因为 RisingWave 提供了广泛的源和目标连接器,配置和连接变得非常简单。我们将实时航班数据摄取到 Upstash 的 Kafka 主题中,然后将其发送到 RisingWave,并创建物化视图以进行深入分析。最后,我们使用 Metabase 创建了可视化图表和实时看板,使用户能够监控航班运营并做出明智的决策。

参考资料

[1]

Upstash Cloud Account: https://console.upstash.com/kafka

[2]

Upstash Kafka 文档: https://upstash.com/docs/kafka

[3]

RisingWave Cloud: https://cloud.risingwave.com/

[4]

官方 RisingWave 文档: https://docs.risingwave.com/docs/current/intro/

[5]

Slack 社区: https://www.risingwave.com/slack

[6]

在 RisingWave 中从 Upstash Kafka 摄取数据: https://docs.risingwave.com/docs/current/ingest-from-upstash-kafka/

[7]

配置 Metabase 以读取 RisingWave 数据: https://docs.risingwave.com/docs/current/metabase-integration/

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。


往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

RisingWave中文开源社区
RisingWave 是一款开源分布式 SQL 流数据库,致力于大幅降低流计算使用门槛与复杂度。RisingWave 已为全球超百家企业构建新一代流处理与分析平台。
 最新文章