2 Star 2 Fork 0

Kenny小狼/python-tools

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
health_check.py 4.14 KB
一键复制 编辑 原始数据 按行查看 历史
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
from httplib import HTTPConnection
from sys import argv
import socket
import network_tool
import subprocess
# Optional dependency: DNS is bound to the module when it can be imported,
# otherwise to False so callers can test `if DNS:`.
try:
    import DNS
except ImportError:
    # Only a missing module is tolerated; a bare `except:` would also hide
    # KeyboardInterrupt/SystemExit raised during import.
    DNS = False
class CheckResult(object):
    """Outcome of a single health check.

    Attributes are plain data:
      is_success -- the value passed by the check (a bool in this file).
      reason     -- human-readable explanation; defaults to an empty string.
    """

    def __init__(self, is_success, reason=""):
        self.is_success = is_success
        self.reason = reason

    def __repr__(self):
        # Makes results readable in logs and interactive debugging.
        return "CheckResult(is_success=%r, reason=%r)" % (
            self.is_success, self.reason)
def is_valid_ipv4_address(address):
    """Return True if *address* is a dotted-quad IPv4 literal, else False.

    Uses socket.inet_pton when available; otherwise falls back to
    socket.inet_aton plus an explicit check for exactly three dots
    (the fallback additionally requires the dot count).

    Non-string input (e.g. None) or strings the socket layer refuses to
    convert are reported as invalid rather than raising.
    """
    # TypeError: non-string input; ValueError: e.g. embedded NUL characters.
    invalid_input = (socket.error, TypeError, ValueError)
    try:
        socket.inet_pton(socket.AF_INET, address)
    except AttributeError:  # no inet_pton on this platform
        try:
            socket.inet_aton(address)
        except invalid_input:
            return False
        return address.count('.') == 3
    except invalid_input:  # not a valid address
        return False
    return True
def is_valid_ipv6_address(address):
    """Return True if *address* is an IPv6 literal, else False.

    Validation is delegated to socket.inet_pton(AF_INET6, ...). Input that
    cannot be validated (non-string values, strings the socket layer
    rejects, or a platform without inet_pton/AF_INET6) is reported as
    invalid instead of raising.
    """
    try:
        socket.inet_pton(socket.AF_INET6, address)
    except AttributeError:
        # No inet_pton/AF_INET6 here and no stdlib fallback for IPv6, so be
        # conservative and report the address as not valid.
        return False
    except (socket.error, TypeError, ValueError):  # not a valid address
        return False
    return True
def address_check(addr):
    """Return True when *addr* is usable as a connection target.

    An IPv4 literal is accepted directly; anything else is handed to
    network_tool.canIHasIP and accepted when that call returns a truthy
    value.
    """
    # Short-circuit: the lookup helper is only called for non-IPv4 input.
    return is_valid_ipv4_address(addr) or bool(network_tool.canIHasIP(addr))
def http_check(host, _port=80, timeout=10, url='/', method='GET'):
    """Check that an HTTP endpoint answers with status 200, 301 or 302.

    A TCP reachability check (socket_check) is performed first; if it fails
    its result is returned unchanged. Otherwise a single HTTP request is
    sent and the response status decides success.

    :param host: host name or IPv4 address to contact.
    :param _port: TCP port of the HTTP server.
    :param timeout: timeout in seconds passed to HTTPConnection.
    :param url: request path.
    :param method: HTTP method.
    :return: CheckResult; on success/failure by status the reason is the
        HTTP reason phrase, on a transport or protocol error it is the
        error text.
    """
    # Local import: only HTTPConnection is imported at file level.
    from httplib import HTTPException

    sr = socket_check(host, _port)
    if not sr.is_success:
        return CheckResult(sr.is_success, sr.reason)

    # Third positional argument is `strict` in Python 2's httplib.
    conn = HTTPConnection(host, _port, True, timeout)
    try:
        # request() is what actually connects, so it must be guarded too.
        conn.request(method, url)
        httpres = conn.getresponse()
        status, reason = httpres.status, httpres.reason
    except (socket.error, HTTPException) as e:
        return CheckResult(False, str(e))
    finally:
        # Always release the connection, whatever the outcome.
        conn.close()
    return CheckResult(status in (200, 301, 302), reason)
