#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/4/13 15:12
# @Author : liu
"""Thin connection helpers for the project's MySQL database and Redis server."""
import datetime
import decimal
import json
import os
import time
import traceback

import pymysql
from redis import ConnectionPool, StrictRedis

from src.core import singleton

# NOTE: the misspelled name ("RADIS") is kept because other modules may import it.
RADIS_HOST = os.environ.get("REDIS_HOST", "10.0.0.24")

# MySQL connection settings. The host is read from the container environment
# (MYSQL_HOST). The password is chosen from the host: 192.168.9.54 uses one
# password, every other host uses the other one.
# NOTE(review): the original comment called the default host the "test" MySQL
# and 9.54 the "online" one, yet 9.54 *is* the default here - confirm which
# environment the default is meant to target.
# NOTE(security): credentials are hard-coded in source; they should be moved
# to environment variables or a secret store.
MYSQL_HOST = os.environ.get("MYSQL_HOST", "192.168.9.54")
if MYSQL_HOST == "192.168.9.54":
    MYSQL_PASSWORD = "Hongshan2024#longjiang%xinyi"
else:
    MYSQL_PASSWORD = "Hongshan2024@longjiang"


# @singleton
class MysqlHandler:
    """Small wrapper around one pymysql connection with automatic reconnect.

    Table names mentioned by the original author:
        t_xinyi_daily     daily report
        t_xinyi_industry  industrial data
        t_xinyi_Robot     robot laboratory tests
    """

    def __init__(self,
                 # Alternative settings left by the original author:
                 # host='10.0.0.28',
                 # user='root',
                 # passwd='Hongshan2024@longjiang',
                 host=MYSQL_HOST,
                 user='root',
                 passwd=MYSQL_PASSWORD,
                 db='big_model',
                 port=3306,
                 charset='utf8'
                 ):
        """Store the connection settings and attempt the first connection.

        A failed first connection is not fatal: ``self.conn`` stays ``None``
        and the next ``query``/``exec`` call retries through ``_reConn``.
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db  # database *name* (a string), not a connection object
        self.port = port
        self.charset = charset
        self.conn = None
        self._conn()

    def _conn(self):
        """Open a new connection; return True on success, False on failure."""
        try:
            self.conn = pymysql.connect(host=self.host,
                                        user=self.user,
                                        passwd=self.passwd,
                                        db=self.db,
                                        port=self.port,
                                        charset=self.charset)
            return True
        except Exception:
            # Best effort: report the problem and let the caller decide.
            traceback.print_exc()
            return False

    def _reConn(self, num=28800, stime=3):
        """Make sure the connection is alive, reconnecting if necessary.

        Args:
            num: maximum number of retries after the first attempt. With the
                defaults (28800 retries, 3 s apart) this keeps trying for
                about one day.
            stime: seconds to sleep after each failed attempt.

        Returns:
            True once the connection answers a ping or has been re-created,
            False if every attempt failed.
        """
        for _ in range(num + 1):
            if self.conn is not None:
                try:
                    self.conn.ping()  # raises when the connection is broken
                    return True
                except Exception:
                    pass  # fall through and open a fresh connection
            if self._conn():
                return True
            # Connection failed: wait, then retry until attempts run out.
            time.sleep(stime)
        return False

    def query(self, sql, args=None):
        """Run a SELECT-style statement and return all rows as dicts.

        Args:
            sql: SQL text, optionally containing ``%s`` placeholders.
            args: optional parameters bound by the driver. Prefer this over
                building the SQL string by hand.

        Returns:
            The rows returned by ``fetchall()`` on a ``DictCursor``, or
            ``None`` when the statement failed (the SQL and the traceback are
            printed).
        """
        try:
            self._reConn()
            # The context manager closes the cursor even if execute() raises.
            with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql, args)
                return cursor.fetchall()
        except Exception:
            print(sql)
            traceback.print_exc()
            return None

    def exec(self, sql, args=None):
        """Run an INSERT/UPDATE/DELETE statement and commit it.

        Args:
            sql: SQL text, optionally containing ``%s`` placeholders.
            args: optional parameters bound by the driver.

        Errors are printed, the transaction is rolled back (best effort) and
        ``None`` is returned in every case.
        """
        try:
            self._reConn()
            # Use the connection object; ``self.db`` is only the schema name.
            with self.conn.cursor() as cursor:
                cursor.execute(sql, args)
            self.conn.commit()
        except Exception as e:
            print(e)
            if self.conn is not None:
                try:
                    self.conn.rollback()
                except Exception:
                    pass  # connection is unusable; nothing left to undo

    def __del__(self):
        """Close the connection if one was ever opened."""
        conn = getattr(self, "conn", None)
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            pass  # already closed or interpreter shutting down


@singleton
class RedisHandler:
    """Wrapper around a pooled ``StrictRedis`` client."""

    def __init__(self, host='192.168.124.6', port=6379, db=0, password='^YHN&UJM'):
        """Create the client; on failure print the traceback and leave
        ``self.redis`` as ``None``.

        NOTE(security): the default password is hard-coded in source.
        """
        self.redis = None
        try:
            # Alternative hosts left by the original author:
            # host = '10.0.0.24'
            # host = RADIS_HOST
            self.redis = StrictRedis(
                connection_pool=ConnectionPool(host=host, port=port, db=db, password=password))
        except Exception:
            traceback.print_exc()

    def get(self, key):
        """Return the raw value stored under ``key``."""
        return self.redis.get(key)

    def set(self, key, val, expire=None, prefix=None):
        """Store ``val`` under ``prefix + key`` (or ``key`` when no prefix).

        Args:
            key: key name.
            val: value to store.
            expire: optional time-to-live in seconds; falsy means no expiry.
            prefix: optional string prepended to ``key``.
        """
        newkey = prefix + key if prefix else key
        # One SET ... EX command instead of SET followed by EXPIRE, so the key
        # can never be left without its TTL if the second call fails.
        self.redis.set(newkey, val, ex=expire or None)

    def __del__(self):
        """Close the client if it was created."""
        client = getattr(self, "redis", None)
        if client is None:
            return
        try:
            client.close()
        except Exception:
            pass  # already closed or interpreter shutting down


# Manual smoke tests kept from the original file (disabled).
#
# def main():
#     redis_handler = RedisHandler()
#     print(redis_handler.redis.info())
#     redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
#
#     stream_name = 'stop_gen_stream'
#     consumer_group = 'stop_gen_group'
#     consumer_name = 'stop_gen_consumer'
#
#     # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
#     # # Read messages from the stream
#     # while True:
#     #     # Read data
#     #     data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
#     #                                           count=1, block=1000)
#     #     # Handle the received messages
#     #     for stream, messages in data:
#     #         for message in messages:
#     #             message_id = message[0].decode('utf-8')
#     #             message_data = message[1]
#     #             print(f"Received message {message_id}: {message_data}")
#     #             # Any custom processing logic can be added here
#
#
# def main1():
#     mysql = MysqlHandler()
#     count = mysql.query("select count(0) as totals from t_xinyi_industry where TEST_TIME like '%00'")
#     print(count)
#
#     rows = mysql.query("select * from t_xinyi_industry where TEST_TIME like '%00' limit 10")
#     for idx, cols in enumerate(rows):
#         print(cols.get('TEST_TIME'), cols.get('JS_COD'))
#
#
# if __name__ == "__main__":
#     main()