123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- import ctypes
- import time
- import sys
- import os
- import re
- import subprocess
- from inc_cfg import *
- import inc_const
- import inc_util as util
# True when the tests run on anything other than Windows; passed as the
# ``shell`` argument when SIPp is launched.
G_INUNIX = not ("win32" in sys.platform.lower() or "microsoft" in sys.platform.lower())

# File object receiving SIPp's stdout/stderr in foreground mode. It is opened
# by start_sipp() and closed by wait_sipp().
FDEVNULL = None

# SIPp executable and its command-line settings.
SIPP_PATH = 'sipp'
SIPP_PORT = 50070
SIPP_PARAM = "-m 1 -i 127.0.0.1 -p " + str(SIPP_PORT)
# Seconds used for SIPp's -timeout and -deadcall_wait options; a falsy value
# leaves those options out of the command line.
SIPP_TIMEOUT = 60
# When true, SIPp is started with "-bg" and tracked by PID rather than through
# the Popen object.
SIPP_BG_MODE = False

# Filled in below, either from the scenario's driver file or with defaults.
PJSUA_INST_PARAM = []
PJSUA_EXPECTS = []

# pjsua arguments used when the scenario has no driver file.
PJSUA_DEF_PARAM = "--null-audio --max-calls=1 --no-tcp --id=sip:a@localhost --username=a --realm=*"

# The SIPp scenario (an .xml file) is taken from ARGS[1]; anything else,
# including a missing argument, aborts the module with exit status -99.
SIPP_SCEN_XML = ""
if len(ARGS) > 1 and ARGS[1].endswith('.xml'):
    SIPP_SCEN_XML = ARGS[1]
else:
    # sys.exit() instead of the site-provided exit(), which is meant for the
    # interactive shell and is absent when the site module is not loaded.
    sys.exit(-99)
def resolve_pjsua_port(mo):
    """Regex replacement callback for ``$PJSUA_PORT[N]``.

    ``mo`` is a match object whose group 1 holds the instance index N; the
    result is that instance's ``sip_port`` rendered as a string.
    """
    instance_index = int(mo.group(1))
    instance = PJSUA_INST_PARAM[instance_index]
    return str(instance.sip_port)
def resolve_pjsua_uri(mo):
    """Regex replacement callback for ``$PJSUA_URI[N]``.

    ``mo`` is a match object whose group 1 holds the instance index N; the
    result is that instance's ``uri`` with its first and last characters
    removed.
    """
    instance_index = int(mo.group(1))
    full_uri = PJSUA_INST_PARAM[instance_index].uri
    # Drop the first and last characters of the stored URI.
    return full_uri[1:-1]
def resolve_driver_macros(st):
    """Expand the test-driver macros found in ``st`` and return the result.

    Supported macros:
      $SIPP_PORT      -> the SIPp port (SIPP_PORT)
      $SIPP_URI       -> "sip:sipp@127.0.0.1:<SIPP_PORT>"
      $PJSUA_PORT[N]  -> SIP port of PJSUA instance N
      $PJSUA_URI[N]   -> URI of PJSUA instance N
    """
    # Raw strings keep the regex escapes (\$, \[, \d) out of Python's own
    # string-escape processing, which warns about them on recent versions.
    st = re.sub(r"\$SIPP_PORT", str(SIPP_PORT), st)
    st = re.sub(r"\$SIPP_URI", "sip:sipp@127.0.0.1:" + str(SIPP_PORT), st)
    st = re.sub(r"\$PJSUA_PORT\[(\d+)\]", resolve_pjsua_port, st)
    st = re.sub(r"\$PJSUA_URI\[(\d+)\]", resolve_pjsua_uri, st)
    return st
# Build the PJSUA instance list (and expect script) for this scenario.
if not os.access(SIPP_SCEN_XML[:-4] + ".py", os.R_OK):
    # No readable driver file next to the scenario: use one default instance.
    # Scenarios whose file name starts with "uas" get a target URI pointing at
    # SIPp appended to the pjsua arguments; all others get --auto-answer=200.
    ua_param = PJSUA_DEF_PARAM + (
        " sip:127.0.0.1:" + str(SIPP_PORT)
        if os.path.basename(SIPP_SCEN_XML).startswith("uas")
        else " --auto-answer=200"
    )
    PJSUA_INST_PARAM.append(InstanceParam("pjsua", ua_param))
