用mongodb构建延时队列

延时队列(DelayQueue)的使用场景有很多,比如订单类的系统,用户创建订单后一段时间内如果没有付款,那么要把用户的这个订单关闭掉,同时把库存还原回去。解决的方案有很多,一种是用定时任务,定时去扫描符合条件的数据出来进行出来,还有一种就是把这个丢到延时队列里面,等时间到了自动出列之后处理。

由于我们业务场景需要实时的延时队列,也就是必须准时处理,如果通过定时扫描的话,如果时间间隔短,会任务太多处理不过来,如果时间间隔长,会导致中间的有一些延时出列了。之前有调研过一些已有的产品,像twitter的beanstalkd,就是用来处理延时的任务的。不过当时由于在测试环境跑起来了,但是在线上却一直不可用,另外也没有人熟悉这个产品,怕后期带来运维的问题,所以也没有用起来,于是决定自己开发了。google一把,很多人在吹嘘用mongodb替换消息队列(MQ),看起来也很简单,于是我们准备试一下,用来做延时队列。这一试,就定下来了,然后在线上跑了1年多了。

先说一下我们部署mongodb的机器配置,4核CPU,8G内存的VM。你没看错,我们所有数据库都部署在淘宝聚石塔的虚拟机上面。这里不是做广告,如果你要做应用,建议不要用聚石塔,哈哈。我们是做淘宝业务,有“安全性”的限制,所以才用。

之所以用mongodb做“队列”,是因为mongodb有一个叫findAndModify的操作,这个操作是原子性的,也就是你可以修改一条记录的同时把老的记录返回。基于这个操作,我们可以在把一条记录标记为处理中的同时获取到这条数据,这样别人如果同时也是进行这样的处理,由于这个操作具备原子性,你们处理的任务不会重复,所以简单实现了出列的问题。所以简单的, 我们可以这样设计我们的延时队列里面的字段设计如下:

timestamp:出列的时间戳,这样我们可以根据系统当前的时间判断该记录要不要出列

status:状态,用来标记该记录目前是可以出列还是已经出列。0、1就可以搞定了

data:数据,表示你要保存到队列中的数据,出列后根据这个数据来处理任务。

定义好了,出列的操作就是:


//获取当前时间戳
long current = getCurrentTimeMillis();
//0是初始化状态,表示可以出列, 1表示已经出列过了.
DBObject obj = collection.findAndModify({status:0, timestamp:{$lte: current}},{$set:{status: 1}});
//没有命中, 说明当前不需要出列
if(obj == null){
return null;
}
//获取数据, 序列化为二进制的话, 可以兼容各种各样的数据格式
byte[] data = obj.get("data");
//反序列化
return SerializableUtil.deserial(data);

上面是伪代码, 既不是java的也不是javascript的.

是不是很简单,我们的队列搞定了。
如果故事到这里就结束了,那我就不写这篇文章了,呵呵呵呵呵。

由于业务的增长,我们在延时队列里的数据越来越多,处理任务的机器不够用了,不得不剥离成多台机来处理这些任务,这时候作为一个“队列”的挑战才刚刚开始。

由于多台机,你必须要让每台机处理的数据没有重复。上面的findAndModify已经很好的满足需求了是不是?是的,确实满足了。但是findAndModify有个致命的弱点,就是不支持批量findAndModify。由于每天要处理上千万的延时数据,如果全部请求都压到mongodb上,我们的mongodb吃不消,关键我们也不想多加机器。所以问题来了,我们要批量的findAndModify。于是乎,我们顺延这findAndModify的特性,自己弄了一套批量操作的findAndModify,代价是引入分布式锁,在批量find和批量modify的这段时间加分布式锁,这样能保证数据不会被重复处理。由于分布式锁我们用了zookeeper来实现,zookeeper的链路稳定性是我见过最差了,没有之一。时不时客户端就会断开连接,时不时客户端就会session expired。而面对这些问题,zookeeper是没有直接提供接口给你解决的,因为他的接口实在是太底层了,估计写那套client的人以前是搞底层系统开发的,所有问题都要调用方来处理,真不是一般的难用。zookeeper的问题始终是可以解决的,但接下来一个更大的问题是队列的吞吐量(qps)。由于改成了批量处理之后,mongodb平时的负载确实变低了好多。而平时一次出列1000条数据,不会带来拥堵的问题,直到那一天。

没错,那一天就是2013年的天猫双十一,我们系统之前为此做了扩容,机器数量从平时的20台加到了50+台。我们预估当天的数据量会达到平日的6倍。但实际上那天的量,远超我们想象。订单量达到了平时的10倍以上,在双十一前10分钟,我们的延时队列就被冲垮了。我着急了,一看,堵了100+W的数据了。过了一个小时之后,这个堵的数据量达到了800+W。眼看着这么多订单,白花花的就溜走了。直到第二天10点,我们终于解决了这个问题。

解决方案很简单粗暴,就是把批量findAndModify这套方案直接干掉,由于总共有6台任务处理机器,我们直接使用单机的模式,一台机器使用一个队列,把分布式锁抛弃掉。也是用批量的方式,批量出列,处理数据,然后批量删除。效率极高,在后面的几次高峰期都顶住了压力,“顺利”度过了双十一。这种模式也带来一个问题,如果处理数据的机器下线了怎么办?这涉及到了数据迁移的问题,好解决,我们写了个小工具,可以很快的把遗留的数据迁移到另外的机器上。

