echo_clt.c 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. /*
  2. * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
  3. * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program; if not, write to the Free Software
  17. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  18. */
  19. #include "test.h"
  20. #include <pjlib.h>
  21. #if INCLUDE_ECHO_CLIENT
  22. enum { BUF_SIZE = 512 };
  23. struct client
  24. {
  25. int sock_type;
  26. const char *server;
  27. int port;
  28. };
  29. static pj_atomic_t *totalBytes;
  30. static pj_atomic_t *timeout_counter;
  31. static pj_atomic_t *invalid_counter;
  32. #define MSEC_PRINT_DURATION 1000
  33. static int wait_socket(pj_sock_t sock, unsigned msec_timeout)
  34. {
  35. pj_fd_set_t fdset;
  36. pj_time_val timeout;
  37. timeout.sec = 0;
  38. timeout.msec = msec_timeout;
  39. pj_time_val_normalize(&timeout);
  40. PJ_FD_ZERO(&fdset);
  41. PJ_FD_SET(sock, &fdset);
  42. return pj_sock_select(FD_SETSIZE, &fdset, NULL, NULL, &timeout);
  43. }
  44. static int echo_client_thread(void *arg)
  45. {
  46. pj_sock_t sock;
  47. char send_buf[BUF_SIZE];
  48. char recv_buf[BUF_SIZE];
  49. char addr[PJ_INET_ADDRSTRLEN];
  50. pj_sockaddr_in addr;
  51. pj_str_t s;
  52. pj_status_t rc;
  53. pj_uint32_t buffer_id;
  54. pj_uint32_t buffer_counter;
  55. struct client *client = arg;
  56. pj_status_t last_recv_err = PJ_SUCCESS, last_send_err = PJ_SUCCESS;
  57. unsigned counter = 0;
  58. rc = app_socket(pj_AF_INET(), client->sock_type, 0, -1, &sock);
  59. if (rc != PJ_SUCCESS) {
  60. app_perror("...unable to create socket", rc);
  61. return -10;
  62. }
  63. rc = pj_sockaddr_in_init( &addr, pj_cstr(&s, client->server),
  64. (pj_uint16_t)client->port);
  65. if (rc != PJ_SUCCESS) {
  66. app_perror("...unable to resolve server", rc);
  67. return -15;
  68. }
  69. rc = pj_sock_connect(sock, &addr, sizeof(addr));
  70. if (rc != PJ_SUCCESS) {
  71. app_perror("...connect() error", rc);
  72. pj_sock_close(sock);
  73. return -20;
  74. }
  75. PJ_LOG(3,("", "...socket connected to %s:%d",
  76. pj_inet_ntop2(pj_AF_INET(), &addr.sin_addr,
  77. addr, sizeof(addr)),
  78. pj_ntohs(addr.sin_port)));
  79. pj_memset(send_buf, 'A', BUF_SIZE);
  80. send_buf[BUF_SIZE-1]='\0';
  81. /* Give other thread chance to initialize themselves! */
  82. pj_thread_sleep(200);
  83. //PJ_LOG(3,("", "...thread %p running", pj_thread_this()));
  84. buffer_id = (pj_uint32_t) pj_thread_this();
  85. buffer_counter = 0;
  86. *(pj_uint32_t*)send_buf = buffer_id;
  87. for (;;) {
  88. int rc;
  89. pj_ssize_t bytes;
  90. ++counter;
  91. //while (wait_socket(sock,0) > 0)
  92. // ;
  93. /* Send a packet. */
  94. bytes = BUF_SIZE;
  95. *(pj_uint32_t*)(send_buf+4) = ++buffer_counter;
  96. rc = pj_sock_send(sock, send_buf, &bytes, 0);
  97. if (rc != PJ_SUCCESS || bytes != BUF_SIZE) {
  98. if (rc != last_send_err) {
  99. app_perror("...send() error", rc);
  100. PJ_LOG(3,("", "...ignoring subsequent error.."));
  101. last_send_err = rc;
  102. pj_thread_sleep(100);
  103. }
  104. continue;
  105. }
  106. rc = wait_socket(sock, 500);
  107. if (rc == 0) {
  108. PJ_LOG(3,("", "...timeout"));
  109. bytes = 0;
  110. pj_atomic_inc(timeout_counter);
  111. } else if (rc < 0) {
  112. rc = pj_get_netos_error();
  113. app_perror("...select() error", rc);
  114. break;
  115. } else {
  116. /* Receive back the original packet. */
  117. bytes = 0;
  118. do {
  119. pj_ssize_t received = BUF_SIZE - bytes;
  120. rc = pj_sock_recv(sock, recv_buf+bytes, &received, 0);
  121. if (rc != PJ_SUCCESS || received == 0) {
  122. if (rc != last_recv_err) {
  123. app_perror("...recv() error", rc);
  124. PJ_LOG(3,("", "...ignoring subsequent error.."));
  125. last_recv_err = rc;
  126. pj_thread_sleep(100);
  127. }
  128. bytes = 0;
  129. received = 0;
  130. break;
  131. }
  132. bytes += received;
  133. } while (bytes != BUF_SIZE && bytes != 0);
  134. }
  135. if (bytes == 0)
  136. continue;
  137. if (pj_memcmp(send_buf, recv_buf, BUF_SIZE) != 0) {
  138. recv_buf[BUF_SIZE-1] = '\0';
  139. PJ_LOG(3,("", "...error: buffer %u has changed!\n"
  140. "send_buf=%s\n"
  141. "recv_buf=%s\n",
  142. counter, send_buf, recv_buf));
  143. pj_atomic_inc(invalid_counter);
  144. }
  145. /* Accumulate total received. */
  146. pj_atomic_add(totalBytes, bytes);
  147. }
  148. pj_sock_close(sock);
  149. return 0;
  150. }
  151. int echo_client(int sock_type, const char *server, int port)
  152. {
  153. pj_pool_t *pool;
  154. pj_thread_t *thread[ECHO_CLIENT_MAX_THREADS];
  155. pj_status_t rc;
  156. struct client client;
  157. int i;
  158. pj_atomic_value_t last_received;
  159. pj_timestamp last_report;
  160. client.sock_type = sock_type;
  161. client.server = server;
  162. client.port = port;
  163. pool = pj_pool_create( mem, NULL, 4000, 4000, NULL );
  164. rc = pj_atomic_create(pool, 0, &totalBytes);
  165. if (rc != PJ_SUCCESS) {
  166. PJ_LOG(3,("", "...error: unable to create atomic variable", rc));
  167. return -30;
  168. }
  169. rc = pj_atomic_create(pool, 0, &invalid_counter);
  170. rc = pj_atomic_create(pool, 0, &timeout_counter);
  171. PJ_LOG(3,("", "Echo client started"));
  172. PJ_LOG(3,("", " Destination: %s:%d",
  173. ECHO_SERVER_ADDRESS, ECHO_SERVER_START_PORT));
  174. PJ_LOG(3,("", " Press Ctrl-C to exit"));
  175. for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
  176. rc = pj_thread_create( pool, NULL, &echo_client_thread, &client,
  177. PJ_THREAD_DEFAULT_STACK_SIZE, 0,
  178. &thread[i]);
  179. if (rc != PJ_SUCCESS) {
  180. app_perror("...error: unable to create thread", rc);
  181. return -10;
  182. }
  183. }
  184. last_received = 0;
  185. pj_get_timestamp(&last_report);
  186. for (;;) {
  187. pj_timestamp now;
  188. unsigned long received, cur_received;
  189. unsigned msec;
  190. pj_highprec_t bw;
  191. pj_time_val elapsed;
  192. unsigned bw32;
  193. pj_uint32_t timeout, invalid;
  194. pj_thread_sleep(1000);
  195. pj_get_timestamp(&now);
  196. elapsed = pj_elapsed_time(&last_report, &now);
  197. msec = PJ_TIME_VAL_MSEC(elapsed);
  198. received = pj_atomic_get(totalBytes);
  199. cur_received = received - last_received;
  200. bw = cur_received;
  201. pj_highprec_mul(bw, 1000);
  202. pj_highprec_div(bw, msec);
  203. bw32 = (unsigned)bw;
  204. last_report = now;
  205. last_received = received;
  206. timeout = pj_atomic_get(timeout_counter);
  207. invalid = pj_atomic_get(invalid_counter);
  208. PJ_LOG(3,("",
  209. "...%d threads, total bandwidth: %d KB/s, "
  210. "timeout=%d, invalid=%d",
  211. ECHO_CLIENT_MAX_THREADS, bw32/1000,
  212. timeout, invalid));
  213. }
  214. for (i=0; i<ECHO_CLIENT_MAX_THREADS; ++i) {
  215. pj_thread_join( thread[i] );
  216. }
  217. pj_pool_release(pool);
  218. return 0;
  219. }
  220. #else
  221. int dummy_echo_client;
  222. #endif /* INCLUDE_ECHO_CLIENT */