def socket_check(url, _port, timeout=1):
    """Check that a TCP connection to (url, _port) can be opened.

    :param url: host name or IPv4 address (validated with address_check).
    :param _port: integer TCP port.
    :param timeout: connect timeout in seconds; defaults to the previously
        hard-coded value of 1.
    :return: CheckResult describing whether the connect succeeded.
    """
    if not address_check(url):
        return CheckResult(False, ("Server %s cant access." % url))

    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sk.settimeout(timeout)
        sk.connect((url, _port))
    except (IOError, OverflowError):
        # IOError covers socket errors and timeouts; OverflowError is raised
        # for a port outside 0-65535. Both mean "cannot connect".
        return CheckResult(False, "Server port %i not connect!" % _port)
    finally:
        # Close on every path so the descriptor is never leaked.
        sk.close()
    return CheckResult(True, "Server port %i ok!" % _port)
def _run_redis_cli(host, port, *command):
    """Run `redis-cli -h host -p port <command...>` and return its stdout.

    Returns an empty string when redis-cli cannot be started or exits with
    a non-zero status, so callers can treat "no output" uniformly.
    """
    try:
        # Argument list without a shell: host/port/master name come from the
        # command line and must not be interpreted by a shell.
        return subprocess.check_output(
            ['redis-cli', '-h', str(host), '-p', str(port)] + list(command),
            universal_newlines=True)
    except (OSError, subprocess.CalledProcessError):
        return ''


def check_sentinel_redis_master_is_correct(sentinel_host, _port, master_name):
    """Verify that the master reported by a Redis Sentinel really is a master.

    Steps: check the sentinel port is reachable, ask the sentinel for the
    address of *master_name*, check that address is reachable, then read
    the `role` line of that server's INFO output.

    :param sentinel_host: host of the sentinel.
    :param _port: port of the sentinel.
    :param master_name: master name registered in the sentinel.
    :return: CheckResult; is_success is True only when the reported server
        answers with role `master`. On failure reason holds the explanation.
    """
    know_error_msg = 'sentinel get master addr is %s, but not correct %s'

    sr = socket_check(sentinel_host, _port)
    if not sr.is_success:
        return CheckResult(sr.is_success, sr.reason)

    out = _run_redis_cli(sentinel_host, _port,
                         'SENTINEL', 'get-master-addr-by-name', master_name)
    # The reply is expected to be two lines: host, then port.
    fields = [line.strip() for line in out.strip().splitlines()]
    if not fields:
        return CheckResult(
            False,
            'get master addr from sentinel fail,message its empty, it must be cant find the master name')
    if len(fields) < 2 or not fields[0] or not fields[1].isdigit():
        # Anything other than "host / numeric port" cannot be checked.
        return CheckResult(
            False,
            'get master addr from sentinel fail, unexpected reply: %r' % out)

    master_host, master_port = fields[0], fields[1]
    sr = socket_check(master_host, int(master_port))
    if not sr.is_success:
        return CheckResult(False, know_error_msg % (master_host, sr.reason))

    info = _run_redis_cli(master_host, master_port, 'info')
    # Second ':'-separated field of every line starting with "role:".
    roles = [line.split(':')[1].strip()
             for line in info.splitlines() if line.startswith('role:')]
    if roles == ['master']:
        return CheckResult(True, '')
    return CheckResult(
        False, know_error_msg % (master_host, ' it slave of someone'))
# Command-line entry point:
#   health_check.py socket   <host> [port]
#   health_check.py http     <host> [port]
#   health_check.py sentinel <host> <port> <master_name>
# The port defaults to 80 when only the mode and host are given. With fewer
# than two arguments, or an unknown mode, nothing is printed.
if __name__ == '__main__':
    if len(argv) >= 3:
        result = None
        mode, target = argv[1], argv[2]
        # `==`, not `is`: integer identity is an implementation detail.
        port = 80 if len(argv) == 3 else int(argv[3])
        if mode == "socket":
            result = socket_check(target, port)
        elif mode == "http":
            result = http_check(target, port)
        elif mode == "sentinel":
            if len(argv) >= 5:
                result = check_sentinel_redis_master_is_correct(
                    target, port, argv[4])
            else:
                # Without this guard argv[4] raises IndexError.
                print("sentinel check requires: sentinel <host> <port> <master_name>")
        if result is not None:
            print("%s check \"%s\", Result is %s, Reason: %s" % (mode, target, result.is_success, result.reason))
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/kennylee/python-tools.git
git@gitee.com:kennylee/python-tools.git
kennylee
python-tools
python-tools
master

搜索帮助