DavidLiu 3 ay önce
ebeveyn
işleme
746928a673

+ 12 - 1
src/core/callcenter/cache.py

@@ -218,4 +218,15 @@ class Cache:
     def set_register_per_hours(self, expire=86400):
         hour = datetime.now().strftime('%Y%m%d%H')
         key = BOT_REGISTER_PER_HOURS %hour
-        return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)
+        return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)
+
+    def get_pjsua_thread_lock(self):
+        minute = datetime.now().strftime('%Y%m%d%H%M')
+        key = BOT_PJSUA_THREAD_LOCK % minute
+        return self.redis_handler.redis.get(key)
+
+    def set_pjsua_thread_lock(self, expire=60):
+        minute = datetime.now().strftime('%Y%m%d%H%M')
+        key = BOT_PJSUA_THREAD_LOCK % minute
+        return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)
+

+ 1 - 0
src/core/callcenter/constant.py

@@ -90,6 +90,7 @@ AFTER_PLAY_HOLD_MUSIC = "CTI:ENGINE:AFTER:HOLD:%s"
 CTI_MANAGE_CENTER_CALL_END_KEY = "CTI:MANAGE:CENTER:CALL:END:KEY:%s"
 CTI_AGENT_MANUAL_ANSWER = "AGENT:MANUAL:ANSWER:%s:%s"
 BOT_REGISTER_PER_HOURS = "BOT:REGISTER:PER_HOURS:%s"
+BOT_PJSUA_THREAD_LOCK = "BOT:PJSUA:THREAD:LOCK:%s"
 
 def get_json_dict(json_text=None):
     if isinstance(json_text, str):

+ 1 - 0
src/core/callcenter/esl/handler/channel_hangup_handler.py

@@ -121,6 +121,7 @@ class ChannelHangupHandler(EslEventHandler):
 
             if (CallType.BOT_CALL.code == call_info.call_type or CallType.INCOMING_BOT_CALL.code == call_info.call_type) and device_info.device_type == DeviceType.ROBOT.code:
                 if not device_info.answer_time:
+                    self.cache.set_pjsua_thread_lock()
                     self.logger.info("daviddebugger::CALL_BOT_NO_ANSWER_ERROR_REQUESTS call_id=%s, device_id=%s", call_id, device_id)
                     registry.CALL_BOT_NO_ANSWER_ERROR_REQUESTS.labels(f"{call_info.bucket_type}", f"{device_info.sip_status}").inc()
                 registry.CALL_BOT_HANGUP_REQUESTS.labels(f"{call_info.bucket_type}", f"{device_info.sip_status}").inc()

+ 29 - 25
src/core/voip/bot.py

@@ -11,6 +11,7 @@ import sys
 import pjsua2 as pj
 from enum import Enum
 from datetime import datetime
+from multiprocessing import Process
 
 from src.core.callcenter import registry
 from src.core.callcenter.cache import Cache
@@ -358,10 +359,10 @@ class MyCall(pj.Call):
         self.logger.info('handling_release %s', action.action_code)
         action_code = action.action_code
         if action_code == 'hang':  # 挂断
-            self.agent.hangup(self, self.user_part)
+            self.agent.hangup(self.user_part)
             self.end_statistics()
         elif action_code == 'transfer':  # 转人工
-            self.agent.transfer(call=self, user_part=self.user_part, call_id=self.session_id, device_id=self.device_id)
+            self.agent.transfer(user_part=self.user_part, call_id=self.session_id, device_id=self.device_id)
             self.end_statistics()
             #更新通话记录
             self.agent.dataHandleServer.update_record(self.session_id, service_category=2)
@@ -396,7 +397,7 @@ class MyCall(pj.Call):
 
         self.asr.close()
         # 远程挂机之后要将分机号回收
-        self.agent.hangup(self, self.user_part)
+        self.agent.hangup(self.user_part)
         # self.agent.release(self.user_part)
         self.end_statistics()
 
