123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440 |
- import sys
- import re
- import os
- import subprocess
- import random
- import telnetlib
- import time
- import threading
- import traceback
- import getopt
- import inc_cfg as inc
- import inc_const as const
- import inc_util as util
# Vars
# pjsua executable path
G_EXE = ""
# flags that test is running in Unix
G_INUNIX = False

# Usage string, printed for -h/--help and whenever the arguments are invalid
usage = """
run.py - Automated test driver
Usage:
run.py [options] MODULE CONFIG
Options:
--exe, -e pjsua executable path
--null-audio, -n use null audio
Sample:
run.py -n mod_run.py scripts-run/100_simple.py
"""
# Parse arguments.
# Short options: -h, -n and -e (which takes a value); long forms are
# --help, --null-audio and --exe=PATH.
try:
    opts, args = getopt.getopt(
        sys.argv[1:],
        "hne:",
        ["help", "null-audio", "exe="],
    )
except getopt.GetoptError as err:
    print(str(err))
    print(usage)
    sys.exit(2)

for opt_name, opt_value in opts:
    if opt_name in ("-h", "--help"):
        print(usage)
        sys.exit()
    if opt_name in ("-n", "--null-audio"):
        # Record in the shared config module that no sound device is used
        inc.HAS_SND_DEV = 0
        continue
    if opt_name in ("-e", "--exe"):
        G_EXE = opt_value
        continue
    print("Unknown options")
    sys.exit(2)

# Exactly two positional arguments are required: MODULE and CONFIG
if len(args) != 2:
    print("Invalid arguments")
    print(usage)
    sys.exit(2)

# Set global ARGS to be used by modules
inc.ARGS = args
# Get the pjsua executable name.
#
# When no --exe was given, the executable is auto-detected:
#   * on win32, the most recently modified of the known binary names found
#     under ../../pjsip-apps/bin/ is used;
#   * elsewhere, the name is derived from the TARGET_NAME line of
#     ../../build.mak.
# G_INUNIX is set as a side effect in every branch.
if G_EXE == "":
    if sys.platform.find("win32") != -1:
        EXE_DIR = "../../pjsip-apps/bin/"
        EXECUTABLES = [
            "pjsua_vc6d.exe",
            "pjsua_vc6.exe",
            "pjsua-i386-Win32-vc8-Debug.exe",
            "pjsua-i386-Win32-vc8-Debug-Dynamic.exe",
            "pjsua-i386-Win32-vc8-Debug-Static.exe",
            "pjsua-i386-Win32-vc8-Release.exe",
            "pjsua-i386-Win32-vc8-Release-Dynamic.exe",
            "pjsua-i386-Win32-vc8-Release-Static.exe",
            "pjsua-i386-Win32-vc14-Debug.exe",
            "pjsua-i386-Win32-vc14-Debug-Dynamic.exe",
            "pjsua-i386-Win32-vc14-Debug-Static.exe",
            "pjsua-i386-Win32-vc14-Release.exe",
            "pjsua-i386-Win32-vc14-Release-Dynamic.exe",
            "pjsua-i386-Win32-vc14-Release-Static.exe",
        ]
        # Keep the candidate with the newest modification time
        e_ts = 0
        for e in EXECUTABLES:
            e = EXE_DIR + e
            if os.access(e, os.F_OK):
                st = os.stat(e)
                if e_ts == 0 or e_ts < st.st_mtime:
                    G_EXE = e
                    e_ts = st.st_mtime
        if G_EXE == "":
            print("Unable to find valid pjsua. Please build pjsip first")
            sys.exit(1)
        G_INUNIX = False
    else:
        try:
            # 'with' guarantees the file handle is closed on every path
            with open("../../build.mak", "r") as f:
                for line in f:
                    if line.find("TARGET_NAME") == -1:
                        continue
                    parts = line.split(":= ")
                    if len(parts) < 2:
                        # Mentions TARGET_NAME but is not an assignment;
                        # keep scanning instead of raising IndexError
                        continue
                    print(line)
                    G_EXE = "../../pjsip-apps/bin/pjsua-" + parts[1]
                    break
        except (IOError, OSError):
            # Missing/unreadable build.mak: G_EXE stays empty so the
            # friendly message below is printed instead of a traceback
            pass
        if G_EXE == "":
            print("Unable to find ../../build.mak. Please build pjsip first")
            sys.exit(1)
        G_INUNIX = True
else:
    # Executable supplied on the command line: only the platform flag
    # needs to be worked out
    platform_name = sys.platform.lower()
    if platform_name.find("win32") != -1 or platform_name.find("microsoft") != -1:
        G_INUNIX = False
    else:
        G_INUNIX = True

