acl  3.5.3.0
beanstalk.hpp
浏览该文件的文档.
1 #pragma once
2 #include <stdarg.h>
3 #include <vector>
4 #include "../acl_cpp_define.hpp"
5 #include "../stdlib/string.hpp"
6 #include "../stdlib/noncopyable.hpp"
7 #include "../stream/socket_stream.hpp"
8 
9 #if !defined(ACL_CLIENT_ONLY) && !defined(ACL_BEANSTALK_DISABLE)
10 
11 struct ACL_ARGV;
12 
13 namespace acl {
14 
15 /**
16  * 消息 ID 号从 1 开始递增(参加 beanstalkd 的 job.c 源程序中的如下内容:
17  * static uint64 next_id = 1; 及 make_job_with_id() 中的
18  * if (id) {
19  * j->r.id = id;
20  * if (id >= next_id) next_id = id + 1;
21  * } else {
22  * j->r.id = next_id++;
23  * }
24  * 消息优先级 pri 的取值范围为 0 ~ 4,294,968,295(最大无符号整数值),值越小
25  * 则优先级别越高,最高级别为 0 级
26  * 消息体默认最大长度为 65,535(最大无符号 short 值),该值可以在启动 beanstalkd 指定
27  * 更多内容请参考本项目 doc/ 目录下的 <beanstalk协议介绍.pdf>
28  * 本类中的命令过程内部会自动进行连接操作,在重连过程中,如果之前设置了 watch 及 use
29  * 队列,则会自动重试这些命令过程,所以一般来说不用显式调用 open 过程;当用户调用了
30  * close 函数后,不仅断开了与 beanstalkd 服务器的连接,同时会清除本类对象中存储的
31  * use 及 watch 队列
32  */
34 {
35 public:
36  /**
37  * 构造函数
38  * @param addr {const char*} beanstalkd 地址,格式:ip:port/domain:port
39  * @param conn_timeout {int} 连接服务器的超时时间(秒)
40  * @param retry {bool} 如果连接断了是否自动重连
41  */
42  beanstalk(const char* addr, int conn_timeout, bool retry = true);
43  ~beanstalk();
44 
45  /////////////////////////////////////////////////////////////////////
46  // 生产者调用的接口
47 
48  /**
49  * 选择所用的发送管道
50  * @param tube {const char*} 管道名称
51  * @return {bool} 是否成功
52  */
53  bool use(const char* tube);
54 
55  /**
56  * 向所选管道或缺省管理中发送消息
57  * @param data {const void*} 消息数据地址,可以是二进制
58  * @param len {size_t} data 数据体长度
59  * @param pri {unsigned} 优先级,值越小,优先级越高
60  * @param delay {unsigned} 表示将job放入ready队列需要等待的秒数
61  * @param ttr {unsigned} 表示允许一个worker执行该消息的秒数
62  * @return {unsigned long long} 返回所添加消息的消息号,
63  * 如果返回值 > 0 则表示添加成功,若 == 0 则表示添加失败
64  * (查看 beanstalkd 源码,可以看出消息号从 1 开始增加)
65  */
66  unsigned long long put(const void* data, size_t len,
67  unsigned pri = 1024, unsigned delay = 0, unsigned ttr = 60);
68 
69  /**
70  * 以格式字符串方式向所选管道或缺省管理中发送消息
71  * @param pri {unsigned} 优先级,值越小,优先级越高
72  * @param delay {unsigned} 表示将job放入ready队列需要等待的秒数
73  * @param ttr {unsigned} 表示允许一个worker执行该消息的秒数
74  * @param fmt {const char*} 格式字符串
75  * @return {unsigned long long} 返回所添加消息的消息号,
76  * 如果返回值 > 0 则表示添加成功,若 == 0 则表示添加失败
77  * (查看 beanstalkd 源码,可以看出消息号从 1 开始增加)
78  */
79  unsigned long long format_put(unsigned pri, unsigned delay, unsigned ttr,
80  const char* fmt, ...) ACL_CPP_PRINTF(5, 6);
81 
82  unsigned long long vformat_put(const char* fmt, va_list ap,
83  unsigned pri = 1024, unsigned delay = 0, unsigned ttr = 60);
84 
85  /**
86  * 以格式字符串方式向所选管道或缺省管理中发送消息,其中的
87  * 的 pri, delay, ttr 采用默认值
88  * @param fmt {const char*} 格式字符串
89  * @return {unsigned long long} 返回所添加消息的消息号,
90  * 如果返回值 > 0 则表示添加成功,若 == 0 则表示添加失败
91  * (查看 beanstalkd 源码,可以看出消息号从 1 开始增加)
92  */
93  unsigned long long format_put(const char* fmt, ...) ACL_CPP_PRINTF(2, 3);
94 
95  /////////////////////////////////////////////////////////////////////
96  // 消费者调用的接口
97 
98  /**
99  * 选择读取消息的管道,将其加入监控管理列表中,
100  * 不调用本函数,则使用缺省的管道(default)
101  * @param tube {const char*} 消息管道名称
102  * @return {unsigned} 返回值为关注的消息管道数, 返回值 > 0 则表示成功
103  */
104  unsigned watch(const char* tube);
105 
106  /**
107  * 取消关注(watch)一个接收消息的管道(tube)
108  * @param tube {const char*} 消息管道名称
109  * @return {unsigned} 返回值为剩余的消息关注管道数, 返回值 > 0 则表示
110  * 成功(因至少要关注一个缺省消息管道,所以正确情况下该返回值至少为 1),
111  * 如果返回值为 0 则说明输入的管道并未被关注或取消关注失败
112  */
113  unsigned ignore(const char* tube);
114 
115  /**
116  * 取消关注所有的接收消息的管道
117  * @return {unsigned} 返回值为剩余的消息关注管道数, 返回值 > 0 则表示
118  * 成功(因至少要关注一个缺省消息管道,所以正确情况下该返回值至少为 1),
119  * 返回 0 表示取消关注失败
120  */
121  unsigned ignore_all();
122 
123  /**
124  * 从消息输出管道中获取一条消息,但并不删除消息,可以设置
125  * 等待超时,如果设为 -1 则永远阻塞等待消息可用
126  * @param buf {string&} 存储获得的一条消息,函数内部会先清空该缓冲区
127  * @param timeout {int} 等待队列服务器返回消息的超时值,当为 -1
128  * 时,则无限期等待,当 > 0 时,则在该时间内若没有消息,则返回,
129  * 当 == 0 时,则立即返回一条消息或返回超时
130  * @return {unsigned long long} 返回所取得的消息号,若返回值 > 0
131  * 表示正确取到一个消息,否则说明出错或超时没有消息可用,其中当
132  * 返回 0 时,如果调用 get_error() 获得的内容为 TIMED_OUT 则表示
133  * 超时了,当为 DEADLINE_SOON 时则表示该消息已经被读取但在规定的 ttr
134  * (事务时间内) 没有被 delete_id
135  */
136  unsigned long long reserve(string& buf, int timeout = -1);
137 
138  /**
139  * 从队列服务器中删除指定 ID 号的消息
140  * @param id {unsigned long long} 消息号
141  * @return {bool} 是否成功删除
142  */
143  bool delete_id(unsigned long long id);
144 
145  /**
146  * 将一个已经被获取的消息重新放回ready队列(并将job状态置为 "ready"),
147  * 让该消息可以被其它连接获得
148  * @param id {unsigned long long} 消息号
149  * @param pri {unsigned} 优先级别
150  * @param delay {unsigned} 在该消息被放入ready队列之前需要等待的秒数
151  * @return {bool} 是否成功
152  */
153  bool release(unsigned long long id, unsigned pri = 1024,
154  unsigned delay = 0);
155 
156  /**
157  * 将一个消息的状态置为 "buried", Buried 消息被放在一个FIFO的链表中,
158  * 在客户端调用kick命令之前,这些消息将不会被服务端处理
159  * @param id {unsigned long long} 消息号
160  * @param pri {unsigned int} 优先级别
161  * @return {bool} 是否成功
162  */
163  bool bury(unsigned long long id, unsigned pri = 1024);
164 
165  /**
166  * 允许一个worker请求在一个消息获取更多执行的时间。这对于那些需要
167  * 长时间完成的消息是非常有用的,但同时也可能利用TTR的优势将一个消息
168  * 从一个无法完成工作的worker处移走。一个worker可以通过该命令来告诉
169  * 服务端它还在执行该job (比如:在收到DEADLINE_SOON是可以发生给命令)
170  * @param id {unsigned long long} 消息号
171  * @return {bool} 是否成功
172  */
173  bool touch(unsigned long long id);
174 
175  /////////////////////////////////////////////////////////////////////
176  // 其它接口
177 
178  /**
179  * 连接 beanstalkd 服务器,通常情况下不需要显示地调用该函数,上述命令
180  * 会自动根据需要自动调用本函数
181  * @return {bool} 否成功
182  */
183  bool open();
184 
185  /**
186  * 显示关闭与 beanstalkd 的连接,当该类实例析构时会尝试调用关闭过程,
187  * 调用本函数后,类对象内部的 tube_used_ 及 tubes_watched_ 会被释放
188  */
189  void close();
190 
191  /**
192  * 显示通知 beanstalkd 服务器退出连接(服务器收到此命令后会立即关闭连接)
193  */
194  void quit();
195 
196  /**
197  * 获取消息队列中指定的消息号的数据
198  * @param buf {string&} 如果消息存在则存储该条消息,函数内部会先清空该缓冲区
199  * @param id {unsigned long long} 指定的消息号
200  * @return {unsigned long long} 返回取得的 ready 状态消息号,
201  * 若返回值 > 0 则说明取得了一个消息,否则表示没有消息可用
202  */
203  unsigned long long peek(string& buf, unsigned long long id);
204 
205  /**
206  * 获得当前关注 (watch) 管道中的一条 ready 状态消息,
207  * 如果消息不存在也立即返回
208  * @param buf {string&} 如果消息存在则存储该条消息,函数内部会先清空该缓冲区
209  * @return {unsigned long long} 返回取得的 ready 状态消息号,
210  * 若返回值 > 0 则说明取得了一个消息,否则表示没有消息可用
211  */
212  unsigned long long peek_ready(string& buf);
213 
214  /**
215  * 获得当前关注 (watch) 管道中的一条 delayed 状态消息,
216  * 如果消息不存在也立即返回
217  * @param buf {string&} 如果消息存在则存储该条消息,函数内部会先清空该缓冲区
218  * @return {unsigned long long} 返回取得的 delayed 状态消息号,
219  * 若返回值 > 0 则说明取得了一个消息,否则表示没有消息可用
220  */
221  unsigned long long peek_delayed(string& buf);
222 
223  /**
224  * 获得当前关注 (watch) 管道中的一条 buried 状态消息,
225  * 如果消息不存在也立即返回
226  * @param buf {string&} 如果消息存在则存储该条消息,函数内部会先清空该缓冲区
227  * @return {unsigned long long} 返回取得的 buried 状态消息号,
228  * 若返回值 > 0 则说明取得了一个消息,否则表示没有消息可用
229  */
230  unsigned long long peek_buried(string& buf);
231 
232  /**
233  * 该命令只能针对当前正在使用的tube执行;它将 buried
234  * 或者 delayed 状态的消息移动到 ready 队列
235  * @param n {unsigned} 表示每次 kick 消息的上限,
236  * 服务端将最多 kick 的消息数量
237  * @return {int} 表示本次kick操作作用消息的数目,返回 -1 表示出错
238  */
239  int kick(unsigned n);
240 
241  /**
242  * 获得客户当前正在使用的消息管道
243  * @param buf {string&} 存储当前使用的消息管道,函数内部会先清空该缓冲区
244  * @return {bool} 是否成功获得
245  */
246  bool list_tube_used(string&buf);
247 
248  /**
249  * 获得已经存在的所有消息管道(tube)的列表集合
250  * @param buf {string&} 存储结果,函数内部会先清空该缓冲区
251  * @return {bool} 是否成功获得
252  */
253  bool list_tubes(string& buf);
254 
255  /**
256  * 获得当前关注(watch)的消息管道的集合
257  * @param buf {string&} 存储结果,函数内部会先清空该缓冲区
258  * @return {bool} 是否成功获得
259  */
260  bool list_tubes_watched(string& buf);
261 
262  /**
263  * 给定时间内暂停从指定消息管道(tube)中获取消息
264  * @param tube {const char*} 消息管道名
265  * @param delay {unsigned} 指定时间段
266  * @return {bool} 是否成功
267  */
268  bool pause_tube(const char* tube, unsigned delay);
269 
270  /////////////////////////////////////////////////////////////////////
271  // 公共接口
272  const char* get_error() const
273  {
274  return errbuf_.c_str();
275  }
276 
278  {
279  return conn_;
280  }
281 
282  /**
283  * 返回构造函数中 beanstalkd 的服务器地址,格式:ip:port
284  * @return {const char*} 永远返回非空的 beanstalkd 服务器地址
285  */
286  const char* get_addr() const
287  {
288  return addr_;
289  }
290 
291 private:
292  char* addr_;
293  int timeout_;
294  bool retry_;
295  string errbuf_;
296  char* tube_used_;
297  std::vector<char*> tubes_watched_;
298  socket_stream conn_;
299  unsigned long long peek_fmt(string& buf, const char* fmt, ...)
300  ACL_CPP_PRINTF(3, 4);
301  bool list_tubes_fmt(string& buf, const char* fmt, ...)
302  ACL_CPP_PRINTF(3, 4);
303 
304  unsigned ignore_one(const char* tube);
305  bool beanstalk_open();
306  bool beanstalk_use();
307  unsigned beanstalk_watch(const char* tube);
308  ACL_ARGV* beanstalk_request(const string& cmdline,
309  const void* data = NULL, size_t len = 0);
310 };
311 
312 } // namespace acl
313 
314 #endif // !defined(ACL_CLIENT_ONLY) && !defined(ACL_BEANSTALK_DISABLE)
#define ACL_CPP_PRINTF(format_idx, arg_idx)
Definition: atomic.hpp:75
const char * get_addr() const
Definition: beanstalk.hpp:286
socket_stream & get_conn()
Definition: beanstalk.hpp:277
const char * get_error() const
Definition: beanstalk.hpp:272
ACL_API void const char * fmt
Definition: acl_aio.h:771
#define ACL_CPP_API