# [Web-page notice, not part of the script] Code pull complete; the page will refresh automatically.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
description: watch Jenkins build consoles, extract tpmC / CPU-usage figures from them and push the values to a Prometheus Pushgateway
author: justbk
date: 2022/7/4
modify_records:
- 2022/7/4 justbk create this file
"""
import datetime
import os
import re
import threading
import time

import requests
from jenkinsapi.build import Build
from jenkinsapi.jenkins import Jenkins
# --- Jenkins connection ------------------------------------------------------
jenkins_url = 'http://113.137.58.68:50007/jenkins'
# Credentials can be supplied through the environment so that secrets do not
# have to live in the source tree; the previously committed values remain the
# defaults so existing invocations keep working.
# NOTE(review): the default below looks like a real API token - consider
# rotating it and dropping the fallback once callers export the variables.
jenkins_usr = os.environ.get('JENKINS_MONITOR_USR', 'root')
jenkins_pwd = os.environ.get('JENKINS_MONITOR_PWD', '11035e0576c308dedd910b488acaafc5f6')
# Name of the Jenkins job whose builds produce the tpmC console output.
jenkins_job = 'run_ssjdbc_8pcs'
# How many of the most recent builds of jenkins_job to watch.
jenkins_number = 7
# How many of the most recent builds of the CPU job to watch.
jenkins_getcpu_number = 8
# Seconds to sleep after each pushed sample (a negative value disables the sleep).
report_wait = 0.2
# When False, MyJenkins.wait_for_running() returns True immediately.
wait_for_start = True

# --- Pushgateway / Prometheus ------------------------------------------------
# URL template for pushing samples; {node} and {data_type} are filled per reporter.
pushgateway_url_format = 'http://192.168.0.78:9091/metrics/job/{node}/instance/{data_type}'
# Metric name / label name used for pushed samples; adjust per task.
pushgateway_tag = 'count_tpmc'
# Value sent as the 'job' parameter when deleting series.
pushgateway_job = 'pushgateway'
class Tag:
    """One labelled metric: the text before the braces plus a single label pair."""

    def __init__(self, instance, key, value):
        # instance: text emitted in front of the braces (used as the metric name)
        # key / value: the single label pair rendered inside the braces
        self.instance, self.key, self.value = instance, key, value

    def decorate_data(self, data):
        """Render *data* as one newline-terminated ``name{key="value"} data`` line."""
        label = "%s=\"%s\"" % (self.key, self.value)
        return "%s{%s} %s\n" % (self.instance, label, data)

    def delete_param(self, node, job):
        """Build the parameter dict selecting this tag's series for *node*.

        The ``match[]`` entry selects on this tag's label and on
        ``exported_job`` equal to *node*; *job* is passed through as ``job``.
        """
        selector = '{%s=\"%s\", exported_job=\"%s\"}' % (self.key, self.value, node)
        return {'match[]': selector, 'job': '%s' % job}
# Tag for tpmC samples: metric name, label name and label value are all pushgateway_tag.
TAG_TPMC = Tag(instance=pushgateway_tag, key=pushgateway_tag, value=pushgateway_tag)
# Tag for CPU samples: same metric and label name, told apart by the label value 'getcpu'.
TAG_CPU = Tag(instance=pushgateway_tag, key=pushgateway_tag, value='getcpu')
class UrlSource:
    """Holds the Prometheus base URL and builds endpoint URLs from it."""

    PROMETHEUS = 'http://192.168.0.78:9090'

    @classmethod
    def uri_del(cls):
        """Return the full URL of the TSDB admin ``delete_series`` endpoint."""
        return cls.PROMETHEUS + '/api/v1/admin/tsdb/delete_series'
class LogParser:
    """Regex helpers that pull benchmark figures out of console text.

    Only lines that start with ``TERM_CONST`` are inspected by the
    ``*_by_console`` helpers.
    """

    TERM_CONST = 'Term-00'
    # Raw strings keep the regex escapes (\s, \d, \., \() from being treated as
    # invalid string escapes, which newer Python versions warn about.
    # Groups: (running-average tpmTOTAL, current tpmTOTAL).
    TERM_PATTERN = re.compile(
        r'Term-00,\s+Running Average tpmTOTAL:\s+(\d+[\.]*\d*)\s+'
        r'Current tpmTOTAL:\s+(\d+)\s+'
        r'Memory Usage:\s+\d+\s*MB / \d+\s*MB\s*')
    # Group: the measured tpmC (NewOrders) figure.
    RESULT_PATTERN = re.compile(r'Term-00, Measured tpmC \(NewOrders\) =\s+(\d+[\.]*\d*)')
    # Example line: "Term-00 32 busy_cpu=.2" -> groups ('32', '.2').
    TERM_CPU_PERCENT = re.compile(r'Term-00\s+(\d+)\s+busy_cpu=(\d*\.*\d*)')

    @classmethod
    def _term_lines(cls, console):
        """Return the lines of *console* that start with ``TERM_CONST``."""
        return [line for line in console.split('\n') if line.startswith(cls.TERM_CONST)]

    @classmethod
    def find_all_term(cls, log):
        """Return every ``TERM_PATTERN`` match in *log* as a list of 2-tuples."""
        return cls.TERM_PATTERN.findall(log)

    @classmethod
    def find_all_term_by_console(cls, console):
        """Return ``TERM_PATTERN`` matches found line by line in *console*.

        Returns an empty list when no line starts with ``TERM_CONST``.
        """
        return [match
                for line in cls._term_lines(console)
                for match in cls.find_all_term(line)]

    @classmethod
    def find_all_cpu_by_console(cls, console):
        """Return ``TERM_CPU_PERCENT`` matches found line by line in *console*.

        Each match is a ``(number, busy_cpu)`` tuple of strings; an empty list
        is returned when no line starts with ``TERM_CONST``.
        """
        return [match
                for line in cls._term_lines(console)
                for match in cls.TERM_CPU_PERCENT.findall(line)]
class FetchConsole(threading.Thread):
    """Thread that polls a build's console and forwards newly appeared text.

    Parameters:
        build: object providing ``is_running()`` and ``get_console()``.
        build_id: identifier handed through to ``reporter.report``.
        reporter: object with ``report(build, build_id, text)``, or ``None``
            to disable reporting.
        tag: marker string; each forwarded chunk ends just before the last
            occurrence of this marker in the console.
        wait_time: seconds to sleep when the console has not grown enough.
        least_char: growth (in characters) that must be exceeded before a
            chunk is forwarded.
    """

    def __init__(self, build: Build, build_id, reporter, tag, wait_time=0.5, least_char=50):
        super().__init__()
        self.build = build
        self.build_id = build_id
        self.reporter = reporter
        self.tag = tag
        self.wait_time = wait_time
        self.least_char = least_char
        # Full console text, filled in once run() has finished polling.
        self.console = ''

    def build_running(self):
        """Return whether the watched build is still running."""
        return self.build.is_running()

    def get_console(self):
        """Return the build's console text with backspace characters removed."""
        raw_text = self.build.get_console()
        return raw_text.replace('\b', '')

    def report(self, add_console):
        """Hand *add_console* to the reporter, if one was configured."""
        if self.reporter is None:
            return
        self.reporter.report(self.build, self.build_id, add_console)

    def run(self) -> None:
        """Poll the console until the build stops, then flush the remainder."""
        forwarded = ''  # console prefix that has already been handed to report()
        keep_polling = True  # the first pass always runs, even if the build is done
        while keep_polling:
            snapshot = self.get_console()
            if len(snapshot) - len(forwarded) > self.least_char:
                # Cut just before the last marker so the final, possibly
                # incomplete, record is kept back for the next pass.
                # NOTE(review): rfind() yields -1 when the marker is absent,
                # in which case only the last character is held back.
                cut_at = snapshot.rfind(self.tag)
                fresh_text = snapshot[len(forwarded): cut_at]
                forwarded = snapshot[:cut_at]
                self.report(fresh_text)
                time.sleep(1)
            else:
                time.sleep(self.wait_time)
            keep_polling = self.build_running()
        final_text = self.get_console()
        self.console = final_text
        if len(final_text) > len(forwarded):
            self.report(final_text[len(forwarded):])
class Reporter:
    """Pushes samples for one node and deletes that node's stored series.

    Parameters:
        node: value substituted for ``{node}`` in the push URL and used as the
            ``exported_job`` selector when deleting.
        tag: object providing ``decorate_data(data)`` and
            ``delete_param(node, job)``.
        data_type: value substituted for ``{data_type}`` in the push URL.
    """

    def __init__(self, node, tag, data_type='cpu'):
        self.node = node
        self.data_type = data_type
        self.tag = tag
        self.session = requests.session()

    def get_url(self):
        """Return the push URL for this reporter's node and data type."""
        return pushgateway_url_format.format(node=self.node, data_type=self.data_type)

    def request(self, url, method, data=None, headers=None, fp=None, **kwargs):
        """Send one HTTP request through the session.

        Parameters:
            url: target URL.
            method: HTTP method name passed to ``session.request``.
            data: request body, if any.
            headers: request headers; a JSON/UTF-8 default is used when None.
            fp: optional callable applied to the response; its result is returned.
            **kwargs: forwarded unchanged to ``session.request``.

        Returns:
            ``fp(response)`` when *fp* is callable, otherwise ``None``.
        """
        if headers is None:
            headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
        # The one-element tuple keeps the formatting valid even if data is a tuple.
        print('report data:%s' % (data,))
        result = self.session.request(method, url, data=data, headers=headers, **kwargs)
        try:
            if callable(fp):
                return fp(result)
            return None
        finally:
            # Release the response even when fp raises.
            result.close()

    def report(self, tpmc):
        """Push the value *tpmc*, rendered by the tag, to the push URL."""
        self.request(self.get_url(), 'post', data=self.tag.decorate_data(tpmc))

    def delete_data(self):
        """Ask the delete-series endpoint to drop this node's series and print the outcome."""
        url = UrlSource.uri_del()
        params = self.tag.delete_param(self.node, pushgateway_job)
        print(url, params)
        _func = lambda res: (res.reason, res.raw, res.request)
        ret = self.request(url, method='post', params=params, fp=_func)
        print(ret)

    def close(self):
        """Close the underlying session; failures are printed, not raised."""
        try:
            self.session.close()
        except Exception as e:
            print("close session with exp:%s" % (e,))
class MyJenkins:
    """Watches the latest builds of one Jenkins job and reports parsed values.

    Parameters:
        server: Jenkins handle used to look up the job.
        job_name: name of the job to watch.
        build_num: how many of the most recent builds to watch.
    """

    def __init__(self, server: Jenkins, job_name, build_num):
        self.server = server
        self.job = self.server.get_job(job_name)
        # build_id -> Reporter, created lazily by get_reporter().
        self.reports = {}
        self.build_num = build_num
        # List of FetchConsole threads; populated by parse_start().
        self.fetch_consoles = None

    def print_serverinfo(self):
        """Print the server version, nodes and jobs."""
        print('server version={}'.format(self.server.version))
        print('server nodes:{}'.format(self.server.get_nodes()))
        jobs = self.server.get_jobs()
        print('server jobs:{}'.format(jobs))

    def create_reporter(self, build_id):
        """Create the reporter used for *build_id* (tpmC tag)."""
        return Reporter(build_id, TAG_TPMC)

    def create_fetchconsole(self, build_number, job_id):
        """Create the console-polling thread for build *build_number*."""
        return FetchConsole(self.job.get_build(build_number), job_id, self, LogParser.TERM_CONST)

    def get_reporter(self, build_id) -> Reporter:
        """Return the reporter for *build_id*, creating it on first use."""
        if build_id not in self.reports:
            self.reports[build_id] = self.create_reporter(build_id)
        return self.reports[build_id]

    def report(self, build: Build, build_id, add_console):
        """Parse *add_console* and push every extracted value for *build_id*."""
        values = self.console_parse(add_console)
        for v in values:
            data = self.data_convert(v)
            print('build:{} node_id:{} value:{}'.format(build.get_number(), build_id, data))
            self.get_reporter(build_id).report(data)
            if report_wait >= 0:
                time.sleep(report_wait)

    def console_parse(self, add_console):
        """Extract the raw matches of interest from a chunk of console text."""
        return LogParser.find_all_term_by_console(add_console)

    def data_convert(self, value):
        """Convert one raw match into the number to push.

        Uses the first captured group scaled by 0.45.
        NOTE(review): the meaning of the 0.45 factor is not stated in this
        file - confirm with the benchmark owner.
        """
        return float(value[0]) * 0.45

    def wait_for_running(self, timeout=10 * 60 * 60):
        """Block until all watched builds are running or *timeout* seconds pass.

        Returns True as soon as get_running_number() is truthy (or at once
        when ``wait_for_start`` is False), and False on timeout.
        """
        if not wait_for_start:
            return True
        cur_t = datetime.datetime.now()
        while (datetime.datetime.now() - cur_t).total_seconds() < timeout:
            if not self.get_running_number():
                time.sleep(10)
                continue
            print('task start running!')
            return True
        print('wait over:%s' % (datetime.datetime.now() - cur_t).total_seconds())
        return False

    def get_running_number(self):
        """Return True when every one of the latest ``build_num`` builds is running.

        Despite the name, the result is a bool, not a count.
        """
        base_build = self.job.get_last_buildnumber()
        running_num = 0
        for i in range(self.build_num):
            if self.job.get_build(base_build - i).is_running():
                running_num += 1
        return running_num == self.build_num

    def parse_start(self):
        """Create the polling threads, clear their old series, then start them."""
        max_buildnumber = self.job.get_last_buildnumber()
        self.fetch_consoles = [self.create_fetchconsole(max_buildnumber - num, num + 1) for num in
                               range(self.build_num)]
        # Clear the series under the id each thread will actually report with.
        # create_fetchconsole() may map job_id to a different build_id, so the
        # thread's own build_id is used rather than assuming 1..build_num.
        for fetch_console in self.fetch_consoles:
            self.get_reporter(fetch_console.build_id).delete_data()
        for fetch_console in self.fetch_consoles:
            fetch_console.start()

    def parse_end(self):
        """Wait for every polling thread, post-process it, then close reporters."""
        # Tolerate being called before parse_start(): nothing to wait for then.
        for fetch_console in self.fetch_consoles or []:
            fetch_console.join()
            self.parse_fetchconsole(fetch_console)
        # All threads have finished, so no reporter is in use any more.
        for reporter in self.reports.values():
            reporter.close()

    def parse_fetchconsole(self, fetch_console):
        """Print the final result figures found in the finished console text."""
        results = LogParser.RESULT_PATTERN.findall(fetch_console.console.replace('\n', ''))
        print(results)
class CpuJenkins(MyJenkins):
    """Variant of MyJenkins that reports CPU-usage figures instead of tpmC."""

    def parse_fetchconsole(self, fetch_console):
        """Do nothing: CPU consoles get no end-of-run post-processing."""

    def console_parse(self, add_console):
        """Extract ``(number, busy_cpu)`` string tuples from the console chunk."""
        cpu_matches = LogParser.find_all_cpu_by_console(add_console)
        return cpu_matches

    def data_convert(self, value):
        """Return the second captured group (busy_cpu) as a float."""
        busy_cpu_text = value[1]
        return float(busy_cpu_text)

    def create_reporter(self, build_id):
        """Create the reporter used for *build_id* (CPU tag)."""
        return Reporter(build_id, TAG_CPU)

    def create_fetchconsole(self, build_number, job_id):
        """Create the polling thread, shifting its id by ``jenkins_number``.

        NOTE(review): the shift presumably keeps CPU ids from colliding with
        the tpmC ids 1..jenkins_number - confirm.
        """
        watched_build = self.job.get_build(build_number)
        shifted_id = job_id + jenkins_number
        return FetchConsole(watched_build, shifted_id, self, LogParser.TERM_CONST)
def test_parse(server):
    """Manual check: stream the single most recent build of ``jenkins_job``."""
    tpmc_watcher = MyJenkins(server, jenkins_job, 1)
    # Waiting for the build to start (wait_for_running) is deliberately left
    # disabled here; polling begins immediately.
    tpmc_watcher.parse_start()
    tpmc_watcher.parse_end()
def test_cpu(server):
    """Manual check: stream the single most recent build of 'get_cpu_percent'."""
    cpu_watcher = CpuJenkins(server, 'get_cpu_percent', 1)
    cpu_watcher.parse_start()
    cpu_watcher.parse_end()
def test_tpmc_and_cpu(server):
    """Watch the tpmC job and the CPU job side by side until both finish.

    A job that does not start within the wait timeout is only reported on
    stdout; streaming is attempted regardless.
    """
    tpmc_watcher = MyJenkins(server, jenkins_job, jenkins_number)
    cpu_watcher = CpuJenkins(server, 'get_cpu_percent', jenkins_getcpu_number)

    if not tpmc_watcher.wait_for_running():
        print("tpmc task not started!")
    if not cpu_watcher.wait_for_running():
        print("cpu task not started!")

    watchers = (tpmc_watcher, cpu_watcher)
    # Start both sets of polling threads before waiting on either of them.
    for watcher in watchers:
        watcher.parse_start()
    for watcher in watchers:
        watcher.parse_end()
def test_url():
    """Manual check of the push and delete endpoints for every node id.

    For each tpmC id (1..jenkins_number) a zero sample is pushed and the
    series is then deleted. For the CPU tag, every id from 1 up to
    jenkins_number + jenkins_getcpu_number is deleted, but a zero sample is
    pushed only for the ids above jenkins_number.
    """
    for i in range(jenkins_number):
        report = Reporter(i + 1, TAG_TPMC)
        try:
            report.report(0)
            report.delete_data()
        finally:
            # Each Reporter owns an HTTP session; release it once done.
            report.close()
    for i in range(0, jenkins_number + jenkins_getcpu_number):
        report = Reporter(i + 1, TAG_CPU)
        try:
            # Ids up to jenkins_number are only cleaned, never pushed to.
            if i >= jenkins_number:
                report.report(0)
            report.delete_data()
        finally:
            report.close()
if __name__ == '__main__':
    # The Jenkins handle is only consumed by the disabled run at the bottom;
    # test_url() itself takes no server argument.
    server = Jenkins(
        jenkins_url,
        username=jenkins_usr,
        password=jenkins_pwd,
    )
    test_url()
    # test_tpmc_and_cpu(server)
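# [Web-page notice, not part of the script] This section may contain content unsuitable for display, so the page does not show it. You can review and modify it with the relevant editing functions.
# [Web-page notice, not part of the script] If you confirm the content involves no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / falsehood / worthless content, and nothing that violates national laws and regulations, you can click submit to appeal and it will be handled as soon as possible.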