互聯網故障一般表現為丟包和時延增大。持續性故障不難排查,難的是間歇性或凌晨發生的故障:後者往往來不及等我們測試就已經恢復正常,拿不到異常發生時的 mtr 結果,也就無法判斷故障點在哪裡。
因此有了根據丟包率和時延變化自動聯動 mtr 的需求。
前段時間使用Mysql實作了這個功能,缺點是占用太多系統資源,且腳本繁重,優點是資料可復用,做多種形式的展示
後續改用 socket+deque 實作,資源占用低且輕量,也可以通過開放互聯網 API 來做分布式監控;缺點是歷史資料不留存,用完即丟。
系統環境
Ubuntu 18.04.5 LTS+Python 3.6.9
python庫
自帶基本庫,考慮到系統權限問題沒有使用第三方庫
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""pingd: lightweight latency / packet-loss monitor that triggers mtr.

The script plays two roles:

* ``-S server`` starts a local daemon (127.0.0.1:6589) that keeps probing
  every registered target with ICMP, TCP or UDP and stores the aggregated
  results in bounded deques.
* Any other invocation is a client: it registers / polls one target and
  prints the newest ``restime`` (average RTT in ms) or ``pkloss`` (loss in
  percent).  When the value jumps past a threshold the daemon launches
  ``mtr.sh`` so the route is captured while the fault is still present.

