proactor.hpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. /*
  2. * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
  3. * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program; if not, write to the Free Software
  17. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  18. */
  19. #ifndef __PJPP_PROACTOR_HPP__
  20. #define __PJPP_PROACTOR_HPP__
  21. #include <pj/ioqueue.h>
  22. #include <pj++/pool.hpp>
  23. #include <pj++/sock.hpp>
  24. #include <pj++/timer.hpp>
  25. #include <pj/errno.h>
  26. class Pj_Proactor;
  27. class Pj_Event_Handler;
  28. //////////////////////////////////////////////////////////////////////////////
  29. // Asynchronous operation key.
  30. //
  31. // Applications may inherit this class to put their application
  32. // specific data.
  33. //
  34. class Pj_Async_Op : public pj_ioqueue_op_key_t
  35. {
  36. public:
  37. //
  38. // Construct with null handler.
  39. // App must call set_handler() before use.
  40. //
  41. Pj_Async_Op()
  42. : handler_(NULL)
  43. {
  44. pj_ioqueue_op_key_init(this, sizeof(*this));
  45. }
  46. //
  47. // Constructor.
  48. //
  49. explicit Pj_Async_Op(Pj_Event_Handler *handler)
  50. : handler_(handler)
  51. {
  52. pj_ioqueue_op_key_init(this, sizeof(*this));
  53. }
  54. //
  55. // Set handler.
  56. //
  57. void set_handler(Pj_Event_Handler *handler)
  58. {
  59. handler_ = handler;
  60. }
  61. //
  62. // Check whether operation is still pending for this key.
  63. //
  64. bool is_pending();
  65. //
  66. // Cancel the operation.
  67. //
  68. bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED);
  69. protected:
  70. Pj_Event_Handler *handler_;
  71. };
  72. //////////////////////////////////////////////////////////////////////////////
  73. // Event handler.
  74. //
  75. // Applications should inherit this class to receive various event
  76. // notifications.
  77. //
  78. // Applications should implement get_socket_handle().
  79. //
  80. class Pj_Event_Handler : public Pj_Object
  81. {
  82. friend class Pj_Proactor;
  83. public:
  84. //
  85. // Default constructor.
  86. //
  87. Pj_Event_Handler()
  88. : key_(NULL)
  89. {
  90. pj_memset(&timer_, 0, sizeof(timer_));
  91. timer_.user_data = this;
  92. timer_.cb = &timer_callback;
  93. }
  94. //
  95. // Destroy.
  96. //
  97. virtual ~Pj_Event_Handler()
  98. {
  99. unregister();
  100. }
  101. //
  102. // Unregister this handler from the ioqueue.
  103. //
  104. void unregister()
  105. {
  106. if (key_) {
  107. pj_ioqueue_unregister(key_);
  108. key_ = NULL;
  109. }
  110. }
  111. //
  112. // Get socket handle associated with this.
  113. //
  114. virtual pj_sock_t get_socket_handle()
  115. {
  116. return PJ_INVALID_SOCKET;
  117. }
  118. //
  119. // Start async receive.
  120. //
  121. pj_status_t recv( Pj_Async_Op *op_key,
  122. void *buf, pj_ssize_t *len,
  123. unsigned flags)
  124. {
  125. return pj_ioqueue_recv( key_, op_key,
  126. buf, len, flags);
  127. }
  128. //
  129. // Start async recvfrom()
  130. //
  131. pj_status_t recvfrom( Pj_Async_Op *op_key,
  132. void *buf, pj_ssize_t *len, unsigned flags,
  133. Pj_Inet_Addr *addr)
  134. {
  135. addr->addrlen_ = sizeof(Pj_Inet_Addr);
  136. return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags,
  137. addr, &addr->addrlen_ );
  138. }
  139. //
  140. // Start async send()
  141. //
  142. pj_status_t send( Pj_Async_Op *op_key,
  143. const void *data, pj_ssize_t *len,
  144. unsigned flags)
  145. {
  146. return pj_ioqueue_send( key_, op_key, data, len, flags);
  147. }
  148. //
  149. // Start async sendto()
  150. //
  151. pj_status_t sendto( Pj_Async_Op *op_key,
  152. const void *data, pj_ssize_t *len, unsigned flags,
  153. const Pj_Inet_Addr &addr)
  154. {
  155. return pj_ioqueue_sendto(key_, op_key, data, len, flags,
  156. &addr, sizeof(addr));
  157. }
  158. #if PJ_HAS_TCP
  159. //
  160. // Start async connect()
  161. //
  162. pj_status_t connect(const Pj_Inet_Addr &addr)
  163. {
  164. return pj_ioqueue_connect(key_, &addr, sizeof(addr));
  165. }
  166. //
  167. // Start async accept().
  168. //
  169. pj_status_t accept( Pj_Async_Op *op_key,
  170. Pj_Socket *sock,
  171. Pj_Inet_Addr *local = NULL,
  172. Pj_Inet_Addr *remote = NULL)
  173. {
  174. int *addrlen = local ? &local->addrlen_ : NULL;
  175. return pj_ioqueue_accept( key_, op_key, &sock->sock_,
  176. local, remote, addrlen );
  177. }
  178. #endif
  179. protected:
  180. //////////////////
  181. // Overridables
  182. //////////////////
  183. //
  184. // Timeout callback.
  185. //
  186. virtual void on_timeout(int)
  187. {
  188. }
  189. //
  190. // On read complete callback.
  191. //
  192. virtual void on_read_complete( Pj_Async_Op*, pj_ssize_t)
  193. {
  194. }
  195. //
  196. // On write complete callback.
  197. //
  198. virtual void on_write_complete( Pj_Async_Op *, pj_ssize_t)
  199. {
  200. }
  201. #if PJ_HAS_TCP
  202. //
  203. // On connect complete callback.
  204. //
  205. virtual void on_connect_complete(pj_status_t)
  206. {
  207. }
  208. //
  209. // On new connection callback.
  210. //
  211. virtual void on_accept_complete( Pj_Async_Op*, pj_sock_t, pj_status_t)
  212. {
  213. }
  214. #endif
  215. private:
  216. pj_ioqueue_key_t *key_;
  217. pj_timer_entry timer_;
  218. friend class Pj_Proactor;
  219. friend class Pj_Async_Op;
  220. //
  221. // Static timer callback.
  222. //
  223. static void timer_callback( pj_timer_heap_t*,
  224. struct pj_timer_entry *entry)
  225. {
  226. Pj_Event_Handler *handler =
  227. (Pj_Event_Handler*) entry->user_data;
  228. handler->on_timeout(entry->id);
  229. }
  230. };
  231. inline bool Pj_Async_Op::is_pending()
  232. {
  233. return pj_ioqueue_is_pending(handler_->key_, this) != 0;
  234. }
  235. inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status)
  236. {
  237. return pj_ioqueue_post_completion(handler_->key_, this,
  238. bytes_status) == PJ_SUCCESS;
  239. }
  240. //////////////////////////////////////////////////////////////////////////////
  241. // Proactor
  242. //
  243. class Pj_Proactor : public Pj_Object
  244. {
  245. public:
  246. //
  247. // Default constructor, initializes to NULL.
  248. //
  249. Pj_Proactor()
  250. : ioq_(NULL), th_(NULL)
  251. {
  252. cb_.on_read_complete = &read_complete_cb;
  253. cb_.on_write_complete = &write_complete_cb;
  254. cb_.on_accept_complete = &accept_complete_cb;
  255. cb_.on_connect_complete = &connect_complete_cb;
  256. }
  257. //
  258. // Construct proactor.
  259. //
  260. Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd,
  261. pj_size_t max_timer_entries )
  262. : ioq_(NULL), th_(NULL)
  263. {
  264. cb_.on_read_complete = &read_complete_cb;
  265. cb_.on_write_complete = &write_complete_cb;
  266. cb_.on_accept_complete = &accept_complete_cb;
  267. cb_.on_connect_complete = &connect_complete_cb;
  268. create(pool, max_fd, max_timer_entries);
  269. }
  270. //
  271. // Destructor.
  272. //
  273. ~Pj_Proactor()
  274. {
  275. destroy();
  276. }
  277. //
  278. // Create proactor.
  279. //
  280. pj_status_t create( Pj_Pool *pool, pj_size_t max_fd,
  281. pj_size_t timer_entry_count)
  282. {
  283. pj_status_t status;
  284. destroy();
  285. status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_);
  286. if (status != PJ_SUCCESS)
  287. return status;
  288. status = pj_timer_heap_create(pool->pool_(),
  289. timer_entry_count, &th_);
  290. if (status != PJ_SUCCESS) {
  291. pj_ioqueue_destroy(ioq_);
  292. ioq_ = NULL;
  293. return NULL;
  294. }
  295. return status;
  296. }
  297. //
  298. // Destroy proactor.
  299. //
  300. void destroy()
  301. {
  302. if (ioq_) {
  303. pj_ioqueue_destroy(ioq_);
  304. ioq_ = NULL;
  305. }
  306. if (th_) {
  307. pj_timer_heap_destroy(th_);
  308. th_ = NULL;
  309. }
  310. }
  311. //
  312. // Register handler.
  313. // This will call handler->get_socket_handle()
  314. //
  315. pj_status_t register_socket_handler(Pj_Pool *pool,
  316. Pj_Event_Handler *handler)
  317. {
  318. return pj_ioqueue_register_sock( pool->pool_(), ioq_,
  319. handler->get_socket_handle(),
  320. handler, &cb_, &handler->key_ );
  321. }
  322. //
  323. // Unregister handler.
  324. //
  325. static void unregister_handler(Pj_Event_Handler *handler)
  326. {
  327. if (handler->key_) {
  328. pj_ioqueue_unregister( handler->key_ );
  329. handler->key_ = NULL;
  330. }
  331. }
  332. //
  333. // Scheduler timer.
  334. //
  335. bool schedule_timer( Pj_Event_Handler *handler,
  336. const Pj_Time_Val &delay,
  337. int id=-1)
  338. {
  339. return schedule_timer(th_, handler, delay, id);
  340. }
  341. //
  342. // Cancel timer.
  343. //
  344. bool cancel_timer(Pj_Event_Handler *handler)
  345. {
  346. return pj_timer_heap_cancel(th_, &handler->timer_) == 1;
  347. }
  348. //
  349. // Handle events.
  350. //
  351. int handle_events(Pj_Time_Val *max_timeout)
  352. {
  353. Pj_Time_Val timeout(0, 0);
  354. int timer_count;
  355. timer_count = pj_timer_heap_poll( th_, &timeout );
  356. if (timeout.get_sec() < 0)
  357. timeout.sec = PJ_MAXINT32;
  358. /* If caller specifies maximum time to wait, then compare the value
  359. * with the timeout to wait from timer, and use the minimum value.
  360. */
  361. if (max_timeout && timeout >= *max_timeout) {
  362. timeout = *max_timeout;
  363. }
  364. /* Poll events in ioqueue. */
  365. int ioqueue_count;
  366. ioqueue_count = pj_ioqueue_poll(ioq_, &timeout);
  367. if (ioqueue_count < 0)
  368. return ioqueue_count;
  369. return ioqueue_count + timer_count;
  370. }
  371. //
  372. // Get the internal ioqueue object.
  373. //
  374. pj_ioqueue_t *get_io_queue()
  375. {
  376. return ioq_;
  377. }
  378. //
  379. // Get the internal timer heap object.
  380. //
  381. pj_timer_heap_t *get_timer_heap()
  382. {
  383. return th_;
  384. }
  385. private:
  386. pj_ioqueue_t *ioq_;
  387. pj_timer_heap_t *th_;
  388. pj_ioqueue_callback cb_;
  389. static bool schedule_timer( pj_timer_heap_t *timer,
  390. Pj_Event_Handler *handler,
  391. const Pj_Time_Val &delay,
  392. int id=-1)
  393. {
  394. handler->timer_.id = id;
  395. return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0;
  396. }
  397. //
  398. // Static read completion callback.
  399. //
  400. static void read_complete_cb( pj_ioqueue_key_t *key,
  401. pj_ioqueue_op_key_t *op_key,
  402. pj_ssize_t bytes_read)
  403. {
  404. Pj_Event_Handler *handler =
  405. (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
  406. handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read);
  407. }
  408. //
  409. // Static write completion callback.
  410. //
  411. static void write_complete_cb(pj_ioqueue_key_t *key,
  412. pj_ioqueue_op_key_t *op_key,
  413. pj_ssize_t bytes_sent)
  414. {
  415. Pj_Event_Handler *handler =
  416. (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
  417. handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent);
  418. }
  419. //
  420. // Static accept completion callback.
  421. //
  422. static void accept_complete_cb(pj_ioqueue_key_t *key,
  423. pj_ioqueue_op_key_t *op_key,
  424. pj_sock_t new_sock,
  425. pj_status_t status)
  426. {
  427. Pj_Event_Handler *handler =
  428. (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
  429. handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status);
  430. }
  431. //
  432. // Static connect completion callback.
  433. //
  434. static void connect_complete_cb(pj_ioqueue_key_t *key,
  435. pj_status_t status)
  436. {
  437. Pj_Event_Handler *handler =
  438. (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
  439. handler->on_connect_complete(status);
  440. }
  441. };
  442. #endif /* __PJPP_PROACTOR_HPP__ */