In today's rapidly evolving IT landscape, collecting, storing, and analyzing log data is critical to enterprise operations. This document describes a ClickHouse-based log collection solution designed to overcome the limitations of the existing system, significantly improve data processing efficiency, reduce operational costs, and strengthen the system's scalability and adaptability.
1. Current Situation Analysis
1.1 The Existing Log Collection System
1.2 Problems
1. Data restoration: development teams frequently need access to data older than 7 days, which usually has to be restored manually from S3, a slow and inefficient process;
2. Query performance: the existing cluster performs poorly when querying several days or more than a week of data and cannot meet business needs;
3. Storage cost: OpenSearch is a full-text-indexing database, so its storage cost is relatively high, which makes cost control difficult.
2. Requirements for the New System
To address the problems above, the new log collection system must meet the following requirements:
1. Low disk usage: the new system should compress data effectively to reduce storage costs;
2. Fast queries: the new system must be able to query large volumes of data quickly to meet business needs;
3. Minimal changes: the new system should stay compatible with the existing log collection approach as much as possible, minimizing changes to the current setup.
3. ClickHouse Overview
ClickHouse is an open-source columnar database management system designed for efficient online analytical processing (OLAP). It is widely praised for its outstanding performance, extremely fast queries, and powerful data processing capabilities.
3.1 Key Features
1. Columnar storage: ClickHouse uses a column-oriented storage engine, providing high compression ratios and excellent query performance (see the sketch after this list);
2. Distributed architecture: it supports distributed deployment, ensuring high availability and load balancing;
3. Fast queries: optimized query execution plans and parallel processing deliver millisecond-level query responses;
4. Flexible data model: it supports many data types and complex structures, including nested data and maps;
5. Rich functionality: it provides aggregate functions, window functions, data compression, and import/export tooling;
6. SQL compatibility: it supports standard SQL, making it easy for developers and analysts to use;
7. Active ecosystem: it has an active developer community and a rich ecosystem that provide ongoing technical support and new features.
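The sketch below illustrates the columnar/OLAP usage pattern described above: per-column compression codecs and a simple aggregation that only reads the columns it needs. It is a minimal, hypothetical example; the table and column names are illustrative and are not part of the log schema defined later in this document.

-- Hypothetical table: columnar storage with an explicit per-column compression codec
CREATE TABLE example_events
(
    `event_date` Date,
    `user_id`    UInt64,
    `message`    String CODEC(ZSTD(3))
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id);

-- A typical OLAP-style aggregation
SELECT event_date, count() AS events
FROM example_events
GROUP BY event_date
ORDER BY event_date;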
4. Vector Overview
Vector is an open-source data pipeline tool focused on simplifying the integration, routing, and transformation of logs, metrics, and event data. It offers flexible configuration options and excellent performance, making it easy to build and operate complex data pipelines.
4.1 Key Features
1. Sources and sinks: it supports a wide range of data sources and destinations, including local files, network streams, and various databases;
2. Powerful data processing: it ships with a rich set of processing functions and tools for filtering, formatting, and aggregating data;
3. Component-based architecture: users can extend Vector's functionality as needed by enabling different components;
4. High-performance Rust implementation: Vector is written in Rust and delivers high-performance real-time data processing;
5. Simple, usable design: Vector is intuitive and straightforward to install, configure, and use.
5. Chproxy Overview
Chproxy is an open-source proxy designed specifically for ClickHouse; its goal is to simplify the management and maintenance of ClickHouse clusters.
5.1 Key Features
1. Load balancing: acting as a load balancer, Chproxy distributes client requests evenly across the nodes in the cluster;
2. Automatic failover: when a cluster node fails, Chproxy automatically reroutes requests to healthy nodes;
3. Efficient connection management: Chproxy manages connection pools, reducing the overhead of creating and tearing down connections;
4. Authentication: it supports authentication, controlling client identity and access permissions;
5. Flexible configuration: users can tailor the proxy's behavior to their needs, including load-balancing and failover policies;
6. Monitoring and logging: built-in monitoring and logging help users detect and resolve potential issues early.
6. ClickVisual Overview
ClickVisual is an open-source log query, analysis, and alerting visualization platform built for ClickHouse. It provides users with a complete application reliability solution.
6.1 Highlights
1. Intuitive query panel: users can query logs through a friendly interface and view charts alongside the raw log entries;
2. Log index analysis: ClickVisual lets users configure log indexes and analyze the distribution of their values, helping them understand and manage log data;
3. Query builder: the platform provides a query builder that helps users assemble complex queries quickly and offers optimization hints;
4. Permission management: ClickVisual supports permission management, keeping data secure;
5. Real-time monitoring: the platform offers real-time monitoring so users can track the status of the ClickHouse cluster.
7. Log Collection Pipeline
By introducing ClickHouse, Vector, Chproxy, and ClickVisual, the proposed log collection solution addresses the problems of the current system directly. The new design offers higher query performance and lower storage cost, requires only minimal changes to the existing setup, and can be deployed and put into service quickly. We are confident this solution will bring a significant improvement to the company's operations and provide solid technical support for its long-term development and competitiveness.
I. Current log formats:
Nginx log:
1.1.1.1 - 2024-03-17T02:14:07+00:00 api.xxx.xx GET /xxx/credit-result?externalReferenceUid=20000371526215 HTTP/1.1 200 945 382 - Apache-HttpAsyncClient/3.2.3 (Java/1.8.0_242) 575 dsahhtrewiuqhtiueqwtwqiuetgewqgtwgeqigwet== - - api.xxx.xx ac589a2fb68c02a4b829615403622fa4 - - - - - 10.0.0.0:80 0.023 0.000 - 200
Service log:
{"@timestamp":1710756972.77967,"java_time":"2024-03-18 18:16:12.779","project_name":"xx-xxxxx","application_name":"xxxxx-xxxx-xxx-api","TID":"f6edb93bbfiue8f3a9d6fe2fd03bb9cf.177.1710787690640435","span_id":"","logger":"xxxx_xxxx_xxxxx.go/GetXxxXxxx():122","thread":"goroutine-2578716335","level":"INFO","stack_trace":"","msg":"[GetUserConfig][U7157648639]finish processing default branches cost time: 3.406592ms"}
II. ClickHouse is planned to retain the most recent year of data;
III. New data flow:
Service logs: collected by Fluent Bit sidecars --> relay Fluent Bit --> Kafka --> Vector (data cleansing) --> Chproxy --> ClickHouse cluster --> ClickVisual (display);
Nginx logs: collected by a Filebeat DaemonSet --> Kafka --> Vector (data cleansing) --> Chproxy --> ClickHouse cluster --> ClickVisual (display).
7.1 ClickHouse Table Creation
-- Each node needs its own local table.
-- With ON CLUSTER the statement only has to be run on one node of the cluster.
CREATE TABLE logs.nginxlogs_prod ON CLUSTER dw_log
(
`timestamp` DateTime,
`remote_addr` String,
`remote_user` String,
`host` String,
`request_method` String,
`request_http_version` String,
`status` Int,
`bytes_sent` Int,
`body_bytes_sent` Int,
`http_referer` String,
`useragent` String,
`request_length` Int,
`http_authorization` String,
`http_x_forwarded_proto` String,
`http_x_forwarded_for` String,
`server_name` String,
`request_id` String,
`geoip2_data_city_name` String,
`geoip2_data_country_name` String,
`geoip2_data_city_location_latitude` String,
`geoip2_data_city_location_longitude` String,
`geoip2_data_city_region_name` String,
`upstream_addr` String,
`upstream_response_time` Float32,
`upstream_connect_time` Float32,
`upstream_status` Int,
`topic` String,
`uri_path` String,
`uri_param` String,
`device` String,
`major` String,
`minor` String,
`name` String,
`os` String,
`os_full` String,
`os_major` String,
`os_minor` String,
`os_name` String,
`os_patch` String,
`os_version` String,
`patch` String,
`version` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/nginxlogs_prod', '{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY toDate(timestamp)
TTL toDateTime(timestamp) + toIntervalDay(365);
-- The Distributed table must be created on every node in the cluster.
CREATE TABLE logs.nginxlogs_prod_distributed
(
`timestamp` DateTime,
`remote_addr` String,
`remote_user` String,
`host` String,
`request_method` String,
`request_http_version` String,
`status` Int32,
`bytes_sent` Int32,
`body_bytes_sent` Int32,
`http_referer` String,
`useragent` String,
`request_length` Int32,
`http_authorization` String,
`http_x_forwarded_proto` String,
`http_x_forwarded_for` String,
`server_name` String,
`request_id` String,
`geoip2_data_city_name` String,
`geoip2_data_country_name` String,
`geoip2_data_city_location_latitude` String,
`geoip2_data_city_location_longitude` String,
`geoip2_data_city_region_name` String,
`upstream_addr` String,
`upstream_response_time` Float32,
`upstream_connect_time` Float32,
`upstream_status` Int32,
`topic` String,
`uri_path` String,
`uri_param` String,
`device` String,
`major` String,
`minor` String,
`name` String,
`os` String,
`os_full` String,
`os_major` String,
`os_minor` String,
`os_name` String,
`os_patch` String,
`os_version` String,
`patch` String,
`version` String
)
ENGINE = Distributed(dw_log, logs, nginxlogs_prod, rand());
CREATE TABLE logs.servicelogs_prod ON CLUSTER dw_log
(
`timestamp` DateTime,
`TID` String,
`application_name` String,
`caller_class_name` String,
`caller_file_name` String,
`caller_line_number` Int,
`caller_method_name` String,
`level` String,
`logger` String,
`msg` String,
`project_name` String,
`stack_trace` String,
`thread` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/servicelogs_prod', '{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY toDate(timestamp)
TTL toDateTime(timestamp) + toIntervalDay(365);
-- The Distributed table must be created on every node in the cluster.
CREATE TABLE logs.servicelogs_prod_distributed
(
`timestamp` DateTime,
`TID` String,
`application_name` String,
`caller_class_name` String,
`caller_file_name` String,
`caller_line_number` Int,
`caller_method_name` String,
`level` String,
`logger` String,
`msg` String,
`project_name` String,
`stack_trace` String,
`thread` String
)
ENGINE = Distributed(dw_log, logs, servicelogs_prod, rand());
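With the tables above in place, queries should normally go through the Distributed tables and constrain `timestamp`, so that ClickHouse only touches the daily partitions in range. A hypothetical example (the date range is illustrative):

-- Per-host error rate for a single day; the timestamp predicate limits the scan to one daily partition
SELECT
    host,
    countIf(status >= 500) AS errors,
    count()                AS total
FROM logs.nginxlogs_prod_distributed
WHERE timestamp >= toDateTime('2024-03-17 00:00:00')
  AND timestamp <  toDateTime('2024-03-18 00:00:00')
GROUP BY host
ORDER BY errors DESC
LIMIT 20;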
7.2 Chproxy Configuration
# Whether to print debug logs.
#
# By default debug logs are disabled.
log_debug: true
hack_me_please: true

server:
  http:
    listen_addr: ":9090"

users:
  - name: "default"
    to_cluster: "logs"
    to_user: "default"
    deny_http: false
    allow_cors: true
    password: "xxx"

# by default each cluster has `default` user which can be overridden by section `users`
clusters:
  - name: "logs"
    scheme: "http"
    nodes: ["IP1:8123", "IP2:8123", "IP3:8123"]
    users:
      - name: "default"
        password: "xxx"
7.3 Vector Configuration
sources:
  kafka-nginxlogs-prod:
    type: kafka
    bootstrap_servers: server1:9092,server2:9092,server3:9092
    group_id: vector-nginxlogs-prod
    auto_offset_reset: earliest
    topics:
      - nginxlogs-prod
  kafka-servicelogs-prod:
    type: kafka
    bootstrap_servers: server1:9092,server2:9092,server3:9092
    group_id: vector-servicelogs-prod
    auto_offset_reset: earliest
    topics:
      - servicelogs-prod
transforms:
  remap-kafka-servicelogs-prod:
    inputs:
      - "kafka-servicelogs-prod"
    type: "remap"
    source: |-
      . = parse_json!(.message)
      # drop the timestamp field added by the Kafka source
      del(.timestamp)
      .timestamp = del(."@timestamp")
      if is_float(.timestamp) {
        .timestamp = floor(.timestamp)
        .timestamp = to_string(.timestamp)
        .timestamp = parse_timestamp!(.timestamp, format: "%s")
        .timestamp = parse_regex!(
          to_string(.timestamp),
          r'(?P<time>\d+\-\d+\-\d+T\d+:\d+:\d+)'
        )
        .timestamp = .timestamp.time
      } else {
        .timestamp = parse_regex!(
          .timestamp,
          r'(?P<time>\d+\-\d+\-\d+T\d+:\d+:\d+)'
        )
        .timestamp = .timestamp.time
      }
      .caller_line_number = to_int!(.caller_line_number)
  remap-kafka-nginxlogs-prod:
    inputs:
      - "kafka-nginxlogs-prod"
    type: "remap"
    source: |-
      # split the tab-separated access log line into fields
      parts = parse_csv!(
        .message,
        delimiter: "\t"
      )
      log = {}
      log.remote_addr = parts[0]
      log.time_iso8601 = parts[2]
      log.host = parts[3]
      log.request = parts[4]
      log.status = parts[5]
      log.bytes_sent = parts[6]
      log.body_bytes_sent = parts[7]
      log.http_referer = parts[8]
      log.http_user_agent = parts[9]
      log.request_length = parts[10]
      log.http_x_forwarded_proto = parts[12]
      log.http_x_forwarded_for = parts[13]
      log.server_name = parts[14]
      log.request_id = parts[15]
      log.geoip2_data_city_name = parts[16]
      log.geoip2_data_country_name = parts[17]
      log.geoip2_data_city_location_latitude = parts[18]
      log.geoip2_data_city_location_longitude = parts[19]
      log.geoip2_data_city_region_name = parts[20]
      log.upstream_addr = parts[21]
      log.upstream_response_time = parts[22]
      log.upstream_connect_time = parts[23]
      log.upstream_status = parts[25]
      log.status = to_int!(log.status)
      log.bytes_sent = to_int!(log.bytes_sent)
      log.body_bytes_sent = to_int!(log.body_bytes_sent)
      log.request_length = to_int!(log.request_length)
      if log.upstream_status == "-" {
        del(log.upstream_status)
        del(log.upstream_connect_time)
        del(log.upstream_response_time)
      } else {
        log.upstream_response_time = to_float!(log.upstream_response_time)
        log.upstream_connect_time = to_float!(log.upstream_connect_time)
        log.upstream_status = to_int!(log.upstream_status)
      }
      http_user_agent = parse_user_agent!(
        log.http_user_agent,
        mode: "enriched"
      )
      os = http_user_agent.os
      log.os = os.family
      log.os_full = log.os + " " + os.major
      log.os_major = os.major
      log.os_minor = os.minor
      log.os_name = log.os
      log.os_patch = os.patch
      log.os_version = os.version
      browser = http_user_agent.browser
      log.major = browser.major
      log.minor = browser.minor
      log.name = browser.family
      log.version = browser.version
      log.patch = browser.patch
      log.device = http_user_agent.device.family
      # split "METHOD URL HTTP/x.y" into its three parts
      log.request = parse_csv!(log.request, delimiter: " ")
      log.request_method = log.request[0]
      log.request_url = log.request[1]
      log.request_http_version = log.request[2]
      del(log.request)
      log.request_url = parse_grok!(
        log.request_url,
        "%{URIPATH:uri_path}(?:%{URIPARAM:uri_param})?"
      )
      request_url = log.request_url
      log.uri_param = request_url.uri_param
      log.uri_path = request_url.uri_path
      del(log.request_url)
      log.timestamp = parse_regex!(
        log.time_iso8601,
        r'(?P<time>\d+\-\d+\-\d+T\d+:\d+:\d+)'
      )
      log.timestamp = log.timestamp.time
      del(log.time_iso8601)
      . = log
sinks:
  kafka-servicelogs-prod-2-ck:
    type: clickhouse
    inputs:
      - remap-kafka-servicelogs-prod
    endpoint: "http://localhost:9090"
    database: logs
    table: servicelogs_prod_distributed
    skip_unknown_fields: true
    auth:
      strategy: basic
      user: default
      password: xxxx
    request:
      concurrency: adaptive
  kafka-nginxlogs-prod-2-ck:
    type: clickhouse
    inputs:
      - remap-kafka-nginxlogs-prod
    endpoint: "http://localhost:9090"
    database: logs
    table: nginxlogs_prod_distributed
    skip_unknown_fields: true
    auth:
      strategy: basic
      user: default
      password: xxx
    request:
      concurrency: adaptive
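Once Vector is running, a quick check on the ClickHouse side confirms that both sinks are delivering data. This is a hypothetical spot check; adjust the time window as needed:

-- Row counts and freshest timestamp over the last hour, per table
SELECT count() AS rows, max(timestamp) AS latest
FROM logs.servicelogs_prod_distributed
WHERE timestamp > now() - INTERVAL 1 HOUR;

SELECT count() AS rows, max(timestamp) AS latest
FROM logs.nginxlogs_prod_distributed
WHERE timestamp > now() - INTERVAL 1 HOUR;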
7.4 ClickVisual Configuration
[app]
isMultiCopy = false # multi-copy mode: Redis must be configured, otherwise the service cannot start
secretKey = "secretKey"
rootURL = "https://clickvisual.example.com"
baseURL = "/api/admin/login/"
permissionFile = './config/resource.yaml'
serveFromSubPath = false
[casbin.rule]
path = "./config/rbac.conf"
[server.http]
# HTTP server Host
host = "0.0.0.0"
# HTTP server Port
port = 19001
embedPath = "dist"
maxAge = 86400
[server.governor]
# Governor server host
host = "0.0.0.0"
# Governor server port
port = 19011
[logger]
# log level, available level: "debug", "info", "warn", "error", "panic", "fatal"
level = "info"
name = "clickvisual.log"
# if isMultiCopy is true
[redis]
debug = true
addr = "127.0.0.1:6379"
writeTimeout = "3s"
password = "**"
[mysql]
debug = true
# database DSN
dsn = "clickvisual:xxx@tcp(127.0.0.1:3306)/clickvisual?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=True&loc=Local&readTimeout=1s&timeout=1s&writeTimeout=3s"
# log level
level = "debug"
# maximum number of connections in the idle connection pool for database
maxIdleConns = 5
# maximum number of open connections for database
maxOpenConns = 10
# maximum amount of time a connection
connMaxLifetime = "300s"
[auth]
mode = "memstore" # redis memstore
name = "clickvisual_session"
debug = true
Keypairs = "secret"
# if use mode redis
# redisSize = 10
# redisNetwork = "tcp"
# redisAddr = ""
# redisPassword = ""
[auth.anonymous]
# enable anonymous access
enabled = false
[auth.proxy]
enabled = false
isAutoLogin = false
headerName = "X-CLICKVISUAL-USER"
headerNickName = "X-CLICKVISUAL-NICKNAME"
rootTokenKey = "X-CLICKVISUAL-TOKEN"
rootTokenValue = "xxx"
[[auth.tps]]
typ = "gitlab"
enable = true
clientId = "xxx"
clientSecret = "gloas-xxx"
allowSignUp = true
scopes = ["api"]
authUrl = "https://repo.example.com/oauth/authorize"
tokenUrl = "https://repo.example.com/oauth/token"
apiUrl = "https://repo.example.com/api/v4"
allowedDomains = []
teamIds = []
allowedOrganizations = []
[prom2click]
enable = true
[[prom2click.cfgs]]
host = "127.0.0.1"
port = 9222
clickhouseDSN = "tcp://127.0.0.1:9000"
clickhouseDB = "metrics"
clickhouseTable = "samples"
7.5 Log Display
8. Conclusion
As data volumes keep growing, an efficient, reliable, and cost-effective log collection system becomes ever more important. The ClickHouse-based log collection solution described in this document not only resolves several pain points of the existing system, but also lays a solid foundation for the company's future data management and analysis work. We expect this solution to help the company understand and make better use of its log data, and to carry it further along the path of digital transformation.
9. Appendix
9.1 Performance Optimization Practices
9.1.1 ClickHouse performance optimization:
Data partitioning: partitioning data sensibly improves query efficiency and avoids scanning data that is not needed;
Index optimization: appropriate indexes speed up queries considerably, especially on large data volumes;
Query optimization: use tools such as EXPLAIN to inspect query plans, tune the SQL, and avoid full-table scans (see the sketch after this list);
Hardware: sensible hardware choices, such as SSDs and more memory, also improve ClickHouse performance noticeably.
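The sketch below shows what these points can look like against the tables defined in section 7.1. The data-skipping index at the end is a hypothetical addition for frequent request_id lookups, not part of the schema above:

-- 1) Partitioning: a timestamp predicate keeps the scan inside the matching daily partitions
SELECT count()
FROM logs.nginxlogs_prod_distributed
WHERE timestamp >= toDateTime('2024-03-17 00:00:00')
  AND timestamp <  toDateTime('2024-03-18 00:00:00')
  AND status = 500;

-- 2) Query optimization: EXPLAIN shows the plan; indexes = 1 also reports which parts/granules are read
EXPLAIN indexes = 1
SELECT count()
FROM logs.nginxlogs_prod
WHERE timestamp >= toDateTime('2024-03-17 00:00:00')
  AND timestamp <  toDateTime('2024-03-18 00:00:00');

-- 3) Index optimization: a hypothetical data-skipping index for point lookups on request_id
ALTER TABLE logs.nginxlogs_prod ON CLUSTER dw_log
    ADD INDEX idx_request_id request_id TYPE bloom_filter GRANULARITY 4;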
9.1.2 Vector performance tuning:
Parallel processing: Vector processes data in parallel, and its throughput can be tuned through configuration;
Resource limits: give Vector a sensible CPU and memory budget so that a single task cannot overload the host;
Data compression: enabling compression for data in transit reduces network bandwidth and storage usage.
9.1.3 Chproxy performance monitoring:
Metrics: monitor key Chproxy metrics such as CPU usage, memory usage, and request latency;
Log analysis: review Chproxy's logs regularly to catch and resolve performance problems early;
Load testing: run load tests periodically to evaluate how Chproxy behaves under heavy load and to confirm its stability and reliability.
9.1.4 ClickVisual user experience:
Interface design: offer an intuitive, easy-to-use interface to lower the learning curve;
Interaction: streamline the workflow so that querying and analysis take fewer steps;
Performance monitoring: provide real-time monitoring so users understand the system's state and can spot and fix issues quickly.
9.2 Security Considerations
Data encryption: encrypt data in transit and at rest to keep it secure;
Access control: enforce strict access control policies so that only authorized users can reach sensitive data (see the sketch after this list);
Audit logging: record all user operations for traceability and auditing.
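One way to back the access-control point is ClickHouse's SQL-driven role-based access control. This is a minimal sketch: the role and user names are hypothetical, it assumes access control is enabled on the cluster, and replicating the statements across nodes (for example with ON CLUSTER) is left out:

-- Read-only role restricted to the logs database
CREATE ROLE IF NOT EXISTS log_reader;
GRANT SELECT ON logs.* TO log_reader;

-- An analyst account that can only read logs
CREATE USER IF NOT EXISTS log_analyst IDENTIFIED WITH sha256_password BY 'change-me';
GRANT log_reader TO log_analyst;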