最大化停车场利用率:RisingWave 的智能解决方案

文摘   科技   2024-06-18 16:52   北京  

作者: Martin|RisingWave Labs PM

随着城市扩展和城市化率提高,如何高效管理停车已成为市政和企业共同面对的挑战。在空间有限,但需求不断增加的情况下,最大化利用停车场是减少拥堵、降低排放并提升驾驶员体验的关键。

RisingWave 可以助力您解决这个挑战。作为一个流式数据库,RisingWave 可以彻底改变您停车场的利用率。首先,RisingWave 能够实时处理和分析从各种传感器、摄像头和其他物联网设备中获取的实时数据;然后,通过数据转换功能,RisingWave 可以识别模式、预测需求并优化停车位分配,确保每个可用车位都得到有效利用。

RisingWave 构建的可用停车位看板

在本文中,我们将介绍使用 RisingWave 实时监控停车位利用情况的两个场景,并探讨如何优化 RisingWave 的性能。

数据准备

在正式深入讲解两个场景及其优化前,让我们先进行一些数据准备,了解所用数据的 Schema 和产生过程。

Schema

我们假设信号数据存储在 Kafka 主题中,停车场数据则本地存储在 RisingWave 中。以下是示例数据的 Schema。

CREATE TABLE signals (
 start_at         timestamp-- 何时起停车位被占用
 end_at           timestamp-- 何时起停车位空出(值可以为空)
 last_updated_at  timestamp-- 何时生成并发送本信号
 space_id         int-- 停车位 ID
 level            int -- 停车位所在楼层
with (
 connector = 'kafka',
 topic = 'signals'
);

CREATE TABLE parking_lot (
  space_id         int-- 停车位 ID
  level            int-- 停车位所在楼层
  primary key (space_id)
);

数据产生过程

然后,我们需要先了解一下数据记录生成过程:假设在时间 T0 时,位于第 2 层的第 45 个停车位上方的传感器检测到有车辆停在其下方,这将生成一条记录: (T0, NULL, T0, 45, 2)

为应对可能的网络故障或临时服务中断,传感器需要不时重复传输记录到中央控制单元,以确保数据完整。例如,它可能在随机的时间间隔 T1 之后重复发送记录,此时,会生成又一条记录: (T0, NULL, T1, 45, 2)

当车辆在时间 T2 离开停车位时,传感器将再生成一个新记录:(T0, T2, T2, 45, 2),表示该车位现在空置。同样,为防止数据丢失,传感器会在短暂延迟后再次重新传输此信息。

现在,基于以上前期准备,我们将正式深入讲解使用 RisingWave 实时监控停车位利用情况的两个场景,并探讨如何优化。

场景1实时监测可用停车位

为了创建上文展示的“可用停车位看板“,RisingWave 需要确定当前空置的停车位数量,这需要检查最新停车位数据的 end_at 字段是否为空,以下是详细步骤。

首先,我们提取每个停车位的所有数据,保留其中具有最新 last_updated_at 时间戳的数据:

 create materialized view latest_signal_for_each_parking_space as
 with ranked as (
   select *, row_number() over (partition by space_id order by last_updated_at descas rn
   from signals
 )
 select * except (rn)
 from ranked
 where rn = 1;

以上代码根据 space_id 字段对数据进行分区,并在每个分区内按 last_updated_at 排序,从而保留最新信号。关于其中用到的 row_number() rank = 1,您可以参考我们的文档介绍[1]获取更多信息。

之后,我们根据最新信号,确定停车位是否被占用,并汇总停车场内每层被占用的车位总数。

create materialized view occupied_on_each_level as
select
 level,
 count(case when end_at is NULL then 1
           else 0
      endas occupied
from
 latest_signal_for_each_parking_space
group by
 level;

上代码中,如果最新信号显示 end_at 字段为 NULL,则该停车位被占用;如果 end_at 字段不为 NULL,则停车位为空。occupied 代表了每层被占用车位的总数。

然后,我们从每层的总停车位数中减去被占用车位数,以确定可用停车位数:

create materialized view available_on_each_level as
with
 total_on_each_level as (
  select
   level,
   count(*) as total
  from parking_lot
  group by level
 )
select
 t.level as level,
 total - occupied as available
from
 total_on_each_level t
inner join
 occupied_on_each_level o
on t.level = o.level;

场景2每小时停车位的使用率

为了计算每个停车位每小时的使用率,我们需要考虑每个信号的处同一状态时的重叠时间间隔。

例如,如果一个停车位从 7:00 被占用到 7:10,然后从 7:40 被占用到 8:00,那么从 7:00 到 8:00 的 60 分钟中,有 30 分钟被占用。这代表 50% 的使用率。

同样,如果一个停车位从 7:50 开始被占用,并一直被占用到 8:30(假设当前时间是8:30),那么从 7:00 到 8:00 的一个小时内,我们仅考虑从 7:50 到 8:00 的时段,即 60 分钟中的 10 分钟被使用。这代表 16.7% 的使用率。

对于后续的小时,例如从 8:00 到 9:00,我们只考虑从这个小时开始(8:00)、到当前时间(8:30)的部分,也就是说,我们认为此时的小时利用率是 100%。

要将此逻辑用 SQL 体现,我们需要计算每个停车位在每小时内被占用的百分比,而不考虑时间的进展。以下是具体实现代码:

-- 首先,仍然先对信号去重,只选取最新数据

create materialized view deduplicated_signals as
with ranked as (
  -- 数据按 space_id、level_id 和 start_at 分区,
  -- 只保留每组中的最新信号。
   select *, row_number() over (partition by space_id, level_id, start_at order by last_updated_at descas rn
   from signals
 )
 select * except (rn)
 from ranked
 where rn = 1;

-- 然后我们将它们分为两组:
-- 1. 停车位被占用
-- 2. 停车位没被占用

create materialized view occupying as
-- 我们使用当前时间作为占用结束时间
select space_id, level_id, start_at, timestamp '2024-05-13 14:54:02.91714' as end_at from deduplicated_signals where end_at is NULL;

create materialized view occupied as
select space_id, level_id, start_at, end_at from deduplicated_signals where end_at is not NULL;

-- 接下来,我们将这两类事件合并在一起
create materialized view both as
select * from occupying
union all
select * from occupied;

-- 我们获取信号重叠的所有 1 小时窗口

create materialized view split_window
as
select
  *,
  generate_series(
    date_trunc('HOUR', start_at - interval '0 HOUR''Europe/Stockholm')::timestamp,
    date_trunc('HOUR', end_at - interval '0 HOUR''Europe/Stockholm')::timestamp,
    interval '1 HOUR'
   )::timestamptz as window_start
from both;

-- 上面的 SQL 缺少每个窗口的结束时间。我们将其添加。

create materialized view occupy_window
as
select *,

 window_start + '1 HOUR' as window_end
from split_window;

-- 如上例所示,
-- 对于 1:30~2:00 和 4:00~4:10 等时间段,
-- 它们属于 1:00~2:00 和 4:00~5:00 窗口。
-- 但是,我们计算使用率时,只计入 1:30~2:00 和 4:00~4:10 部分。

create materialized view occupy_start_end
as
select
  *, greatest(window_start, start_at) as occupy_start, least(window_end, end_at) as occupy_end
from occupy_window;

-- 最后,我们可以计算使用率

create materialized view unavailability
as
select
 space_id,
 level,
  window_start,
  window_end,
  'HOURLY' as window_type,
  extract(epoch from (window_end - window_start)) as total_seconds,
  sum(extract(epoch from (occupy_end - occupy_start))) as occupy_seconds
from
  occupy_start_end
group by 1234;

-- 如果您喜欢百分比表示,可以使用以下代码

create materialized view percentage
as
select
 occupy_seconds::double / total_seconds as percentage
from
 unavailability;

以上代码中,对于当前时间,我们用的是固定的一个值,如果想把它变成真正动态的当前时间,我们可以使用所有信号last_updated_at 的最大值来表示所有正在进行的占用的结束时间。这样就能动态确定占用的结束时间。

在 SQL 中,您可以通过使用子查询来实现这一点,我们通过示例讲解如何实现,首先,创建一个新的物化视图:

-- 这是一个只有一行和两列的物化视图。
-- 1 作为常量是一个虚拟变量,用于稍后连接记录。

create materialized view current_max_time
as
select max(last_updated_at) as now1 as constant from signals;

然后我们修改 occupying 的定义,如下:

create materialized view occupying as
with tmp as (
  select space_id, level_id, start_at, 1 as constant
  from
  deduplicated_signals where end_at is NULL
)
select space_id, level_id, start_at, now as end_at from
-- 这里我们滥用了常量列来进行交叉积。
-- 这是因为 RisingWave 默认禁止嵌套循环连接,因为它的效率低下。
tmp inner join current_max_time on tmp.constant = current_max_time.constant;

其余的物化视图保持不变即可。

优化1减少重新计算频率

last_updated_at 列频繁更新时,依赖 max(last_updated_at) 可能会导致问题。这是因为 max(last_updated_at) 的每次更改都需要刷新所有后续物化视图,导致计算负担显著增加。

为了解决这个问题并最小化不必要的刷新,我们可以使用窗口函数,如下所示:

create materialized view current_max_time
as
select max(window_end) as now1 as constant
from
TUMBLE(signals, last_updated_at, INTERVAL '15 MINUTES')

有关时间窗口的更多用法,请参阅我们的官方文档[2]

现在,我们可以预期重新计算大约每 15 分钟触发一次。

优化2抑制回填期间的重新计算

先前的优化对于处理新进数据是有效的。然而,对于历史数据可能还不够。新数据的 last_updated_at 值是实时推进的,而历史数据则以更快的速度推进。这可能导致频繁的重新计算,尤其是在处理大量历史数据时。

为了解决这个问题,我们可以对  current_max_time 物化视图进行额外修改,如下所示:

-- 假设当前时间戳为 2024-05-20 12:05:35.607001+00

create materialized view current_max_time
as
select max('2024-05-20 12:05:35.607001+00'max(window_end)) as now1 as constant
from
TUMBLE(signals, last_updated_at, INTERVAL '15 MINUTES');

注意now 不会改变,直到 last_updated_at 赶上它。

总结

本文中,我们演示了如何应用 RisingWave 实时监控停车位利用情况。如果能够获取其他相应数据,还可以使用 RisingWave 实现更先进的功能。欢迎大家分享更多使用 RisingWave 进行实时监控和分析的解决方案。

参考资料

[1]

文档 sql pattern topn: https://docs.risingwave.com/docs/current/sql-pattern-topn

[2]

文档 tumble time window function: https://docs.risingwave.com/docs/current/sql-function-time-window/#tumble-time-window-function%E3%80%82

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了近 150 名开源贡献者和近 3000 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

RisingWave中文开源社区
RisingWave 是一款开源分布式 SQL 流数据库,致力于大幅降低流计算使用门槛与复杂度。RisingWave 已为全球超百家企业构建新一代流处理与分析平台。
 最新文章