亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Python如何實(shí)現(xiàn)Paramiko的二次封裝

瀏覽:15日期:2022-06-28 17:04:09

Paramiko是一個(gè)用于執(zhí)行SSH命令的Python第三方庫(kù),使用該庫(kù)可實(shí)現(xiàn)自動(dòng)化運(yùn)維的所有任務(wù),如下是一些常用代碼的封裝方式,多數(shù)代碼為半成品,只是敲代碼時(shí)的備份副本防止丟失,僅供參考。

目前本人巡檢百臺(tái)設(shè)備完全無(wú)壓力,如果要巡檢過(guò)千臺(tái)則需要多線程的支持,過(guò)萬(wàn)臺(tái)則需要加入智能判斷等。

實(shí)現(xiàn)命令執(zhí)行: 直接使用過(guò)程化封裝,執(zhí)行CMD命令.

import paramikossh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())def BatchCMD(address,username,password,port,command): try: ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2) stdin , stdout , stderr = ssh.exec_command(command) result = stdout.read() if len(result) != 0: result = str(result).replace('n', 'n') result = result.replace('b’', '').replace('’', '') return result else: return None except Exception: return None

實(shí)現(xiàn)磁盤巡檢: 獲取磁盤空間并返回字典格式

def GetAllDiskSpace(address,username,password,port): ref_dict = {} cmd_dict = {'Linuxn' : 'df | grep -v ’Filesystem’ | awk ’{print $5 ':' $6}’','AIXn' : 'df | grep -v ’Filesystem’ | awk ’{print $4 ':' $7}’'} # 首先檢測(cè)系統(tǒng)版本 os_version = BatchCMD(address,username,password,port,'uname') for version,run_cmd in cmd_dict.items(): if(version == os_version): # 根據(jù)不同版本選擇不同的命令 os_ref = BatchCMD(address,username,password,port,run_cmd) ref_list= os_ref.split('n') # 循環(huán)將其轉(zhuǎn)換為字典 for each in ref_list:# 判斷最后是否為空,過(guò)濾最后一項(xiàng)if each != '': ref_dict[str(each.split(':')[1])] = str(each.split(':')[0]) return ref_dict# 磁盤巡檢總函數(shù)def DiskMain(): with open('db.json', 'r', encoding='utf-8') as read_fp: load_json = read_fp.read() js = json.loads(load_json) base = js.get('base') count = len(base) for each in range(0,count): print('033[37m-033[0m' * 80) print('033[35m 檢測(cè)地址: {0:10} t 用戶名: {1:10} t 密碼: {2:10} t 端口: {3:4}033[0m'.format(base[each][1],base[each][2],base[each][3],base[each][4])) print('033[37m-033[0m' * 80) ref = GetAllDiskSpace(base[each][1],base[each][2],base[each][3],base[each][4]) for k,v in ref.items():# 判斷是否存在空盤if( v.split('%')[0] != '-'): # 將占用百分比轉(zhuǎn)換為整數(shù) space_ret = int(v.split('%')[0]) if space_ret >= 70: print('033[31m 磁盤分區(qū): {0:30} t 磁盤占用: {1:5} 033[0m'.format(k,v)) continue if space_ret >= 50: print('033[33m 磁盤分區(qū): {0:30} t 磁盤占用: {1:5} 033[0m'.format(k, v)) continue else: print('033[34m 磁盤分區(qū): {0:30} t 磁盤占用: {1:5} 033[0m'.format(k, v)) continue print()# 組內(nèi)傳遞用戶名密碼時(shí)調(diào)用此方法def GroupDiskMain(address,username,password,port): ref = GetAllDiskSpace(address,username,password,port) for k, v in ref.items(): if (v.split('%')[0] != '-'): space_ret = int(v.split('%')[0]) if space_ret >= 70:print('磁盤分區(qū): {0:30} t 磁盤占用: {1:5} -> [警告]'.format(k, v))continue if space_ret >= 50:print('磁盤分區(qū): {0:30} t 磁盤占用: {1:5} -> [警惕]'.format(k, v))continue else:print('磁盤分區(qū): {0:30} t 磁盤占用: {1:5} -> [正常]'.format(k, v))continue print()

獲取系統(tǒng)內(nèi)存利用率: 獲取系統(tǒng)內(nèi)存利用率

def GetAllMemSpace(address,username,password,port): cmd_dict = {'Linuxn' : 'cat /proc/meminfo | head -n 2 | awk ’{print $2}’ | xargs | awk ’{print $1 ':' $2}’','AIXn' : 'df | grep -v ’Filesystem’ | awk ’{print $4 ':' $7}’'} # 首先檢測(cè)系統(tǒng)版本 os_version = BatchCMD(address,username,password,port,'uname') for version,run_cmd in cmd_dict.items(): if(version == os_version): # 根據(jù)不同版本選擇不同的命令 os_ref = BatchCMD(address,username,password,port,run_cmd) # 首先現(xiàn)將KB轉(zhuǎn)化為MB mem_total = math.ceil( int(os_ref.split(':')[0].replace('n','')) / 1024) mem_free = math.ceil(int(os_ref.split(':')[1].replace('n','')) / 1024) mem_used = str( int(mem_total) - int(mem_free)) # 計(jì)算占用空間百分比 percentage = 100 - int(mem_free / int(mem_total / 100)) print('內(nèi)存總計(jì)空間: {}'.format(str(mem_total) + ' MB')) print('內(nèi)存剩余空間: {}'.format(str(mem_free) + ' MB')) print('內(nèi)存已用空間: {}'.format(str(mem_used) + ' MB')) print('計(jì)算百分比: {}'.format(str(percentage) + ' %'))

