diff --git a/ruoyi-fastapi-backend/.env.dev b/ruoyi-fastapi-backend/.env.dev
index afc6dd6a4d9d36ca35eb29fe408f0a1c908379e3..cc7e6062b1290e515e899db2dbbc3ddd528ef5c0 100644
--- a/ruoyi-fastapi-backend/.env.dev
+++ b/ruoyi-fastapi-backend/.env.dev
@@ -54,13 +54,15 @@ DB_POOL_RECYCLE = 3600
 DB_POOL_TIMEOUT = 30
 
 # -------- Redis配置 --------
-# Redis主机
+# Redis集群模式是否开启
+REDIS_CLUSTER = False
+# Redis主机或集群,集群模式,格式:'127.0.0.1:7670,127.0.0.1:7671'
 REDIS_HOST = '127.0.0.1'
-# Redis端口
+# Redis端口,集群模式不用
 REDIS_PORT = 6379
 # Redis用户名
 REDIS_USERNAME = ''
 # Redis密码
 REDIS_PASSWORD = ''
-# Redis数据库
+# Redis数据库,集群模式不用
 REDIS_DATABASE = 2
\ No newline at end of file
diff --git a/ruoyi-fastapi-backend/.env.prod b/ruoyi-fastapi-backend/.env.prod
index 6dac0e69ebe2b24234b2d4decf5e671914928dc1..7150564ebe0fc5e5269d60ce9805dcd900406b57 100644
--- a/ruoyi-fastapi-backend/.env.prod
+++ b/ruoyi-fastapi-backend/.env.prod
@@ -54,13 +54,15 @@ DB_POOL_RECYCLE = 3600
 DB_POOL_TIMEOUT = 30
 
 # -------- Redis配置 --------
-# Redis主机
+# Redis集群模式是否开启
+REDIS_CLUSTER = False
+# Redis主机或集群,集群模式,格式:'127.0.0.1:7670,127.0.0.1:7671'
 REDIS_HOST = '127.0.0.1'
-# Redis端口
+# Redis端口,集群模式不用
 REDIS_PORT = 6379
 # Redis用户名
 REDIS_USERNAME = ''
 # Redis密码
 REDIS_PASSWORD = ''
-# Redis数据库
+# Redis数据库,集群模式不用
 REDIS_DATABASE = 2
\ No newline at end of file
diff --git a/ruoyi-fastapi-backend/config/env.py b/ruoyi-fastapi-backend/config/env.py
index 78378a6489f3ceff880bbd4571fc0d8e3dd85af8..b0a4c7038a60c68c0042ca495b0834cdce29e21a 100644
--- a/ruoyi-fastapi-backend/config/env.py
+++ b/ruoyi-fastapi-backend/config/env.py
@@ -56,7 +56,7 @@ class RedisSettings(BaseSettings):
     """
     Redis配置
     """
-
+    redis_cluster: bool = False
     redis_host: str = '127.0.0.1'
     redis_port: int = 6379
     redis_username: str = ''
diff --git a/ruoyi-fastapi-backend/config/get_redis.py b/ruoyi-fastapi-backend/config/get_redis.py
index 9d78cad0c935cf9fc91a6f3b67aecff132174178..a430139da70e1c57a8f3e9f208011a04ee0bb3c3 100644
--- a/ruoyi-fastapi-backend/config/get_redis.py
+++ b/ruoyi-fastapi-backend/config/get_redis.py
@@ -1,4 +1,5 @@
 from redis import asyncio as aioredis
+from redis.cluster import ClusterNode
 from redis.exceptions import AuthenticationError, TimeoutError, RedisError
 from config.database import AsyncSessionLocal
 from config.env import RedisConfig
@@ -7,6 +8,35 @@ from module_admin.service.dict_service import DictDataService
 from utils.log_util import logger
 
 
+def create_cluster_nodes(node_string):
+    """
+    创建集群节点列表。
+
+    根据传入的节点字符串,生成一个包含多个ClusterNode对象的列表。
+    节点字符串的格式为"host1:port1,host2:port2,...",每个节点的主机名和端口号用冒号分隔,
+    不同节点之间用逗号分隔。
+
+    参数:
+    - node_string (str): 节点字符串,格式为"host1:port1,host2:port2,..."
+
+    返回:
+    - list: 包含多个ClusterNode对象的列表,每个ClusterNode对象表示一个集群节点。
+    """
+    # 初始化节点列表
+    nodes = []
+
+    # 遍历节点字符串,生成ClusterNode对象,并添加到节点列表中
+    for node in node_string.split(','):
+        # 分割节点字符串,获取主机名和端口号
+        host, port = node.split(':')
+
+        # 将主机名和端口号转换为ClusterNode对象,并添加到节点列表中
+        nodes.append(ClusterNode(host=host, port=int(port)))
+
+    # 返回节点列表
+    return nodes
+
+
 class RedisUtil:
     """
     Redis相关方法
@@ -20,15 +50,28 @@ class RedisUtil:
         :return: Redis连接对象
         """
         logger.info('开始连接redis...')
