#!/usr/bin/env python3
# encoding:utf-8
import os
import time
import json
import wave
import queue
import threading
import traceback
import pjsua2 as pj
from enum import Enum
# NOTE(review): murmur3_32 and build_audio_format are used below but are not
# defined in this file; presumably they come from this star import - confirm.
from src.core.voip.constant import *
import requests
from src.core.callcenter.model import BotChatRequest, ChatResponse,ChatMessage
# Module-level dict; nothing in this file reads or writes it.
calls = {}
# recording_file = '/code/src/core/voip/incoming_call.wav'
# player_file1 = '/code/src/core/voip/test111.wav'
# player_file2 = '/code/src/core/voip/test222.wav'
class BotStatus(Enum):
    """States of the bot side of a conversation."""

    # Waiting for the user to speak; the wait has not timed out yet.
    wait_speaking_not_timeout = 1
    # Waiting for the user to speak; the wait timed out but the user is
    # still talking.
    wait_speaking_timeout_user_speaking = 2
    # The bot is currently speaking.
    speaking = 3
    # Waiting for the user to finish speaking.
    wait_user_speaking_end = 4
    # Waiting for the large-model output.
    wait_gpt_return = 5
class UserStatus(Enum):
    """States of the user side of a conversation."""

    # Set to speaking once ASR reports sentenceBegin.
    speaking = 1
    # Set to silence once ASR reports sentenceEnd.
    silence = 2
  35. class MyAudioMediaPort(pj.AudioMediaPort):
  36. def __init__(self, call, aud_med=None, asr=None):
  37. pj.AudioMediaPort.__init__(self)
  38. # # 打开一个 .pcm 文件来保存音频流(可选:保存为 .wav)
  39. # self.wav = wave.open(f"{recording_file}", "wb")
  40. # self.wav.setnchannels(1)
  41. # self.wav.setsampwidth(2) # 假设每个样本是 16 位(2 字节)
  42. # self.wav.setframerate(16000)
  43. print("MyAudioMediaPort::come in, debugger")
  44. self.call = call
  45. self.aud_med = aud_med
  46. self.asr = asr
  47. self.user_asr_texts = []
  48. self.cur_player_file = None
  49. self.wait_time = None
  50. self.play_start_time = None # 记录播放开始时间
  51. self.play_complete_flag = False # 播放完成标志
  52. def onFrameRequested(self, frame):
  53. print("Request audio frame:", frame)
  54. def onFrameReceived(self, frame):
  55. # self.wav.writeframes(bytes(frame.buf))
  56. # print("Received audio frame:", frame.buf, frame.size)
  57. if self.asr: # 如果ASR实例存在,则发送音频数据
  58. self.asr.send_audio(frame.buf)
  59. try:
  60. asr_text = self.get_asr_text()
  61. play_complete = self.is_play_complete()
  62. if asr_text and not play_complete:
  63. self.user_asr_texts.append(asr_text)
  64. if asr_text and play_complete:
  65. self.cur_player_file = None
  66. self.user_asr_texts.append(asr_text)
  67. user_asr_text = asr_text if len(self.user_asr_texts) == 1 else '###'.join(self.user_asr_texts)
  68. self.user_asr_texts.clear()
  69. self.call.chat("2",user_asr_text)
  70. #超时处理
  71. current_time = time.time()
  72. if self.wait_time and self.wait_time != "0" and play_complete and not asr_text:
  73. print('ssdsdsd',play_complete,asr_text)
  74. self.wait_time_check(current_time)
  75. message_queue_size = self.call.message_queue.qsize()
  76. if (message_queue_size > 0 and not self.cur_player_file) or (message_queue_size > 0 and play_complete):
  77. print('onFrameReceived:message_queue_size=', message_queue_size, 'play_complete=', play_complete, asr_text)
  78. self.cur_player_file, self.wait_time= self.get_player_file()
  79. self.call.send_bot_speaker(self.cur_player_file)
  80. # 重置播放完成标志和超时计时器,确保新的播放从头开始计时
  81. self.play_complete_flag = False # 重置播放完成标志
  82. self.play_start_time = time.time() # 重新开始计时
  83. except:
  84. pass
  85. def is_play_complete(self):
  86. if self.cur_player_file:
  87. player_id = murmur3_32(self.cur_player_file)
  88. return self.call.player_complete_dict.get(player_id)
  89. def get_asr_text(self):
  90. try:
  91. asr_text = self.call.user_asr_text_queue.get(block=False)
  92. print('get_asr_text:', asr_text)
  93. return asr_text
  94. except:
  95. pass
  96. def get_player_file(self):
  97. try:
  98. message = self.call.message_queue.get(block=False)
  99. player_file = [item.voice_url for item in message.contents if item.content_type == 'voice']
  100. wait_time = message.wait_time
  101. return player_file, wait_time
  102. except Exception as e:
  103. print(f"sdsd: {e}")
  104. traceback.print_exc()
  105. def wait_time_check(self,current_time):
  106. if not hasattr(self, 'play_complete_flag'):
  107. self.play_complete_flag = False
  108. self.play_start_time = 0
  109. # 播放完成后开始计时
  110. if not self.play_complete_flag:
  111. self.play_complete_flag = True
  112. self.play_start_time = current_time
  113. # 检查超时时间是否已到
  114. if current_time - self.play_start_time > int(self.wait_time):
  115. self.play_complete_flag = False # 重置标志位,避免重复超时
  116. self.call.chat("6", "重新请求文本机器人")
  117. class MyAudioMediaPlayer(pj.AudioMediaPlayer):
  118. def __init__(self, player_id, sink, on_complete=None):
  119. pj.AudioMediaPlayer.__init__(self)
  120. self.player_id = player_id
  121. self.sink = sink
  122. self.on_complete = on_complete
  123. def onEof2(self):
  124. # self.stopTransmit(self.sink)
  125. if self.on_complete:
  126. self.on_complete(self.player_id)
  127. # Subclass to extend the Account and get notifications etc.
  128. class Account(pj.Account):
  129. def __init__(self, agent, user_part, **kwargs):
  130. pj.Account.__init__(self)
  131. self.agent = agent
  132. self.user_part = user_part
  133. self.calls = {}
  134. self.kwargs = kwargs
  135. def onRegState(self, prm):
  136. ai = self.getInfo()
  137. print("***OnRegState: " + prm.reason, ai.id,ai.isDefault,ai.uri,ai.regIsConfigured,ai.regIsActive,ai.regExpiresSec,ai.regStatus,ai.regStatusText,ai.regLastErr,ai.onlineStatus,ai.onlineStatusText)
  138. def onIncomingCall(self, prm):
  139. ai = self.getInfo()
  140. print("***onIncomingCall: ", prm.callId, ai.id,ai.isDefault,ai.uri,ai.regIsConfigured,ai.regIsActive,ai.regExpiresSec,ai.regStatus,ai.regStatusText,ai.regLastErr,ai.onlineStatus,ai.onlineStatusText)
  141. call = MyCall(self.agent, self, self.user_part, prm.callId, **self.kwargs)
  142. call_op_param = pj.CallOpParam(True)
  143. call_op_param.statusCode = pj.PJSIP_SC_OK
  144. call.answer(call_op_param)
  145. self.calls[prm.callId] = call
  146. class MyCall(pj.Call):
  147. def __init__(self, agent, acc, user_part, call_id, **kwargs):
  148. pj.Call.__init__(self, acc, call_id)
  149. self.agent = agent
  150. self.user_part = user_part
  151. self.call_id = call_id
  152. self.kwargs = kwargs
  153. self.aud_med = None
  154. self.audio_port = None
  155. self.player = None
  156. self.asr = None
  157. # self.scripts = build_demo_script()
  158. self.user_asr_text_queue = queue.Queue(maxsize=100)
  159. self.message_queue = queue.Queue(maxsize=3)
  160. self.player_complete_dict = {}
  161. from src.core.voip.asr import TestSt
  162. self.asr = TestSt(call_id, message_receiver=self.on_receiver_asr_result) # 创建ASR实例
  163. self.asr.start() # 启动ASR线程
  164. def onDtmfDigit(self, prm):
  165. digit = prm.digit
  166. print(f"Received DTMF digit: {digit}")
  167. self.user_asr_text_queue.put(f"DTMF({digit})DTMF")
  168. def onCallState(self, prm):
  169. call_info = self.getInfo()
  170. print("Call state: ", call_info.stateText)
  171. # pj.PJSIP_INV_STATE_NULL
  172. # pj.PJSIP_INV_STATE_CALLING
  173. # pj.PJSIP_INV_STATE_INCOMING
  174. # pj.PJSIP_INV_STATE_EARLY
  175. # pj.PJSIP_INV_STATE_CONNECTING
  176. # pj.PJSIP_INV_STATE_CONFIRMED
  177. # pj.PJSIP_INV_STATE_DISCONNECTED
  178. if call_info.state == pj.PJSIP_INV_STATE_CONFIRMED:
  179. # 当呼叫状态为已确认(即接通)
  180. self.bot_say_hello()
  181. if call_info.state == pj.PJSIP_INV_STATE_DISCONNECTED:
  182. print("通话结束")
  183. # 远程挂机之后要将分机号回收
  184. self.agent.release(self.user_part)
  185. def onCallMediaState(self, prm):
  186. call_info = self.getInfo()
  187. print("Call Media state: ", call_info.stateText)
  188. for media in call_info.media:
  189. if media.type == pj.PJMEDIA_TYPE_AUDIO and \
  190. (media.status == pj.PJSUA_CALL_MEDIA_ACTIVE):
  191. print("Call Media state 111: ", call_info.stateText)
  192. self.aud_med = self.getAudioMedia(media.index)
  193. try:
  194. # 建立双向通道
  195. self.receive_user_speaker()
  196. # self.bot_say_hello()
  197. except Exception as e:
  198. traceback.print_exc()
  199. def receive_user_speaker(self):
  200. self.audio_port = MyAudioMediaPort(self, self.aud_med, self.asr)
  201. self.audio_port.createPort("Incoming Call Port", build_audio_format())
  202. self.aud_med.startTransmit(self.audio_port)
  203. def send_bot_speaker(self, player_file):
  204. if not player_file :
  205. return
  206. player_id = murmur3_32(player_file)
  207. self.player_complete_dict[player_id] = False
  208. print('self.player_complete_dict[player_id]D:', player_id, player_file, self.player_complete_dict[player_id])
  209. self.player = MyAudioMediaPlayer(player_id, self.aud_med, on_complete=self.on_media_player_complete)
  210. # self.player.createPlayer(player_file[0], pj.PJMEDIA_FILE_NO_LOOP)
  211. self.player.createPlaylist(player_file, f'my_hello_playlist{player_id}', pj.PJMEDIA_FILE_NO_LOOP)
  212. self.player.startTransmit(self.aud_med)
  213. def on_receiver_asr_result(self, message, *args):
  214. print('asr返回内容:',message)
  215. self.user_asr_text_queue.put(message)
  216. def on_media_player_complete(self, player_id):
  217. print('player complete')
  218. self.player_complete_dict[player_id] = True
  219. def bot_say_hello(self):
  220. print('bot_say_hello, come in ')
  221. self.chat('1', user_asr_text="SAY_HELLO")
  222. def chat(self,event_type, user_asr_text=None):
  223. # 调用文本机器人接口
  224. ToTextBotAgent(event_type,user_asr_text,self)
  225. class ToTextBotAgent:
  226. def __init__(self, event_type, user_asr_text, call_agent):
  227. if not user_asr_text:
  228. print("ASR文本为空,终止执行。")
  229. return
  230. self.call_agent = call_agent
  231. self.request_data = BotChatRequest(
  232. node_id="1",
  233. user_id="139311",
  234. session_id="1",
  235. record_id="2",
  236. task_id="ceshi",
  237. asr_text=user_asr_text
  238. )
  239. # 发送请求并处理响应
  240. self.test_request(self.request_data,event_type)
  241. def to_hang(self, action_content):
  242. user_part = self.call_agent.user_part_pool.get()
  243. self.call_agent.hangup(user_part, action_content)
  244. def handling_message(self, message: ChatMessage):
  245. print('handling_release', message)
  246. if message:
  247. action = message.action
  248. action_code = action.action_code
  249. if action_code == 'hang': # 挂断
  250. action_content = action.action_content
  251. self.to_hang(action_content)
  252. elif action_code == 'transfer': # 转人工
  253. print('todo 转人工')
  254. elif action_code == 'normal': # 正常通话
  255. self.call_agent.message_queue.put(message)
  256. print(f"成功获取响应: ActionCode: {message.wait_time}")
  257. else:
  258. print("文本机器人接口调用失败")
  259. def to_quest(self, request: BotChatRequest):
  260. # 将实体类转换为JSON字符串
  261. headers = {'Content-Type': 'application/json'}
  262. request_json = request.to_json_string()
  263. # 发送POST请求
  264. try:
  265. response = requests.post('http://example.com/api', data=request_json, headers=headers) # 使用占位URL
  266. if response.status_code == 200:
  267. # 成功,解析响应JSON
  268. response_data = response.json()
  269. return ChatResponse.from_json(json.dumps(response_data)) # 转换为JSON字符串并解析
  270. else:
  271. # 错误处理
  272. print(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
  273. return None
  274. except requests.RequestException as e:
  275. traceback.print_exc()
  276. return None
  277. # 模拟接口请求返回
  278. def test_request(self, params: BotChatRequest,event_type):
  279. print("test_request::params=", params)
  280. response_data = {
  281. "node_id": "1.0",
  282. "contents": [],
  283. "interruptable": False,
  284. "wait_time": "6",
  285. "action": {
  286. "action_code": "normal",
  287. "action_content": "正常通话"
  288. },
  289. "talk_time_out": ""
  290. }
  291. print("event_type:", event_type)
  292. if event_type == '1':
  293. response_data['contents'].append({
  294. "content_type": "voice",
  295. "content": "",
  296. "voice_url": '/code/src/core/voip/scripts/1_00.wav',
  297. "voice_content": "五一北京到上海的高铁票还有吗?"
  298. })
  299. elif event_type == '2':
  300. new_contents = [
  301. {
  302. "content_type": "voice",
  303. "content": "",
  304. "voice_url": '/code/src/core/voip/test111.wav',
  305. "voice_content": "测试第二个录音文件"
  306. },
  307. {
  308. "content_type": "voice",
  309. "content": "",
  310. "voice_url": '/code/src/core/voip/test222.wav',
  311. "voice_content": "五一北京到上海的高铁票还有吗?"
  312. }
  313. ]
  314. response_data['contents']= new_contents
  315. elif event_type == '6':
  316. response_data['contents'].append({
  317. "content_type": "voice",
  318. "content": "",
  319. "voice_url": '/code/src/core/voip/scripts/4_00.wav',
  320. "voice_content": "sds"
  321. })
  322. try:
  323. print(json.dumps(response_data['contents']))
  324. parsed_response = ChatMessage.from_json(response_data)
  325. self.handling_message(parsed_response)
  326. except Exception as e:
  327. print(f"Error in test_request: {e}")
  328. traceback.print_exc() # 打印完整的错误信息
  329. return None
  330. class BotAgent:
  331. def __init__(self, logger, user_part_range=range(1001, 1019), host="192.168.100.195", port="5060", password="slibra@#123456"):
  332. self.logger = logger
  333. self.user_part_range, self.host, self.port, self.password = user_part_range, host, port, password
  334. self.user_part_pool = queue.Queue(maxsize=len(user_part_range))
  335. self.accounts = {}
  336. self.calls = {}
  337. self.ep = pj.Endpoint()
  338. self.is_stopping = False
  339. threading.Thread(target=self.create_pjsua2, args=()).start()
  340. def create_pjsua2(self):
  341. # Create and initialize the library
  342. ep_cfg = pj.EpConfig()
  343. ep_cfg.uaConfig.threadCnt = 32
  344. ep_cfg.uaConfig.mainThreadOnly = False
  345. ep_cfg.uaConfig.maxCalls = 20
  346. ep_cfg.uaConfig.maxAccounts = 20
  347. ep_cfg.medConfig.noVad = True
  348. ep_cfg.logConfig.level = 3
  349. ep_cfg.logConfig.consoleLevel = 3
  350. self.ep.libCreate()
  351. self.ep.libInit(ep_cfg)
  352. aud_dev_mgr = self.ep.audDevManager()
  353. aud_dev_mgr.setNullDev() # 使用虚拟音频设备(如果没有实际设备)
  354. # Set up media configuration, particularly jitter buffer
  355. media_cfg = pj.MediaConfig()
  356. media_cfg.jbMinPre = 4 # Minimum pre-fetch frames
  357. media_cfg.jbMaxPre = 16 # Maximum pre-fetch frames
  358. media_cfg.noVad = True # Disable Voice Activity Detection if needed
  359. self.ep.medConfig = media_cfg # Apply media config to endpoint
  360. # Create SIP transport. Error handling sample is shown
  361. sipTpConfig = pj.TransportConfig()
  362. sipTpConfig.port = 30506
  363. self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
  364. # Start the library
  365. self.ep.libStart()
  366. for user_part in self.user_part_range:
  367. acfg = pj.AccountConfig()
  368. acfg.idUri = f"sip:{user_part}@{self.host}:{self.port}"
  369. acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
  370. cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, self.password)
  371. acfg.sipConfig.authCreds.append(cred)
  372. acfg.regConfig.timeoutSec = 86400 # 注册超时时间(秒)
  373. acfg.regConfig.retryIntervalSec = 10 # 重试间隔时间(秒)
  374. acfg.regConfig.firstRetryIntervalSec = 10 # 首次重试间隔时间(秒)
  375. # acfg.natConfig.iceEnabled = True
  376. # acfg.natConfig.turnEnabled = True
  377. # acfg.natConfig.turnServer = "stun:pbx.fuxicarbon.com:3478"
  378. # acfg.natConfig.turnUsername = "username"
  379. # acfg.natConfig.turnPassword = "password"
  380. # Create the account
  381. acc = Account(self, user_part)
  382. acc.create(acfg)
  383. self.user_part_pool.put(user_part)
  384. self.accounts[user_part] = acc
  385. while not self.is_stopping:
  386. self.ep.libHandleEvents(100)
  387. def hangup(self, user_part, reason="NORMAL_CLEARING", **sip_headers):
  388. call_op_param = pj.CallOpParam(True)
  389. call_op_param.statusCode = pj.PJSIP_SC_OK
  390. call_op_param.reason = reason
  391. for k, v in sip_headers:
  392. call_op_param.headers.append(pj.SipHeader(f"sip_h_{k}", v))
  393. acc = self.accounts.get(user_part)
  394. if acc:
  395. for k, v in acc.calls.items():
  396. v.hangup(call_op_param)
  397. # 机器人主动挂机回收分机号
  398. self.release(user_part)
  399. def register(self, **kwargs):
  400. user_part = self.user_part_pool.get()
  401. acc = self.accounts.get(user_part)
  402. self.logger.info('register, user_part :%d, pool.size :%d', user_part, self.user_part_pool.qsize())
  403. if acc:
  404. print('register==========>', acc.getId())
  405. # ps = pj.PresenceStatus()
  406. # ps.status = pj.PJSUA_BUDDY_STATUS_ONLINE
  407. # ps.activity = pj.PJRPID_ACTIVITY_AWAY
  408. # ps.note = "Away"
  409. # acc.setOnlineStatus(ps)
  410. # acc.setRegistration(renew=True)
  411. acc.kwargs = kwargs
  412. return user_part
  413. def unregister(self, user_part):
  414. acc = self.accounts.get(user_part)
  415. if acc:
  416. acc.setRegistration(renew=False)
  417. # 用户远程挂机回收分机号
  418. self.release(user_part)
  419. def release(self, user_part):
  420. if not user_part:
  421. self.logger.info("release, user_part is None")
  422. return
  423. def element_in_queue(q, element):
  424. with q.mutex: # 确保线程安全
  425. for item in list(q.queue): # 将队列转换为列表进行遍历
  426. if item == element:
  427. return True
  428. return False
  429. if element_in_queue(self.user_part_pool, user_part):
  430. self.logger.info("release, already exists, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize())
  431. return
  432. self.user_part_pool.put(user_part)
  433. self.logger.info("release, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize())
  434. def destroy(self):
  435. self.is_stopping = True
  436. # Destroy the library
  437. self.ep.libDestroy()
  438. def __del__(self):
  439. self.destroy()
# if __name__ == '__main__':
#     import logging
#     logger = logging.getLogger('voip')
#     bot = BotAgent(logger)