技术创想104 |ClickHouse 日志收集方案

文摘   科技   2024-04-18 17:49   北京  

概述

在当今快速发展的信息技术时代,日志数据的收集、存储和分析对于企业运维管理至关重要。本文档详细阐述了一套基于 ClickHouse 的日志收集方案,旨在克服现有系统的局限性,显著提升数据处理效率,降低运维成本,并增强系统的扩展性和适应性。

现状分析

1.1 当前日志收集系统

目前,我们的日志收集系统主要基于 Elastic Stack 的开源实现——OpenSearch。该系统通过 fluentbit 和 filebeat 两种方式采集服务日志和 nginx 日志,并通过 kafka 传输至 logstash 进行数据清洗,最终存储于 OpenSearch 中,并通过 Kibana 进行数据展示。

1.2 存在的问题

1. 数据恢复问题:研发团队频繁需要访问超过 7 天的历史数据,这通常需要从 S3 手动恢复数据,这一过程既耗时又低效;

2. 查询性能问题:在处理多天或超过一周的数据查询时,现有集群的性能表现不佳,难以满足业务需求;

3. 存储成本问题:OpenSearch 作为一款全文索引数据库,其存储成本相对较高,对企业成本控制构成挑战。

新系统要求

针对现有系统存在的问题,我们对新的日志收集系统提出以下要求:

1. 低磁盘使用率:新系统应能够有效压缩数据,降低存储成本;

2. 快速查询能力:新系统需要提供快速查询大量数据的能力,以满足业务需求;

3. 最小改动:新系统应尽可能与现有日志收集方式兼容,减少对现有系统的改动。

ClickHouse 简介

ClickHouse 是一款开源的列式数据库管理系统,专为高效的在线分析处理(OLAP)而设计。它以其出色的性能、极速查询和强大的数据处理能力而受到广泛赞誉。

3.1 特点

1. 列式存储:ClickHouse 采用列式存储引擎,提供高压缩率和出色的查询性能;

2. 分布式架构:支持分布式部署,确保了数据的高可用性和负载均衡;

3. 快速查询:通过优化查询执行计划和并行处理技术,实现了毫秒级的查询响应;

4. 灵活的数据模型:支持多种数据类型和复杂的数据结构,包括嵌套数据和map等;

5. 丰富的功能:提供聚合函数、窗口函数、数据压缩和导入导出等强大的数据处理功能;

6. SQL 兼容性:支持标准的 SQL 查询语言,便于开发者和分析师使用;

7. 活跃的生态系统:拥有一个活跃的开发者社区和丰富的生态系统,为用户提供持续的技术支持和新功能。

Vector 简介

Vector 是一款开源的数据管道工具,专注于简化日志、指标和事件数据的集成、路由和转换。它提供了灵活的配置选项和卓越的性能,使得用户能够轻松构建和管理复杂的数据管道。

4.1 特点

1. 数据源和目的地:支持多种数据源和目的地,包括本地文件、网络流和各类数据库等;

2. 强大的数据处理能力:内置丰富的数据处理函数和工具,可以对数据进行过滤、格式化和聚合等操作;

3. 插件化架构:通过加载不同的插件,用户可以按需扩展Vector的功能;

4. 基于 Rust 的高性能实现:Vector基于Rust语言构建,提供了高性能的实时数据处理能力;

5. 简洁易用的设计理念:Vector的设计直观简单,便于用户安装、配置和使用。

Chproxy 简介

Chproxy 是一个专为 ClickHouse 设计的开源代理工具,它的目的是简化ClickHouse集群的管理和维护工作。

5.1 特点

1. 智能负载均衡:Chproxy 作为负载均衡器,能够将客户端请求均匀地分配到集群中的各个节点;

2. 自动故障转移:在集群节点发生故障时,Chproxy 能够自动将请求重新路由到健康的节点;

3. 高效的连接池管理:Chproxy 提供连接池管理功能,减少了连接创建和销毁的开销;

4. 安全认证机制:支持安全认证,确保了客户端的合法性和访问权限的控制;

5. 灵活的配置选项:用户可以根据实际需求定制代理的行为,包括负载均衡策略和故障转移策略;

6. 监控与日志功能:Chproxy 内置了监控和日志功能,帮助用户及时发现并解决潜在问题。

ClickVisual 简介

