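# NOTE: the next three comment lines are repository-hosting page text that was captured with this file; they are not part of the module.
# Code pull finished; the page will refresh automatically.
# The sync operation will force-sync from "BidingCC/码多多 全能知识库(Python 版)"; it overwrites every change made since the repository was forked and cannot be undone!!!
# After confirmation the sync runs in the background and the page refreshes when it completes; please wait patiently.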
# +----------------------------------------------------------------------
# | ChatWork智能聊天办公系统
# +----------------------------------------------------------------------
# | 软件声明: 本系统并非自由软件,未经授权任何形式的商业使用均属非法。
# | 版权保护: 任何企业和个人不允许对程序代码以任何形式任何目的复制/分发。
# | 授权要求: 如有商业使用需求,请务必先与版权所有者取得联系并获得正式授权。
# +----------------------------------------------------------------------
# | Author: ChatWork Team <2474369941@qq.com>
# +----------------------------------------------------------------------
import os
import sys
import gzip
import logging
import importlib
from datetime import datetime
from typing import Dict, List, IO
# Lower-case level names accepted in the logger settings, mapped to the
# numeric level constants of the standard `logging` module.
_LEVEL_NUM = {
    name: getattr(logging, name.upper())
    for name in ("notset", "debug", "info", "warning", "error", "critical")
}
# Public API of this module: `configure_logger` is the only name exported
# by a star-import.
__all__ = ["configure_logger"]
class CompressedFileHandler(logging.FileHandler):
    """File handler that logs to ``<root>/<YYYYMM>/<DD>.log`` and gzips big files.

    The target path is recomputed from the current date on every record, so
    the handler switches to a new file when the day or month changes. When
    the active file grows beyond ``gzip_size`` bytes, its content is appended
    to ``<DD>.log.gz`` and the file is emptied.
    """

    def __init__(self, filename, mode="a", encoding=None, delay=False, gzip_size=None):
        """Create the handler.

        Args:
            filename: Initial log file; expected to be ``<root>/<YYYYMM>/<DD>.log``.
            mode: File open mode, passed to ``logging.FileHandler``.
            encoding: File encoding, passed to ``logging.FileHandler``.
            delay: Defer opening the file until the first record is emitted.
            gzip_size: Size threshold in bytes above which the log file is
                archived; falsy values select the 5 MiB default.
        """
        super().__init__(filename, mode, encoding, delay)
        self.gzip_size: int = int(gzip_size) if gzip_size else 1024 * 1024 * 5
        self.filename: str = filename

    def emit(self, record):
        """Emit a record, switching file and archiving first when needed."""
        if self.stream is None:
            self.stream = self._open()

        # Re-target the stream when the dated path has moved on.
        if self.stream is not None and self.do_dir():
            self.stream.close()
            self.stream = self._open()

        self.do_zip()
        logging.StreamHandler.emit(self, record)

    def do_dir(self):
        """Point ``baseFilename`` at today's log file and create its directory.

        Returns:
            True when the stream must be reopened: either the dated path
            changed, or the target file does not exist yet.
        """
        # One timestamp for both parts so month and day cannot straddle midnight.
        now = datetime.now()
        # os.path keeps this working with either path separator.
        root = os.path.dirname(os.path.dirname(self.baseFilename))
        month_dir = os.path.join(root, now.strftime("%Y%m"))
        target = os.path.join(month_dir, now.strftime("%d") + ".log")

        # A changed path must trigger a reopen even if the new file already
        # exists (e.g. created by another process); otherwise the stream
        # would keep writing to the previous day's file.
        moved = target != self.baseFilename
        self.baseFilename = target

        os.makedirs(month_dir, exist_ok=True)
        return moved or not os.path.exists(target)

    def do_zip(self):
        """Archive the active log file to ``<file>.gz`` once it exceeds ``gzip_size``."""
        path = self.baseFilename
        try:
            size = os.path.getsize(path)
        except OSError:
            # Nothing to archive when the file is missing or unreadable.
            return
        if size <= self.gzip_size:
            return

        # "ab" appends a new gzip member instead of overwriting, so archives
        # written earlier on the same day are preserved.
        with open(path, "rb") as f_in, gzip.open(path + ".gz", "ab") as f_out:
            f_out.writelines(f_in)

        # Empty the live file; the already-open stream keeps writing to it.
        with open(path, "w"):
            pass
def configure_logger():
    """Configure the root logger from the ``LOGGER`` settings.

    Recognised keys (all optional): ``path``, ``gzip_size``, ``level_file``,
    ``level_sole``, ``enable_file``, ``enable_sole``, ``format_file``,
    ``format_sole``, ``format_date`` and ``rely_levels`` (a mapping of level
    name to a list of logger names that should use that level).

    Raises:
        ValueError: If ``rely_levels`` contains an unknown level name.
        KeyError: If ``level_file`` / ``level_sole`` is an unknown level name.
    """
    config = __loading_logs_configs()

    path: str = config.get("path") or "runtime/log"
    # Apply the default before int(): int(None) raises TypeError.
    gzip_size: int = int(config.get("gzip_size") or 1024 * 1024 * 5)
    level_file: int = _LEVEL_NUM[config.get("level_file") or "debug"]
    level_sole: int = _LEVEL_NUM[config.get("level_sole") or "info"]

    # Only a missing value falls back to True; an explicit False must be
    # honoured (`value or True` could never be False).
    enable_file = config.get("enable_file")
    enable_file = True if enable_file is None else bool(enable_file)
    enable_sole = config.get("enable_sole")
    enable_sole = True if enable_sole is None else bool(enable_sole)

    format_file: str = (config.get("format_file")
                        or "[%(asctime)s][%(levelname)s] [%(filename)s:%(lineno)d] [%(thread)d] - %(message)s")
    format_sole: str = (config.get("format_sole")
                        or "[%(levelname)s]: [%(filename)s:%(lineno)d] [%(thread)d] - %(message)s")
    format_date: str = config.get("format_date") or "%Y-%m-%d %H:%M:%S %p"
    rely_levels: Dict[str, List[str]] = config.get("rely_levels") or {}

    handlers = []
    if enable_file:
        # One timestamp so the month directory and the day file agree.
        now = datetime.now()
        log_dir = f"{path}/{now.strftime('%Y%m')}"
        os.makedirs(log_dir, exist_ok=True)

        file_handler = CompressedFileHandler(
            filename=f"{log_dir}/{now.strftime('%d')}.log",
            gzip_size=gzip_size,
        )
        # basicConfig() ignores `datefmt` for handlers that already carry a
        # formatter, so the date format has to be set on the formatter itself.
        file_handler.setFormatter(logging.Formatter(format_file, datefmt=format_date))
        file_handler.setLevel(level_file)
        handlers.append(file_handler)

    if enable_sole:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(logging.Formatter(format_sole, datefmt=format_date))
        console_handler.setLevel(level_sole)
        handlers.append(console_handler)

    # With no handler enabled, `handlers=None` lets basicConfig() install its
    # default stream handler, using `format` / `datefmt` below.
    logging.basicConfig(
        level=logging.NOTSET,
        format=format_file,
        datefmt=format_date,
        handlers=handlers if handlers else None
    )

    # These libraries are always limited to errors.
    quiet_loggers = ("asyncio", "tortoise", "apscheduler")
    for name in quiet_loggers:
        logging.getLogger(name).setLevel(logging.ERROR)

    for key, rely in rely_levels.items():
        if key not in _LEVEL_NUM:
            raise ValueError(f"`rely_levels` Unsupported error types [{key}]")
        for module in rely:
            if key == "error" and module in quiet_loggers:
                continue
            # Logger.setLevel() only accepts upper-case level names, so pass
            # the numeric level instead of the lower-case key.
            logging.getLogger(module).setLevel(_LEVEL_NUM[key])
def __loading_logs_configs():
    """Load the logger settings from the project's ``config`` module.

    Imports ``config``, instantiates its ``GlobalSetting`` class and reads the
    ``LOGGER`` entry of the mapping returned by its ``dict()`` method.

    Returns:
        The ``LOGGER`` mapping, or an empty dict when the ``config`` module
        cannot be found, ``GlobalSetting`` is missing, or ``LOGGER`` is
        absent or empty/None. The result is never None, so callers can
        call ``.get`` on it.
    """
    try:
        package = importlib.import_module("config")
        clz = getattr(package, "GlobalSetting", None)
        if not clz:
            return {}
        # `or {}` also covers a LOGGER key that is present but set to None.
        return clz().dict().get("LOGGER") or {}
    except ModuleNotFoundError:
        return {}
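# NOTE: the next two comment lines are repository-hosting page text that was captured with this file; they are not part of the module.
# This location may contain content unsuitable for display, so the page does not show it. You can review and modify it through the relevant editing functions.
# If you confirm the content involves no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / falsehood / worthless content, and nothing that violates national laws and regulations, you can click submit to appeal and we will handle it as soon as possible.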