ioq_tcp.c 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999
  1. /*
  2. * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
  3. * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program; if not, write to the Free Software
  17. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  18. */
  19. #include "test.h"
  20. /**
  21. * \page page_pjlib_ioqueue_tcp_test Test: I/O Queue (TCP)
  22. *
  23. * This file implements tests for the functionality of the
  24. * I/O queue when TCP sockets are used.
  25. *
  26. *
  27. * This file is <b>pjlib-test/ioq_tcp.c</b>
  28. *
  29. * \include pjlib-test/ioq_tcp.c
  30. */
  31. #if INCLUDE_TCP_IOQUEUE_TEST
  32. #include <pjlib.h>
  33. #if PJ_HAS_TCP
  /* Sender name passed to PJ_LOG() by the tests in this file. */
  34. #define THIS_FILE "test_tcp"
  /* Port on 127.0.0.1 that compliance_test_1() connects to, expecting the
   * connection attempt to fail. */
  35. #define NON_EXISTANT_PORT 50123
  /* NOTE(review): LOOP is not referenced by the code visible in this file. */
  36. #define LOOP 100
  /* Size of the send/receive buffers allocated by the compliance tests. */
  37. #define BUF_MIN_SIZE 32
  /* Only used to compute POOL_SIZE in the code visible here. */
  38. #define BUF_MAX_SIZE 2048
  39. #define SOCK_INACTIVE_MIN (4-2)
  40. #define SOCK_INACTIVE_MAX (PJ_IOQUEUE_MAX_HANDLES - 2)
  /* Initial size given to pj_pool_create() by each compliance test. */
  41. #define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)
  /* Values most recently reported to the ioqueue callbacks below. The tests
   * reset them before polling and inspect them afterwards to verify which
   * callback fired and with which arguments. */
  42. static pj_ssize_t callback_read_size,
  43. callback_write_size,
  44. callback_accept_status,
  45. callback_connect_status;
  /* Number of callback invocations recorded (incremented by the callbacks). */
  46. static unsigned callback_call_count;
  /* Key passed to the most recent callback of each kind. */
  47. static pj_ioqueue_key_t *callback_read_key,
  48. *callback_write_key,
  49. *callback_accept_key,
  50. *callback_connect_key;
  /* Operation key passed to the most recent read/write/accept callback. */
  51. static pj_ioqueue_op_key_t *callback_read_op,
  52. *callback_write_op,
  53. *callback_accept_op;
  54. static void on_ioqueue_read(pj_ioqueue_key_t *key,
  55. pj_ioqueue_op_key_t *op_key,
  56. pj_ssize_t bytes_read)
  57. {
  58. callback_read_key = key;
  59. callback_read_op = op_key;
  60. callback_read_size = bytes_read;
  61. callback_call_count++;
  62. }
  63. static void on_ioqueue_write(pj_ioqueue_key_t *key,
  64. pj_ioqueue_op_key_t *op_key,
  65. pj_ssize_t bytes_written)
  66. {
  67. callback_write_key = key;
  68. callback_write_op = op_key;
  69. callback_write_size = bytes_written;
  70. callback_call_count++;
  71. }
  72. static void on_ioqueue_accept(pj_ioqueue_key_t *key,
  73. pj_ioqueue_op_key_t *op_key,
  74. pj_sock_t sock,
  75. int status)
  76. {
  77. if (sock == PJ_INVALID_SOCKET) {
  78. if (status != PJ_SUCCESS) {
  79. /* Ignore. Could be blocking error */
  80. app_perror(".....warning: received error in on_ioqueue_accept() callback",
  81. status);
  82. } else {
  83. callback_accept_status = -61;
  84. PJ_LOG(3,("", "..... on_ioqueue_accept() callback was given "
  85. "invalid socket and status is %d", status));
  86. }
  87. } else {
  88. pj_sockaddr addr;
  89. int client_addr_len;
  90. client_addr_len = sizeof(addr);
  91. status = pj_sock_getsockname(sock, &addr, &client_addr_len);
  92. if (status != PJ_SUCCESS) {
  93. app_perror("...ERROR in pj_sock_getsockname()", status);
  94. }
  95. callback_accept_key = key;
  96. callback_accept_op = op_key;
  97. callback_accept_status = status;
  98. callback_call_count++;
  99. }
  100. }
  101. static void on_ioqueue_connect(pj_ioqueue_key_t *key, int status)
  102. {
  103. callback_connect_key = key;
  104. callback_connect_status = status;
  105. callback_call_count++;
  106. }
  107. static pj_ioqueue_callback test_cb =
  108. {
  109. &on_ioqueue_read,
  110. &on_ioqueue_write,
  111. &on_ioqueue_accept,
  112. &on_ioqueue_connect,
  113. };
  114. static int send_recv_test(pj_ioqueue_t *ioque,
  115. pj_ioqueue_key_t *skey,
  116. pj_ioqueue_key_t *ckey,
  117. void *send_buf,
  118. void *recv_buf,
  119. pj_ssize_t bufsize,
  120. pj_timestamp *t_elapsed)
  121. {
  122. pj_status_t status;
  123. pj_ssize_t bytes;
  124. pj_time_val timeout;
  125. pj_timestamp t1, t2;
  126. int i, pending_op = 0;
  127. pj_ioqueue_op_key_t read_op, write_op;
  128. // Init operation keys.
  129. pj_ioqueue_op_key_init(&read_op, sizeof(read_op));
  130. pj_ioqueue_op_key_init(&write_op, sizeof(write_op));
  131. // Start reading on the server side.
  132. bytes = bufsize;
  133. status = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);
  134. if (status != PJ_SUCCESS && status != PJ_EPENDING) {
  135. app_perror("...pj_ioqueue_recv error", status);
  136. return -100;
  137. }
  138. if (status == PJ_EPENDING)
  139. ++pending_op;
  140. else {
  141. /* Does not expect to return error or immediate data. */
  142. return -115;
  143. }
  144. // Randomize send buffer.
  145. pj_create_random_string((char*)send_buf, bufsize);
  146. // Starts send on the client side.
  147. bytes = bufsize;
  148. status = pj_ioqueue_send(ckey, &write_op, send_buf, &bytes, 0);
  149. if (status != PJ_SUCCESS && bytes != PJ_EPENDING) {
  150. return -120;
  151. }
  152. if (status == PJ_EPENDING) {
  153. ++pending_op;
  154. }
  155. // Begin time.
  156. pj_get_timestamp(&t1);
  157. // Reset indicators
  158. callback_read_size = callback_write_size = 0;
  159. callback_read_key = callback_write_key = NULL;
  160. callback_read_op = callback_write_op = NULL;
  161. // Poll the queue until we've got completion event in the server side.
  162. status = 0;
  163. while (pending_op > 0) {
  164. timeout.sec = 1; timeout.msec = 0;
  165. #ifdef PJ_SYMBIAN
  166. PJ_UNUSED_ARG(ioque);
  167. status = pj_symbianos_poll(-1, 1000);
  168. #else
  169. status = pj_ioqueue_poll(ioque, &timeout);
  170. #endif
  171. if (status > 0) {
  172. if (callback_read_size) {
  173. if (callback_read_size != bufsize)
  174. return -160;
  175. if (callback_read_key != skey)
  176. return -161;
  177. if (callback_read_op != &read_op)
  178. return -162;
  179. }
  180. if (callback_write_size) {
  181. if (callback_write_key != ckey)
  182. return -163;
  183. if (callback_write_op != &write_op)
  184. return -164;
  185. }
  186. pending_op -= status;
  187. }
  188. if (status == 0) {
  189. PJ_LOG(3,("", "...error: timed out"));
  190. }
  191. if (status < 0) {
  192. return -170;
  193. }
  194. }
  195. // Pending op is zero.
  196. // Subsequent poll should yield zero too.
  197. for (i=0; i<10; ++i) {
  198. timeout.sec = 0;
  199. timeout.msec = 50;
  200. #ifdef PJ_SYMBIAN
  201. status = pj_symbianos_poll(-1, 1);
  202. #else
  203. status = pj_ioqueue_poll(ioque, &timeout);
  204. #endif
  205. if (status != 0)
  206. return -173;
  207. }
  208. // End time.
  209. pj_get_timestamp(&t2);
  210. t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo);
  211. // Compare recv buffer with send buffer.
  212. if (pj_memcmp(send_buf, recv_buf, bufsize) != 0) {
  213. return -180;
  214. }
  215. // Success
  216. return 0;
  217. }
  218. /*
  219. * Compliance test for success scenario.
  220. */
  221. static int compliance_test_0(const pj_ioqueue_cfg *cfg)
  222. {
  223. pj_sock_t ssock=-1, csock0=-1, csock1=-1;
  224. pj_sockaddr_in addr, client_addr, rmt_addr;
  225. int client_addr_len;
  226. pj_pool_t *pool = NULL;
  227. char *send_buf, *recv_buf;
  228. pj_ioqueue_t *ioque = NULL;
  229. pj_ioqueue_key_t *skey=NULL, *ckey0=NULL, *ckey1=NULL;
  230. pj_ioqueue_op_key_t accept_op;
  231. int bufsize = BUF_MIN_SIZE;
  232. int status = -1;
  233. int pending_op = 0;
  234. pj_timestamp t_elapsed;
  235. pj_str_t s;
  236. pj_status_t rc;
  237. // Create pool.
  238. pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
  239. // Allocate buffers for send and receive.
  240. send_buf = (char*)pj_pool_alloc(pool, bufsize);
  241. recv_buf = (char*)pj_pool_alloc(pool, bufsize);
  242. // Create server socket and client socket for connecting
  243. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &ssock);
  244. if (rc != PJ_SUCCESS) {
  245. app_perror("...error creating socket", rc);
  246. status=-1; goto on_error;
  247. }
  248. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &csock1);
  249. if (rc != PJ_SUCCESS) {
  250. app_perror("...error creating socket", rc);
  251. status=-1; goto on_error;
  252. }
  253. // Bind server socket.
  254. pj_sockaddr_in_init(&addr, 0, 0);
  255. if ((rc=pj_sock_bind(ssock, &addr, sizeof(addr))) != 0 ) {
  256. app_perror("...bind error", rc);
  257. status=-10; goto on_error;
  258. }
  259. // Get server address.
  260. client_addr_len = sizeof(addr);
  261. rc = pj_sock_getsockname(ssock, &addr, &client_addr_len);
  262. if (rc != PJ_SUCCESS) {
  263. app_perror("...ERROR in pj_sock_getsockname()", rc);
  264. status=-15; goto on_error;
  265. }
  266. addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
  267. // Create I/O Queue.
  268. rc = pj_ioqueue_create2(pool, PJ_IOQUEUE_MAX_HANDLES, cfg, &ioque);
  269. if (rc != PJ_SUCCESS) {
  270. app_perror("...ERROR in pj_ioqueue_create()", rc);
  271. status=-20; goto on_error;
  272. }
  273. // Init operation key.
  274. pj_ioqueue_op_key_init(&accept_op, sizeof(accept_op));
  275. // Register server socket and client socket.
  276. rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL, &test_cb, &skey);
  277. if (rc == PJ_SUCCESS)
  278. rc = pj_ioqueue_register_sock(pool, ioque, csock1, NULL, &test_cb,
  279. &ckey1);
  280. else
  281. ckey1 = NULL;
  282. if (rc != PJ_SUCCESS) {
  283. app_perror("...ERROR in pj_ioqueue_register_sock()", rc);
  284. status=-23; goto on_error;
  285. }
  286. // Server socket listen().
  287. if (pj_sock_listen(ssock, 5)) {
  288. app_perror("...ERROR in pj_sock_listen()", rc);
  289. status=-25; goto on_error;
  290. }
  291. // Server socket accept()
  292. client_addr_len = sizeof(pj_sockaddr_in);
  293. status = pj_ioqueue_accept(skey, &accept_op, &csock0,
  294. &client_addr, &rmt_addr, &client_addr_len);
  295. if (status != PJ_EPENDING) {
  296. app_perror("...ERROR in pj_ioqueue_accept()", rc);
  297. status=-30; goto on_error;
  298. }
  299. if (status==PJ_EPENDING) {
  300. ++pending_op;
  301. }
  302. // Client socket connect()
  303. status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
  304. if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
  305. app_perror("...ERROR in pj_ioqueue_connect()", rc);
  306. status=-40; goto on_error;
  307. }
  308. if (status==PJ_EPENDING) {
  309. ++pending_op;
  310. }
  311. // Poll until connected
  312. callback_read_size = callback_write_size = 0;
  313. callback_accept_status = callback_connect_status = -2;
  314. callback_call_count = 0;
  315. callback_read_key = callback_write_key =
  316. callback_accept_key = callback_connect_key = NULL;
  317. callback_accept_op = callback_read_op = callback_write_op = NULL;
  318. while (pending_op) {
  319. pj_time_val timeout = {1, 0};
  320. #ifdef PJ_SYMBIAN
  321. callback_call_count = 0;
  322. pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
  323. status = callback_call_count;
  324. #else
  325. status = pj_ioqueue_poll(ioque, &timeout);
  326. #endif
  327. if (status > 0) {
  328. if (callback_accept_status != -2) {
  329. if (callback_accept_status != 0) {
  330. status=-41; goto on_error;
  331. }
  332. if (callback_accept_key != skey) {
  333. status=-42; goto on_error;
  334. }
  335. if (callback_accept_op != &accept_op) {
  336. status=-43; goto on_error;
  337. }
  338. callback_accept_status = -2;
  339. }
  340. if (callback_connect_status != -2) {
  341. if (callback_connect_status != 0) {
  342. status=-50; goto on_error;
  343. }
  344. if (callback_connect_key != ckey1) {
  345. status=-51; goto on_error;
  346. }
  347. callback_connect_status = -2;
  348. }
  349. if (status > pending_op) {
  350. PJ_LOG(3,(THIS_FILE,
  351. "...error: pj_ioqueue_poll() returned %d "
  352. "(only expecting %d)",
  353. status, pending_op));
  354. return -52;
  355. }
  356. pending_op -= status;
  357. }
  358. }
  359. // There's no pending operation.
  360. // When we poll the ioqueue, there must not be events.
  361. if (pending_op == 0) {
  362. unsigned i;
  363. for (i=0; i<10; ++i) {
  364. pj_time_val timeout = {0, 50};
  365. #ifdef PJ_SYMBIAN
  366. status = pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
  367. #else
  368. status = pj_ioqueue_poll(ioque, &timeout);
  369. #endif
  370. if (status != 0) {
  371. status=-60; goto on_error;
  372. }
  373. }
  374. }
  375. // Check accepted socket.
  376. if (csock0 == PJ_INVALID_SOCKET) {
  377. status = -69;
  378. app_perror("...accept() error", pj_get_os_error());
  379. goto on_error;
  380. }
  381. // Register newly accepted socket.
  382. rc = pj_ioqueue_register_sock(pool, ioque, csock0, NULL,
  383. &test_cb, &ckey0);
  384. if (rc != PJ_SUCCESS) {
  385. app_perror("...ERROR in pj_ioqueue_register_sock", rc);
  386. status = -70;
  387. goto on_error;
  388. }
  389. // Test send and receive.
  390. t_elapsed.u32.lo = 0;
  391. status = send_recv_test(ioque, ckey0, ckey1, send_buf,
  392. recv_buf, bufsize, &t_elapsed);
  393. if (status != 0) {
  394. goto on_error;
  395. }
  396. // Success
  397. status = 0;
  398. on_error:
  399. if (skey != NULL)
  400. pj_ioqueue_unregister(skey);
  401. else if (ssock != PJ_INVALID_SOCKET)
  402. pj_sock_close(ssock);
  403. if (ckey1 != NULL)
  404. pj_ioqueue_unregister(ckey1);
  405. else if (csock1 != PJ_INVALID_SOCKET)
  406. pj_sock_close(csock1);
  407. if (ckey0 != NULL)
  408. pj_ioqueue_unregister(ckey0);
  409. else if (csock0 != PJ_INVALID_SOCKET)
  410. pj_sock_close(csock0);
  411. if (ioque != NULL)
  412. pj_ioqueue_destroy(ioque);
  413. pj_pool_release(pool);
  414. return status;
  415. }
  416. /*
  417. * Compliance test for failed scenario.
  418. * In this case, the client connects to a non-existant service.
  419. */
  420. static int compliance_test_1(const pj_ioqueue_cfg *cfg)
  421. {
  422. pj_sock_t csock1=PJ_INVALID_SOCKET;
  423. pj_sockaddr_in addr;
  424. pj_pool_t *pool = NULL;
  425. pj_ioqueue_t *ioque = NULL;
  426. pj_ioqueue_key_t *ckey1 = NULL;
  427. int status = -1;
  428. int pending_op = 0;
  429. pj_str_t s;
  430. pj_status_t rc;
  431. // Create pool.
  432. pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
  433. // Create I/O Queue.
  434. rc = pj_ioqueue_create2(pool, PJ_IOQUEUE_MAX_HANDLES, cfg, &ioque);
  435. if (!ioque) {
  436. status=-20; goto on_error;
  437. }
  438. // Create client socket
  439. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &csock1);
  440. if (rc != PJ_SUCCESS) {
  441. app_perror("...ERROR in pj_sock_socket()", rc);
  442. status=-1; goto on_error;
  443. }
  444. // Register client socket.
  445. rc = pj_ioqueue_register_sock(pool, ioque, csock1, NULL,
  446. &test_cb, &ckey1);
  447. if (rc != PJ_SUCCESS) {
  448. app_perror("...ERROR in pj_ioqueue_register_sock()", rc);
  449. status=-23; goto on_error;
  450. }
  451. // Initialize remote address.
  452. pj_sockaddr_in_init(&addr, pj_cstr(&s, "127.0.0.1"), NON_EXISTANT_PORT);
  453. // Client socket connect()
  454. status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
  455. if (status==PJ_SUCCESS) {
  456. // unexpectedly success!
  457. status = -30;
  458. goto on_error;
  459. }
  460. if (status != PJ_EPENDING) {
  461. // success
  462. } else {
  463. ++pending_op;
  464. }
  465. callback_connect_status = -2;
  466. callback_connect_key = NULL;
  467. // Poll until we've got result
  468. while (pending_op) {
  469. pj_time_val timeout = {1, 0};
  470. #ifdef PJ_SYMBIAN
  471. callback_call_count = 0;
  472. pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
  473. status = callback_call_count;
  474. #else
  475. status = pj_ioqueue_poll(ioque, &timeout);
  476. #endif
  477. if (status > 0) {
  478. if (callback_connect_key==ckey1) {
  479. if (callback_connect_status == 0) {
  480. // unexpectedly connected!
  481. status = -50;
  482. goto on_error;
  483. }
  484. }
  485. if (status > pending_op) {
  486. PJ_LOG(3,(THIS_FILE,
  487. "...error: pj_ioqueue_poll() returned %d "
  488. "(only expecting %d)",
  489. status, pending_op));
  490. return -552;
  491. }
  492. pending_op -= status;
  493. if (pending_op == 0) {
  494. status = 0;
  495. }
  496. }
  497. }
  498. // There's no pending operation.
  499. // When we poll the ioqueue, there must not be events.
  500. if (pending_op == 0) {
  501. unsigned i;
  502. for (i=0; i<10; ++i) {
  503. pj_time_val timeout = {0, 50};
  504. #ifdef PJ_SYMBIAN
  505. status = pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
  506. #else
  507. status = pj_ioqueue_poll(ioque, &timeout);
  508. #endif
  509. if (status != 0) {
  510. status=-60; goto on_error;
  511. }
  512. }
  513. }
  514. // Success
  515. status = 0;
  516. on_error:
  517. if (ckey1 != NULL)
  518. pj_ioqueue_unregister(ckey1);
  519. else if (csock1 != PJ_INVALID_SOCKET)
  520. pj_sock_close(csock1);
  521. if (ioque != NULL)
  522. pj_ioqueue_destroy(ioque);
  523. pj_pool_release(pool);
  524. return status;
  525. }
  526. /*
  527. * Repeated connect/accept on the same listener socket.
  528. */
  529. static int compliance_test_2(const pj_ioqueue_cfg *cfg)
  530. {
  531. #if defined(PJ_SYMBIAN) && PJ_SYMBIAN!=0
  532. enum { MAX_PAIR = 1, TEST_LOOP = 2 };
  533. #else
  534. enum { MAX_PAIR = 4, TEST_LOOP = 2 };
  535. #endif
  536. struct listener
  537. {
  538. pj_sock_t sock;
  539. pj_ioqueue_key_t *key;
  540. pj_sockaddr_in addr;
  541. int addr_len;
  542. } listener;
  543. struct server
  544. {
  545. pj_sock_t sock;
  546. pj_ioqueue_key_t *key;
  547. pj_sockaddr_in local_addr;
  548. pj_sockaddr_in rem_addr;
  549. int rem_addr_len;
  550. pj_ioqueue_op_key_t accept_op;
  551. } server[MAX_PAIR];
  552. struct client
  553. {
  554. pj_sock_t sock;
  555. pj_ioqueue_key_t *key;
  556. } client[MAX_PAIR];
  557. pj_pool_t *pool = NULL;
  558. char *send_buf, *recv_buf;
  559. pj_ioqueue_t *ioque = NULL;
  560. unsigned i, bufsize = BUF_MIN_SIZE;
  561. int status;
  562. int test_loop, pending_op = 0;
  563. pj_timestamp t_elapsed;
  564. pj_str_t s;
  565. pj_status_t rc;
  566. listener.sock = PJ_INVALID_SOCKET;
  567. listener.key = NULL;
  568. for (i=0; i<MAX_PAIR; ++i) {
  569. server[i].sock = PJ_INVALID_SOCKET;
  570. server[i].key = NULL;
  571. }
  572. for (i=0; i<MAX_PAIR; ++i) {
  573. client[i].sock = PJ_INVALID_SOCKET;
  574. client[i].key = NULL;
  575. }
  576. // Create pool.
  577. pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
  578. // Create I/O Queue.
  579. rc = pj_ioqueue_create2(pool, PJ_IOQUEUE_MAX_HANDLES, cfg, &ioque);
  580. if (rc != PJ_SUCCESS) {
  581. app_perror("...ERROR in pj_ioqueue_create()", rc);
  582. return -10;
  583. }
  584. // Allocate buffers for send and receive.
  585. send_buf = (char*)pj_pool_alloc(pool, bufsize);
  586. recv_buf = (char*)pj_pool_alloc(pool, bufsize);
  587. // Create listener socket
  588. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &listener.sock);
  589. if (rc != PJ_SUCCESS) {
  590. app_perror("...error creating socket", rc);
  591. status=-20; goto on_error;
  592. }
  593. // Bind listener socket.
  594. pj_sockaddr_in_init(&listener.addr, 0, 0);
  595. if ((rc=pj_sock_bind(listener.sock, &listener.addr, sizeof(listener.addr))) != 0 ) {
  596. app_perror("...bind error", rc);
  597. status=-30; goto on_error;
  598. }
  599. // Get listener address.
  600. listener.addr_len = sizeof(listener.addr);
  601. rc = pj_sock_getsockname(listener.sock, &listener.addr, &listener.addr_len);
  602. if (rc != PJ_SUCCESS) {
  603. app_perror("...ERROR in pj_sock_getsockname()", rc);
  604. status=-40; goto on_error;
  605. }
  606. listener.addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
  607. // Register listener socket.
  608. rc = pj_ioqueue_register_sock(pool, ioque, listener.sock, NULL, &test_cb,
  609. &listener.key);
  610. if (rc != PJ_SUCCESS) {
  611. app_perror("...ERROR", rc);
  612. status=-50; goto on_error;
  613. }
  614. // Listener socket listen().
  615. if (pj_sock_listen(listener.sock, 5)) {
  616. app_perror("...ERROR in pj_sock_listen()", rc);
  617. status=-60; goto on_error;
  618. }
  619. for (test_loop=0; test_loop < TEST_LOOP; ++test_loop) {
  620. // Client connect and server accept.
  621. for (i=0; i<MAX_PAIR; ++i) {
  622. rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &client[i].sock);
  623. if (rc != PJ_SUCCESS) {
  624. app_perror("...error creating socket", rc);
  625. status=-70; goto on_error;
  626. }
  627. rc = pj_ioqueue_register_sock(pool, ioque, client[i].sock, NULL,
  628. &test_cb, &client[i].key);
  629. if (rc != PJ_SUCCESS) {
  630. app_perror("...error ", rc);
  631. status=-80; goto on_error;
  632. }
  633. // Server socket accept()
  634. pj_ioqueue_op_key_init(&server[i].accept_op,
  635. sizeof(server[i].accept_op));
  636. server[i].rem_addr_len = sizeof(pj_sockaddr_in);
  637. status = pj_ioqueue_accept(listener.key, &server[i].accept_op,
  638. &server[i].sock, &server[i].local_addr,
  639. &server[i].rem_addr,
  640. &server[i].rem_addr_len);
  641. if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
  642. app_perror("...ERROR in pj_ioqueue_accept()", rc);
  643. status=-90; goto on_error;
  644. }
  645. if (status==PJ_EPENDING) {
  646. ++pending_op;
  647. }
  648. // Client socket connect()
  649. status = pj_ioqueue_connect(client[i].key, &listener.addr,
  650. sizeof(listener.addr));
  651. if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
  652. app_perror("...ERROR in pj_ioqueue_connect()", rc);
  653. status=-100; goto on_error;
  654. }
  655. if (status==PJ_EPENDING) {
  656. ++pending_op;
  657. }
  658. // Poll until connection of this pair established
  659. while (pending_op) {
  660. pj_time_val timeout = {1, 0};
  661. #ifdef PJ_SYMBIAN
  662. status = pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
  663. #else
  664. status = pj_ioqueue_poll(ioque, &timeout);
  665. #endif
  666. if (status > 0) {
  667. if (status > pending_op) {
  668. PJ_LOG(3,(THIS_FILE,
  669. "...error: pj_ioqueue_poll() returned %d "
  670. "(only expecting %d)",
  671. status, pending_op));
  672. return -110;
  673. }
  674. pending_op -= status;
  675. if (pending_op == 0) {
  676. status = 0;
  677. }
  678. }
  679. }
  680. }
  681. // There's no pending operation.
  682. // When we poll the ioqueue, there must not be events.
  683. if (pending_op == 0) {
  684. for (i=0; i<10; ++i) {
  685. pj_time_val timeout = {0, 50};
  686. #ifdef PJ_SYMBIAN
  687. status = pj_symbianos_poll(-1, PJ_TIME_VAL_MSEC(timeout));
  688. #else
  689. status = pj_ioqueue_poll(ioque, &timeout);
  690. #endif
  691. if (status != 0) {
  692. status=-120; goto on_error;
  693. }
  694. }
  695. }
  696. for (i=0; i<MAX_PAIR; ++i) {
  697. // Check server socket.
  698. if (server[i].sock == PJ_INVALID_SOCKET) {
  699. status = -130;
  700. app_perror("...accept() error", pj_get_os_error());
  701. goto on_error;
  702. }
  703. // Check addresses
  704. if (server[i].local_addr.sin_family != pj_AF_INET() ||
  705. server[i].local_addr.sin_addr.s_addr == 0 ||
  706. server[i].local_addr.sin_port == 0)
  707. {
  708. app_perror("...ERROR address not set", rc);
  709. status = -140;
  710. goto on_error;
  711. }
  712. if (server[i].rem_addr.sin_family != pj_AF_INET() ||
  713. server[i].rem_addr.sin_addr.s_addr == 0 ||
  714. server[i].rem_addr.sin_port == 0)
  715. {
  716. app_perror("...ERROR address not set", rc);
  717. status = -150;
  718. goto on_error;
  719. }
  720. // Register newly accepted socket.
  721. rc = pj_ioqueue_register_sock(pool, ioque, server[i].sock, NULL,
  722. &test_cb, &server[i].key);
  723. if (rc != PJ_SUCCESS) {
  724. app_perror("...ERROR in pj_ioqueue_register_sock", rc);
  725. status = -160;
  726. goto on_error;
  727. }
  728. // Test send and receive.
  729. t_elapsed.u32.lo = 0;
  730. status = send_recv_test(ioque, server[i].key, client[i].key,
  731. send_buf, recv_buf, bufsize, &t_elapsed);
  732. if (status != 0) {
  733. goto on_error;
  734. }
  735. }
  736. // Success
  737. status = 0;
  738. for (i=0; i<MAX_PAIR; ++i) {
  739. if (server[i].key != NULL) {
  740. pj_ioqueue_unregister(server[i].key);
  741. server[i].key = NULL;
  742. server[i].sock = PJ_INVALID_SOCKET;
  743. } else if (server[i].sock != PJ_INVALID_SOCKET) {
  744. pj_sock_close(server[i].sock);
  745. server[i].sock = PJ_INVALID_SOCKET;
  746. }
  747. if (client[i].key != NULL) {
  748. pj_ioqueue_unregister(client[i].key);
  749. client[i].key = NULL;
  750. client[i].sock = PJ_INVALID_SOCKET;
  751. } else if (client[i].sock != PJ_INVALID_SOCKET) {
  752. pj_sock_close(client[i].sock);
  753. client[i].sock = PJ_INVALID_SOCKET;
  754. }
  755. }
  756. }
  757. status = 0;
  758. on_error:
  759. for (i=0; i<MAX_PAIR; ++i) {
  760. if (server[i].key != NULL) {
  761. pj_ioqueue_unregister(server[i].key);
  762. server[i].key = NULL;
  763. server[i].sock = PJ_INVALID_SOCKET;
  764. } else if (server[i].sock != PJ_INVALID_SOCKET) {
  765. pj_sock_close(server[i].sock);
  766. server[i].sock = PJ_INVALID_SOCKET;
  767. }
  768. if (client[i].key != NULL) {
  769. pj_ioqueue_unregister(client[i].key);
  770. client[i].key = NULL;
  771. client[i].sock = PJ_INVALID_SOCKET;
  772. } else if (client[i].sock != PJ_INVALID_SOCKET) {
  773. pj_sock_close(client[i].sock);
  774. client[i].sock = PJ_INVALID_SOCKET;
  775. }
  776. }
  777. if (listener.key) {
  778. pj_ioqueue_unregister(listener.key);
  779. listener.key = NULL;
  780. } else if (listener.sock != PJ_INVALID_SOCKET) {
  781. pj_sock_close(listener.sock);
  782. listener.sock = PJ_INVALID_SOCKET;
  783. }
  784. if (ioque != NULL)
  785. pj_ioqueue_destroy(ioque);
  786. pj_pool_release(pool);
  787. return status;
  788. }
  789. static int tcp_ioqueue_test_impl(const pj_ioqueue_cfg *cfg)
  790. {
  791. int status;
  792. char title[64];
  793. pj_ansi_snprintf(title, sizeof(title), "%s (concur:%d, epoll_flags:0x%x)",
  794. pj_ioqueue_name(), cfg->default_concurrency,
  795. cfg->epoll_flags);
  796. PJ_LOG(3, (THIS_FILE, "..%s compliance test 0 (success scenario)",
  797. title));
  798. if ((status=compliance_test_0(cfg)) != 0) {
  799. PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status));
  800. return status;
  801. }
  802. PJ_LOG(3, (THIS_FILE, "..%s compliance test 1 (failed scenario)",
  803. title));
  804. if ((status=compliance_test_1(cfg)) != 0) {
  805. PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status));
  806. return status;
  807. }
  808. PJ_LOG(3, (THIS_FILE, "..%s compliance test 2 (repeated accept)",
  809. title));
  810. if ((status=compliance_test_2(cfg)) != 0) {
  811. PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status));
  812. return status;
  813. }
  814. return 0;
  815. }
  816. int tcp_ioqueue_test()
  817. {
  818. pj_ioqueue_epoll_flag epoll_flags[] = {
  819. PJ_IOQUEUE_EPOLL_AUTO,
  820. #if PJ_HAS_LINUX_EPOLL
  821. PJ_IOQUEUE_EPOLL_EXCLUSIVE,
  822. PJ_IOQUEUE_EPOLL_ONESHOT,
  823. 0
  824. #endif
  825. };
  826. pj_bool_t concurs[] = { PJ_TRUE, PJ_FALSE };
  827. int i, rc;
  828. for (i=0; i<(int)PJ_ARRAY_SIZE(epoll_flags); ++i) {
  829. pj_ioqueue_cfg cfg;
  830. pj_ioqueue_cfg_default(&cfg);
  831. cfg.epoll_flags = epoll_flags[i];
  832. PJ_LOG(3, (THIS_FILE, "..%s TCP compliance test, epoll_flags=0x%x",
  833. pj_ioqueue_name(), cfg.epoll_flags));
  834. rc = tcp_ioqueue_test_impl(&cfg);
  835. if (rc != 0)
  836. return rc;
  837. }
  838. for (i=0; i<(int)PJ_ARRAY_SIZE(concurs); ++i) {
  839. pj_ioqueue_cfg cfg;
  840. pj_ioqueue_cfg_default(&cfg);
  841. cfg.default_concurrency = concurs[i];
  842. PJ_LOG(3, (THIS_FILE, "..%s TCP compliance test, concurrency=%d",
  843. pj_ioqueue_name(), cfg.default_concurrency));
  844. rc = tcp_ioqueue_test_impl(&cfg);
  845. if (rc != 0)
  846. return rc;
  847. }
  848. return 0;
  849. }
  850. #endif /* PJ_HAS_TCP */
  851. #else
  852. /* Placeholder definition compiled only when INCLUDE_TCP_IOQUEUE_TEST
  853. * is disabled, to prevent the warning about "translation unit is empty".
  854. */
  855. int dummy_uiq_tcp;
  856. #endif /* INCLUDE_TCP_IOQUEUE_TEST */