使用 WarpStream、RisingWave 和 Grafana 进行实时网站监控

文摘   2024-05-16 14:20   新加坡  

本文将探讨一个结合 RisingWave、WarpStream 和 Grafana 的实时网站监控系统的设计。这种架构设计确保了可扩展性和响应速度,能够快速识别并减轻 Web 应用中的安全风险。

WarpStream[1]是一个与 Apache Kafka 兼容的数据流平台。在本文的架构设计中,WarpStream 会作为集中消息主干,实时收集和分发网站审计日志。然后,这些日志会无缝导入到 RisingWave[2],由 RisingWave 实现对传入数据流的持续分析和过滤,用其强大的处理能力有效检测潜在的安全威胁。

最后,我们会用 Grafana[3]创建一个综合的实时看板,用于展示用户活动、网站引荐、用户状态码、用户安全性档案等详细信息。这一个全方位的视角让安全相关指标的监控和分析更加高效。此外,对检测到的威胁,系统还可以生成警报,并支持自动响应,从而采取积极的安全监控方法。

1第一步:设置 WarpStream

首先,我们生成一些随机网站日志,然后将这些数据发送到 WarpStream 的 Topic。详情请参考 RisingWave 文档设置 WarpStream[4]

以下是向 WarpStream 发送的数据示例,包含了网站用户活动、请求和响应信息。

{
  "request_timestamp""2024-01-25T12:30:45.678Z",
  "ip_address""192.168.1.100",
  "user_agent""Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.4567.89 Safari/537.36",
  "url""<https://streamprocessingdemo.com>",
  "http_method""GET",
  "status_code"200,
  "response_time"120,
  "referrer""google",
  "user_id""12345",
  "username""john_doe",
  "user_action""view_page",
  "security_level""low"
}

2第二步:将数据流导入到 RisingWave

将数据流以 JSON 格式发送到 WarpStream 后,可以使用以下 SQL 语句将数据从 WarpStream 导入到 RisingWave。如果您是 RisingWave 新用户,可以先参考 RisingWave 文档中的快速入门[5]教程,了解如何安装使用。

创建数据源

以下查询创建了一个名为 website_logs_source 的数据源,用于从名为 website_logs 的 WarpStream Topic 中摄取数据。该查询使用 JSON 格式定义了数据模式,包含请求时间戳、IP地址、用户代理、URL、HTTP 方法、状态码、响应时间等字段。

