Davidliu 3 months ago
parent
commit
a8a6cf9935
2 changed files with 42 additions and 31 deletions
  1. 16 11
      src/core/callcenter/agent.py
  2. 26 20
      src/core/voip/bot.py

+ 16 - 11
src/core/callcenter/agent.py

@@ -8,6 +8,7 @@ from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor
 from typing import List
 
+from apscheduler.schedulers.background import BackgroundScheduler
 from sqlalchemy import or_
 
 from src.core.callcenter import registry
@@ -265,9 +266,12 @@ class AgentOperService:
         self.agent_actionlog_service = AgentActionLogService(app)
         self.agent_state_service = AgentStateService(app)
 
-        self.daemon_stopping = False
+        # self.daemon_stopping = False
         self.agent_heartbeat_expire = 30
-        threading.Thread(target=self.agent_heartbeat_daemon).start()
+        self.agent_heartbeat_job_scheduler = BackgroundScheduler()
+        self.agent_heartbeat_job_scheduler.add_job(self.agent_heartbeat_daemon, 'interval', seconds=1, max_instances=1, name='agent_heartbeat_daemon')
+        self.agent_heartbeat_job_scheduler.start()
+        # threading.Thread(target=self.agent_heartbeat_daemon).start()
 
     def agent_heartbeat_daemon(self):
         def check_out_daemon(_name, key, value):
@@ -280,17 +284,18 @@ class AgentOperService:
             except:
                 traceback.print_exc()
 
-        while not self.daemon_stopping:
-            name = CENTER_AGENT_HEARTBEAT % SAAS_ID
-            members = self.redis_handler.redis.hgetall(name)
-            if not members:
-                continue
-            for k,v in members.items():
-                check_out_daemon(name, k, v)
-            time.sleep(1)
+        # while not self.daemon_stopping:
+        name = CENTER_AGENT_HEARTBEAT % SAAS_ID
+        members = self.redis_handler.redis.hgetall(name)
+        if not members:
+            return
+        for k,v in members.items():
+            check_out_daemon(name, k, v)
+        # time.sleep(1)
 
     def __del__(self):
-        self.daemon_stopping = True
+        self.agent_heartbeat_job_scheduler.shutdown()
+        # self.daemon_stopping = True
 
     @with_app_context
     def enable(self, req: AgentActionRequest):

+ 26 - 20
src/core/voip/bot.py

@@ -14,6 +14,8 @@ from enum import Enum
 from datetime import datetime
 from multiprocessing import Process
 
+from apscheduler.schedulers.background import BackgroundScheduler
+
 from src.core.callcenter import registry
 from src.core.callcenter.cache import Cache
 from src.core.datasource import SIP_SERVER, SERVE_HOST
@@ -519,7 +521,11 @@ class BotAgent:
         self.dataHandleServer = DataHandleServer(app)
         self.pjsua_thread = None
         self.start()
-        threading.Thread(target=self.main_thread_daemon).start()
+        # threading.Thread(target=self.main_thread_daemon).start()
+
+        self.daemon_job_scheduler = BackgroundScheduler()
+        self.daemon_job_scheduler.add_job(self.main_thread_daemon, 'interval', seconds=1, max_instances=1, name='bot_agent_daemon')
+        self.daemon_job_scheduler.start()
 
     def create_pjsua2(self, timeout_sec=86400):
         start_time = time.time()
@@ -585,7 +591,7 @@ class BotAgent:
 
         while not self.is_stopping:
             registry.BOT_AGENT_LIVES.set(self.user_part_pool.qsize())
-            self.ep.libHandleEvents(100)
+            self.ep.libHandleEvents(200)
 
         self.logger.info("create pjsua will shutdown 1111")
         self.call_players.clear()
@@ -600,26 +606,26 @@ class BotAgent:
 
 
     def main_thread_daemon(self):
-        while not self.daemon_stopping:
-            _lock = self._play_complete_degree_check()
-            if _lock:
-                self.logger.error("daviddebugger::play time greater than 60s, will restart")
-                self.restart()
-                continue
+        # while not self.daemon_stopping:
+        _lock = self._play_complete_degree_check()
+        if _lock:
+            self.logger.error("daviddebugger::play time greater than 60s, will restart")
+            self.restart()
+            return
 
-            _lock = self.cache.get_pjsua_thread_lock()
-            if _lock:
-                self.cache.del_pjsua_thread_lock()
-                self.logger.error("daviddebugger::thread is lock, will restart")
-                self.restart()
-                continue
+        _lock = self.cache.get_pjsua_thread_lock()
+        if _lock:
+            self.cache.del_pjsua_thread_lock()
+            self.logger.error("daviddebugger::thread is lock, will restart")
+            self.restart()
+            return
 
-            _lock = self.cache.lock_register_per_hours()
-            if not _lock and len(self.accounts) == len(self.user_part_range):
-                self.logger.error("daviddebugger::register expire, will restart")
-                self.restart()
-                continue
-            time.sleep(0.1)
+        _lock = self.cache.lock_register_per_hours()
+        if not _lock and len(self.accounts) == len(self.user_part_range):
+            self.logger.error("daviddebugger::register expire, will restart")
+            self.restart()
+            return
+            # time.sleep(0.1)
 
     def _play_complete_degree_check(self):
         for k, v in list(self.call_players.items()):