本文将探讨一个结合 RisingWave、WarpStream 和 Grafana 的实时网站监控系统的设计。这种架构设计确保了可扩展性和响应速度,能够快速识别并减轻 Web 应用中的安全风险。
WarpStream[1]是一个与 Apache Kafka 兼容的数据流平台。在本文的架构设计中,WarpStream 会作为集中消息主干,实时收集和分发网站审计日志。然后,这些日志会无缝导入到 RisingWave[2],由 RisingWave 实现对传入数据流的持续分析和过滤,用其强大的处理能力有效检测潜在的安全威胁。
最后,我们会用 Grafana[3]创建一个综合的实时看板,用于展示用户活动、网站引荐、用户状态码、用户安全性档案等详细信息。这一个全方位的视角让安全相关指标的监控和分析更加高效。此外,对检测到的威胁,系统还可以生成警报,并支持自动响应,从而采取积极的安全监控方法。
1第一步:设置 WarpStream
首先,我们生成一些随机网站日志,然后将这些数据发送到 WarpStream 的 Topic。详情请参考 RisingWave 文档设置 WarpStream[4]。
以下是向 WarpStream 发送的数据示例,包含了网站用户活动、请求和响应信息。
{
"request_timestamp": "2024-01-25T12:30:45.678Z",
"ip_address": "192.168.1.100",
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.4567.89 Safari/537.36",
"url": "<https://streamprocessingdemo.com>",
"http_method": "GET",
"status_code": 200,
"response_time": 120,
"referrer": "google",
"user_id": "12345",
"username": "john_doe",
"user_action": "view_page",
"security_level": "low"
}
2第二步:将数据流导入到 RisingWave
将数据流以 JSON 格式发送到 WarpStream 后,可以使用以下 SQL 语句将数据从 WarpStream 导入到 RisingWave。如果您是 RisingWave 新用户,可以先参考 RisingWave 文档中的快速入门[5]教程,了解如何安装使用。
创建数据源
以下查询创建了一个名为 website_logs_source
的数据源,用于从名为 website_logs
的 WarpStream Topic 中摄取数据。该查询使用 JSON 格式定义了数据模式,包含请求时间戳、IP地址、用户代理、URL、HTTP 方法、状态码、响应时间等字段。
CREATE SOURCE website_logs_source (
request_timestamp TIMESTAMP,
ip_address VARCHAR,
user_agent TEXT,
url TEXT,
http_method VARCHAR,
status_code INTEGER,
response_time INTEGER,
referrer TEXT,
user
_id VARCHAR,
username VARCHAR,
user_action VARCHAR,
security_level VARCHAR
)WITH (
connector='kafka',
topic = 'website_logs',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
创建用于数据分析的物化视图
接下来,我们将创建 5 个物化视图,用于分析多种信息。
物化视图 1:分析网站用户指标
以下查询建立了名为 website_user_metrics
的物化视图,用于生成关于用户活动的聚合统计信息。这个物化视图计算了多种指标,包括总请求、响应时间、错误计数、登录/登出计数及独立 IP 地址等,以上统计信息按 1 分钟间隔呈现,并对每个用户及相应时间窗口进行统计。
CREATE MATERIALIZED VIEW website_user_metrics AS
WITH UserActivityStats AS (
SELECT
username,
COUNT(username) AS total_requests,
window_start,
window_end,
MIN(response_time) AS min_response_time,
MAX(response_time) AS max_response_time,
AVG(response_time) AS avg_response_time,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY response_time) AS median_response_time,
SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) AS total_errors,
SUM(CASE WHEN user_action = 'login' THEN 1 ELSE 0 END) AS login_count,
SUM(CASE WHEN user_action = 'logout' THEN 1 ELSE 0 END) AS logout_count,
COUNT(DISTINCT ip_address) AS unique_ips
FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
GROUP BY username, window_start, window_end
)
SELECT
username,
total_requests,
min_response_time,
max_response_time,
avg_response_time,
median_response_time,
total_errors,
login_count,
logout_count,
unique_ips,
window_start,
window_end
FROM UserActivityStats;
物化视图 2:分析时间窗口中排名最高的用户行为
以下查询生成名为 top_user_actions
的物化视图,识别并排名网站日志中三种最频繁的用户行为,时间间隔为 1 分钟。它使用窗口函数根据行为发生次数分配排名,展示了行为名称、发生次数及相应时间窗口等信息。
CREATE MATERIALIZED VIEW top_user_actions AS
WITH ranked_user_actions AS (
SELECT
user_action,
COUNT(user_action) AS count_user_activity,
window_start,
window_end,
ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY COUNT(user_action) DESC) AS action_rank
FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
GROUP BY user_action, window_start, window_end
)
SELECT
user_action,
count_user_activity,
window_start,
window_end
FROM ranked_user_actions
WHERE action_rank <= 5
ORDER BY window_start, action_rank;
物化视图 3:分析用户互动中的引荐活动
以下查询创建了名为 referrer_activity_summary
的物化视图,提供基于引荐者的网站活动概要,时间间隔为 1 分钟。该物化视图聚合了多个指标,包括引荐访问次数、页面访问次数、互动次数、内容互动次数和窗口互动次数。这些指标与相应的时间窗口一起呈现,用于分析和评估。
CREATE MATERIALIZED VIEW referrer_activity_summary AS
SELECT
referrer,
COUNT(referrer) AS referrer_visit_count,
SUM(CASE WHEN user_action IN ('view_page', 'navigate_page') THEN 1 ELSE 0 END) AS page_visits,
SUM(CASE WHEN user_action IN ('submit_form', 'login', 'logout') THEN 1 ELSE 0 END) AS interactions,
SUM(CASE WHEN user_action IN ('scroll_page', 'download_file', 'upload_file') THEN 1 ELSE 0 END) AS content_interactions,
SUM(CASE WHEN user_action IN ('close_window', 'open_new_tab') THEN 1 ELSE 0 END) AS window_interactions,
window_start,
window_end
FROM TUMBLE(website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
GROUP BY referrer,
window_start, window_end;
物化视图 4:分析用户 HTTP 状态码
以下查询建立名为 status_code_analysis_summary
的物化视图,分析并汇总网站日志中 HTTP 状态码的分布,时间间隔为 1 分钟。该物化视图计算的指标包括:每个状态码的计数、每个状态码的平均响应时间、以及每个时间窗口内状态码的累计计数和百分比。这些指标与相应的时间窗口一起呈现,按时间窗口和状态码的降序排序。
CREATE MATERIALIZED VIEW status_code_analysis_summary
WITH Status_Code_Analysis AS (
SELECT
status_code,
COUNT(status_code) AS count_status_code,
AVG(response_time) AS avg_response_time,
SUM(COUNT(status_code)) OVER (PARTITION BY window_start, window_end ORDER BY status_code) AS cumulative_count,
100.0 * SUM(COUNT(status_code)) OVER (PARTITION BY window_start, window_end ORDER BY status_code) / SUM(COUNT(status_code)) OVER (PARTITION BY window_start, window_end) AS cumulative_percentage,
window_start,
window_end
FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
GROUP BY status_code, window_start, window_end
)
SELECT
status_code,
count_status_code,
avg_response_time,
cumulative_count,
cumulative_percentage,
window_start,
window_end
FROM Status_Code_Analysis
ORDER BY window_start DESC, status_code;
物化视图 5:分析用户安全性与安全级别
以下查询建立名为 security_level_analysis_summary
的物化视图,用于分析并汇总网站日志安全级别,时间间隔为 1 分钟。该物化视图计算的指标包括:每个安全级别的计数、每个安全级别的平均响应时间,以及每个时间窗口中安全级别的中位数计数。这些指标与相应的时间窗口一起呈现,按时间窗口和安全级别的中位数计数的降序排序。
CREATE MATERIALIZED VIEW security_level_analysis_summary AS
WITH Security_Profiling AS (
SELECT
security_level,
COUNT(security_level) AS count_security_level,
AVG(response_time) AS avg_response_time,
PERCENTILE_DISC (0.5) WITHIN GROUP (ORDER BY security_level) AS median_count_security_level,
window_start,
window_end
FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
GROUP BY security_level, window_start, window_end
)
SELECT
security_level,
count_security_level,
avg_response_time,
median_count_security_level,
window_start,
window_end
FROM Security_Profiling
ORDER BY window_start DESC, median_count_security_level;
3第三步:将数据从 RisingWave 发送到 Apache Grafana 进行可视化
本步骤中,我们将配置 Grafana, 从 RisingWave 读取数据,然后构建可视化图表。
将 RisingWave 连接到 Grafana
要在 Grafana 中使用 RisingWave 作为数据源并创建可视化图表和看板,请按照 RisingWave 文档配置 Grafana[6]。
在根据文档连接 RisingWave 与 Grafana 后,您就可以将 RisingWave 的物化视图作为表格纳入 Grafana 来设计图表,并构建综合全面的看板。
使用 Grafana 进行数据可视化:表格、图表和看板
本节陈列了一些可构建的图表和看板。比如,以下表格由上文创建的 website_logs_source
数据源生成。
以下图表由名为 website_user_metrics
的物化视图生成,提供基于网站日志的用户活动的聚合统计信息。
以下图表基于 top_user_actions
物化视图创建,用于识别并排名网站日志中排名前五的用户行为。
以下图表由 referrer_activity_summary
物化视图生成,用以汇总基于引荐者的网站活动。
以下图表由 status_code_analysis_summary
物化视图生成,用于分析并汇总网站日志中 HTTP 状态码的分布。
以下图表由 security_level_analysis_summary
物化视图生成,用于分析并汇总网站日志安全级别。
以下则是一个综合看板,包含了上述一系列实时监控网站的图表,提供了全面的安全监控功能,增强了威胁检测和响应能力。
4总结
本文介绍了能够整合 RisingWave、WarpStream 和 Grafana 的实时网站监控系统。整个系统的设置过程清楚简单,要监控每个指标,您只需要在 RisingWave 中创建一个物化视图并在 Grafana 中进行可视化。上文展示的分析仅作为示例,为您提供灵感。如果您的数据点已经准备就绪,并提供必要的数据,可以尝试更复杂的分析和转换逻辑。欢迎您的进一步探索,如有任何问题或需要帮助,请随时联系我们。
参考资料
WarpStream: https://www.warpstream.com/
[2]RisingWave: https://risingwave.com/
[3]Grafana: https://grafana.com/
[4]设置 WarpStream: https://docs.risingwave.com/docs/current/ingest-from-warpstream/#set-up-warpstream
[5]快速入门: https://docs.risingwave.com/docs/current/get-started/
[6]配置 Grafana: https://docs.risingwave.com/docs/current/grafana-integration/
关于 RisingWave
往期推荐
技术内幕