123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514 |
- /*
- * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
- * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
- #ifndef __PJPP_PROACTOR_HPP__
- #define __PJPP_PROACTOR_HPP__
- #include <pj/ioqueue.h>
- #include <pj++/pool.hpp>
- #include <pj++/sock.hpp>
- #include <pj++/timer.hpp>
- #include <pj/errno.h>
- class Pj_Proactor;
- class Pj_Event_Handler;
- //////////////////////////////////////////////////////////////////////////////
- // Asynchronous operation key.
- //
- // Applications may inheric this class to put their application
- // specific data.
- //
- class Pj_Async_Op : public pj_ioqueue_op_key_t
- {
- public:
- //
- // Construct with null handler.
- // App must call set_handler() before use.
- //
- Pj_Async_Op()
- : handler_(NULL)
- {
- pj_ioqueue_op_key_init(this, sizeof(*this));
- }
- //
- // Constructor.
- //
- explicit Pj_Async_Op(Pj_Event_Handler *handler)
- : handler_(handler)
- {
- pj_ioqueue_op_key_init(this, sizeof(*this));
- }
- //
- // Set handler.
- //
- void set_handler(Pj_Event_Handler *handler)
- {
- handler_ = handler;
- }
- //
- // Check whether operation is still pending for this key.
- //
- bool is_pending();
- //
- // Cancel the operation.
- //
- bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED);
- protected:
- Pj_Event_Handler *handler_;
- };
- //////////////////////////////////////////////////////////////////////////////
- // Event handler.
- //
- // Applications should inherit this class to receive various event
- // notifications.
- //
- // Applications should implement get_socket_handle().
- //
- class Pj_Event_Handler : public Pj_Object
- {
- friend class Pj_Proactor;
- public:
- //
- // Default constructor.
- //
- Pj_Event_Handler()
- : key_(NULL)
- {
- pj_memset(&timer_, 0, sizeof(timer_));
- timer_.user_data = this;
- timer_.cb = &timer_callback;
- }
-
- //
- // Destroy.
- //
- virtual ~Pj_Event_Handler()
- {
- unregister();
- }
- //
- // Unregister this handler from the ioqueue.
- //
- void unregister()
- {
- if (key_) {
- pj_ioqueue_unregister(key_);
- key_ = NULL;
- }
- }
- //
- // Get socket handle associated with this.
- //
- virtual pj_sock_t get_socket_handle()
- {
- return PJ_INVALID_SOCKET;
- }
- //
- // Start async receive.
- //
- pj_status_t recv( Pj_Async_Op *op_key,
- void *buf, pj_ssize_t *len,
- unsigned flags)
- {
- return pj_ioqueue_recv( key_, op_key,
- buf, len, flags);
- }
- //
- // Start async recvfrom()
- //
- pj_status_t recvfrom( Pj_Async_Op *op_key,
- void *buf, pj_ssize_t *len, unsigned flags,
- Pj_Inet_Addr *addr)
- {
- addr->addrlen_ = sizeof(Pj_Inet_Addr);
- return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags,
- addr, &addr->addrlen_ );
- }
- //
- // Start async send()
- //
- pj_status_t send( Pj_Async_Op *op_key,
- const void *data, pj_ssize_t *len,
- unsigned flags)
- {
- return pj_ioqueue_send( key_, op_key, data, len, flags);
- }
- //
- // Start async sendto()
- //
- pj_status_t sendto( Pj_Async_Op *op_key,
- const void *data, pj_ssize_t *len, unsigned flags,
- const Pj_Inet_Addr &addr)
- {
- return pj_ioqueue_sendto(key_, op_key, data, len, flags,
- &addr, sizeof(addr));
- }
- #if PJ_HAS_TCP
- //
- // Start async connect()
- //
- pj_status_t connect(const Pj_Inet_Addr &addr)
- {
- return pj_ioqueue_connect(key_, &addr, sizeof(addr));
- }
- //
- // Start async accept().
- //
- pj_status_t accept( Pj_Async_Op *op_key,
- Pj_Socket *sock,
- Pj_Inet_Addr *local = NULL,
- Pj_Inet_Addr *remote = NULL)
- {
- int *addrlen = local ? &local->addrlen_ : NULL;
- return pj_ioqueue_accept( key_, op_key, &sock->sock_,
- local, remote, addrlen );
- }
- #endif
- protected:
- //////////////////
- // Overridables
- //////////////////
- //
- // Timeout callback.
- //
- virtual void on_timeout(int)
- {
- }
- //
- // On read complete callback.
- //
- virtual void on_read_complete( Pj_Async_Op*, pj_ssize_t)
- {
- }
- //
- // On write complete callback.
- //
- virtual void on_write_complete( Pj_Async_Op *, pj_ssize_t)
- {
- }
- #if PJ_HAS_TCP
- //
- // On connect complete callback.
- //
- virtual void on_connect_complete(pj_status_t)
- {
- }
- //
- // On new connection callback.
- //
- virtual void on_accept_complete( Pj_Async_Op*, pj_sock_t, pj_status_t)
- {
- }
- #endif
- private:
- pj_ioqueue_key_t *key_;
- pj_timer_entry timer_;
- friend class Pj_Proactor;
- friend class Pj_Async_Op;
- //
- // Static timer callback.
- //
- static void timer_callback( pj_timer_heap_t*,
- struct pj_timer_entry *entry)
- {
- Pj_Event_Handler *handler =
- (Pj_Event_Handler*) entry->user_data;
- handler->on_timeout(entry->id);
- }
- };
- inline bool Pj_Async_Op::is_pending()
- {
- return pj_ioqueue_is_pending(handler_->key_, this) != 0;
- }
- inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status)
- {
- return pj_ioqueue_post_completion(handler_->key_, this,
- bytes_status) == PJ_SUCCESS;
- }
- //////////////////////////////////////////////////////////////////////////////
- // Proactor
- //
- class Pj_Proactor : public Pj_Object
- {
- public:
- //
- // Default constructor, initializes to NULL.
- //
- Pj_Proactor()
- : ioq_(NULL), th_(NULL)
- {
- cb_.on_read_complete = &read_complete_cb;
- cb_.on_write_complete = &write_complete_cb;
- cb_.on_accept_complete = &accept_complete_cb;
- cb_.on_connect_complete = &connect_complete_cb;
- }
- //
- // Construct proactor.
- //
- Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd,
- pj_size_t max_timer_entries )
- : ioq_(NULL), th_(NULL)
- {
- cb_.on_read_complete = &read_complete_cb;
- cb_.on_write_complete = &write_complete_cb;
- cb_.on_accept_complete = &accept_complete_cb;
- cb_.on_connect_complete = &connect_complete_cb;
- create(pool, max_fd, max_timer_entries);
- }
- //
- // Destructor.
- //
- ~Pj_Proactor()
- {
- destroy();
- }
- //
- // Create proactor.
- //
- pj_status_t create( Pj_Pool *pool, pj_size_t max_fd,
- pj_size_t timer_entry_count)
- {
- pj_status_t status;
- destroy();
- status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_);
- if (status != PJ_SUCCESS)
- return status;
-
- status = pj_timer_heap_create(pool->pool_(),
- timer_entry_count, &th_);
- if (status != PJ_SUCCESS) {
- pj_ioqueue_destroy(ioq_);
- ioq_ = NULL;
- return NULL;
- }
-
- return status;
- }
- //
- // Destroy proactor.
- //
- void destroy()
- {
- if (ioq_) {
- pj_ioqueue_destroy(ioq_);
- ioq_ = NULL;
- }
- if (th_) {
- pj_timer_heap_destroy(th_);
- th_ = NULL;
- }
- }
- //
- // Register handler.
- // This will call handler->get_socket_handle()
- //
- pj_status_t register_socket_handler(Pj_Pool *pool,
- Pj_Event_Handler *handler)
- {
- return pj_ioqueue_register_sock( pool->pool_(), ioq_,
- handler->get_socket_handle(),
- handler, &cb_, &handler->key_ );
- }
- //
- // Unregister handler.
- //
- static void unregister_handler(Pj_Event_Handler *handler)
- {
- if (handler->key_) {
- pj_ioqueue_unregister( handler->key_ );
- handler->key_ = NULL;
- }
- }
- //
- // Scheduler timer.
- //
- bool schedule_timer( Pj_Event_Handler *handler,
- const Pj_Time_Val &delay,
- int id=-1)
- {
- return schedule_timer(th_, handler, delay, id);
- }
- //
- // Cancel timer.
- //
- bool cancel_timer(Pj_Event_Handler *handler)
- {
- return pj_timer_heap_cancel(th_, &handler->timer_) == 1;
- }
- //
- // Handle events.
- //
- int handle_events(Pj_Time_Val *max_timeout)
- {
- Pj_Time_Val timeout(0, 0);
- int timer_count;
- timer_count = pj_timer_heap_poll( th_, &timeout );
- if (timeout.get_sec() < 0)
- timeout.sec = PJ_MAXINT32;
- /* If caller specifies maximum time to wait, then compare the value
- * with the timeout to wait from timer, and use the minimum value.
- */
- if (max_timeout && timeout >= *max_timeout) {
- timeout = *max_timeout;
- }
- /* Poll events in ioqueue. */
- int ioqueue_count;
- ioqueue_count = pj_ioqueue_poll(ioq_, &timeout);
- if (ioqueue_count < 0)
- return ioqueue_count;
- return ioqueue_count + timer_count;
- }
- //
- // Get the internal ioqueue object.
- //
- pj_ioqueue_t *get_io_queue()
- {
- return ioq_;
- }
- //
- // Get the internal timer heap object.
- //
- pj_timer_heap_t *get_timer_heap()
- {
- return th_;
- }
- private:
- pj_ioqueue_t *ioq_;
- pj_timer_heap_t *th_;
- pj_ioqueue_callback cb_;
- static bool schedule_timer( pj_timer_heap_t *timer,
- Pj_Event_Handler *handler,
- const Pj_Time_Val &delay,
- int id=-1)
- {
- handler->timer_.id = id;
- return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0;
- }
- //
- // Static read completion callback.
- //
- static void read_complete_cb( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_read)
- {
- Pj_Event_Handler *handler =
- (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
- handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read);
- }
- //
- // Static write completion callback.
- //
- static void write_complete_cb(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_sent)
- {
- Pj_Event_Handler *handler =
- (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
- handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent);
- }
- //
- // Static accept completion callback.
- //
- static void accept_complete_cb(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t new_sock,
- pj_status_t status)
- {
- Pj_Event_Handler *handler =
- (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
- handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status);
- }
- //
- // Static connect completion callback.
- //
- static void connect_complete_cb(pj_ioqueue_key_t *key,
- pj_status_t status)
- {
- Pj_Event_Handler *handler =
- (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
- handler->on_connect_complete(status);
- }
- };
- #endif /* __PJPP_PROACTOR_HPP__ */
|