獲取系統(tǒng)進(jìn)程信息: 獲取系統(tǒng)進(jìn)程信息,并返回字典格式

def GetAllProcessSpace(address,username,password,port): ref_dict = {} cmd_dict = {'Linuxn' : 'ps aux | grep -v ’USER’ | awk ’{print $2 ':' $11}’ | uniq','AIXn' : 'ps aux | grep -v ’USER’ | awk ’{print $2 ':' $12}’ | uniq'} os_version = BatchCMD(address,username,password,port,'uname') for version,run_cmd in cmd_dict.items(): if(version == os_version): os_ref = BatchCMD(address, username, password, port, run_cmd) ref_list = os_ref.split('n') for each in ref_list:if each != '': ref_dict[str(each.split(':')[0])] = str(each.split(':')[1]) return ref_dict# 巡檢進(jìn)程是否存在def ProcessMain(): with open('db.json', 'r', encoding='utf-8') as read_fp: load_json = read_fp.read() js = json.loads(load_json) process = js.get('process') process_count = len(process) for x in range(0,process_count): # 根據(jù)process中的值查詢base中的賬號(hào)密碼 base = js.get('base') if( list(process[x].keys())[0] == base[x][0] ):# 拿到賬號(hào)密碼之后再提取出他們的進(jìn)程ID于進(jìn)程名print('033[37m-033[0m' * 80)print('033[35m 檢測(cè)地址: {0:10} t 用戶名: {1:10} t 密碼: {2:10} t 端口: {3:4}033[0m'. format(base[x][1], base[x][2], base[x][3], base[x][4]))print('033[37m-033[0m' * 80)ref_dic = GetAllProcessSpace(base[x][1],base[x][2],base[x][3],base[x][4])# ref_val = 全部進(jìn)程列表 proc_val = 需要檢測(cè)的進(jìn)程列表ref_val = list(ref_dic.values())proc_val = list(process[x].values())[0]# 循環(huán)比較是否在列表中for each in proc_val: flag = each in ref_val if(flag == True): print('033[34m 進(jìn)程: {0:50} 狀態(tài): √ 033[0m'.format(each)) else: print('033[31m 進(jìn)程: {0:50} 狀態(tài): × 033[0m'.format(each))

實(shí)現(xiàn)劇本運(yùn)行功能: 針對(duì)特定一臺(tái)主機(jī)運(yùn)行劇本功能,隨便寫的一個(gè)版本,僅供參考

def RunRule(address,username,password,port,playbook): os_version = BatchCMD(address,username,password,port,'uname') if(os_version == list(playbook.keys())[0]): play = list(playbook.values())[0] print() print('033[37m-033[0m' * 130) print('033[35m 系統(tǒng)類型: {0:4} t 地址: {1:10} t 用戶名: {2:10} t 密碼: {3:15} t 端口: {4:4}033[0m' .format(os_version.replace('n',''),address,username,password,port)) print('033[37m-033[0m' * 130) for each in range(0,len(play)): RunCmd = play[each] + ' > /dev/null 2>&1 && echo $?' print('033[30m [>] 派發(fā)命令: {0:100} t 狀態(tài): {1:5} 033[0m'.format(RunCmd.replace(' > /dev/null 2>&1 && echo $?', ''),'正在派發(fā)')) os_ref = BatchCMD(address, username, password, port, RunCmd) if(os_ref == '0n'):print('033[34m [√] 運(yùn)行命令: {0:100} t 狀態(tài): {1:5} 033[0m'.format( RunCmd.replace(' > /dev/null 2>&1 && echo $?',''),'派發(fā)完成')) else:print('033[31m [×] 運(yùn)行命令: {0:100} t 狀態(tài): {1:5} 033[0m'.format( RunCmd.replace(' > /dev/null 2>&1 && echo $?',''),'派發(fā)失敗'))# 既然失敗了,就把剩下的也打出來(lái)吧,按照失敗處理for x in range(each+1,len(play)): print('033[31m [×] 運(yùn)行命令: {0:100} t 狀態(tài): {1:5} 033[0m'.format( play[x].replace(' > /dev/null 2>&1 && echo $?', ''), '終止執(zhí)行'))break else: return 0# 批量: 傳入主機(jī)組不同主機(jī)執(zhí)行不同劇本def RunPlayBook(HostList,PlayBook): count = len(HostList) error = [] success = [] for each in range(0,count): ref = RunRule(HostList[each][0],HostList[each][1],HostList[each][2],HostList[each][3],PlayBook) if ref == 0: error.append(HostList[each][0]) else: success.append(HostList[each][0]) print('nn') print('-' * 130) print('執(zhí)行清單') print('-' * 130) for each in success: print('成功主機(jī): {}'.format(each)) for each in error: print('失敗主機(jī): {}'.format(each))# 運(yùn)行測(cè)試def PlayBookRun(): playbook = { 'Linuxn':[ 'ifconfig', 'vmstat', 'ls', 'netstat -an', 'ifconfis', 'cat /etc/passwd | grep ’root’ | awk ’{print $2}’'] } addr_list = [ ['192.168.1.127', 'root', '1233', '22'], ['192.168.1.126', 'root', '1203', '22'] ] # 指定addr_list這幾臺(tái)機(jī)器執(zhí)行playbook劇本 RunPlayBook(addr_list,playbook)

