123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- import time
- import sys
- import inc_const as const
- import inc_util as util
- from inc_cfg import *
# Load the test configuration module from the file path given in ARGS[1].
# NOTE(review): ARGS is not defined in this file; presumably it is provided
# by the star-import from inc_cfg -- confirm.
cfg_file = util.load_module_from_file("cfg_file", ARGS[1])
# Trigger an address switch for the media flow between ua1 and ua2.
# When the receiver uses STUN while both sides are actually in the same
# private network, initial media packets may be sent to the public IP address
# specified in the receiver's SDP, and those packets may not be delivered
# if the NAT does not support hairpinning. This function makes both
# sides send some initial packets to trigger a destination address switch
# in the media transport, so that future packets are delivered to the correct
# address (the private IP address).
def hole_punch(ua1, ua2):
    """Make both endpoints send a short burst through the "#" command.

    ua1 sends the digits "987" and then ua2 sends "789". An endpoint driven
    over telnet receives the whole command in one line; otherwise the "#"
    prompt is opened, its echo is awaited, and the digits are entered
    separately. Nothing is checked on the receiving side; the function
    only sleeps 0.1 s after both endpoints have sent.

    Args:
        ua1: first endpoint; must provide ``use_telnet``, ``send`` and
            ``expect``.
        ua2: second endpoint, same requirements as ``ua1``.
    """
    # (endpoint, one-line telnet command, digits typed at the "#" prompt)
    plan = (
        (ua1, "# 987", "987"),
        (ua2, "# 789", "789"),
    )
    for endpoint, telnet_line, digits in plan:
        if not endpoint.use_telnet:
            # Interactive console: open the prompt, wait for it, then type.
            endpoint.send("#")
            endpoint.expect("#")
            endpoint.send(digits)
        else:
            endpoint.send(telnet_line)

    # Short pause after both sides have sent their packets.
    time.sleep(0.1)
- # Check media flow between ua1 and ua2
def check_media(ua1, ua2):
    """Send the digits "1122" from ua1 and expect ua2 to report each one.

    ua1 issues the "#" command (in one line over telnet, or via the
    interactive prompt otherwise). ua2 must then produce, in order, one
    match of ``const.RX_DTMF`` followed by the digit for each of
    "1", "1", "2", "2".

    Args:
        ua1: sending endpoint; must provide ``use_telnet``, ``send`` and
            ``expect``.
        ua2: receiving endpoint; must provide ``expect``.
    """
    if not ua1.use_telnet:
        # Interactive console: open the prompt, wait for it, then type.
        ua1.send("#")
        ua1.expect("#")
        ua1.send("1122")
    else:
        ua1.send("# 1122")

    # One expectation per digit sent, in the order they were sent.
    for digit in "1122":
        ua2.expect(const.RX_DTMF + digit)
