datasource.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Time : 2024/4/13 15:12
  4. # @Author : liu
  5. import json
  6. import time
  7. import datetime
  8. import decimal
  9. import traceback
  10. import pymysql
  11. import os
  12. from src.core import singleton
  13. from redis import StrictRedis, ConnectionPool
  14. RADIS_HOST = os.environ.get("REDIS_HOST", "10.0.0.24")
  15. # 设置mysql连接信息 host从容器配置文件获取 默认为测试mysql 如果获取配置文件是9.54 设置线上mysql
  16. MYSQL_HOST = os.environ.get("MYSQL_HOST", "192.168.9.54")
  17. MYSQL_PASSWORD = 'Hongshan2024@longjiang'
  18. if MYSQL_HOST == "192.168.9.54":
  19. MYSQL_PASSWORD = "Hongshan2024#longjiang%xinyi"
  20. else:
  21. MYSQL_PASSWORD = "Hongshan2024@longjiang"
  22. # @singleton
  23. class MysqlHandler:
  24. """
  25. 表名:
  26. t_xinyi_daily 日报
  27. t_xinyi_industry 工业
  28. t_xinyi_Robot 机器人化验
  29. """
  30. def __init__(self,
  31. # host='10.0.0.28',
  32. # user='root',
  33. # passwd='Hongshan2024@longjiang',
  34. host=MYSQL_HOST,
  35. user='root',
  36. passwd=MYSQL_PASSWORD,
  37. db='big_model',
  38. port=3306,
  39. charset='utf8'
  40. ):
  41. self.host = host
  42. self.user = user
  43. self.passwd = passwd
  44. self.db = db
  45. self.port = port
  46. self.charset = charset
  47. self.conn = None
  48. self._conn()
  49. def _conn(self):
  50. try:
  51. self.conn = pymysql.connect(host=self.host, user=self.user, passwd=self.passwd, db=self.db, port=self.port,
  52. charset=self.charset)
  53. return True
  54. except:
  55. traceback.print_exc()
  56. return False
  57. def _reConn(self, num=28800, stime=3): # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
  58. _number = 0
  59. _status = True
  60. while _status and _number <= num:
  61. try:
  62. self.conn.ping() # cping 校验连接是否异常
  63. _status = False
  64. except:
  65. if self._conn() == True: # 重新连接,成功退出
  66. _status = False
  67. break
  68. _number += 1
  69. time.sleep(stime) # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
  70. def query(self, sql):
  71. try:
  72. self._reConn()
  73. # 使用 cursor() 方法创建一个游标对象 cursor
  74. cursor = self.conn.cursor(pymysql.cursors.DictCursor)
  75. # 执行SQL语句
  76. cursor.execute(sql)
  77. # 获取所有记录列表
  78. results = cursor.fetchall()
  79. return results
  80. except:
  81. print(sql)
  82. traceback.print_exc()
  83. finally:
  84. # 增/删/改均需要进行commit提交,进行保存
  85. # conn.commit()
  86. # 关闭游标
  87. cursor.close()
  88. def exec(self, sql):
  89. try:
  90. # 使用 cursor() 方法创建一个游标对象 cursor
  91. cursor = self.db.cursor()
  92. # 执行SQL语句
  93. cursor.execute(sql)
  94. self.db.commit()
  95. except Exception as e:
  96. print(e)
  97. finally:
  98. cursor.close()
  99. def __del__(self):
  100. self.conn.close()
  101. @singleton
  102. class RedisHandler:
  103. def __init__(self, host='192.168.124.6', port=6379, db=0, password='^YHN&UJM'):
  104. try:
  105. # host = '10.0.0.24'
  106. # host = RADIS_HOST
  107. self.redis = StrictRedis(
  108. connection_pool=ConnectionPool(host=host, port=port, db=db, password=password))
  109. except Exception as e:
  110. traceback.print_exc()
  111. def get(self, key):
  112. return self.redis.get(key)
  113. def set(self, key, val, expire=None, prefix=None):
  114. newkey = prefix + key if prefix else key
  115. self.redis.set(newkey, val)
  116. if expire:
  117. self.redis.expire(newkey, expire)
  118. def __del__(self):
  119. self.redis.close()
  120. # def main():
  121. # redis_handler = RedisHandler()
  122. # print(redis_handler.redis.info())
  123. # redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
  124. #
  125. # stream_name = 'stop_gen_stream'
  126. # consumer_group = 'stop_gen_group'
  127. # consumer_name = 'stop_gen_consumer'
  128. # # # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
  129. # # # 从流中读取消息
  130. # # while True:
  131. # # # 读取数据
  132. # # data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
  133. # # count=1, block=1000)
  134. # # # 处理收到的消息
  135. # # for stream, messages in data:
  136. # # for message in messages:
  137. # # message_id = message[0].decode('utf-8')
  138. # # message_data = message[1]
  139. # # print(f"Received message {message_id}: {message_data}")
  140. # # # 在这里可以添加任何自定义的处理逻辑
  141. #
  142. #
  143. # def main1():
  144. # mysql = MysqlHandler()
  145. # count = mysql.query("select count(0) as totals from t_xinyi_industry where TEST_TIME like '%00'")
  146. # print(count)
  147. #
  148. # rows = mysql.query("select * from t_xinyi_industry where TEST_TIME like '%00' limit 10")
  149. # for idx, cols in enumerate(rows):
  150. # print(cols.get('TEST_TIME'), cols.get('JS_COD'))
  151. #
  152. #
  153. # if __name__ == "__main__":
  154. # main()