#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/4/13 15:12
# @Author : liu
"""MySQL and Redis connection helpers.

Defines ``MysqlHandler`` (a thin wrapper around one ``pymysql`` connection
with reconnect-before-use logic) and ``RedisHandler`` (a thin wrapper around
a ``redis.StrictRedis`` client). Connection targets are derived from the
``SERVE_HOST`` environment variable.
"""
import datetime
import decimal
import json
import os
import time
import traceback
from typing import List, Optional, Tuple

import pymysql
from redis import ConnectionPool, StrictRedis

from src.core import singleton

SERVE_HOST = os.environ.get("SERVE_HOST")
# SERVE_HOST = "192.168.100.195"

# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secret store instead.
MYSQL_PASSWORD = 'EKoAe3H8xybQKrFPApXM'
if SERVE_HOST != "192.168.100.159":
    # Any host other than 192.168.100.159 doubles as the SIP server and uses
    # the alternate MySQL password.
    SIP_SERVER = SERVE_HOST
    MYSQL_PASSWORD = "12345678"
else:
    SIP_SERVER = "pbx.fuxicarbon.com"

# RADIS_HOST = os.environ.get("REDIS_HOST", "10.0.0.24")
#
# # MySQL connection settings: the host comes from the container config and
# # defaults to the test MySQL; when the configured host is 9.54 the
# # production MySQL password is used.
# MYSQL_HOST = os.environ.get("MYSQL_HOST", "192.168.9.54")
# MYSQL_PASSWORD = 'Hongshan2024@longjiang'
#
# if MYSQL_HOST == "192.168.9.54":
#     MYSQL_PASSWORD = "Hongshan2024#longjiang%xinyi"
# else:
#     MYSQL_PASSWORD = "Hongshan2024@longjiang"
#


