12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514 |
- /*
- * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
- * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
- #include <pj/ioqueue.h>
- #include <pj/os.h>
- #include <pj/lock.h>
- #include <pj/pool.h>
- #include <pj/string.h>
- #include <pj/sock.h>
- #include <pj/array.h>
- #include <pj/log.h>
- #include <pj/assert.h>
- #include <pj/errno.h>
- #include <pj/compat/socket.h>
- #if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
- # include <winsock2.h>
- #elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
- # include <winsock.h>
- #endif
- #if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
- # include <mswsock.h>
- #endif
/* Size of each address slot handed to AcceptEx(): the socket address size
 * plus the 16 extra bytes that AcceptEx() requires (source: MSDN).
 */
#define ACCEPT_ADDR_LEN     (sizeof(pj_sockaddr_in) + 16)
- typedef struct generic_overlapped
- {
- WSAOVERLAPPED overlapped;
- pj_ioqueue_operation_e operation;
- } generic_overlapped;
- /*
- * OVERLAPPPED structure for send and receive.
- */
- typedef struct ioqueue_overlapped
- {
- WSAOVERLAPPED overlapped;
- pj_ioqueue_operation_e operation;
- WSABUF wsabuf;
- pj_sockaddr_in dummy_addr;
- int dummy_addrlen;
- } ioqueue_overlapped;
#if PJ_HAS_TCP
/*
 * OVERLAPPED record for a pending accept operation.
 */
typedef struct ioqueue_accept_rec
{
    WSAOVERLAPPED           overlapped;
    pj_ioqueue_operation_e  operation;
    pj_sock_t               newsock;        /* The accepted socket.          */
    pj_sock_t              *newsock_ptr;    /* Where to report newsock, or
                                             * NULL.                         */
    int                    *addrlen;        /* In: capacity of local/remote;
                                             * out: address length. May be
                                             * NULL.                         */
    void                   *remote;         /* Receives the peer address.    */
    void                   *local;          /* Receives the local address.   */
    char                    accept_buf[2 * ACCEPT_ADDR_LEN];
                                            /* Raw address area parsed with
                                             * GetAcceptExSockaddrs().       */
} ioqueue_accept_rec;
#endif
- /*
- * Structure to hold pending operation key.
- */
- union operation_key
- {
- generic_overlapped generic;
- ioqueue_overlapped overlapped;
- #if PJ_HAS_TCP
- ioqueue_accept_rec accept;
- #endif
- };
/* Kind of OS handle stored in a key. */
enum handle_type
{
    HND_IS_UNKNOWN,
    HND_IS_FILE,
    HND_IS_SOCKET,
};
/* NOTE(review): not referenced in the visible part of this file; by its
 * name it looks like a sentinel length for a "quit" completion packet --
 * confirm against the rest of the file.
 */