# The name read from build.mak still carries its line terminator
G_EXE = G_EXE.rstrip("\n\r \t")
###################################
# Poor man's 'expect'-like class
class Expect(threading.Thread):
    """Run one pjsua process in a background thread and match its output.

    The thread body (run()) spawns pjsua and keeps appending every line it
    produces to self.output.  Other threads drive the process with send()
    and consume the collected output with expect().

    Two transports are supported, selected by inst_param.telnet_enabled:
    pjsua's CLI reached over telnet on 127.0.0.1, or the process's own
    stdin/stdout pipes.

    The constructor reads these attributes from inst_param: name,
    echo_enabled, trace_enabled and telnet_enabled.  run() additionally
    reads arg, telnet_port (telnet mode) and enable_buffer (stdio mode).
    """

    proc = None             # subprocess.Popen object, set by run()
    telnet = None           # telnetlib.Telnet connection (telnet mode only)
    use_telnet = False
    echo = False            # print every received line when True
    trace_enabled = False
    inst_param = None
    # rh and rr are compiled here but not used inside this class
    rh = re.compile(const.DESTROYED)
    ra = re.compile(const.ASSERT, re.I)
    rr = re.compile(const.STDOUT_REFRESH)
    # Evaluated once when the class is defined; trace() reports time
    # relative to this moment
    t0 = time.time()
    output = ""             # text received but not yet consumed by expect()
    # Class attribute, hence a single lock shared by all instances
    lock = threading.Lock()
    running = False         # True while the reader loop in run() is active

    def __init__(self, inst_param):
        threading.Thread.__init__(self)
        self.inst_param = inst_param
        self.name = inst_param.name
        self.echo = inst_param.echo_enabled
        self.trace_enabled = inst_param.trace_enabled
        self.use_telnet = inst_param.telnet_enabled
        self.telnet = None

    def run(self):
        """Thread body: spawn pjsua and collect its output until it ends."""
        if self.use_telnet:
            self._run_telnet()
        else:
            self._run_stdio()

    def _run_telnet(self):
        """Spawn pjsua with the telnet CLI and read lines from the socket.

        Raises inc.TestError (inside this thread) when the telnet
        connection cannot be established within roughly five seconds.
        """
        fullcmd = G_EXE + " " + self.inst_param.arg + " --use-cli --no-cli-console --cli-telnet-port=%d" % (self.inst_param.telnet_port)
        self.trace("Popen " + fullcmd)
        self.proc = subprocess.Popen(fullcmd, shell=G_INUNIX)

        # start telnet-ing to pjsua, raise exception if telnet fails after 5s
        t0 = time.time()
        while self.proc.poll() is None and self.telnet is None:
            try:
                time.sleep(0.01)
                self.telnet = telnetlib.Telnet('127.0.0.1', port=self.inst_param.telnet_port, timeout=60)
            except Exception as e:
                dur = int(time.time() - t0)
                if dur > 5:
                    raise inc.TestError(self.name + ": Timeout connecting to pjsua: " + repr(e))

        self.running = True
        try:
            while self.proc.poll() is None:
                try:
                    line = self.telnet.read_until(b'\n', 60)
                except EOFError:
                    # Connection closed with nothing left to read:
                    # treat it like the empty read below
                    break
                # 'replace' keeps a stray non-UTF-8 byte from killing
                # the reader thread
                linestr = line.decode('utf-8', 'replace')
                if linestr == "" or const.DESTROYED in linestr:
                    break
                self._store_line(linestr)
        finally:
            # Always clear the flag so expect() reports "Premature EOF"
            # instead of waiting for its timeout if this thread dies
            self.running = False

    def _run_stdio(self):
        """Spawn pjsua attached to pipes and read lines from its stdout."""
        fullcmd = G_EXE + " " + self.inst_param.arg + " --stdout-refresh=5 --stdout-refresh-text=" + const.STDOUT_REFRESH
        if not self.inst_param.enable_buffer:
            fullcmd = fullcmd + " --stdout-no-buf"
        self.trace("Popen " + fullcmd)
        self.proc = subprocess.Popen(fullcmd, shell=G_INUNIX, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False)

        self.running = True
        try:
            while self.proc.poll() is None:
                # The pipe is binary (universal_newlines=False), so this is
                # bytes: test for emptiness and decode before storing
                line = self.proc.stdout.readline()
                if not line:
                    break
                self._store_line(line.decode('utf-8', 'replace'))
        finally:
            self.running = False

    def _store_line(self, linestr):
        """Echo one received line if enabled, then append it to output."""
        # Print the line if echo is ON
        if self.echo:
            try:
                print(self.name + ": " + linestr.rstrip())
            except UnicodeEncodeError:
                print((self.name + ": " + linestr.rstrip()).encode('utf-8'))
        with self.lock:
            self.output += linestr

    def send(self, cmd):
        """Send one command line to pjsua over the active transport."""
        self.trace("send " + cmd)
        if self.use_telnet:
            self.telnet.write((cmd + '\r\n').encode('utf-8'))
        else:
            # stdin is a binary pipe, so the command must be encoded
            self.proc.stdin.write((cmd + "\n").encode('utf-8'))
            self.proc.stdin.flush()

    def expect(self, pattern, raise_on_error=True, title="", timeout=15):
        """Wait until a collected output line matches pattern.

        pattern is compiled as a case-insensitive regular expression and
        searched in each pending line.  Lines up to and including the match
        are consumed; when a scan finds no match, all pending output is
        discarded before polling again.

        Returns the matching line, or None when nothing matched and
        raise_on_error is false.  Also returns None immediately for
        const.PROMPT in telnet mode (no prompt for telnet).

        Raises inc.TestError, only when raise_on_error is true, if a line
        matching const.ASSERT is seen before a match, if the reader thread
        is no longer running ("Premature EOF"), or once more than timeout
        seconds have elapsed.
        """
        # no prompt for telnet
        if self.use_telnet and pattern == const.PROMPT:
            return
        self.trace("expect " + pattern)
        r = re.compile(pattern, re.I)
        t0 = time.time()
        while True:
            matched = None
            # The 'with' block releases the lock even when TestError is
            # raised from inside the scan
            with self.lock:
                lines = self.output.splitlines()
                for i, line in enumerate(lines):
                    # Search for expected text
                    if r.search(line) is not None:
                        matched = line
                        self.output = '\n'.join(lines[i + 1:]) + "\n"
                        break
                    # Trap assertion error
                    if raise_on_error and self.ra.search(line) is not None:
                        raise inc.TestError(self.name + ": " + line)
                else:
                    # Nothing matched: drop everything scanned so far
                    self.output = ""

            if matched is not None:
                return matched
            if not self.running:
                if raise_on_error:
                    raise inc.TestError(self.name + ": Premature EOF")
                break
            dur = int(time.time() - t0)
            if dur > timeout:
                self.trace("Timed-out!")
                if raise_on_error:
                    raise inc.TestError(self.name + " " + title + ": Timeout expecting pattern: \"" + pattern + "\"")
                break
            time.sleep(0.01)
        return None

    def get_config(self, key_config):
        """Send "dd" and return the output line matching key_config."""
        self.send("dd")
        line = self.expect(key_config)
        return line

    def sync_stdout(self):
        """Stdio mode only: echo a random token and wait until it appears."""
        if not self.use_telnet:
            self.trace("sync_stdout")
            cmd = "echo 1" + str(random.randint(1000, 9999))
            self.send(cmd)
            self.expect(cmd)

    def wait(self):
        """Join the reader thread, reap the process and close telnet."""
        self.trace("wait")
        self.join()
        self.proc.communicate()
        if self.telnet:
            self.telnet.close()

    def trace(self, s):
        """Print a timestamped trace banner when tracing is enabled."""
        if self.trace_enabled:
            now = time.time()
            # Only the timestamp is %-formatted, so a '%' inside s (a sent
            # command or an expected pattern) cannot break the formatting
            msg = (self.name + ": " + "================== " + s +
                   " ==================" +
                   " [at t=%03d]" % int(now - self.t0))
            try:
                print(msg)
            except UnicodeEncodeError:
                print(msg.encode('utf-8'))
