代码拉取完成,页面将自动刷新
import datetime
import os
import importlib.util
import json
import runpy
import subprocess
import time
from pynput import keyboard
import webview
import yaml
import sys
import os
import models
from transport.transport_client import TransportClient
from transport.transport_server import TransportServer
def get_resource_path(relative_path):
# 获取资源路径
if hasattr(sys, '_MEIPASS'):
# 打包后的环境
return os.path.join(sys._MEIPASS, relative_path)
else:
# 开发环境
return os.path.join(os.path.abspath("."), relative_path)
def execute_code_file(file_path):
try:
spec = importlib.util.spec_from_file_location("module.name", f"script/{file_path}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, '_main') and callable(module.main):
result = module.main()
return result
else:
print("代码文件中没有可调用的 main 函数。")
return None
except Exception as e:
print(f"执行代码时出错: {e}")
return e
def execute_code(value):
if isinstance(value, datetime.date):
return value.strftime('%Y-%m-%d')
if isinstance(value, str) and value.endswith('.py'):
result = execute_code_file(value)
if result:
# print(f"执行结果: {result}")
return result
return None
if isinstance(value, str) and '_main' in value:
method = {}
exec(value, {}, method)
code_func = method.get('_main')
if code_func:
result = code_func()
# print(f"执行结果: {result}")
return result
return None
class Api:
def __init__(self, url):
self.trans_client = None
self.trans_server = None
self.url = url
self.meta_file = get_resource_path('script/meta.yaml')
self.windows = {}
self.currentActive = ''
self.script_windw_uuid = ''
self.config_path = get_resource_path('config.yaml')
def init(self):
# 主动模式
self.start_transport_server()
# 被动模式(服务器)
self.start_transport_client()
def get_script_list(self):
"""获取脚本列表"""
if not os.path.isfile(self.meta_file):
return []
with open(self.meta_file, 'r', encoding='utf-8') as yaml_file:
metas = yaml.load(yaml_file, Loader=yaml.BaseLoader)
if not metas:
return []
return metas
def get_script_detail(self):
"""获取脚本参数信息"""
script_name = self.windows[self.currentActive]['script_name']
with open(self.meta_file, 'r', encoding='utf-8') as yaml_file:
metas = yaml.load(yaml_file, Loader=yaml.FullLoader)
metas = list(filter(lambda x: x.get('script') == script_name, metas))
if not metas:
return []
meta = metas[0]
for item in meta.get('args', []):
value = item.get('value')
item['value'] = execute_code(value)
if item.get('enums'):
item['enums'] = item['enums'].split(',')
return meta
def _console(self, *text):
_args = tuple(map(lambda x: str(x), text))
window = self.windows[self.currentActive]['window']
json_result = json.dumps({'content': ' '.join(_args) + '\n'})
window.evaluate_js(f"updateLogContent({json_result});")
def receive_message(self, content):
main_window = webview.windows[0]
main_window.evaluate_js(f"updateMessage({content});")
def run_script(self, script_meta):
"""运行脚本"""
user_config = list(filter(lambda x: x['account'] == self.config['current_account'], self.config['users']))
if user_config:
user_config = user_config[0]
user_config = {f"_{k}": v for k, v in user_config.items()}
else:
user_config = {}
self.currentActive = webview.active_window().uid
window = self.windows[self.currentActive]['window']
json_result = json.dumps({'content': ''})
window.evaluate_js(f"updateLogContent({json_result}, true);")
params = {}
for item in script_meta['args']:
params[item['id']] = item.get('value')
script = runpy.run_path(f'script/{script_meta["script"]}',
init_globals={'_console': self._console, **user_config, **params})
try:
result = script['main']()
except Exception as e:
result = str(e)
if result:
json_result = json.dumps({'content': result})
window.evaluate_js(f"updateLogContent({json_result});")
def open_script_window(self, script_name):
"""打开脚本运行窗口"""
script_window = webview.create_window(f'运维工具箱 - {script_name}', self.url + 'run', focus=True, js_api=self,
width=900, height=650)
self.windows[script_window.uid] = {'window': script_window, 'script_name': script_name}
self.currentActive = script_window.uid
def simulate_typing(self, content):
# 模拟键入
time.sleep(1)
keyboard_controller = keyboard.Controller()
for char in content:
keyboard_controller.type(char)
time.sleep(0.01)
def refresh_config(self):
"""刷新配置文件"""
with open(self.config_path, 'r', encoding='utf-8') as yaml_file:
self.config = yaml.safe_load(yaml_file)
return self.config
def update_config(self, content):
with open(self.config_path, 'w', encoding='utf-8') as f:
f.write(yaml.safe_dump(content, allow_unicode=True))
def start_transport_server(self):
if self.trans_server:
return
self.trans_server = TransportServer(self.receive_message)
self.trans_server.start()
def close_transport_server(self):
if self.trans_server:
self.trans_server.stop()
self.trans_server = None
session = webview.windows[0].session
config = session.query(models.Config).first()
config.is_server = False
session.commit()
def start_transport_client(self, enable=False):
if self.trans_client:
return
port = 6985
session = webview.windows[0].session
config = session.query(models.Config).first()
if enable:
self.trans_client = TransportClient(ip_prefix="192.168.10.")
self.trans_client.start(port)
config.is_server = True
session.commit()
else:
if config.is_server:
self.trans_client = TransportClient(ip_prefix="192.168.10.")
self.trans_client.start(port)
# return {'status': True, 'msg': f'在线人数:{servers}', 'servers': servers}
def send_message(self, content):
print('content', content)
if content['type'] == 'text':
self.trans_server.send_message(content)
elif content['type'] == 'file':
self.trans_server.send_file(content)
def download_file(self, message):
print('message', message)
message['status'] = 'download'
self.trans_server.send_message(message)
def open_dir(self, message):
# 目标目录和文件
base_dir = 'chat_files'
abspath = os.path.abspath(os.path.join(base_dir, message['download_url']))
# 打开资源管理器并定位到目录
subprocess.run('explorer /select,"{}"'.format(abspath), shell=True)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。