ClickVisual 是一个为 ClickHouse 量身定制的开源日志查询、分析和报警的可视化平台。它为用户提供了一套完整的应用可靠性解决方案。

6.1 核心亮点

1. 直观的可视化查询面板:用户可以通过友好的界面进行日志查询,并直观地查看图表和原始日志条目;

2. 高效的日志索引功能:ClickVisual 支持用户设置和分析不同日志索引的占比,帮助用户更好地理解和管理日志数据;

3. 强大的查询构建器:平台提供了查询构建器,帮助用户快速构建复杂查询,并提供查询优化建议;

4. 完善的权限管理机制:ClickVisual 支持权限管理,确保了数据的安全性和完整性;

5. 实时监控能力:平台提供实时监控功能,用户可以实时跟踪 ClickHouse 数据库的运行状态。

日志收集流程

通过引入 ClickHouse、Vector、Chproxy 和 ClickVisual 等先进的工具和技术,我们提出的日志收集方案旨在全面解决现有系统的挑战。新方案不仅能够提供更高的查询性能和更低的存储成本,而且对现有系统的改动极小,能够快速部署并投入使用。我们坚信,这套方案将为企业的运维管理带来显著的提升和改进,为企业的长期发展和竞争力提供坚实的技术支持。

I. 当前日志格式:

  • nginx 日志:

1.1.1.1  -       2024-03-17T02:14:07+00:00       api.xxx.xx    GET /xxx/credit-result?externalReferenceUid=20000371526215 HTTP/1.1  200     945     382     -       Apache-HttpAsyncClient/3.2.3 (Java/1.8.0_242)   575     dsahhtrewiuqhtiueqwtwqiuetgewqgtwgeqigwet==       -       -       api.xxx.xx    ac589a2fb68c02a4b829615403622fa4        -       -       -       -       -       10.0.0.0:80     0.023   0.000   -       200
  • 服务日志:

{"@timestamp":1710756972.77967,"java_time":"2024-03-18 18:16:12.779","project_name":"xx-xxxxx","application_name":"xxxxx-xxxx-xxx-api","TID":"f6edb93bbfiue8f3a9d6fe2fd03bb9cf.177.1710787690640435","span_id":"","logger":"xxxx_xxxx_xxxxx.go/GetXxxXxxx():122","thread":"goroutine-2578716335","level":"INFO","stack_trace":"","msg":"[GetUserConfig][U7157648639]finish processing default branches cost time: 3.406592ms"}

II. ClickHouse 计划保留最近一年的数据;

III. 新的数据流:

  • fluentbit sidecar 方式采集服务日志 --> 中转 fluentbit --> kafka --> vector 清洗数据 --> chproxy 代理 --> ClickHouse 集群 --> ClickVisual 展示;

  • filebeat daemonset 方式采集 nginx 日志 --> kafka --> vector 清洗数据 --> chproxy 代理 --> ClickHouse 集群 --> ClickVisual 展示。

7.1 ClickHouse 建表

