RisingWave Demo:快速挖掘社交平台数据价值

文摘   科技   2024-08-08 16:04   北京  

社交媒体平台每天处理来自数千万用户的海量消息。因为热门话题在不断变化,对企业来说,跟上这些帖子是具有挑战性的。但分析这些消息是必要的,因为它能让企业通过了解消费者和竞争对手的价值观来做出更具战略性的商业决策。

为了跟踪话题,像 Twitter 这样的社交媒体平台使用了话题标签来展示帖子内容。话题标签的使用次数能反映出用户参与度。如果某话题标签经常被使用,它表明这个特定的话题很受欢迎。如果我们跟踪话题标签随时间的使用频率,可以确定观众参与度是在增加还是在减少。

在本教程中,您将学习如何使用 RisingWave 从文本数据中提取有价值的洞。我们为本教程设置了一个演示集群,以便您可以轻松尝试。

1开始之前

  • 确保您的环境中安装了 Docker[1]Docker Compose[2]。请注意,Docker Compose 包含在 Windows 和 MacOS 的 Docker Desktop 中。如果您使用 Docker Desktop,请确保在启动演示集群之前已经运行。

  • 确保您的环境中安装了 PostgreSQL[3]交互式终端 psql。详细说明请参阅下载 PostgreSQL[4]

2启动演示集群

在演示集群中,我们打包了 RisingWave 和一个工作负载生成器。一旦集群启动,工作负载生成器将开始生成随机流量并将其输入到 Kafka。

首先,将 RisingWave[5]仓库克隆到环境中。

git clone https://github.com/risingwavelabs/risingwave.git

导航到 integration_tests/cdn-metrics 目录,并从 Docker Compose 文件启动演示集群。

cd risingwave/integration_tests/cdn-metrics
docker compose up -d

命令未找到?

Compose V2 中的默认命令行句法以 docker compose 开头。详见 Docker 文档
如果您使用的是 Compose V1,请改用 docker-compose

这将启动必要的 RisingWave 组件,包括 Frontend 节点、Compute 节点、Meta 节点和 MinIO。工作负载生成器将开始生成随机数据并将其输入到 Kafka Topic。在这个演示集群中,物化视图的数据将存储在 MinIO 实例中。

连接到 RisingWave 以管理数据流并执行数据分析。

psql -h localhost -p 4566 -d dev -U root

3将 RisingWave 连接到数据流

本教程将使用 RisingWave 来消费数据流并执行数据分析。推文将被用作示例数据,这样我们就可以查询给定日期上最受欢迎的话题标签,以跟踪流行话题。

以下是推文和 Twitter 用户的 Schema。在 tweet Schema 中,text 包含了一条推文的内容,created_at 包含了一条推文发布的日期和时间。话题标签将从 text 中提取。

{
    "data": {
        "created_at""2020-02-12T17:09:56.000Z",
        "id""1227640996038684673",
        "text""Doctors: Googling stuff online does not make you a doctor\n\nDevelopers: https://t.co/mrju5ypPkb",
        "lang""English"
    },
    "author": {
        "created_at""2013-12-14T04:35:55.000Z",
        "id""2244994945",
        "name""Singularity Data",
        "username""singularitty"
    }
}

使用以下 SQL 语句连接到数据流。

CREATE SOURCE twitter (
    data STRUCT < created_at TIMESTAMP WITH TIME ZONE,
    id VARCHAR,
    text VARCHAR,
    lang VARCHAR >,
    author STRUCT < created_at TIMESTAMP WITH TIME ZONE,
    id VARCHAR,
    name VARCHAR,
    username VARCHAR,
    followers INT >
WITH (
    connector = 'kafka',
    topic = 'twitter',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
FORMAT PLAIN ENCODE JSON;

请注意,SQL 语句使用了 STRUCT 数据类型。有关 STRUCT 数据类型的详细信息,请参阅数据类型[6] 。

4定义物化视图并分析数据

本教程将创建一个物化视图,用于跟踪每个话题标签每天的使用频率。

首先,使用 regexp_matches 函数从推文中提取所有使用的话题标签。例如,给定以下推文:

regexp_matches 函数将找到推文中与 RegEx 模式 #\w+ 匹配的所有文本。这将提取推文中的所有话题标签并将它们存储在一个数组中。

       hashtag        |        created_at
----------------------+--------------------------
[#RisingWave, #cloud] | 2022-05-18 17:00:00+00:00

然后 unnest 函数将把数组中的每个项目分离成单独的行。

   hashtag   |        created_at
----------------------------------------
 #RisingWave | 2022-05-18 17:00:00+00:00
   #cloud    | 2022-05-18 17:00:00+00:00

最后,我们可以按 hashtagwindow_start 分组,计算每个话题标签每天的使用次数。

CREATE MATERIALIZED VIEW hot_hashtags AS WITH tags AS (
    SELECT
        unnest(regexp_matches((data).text, '#\w+''g')) AS hashtag,
        (data).created_at AS created_at
    FROM
        twitter
)
SELECT
    hashtag,
    COUNT(*) AS hashtag_occurrences,
    window_start
FROM
    TUMBLE(tags, created_at, INTERVAL '1 day')
GROUP BY
    hashtag,
    window_start;

5查询结果

我们可以查询十个最常使用的话题标签。

SELECT * FROM hot_hashtags
ORDER BY hashtag_occurrences DESC
LIMIT 10;

结果可能如下所示:

  hashtag  | hashtag_occurrences |       window_start
------------------------------------------------------------
   #Multi  |         262         | 2022-08-18 00:00:00+00:00
   #zero   |         198         | 2022-08-18 00:00:00+00:00
 knowledge |         150         | 2022-08-18 00:00:00+00:00
   #Open   |         148         | 2022-08-18 00:00:00+00:00
   #User   |         142         | 2022-08-18 00:00:00+00:00
  #Cross   |         141         | 2022-08-18 00:00:00+00:00
  #local   |         139         | 2022-08-18 00:00:00+00:00
  #client  |         138         | 2022-08-18 00:00:00+00:00
  #system  |         135         | 2022-08-18 00:00:00+00:00
    #Re    |         132         | 2022-08-18 00:00:00+00:00

如果工作负载生成器运行多天,将显示不同日期最常使用的话题标签。

完成后,运行以下命令断开 RisingWave 的连接。

\q

可选:要删除容器和生成的数据,请使用以下命令。

docker compose down -v

6总结

在本教程中,我们学到了:

  • 如何使用 RisingWave 定义嵌套表。

  • 如何使用正则表达式从字符串中提取字符组合。

本 Demo 只是抛砖引玉,欢迎大家充分利用 RisingWave 的强大功能挖掘其在各个领域的更多应用。

参考资料
[1]

Docker: https://docs.docker.com/get-docker/

[2]

Docker Compose: https://docs.docker.com/compose/install/

[3]

PostgreSQL: https://www.postgresql.org/docs/current/app-psql.html

[4]

下载 PostgreSQL: https://www.postgresql.org/download/

[5]

RisingWave: https://github.com/risingwavelabs/risingwave

[6]

数据类型: https://docs.risingwave.com/docs/current/sql-data-types/

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

RisingWave中文开源社区
RisingWave 是一款开源分布式 SQL 流数据库,致力于大幅降低流计算使用门槛与复杂度。RisingWave 已为全球超百家企业构建新一代流处理与分析平台。
 最新文章