#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/4/13 15:12
# @Author  : liu
import json
import time
import datetime
import decimal
import traceback
import pymysql
import os
from typing import List, Tuple
from src.core import singleton
from redis import StrictRedis, ConnectionPool
from src.core.voip.constant import *

# RADIS_HOST = os.environ.get("REDIS_HOST", "10.0.0.24")
#
# # MySQL connection settings: the host is read from the container config and
# # defaults to the test MySQL; if the configured host is 9.54, use the
# # production MySQL.
# MYSQL_HOST = os.environ.get("MYSQL_HOST", "192.168.9.54")
# MYSQL_PASSWORD = 'Hongshan2024@longjiang'
#
# if MYSQL_HOST == "192.168.9.54":
#     MYSQL_PASSWORD = "Hongshan2024#longjiang%xinyi"
# else:
#     MYSQL_PASSWORD = "Hongshan2024@longjiang"
#


@singleton
class MysqlHandler:
    """Thin wrapper around a single long-lived PyMySQL connection.

    Table names:
        t_xinyi_daily      daily report
        t_xinyi_industry   industry
        t_xinyi_Robot      robot assay
    """

    def __init__(self,
                 # host='192.168.100.159',
                 # user='root',
                 # passwd='12345678',
                 host=SERVE_HOST,
                 user='root',
                 passwd=MYSQL_PASSWORD,
                 db='libra_bot',
                 port=3306,
                 charset='utf8'
                 ):
        """Store the connection settings and attempt an initial connection.

        A failed initial connection does not raise: ``self.conn`` stays
        ``None`` and ``_reConn`` retries on the next query.
        """
        # Only the host is logged; the password must never be written to
        # stdout or log files.
        print(f"mysql_host:{host}")
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.port = port
        self.charset = charset
        self.conn = None
        self._conn()

    def _conn(self):
        """Open a new connection and store it in ``self.conn``.

        Returns:
            True when the connection was established, False otherwise (the
            traceback is printed).
        """
        try:
            self.conn = pymysql.connect(host=self.host,
                                        user=self.user,
                                        passwd=self.passwd,
                                        db=self.db,
                                        port=self.port,
                                        charset=self.charset)
            return True
        except Exception:
            traceback.print_exc()
            return False

    def _reConn(self, num=28800, stime=3):
        """Make sure ``self.conn`` is usable, reconnecting if necessary.

        With the defaults the retries span about one day (28800 * 3 s);
        tune them to the deployment.

        Args:
            num: Maximum number of failed reconnect attempts tolerated.
            stime: Seconds to sleep after each failed attempt.
        """
        attempts = 0
        while attempts <= num:
            try:
                # ping() verifies that the connection is still alive. When
                # self.conn is None this raises AttributeError, which is
                # handled the same way as a dead connection.
                self.conn.ping()
                return
            except Exception:
                # Reconnect; stop as soon as it succeeds.
                if self._conn():
                    return
                attempts += 1
                # Connection failed: sleep, then keep looping until it
                # succeeds or the retry budget is exhausted.
                time.sleep(stime)

    def select(self, query, params: Tuple = None):
        """Run a read query and return every row as a dict.

        Args:
            query: SQL text, using ``%s`` placeholders for parameters.
            params: Values bound to the placeholders, or None.

        Returns:
            The rows from ``fetchall()``, or None when anything fails (the
            query, its parameters and the traceback are printed).
        """
        # NOTE(review): no commit/rollback is issued after a read. With
        # autocommit disabled (the PyMySQL default) later reads on this
        # long-lived connection may see a stale snapshot - confirm whether a
        # commit is needed here.
        cursor = None
        try:
            self._reConn()
            cursor = self.conn.cursor(pymysql.cursors.DictCursor)
            cursor.execute(query, params)
            return cursor.fetchall()
        except Exception:
            print(query, params)
            traceback.print_exc()
            return None
        finally:
            # The cursor may never have been created (e.g. no connection);
            # closing an unbound name would mask the original error.
            if cursor is not None:
                cursor.close()

    def execute(self, sql, values: Tuple = None):
        """Run a write statement and commit it.

        Args:
            sql: SQL text, using ``%s`` placeholders for parameters.
            values: Values bound to the placeholders, or None.

        Returns:
            A ``(rowcount, lastrowid)`` tuple.

        Raises:
            Exception: Whatever the driver raised; the statement, its values
                and the traceback are printed before re-raising.
        """
        cursor = None
        try:
            self._reConn()
            cursor = self.conn.cursor()
            cursor.execute(sql, values)
            # Insert/update/delete must be committed to be persisted.
            self.conn.commit()
            return cursor.rowcount, cursor.lastrowid
        except Exception:
            print(sql, values)
            traceback.print_exc()
            raise
        finally:
            if cursor is not None:
                cursor.close()

    def create_database(self, db_name: str):
        """Create database ``db_name`` (utf8mb4) if it does not exist.

        ``db_name`` is interpolated into the statement as an identifier, so
        it must come from trusted code, never from user input.
        """
        sql = f"CREATE DATABASE IF NOT EXISTS `{db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;"
        try:
            self.execute(sql)
            print(f"数据库'{db_name}'创建成功")
        except pymysql.MySQLError as e:
            print(f"创建数据库时发生错误: {e}")

    def use_database(self, db_name: str):
        """Switch the current connection to database ``db_name``."""
        try:
            self.conn.select_db(db_name)
        except pymysql.ProgrammingError as e:
            print(f"切换至数据库'{db_name}'时发生错误: {e}")

    def create_table(self, table_schema: str):
        """Execute a ``CREATE TABLE`` statement given as raw SQL."""
        try:
            self.execute(table_schema)
            print("表创建成功")
        except pymysql.MySQLError as e:
            print(f"创建表时发生错误: {e}")

    def insert(self, table_name: str, values: Tuple, columns: Tuple[str, ...] = None):
        """Insert one row into ``table_name``.

        Args:
            table_name: Target table (interpolated as an identifier).
            values: Row values, bound through ``%s`` placeholders.
            columns: Optional column names matching ``values``; they are
                joined into the statement as-is.

        Returns:
            The ``lastrowid`` of the insert, or None when a MySQL error
            occurred (the error is printed).
        """
        placeholders = ', '.join(['%s'] * len(values))
        if columns:
            columns_str = ', '.join(columns)
            sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
        else:
            sql = f"INSERT INTO `{table_name}` VALUES ({placeholders})"

        try:
            _, lastrowid = self.execute(sql, values)
            return lastrowid
        except pymysql.MySQLError as e:
            print(f"插入数据时发生错误: {e}")
            return None

    def update(self, table_name: str, set_clause: str, where_clause: str, values: Tuple):
        """Update rows of ``table_name``.

        Args:
            table_name: Target table (interpolated as an identifier).
            set_clause: SQL after ``SET``, using ``%s`` placeholders.
            where_clause: SQL after ``WHERE``, using ``%s`` placeholders.
            values: Values for the placeholders, SET values first.

        Returns:
            The affected row count, or None when a MySQL error occurred (the
            error is printed).
        """
        sql = f"UPDATE `{table_name}` SET {set_clause} WHERE {where_clause}"
        try:
            rowcount, _ = self.execute(sql, values)
            return rowcount
        except pymysql.MySQLError as e:
            print(f"更新数据时发生错误: {e}")
            return None

    def delete(self, table_name: str, where_clause: str, values: Tuple):
        """Delete rows of ``table_name`` matching ``where_clause``.

        Returns:
            The affected row count, or None when a MySQL error occurred (the
            error is printed).
        """
        sql = f"DELETE FROM `{table_name}` WHERE {where_clause}"
        try:
            rowcount, _ = self.execute(sql, values)
            return rowcount
        except pymysql.MySQLError as e:
            print(f"删除数据时发生错误: {e}")
            return None

    def select_one(self, query: str, params: Tuple = None):
        """Return the first row of ``query`` as a dict, or None.

        None is returned both when the query yields no rows and when it
        fails (``select`` reports failures itself and returns None).
        """
        try:
            result = self.select(query, params)
            if result:
                return result[0]
        except pymysql.MySQLError as e:
            print(f"执行查询时发生错误: {e}")
        return None

    def __del__(self):
        """Close the connection, if one was ever opened."""
        conn = getattr(self, "conn", None)
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            # Best effort at teardown: the connection may already be closed
            # or broken, and there is nothing useful left to do with it.
            pass