# 每台机器都需要建一张本地表# 加了 on cluster 就只需在集群中的一台执行即可CREATE TABLE logs.nginxlogs_prod on cluster dw_log(    `timestamp` DateTime,    `remote_addr` String,    `remote_user` String,    `host` String,    `request_method` String,    `request_http_version` String,    `status` Int,    `bytes_sent` Int,    `body_bytes_sent` Int,    `http_referer` String,    `useragent` String,    `request_length` Int,    `http_authorization` String,    `http_x_forwarded_proto` String,    `http_x_forwarded_for` String,    `server_name` String,    `request_id` String,    `geoip2_data_city_name` String,    `geoip2_data_country_name` String,    `geoip2_data_city_location_latitude` String,    `geoip2_data_city_location_longitude` String,    `geoip2_data_city_region_name` String,    `upstream_addr` String,    `upstream_response_time` Decimal,    `upstream_connect_time` Decimal,    `upstream_status` Int,    `topic` String,    `uri_path` String,    `uri_param` String,    `device` String,    `major` String,    `minor` String,    `name` String,    `os` String,    `os_full` String,    `os_major` String,    `os_minor` String,    `os_name` String,    `os_patch` String,    `os_version` String,    `patch` String,    `version` String)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/nginxlogs_prod', '{replica}')PARTITION BY toYYYYMMDD(timestamp)ORDER BY date(timestamp)TTL toDateTime(timestamp) + toIntervalDay(365)# 集群中的每台机器都需要创建CREATE TABLE logs.nginxlogs_prod_distributed(    `timestamp` DateTime,    `remote_addr` String,    `remote_user` String,    `host` String,    `request_method` String,    `request_http_version` String,    `status` Int32,    `bytes_sent` Int32,    `body_bytes_sent` Int32,    `http_referer` String,    `useragent` String,    `request_length` Int32,    `http_authorization` String,    `http_x_forwarded_proto` String,    `http_x_forwarded_for` String,    `server_name` String,    `request_id` String,    `geoip2_data_city_name` String,    `geoip2_data_country_name` String,    `geoip2_data_city_location_latitude` String,    `geoip2_data_city_location_longitude` String,    `geoip2_data_city_region_name` String,    `upstream_addr` String,    `upstream_response_time` Float32,    `upstream_connect_time` Float32,    `upstream_status` Int32,    `topic` String,    `uri_path` String,    `uri_param` String,    `device` String,    `major` String,    `minor` String,    `name` String,    `os` String,    `os_full` String,    `os_major` String,    `os_minor` String,    `os_name` String,    `os_patch` String,    `os_version` String,    `patch` String,    `version` String)ENGINE = Distributed(dw_log, logs, nginxlogs_prod, rand());
CREATE TABLE logs.servicelogs_prod on cluster dw_log( `timestamp` DateTime, `TID` String, `application_name` String, `caller_class_name` String, `caller_file_name` String, `caller_line_number` Int, `caller_method_name` String, `level` String, `logger` String, `msg` String, `project_name` String, `stack_trace` String, `thread` String)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/servicelogs_prod', '{replica}')PARTITION BY toYYYYMMDD(timestamp)ORDER BY date(timestamp)TTL toDateTime(timestamp) + toIntervalDay(365)# 集群中的每台机器都需要创建CREATE TABLE logs.servicelogs_prod_distributed( `timestamp` DateTime, `TID` String, `application_name` String, `caller_class_name` String, `caller_file_name` String, `caller_line_number` Int, `caller_method_name` String, `level` String, `logger` String, `msg` String, `project_name` String, `stack_trace` String, `thread` String)ENGINE = Distributed(dw_log, logs, servicelogs_prod, rand());

7.2 Chproxy 配置

# Whether to print debug logs.## By default debug logs are disabled.log_debug: true
hack_me_please: true
server: http: listen_addr: ":9090"
users: - name: "default" to_cluster: "logs" to_user: "default" deny_http: false allow_cors: true password: "xxx"
# by default each cluster has `default` user which can be overridden by section `users`clusters: - name: "logs" scheme: "http" nodes: ["IP1:8123", "IP2:8123", "IP3:8123"] users: - name: "default" password: "xxx"

7.3 Vector 配置

