在物流和航空领域,实时数据处理、数据可视化和综合看板至关重要。它们能够提升供应链性能、优化运输并提高效率。特别是对商务航空来说,实时分析可以提高效率、可持续性和客户关注度,从而改善决策并节约成本。它还帮助确保安全运行、优化空域使用并能协助空中交通管制员。总体而言,实时数据处理可以帮助航空业提升客户体验并改善业务运营。
在本文中,我们使用 RisingWave、Upstash 和 Metabase 建立了一个实时航班跟踪系统。我们利用 Aviationstack API 实时获取航班数据,然后将这些数据传输到 Upstash 的一个 Kafka 主题中。随后,我们将这些数据流摄取到 RisingWave 中,继而创建物化视图 (MV) 以进行深入的航班数据分析。物化视图随时提供最新数据,并可以即时查询。我们还使用 Metabase 创建图表、表格和综合的数据看板,用于实时航班跟踪。
使用 RisingWave、Upstash 和 Metabase 实现的实时航班跟踪
1在 Upstash 中设置 Kafka
Upstash 是一个无服务器平台,提供 Redis、Kafka 和 Qstash 服务,具有可扩展性、高级安全选项和专门支持的优势。Upstash Kafka 使用 Apache Kafka 进行部署,并提供一个无服务器的 Kafka 平台,配备连接器、模式注册表和监控,为有高级需求的客户提供各种计划。
注册 Upstash 账户
注册免费的 Upstash 云账户,以访问 Kafka 服务。要创建账户,请访问 Upstash Cloud Account[1]。
创建 Kafka 集群
登录后,使用以下信息创建 Kafka 集群:
Name: 给您的 Kafka 集群一个唯一名称以便识别。 Region: 选择您的 Kafka 集群托管的区域。 Type: 选择适合您需求的集群类型。
设置 Kafka 主题
创建 Kafka 集群后,设置 Kafka 主题。Upstash Kafka 提供一些默认配置,包括分区数量和保留策略,简化了设置过程。
创建 Kafka 集群和 Kafka 主题后,我们就可以利用 Upstash Kafka 和 RisingWave 的功能来构建流处理应用程序和管道。如果您还想了解更多有关以上流程的信息,请参阅 Upstash Kafka 文档[2]。
我们摄取到 Upstash Kafka 主题的示例数据包含来自航空 API 的实时数据,包括机场名称、航班状态、航班位置等信息。
{
"flight_date": "2024-05-16",
"flight_status": "scheduled",
"departure_airport": "Auckland International",
"departure_timezone": "Pacific/Auckland",
"departure_iata": "AKL",
"departure_icao": "NZAA",
"departure_terminal": "D",
"departure_gate": "28",
"departure_delay": null,
"departure_scheduled": "2024-05-16T06:30:00+00:00",
"departure_estimated": "2024-05-16T06:30:00+00:00",
"departure_actual": null,
"departure_estimated_runway": null,
"departure_actual_runway": null,
"arrival_airport": "Wellington International",
"arrival_timezone": "Pacific/Auckland",
"arrival_iata": "WLG",
"arrival_icao": "NZWN",
"arrival_terminal": null,
"arrival_gate": "15",
"arrival_baggage": null,
"arrival_delay": null,
"arrival_scheduled": "2024-05-16T07:40:00+00:00",
"arrival_estimated": "2024-05-16T07:40:00+00:00",
"arrival_actual": null,
"arrival_estimated_runway": null,
"arrival_actual_runway": null,
"airline_name": "Singapore Airlines",
"airline_iata": "SQ",
"airline_icao": "SIA",
"flight_number": "SQ4438",
"flight_iata": "SQ4438",
"flight_icao": "SIA4438",
"codeshared_airline_name": "air new zealand",
"codeshared_airline_iata": "nz",
"codeshared_airline_icao": "anz",
"codeshared_flight_number": "401",
"codeshared_flight_iata": "nz401",
"flight_info": "Singapore Airlines flight SQ4438 is currently in the air, flying from Auckland International (AKL) to Wellington International (WLG)"
}
2将数据从 Upstash Kafka 摄取到 RisingWave
对于摄取和处理流数据,我们有两种可用方法:开源的 RisingWave 和托管服务 RisingWave Cloud。在本文中,我们将重点使用 RisingWave Cloud,它的用户体验更加便捷友好,简化了管理和利用 RisingWave 进行航班跟踪解决方案的操作。
创建 RisingWave 集群
要在 RisingWave Cloud[3] 中创建 RisingWave 集群并探索各种功能,您可以注册 Free Plan 来免费测试 RisingWave 的功能。有关如何创建 RisingWave 集群并开始使用的详细说明,请参阅官方 RisingWave 文档[4]。它将为您提供设置和探索 RisingWave 功能的分步指导。如果您需要额外的帮助,请加入我们的 Slack 社区[5]。
将数据流摄取到 RisingWave
现在我们已经在 Upstash 中(以 JSON 格式)设置了 Kafka 数据流,我们可以使用以下 SQL 语句连接到这些数据流。有关更多信息,请参阅在 RisingWave 中从 Upstash Kafka 摄取数据[6]。
CREATE SOURCE flight_tracking_source(
flight_date VARCHAR,
flight_status VARCHAR,
departure_airport VARCHAR,
departure_timezone VARCHAR,
departure_iata VARCHAR,
departure_icao VARCHAR,
departure_terminal VARCHAR,
departure_gate VARCHAR,
departure_delay INTERVAL,
departure_scheduled TIMESTAMP WITH TIME ZONE,
departure_estimated TIMESTAMP WITH TIME ZONE,
departure_actual TIMESTAMP WITH TIME ZONE,
departure_estimated_runway TIMESTAMP WITH TIME ZONE,
departure_actual_runway TIMESTAMP WITH TIME ZONE,
arrival_airport VARCHAR,
arrival_timezone VARCHAR,
arrival_iata VARCHAR,
arrival_icao VARCHAR,
arrival_terminal VARCHAR,
arrival_gate VARCHAR,
arrival_baggage VARCHAR,
arrival_delay INTERVAL,
arrival_scheduled TIMESTAMP WITH TIME ZONE,
arrival_estimated TIMESTAMP WITH TIME ZONE,
arrival_actual TIMESTAMP WITH TIME ZONE,
arrival_estimated_runway TIMESTAMP WITH TIME ZONE,
arrival_actual_runway TIMESTAMP WITH TIME ZONE,
airline_name VARCHAR,
airline_iata VARCHAR,
airline_icao VARCHAR,
flight_number VARCHAR,
flight_iata VARCHAR,
flight_icao VARCHAR,
codeshared_airline_name VARCHAR,
codeshared_airline_i
ata VARCHAR,
codeshared_airline_icao VARCHAR,
codeshared_flight_number VARCHAR,
codeshared_flight_iata VARCHAR,
flight_info VARCHAR
)
WITH(
connector='kafka',
topic ='flights_tracking',
properties.bootstrap.server ='delicate-herring-9260-us1-kafka.upstash.io:9092',
properties.sasl.mechanism = 'SCRAM-SHA-256',
properties.security.protocol = 'SASL_SSL',
properties.sasl.username = 'xxxxxx',
properties.sasl.password = 'xxxxxx',
scan.startup.mode ='earliest'
)FORMAT PLAIN ENCODE JSON;
通过 CREATE SOURCE
语句,RisingWave 已连接到数据流,但尚未开始消费数据。为了增量处理和存储数据,我们需要创建物化视图。创建物化视图后,RisingWave 将从指定的偏移量开始消费数据。
设置物化视图以分析航班数据
我们将创建不同的物化视图,这些视图跟踪并提取与 flight_tracking_source
相关的各种航班信息。这些信息包括航班日期、状态、出发和到达详细信息(机场、时区、IATA 码、ICAO 码、计划和估计时间)、航空公司信息(名称、IATA 码、ICAO 码)、航班号和标识符(IATA 和 ICAO 码)以及一般航班信息。
使用物化视图是因为它们始终提供最新数据。例如,以下查询创建了一个名为 Airline_Flight_Counts
的物化视图,该视图按小时间隔计算每个航空公司的航班数量。它使用了上文创建的 flight_tracking_source
,并按航空公司名称和一小时的时间窗口对数据进行分组。
CREATE MATERIALIZED VIEW Airline_Flight_Counts
SELECT airline_name,
COUNT(airline_name) AS total_flights,
window_start, window_end
FROM TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
GROUP BY airline_name,window_start, window_end
ORDER BY total_flights desc;
以下查询创建了一个名为 Airport_Summary
的物化视图,该视图按小时间隔计算每个机场的到达和出发航班总数。计算结果按机场和一小时的时间窗口进行分组,并按航班总数降序排列。
CREATE MATERIALIZED VIEW Airport_Summary
WITH ArrivalCounts AS (
SELECT
arrival_airport,
COUNT(arrival_airport) AS total_flights_arrival,
window_start,
window_end
FROM
TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
GROUP BY
arrival_airport,
window_start,
window_end
),
DepartureCounts AS (
SELECT
departure_airport,
COUNT(departure_airport) AS total_flights_departure,
window_start,
window_end
FROM
TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
GROUP BY
departure_airport,
window_start,
window_end
)
SELECT
ArrivalCounts.arrival_airport,
ArrivalCounts.total_flights_arrival,
DepartureCounts.departure_airport,
DepartureCounts.total_flights_departure,
ArrivalCounts.window_start,
ArrivalCounts.window_end
FROM
ArrivalCounts
INNER JOIN
DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
AND ArrivalCounts.window_end = DepartureCounts.window_end
AND ArrivalCounts.arrival_airport = DepartureCounts.departure_airport
ORDER BY
ArrivalCounts.total_flights_arrival DESC,
DepartureCounts.total_flights_departure DESC;
以下查询创建了一个名为 Timezone_Summary
的物化视图,该视图按小时间隔计算每个时区的到达和出发航班总数。它使用数据源 flight_tracking_source
,按时区和一小时的时间窗口对数据进行分组,然后按航班总数降序排列结果。
CREATE MATERIALIZED VIEW Timezone_Summary
WITH ArrivalCounts AS (
SELECT
arrival_timezone,
COUNT(arrival_timezone) AS total_flights_arrival,
window_start,
window_end
FROM
TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
GROUP BY
arrival_timezone,
window_start,
window_end
),
DepartureCounts AS (
SELECT
departure_timezone,
COUNT(departure_timezone) AS total_flights_departure,
window_start,
window_end
FROM
TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
GROUP BY
departure_timezone,
window_start,
window_end
)
SELECT
ArrivalCounts.arrival_timezone,
ArrivalCounts.total_flights_arrival,
DepartureCounts.departure_timezone,
DepartureCounts.total_flights_departure,
ArrivalCounts.window_start,
ArrivalCounts.window_end
FROM
ArrivalCounts
INNER JOIN
DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
AND ArrivalCounts.window_end = DepartureCounts.window_end
AND ArrivalCounts.arrival_timezone = DepartureCounts.departure_timezone
ORDER BY
ArrivalCounts.total_flights_arrival DESC,
DepartureCounts.total_flights_departure DESC;
3使用 Metabase 可视化
Metabase 是一个开源的商业智能工具,可以将数据可视化并共享数据见解。它让您能用简单的方法,基于数据库数据,创建各种图表、看板和指标。
将 RisingWave 连接到 Metabase
由于 RisingWave 兼容 PostgreSQL,您可以将 Metabase 连接到 RisingWave 作为数据源,并在流数据上构建分析。
您可以在 Metabase 中使用 RisingWave 作为数据源,使用 RisingWave 中的表和物化视图创建可视化图表和综合看板。要了解具体步骤,请参阅配置 Metabase 以读取 RisingWave 数据[7]。
成功将 RisingWave 连接到 Metabase 后,我们将 RisingWave 中的物化视图作为数据源添加,以创建表格、各种图表和综合看板。
使用 Metabase 可视化数据
我们使用在 RisingWave 中的物化视图和源(如 flight_tracking_source
、Airline_Flight_Counts
、Airport_Summary
和 Timezone_Summary
)创建这些表格、图表和综合看板。
以下是一个综合看板,展示了一系列用于实时航班跟踪的图表。它提供了航班操作的整体视图,提供按航空公司、机场和时区分类的航班总数的洞察。此外,它还提供当前航班的详细信息,让用户得以全面监控并提出见解。
4总结
在本文中,我们使用 Upstash、RisingWave 和 Metabase 开发了一个实时航班跟踪系统。因为 RisingWave 提供了广泛的源和目标连接器,配置和连接变得非常简单。我们将实时航班数据摄取到 Upstash 的 Kafka 主题中,然后将其发送到 RisingWave,并创建物化视图以进行深入分析。最后,我们使用 Metabase 创建了可视化图表和实时看板,使用户能够监控航班运营并做出明智的决策。
参考资料
Upstash Cloud Account: https://console.upstash.com/kafka
[2]Upstash Kafka 文档: https://upstash.com/docs/kafka
[3]RisingWave Cloud: https://cloud.risingwave.com/
[4]官方 RisingWave 文档: https://docs.risingwave.com/docs/current/intro/
[5]Slack 社区: https://www.risingwave.com/slack
[6]在 RisingWave 中从 Upstash Kafka 摄取数据: https://docs.risingwave.com/docs/current/ingest-from-upstash-kafka/
[7]配置 Metabase 以读取 RisingWave 数据: https://docs.risingwave.com/docs/current/metabase-integration/
关于 RisingWave
往期推荐
技术内幕