acl  3.5.3.0
redis_stream.hpp
浏览该文件的文档.
1 #pragma once
2 #include "../acl_cpp_define.hpp"
3 #include "../stdlib/string.hpp"
4 #include "redis_command.hpp"
5 
6 #if !defined(ACL_CLIENT_ONLY) && !defined(ACL_REDIS_DISABLE)
7 
8 namespace acl
9 {
10 
12 {
13  string name;
14  string value;
15 };
16 
18 {
19  string id;
20  std::vector<redis_stream_field> fields;
21 };
22 
24 {
25  string key;
26  std::vector<redis_stream_message> messages;
27 
28  bool empty(void) const
29  {
30  return messages.empty();
31  }
32 
33  size_t size(void) const
34  {
35  return messages.size();
36  }
37 };
38 
40 {
41  string name;
42  size_t pending;
43  size_t idle;
44 
46  {
47  pending = 0;
48  idle = 0;
49  }
50 };
51 
53 {
54  string name;
56  size_t consumers;
57  size_t pending;
58 
60  {
61  consumers = 0;
62  pending = 0;
63  }
64 };
65 
67 {
68  size_t length;
71  size_t groups;
75 
77  {
78  length = 0;
79  radix_tree_keys = 0;
80  radix_tree_nodes = 0;
81  groups = 0;
82  }
83 };
84 
86 {
87  string name;
89 
91  {
92  pending_number = 0;
93  }
94 };
95 
97 {
98  string smallest_id;
99  string greatest_id;
100  std::vector<redis_pending_consumer> consumers;
101 
102  bool empty(void) const
103  {
104  return consumers.empty();
105  }
106 
107  size_t size(void) const
108  {
109  return consumers.size();
110  }
111 };
112 
114 {
115  string id;
116  string consumer;
117  unsigned long long elapsed;
118  size_t delivered;
119 
121  {
122  elapsed = 0;
123  delivered = 0;
124  }
125 };
126 
128 {
129  std::map<string, redis_pending_message> messages;
130 
131  bool empty(void) const
132  {
133  return messages.empty();
134  }
135 
136  size_t size(void) const
137  {
138  return messages.size();
139  }
140 };
141 
142 class ACL_CPP_API redis_stream : virtual public redis_command
143 {
144 public:
145  redis_stream(void);
146  redis_stream(redis_client* conn);
149 
151  redis_stream(redis_client_cluster* cluster, size_t max_conns);
152 
153  virtual ~redis_stream(void);
154 
155  /////////////////////////////////////////////////////////////////////
156 
157  /**
158  * appends the specified stream entry to the stream at the specified key
159  * @param key {const char*} the specified key of the stream
160  * @param fields {const std::map<string, string>&} holds all the entries
161  * to be appended to the stream, the map's key is the entry's name,
162  * and the value is the entry's value
163  * @param result {string&} will hold the message-id of the added entry
164  * @param id {const char*} a stream entry ID identifies a given entry
165  * inside a stream, default "*" mean that redis-server will choose
166  * one ID internal. When the user specified and explicit ID, the ID's
167  * format is look like 1526919030474-55 that includes two numbers
168  * separated by '-', the minimum valid ID is 0-1
169  * param maxlen {size_t} if > 0, limit the size of the stream
170  * @return {bool} return true if entry was added successfully, or some
171  * error happened which the error reason can be acquied by calling
172  * result_error() of the base class redis_command.
173  */
174  bool xadd(const char* key, const std::map<string, string>& fields,
175  string& result, const char* id = "*");
176  bool xadd(const char* key, const std::vector<string>& names,
177  const std::vector<string>& values,
178  string& result, const char* id = "*");
179  bool xadd(const char* key, const std::vector<const char*>& names,
180  const std::vector<const char*>& values, string& result,
181  const char* id = "*");
182  bool xadd(const char* key, const char* names[], const size_t names_len[],
183  const char* values[], const size_t values_len[], size_t argc,
184  string& result, const char* id = "*");
185  bool xadd_with_maxlen(const char* key, size_t maxlen,
186  const std::map<string, string>& fields, string& result,
187  const char* id = "*");
188 
189  /**
190  * returns the number of entries inside a stream.
191  * @param key {const char*} the specified key of the stream
192  * @return {int} value >= 0 if the command was executed correctly, -1
193  * will returned if some error happened.
194  */
195  int xlen(const char* key);
196 
197  /**
198  * removes the specified entries from a stream, and returns the number
199  * of entries deleted, that may be different from the number of IDs
200  * passed to the command in case certain IDs do not exist.
201  * @param key {const char*} the specified key of the stream
202  * @param id {const char*} a stream entry ID look like 1526919030474-55
203  * @return {int} return the number of entries actually deleted, if some
204  * error happened -1 will be returned.
205  */
206  int xdel(const char* key, const char* id);
207 
208  /**
209  * removes some entries with the specified IDs, and returns the number
210  * of entries deleted.
211  * @param key {const char*}
212  * @param ids {const std::vector<string>&} holds the entries' IDs to
213  * be deleted
214  * @return {int}
215  */
216  int xdel(const char* key, const std::vector<string>& ids);
217  int xdel(const char* key, const std::vector<const char*>& ids);
218 
219  /**
220  * trims the stream to a given number of items, evicting older items
221  * (items with lower IDs) if needed.
222  * @param key {const char*}
223  * @param maxlen {size_t} specify the latest exactly items to be deleted
224  * @param tilde {bool} if true, the number of items to be deleted is
225  * not exactly equal the maxlen, the real number maybe more than the
226  * maxlen with a few tens, but never less than the maxlen
227  * @return return the number of entries deleted from the stream
228  */
229  int xtrim(const char* key, size_t maxlen, bool tilde = false);
230 
231  /////////////////////////////////////////////////////////////////////
232 
233  /**
234  * read data from one or multiple streams, only returning entries with
235  * an ID greater than the last received ID reported by the caller.
236  * @param messages {redis_stream_messages&} will hold the read's items,
237  * redis_stream_messages defined above
238  * @param streams {const std::map<string, string>&} holds the specified
239  * streams' keys to be read by users
240  * @param count {size_t} specifies the max count of items to be read,
241  * no limit when 0 was set
242  * @param block {ssize_t} specifies the read timeout, block if 0 set,
243  * no-block if -1 set
244  * @return {bool} return the status of executing the xread command
245  */
246  bool xread(redis_stream_messages& messages,
247  const std::map<string, string>& streams,
248  size_t count = 1000, ssize_t block = 0);
249 
250  /**
251  * the XREADGROUP command is a special version of the XREAD command
252  * with support for consumer groups.
253  * @param messages {redis_stream_messages&}
254  * @param group {const char*} the consumer group
255  * @param consumer {const char*} the consumer belonging to the group
256  * @param streams {const std::map<string, string>&} holds the streams'
257  * keys and IDs for each streams, the map's key is the stream's key
258  * and the map's value is the stream's ID option, which can be one
259  * of the following two:
260  * 1. The special > ID, which means that the consumer want to receive
261  * only messages that were never delivered to any other consumer.
262  * It just means, give me new messages.
263  * 2. Any other ID, that is, 0 or any other valid ID or incomplete ID
264  * (just the millisecond time part), will have the effect of
265  * returning entries that are pending for the consumer sending the
266  * command. So basically if the ID is not >, then the command will
267  * just let the client access its pending entries: delivered to it,
268  * but not yet acknowledged.
269  * @param count {size_t}
270  * @param block {ssize_t} set the blocked timeout waiting for messages,
271  * if block is 0, will block until getting one message at least;
272  * if block is -1, don't block for messages.
273  * @param noack {bool} The NOACK subcommand can be used to avoid adding
274  * the message to the PEL in cases where reliability is not a
275  * requirement and the occasional message loss is acceptable. This is
276  * equivalent to acknowledging the message when it is read.
277  * @return {bool} return the status of xreadgroup command
278  */
279  bool xreadgroup(redis_stream_messages& messages, const char* group,
280  const char* consumer, const std::map<string, string>& streams,
281  size_t count = 1000, ssize_t block = 0, bool noack = false);
282 
283  /**
284  * the XREADGROUP with NOACK subcommand for reading messages.
285  * @param messages {redis_stream_messages&}
286  * @param group {const char*}
287  * @param consumer {const char*}
288  * @param streams {const std::map<string, string>&}
289  * @param count {size_t}
290  * @param block {ssize_t}
291  * @return {bool}
292  */
293  bool xreadgroup_with_noack(redis_stream_messages& messages,
294  const char* group, const char* consumer,
295  const std::map<string, string>& streams,
296  size_t count = 1000, ssize_t block = 0);
297 
298  /**
299  * The command returns the stream entries matching a given range of IDs.
300  * @param messages {redis_stream_messages&}
301  * @param key {const char*}
302  * @param start {const char*} the start ID of the query interval;
303  * '-' means starting from the minimum ID possible inside a stream
304  * @param end {const char*} the end ID of the query interval;
305  * '+' means the end of the maximum ID possible inside a stream
306  * @param count {size_t} reduce the number of entries reported
307  * @return {bool}
308  */
309  bool xrange(redis_stream_messages& messages, const char* key,
310  const char* start = "-", const char* end = "+",
311  size_t count = 1000);
312 
313  /**
314  * Return a range of elements in a stream, with IDs matching the
315  * specified IDs interval, in reverse order (from greater to smaller
316  * IDs) compared to XRANGE.
317  * @param messages {redis_stream_messages&}
318  * @param key {const char*}
319  * @param start {const char*} start with the higher ID
320  * @param end (const char*} end with the lower ID
321  * @param count {size_t}
322  * @return {bool}
323  */
324  bool xrevrange(redis_stream_messages& messages, const char* key,
325  const char* start = "+", const char* end = "-",
326  size_t count = 1000);
327 
328  /////////////////////////////////////////////////////////////////////
329 
330  /**
331  * In the context of a stream consumer group, this command changes
332  * the ownership of a pending message, so that the new owner is
333  * the consumer specified as the command argument.
334  * @param messages {std::vector<redis_stream_message>&} holds the
335  * messages been XLAIMed
336  * @param key {const char*}
337  * @param group {const char*}
338  * @param consumer {const char*}
339  * @param min_idle_time {long}
340  * @param ids {const std::vector<string>&} the IDs to be XCLAIMed
341  * @param idle {size_t}
342  * @param time_ms {long long}
343  * @param retry_count {int}
344  * @param force {bool}
345  * @return {bool}
346  */
347  bool xclaim(std::vector<redis_stream_message>& messages,
348  const char* key, const char* group, const char* consumer,
349  long min_idle_time, const std::vector<string>& ids,
350  size_t idle = 0, long long time_ms = -1,
351  int retry_count = -1, bool force = false);
352 
353  /**
354  * XCLAIM with the JUSTID subcommand
355  */
356  bool xclaim_with_justid(std::vector<string>& messages_ids,
357  const char* key, const char* group, const char* consumer,
358  long min_idle_time, const std::vector<string>& ids,
359  size_t idle = 0, long long time_ms = -1,
360  int retry_count = -1, bool force = false);
361 
362  /////////////////////////////////////////////////////////////////////
363 
364  /**
365  * Removes one message from the pending entries list (PEL) of
366  * a stream consumer group.
367  * @param key {const char*}
368  * @param group {const char*}
369  * @param id {const char*}
370  * @return {int} return integer >= 0 if ok, -1 if error
371  */
372  int xack(const char* key, const char* group, const char* id);
373 
374  /**
375  * Removes one or multiple message from the pending entries list (PEL)
376  * of a stream consumer group.
377  * @param key {const char*}
378  * @param group {const char*}
379  * @param ids {const std::vector<string>&}
380  * @return {int} return count of messages been acked, return -1 if error
381  */
382  int xack(const char* key, const char* group,
383  const std::vector<string>& ids);
384  int xack(const char* key, const char* group,
385  const std::vector<const char*>& ids);
386  int xack(const char* key, const char* group,
387  const std::list<string>& ids, size_t size);
388  int xack(const char* key, const char* group,
389  const std::list<const char*>& ids, size_t size);
390 
391  /////////////////////////////////////////////////////////////////////
392 
393  /**
394  * The XPENDING command with SUMMARY subcommand.
395  * @param key {const char*}
396  * @param group {const char*}
397  * @param result {redis_pending_summary&} defined above
398  * @return {bool}
399  */
400  bool xpending_summary(const char* key, const char* group,
401  redis_pending_summary& result);
402 
403  /**
404  * The XPENDING command with DETAIL subcommand.
405  * @param result {redis_pending_summary&} defined above
406  * @param key {const char*}
407  * @param group {const char*}
408  * @param start_id {const char*}
409  * @param end_id {const char*}
410  * @param count {size_t} limit the max count to be saved in result
411  * @param consumer {const char*}
412  * @return {bool}
413  */
414  bool xpending_detail(redis_pending_detail& result,
415  const char* key, const char* group,
416  const char* start_id = "-", const char* end_id = "+",
417  size_t count = 1, const char* consumer = NULL);
418 
419  /////////////////////////////////////////////////////////////////////
420 
421  /**
422  * The XGROUP command with the subcommand HELP
423  * @param result {std::vector<string>&} will hold the result
424  * @return {bool}
425  */
426  bool xgroup_help(std::vector<string>& result);
427 
428  /**
429  * The XGROUP command with the subcommand CREATE
430  * @param key {const char*}
431  * @param group {const char*}
432  * @param id {const char*} the ID of the last item in the stream to
433  * consider already delivered, "$" means the ID of the last item
434  * in the stream
435  * @param mkstream {bool} when mkstream is true, the stream with the
436  * specified key will be created if the stream doesn't exist
437  * @return {bool}
438  */
439  bool xgroup_create(const char* key, const char* group,
440  const char* id = "$", bool mkstream = true);
441 
442  /**
443  * The XGROUP command with the subcommand DESTROY. With this command,
444  * the consumer group will be destroyed even if there are active
445  * consumers and pending messages, so make sure to call this command
446  * only when really needed.
447  * @param key {const char*}
448  * @param group {const char*}
449  * @return {int} return the number of pending messages for the
450  * specified group, return -1 if error
451  */
452  int xgroup_destroy(const char* key, const char* group);
453  bool xgroup_setid(const char* key, const char* group,
454  const char* id = "$");
455 
456  /**
457  * The XGROUP command with the subcommand DELCONSUMER. With this command,
458  * just remove a given consumer from a consumer group.
459  * @param key {const char*}
460  * @param group {const char*}
461  * @param consumer {const char*}
462  * @return {int} return the number of the pending messages for the
463  * specified consumer, return -1 if error
464  */
465  int xgroup_delconsumer(const char* key, const char* group,
466  const char* consumer);
467 
468  /////////////////////////////////////////////////////////////////////
469 
470  /**
471  * The XINFO command with the subcommand HELP
472  * @param result {std::vector<string>&}
473  * @return {bool}
474  */
475  bool xinfo_help(std::vector<string>& result);
476 
477  /**
478  * The XINFO command with the subcommand CONSUMERS. With this command,
479  * every consumer in a specific consumer group can be got.
480  * @param key {const char*}
481  * @param group {const char*}
482  * @param result {std::map<string, redis_xinfo_consumer>&}
483  * @return {bool}
484  */
485  bool xinfo_consumers(const char* key, const char* group,
486  std::map<string, redis_xinfo_consumer>& result);
487 
488  /**
489  * The XINFO command with the subcommand GROUPS.
490  * @param key {const char*}
491  * @param result {std::map<string, redis_xinfo_group>&}
492  * @return {bool}
493  */
494  bool xinfo_groups(const char* key,
495  std::map<string, redis_xinfo_group>& result);
496 
497  /**
498  * The XINFO command with the subcommand STREAM. In this form the
499  * command returns general information about the stream stored
500  * at the specified key.
501  * @param key {const char*}
502  * @param result {redis_stream_info&} devined about
503  * @return {bool}
504  */
505  bool xinfo_stream(const char* key, redis_stream_info& result);
506 
507  /////////////////////////////////////////////////////////////////////
508 
509 private:
510  void build(const char* cmd, const char* key, const char* id,
511  const std::map<string, string>& fields);
512  void build(const char* cmd, const char* key, const char* id,
513  const std::vector<string>& names,
514  const std::vector<string>& values);
515  void build(const char* cmd, const char* key, const char* id,
516  const std::vector<const char*>& names,
517  const std::vector<const char*>& values);
518  void build(const char* cmd, const char* key, const char* id,
519  const char* names[], const size_t names_len[],
520  const char* values[], const size_t values_len[], size_t argc);
521  void build(const std::map<string, string>& streams, size_t i,
522  size_t count, ssize_t block, bool noack = false);
523  void xread_build(const std::map<string, string>& streams,
524  size_t count, ssize_t block);
525  void xreadgroup_build(const char* group, const char* consumer,
526  const std::map<string, string>& streams,
527  size_t count, ssize_t block, bool noack);
528  bool get_results(redis_stream_messages& messages);
529  bool get_messages(const redis_result& rr, redis_stream_messages& messages);
530  bool get_one_message(const redis_result& rr, redis_stream_message& message);
531  bool range(redis_stream_messages& messages, const char* cmd,
532  const char* key, const char* start, const char* end, size_t count);
533 
534  bool get_one_consumer(const redis_result& rr, redis_xinfo_consumer& consumer);
535  bool get_one_group(const redis_result& rr, redis_xinfo_group& group);
536  bool get_pending_consumer(const redis_result& rr,
537  redis_pending_consumer& consumer);
538  bool get_pending_message(const redis_result& rr,
539  redis_pending_message& message);
540 
541  void xclaim_build(const char* key, const char* group,
542  const char* consumer, long min_idle_time,
543  const std::vector<string>& ids, size_t idle, long long time_ms,
544  int retry_count, bool force, bool justid);
545 
546 };
547 
548 } // namespace acl
549 
550 #endif // !defined(ACL_CLIENT_ONLY) && !defined(ACL_REDIS_DISABLE)
size_t size(void) const
size_t size(void) const
#define ACL_CPP_DEPRECATED
Definition: atomic.hpp:86
redis_stream_message last_entry
std::vector< redis_stream_field > fields
unsigned long long elapsed
std::vector< redis_stream_message > messages
redis_stream_message first_entry
std::vector< redis_pending_consumer > consumers
std::map< string, redis_pending_message > messages
#define ACL_CPP_API
size_t size(void) const