sources:  kafka-nginxlogs-prod:    type: kafka    bootstrap_servers: server1:9092,server2:9092,server3:9092    group_id: vector-nginxlogs-prod    auto_offset_reset: earliest    topics:      - nginxlogs-prod
kafka-servicelogs-prod: type: kafka bootstrap_servers: server1:9092,server2:9092,server3:9092 group_id: vector-servicelogs-prod auto_offset_reset: earliest topics: - servicelogs-prod
transforms: remap-kafka-servicelogs-prod: inputs: - "kafka-servicelogs-prod" type: "remap" source: >- . = parse_json!(.message)
# 删除 kafka 自带时间戳 del(.timestamp)
.timestamp = del(."@timestamp")
if is_float(.timestamp) { .timestamp = floor(.timestamp)
.timestamp = to_string(.timestamp)
.timestamp = parse_timestamp!(.timestamp, format: "%s")
.timestamp = parse_regex!( to_string(.timestamp), r'(?P<time>\d+\-\d+\-\d+T\d+:\d+:\d+)' ) .timestamp = .timestamp.time } else { .timestamp = parse_regex!( .timestamp, r'(?P<time>\d+\-\d+\-\d+T\d+:\d+:\d+)' ) .timestamp = .timestamp.time }
.caller_line_number = to_int!(.caller_line_number)
remap-kafka-nginxlogs-prod: inputs: - "kafka-nginxlogs-prod" type: "remap" source: >- . = parse_csv!( .message, delimiter: "\t" )
log = {}
log.remote_addr = .[0]
log.time_iso8601 = .[2]
log.host = .[3]
log.request = .[4]
log.status = .[5]
log.bytes_sent = .[6]
log.body_bytes_sent = .[7]
log.http_referer = .[8]
log.http_user_agent = .[9]
log.request_length = .[10]
log.http_x_forwarded_proto = .[12]
log.http_x_forwarded_for = .[13]
log.server_name = .[14]
log.request_id = .[15]
log.geoip2_data_city_name = .[16]
log.geoip2_data_country_name = .[17]
log.geoip2_data_city_location_latitude = .[18]
log.geoip2_data_city_location_longitude = .[19]
log.geoip2_data_city_region_name = .[20]
log.upstream_addr = .[21]
log.upstream_response_time = .[22]
log.upstream_connect_time = .[23]
log.upstream_status = .[25]
log.status = to_int!(log.status)
log.bytes_sent = to_int!(log.bytes_sent)
log.body_bytes_sent = to_int!(log.body_bytes_sent)
log.request_length = to_int!(log.request_length)
if log.upstream_status == "-" { del(log.upstream_status)
del(log.upstream_connect_time)
del(log.upstream_response_time) } else { log.upstream_response_time = to_float!(log.upstream_response_time)
log.upstream_connect_time = to_float!(log.upstream_connect_time)
log.upstream_status = to_int!(log.upstream_status) }
http_user_agent = parse_user_agent!( log.http_user_agent, mode: "enriched" )
os = http_user_agent.os
log.os = os.family
log.os_full = log.os + " " + os.major
log.os_major = os.major
log.os_minor = os.minor
log.os_name = log.os
log.os_patch = os.patch
log.os_version = os.version
browser = http_user_agent.browser
log.major = browser.major
log.minor = browser.minor
log.name = browser.family
log.version = browser.version
log.patch = browser.patch
log.device = http_user_agent.device.family
log.request_http_version = del(log.http_version)
log.request = parse_csv!(log.request, " ")
log.request_method = log.request[0]
log.request_url = log.request[1]
log.request_http_version = log.request[2]
del(log.request)
log.request_url = parse_grok!( log.request_url, "%{URIPATH:uri_path}(?:%{URIPARAM:uri_param})?" )
request_url = log.request_url
log.uri_param = request_url.uri_param
log.url_path = request_url.uri_path
del(log.request_url)
log.timestamp = parse_regex!( log.time_iso8601, r'(?P<time>\d+\-\d+\-\d+T\d+:\d+:\d+)' )
log.timestamp = log.timestamp.time
del(log.time_iso8601)
. = log
sinks: kafka-servicelogs-prod-2-ck: type: clickhouse inputs: - remap-kafka-servicelogs-prod endpoint: "http://localhost:9090" database: logs table: servicelogs_prod_distributed skip_unknown_fields: true auth: strategy: basic user: default password: xxxx request: concurrency: adaptive kafka-nginxlogs-prod-2-ck: type: clickhouse inputs: - remap-kafka-nginxlogs-prod endpoint: "http://localhost:9090" database: logs table: nginxlogs_prod_distributed skip_unknown_fields: true auth: strategy: basic user: default password: xxx request: concurrency: adaptive

7.4 ClickVisual 配置

