1 Star 0 Fork 0

铁血Coder/py-package

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
command.py 17.02 KB
一键复制 编辑 原始数据 按行查看 历史
lvwei 提交于 2017-05-11 16:57 . 去除无用代码
# -*- coding: gbk -*-
'''
该文件描述 服务器和客户端之间命令交互单元
'''
import threading, shutil, signal
import socket
import time, datetime
import guitool
import cPickle as pickle
from distribute import *
from package import *
moniter = common.moniter
global_srv_co = None
global_srv_time = None
EERROR_SUBMIT = -1
EERROR_SHOWSTATUS = -2
EOK = 0
ESUBMIT_FINISH = 1
ESHOWSTATUS_FINISH = 2
# 判断当前连接映射中有没有Slave节点, 如果没有则返回True
def noslave(context):
if len(context.map) == 0:
return True
for k, v in context.map.iteritems():
if v.kind == 'Slave':
return False
return True
def disconnect(cs, context):
timetxt = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
moniter.inst.error('%s %s OFFLINE' % (timetxt, context.map[cs].host))
context.sockets.remove(cs)
context.map[cs].dettach() # slave掉线时,对应的task设置为可用
del context.map[cs]
if noslave(context): # 没有可用的slave了,将作业列表清空
del context.jobqueue[:]
class CommandBase:
def __init__(self, args=()):
self.sendor = args
pass
def excute(self, sock, receiver=()):
return EOK
def remove_in_list(self, job, finished):
for t in job[:]:
if t.equal(finished):
job.remove(t)
def clientilde(self, client):
return client.kind == 'Slave' and not client.handing
# 给slave分配任务
def distribute(self, context, classtype):
job = context.jobqueue[-1]
handle = Distribute(job, context.map)
r = handle.dispatch()
if len(r) == 0:
moniter.inst.warn('job dosn\'t has been distribute, may no slave client to use.')
context.jobqueue.remove(job)
return False
for s, task in r.iteritems():
client = context.map[s]
data = classtype(task)
moniter.inst.debug('Server | %s called = {sendor = %s, task = %s}' % (data.info(), client.host, task.name))
cmd = pickle.dumps(data)
try:
send(s, cmd)
except socket.error, e: # Slave 已经连不上了
(errortype, errorvalue, errortb) = sys.exc_info()
(errno, err_msg) = errorvalue
moniter.inst.error('Excute command %s failed: %s, errno=%d' % (data.info(), err_msg, errno))
disconnect(s, context)
continue
client.attach(task) # slave状态和task状态都设为不可用
return True
def redistribute(self, context, sock, classtype):
job = context.jobqueue[-1]
handle = Distribute(job, context.map)
task = handle.redispatch(context.map[sock])
if task:
sendor = context.map[sock].host
data = classtype(task)
moniter.inst.debug('Server | %s called = {sendor = %s, task = %s}' % (data.info(), sendor, task.name))
cmd = pickle.dumps(data)
try:
send(sock, cmd)
except socket.error, e:
(errortype, errorvalue, errortb) = sys.exc_info()
(errno, err_msg) = errorvalue
moniter.inst.error('Excute command %s failed: %s, errno=%d' % (data.info(), err_msg, errno))
disconnect(sock, context)
moniter.inst.debug('The Task [%s] give to Slave [%s].' % (task.name, context.map[sock].host))
context.map[sock].attach(task) # slave状态和task状态都设为不可用
return task
# 确认客户端身份命令
class CommandConfirm(CommandBase):
def __init__(self, args=()):
CommandBase.__init__(self, args)
def excute(self, sock, receiver=()):
info = Client(socket.gethostbyname_ex(socket.gethostname()), receiver[0])
send(sock, pickle.dumps(info))
return EOK
# Request机提交配置信息命令, 服务器处理
class CommandSubmit(CommandBase):
def __init__(self, args=()):
CommandBase.__init__(self, args)
def excute(self, sock, receiver=()):
job = self.sendor
context = receiver[0]
if not context:
return EERROR_SUBMIT
global global_srv_time
global_srv_time = datetime.datetime.now()
# Slave构建任务分配算法
build_now = len(context.jobqueue) == 0 # 服务器正在调度执行已存在JOB, 新JOB添加到队列等待
context.jobqueue.append(job)
moniter.inst.warn('A job request to server, current job queue length : %d' % (len(context.jobqueue)))
if not build_now:
return ESUBMIT_FINISH
if self.distribute(context, CommandBuildStart):
global_srv_co = BinaryInitThread(job) # 服务器开始获取Bin目录用于打包
return ESUBMIT_FINISH
# Request端处理服务发送来的Slave状态信息
class RecvStateFromReuqest(CommandBase):
def __init__(self, args=()):
CommandBase.__init__(self, args)
def excute(self, sock, receiver=()):
src = receiver[1]
dst = self.sendor
src.clear()
src.update(dst)
# Server 开启发送Slave状态的线程
class OpenSlaveMonitor(CommandBase):
def __init__(self, args=()):
CommandBase.__init__(self, args)
self.timer = None
def _getSlaveState(self, context, sock):
slavestate = {}
# 服务检索当前连接的Slave的状态
for s, v in context.map.iteritems():
if v.kind == 'Request':
continue
if v.handing:
slavestate[v.addr] = 'busy'
else:
slavestate[v.addr] = 'connect'
cmd = pickle.dumps(RecvStateFromReuqest(slavestate))
send(sock, cmd)
self.timer = threading.Timer(1, self._getSlaveState, (context, sock))
self.timer.start()
def excute(self, sock, receiver=()):
context = receiver[0]
if not context:
return
self.timer = threading.Timer(1, self._getSlaveState, (context, sock))
self.timer.start()
# Request 显示服务器发送过来的各Slave和服务器的状态信息
class RequestRecvFromServer(CommandBase):
def __init__(self, args=()):
CommandBase.__init__(self, args)
def excute(self, sock, receiver=()):
message = self.sendor[0]
tp = self.sendor[1]
cache = receiver[0]
if not message or not tp:
return EERROR_SHOWSTATUS
if tp == 'Error':
moniter.inst.error(message)
elif tp == 'Warnning':
moniter.inst.warn(message)
elif tp == 'Info':
moniter.inst.debug(message)
if not cache.has_key(tp):
cache[tp] = []
cache[tp].insert(0, message) # 将发送到Request的信息纳入日志缓存, 头部插入
return ESHOWSTATUS_FINISH
# Slave将文本信息发送到服务器,再由服务器转发至所有request端
class ServerSendToRequest(CommandBase):
def __init__(self, args=()):
CommandBase.__init__(self, args)
def excute(self, sock, receiver=()):
context = receiver[0]
if not context:
return
mapclone = context.map.copy()
for s, v in mapclone.iteritems():
if v.kind != 'Request':
continue
try:
cmd = pickle.dumps(RequestRecvFromServer(self.sendor))
send(s, cmd)
except socket.error, e:
(errortype, errorvalue, errortb) = sys.exc_info()
(errno, err_msg) = errorvalue
moniter.inst.error('Send to message failed: %s, errno=%d' % (err_msg, errno))
disconnect(s, context)
# Slave 开始执行构建
class CommandBuildStart(CommandBase):
def __init__(self, args=()):
CommandBase.__init__(self, args)
self.timer = None
def __upload(self, result):
# 上传DLL
if result.returncode == 0:
up = FTPWrapper(result.uncompletes, '192.168.0.75', 'ftpuser', 'Disk123')
up.upload()
def __showtimecost(self, result):
if result.returncode == 0:
moniter.inst.warn('Compile cost: ' + str(result.compilecost))
moniter.inst.warn('Sign cost: ' + str(result.signcost))
moniter.inst.warn('Total: ' + str(result.totalcost))
else:
moniter.inst.warn('Compile cost: ' + str(result.compilecost))
moniter.inst.warn('Total: ' + str(result.totalcost))
def __transmit(self, sock, reader):
content = reader.read()
if len(content) != 0:
msg = (content, socket.gethostbyname(socket.gethostname()))
cmd = pickle.dumps(ServerSendToRequest(msg))
send(sock, cmd)
# 3秒 读取一次文件,降低I/O 开销
self.timer = threading.Timer(3, self.__transmit, (sock, reader))
self.timer.start()
def transmit(self, sock):
reader = open(moniter.loggerFile, 'r')
self.timer = threading.Timer(1, self.__transmit, (sock, reader))
self.timer.start()
def excute(self, sock, receiver=()):
task = self.sendor
# Slave 开始编译项目
guitool.tip_infomation('编译', '正在编译任务 %s' % (task.name), 2)
self.transmit(sock)
result = SlaveBuilder(task).run()
guitool.tip_infomation('上传', '正在上传DLL文件...', 2)
# 编译完成后上传成果
self.__upload(result)
# 显示构建耗时
self.__showtimecost(result)
guitool.tip_infomation('结果', '%s总耗时: %s' % (task.name, str(result.totalcost)), 2)
# 向Server发送编译完成时间
param = (task, result)
moniter.inst.debug('Slave | CommandBuildFinish called = {task = %s}' % (task.name))
cmd = pickle.dumps(CommandBuildFinish(param))
send(sock, cmd)
return EOK
def info(self):
return self.__class__.__name__
# Slave完成一个构建项通知服务器完成,并作相应处理
class CommandBuildFinish(CommandBase):
def __init__(self, args=()):
CommandBase.__init__(self, args)
def __build_next_job(self, context):
del context.workingjob[:]
del context.lastError[:]
jobsize = len(context.jobqueue)
global global_srv_time
start = global_srv_time
# 重新计时
global_srv_time = datetime.datetime.now()
moniter.inst.warn('Build time cost: ' + str(global_srv_time - start))
moniter.inst.warn('Begin to build next job, current job queue length : %d' % (jobsize))
if jobsize != 0:
if self.distribute(context, CommandBuildStart):
global_srv_co = BinaryInitThread(job) # 服务器开始获取Bin目录用于打包
else:
moniter.inst.info('No more job to build.')
def __handle_result(self, sock, task, result, context):
if result.returncode != 0:
mapclone = context.map.copy()
for s, v in mapclone.iteritems():
if v.kind != 'Request':
continue
fmt = StatusFomatter(mapclone[sock], task)
fmt.appenderror(result.message)
msg = (fmt.format_error(), 'Error')
try:
moniter.inst.debug('Server | RequestRecvFromServer called = {sendor = %s, task = %s}' % (v.host, task.name))
cmd = pickle.dumps(RequestRecvFromServer(msg))
send(s, cmd)
except socket.error, e:
(errortype, errorvalue, errortb) = sys.exc_info()
(errno, err_msg) = errorvalue
moniter.inst.error('Send to message failed: %s, errno=%d' % (err_msg, errno))
disconnect(s, context)
return result.returncode
def __abort_when_fail(self, sock, task, result, context):
pass
def excute(self, sock, receiver=()):
finished = self.sendor[0] # slave完成了的task
result = self.sendor[1] # slave编译结果
context = receiver[0]
if not context or not finished:
return EOK
# 根据编译结果处理DLL的上传和耗时记录
ret = self.__handle_result(sock, finished, result, context)
self.__abort_when_fail(sock, finished, result, context)
# 等待服务器将Bin目录导出
global global_srv_co
if global_srv_co and global_srv_co.isalive():
global_srv_co.wait()
if ret == 0:
context.workingjob.append(finished) # 将完成的任务保存
moniter.inst.info('Slave [%s] build Task [%s] successed.' % (context.map[sock].host, finished.name))
else:
context.lastError.append(result)
moniter.inst.error('Slave [%s] build Task [%s] failed.' % (context.map[sock].host, finished.name))
print context.workingjob
context.map[sock].dettach() # 任务完成,机器状态设置为可用
# 取出当前正在执行的JOB
if len(context.jobqueue) == 0:
self.__build_next_job(context)
return EOK
job = context.jobqueue.pop()
self.remove_in_list(job, finished) # 从当前执行JOB删除Slave完成的Task
can_build_next = len(job) == 0 # 判断一下当前JOB是不是刚好执行完成了, 是否能执行下一个JOB
if not can_build_next: # 将剩余的task分配到可用的Slave上
context.jobqueue.append(job)
if not self.redistribute(context, sock, CommandBuildStart):
can_build_next = len(job) == 0
if can_build_next:
if len(context.lastError) == 0:
PackageRunner(finished, context.workingjob).run()
moniter.inst.info('A Job of Requset build finished.')
self.__build_next_job(context) # 执行下一个JOB
return EOK
# 服务器数据上下文,用于命令处理时的参数
class ServerContext():
def __init__(self):
self.sockets = [] # 连接到服务器的客户端,和服务器套接字
self.map = {} # 客户端标记字典
self.jobqueue = [] # 作业队列,多个Request提交的作业按顺序放入队列
self.workingjob = [] # 当前正在工作,且未完成的JOB
self.lastError = [] # 最后一次构建错误
# 服务器Bin目录获取最新
class BinaryInitThread():
def __init__(self, job):
self.__job = job
self.__finished = False
self.__l = None
if len(job) != 0:
serverpath = job[0].serverpath
if not os.path.exists(serverpath):
os.makedirs(serverpath)
logfile = os.path.join(serverpath, 'checkout.log')
self.__l = open(logfile, 'w')
self.__thread = threading.Thread(target = self.run)
self.__thread.start()
def wait(self):
self.__finished = True
self.__thread.join()
self.__l.close()
def isalive(self):
return self.__thread.isAlive()
def getcheckoutpath(self, job):
pathdict = {}
for t in job:
svn = os.path.join(os.path.join(t.svn, t.master), 'Bin/').replace('\\', '/')
srvmasterbin = os.path.join(os.path.join(t.serverpath, t.master), 'Bin/').replace('/', '\\')
pathdict[srvmasterbin] = (svn, t.subversion)
svnbuild = os.path.join(os.path.join(t.svn, t.master), 'Builder/').replace('\\', '/')
srvbuild = os.path.join(os.path.join(t.serverpath, t.master), 'Builder/').replace('/', '\\')
pathdict[srvbuild] = (svnbuild, t.subversion)
for p in t.plugins:
svnp = os.path.join(os.path.join(t.svn, p), 'Bin/')
svnp = svnp.replace('\\', '/')
srvpluginbin = os.path.join(os.path.join(t.serverpath, p), 'Bin/').replace('/', '\\')
pathdict[srvpluginbin] = (svnp, t.subversion)
return pathdict
def run(self):
# 获得需要SVN Checkout的目录
svndict = self.getcheckoutpath(self.__job)
for local, src in svndict.iteritems():
# 先删除旧目录
if os.path.exists(local):
shutil.rmtree(local)
svn, version = src[0], src[1]
Tool.svn_export(svn, local, 'lvwei', 'Cas123456', version, True, self.__l)
self.__l.flush()
def get_pickling_errors(obj,seen=None):
if seen == None:
seen = []
try:
state = obj.__getstate__()
except AttributeError:
return
if state == None:
return
if isinstance(state,tuple):
if not isinstance(state[0],dict):
state=state[1]
else:
state=state[0].update(state[1])
result = {}
for i in state:
try:
pickle.dumps(state[i],protocol=2)
except pickle.PicklingError:
if not state[i] in seen:
seen.append(state[i])
result[i]=get_pickling_errors(state[i],seen)
return result
if __name__ == "__main__":
print get_pickling_errors(CommandBuildStart())
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/lvwiesuper/py-package.git
git@gitee.com:lvwiesuper/py-package.git
lvwiesuper
py-package
py-package
master

搜索帮助