acl  3.5.3.0
redis_client_pipeline.hpp
浏览该文件的文档.
1 #pragma once
2 #include "../acl_cpp_define.hpp"
3 #include "../stdlib/thread.hpp"
4 #include "../stdlib/string.hpp"
5 #include "../stdlib/tbox.hpp"
6 #include "../stdlib/mbox.hpp"
7 #include "redis_command.hpp"
8 
9 #if !defined(ACL_CLIENT_ONLY) && !defined(ACL_REDIS_DISABLE)
10 
11 namespace acl {
12 
13 #define USE_MBOX
14 
15 #ifdef USE_MBOX
16 # define BOX mbox
17 #else
18 # define BOX tbox
19 #endif
20 
21 class token_tree;
22 class redis_client;
23 
24 typedef enum {
25  redis_pipeline_t_cmd, // redis command type
26  redis_pipeline_t_redirect, // should redirect to another node
27  redis_pipeline_t_clusterdonw, // the redis node has been down
28  redis_pipeline_t_stop, // the current channel should stop
30 
31 /**
32  * the message for transfering between redis command, redis client pipline
33  * and redis pipeline channel, which holds the redis command or not.
34  */
36 public:
38  bool use_mbox = true)
39  : cmd_(cmd)
40  , type_(type)
41  , nchild_(0)
42  , timeout_(NULL)
43  , result_(NULL)
44  , addr_(NULL)
45  , redirect_count_(0)
46  , argc_(0)
47  , argv_(NULL)
48  , lens_(NULL)
49  {
50  if (use_mbox) {
51  mbox_ = new mbox<redis_pipeline_message>(false, false);
52  tbox_ = NULL;
53  } else {
54  tbox_ = new tbox<redis_pipeline_message>(false);
55  mbox_ = NULL;
56  }
57  }
58 
60  delete mbox_;
61  delete tbox_;
62  }
63 
65  type_ = type;
66  return *this;
67  }
68 
70  return type_;
71  }
72 
74  return cmd_;
75  }
76 
77  void set_option(size_t nchild, int* timeout) {
78  nchild_ = nchild;
79  timeout_ = timeout;
80  result_ = NULL;
81  addr_ = NULL;
82  redirect_count_ = 0;
83  }
84 
85  void set_request(size_t argc, const char** argv, size_t* lens) {
86  argc_ = argc;
87  argv_ = argv;
88  lens_ = lens;
89  }
90 
91  void set_addr(const char* addr) {
92  addr_ = addr;
93  if (addr) {
94  redirect_count_++;
95  }
96  }
97 
98  size_t get_nchild(void) const {
99  return nchild_;
100  }
101 
102  int* get_timeout(void) const {
103  return timeout_;
104  }
105 
106  void push(const redis_result* result) {
107  result_ = result;
108  if (mbox_) {
109  mbox_->push(this, false);
110  } else {
111  tbox_->push(this, false);
112  }
113  }
114 
115  const redis_result* wait(void) {
116  if (mbox_) {
117  mbox_->pop();
118  } else {
119  tbox_->pop();
120  }
121  return result_;
122  }
123 
124  const char* get_addr(void) const {
125  return addr_;
126  }
127 
128  size_t get_redirect_count(void) const {
129  return redirect_count_;
130  }
131 
132 private:
133  redis_command* cmd_;
134  redis_pipeline_type_t type_;
135  size_t nchild_;
136  int* timeout_;
139 
140  const redis_result* result_;
141  const char* addr_;
142  size_t redirect_count_;
143 
144 public:
145  size_t argc_;
146  const char** argv_;
147  size_t* lens_;
148 };
149 
151 
153 public:
155  const char* addr, int conn_timeout, int rw_timeout, bool retry);
157 
158  bool start_thread(void);
159  void stop_thread(void);
160 
161 public:
162  redis_pipeline_channel& set_passwd(const char* passwd);
163  const char* get_addr(void) const {
164  return addr_.c_str();
165  }
166 
167 protected:
168  // @override from acl::thread
169  void* run(void);
170 
171 private:
172  redis_client_pipeline& pipeline_;
173  string addr_;
174  string buf_;
175  redis_client* client_;
176  BOX<redis_pipeline_message> box_;
177  std::vector<redis_pipeline_message*> msgs_;
178 public:
179  void push(redis_pipeline_message* msg);
180 
181 private:
182  bool handle_messages(void);
183  bool flush_all(void);
184  bool wait_results(void);
185  bool wait_one(socket_stream& conn, redis_pipeline_message& msg);
186  void all_failed(void);
187 };
188 
189 /**
190  * redis pipline communication, be set and used in redis_command to
191  * improve the performance of redis commands, but not all redis commands
192  * in acl can be used in pipeline mode, such as below:
193  * 1. multiple keys operation
194  * 2. blocked operation such as SUBSCRIBE in pubsub, BLPOP in list
195  */
197 public:
198  redis_client_pipeline(const char* addr);
199  ~redis_client_pipeline(void);
200 
201  // start the pipeline thread
202  void start_thread(void);
203 
204  // stop the pipeline thread
205  void stop_thread(void);
206 
207 public:
208  // called by redis_command in pipeline mode
209  const redis_result* run(redis_pipeline_message& msg);
210 
211  // called by redis_pipeline_channel
212  void push(redis_pipeline_message* msg);
213 
214 public:
215  // set the password for connecting the redis server
216  redis_client_pipeline& set_password(const char* passwd);
217 
218  // set network IO timeout
219  redis_client_pipeline& set_timeout(int conn_timeout, int rw_timeout);
220 
221  // set if retry on IO failed in redis_client
222  redis_client_pipeline& set_retry(bool on);
223 
224  // set the max hash slot of redis, the default valud is 16384
225  redis_client_pipeline& set_max_slot(int max_slot);
226 
227  // set if connecting all the redis nodes after starting
228  redis_client_pipeline& set_preconnect(bool yes);
229 
230  // get the max hash slot of redis
231  int get_max_slot(void) const {
232  return max_slot_;
233  }
234 
235 protected:
236  // @override from acl::thread
237  void* run(void);
238 
239 private:
240  string addr_; // the default redis address
241  string passwd_; // password for connecting redis
242  int max_slot_; // the max hash slot for redis cluster
243  int conn_timeout_; // timeout to connect redis
244  int rw_timeout_; // IO timeout with redis
245  bool retry_; // if try again when disconnect from redis
246  bool preconn_; // if connecting all redis nodes when starting
247 
248  token_tree* channels_; // holds and manage all pipeline channels
249 
250  // the message queue for receiving redis message from other threads
251  BOX<redis_pipeline_message> box_;
252 
253  std::vector<char*> addrs_; // hold all redises addresses
254  const char** slot_addrs_; // map hash slot with address
255 
256  // set the hash slot with the specified redis address
257  void set_slot(int slot, const char* addr);
258 
259  // set all hash slots' addresses of all redises
260  void set_all_slot(void);
261 
262  // start all pipeline channels threads
263  void start_channels(void);
264 
265  // stop all pipeline channels threads
266  void stop_channels(void);
267 
268  // start one pipeline channel thread with the specified redis address
269  redis_pipeline_channel* start_channel(const char* addr);
270 
271  // stop one pipeline channel thread with the specified redis address
272  void stop_channel(const char* addr);
273 
274  // get one pipeline channel thread with the specified hash slot
275  redis_pipeline_channel* get_channel(int slot);
276 
277  // redirect one slot to another redis address
278  void redirect(const redis_pipeline_message& msg, int slot);
279 
280  // when one redis node down, we should clear the node's hash slot map
281  // and stop the pipeline channel thread
282  void cluster_down(const redis_pipeline_message& msg);
283 };
284 
285 /**
286  * sample:
287  * void main_thread(void) {
288  * acl::redis_client_pipeline pipeline("127.0.0.1:6379");
289  * pipeline.start_thread();
290  * // start some threads
291  * ...
292  * // wait for thease threads to exit and stop pipeline thread.
293  * pipeline.stop_thread();
294  * }
295  * // execute redis command in one thread
296  * void test_thread(acl::redis_client_pipeline& pipeline) {
297  * acl::redis cmd;
298  * cmd.set_pipeline(&pipeline);
299  * acl::string key;
300  * for (size_t i = 0; i < 100000; i++) {
301  * key.format("test-key-%d", (int) i);
302  * cmd.del(key);
303  * }
304  */
305 } // namespace acl
306 
307 #endif // !defined(ACL_CLIENT_ONLY) && !defined(ACL_REDIS_DISABLE)
const redis_result * wait(void)
redis_pipeline_message(redis_command *cmd, redis_pipeline_type_t type, bool use_mbox=true)
char * c_str() const
void set_option(size_t nchild, int *timeout)
redis_pipeline_channel(redis_client_pipeline &pipeline, const char *addr, int conn_timeout, int rw_timeout, bool retry)
redis_pipeline_message & set_type(redis_pipeline_type_t type)
void push(const redis_result *result)
const char * get_addr(void) const
redis_pipeline_type_t get_type(void) const
void push(redis_pipeline_message *msg)
#define ACL_CPP_API
redis_pipeline_channel & set_passwd(const char *passwd)
void set_request(size_t argc, const char **argv, size_t *lens)
const char * get_addr(void) const