過(guò)程化實(shí)現(xiàn)文件上傳下載: 文件傳輸功能 PUT上傳 GET下載

def BatchSFTP(address,username,password,port,soruce,target,flag): transport = paramiko.Transport((address, int(port))) transport.connect(username=username, password=password) sftp = paramiko.SFTPClient.from_transport(transport) if flag == 'PUT': try: ret = sftp.put(soruce, target) if ret !='':transport.close()return 1 else:transport.close()return 0 transport.close() except Exception: transport.close() return 0 elif flag == 'GET': try: target = str(address + '_' + target) os.chdir('./recv_file') ret = sftp.get(soruce, target) if ret != '':transport.close()return 1 else:transport.close()return 0 transport.close() except Exception: transport.close() return 0# 批量將本地文件 source 上傳到目標(biāo) target 中def PutRemoteFile(source,target): with open('db.json', 'r', encoding='utf-8') as fp: load_json = fp.read() js = json.loads(load_json) base = js.get('base') print('-' * 130) print('接收主機(jī) tt 登錄用戶 t 登錄密碼 t 登錄端口 t 本地文件 tt 傳輸?shù)?ttt 傳輸狀態(tài)') print('-' * 130) for each in range(0,len(base)): # 先判斷主機(jī)是否可通信 ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4],'uname') if ref == None:print('033[31m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 未連通033[0m'.format( base[each][1],base[each][2],base[each][3],base[each][4],source,target))continue ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,'PUT') if(ref == 1):print('033[34m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 傳輸成功033[0m'.format( base[each][1],base[each][2],base[each][3],base[each][4],source,target)) else:print('033[31m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 傳輸失敗033[0m'.format( base[each][1], base[each][2], base[each][3], base[each][4], source, target))# 批量將目標(biāo)文件拉取到本地特定目錄(存在缺陷)def GetRemoteFile(source,target): with open('db.json', 'r', encoding='utf-8') as fp: load_json = fp.read() js = json.loads(load_json) base = js.get('base') print('-' * 130) print('發(fā)送主機(jī) tt 登錄用戶 t 登錄密碼 t 登錄端口 tt 遠(yuǎn)程文件 tt 拉取到 ttt 傳輸狀態(tài)') print('-' * 130) for each in range(0,len(base)): ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4], 'uname') if ref == None:print('033[31m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 未連通033[0m'.format( base[each][1], base[each][2], base[each][3], base[each][4], source, target))continue ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,'GET') if(ref == 1):print('033[34m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 傳輸成功033[0m'.format( base[each][1],base[each][2],base[each][3],base[each][4],source,target)) else:print('033[31m{0:15} t {1:6} t {2:10} t {3:3} t {4:10} t {5:10} t 傳輸失敗033[0m'.format( base[each][1], base[each][2], base[each][3], base[each][4], source, target))

另一種命令執(zhí)行方法:

import paramikossh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())def BatchCMD(address,username,password,port,command): try: ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2) stdin , stdout , stderr = ssh.exec_command(command) result = stdout.read() if len(result) != 0: return result else: return -1 except Exception: return -1# 通過(guò)獲取主機(jī)Ping狀態(tài)def GetPing(): fp = open('unix_base.db', 'r', encoding='utf-8') count = len(open('unix_base.db', 'r', encoding='utf-8').readlines()) print('-' * 100) print('{0:20} t {1:10} t {2:13} t {3:5} t {4:9} t {5:40}'.format('IP地址','機(jī)器系統(tǒng)','設(shè)備SN','機(jī)房位置','存活狀態(tài)','主機(jī)作用')) print('-' * 100) for each in range(count): ref = eval(fp.readline()) ret = BatchCMD(ref[0],ref[5],ref[6],22,'pwd | echo $?') if(int(ret)==0): print('{0:20} t {1:10} t {2:11} t {3:5} t {4:9} t {5:40}'. format(ref[0],ref[1],ref[2],ref[3],'正常',ref[4])) else: print('{0:20} t {1:10} t {2:13} t {3:5} t {4:9} t {5:40}'. format(ref[0],ref[1],ref[2],ref[3],'異常',ref[4])) fp.close()# ps aux | grep 'usbCfgDev' | grep -v 'grep' | awk {’print $2’}def GetProcessStatus(): fp = open('unix_process.db', 'r', encoding='utf-8') count = len(open('unix_process.db', 'r', encoding='utf-8').readlines()) for each in range(count): proc = eval(fp.readline()) proc_len = len(proc) print('-' * 70) print('---> 巡檢地址: {0:10} t 登錄用戶: {1:7} t 登錄密碼: {2:10}'.format(proc[0],proc[1],proc[2])) print('-' * 70) for process in range(3, proc_len): command = 'ps aux | grep ’{}’ | grep -v ’grep’ | awk ’{}’ | head -1'.format(proc[process],'{print $2}') try:ref = BatchCMD(proc[0],proc[1],proc[2],22,command)if(int(ref)!=-1): print('進(jìn)程: {0:18} t PID: {1:10} t 狀態(tài): {2}'.format(proc[process], int(ref),'√'))else: print('進(jìn)程: {0:18} t PID:{1:10} t 狀態(tài): {2}'.format(proc[process], 0,'×')) except Exception:print('進(jìn)程: {0:18} t PID:{1:10} t 狀態(tài): {2}'.format(proc[process], 0,'×')) print() fp.close()def GetDiskStatus(): fp = open('unix_disk.db', 'r', encoding='utf-8') count = len(open('unix_disk.db', 'r', encoding='utf-8').readlines()) for each in range(count): proc = eval(fp.readline()) proc_len = len(proc) print('-' * 100) print('---> 巡檢地址: {0:10} t 登錄系統(tǒng): {1:7} t 登錄賬號(hào): {2:10} 登錄密碼: {3:10}'. format(proc[0],proc[1],proc[2],proc[3])) print('-' * 100) try: ref = BatchCMD(proc[0], proc[2], proc[3], 22, 'df | grep -v ’Filesystem’') st = str(ref).replace('n', 'n') print(st.replace('b’', '').replace('’', '')) except Exception: pass print() fp.close()# 運(yùn)行命令def RunCmd(command,system): fp = open('unix_disk.db', 'r', encoding='utf-8') count = len(open('unix_disk.db', 'r', encoding='utf-8').readlines()) for each in range(count): proc = eval(fp.readline()) proc_len = len(proc) if proc[1] == system: print('-' * 100) print('---> 巡檢地址: {0:10} t 登錄系統(tǒng): {1:7} t 登錄賬號(hào): {2:10} 登錄密碼: {3:10}'. format(proc[0],proc[1],proc[2],proc[3])) print('-' * 100) try:ref = BatchCMD(proc[0], proc[2], proc[3], 22, command)st = str(ref).replace('n', 'n')print(st.replace('b’', '').replace('’', '')) except Exception:pass fp.close()

