acl  3.5.3.0
tbox_array.hpp
浏览该文件的文档.
1 #pragma once
2 #include "../acl_cpp_define.hpp"
3 #include <list>
4 #include <stdlib.h>
5 #include <string.h>
6 #include "thread_mutex.hpp"
7 #include "thread_cond.hpp"
8 #include "noncopyable.hpp"
9 
10 namespace acl
11 {
12 
13 /**
14  * 用于线程之间的消息通信,通过线程条件变量及线程锁实现
15  *
16  * 示例:
17  *
18  * class myobj
19  * {
20  * public:
21  * myobj(void) {}
22  * ~myobj(void) {}
23  *
24  * void test(void) { printf("hello world\r\n"); }
25  * };
26  *
27  * acl::tbox_array<myobj> tbox;
28  *
29  * void thread_producer(void)
30  * {
31  * myobj* o = new myobj;
32  * tbox.push(o);
33  * }
34  *
35  * void thread_consumer(void)
36  * {
37  * myobj* o = tbox.pop();
38  * o->test();
39  * delete o;
40  * }
41  */
42 
43 template<typename T>
44 class tbox_array : public noncopyable
45 {
46 public:
47  /**
48  * 构造方法
49  * @param free_obj {bool} 当 tbox_array 销毁时,是否自动检查并释放
50  * 未被消费的动态对象
51  */
52  tbox_array(bool free_obj = true)
53  : capacity_(10000)
54  , off_curr_(0)
55  , off_next_(0)
56  , waiters_(0)
57  , free_obj_(free_obj)
58  , cond_(&lock_)
59  {
60  array_ = (T**) malloc(sizeof(T*) * capacity_);
61  }
62 
64  {
65  clear(free_obj_);
66  free(array_);
67  }
68 
69  /**
70  * 清理消息队列中未被消费的消息对象
71  * @param free_obj {bool} 释放调用 delete 方法删除消息对象
72  */
73  void clear(bool free_obj = false)
74  {
75  if (free_obj) {
76  for (size_t i = off_curr_; i < off_next_; i++) {
77  delete array_[i];
78  }
79  }
80  }
81 
82  /**
83  * 发送消息对象,本过程是添加完对象后先解锁后通知
84  * @param t {T*} 非空消息对象
85  * @param notify_first {bool} 如果为 true,则先通知后解锁,否则先解锁
86  * 后通知,注意二者的区别
87  * @return {bool}
88  */
89  bool push(T* t, bool notify_first = false)
90  {
91  if (lock_.lock() == false) {
92  abort();
93  }
94 
95  if (off_next_ == capacity_) {
96  if (off_curr_ >= 10000) {
97 #if 1
98  size_t n = 0;
99  for (size_t i = off_curr_; i < off_next_; i++) {
100  array_[n++] = array_[i];
101  }
102 #else
103  memmove(array_, array_ + off_curr_,
104  (off_next_ - off_curr_) * sizeof(T*));
105 #endif
106 
107  off_next_ -= off_curr_;
108  off_curr_ = 0;
109  } else {
110  capacity_ += 10000;
111  array_ = (T**) realloc(array_, sizeof(T*) * capacity_);
112  }
113  }
114  array_[off_next_++] = t;
115 
116  if (notify_first) {
117  if (cond_.notify() == false) {
118  abort();
119  }
120  if (lock_.unlock() == false) {
121  abort();
122  }
123  } else {
124  if (lock_.unlock() == false) {
125  abort();
126  }
127  if (cond_.notify() == false) {
128  abort();
129  }
130  }
131 
132  return true;
133  }
134 
135  /**
136  * 接收消息对象
137  * @param wait_ms {int} >= 0 时设置等待超时时间(毫秒级别),
138  * 否则永远等待直到读到消息对象或出错
139  * @param found {bool*} 非空时用来存放是否获得了一个消息对象,主要用在
140  * 当允许传递空对象时的检查
141  * @return {T*} 非 NULL 表示获得一个消息对象,返回 NULL 时得需要做进一
142  * 步检查,生产者如果 push 了一个空对象(NULL),则消费者也会获得 NULL,
143  * 但此时仍然认为获得了一个消息对象,只不过为空对象;如果 wait_ms 参数
144  * 为 -1 时返回 NULL 依然认为获得了一个空消息对象,如果 wait_ms 大于
145  * 等于 0 时返回 NULL,则应该检查 found 参数的值为 true 还是 false 来
146  * 判断是否获得了一个空消息对象
147  */
148  T* pop(int wait_ms = -1, bool* found = NULL)
149  {
150  long long n = ((long long) wait_ms) * 1000;
151  bool found_flag;
152 
153  if (lock_.lock() == false) {
154  abort();
155  }
156  while (true) {
157  T* t = peek(found_flag);
158  if (found_flag) {
159  if (lock_.unlock() == false) {
160  abort();
161  }
162  if (found) {
163  *found = found_flag;
164  }
165  return t;
166  }
167 
168  // 注意调用顺序,必须先调用 wait 再判断 wait_ms
169  waiters_++;
170  if (!cond_.wait(n, true) && wait_ms >= 0) {
171  waiters_--;
172  if (lock_.unlock() == false) {
173  abort();
174  }
175  if (found) {
176  *found = false;
177  }
178  return NULL;
179  }
180  waiters_--;
181  }
182  }
183 
184  /**
185  * 返回当前存在于消息队列中的消息数量
186  * @return {size_t}
187  */
188  size_t size(void) const
189  {
190  return off_next_ - off_curr_;
191  }
192 
193 public:
194  void lock(void)
195  {
196  if (lock_.lock() == false) {
197  abort();
198  }
199  }
200 
201  void unlock(void)
202  {
203  if (lock_.unlock() == false) {
204  abort();
205  }
206  }
207 
208 private:
209  T** array_;
210  size_t capacity_;
211  size_t off_curr_;
212  size_t off_next_;
213  size_t waiters_;
214  bool free_obj_;
215  thread_mutex lock_;
216  thread_cond cond_;
217 
218  T* peek(bool& found_flag)
219  {
220  if (off_curr_ == off_next_) {
221  found_flag = false;
222  if (off_curr_ > 0) {
223  off_curr_ = off_next_ = 0;
224  }
225  return NULL;
226  }
227 
228  found_flag = true;
229  T* t = array_[off_curr_++];
230  return t;
231  }
232 };
233 
234 } // namespace acl
void lock(void)
Definition: tbox_array.hpp:194
void clear(bool free_obj=false)
Definition: tbox_array.hpp:73
tbox_array(bool free_obj=true)
Definition: tbox_array.hpp:52
size_t size(void) const
Definition: tbox_array.hpp:188
void unlock(void)
Definition: tbox_array.hpp:201
bool wait(long long microseconds=-1, bool locked=false)
bool push(T *t, bool notify_first=false)
Definition: tbox_array.hpp:89
bool unlock(void)
T * pop(int wait_ms=-1, bool *found=NULL)
Definition: tbox_array.hpp:148
bool notify(void)