/* 
 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 */
#ifndef __PJPP_PROACTOR_HPP__
#define __PJPP_PROACTOR_HPP__

#include <pj/ioqueue.h>
#include <pj++/pool.hpp>
#include <pj++/sock.hpp>
#include <pj++/timer.hpp>
#include <pj/errno.h>

class Pj_Proactor;
class Pj_Event_Handler;


//////////////////////////////////////////////////////////////////////////////
// Asynchronous operation key.
//
// Applications may inheric this class to put their application
// specific data.
//
class Pj_Async_Op : public pj_ioqueue_op_key_t
{
public:
    //
    // Construct with null handler.
    // App must call set_handler() before use.
    //
    Pj_Async_Op()
        : handler_(NULL)
    {
        pj_ioqueue_op_key_init(this, sizeof(*this));
    }

    //
    // Constructor.
    //
    explicit Pj_Async_Op(Pj_Event_Handler *handler)
        : handler_(handler)
    {
        pj_ioqueue_op_key_init(this, sizeof(*this));
    }

    //
    // Set handler.
    //
    void set_handler(Pj_Event_Handler *handler)
    {
        handler_ = handler;
    }

    //
    // Check whether operation is still pending for this key.
    //
    bool is_pending();

    //
    // Cancel the operation.
    //
    bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED);

protected:
    Pj_Event_Handler *handler_;
};


//////////////////////////////////////////////////////////////////////////////
// Event handler.
//
// Applications should inherit this class to receive various event
// notifications.
//
// Applications should implement get_socket_handle().
//
class Pj_Event_Handler : public Pj_Object
{
    friend class Pj_Proactor;
public:
    //
    // Default constructor.
    //
    Pj_Event_Handler()
        : key_(NULL)
    {
        pj_memset(&timer_, 0, sizeof(timer_));
        timer_.user_data = this;
        timer_.cb = &timer_callback;
    }
    
    //
    // Destroy.
    //
    virtual ~Pj_Event_Handler()
    {
        unregister();
    }

    //
    // Unregister this handler from the ioqueue.
    //
    void unregister()
    {
        if (key_) {
            pj_ioqueue_unregister(key_);
            key_ = NULL;
        }
    }

    //
    // Get socket handle associated with this.
    //
    virtual pj_sock_t get_socket_handle()
    {
        return PJ_INVALID_SOCKET;
    }

    //
    // Start async receive.
    //
    pj_status_t recv( Pj_Async_Op *op_key, 
                      void *buf, pj_ssize_t *len, 
                      unsigned flags)
    {
        return pj_ioqueue_recv( key_, op_key,
                                buf, len, flags);
    }

    //
    // Start async recvfrom()
    //
    pj_status_t recvfrom( Pj_Async_Op *op_key, 
                          void *buf, pj_ssize_t *len, unsigned flags,
                          Pj_Inet_Addr *addr)
    {
        addr->addrlen_ = sizeof(Pj_Inet_Addr);
        return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags,
                                    addr, &addr->addrlen_ );
    }

    //
    // Start async send()
    //
    pj_status_t send( Pj_Async_Op *op_key, 
                      const void *data, pj_ssize_t *len, 
                      unsigned flags)
    {
        return pj_ioqueue_send( key_, op_key, data, len, flags);
    }

    //
    // Start async sendto()
    //
    pj_status_t sendto( Pj_Async_Op *op_key,
                        const void *data, pj_ssize_t *len, unsigned flags,
                        const Pj_Inet_Addr &addr)
    {
        return pj_ioqueue_sendto(key_, op_key, data, len, flags,
                                 &addr, sizeof(addr));
    }

#if PJ_HAS_TCP
    //
    // Start async connect()
    //
    pj_status_t connect(const Pj_Inet_Addr &addr)
    {
        return pj_ioqueue_connect(key_, &addr, sizeof(addr));
    }

    //
    // Start async accept().
    //
    pj_status_t accept( Pj_Async_Op *op_key,
                        Pj_Socket *sock, 
                        Pj_Inet_Addr *local = NULL, 
                        Pj_Inet_Addr *remote = NULL)
    {
        int *addrlen = local ? &local->addrlen_ : NULL;
        return pj_ioqueue_accept( key_, op_key, &sock->sock_,
                                  local, remote, addrlen );
    }

#endif

protected:
    //////////////////
    // Overridables
    //////////////////

    //
    // Timeout callback.
    //
    virtual void on_timeout(int) 
    {
    }

    //
    // On read complete callback.
    //
    virtual void on_read_complete( Pj_Async_Op*, pj_ssize_t) 
    {
    }

    //
    // On write complete callback.
    //
    virtual void on_write_complete( Pj_Async_Op *, pj_ssize_t) 
    {
    }

#if PJ_HAS_TCP
    //
    // On connect complete callback.
    //
    virtual void on_connect_complete(pj_status_t) 
    {
    }

    //
    // On new connection callback.
    //
    virtual void on_accept_complete( Pj_Async_Op*, pj_sock_t, pj_status_t) 
    {
    }

#endif


private:
    pj_ioqueue_key_t *key_;
    pj_timer_entry    timer_;

    friend class Pj_Proactor;
    friend class Pj_Async_Op;

    //
    // Static timer callback.
    //
    static void timer_callback( pj_timer_heap_t*, 
                                struct pj_timer_entry *entry)
    {
        Pj_Event_Handler *handler = 
            (Pj_Event_Handler*) entry->user_data;

        handler->on_timeout(entry->id);
    }
};

inline bool Pj_Async_Op::is_pending()
{
    return pj_ioqueue_is_pending(handler_->key_, this) != 0;
}

inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status)
{
    return pj_ioqueue_post_completion(handler_->key_, this, 
                                      bytes_status) == PJ_SUCCESS;
}