面向?qū)ο蟮姆庋b方法: 使用面向?qū)ο蠓庋b,可極大的提高復(fù)用性。

import paramikoclass MySSH: def __init__(self,address,username,password,default_port = 22): self.address = address self.default_port = default_port self.username = username self.password = password self.obj = paramiko.SSHClient() self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.obj.connect(self.address,self.default_port,self.username,self.password) self.objsftp = self.obj.open_sftp() def BatchCMD(self,command): stdin , stdout , stderr = self.obj.exec_command(command) result = stdout.read() if len(result) != 0: result = str(result).replace('n', 'n') result = result.replace('b’', '').replace('’', '') return result else: return None def GetRemoteFile(self,remotepath,localpath): self.objsftp.get(remotepath,localpath) def PutLocalFile(self,localpath,remotepath): self.objsftp.put(localpath,remotepath) def GetFileSize(self,file_path): ref = self.BatchCMD('du -s ' + file_path + ' | awk ’{print $1}’') return ref def CloseSSH(self): self.objsftp.close() self.obj.close()if __name__ == ’__main__’: ssh = MySSH(’192.168.191.3’,’root’,’1233’,22) ref = ssh.BatchCMD('ifconfig') print(ref) sz = ssh.GetFileSize('/etc/passwd') print(sz) ssh.CloseSSH()第二次封裝完善。import paramiko,os,json,reclass MySSH: def __init__(self,address,username,password,default_port = 22): self.address = address self.default_port = default_port self.username = username self.password = password try: self.obj = paramiko.SSHClient() self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.obj.connect(self.address,self.default_port,self.username,self.password,timeout=3,allow_agent=False,look_for_keys=False) self.objsftp = self.obj.open_sftp() except Exception: pass def BatchCMD(self,command): try: stdin , stdout , stderr = self.obj.exec_command(command,timeout=3) result = stdout.read() if len(result) != 0:result = str(result).replace('n', 'n')result = result.replace('b’', '').replace('’', '')return result else:return None except Exception: return None def GetRemoteFile(self,remote_path,local_path): try: self.objsftp.get(remote_path,local_path) return True except Exception: return False def PutLocalFile(self,localpath,remotepath): try: self.objsftp.put(localpath,remotepath) return True except Exception: return False def CloseSSH(self): self.objsftp.close() self.obj.close() # 獲取文件大小 def GetFileSize(self,file_path): ref = self.BatchCMD('du -s ' + file_path + ' | awk ’{print $1}’') return ref.replace('n','') # 判斷文件是否存在 def IsFile(self,file_path): return self.BatchCMD('[ -e {} ] && echo ’True’ || echo ’False’'.format(file_path))

通過(guò)Eval函數(shù)解析執(zhí)行: 自定義語(yǔ)法規(guī)則與函數(shù),通過(guò)Eval函數(shù)實(shí)現(xiàn)解析執(zhí)行. 沒(méi)寫完,僅供參考。

