udp_echo_srv_sync.c 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. /*
  2. * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
  3. * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program; if not, write to the Free Software
  17. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  18. */
  19. #include "test.h"
  20. #include <pjlib.h>
  21. static pj_atomic_t *total_bytes;
  22. static pj_bool_t thread_quit_flag = 0;
  23. static int worker_thread(void *arg)
  24. {
  25. pj_sock_t sock = (pj_sock_t)arg;
  26. char buf[512];
  27. pj_status_t last_recv_err = PJ_SUCCESS, last_write_err = PJ_SUCCESS;
  28. while (!thread_quit_flag) {
  29. pj_ssize_t len;
  30. pj_status_t rc;
  31. pj_sockaddr_in addr;
  32. int addrlen;
  33. len = sizeof(buf);
  34. addrlen = sizeof(addr);
  35. rc = pj_sock_recvfrom(sock, buf, &len, 0, &addr, &addrlen);
  36. if (rc != 0) {
  37. if (rc != last_recv_err) {
  38. app_perror("...recv error", rc);
  39. last_recv_err = rc;
  40. }
  41. continue;
  42. }
  43. pj_atomic_add(total_bytes, (pj_atomic_value_t)len);
  44. rc = pj_sock_sendto(sock, buf, &len, 0, &addr, addrlen);
  45. if (rc != PJ_SUCCESS) {
  46. if (rc != last_write_err) {
  47. app_perror("...send error", rc);
  48. last_write_err = rc;
  49. }
  50. continue;
  51. }
  52. }
  53. return 0;
  54. }
  55. int echo_srv_sync(void)
  56. {
  57. pj_pool_t *pool;
  58. pj_sock_t sock;
  59. pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
  60. pj_status_t rc;
  61. int i;
  62. pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
  63. if (!pool)
  64. return -5;
  65. rc = pj_atomic_create(pool, 0, &total_bytes);
  66. if (rc != PJ_SUCCESS) {
  67. app_perror("...unable to create atomic_var", rc);
  68. return -6;
  69. }
  70. rc = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(),0, ECHO_SERVER_START_PORT, &sock);
  71. if (rc != PJ_SUCCESS) {
  72. app_perror("...socket error", rc);
  73. return -10;
  74. }
  75. for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
  76. rc = pj_thread_create(pool, NULL, &worker_thread, (void*)sock,
  77. PJ_THREAD_DEFAULT_STACK_SIZE, 0,
  78. &thread[i]);
  79. if (rc != PJ_SUCCESS) {
  80. app_perror("...unable to create thread", rc);
  81. return -20;
  82. }
  83. }
  84. PJ_LOG(3,("", "...UDP echo server running with %d threads at port %d",
  85. ECHO_SERVER_MAX_THREADS, ECHO_SERVER_START_PORT));
  86. PJ_LOG(3,("", "...Press Ctrl-C to abort"));
  87. echo_srv_common_loop(total_bytes);
  88. return 0;
  89. }
  90. int echo_srv_common_loop(pj_atomic_t *bytes_counter)
  91. {
  92. pj_highprec_t last_received, avg_bw, highest_bw;
  93. pj_time_val last_print;
  94. unsigned count;
  95. const char *ioqueue_name;
  96. last_received = 0;
  97. pj_gettimeofday(&last_print);
  98. avg_bw = highest_bw = 0;
  99. count = 0;
  100. ioqueue_name = pj_ioqueue_name();
  101. for (;;) {
  102. pj_highprec_t received, cur_received, bw;
  103. unsigned msec;
  104. pj_time_val now, duration;
  105. pj_thread_sleep(1000);
  106. received = cur_received = pj_atomic_get(bytes_counter);
  107. cur_received = cur_received - last_received;
  108. pj_gettimeofday(&now);
  109. duration = now;
  110. PJ_TIME_VAL_SUB(duration, last_print);
  111. msec = PJ_TIME_VAL_MSEC(duration);
  112. bw = cur_received;
  113. pj_highprec_mul(bw, 1000);
  114. pj_highprec_div(bw, msec);
  115. last_print = now;
  116. last_received = received;
  117. avg_bw = avg_bw + bw;
  118. count++;
  119. PJ_LOG(3,("", "%s UDP (%d threads): %u KB/s (avg=%u KB/s) %s",
  120. ioqueue_name,
  121. ECHO_SERVER_MAX_THREADS,
  122. (unsigned)(bw / 1000),
  123. (unsigned)(avg_bw / count / 1000),
  124. (count==20 ? "<ses avg>" : "")));
  125. if (count==20) {
  126. if (avg_bw/count > highest_bw)
  127. highest_bw = avg_bw/count;
  128. count = 0;
  129. avg_bw = 0;
  130. PJ_LOG(3,("", "Highest average bandwidth=%u KB/s",
  131. (unsigned)(highest_bw/1000)));
  132. }
  133. }
  134. PJ_UNREACHED(return 0;)
  135. }