@singleton
class RedisHandler:
    """Small convenience wrapper around a pooled ``StrictRedis`` client."""

    # NOTE(review): the default password is hard-coded in the signature; it
    # is kept for backward compatibility but should come from configuration.
    def __init__(self, host=SERVE_HOST, port=6379, db=0, password='^YHN&UJM'):
        """Create the client; on failure ``self.redis`` is left as None."""
        # Define the attribute up front so later code (including __del__)
        # can rely on it existing even if client construction fails.
        self.redis = None
        try:
            print(f"redis_host:{host}")
            pool = ConnectionPool(host=host, port=port, db=db, password=password)
            self.redis = StrictRedis(connection_pool=pool)
        except Exception:
            traceback.print_exc()

    def get(self, key):
        """Return the value stored at ``key`` (None if the key is absent)."""
        return self.redis.get(key)

    def set(self, key, val, expire=None, prefix=None):
        """Store ``val`` under ``key``, optionally prefixed and with a TTL.

        Args:
            key: Key name.
            val: Value to store.
            expire: Time to live in seconds; a falsy value means no expiry.
            prefix: Optional string prepended to ``key``.
        """
        newkey = prefix + key if prefix else key
        # Value and TTL are sent in a single SET ... EX command so the key
        # can never be left behind without its expiry.
        self.redis.set(newkey, val, ex=expire if expire else None)

    def __del__(self):
        """Close the client, if it was created."""
        client = getattr(self, "redis", None)
        if client is None:
            return
        try:
            client.close()
        except Exception:
            # Best effort at teardown; nothing to recover here.
            pass