import json,os,sys,mathimport argparse,time,reimport paramikossh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())def BatchCMD(address,username,password,port,command): try: ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2) stdin , stdout , stderr = ssh.exec_command(command) result = stdout.read() if len(result) != 0: result = str(result).replace('n', 'n') result = result.replace('b’', '').replace('’', '') return result else: return None except Exception: return None# ------------------------------------------------------------------------# 內(nèi)置解析方法def GetDisk(x): return str(x)def GetCPULoad(): return str(10)# 句式解析器,解析句子并執(zhí)行def judge(string): # 如果匹配到IF則執(zhí)行判斷條件解析 if re.findall(r’IF{ (.*?) }’, string, re.M) != []: # 則繼續(xù)提取出他的表達(dá)式 ptr = re.compile(r’IF[{] (.*?) [}]’,re.S) subject = re.findall(ptr,string)[0] subject_list = subject.split(' ') # 拼接語(yǔ)句并執(zhí)行 sentence = eval(subject_list[0]) + subject_list[1] + subject_list[2] # 組合后執(zhí)行,返回結(jié)果 if eval(sentence): return 'IF' else: return False # 如果匹配到put則執(zhí)行上傳解析 elif re.findall(r’PUT{ (.*?) }’, string, re.M) != []: print('put') return False# 獲取特定目錄下所有的劇本def GetAllRule(): rootdir = os.getcwd() + 'rule' all_files = [f for f in os.listdir(rootdir)] print('-' * 90) print('{0:15} t {1:10} t {2:10} t {3:5} t {4:5}'.format('劇本名稱','應(yīng)用平臺(tái)','應(yīng)用端口','執(zhí)行主機(jī)數(shù)','命令條數(shù)')) print('-' * 90) for switch in all_files: # 首先判斷文件結(jié)尾是否為Json if( switch.endswith('.json') == True): all_switch_dir = rootdir + switch try:# 判斷文件內(nèi)部是否符合JSON規(guī)范with open(all_switch_dir , 'r' ,encoding='utf-8') as read_file: # 判斷是否存在指定字段來(lái)識(shí)別規(guī)范 load = json.loads(read_file.read()) if load.get('framework') != None and load.get('task_sequence') != None: run_addr_count = len(load.get('address_list')) command_count = len(load.get('task_sequence')) print('{0:15} t {1:10} {2:10} tt {3:5} tt {4:5}'. format(switch,load.get('framework'),load.get('default_port'),run_addr_count,command_count)) except ValueError:pass# 指定一個(gè)劇本并運(yùn)行def RunPlayBook(rule_name): rootdir = os.getcwd() + 'rule' all_files = [f for f in os.listdir(rootdir)] for switch in all_files: if( switch.endswith('.json') == True): all_switch_dir = rootdir + switch # 尋找到需要加載的劇本地址 if( switch == rule_name):with open(all_switch_dir , 'r' ,encoding='utf-8') as read_file: data = json.loads(read_file.read()) address_list = data.get('address_list') # 循環(huán)每個(gè)主機(jī)任務(wù) for each in address_list: # 得到劇本內(nèi)容 task_sequence = data.get('task_sequence') default_port = data.get('default_port') print('-' * 90) print('地址: {0:15} 用戶名: {1:10} 密碼: {2:10}'.format(each[0],each[1],each[2])) print('-' * 90) for task in task_sequence: flag = judge(task[0]) if flag == 'IF':ref = BatchCMD(each[0],each[1],each[2],default_port,task[1])print(ref) elif flag == False:ref = BatchCMD(each[0],each[1],each[2],default_port,task[0])print(ref)if __name__ == '__main__': RunPlayBook('get_log.json')

定義劇本規(guī)范如下。

{ 'framework': 'Centos', 'version': '7.0', 'address_list': [ ['192.168.191.3','root','1233'] ], 'default_port': '22', 'task_sequence': [ ['ifconfig'], ['IF{ GetLastCmdFlag() == True }','uname'] ]}

詞法分析: 詞法分析解析劇本內(nèi)容。

# 獲取特定目錄下所有的劇本def GetAllRule(): rootdir = os.getcwd() + 'rule' all_files = [f for f in os.listdir(rootdir)] print('-' * 90) print('{0:15} t {1:10} t {2:10} t {3:5} t {4:5}'.format('劇本名稱','應(yīng)用平臺(tái)','應(yīng)用端口','執(zhí)行主機(jī)數(shù)','命令條數(shù)')) print('-' * 90) for switch in all_files: # 首先判斷文件結(jié)尾是否為Json if( switch.endswith('.json') == True): all_switch_dir = rootdir + switch try:# 判斷文件內(nèi)部是否符合JSON規(guī)范with open(all_switch_dir , 'r' ,encoding='utf-8') as read_file: # 判斷是否存在指定字段來(lái)識(shí)別規(guī)范 load = json.loads(read_file.read()) if load.get('framework') != None and load.get('task_sequence') != None: run_addr_count = len(load.get('address_list')) command_count = len(load.get('task_sequence')) print('{0:15} t {1:10} {2:10} tt {3:5} tt {4:5}'. format(switch,load.get('framework'),load.get('default_port'),run_addr_count,command_count)) except ValueError:pass# 句式解析器,解析句子并執(zhí)行def judge(string): # 如果匹配到IF則執(zhí)行判斷條件解析 if re.findall(r’IF{ (.*?) }’, string, re.M) != []: # 則繼續(xù)提取出他的表達(dá)式 ptr = re.compile(r’IF[{] (.*?) [}]’,re.S) subject = re.findall(ptr,string)[0] subject_list = subject.split(' ') # 公開(kāi)接口,執(zhí)行命令 ssh = MySSH('192.168.191.3','root','1233','22') # 組合命令并執(zhí)行 sentence = str(eval(subject_list[0]) + subject_list[1] + subject_list[2]) if eval(sentence): return 'IF',ssh else: return False # 如果匹配到put則執(zhí)行上傳解析 elif re.findall(r’PUT{ (.*?) }’, string, re.M) != []: print('put') return False# 指定一個(gè)劇本并運(yùn)行def RunPlayBook(rule_name): rootdir = os.getcwd() + 'rule' all_files = [f for f in os.listdir(rootdir)] for switch in all_files: if( switch.endswith('.json') == True): all_switch_dir = rootdir + switch # 尋找到需要加載的劇本地址 if( switch == rule_name):with open(all_switch_dir , 'r' ,encoding='utf-8') as read_file: data = json.loads(read_file.read()) address_list = data.get('address_list') # 循環(huán)每個(gè)主機(jī)任務(wù) for each in address_list: # 得到劇本內(nèi)容 task_sequence = data.get('task_sequence') default_port = data.get('default_port') print('-' * 90) print('地址: {0:15} 用戶名: {1:10} 密碼: {2:10}'.format(each[0],each[1],each[2])) print('-' * 90) for task in task_sequence: flag,obj = judge(task[0]) if flag == 'IF':ret = obj.BatchCMD(task[1])print(ret)if __name__ == ’__main__’: ret = judge('IF{ ssh.GetFileSize(’/etc/passwd’) >= 4 }') print(ret)