#########################
# Error handling
def handle_error(errmsg, t, close_processes = True):
    """Report a test failure, optionally stop every pjsua, and exit.

    Parameters:
        errmsg: text of the error; concatenated into the printed messages.
        t: the test object; its `process` list is iterated to stop each
            instance.
        close_processes: when true, each instance is asked to quit
            ("q" twice in stdio mode, "shutdown" in telnet mode) and is
            terminated if it does not report const.DESTROYED.

    Never returns: always ends with sys.exit(1).
    """
    print("====== Caught error: " + errmsg + " ======")
    if (close_processes):
        time.sleep(1)
        for p in t.process:
            # Protect against 'Broken pipe' exception.  Only Exception is
            # caught so that Ctrl-C / SystemExit are not swallowed here.
            try:
                if not p.use_telnet:
                    p.send("q")
                    p.send("q")
                else:
                    p.send("shutdown")
            except Exception:
                pass

            # Best effort: any failure while waiting for the shutdown
            # banner is treated the same as not seeing it
            is_err = False
            try:
                ret = p.expect(const.DESTROYED, False)
                if ret is None:
                    is_err = True
            except Exception:
                is_err = True

            if is_err and p.proc.poll() is None:
                # Still alive without a clean shutdown: terminate it
                # (guarded by the original interpreter-version check)
                if sys.hexversion >= 0x02060000:
                    p.proc.terminate()
                else:
                    p.wait()
            else:
                p.wait()
    print("Test completed with error: " + errmsg)
    sys.exit(1)
