本文注重实战或者实现,不涉及 CAP,略提 ACID。本文适合基础分布式程序员:
1、本文会涉及集群中节点的 failover 和 recover 问题.2、本文会涉及事务及不透明事务的问题.3、本文会提到微博和 tweeter,并引出一个大数据问题. 由于分布式这个话题太大,事务这个话题也太大,我们从一个集群的一个小小节点开始谈起。
集群中存活的节点与同步
分布式系统中,如何判断一个节点(node)是否存活?kafka 这样认为:
1、此节点和 zookeeper 能喊话.(Keep sessions with zookeeper through heartbeats.)2、此节点如果是个从节点,必须能够尽可能忠实地反映主节点的数据变化。也就是说,必须能够在主节点写了新数据后,及时复制这些变化的数据,所谓及时,不能拉下太多哦. 那么,符合上面两个条件的节点就可以认为是存活的,也可以认为是同步的(in-sync).
关于第 1 点,大家对心跳都很熟悉,那么我们可以这样认为某个节点不能和 zookeeper 喊话了:
zookeeper-node:
var timer =
new timer()
.setInterval(10sec)
.onTime(slave-nodes,function(slave-nodes){
slave-nodes.forEach(node -> {
boolean isAlive = node.heartbeatACK(15sec);
if(!isAlive) {
node.numNotAlive += 1;
if(node.numNotAlive >= 3) {
node.declareDeadOrFailed();
slave-nodes.remove(node);
// 回调也可 leader-node-app.notifyNodeDeadOrFailed(node)
}
}else
node.numNotAlive = 0;
});
});
timer.run();
// 你可以回调也可以像下面这样简单的计时判断
leader-node-app:
var timer =
new timer()
.setInterval(10sec)
.onTime(slave-nodes,function(slave-nodes){
slave-nodes.forEach(node -> {
if(node.isDeadOrFailed) {
//node 不能和 zookeeper 喊话了
}
});
});
timer.run();
关于第二点,要稍微复杂点了,怎么搞呢?来这么分析:
1:数据 messages.:2:操作 op-log.3:偏移 position/offset.
// 1. 先考虑 messages
// 2. 再考虑 log 的 postion 或者 offset
// 3. 考虑 msg 和 off 都记录在同源数据库或者存储设备上.(database or storage-device.)
var timer =
new timer()
.setInterval(10sec)
.onTime(slave-nodes,function(nodes){
var core-of-cpu = 8;
// 嫌慢就并发呗 mod hash go!
nodes.groupParallel(core-of-cpu)
.forEach(node -> {
boolean nodeSucked = false;
if(node.ackTimeDiff > 30sec) {
//30 秒内没有回复,node 卡住了
nodeSucked = true;
}
if(node.logOffsetDiff > 100) {
//node 复制跟不上了,差距超过 100 条数据
nodeSucked = true;
}
if(nodeSucked) {
// 总之 node“死”掉了,其实到底死没死,谁知道呢?network-error 在分布式系统中或者节点失败这个事情是正常现象.
node.declareDeadOrFailed();
// 不和你玩啦,集群不要你了
nodes.remove(node);
// 该怎么处理呢,抛个事件吧.
fire-event-NodeDeadOrFailed(node);
}
});
});
timer.run();
上面的节点的状态管理一般由 zookeeper 来做,leader 或者 master 节点也会维护那么点状态。
那么应用中的 leader 或者 master 节点,只需要从 zookeeper 拉状态就可以,同时,上面的实现是不是一定最佳呢?不是的,而且多数操作可以合起来,但为了描述节点是否存活这个事儿,咱们这么写没啥问题。
节点死掉、失败、不同步了,咋处理呢?
好嘛,终于说到 failover 和 recover 了,那 failover 比较简单,因为还有其它的 slave 节点在,不影响数据读取。
1:同时多个 slave 节点失败了?没有 100% 的可用性. 数据中心和机房瘫痪、网络电缆切断、hacker 入侵删了你的根,总之你 rp 爆表了.2:如果主节点失败了,那 master-master 不行嘛?keep-alived 或者 LVS 或者你自己写 failover 吧.
高可用架构(HA)又是个大件儿了,此文不展开了。我们来关注下 recover 方面的东西,这里把视野打开点,不仅关注 slave 节点重启后追 log 来同步数据,我们看下在实际应用中,数据请求(包括读、写、更新)失败怎么办?
大家可能都会说,重试(retry)呗、重放(replay)呗或者干脆不管了呗!行,都行,这些都是策略,但具体怎么个搞法,你真的清楚了?
一个 bigdata 问题
我们先摆个探讨的背景:
问题:消息流,比如微博的微博(真绕),源源不断地流进我们的应用中,要处理这些消息,有个需求是这样的:Reach is the number of unique people exposed to a URL on Twitter.
那么,统计一下 3 小时内的本条微博(url)的 reach 总数。
怎么解决呢?
把某时间段内转发过某条微博(url)的人拉出来,把这些人的粉丝拉出来,去掉重复的人,然后求总数,就是要求的 reach.
为了简单,我们忽略掉日期,先看看这个方法行不行:
/** ———————————
* 1. 求出转发微博 (url) 的大 V.
* __________________________________*/
方法:getUrlToTweetersMap(String url_id)
SQL:/* 数据库 A,表 url_user 存储了转发某 url 的 user */
SELECT url_user.user_id as tweeter_id
FROM url_user
WHERE url_user.url_id = ${url_id}
返回:[user_1,…,user_m]
.
/** ———————————
* 2. 求出大 V 的粉丝
* __________________________________*/
方法 : getFollowers(String tweeter_id);
SQL : /* 数据库 B */
SELECT users.id as user_id
FROM users
WHERE users.followee_id = ${tweeter_id}
返回:tweeter 的粉丝
.
/** ———————————
* 3. 求出 Reach
* __________________________________*/
var url = queryArgs.getUrl();
var tweeters = getUrlToTweetersMap();
var result = new HashMap<String,Integer>();
tweeters.forEach(t -> {
// 你可以批量 in + 并发读来优化下面方法的性能
var followers = getFollowers(t.tweeter_id);
followers.forEach(f -> {
//hash 去重
result.put(f.user_id,1);
});
});
//Reach
return result.size();
顶呱呱,无论如何,求出了 Reach 啊!
其实这又引出了一个很重要的问题,也是很多大谈框架、设计、模式却往往忽视的问题:性能和数据库建模的关系。
1:数据量有多大?不知道读者有木有对这个问题的数据库 I / O 有点想法,或者虎躯一震呢?Computing reach is too intense for a single machine – it can require thousands of database calls and tens of millions of tuples. 在上面的数据库设计中避免了 JOIN,为了提高求大 V 粉丝的性能,可以将一批大 V 作为 batch/bulk,然后多个 batch 并发读,誓死搞死数据库。这里将微博到转发者表所在的库,与粉丝库分离,如果数据更大怎么办?库再分表 …
OK,假设你已经非常熟悉传统关系型数据库的分库分表及数据路由(读路径的聚合、写路径的分发)、或者你对于 sharding 技术也很熟悉、或者你良好的结合了 HBase 的横向扩展能力并有一致性策略来解决其二级索引问题. 总之,存储和读取的问题假设你已经解决了,那么分布式计算呢?
2:微博这种应用,人与人之间的关系成图状(网),你怎么建模存储?而不仅仅对应这个问题,比如:某人的好友的好友可能和某人有几分相熟?看看用 storm 怎么来解决分布式计算,并提供流式计算的能力:
// url 到大 V -> 数据库 1
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
// 大 V 到粉丝 -> 数据库 2
TridentState tweetersToFollowers =
topology.newStaticState(getTweeterToFollowersState());
topology.newDRPCStream(“reach”)
.stateQuery(urlToTweeters, new Fields(“args”), new MapGet(), new Fields(“tweeters”))
.each(new Fields(“tweeters”), new ExpandList(), new Fields(“tweeter”))
.shuffle() /* 大 V 的粉丝很多,所以需要分布式处理 */
.stateQuery(tweetersToFollowers, new Fields(“tweeter”), new MapGet(), new Fields(“followers”))
.parallelismHint(200) /* 粉丝很多,所以需要高并发 */
.each(new Fields(“followers”), new ExpandList(), new Fields(“follower”))
.groupBy(new Fields(“follower”))
.aggregate(new One(), new Fields(“one”)) /* 去重 */
.parallelismHint(20)
.aggregate(new Count(), new Fields(“reach”)); /* 计算 reach 数 */
最多处理一次(At most once)
回到主题,引出上面的例子,一是为了引出一个有关分布式(存储 + 计算)的问题,二是透漏这么点意思:码农,就应该关注设计和实现的东西,比如 Jay Kreps 是如何发明 Kafka 这个轮子的 : ]
如果你还是码农级别,咱来务点实吧,前面我们说到 recover,节点恢复的问题,那么我们恢复几个东西?
基本的:
1、节点状态 2、节点数据
本篇从数据上来讨论下这个问题,为使问题再简单点,我们考虑写数据的场景,如果我们用 write-ahead-log 的方式来保证数据复制和一致性,那么我们会怎么处理一致性问题呢?
1:主节点有新数据写入.2:从节点追 log, 准备复制这批新数据。从节点做两件事:(1). 把数据的 id 偏移写入 log;(2). 正要处理数据本身,从节点挂了。那么根据上文的节点存活条件,这个从节点挂了这件事被探测到了,从节点由维护人员手动或者其自己恢复了,那么在加入集群和小伙伴们继续玩耍之前,它要同步自己的状态和数据。问题来了:
如果根据 log 内的数据偏移来同步数据,那么,因为这个节点在处理数据之前就把偏移写好了,可是那批数据 lost-datas 没有得到处理,如果追 log 之后的数据来同步,那么那批数据 lost-datas 就丢了。在这种情况下,就叫作数据最多处理一次,也就是说数据会丢失。
最少处理一次(At least once)
好吧,丢失数据不能容忍,那么我们换种方式来处理:
1:主节点有新数据写入.2:从节点追 log, 准备复制这批新数据。从节点做两件事:(1). 先处理数据;(2). 正要把数据的 id 偏移写入 log,从节点挂了。问题又来了:
如果从节点追 log 来同步数据,那么因为那批数据 duplicated-datas 被处理过了,而数据偏移没有反映到 log 中,如果这样追,会导致这批数据重复。这种场景,从语义上来讲,就是数据最少处理一次,意味着数据处理会重复。
仅处理一次(Exactly once)
Transaction
好吧,数据重复也不能容忍?要求挺高啊。大家都追求的强一致性保证(这里是最终一致性),怎么来搞呢?换句话说,在更新数据的时候,事务能力如何保障呢?假设一批数据如下:
// 新到数据
{
transactionId:4
urlId:99
reach:5
}
现在要更新这批数据到库里或者 log 里,那么原来的情况是:
// 老数据
{
transactionId:3
urlId:99
reach:3
}
如果说可以保证如下三点:
1、事务 ID 的生成是强有序的.(隔离性,串行)2、同一个事务 ID 对应的一批数据相同.(幂等性,多次操作一个结果)3、单条数据会且仅会出现在某批数据中.(一致性,无遗漏无重复)
那么,放心大胆的更新好了:
// 更新后数据
{
transactionId:4
urlId:99
//3 + 5 = 8
reach:8
}
注意到这个更新是 ID 偏移和数据一起更新的,那么这个操作靠什么来保证:原子性。你的数据库不提供原子性?后文略有提及。
这里是更新成功了。如果更新的时候,节点挂了,那么库里或者 log 里的 id 偏移不写,数据也不处理,等节点恢复,就可以放心去同步,然后加入集群玩耍了。
所以说,要保证数据仅处理一次,还是挺困难的吧?
上面的保障“仅处理一次”这个语义的实现有什么问题呢?
性能问题。
这里已经使用了 batch 策略来减少到库或磁盘的 Round-Trip Time,那么这里的性能问题是什么呢?考虑一下,采用 master-master 架构来保证主节点的可用性,但是一个主节点失败了,到另一个主节点主持工作,是需要时间的。假设从节点正在同步,啪!主节点挂了!因为要保证仅处理一次的语义,所以原子性发挥作用,失败,回滚,然后从主节点拉失败的数据(你不能就近更新,因为这批数据可能已经变化了,或者你根本没缓存本批数据),结果是什么呢?
老主节点挂了,新的主节点还没启动,所以这次事务就卡在这里,直到数据同步的源——主节点可以响应请求。
如果不考虑性能,就此作罢,这也不是什么大事。
你似乎意犹未尽?来吧,看看“银弹”是什么?
Opaque-Transaction
现在,我们来追求这样一种效果:
某条数据在一批数据中(这批数据对应着一个事务),很可能会失败,但是它会在另一批数据中成功。换句话说,一批数据的事务 ID 一定相同。
来看看例子吧,老数据不变,只是多了个字段:prevReach。
// 老数据
{
transactionId:3
urlId:99
// 注意这里多了个字段,表示之前的 reach 的值
prevReach:2
reach:3
}
// 新到数据
{
transactionId:4
urlId:99
reach:5
}
这种情况,新事务的 ID 更大、更靠后,表明新事务可以执行,还等什么,直接更新,更新后数据如下:
// 新到数据
{
transactionId:4
urlId:99
// 注意这里更新为之前的值
prevReach:3
//3 + 5 = 8
reach:8
}
现在来看下另外的情况:
// 老数据
{
transactionId:3
urlId:99
prevReach:2
reach:3
}
// 新到数据
{
// 注意事务 ID 为 3,和老数据中的事务 ID 相同
transactionId:3
urlId:99
reach:5
}
这种情况怎么处理?是跳过吗?因为新数据的事务 ID 和库里或者 log 里的事务 ID 相同,按事务要求这次数据应该已经处理过了,跳过?不,这种事不能靠猜的,想想我们有的几个性质,其中关键一点就是:
给定一批数据,它们所属的事务 ID 相同。仔细体会下,上面那句话和下面这句话的差别:给定一个事务 ID,任何时候,其所关联的那批数据相同。
我们应该这么做,考虑到新到数据的事务 ID 和存储中的事务 ID 一致,所以这批数据可能被分别或者异步处理了,但是,这批数据对应的事务 ID 永远是同一个,那么,即使这批数据中的 A 部分先处理了,由于大家都是一个事务 ID,那么 A 部分的前值是可靠的。
所以,我们将依靠 prevReach 而不是 Reach 的值来更新:
// 更新后数据
{
transactionId:3
urlId:99
// 这个值不变
prevReach:2
//2 + 5 = 7
reach:7
}
你发现了什么呢?不同的事务 ID,导致了不同的值:
1:当事务 ID 为 4,大于存储中的事务 ID3,Reach 更新为 3 +5 = 8.2:当事务 ID 为 3,等于存储中的事务 ID3,Reach 更新为 2 +5 = 7. 这就是 Opaque Transaction.
这种事务能力是最强的了,可以保证事务异步提交。所以不用担心被卡住了,如果说集群中:
Transaction:
数据是分批处理的,每个事务 ID 对应一批确定、相同的数据. 保证事务 ID 的产生是强有序的. 保证分批的数据不重复、不遗漏. 如果事务失败,数据源丢失,那么后续事务就卡住直到数据源恢复.
Opaque-Transaction:
数据是分批处理的,每批数据有确定而唯一的事务 ID. 保证事务 ID 的产生是强有序的. 保证分批的数据不重复、不遗漏. 如果事务失败,数据源丢失,不影响后续事务,除非后续事务的数据源也丢了.
其实这个全局 ID 的设计也是门艺术:
冗余关联表的 ID,以减少 join,做到 O(1)取 ID. 冗余日期(long 型)字段,以避免 order by. 冗余过滤字段,以避免无二级索引(HBase)的尴尬. 存储 mod-hash 的值,以方便分库、分表后,应用层的数据路由书写. 这个内容也太多,话题也太大,就不在此展开了。
你现在知道 twitter 的 snowflake 生成全局唯一且有序的 ID 的重要性了。
两阶段提交
现在用 zookeeper 来做两阶段提交已经是入门级技术,所以也不展开了。
如果你的数据库不支持原子操作,那么考虑两阶段提交吧。
如果想免费学习 Java 工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty 源码分析的朋友可以加我的 Java 进阶群:478030634,群里有阿里大牛直播讲解技术,以及 Java 大型互联网技术的视频免费分享给大家。
结语
To be continued.