MySSH類最終封裝: 通過(guò)面向?qū)ο髮?duì)其進(jìn)行封裝,實(shí)現(xiàn)了查詢CPU,負(fù)載,內(nèi)存利用率,磁盤容量,等通用數(shù)據(jù)的獲取。

import paramiko, math,jsonclass MySSH: def __init__(self, address, username, password, default_port): self.address = address self.default_port = default_port self.username = username self.password = password # 初始化,遠(yuǎn)程模塊 def Init(self): try: self.ssh_obj = paramiko.SSHClient() self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3, allow_agent=False, look_for_keys=False) self.sftp_obj = self.ssh_obj.open_sftp() except Exception: return False # 執(zhí)行非交互命令 def BatchCMD(self, command): try: stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3) result = stdout.read() if len(result) != 0:result = str(result).replace('n', 'n')result = result.replace('b’', '').replace('’', '')return result else:return None except Exception: return None # 將遠(yuǎn)程文件下載到本地 def GetRemoteFile(self, remote_path, local_path): try: self.sftp_obj.get(remote_path, local_path) return True except Exception: return False # 將本地文件上傳到遠(yuǎn)程 def PutLocalFile(self, localpath, remotepath): try: self.sftp_obj.put(localpath, remotepath) return True except Exception: return False # 關(guān)閉接口 def CloseSSH(self): try: self.sftp_obj.close() self.ssh_obj.close() except Exception: pass # 獲取文件大小 def GetFileSize(self, file_path): ref = self.BatchCMD('du -s ' + file_path + ' | awk ’{print $1}’') return ref.replace('n', '') # 判斷文件是否存在 def IsFile(self, file_path): return self.BatchCMD('[ -e {} ] && echo ’True’ || echo ’False’'.format(file_path)) # 獲取系統(tǒng)型號(hào) def GetSystemVersion(self): return self.BatchCMD('uname') # 檢測(cè)目標(biāo)主機(jī)存活狀態(tài) def GetPing(self): try: if self.GetSystemVersion() != None:return True else:return False except Exception: return False # 獲取文件列表,并得到大小 def GetFileList(self, path): try: ref_list = [] self.sftp_obj.chdir(path) file_list = self.sftp_obj.listdir('./') for sub_path in file_list:dict = {}file_size = self.GetFileSize(path + sub_path)dict[path + sub_path] = file_sizeref_list.append(dict) return ref_list except Exception: return False # 將遠(yuǎn)程文件全部打包后拉取到本地 def GetTarPackageAll(self, path): try: file_list = self.sftp_obj.listdir(path) self.sftp_obj.chdir(path) for packageName in file_list:self.ssh_obj.exec_command('tar -czf /tmp/{0}.tar.gz {0}'.format(packageName))self.sftp_obj.get('/tmp/{}.tar.gz'.format(packageName), './file/{}.tar.gz'.format(packageName))self.sftp_obj.remove('/tmp/{}.tar.gz'.format(packageName))return True except Exception: return True # 獲取磁盤空間并返回字典 def GetAllDiskSpace(self): ref_dict = {} cmd_dict = {'Linuxn': 'df | grep -v ’Filesystem’ | awk ’{print $5 ':' $6}’', 'AIXn': 'df | grep -v ’Filesystem’ | awk ’{print $4 ':' $7}’' } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items():if (version == os_version): # 根據(jù)不同版本選擇不同的命令 os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split('n') # 循環(huán)將其轉(zhuǎn)換為字典 for each in ref_list: # 判斷最后是否為空,過(guò)濾最后一項(xiàng) if each != '': ref_dict[str(each.split(':')[1])] = str(each.split(':')[0]) return ref_dict except Exception: return False # 獲取系統(tǒng)內(nèi)存利用率 def GetAllMemSpace(self): cmd_dict = {'Linuxn': 'cat /proc/meminfo | head -n 2 | awk ’{print $2}’ | xargs | awk ’{print $1 ':' $2}’', 'AIXn': 'svmon -G | grep -v ’virtual’ | head -n 1 | awk ’{print $2 ':' $4}’' } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items():if (version == os_version): # 根據(jù)不同版本選擇不同的命令 os_ref = self.BatchCMD(run_cmd) # 首先現(xiàn)將KB轉(zhuǎn)化為MB mem_total = math.ceil(int(os_ref.split(':')[0].replace('n', '')) / 1024) mem_free = math.ceil(int(os_ref.split(':')[1].replace('n', '')) / 1024) # 計(jì)算占用空間百分比 percentage = 100 - int(mem_free / int(mem_total / 100)) # 拼接字典數(shù)據(jù) return {'Total': str(mem_total), 'Free': str(mem_free), 'Percentage': str(percentage)} except Exception: return False # 獲取系統(tǒng)進(jìn)程信息,并返回字典格式 def GetAllProcessSpace(self): ref_dict = {} cmd_dict = {'Linuxn': 'ps aux | grep -v ’USER’ | awk ’{print $2 ':' $11}’ | uniq', 'AIXn': 'ps aux | grep -v ’USER’ | awk ’{print $2 ':' $12}’ | uniq' } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items():if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split('n') for each in ref_list: if each != '': ref_dict[str(each.split(':')[0])] = str(each.split(':')[1]) return ref_dict except Exception: return False # 獲取CPU利用率 def GetCPUPercentage(self): ref_dict = {} cmd_dict = {'Linuxn': 'vmstat | tail -n 1 | awk ’{print $13 ':' $14 ':' $15}’', 'AIXn': 'vmstat | tail -n 1 | awk ’{print $14 ':' $15 ':' $16}’' } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items():if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split('n') for each in ref_list: if each != '': each = each.split(':') ref_dict = {'us': each[0],'sys':each[1],'idea':each[2]} return ref_dict except Exception: return False # 獲取機(jī)器的負(fù)載情況 def GetLoadAVG(self): ref_dict = {} cmd_dict = {'Linuxn': 'uptime | awk ’{print $10 ':' $11 ':' $12}’', 'AIXn': 'uptime | awk ’{print $10 ':' $11 ':' $12}’' } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items():if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split('n') for each in ref_list: if each != '': each = each.replace(',','').split(':') ref_dict = {'1avg': each[0],'5avg': each[1],'15avg': each[2]} return ref_dict return False except Exception: return False # 修改當(dāng)前用戶密碼 def SetPasswd(self,username,password): try: os_id = self.BatchCMD('id | awk {’print $1’}') print(os_id) if(os_id == 'uid=0(root)n'):self.BatchCMD('echo ’{}’ | passwd --stdin ’{}’ > /dev/null'.format(password,username))return True except Exception: return False# 定義超類,集成基類MySSHclass SuperSSH(MySSH): def __init__(self,address, username, password, default_port): super(SuperSSH, self).__init__(address, username, password, default_port)

