import sys import re import os import subprocess import random import telnetlib import time import threading import traceback import getopt import inc_cfg as inc import inc_const as const import inc_util as util # Vars G_EXE = "" # pjsua executable path G_INUNIX = False # flags that test is running in Unix # Usage string usage = \ """ run.py - Automated test driver Usage: run.py [options] MODULE CONFIG Options: --exe, -e pjsua executable path --null-audio, -n use null audio Sample: run.py -n mod_run.py scripts-run/100_simple.py """ # Parse arguments try: opts, args = getopt.getopt(sys.argv[1:], "hne:", ["help", "null-audio", "exe="]) except getopt.GetoptError as err: print(str(err)) print(usage) sys.exit(2) for o, a in opts: if o in ("-h", "--help"): print(usage) sys.exit() elif o in ("-n", "--null-audio"): inc.HAS_SND_DEV = 0 elif o in ("-e", "--exe"): G_EXE = a else: print("Unknown options") sys.exit(2) if len(args) != 2: print("Invalid arguments") print(usage) sys.exit(2) # Set global ARGS to be used by modules inc.ARGS = args # Get the pjsua executable name if G_EXE == "": if sys.platform.find("win32")!=-1: EXE_DIR = "../../pjsip-apps/bin/" EXECUTABLES = [ "pjsua_vc6d.exe", "pjsua_vc6.exe", "pjsua-i386-Win32-vc8-Debug.exe", "pjsua-i386-Win32-vc8-Debug-Dynamic.exe", "pjsua-i386-Win32-vc8-Debug-Static.exe", "pjsua-i386-Win32-vc8-Release.exe", "pjsua-i386-Win32-vc8-Release-Dynamic.exe", "pjsua-i386-Win32-vc8-Release-Static.exe", "pjsua-i386-Win32-vc14-Debug.exe", "pjsua-i386-Win32-vc14-Debug-Dynamic.exe", "pjsua-i386-Win32-vc14-Debug-Static.exe", "pjsua-i386-Win32-vc14-Release.exe", "pjsua-i386-Win32-vc14-Release-Dynamic.exe", "pjsua-i386-Win32-vc14-Release-Static.exe" ] e_ts = 0 for e in EXECUTABLES: e = EXE_DIR + e if os.access(e, os.F_OK): st = os.stat(e) if e_ts==0 or e_ts 5: raise inc.TestError(self.name + ": Timeout connecting to pjsua: " + repr(e)) self.running = True while self.proc.poll() is None: line = self.telnet.read_until(b'\n', 60) linestr = line.decode('utf-8') if linestr == "" or const.DESTROYED in linestr: break; #Print the line if echo is ON if self.echo: try: print(self.name + ": " + linestr.rstrip()) except UnicodeEncodeError: print((self.name + ": " + linestr.rstrip()).encode('utf-8')) self.lock.acquire() self.output += linestr self.lock.release() self.running = False else: fullcmd = G_EXE + " " + self.inst_param.arg + " --stdout-refresh=5 --stdout-refresh-text=" + const.STDOUT_REFRESH if not self.inst_param.enable_buffer: fullcmd = fullcmd + " --stdout-no-buf" self.trace("Popen " + fullcmd) self.proc = subprocess.Popen(fullcmd, shell=G_INUNIX, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False) self.running = True while self.proc.poll() is None: line = self.proc.stdout.readline() if line == "": break; #Print the line if echo is ON if self.echo: print(self.name + ": " + line.rstrip()) self.lock.acquire() self.output += line self.lock.release() self.running = False def send(self, cmd): self.trace("send " + cmd) if self.use_telnet: self.telnet.write((cmd + '\r\n').encode('utf-8')) else: self.proc.stdin.writelines(cmd + "\n") self.proc.stdin.flush() def expect(self, pattern, raise_on_error=True, title="", timeout=15): # no prompt for telnet if self.use_telnet and pattern==const.PROMPT: return self.trace("expect " + pattern) r = re.compile(pattern, re.I) found_at = -1 t0 = time.time() while found_at < 0: self.lock.acquire() lines = self.output.splitlines() for i, line in enumerate(lines): # Search for expected text if r.search(line) != None: found_at = i break # Trap assertion error if raise_on_error: if self.ra.search(line) != None: self.lock.release() raise inc.TestError(self.name + ": " + line) self.output = '\n'.join(lines[found_at+1:])+"\n" if found_at >= 0 else "" self.lock.release() if found_at >= 0: return line if not self.running: if raise_on_error: raise inc.TestError(self.name + ": Premature EOF") break else: t1 = time.time() dur = int(t1 - t0) if dur > timeout: self.trace("Timed-out!") if raise_on_error: raise inc.TestError(self.name + " " + title + ": Timeout expecting pattern: \"" + pattern + "\"") break else: time.sleep(0.01) return None def get_config(self, key_config): self.send("dd") line = self.expect(key_config); return line def sync_stdout(self): if not self.use_telnet: self.trace("sync_stdout") cmd = "echo 1" + str(random.randint(1000,9999)) self.send(cmd) self.expect(cmd) def wait(self): self.trace("wait") self.join() self.proc.communicate() if self.telnet: self.telnet.close() def trace(self, s): if self.trace_enabled: now = time.time() fmt = self.name + ": " + "================== " + s + " ==================" + " [at t=%(time)03d]" try: print(fmt % {'time':int(now - self.t0)}) except UnicodeEncodeError: print((fmt % {'time':int(now - self.t0)}).encode('utf-8')) ######################### # Error handling def handle_error(errmsg, t, close_processes = True): print("====== Caught error: " + errmsg + " ======") if (close_processes): time.sleep(1) for p in t.process: # Protect against 'Broken pipe' exception try: if not p.use_telnet: p.send("q") p.send("q") else: p.send("shutdown") except: pass is_err = False try: ret = p.expect(const.DESTROYED, False) if ret is None: is_err = True except: is_err = True if is_err and p.proc.poll() is None: if sys.hexversion >= 0x02060000: p.proc.terminate() else: p.wait() else: p.wait() print("Test completed with error: " + errmsg) sys.exit(1) ######################### # MAIN # Import the test script script = util.load_module_from_file("script", inc.ARGS[0]) # Init random seed random.seed() # Validate if script.test == None: print("Error: no test defined") sys.exit(1) if script.test.skip: print("Test " + script.test.title + " is skipped") sys.exit(0) if len(script.test.inst_params) == 0: print("Error: test doesn't contain pjsua run descriptions") sys.exit(1) # Instantiate pjsuas print("====== Running " + script.test.title + " ======") print("Using " + G_EXE + " as pjsua executable") for inst_param in script.test.inst_params: retry = 0 process_running = False while (not process_running) and retry < 3: p = None retry += 1 try: # Create pjsua's Expect instance from the param p = Expect(inst_param) p.start() except inc.TestError as e: handle_error(e.desc, script.test) # wait process ready if not p.use_telnet: while True: try: p.send("echo 1") except: time.sleep(0.1) continue break process_running = True else: t0 = time.time() while p.telnet is None: time.sleep(0.1) dur = int(time.time() - t0) if dur > 5: break process_running = p.telnet is not None # wait before retrying if not process_running and retry < 2: time.sleep(2) if not process_running: handle_error("Failed running pjsua", script.test) # add running instance script.test.process.append(p) for p in script.test.process: try: # Wait until registration completes if p.inst_param.have_reg: p.send("rr") p.expect(p.inst_param.uri+".*registration success") # Synchronize stdout if not p.use_telnet: p.send("") p.expect(const.PROMPT) p.send("echo 1") p.expect("echo 1") except inc.TestError as e: handle_error(e.desc, script.test) # Run the test function if script.test.test_func != None: try: script.test.test_func(script.test) except inc.TestError as e: handle_error(e.desc, script.test) except: handle_error("Unknown error: " + str(traceback.format_exc()), script.test) # Shutdown all instances for p in script.test.process: # Unregister if we have_reg to make sure that next tests # won't fail if p.inst_param.have_reg: p.send("ru") p.expect(p.inst_param.uri+".*unregistration success") if p.use_telnet: p.send("shutdown") else: p.send("q") time.sleep(0.5) for p in script.test.process: if p.running: p.expect(const.DESTROYED, False) p.wait() # Run the post test function if script.test.post_func != None: try: script.test.post_func(script.test) except inc.TestError as e: handle_error(e.desc, script.test, False) # Done print("Test " + script.test.title + " completed successfully") sys.exit(0)