123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2024/4/13 15:12
- # @Author : liu
- import json
- import time
- import datetime
- import decimal
- import traceback
- import pymysql
- import os
- from typing import List, Tuple
- from src.core import singleton
- from redis import StrictRedis, ConnectionPool
# Deployment host, injected through the environment (None when unset).
SERVE_HOST = os.environ.get("SERVE_HOST")
# SERVE_HOST = "192.168.100.195"

# Host 192.168.100.159 registers against the public PBX domain and uses the
# long MySQL password; every other host acts as its own SIP server and uses
# the simple MySQL password.
if SERVE_HOST == "192.168.100.159":
    SIP_SERVER = "pbx.fuxicarbon.com"
    MYSQL_PASSWORD = 'EKoAe3H8xybQKrFPApXM'
else:
    SIP_SERVER = SERVE_HOST
    MYSQL_PASSWORD = "12345678"
- # RADIS_HOST = os.environ.get("REDIS_HOST", "10.0.0.24")
- #
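- # # Set the MySQL connection info: the host is read from the container configuration (defaults to the test MySQL); if the configured host is 192.168.9.54, use the production MySQL settings.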
- # MYSQL_HOST = os.environ.get("MYSQL_HOST", "192.168.9.54")
- # MYSQL_PASSWORD = 'Hongshan2024@longjiang'
- #
- # if MYSQL_HOST == "192.168.9.54":
- # MYSQL_PASSWORD = "Hongshan2024#longjiang%xinyi"
- # else:
- # MYSQL_PASSWORD = "Hongshan2024@longjiang"
- # @singleton
class MysqlHandler:
    """Thin convenience wrapper around a single PyMySQL connection.

    The connection is opened in ``__init__`` and is re-validated (and
    re-opened if necessary) before every ``select`` / ``execute`` call.

    Table names carried over from the original notes:
        t_xinyi_daily     daily report
        t_xinyi_industry  industrial data
        t_xinyi_Robot     robot laboratory assay

    NOTE: ``table_name``, ``columns``, ``set_clause`` and ``where_clause``
    are interpolated into the SQL text without escaping; only ``values`` /
    ``params`` are sent as bound parameters. Never pass untrusted input
    through the former.
    """

    def __init__(self,
                 # host='192.168.100.159',
                 # user='root',
                 # passwd='12345678',
                 host=SERVE_HOST,
                 user='root',
                 passwd=MYSQL_PASSWORD,
                 db='libra_bot',
                 port=3306,
                 charset='utf8'
                 ):
        """Store the connection settings and open the first connection.

        A failed initial connection is only reported (traceback on stderr);
        ``self.conn`` then stays ``None`` until a later call reconnects.
        """
        # Only the host is echoed: the password must never reach stdout/logs.
        print(f"mysql_host:{host}")
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.port = port
        self.charset = charset
        self.conn = None
        self._conn()

    def _conn(self):
        """Open a new connection from the stored settings.

        Returns:
            True when the connection was established, False otherwise (the
            traceback is printed).
        """
        try:
            self.conn = pymysql.connect(host=self.host, user=self.user, passwd=self.passwd, db=self.db,
                                        port=self.port, charset=self.charset)
            return True
        except Exception:
            traceback.print_exc()
            return False

    def _reConn(self, num=28800, stime=3):
        """Make sure the connection is alive, reconnecting if it is not.

        Up to ``num + 1`` attempts are made, sleeping ``stime`` seconds after
        each failed one. The defaults (28800 retries x 3 s) add up to roughly
        one day of retrying; tune them to the deployment.
        """
        for _ in range(num + 1):
            try:
                # ping() raises when the connection is broken; a missing
                # connection (None) raises AttributeError and is handled alike.
                self.conn.ping()
                return
            except Exception:
                # ``Exception`` (not a bare except) so Ctrl-C / SystemExit can
                # still interrupt this potentially day-long loop.
                if self._conn():
                    return
            time.sleep(stime)

    def select(self, query, params: Tuple = None):
        """Run a read query and return every row as a dict.

        Args:
            query: SQL text, with ``%s`` placeholders for ``params``.
            params: values bound to the placeholders, or None.

        Returns:
            The rows from ``fetchall()`` on a ``DictCursor``, or None when
            anything failed (query and traceback are printed).
        """
        cursor = None
        try:
            self._reConn()
            cursor = self.conn.cursor(pymysql.cursors.DictCursor)
            cursor.execute(query, params)
            results = cursor.fetchall()
            # Autocommit is off, so a read opens an implicit transaction.
            # Ending it here keeps later reads on this long-lived connection
            # from being served out of an old transaction snapshot.
            self.conn.commit()
            return results
        except Exception:
            print(query, params)
            traceback.print_exc()
            return None
        finally:
            # ``cursor`` is still None when reconnecting or cursor creation
            # failed; closing unconditionally would mask the real error.
            if cursor is not None:
                cursor.close()

    def execute(self, sql, values: Tuple = None):
        """Run a write statement and commit it.

        Args:
            sql: SQL text, with ``%s`` placeholders for ``values``.
            values: values bound to the placeholders, or None.

        Returns:
            ``(rowcount, lastrowid)`` of the executed statement.

        Raises:
            Whatever the driver raised; the statement and traceback are
            printed and the open transaction is rolled back first.
        """
        cursor = None
        try:
            self._reConn()
            cursor = self.conn.cursor()
            cursor.execute(sql, values)
            self.conn.commit()
            return cursor.rowcount, cursor.lastrowid
        except Exception:
            print(sql, values)
            traceback.print_exc()
            try:
                # Best effort: close the failed transaction so it does not
                # linger (with any locks it holds) on the shared connection.
                self.conn.rollback()
            except Exception:
                # Connection is missing or unusable; the next call reconnects.
                pass
            raise
        finally:
            if cursor is not None:
                cursor.close()

    def create_database(self, db_name: str):
        """Create database ``db_name`` (utf8mb4) unless it already exists."""
        sql = f"CREATE DATABASE IF NOT EXISTS `{db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;"
        try:
            self.execute(sql)
            print(f"数据库'{db_name}'创建成功")
        except pymysql.MySQLError as e:
            print(f"创建数据库时发生错误: {e}")

    def use_database(self, db_name: str):
        """Switch the current connection to database ``db_name``."""
        try:
            self.conn.select_db(db_name)
            # Remember the choice: _conn() reconnects with self.db, so without
            # this a reconnect would silently fall back to the old database.
            self.db = db_name
        except pymysql.ProgrammingError as e:
            print(f"切换至数据库'{db_name}'时发生错误: {e}")

    def create_table(self, table_schema: str):
        """Execute a ``CREATE TABLE`` statement given as raw SQL."""
        try:
            self.execute(table_schema)
            print("表创建成功")
        except pymysql.MySQLError as e:
            print(f"创建表时发生错误: {e}")

    def insert(self, table_name: str, values: Tuple, columns: Tuple[str, ...] = None):
        """Insert one row.

        Args:
            table_name: target table.
            values: one value per column, bound as parameters.
            columns: column names matching ``values``; when omitted the
                values are inserted in table column order.

        Returns:
            ``lastrowid`` of the insert, or None when a MySQL error occurred
            (the error is printed).
        """
        placeholders = ', '.join(['%s'] * len(values))
        if columns:
            columns_str = ', '.join(columns)
            sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
        else:
            sql = f"INSERT INTO `{table_name}` VALUES ({placeholders})"
        try:
            _, lastrowid = self.execute(sql, values)
            return lastrowid
        except pymysql.MySQLError as e:
            print(f"插入数据时发生错误: {e}")
            return None

    def update(self, table_name: str, set_clause: str, where_clause: str, values: Tuple):
        """Update rows; ``values`` fill the placeholders of both clauses.

        Returns:
            Number of affected rows, or None when a MySQL error occurred.
        """
        sql = f"UPDATE `{table_name}` SET {set_clause} WHERE {where_clause}"
        try:
            rowcount, _ = self.execute(sql, values)
            return rowcount
        except pymysql.MySQLError as e:
            print(f"更新数据时发生错误: {e}")
            return None

    def delete(self, table_name: str, where_clause: str, values: Tuple):
        """Delete rows matching ``where_clause``.

        Returns:
            Number of affected rows, or None when a MySQL error occurred.
        """
        sql = f"DELETE FROM `{table_name}` WHERE {where_clause}"
        try:
            rowcount, _ = self.execute(sql, values)
            return rowcount
        except pymysql.MySQLError as e:
            print(f"删除数据时发生错误: {e}")
            return None

    def select_one(self, query: str, params: Tuple = None):
        """Return the first row of ``query`` as a dict, or None if there is none."""
        try:
            result = self.select(query, params)
            if result:
                return result[0]
        except pymysql.MySQLError as e:
            print(f"执行查询时发生错误: {e}")
        return None

    def __del__(self):
        """Close the connection on garbage collection, never raising."""
        conn = getattr(self, "conn", None)
        if conn is None:
            # The initial connection attempt failed; nothing to close.
            return
        try:
            conn.close()
        except Exception:
            # Already closed or broken; a finalizer has nothing left to do.
            pass
@singleton
class RedisHandler:
    """Small wrapper around a redis-py client backed by a connection pool."""

    def __init__(self, host=SERVE_HOST, port=6379, db=0, password='^YHN&UJM'):
        """Build the client; construction errors are printed, not raised."""
        # Defined up front so the attribute exists even if construction fails.
        self.redis = None
        try:
            print(f"redis_host:{host}")
            # NOTE(review): redis-py appears to ignore ``decode_responses``
            # when an explicit ``connection_pool`` is supplied, so replies are
            # probably still bytes. Left unchanged because callers may rely on
            # bytes -- confirm before moving the flag onto the pool.
            self.redis = StrictRedis(
                connection_pool=ConnectionPool(host=host, port=port, db=db, password=password),
                decode_responses=True)
        except Exception:
            traceback.print_exc()

    def get(self, key):
        """Return the value stored under ``key`` (None when the key is absent)."""
        return self.redis.get(key)

    def set(self, key, val, expire=None, prefix=None):
        """Store ``val`` under ``prefix + key`` (or ``key`` when no prefix).

        Args:
            key: key name.
            val: value to store.
            expire: time-to-live in seconds; a falsy value means no expiry.
            prefix: optional string prepended to ``key``.
        """
        newkey = prefix + key if prefix else key
        # One SET ... EX command instead of SET followed by EXPIRE: the key
        # can no longer be left without a TTL if the second call fails.
        self.redis.set(newkey, val, ex=expire if expire else None)

    def __del__(self):
        """Release the client on garbage collection, never raising."""
        client = getattr(self, "redis", None)
        if client is None:
            return
        try:
            client.close()
        except Exception:
            # Nothing useful can be done about a close failure in a finalizer.
            pass
- # def main():
- # redis_handler = RedisHandler()
- # print(redis_handler.redis.info())
- # redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
- #
- # stream_name = 'stop_gen_stream'
- # consumer_group = 'stop_gen_group'
- # consumer_name = 'stop_gen_consumer'
- # # # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
- # # # 从流中读取消息
- # # while True:
- # # # 读取数据
- # # data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
- # # count=1, block=1000)
- # # # 处理收到的消息
- # # for stream, messages in data:
- # # for message in messages:
- # # message_id = message[0].decode('utf-8')
- # # message_data = message[1]
- # # print(f"Received message {message_id}: {message_data}")
- # # # 在这里可以添加任何自定义的处理逻辑
- #
- #
def main1():
    """Manual smoke test: fetch a single row from ``c_agent`` and print it."""
    handler = MysqlHandler()
    first_row = handler.select_one("select * from c_agent")
    print(first_row)
- # # 创建数据库
- # ds.create_database('new_database')
- #
- # # 切换到新数据库
- # ds.use_database('new_database')
- #
- # # 创建表
- # table_schema = """
- # CREATE TABLE IF NOT EXISTS test_table (
- # id INT AUTO_INCREMENT PRIMARY KEY,
- # name VARCHAR(100),
- # email VARCHAR(100),
- # created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- # );
- # """
- # ds.create_table(table_schema)
- #
- # # 插入数据
- # ds.insert('test_table', ('John Doe', 'john.doe@example.com'))
- #
- # # 更新数据
- # set_clause = "name = %s, email = %s"
- # where_clause = "id = %s"
- # values = ('Jane Doe', 'jane.doe@example.com', 1)
- # ds.update('test_table', set_clause, where_clause, values)
- #
- # # 删除数据
- # where_clause = "id = %s"
- # values = (1,)
- # ds.delete('test_table', where_clause, values)
# Run the MySQL smoke test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main1()
|