# NOTE(review): the collapsed source reads "# @singleton" at this point, i.e.
# the decorator appears to be commented out for this class (RedisHandler below
# keeps it). Confirm against version control before re-enabling it.
# @singleton
class MysqlHandler:
    """Wrapper around a single ``pymysql`` connection.

    Every query method calls ``_reConn()`` first so that a dropped connection
    is re-established before the statement runs.

    Table names:
        t_xinyi_daily     daily report
        t_xinyi_industry  industrial
        t_xinyi_Robot     robot lab assay
    """

    def __init__(self,
                 # host='192.168.100.159',
                 # user='root',
                 # passwd='12345678',
                 host=SERVE_HOST,
                 user='root',
                 passwd=MYSQL_PASSWORD,
                 db='libra_bot',
                 port=3306,
                 charset='utf8'
                 ):
        """Store the connection settings and attempt the first connection.

        A failed initial connection is not raised: ``_conn()`` prints the
        traceback and ``self.conn`` stays ``None`` until a later
        ``_reConn()`` succeeds.
        """
        # Only the host is logged; the password must never reach stdout/logs.
        print(f"mysql_host:{host}")
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.port = port
        self.charset = charset
        self.conn = None
        self._conn()

    def _conn(self):
        """Open a new connection; return True on success, False on failure."""
        try:
            self.conn = pymysql.connect(host=self.host, user=self.user, passwd=self.passwd,
                                        db=self.db, port=self.port, charset=self.charset)
            return True
        except Exception:
            traceback.print_exc()
            return False

    def _reConn(self, num=28800, stime=3):
        """Make sure the connection is alive, reconnecting if necessary.

        Args:
            num: maximum number of retries. With the defaults
                (28800 retries x 3 s) retrying lasts roughly one day.
            stime: seconds to sleep between failed attempts.

        Returns silently once the connection answers a ping, a reconnect
        succeeds, or the retries are exhausted.
        """
        attempts = 0
        while attempts <= num:
            try:
                self.conn.ping()  # verifies the connection is still usable
                return
            except Exception:
                # Also reached when self.conn is None (initial connect failed).
                if self._conn():
                    return
            attempts += 1
            time.sleep(stime)  # wait before the next attempt

    def select(self, query, params: Optional[Tuple] = None):
        """Run a query and return all rows as a sequence of dicts.

        Errors are printed (query, params and traceback) and swallowed; in
        that case ``None`` is returned.
        """
        cursor = None
        try:
            self._reConn()
            cursor = self.conn.cursor(pymysql.cursors.DictCursor)
            cursor.execute(query, params)
            return cursor.fetchall()
        except Exception:
            print(query, params)
            traceback.print_exc()
            return None
        finally:
            # cursor is still None when the failure happened before it was
            # created; closing unconditionally would mask the real error.
            if cursor is not None:
                cursor.close()

    def execute(self, sql, values: Optional[Tuple] = None):
        """Run a write statement, commit, and return ``(rowcount, lastrowid)``.

        Raises:
            Exception: any error is printed and then re-raised to the caller.
        """
        cursor = None
        try:
            self._reConn()
            cursor = self.conn.cursor()
            cursor.execute(sql, values)
            self.conn.commit()
            return cursor.rowcount, cursor.lastrowid
        except Exception:
            print(sql, values)
            traceback.print_exc()
            raise
        finally:
            if cursor is not None:
                cursor.close()

    def create_database(self, db_name: str):
        """Create database ``db_name`` (utf8mb4) if it does not exist.

        ``db_name`` is interpolated into the SQL text, so it must come from a
        trusted source.
        """
        sql = f"CREATE DATABASE IF NOT EXISTS `{db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;"
        try:
            self.execute(sql)
            print(f"数据库'{db_name}'创建成功")
        except pymysql.MySQLError as e:
            print(f"创建数据库时发生错误: {e}")

    def use_database(self, db_name: str):
        """Switch the current connection to database ``db_name``."""
        try:
            self.conn.select_db(db_name)
            # Remember the choice so a reconnect through _conn() does not
            # silently fall back to the previous database.
            self.db = db_name
        except pymysql.ProgrammingError as e:
            print(f"切换至数据库'{db_name}'时发生错误: {e}")

    def create_table(self, table_schema: str):
        """Execute a ``CREATE TABLE`` statement given as raw SQL."""
        try:
            self.execute(table_schema)
            print("表创建成功")
        except pymysql.MySQLError as e:
            print(f"创建表时发生错误: {e}")

    def insert(self, table_name: str, values: Tuple, columns: Optional[Tuple[str, ...]] = None):
        """Insert one row and return its ``lastrowid``.

        ``values`` are bound as parameters; ``table_name`` and ``columns`` are
        interpolated into the SQL text and must be trusted. Returns ``None``
        when a ``pymysql.MySQLError`` occurs (the error is printed).
        """
        placeholders = ', '.join(['%s'] * len(values))
        if columns:
            columns_str = ', '.join(columns)
            sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
        else:
            sql = f"INSERT INTO `{table_name}` VALUES ({placeholders})"
        try:
            _, lastrowid = self.execute(sql, values)
            return lastrowid
        except pymysql.MySQLError as e:
            print(f"插入数据时发生错误: {e}")
            return None

    def update(self, table_name: str, set_clause: str, where_clause: str, values: Tuple):
        """Run an UPDATE and return the affected row count.

        ``set_clause`` and ``where_clause`` are raw SQL fragments using ``%s``
        placeholders that are filled from ``values``. Returns ``None`` when a
        ``pymysql.MySQLError`` occurs (the error is printed).
        """
        sql = f"UPDATE `{table_name}` SET {set_clause} WHERE {where_clause}"
        try:
            rowcount, _ = self.execute(sql, values)
            return rowcount
        except pymysql.MySQLError as e:
            print(f"更新数据时发生错误: {e}")
            return None

    def delete(self, table_name: str, where_clause: str, values: Tuple):
        """Run a DELETE and return the affected row count.

        Returns ``None`` when a ``pymysql.MySQLError`` occurs (the error is
        printed).
        """
        sql = f"DELETE FROM `{table_name}` WHERE {where_clause}"
        try:
            rowcount, _ = self.execute(sql, values)
            return rowcount
        except pymysql.MySQLError as e:
            print(f"删除数据时发生错误: {e}")
            return None

    def select_one(self, query: str, params: Optional[Tuple] = None):
        """Return the first row of the query result, or ``None`` if there is none."""
        try:
            result = self.select(query, params)
            if result:
                return result[0]
        except pymysql.MySQLError as e:
            print(f"执行查询时发生错误: {e}")
        return None

    def __del__(self):
        """Close the connection on garbage collection (best effort)."""
        conn = getattr(self, "conn", None)
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            # Deliberately ignored: the connection may already be closed and
            # a destructor must not raise.
            pass


