背景
目前在eBay,我们最大的HDFS集群运行着5000多台DataNode节点,存储了近500PB的数据,NameNode节点每天向用户提供超过55亿次的数据访问。随着集群规模的不断增长,NameNode逐渐成为了整个HDFS集群的瓶颈。
为了更好地支撑如此海量的数据和服务,打破集群的瓶颈,我们使用了HDFS Federation来对NameNode进行扩展。与很多公司一样,eBay也使用ViewFs来访问和管理扩展后的NameNode。
ViewFs使用配置在客户端的挂载表来确定目录所在的NameNode,所以添加新NameNode和搬迁目录,都需要更改客户端的配置文件让这些改动生效。数据的搬迁需要用户的介入,单单与客户沟通就需要耗费大量的时间和精力,更不用提搬迁过后,所有用户的配置更新和服务重启。因此,在2021年,我们把目光瞄向了HDFS Router-based Federation。
HDFS Router-based Federation (RBF)
HDFS Router-based Federation是一个起源于Hadoop开源社区的访问和管理HDFS Federation的解决方案。它在客户端和NameNode之间添加了一个代理层-Router,将原本保存在客户端的挂载表转移到了服务端,存储在State Store中。用户不再直接跟NameNode打交道,转而访问Router,Router根据服务端的挂载信息将访问转发到正确的NameNode。任何NameNode的变动和挂载表的更新都被Router隐藏了起来,添加新的NameNode和迁移目录对客户端变得完全透明。
(点击可查看大图)
想必大家已经对RBF耳熟能详了,这里就不做展开了。接下来我们来聊一聊eBay对于RBF做了哪些功能和性能上的改进,以及我们的数据搬迁方案。
1
功能的改进
IP的穿透
客户端的IP地址,对于NameNode来说十分重要。Block locality的选择,以及audit log等等都需要客户端的IP地址。通常情况下,NameNode是通过网络连接的远端地址拿到客户端IP的,由于引入了Router,这个IP地址实际上变成了Router的地址。社区有相关的JIRA ticket解决这个问题,使用CallerContext保存客户端的IP地址,但我们已经使用CallerContext传输其他的数据,解析起来很麻烦而且容易出问题,所以选择了向RPC Header增加新的可选条目,来保存真实的客户端IP。如果这个条目不存在,NameNode还是使用老的方式取得客户端的IP。
同时,我们将Router部署在LB之后,使得Router的扩展对用户完全透明。但这样子也带来了另外的问题,就是LB将客户端的IP地址屏蔽掉了,Router看到的客户端IP其实是LB的地址。我们也是利用这个新加的RPC Header来解决这个问题,在客户端发送RPC请求的时候将IP地址传入到Header里,使得真实的客户端IP能被发送到Router和NameNode。
ViewFs兼容RBF
在eBay,绝大多数用户都是使用ViewFs访问HDFS的,这些路径广泛存在于客户的脚本,配置文件中,更改起来十分麻烦。所以,为了减少用户的改动,我们需要ViewFs去支持和兼容RBF。
ViewFs依赖配置在本地的挂载表去为路径寻找对应的NameNode,我们改进的思路,就是将ViewFs的根目录挂载到Router,这样使用ViewFs对NameNode的访问,会全部发送到Router。客户端只需要添加一条新的挂载设置,便可以实现ViewFs对RBF的兼容。
(点击可查看大图)
滚动升级的支持
客户端在提交Job到YARN集群的时候,会先去NameNode申请Token,这些Token会被ResourceManager分发给NodeManager,供给Container使用去访问HDFS集群。具体申请NameNode还是Router的token,是由客户端决定的。在eBay的场景中,客户端申请的都是NameNode的Token。在YARN集群升级的过程中,会出现部分Container访问Router,部分访问NameNode的场景,如果Container只用NameNode的Token去访问,就会出现错误。
为了解决这个问题,我们在ResourceManager中实现了自动补充Token的功能。ResourceManager会在Job提交的时候,检查Context中的Token,如果发现缺失,就会申请相应的Token,补充进Context。这样无论访问的是NameNode还是Router,都不再会因为缺少Token发生错误了。
RetryCache
由于我们部署了多个Router,客户端可能通过任何一个Router去访问NameNode,这也就意味着,客户端一旦因为超时发生重试,NameNode所获取到的ClientId和CallId都会是不同的,这也意味着RetryCache没有办法发挥作用,可能会导致数据丢失。
这个问题在社区的JIRA ticket https://issues.apache.org/jira/browse/HDFS-15079 中得到了解决,使用的是CallContext来保存客户端的ClientId和CallId。但因为前述的原因,我们并没有选择这种方式。我们选择了修改RPC Header,来保存客户端的ClientId和CallId,再修改getClientId和getCallId这些方法,让NameNode可以拿到正确的客户端id,使RetryCache可以正常的发挥作用,避免重试导致的数据丢失。
2
性能的优化
解决Connection瓶颈
在最初使用NNloadGeneratorMR做性能测试的时候,我们发现NameNode的ops始终不高。通过分析Heap Dump和jstack,我们发现Router向同一个NameNode发送RPC请求的时候,使用的Connection对象只有一个,所有的发送请求都卡在了这个Connection的synchronized锁上。
(点击可查看大图)
实际上,Router有一个ConnectionPool的数据结构,对应每个NameNode,Router都有一个独立的连接池。但无论在这个连接池中申请多少个连接,底层对应的Connection对象只有一个,这就造成了严重的性能瓶颈。
(点击可查看大图)
社区的ticket https://issues.apache.org/jira/browse/HADOOP-13144 和https://issues.apache.org/jira/browse/HDFS-13274
也都有在尝试去改进这个问题。
我们选择了给ConnectionId添加一个uuid,Router的ConnectionPool在生成新的连接的时候,使用这个uuid创建Client的Connection。这样可以保证ConnectionPool的Connection和Client的Connection一一对应,解决了单一Connection造成的性能瓶颈。在性能测试中,单台Router每条的操作数相比之前提高了200%。
(点击可查看大图)
异步化和隔离性
在eBay的各个Namespace中,数据和访问分布不均,有的NameNode元数据规模已经达到了12亿以上,每天的访问量也十分巨大,性能相较其他NameNode也差了很多。因为Router是共享的,整个Router的访问很有可能被性能差的NameNode影响到,从而影响到其他NameNode的访问。
为此我们做了专门的测试,我们在一台NameNode上提供延迟的访问,在另外一台NameNode上提供正常的访问,用两个NNloadGeneratorMR通过Router分别去访问这两台NameNode。结果在测试开始大约5分钟之后,提供正常访问的NameNode的ops也下降到几百。
(点击可查看大图)
这是因为,大部分Router的Handler都被慢NameNode的访问占用了,无法提供多余的Handler给其他NameNode使用。社区也有一个ticket https://issues.apache.org/jira/browse/HDFS-14090 尝试解决这个问题,这个ticket允许用户为每一个Namespace配置可用的Router Handler的数量,超过可用的额度就需要等待。
但这个实现的问题在于,它判断可用额度的地方在RouterRpcClient,这个时候访问已经取得了Router的Handler,等待的时候实际上也占用了这个Handler。所以,慢NameNode的访问仍然有可能用尽Router的Handler,只不过在最多1秒之后,Handler就会被释放。但这对于我们来说,并不是一个好的选择。
最后,我们选择了在Router里实现异步的RPC,让Router的Handler尽快得到释放。大致的思想就是,在Connection发送完对NameNode的请求之后立即返回,在NameNode的访问结果返回之后,通过Callback的方式,把结果返回给Router处理,进而返回给客户端。这个实现,类似于异步的EditLog,但是要复杂很多,尤其是在create和rename2这类要多次调用NameNode的方法里。需要注意的是,在异步调用里,解决好正常调用和异步调用处理结果的race condition,避免NameNode返回的结果被处理和返回两次。
(点击可查看大图)
经过RPC异步化的改进,单台Router的每秒处理能力相比之前又提升了65%。
(点击可查看大图)
同时,为了更好的隔离性,我们还对Client和它的sendParamsExecutor做了改进。
通常情况下,一个Router只有一个Client实例,所有从这个Router发出的RPC请求都共享这个实例。我们不能很好地控制面向每个NameNode的并发访问数量,并且sendParamsExecutor也很有可能被慢NameNode的访问挤满。
我们的解决办法,是为Router创建多个Client实例,将其和Namespace一一对应,并且为每个Client实例创建单独的sendParamsExecutor。这样做一方面方便我们控制Router对每个Namespace的并发访问量,另一方面sendParamsExecutor的使用也互不影响,保证了Router对每个Namespace的访问质量。
这些改进,结合异步RPC,极大的提高了Router内部的隔离型。下面这张图展示了改进后的测试结果。我们重复跑了三次测试,每次NameNode的ops都没有受到影响,始终保持在35000/s以上。
(点击可查看大图)
3
数据搬迁
在数据搬迁方面,我们使用的是小米贡献给开源社区的解决方案https://issues.apache.org/jira/browse/HDFS-15294。
在这个方案里,整个数据的搬迁被分为一轮初始的数据复制和多轮增量的复制,直到数据的增量足够小,才收回写权限,进行最后一轮的数据复制。初始的复制依赖于HDFS Snapshot,而增量复制依赖的是Snapshot Diff。
在此基础上,我们也做了一些改进。
为了保证数据的完整性,我们在数据搬迁结束后,会对源目录和目标目录的文件数量和大小进行对比,只有对比结果完全一致,我们才会进行后续的更新挂载点等操作。同时,为了减少收回写权限对SLA Job的影响,我们为最终的数据搬迁实现了时间窗,通过将时间窗定义在空闲时间,来避免对SLA Job的影响。
在最初进行数据搬迁的时候,任务总会在对比结果阶段发生文件大小不匹配的错误。通过检查,我们发现是Snapshot在处理仍在写的文件时会有问题。具体的表现是如果一个文件在我们创建Snapshot时已经存在,并且还在写,那么Snapshot里这个文件的长度是会随着文件的写入一直变化的,如果在创建下一个Snapshot前这个文件被关闭了,那么下一次的Snapshot Diff中将不存在这个文件的身影,就会导致数据的丢失。这也是为什么对比文件大小时会出现不一致的情况。
这个问题已经在https://issues.apache.org/jira/browse/HDFS-11402
得到了解决,在配置文件中添加dfs.namenode.snapshot.capture.openfiles为true,NameNode可以记录文件在创建Snapshot时的瞬时长度,Snapshot Diff也就能记录下这些文件的变化了。
在开源的方案中,使用了distcp进行数据复制,复制效率不高,并且在复制过程中,需要额外占用与搬迁数据大小相同的存储空间,这对于动辄搬迁数十PB的我们来说,几乎是不可能接受的。所以,我们选择了使用FastCopy进行数据搬迁,将FastCopy和distcp进行集成,用FastCopy替换掉distcp里文件复制的部分,并且我们改造了DataNode计算空间使用的方式,通过获取Block文件的Hardlink数,避免FastCopy的数据被重复计算,使我们搬迁海量数据成为可能。
目前这套搬迁方案已经成功的在eBay使用,帮助我们搬迁了数百PB的数据。其中,最大的一次数据搬迁,涉及到100PB的文件,我们在15个小时之内顺利完成,影响用户写入的时间控制在了15分钟以内。
END