123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2024/4/13 15:12
- # @Author : liu
- import json
- import time
- import datetime
- import decimal
- import traceback
- import pymysql
- import os
- from src.core import singleton
- from redis import StrictRedis, ConnectionPool
# Redis host, read from the container environment (REDIS_HOST).
# The constant keeps its original spelling because it is a module-level name
# that other modules may import.
RADIS_HOST = os.environ.get("REDIS_HOST", "10.0.0.24")

# MySQL connection settings. The host comes from the container environment
# (MYSQL_HOST). When the host is 192.168.9.54 the production password is
# selected; any other host gets the test password.
# NOTE(review): the original comment said the default is the test database,
# but the default host below is the one that selects the production
# password - confirm which default is intended.
MYSQL_HOST = os.environ.get("MYSQL_HOST", "192.168.9.54")

# Legacy fallback credentials, kept only for backward compatibility.
# Prefer supplying MYSQL_PASSWORD through the environment so secrets do not
# have to live in source control.
_DEFAULT_MYSQL_PASSWORD = (
    "Hongshan2024#longjiang%xinyi"
    if MYSQL_HOST == "192.168.9.54"
    else "Hongshan2024@longjiang"
)
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", _DEFAULT_MYSQL_PASSWORD)
- # @singleton
class MysqlHandler:
    """Small PyMySQL wrapper that re-validates its connection before each statement.

    Tables noted by the original author:
        t_xinyi_daily     daily report
        t_xinyi_industry  industry
        t_xinyi_Robot     robot laboratory tests
    """

    def __init__(self,
                 host=MYSQL_HOST,
                 user='root',
                 passwd=MYSQL_PASSWORD,
                 db='big_model',
                 port=3306,
                 charset='utf8'
                 ):
        """Store the connection settings and attempt an initial connection.

        A failed initial connection is not fatal: ``self.conn`` stays ``None``
        and the next ``query``/``exec`` call retries through ``_reConn``.
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db  # database *name* (a string), not a connection object
        self.port = port
        self.charset = charset
        self.conn = None
        self._conn()

    def _conn(self):
        """Open a new connection.

        Returns:
            True when the connection was established, False otherwise (the
            traceback is printed).
        """
        try:
            self.conn = pymysql.connect(host=self.host, user=self.user, passwd=self.passwd,
                                        db=self.db, port=self.port, charset=self.charset)
            return True
        except Exception:
            # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
            traceback.print_exc()
            return False

    def _reConn(self, num=28800, stime=3):
        """Make sure the connection is alive, reconnecting if necessary.

        Args:
            num: maximum number of retries after the first attempt. With the
                defaults (28800 retries, 3 s apart) this keeps trying for
                about one day.
            stime: seconds to sleep between failed attempts.

        Returns:
            True once the connection is usable, False if every attempt failed.
        """
        for _ in range(num + 1):
            if self.conn is not None:
                try:
                    self.conn.ping()  # raises when the connection is broken
                    return True
                except Exception:
                    pass  # fall through to a fresh connect below
            if self._conn():
                return True
            time.sleep(stime)
        return False

    def query(self, sql, args=None):
        """Run a read statement and return all rows as dictionaries.

        Args:
            sql: SQL text, optionally containing ``%s`` / ``%(name)s`` placeholders.
            args: optional parameters bound by the driver. Prefer this over
                building the SQL string by hand.

        Returns:
            The fetched rows, or ``None`` when the statement failed (the SQL
            and the traceback are printed).
        """
        cursor = None  # defined up front so ``finally`` never hits an unbound name
        try:
            self._reConn()
            cursor = self.conn.cursor(pymysql.cursors.DictCursor)
            cursor.execute(sql, args)
            return cursor.fetchall()
        except Exception:
            print(sql)
            traceback.print_exc()
            return None
        finally:
            if cursor is not None:
                cursor.close()

    def exec(self, sql, args=None):
        """Run a write statement (insert/update/delete) and commit it.

        Errors are printed rather than raised, and the transaction is rolled
        back so a failed statement does not leave the connection mid-transaction.

        Args:
            sql: SQL text, optionally containing placeholders.
            args: optional parameters bound by the driver.
        """
        cursor = None
        try:
            self._reConn()
            # The connection lives in self.conn; self.db is only the database name.
            cursor = self.conn.cursor()
            cursor.execute(sql, args)
            self.conn.commit()
        except Exception as e:
            print(e)
            if self.conn is not None:
                try:
                    self.conn.rollback()
                except Exception:
                    traceback.print_exc()
        finally:
            if cursor is not None:
                cursor.close()

    def __del__(self):
        """Close the connection on garbage collection, if one was ever opened."""
        conn = getattr(self, "conn", None)
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            # Closing an already-closed connection raises; nothing useful can
            # be done about that inside a finalizer.
            pass
@singleton
class RedisHandler:
    """Minimal Redis client wrapper backed by a connection pool."""

    def __init__(self, host='192.168.124.6', port=6379, db=0, password='^YHN&UJM'):
        """Create the pooled client.

        NOTE(review): the module-level RADIS_HOST (read from the REDIS_HOST
        environment variable) is not used here; the default host is
        hard-coded. Confirm whether that is intentional.
        """
        self.redis = None  # stays None if client construction fails
        try:
            self.redis = StrictRedis(
                connection_pool=ConnectionPool(host=host, port=port, db=db, password=password))
        except Exception:
            traceback.print_exc()

    def get(self, key, prefix=None):
        """Return the value stored under ``key``.

        Args:
            key: key name.
            prefix: optional prefix prepended to ``key``, mirroring ``set`` so
                values written with a prefix can be read back the same way.
        """
        return self.redis.get(prefix + key if prefix else key)

    def set(self, key, val, expire=None, prefix=None):
        """Store ``val`` under ``key``.

        Args:
            key: key name.
            val: value to store.
            expire: optional time-to-live in seconds; falsy means no expiry.
            prefix: optional prefix prepended to ``key``.
        """
        newkey = prefix + key if prefix else key
        # Send the TTL with the SET itself so the key can never be left
        # without an expiry if a separate EXPIRE call were to fail.
        self.redis.set(newkey, val, ex=expire if expire else None)

    def __del__(self):
        """Release the client on garbage collection, if it was created."""
        client = getattr(self, "redis", None)
        if client is None:
            return
        try:
            client.close()
        except Exception:
            # Best effort only: a finalizer must not raise.
            pass
- # def main():
- # redis_handler = RedisHandler()
- # print(redis_handler.redis.info())
- # redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
- #
- # stream_name = 'stop_gen_stream'
- # consumer_group = 'stop_gen_group'
- # consumer_name = 'stop_gen_consumer'
- # # # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
- # # # 从流中读取消息
- # # while True:
- # # # 读取数据
- # # data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
- # # count=1, block=1000)
- # # # 处理收到的消息
- # # for stream, messages in data:
- # # for message in messages:
- # # message_id = message[0].decode('utf-8')
- # # message_data = message[1]
- # # print(f"Received message {message_id}: {message_data}")
- # # # 在这里可以添加任何自定义的处理逻辑
- #
- #
- # def main1():
- # mysql = MysqlHandler()
- # count = mysql.query("select count(0) as totals from t_xinyi_industry where TEST_TIME like '%00'")
- # print(count)
- #
- # rows = mysql.query("select * from t_xinyi_industry where TEST_TIME like '%00' limit 10")
- # for idx, cols in enumerate(rows):
- # print(cols.get('TEST_TIME'), cols.get('JS_COD'))
- #
- #
- # if __name__ == "__main__":
- # main()
|