#########################
# MAIN

# Import the test script (first positional command-line argument)
script = util.load_module_from_file("script", inc.ARGS[0])

# Init random seed
random.seed()

# Validate: the script must define a test, the test must not be flagged
# as skipped, and it must describe at least one pjsua instance
if script.test == None:
    print("Error: no test defined")
    sys.exit(1)

if script.test.skip:
    print("Test " + script.test.title + " is skipped")
    sys.exit(0)

if len(script.test.inst_params) < 1:
    print("Error: test doesn't contain pjsua run descriptions")
    sys.exit(1)
# Instantiate pjsuas.
# Each instance description gets up to three start attempts; an instance
# that never becomes ready aborts the whole test through handle_error().
print("====== Running " + script.test.title + " ======")
print("Using " + G_EXE + " as pjsua executable")

for inst_param in script.test.inst_params:
    retry = 0
    process_running = False
    while (not process_running) and retry < 3:
        p = None
        retry += 1
        try:
            # Create pjsua's Expect instance from the param
            p = Expect(inst_param)
            p.start()
        except inc.TestError as e:
            handle_error(e.desc, script.test)

        # wait process ready
        if not p.use_telnet:
            while True:
                try:
                    p.send("echo 1")
                except Exception:
                    # send() fails until the reader thread has spawned the
                    # process.  Once the process has exited it can never
                    # succeed, so stop polling and let the retry logic
                    # take over instead of spinning forever.
                    if p.proc is not None and p.proc.poll() is not None:
                        break
                    time.sleep(0.1)
                    continue
                process_running = True
                break
        else:
            # Telnet mode: wait for the reader thread to connect
            t0 = time.time()
            while p.telnet is None:
                time.sleep(0.1)
                dur = int(time.time() - t0)
                if dur > 5:
                    break
            process_running = p.telnet is not None

        # wait before retrying (bound matches the loop condition above, so
        # every remaining attempt is preceded by the pause)
        if not process_running and retry < 3:
            time.sleep(2)

    if not process_running:
        handle_error("Failed running pjsua", script.test)

    # add running instance
    script.test.process.append(p)
# Bring every started instance to a known state before the test runs
for p in script.test.process:
    try:
        # Wait until registration completes
        if p.inst_param.have_reg:
            p.send("rr")
            p.expect(p.inst_param.uri+".*registration success")

        # Synchronize stdout (stdio transport only)
        if p.use_telnet:
            continue
        p.send("")
        p.expect(const.PROMPT)
        p.send("echo 1")
        p.expect("echo 1")
    except inc.TestError as e:
        handle_error(e.desc, script.test)
# Run the test function, if the script provides one.
# A TestError carries its own description; anything else is reported with
# its full traceback.  Both paths go through handle_error().
if script.test.test_func != None:
    try:
        script.test.test_func(script.test)
    except inc.TestError as e:
        handle_error(e.desc, script.test)
    except:
        # Deliberately catches everything: this is the top-level boundary
        # that hands the failure to handle_error()
        handle_error(
            "Unknown error: " + str(traceback.format_exc()),
            script.test,
        )
# Shutdown all instances
for p in script.test.process:
    # Unregister if we have_reg to make sure that next tests
    # won't fail
    try:
        if p.inst_param.have_reg:
            p.send("ru")
            p.expect(p.inst_param.uri+".*unregistration success")
    except inc.TestError as e:
        # Same handling as the other phases: without it a failed
        # unregistration escapes as an uncaught exception and the
        # remaining pjsua instances are never told to stop
        handle_error(e.desc, script.test)

    if p.use_telnet:
        p.send("shutdown")
    else:
        p.send("q")

# Give the instances a moment to begin shutting down
time.sleep(0.5)
for p in script.test.process:
    # Only wait for the shutdown banner while the reader thread is active
    if p.running:
        p.expect(const.DESTROYED, False)
    p.wait()
# Run the post test function, if any.  The processes were already shut
# down above, so handle_error() is told not to close them again.
if script.test.post_func != None:
    try:
        script.test.post_func(script.test)
    except inc.TestError as e:
        handle_error(e.desc, script.test, close_processes=False)

# Done
print("Test " + script.test.title + " completed successfully")
sys.exit(0)
|