bot.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import os
  4. import time
  5. import json
  6. import wave
  7. import queue
  8. import threading
  9. import traceback
  10. import sys
  11. import pjsua2 as pj
  12. from enum import Enum
  13. from src.core.callcenter.cache import Cache
  14. from src.core.datasource import SIP_SERVER, SERVE_HOST
  15. from src.core.voip.constant import *
  16. import requests
  17. from src.core.callcenter.api import BotChatRequest,ChatMessage
  18. from src.core import singleton_keys
  19. from src.core.callcenter.snowflake import Snowflake
  20. calls = {}
  21. # recording_file = '/code/src/core/voip/incoming_call.wav'
  22. # player_file1 = '/code/src/core/voip/test111.wav'
  23. # player_file2 = '/code/src/core/voip/test222.wav'
  24. class BotStatus(Enum):
  25. # 等待用户讲话,未超时
  26. wait_speaking_not_timeout = 1
  27. # 等待用户讲话,已超时但用户仍在持续讲话
  28. wait_speaking_timeout_user_speaking = 2
  29. # 机器人讲话过程中
  30. speaking = 3
  31. # 等待用户讲话结束
  32. wait_user_speaking_end = 4
  33. # 等待大模型输出
  34. wait_gpt_return = 5
  35. class UserStatus(Enum):
  36. # ASR返回值为sentenceBegin后将状态置为Speaking
  37. speaking = 1
  38. # ASR返回值为sentenceEnd后,将状态置为Silence
  39. silence = 2
  40. class MyAudioMediaPort(pj.AudioMediaPort):
  41. def __init__(self, call, aud_med=None, asr=None):
  42. pj.AudioMediaPort.__init__(self)
  43. # # 打开一个 .pcm 文件来保存音频流(可选:保存为 .wav)
  44. # self.wav = wave.open(f"{recording_file}", "wb")
  45. # self.wav.setnchannels(1)
  46. # self.wav.setsampwidth(2) # 假设每个样本是 16 位(2 字节)
  47. # self.wav.setframerate(16000)
  48. print("MyAudioMediaPort::come in, debugger")
  49. self.call = call
  50. self.aud_med = aud_med
  51. self.asr = asr
  52. self.user_asr_texts = []
  53. def onFrameRequested(self, frame):
  54. print("Request audio frame:", frame)
  55. def onFrameReceived(self, frame):
  # Per-frame callback. Forwards frame.buf to the ASR client when one is set,
  # then polls for a finished ASR text, decides whether the buffered user input
  # should be sent to the text bot via self.call.chat(), and finally starts
  # playback of the next queued bot message when nothing is playing or the
  # current playback has completed.
  #
  # NOTE(review): the indentation of this listing was lost. Whether the
  # wait_time comparison (listing lines 75-76) belongs to the `elif asr_text`
  # branch, and whether the wait_time_check call (listing lines 86-87) belongs
  # to the `else` branch or runs for both input types, cannot be determined
  # from the text alone - confirm against the original file before
  # restructuring this method.
  # NOTE(review): the trailing bare `except: pass` silently discards every
  # error raised in the body, including the TypeError raised when
  # get_player_file() returns None and the tuple unpacking fails.
  56. # self.wav.writeframes(bytes(frame.buf))
  57. # print("Received audio frame:", frame.buf, frame.size)
  58. if self.asr: # if an ASR instance exists, forward the audio data to it
  59. self.asr.send_audio(frame.buf)
  60. try:
  61. asr_text = self.get_asr_text()
  62. play_complete = self.call.is_play_complete()
  63. current_time = time.time() # current wall-clock time
  # inputType '1.0' is the long key-input mode (see MyCall.inputType).
  64. if self.call.inputType == '1.0':
  65. time_difference = int(current_time - self.call.inputLongStart)
  66. # print('current_time - self.call.inputLongStart:',time_difference > 35, self.call.txtLock , play_complete)
  # More than 35 seconds since inputLongStart and playback finished: submit
  # whatever digits were collected, together with any buffered ASR texts.
  67. if time_difference > 35 and play_complete:
  68. self.user_asr_texts.append(f"DTMF({self.call.digit})DTMF")
  # A single buffered text is sent as-is; several are joined with '###'.
  69. user_asr_text = self.user_asr_texts[0] if len(self.user_asr_texts) == 1 else '###'.join(self.user_asr_texts)
  70. self.user_asr_texts.clear()
  71. self.call.chat(user_asr_text)
  72. # print("overlong-input test", user_asr_text)
  73. elif asr_text:
  74. self.user_asr_texts.append(asr_text)
  # int() raises TypeError when wait_time is None; that error ends up in the
  # bare except below.
  75. if time_difference > int(self.call.wait_time):
  76. self.call.reset_wait_time()
  77. else:
  # Text recognised while the bot is still playing is only buffered.
  78. if asr_text and not play_complete:
  79. self.user_asr_texts.append(asr_text)
  # Once playback has completed, send new and/or buffered text to the bot.
  80. if (asr_text and play_complete) or (play_complete and self.user_asr_texts):
  81. if asr_text:
  82. self.user_asr_texts.append(asr_text)
  83. user_asr_text = self.user_asr_texts[0] if len(self.user_asr_texts) == 1 else '###'.join(self.user_asr_texts)
  84. self.user_asr_texts.clear()
  85. self.call.chat(user_asr_text)
  # Silence timeout: only checked when a non-"0" wait_time is configured,
  # playback has completed and no ASR text arrived with this frame.
  86. if self.call.wait_time and self.call.wait_time != "0" and play_complete and not asr_text:
  87. self.call.wait_time_check(current_time, self.call.wait_time)
  88. message_queue_size = self.call.message_queue.qsize()
  # Start the next bot message when one is queued and either nothing has been
  # played yet or the current playback has completed.
  89. if (message_queue_size > 0 and not self.call.cur_player_file) or (message_queue_size > 0 and play_complete):
  90. print('onFrameReceived:message_queue_size=', message_queue_size, 'play_complete=', play_complete, asr_text)
  91. self.call.cur_player_file, self.call.wait_time, self.call.inputType,self.call.action, self.call.node_id = self.get_player_file()
  92. # Reset the playback-complete flag and the timeout timer so the new playback is timed from the start
  93. self.call.reset_wait_time()
  94. self.call.send_bot_speaker(self.call.cur_player_file)
  95. except:
  96. pass
  97. def get_asr_text(self):
  98. try:
  99. asr_text = self.call.user_asr_text_queue.get(block=False)
  100. print('get_asr_text:', asr_text)
  101. return asr_text
  102. except:
  103. pass
  104. def get_player_file(self):
  105. try:
  106. message = self.call.message_queue.get(block=False)
  107. player_file = [item.voice_url for item in message.contents if item.content_type == 'voice']
  108. print('get_player_file:', player_file,message.wait_time, message.inputType, message.action, message.node_id)
  109. return player_file, message.wait_time, message.inputType, message.action, message.node_id
  110. except Exception as e:
  111. traceback.print_exc()
  112. class MyAudioMediaPlayer(pj.AudioMediaPlayer):
  113. def __init__(self, player_id, sink, on_complete=None):
  114. pj.AudioMediaPlayer.__init__(self)
  115. self.player_id = player_id
  116. self.sink = sink
  117. self.on_complete = on_complete
  118. def onEof2(self):
  119. # self.stopTransmit(self.sink)
  120. if self.on_complete:
  121. self.on_complete(self.player_id)
  122. # Subclass to extend the Account and get notifications etc.
  123. class Account(pj.Account):
  124. def __init__(self, agent, user_part, **kwargs):
  125. pj.Account.__init__(self)
  126. self.agent = agent
  127. self.user_part = user_part
  128. self.calls = {}
  129. self.kwargs = kwargs
  130. def onRegState(self, prm):
  131. ai = self.getInfo()
  132. print("***OnRegState: " + prm.reason, ai.id,ai.isDefault,ai.uri,ai.regIsConfigured,ai.regIsActive,ai.regExpiresSec,ai.regStatus,ai.regStatusText,ai.regLastErr,ai.onlineStatus,ai.onlineStatusText)
  133. def onIncomingCall(self, prm):
  134. ai = self.getInfo()
  135. print("***onIncomingCall: ", prm.callId, ai.id,ai.isDefault,ai.uri,ai.regIsConfigured,ai.regIsActive,ai.regExpiresSec,ai.regStatus,ai.regStatusText,ai.regLastErr,ai.onlineStatus,ai.onlineStatusText)
  136. call = MyCall(self.agent, self, self.user_part, prm.callId, **self.kwargs)
  137. call_op_param = pj.CallOpParam(True)
  138. call_op_param.statusCode = pj.PJSIP_SC_OK
  139. call.answer(call_op_param)
  140. self.calls[prm.callId] = call
  141. class MyCall(pj.Call):
  142. def __init__(self, agent, acc, user_part, call_id, **kwargs):
  143. pj.Call.__init__(self, acc, call_id)
  144. self.agent = agent
  145. self.user_part = user_part
  146. self.call_id = call_id
  147. self.kwargs = kwargs
  148. self.aud_med = None
  149. self.audio_port = None
  150. self.player = None
  151. self.asr = None
  152. self.session_id = kwargs.get('variable_sip_h_P-LIBRA-CallId')
  153. self.device_id = kwargs.get('variable_sip_h_P-LIBRA-DeviceId')
  154. print("self.session_id:", self.session_id)
  155. # self.scripts = build_demo_script()
  156. self.user_asr_text_queue = queue.Queue(maxsize=100)
  157. self.message_queue = queue.Queue(maxsize=3)
  158. self.player_complete_dict = {}
  159. self.wait_time = None
  160. self.inputType = None #记录按键类型 1为长按键类型
  161. self.digit = '' # 存储长按键内容
  162. self.action = None
  163. self.node_id= 'start'
  164. self.cur_player_file = None #当前播放的文件
  165. from src.core.voip.asr import TestSt
  166. self.asr = TestSt(call_id, message_receiver=self.on_receiver_asr_result) # 创建ASR实例
  167. self.asr.start() # 启动ASR线程
  168. self.call_phone, self.callIdString = self.get_phone()
  169. # 超时设置
  170. self.play_start_time = time.time() # 倒计时开始时间
  171. self.play_complete_flag = False # 倒计时开始标志
  172. self.txtLock = False
  173. self.inputLongStart = time.time() #长按键开始时间
  174. def wait_time_check(self, current_time, wait_time):
  175. try:
  176. # 确保 wait_time 是整数类型
  177. wait_time = int(wait_time)
  178. # 如果播放尚未完成,重置标志并返回
  179. if not self.play_complete_flag:
  180. self.play_complete_flag = True
  181. self.play_start_time = current_time
  182. # print(f"开始计时: {self.play_start_time}")
  183. elapsed_time = int(current_time - self.play_start_time)
  184. # print(f"当前时间: {current_time}, 已过时间: {elapsed_time}, 最大等待时间: {wait_time}")
  185. if elapsed_time > wait_time:
  186. # self.user_asr_text_queue.put("ASR408error")
  187. self.chat('ASR408error')
  188. except ValueError as e:
  189. print(f"无效的等待时间参数: {wait_time}, 错误: {e}")
  190. self.reset_wait_time()
  191. def reset_wait_time(self):
  192. self.play_complete_flag = False # 重置播放完成标志
  193. self.play_start_time = None # 重置开始计时时间
  194. def get_phone(self):
  195. import re
  196. call_info = self.getInfo()
  197. match = re.match(r'"(\d+)" <sip:(\d+)@', call_info.remoteUri)
  198. if match:
  199. print("Phone Number:", match.group(1))
  200. return match.group(1), call_info.callIdString # 假设显示名称部分是手机号
  201. else:
  202. return "", call_info.callIdString
  203. def is_play_complete(self): #语音机器人是否播放结束
  204. if self.cur_player_file:
  205. player_id = murmur3_32(self.cur_player_file)
  206. return self.player_complete_dict.get(player_id)
  207. def onDtmfDigit(self, prm):
  208. # 判断是否播放完成 否则不记录用户说的内容
  209. if not self.is_play_complete():
  210. return
  211. digit = prm.digit
  212. self.reset_wait_time()
  213. # 假设为超长类型按键 把用户输入的按键进行拼接 如果为# 则把用户输入所有按键放入队列并发送文本机器人
  214. # 如果为非正常按键服务 输入以后直接发送文本机器人
  215. if self.inputType == '1.0':
  216. if digit != '#':
  217. self.digit += digit
  218. elif digit == '#':
  219. # self.user_asr_text_queue.put(f"DTMF({self.digit})DTMF")
  220. self.chat(f"DTMF({self.digit})DTMF")
  221. else:
  222. self.user_asr_text_queue.put(f"DTMF({digit})DTMF")
  223. def onCallState(self, prm):
  224. call_info = self.getInfo()
  225. print("Call state: %s, call id: %s, callcallIdString: %s ", call_info.state, call_info.id, call_info.callIdString)
  226. # pj.PJSIP_INV_STATE_NULL
  227. # pj.PJSIP_INV_STATE_CALLING
  228. # pj.PJSIP_INV_STATE_INCOMING
  229. # pj.PJSIP_INV_STATE_EARLY
  230. # pj.PJSIP_INV_STATE_CONNECTING
  231. # pj.PJSIP_INV_STATE_CONFIRMED
  232. # pj.PJSIP_INV_STATE_DISCONNECTED
  233. if call_info.state == pj.PJSIP_INV_STATE_CONFIRMED:
  234. # 当呼叫状态为已确认(即接通)
  235. self.bot_say_hello()
  236. if call_info.state == pj.PJSIP_INV_STATE_DISCONNECTED:
  237. print("通话结束", self.user_part)
  238. if self.audio_port:
  239. self.audio_port = None # 或调用相关销毁方法
  240. if self.player:
  241. self.player = None # 或调用播放器停止方法
  242. # 远程挂机之后要将分机号回收
  243. self.agent.release(self.user_part)
  244. def onCallMediaState(self, prm):
  245. call_info = self.getInfo()
  246. # print("Call Media state: ", call_info.stateText)
  247. for media in call_info.media:
  248. if media.type == pj.PJMEDIA_TYPE_AUDIO and \
  249. (media.status == pj.PJSUA_CALL_MEDIA_ACTIVE):
  250. print("Call Media state 111: ", call_info.stateText)
  251. self.aud_med = self.getAudioMedia(media.index)
  252. try:
  253. # 建立双向通道
  254. self.receive_user_speaker()
  255. # self.bot_say_hello()
  256. except Exception as e:
  257. traceback.print_exc()
  258. def receive_user_speaker(self):
  259. self.audio_port = MyAudioMediaPort(self, self.aud_med, self.asr)
  260. self.audio_port.createPort("Incoming Call Port", build_audio_format())
  261. self.aud_med.startTransmit(self.audio_port)
  262. def send_bot_speaker(self, player_file):
  263. if not player_file :
  264. return
  265. player_id = murmur3_32(player_file)
  266. self.player_complete_dict[player_id] = False
  267. # print('self.player_complete_dict[player_id]D:', player_id, player_file, self.player_complete_dict[player_id])
  268. print(f"[DEBUG] Sending bot speaker, player_file: {player_file}, player_id: {player_id}")
  269. self.player = MyAudioMediaPlayer(player_id, self.aud_med, on_complete=self.on_media_player_complete)
  270. # self.player.createPlayer(player_file[0], pj.PJMEDIA_FILE_NO_LOOP)
  271. self.player.createPlaylist(player_file, f'my_hello_playlist{player_id}', pj.PJMEDIA_FILE_NO_LOOP)
  272. self.player.startTransmit(self.aud_med)
  273. def on_receiver_asr_result(self, message, *args):
  274. # 判断是否播放完成 否则不记录用户说的内容
  275. if not self.is_play_complete():
  276. return
  277. message = json.loads(message)
  278. if message["header"]["status"] == 20000000:
  279. if message["header"]["name"] == "SentenceEnd":
  280. result = message["payload"]["result"]
  281. print("asr返回内容Result:", result)
  282. self.user_asr_text_queue.put(result)
  283. elif message["header"]["name"] == "TranscriptionResultChanged":
  284. self.reset_wait_time()
  285. else:
  286. print(f"Status is not {message['header']['status']}")
  287. def on_media_player_complete(self, player_id):
  288. print('player complete')
  289. self.player_complete_dict[player_id] = True
  290. self.digit = ''
  291. self.inputLongStart = time.time()
  292. #播放完毕执行的动作
  293. self.say_end_action(self.action)
  294. def bot_say_hello(self):
  295. # print('bot_say_hello, come in ')
  296. self.chat(user_asr_text="start")
  297. def chat(self, user_asr_text=None):
  298. # 调用文本机器人接口
  299. ToTextBotAgent(user_asr_text,self)
  300. def say_end_action(self, action):
  301. print('handling_release', action.action_code)
  302. action_code = action.action_code
  303. if action_code == 'hang': # 挂断
  304. action_content = action.action_content
  305. print(f'todo 挂电话:{action_content}')
  306. self.agent.hangup(self.user_part)
  307. elif action_code == 'transfer': # 转人工
  308. print('todo 转人工')
  309. self.agent.transfer(self.user_part, self.session_id, self.device_id)
  310. class ToTextBotAgent:
  311. def __init__(self, user_asr_text, call_agent):
  312. if not user_asr_text or (call_agent.action and call_agent.action.action_code != 'normal'):
  313. # print("ASR文本为空,终止执行。")
  314. return
  315. self.call_agent = call_agent
  316. self.request_data = BotChatRequest(
  317. nodeId=self.call_agent.node_id,
  318. userId=self.call_agent.call_phone,
  319. sessionId= self.call_agent.session_id,
  320. recordId="",
  321. taskId="10001",
  322. asrText=user_asr_text,
  323. ext= None
  324. )
  325. print("user_asr_text发送结果:", user_asr_text)
  326. # 发送请求并处理响应
  327. # self.test_request(self.request_data)
  328. self.to_quest(self.request_data)
  329. def to_quest(self, request: BotChatRequest):
  330. # if self.call_agent.txtLock:
  331. # return
  332. # 将实体类转换为JSON字符串
  333. headers = {'Content-Type': 'application/json'}
  334. request_data = request.to_json_string()
  335. url = f"http://{SERVE_HOST}:40072/botservice"
  336. # self.call_agent.txtLock = True
  337. # 发送POST请求
  338. print(f"请求数据:{request_data},url:{url}")
  339. try:
  340. response = requests.post(url=url, json=json.loads(request_data), headers=headers, timeout=10) # 使用占位URL
  341. print(f"原始响应内容:{response.text}")
  342. if response.status_code == 200:
  343. response_data = response.json()
  344. if "data" in response_data and response_data["code"]==0:
  345. data = response_data["data"]
  346. parsed_response = ChatMessage.from_json(data)
  347. self.call_agent.message_queue.put(parsed_response)
  348. sys.stdout.flush() # 强制刷新输出缓冲区
  349. else:
  350. print("响应中没有 'data' 字段")
  351. else:
  352. # 错误处理
  353. print(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
  354. except requests.RequestException as e:
  355. print(f"请求发生异常: {e}")
  356. # 模拟接口请求返回
  357. def test_request(self, params: BotChatRequest):
  358. # if self.call_agent.txtLock:
  359. # return
  360. print("test_request::params=", params)
  361. # self.call_agent.txtLock = True
  362. response_data = {
  363. "node_id": "1.0",
  364. "contents": [],
  365. "wait_time": "6",
  366. "action": {
  367. "action_code": "normal",
  368. "action_content": "正常通话"
  369. },
  370. "inputType": "0"
  371. }
  372. print("asrText:", params.asrText)
  373. if params.asrText == 'start': #欢迎语
  374. response_data['contents'].append({
  375. "content_type": "voice",
  376. "content": "",
  377. "voice_url": '/code/src/core/voip/scripts/1_00.wav',
  378. "voice_content": "五一北京到上海的高铁票还有吗?"
  379. })
  380. response_data['inputType'] = '1.0'
  381. elif params.asrText == 'ASR408error': #超时执行
  382. response_data['contents'].append({
  383. "content_type": "voice",
  384. "content": "",
  385. "voice_url": '/code/src/core/voip/scripts/4_00.wav',
  386. "voice_content": "waitTime超时"
  387. })
  388. elif "DTMF" in params.asrText and self.call_agent.inputType =='1.0': #长按键超30s执行
  389. response_data['contents'].append({
  390. "content_type": "voice",
  391. "content": "",
  392. "voice_url": '/code/src/core/voip/scripts/2_00.wav',
  393. "voice_content": "sds"
  394. })
  395. else :
  396. response_data['contents'] = [
  397. {
  398. "content_type": "voice",
  399. "content": "",
  400. "voice_url": '/code/src/core/voip/test111.wav',
  401. "voice_content": "测试第二个录音文件"
  402. },
  403. {
  404. "content_type": "voice",
  405. "content": "",
  406. "voice_url": '/code/src/core/voip/test222.wav',
  407. "voice_content": "五一北京到上海的高铁票还有吗?"
  408. }
  409. ]
  410. try:
  411. print(json.dumps(response_data['contents']))
  412. parsed_response = ChatMessage.from_json(response_data)
  413. self.call_agent.message_queue.put(parsed_response)
  414. except Exception as e:
  415. print(f"Error in test_request: {e}")
  416. traceback.print_exc() # 打印完整的错误信息
  417. return None
  418. @singleton_keys
  419. class BotAgent:
  420. def __init__(self, app, user_part_range=range(1001, 1011), host=SIP_SERVER, port="5060", password="slibra@#123456"):
  421. self.logger = app.logger
  422. self.user_part_range, self.host, self.port, self.password = user_part_range, host, port, password
  423. self.user_part_pool = queue.Queue(maxsize=len(user_part_range))
  424. self.accounts = {}
  425. self.calls = {}
  426. self.ep = pj.Endpoint()
  427. self.is_stopping = False
  428. self.acd_service = None
  429. self.cache = Cache(app)
  430. threading.Thread(target=self.create_pjsua2, daemon=True).start()
  431. def create_pjsua2(self):
  432. # Create and initialize the library
  433. ep_cfg = pj.EpConfig()
  434. ep_cfg.uaConfig.threadCnt = 12
  435. ep_cfg.uaConfig.mainThreadOnly = False
  436. ep_cfg.uaConfig.maxCalls = 20
  437. ep_cfg.uaConfig.maxAccounts = 20
  438. ep_cfg.medConfig.noVad = True
  439. ep_cfg.logConfig.level = 4
  440. ep_cfg.logConfig.consoleLevel = 4
  441. self.ep.libCreate()
  442. self.ep.libInit(ep_cfg)
  443. aud_dev_mgr = self.ep.audDevManager()
  444. aud_dev_mgr.setNullDev() # 使用虚拟音频设备(如果没有实际设备)
  445. # Set up media configuration, particularly jitter buffer
  446. media_cfg = pj.MediaConfig()
  447. media_cfg.jbMinPre = 4 # Minimum pre-fetch frames
  448. media_cfg.jbMaxPre = 16 # Maximum pre-fetch frames
  449. media_cfg.noVad = True # Disable Voice Activity Detection if needed
  450. self.ep.medConfig = media_cfg # Apply media config to endpoint
  451. # Create SIP transport. Error handling sample is shown
  452. sipTpConfig = pj.TransportConfig()
  453. sipTpConfig.port = 30506
  454. self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
  455. # Start the library
  456. self.ep.libStart()
  457. for user_part in self.user_part_range:
  458. acfg = pj.AccountConfig()
  459. acfg.idUri = f"sip:{user_part}@{self.host}:{self.port}"
  460. acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
  461. cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, self.password)
  462. acfg.sipConfig.authCreds.append(cred)
  463. acfg.regConfig.timeoutSec = 86400 # 注册超时时间(秒)
  464. acfg.regConfig.retryIntervalSec = 10 # 重试间隔时间(秒)
  465. acfg.regConfig.firstRetryIntervalSec = 10 # 首次重试间隔时间(秒)
  466. acfg.natConfig.iceEnabled = True
  467. acfg.natConfig.turnEnabled = True
  468. acfg.natConfig.turnServer = "stun:pbx.fuxicarbon.com:3478"
  469. # acfg.natConfig.turnUsername = "username"
  470. # acfg.natConfig.turnPassword = "password"
  471. # Create the account
  472. acc = Account(self, user_part=user_part)
  473. acc.create(acfg)
  474. self.user_part_pool.put(user_part)
  475. self.accounts[user_part] = acc
  476. while not self.is_stopping:
  477. self.ep.libHandleEvents(100)
  478. def transfer(self, user_part, call_id, device_id, service_id='00000000000000000'):
  479. if self.acd_service:
  480. self.acd_service.transfer_to_agent(call_id, device_id, service_id, user_part)
  481. # sip_headers = {'P-LIBRA-HangUpReason': 'transferToAgent', 'P-LIBRA-ServiceId': service_id}
  482. try_count = 10
  483. while try_count >0:
  484. if self.cache.get_need_play_hold_music(call_id):
  485. self.hangup(user_part)
  486. self.cache.del_need_play_hold_music(call_id)
  487. break
  488. time.sleep(1)
  489. def hangup(self, user_part, reason="NORMAL_CLEARING", **sip_headers):
  490. call_op_param = pj.CallOpParam(True)
  491. call_op_param.statusCode = pj.PJSIP_SC_OK
  492. call_op_param.reason = reason
  493. call_op_param.txOption = pj.SipTxOption()
  494. sip_header_vector = pj.SipHeaderVector()
  495. for k, v in sip_headers.items():
  496. _sip_header = pj.SipHeader()
  497. _sip_header.hName = str(k)
  498. _sip_header.hValue = str(v)
  499. print('hangup, header_name=%s, header_value=%s'%(k, v))
  500. sip_header_vector.push_back(_sip_header)
  501. call_op_param.txOption.headers = sip_header_vector
  502. acc = self.accounts.get(user_part)
  503. if acc:
  504. for k, v in acc.calls.items():
  505. v.hangup(call_op_param)
  506. # 机器人主动挂机回收分机号
  507. self.release(user_part)
  508. def register(self, **kwargs):
  509. user_part = self.user_part_pool.get()
  510. acc = self.accounts.get(user_part)
  511. self.logger.info('register, user_part :%d, pool.size :%d', user_part, self.user_part_pool.qsize())
  512. if acc:
  513. print('register==========>', acc.getId())
  514. # ps = pj.PresenceStatus()
  515. # ps.status = pj.PJSUA_BUDDY_STATUS_ONLINE
  516. # ps.activity = pj.PJRPID_ACTIVITY_AWAY
  517. # ps.note = "Away"
  518. # acc.setOnlineStatus(ps)
  519. # acc.setRegistration(renew=True)
  520. acc.kwargs = kwargs
  521. return user_part
  522. def unregister(self, user_part):
  523. acc = self.accounts.get(user_part)
  524. if acc:
  525. acc.setRegistration(renew=False)
  526. # 用户远程挂机回收分机号
  527. self.release(user_part)
  528. def release(self, user_part):
  529. if not user_part:
  530. self.logger.info("release, user_part is None")
  531. return
  532. def element_in_queue(q, element):
  533. with q.mutex: # 确保线程安全
  534. for item in list(q.queue): # 将队列转换为列表进行遍历
  535. if item == element:
  536. return True
  537. return False
  538. if element_in_queue(self.user_part_pool, user_part):
  539. self.logger.info("release, already exists, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize())
  540. return
  541. self.user_part_pool.put(user_part)
  542. self.logger.info("release, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize())
  543. def destroy(self):
  544. self.is_stopping = True
  545. # Destroy the library
  546. self.ep.libDestroy()
  547. def __del__(self):
  548. self.destroy()
  549. # if __name__ == '__main__':
  550. # import logging
  551. # logger = logging.getLogger('voip')
  552. # bot = BotAgent(logger)