123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- #
- from socket import *
- import re
- import random
- import time
- import sys
- import inc_cfg as cfg
- from select import *
- # SIP request template
- req_templ = \
- """$METHOD $TARGET_URI SIP/2.0\r
- Via: SIP/2.0/UDP $LOCAL_IP:$LOCAL_PORT;rport;branch=z9hG4bK$BRANCH\r
- Max-Forwards: 70\r
- From: <sip:caller@pjsip.org>$FROM_TAG\r
- To: <$TARGET_URI>$TO_TAG\r
- Contact: <sip:$LOCAL_IP:$LOCAL_PORT;transport=udp>\r
- Call-ID: $CALL_ID@pjsip.org\r
- CSeq: $CSEQ $METHOD\r
- Allow: PRACK, INVITE, ACK, BYE, CANCEL, UPDATE, REFER\r
- Supported: replaces, 100rel, norefersub\r
- User-Agent: pjsip.org Python tester\r
- Content-Length: $CONTENT_LENGTH\r
- $SIP_HEADERS"""
- def is_request(msg):
- return msg.split(" ", 1)[0] != "SIP/2.0"
-
- def is_response(msg):
- return msg.split(" ", 1)[0] == "SIP/2.0"
- def get_code(msg):
- if msg=="":
- return 0
- return int(msg.split(" ", 2)[1])
- def get_tag(msg, hdr="To"):
- pat = "^" + hdr + ":.*"
- result = re.search(pat, msg, re.M | re.I)
- if result==None:
- return ""
- line = result.group()
- #print "line=", line
- tags = line.split(";tag=")
- if len(tags)>1:
- return tags[1]
- return ""
- #return re.split("[;& ]", s)
- def get_header(msg, hname):
- headers = msg.splitlines()
- for hdr in headers:
- hfields = hdr.split(": ", 2)
- if hfields[0]==hname:
- return hfields[1]
- return None
- class Dialog:
- sock = None
- dst_addr = ""
- dst_port = 5060
- local_ip = ""
- local_port = 0
- tcp = False
- call_id = str(random.random())
- cseq = 0
- local_tag = ";tag=" + str(random.random())
- rem_tag = ""
- last_resp_code = 0
- inv_branch = ""
- trace_enabled = True
- last_request = ""
- def __init__(self, dst_addr, dst_port=5060, tcp=False, trace=True, local_port=0):
- self.dst_addr = dst_addr
- self.dst_port = dst_port
- self.tcp = tcp
- self.trace_enabled = trace
- if tcp==True:
- self.sock = socket(AF_INET, SOCK_STREAM)
- self.sock.connect(dst_addr, dst_port)
- else:
- self.sock = socket(AF_INET, SOCK_DGRAM)
- self.sock.bind(("127.0.0.1", local_port))
-
- self.local_ip, self.local_port = self.sock.getsockname()
- self.trace("Dialog socket bound to " + self.local_ip + ":" + str(self.local_port))
- def trace(self, txt):
- if self.trace_enabled:
- try:
- print(str(time.strftime("%H:%M:%S ")) + txt)
- except UnicodeEncodeError:
- print((str(time.strftime("%H:%M:%S ")) + txt).encode('utf-8'))
- def update_fields(self, msg):
- if self.tcp:
- transport_param = ";transport=tcp"
- else:
- transport_param = ""
- msg = msg.replace("$TARGET_URI", "sip:"+self.dst_addr+":"+str(self.dst_port) + transport_param)
- msg = msg.replace("$LOCAL_IP", self.local_ip)
- msg = msg.replace("$LOCAL_PORT", str(self.local_port))
- msg = msg.replace("$FROM_TAG", self.local_tag)
- msg = msg.replace("$TO_TAG", self.rem_tag)
- msg = msg.replace("$CALL_ID", self.call_id)
- msg = msg.replace("$CSEQ", str(self.cseq))
- branch=str(random.random())
- msg = msg.replace("$BRANCH", branch)
- return msg
- def create_req(self, method, sdp, branch="", extra_headers="", body=""):
- if branch=="":
- self.cseq = self.cseq + 1
- msg = req_templ
- msg = msg.replace("$METHOD", method)
- msg = msg.replace("$SIP_HEADERS", extra_headers)
- if branch=="":
- branch=str(random.random())
- msg = msg.replace("$BRANCH", branch)
- if sdp!="":
- msg = msg.replace("$CONTENT_LENGTH", str(len(sdp)))
- msg = msg + "Content-Type: application/sdp\r\n"
- msg = msg + "\r\n"
- msg = msg + sdp
- elif body!="":
- msg = msg.replace("$CONTENT_LENGTH", str(len(body)))
- msg = msg + "\r\n"
- msg = msg + body
- else:
- msg = msg.replace("$CONTENT_LENGTH", "0")
- return self.update_fields(msg)
- def create_response(self, request, code, reason, to_tag=""):
- response = "SIP/2.0 " + str(code) + " " + reason + "\r\n"
- lines = request.splitlines()
- for line in lines:
- hdr = line.split(":", 1)[0]
- if hdr in ["Via", "From", "To", "CSeq", "Call-ID"]:
- if hdr=="To" and to_tag!="":
- line = line + ";tag=" + to_tag
- elif hdr=="Via":
- line = line + ";received=127.0.0.1"
- response = response + line + "\r\n"
- return response
- def create_invite(self, sdp, extra_headers="", body=""):
- self.inv_branch = str(random.random())
- return self.create_req("INVITE", sdp, branch=self.inv_branch, extra_headers=extra_headers, body=body)
- def create_ack(self, sdp="", extra_headers=""):
- return self.create_req("ACK", sdp, extra_headers=extra_headers, branch=self.inv_branch)
- def create_bye(self, extra_headers=""):
- return self.create_req("BYE", "", extra_headers)
- def send_msg(self, msg, dst_addr=None):
- if (is_request(msg)):
- self.last_request = msg.split(" ", 1)[0]
- if not dst_addr:
- dst_addr = (self.dst_addr, self.dst_port)
- self.trace("============== TX MSG to " + str(dst_addr) + " ============= \n" + msg)
- self.sock.sendto(msg.encode('utf-8'), 0, dst_addr)
- def wait_msg_from(self, timeout):
- endtime = time.time() + timeout
- msg = ""
- src_addr = None
- while time.time() < endtime:
- readset = select([self.sock], [], [], 1)
- if len(readset[0]) < 1 or not self.sock in readset[0]:
- if len(readset[0]) < 1:
- print("select() timeout (will wait for " + str(int(endtime - time.time())) + "more secs)")
- elif not self.sock in readset[0]:
- print("select() alien socket")
- else:
- print("select other error")
- continue
- try:
- msg, src_addr = self.sock.recvfrom(4096)
- break
- except:
- print("recv() exception: ", sys.exc_info()[0])
- continue
-
- msgstr = msg.decode('utf-8')
- if msgstr=="":
- return "", None
- if self.last_request=="INVITE" and self.rem_tag=="":
- self.rem_tag = get_tag(msgstr, "To")
- self.rem_tag = self.rem_tag.rstrip("\r\n;")
- if self.rem_tag != "":
- self.rem_tag = ";tag=" + self.rem_tag
- self.trace("=== rem_tag:" + self.rem_tag)
- self.trace("=========== RX MSG from " + str(src_addr) + " ===========\n" + msgstr)
- return (msgstr, src_addr)
-
- def wait_msg(self, timeout):
- return self.wait_msg_from(timeout)[0]
-
- # Send request and wait for final response
- def send_request_wait(self, msg, timeout):
- t1 = 1.0
- endtime = time.time() + timeout
- resp = ""
- code = 0
- for i in range(0,5):
- self.send_msg(msg)
- resp = self.wait_msg(t1)
- if resp!="" and is_response(resp):
- code = get_code(resp)
- break
- last_resp = resp
- while code < 200 and time.time() < endtime:
- resp = self.wait_msg(endtime - time.time())
- if resp != "" and is_response(resp):
- code = get_code(resp)
- last_resp = resp
- elif resp=="":
- break
- return last_resp
-
- def hangup(self, last_code=0):
- self.trace("====== hangup =====")
- if last_code!=0:
- self.last_resp_code = last_code
- if self.last_resp_code>0 and self.last_resp_code<200:
- msg = self.create_req("CANCEL", "", branch=self.inv_branch, extra_headers="")
- self.send_request_wait(msg, 5)
- msg = self.create_ack()
- self.send_msg(msg)
- elif self.last_resp_code>=200 and self.last_resp_code<300:
- msg = self.create_ack()
- self.send_msg(msg)
- msg = self.create_bye()
- self.send_request_wait(msg, 5)
- else:
- msg = self.create_ack()
- self.send_msg(msg)
- class SendtoCfg:
- # Test name
- name = ""
- # pjsua InstanceParam
- inst_param = None
- # Complete INVITE message. If this is not empty, then this
- # message will be sent instead and the "sdp" and "extra_headers"
- # settings will be ignored.
- complete_msg = ""
- # Initial SDP
- sdp = ""
- # Extra headers to add to request
- extra_headers = ""
- # Expected code
- resp_code = 0
- # Use TCP?
- use_tcp = False
- # List of RE patterns that must exist in response
- resp_include = []
- # List of RE patterns that must NOT exist in response
- resp_exclude = []
- # Full (non-SDP) body
- body = ""
- # Constructor
- def __init__(self, name, pjsua_args, sdp, resp_code,
- resp_inc=[], resp_exc=[], use_tcp=False,
- extra_headers="", body="", complete_msg="",
- enable_buffer = False):
- self.complete_msg = complete_msg
- self.sdp = sdp
- self.resp_code = resp_code
- self.resp_include = resp_inc
- self.resp_exclude = resp_exc
- self.use_tcp = use_tcp
- self.extra_headers = extra_headers
- self.body = body
- self.inst_param = cfg.InstanceParam("pjsua", pjsua_args)
- self.inst_param.enable_buffer = enable_buffer
- class RecvfromTransaction:
- # The test title for this transaction
- title = ""
- # Optinal list of pjsua command and optional expect patterns
- # to be invoked to make pjsua send a request
- # Sample:
- # (to make call and wait for INVITE to be sent)
- # cmds = [ ["m"], ["sip:127.0.0.1", "INVITE sip:"] ]
- cmds = []
- # Check if the CSeq must be greater than last Cseq?
- check_cseq = True
- # List of RE patterns that must exists in incoming request
- include = []
- # List of RE patterns that MUST NOT exist in incoming request
- exclude = []
- # Response code to send
- resp_code = 0
- # Additional list of headers to be sent on the response
- # Note: no need to add CRLF on the header
- resp_hdr = []
- # Message body. This should include the Content-Type header too.
- # Sample:
- # body = """Content-Type: application/sdp\r\n
- # \r\n
- # v=0\r\n
- # ...
- # """
- body = None
- # Pattern to be expected on pjsua when receiving the response
- expect = ""
- # Required config
- pj_config = ""
-
- def __init__(self, title, resp_code, check_cseq=True,
- include=[], exclude=[], cmds=[], resp_hdr=[], resp_body=None, expect="", pj_config=""):
- self.title = title
- self.cmds = cmds
- self.include = include
- self.exclude = exclude
- self.resp_code = resp_code
- self.resp_hdr = resp_hdr
- self.body = resp_body
- self.expect = expect
- self.pj_config=pj_config
-
- class RecvfromCfg:
- # Test name
- name = ""
- # pjsua InstanceParam
- inst_param = None
- # List of RecvfromTransaction
- transaction = None
- # Use TCP?
- tcp = False
- # Required config
- pj_config = ""
- # Note:
- # Any "$PORT" string in the pjsua_args will be replaced
- # by server port
- def __init__(self, name, pjsua_args, transaction, tcp=False, pj_config=""):
- self.name = name
- self.inst_param = cfg.InstanceParam("pjsua", pjsua_args)
- self.transaction = transaction
- self.tcp=tcp
- self.pj_config=pj_config
|