enum
{
    POST_QUIT_LEN = 0xFFFFDEADUL
};
- /*
- * Structure for individual socket.
- */
- struct pj_ioqueue_key_t
- {
- PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
- pj_ioqueue_t *ioqueue;
- HANDLE hnd;
- void *user_data;
- enum handle_type hnd_type;
- pj_ioqueue_callback cb;
- pj_bool_t allow_concurrent;
- pj_grp_lock_t *grp_lock;
- #if PJ_HAS_TCP
- int connecting;
- #endif
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- pj_atomic_t *ref_count;
- pj_bool_t closing;
- pj_time_val free_time;
- pj_mutex_t *mutex;
- #endif
- };
- /*
- * IO Queue structure.
- */
- struct pj_ioqueue_t
- {
- pj_ioqueue_cfg cfg;
- HANDLE iocp;
- pj_lock_t *lock;
- pj_bool_t auto_delete_lock;
- pj_bool_t default_concurrency;
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- pj_ioqueue_key_t active_list;
- pj_ioqueue_key_t free_list;
- pj_ioqueue_key_t closing_list;
- #endif
- /* These are to keep track of connecting sockets */
- #if PJ_HAS_TCP
- unsigned event_count;
- HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
- unsigned connecting_count;
- HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
- pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
- #endif
- };
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Forward declaration; the definition appears later in this file. */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif
- #if PJ_HAS_TCP
- /*
- * Process the socket when the overlapped accept() completed.
- */
- static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
- ioqueue_accept_rec *accept_overlapped)
- {
- struct sockaddr *local;
- struct sockaddr *remote;
- int locallen, remotelen;
- pj_status_t status;
- PJ_CHECK_STACK();
- /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
- * addresses can be obtained with getsockname() and getpeername().
- */
- status = setsockopt(accept_overlapped->newsock, SOL_SOCKET,
- SO_UPDATE_ACCEPT_CONTEXT,
- (char*)&key->hnd,
- sizeof(SOCKET));
- /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
- * So ignore the error status.
- */
- /* Operation complete immediately. */
- if (accept_overlapped->addrlen) {
- GetAcceptExSockaddrs( accept_overlapped->accept_buf,
- 0,
- ACCEPT_ADDR_LEN,
- ACCEPT_ADDR_LEN,
- &local,
- &locallen,
- &remote,
- &remotelen);
- if (*accept_overlapped->addrlen >= locallen) {
- if (accept_overlapped->local)
- pj_memcpy(accept_overlapped->local, local, locallen);
- if (accept_overlapped->remote)
- pj_memcpy(accept_overlapped->remote, remote, locallen);
- } else {
- if (accept_overlapped->local)
- pj_bzero(accept_overlapped->local,
- *accept_overlapped->addrlen);
- if (accept_overlapped->remote)
- pj_bzero(accept_overlapped->remote,
- *accept_overlapped->addrlen);
- }
- *accept_overlapped->addrlen = locallen;
- }
- if (accept_overlapped->newsock_ptr)
- *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
- accept_overlapped->operation = 0;
- }
- static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
- {
- pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
- HANDLE hEvent = ioqueue->connecting_handles[pos];
- /* Remove key from array of connecting handles. */
- pj_array_erase(ioqueue->connecting_keys, sizeof(key),
- ioqueue->connecting_count, pos);
- pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
- ioqueue->connecting_count, pos);
- --ioqueue->connecting_count;
- /* Disassociate the socket from the event. */
- WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
- /* Put event object to pool. */
- if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
- ioqueue->event_pool[ioqueue->event_count++] = hEvent;
- } else {
- /* Shouldn't happen. There should be no more pending connections
- * than max.
- */
- pj_assert(0);
- CloseHandle(hEvent);
- }
- }
- /*
- * Poll for the completion of non-blocking connect().
- * If there's a completion, the function return the key of the completed
- * socket, and 'result' argument contains the connect() result. If connect()
- * succeeded, 'result' will have value zero, otherwise will have the error
- * code.
- */
- static int check_connecting( pj_ioqueue_t *ioqueue )
- {
- if (ioqueue->connecting_count) {
- int i, count;
- struct
- {
- pj_ioqueue_key_t *key;
- pj_status_t status;
- } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
- pj_lock_acquire(ioqueue->lock);
- for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
- DWORD result;
- result = WaitForMultipleObjects(ioqueue->connecting_count,
- ioqueue->connecting_handles,
- FALSE, 0);
- if (result >= WAIT_OBJECT_0 &&
- result < WAIT_OBJECT_0+ioqueue->connecting_count)
- {
- WSANETWORKEVENTS net_events;
- /* Got completed connect(). */
- unsigned pos = result - WAIT_OBJECT_0;
- events[count].key = ioqueue->connecting_keys[pos];
- /* See whether connect has succeeded. */
- WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
- ioqueue->connecting_handles[pos],
- &net_events);
- events[count].status =
- PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
- /* Erase socket from pending connect. */
- erase_connecting_socket(ioqueue, pos);
- } else {
- /* No more events */
- break;
- }
- }
- pj_lock_release(ioqueue->lock);
- /* Call callbacks. */
- for (i=0; i<count; ++i) {
- if (events[i].key->cb.on_connect_complete) {
- events[i].key->cb.on_connect_complete(events[i].key,
- events[i].status);
- }
- }
- return count;
- }
- return 0;
-
- }
- #endif
- /*
- * pj_ioqueue_name()
- */
- PJ_DEF(const char*) pj_ioqueue_name(void)
- {
- return "iocp";
- }
- PJ_DEF(void) pj_ioqueue_cfg_default(pj_ioqueue_cfg *cfg)
- {
- pj_bzero(cfg, sizeof(*cfg));
- cfg->epoll_flags = PJ_IOQUEUE_DEFAULT_EPOLL_FLAGS;
- cfg->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
- }
- PJ_DEF(pj_status_t) pj_ioqueue_clear_key( pj_ioqueue_key_t *key )
- {
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
- pj_ioqueue_lock_key(key);
- key->connecting = 0;
- pj_ioqueue_unlock_key(key);
- return PJ_SUCCESS;
- }
- /*
- * pj_ioqueue_create()
- */
- PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
- pj_size_t max_fd,
- pj_ioqueue_t **p_ioqueue)
- {
- return pj_ioqueue_create2(pool, max_fd, NULL, p_ioqueue);
- }
- /*
- * pj_ioqueue_create2()
- */
- PJ_DEF(pj_status_t) pj_ioqueue_create2(pj_pool_t *pool,
- pj_size_t max_fd,
- const pj_ioqueue_cfg *cfg,
- pj_ioqueue_t **p_ioqueue)
- {
- pj_ioqueue_t *ioqueue;
- pj_size_t i;
- pj_status_t rc;
- PJ_UNUSED_ARG(max_fd);
- PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
- rc = sizeof(union operation_key);
- /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
- PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
- sizeof(union operation_key), PJ_EBUG);
- /* Create IOCP */
- ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
- if (cfg)
- pj_memcpy(&ioqueue->cfg, cfg, sizeof(*cfg));
- else
- pj_ioqueue_cfg_default(&ioqueue->cfg);
- ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
- if (ioqueue->iocp == NULL)
- return PJ_RETURN_OS_ERROR(GetLastError());
- /* Create IOCP mutex */
- rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
- if (rc != PJ_SUCCESS) {
- CloseHandle(ioqueue->iocp);
- return rc;
- }
- ioqueue->auto_delete_lock = PJ_TRUE;
- ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /*
- * Create and initialize key pools.
- */
- pj_list_init(&ioqueue->active_list);
- pj_list_init(&ioqueue->free_list);
- pj_list_init(&ioqueue->closing_list);
- /* Preallocate keys according to max_fd setting, and put them
- * in free_list.
- */
- for (i=0; i<max_fd; ++i) {
- pj_ioqueue_key_t *key;
- key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
- rc = pj_atomic_create(pool, 0, &key->ref_count);
- if (rc != PJ_SUCCESS) {
- key = ioqueue->free_list.next;
- while (key != &ioqueue->free_list) {
- pj_atomic_destroy(key->ref_count);
- pj_mutex_destroy(key->mutex);
- key = key->next;
- }
- CloseHandle(ioqueue->iocp);
- return rc;
- }
- rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
- if (rc != PJ_SUCCESS) {
- pj_atomic_destroy(key->ref_count);
- key = ioqueue->free_list.next;
- while (key != &ioqueue->free_list) {
- pj_atomic_destroy(key->ref_count);
- pj_mutex_destroy(key->mutex);
- key = key->next;
- }
- CloseHandle(ioqueue->iocp);
- return rc;
- }
- pj_list_push_back(&ioqueue->free_list, key);
- }
- #endif
- *p_ioqueue = ioqueue;
- PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
- return PJ_SUCCESS;
- }
- /*
- * pj_ioqueue_destroy()
- */
- PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
- {
- #if PJ_HAS_TCP
- unsigned i;
- #endif
- pj_ioqueue_key_t *key;
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
- pj_lock_acquire(ioqueue->lock);
- #if PJ_HAS_TCP
- /* Destroy events in the pool */
- for (i=0; i<ioqueue->event_count; ++i) {
- CloseHandle(ioqueue->event_pool[i]);
- }
- ioqueue->event_count = 0;
- #endif
- if (CloseHandle(ioqueue->iocp) != TRUE)
- return PJ_RETURN_OS_ERROR(GetLastError());
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Destroy reference counters */
- key = ioqueue->active_list.next;
- while (key != &ioqueue->active_list) {
- pj_atomic_destroy(key->ref_count);
- pj_mutex_destroy(key->mutex);
- key = key->next;
- }
- key = ioqueue->closing_list.next;
- while (key != &ioqueue->closing_list) {
- pj_atomic_destroy(key->ref_count);
- pj_mutex_destroy(key->mutex);
- key = key->next;
- }
- key = ioqueue->free_list.next;
- while (key != &ioqueue->free_list) {
- pj_atomic_destroy(key->ref_count);
- pj_mutex_destroy(key->mutex);
- key = key->next;
- }
- #endif
- pj_lock_release(ioqueue->lock);
- if (ioqueue->auto_delete_lock)
- pj_lock_destroy(ioqueue->lock);
- return PJ_SUCCESS;
- }
- PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
- pj_bool_t allow)
- {
- PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
- ioqueue->default_concurrency = allow;
- return PJ_SUCCESS;
- }
- /*
- * pj_ioqueue_set_lock()
- */
- PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
- pj_lock_t *lock,
- pj_bool_t auto_delete )
- {
- PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
- if (ioqueue->auto_delete_lock) {
- pj_lock_destroy(ioqueue->lock);
- }
- ioqueue->lock = lock;
- ioqueue->auto_delete_lock = auto_delete;
- return PJ_SUCCESS;
- }
- /*
- * pj_ioqueue_register_sock2()
- */
- PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
- pj_ioqueue_t *ioqueue,
- pj_sock_t sock,
- pj_grp_lock_t *grp_lock,
- void *user_data,
- const pj_ioqueue_callback *cb,
- pj_ioqueue_key_t **key )
- {
- HANDLE hioq;
- pj_ioqueue_key_t *rec;
- u_long value;
- int rc;
- PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
- pj_lock_acquire(ioqueue->lock);
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Scan closing list first to release unused keys.
- * Must do this with lock acquired.
- */
- scan_closing_keys(ioqueue);
- /* If safe unregistration is used, then get the key record from
- * the free list.
- */
- pj_assert(!pj_list_empty(&ioqueue->free_list));
- if (pj_list_empty(&ioqueue->free_list)) {
- pj_lock_release(ioqueue->lock);
- return PJ_ETOOMANY;
- }
- rec = ioqueue->free_list.next;
- pj_list_erase(rec);
- /* Set initial reference count to 1 */
- pj_assert(pj_atomic_get(rec->ref_count) == 0);
- pj_atomic_inc(rec->ref_count);
- rec->closing = 0;
- #else
- rec = (pj_ioqueue_key_t *)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- #endif
- /* Build the key for this socket. */
- rec->ioqueue = ioqueue;
- rec->hnd = (HANDLE)sock;
- rec->hnd_type = HND_IS_SOCKET;
- rec->user_data = user_data;
- pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
- /* Set concurrency for this handle */
- rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
- if (rc != PJ_SUCCESS) {
- pj_lock_release(ioqueue->lock);
- return rc;
- }
- #if PJ_HAS_TCP
- rec->connecting = 0;
- #endif
- /* Set socket to nonblocking. */
- value = 1;
- rc = ioctlsocket(sock, FIONBIO, &value);
- if (rc != 0) {
- pj_lock_release(ioqueue->lock);
- return PJ_RETURN_OS_ERROR(WSAGetLastError());
- }
- /* Associate with IOCP */
- hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
- if (!hioq) {
- pj_lock_release(ioqueue->lock);
- return PJ_RETURN_OS_ERROR(GetLastError());
- }
- /* Group lock */
- rec->grp_lock = grp_lock;
- if (rec->grp_lock) {
- /* IOCP backend doesn't have group lock functionality, so
- * you should not use it other than for experimental purposes.
- */
- PJ_TODO(INTEGRATE_GROUP_LOCK);
- // pj_grp_lock_add_ref_dbg(rec->grp_lock, "ioqueue", 0);
- }
- *key = rec;
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- pj_list_push_back(&ioqueue->active_list, rec);
- #endif
- pj_lock_release(ioqueue->lock);
- return PJ_SUCCESS;
- }
- /*
- * pj_ioqueue_register_sock()
- */
- PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
- pj_ioqueue_t *ioqueue,
- pj_sock_t sock,
- void *user_data,
- const pj_ioqueue_callback *cb,
- pj_ioqueue_key_t **key )
- {
- return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, cb,
- key);
- }
- /*
- * pj_ioqueue_get_user_data()
- */
- PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
- {
- PJ_ASSERT_RETURN(key, NULL);
- return key->user_data;
- }
- /*
- * pj_ioqueue_set_user_data()
- */
- PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
- void *user_data,
- void **old_data )
- {
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
- if (old_data)
- *old_data = key->user_data;
- key->user_data = user_data;
- return PJ_SUCCESS;
- }
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Drop one reference from the key. When the last reference goes away the
 * key is stamped with the time at which it may be reused and is moved to
 * the ioqueue's closing list.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *owner;

    if (pj_atomic_dec_and_get(key->ref_count) != 0)
        return;

    owner = key->ioqueue;

    pj_lock_acquire(owner->lock);

    pj_assert(key->closing == 1);

    /* Do not recycle the key before the free delay has elapsed. */
    pj_gettickcount(&key->free_time);
    key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
    pj_time_val_normalize(&key->free_time);

    pj_list_erase(key);
    pj_list_push_back(&owner->closing_list, key);

    pj_lock_release(owner->lock);
}
#endif
- /*
- * Poll the I/O Completion Port, execute callback,
- * and return the key and bytes transferred of the last operation.
- */
- static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
- pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
- {
- DWORD dwBytesTransferred, dwKey;
- generic_overlapped *pOv;
- pj_ioqueue_key_t *key;
- pj_ssize_t size_status = -1;
- BOOL rcGetQueued;
- /* Poll for completion status. */
- rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransferred,
- &dwKey, (OVERLAPPED**)&pOv,
- dwTimeout);
- /* The return value is:
- * - nonzero if event was dequeued.
- * - zero and pOv==NULL if no event was dequeued.
- * - zero and pOv!=NULL if event for failed I/O was dequeued.
- */
- if (pOv) {
- pj_bool_t has_lock;
- /* Event was dequeued for either successfull or failed I/O */
- key = (pj_ioqueue_key_t*)dwKey;
- size_status = dwBytesTransferred;
- /* Report to caller regardless */
- if (p_bytes)
- *p_bytes = size_status;
- if (p_key)
- *p_key = key;
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* We shouldn't call callbacks if key is quitting. */
- if (key->closing)
- return PJ_TRUE;
- /* If concurrency is disabled, lock the key
- * (and save the lock status to local var since app may change
- * concurrency setting while in the callback) */
- if (key->allow_concurrent == PJ_FALSE) {
- pj_mutex_lock(key->mutex);
- has_lock = PJ_TRUE;
- } else {
- has_lock = PJ_FALSE;
- }
- /* Now that we get the lock, check again that key is not closing */
- if (key->closing) {
- if (has_lock) {
- pj_mutex_unlock(key->mutex);
- }
- return PJ_TRUE;
- }
- /* Increment reference counter to prevent this key from being
- * deleted
- */
- pj_atomic_inc(key->ref_count);
- #else
- PJ_UNUSED_ARG(has_lock);
- #endif
- /* Carry out the callback */
- switch (pOv->operation) {
- case PJ_IOQUEUE_OP_READ:
- case PJ_IOQUEUE_OP_RECV:
- case PJ_IOQUEUE_OP_RECV_FROM:
- pOv->operation = 0;
- if (key->cb.on_read_complete)
- key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
- size_status);
- break;
- case PJ_IOQUEUE_OP_WRITE:
- case PJ_IOQUEUE_OP_SEND:
- case PJ_IOQUEUE_OP_SEND_TO:
- pOv->operation = 0;
- if (key->cb.on_write_complete)
- key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
- size_status);
- break;
- #if PJ_HAS_TCP
- case PJ_IOQUEUE_OP_ACCEPT:
- /* special case for accept. */
- ioqueue_on_accept_complete(key, (ioqueue_accept_rec*)pOv);
- if (key->cb.on_accept_complete) {
- ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
- pj_status_t status = PJ_SUCCESS;
- pj_sock_t newsock;
- newsock = accept_rec->newsock;
- accept_rec->newsock = PJ_INVALID_SOCKET;
- if (newsock == PJ_INVALID_SOCKET) {
- int dwError = WSAGetLastError();
- if (dwError == 0) dwError = OSERR_ENOTCONN;
- status = PJ_RETURN_OS_ERROR(dwError);
- }
- key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv,
- newsock, status);
-
- }
- break;
- case PJ_IOQUEUE_OP_CONNECT:
- #endif
- case PJ_IOQUEUE_OP_NONE:
- pj_assert(0);
- break;
- }
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- decrement_counter(key);
- if (has_lock)
- pj_mutex_unlock(key->mutex);
- #endif
- return PJ_TRUE;
- }
- /* No event was queued. */
- return PJ_FALSE;
- }
- /*
- * pj_ioqueue_unregister()
- */
- PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
- {
- unsigned i;
- pj_bool_t has_lock;
- enum { RETRY = 10 };
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
- #if PJ_HAS_TCP
- if (key->connecting) {
- unsigned pos;
- pj_ioqueue_t *ioqueue;
- ioqueue = key->ioqueue;
- /* Erase from connecting_handles */
- pj_lock_acquire(ioqueue->lock);
- for (pos=0; pos < ioqueue->connecting_count; ++pos) {
- if (ioqueue->connecting_keys[pos] == key) {
- erase_connecting_socket(ioqueue, pos);
- break;
- }
- }
- key->connecting = 0;
- pj_lock_release(ioqueue->lock);
- }
- #endif
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Mark key as closing before closing handle. */
- key->closing = 1;
- /* If concurrency is disabled, wait until the key has finished
- * processing the callback
- */
- if (key->allow_concurrent == PJ_FALSE) {
- pj_mutex_lock(key->mutex);
- has_lock = PJ_TRUE;
- } else {
- has_lock = PJ_FALSE;
- }
- #else
- PJ_UNUSED_ARG(has_lock);
- #endif
-
- /* Close handle (the only way to disassociate handle from IOCP).
- * We also need to close handle to make sure that no further events
- * will come to the handle.
- */
- /* Update 2008/07/18 (https://github.com/pjsip/pjproject/issues/575):
- * - It seems that CloseHandle() in itself does not actually close
- * the socket (i.e. it will still appear in "netstat" output). Also
- * if we only use CloseHandle(), an "Invalid Handle" exception will
- * be raised in WSACleanup().
- * - MSDN documentation says that CloseHandle() must be called after
- * closesocket() call (see
- * http://msdn.microsoft.com/en-us/library/ms724211(VS.85).aspx).
- * But turns out that this will raise "Invalid Handle" exception
- * in debug mode.
- * So because of this, we replaced CloseHandle() with closesocket()
- * instead. These was tested on WinXP SP2.
- */
- //CloseHandle(key->hnd);
- pj_sock_close((pj_sock_t)key->hnd);
- /* Reset callbacks */
- key->cb.on_accept_complete = NULL;
- key->cb.on_connect_complete = NULL;
- key->cb.on_read_complete = NULL;
- key->cb.on_write_complete = NULL;
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Even after handle is closed, I suspect that IOCP may still try to
- * do something with the handle, causing memory corruption when pool
- * debugging is enabled.
- *
- * Forcing context switch seems to have fixed that, but this is quite
- * an ugly solution..
- *
- * Update 2008/02/13:
- * This should not happen if concurrency is disallowed for the key.
- * So at least application has a solution for this (i.e. by disallowing
- * concurrency in the key).
- */
- //This will loop forever if unregistration is done on the callback.
- //Doing this with RETRY I think should solve the IOCP setting the
- //socket signalled, without causing the deadlock.
- //while (pj_atomic_get(key->ref_count) != 1)
- // pj_thread_sleep(0);
- for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
- pj_thread_sleep(0);
- /* Decrement reference counter to destroy the key. */
- decrement_counter(key);
- if (has_lock)
- pj_mutex_unlock(key->mutex);
- #endif
- return PJ_SUCCESS;
- }
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Walk the closing list and move every key whose free time has passed to
 * the free list. The caller must hold the ioqueue lock.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *cur;
    pj_ioqueue_key_t *following;

    if (pj_list_empty(&ioqueue->closing_list))
        return;

    pj_gettickcount(&now);

    for (cur = ioqueue->closing_list.next;
         cur != &ioqueue->closing_list;
         cur = following)
    {
        /* Remember the successor: 'cur' may be moved to another list. */
        following = cur->next;

        pj_assert(cur->closing != 0);

        if (PJ_TIME_VAL_GTE(now, cur->free_time)) {
            pj_list_erase(cur);
            pj_list_push_back(&ioqueue->free_list, cur);
        }
    }
}
#endif
- /*
- * pj_ioqueue_poll()
- *
- * Poll for events.
- */
- PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
- {
- DWORD dwMsec;
- #if PJ_HAS_TCP
- int connect_count = 0;
- #endif
- int event_count = 0;
- PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
- /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
- dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
- /* Poll for completion status. */
- event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
- #if PJ_HAS_TCP
- /* Check the connecting array, only when there's no activity. */
- if (event_count == 0) {
- connect_count = check_connecting(ioqueue);
- if (connect_count > 0)
- event_count += connect_count;
- }
- #endif
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Check the closing keys only when there's no activity and when there are
- * pending closing keys.
- */
- if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
- pj_lock_acquire(ioqueue->lock);
- scan_closing_keys(ioqueue);
- pj_lock_release(ioqueue->lock);
- }
- #endif
- /* Return number of events. */
- return event_count;
- }
- /*
- * pj_ioqueue_recv()
- *
- * Initiate overlapped WSARecv() operation.
- */
- PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- pj_uint32_t flags )
- {
- /*
- * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
- * addrlen here. But unfortunately it generates EINVAL... :-(
- * -bennylp
- */
- int rc;
- DWORD bytesRead;
- DWORD dwFlags = 0;
- union operation_key *op_key_rec;
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Check key is not closing */
- if (key->closing)
- return PJ_ECANCELLED;
- #endif
- op_key_rec = (union operation_key*)op_key->internal__;
- op_key_rec->overlapped.wsabuf.buf = buffer;
- op_key_rec->overlapped.wsabuf.len = *length;
- dwFlags = flags;
-
- /* Try non-overlapped received first to see if data is
- * immediately available.
- */
- if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
- rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesRead, &dwFlags, NULL, NULL);
- if (rc == 0) {
- *length = bytesRead;
- return PJ_SUCCESS;
- } else {
- DWORD dwError = WSAGetLastError();
- if (dwError != WSAEWOULDBLOCK) {
- *length = -1;
- return PJ_RETURN_OS_ERROR(dwError);
- }
- }
- }
- dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
- /*
- * No immediate data available.
- * Register overlapped Recv() operation.
- */
- pj_bzero( &op_key_rec->overlapped.overlapped,
- sizeof(op_key_rec->overlapped.overlapped));
- op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
- rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesRead, &dwFlags,
- &op_key_rec->overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus!=WSA_IO_PENDING) {
- *length = -1;
- return PJ_STATUS_FROM_OS(dwStatus);
- }
- }
- /* Pending operation has been scheduled. */
- return PJ_EPENDING;
- }
- /*
- * pj_ioqueue_recvfrom()
- *
- * Initiate overlapped RecvFrom() operation.
- */
- PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- pj_uint32_t flags,
- pj_sockaddr_t *addr,
- int *addrlen)
- {
- int rc;
- DWORD bytesRead;
- DWORD dwFlags = 0;
- union operation_key *op_key_rec;
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Check key is not closing */
- if (key->closing)
- return PJ_ECANCELLED;
- #endif
- op_key_rec = (union operation_key*)op_key->internal__;
- op_key_rec->overlapped.wsabuf.buf = buffer;
- op_key_rec->overlapped.wsabuf.len = *length;
- dwFlags = flags;
-
- /* Try non-overlapped received first to see if data is
- * immediately available.
- */
- if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
- rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
- if (rc == 0) {
- *length = bytesRead;
- return PJ_SUCCESS;
- } else {
- DWORD dwError = WSAGetLastError();
- if (dwError != WSAEWOULDBLOCK) {
- *length = -1;
- return PJ_RETURN_OS_ERROR(dwError);
- }
- }
- }
- dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
- /*
- * No immediate data available.
- * Register overlapped Recv() operation.
- */
- pj_bzero( &op_key_rec->overlapped.overlapped,
- sizeof(op_key_rec->overlapped.overlapped));
- op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
- rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesRead, &dwFlags, addr, addrlen,
- &op_key_rec->overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus!=WSA_IO_PENDING) {
- *length = -1;
- return PJ_STATUS_FROM_OS(dwStatus);
- }
- }
-
- /* Pending operation has been scheduled. */
- return PJ_EPENDING;
- }
- /*
- * pj_ioqueue_send()
- *
- * Initiate overlapped Send operation.
- */
- PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- pj_uint32_t flags )
- {
- return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
- }
- /*
- * pj_ioqueue_sendto()
- *
- * Initiate overlapped SendTo operation.
- */
- PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- pj_uint32_t flags,
- const pj_sockaddr_t *addr,
- int addrlen)
- {
- int rc;
- DWORD bytesWritten;
- DWORD dwFlags;
- union operation_key *op_key_rec;
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Check key is not closing */
- if (key->closing)
- return PJ_ECANCELLED;
- #endif
- op_key_rec = (union operation_key*)op_key->internal__;
- /*
- * First try blocking write.
- */
- op_key_rec->overlapped.wsabuf.buf = (void*)data;
- op_key_rec->overlapped.wsabuf.len = *length;
- dwFlags = flags;
- if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
- rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesWritten, dwFlags, addr, addrlen,
- NULL, NULL);
- if (rc == 0) {
- *length = bytesWritten;
- return PJ_SUCCESS;
- } else {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus != WSAEWOULDBLOCK) {
- *length = -1;
- return PJ_RETURN_OS_ERROR(dwStatus);
- }
- }
- }
- dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
- /*
- * Data can't be sent immediately.
- * Schedule asynchronous WSASend().
- */
- pj_bzero( &op_key_rec->overlapped.overlapped,
- sizeof(op_key_rec->overlapped.overlapped));
- op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
- rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesWritten, dwFlags, addr, addrlen,
- &op_key_rec->overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus!=WSA_IO_PENDING)
- return PJ_STATUS_FROM_OS(dwStatus);
- }
- /* Asynchronous operation successfully submitted. */
- return PJ_EPENDING;
- }
- #if PJ_HAS_TCP
- /*
- * pj_ioqueue_accept()
- *
- * Initiate overlapped accept() operation.
- */
- PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
- {
- BOOL rc;
- DWORD bytesReceived;
- pj_status_t status;
- union operation_key *op_key_rec;
- SOCKET sock;
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Check key is not closing */
- if (key->closing)
- return PJ_ECANCELLED;
- #endif
- /*
- * See if there is a new connection immediately available.
- */
- sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
- if (sock != INVALID_SOCKET) {
- /* Yes! New socket is available! */
- if (local && addrlen) {
- int status_;
- /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
- * addresses can be obtained with getsockname() and getpeername().
- */
- status_ = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
- (char*)&key->hnd, sizeof(SOCKET));
- /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
- * So ignore the error status.
- */
- status_ = getsockname(sock, local, addrlen);
- if (status_ != 0) {
- DWORD dwError = WSAGetLastError();
- closesocket(sock);
- return PJ_RETURN_OS_ERROR(dwError);
- }
- }
- *new_sock = sock;
- return PJ_SUCCESS;
- } else {
- DWORD dwError = WSAGetLastError();
- if (dwError != WSAEWOULDBLOCK) {
- return PJ_RETURN_OS_ERROR(dwError);
- }
- }
- /*
- * No connection is immediately available.
- * Must schedule an asynchronous operation.
- */
- op_key_rec = (union operation_key*)op_key->internal__;
-
- status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
- &op_key_rec->accept.newsock);
- if (status != PJ_SUCCESS)
- return status;
- op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
- op_key_rec->accept.addrlen = addrlen;
- op_key_rec->accept.local = local;
- op_key_rec->accept.remote = remote;
- op_key_rec->accept.newsock_ptr = new_sock;
- pj_bzero( &op_key_rec->accept.overlapped,
- sizeof(op_key_rec->accept.overlapped));
- rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
- op_key_rec->accept.accept_buf,
- 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
- &bytesReceived,
- &op_key_rec->accept.overlapped );
- if (rc == TRUE) {
- ioqueue_on_accept_complete(key, &op_key_rec->accept);
- return PJ_SUCCESS;
- } else {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus!=WSA_IO_PENDING)
- return PJ_STATUS_FROM_OS(dwStatus);
- }
- /* Asynchronous Accept() has been submitted. */
- return PJ_EPENDING;
- }
- /*
- * pj_ioqueue_connect()
- *
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
- PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
- const pj_sockaddr_t *addr,
- int addrlen )
- {
- HANDLE hEvent;
- pj_ioqueue_t *ioqueue;
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- /* Check key is not closing */
- if (key->closing)
- return PJ_ECANCELLED;
- #endif
- /* Initiate connect() */
- if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
- DWORD dwStatus;
- dwStatus = WSAGetLastError();
- if (dwStatus != WSAEWOULDBLOCK) {
- return PJ_RETURN_OS_ERROR(dwStatus);
- }
- } else {
- /* Connect has completed immediately! */
- return PJ_SUCCESS;
- }
- ioqueue = key->ioqueue;
- /* Add to the array of connecting socket to be polled */
- pj_lock_acquire(ioqueue->lock);
- if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
- pj_lock_release(ioqueue->lock);
- return PJ_ETOOMANYCONN;
- }
- /* Get or create event object. */
- if (ioqueue->event_count) {
- hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
- --ioqueue->event_count;
- } else {
- hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- if (hEvent == NULL) {
- DWORD dwStatus = GetLastError();
- pj_lock_release(ioqueue->lock);
- return PJ_STATUS_FROM_OS(dwStatus);
- }
- }
- /* Mark key as connecting.
- * We can't use array index since key can be removed dynamically.
- */
- key->connecting = 1;
- /* Associate socket events to the event object. */
- if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
- CloseHandle(hEvent);
- pj_lock_release(ioqueue->lock);
- return PJ_RETURN_OS_ERROR(WSAGetLastError());
- }
- /* Add to array. */
- ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
- ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
- ioqueue->connecting_count++;
- pj_lock_release(ioqueue->lock);
- return PJ_EPENDING;
- }
- #endif /* #if PJ_HAS_TCP */
- PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
- pj_size_t size )
- {
- pj_bzero(op_key, size);
- }
- PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key )
- {
- BOOL rc;
- DWORD bytesTransferred;
- rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
- &bytesTransferred, FALSE );
- if (rc == FALSE) {
- return GetLastError()==ERROR_IO_INCOMPLETE;
- }
- return FALSE;
- }
- PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_status )
- {
- BOOL rc;
- rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
- (long)key, (OVERLAPPED*)op_key );
- if (rc == FALSE) {
- return PJ_RETURN_OS_ERROR(GetLastError());
- }
- return PJ_SUCCESS;
- }
- PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
- pj_bool_t allow)
- {
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
- /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
- * disabled.
- */
- PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
- key->allow_concurrent = allow;
- return PJ_SUCCESS;
- }
- PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
- {
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- return pj_mutex_lock(key->mutex);
- #else
- PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
- #endif
- }
- PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
- {
- #if PJ_IOQUEUE_HAS_SAFE_UNREG
- return pj_mutex_unlock(key->mutex);
- #else
- PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
- #endif
- }
- PJ_DEF(pj_oshandle_t) pj_ioqueue_get_os_handle( pj_ioqueue_t *ioqueue )
- {
- return ioqueue ? (pj_oshandle_t)ioqueue->iocp : NULL;
- }
|