- # Test body function
def test_func(t):
    """Run a complete call scenario between two endpoints.

    ``t.process[0]`` acts as the callee and ``t.process[1]`` as the caller.
    The scenario, in order:

    1. The caller dials ``t.inst_params[0].uri``; the callee answers with
       180 and then 200, and both sides reach the confirmed state.
    2. Media is verified in both directions.
    3. The call is held and resumed, first by the caller and then by the
       callee, with media verified after each resume.
    4. UPDATE is sent by the caller and then by the callee, with media
       verified after each.
    5. Codecs are reconfigured so the endpoints share no codec (caller
       PCMU only, callee PCMA only); UPDATE from each side must then be
       answered with 488, and media must still flow.
    6. The caller hangs up and both sides reach the disconnected state.

    Args:
        t: test descriptor. This function reads ``t.process`` (the two
            running endpoints) and ``t.inst_params[0].uri`` (the URI the
            caller dials).
    """
    callee = t.process[0]
    caller = t.process[1]

    # If an endpoint registers, wait a second for PUBLISH to complete
    # (just in case PUBLISH is used).
    if callee.inst_param.have_reg:
        time.sleep(1)
    if caller.inst_param.have_reg:
        time.sleep(1)

    # ICE counts only when both sides enable it.
    use_ice = (
        ("--use-ice" in caller.inst_param.arg)
        and ("--use-ice" in callee.inst_param.arg)
    )
    # STUN counts when either side uses it.
    use_stun = (
        ("--stun-srv" in caller.inst_param.arg)
        or ("--stun-srv" in callee.inst_param.arg)
    )
    # DTLS-SRTP is detected from the caller's arguments only.
    use_dtls_srtp = "--srtp-keying=1" in caller.inst_param.arg

    title_after_hold = "waiting for media active after call hold"
    title_with_update = "waiting for media active with UPDATE"

    def sync_both():
        # Synchronize stdout of both endpoints, caller first.
        caller.sync_stdout()
        callee.sync_stdout()

    def verify_media():
        # Trigger the address switch first when STUN is used without ICE,
        # then check media caller -> callee and callee -> caller.
        if use_stun and not use_ice:
            hole_punch(caller, callee)
        check_media(caller, callee)
        check_media(callee, caller)

    def hold_and_resume(initiator, peer):
        # Hold the call from `initiator`, then release the hold with a
        # re-INVITE and verify that media flows again.
        _send_command(initiator, "call hold", "H")
        initiator.expect("INVITE sip:")
        peer.expect("INVITE sip:")
        peer.expect(const.MEDIA_HOLD)
        initiator.expect(const.MEDIA_HOLD)
        sync_both()

        _send_command(initiator, "call reinvite", "v")
        initiator.expect("INVITE sip:")
        peer.expect("INVITE sip:")
        peer.expect(const.MEDIA_ACTIVE, title=title_after_hold)
        initiator.expect(const.MEDIA_ACTIVE, title=title_after_hold)
        sync_both()

        verify_media()
        sync_both()

    # Caller making call
    dest_uri = t.inst_params[0].uri
    _send_command(caller, "call new " + dest_uri, "m", dest_uri)
    caller.expect(const.STATE_CALLING)

    # Callee waits for call and answers with 180/Ringing
    time.sleep(0.2)
    callee.expect(const.EVENT_INCOMING_CALL)
    _send_command(callee, "call answer 180", "a", "180")
    callee.expect("SIP/2.0 180")
    caller.expect("SIP/2.0 180")
    sync_both()

    # Callee answers with 200/OK
    _send_command(callee, "call answer 200", "a", "200")

    # Wait until call is connected in both endpoints
    caller.expect(const.STATE_CONFIRMED)
    callee.expect(const.STATE_CONFIRMED)
    # Stdout is synchronized twice here, exactly as the test has always
    # done.
    sync_both()
    sync_both()

    # Wait for ICE negotiation before checking media. Expecting the
    # "ICE negotiation success" log line may race with STATE_CONFIRMED
    # (especially on the callee side), so just sleep; this also covers ICE
    # updating the address (via UPDATE/re-INVITE).
    if use_ice:
        time.sleep(0.5)

    # Wait for DTLS-SRTP negotiation before checking media. Expecting the
    # "SRTP started" log line may race with STATE_CONFIRMED, so just sleep.
    if use_dtls_srtp:
        time.sleep(0.5)

    # Test that media is okay
    verify_media()

    # Hold and resume, first by the caller and then by the callee
    hold_and_resume(caller, callee)
    hold_and_resume(callee, caller)

    # UPDATE (by caller). Unlike the callee-initiated UPDATE below, the
    # "UPDATE sip:" request line is not expected here.
    _send_command(caller, "call update", "U")
    callee.expect(const.MEDIA_ACTIVE, title=title_with_update)
    caller.expect(const.MEDIA_ACTIVE, title=title_with_update)
    sync_both()
    verify_media()

    # UPDATE (by callee)
    _send_command(callee, "call update", "U")
    callee.expect("UPDATE sip:")
    caller.expect("UPDATE sip:")
    caller.expect(const.MEDIA_ACTIVE, title=title_with_update)
    callee.expect(const.MEDIA_ACTIVE, title=title_with_update)
    sync_both()
    verify_media()
    sync_both()

    # Set codecs in both caller and callee so that there is no common
    # codec between them: caller only enables PCMU, callee only PCMA.
    _enable_only_codec(caller, "PCMU")
    _enable_only_codec(callee, "PCMA")

    # Test when UPDATE fails (by callee)
    _send_command(callee, "call update", "U")
    caller.expect("SIP/2.0 488")
    callee.expect("SIP/2.0 488")
    callee.sync_stdout()
    caller.sync_stdout()

    # Test that media is still okay (no address-switch trigger here)
    check_media(caller, callee)
    check_media(callee, caller)

    # Test when UPDATE fails (by caller)
    _send_command(caller, "call update", "U")
    caller.expect("UPDATE sip:")
    callee.expect("UPDATE sip:")
    callee.expect("SIP/2.0 488")
    caller.expect("SIP/2.0 488")
    caller.sync_stdout()
    callee.sync_stdout()

    # Test that media is still okay
    check_media(callee, caller)
    check_media(caller, callee)

    # Hangup call
    _send_command(caller, "call hangup", "h")

    # Wait until calls are cleared in both endpoints
    caller.expect(const.STATE_DISCONNECTED)
    callee.expect(const.STATE_DISCONNECTED)


def _send_command(ua, telnet_cmd, *console_inputs):
    """Issue one command to ``ua`` in the form its interface requires.

    Args:
        ua: endpoint providing ``use_telnet`` and ``send``.
        telnet_cmd: the single line sent when ``ua.use_telnet`` is true.
        *console_inputs: the inputs sent one after another, in order, when
            ``ua.use_telnet`` is false.
    """
    if ua.use_telnet:
        ua.send(telnet_cmd)
    else:
        for text in console_inputs:
            ua.send(text)


def _enable_only_codec(ua, codec):
    """Set every codec priority on ``ua`` to 0, then give ``codec`` 120.

    Args:
        ua: endpoint providing ``use_telnet``, ``send`` and ``expect``.
        codec: upper-case codec name as it appears in the codec listing
            (for example "PCMU"); the interactive console is given the
            lower-case form.
    """
    if ua.use_telnet:
        ua.send("Cp * 0")
        ua.send("Cp")
        ua.expect(codec + "/8000.* prio: 0")
        ua.send("Cp " + codec + " 120")
        ua.send("Cp")
        ua.expect(codec + "/8000.* prio: 120")
    else:
        ua.send("Cp")
        ua.expect("Enter codec")
        ua.send("* 0")
        ua.send("Cp")
        ua.expect("Enter codec")
        ua.send(codec.lower() + " 120")
-
# Here is where it all comes together: take the test parameters defined by
# the loaded configuration module and attach test_func as the test body.
test = cfg_file.test_param
test.test_func = test_func
|