mysql.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. @Time : 2024/10/18 15:28
  5. @File : mysql.py
  6. @Desc :
  7. """
  8. import os
  9. from config import get_logger
  10. import pymysql
  11. logger = get_logger()
  12. import json
  13. def get_mysql_config(env='prod'):
  14. if env=="prod":
  15. mysql_host = os.getenv("mysql_host", "192.168.100.159")
  16. mysql_user = os.getenv("mysql_user", "root")
  17. mysql_passwd = os.getenv("mysql_passwd", "EKoAe3H8xybQKrFPApXM")
  18. db = os.getenv("db", "libra_bot")
  19. else:
  20. mysql_host = os.getenv("mysql_host", "192.168.9.54")
  21. mysql_user = os.getenv("mysql_user", "root")
  22. mysql_passwd = os.getenv("mysql_passwd", "Hongshan2024#longjiang%xinyi")
  23. db = os.getenv("db", "big_model")
  24. return mysql_host, mysql_user, mysql_passwd, db
  25. mysql_host, mysql_user, mysql_passwd, db = get_mysql_config()
  26. class Mysql(object):
  27. def __init__(self, host=mysql_host, username=mysql_user, pwd=mysql_passwd, db=db, **kwargs):
  28. self.conn = self.mysql_connect(host, username, pwd, db, **kwargs)
  29. self.cursor = self.conn.cursor()
  30. @staticmethod
  31. def mysql_connect(host, username, pwd, db, **kwargs):
  32. try:
  33. connect = pymysql.connect(host=host, user=username, passwd=pwd, db=db,**kwargs)
  34. logger.info('%s', 'mysql连接成功')
  35. return connect
  36. except Exception as e:
  37. logger.info("数据库连接异常%s", e)
  38. return False
  39. def insert_records(self, records, table="botrecords"):
  40. cols = ["session", "req_time", "uid", "bid", "intent", "contents", "dialog"]
  41. colname = ",".join(cols)
  42. upname = ",".join([i for i in map(lambda x: x+f"=values({x})", cols[1:])])
  43. session, records =records[0], ",".join([i for i in map(lambda x: "'"+str(x)+"'", records)])
  44. sql = f"""
  45. insert into {table} ({colname}) values ({records}) ON DUPLICATE KEY UPDATE {upname}
  46. """
  47. logger.info(f"sql:{sql}")
  48. self.cursor.execute(sql)
  49. self.conn.commit()
  50. logger.info('%s', '插入数据成功')
  51. def get_records(self, uid, table="botrecords"):
  52. sql = f"""
  53. select * from libra_bot.{table} where uid={uid}
  54. """
  55. self.cursor.execute(sql)
  56. content = self.cursor.fetchall()
  57. res = {}
  58. cols = ["id","session", "req_time", "uid", "bid", "intent", "contents", "dialog"]
  59. if len(content) >0:
  60. for cell in content:
  61. if cell[0] not in res:
  62. record = dict(zip(cols, cell))
  63. res[cell[0]] = record
  64. return res
  65. def get_records_from_db(self, cols, table="botrecords"):
  66. sql =f"select * from libra_bot.{table}"
  67. self.cursor.execute(sql)
  68. content = self.cursor.fetchall()
  69. res = {}
  70. #cols = ["id", "req_time", "date", "hour", "task", "ext"]
  71. if len(content) > 0:
  72. for cell in content:
  73. if cell[0] not in res:
  74. record = zip(cols, cell)
  75. res[cell[0]] = dict(record)
  76. return res
  77. def close_mysql(self):
  78. self.cursor.close()
  79. if __name__ == "__main__":
  80. mysql = Mysql()
  81. print(mysql.get_records("13261201206"))