-        redis = await aioredis.from_url(
-            url=f'redis://{RedisConfig.redis_host}',
-            port=RedisConfig.redis_port,
-            username=RedisConfig.redis_username,
-            password=RedisConfig.redis_password,
-            db=RedisConfig.redis_database,
-            encoding='utf-8',
-            decode_responses=True,
-        )
+        if RedisConfig.redis_cluster:
+            logger.info(f'redis集群模式,集群地址:{RedisConfig.redis_host}')
+            # 创建 ClusterNode 实例列表
+            startup_nodes = create_cluster_nodes(RedisConfig.redis_host)
+            redis = await aioredis.RedisCluster(
+                startup_nodes=startup_nodes,
+                username=RedisConfig.redis_username,
+                password=RedisConfig.redis_password,
+                encoding='utf-8',
+                decode_responses=True,
+            )
+        else:
+            logger.info(f'redis单机模式,单机地址:{RedisConfig.redis_host}:{RedisConfig.redis_port}')
+            redis = await aioredis.from_url(
+                url=f'redis://{RedisConfig.redis_host}',
+                port=RedisConfig.redis_port,
+                username=RedisConfig.redis_username,
+                password=RedisConfig.redis_password,
+                db=RedisConfig.redis_database,
+                encoding='utf-8',
+                decode_responses=True,
+            )
         try:
             connection = await redis.ping()
             if connection:
diff --git a/ruoyi-fastapi-backend/config/get_scheduler.py b/ruoyi-fastapi-backend/config/get_scheduler.py
index 2c9457b3c5234636fbaafe6f51ddfd1a52a8a916..742b0b353abbaf8bbcbd10ab3a409e722a3c451c 100644
--- a/ruoyi-fastapi-backend/config/get_scheduler.py
+++ b/ruoyi-fastapi-backend/config/get_scheduler.py
@@ -3,7 +3,8 @@ from apscheduler.events import EVENT_ALL
 from apscheduler.executors.asyncio import AsyncIOExecutor
 from apscheduler.executors.pool import ProcessPoolExecutor
 from apscheduler.jobstores.memory import MemoryJobStore
-from apscheduler.jobstores.redis import RedisJobStore
+from apscheduler.jobstores.redis import RedisJobStore as BaseRedisJobStore
+from apscheduler.jobstores.base import BaseJobStore
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from apscheduler.triggers.cron import CronTrigger
@@ -20,6 +21,56 @@ from module_admin.service.job_log_service import JobLogService
 from utils.log_util import logger
 import module_task  # noqa: F401
 
+try:
+    import cPickle as pickle
+except ImportError:  # pragma: nocover
+    import pickle
+
+try:
+    from redis import Redis
+    from redis.cluster import RedisCluster, ClusterNode
+except ImportError:  # pragma: nocover
+    raise ImportError('RedisJobStore requires redis installed')
+
+
+def create_cluster_nodes(node_string):
+    nodes = []
+    for node in node_string.split(','):
+        host, port = node.split(':')
+        nodes.append(ClusterNode(host=host, port=int(port)))
+    return nodes
+
+class RedisJobStore(BaseRedisJobStore):
+    def __init__(self, db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times',
+                 pickle_protocol=pickle.HIGHEST_PROTOCOL, cluster=False, **connect_args):
+        super(BaseJobStore, self).__init__()
+
+        if db is None:
+            raise ValueError('The "db" parameter must not be empty')
+        if not jobs_key:
+            raise ValueError('The "jobs_key" parameter must not be empty')
+        if not run_times_key:
+            raise ValueError('The "run_times_key" parameter must not be empty')
+        self.pickle_protocol = pickle_protocol
+        self.jobs_key = jobs_key
+        self.run_times_key = run_times_key
+        self.__cluster = cluster
+        if not self.__cluster:
+            self.redis = Redis(db=int(db), **connect_args)
+        else:
+            self.startup_nodes = create_cluster_nodes(connect_args.get('host'))
+            self.redis = RedisCluster(
+                startup_nodes=self.startup_nodes,
+                username=connect_args.get('username'),
+                password=connect_args.get('password'),
+                decode_responses=connect_args.get('decode_responses', True),
+            )
+
+    def shutdown(self):
+        if not self.__cluster:
+            self.redis.connection_pool.disconnect()
+        else:
+            self.redis.disconnect_connection_pools()
 
 # 重写Cron定时
 class MyCronTrigger(CronTrigger):
@@ -103,6 +154,7 @@ job_stores = {
     'sqlalchemy': SQLAlchemyJobStore(url=SQLALCHEMY_DATABASE_URL, engine=engine),
     'redis': RedisJobStore(
         **dict(
+            cluster=RedisConfig.redis_cluster,
            host=RedisConfig.redis_host,
            port=RedisConfig.redis_port,
            username=RedisConfig.redis_username,