另外的解决方案是把架构搞成C/S的,做成一个单独的server来访问mongodb,任务处理系统都来订阅这个server。这样也不会有锁的问题,只不过当业务量继续增加,需要扩展服务器的时候,同样的问题还是会来临。上面的单机解决方案看起来是很简单,但实际上效果也是很不错的。当你真正贴近业务的时候,你会发现不是要做一个架构多么牛逼的系统,而是应该做一个能简单维护却能很好解决问题的系统。

如果你想设计延时队列,可以先参考java的延时队列的实现:

java.util.concurrent.DelayQueue

如果可以直接在上面加一层持久层,就可以简单实现延时队列了。

一些总结:能不用锁的地方一定不要用锁,锁会把你的性能耗尽。能不用分布式锁的地方一定不要用,如果你的系统需要使用分布式锁,可以想想有没有其他的简单粗暴的方案,也许会有更好的搜获。分布式系统的一个特点是高并发,如果架上了分布式锁,很可能让你的系统变成了串行处理的系统了,这样就违背分布式系统的初衷了。

附 beanstalkd 项目地址:http://kr.github.io/beanstalkd/

 

mongodb 修改器学习

1. 执行update相关操作都可以使用修改器.
2. 修改器是为了不整个大文档来替换修改, 而是修改局部
3. 详情:
3.1 $inc (自增)
为增加某个字段的数值
例子:
原文档如下:

{
 "_id" : ObjectId("4b253b067525f35f94b60a31"),
 "url" : "www.example.com",
 "pageviews" : 52
}

执行修改器 $inc
db.test.update({url:’www.example.com’},{$inc:{pageviews: 1}})
表示为pageviews这个 key 的值 加上1. 后面是对应要加上的值.
注意, $inc中, 字段对应的值只能是 数字. 但是可以是 正数也可以是负数. 如果是负数, 就是执行一个减法操作.
3.2 $set (设置)

修改某个字段的值为指定的值. 这里的修改是与字段类型无关的. 你可以把一个string修改成long或者一个对象都行.
例子:
原文档如下:

{
"_id" : ObjectId("4b253b067525f35f94b60a31"),
"name" : "joe",
"age" : 30,
"sex" : "male",
"location" : "Wisconsin"
}

修改性别, 变成false (String -> Boolean)
执行修改器 $set

db.test.update({name:'joe'}, {$set:{sex:false}})

表示将 name为joe的这个对象的 sex修改为false
3.3 $unset (删除字段)
这个修改器是为了删除某个指定的字段及其值.
例子:
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"), 
 "howlong" : 6, 
 "lovePerson" : "zhanying", 
 "person" : "jiacheo"
}

把howlong删除
执行修改器 $unset

db.test.update({person:'jiacheo'},{$unset:{howlong:1}})

后面的1没具体意义, 相当于确认
3.4 $push (把一个数据放到数组里面)
例子:
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"), 
 "lovePerson" : "zhanying", 
 "person" : "jiacheo", 
 "supporters" : [ "gongjin" ]
}

执行修改器, 增加一个人到supporters里面

db.test.update({person:'jiacheo'},{$push:{supporters:'ziming'}})

这时候变成:

{ 
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [
   "gongjin", "ziming" 
 ]
}

若果再执行一遍上面的代码, 那么数组还是会加上同样的名称, 如果你需要排同, 可以用后面这个修改器
3.5 $addToSet (将一个对象放到集合里面. 集合里面不会出现两个重复的一模一样的对象)
例子:
原文档如下:

{
"_id" : ObjectId("4f9105377522000000006be0"),
"lovePerson" : "zhanying",
"person" : "jiacheo",
"supporters" : [
"gongjin",
"ziming"
]
}

执行修改器 $addToSet

db.test.update({person:'jiacheo'},{$addToSet:{supporters:'ziming'}})

文档内容没有改变..
以上两个修改器还可以配合 $each, 把一个数据里的数据append到另一个数组上去(后者会排重)
比如:

db.test.update({person:'jiacheo'}, {$addToSet:{supporters:{$each: ["gongjin","ziming","liuxun","feidu"] }}})

执行后, 结果如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : ["gongjin", "ziming", "feidu", "liuxun" 
 ]
}

3.6 $pop 队列出列 (按数组索引顺序出列(正序或者倒序))
例子:
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"), 
 "lovePerson" : "zhanying", 
 "person" : "jiacheo",
 "supporters" : ["gongjin", "ziming", "feidu", "liuxun" 
 ]
}

执行修改器 $pop

db.test.update({person:'jiacheo'},{$pop:{supporters:1}})

修改后变为

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [ "gongjin", "ziming", "feidu" ]
}

value 值为1 表示数组索引靠后的先出列, 也就是后进先出, 相当于出栈的概念. value值为-1表示数组索引考前的先出列, 也就是先进先出, 相当于一个FIFO的队列
3.7 $pull (符合条件的元素出列)
与上一个相比, 这个修改器会把符合条件的元素从数组中删除, 而不是简单的按照索引值的大小来处理.
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [ "gongjin", "ziming", "feidu" ]
}

执行修改器 $pull

db.test.update({person:'jiacheo'},{$pull:{supporters: 'gongjin'}})

修改后:

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [ "ziming", "feidu" ]
}

可以见, gongjin从supporters中被remove掉了.
这里所有被匹配的元素都会被出列