mod_sendto.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import sys
  2. import inc_sip as sip
  3. import inc_const as const
  4. import inc_util as util
  5. import re
  6. from inc_cfg import *
  7. # Read configuration
  8. cfg_file = util.load_module_from_file("cfg_file", ARGS[1])
  9. # Test body function
  10. def test_func(t):
  11. pjsua = t.process[0]
  12. # Create dialog
  13. dlg = sip.Dialog("127.0.0.1", pjsua.inst_param.sip_port,
  14. tcp=cfg_file.sendto_cfg.use_tcp)
  15. #dlg = sip.Dialog("127.0.0.1", 5060, tcp=cfg_file.sendto_cfg.use_tcp)
  16. cfg = cfg_file.sendto_cfg
  17. if len(cfg.complete_msg) != 0:
  18. req = dlg.update_fields(cfg.complete_msg)
  19. else:
  20. req = dlg.create_invite(cfg.sdp, cfg.extra_headers, cfg.body)
  21. resp = dlg.send_request_wait(req, 10)
  22. if resp=="":
  23. raise TestError("Timed-out waiting for response")
  24. # Check response code
  25. code = int(sip.get_code(resp))
  26. if code != cfg.resp_code:
  27. dlg.hangup(code)
  28. raise TestError("Expecting code " + str(cfg.resp_code) +
  29. " got " + str(code))
  30. # Check for patterns that must exist
  31. for p in cfg.resp_include:
  32. if re.search(p, resp, re.M | re.I)==None:
  33. dlg.hangup(code)
  34. raise TestError("Pattern " + p + " not found")
  35. # Check for patterns that must not exist
  36. for p in cfg.resp_exclude:
  37. if re.search(p, resp, re.M | re.I)!=None:
  38. dlg.hangup(code)
  39. raise TestError("Excluded pattern " + p + " found")
  40. pjsua.sync_stdout()
  41. dlg.hangup(code)
  42. pjsua.sync_stdout()
  43. # Here where it all comes together
  44. test = TestParam(cfg_file.sendto_cfg.name,
  45. [cfg_file.sendto_cfg.inst_param],
  46. test_func)