用mongodb构建延时队列

延时队列(DelayQueue)的使用场景有很多,比如订单类的系统,用户创建订单后一段时间内如果没有付款,那么要把用户的这个订单关闭掉,同时把库存还原回去。解决的方案有很多,一种是用定时任务,定时去扫描符合条件的数据出来进行出来,还有一种就是把这个丢到延时队列里面,等时间到了自动出列之后处理。

由于我们业务场景需要实时的延时队列,也就是必须准时处理,如果通过定时扫描的话,如果时间间隔短,会任务太多处理不过来,如果时间间隔长,会导致中间的有一些延时出列了。之前有调研过一些已有的产品,像twitter的beanstalkd,就是用来处理延时的任务的。不过当时由于在测试环境跑起来了,但是在线上却一直不可用,另外也没有人熟悉这个产品,怕后期带来运维的问题,所以也没有用起来,于是决定自己开发了。google一把,很多人在吹嘘用mongodb替换消息队列(MQ),看起来也很简单,于是我们准备试一下,用来做延时队列。这一试,就定下来了,然后在线上跑了1年多了。

先说一下我们部署mongodb的机器配置,4核CPU,8G内存的VM。你没看错,我们所有数据库都部署在淘宝聚石塔的虚拟机上面。这里不是做广告,如果你要做应用,建议不要用聚石塔,哈哈。我们是做淘宝业务,有“安全性”的限制,所以才用。

之所以用mongodb做“队列”,是因为mongodb有一个叫findAndModify的操作,这个操作是原子性的,也就是你可以修改一条记录的同时把老的记录返回。基于这个操作,我们可以在把一条记录标记为处理中的同时获取到这条数据,这样别人如果同时也是进行这样的处理,由于这个操作具备原子性,你们处理的任务不会重复,所以简单实现了出列的问题。所以简单的, 我们可以这样设计我们的延时队列里面的字段设计如下:

timestamp:出列的时间戳,这样我们可以根据系统当前的时间判断该记录要不要出列

status:状态,用来标记该记录目前是可以出列还是已经出列。0、1就可以搞定了

data:数据,表示你要保存到队列中的数据,出列后根据这个数据来处理任务。

定义好了,出列的操作就是:


//获取当前时间戳
long current = getCurrentTimeMillis();
//0是初始化状态,表示可以出列, 1表示已经出列过了.
DBObject obj = collection.findAndModify({status:0, timestamp:{$lte: current}},{$set:{status: 1}});
//没有命中, 说明当前不需要出列
if(obj == null){
return null;
}
//获取数据, 序列化为二进制的话, 可以兼容各种各样的数据格式
byte[] data = obj.get("data");
//反序列化
return SerializableUtil.deserial(data);

上面是伪代码, 既不是java的也不是javascript的.

是不是很简单,我们的队列搞定了。
如果故事到这里就结束了,那我就不写这篇文章了,呵呵呵呵呵。

由于业务的增长,我们在延时队列里的数据越来越多,处理任务的机器不够用了,不得不剥离成多台机来处理这些任务,这时候作为一个“队列”的挑战才刚刚开始。

由于多台机,你必须要让每台机处理的数据没有重复。上面的findAndModify已经很好的满足需求了是不是?是的,确实满足了。但是findAndModify有个致命的弱点,就是不支持批量findAndModify。由于每天要处理上千万的延时数据,如果全部请求都压到mongodb上,我们的mongodb吃不消,关键我们也不想多加机器。所以问题来了,我们要批量的findAndModify。于是乎,我们顺延这findAndModify的特性,自己弄了一套批量操作的findAndModify,代价是引入分布式锁,在批量find和批量modify的这段时间加分布式锁,这样能保证数据不会被重复处理。由于分布式锁我们用了zookeeper来实现,zookeeper的链路稳定性是我见过最差了,没有之一。时不时客户端就会断开连接,时不时客户端就会session expired。而面对这些问题,zookeeper是没有直接提供接口给你解决的,因为他的接口实在是太底层了,估计写那套client的人以前是搞底层系统开发的,所有问题都要调用方来处理,真不是一般的难用。zookeeper的问题始终是可以解决的,但接下来一个更大的问题是队列的吞吐量(qps)。由于改成了批量处理之后,mongodb平时的负载确实变低了好多。而平时一次出列1000条数据,不会带来拥堵的问题,直到那一天。

没错,那一天就是2013年的天猫双十一,我们系统之前为此做了扩容,机器数量从平时的20台加到了50+台。我们预估当天的数据量会达到平日的6倍。但实际上那天的量,远超我们想象。订单量达到了平时的10倍以上,在双十一前10分钟,我们的延时队列就被冲垮了。我着急了,一看,堵了100+W的数据了。过了一个小时之后,这个堵的数据量达到了800+W。眼看着这么多订单,白花花的就溜走了。直到第二天10点,我们终于解决了这个问题。

解决方案很简单粗暴,就是把批量findAndModify这套方案直接干掉,由于总共有6台任务处理机器,我们直接使用单机的模式,一台机器使用一个队列,把分布式锁抛弃掉。也是用批量的方式,批量出列,处理数据,然后批量删除。效率极高,在后面的几次高峰期都顶住了压力,“顺利”度过了双十一。这种模式也带来一个问题,如果处理数据的机器下线了怎么办?这涉及到了数据迁移的问题,好解决,我们写了个小工具,可以很快的把遗留的数据迁移到另外的机器上。

另外的解决方案是把架构搞成C/S的,做成一个单独的server来访问mongodb,任务处理系统都来订阅这个server。这样也不会有锁的问题,只不过当业务量继续增加,需要扩展服务器的时候,同样的问题还是会来临。上面的单机解决方案看起来是很简单,但实际上效果也是很不错的。当你真正贴近业务的时候,你会发现不是要做一个架构多么牛逼的系统,而是应该做一个能简单维护却能很好解决问题的系统。

如果你想设计延时队列,可以先参考java的延时队列的实现:

java.util.concurrent.DelayQueue

如果可以直接在上面加一层持久层,就可以简单实现延时队列了。

一些总结:能不用锁的地方一定不要用锁,锁会把你的性能耗尽。能不用分布式锁的地方一定不要用,如果你的系统需要使用分布式锁,可以想想有没有其他的简单粗暴的方案,也许会有更好的搜获。分布式系统的一个特点是高并发,如果架上了分布式锁,很可能让你的系统变成了串行处理的系统了,这样就违背分布式系统的初衷了。

附 beanstalkd 项目地址:http://kr.github.io/beanstalkd/