concur_test.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. /*
  2. * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
  3. * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program; if not, write to the Free Software
  17. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  18. */
  19. #include "test.h"
  20. #if INCLUDE_CONCUR_TEST
  21. #define THIS_FILE "concur_test.c"
  22. /****************************************************************************/
  23. #define WORKER_THREAD_CNT 4
  24. #define SERVER_THREAD_CNT 4
  25. #define MAX_SOCK_CLIENTS (PJ_IOQUEUE_MAX_HANDLES/2)
  26. struct stun_test_session
  27. {
  28. pj_stun_config stun_cfg;
  29. pj_lock_t *lock;
  30. pj_thread_t *worker_threads[WORKER_THREAD_CNT];
  31. pj_sock_t server_sock;
  32. int server_port;
  33. pj_thread_t *server_threads[SERVER_THREAD_CNT];
  34. pj_event_t *server_event;
  35. pj_bool_t thread_quit_flag;
  36. /* Test parameters: */
  37. struct {
  38. int client_got_response;
  39. pj_bool_t server_wait_for_event;
  40. pj_bool_t server_drop_request;
  41. int client_sleep_after_start;
  42. int client_sleep_before_destroy;
  43. } param;
  44. };
  45. static int server_thread_proc(void *p)
  46. {
  47. struct stun_test_session *test_sess = (struct stun_test_session*)p;
  48. pj_pool_t *pool;
  49. pj_status_t status;
  50. PJ_LOG(4,(THIS_FILE, "Server thread running"));
  51. pool = pj_pool_create(test_sess->stun_cfg.pf, "server", 512, 512, NULL);
  52. while (!test_sess->thread_quit_flag) {
  53. pj_time_val timeout = {0, 10};
  54. pj_fd_set_t rdset;
  55. int n;
  56. /* Serve client */
  57. PJ_FD_ZERO(&rdset);
  58. PJ_FD_SET(test_sess->server_sock, &rdset);
  59. n = pj_sock_select((int)test_sess->server_sock+1, &rdset,
  60. NULL, NULL, &timeout);
  61. if (n==1 && PJ_FD_ISSET(test_sess->server_sock, &rdset)) {
  62. pj_uint8_t pkt[512];
  63. pj_ssize_t pkt_len;
  64. pj_size_t res_len;
  65. pj_sockaddr client_addr;
  66. int addr_len;
  67. pj_stun_msg *stun_req, *stun_res;
  68. pj_pool_reset(pool);
  69. /* Got query */
  70. pkt_len = sizeof(pkt);
  71. addr_len = sizeof(client_addr);
  72. status = pj_sock_recvfrom(test_sess->server_sock, pkt, &pkt_len,
  73. 0, &client_addr, &addr_len);
  74. if (status != PJ_SUCCESS) {
  75. continue;
  76. }
  77. status = pj_stun_msg_decode(pool, pkt, pkt_len,
  78. PJ_STUN_IS_DATAGRAM,
  79. &stun_req, NULL, NULL);
  80. if (status != PJ_SUCCESS) {
  81. PJ_PERROR(1,(THIS_FILE, status, "STUN request decode error"));
  82. continue;
  83. }
  84. status = pj_stun_msg_create_response(pool, stun_req,
  85. PJ_STUN_SC_BAD_REQUEST, NULL,
  86. &stun_res);
  87. if (status != PJ_SUCCESS) {
  88. PJ_PERROR(1,(THIS_FILE, status, "STUN create response error"));
  89. continue;
  90. }
  91. status = pj_stun_msg_encode(stun_res, pkt, sizeof(pkt), 0,
  92. NULL, &res_len);
  93. if (status != PJ_SUCCESS) {
  94. PJ_PERROR(1,(THIS_FILE, status, "STUN encode error"));
  95. continue;
  96. }
  97. /* Ignore request */
  98. if (test_sess->param.server_drop_request)
  99. continue;
  100. /* Wait for signal to continue */
  101. if (test_sess->param.server_wait_for_event)
  102. pj_event_wait(test_sess->server_event);
  103. pkt_len = res_len;
  104. pj_sock_sendto(test_sess->server_sock, pkt, &pkt_len, 0,
  105. &client_addr, pj_sockaddr_get_len(&client_addr));
  106. }
  107. }
  108. pj_pool_release(pool);
  109. PJ_LOG(4,(THIS_FILE, "Server thread quitting"));
  110. return 0;
  111. }
  112. static int worker_thread_proc(void *p)
  113. {
  114. struct stun_test_session *test_sess = (struct stun_test_session*)p;
  115. PJ_LOG(4,(THIS_FILE, "Worker thread running"));
  116. while (!test_sess->thread_quit_flag) {
  117. pj_time_val timeout = {0, 10};
  118. pj_timer_heap_poll(test_sess->stun_cfg.timer_heap, NULL);
  119. pj_ioqueue_poll(test_sess->stun_cfg.ioqueue, &timeout);
  120. }
  121. PJ_LOG(4,(THIS_FILE, "Worker thread quitting"));
  122. return 0;
  123. }
  124. static pj_bool_t stun_sock_on_status(pj_stun_sock *stun_sock,
  125. pj_stun_sock_op op,
  126. pj_status_t status)
  127. {
  128. struct stun_test_session *test_sess = (struct stun_test_session*)pj_stun_sock_get_user_data(stun_sock);
  129. PJ_UNUSED_ARG(op);
  130. PJ_UNUSED_ARG(status);
  131. test_sess->param.client_got_response++;
  132. return PJ_TRUE;
  133. }
  134. static int stun_destroy_test_session(struct stun_test_session *test_sess)
  135. {
  136. unsigned i;
  137. pj_stun_sock_cb stun_cb;
  138. pj_status_t status;
  139. pj_stun_sock *stun_sock[MAX_SOCK_CLIENTS];
  140. pj_bzero(&stun_cb, sizeof(stun_cb));
  141. stun_cb.on_status = &stun_sock_on_status;
  142. pj_event_reset(test_sess->server_event);
  143. /* Create all clients first */
  144. for (i=0; i<MAX_SOCK_CLIENTS; ++i) {
  145. char name[10];
  146. pj_ansi_snprintf(name, sizeof(name), "stun%02d", i);
  147. status = pj_stun_sock_create(&test_sess->stun_cfg, name, pj_AF_INET(),
  148. &stun_cb, NULL, test_sess,
  149. &stun_sock[i]);
  150. if (status != PJ_SUCCESS) {
  151. PJ_PERROR(1,(THIS_FILE, status, "Error creating stun socket"));
  152. return -10;
  153. }
  154. }
  155. /* Start resolution */
  156. for (i=0; i<MAX_SOCK_CLIENTS; ++i) {
  157. pj_str_t server_ip = pj_str("127.0.0.1");
  158. status = pj_stun_sock_start(stun_sock[i], &server_ip,
  159. (pj_uint16_t)test_sess->server_port, NULL);
  160. if (status != PJ_SUCCESS) {
  161. PJ_PERROR(1,(THIS_FILE, status, "Error starting stun socket"));
  162. return -20;
  163. }
  164. }
  165. /* settle down */
  166. pj_thread_sleep(test_sess->param.client_sleep_after_start);
  167. /* Resume server threads */
  168. pj_event_set(test_sess->server_event);
  169. pj_thread_sleep(test_sess->param.client_sleep_before_destroy);
  170. /* Destroy clients */
  171. for (i=0; i<MAX_SOCK_CLIENTS; ++i) {
  172. status = pj_stun_sock_destroy(stun_sock[i]);
  173. if (status != PJ_SUCCESS) {
  174. PJ_PERROR(1,(THIS_FILE, status, "Error destroying stun socket"));
  175. }
  176. }
  177. /* Give some time to ioqueue to free sockets */
  178. pj_thread_sleep(PJ_IOQUEUE_KEY_FREE_DELAY);
  179. return 0;
  180. }
  181. static int stun_destroy_test(void)
  182. {
  183. enum { LOOP = 500 };
  184. struct stun_test_session test_sess;
  185. pj_sockaddr bind_addr;
  186. int addr_len;
  187. pj_caching_pool cp;
  188. pj_pool_t *pool;
  189. unsigned i;
  190. pj_status_t status;
  191. int rc = 0;
  192. PJ_LOG(3,(THIS_FILE, " STUN destroy concurrency test"));
  193. pj_bzero(&test_sess, sizeof(test_sess));
  194. pj_caching_pool_init(&cp, NULL, 0);
  195. pool = pj_pool_create(&cp.factory, "testsess", 512, 512, NULL);
  196. pj_stun_config_init(&test_sess.stun_cfg, &cp.factory, 0, NULL, NULL);
  197. status = pj_timer_heap_create(pool, 1023, &test_sess.stun_cfg.timer_heap);
  198. pj_assert(status == PJ_SUCCESS);
  199. status = pj_lock_create_recursive_mutex(pool, NULL, &test_sess.lock);
  200. pj_assert(status == PJ_SUCCESS);
  201. pj_timer_heap_set_lock(test_sess.stun_cfg.timer_heap, test_sess.lock, PJ_TRUE);
  202. pj_assert(status == PJ_SUCCESS);
  203. status = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &test_sess.stun_cfg.ioqueue);
  204. pj_assert(status == PJ_SUCCESS);
  205. pj_sock_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, &test_sess.server_sock);
  206. pj_sockaddr_init(pj_AF_INET(), &bind_addr, NULL, 0);
  207. status = pj_sock_bind(test_sess.server_sock, &bind_addr, pj_sockaddr_get_len(&bind_addr));
  208. pj_assert(status == PJ_SUCCESS);
  209. /* Set socket to nonblocking to avoid stuck in recv/recvfrom() on concurrent events */
  210. app_set_sock_nb(test_sess.server_sock);
  211. addr_len = sizeof(bind_addr);
  212. status = pj_sock_getsockname(test_sess.server_sock, &bind_addr, &addr_len);
  213. pj_assert(status == PJ_SUCCESS);
  214. test_sess.server_port = pj_sockaddr_get_port(&bind_addr);
  215. status = pj_event_create(pool, NULL, PJ_TRUE, PJ_FALSE, &test_sess.server_event);
  216. pj_assert(status == PJ_SUCCESS);
  217. for (i=0; i<SERVER_THREAD_CNT; ++i) {
  218. status = pj_thread_create(pool, NULL,
  219. &server_thread_proc, &test_sess,
  220. 0, 0, &test_sess.server_threads[i]);
  221. pj_assert(status == PJ_SUCCESS);
  222. }
  223. for (i=0; i<WORKER_THREAD_CNT; ++i) {
  224. status = pj_thread_create(pool, NULL,
  225. &worker_thread_proc, &test_sess,
  226. 0, 0, &test_sess.worker_threads[i]);
  227. pj_assert(status == PJ_SUCCESS);
  228. }
  229. PJ_UNUSED_ARG(status);
  230. /* Test 1: Main thread calls destroy while callback is processing response */
  231. PJ_LOG(3,(THIS_FILE, " Destroy in main thread while callback is running"));
  232. for (i=0; i<LOOP; ++i) {
  233. int sleep = pj_rand() % 5;
  234. PJ_LOG(3,(THIS_FILE, " Try %-3d of %d", i+1, LOOP));
  235. /* Test 1: destroy at the same time when receiving response */
  236. pj_bzero(&test_sess.param, sizeof(test_sess.param));
  237. test_sess.param.client_sleep_after_start = 20;
  238. test_sess.param.client_sleep_before_destroy = sleep;
  239. test_sess.param.server_wait_for_event = PJ_TRUE;
  240. stun_destroy_test_session(&test_sess);
  241. PJ_LOG(3,(THIS_FILE,
  242. " stun test a: sleep delay:%d: clients with response: %d",
  243. sleep, test_sess.param.client_got_response));
  244. /* Test 2: destroy at the same time with STUN retransmit timer */
  245. test_sess.param.server_drop_request = PJ_TRUE;
  246. test_sess.param.client_sleep_after_start = 0;
  247. test_sess.param.client_sleep_before_destroy = PJ_STUN_RTO_VALUE;
  248. test_sess.param.server_wait_for_event = PJ_FALSE;
  249. stun_destroy_test_session(&test_sess);
  250. PJ_LOG(3,(THIS_FILE, " stun test b: retransmit concurrency"));
  251. /* Test 3: destroy at the same time with receiving response
  252. * AND STUN retransmit timer */
  253. test_sess.param.client_got_response = 0;
  254. test_sess.param.server_drop_request = PJ_FALSE;
  255. test_sess.param.client_sleep_after_start = PJ_STUN_RTO_VALUE;
  256. test_sess.param.client_sleep_before_destroy = 0;
  257. test_sess.param.server_wait_for_event = PJ_TRUE;
  258. stun_destroy_test_session(&test_sess);
  259. PJ_LOG(3,(THIS_FILE,
  260. " stun test c: clients with response: %d",
  261. test_sess.param.client_got_response));
  262. pj_thread_sleep(10);
  263. ice_one_conc_test(&test_sess.stun_cfg, PJ_FALSE);
  264. pj_thread_sleep(10);
  265. }
  266. /* Avoid compiler warning */
  267. goto on_return;
  268. on_return:
  269. test_sess.thread_quit_flag = PJ_TRUE;
  270. for (i=0; i<SERVER_THREAD_CNT; ++i) {
  271. pj_thread_join(test_sess.server_threads[i]);
  272. }
  273. for (i=0; i<WORKER_THREAD_CNT; ++i) {
  274. pj_thread_join(test_sess.worker_threads[i]);
  275. }
  276. pj_event_destroy(test_sess.server_event);
  277. pj_sock_close(test_sess.server_sock);
  278. pj_ioqueue_destroy(test_sess.stun_cfg.ioqueue);
  279. pj_timer_heap_destroy(test_sess.stun_cfg.timer_heap);
  280. pj_pool_release(pool);
  281. pj_caching_pool_destroy(&cp);
  282. PJ_LOG(3,(THIS_FILE, " Done. rc=%d", rc));
  283. return rc;
  284. }
  285. int concur_test(void)
  286. {
  287. stun_destroy_test();
  288. return 0;
  289. }
  290. #endif /* INCLUDE_CONCUR_TEST */