Only the standard library is used, so no elevated privileges or third-party
packages are required.
"""
import argparse
import itertools
import json
import logging
import os
import queue
import random
import re
import socket
import string
import subprocess
import sys
import threading
import time
from collections import deque
from functools import reduce

# Targets currently being probed; each entry is [sip, tip, type, port].
ipqli = []
filename = os.path.realpath(sys.argv[0])

SERVER_ADDRESS = ('127.0.0.1', 6589)

# Aggregated results per target: tuple(ip) -> (restime deque, pkloss deque).
# Keyed by the full tuple because dot-stripped names are ambiguous
# ("1.11.1.1" and "11.1.1.1" both collapse to "11111").
_DEQUES = {}
_LOGGER_LOCK = threading.Lock()
# Pause used while waiting for data, instead of spinning a CPU core.
_POLL_INTERVAL = 0.05

USAGE = '''---------------------------Options-----------------------------------
-s --source ip address
-t --destination ip address
-I --item(restime/pkloss)
-T --type(icmp/tcp/udp default icmp)
-p --port(default 0)
---------------------------Example-----------------------------------
------pingd -s 10.0.3.108 -t 10.0.0.1 -I restime -T tcp -p 80-------
'''


def logger():
    """Return the root logger, writing to ``<script dir>/log``.

    The file handler is attached only once; the original attached a new
    handler on every call, which duplicated each log line and leaked file
    descriptors (one per ``Ping``/``Server`` instance).
    """
    log_name = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), 'log')
    root = logging.getLogger()
    with _LOGGER_LOCK:  # Ping objects are created from worker threads
        attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == os.path.abspath(log_name)
            for handler in root.handlers
        )
        if not attached:
            handler = logging.FileHandler(log_name)
            handler.setFormatter(logging.Formatter(
                "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
            root.addHandler(handler)
        root.setLevel(logging.DEBUG)
    return root


def _deque_names(ip):
    """Return the (restime, pkloss) deque names used in log messages."""
    suffix = ''.join('' if part is None else str(part) for part in ip).replace('.', '')
    return 'restime_deque' + suffix, 'pkloss_deque' + suffix


# Probe implementation; ping3 is avoided because raw sockets need privileges.
class Ping:
    """Probe one target and aggregate round-trip time and packet loss.

    Args:
        ip: ``(sip, tip, type, port)``; ``sip`` may be ``None`` to let the
            kernel choose the source address, ``type`` is icmp/tcp/udp.
        flag: number of registered targets when this prober was created; a
            slow probing cycle is abandoned as soon as that number changes.
        inver: seconds between two consecutive probes.
        count: probes per aggregation cycle.
        udp_length: payload size in bytes for UDP probes.

    Raises:
        KeyError: if the target has not been registered by the server.
    """

    def __init__(self, ip, flag, inver=1, count=20, udp_length=64):
        ip = tuple(ip)
        self.sip, self.tip, self.type, self.port = ip
        self.type = self.type.lower()
        self.port = int(self.port)
        self.inver = inver
        self.count = count
        self.flag = flag
        self.udp_length = udp_length
        self.log = logger()
        # Raw per-probe samples of the current cycle.
        self.restime_deque = deque(maxlen=60)
        self.pkloss_deque = deque(maxlen=60)
        # Aggregated results shared with the server.
        self.ret_restime_deque, self.ret_pkloss_deque = _DEQUES[ip]
        # Matches "time=12 ms" as well as "time=12.3 ms".
        self.compile = r'(?<=time=)\d+(?:\.\d+)?(?= ms)'

    def _record(self, rtt_ms):
        """Store one probe outcome (``None`` = lost); return 1 if answered."""
        if rtt_ms is None:
            self.restime_deque.append(0)
            self.pkloss_deque.append(1)
            return 0
        self.restime_deque.append(rtt_ms)
        self.pkloss_deque.append(0)
        return 1

    def _pause_after(self, start_time):
        """Return how long to sleep so probes stay ``inver`` seconds apart."""
        # A probe that already overran the interval must not wait again.
        return max(0.0, self.inver - (time.time() - start_time))

    def _tcp(self):
        """Time one TCP connect; return ``(sleep_time, answered)``."""
        rtt = None
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            start_time = time.time()
            try:
                if self.sip:
                    s.bind((self.sip, 0))
                s.connect((self.tip, self.port))
                s.shutdown(socket.SHUT_RD)
                rtt = (time.time() - start_time) * 1000
            except socket.timeout:
                pass
            except OSError as e:
                # Counted as a lost probe so sample accounting stays consistent.
                self.log.debug(e)
        return self._pause_after(start_time), self._record(rtt)

    def _udp(self):
        """Time one UDP echo round trip; return ``(sleep_time, answered)``."""
        rtt = None
        data = ''.join(random.choice(string.ascii_letters + string.digits)
                       for _ in range(self.udp_length))
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(1)
            start_time = time.time()
            try:
                if self.sip:
                    s.bind((self.sip, 0))
                s.sendto(data.encode('utf-8'), (self.tip, self.port))
                s.recv(1024)
                rtt = (time.time() - start_time) * 1000
            except socket.timeout:
                pass
            except OSError as e:
                self.log.debug(e)
        return self._pause_after(start_time), self._record(rtt)

    def _icmp(self):
        """Run the system ``ping`` once; return ``(sleep_time, answered)``."""
        start_time = time.time()
        cmd = ['ping', '-i', str(self.inver), '-c', '1', '-W', '1']
        if self.sip:
            cmd += ['-I', self.sip]
        cmd.append(self.tip)
        try:
            # Argument list, no shell: addresses can never be interpreted.
            output = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
            ).communicate()[0].decode('utf8', 'replace')
        except OSError as e:
            self.log.debug(e)
            output = ''
        found = re.search(self.compile, output)
        rtt = float(found.group()) if found else None
        return self._pause_after(start_time), self._record(rtt)

    def fastping(self):
        """Send a single probe of the configured type."""
        getattr(self, '_' + self.type)()

    def slow_ping(self):
        """Send up to ``count`` paced probes.

        Returns:
            ``(probes_sent, probes_answered)``.  The cycle stops early when
            the number of registered targets changed or the shared result
            deques were drained.
        """
        index = 0
        res_count = 0
        while index < self.count:
            sleep_time, count = getattr(self, '_' + self.type)()
            index += 1
            res_count += count
            if (self.flag != len(ipqli)
                    or len(self.ret_restime_deque) < 2
                    or len(self.ret_pkloss_deque) < 2):
                break
            time.sleep(sleep_time)
        return index, res_count

    def ping_value(self):
        """Run one probing cycle and aggregate it.

        While fewer than two aggregated results exist the probes are fired
        in parallel so a client gets an answer quickly; afterwards they are
        paced one per interval.

        Returns:
            ``(avg_rtt_ms, loss_percent, elapsed_seconds)``; ``(0, 0, 0)``
            when no sample could be collected.
        """
        start_time = time.time()
        if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
            workers = [threading.Thread(target=self.fastping)
                       for _ in range(self.count)]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
            count = self.count
        else:
            count, _ = self.slow_ping()
        use_time = round(time.time() - start_time, 4)

        # Never pop more than was recorded (a crashed probe thread records
        # nothing); the original raised IndexError here.
        count = min(count, len(self.restime_deque), len(self.pkloss_deque))
        if not count:
            self.log.debug('no probe samples collected for %s', self.tip)
            return 0, 0, 0
        times = [float(self.restime_deque.pop()) for _ in range(count)]
        losses = [int(self.pkloss_deque.pop()) for _ in range(count)]
        # Average over answered probes only; lost probes carry rtt 0 and
        # would otherwise drag the mean down.
        answered = count - sum(losses)
        restime = sum(times) / answered if answered else 0
        pkloss = sum(losses) / count * 100
        return round(restime, 2), round(pkloss, 2), use_time


# Server side.
class Server():
    """Local daemon that probes registered targets and answers clients."""

    def __init__(self, sock):
        global ipqli
        self.ipqli = ipqli
        self.thli = []
        self.sock = sock
        self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.log = logger()

    @classmethod
    def start(cls):
        """Bind the control socket and run the accept and probe loops."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allow an immediate restart while old connections sit in TIME_WAIT.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(SERVER_ADDRESS)
        s.listen(100)
        obj = cls(s)
        for target in (obj.server, obj.create):
            worker = threading.Thread(target=target)
            worker.start()
            obj.thli.append(worker)
        for worker in obj.thli:
            worker.join()

    @staticmethod
    def _valid_request(ip, item):
        """Return True if a decoded client request is well formed."""
        try:
            sip, tip, ping_type, port = ip
        except (TypeError, ValueError):
            return False
        if not isinstance(ping_type, str):
            return False
        return Ipcheck(sip, tip, item, ping_type, port).check()

    def server(self):
        """Accept clients forever; one JSON request ``[ip, item]`` each."""
        while True:
            conn, _addr = self.sock.accept()
            try:
                ip, item = json.loads(conn.recv(1024).decode('utf-8'))
                if not self._valid_request(ip, item):
                    raise ValueError('invalid request: %r' % ([ip, item],))
                if ip not in self.ipqli:
                    # Create the deques before publishing the target so the
                    # probe thread always finds them.
                    _DEQUES[tuple(ip)] = (deque(maxlen=30), deque(maxlen=30))
                    self.ipqli.append(ip)
                    self.log.debug('create ipdeque %s %s' % _deque_names(ip))
                self.sendvalue(conn, ip, item)
            except Exception as e:
                # Boundary handler: one bad request must not stop the daemon.
                self.log.debug(e)
            finally:
                conn.close()

    def create(self):
        """Keep one probing thread alive per registered target."""
        while True:
            targets = list(self.ipqli)  # snapshot; other threads mutate it
            if not targets:
                time.sleep(0.5)  # idle instead of spinning at 100% CPU
                continue
            leng = len(targets)
            workers = [threading.Thread(target=self.makevalue, args=(ip, leng))
                       for ip in targets]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()

    def makevalue(self, ip, leng):
        """Probe ``ip`` until the target set changes or nobody polls it.

        Results accumulate in the target's deques.  Once both hold 30
        unread entries the target counts as abandoned and is unregistered.
        """
        restime_name, pkloss_name = _deque_names(ip)
        restime_ipq, pkloss_ipq = _DEQUES[tuple(ip)]
        obj = Ping(ip, leng)
        while leng == len(self.ipqli) and (len(restime_ipq) < 30 or len(pkloss_ipq) < 30):
            restime, pkloss, use_time = obj.ping_value()
            restime_ipq.append((restime, use_time))
            pkloss_ipq.append((pkloss, use_time))
        if leng == len(self.ipqli):
            # The deques stay in the registry (a request that is being
            # served may still hold them); re-registration replaces them.
            try:
                self.ipqli.remove(ip)
            except ValueError:
                pass
            self.log.debug('delete ipdeque %s %s' % (restime_name, pkloss_name))

    def _start_mtr(self, tip):
        """Launch ``mtr.sh`` for ``tip``, appending to today's log file."""
        log_dir = os.path.join(self.basedir, 'mtr_log')
        log_file = os.path.join(
            log_dir, tip + '-' + time.strftime('%Y-%m-%d', time.localtime()) + '.log')
        try:
            os.makedirs(log_dir, exist_ok=True)
            # Argument list, no shell: tip arrives over the control socket.
            subprocess.Popen([os.path.join(self.basedir, 'mtr.sh'), tip, log_file],
                             stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except OSError as e:
            self.log.debug(e)

    def sendvalue(self, conn, ip, item):
        """Send the newest value of ``item`` for ``ip`` to the client.

        mtr is started when the average RTT rose by more than 20 ms
        compared with the previous cycle, or when loss is between 20% and
        100% (exclusive).

        Raises:
            ValueError: if ``item`` is neither 'restime' nor 'pkloss'.
        """
        _, tip, _, _ = ip
        restime_ipq, pkloss_ipq = _DEQUES[tuple(ip)]
        if item == 'restime':
            while len(restime_ipq) < 2:
                time.sleep(_POLL_INTERVAL)
            ret, _ = restime_ipq.pop()
            hisret, _ = restime_ipq[-1]
            trigger = ret - hisret > 20
        elif item == 'pkloss':
            while len(pkloss_ipq) < 2:
                time.sleep(_POLL_INTERVAL)
            ret, _ = pkloss_ipq.pop()
            trigger = 100 > ret > 20
        else:
            raise ValueError('unknown item %r' % (item,))
        if trigger:
            self._start_mtr(tip)
        conn.sendall(str(ret).encode())


# Validation of user-supplied arguments.
class Ipcheck():
    """Validate the item, probe type, addresses and port of a request."""

    # ASCII digits only, anchored with fullmatch: "10.0.0.1\n" is rejected.
    _IPV4 = re.compile(r'\d{1,3}(?:\.\d{1,3}){3}', re.ASCII)

    def __init__(self, sip, tip, item, ping_type, port='0'):
        self.sip = sip
        self.tip = tip
        self.item = item
        self.type = str(ping_type).lower()
        self.port = port

    def check(self):
        """Return True only if every field is acceptable."""
        if self.item not in ['restime', 'pkloss']:
            return False
        if self.type not in ['icmp', 'tcp', 'udp']:
            return False
        return self.checkipformat() and self.checkport()

    def check_fun(self, ip):
        """Return True if one dotted-quad component is below 256."""
        return int(ip) < 256

    def checkport(self):
        """Return True if the port is an integer in 0..65535."""
        try:
            return 0 <= int(self.port) <= 65535
        except (TypeError, ValueError):
            return False

    def checkipformat(self):
        """Return True if tip (and sip, when given) are dotted-quad IPv4."""
        addresses = [self.tip]
        if self.sip:  # the source address is optional
            addresses.append(self.sip)
        octets = []
        for address in addresses:
            if not isinstance(address, str) or not self._IPV4.fullmatch(address):
                return False
            octets.extend(address.split('.'))
        return self.checkiplength(octets)

    def checkiplength(self, check_ipli):
        """Return True if no component in ``check_ipli`` exceeds 255."""
        return not list(itertools.filterfalse(self.check_fun, check_ipli))


def run():
    """Spawn the daemon (``<this script> -S server``) in the background."""
    subprocess.Popen([sys.executable or 'python3', filename, '-S', 'server'],
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


# Client side: ask the server for a value and print it for the user.
def socket_client(ip, item):
    """Request ``item`` for target ``ip`` from the daemon and print it.

    Prints 'server will start' and exits when the daemon is unreachable,
    which is expected right after it has been spawned.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect(SERVER_ADDRESS)
            s.sendall(json.dumps([ip, item]).encode())
            ret = s.recv(1024)
        print(ret.decode())
    except OSError:
        print('server will start')
        sys.exit(0)


def main():
    """Parse the command line and act as daemon or client."""
    parser = argparse.ArgumentParser(description='icmp for monitor')
    parser.add_argument('-S', action='store', dest='server')
    parser.add_argument('-t', action='store', dest='tip')
    parser.add_argument('-s', action='store', dest='sip')
    parser.add_argument('-I', action='store', dest='item')
    parser.add_argument('-T', action='store', dest='ping_type', default='icmp')
    parser.add_argument('-p', action='store', dest='port', default='0')
    args = parser.parse_args()

    # A pipeline needs the shell; filename is this script's own path.
    server_status_cmd = "ps -ef | grep '%s -S server' | grep -v grep | cut -c 9-16" % filename
    server_status = subprocess.Popen(
        server_status_cmd, shell=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
    if not server_status:
        run()
    if args.server:
        Server.start()
        sys.exit(0)

    ip = (args.sip, args.tip, args.ping_type, args.port)
    if not Ipcheck(args.sip, args.tip, args.item, args.ping_type, args.port).check():
        print(USAGE)
        sys.exit(0)
    socket_client(ip, args.item)


if __name__ == '__main__':
    main()
mtr.sh
#!/usr/bin/env bash
# Append one mtr report for a target to a log file.
#   $1  target IP address
#   $2  path of the log file to append to
IP=$1
dir=$2
# The log directory may not exist on the first run.
mkdir -p "$(dirname "$dir")"
# -r report mode, -n no DNS lookups, -c 30 cycles, -w wide output, -b show IPs
mtr -r -n -c 30 -w -b "$IP" >> "$dir"
udp探測需要服務器端開啟對應埠
#!/usr/bin/env python3
"""Minimal UDP echo server used as the remote end of pingd's UDP probes.

Usage: udp_echo.py <ipaddress> <port>
"""
import socket
import sys


def serve(host, port):
    """Echo every datagram received on ``(host, port)`` back to its sender."""
    # Bind once; the original created and bound a fresh socket for every
    # datagram and relied on garbage collection to free the old one.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((host, port))
        while True:
            data, addr = sock.recvfrom(65535)
            sock.sendto(data, addr)


if __name__ == '__main__':
    if len(sys.argv) != 3:
        sys.exit('usage: %s <ipaddress> <port>' % sys.argv[0])
    serve(sys.argv[1], int(sys.argv[2]))
也可以使用 socat,但實際測試發現 socat 會引入額外開銷,導致時延不準確。
socat -v UDP-LISTEN:4000,fork PIPE
效果

Mysql 版
https://www.cnblogs.com/darkchen/p/14744242.html
以驅魔為理想,為生計而奔波。轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/394924.html
標籤:Python
上一篇:Redis常用資料結構及應用場景