我們繼續(xù)為上面的代碼加上命令行,讓其可以直接使用,這里需要遵循一定的格式規(guī)范,我們使用JSON解析數(shù)據(jù),JSON格式如下.

{ 'aix': [ ['192.168.1.1','root','123123'], ['192.168.1.1','root','2019'], ], 'suse': [ ['192.168.1.1','root','123123'], ], 'centos': [ ['192.168.1.1','root','123123'], ]}

接著是主程序代碼,如下所示.

# -*- coding: utf-8 -*-from MySSH import MySSHimport json,os,sys,argparseclass InitJson(): def __init__(self,db): self.db_name = db def GetPlatform(self,plat): with open(self.db_name, 'r', encoding='utf-8') as Read_Pointer: load_json = json.loads(Read_Pointer.read()) for k,v in load_json.items():try: if k == plat: return vexcept Exception: return None return Noneif __name__ == '__main__': ptr = InitJson('database.json') parser = argparse.ArgumentParser() parser.add_argument('-G','--group',dest='group',help='指定主機(jī)組') parser.add_argument('-C','--cmd',dest='cmd',help='指定CMD命令') parser.add_argument('--get',dest='get',help='指定獲取數(shù)據(jù)類型<ping>') parser.add_argument('--dst', dest='dst_file',help='目標(biāo)位置') parser.add_argument('--src',dest='src_file',help='原文件路徑') args = parser.parse_args() # 批量CMD --group=aix --cmd=ls if args.group and args.cmd: platform = ptr.GetPlatform(args.group) success,error = [],[] for each in platform: ssh = MySSH(each[0], each[1], each[2], 22) if ssh.Init() != False:print('-' * 140)print('主機(jī): {0:15} t 賬號(hào): {1:10} t 密碼: {2:10} t 命令: {3:30}'. format(each[0], each[1], each[2], args.cmd))print('-' * 140)print(ssh.BatchCMD(args.cmd))ssh.CloseSSH()success.append(each[0]) else:error.append(each[0])ssh.CloseSSH() print('nn','-' * 140, 'n 執(zhí)行報(bào)告 n', '-' * 140, 'n失敗主機(jī): {}n'.format(error), '-' * 140) # 批量獲取主機(jī)其他數(shù)據(jù) --group=centos --get=ping if args.group and args.get: platform = ptr.GetPlatform(args.group) success, error = [], [] for each in platform:ssh = MySSH(each[0], each[1], each[2], 22)# 判斷是否為pingif ssh.Init() != False: if args.get == 'ping': ret = ssh.GetPing() if ret == True: print('[*] 主機(jī): {} 存活中.'.format(each[0])) # 收集磁盤數(shù)據(jù) elif args.get == 'disk': print('-' * 140) print('主機(jī): {0:15} t 賬號(hào): {1:10} t 密碼: {2:10}'. format(each[0], each[1], each[2])) print('-' * 140) ret = ssh.GetAllDiskSpace() for k, v in ret.items(): if (v.split('%')[0] != '-'):space_ret = int(v.split('%')[0])if space_ret >= 70: print('磁盤分區(qū): {0:30} t 磁盤占用: {1:5} -> [警告]'.format(k, v)) continueif space_ret >= 50: print('磁盤分區(qū): {0:30} t 磁盤占用: {1:5} -> [警惕]'.format(k, v)) continueelse: print('磁盤分區(qū): {0:30} t 磁盤占用: {1:5}'.format(k, v)) continue print()else: error.append(each[0]) ssh.CloseSSH() print('nn', '-' * 140, 'n 執(zhí)行報(bào)告 n', '-' * 140, 'n失敗主機(jī): {}n'.format(error), '-' * 140) # 實(shí)現(xiàn)文件上傳過(guò)程 --group=centos --src=./a.txt --dst=/tmp/test.txt if args.group and args.src_file and args.dst_file: platform = ptr.GetPlatform(args.group) success, error = [], [] for each in platform: ssh = MySSH(each[0], each[1], each[2], 22) if ssh.Init() != False:ret = ssh.PutLocalFile(args.src_file,args.dst_file)if ret == True: print('主機(jī): {} t 本地文件: {} t ---> 傳到: {}'.format(each[0], args.src_file,args.dst_file)) else:error.append(each[0])ssh.CloseSSH() print('nn', '-' * 140, 'n 執(zhí)行報(bào)告 n', '-' * 140, 'n失敗主機(jī): {}n'.format(error), '-' * 140)

