ioq_udp.c 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301
  1. /*
  2. * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
  3. * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program; if not, write to the Free Software
  17. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  18. */
  19. #include "test.h"
  20. /**
  21. * \page page_pjlib_ioqueue_udp_test Test: I/O Queue (UDP)
  22. *
  23. * This file provides implementation to test the
  24. * functionality of the I/O queue when UDP socket is used.
  25. *
  26. *
  27. * This file is <b>pjlib-test/ioq_udp.c</b>
  28. *
  29. * \include pjlib-test/ioq_udp.c
  30. */
  31. #if INCLUDE_UDP_IOQUEUE_TEST
  32. #include <pjlib.h>
  33. #include <pj/compat/socket.h>
  34. #define THIS_FILE "test_udp"
  35. #define PORT 51233
  36. #define LOOP 2
  37. ///#define LOOP 2
  38. #define BUF_MIN_SIZE 32
  39. #define BUF_MAX_SIZE 2048
  40. #define SOCK_INACTIVE_MIN (1)
  41. #define SOCK_INACTIVE_MAX (PJ_IOQUEUE_MAX_HANDLES - 2)
  42. #define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)
  43. #undef TRACE_
  44. #define TRACE_(msg) PJ_LOG(3,(THIS_FILE,"....." msg))
  45. #if 0
  46. # define TRACE__(args) PJ_LOG(3,args)
  47. #else
  48. # define TRACE__(args)
  49. #endif
  50. static pj_ssize_t callback_read_size,
  51. callback_write_size,
  52. callback_accept_status,
  53. callback_connect_status;
  54. static pj_ioqueue_key_t *callback_read_key,
  55. *callback_write_key,
  56. *callback_accept_key,
  57. *callback_connect_key;
  58. static pj_ioqueue_op_key_t *callback_read_op,
  59. *callback_write_op,
  60. *callback_accept_op;
  61. static void on_ioqueue_read(pj_ioqueue_key_t *key,
  62. pj_ioqueue_op_key_t *op_key,
  63. pj_ssize_t bytes_read)
  64. {
  65. callback_read_key = key;
  66. callback_read_op = op_key;
  67. callback_read_size = bytes_read;
  68. TRACE__((THIS_FILE, " callback_read_key = %p, bytes=%d",
  69. key, bytes_read));
  70. }
  71. static void on_ioqueue_write(pj_ioqueue_key_t *key,
  72. pj_ioqueue_op_key_t *op_key,
  73. pj_ssize_t bytes_written)
  74. {
  75. callback_write_key = key;
  76. callback_write_op = op_key;
  77. callback_write_size = bytes_written;
  78. }
  79. static void on_ioqueue_accept(pj_ioqueue_key_t *key,
  80. pj_ioqueue_op_key_t *op_key,
  81. pj_sock_t sock, int status)
  82. {
  83. PJ_UNUSED_ARG(sock);
  84. callback_accept_key = key;
  85. callback_accept_op = op_key;
  86. callback_accept_status = status;
  87. }
  88. static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status)
  89. {
  90. callback_connect_key = key;
  91. callback_connect_status = status;
  92. }
  93. static pj_ioqueue_callback test_cb =
  94. {
  95. &on_ioqueue_read,
  96. &on_ioqueue_write,
  97. &on_ioqueue_accept,
  98. &on_ioqueue_connect,
  99. };
  100. #if defined(PJ_WIN32) || defined(PJ_WIN64)
  101. # define S_ADDR S_un.S_addr
  102. #else
  103. # define S_ADDR s_addr
  104. #endif
  105. /*
  106. * compliance_test()
  107. * To test that the basic IOQueue functionality works. It will just exchange
  108. * data between two sockets.
  109. */
  110. static int compliance_test(const pj_ioqueue_cfg *cfg)
  111. {
  112. pj_sock_t ssock=-1, csock=-1;
  113. pj_sockaddr_in addr, dst_addr;
  114. int addrlen;
  115. pj_pool_t *pool = NULL;
  116. char *send_buf, *recv_buf;
  117. pj_ioqueue_t *ioque = NULL;
  118. pj_ioqueue_key_t *skey = NULL, *ckey = NULL;
  119. pj_ioqueue_op_key_t read_op, write_op;
  120. int bufsize = BUF_MIN_SIZE;
  121. pj_ssize_t bytes;
  122. int status = -1;
  123. pj_str_t temp;
  124. pj_bool_t send_pending, recv_pending;
  125. pj_status_t rc;
  126. pj_str_t ioque_name;
  127. pj_str_t ioqueue_type;
  128. pj_set_os_error(PJ_SUCCESS);
  129. // Create pool.
  130. pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
  131. // Allocate buffers for send and receive.
  132. send_buf = (char*)pj_pool_alloc(pool, bufsize);
  133. recv_buf = (char*)pj_pool_alloc(pool, bufsize);
  134. // Allocate sockets for sending and receiving.
  135. TRACE_("creating sockets...");
  136. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &ssock);
  137. if (rc==PJ_SUCCESS)
  138. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &csock);
  139. else
  140. csock = PJ_INVALID_SOCKET;
  141. if (rc != PJ_SUCCESS) {
  142. app_perror("...ERROR in pj_sock_socket()", rc);
  143. status=-1; goto on_error;
  144. }
  145. // Bind server socket.
  146. TRACE_("bind socket...");
  147. pj_bzero(&addr, sizeof(addr));
  148. addr.sin_family = pj_AF_INET();
  149. addr.sin_port = pj_htons(PORT);
  150. if (pj_sock_bind(ssock, &addr, sizeof(addr))) {
  151. status=-10; goto on_error;
  152. }
  153. // Create I/O Queue.
  154. TRACE_("create ioqueue...");
  155. rc = pj_ioqueue_create2(pool, PJ_IOQUEUE_MAX_HANDLES, cfg, &ioque);
  156. if (rc != PJ_SUCCESS) {
  157. status=-20; goto on_error;
  158. }
  159. ioque_name = pj_str((char*)pj_ioqueue_name());
  160. if (pj_strncmp(&ioque_name, pj_cstr(&ioqueue_type, "epoll"), 5) == 0 ||
  161. pj_strncmp(&ioque_name, pj_cstr(&ioqueue_type, "kqueue"), 6) == 0 ||
  162. pj_strncmp(&ioque_name, pj_cstr(&ioqueue_type, "iocp"), 4) == 0) {
  163. if (pj_ioqueue_get_os_handle(ioque) == NULL) {
  164. PJ_LOG(1,(
  165. THIS_FILE,
  166. "...pj_ioqueue_os_handle() unexpectedly returned NULL"
  167. ));
  168. status=-21; goto on_error;
  169. }
  170. }
  171. // Register server and client socket.
  172. // We put this after inactivity socket, hopefully this can represent the
  173. // worst waiting time.
  174. TRACE_("registering first sockets...");
  175. rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL,
  176. &test_cb, &skey);
  177. if (rc != PJ_SUCCESS) {
  178. app_perror("...error(10): ioqueue_register error", rc);
  179. status=-25; goto on_error;
  180. }
  181. TRACE_("registering second sockets...");
  182. rc = pj_ioqueue_register_sock( pool, ioque, csock, NULL,
  183. &test_cb, &ckey);
  184. if (rc != PJ_SUCCESS) {
  185. app_perror("...error(11): ioqueue_register error", rc);
  186. status=-26; goto on_error;
  187. }
  188. // Randomize send_buf.
  189. pj_create_random_string(send_buf, bufsize);
  190. // Init operation keys.
  191. pj_ioqueue_op_key_init(&read_op, sizeof(read_op));
  192. pj_ioqueue_op_key_init(&write_op, sizeof(write_op));
  193. // Register reading from ioqueue.
  194. TRACE_("start recvfrom...");
  195. pj_bzero(&addr, sizeof(addr));
  196. addrlen = sizeof(addr);
  197. bytes = bufsize;
  198. rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0,
  199. &addr, &addrlen);
  200. if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
  201. app_perror("...error: pj_ioqueue_recvfrom", rc);
  202. status=-28; goto on_error;
  203. } else if (rc == PJ_EPENDING) {
  204. recv_pending = 1;
  205. PJ_LOG(3, (THIS_FILE,
  206. "......ok: recvfrom returned pending"));
  207. } else {
  208. PJ_LOG(3, (THIS_FILE,
  209. "......error: recvfrom returned immediate ok!"));
  210. status=-29; goto on_error;
  211. }
  212. // Set destination address to send the packet.
  213. TRACE_("set destination address...");
  214. temp = pj_str("127.0.0.1");
  215. if ((rc=pj_sockaddr_in_init(&dst_addr, &temp, PORT)) != 0) {
  216. app_perror("...error: unable to resolve 127.0.0.1", rc);
  217. status=-290; goto on_error;
  218. }
  219. // Write must return the number of bytes.
  220. TRACE_("start sendto...");
  221. bytes = bufsize;
  222. rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &dst_addr,
  223. sizeof(dst_addr));
  224. if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
  225. app_perror("...error: pj_ioqueue_sendto", rc);
  226. status=-30; goto on_error;
  227. } else if (rc == PJ_EPENDING) {
  228. send_pending = 1;
  229. PJ_LOG(3, (THIS_FILE,
  230. "......ok: sendto returned pending"));
  231. } else {
  232. send_pending = 0;
  233. PJ_LOG(3, (THIS_FILE,
  234. "......ok: sendto returned immediate success"));
  235. }
  236. // reset callback variables.
  237. callback_read_size = callback_write_size = 0;
  238. callback_accept_status = callback_connect_status = -2;
  239. callback_read_key = callback_write_key =
  240. callback_accept_key = callback_connect_key = NULL;
  241. callback_read_op = callback_write_op = NULL;
  242. // Poll if pending.
  243. while (send_pending || recv_pending) {
  244. int ret;
  245. pj_time_val timeout = { 5, 0 };
  246. TRACE_("poll...");
  247. #ifdef PJ_SYMBIAN
  248. ret = pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
  249. #else
  250. ret = pj_ioqueue_poll(ioque, &timeout);
  251. #endif
  252. if (ret == 0) {
  253. PJ_LOG(1,(THIS_FILE, "...ERROR: timed out..."));
  254. status=-45; goto on_error;
  255. } else if (ret < 0) {
  256. app_perror("...ERROR in ioqueue_poll()", -ret);
  257. status=-50; goto on_error;
  258. }
  259. if (callback_read_key != NULL) {
  260. if (callback_read_size != bufsize) {
  261. status=-61; goto on_error;
  262. }
  263. if (callback_read_key != skey) {
  264. status=-65; goto on_error;
  265. }
  266. if (callback_read_op != &read_op) {
  267. status=-66; goto on_error;
  268. }
  269. if (pj_memcmp(send_buf, recv_buf, bufsize) != 0) {
  270. status=-67; goto on_error;
  271. }
  272. if (addrlen != sizeof(pj_sockaddr_in)) {
  273. status=-68; goto on_error;
  274. }
  275. if (addr.sin_family != pj_AF_INET()) {
  276. status=-69; goto on_error;
  277. }
  278. recv_pending = 0;
  279. }
  280. if (callback_write_key != NULL) {
  281. if (callback_write_size != bufsize) {
  282. status=-73; goto on_error;
  283. }
  284. if (callback_write_key != ckey) {
  285. status=-75; goto on_error;
  286. }
  287. if (callback_write_op != &write_op) {
  288. status=-76; goto on_error;
  289. }
  290. send_pending = 0;
  291. }
  292. }
  293. // Success
  294. status = 0;
  295. on_error:
  296. if (skey)
  297. pj_ioqueue_unregister(skey);
  298. else if (ssock != -1)
  299. pj_sock_close(ssock);
  300. if (ckey)
  301. pj_ioqueue_unregister(ckey);
  302. else if (csock != -1)
  303. pj_sock_close(csock);
  304. if (ioque != NULL)
  305. pj_ioqueue_destroy(ioque);
  306. pj_pool_release(pool);
  307. return status;
  308. }
  309. static void on_read_complete(pj_ioqueue_key_t *key,
  310. pj_ioqueue_op_key_t *op_key,
  311. pj_ssize_t bytes_read)
  312. {
  313. unsigned *p_packet_cnt = (unsigned*) pj_ioqueue_get_user_data(key);
  314. PJ_UNUSED_ARG(op_key);
  315. PJ_UNUSED_ARG(bytes_read);
  316. (*p_packet_cnt)++;
  317. }
  318. /*
  319. * unregister_test()
  320. * Check if callback is still called after socket has been unregistered or
  321. * closed.
  322. */
  323. static int unregister_test(const pj_ioqueue_cfg *cfg)
  324. {
  325. enum { RPORT = 50000, SPORT = 50001 };
  326. pj_pool_t *pool;
  327. pj_ioqueue_t *ioqueue;
  328. pj_sock_t ssock;
  329. pj_sock_t rsock, rsock2;
  330. int i, addrlen;
  331. pj_sockaddr_in addr;
  332. pj_ioqueue_key_t *key, *key2;
  333. pj_ioqueue_op_key_t opkey;
  334. pj_ioqueue_callback cb;
  335. unsigned packet_cnt;
  336. char sendbuf[10], recvbuf[10];
  337. void *user_data2 = (void*)(long)2;
  338. pj_ssize_t bytes;
  339. pj_time_val timeout;
  340. pj_status_t status;
  341. pool = pj_pool_create(mem, "test", 4000, 4000, NULL);
  342. if (!pool) {
  343. app_perror("Unable to create pool", PJ_ENOMEM);
  344. return -100;
  345. }
  346. status = pj_ioqueue_create2(pool, 1, cfg, &ioqueue);
  347. if (status != PJ_SUCCESS) {
  348. app_perror("Error creating ioqueue", status);
  349. return -110;
  350. }
  351. /* Create sender socket */
  352. status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, SPORT, &ssock);
  353. if (status != PJ_SUCCESS) {
  354. app_perror("Error initializing socket", status);
  355. return -120;
  356. }
  357. /* Create receiver socket. */
  358. status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, RPORT, &rsock);
  359. if (status != PJ_SUCCESS) {
  360. app_perror("Error initializing socket", status);
  361. return -130;
  362. }
  363. /* Register rsock to ioqueue. */
  364. pj_bzero(&cb, sizeof(cb));
  365. cb.on_read_complete = &on_read_complete;
  366. packet_cnt = 0;
  367. status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt,
  368. &cb, &key);
  369. if (status != PJ_SUCCESS) {
  370. app_perror("Error registering to ioqueue", status);
  371. return -140;
  372. }
  373. /* Init operation key. */
  374. pj_ioqueue_op_key_init(&opkey, sizeof(opkey));
  375. /* Start reading. */
  376. bytes = sizeof(recvbuf);
  377. status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);
  378. if (status != PJ_EPENDING) {
  379. app_perror("Expecting PJ_EPENDING, but got this", status);
  380. return -150;
  381. }
  382. /* Init destination address. */
  383. addrlen = sizeof(addr);
  384. status = pj_sock_getsockname(rsock, &addr, &addrlen);
  385. if (status != PJ_SUCCESS) {
  386. app_perror("getsockname error", status);
  387. return -160;
  388. }
  389. /* Override address with 127.0.0.1, since getsockname will return
  390. * zero in the address field.
  391. */
  392. addr.sin_addr = pj_inet_addr2("127.0.0.1");
  393. /* Init buffer to send */
  394. pj_ansi_strxcpy(sendbuf, "Hello0123", sizeof(sendbuf));
  395. /* Send one packet. */
  396. bytes = sizeof(sendbuf);
  397. status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,
  398. &addr, sizeof(addr));
  399. if (status != PJ_SUCCESS) {
  400. app_perror("sendto error", status);
  401. return -170;
  402. }
  403. /* Check if packet is received. */
  404. timeout.sec = 1; timeout.msec = 0;
  405. #ifdef PJ_SYMBIAN
  406. pj_symbianos_poll(-1, 1000);
  407. #else
  408. pj_ioqueue_poll(ioqueue, &timeout);
  409. #endif
  410. if (packet_cnt != 1) {
  411. return -180;
  412. }
  413. /* Just to make sure things are settled.. */
  414. pj_thread_sleep(100);
  415. /* Start reading again. */
  416. bytes = sizeof(recvbuf);
  417. status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);
  418. if (status != PJ_EPENDING) {
  419. app_perror("Expecting PJ_EPENDING, but got this", status);
  420. return -190;
  421. }
  422. /* Reset packet counter */
  423. packet_cnt = 0;
  424. /* Send one packet. */
  425. bytes = sizeof(sendbuf);
  426. status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,
  427. &addr, sizeof(addr));
  428. if (status != PJ_SUCCESS) {
  429. app_perror("sendto error", status);
  430. return -200;
  431. }
  432. /* Now unregister and close socket. */
  433. status = pj_ioqueue_unregister(key);
  434. if (status != PJ_SUCCESS) {
  435. app_perror("pj_ioqueue_unregister error", status);
  436. return -201;
  437. }
  438. /* Poll ioqueue. */
  439. for (i=0; i<10; ++i) {
  440. #ifdef PJ_SYMBIAN
  441. pj_symbianos_poll(-1, 100);
  442. #else
  443. timeout.sec = 0; timeout.msec = 100;
  444. pj_ioqueue_poll(ioqueue, &timeout);
  445. #endif
  446. }
  447. /* Must NOT receive any packets after socket is closed! */
  448. if (packet_cnt > 0) {
  449. PJ_LOG(3,(THIS_FILE, "....errror: not expecting to receive packet "
  450. "after socket has been closed"));
  451. return -210;
  452. }
  453. /* Now unregister again, immediately and after PJ_IOQUEUE_KEY_FREE_DELAY.
  454. * It should return error, and most importantly, it must not crash.. */
  455. for (i=0; i<2; ++i) {
  456. status = pj_ioqueue_unregister(key);
  457. /*
  458. * as it turns out, double unregistration returns PJ_SUCCESS
  459. if (status == PJ_SUCCESS) {
  460. PJ_LOG(1, (THIS_FILE,
  461. "Expecting pj_ioqueue_unregister() error (i=%d)", i));
  462. return -220;
  463. }
  464. */
  465. pj_thread_sleep(PJ_IOQUEUE_KEY_FREE_DELAY + 100);
  466. }
  467. /*
  468. * Second stage of the test. Register another socket. Then unregister using
  469. * the previous key.
  470. */
  471. status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, RPORT, &rsock2);
  472. if (status != PJ_SUCCESS) {
  473. app_perror("Error initializing socket (2)", status);
  474. return -330;
  475. }
  476. /* Register rsock to ioqueue. */
  477. status = pj_ioqueue_register_sock(pool, ioqueue, rsock2, user_data2,
  478. &cb, &key2);
  479. if (status != PJ_SUCCESS) {
  480. app_perror("Error registering to ioqueue (2)", status);
  481. return -340;
  482. }
  483. /* We shouldn't be able to unregister using the first key. Or should we?
  484. *
  485. * So basically we're simulating buggy application that is unregistering
  486. * an old key.
  487. *
  488. * With current ioqueue implementation, it will return success because
  489. * "key" is the same as "key2" (because ioueue's max_handles is 1 and due to
  490. * PJ_IOQUEUE_HAS_SAFE_UNREG in ioqueue). But what is the expected status
  491. * anyway? Should it return an error? Ideally, I think so. But since we
  492. * can't do that, I'm putting this as an INFO to remind us about this
  493. * behavior.
  494. */
  495. status = pj_ioqueue_unregister(key);
  496. if (status == PJ_SUCCESS) {
  497. PJ_LOG(3,(THIS_FILE, "....info: unregistering dead key was successful"));
  498. }
  499. /* Success */
  500. pj_sock_close(ssock);
  501. pj_ioqueue_destroy(ioqueue);
  502. pj_pool_release(pool);
  503. return 0;
  504. }
  505. /*
  506. * Testing with many handles.
  507. * This will just test registering PJ_IOQUEUE_MAX_HANDLES count
  508. * of sockets to the ioqueue.
  509. */
  510. static int many_handles_test(const pj_ioqueue_cfg *cfg)
  511. {
  512. enum { MAX = PJ_IOQUEUE_MAX_HANDLES };
  513. pj_pool_t *pool;
  514. pj_ioqueue_t *ioqueue;
  515. pj_sock_t *sock;
  516. pj_ioqueue_key_t **key;
  517. pj_status_t rc;
  518. int count, i; /* must be signed */
  519. PJ_LOG(3,(THIS_FILE,"...testing with so many handles"));
  520. pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
  521. if (!pool)
  522. return PJ_ENOMEM;
  523. key = (pj_ioqueue_key_t**)
  524. pj_pool_alloc(pool, MAX*sizeof(pj_ioqueue_key_t*));
  525. sock = (pj_sock_t*) pj_pool_alloc(pool, MAX*sizeof(pj_sock_t));
  526. /* Create IOQueue */
  527. rc = pj_ioqueue_create2(pool, MAX, cfg, &ioqueue);
  528. if (rc != PJ_SUCCESS || ioqueue == NULL) {
  529. app_perror("...error in pj_ioqueue_create", rc);
  530. return -10;
  531. }
  532. /* Register as many sockets. */
  533. for (count=0; count<MAX; ++count) {
  534. sock[count] = PJ_INVALID_SOCKET;
  535. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &sock[count]);
  536. if (rc != PJ_SUCCESS || sock[count] == PJ_INVALID_SOCKET) {
  537. PJ_LOG(3,(THIS_FILE, "....unable to create %d-th socket, rc=%d",
  538. count, rc));
  539. break;
  540. }
  541. key[count] = NULL;
  542. rc = pj_ioqueue_register_sock(pool, ioqueue, sock[count],
  543. NULL, &test_cb, &key[count]);
  544. if (rc != PJ_SUCCESS || key[count] == NULL) {
  545. PJ_LOG(3,(THIS_FILE, "....unable to register %d-th socket, rc=%d",
  546. count, rc));
  547. return -30;
  548. }
  549. }
  550. /* Test complete. */
  551. /* Now deregister and close all handles. */
  552. /* NOTE for RTEMS:
  553. * It seems that the order of close(sock) is pretty important here.
  554. * If we close the sockets with the same order as when they were created,
  555. * RTEMS doesn't seem to reuse the sockets, thus next socket created
  556. * will have descriptor higher than the last socket created.
  557. * If we close the sockets in the reverse order, then the descriptor will
  558. * get reused.
  559. * This used to cause problem with select ioqueue, since the ioqueue
  560. * always gives FD_SETSIZE for the first select() argument. This ioqueue
  561. * behavior can be changed with setting PJ_SELECT_NEEDS_NFDS macro.
  562. */
  563. for (i=count-1; i>=0; --i) {
  564. ///for (i=0; i<count; ++i) {
  565. rc = pj_ioqueue_unregister(key[i]);
  566. if (rc != PJ_SUCCESS) {
  567. app_perror("...error in pj_ioqueue_unregister", rc);
  568. }
  569. }
  570. rc = pj_ioqueue_destroy(ioqueue);
  571. if (rc != PJ_SUCCESS) {
  572. app_perror("...error in pj_ioqueue_destroy", rc);
  573. }
  574. pj_pool_release(pool);
  575. PJ_LOG(3,(THIS_FILE,"....many_handles_test() ok"));
  576. return 0;
  577. }
  578. #if PJ_HAS_THREADS
  579. typedef struct parallel_recv_data
  580. {
  581. unsigned buffer;
  582. pj_ssize_t len;
  583. } parallel_recv_data;
  584. static void on_read_complete2(pj_ioqueue_key_t *key,
  585. pj_ioqueue_op_key_t *op_key,
  586. pj_ssize_t bytes_read)
  587. {
  588. unsigned *p_packet_cnt = (unsigned*) pj_ioqueue_get_user_data(key);
  589. parallel_recv_data *ud = (parallel_recv_data*)op_key->user_data;
  590. if (bytes_read < 0) {
  591. pj_status_t status = (pj_status_t)-bytes_read;
  592. if (status==PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
  593. TRACE__((THIS_FILE, "......recv() fail with status=%d, retrying",
  594. status));
  595. ud->len = bytes_read = sizeof(ud->buffer);
  596. status = pj_ioqueue_recv(key, op_key, &ud->buffer, &ud->len, 0);
  597. if (status == PJ_EPENDING)
  598. return;
  599. }
  600. if (status != PJ_SUCCESS) {
  601. PJ_PERROR(3,(THIS_FILE, status, "......status=%d", status));
  602. return;
  603. }
  604. }
  605. assert (bytes_read==sizeof(unsigned));
  606. if (ud->buffer != *p_packet_cnt) {
  607. PJ_LOG(1,(THIS_FILE, "......error: invalid packet sequence "
  608. "(expecting %d, got %d)",
  609. *p_packet_cnt, ud->buffer));
  610. } else {
  611. TRACE__((THIS_FILE, "......recv() sequence %d", ud->buffer));
  612. }
  613. (*p_packet_cnt)++;
  614. }
  615. typedef struct parallel_thread_data
  616. {
  617. pj_ioqueue_t *ioqueue;
  618. pj_bool_t quit_flag;
  619. unsigned id, timeout, wakeup_cnt, event_cnt, err_cnt;
  620. } parallel_thread_data;
  621. static int parallel_worker_thread(void *p)
  622. {
  623. parallel_thread_data *arg = (parallel_thread_data*)p;
  624. pj_time_val t_end;
  625. pj_gettickcount(&t_end);
  626. t_end.sec += arg->timeout;
  627. while (!arg->quit_flag) {
  628. pj_time_val timeout = {0, 0};
  629. int rc;
  630. timeout.sec = arg->timeout;
  631. rc = pj_ioqueue_poll(arg->ioqueue, &timeout);
  632. if (rc >= 1) {
  633. assert(rc==1); /* we should receive packet one by one! */
  634. TRACE__((THIS_FILE, "......thread %d got event (total=%d)",
  635. arg->id, arg->wakeup_cnt));
  636. ++arg->wakeup_cnt;
  637. arg->event_cnt += rc;
  638. } else if (rc == 0) {
  639. if (!arg->quit_flag) {
  640. TRACE__((THIS_FILE, "......thread %d wakeup, no event (total=%d)",
  641. arg->id, arg->wakeup_cnt));
  642. ++arg->wakeup_cnt;
  643. }
  644. } else if (rc < 0) {
  645. TRACE__((THIS_FILE, "......thread %d got error", arg->id));
  646. ++arg->wakeup_cnt;
  647. ++arg->err_cnt;
  648. }
  649. }
  650. return 0;
  651. }
  652. /*
  653. * Parallel recv test. Test this scenario:
  654. * - create socket
  655. * - spawn N ioqueue_recv() operations on N threads
  656. * - repeat N times:
  657. * - send one packet to the socket
  658. * - on recv callback, do not re-invoke ioqueue_recv()
  659. * Expected result: we should receive N packets
  660. */
  661. static int parallel_recv_test(const pj_ioqueue_cfg *cfg)
  662. {
  663. pj_pool_t *pool;
  664. pj_sock_t ssock = PJ_INVALID_SOCKET, csock = PJ_INVALID_SOCKET;
  665. pj_ioqueue_t *ioqueue = NULL;
  666. pj_ioqueue_key_t *skey = NULL;
  667. pj_ioqueue_callback cb;
  668. enum {
  669. ASYNC_CNT = 16,
  670. PKT_SIZE = 16,
  671. SEND_DELAY_MSECS = 250,
  672. TIMEOUT_SECS = (SEND_DELAY_MSECS*ASYNC_CNT/1000)+2,
  673. };
  674. typedef int packet_t;
  675. pj_thread_t *threads[ASYNC_CNT];
  676. parallel_thread_data thread_datas[ASYNC_CNT], threads_total;
  677. pj_ioqueue_op_key_t recv_ops[ASYNC_CNT];
  678. parallel_recv_data recv_datas[ASYNC_CNT];
  679. unsigned i, async_send = 0, recv_packet_count = 0;
  680. int retcode;
  681. pool = pj_pool_create(mem, "test", 4000, 4000, NULL);
  682. if (!pool) {
  683. app_perror("Unable to create pool", PJ_ENOMEM);
  684. return -100;
  685. }
  686. CHECK(-110, app_socketpair(pj_AF_INET(), pj_SOCK_STREAM(), 0,
  687. &ssock, &csock));
  688. CHECK(-120, pj_ioqueue_create2(pool, 2, cfg, &ioqueue));
  689. pj_bzero(&cb, sizeof(cb));
  690. cb.on_read_complete = &on_read_complete2;
  691. CHECK(-130, pj_ioqueue_register_sock(pool, ioqueue, ssock, &recv_packet_count,
  692. &cb, &skey));
  693. /* spawn parallel recv()s */
  694. pj_bzero(recv_datas, sizeof(recv_datas));
  695. for (i=0; i<ASYNC_CNT; ++i) {
  696. pj_ioqueue_op_key_init(&recv_ops[i], sizeof(pj_ioqueue_op_key_t));
  697. recv_ops[i].user_data = &recv_datas[i];
  698. recv_datas[i].len = sizeof(packet_t);
  699. CHECK(-140, pj_ioqueue_recv(skey, &recv_ops[i], &recv_datas[i].buffer,
  700. &recv_datas[i].len, 0));
  701. }
  702. /* spawn polling threads */
  703. pj_bzero(thread_datas, sizeof(thread_datas));
  704. for (i=0; i<ASYNC_CNT; ++i) {
  705. parallel_thread_data *arg = &thread_datas[i];
  706. arg->ioqueue = ioqueue;
  707. arg->id = i;
  708. arg->timeout = TIMEOUT_SECS;
  709. CHECK(-150, pj_thread_create(pool, "parallel_thread",
  710. parallel_worker_thread, arg,
  711. 0, 0,&threads[i]));
  712. }
  713. /* now slowly send packet one by one. Let's hope the OS doesn't drop
  714. * our packet, since it's UDP
  715. */
  716. pj_thread_sleep(100); /* allow thread to start */
  717. for (i=0; i<ASYNC_CNT; ++i) {
  718. packet_t send_buf = i;
  719. pj_ssize_t len = sizeof(send_buf);
  720. pj_status_t status;
  721. pj_thread_sleep((i>0)*SEND_DELAY_MSECS);
  722. TRACE__((THIS_FILE, "....sending"));
  723. status = pj_sock_send(csock, &send_buf, &len, 0);
  724. if (status==PJ_EPENDING) {
  725. ++async_send;
  726. TRACE__((THIS_FILE, "......(was async sent)"));
  727. } else if (status != PJ_SUCCESS) {
  728. PJ_PERROR(1,(THIS_FILE, status, "......send error"));
  729. retcode = -160;
  730. goto on_return;
  731. }
  732. }
  733. /* Signal threads that it's done */
  734. for (i=0; i<ASYNC_CNT; ++i) {
  735. parallel_thread_data *arg = &thread_datas[i];
  736. arg->quit_flag = 1;
  737. }
  738. /* Wait until all threads quits */
  739. for (i=0; i<ASYNC_CNT; ++i) {
  740. CHECK(-170, pj_thread_join(threads[i]));
  741. CHECK(-180, pj_thread_destroy(threads[i]));
  742. }
  743. /* Display thread statistics */
  744. PJ_LOG(3,(THIS_FILE, "....Threads statistics:"));
  745. PJ_LOG(3,(THIS_FILE, " ============================="));
  746. PJ_LOG(3,(THIS_FILE, " Thread Wakeups Events Errors"));
  747. PJ_LOG(3,(THIS_FILE, " ============================="));
  748. pj_bzero(&threads_total, sizeof(threads_total));
  749. for (i=0; i<ASYNC_CNT; ++i) {
  750. parallel_thread_data *arg = &thread_datas[i];
  751. threads_total.wakeup_cnt += arg->wakeup_cnt;
  752. threads_total.event_cnt += arg->event_cnt;
  753. threads_total.err_cnt += arg->err_cnt;
  754. PJ_LOG(3,(THIS_FILE, " %6d %6d %6d %6d",
  755. arg->id, arg->wakeup_cnt, arg->event_cnt, arg->err_cnt));
  756. }
  757. retcode = 0;
  758. /* Analyze results */
  759. //assert(threads_total.event_cnt == recv_packet_count);
  760. if (recv_packet_count != ASYNC_CNT) {
  761. PJ_LOG(1,(THIS_FILE, "....error: rx packet count is %d (expecting %d)",
  762. recv_packet_count, ASYNC_CNT));
  763. retcode = -500;
  764. }
  765. if (threads_total.wakeup_cnt > ASYNC_CNT+async_send) {
  766. PJ_LOG(3,(THIS_FILE, "....info: total wakeup count is %d "
  767. "(the perfect count is %d). This shows that "
  768. "threads are woken up without getting any events",
  769. threads_total.wakeup_cnt, ASYNC_CNT+async_send));
  770. }
  771. if (threads_total.err_cnt > 0) {
  772. PJ_LOG(3,(THIS_FILE, "....info: total error count is %d "
  773. "(it should be 0)",
  774. threads_total.err_cnt));
  775. }
  776. if (retcode==0)
  777. PJ_LOG(3,(THIS_FILE, "....success"));
  778. on_return:
  779. if (skey)
  780. pj_ioqueue_unregister(skey);
  781. if (csock != PJ_INVALID_SOCKET)
  782. pj_sock_close(csock);
  783. if (ioqueue)
  784. pj_ioqueue_destroy(ioqueue);
  785. pj_pool_release(pool);
  786. return retcode;
  787. }
  788. #endif /* PJ_HAS_THREADS */
  789. /*
  790. * Multi-operation test.
  791. */
  792. /*
  793. * Benchmarking IOQueue
  794. */
  795. static int bench_test(const pj_ioqueue_cfg *cfg, int bufsize,
  796. int inactive_sock_count)
  797. {
  798. pj_sock_t ssock=-1, csock=-1;
  799. pj_sockaddr_in addr;
  800. pj_pool_t *pool = NULL;
  801. pj_sock_t *inactive_sock=NULL;
  802. pj_ioqueue_op_key_t *inactive_read_op;
  803. char *send_buf, *recv_buf;
  804. pj_ioqueue_t *ioque = NULL;
  805. pj_ioqueue_key_t *skey, *ckey, *keys[SOCK_INACTIVE_MAX+2];
  806. pj_timestamp t1, t2, t_elapsed;
  807. int rc=0, i; /* i must be signed */
  808. pj_str_t temp;
  809. char errbuf[PJ_ERR_MSG_SIZE];
  810. TRACE__((THIS_FILE, " bench test %d", inactive_sock_count));
  811. // Create pool.
  812. pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
  813. // Allocate buffers for send and receive.
  814. send_buf = (char*)pj_pool_alloc(pool, bufsize);
  815. recv_buf = (char*)pj_pool_alloc(pool, bufsize);
  816. // Allocate sockets for sending and receiving.
  817. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &ssock);
  818. if (rc == PJ_SUCCESS) {
  819. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &csock);
  820. } else
  821. csock = PJ_INVALID_SOCKET;
  822. if (rc != PJ_SUCCESS) {
  823. app_perror("...error: pj_sock_socket()", rc);
  824. goto on_error;
  825. }
  826. // Bind server socket.
  827. pj_bzero(&addr, sizeof(addr));
  828. addr.sin_family = pj_AF_INET();
  829. addr.sin_port = pj_htons(PORT);
  830. if (pj_sock_bind(ssock, &addr, sizeof(addr)))
  831. goto on_error;
  832. pj_assert(inactive_sock_count+2 <= PJ_IOQUEUE_MAX_HANDLES);
  833. // Create I/O Queue.
  834. rc = pj_ioqueue_create2(pool, PJ_IOQUEUE_MAX_HANDLES, cfg, &ioque);
  835. if (rc != PJ_SUCCESS) {
  836. app_perror("...error: pj_ioqueue_create()", rc);
  837. goto on_error;
  838. }
  839. // Allocate inactive sockets, and bind them to some arbitrary address.
  840. // Then register them to the I/O queue, and start a read operation.
  841. inactive_sock = (pj_sock_t*)pj_pool_alloc(pool,
  842. inactive_sock_count*sizeof(pj_sock_t));
  843. inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool,
  844. inactive_sock_count*sizeof(pj_ioqueue_op_key_t));
  845. pj_bzero(&addr, sizeof(addr));
  846. addr.sin_family = pj_AF_INET();
  847. for (i=0; i<inactive_sock_count; ++i) {
  848. pj_ssize_t bytes;
  849. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &inactive_sock[i]);
  850. if (rc != PJ_SUCCESS || inactive_sock[i] < 0) {
  851. app_perror("...error: pj_sock_socket()", rc);
  852. goto on_error;
  853. }
  854. if ((rc=pj_sock_bind(inactive_sock[i], &addr, sizeof(addr))) != 0) {
  855. pj_sock_close(inactive_sock[i]);
  856. inactive_sock[i] = PJ_INVALID_SOCKET;
  857. app_perror("...error: pj_sock_bind()", rc);
  858. goto on_error;
  859. }
  860. rc = pj_ioqueue_register_sock(pool, ioque, inactive_sock[i],
  861. NULL, &test_cb, &keys[i]);
  862. if (rc != PJ_SUCCESS) {
  863. pj_sock_close(inactive_sock[i]);
  864. inactive_sock[i] = PJ_INVALID_SOCKET;
  865. app_perror("...error(1): pj_ioqueue_register_sock()", rc);
  866. PJ_LOG(3,(THIS_FILE, "....i=%d", i));
  867. goto on_error;
  868. }
  869. bytes = bufsize;
  870. pj_ioqueue_op_key_init(&inactive_read_op[i],
  871. sizeof(inactive_read_op[i]));
  872. rc = pj_ioqueue_recv(keys[i], &inactive_read_op[i], recv_buf, &bytes, 0);
  873. if (rc != PJ_EPENDING) {
  874. pj_sock_close(inactive_sock[i]);
  875. inactive_sock[i] = PJ_INVALID_SOCKET;
  876. app_perror("...error: pj_ioqueue_read()", rc);
  877. goto on_error;
  878. }
  879. }
  880. // Register server and client socket.
  881. // We put this after inactivity socket, hopefully this can represent the
  882. // worst waiting time.
  883. rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL,
  884. &test_cb, &skey);
  885. if (rc != PJ_SUCCESS) {
  886. app_perror("...error(2): pj_ioqueue_register_sock()", rc);
  887. goto on_error;
  888. }
  889. rc = pj_ioqueue_register_sock(pool, ioque, csock, NULL,
  890. &test_cb, &ckey);
  891. if (rc != PJ_SUCCESS) {
  892. app_perror("...error(3): pj_ioqueue_register_sock()", rc);
  893. goto on_error;
  894. }
  895. // Set destination address to send the packet.
  896. pj_sockaddr_in_init(&addr, pj_cstr(&temp, "127.0.0.1"), PORT);
  897. // Test loop.
  898. t_elapsed.u64 = 0;
  899. for (i=0; i<LOOP; ++i) {
  900. pj_ssize_t bytes;
  901. pj_ioqueue_op_key_t read_op, write_op;
  902. // Randomize send buffer.
  903. pj_create_random_string(send_buf, bufsize);
  904. // Init operation keys.
  905. pj_ioqueue_op_key_init(&read_op, sizeof(read_op));
  906. pj_ioqueue_op_key_init(&write_op, sizeof(write_op));
  907. // Start reading on the server side.
  908. bytes = bufsize;
  909. rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);
  910. if (rc != PJ_EPENDING) {
  911. app_perror("...error: pj_ioqueue_read()", rc);
  912. break;
  913. }
  914. // Starts send on the client side.
  915. bytes = bufsize;
  916. rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0,
  917. &addr, sizeof(addr));
  918. if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
  919. app_perror("...error: pj_ioqueue_write()", rc);
  920. break;
  921. }
  922. if (rc == PJ_SUCCESS) {
  923. if (bytes < 0) {
  924. app_perror("...error: pj_ioqueue_sendto()",(pj_status_t)-bytes);
  925. break;
  926. }
  927. }
  928. // Begin time.
  929. pj_get_timestamp(&t1);
  930. // Poll the queue until we've got completion event in the server side.
  931. callback_read_key = NULL;
  932. callback_read_size = 0;
  933. TRACE__((THIS_FILE, " waiting for key = %p", skey));
  934. do {
  935. pj_time_val timeout = { 1, 0 };
  936. #ifdef PJ_SYMBIAN
  937. rc = pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
  938. #else
  939. rc = pj_ioqueue_poll(ioque, &timeout);
  940. #endif
  941. TRACE__((THIS_FILE, " poll rc=%d", rc));
  942. } while (rc >= 0 && callback_read_key != skey);
  943. // End time.
  944. pj_get_timestamp(&t2);
  945. t_elapsed.u64 += (t2.u64 - t1.u64);
  946. if (rc < 0) {
  947. app_perror(" error: pj_ioqueue_poll", -rc);
  948. break;
  949. }
  950. // Compare recv buffer with send buffer.
  951. if (callback_read_size != bufsize ||
  952. pj_memcmp(send_buf, recv_buf, bufsize))
  953. {
  954. rc = -10;
  955. PJ_LOG(3,(THIS_FILE, " error: size/buffer mismatch"));
  956. break;
  957. }
  958. // Poll until all events are exhausted, before we start the next loop.
  959. do {
  960. pj_time_val timeout = { 0, 10 };
  961. #ifdef PJ_SYMBIAN
  962. PJ_UNUSED_ARG(timeout);
  963. rc = pj_symbianos_poll(-1, 100);
  964. #else
  965. rc = pj_ioqueue_poll(ioque, &timeout);
  966. #endif
  967. } while (rc>0);
  968. rc = 0;
  969. }
  970. // Print results
  971. if (rc == 0) {
  972. pj_timestamp tzero;
  973. pj_uint32_t usec_delay;
  974. tzero.u32.hi = tzero.u32.lo = 0;
  975. usec_delay = pj_elapsed_usec( &tzero, &t_elapsed);
  976. PJ_LOG(3, (THIS_FILE, "...%10d %15d % 9d",
  977. bufsize, inactive_sock_count, usec_delay));
  978. } else {
  979. PJ_LOG(2, (THIS_FILE, "...ERROR rc=%d (buf:%d, fds:%d)",
  980. rc, bufsize, inactive_sock_count+2));
  981. }
  982. // Cleaning up.
  983. for (i=inactive_sock_count-1; i>=0; --i) {
  984. pj_ioqueue_unregister(keys[i]);
  985. }
  986. pj_ioqueue_unregister(skey);
  987. pj_ioqueue_unregister(ckey);
  988. pj_ioqueue_destroy(ioque);
  989. pj_pool_release( pool);
  990. return rc;
  991. on_error:
  992. pj_strerror(pj_get_netos_error(), errbuf, sizeof(errbuf));
  993. PJ_LOG(1,(THIS_FILE, "...ERROR: %s", errbuf));
  994. if (ssock >= 0)
  995. pj_sock_close(ssock);
  996. if (csock >= 0)
  997. pj_sock_close(csock);
  998. for (i=0; i<inactive_sock_count && inactive_sock &&
  999. inactive_sock[i]!=PJ_INVALID_SOCKET; ++i)
  1000. {
  1001. pj_sock_close(inactive_sock[i]);
  1002. }
  1003. if (ioque != NULL)
  1004. pj_ioqueue_destroy(ioque);
  1005. pj_pool_release( pool);
  1006. return -1;
  1007. }
  1008. static int udp_ioqueue_test_imp(const pj_ioqueue_cfg *cfg)
  1009. {
  1010. int status;
  1011. int bufsize, sock_count;
  1012. char title[64];
  1013. pj_ansi_snprintf(title, sizeof(title), "%s (concur:%d, epoll_flags:0x%x)",
  1014. pj_ioqueue_name(), cfg->default_concurrency,
  1015. cfg->epoll_flags);
  1016. //goto pass1;
  1017. PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", title));
  1018. if ((status=compliance_test(cfg)) != 0) {
  1019. return status;
  1020. }
  1021. PJ_LOG(3, (THIS_FILE, "....compliance test ok"));
  1022. PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", title));
  1023. if ((status=unregister_test(cfg)) != 0) {
  1024. return status;
  1025. }
  1026. PJ_LOG(3, (THIS_FILE, "....unregister test ok"));
  1027. if ((status=many_handles_test(cfg)) != 0) {
  1028. return status;
  1029. }
  1030. //return 0;
  1031. PJ_LOG(4, (THIS_FILE, "...benchmarking different buffer size:"));
  1032. PJ_LOG(4, (THIS_FILE, "... note: buf=bytes sent, fds=# of fds, "
  1033. "elapsed=in timer ticks"));
  1034. //pass1:
  1035. PJ_LOG(3, (THIS_FILE, "...Benchmarking poll times for %s:", title));
  1036. PJ_LOG(3, (THIS_FILE, "...====================================="));
  1037. PJ_LOG(3, (THIS_FILE, "...Buf.size #inactive-socks Time/poll"));
  1038. PJ_LOG(3, (THIS_FILE, "... (bytes) (nanosec)"));
  1039. PJ_LOG(3, (THIS_FILE, "...====================================="));
  1040. //goto pass2;
  1041. for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) {
  1042. if ((status=bench_test(cfg, bufsize, SOCK_INACTIVE_MIN)) != 0)
  1043. return status;
  1044. }
  1045. //pass2:
  1046. bufsize = 512;
  1047. for (sock_count=SOCK_INACTIVE_MIN+2;
  1048. sock_count<=SOCK_INACTIVE_MAX+2;
  1049. sock_count *= 2)
  1050. {
  1051. //PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));
  1052. if ((status=bench_test(cfg, bufsize, sock_count-2)) != 0)
  1053. return status;
  1054. }
  1055. return 0;
  1056. }
  1057. int udp_ioqueue_test()
  1058. {
  1059. pj_ioqueue_epoll_flag epoll_flags[] = {
  1060. #if PJ_HAS_LINUX_EPOLL
  1061. PJ_IOQUEUE_EPOLL_AUTO,
  1062. PJ_IOQUEUE_EPOLL_EXCLUSIVE,
  1063. PJ_IOQUEUE_EPOLL_ONESHOT,
  1064. 0,
  1065. #else
  1066. PJ_IOQUEUE_EPOLL_AUTO,
  1067. #endif
  1068. };
  1069. pj_bool_t concurs[] = { PJ_TRUE, PJ_FALSE };
  1070. int i, rc, err = 0;
  1071. for (i=0; i<(int)PJ_ARRAY_SIZE(epoll_flags); ++i) {
  1072. pj_ioqueue_cfg cfg;
  1073. pj_ioqueue_cfg_default(&cfg);
  1074. cfg.epoll_flags = epoll_flags[i];
  1075. PJ_LOG(3, (THIS_FILE, "..%s UDP compliance test, epoll_flags=0x%x",
  1076. pj_ioqueue_name(), cfg.epoll_flags));
  1077. rc = udp_ioqueue_test_imp(&cfg);
  1078. if (rc != 0 && err==0)
  1079. err = rc;
  1080. }
  1081. for (i=0; i<(int)PJ_ARRAY_SIZE(concurs); ++i) {
  1082. pj_ioqueue_cfg cfg;
  1083. pj_ioqueue_cfg_default(&cfg);
  1084. cfg.default_concurrency = concurs[i];
  1085. PJ_LOG(3, (THIS_FILE, "..%s UDP compliance test, concurrency=%d",
  1086. pj_ioqueue_name(), cfg.default_concurrency));
  1087. rc = udp_ioqueue_test_imp(&cfg);
  1088. if (rc != 0 && err==0)
  1089. err = rc;
  1090. }
  1091. #if PJ_HAS_THREADS
  1092. for (i=0; i<(int)PJ_ARRAY_SIZE(epoll_flags); ++i) {
  1093. pj_ioqueue_cfg cfg;
  1094. pj_ioqueue_cfg_default(&cfg);
  1095. cfg.epoll_flags = epoll_flags[i];
  1096. PJ_LOG(3, (THIS_FILE, "..%s UDP parallel compliance test, epoll_flags=0x%x",
  1097. pj_ioqueue_name(), cfg.epoll_flags));
  1098. rc = parallel_recv_test(&cfg);
  1099. if (rc != 0 && err==0)
  1100. err = rc;
  1101. }
  1102. #endif
  1103. return err;
  1104. }
  1105. #else
  1106. /* To prevent warning about "translation unit is empty"
  1107. * when this test is disabled.
  1108. */
  1109. int dummy_uiq_udp;
  1110. #endif /* INCLUDE_UDP_IOQUEUE_TEST */