@@ -531,7 +532,9 @@ class BotAgent:
         self.acd_service = None
         self.cache = Cache(app)
         self.dataHandleServer = DataHandleServer(app)
-        threading.Thread(target=self.create_pjsua2, daemon=True).start()
+        self.process = Process(name="PJSUA-THREAD", target=self.create_pjsua2)
+        self.process.start()
+        # threading.Thread(name="PJSUA-THREAD", target=self.create_pjsua2, daemon=True).start()
 
     def create_pjsua2(self, timeout_sec=86400):
         start_time = time.time()
@@ -597,32 +600,33 @@ class BotAgent:
 
         while not self.is_stopping:
             registry.BOT_AGENT_LIVES.set(self.user_part_pool.qsize())
-            self.register_per_hours()
+            self.thread_health_check()
             self.ep.libHandleEvents(100)
 
-    def register_per_hours(self):
+    def thread_health_check(self):
         self.counter += 1
-        if self.counter % 1000 == 0:
-            self.counter = 0
-            # for acc in self.accounts:
-            #     print(acc.getId(), acc.isValid())
+        _lock = self.cache.get_pjsua_thread_lock()
+        if _lock:
+            self.logger.error("daviddebugger::thread is lock, will restart")
+            self.restart()
 
         _lock = self.cache.lock_register_per_hours()
         if not _lock and len(self.accounts) == len(self.user_part_range):
+            self.logger.error("daviddebugger::register expire, will restart")
             self.restart()
 
-    def transfer(self, call, user_part, call_id, device_id, service_id='00000000000000000'):
+    def transfer(self, user_part, call_id, device_id, service_id='00000000000000000'):
         if self.acd_service:
             self.acd_service.transfer_to_agent(call_id, device_id, service_id, hold=True)
         # sip_headers = {'P-LIBRA-HangUpReason': 'transferToAgent', 'P-LIBRA-ServiceId': service_id}
         try_count = 100
         while try_count >0:
             if self.cache.get_after_play_hold_music(call_id):
-                self.hangup(call, user_part)
+                self.hangup(user_part)
                 break
             time.sleep(0.1)
 
-    def hangup(self, call, user_part, reason="NORMAL_CLEARING", **sip_headers):
+    def hangup(self, user_part, reason="NORMAL_CLEARING", **sip_headers):
         call_op_param = pj.CallOpParam(True)
         call_op_param.statusCode = pj.PJSIP_SC_OK
         call_op_param.reason = reason
@@ -636,19 +640,13 @@ class BotAgent:
             sip_header_vector.push_back(_sip_header)
         call_op_param.txOption.headers = sip_header_vector
         try:
-            call.hangup(call_op_param)
             acc = self.accounts.get(user_part)
             if acc:
-                keys = [k for k,v in list(acc.calls.items()) if call == v]
-                for k in keys:
-                    self.logger.info('hangup, key=%s, totals=%s', k, len(acc.calls))
-                    acc.calls.pop(k)
-            del call
-            #     for k, v in acc.calls.items():
-            #         self.logger.info('hangup, call_idx=%s, call_active=%s'%(k, v.isActive()))
-            #         if v.isActive():
-            #             v.hangup(call_op_param)
-            #     acc.calls.clear()
+                for k, v in acc.calls.items():
+                    self.logger.info('hangup, call_idx=%s, call_active=%s'%(k, v.isActive()))
+                    if v.isActive():
+                        v.hangup(call_op_param)
+                acc.calls.clear()
         except:
             traceback.print_exc()
         finally:
@@ -699,7 +697,13 @@ class BotAgent:
     def restart(self):
         self.destroy()
         self.is_stopping = False
-        threading.Thread(target=self.create_pjsua2, daemon=True).start()
+        self.process.kill()
+        self.logger.info('restart, 11111')
+        self.process.join()
+        self.logger.info('restart, 22222')
+        self.process = Process(name="PJSUA-THREAD", target=self.create_pjsua2)
+        self.process.start()
+        # threading.Thread(target=self.create_pjsua2, daemon=True).start()
 
     def destroy(self):
         self.is_stopping = True