CREATE SOURCE website_logs_source (
    request_timestamp TIMESTAMP,
    ip_address VARCHAR,
    user_agent TEXT,
    url TEXT,
    http_method VARCHAR,
    status_code INTEGER,
    response_time INTEGER,
    referrer TEXT,
    user

_id VARCHAR,
    username VARCHAR,
    user_action VARCHAR,
    security_level VARCHAR
)WITH (
  connector='kafka',
  topic = 'website_logs',
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest'
FORMAT PLAIN ENCODE JSON;

创建用于数据分析的物化视图

接下来,我们将创建 5 个物化视图,用于分析多种信息。

物化视图 1:分析网站用户指标

以下查询建立了名为 website_user_metrics 的物化视图,用于生成关于用户活动的聚合统计信息。这个物化视图计算了多种指标,包括总请求、响应时间、错误计数、登录/登出计数及独立 IP 地址等,以上统计信息按 1 分钟间隔呈现,并对每个用户及相应时间窗口进行统计。

CREATE MATERIALIZED VIEW website_user_metrics AS
WITH UserActivityStats AS (
    SELECT
        username,
        COUNT(username) AS total_requests,
        window_start,
        window_end,
        MIN(response_time) AS min_response_time,
        MAX(response_time) AS max_response_time,
        AVG(response_time) AS avg_response_time,
        PERCENTILE_CONT(0.5WITHIN GROUP (ORDER BY response_time) AS median_response_time,
        SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 ENDAS total_errors,
        SUM(CASE WHEN user_action = 'login' THEN 1 ELSE 0 ENDAS login_count,
        SUM(CASE WHEN user_action = 'logout' THEN 1 ELSE 0 ENDAS logout_count,
        COUNT(DISTINCT ip_address) AS unique_ips
    FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
    GROUP BY username, window_start, window_end
)

SELECT
    username,
    total_requests,
    min_response_time,
    max_response_time,
    avg_response_time,
    median_response_time,
    total_errors,
    login_count,
    logout_count,
    unique_ips,
    window_start,
    window_end
FROM UserActivityStats;

物化视图 2:分析时间窗口中排名最高的用户行为

以下查询生成名为 top_user_actions 的物化视图,识别并排名网站日志中三种最频繁的用户行为,时间间隔为 1 分钟。它使用窗口函数根据行为发生次数分配排名,展示了行为名称、发生次数及相应时间窗口等信息。

CREATE MATERIALIZED VIEW top_user_actions AS
WITH ranked_user_actions AS (
    SELECT
        user_action,
        COUNT(user_action) AS count_user_activity,
        window_start,
        window_end,
        ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY COUNT(user_action) DESCAS action_rank
    FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
    GROUP BY user_action, window_start, window_end
)
SELECT
    user_action,
    count_user_activity,
    window_start,
    window_end
FROM ranked_user_actions
WHERE action_rank <= 5
ORDER BY window_start, action_rank;

物化视图 3:分析用户互动中的引荐活动

以下查询创建了名为 referrer_activity_summary 的物化视图,提供基于引荐者的网站活动概要,时间间隔为 1 分钟。该物化视图聚合了多个指标,包括引荐访问次数、页面访问次数、互动次数、内容互动次数和窗口互动次数。这些指标与相应的时间窗口一起呈现,用于分析和评估。

CREATE MATERIALIZED VIEW referrer_activity_summary AS
SELECT
    referrer,
    COUNT(referrer) AS referrer_visit_count,
    SUM(CASE WHEN user_action IN ('view_page''navigate_page'THEN 1 ELSE 0 ENDAS page_visits,
    SUM(CASE WHEN user_action IN ('submit_form''login''logout'THEN 1 ELSE 0 ENDAS interactions,
    SUM(CASE WHEN user_action IN ('scroll_page''download_file''upload_file'THEN 1 ELSE 0 ENDAS content_interactions,
    SUM(CASE WHEN user_action IN ('close_window''open_new_tab'THEN 1 ELSE 0 ENDAS window_interactions,
    window_start,
    window_end
FROM TUMBLE(website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
GROUP BY referrer,

 window_start, window_end;

物化视图 4:分析用户 HTTP 状态码

以下查询建立名为 status_code_analysis_summary 的物化视图,分析并汇总网站日志中 HTTP 状态码的分布,时间间隔为 1 分钟。该物化视图计算的指标包括:每个状态码的计数、每个状态码的平均响应时间、以及每个时间窗口内状态码的累计计数和百分比。这些指标与相应的时间窗口一起呈现,按时间窗口和状态码的降序排序。

CREATE MATERIALIZED VIEW status_code_analysis_summary
WITH Status_Code_Analysis AS (
    SELECT
        status_code,
        COUNT(status_code) AS count_status_code,
        AVG(response_time) AS avg_response_time,
        SUM(COUNT(status_code)) OVER (PARTITION BY window_start, window_end ORDER BY status_code) AS cumulative_count,
        100.0 * SUM(COUNT(status_code)) OVER (PARTITION BY window_start, window_end ORDER BY status_code) / SUM(COUNT(status_code)) OVER (PARTITION BY window_start, window_end) AS cumulative_percentage,
        window_start,
        window_end
    FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
    GROUP BY status_code, window_start, window_end
)

SELECT
    status_code,
    count_status_code,
    avg_response_time,
    cumulative_count,
    cumulative_percentage,
    window_start,
    window_end
FROM Status_Code_Analysis
ORDER BY window_start DESC, status_code;

物化视图 5:分析用户安全性与安全级别

以下查询建立名为 security_level_analysis_summary 的物化视图,用于分析并汇总网站日志安全级别,时间间隔为 1 分钟。该物化视图计算的指标包括:每个安全级别的计数、每个安全级别的平均响应时间,以及每个时间窗口中安全级别的中位数计数。这些指标与相应的时间窗口一起呈现,按时间窗口和安全级别的中位数计数的降序排序。

CREATE MATERIALIZED VIEW security_level_analysis_summary AS
WITH Security_Profiling AS (
    SELECT
        security_level,
        COUNT(security_level) AS count_security_level,
        AVG(response_time) AS avg_response_time,
        PERCENTILE_DISC (0.5WITHIN GROUP (ORDER BY security_level) AS median_count_security_level,
        window_start,
        window_end
    FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
    GROUP BY security_level, window_start, window_end
)

SELECT
    security_level,
    count_security_level,
    avg_response_time,
    median_count_security_level,
    window_start,
    window_end
FROM Security_Profiling
ORDER BY window_start DESC, median_count_security_level;

3第三步:将数据从 RisingWave 发送到 Apache Grafana 进行可视化

本步骤中,我们将配置 Grafana, 从 RisingWave 读取数据,然后构建可视化图表。

将 RisingWave 连接到 Grafana

要在 Grafana 中使用 RisingWave 作为数据源并创建可视化图表和看板,请按照 RisingWave 文档配置 Grafana[6]

在根据文档连接 RisingWave 与 Grafana 后,您就可以将 RisingWave 的物化视图作为表格纳入 Grafana 来设计图表,并构建综合全面的看板。

使用 Grafana 进行数据可视化:表格、图表和看板

本节陈列了一些可构建的图表和看板。比如,以下表格由上文创建的 website_logs_source 数据源生成。

website_logs_source

以下图表由名为 website_user_metrics 的物化视图生成,提供基于网站日志的用户活动的聚合统计信息。

website_user_metrics

以下图表基于 top_user_actions 物化视图创建,用于识别并排名网站日志中排名前五的用户行为。

top_user_actions

以下图表由 referrer_activity_summary 物化视图生成,用以汇总基于引荐者的网站活动。

referrer_activity_summary

以下图表由 status_code_analysis_summary 物化视图生成,用于分析并汇总网站日志中 HTTP 状态码的分布。

status_code_analysis_summary

以下图表由 security_level_analysis_summary 物化视图生成,用于分析并汇总网站日志安全级别。

security_level_analysis_summary

以下则是一个综合看板,包含了上述一系列实时监控网站的图表,提供了全面的安全监控功能,增强了威胁检测和响应能力。

看板

4总结

本文介绍了能够整合 RisingWave、WarpStream 和 Grafana 的实时网站监控系统。整个系统的设置过程清楚简单,要监控每个指标,您只需要在 RisingWave 中创建一个物化视图并在 Grafana 中进行可视化。上文展示的分析仅作为示例,为您提供灵感。如果您的数据点已经准备就绪,并提供必要的数据,可以尝试更复杂的分析和转换逻辑。欢迎您的进一步探索,如有任何问题或需要帮助,请随时联系我们。

参考资料

[1]

WarpStream: https://www.warpstream.com/

[2]

RisingWave: https://risingwave.com/

[3]

Grafana: https://grafana.com/

[4]

设置 WarpStream: https://docs.risingwave.com/docs/current/ingest-from-warpstream/#set-up-warpstream

[5]

快速入门: https://docs.risingwave.com/docs/current/get-started/

[6]

配置 Grafana: https://docs.risingwave.com/docs/current/grafana-integration/

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。


往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

RisingWave中文开源社区
RisingWave 是一款开源分布式 SQL 流数据库,致力于大幅降低流计算使用门槛与复杂度。RisingWave 已为全球超百家企业构建新一代流处理与分析平台。
 最新文章