else:
    # The driver is the .py file sharing the scenario's base name.
    cfg_file = util.load_module_from_file("cfg_file", SIPP_SCEN_XML[:-4] + ".py")

    # One pjsua instance per entry in the driver's PJSUA list, after macro
    # expansion of its argument string.
    for ua_idx, ua_param in enumerate(cfg_file.PJSUA):
        ua_param = resolve_driver_macros(ua_param)
        PJSUA_INST_PARAM.append(InstanceParam("pjsua" + str(ua_idx), ua_param))

    # Use the CLI-specific expect list when DEFAULT_TELNET is set and the
    # driver provides one; otherwise use the regular expect list.
    PJSUA_EXPECTS = (
        cfg_file.PJSUA_CLI_EXPECTS
        if DEFAULT_TELNET and hasattr(cfg_file, 'PJSUA_CLI_EXPECTS')
        else cfg_file.PJSUA_EXPECTS
    )
def start_sipp():
    """Launch SIPp for the current scenario, targeting the first PJSUA instance.

    Returns:
        Foreground mode (SIPP_BG_MODE false): the ``subprocess.Popen`` object,
        or None when SIPp has already exited with a non-zero status.
        Background mode: the PID parsed from SIPp's output (on win32, the
        process handle opened from that PID), or 0 when no PID was found.
    """
    global FDEVNULL

    # Assemble the SIPp command line.
    sipp_param = SIPP_PARAM + " -sf " + SIPP_SCEN_XML
    if SIPP_BG_MODE:
        sipp_param = sipp_param + " -bg"
    if SIPP_TIMEOUT:
        sipp_param = (sipp_param
                      + " -timeout " + str(SIPP_TIMEOUT) + "s -timeout_error"
                      + " -deadcall_wait " + str(SIPP_TIMEOUT) + "s")

    # The remote target is the first PJSUA instance on the loopback address.
    sipp_param = sipp_param + " 127.0.0.1:" + str(PJSUA_INST_PARAM[0].sip_port)

    fullcmd = os.path.normpath(SIPP_PATH) + " " + sipp_param
    print("Running SIPP: " + fullcmd)

    if not SIPP_BG_MODE:
        # Foreground mode: SIPp's stdout/stderr go to a log file that
        # wait_sipp() closes once the process has finished.
        FDEVNULL = open("logs/sipp_output.tmp", 'w')
        try:
            sipp_proc = subprocess.Popen(fullcmd, shell=G_INUNIX,
                                         stdout=FDEVNULL, stderr=FDEVNULL)
        except Exception:
            # Do not leak the log file when the process cannot be spawned.
            FDEVNULL.close()
            raise
        if sipp_proc.poll():
            # SIPp already exited with a non-zero status. wait_sipp() will
            # not be called for it, so release the log file here.
            FDEVNULL.close()
            return None
        return sipp_proc

    # Background mode: scan SIPp's output for the "PID=[n]" line.
    sipp_proc = subprocess.Popen(fullcmd, bufsize=0, stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE, shell=G_INUNIX,
                                 universal_newlines=False)
    pid = 0
    # The pipe is opened in binary mode (universal_newlines=False), so the
    # pattern must be a bytes pattern to match the lines read from it.
    pid_re = re.compile(br"PID=\[(\d+)\]", re.I)

    while True:
        line = sipp_proc.stdout.readline()
        pid_match = pid_re.search(line)
        if pid_match:
            pid = int(pid_match.group(1))
            break
        # An empty read means EOF: nothing more will arrive, so stop instead
        # of looping forever.
        # NOTE(review): ``not poll()`` is also true while SIPp is still
        # running (poll() returns None), so the scan stops after the first
        # non-matching line unless SIPp has exited with a non-zero status.
        # Confirm that this is the intended condition.
        if not line or not sipp_proc.poll():
            break

    if pid != 0 and sys.platform == "win32":
        # On win32 the PID is exchanged for a process handle, which is what
        # wait_sipp() later passes to os.waitpid() and CloseHandle().
        SYNCHRONIZE = 0x00100000
        PROCESS_QUERY_INFORMATION = 0x0400
        pid = ctypes.windll.kernel32.OpenProcess(
            SYNCHRONIZE | PROCESS_QUERY_INFORMATION, False, pid)

    return pid