# def main():
#     redis_handler = RedisHandler()
#     print(redis_handler.redis.info())
#     redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
#
#     stream_name = 'stop_gen_stream'
#     consumer_group = 'stop_gen_group'
#     consumer_name = 'stop_gen_consumer'
#
#     # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
#
#     # Read messages from the stream
#     # while True:
#     #     # Read data
#     #     data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
#     #                                           count=1, block=1000)
#     #     # Handle the received messages
#     #     for stream, messages in data:
#     #         for message in messages:
#     #             message_id = message[0].decode('utf-8')
#     #             message_data = message[1]
#     #             print(f"Received message {message_id}: {message_data}")
#     #             # Any custom handling logic can be added here
#
#


def main1():
    """Manual smoke test: fetch and print one row from ``c_agent``."""
    mysql = MysqlHandler()
    rows = mysql.select_one("select * from c_agent")
    print(rows)

    # # Create a database
    # ds.create_database('new_database')
    #
    # # Switch to the new database
    # ds.use_database('new_database')
    #
    # # Create a table
    # table_schema = """
    # CREATE TABLE IF NOT EXISTS test_table (
    #     id INT AUTO_INCREMENT PRIMARY KEY,
    #     name VARCHAR(100),
    #     email VARCHAR(100),
    #     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    # );
    # """
    # ds.create_table(table_schema)
    #
    # # Insert data
    # ds.insert('test_table', ('John Doe', 'john.doe@example.com'))
    #
    # # Update data
    # set_clause = "name = %s, email = %s"
    # where_clause = "id = %s"
    # values = ('Jane Doe', 'jane.doe@example.com', 1)
    # ds.update('test_table', set_clause, where_clause, values)
    #
    # # Delete data
    # where_clause = "id = %s"
    # values = (1,)
    # ds.delete('test_table', where_clause, values)


if __name__ == "__main__":
    main1()