123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- @Time : 2024/10/18 15:28
- @File : mysql.py
- @Desc : Thin PyMySQL helper for writing and reading bot records (the `botrecords` table).
- """
- import os
- from config import get_logger
- import pymysql
- logger = get_logger()
- import json
def get_mysql_config(env='prod'):
    """Return the MySQL connection settings for the given environment.

    Each setting is read from an environment variable (``mysql_host``,
    ``mysql_user``, ``mysql_passwd``, ``db``); when a variable is unset the
    built-in fallback for ``env`` is used instead.

    Args:
        env: ``"prod"`` selects the production fallbacks; any other value
            selects the non-production fallbacks.

    Returns:
        A ``(host, user, password, database)`` tuple of strings.
    """
    # NOTE(review): the fallbacks embed real-looking credentials in source
    # control. They are kept verbatim so existing deployments behave the
    # same, but they should move to a secret store / required env vars.
    if env == "prod":
        fallbacks = (
            "192.168.100.159",
            "root",
            "EKoAe3H8xybQKrFPApXM",
            "libra_bot",
        )
    else:
        fallbacks = (
            "192.168.9.54",
            "root",
            "Hongshan2024#longjiang%xinyi",
            "big_model",
        )

    # Same variable names for every environment; only the fallbacks differ.
    variable_names = ("mysql_host", "mysql_user", "mysql_passwd", "db")
    return tuple(
        os.getenv(variable, fallback)
        for variable, fallback in zip(variable_names, fallbacks)
    )


# Resolved once at import time (with the default "prod" environment); these
# module-level names are the default connection arguments used further down.
mysql_host, mysql_user, mysql_passwd, db = get_mysql_config()
class Mysql(object):
    """Small PyMySQL wrapper that owns one connection and one cursor.

    The connection is opened in ``__init__`` and shared by every method.
    Call ``close_mysql()`` when done to release the cursor and connection.
    """

    def __init__(self, host=mysql_host, username=mysql_user, pwd=mysql_passwd, db=db, **kwargs):
        """Connect to MySQL and create the cursor used by the other methods.

        Args:
            host: Server host name or address.
            username: Login user.
            pwd: Login password.
            db: Default database for the connection.
            **kwargs: Extra keyword arguments forwarded to ``pymysql.connect``.

        Raises:
            ConnectionError: If the connection could not be established.
        """
        self.conn = self.mysql_connect(host, username, pwd, db, **kwargs)
        if self.conn is False:
            # Fail here with a clear message instead of the opaque
            # "'bool' object has no attribute 'cursor'" on the next line.
            raise ConnectionError(f"could not connect to MySQL at {host!r} (db={db!r})")
        self.cursor = self.conn.cursor()

    @staticmethod
    def mysql_connect(host, username, pwd, db, **kwargs):
        """Open a PyMySQL connection.

        Args:
            host: Server host name or address.
            username: Login user.
            pwd: Login password.
            db: Default database for the connection.
            **kwargs: Extra keyword arguments forwarded to ``pymysql.connect``.

        Returns:
            The open connection, or ``False`` if connecting failed (the
            failure is logged; it is not raised).
        """
        try:
            connect = pymysql.connect(host=host, user=username, passwd=pwd, db=db, **kwargs)
        except Exception as e:
            # Best-effort contract kept for direct callers (returns False),
            # but the failure is now logged as an error with its traceback.
            logger.exception("数据库连接异常%s", e)
            return False
        logger.info('%s', 'mysql连接成功')
        return connect

    @staticmethod
    def _check_table(table):
        """Return ``table`` as a string after rejecting unsafe characters.

        Table names cannot be sent as bound parameters, so they are still
        interpolated into the SQL text. Only letters, digits, ``_``, ``$``
        and ``.`` are accepted, which rules out quotes, whitespace, ``%``
        and statement separators.

        Raises:
            ValueError: If the name contains any other character.
        """
        name = str(table)
        if not name.replace("_", "").replace("$", "").replace(".", "").isalnum():
            raise ValueError(f"invalid table name: {name!r}")
        return name

    @staticmethod
    def _index_rows(cols, rows):
        """Map each row to ``{column: value}`` keyed by the row's first field.

        If several rows share the same first field, the first one wins.
        ``zip`` truncates to the shorter of ``cols`` and the row.
        """
        indexed = {}
        for row in rows:
            if row[0] not in indexed:
                indexed[row[0]] = dict(zip(cols, row))
        return indexed

    def insert_records(self, records, table="botrecords"):
        """Insert one record, updating the existing row on a duplicate key.

        Args:
            records: Values in the order ``session, req_time, uid, bid,
                intent, contents, dialog``. Every value is stored as its
                ``str()`` form.
            table: Target table name.

        Raises:
            ValueError: If ``table`` contains unsafe characters.
            Exception: Any database error is re-raised after a rollback.
        """
        cols = ["session", "req_time", "uid", "bid", "intent", "contents", "dialog"]
        table = self._check_table(table)
        colname = ",".join(cols)
        # On a key collision every column except the key itself is refreshed.
        upname = ",".join(f"{col}=values({col})" for col in cols[1:])

        # Values are bound by the driver rather than pasted into the SQL, so
        # quotes inside a value can neither break the statement nor inject SQL.
        params = [str(value) for value in records]
        placeholders = ",".join(["%s"] * len(params))
        sql = (
            f"insert into {table} ({colname}) values ({placeholders}) "
            f"ON DUPLICATE KEY UPDATE {upname}"
        )
        logger.info("sql:%s params:%s", sql, params)
        try:
            self.cursor.execute(sql, params)
            self.conn.commit()
        except Exception:
            # Do not leave a half-applied transaction open on the connection.
            self.conn.rollback()
            raise
        logger.info('%s', '插入数据成功')

    def get_records(self, uid, table="botrecords"):
        """Fetch all rows for ``uid`` from ``libra_bot.<table>``.

        Args:
            uid: Value matched against the ``uid`` column (bound as a
                query parameter).
            table: Table name inside the ``libra_bot`` schema.

        Returns:
            A dict keyed by the first selected column, each value a dict
            with keys ``id, session, req_time, uid, bid, intent, contents,
            dialog``. Empty when nothing matches.

        Raises:
            ValueError: If ``table`` contains unsafe characters.
        """
        table = self._check_table(table)
        # NOTE(review): the schema name is hard-coded here even though the
        # connection's database is configurable — confirm this is intended.
        sql = f"select * from libra_bot.{table} where uid=%s"
        self.cursor.execute(sql, (uid,))
        cols = ["id", "session", "req_time", "uid", "bid", "intent", "contents", "dialog"]
        return self._index_rows(cols, self.cursor.fetchall())

    def get_records_from_db(self, cols, table="botrecords"):
        """Fetch every row of ``libra_bot.<table>``.

        Args:
            cols: Column names to pair positionally with each row's values.
            table: Table name inside the ``libra_bot`` schema.

        Returns:
            A dict keyed by the first selected column, each value a dict
            built from ``cols`` and the row.

        Raises:
            ValueError: If ``table`` contains unsafe characters.
        """
        table = self._check_table(table)
        sql = f"select * from libra_bot.{table}"
        self.cursor.execute(sql)
        return self._index_rows(cols, self.cursor.fetchall())

    def close_mysql(self):
        """Close the cursor and the underlying connection."""
        try:
            self.cursor.close()
        finally:
            # Previously only the cursor was closed, leaking the connection.
            # The `open` check keeps repeated calls harmless.
            if getattr(self.conn, "open", True):
                self.conn.close()
if __name__ == "__main__":
    # Manual smoke test: connect with the default settings and print the
    # records stored for one sample uid.
    mysql = Mysql()
    try:
        print(mysql.get_records("13261201206"))
    finally:
        # Release database resources even if the query raises.
        mysql.close_mysql()
|