@singleton
class RedisHandler:
    """Wrapper around a ``redis.StrictRedis`` client."""

    def __init__(self, host=SERVE_HOST, port=6379, db=0, password='^YHN&UJM'):
        """Create the Redis client; construction errors are printed, not raised."""
        # Defined up front so __del__ is safe even if construction fails.
        self.redis = None
        try:
            print(f"redis_host:{host}")
            # NOTE(review): redis-py appears to ignore decode_responses when
            # an explicit connection_pool is supplied, so replies are
            # presumably still bytes. Moving the flag onto ConnectionPool
            # would change return types for existing callers -- confirm
            # before changing.
            self.redis = StrictRedis(
                connection_pool=ConnectionPool(host=host, port=port, db=db, password=password),
                decode_responses=True)
        except Exception:
            traceback.print_exc()

    def get(self, key):
        """Return the value stored at ``key``."""
        return self.redis.get(key)

    def set(self, key, val, expire=None, prefix=None):
        """Store ``val`` under ``prefix + key`` (or ``key`` when no prefix).

        Args:
            expire: optional time-to-live; when truthy it is applied together
                with the write in a single ``SET ... EX`` command, so the key
                can never be left behind without its expiry.
        """
        newkey = prefix + key if prefix else key
        if expire:
            self.redis.set(newkey, val, ex=expire)
        else:
            self.redis.set(newkey, val)

    def __del__(self):
        """Close the client on garbage collection (best effort)."""
        client = getattr(self, "redis", None)
        if client is None:
            return
        try:
            client.close()
        except Exception:
            # Deliberately ignored: a destructor must not raise.
            pass


# def main():
#     redis_handler = RedisHandler()
#     print(redis_handler.redis.info())
#     redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
#
#     stream_name = 'stop_gen_stream'
#     consumer_group = 'stop_gen_group'
#     consumer_name = 'stop_gen_consumer'
#
#     # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
#
#     # Read messages from the stream
#     # while True:
#     #     # Read data
#     #     data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
#     #                                           count=1, block=1000)
#     #     # Handle the received messages
#     #     for stream, messages in data:
#     #         for message in messages:
#     #             message_id = message[0].decode('utf-8')
#     #             message_data = message[1]
#     #             print(f"Received message {message_id}: {message_data}")
#     #             # Any custom handling logic can be added here
#
#


def main1():
    """Manual smoke test: fetch and print one row from ``c_agent``."""
    mysql = MysqlHandler()
    rows = mysql.select_one("select * from c_agent")
    print(rows)
    # # Create a database
    # ds.create_database('new_database')
    #
    # # Switch to the new database
    # ds.use_database('new_database')
    #
    # # Create a table
    # table_schema = """
    # CREATE TABLE IF NOT EXISTS test_table (
    #     id INT AUTO_INCREMENT PRIMARY KEY,
    #     name VARCHAR(100),
    #     email VARCHAR(100),
    #     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    # );
    # """
    # ds.create_table(table_schema)
    #
    # # Insert data
    # ds.insert('test_table', ('John Doe', 'john.doe@example.com'))
    #
    # # Update data
    # set_clause = "name = %s, email = %s"
    # where_clause = "id = %s"
    # values = ('Jane Doe', 'jane.doe@example.com', 1)
    # ds.update('test_table', set_clause, where_clause, values)
    #
    # # Delete data
    # where_clause = "id = %s"
    # values = (1,)
    # ds.delete('test_table', where_clause, values)


if __name__ == "__main__":
    main1()