[app]isMultiCopy = false  # multi-copy mode: Redis must be configured, otherwise the service cannot startsecretKey = "secretKey"rootURL = "https://clickvisual.example.com"baseURL = "/api/admin/login/"permissionFile = './config/resource.yaml'serveFromSubPath = false
[casbin.rule]path = "./config/rbac.conf"
[server.http]# HTTP server Hosthost = "0.0.0.0"# HTTP server Portport = 19001embedPath = "dist"maxAge = 86400
[server.governor]# Governor server hosthost = "0.0.0.0"# Governor server portport = 19011
[logger]# log level, available level: "debug", "info", "warn", "error", "panic", "fatal"level = "info"name = "clickvisual.log"
# if isMultiCopy is true[redis]debug = trueaddr = "127.0.0.1:6379"writeTimeout = "3s"password = "**"
[mysql]debug = true# database DSNdsn = "clickvisual:xxx@tcp(127.0.0.1:3306)/clickvisual?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=True&loc=Local&readTimeout=1s&timeout=1s&writeTimeout=3s"# log levellevel = "debug"# maximum number of connections in the idle connection pool for databasemaxIdleConns = 5# maximum number of open connections for databasemaxOpenConns = 10# maximum amount of time a connectionconnMaxLifetime = "300s"
[auth]mode = "memstore" # redis memstorename = "clickvisual_session"debug = trueKeypairs = "secret"# if use mode redis# redisSize = 10# redisNetwork = "tcp"# redisAddr = ""# redisPassword = ""
[auth.anonymous]# enable anonymous accessenabled = false
[auth.proxy]enabled = falseisAutoLogin = falseheaderName = "X-CLICKVISUAL-USER"headerNickName = "X-CLICKVISUAL-NICKNAME"rootTokenKey = "X-CLICKVISUAL-TOKEN"rootTokenValue = "xxx"
[[auth.tps]]typ = "gitlab"enable = trueclientId = "xxx"clientSecret = "gloas-xxx"allowSignUp = truescopes = ["api"]authUrl = "https://repo.example.com/oauth/authorize"tokenUrl = "https://repo.example.com/oauth/token"apiUrl = "https://repo.example.com/api/v4"allowedDomains = []teamIds = []allowedOrganizations = []
[prom2click]enable = true
[[prom2click.cfgs]]host = "127.0.0.1"port = 9222clickhouseDSN = "tcp://127.0.0.1:9000"clickhouseDB = "metrics"clickhouseTable = "samples"

7.5 日志展示

结语

随着企业数据量的不断增长,一个高效、可靠且成本效益的日志收集系统变得尤为重要。本文档提出的基于 ClickHouse 的日志收集方案,不仅解决了现有系统的多个痛点,还为企业未来的数据管理和分析工作奠定了坚实的基础。我们期待这套方案能够帮助企业更好地理解和利用日志数据,推动企业在数字化转型的道路上走得更远。

附录

9.1 性能优化实践

9.1.1 ClickHouse 的性能优化:

  • 数据分区:合理地对数据进行分区可以提高查询效率,减少不必要的数据扫描;

  • 索引优化:创建合适的索引可以显著提升查询速度,特别是在处理大量数据时;

  • 查询优化:使用 EXPLAIN 等工具分析查询计划,优化查询语句,避免全表扫描;

  • 硬件优化:合理的硬件配置,如使用 SSD 硬盘、增加内存等,也能显著提升 ClickHouse 的性能。

9.1.2 Vector 的性能调优:

  • 并行处理:Vector 支持并行处理数据,可以通过配置来优化其性能;

  • 资源限制:合理配置 Vector 的 CPU 和内存资源,避免单个任务占用过多资源导致系统负载过高;

  • 数据压缩:在数据传输过程中启用压缩,可以减少网络带宽的占用和存储空间的使用。

9.1.3 Chproxy 的性能监控:

    • 监控指标:监控 Chproxy 的 CPU 使用率、内存使用、请求延迟等关键指标;

    • 日志分析:定期分析 Chproxy 的日志,及时发现并解决潜在的性能问题;

    • 压力测试:定期进行压力测试,评估 Chproxy 在高负载下的表现,确保其稳定性和可靠性。

    9.1.4 ClickVisual 的用户体验优化:

      • 界面设计:提供直观、易用的界面设计,降低用户的学习成本;

      • 交互优化:优化用户操作流程,减少用户在查询和分析过程中的操作步骤;

      • 性能监控:提供实时的性能监控功能,帮助用户了解系统状态,及时发现并解决问题。

      9.2 安全性考量

      1. 数据加密:在数据传输和存储过程中使用加密技术,确保数据的安全性;

      2. 访问控制:实施严格的访问控制策略,确保只有授权用户才能访问敏感数据;

      3. 审计日志:记录所有用户的操作,便于追踪和审计。


      关于领创集团

      (Advance Intelligence Group)
      领创集团成立于 2016年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的数字金融服务平台 Atome 等。2021年 9月,领创集团宣布完成超4亿美元 D 轮融资,融资完成后领创集团估值已超 20亿美元,成为新加坡最大的独立科技创业公司之一。




      领创集团Advance Group
      领创集团是亚太地区AI技术驱动的科技集团。
       最新文章