datasource.py 8.9 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Time : 2024/4/13 15:12
  4. # @Author : liu
  5. import json
  6. import time
  7. import datetime
  8. import decimal
  9. import traceback
  10. import pymysql
  11. import os
  12. from typing import List, Tuple
  13. from src.core import singleton
  14. from redis import StrictRedis, ConnectionPool
  15. SERVE_HOST = os.environ.get("SERVE_HOST")
  16. # SERVE_HOST = "192.168.100.195"
  17. MYSQL_PASSWORD = 'EKoAe3H8xybQKrFPApXM'
  18. if SERVE_HOST != "192.168.100.159":
  19. SIP_SERVER = SERVE_HOST
  20. MYSQL_PASSWORD = "12345678"
  21. else:
  22. SIP_SERVER = "pbx.fuxicarbon.com"
  23. # RADIS_HOST = os.environ.get("REDIS_HOST", "10.0.0.24")
  24. #
  25. # # 设置mysql连接信息 host从容器配置文件获取 默认为测试mysql 如果获取配置文件是9.54 设置线上mysql
  26. # MYSQL_HOST = os.environ.get("MYSQL_HOST", "192.168.9.54")
  27. # MYSQL_PASSWORD = 'Hongshan2024@longjiang'
  28. #
  29. # if MYSQL_HOST == "192.168.9.54":
  30. # MYSQL_PASSWORD = "Hongshan2024#longjiang%xinyi"
  31. # else:
  32. # MYSQL_PASSWORD = "Hongshan2024@longjiang"
  33. # @singleton
  34. class MysqlHandler:
  35. """
  36. 表名:
  37. t_xinyi_daily 日报
  38. t_xinyi_industry 工业
  39. t_xinyi_Robot 机器人化验
  40. """
  41. def __init__(self,
  42. # host='192.168.100.159',
  43. # user='root',
  44. # passwd='12345678',
  45. host=SERVE_HOST,
  46. user='root',
  47. passwd=MYSQL_PASSWORD,
  48. db='libra_bot',
  49. port=3306,
  50. charset='utf8'
  51. ):
  52. print(f"mysql_host:{host}", f"mysql_passwd:{MYSQL_PASSWORD}" )
  53. self.host = host
  54. self.user = user
  55. self.passwd = passwd
  56. self.db = db
  57. self.port = port
  58. self.charset = charset
  59. self.conn = None
  60. self._conn()
  61. def _conn(self):
  62. try:
  63. self.conn = pymysql.connect(host=self.host, user=self.user, passwd=self.passwd, db=self.db, port=self.port,
  64. charset=self.charset)
  65. return True
  66. except:
  67. traceback.print_exc()
  68. return False
  69. def _reConn(self, num=28800, stime=3): # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
  70. _number = 0
  71. _status = True
  72. while _status and _number <= num:
  73. try:
  74. self.conn.ping() # cping 校验连接是否异常
  75. _status = False
  76. except:
  77. if self._conn() == True: # 重新连接,成功退出
  78. _status = False
  79. break
  80. _number += 1
  81. time.sleep(stime) # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
  82. def select(self, query, params: Tuple = None):
  83. try:
  84. self._reConn()
  85. # 使用 cursor() 方法创建一个游标对象 cursor
  86. cursor = self.conn.cursor(pymysql.cursors.DictCursor)
  87. # 执行SQL语句
  88. cursor.execute(query, params)
  89. # 获取所有记录列表
  90. results = cursor.fetchall()
  91. return results
  92. except:
  93. print(query, params)
  94. traceback.print_exc()
  95. finally:
  96. # 增/删/改均需要进行commit提交,进行保存
  97. # conn.commit()
  98. # 关闭游标
  99. cursor.close()
  100. def execute(self, sql, values: Tuple = None):
  101. try:
  102. self._reConn()
  103. # 使用 cursor() 方法创建一个游标对象 cursor
  104. cursor = self.conn.cursor()
  105. # 执行SQL语句
  106. cursor.execute(sql, values)
  107. self.conn.commit()
  108. return cursor.rowcount, cursor.lastrowid
  109. except:
  110. print(sql, values)
  111. traceback.print_exc()
  112. raise
  113. finally:
  114. cursor.close()
  115. def create_database(self, db_name: str):
  116. sql = f"CREATE DATABASE IF NOT EXISTS `{db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;"
  117. try:
  118. self.execute(sql)
  119. print(f"数据库'{db_name}'创建成功")
  120. except pymysql.MySQLError as e:
  121. print(f"创建数据库时发生错误: {e}")
  122. def use_database(self, db_name: str):
  123. try:
  124. self.conn.select_db(db_name)
  125. except pymysql.ProgrammingError as e:
  126. print(f"切换至数据库'{db_name}'时发生错误: {e}")
  127. def create_table(self, table_schema: str):
  128. try:
  129. self.execute(table_schema)
  130. print("表创建成功")
  131. except pymysql.MySQLError as e:
  132. print(f"创建表时发生错误: {e}")
  133. def insert(self, table_name: str, values: Tuple, columns: Tuple[str, ...] = None):
  134. placeholders = ', '.join(['%s'] * len(values))
  135. if columns:
  136. columns_str = ', '.join(columns)
  137. sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
  138. else:
  139. sql = f"INSERT INTO `{table_name}` VALUES ({placeholders})"
  140. try:
  141. rowcount, lastrowid = self.execute(sql, values)
  142. return lastrowid
  143. except pymysql.MySQLError as e:
  144. print(f"插入数据时发生错误: {e}")
  145. def update(self, table_name: str, set_clause: str, where_clause: str, values: Tuple):
  146. sql = f"UPDATE `{table_name}` SET {set_clause} WHERE {where_clause}"
  147. try:
  148. rowcount, lastrowid = self.execute(sql, values)
  149. return rowcount
  150. except pymysql.MySQLError as e:
  151. print(f"更新数据时发生错误: {e}")
  152. def delete(self, table_name: str, where_clause: str, values: Tuple):
  153. sql = f"DELETE FROM `{table_name}` WHERE {where_clause}"
  154. try:
  155. rowcount, lastrowid = self.execute(sql, values)
  156. return rowcount
  157. except pymysql.MySQLError as e:
  158. print(f"删除数据时发生错误: {e}")
  159. def select_one(self, query: str, params: Tuple = None):
  160. try:
  161. result = self.select(query, params)
  162. if result and len(result) > 0:
  163. return result[0]
  164. except pymysql.MySQLError as e:
  165. print(f"执行查询时发生错误: {e}")
  166. def __del__(self):
  167. self.conn.close()
  168. @singleton
  169. class RedisHandler:
  170. def __init__(self, host=SERVE_HOST, port=6379, db=0, password='^YHN&UJM'):
  171. try:
  172. print(f"redis_host:{host}")
  173. self.redis = StrictRedis(
  174. connection_pool=ConnectionPool(host=host, port=port, db=db, password=password),
  175. decode_responses=True)
  176. except Exception as e:
  177. traceback.print_exc()
  178. def get(self, key):
  179. return self.redis.get(key)
  180. def set(self, key, val, expire=None, prefix=None):
  181. newkey = prefix + key if prefix else key
  182. self.redis.set(newkey, val)
  183. if expire:
  184. self.redis.expire(newkey, expire)
  185. def __del__(self):
  186. self.redis.close()
  187. # def main():
  188. # redis_handler = RedisHandler()
  189. # print(redis_handler.redis.info())
  190. # redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
  191. #
  192. # stream_name = 'stop_gen_stream'
  193. # consumer_group = 'stop_gen_group'
  194. # consumer_name = 'stop_gen_consumer'
  195. # # # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
  196. # # # 从流中读取消息
  197. # # while True:
  198. # # # 读取数据
  199. # # data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
  200. # # count=1, block=1000)
  201. # # # 处理收到的消息
  202. # # for stream, messages in data:
  203. # # for message in messages:
  204. # # message_id = message[0].decode('utf-8')
  205. # # message_data = message[1]
  206. # # print(f"Received message {message_id}: {message_data}")
  207. # # # 在这里可以添加任何自定义的处理逻辑
  208. #
  209. #
  210. def main1():
  211. mysql = MysqlHandler()
  212. rows = mysql.select_one("select * from c_agent")
  213. print(rows)
  214. # # 创建数据库
  215. # ds.create_database('new_database')
  216. #
  217. # # 切换到新数据库
  218. # ds.use_database('new_database')
  219. #
  220. # # 创建表
  221. # table_schema = """
  222. # CREATE TABLE IF NOT EXISTS test_table (
  223. # id INT AUTO_INCREMENT PRIMARY KEY,
  224. # name VARCHAR(100),
  225. # email VARCHAR(100),
  226. # created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  227. # );
  228. # """
  229. # ds.create_table(table_schema)
  230. #
  231. # # 插入数据
  232. # ds.insert('test_table', ('John Doe', 'john.doe@example.com'))
  233. #
  234. # # 更新数据
  235. # set_clause = "name = %s, email = %s"
  236. # where_clause = "id = %s"
  237. # values = ('Jane Doe', 'jane.doe@example.com', 1)
  238. # ds.update('test_table', set_clause, where_clause, values)
  239. #
  240. # # 删除数据
  241. # where_clause = "id = %s"
  242. # values = (1,)
  243. # ds.delete('test_table', where_clause, values)
  244. if __name__ == "__main__":
  245. main1()