关于mq:简单易用的任务队列beanstalkd

64次阅读

共计 4488 个字符,预计需要花费 12 分钟才能阅读完成。

更多技术文章,请关注我的集体博客 www.immaxfang.com 和小公众号 Max 的学习札记

概述

beanstalkd 是一个简略疾速的分布式工作队列零碎,协定基于 ASCII 编码运行在 TCP 上。其最后设计的目标是通过后盾异步执行耗时工作的形式升高高容量 Web 利用的页面延时。其具备简略、轻量、易用等特点,也反对对工作优先级、延时 / 超时重发等管制,同时还有泛滥语言版本的客户端反对,这些长处使得它成为各种须要队列零碎场景的一种常见抉择。

beanstalkd 长处

  • 如他官网的介绍,simple&fast,应用非常简单,适宜须要引入音讯队列又不想引入 kafka 这类重型的 mq,保护成本低;同时,它的性能十分高,大部分场景下都能够 cover 住。
  • 反对长久化
  • 反对音讯优先级,topic,延时音讯,音讯重试等
  • 支流语言客户端都反对,还能够依据 beanstalkd 协定自行实现。

beanstalkd 有余

  • 无最大内存管制,当业务音讯极多时,服务可能会不稳固。
  • 官网没有提供集群故障切换计划(主从或哨兵等),须要本人解决。

beanstalkd 重点概念

  • job

工作,队列中的根本单元,每个 job 都会有 id 和优先级。有点相似其余音讯队列中的 message 的概念。但 job 有各种状态,下文介绍生命周期局部会重点介绍。job 寄存在 tube 中。

  • tube

管道,用来存储同一类型的 job。有点相似其余音讯队列中的 topic 的概念。beanstalkd 通过 tube 来实现多任务队列,beanstalkd 中能够有多个管道,每个管道有本人的 producer 和 consumer,管道之间相互不影响。

  • producer

job 生产者。通过 put 命令将一个 job 放入到一个 tube 中。

  • consumer

job 消费者。通过 reserve 来获取 job,通过 delete、release、bury 来扭转 job 的状态。

beanstalkd 生命周期

上文介绍到,beanstalkd 中 job 有状态辨别,在整个生命周期中,job 可能有四种状态:READY, RESERVED, DELAYED, BURIED。只有处于 READY 状态的 job 能力被生产。下图介绍了各状态之间的流转状况。

producer 在创立 job 的时候有两种形式,put 和 put with delay(延时工作)。
如果 producer 应用 put 间接创立一个 job 时,该 job 就处于 READY 状态,期待 consumer 解决。
如果 producer 应用 put with delay 形式创立 job,该 job 的初始状态为 DELAYED 状态,期待延迟时间过后才变更为 READY 状态。
以上两种形式创立的 job 都会传入一个 TTR(超时机制),当 job 处于 RESERVED 状态时,TTR 开始倒计时,当 TTR 倒计时完,job 状态还没有扭转,则会认为该 job 解决失败,会被从新放回到队列中。

consumer 获取到(reserve)一个 READY 状态的 job 之后,该 job 的状态就会变更为 RESERVED。此时,其余的 consumer 就不能再操作该 job 了。当 consumer 实现该 job 之后,能够抉择 delete,release,或 bury 操作。

  • delete,job 被删除,从 beanstalkd 中革除,当前也无奈再获取到,生命周期完结。
  • release,能够把该 job 从新变更为 READY 状态,使得其余的 consumer 能够持续获取和执行该 job,也能够应用 release with delay 延时操作,这样会先进入 DELAYED 状态,延迟时间达到后再变为 READY。
  • bury,能够将 job 休眠,等须要的时候,在将休眠的 job 通过 kick 命令变更回 READY 状态,也能够通过 delete 间接删除 BURIED 状态的 job。

处于 BURIED 状态的 job,能够通过 kick 重回 READY 状态,也能够通过 delete 删除 job。

为什么设计这个 BURIED 状态呢?
个别咱们能够用这个状态来做异样捕捉,例如执行超时或者异样的 job,咱们能够将其置为 BURIED 状态,这样做有几个益处:
1. 能够便面这些异样的 job 间接被放回队列重试,影响失常的队列生产(这些失败一次的 job,很有可能再次失败)。如果没有这个 BURIED 状态,如果咱们要独自隔离,个别咱们会应用一个新的 tube 独自寄存这些异样的 job,应用独自的 consumer 生产。这样就不会影响失常的新音讯生产。特地是失败率比拟高的时候,会占用很多的失常资源。
2. 便于人工排查,下面曾经讲到,能够将异样的 job 置为 BURIED 状态,这样人工排查时重点关注这个状态就能够了。

beanstalkd 个性

长久化