def wait_sipp(sipp):
    """Block until SIPp has exited and return its exit code.

    ``sipp`` is whatever start_sipp() returned: a Popen object in foreground
    mode, or a PID (process handle on win32) in background mode. In
    background mode -99 is returned once waiting has failed more than five
    times.
    """
    global FDEVNULL

    if SIPP_BG_MODE:
        print("Waiting SIPp (PID=" + str(sipp) + ") to exit..")
        attempts = 0
        while True:
            attempts += 1
            try:
                exited_pid, status = os.waitpid(sipp, 0)
                if exited_pid == sipp:
                    # The exit code is taken from the status shifted right
                    # by 8 bits.
                    exit_code = status >> 8
                    if sys.platform == "win32":
                        # Release the handle start_sipp() opened.
                        ctypes.windll.kernel32.CloseHandle(sipp)
                    return exit_code
            except os.error:
                if attempts > 5:
                    return -99
                print("Retry ("+str(attempts)+") waiting SIPp..")

    # Foreground mode: wait on the Popen object, then close the log file
    # that start_sipp() opened for SIPp's output.
    sipp.wait()
    FDEVNULL.close()
    return sipp.returncode
def exec_pjsua_expects(t, sipp):
    """Run the PJSUA expect/send script and report the first failure.

    Each PJSUA_EXPECTS entry is ``(ua_index, expect_pattern, command)`` with
    an optional fourth element giving a timeout. Entries are consumed from
    the front of the list. ``sipp`` is accepted but not used here.

    Returns:
        "" when every step succeeded, otherwise a description of the error
        that stopped the script.
    """
    # One process per configured PJSUA instance, in instance order.
    agents = [t.process[idx] for idx in range(len(PJSUA_INST_PARAM))]

    error_text = ""
    while PJSUA_EXPECTS:
        step = PJSUA_EXPECTS.pop(0)
        agent_idx = step[0]
        pattern = step[1]
        command = resolve_driver_macros(step[2])
        step_timeout = 0 if len(step) < 4 else step[3]

        # Every failure is turned into an error string rather than raised,
        # so the caller can still wait for SIPp before reporting it.
        try:
            if pattern != "":
                expect_kwargs = {"raise_on_error": True}
                if step_timeout > 0:
                    expect_kwargs["timeout"] = step_timeout
                agents[agent_idx].expect(pattern, **expect_kwargs)
            if command != "":
                agents[agent_idx].send(command)
        except TestError as e:
            error_text = e.desc
            break
        except:
            error_text = "Unknown error"
            break

    return error_text
def sipp_err_to_str(err_code):
    """Return the description for a SIPp exit code.

    Codes not listed in the table yield "Unknown error".
    """
    descriptions = (
        (0, "All calls were successful"),
        (1, "At least one call failed"),
        (97, "exit on internal command. Calls may have been processed"),
        (99, "Normal exit without calls processed"),
        (-1, "Fatal error (timeout)"),
        (-2, "Fatal error binding a socket"),
    )
    # Linear scan with == so matching follows plain equality comparison.
    for known_code, text in descriptions:
        if err_code == known_code:
            return text
    return "Unknown error"
def TEST_FUNC(t):
    """Test body: start SIPp, run the PJSUA script, then check both results.

    Raises:
        TestError: when SIPp could not be started, when the PJSUA script
            reported an error, or when SIPp finished with a non-zero exit
            code.
    """
    sipp = start_sipp()
    if not sipp:
        raise TestError("Failed starting SIPp")

    # SIPp is always waited for, even when the PJSUA script failed; the
    # script error is only raised afterwards.
    ua_err_st = exec_pjsua_expects(t, sipp)
    sipp_ret_code = wait_sipp(sipp)

    if ua_err_st != "":
        raise TestError(ua_err_st)

    if not sipp_ret_code:
        return

    # Reinterpret the exit code as a signed byte so that, for example, 255
    # is reported as -1.
    rc = ctypes.c_byte(sipp_ret_code).value
    raise TestError("SIPp returned error " + str(rc) + ": " + sipp_err_to_str(rc))
# The test definition: named after the scenario path minus its ".xml" suffix,
# with the PJSUA instances to start and the test body to run.
test = TestParam(SIPP_SCEN_XML[:-4], PJSUA_INST_PARAM, TEST_FUNC)
|