簡(jiǎn)單的使用命令:

遠(yuǎn)程CMD: python main.py --group=centos --cmd='free -h | grep -v ’total’'

Python如何實(shí)現(xiàn)Paramiko的二次封裝

判斷存活: python main.py --group=centos --get='ping'

Python如何實(shí)現(xiàn)Paramiko的二次封裝

拉取磁盤:python main.py --group=suse --get='disk'

Python如何實(shí)現(xiàn)Paramiko的二次封裝

批量上傳文件: python main.py --group=suse --src='http://www.aoyou183.cn/bcjs/aaa' --dst='/tmp/bbb.txt'

Python如何實(shí)現(xiàn)Paramiko的二次封裝

由于我的設(shè)備少,所以沒(méi)開(kāi)多線程,擔(dān)心開(kāi)多線程對(duì)目標(biāo)造成過(guò)大壓力,也沒(méi)啥必要。

番外: 另外我研究了一個(gè)主機(jī)分組的小工具,加上命令執(zhí)行代碼量才800行,實(shí)現(xiàn)了一個(gè)分組數(shù)據(jù)庫(kù),在這里記下使用方法。

默認(rèn)運(yùn)行進(jìn)入一個(gè)交互式shell環(huán)境。

Init = 初始化json文件,ShowHostList=顯示所有主機(jī),ShowGroup=顯示所有組,ShowAllGroup=顯示所有主機(jī)包括組。

Python如何實(shí)現(xiàn)Paramiko的二次封裝

添加修改與刪除記錄,命令如下。

Python如何實(shí)現(xiàn)Paramiko的二次封裝

添加刪除主機(jī)組。

Python如何實(shí)現(xiàn)Paramiko的二次封裝

通過(guò)UUID向主機(jī)組中添加或刪除主機(jī)記錄。

Python如何實(shí)現(xiàn)Paramiko的二次封裝

測(cè)試主機(jī)組連通性。

Python如何實(shí)現(xiàn)Paramiko的二次封裝

以上就是Python如何實(shí)現(xiàn)Paramiko的二次封裝的詳細(xì)內(nèi)容,更多關(guān)于Python實(shí)現(xiàn)Paramiko的二次封裝的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標(biāo)簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 久久久99精品 | 一级毛片一片毛 | 久久久午夜影院 | 亚洲人成网站999久久久综合 | 国产精品 视频一区 二区三区 | 日韩成人中文字幕 | 嘿嘿嘿视频在线观看 | 一级免费黄色 | 亚洲欧美日韩中文字幕久久 | 国产成人18黄禁网站免费观看 | 欧美日韩一区二区三区自拍 | 欧美黑人一级做a爱性色 | 国产三级视频在线观看视主播 | 国产精品白嫩在线观看 | 成人99国产精品一级毛片 | 香港激情三级做爰小说 | 亚洲精品国产第一区二区小说 | 免费一区在线观看 | 青草视频在线观看免费资源 | 成人免费网站在线观看 | 日韩精品免费视频 | 伊人狠狠色j香婷婷综合 | 亚洲国产日韩综合久久精品 | 任你操网站 | 麻豆视频成人 | 亚洲综合二区 | 成人欧美一区二区三区视频不卡 | 亚洲性后网 | 2021中国大陆精品视频xxxx | 国产在线视频精品视频免费看 | 毛片特级| 国产精品视频分类 | 日韩欧美黄色大片 | 99re热这里只有精品18 | 尹人香蕉久久99天天拍欧美p7 | 美女免费精品高清毛片在线视 | 久久91亚洲精品久久91综合 | 在线欧美精品一区二区三区 | 国产成人不卡 | 欧美黄色一级毛片 | 丁香婷婷色综合 |