//////////////////////////////////////////////////////////////////////////////
// Proactor
//
class Pj_Proactor : public Pj_Object
{
public:
    //
    // Default constructor, initializes to NULL.
    //
    Pj_Proactor()
        : ioq_(NULL), th_(NULL)
    {
        cb_.on_read_complete    = &read_complete_cb;
        cb_.on_write_complete   = &write_complete_cb;
        cb_.on_accept_complete  = &accept_complete_cb;
        cb_.on_connect_complete = &connect_complete_cb;
    }

    //
    // Construct proactor.
    //
    Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd,
                 pj_size_t max_timer_entries )
    : ioq_(NULL), th_(NULL)
    {
        cb_.on_read_complete    = &read_complete_cb;
        cb_.on_write_complete   = &write_complete_cb;
        cb_.on_accept_complete  = &accept_complete_cb;
        cb_.on_connect_complete = &connect_complete_cb;

        create(pool, max_fd, max_timer_entries);
    }

    //
    // Destructor.
    //
    ~Pj_Proactor()
    {
        destroy();
    }

    //
    // Create proactor.
    //
    pj_status_t create( Pj_Pool *pool, pj_size_t max_fd, 
                        pj_size_t timer_entry_count)
    {
        pj_status_t status;

        destroy();

        status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_);
        if (status != PJ_SUCCESS) 
            return status;
        
        status = pj_timer_heap_create(pool->pool_(), 
                                      timer_entry_count, &th_);
        if (status != PJ_SUCCESS) {
            pj_ioqueue_destroy(ioq_);
            ioq_ = NULL;
            return NULL;
        }
        
        return status;
    }

    //
    // Destroy proactor.
    //
    void destroy()
    {
        if (ioq_) {
            pj_ioqueue_destroy(ioq_);
            ioq_ = NULL;
        }
        if (th_) {
            pj_timer_heap_destroy(th_);
            th_ = NULL;
        }
    }

    //
    // Register handler.
    // This will call handler->get_socket_handle()
    //
    pj_status_t register_socket_handler(Pj_Pool *pool, 
                                        Pj_Event_Handler *handler)
    {
        return   pj_ioqueue_register_sock( pool->pool_(), ioq_,
                                           handler->get_socket_handle(),
                                           handler, &cb_, &handler->key_ );
    }

    //
    // Unregister handler.
    //
    static void unregister_handler(Pj_Event_Handler *handler)
    {
        if (handler->key_) {
            pj_ioqueue_unregister( handler->key_ );
            handler->key_ = NULL;
        }
    }

    //
    // Scheduler timer.
    //
    bool schedule_timer( Pj_Event_Handler *handler, 
                         const Pj_Time_Val &delay, 
                         int id=-1)
    {
        return schedule_timer(th_, handler, delay, id);
    }

    //
    // Cancel timer.
    //
    bool cancel_timer(Pj_Event_Handler *handler)
    {
        return pj_timer_heap_cancel(th_, &handler->timer_) == 1;
    }

    //
    // Handle events.
    //
    int handle_events(Pj_Time_Val *max_timeout)
    {
        Pj_Time_Val timeout(0, 0);
        int timer_count;

        timer_count = pj_timer_heap_poll( th_, &timeout );

        if (timeout.get_sec() < 0) 
            timeout.sec = PJ_MAXINT32;

        /* If caller specifies maximum time to wait, then compare the value 
         * with the timeout to wait from timer, and use the minimum value.
         */
        if (max_timeout && timeout >= *max_timeout) {
            timeout = *max_timeout;
        }

        /* Poll events in ioqueue. */
        int ioqueue_count;

        ioqueue_count = pj_ioqueue_poll(ioq_, &timeout);
        if (ioqueue_count < 0)
            return ioqueue_count;

        return ioqueue_count + timer_count;
    }

    //
    // Get the internal ioqueue object.
    //
    pj_ioqueue_t *get_io_queue()
    {
        return ioq_;
    }

    //
    // Get the internal timer heap object.
    //
    pj_timer_heap_t *get_timer_heap()
    {
        return th_;
    }

private:
    pj_ioqueue_t *ioq_;
    pj_timer_heap_t *th_;
    pj_ioqueue_callback cb_;

    static bool schedule_timer( pj_timer_heap_t *timer, 
                                Pj_Event_Handler *handler,
                                const Pj_Time_Val &delay, 
                                int id=-1)
    {
        handler->timer_.id = id;
        return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0;
    }


    //
    // Static read completion callback.
    //
    static void read_complete_cb( pj_ioqueue_key_t *key, 
                                  pj_ioqueue_op_key_t *op_key, 
                                  pj_ssize_t bytes_read)
    {
        Pj_Event_Handler *handler = 
            (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);

        handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read);
    }

    //
    // Static write completion callback.
    //
    static void write_complete_cb(pj_ioqueue_key_t *key, 
                                  pj_ioqueue_op_key_t *op_key,
                                  pj_ssize_t bytes_sent)
    {
        Pj_Event_Handler *handler = 
            (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);

        handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent);
    }

    //
    // Static accept completion callback.
    //
    static void accept_complete_cb(pj_ioqueue_key_t *key, 
                                   pj_ioqueue_op_key_t *op_key,
                                   pj_sock_t new_sock,
                                   pj_status_t status)
    {
        Pj_Event_Handler *handler = 
            (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);

        handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status);
    }

    //
    // Static connect completion callback.
    //
    static void connect_complete_cb(pj_ioqueue_key_t *key, 
                                    pj_status_t status)
    {
        Pj_Event_Handler *handler = 
            (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);

        handler->on_connect_complete(status);
    }

};

#endif  /* __PJPP_PROACTOR_HPP__ */