通过 binlog 将 job 及其状态记录到本地文件,当 beanstalkd 重启时,能够通过读取 binlog 来复原之前的 job 状态。

分布式

在 beanstalkd 的文档中,其实是反对分布式的,其设计思维和 Memcached 相似,beanstalkd 各个 server 之间并不知道彼此的存在,是通过 client 实现分布式以及依据 tube 名称去特定的 server 上获取 job。贴一篇专门探讨 beanstalkd 分布式的文章,Beanstalkd 的一种分布式计划

工作延时

人造反对延时工作,能够在创立 job 时指定延时工夫,也能够当 job 被解决完后能力后,消费者应用 release with delay 将 job 再次放入队列延时执行。

工作优先级

producer 生成的 job 能够给他调配优先级,反对 0 到 2^32 的优先级,值越小,优先级越高,默认优先级为 1024。优先级高的 job 会被 consumer 优先执行。

超时机制

为了避免某个 consumer 长时间占用 job 但无奈解决实现的状况,beanstalkd 的 reserve 操作反对设置 timeout 工夫 (TTR)。如果 consumer 不能在 TTR 内发送 delete、release 或 bury 命令扭转 job 状态,那么 beanstalkd 会认为工作解决失败,会将 job 从新置为 READY 状态供其余 consumer 生产。
如果消费者曾经预知可能无奈在 TTR 内实现该 job,则能够发送 touch 命令,使得 beanstalkd 从新计算 TTR。

工作预留

有一个 BURIED 状态能够作为缓冲,具体特点见上文生命周期中对于 BURIED 状态的介绍。

装置及配置

以下以 ubuntu 为例,安转 beanstalkd:

sudo apt-get update
sudo apt-get install beanstalkd
vi /etc/sysconfig/beanstalkd
# 增加如下内容
BEANSTALKD_BINLOG_DIR=/data/beanstalkd/binlog

能够通过 beanstalkd 命令来运行服务,并且能够增加多种参数。命令的格局如下:

beanstalkd [OPTIONS]

 -b DIR   wal directory
 -f MS    fsync at most once every MS milliseconds (use -f0 for "always fsync")
 -F       never fsync (default)
 -l ADDR  listen on address (default is 0.0.0.0)
 -p PORT  listen on port (default is 11300)
 -u USER  become user and group
 -z BYTES set the maximum job size in bytes (default is 65535)
 -s BYTES set the size of each wal file (default is 10485760)
            (will be rounded up to a multiple of 512 bytes)
 -c       compact the binlog (default)
 -n       do not compact the binlog
 -v       show version information
 -V       increase verbosity
 -h       show this help

如下咱们启动一个 beanstalkd 服务,并开启 binlog:

nohup beanstalkd -l 0.0.0.0 -p 11300 -b /data/beanstalkd/binlog/ &

beanstalkd 管理工具

官网举荐的一些管理工具:Tools
笔者罕用的管理工具:https://github.com/ptrofimov/beanstalk_console
如果只是简略的操作和查看 beanstalkd,能够应用 telnet 工具,而后执行 stats,use,put,watch 等:

$ telnet 127.0.0.1 11300
stats

理论利用

beansralkd 有很多语言版本的客户端实现,官网提供了一些客户端列表 beanstalkd 客户端列表。
如果现有的这些库不满足需要,也能够自行实现,参考 beanstalkd 协定。

以下以 go 为例,简略演示下 beanstalkd 罕用解决操作。

go get github.com/beanstalkd/go-beanstalk

生产者

向默认的 tube 中投入 job:

id, err := conn.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {panic(err)
}
fmt.Println("job", id)

向指定的 tube 中投入 job:

tube := &beanstalk.Tube{Conn: conn, Name: "mytube"}
id, err := tube.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {panic(err)
}
fmt.Println("job", id)

消费者

生产默认的 tube 中的 job:

id, body, err := conn.Reserve(5 * time.Second)
if err != nil {panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))

生产指定的 tube (此处指定多个) 中的 job:

tubeSet := beanstalk.NewTubeSet(conn, "mytube1", "mytube2")
id, body, err := tubeSet.Reserve(10 * time.Hour)
if err != nil {panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))

beanstalkd 应用小 tips

  • 能够通过指定 tube,在 put 的时候将 job 放入指定的 tube 中,否则会放入 default 的 tube 中。
  • beanstalkd 反对长久化,在启动时应用 -b参数来开启 binlog,通过binog 能够将 job 及其状态记录到文件里。当从新应用 -b 参数重启 beanstalkd,将读取 binlog 来复原之前的 job 及状态。

参考资料

  • Beanstalkd 官网
  • Beanstalkd 中文协定
  • Beanstalkd 学习钻研

更多技术文章,请关注我的集体博客 www.immaxfang.com 和小公众号 Max 的学习札记

正文完
 0