导读
概览 TiKV 各模块的内存消耗,建立全局认知;
深入 Raft Store,介绍写请求的处理流程;
讨论处理流程中的内存消耗,并介绍 Raft Store 的内存监控与管理;
具体的源码实现。
Storage:内存用于维护写事务的 Latch 以及 resolved-ts 模块追踪写事务的 Lock。
Coprocessor:内存用于缓存查询中间结果,与 gRPC 响应客户端速度相关,响应缓慢时内存消耗堆积。
Raft Store:内存用于 Raft Messages & Entries & Committed Entries、Entry Cache 和 Region metadata。下文会详细介绍这三大部分。
Raft Engine:基于 Append-only Log 结构,可视为只有 L0 的 LSM-tree,内存用于 Memtable 和 Block Cache,后者默认使用 2% 的系统内存。
KV Engine (RocksDB):同样存在 LSM-tree 的内存消耗。Block Cache 用于缓存用户数据,默认分配 45% 的系统内存以保障读性能稳定。
PreRaft:负责接受,预处理和传递请求/消息给 Raft 系统处理;
Raft:负责日志的复制与持久化;
Apply:负责将日志应用到状态机。
前排提醒:
序列图的参与者除明确指出的 Service、Module 和 Thread 外,均为 TiKV 代码中的 struct 实例对象(下同);
序列图刻意忽略了 Raft Batch 处理以简化流程(下同)。
Leader:gRPC Service 收到写请求后,通过 Storage 模块和 RaftRouter 转发,生成 Raft Command,并通过 channel 将其发送给目标 Region 的 PeerFsm,最终交由 RaftBatchSystem 处理。
Leader 或 Follower:收到 Raft Message 后,经过类似的流程后,将消息交由 RaftBatchSystem 处理。
特别提醒:
虚线箭头表示异步操作,既可以是直接的异步调用,也可以是由异步事件触发的操作。
Leader:首先 Raft 线程从 RaftBatchSystem 拉取 PeerFsm 进行处理,向 Raft 模块发起 Propose,得到日志 Entries(未提交的 Entries,区别于下一幅图将介绍的 Committed Entries,)。随后将这些 Entries 异步地发往 Follower。接着将它们缓存到 Entry Cache(下文会详细介绍 Entry Cache),并发起异步地日志持久化操作。最后推进下 Raft 状态。
Follower:处理流程与 Leader 类似,但处理的是 Append 而非 Propose。Follower 在处理 Append 请求时会同步更新自己的 Commit Index。此外,Follower 必须同步地持久化日志后,才能响应 Leader 的请求。
Leader 和 Follower:Raft 线程收到 Commit 消息后,通过 Raft 模块从 Entry Cache 中拿到 Committed Entries,注册这些 Entries 到 Entry Cache(下文会介绍这一步的用处),并将其发给到 ApplyFsm,最终 ApplyFsm 交由 ApplyBatchSystem 中的 Apply 线程应用到状态机。对于 Leader,在 Apply 完成后,会执行相应的回调函数并通知 Storage 模块写请求处理结束。
为什么有了 raft-rs 的 Unstable(代码定义)这一 log buffer,还需要 Entry Cache?因为 Unstable 在日志持久化后会清理掉缓存的日志 entries(代码),而有时仍需要读取这些 Entries,就不得不穿透到 Raft Engine 进行读取。
为什么有了 Raft Engine Block Cache,还需要 Entry Cache?因为 Raft Engine 不是一个 per-region 的存储。在 Raft Engine 中,所有 Region 的日志会都写入同一个文件。因此,同一 Region 的日志可能位于不同的文件 Block 中,批量读取同一个 Region 的日志需要读取多个 Block,空间局部性不好,很可能穿透 Block Cache 引入额外的磁盘 IO。此外,Block Cache 中缓存的是磁盘格式的数据,读取时还需要进行解码,增加了额外的 CPU 开销。
策略:在系统内存资源充裕时,TiKV 会自动回收不再使用的内存。
实现:正如上文提到的,Raft Message 和 Entries 在完成其生命周期后会立即释放。而对于 Entry Cache 中缓存的 entries,Follower 节点在 Apply 阶段完成后立即释放,而 Leader 节点在 Raft tick 中定期检查并清理那些已成功复制到大多数的 entries。
目标:此策略通过在系统负载较轻时自动释放内存,减少无效占用,从而优化内存利用率。
2. 内存淘汰
当且仅当这些 entries 的日志已经持久化,才允许被清理,以确保在未来需要时能够从磁盘重新加载。
策略:当系统内存压力增大,接近高水位线(memory_usage_limit * 0.9)时,如果此时 Entry Cache 的内存消耗超出阈值(evict_cache_on_memory_ratio), TiKV 会触发内存淘汰,清理一些未来仍可能会用到的内存对象。
实现:内存淘汰检查会在两种情况下触发:一是当 Entry Cache 发生写入时,二是 Raft tick 中定期触发。当确定需要内存淘汰时,会清理部分 Entry Cache entries 和积压在 Apply 阶段的 Committed Entries(两者由 Entry Cache 管理,这就是为什么上文的写入流程图中有一步是将 Committed Entries 注册到 Entry Cache)。
目标:此策略通过牺牲部分未来的内存缓存以减少当前的内存消耗,尽管这会增加未来的磁盘 I/O。
3. 拒绝写入
策略:当系统内存压力爆表,超出高水位线时,如果此时 Entry Cache 和 Raft Message 等对象的内存消耗超出阈值(reject_messages_on_memory_ratio),TiKV 会拒绝新的写入请求。(这种情况可能发生在因为磁盘故障导致日志无法持久化,Entre Cache 和积压的 Committed Entries 都无法淘汰时。)
目标:此策略通过过拒绝部分写入请求,防止系统因 OOM 而终止,降低爆炸半径,为系统自动恢复争取时间。
采用一个树状的结构记录各个模块以及其子模块的内存消耗。
监控粒度为节点级别而不是 Region 级别。
在对象创建和销毁的时候分别增加和减少其内存消耗。
周期性地上报监控数据。
pub struct MemoryTrace {
pub id: Id,
trace: AtomicUsize,
children: HashMap<Id, Arc<MemoryTrace>>,
}
pub static ref MEMTRACE_ROOT: Arc<MemoryTrace> = mem_trace!(
raftstore,
[
peers,
applys,
entry_cache,
// ...
raft_messages,
raft_entries
])
/// Memory usage for raft peers fsms.
pub static ref MEMTRACE_PEERS: Arc<MemoryTrace> =
MEMTRACE_ROOT.sub_trace(Id::Name("peers"));
/// ...
/// Heap size trace for received Raft Messages.
pub static ref MEMTRACE_RAFT_MESSAGES: Arc<MemoryTrace> =
MEMTRACE_ROOT.sub_trace(Id::Name("raft_messages"));
/// ...
impl<EK, ER> RaftRouter<EK, ER>
where
EK: KvEngine,
ER: RaftEngine,
{
pub fn send_raft_message(
&self,
msg: RaftMessage,
) -> std::result::Result<(), TrySendError<RaftMessage>>
// ...
let mut heap_size = 0;
for e in msg.get_message().get_entries() {
heap_size += bytes_capacity(&e.data) + bytes_capacity(&e.context);
}
let event = TraceEvent::Add(heap_size);
// ...
MEMTRACE_RAFT_MESSAGES.trace(event);
defer!(if send_failed.get() {
MEMTRACE_RAFT_MESSAGES.trace(TraceEvent::Sub(heap_size));
});
// ...
}
}
impl<'a, EK, ER, T: Transport> PeerFsmDelegate<'a, EK, ER, T>
where
EK: KvEngine,
ER: RaftEngine,
{
fn on_raft_message(&mut self, m: Box<InspectedRaftMessage>) -> Result<()> {
// ...
defer!({
// ...
MEMTRACE_RAFT_MESSAGES.trace(TraceEvent::Sub(heap_size));
if stepped.get() {
unsafe {
// It could be less than exact for entry overwriting.
*memtrace_raft_Entries += heap_size;
MEMTRACE_RAFT_Entries.trace(TraceEvent::Add(heap_size));
}
}
});
// ...
}
self.core.background_worker.spawn_interval_task(
DEFAULT_MEMTRACE_FLUSH_INTERVAL,
move || {
let now = Instant::now();
mem_trace_metrics.flush(now);
},
);
impl<T: Storage> RaftLog<T> {
/// ...
/// Grabs a slice of entries from the raft. Unlike a rust slice pointer, these are
/// returned by value. The result is truncated to the max_size in bytes.
pub fn slice(
&self,
low: u64,
high: u64,
max_size: impl Into<Option<u64>>,
context: GetEntriesContext,
) -> Result<Vec<Entry>> {
// ...
if low < self.unstable.offset {
// ...
match self.store.entries(low, unstable_high, max_size, context)
}
// ...
}
// ...
}
pub fn compact_to(&mut self, mut idx: u64) -> u64 {
if idx > self.persisted + 1 {
// Only the persisted entries can be compacted
idx = self.persisted + 1;
}
// ...
while let Some(cached_entries) = self.trace.pop_front() {
// Do not evict cached entries if not all of them are persisted.
// After PR #16626, it is possible that applying entries are not
// yet fully persisted. Therefore, it should not free these
// entries until they are completely persisted.
if cached_entries.range.start >= idx || cached_entries.range.end > self.persisted + 1 {
self.trace.push_front(cached_entries);
let trace_len = self.trace.len();
let trace_cap = self.trace.capacity();
if trace_len < SHRINK_CACHE_CAPACITY && trace_cap > SHRINK_CACHE_CAPACITY {
self.trace.shrink_to(SHRINK_CACHE_CAPACITY);
}
break;
}
let (_, dangle_size) = cached_entries.take_entries();
// ...
idx = cmp::max(cached_entries.range.end, idx);
}
// ...
let cache_first_idx = self.first_index().unwrap_or(u64::MAX);
// ...
let cache_last_idx = self.cache.back().unwrap().get_index();
// Use `cache_last_idx + 1` to make sure cache can be cleared completely if
// necessary.
let compact_to = (cmp::min(cache_last_idx + 1, idx) - cache_first_idx) as usize;
for e in self.cache.drain(..compact_to) {
// ...
}
// ...
}
TiKV Raft 快照全流程丨TiKV 源码解读(二十二)
文章转载自PingCAP。点击这里阅读原文了解更多。
CNCF概况(幻灯片)
扫描二维码联系我们!
CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux Foundation,是非营利性组织。
CNCF(云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。请关注CNCF微信公众号。