#!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2024/10/18 15:28 @File : mysql.py @Desc : """ import os from config import get_logger import pymysql logger = get_logger() import json def get_mysql_config(env='prod'): if env=="prod": mysql_host = os.getenv("mysql_host", "192.168.100.159") mysql_user = os.getenv("mysql_user", "root") mysql_passwd = os.getenv("mysql_passwd", "EKoAe3H8xybQKrFPApXM") db = os.getenv("db", "libra_bot") else: mysql_host = os.getenv("mysql_host", "192.168.9.54") mysql_user = os.getenv("mysql_user", "root") mysql_passwd = os.getenv("mysql_passwd", "Hongshan2024#longjiang%xinyi") db = os.getenv("db", "big_model") return mysql_host, mysql_user, mysql_passwd, db mysql_host, mysql_user, mysql_passwd, db = get_mysql_config() class Mysql(object): def __init__(self, host=mysql_host, username=mysql_user, pwd=mysql_passwd, db=db, **kwargs): self.conn = self.mysql_connect(host, username, pwd, db, **kwargs) self.cursor = self.conn.cursor() @staticmethod def mysql_connect(host, username, pwd, db, **kwargs): try: connect = pymysql.connect(host=host, user=username, passwd=pwd, db=db,**kwargs) logger.info('%s', 'mysql连接成功') return connect except Exception as e: logger.info("数据库连接异常%s", e) return False def insert_records(self, records, table="botrecords"): cols = ["session", "req_time", "uid", "bid", "intent", "contents", "dialog"] colname = ",".join(cols) upname = ",".join([i for i in map(lambda x: x+f"=values({x})", cols[1:])]) session, records =records[0], ",".join([i for i in map(lambda x: "'"+str(x)+"'", records)]) sql = f""" insert into {table} ({colname}) values ({records}) ON DUPLICATE KEY UPDATE {upname} """ logger.info(f"sql:{sql}") self.cursor.execute(sql) self.conn.commit() logger.info('%s', '插入数据成功') def get_records(self, uid, table="botrecords"): sql = f""" select * from libra_bot.{table} where uid={uid} """ self.cursor.execute(sql) content = self.cursor.fetchall() res = {} cols = ["id","session", "req_time", "uid", "bid", "intent", "contents", "dialog"] if len(content) >0: for cell in content: if cell[0] not in res: record = dict(zip(cols, cell)) res[cell[0]] = record return res def get_records_from_db(self, cols, table="botrecords"): sql =f"select * from libra_bot.{table}" self.cursor.execute(sql) content = self.cursor.fetchall() res = {} #cols = ["id", "req_time", "date", "hour", "task", "ext"] if len(content) > 0: for cell in content: if cell[0] not in res: record = zip(cols, cell) res[cell[0]] = dict(record) return res def close_mysql(self): self.cursor.close() if __name__ == "__main__": mysql = Mysql() print(mysql.get_records("13261201206"))