datasource.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Time : 2024/4/13 15:12
  4. # @Author : liu
  5. import json
  6. import time
  7. import datetime
  8. import decimal
  9. import traceback
  10. import pymysql
  11. import os
  12. from typing import List, Tuple
  13. from src.core import singleton
  14. from redis import StrictRedis, ConnectionPool
  15. from src.core.voip.constant import *
  16. # RADIS_HOST = os.environ.get("REDIS_HOST", "10.0.0.24")
  17. #
  18. # # 设置mysql连接信息 host从容器配置文件获取 默认为测试mysql 如果获取配置文件是9.54 设置线上mysql
  19. # MYSQL_HOST = os.environ.get("MYSQL_HOST", "192.168.9.54")
  20. # MYSQL_PASSWORD = 'Hongshan2024@longjiang'
  21. #
  22. # if MYSQL_HOST == "192.168.9.54":
  23. # MYSQL_PASSWORD = "Hongshan2024#longjiang%xinyi"
  24. # else:
  25. # MYSQL_PASSWORD = "Hongshan2024@longjiang"
  26. # @singleton
  27. class MysqlHandler:
  28. """
  29. 表名:
  30. t_xinyi_daily 日报
  31. t_xinyi_industry 工业
  32. t_xinyi_Robot 机器人化验
  33. """
  34. def __init__(self,
  35. # host='192.168.100.159',
  36. # user='root',
  37. # passwd='12345678',
  38. host=SERVE_HOST,
  39. user='root',
  40. passwd=MYSQL_PASSWORD,
  41. db='libra_bot',
  42. port=3306,
  43. charset='utf8'
  44. ):
  45. print(f"mysql_host:{host}", f"mysql_passwd:{MYSQL_PASSWORD}" )
  46. self.host = host
  47. self.user = user
  48. self.passwd = passwd
  49. self.db = db
  50. self.port = port
  51. self.charset = charset
  52. self.conn = None
  53. self._conn()
  54. def _conn(self):
  55. try:
  56. self.conn = pymysql.connect(host=self.host, user=self.user, passwd=self.passwd, db=self.db, port=self.port,
  57. charset=self.charset)
  58. return True
  59. except:
  60. traceback.print_exc()
  61. return False
  62. def _reConn(self, num=28800, stime=3): # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
  63. _number = 0
  64. _status = True
  65. while _status and _number <= num:
  66. try:
  67. self.conn.ping() # cping 校验连接是否异常
  68. _status = False
  69. except:
  70. if self._conn() == True: # 重新连接,成功退出
  71. _status = False
  72. break
  73. _number += 1
  74. time.sleep(stime) # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
  75. def select(self, query, params: Tuple = None):
  76. try:
  77. self._reConn()
  78. # 使用 cursor() 方法创建一个游标对象 cursor
  79. cursor = self.conn.cursor(pymysql.cursors.DictCursor)
  80. # 执行SQL语句
  81. cursor.execute(query, params)
  82. # 获取所有记录列表
  83. results = cursor.fetchall()
  84. return results
  85. except:
  86. print(query, params)
  87. traceback.print_exc()
  88. finally:
  89. # 增/删/改均需要进行commit提交,进行保存
  90. # conn.commit()
  91. # 关闭游标
  92. cursor.close()
  93. def execute(self, sql, values: Tuple = None):
  94. try:
  95. self._reConn()
  96. # 使用 cursor() 方法创建一个游标对象 cursor
  97. cursor = self.conn.cursor()
  98. # 执行SQL语句
  99. cursor.execute(sql, values)
  100. self.conn.commit()
  101. return cursor.rowcount, cursor.lastrowid
  102. except:
  103. print(sql, values)
  104. traceback.print_exc()
  105. raise
  106. finally:
  107. cursor.close()
  108. def create_database(self, db_name: str):
  109. sql = f"CREATE DATABASE IF NOT EXISTS `{db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;"
  110. try:
  111. self.execute(sql)
  112. print(f"数据库'{db_name}'创建成功")
  113. except pymysql.MySQLError as e:
  114. print(f"创建数据库时发生错误: {e}")
  115. def use_database(self, db_name: str):
  116. try:
  117. self.conn.select_db(db_name)
  118. except pymysql.ProgrammingError as e:
  119. print(f"切换至数据库'{db_name}'时发生错误: {e}")
  120. def create_table(self, table_schema: str):
  121. try:
  122. self.execute(table_schema)
  123. print("表创建成功")
  124. except pymysql.MySQLError as e:
  125. print(f"创建表时发生错误: {e}")
  126. def insert(self, table_name: str, values: Tuple, columns: Tuple[str, ...] = None):
  127. placeholders = ', '.join(['%s'] * len(values))
  128. if columns:
  129. columns_str = ', '.join(columns)
  130. sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
  131. else:
  132. sql = f"INSERT INTO `{table_name}` VALUES ({placeholders})"
  133. try:
  134. rowcount, lastrowid = self.execute(sql, values)
  135. return lastrowid
  136. except pymysql.MySQLError as e:
  137. print(f"插入数据时发生错误: {e}")
  138. def update(self, table_name: str, set_clause: str, where_clause: str, values: Tuple):
  139. sql = f"UPDATE `{table_name}` SET {set_clause} WHERE {where_clause}"
  140. try:
  141. rowcount, lastrowid = self.execute(sql, values)
  142. return rowcount
  143. except pymysql.MySQLError as e:
  144. print(f"更新数据时发生错误: {e}")
  145. def delete(self, table_name: str, where_clause: str, values: Tuple):
  146. sql = f"DELETE FROM `{table_name}` WHERE {where_clause}"
  147. try:
  148. rowcount, lastrowid = self.execute(sql, values)
  149. return rowcount
  150. except pymysql.MySQLError as e:
  151. print(f"删除数据时发生错误: {e}")
  152. def select_one(self, query: str, params: Tuple = None):
  153. try:
  154. result = self.select(query, params)
  155. if result and len(result) > 0:
  156. return result[0]
  157. except pymysql.MySQLError as e:
  158. print(f"执行查询时发生错误: {e}")
  159. def __del__(self):
  160. self.conn.close()
  161. @singleton
  162. class RedisHandler:
  163. def __init__(self, host=SERVE_HOST, port=6379, db=0, password='^YHN&UJM'):
  164. try:
  165. print(f"redis_host:{host}")
  166. self.redis = StrictRedis(
  167. connection_pool=ConnectionPool(host=host, port=port, db=db, password=password))
  168. except Exception as e:
  169. traceback.print_exc()
  170. def get(self, key):
  171. return self.redis.get(key)
  172. def set(self, key, val, expire=None, prefix=None):
  173. newkey = prefix + key if prefix else key
  174. self.redis.set(newkey, val)
  175. if expire:
  176. self.redis.expire(newkey, expire)
  177. def __del__(self):
  178. self.redis.close()
  179. # def main():
  180. # redis_handler = RedisHandler()
  181. # print(redis_handler.redis.info())
  182. # redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
  183. #
  184. # stream_name = 'stop_gen_stream'
  185. # consumer_group = 'stop_gen_group'
  186. # consumer_name = 'stop_gen_consumer'
  187. # # # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
  188. # # # 从流中读取消息
  189. # # while True:
  190. # # # 读取数据
  191. # # data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
  192. # # count=1, block=1000)
  193. # # # 处理收到的消息
  194. # # for stream, messages in data:
  195. # # for message in messages:
  196. # # message_id = message[0].decode('utf-8')
  197. # # message_data = message[1]
  198. # # print(f"Received message {message_id}: {message_data}")
  199. # # # 在这里可以添加任何自定义的处理逻辑
  200. #
  201. #
  202. def main1():
  203. mysql = MysqlHandler()
  204. rows = mysql.select_one("select * from c_agent")
  205. print(rows)
  206. # # 创建数据库
  207. # ds.create_database('new_database')
  208. #
  209. # # 切换到新数据库
  210. # ds.use_database('new_database')
  211. #
  212. # # 创建表
  213. # table_schema = """
  214. # CREATE TABLE IF NOT EXISTS test_table (
  215. # id INT AUTO_INCREMENT PRIMARY KEY,
  216. # name VARCHAR(100),
  217. # email VARCHAR(100),
  218. # created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  219. # );
  220. # """
  221. # ds.create_table(table_schema)
  222. #
  223. # # 插入数据
  224. # ds.insert('test_table', ('John Doe', 'john.doe@example.com'))
  225. #
  226. # # 更新数据
  227. # set_clause = "name = %s, email = %s"
  228. # where_clause = "id = %s"
  229. # values = ('Jane Doe', 'jane.doe@example.com', 1)
  230. # ds.update('test_table', set_clause, where_clause, values)
  231. #
  232. # # 删除数据
  233. # where_clause = "id = %s"
  234. # values = (1,)
  235. # ds.delete('test_table', where_clause, values)
# Run the manual MySQL smoke check only when this file is executed directly.
if __name__ == "__main__":
    main1()