基于Docker的Hadoop平臺搭建
main.py
"""
要使用這個程式,首先需要自己建立hadoop master節點
"""
import tkinter as tk
from buttons.check import check_cntr
from buttons.create import create_cntr
from buttons.delete import delete_cntr
from buttons.start import start_cntr
from buttons.stop import stop_cntr
from buttons.init import init
# from hosts import hosts_list, volumes, client_list
if __name__ == '__main__':
    # Main window of the cluster manager.
    root = tk.Tk()
    root.title('hadoop集群管理系統')
    root.geometry('500x400')

    # Banner picture drawn on a canvas at the top of the window.
    canvas = tk.Canvas(root, height=200, width=500)
    image_file = tk.PhotoImage(file='welcome.gif')
    image = canvas.create_image(40, 0, anchor='nw', image=image_file)
    canvas.pack(side='top')

    # (caption, callback, x, y) for every action button; all share one size.
    button_specs = (
        ('創建容器', create_cntr, 40, 220),
        ('查看容器狀態', check_cntr, 220, 220),
        ('開啟容器', start_cntr, 40, 260),
        ('關閉容器', stop_cntr, 220, 260),
        ('洗掉容器', delete_cntr, 40, 300),
        ('hadoop', init, 220, 300),
    )
    for caption, callback, x_pos, y_pos in button_specs:
        action_button = tk.Button(root, text=caption, command=callback)
        action_button.place(x=x_pos, y=y_pos, width=100, height=30)

    root.mainloop()
hosts.py
"""
放置很多自定義的常用的函式
以及保存所有物理主機資訊
以及client資訊
"""
import docker
import json
import os, time, re, sys
factor_mem = 0.8  # 0.8 means the memory limits of all containers together must not exceed 80% of the host memory
hosts_list = dict()  # info about every physical host. key: host name, value: {ip, memory, cpu, remainMem, remainCpu, avi_cpu}
# remainMem / remainCpu / avi_cpu are the still-unassigned memory, CPU count and CPU ids;
# they are adjusted by reduce_men/add_mem and reduce_cpu/add_cpu.
hosts_list['zz-laptop'] = {'ip': '192.168.43.112', 'memory': 19.8, 'cpu': 8, 'remainMem': 19.8 * factor_mem,
                           'remainCpu': 8, 'avi_cpu': [i for i in range(0, 8)]}
hosts_list['lay'] = {'ip': '192.168.43.111', 'memory': 16, 'cpu': 8, 'remainMem': 16 * factor_mem,
                     'remainCpu': 8, 'avi_cpu': [i for i in range(0, 8)]}
prefix = '/home/lay'  # home directory of the current user on the current physical host
prefix_2= '/home/zz'
default_host = 'lay'  # default host: the physical machine preselected in the create-container and similar windows
# information about the master node
hadoop_master = {'host_name': 'zz-laptop', 'ip': '172.17.1.2', 'name': 'host1', 'inside_host_name': 'master'}
host='http://'+hadoop_master['ip']+':50070/'
print(host)
# Per-host bind mounts handed to containers.run(): the shared hosts and workers files.
volumes=dict()
volumes['lay'] = {prefix + '/share/hosts':
                      {'bind': '/etc/hosts', 'mode': 'rw'},
                  prefix + '/share/workers':
                      {'bind': '/usr/local/hadoop/etc/hadoop/workers', 'mode': 'rw'}}
volumes['zz-laptop'] = {prefix_2 + '/share/hosts':
                            {'bind': '/etc/hosts', 'mode': 'rw'},
                        prefix_2 + '/share/workers':
                            {'bind': '/usr/local/hadoop/etc/hadoop/workers', 'mode': 'rw'}}
environment = dict()  # environment variables inside the docker containers, passed to exec_run
environment['JAVA_HOME'] = '/usr/lib/jvm/jdk1.8.0_162'
localhost_text = '127.0.0.1 localhost\n'
# one DockerClient per physical host
client_list = dict()  # key: physical host name, value: DockerClient
# NOTE(review): this loop rebinds the module-level name `host` (the URL assigned above)
# to the last host name.
for host in hosts_list.keys():
    temp_ip = hosts_list[host]['ip']
    client_list[host] = docker.DockerClient(base_url=temp_ip + ':2375')
# print(client_list)
name_list = list(hosts_list.keys())  # names of all physical hosts
hadoop_dict_filename = prefix + "/share/hadoop_dict.json"  # file in which the cluster dictionary is persisted
hadoop_dict = dict()  # info about all hadoop nodes; written to hadoop_dict_filename whenever nodes are added or removed
# key: hostname inside the container, value: {'host_name': physical host name, 'ip': container ip,
# 'name': container name, 'inside_host_name': hostname inside the container}
# 通過client,獲取該物理機上所有未運行的容器
def get_closed_cntr(client):
    """Return the containers known to ``client`` that are not in its running list."""
    everything = client.containers.list(all=True)
    running = client.containers.list()
    return [cntr for cntr in everything if cntr not in running]
# 通過client獲取該物理機上所有運行中的容器
def get_running_cntr(client):
    """Return what ``client.containers.list()`` reports (called without arguments)."""
    return client.containers.list()
# 通過client獲取該物理機上所有容器
def get_all_containers(client):
    """Return every container of ``client`` by listing with ``all=True``."""
    every_cntr = client.containers.list(all=True)
    return every_cntr
# 減小host_name主機的剩余可使用記憶體
def reduce_men(host_name, mem):
    """Subtract ``mem`` from the remaining memory of host ``host_name``.

    Prints the host name and the new remaining value; always returns True.
    """
    host_entry = hosts_list[host_name]
    host_entry['remainMem'] = host_entry['remainMem'] - mem
    print(host_name)
    print('remain_mem:', host_entry['remainMem'])
    return True
# 增加host_name物理主機的剩余可使用記憶體,在洗掉容器時呼叫
def add_mem(host_name, cntr):
    """Give the memory limit of ``cntr`` back to host ``host_name``.

    The limit is read from ``HostConfig.Memory`` and truncated to whole
    GiB before it is added. Always returns True.
    """
    released_gib = int(cntr.attrs['HostConfig']['Memory'] / 1024 / 1024 / 1024)
    host_entry = hosts_list[host_name]
    host_entry['remainMem'] = host_entry['remainMem'] + released_gib
    print(host_name)
    print('remain_mem:', host_entry['remainMem'])
    return True
# 減小主機的剩余可使用cpu數量,更新可使用的cpu串列
def reduce_cpu(host_name, cpu_num, use_cpu_list):
    """Book ``cpu_num`` CPUs on ``host_name`` and drop ``use_cpu_list`` from its free ids.

    Works on a copy of the free-CPU list and stores the copy back.
    Always returns True.
    """
    host_entry = hosts_list[host_name]
    remaining = host_entry['remainCpu'] - cpu_num
    host_entry['remainCpu'] = remaining
    free_cpus = list(host_entry['avi_cpu'])
    for cpu in use_cpu_list:
        if cpu in free_cpus:
            free_cpus.remove(cpu)
    host_entry['avi_cpu'] = free_cpus
    print(host_name)
    print('remain_cpu', remaining)
    print('remain_cpu_list', free_cpus)
    return True
# 增加主機的剩余可使用cpu數量,更新可使用的cpu串列
def add_cpu(host_name, cntr):
    """Return the CPUs pinned to ``cntr`` to the free pool of ``host_name``.

    The ids are appended to a copy of the free-CPU list, which is then
    stored back. Always returns True.
    """
    released_count, released_ids = get_cpu_num_list(cntr)
    host_entry = hosts_list[host_name]
    remaining = host_entry['remainCpu'] + released_count
    host_entry['remainCpu'] = remaining
    free_cpus = list(host_entry['avi_cpu'])
    free_cpus.extend(released_ids)
    host_entry['avi_cpu'] = free_cpus
    print(host_name)
    print('remain_cpu', remaining)
    print('remain_cpu_list', free_cpus)
    return True
# 傳入一個的容器,回傳該容器內的主機名
def get_cntr_hostname(cntr):
    """Return the hostname configured inside the container (``Config.Hostname``)."""
    return cntr.attrs["Config"]["Hostname"]
# 輸入一個容器,獲取該容器使用的cpu數量以及對應cpu
def get_cpu_num_list(cntr):
    """Return ``(cpu_count, cpu_ids)`` for the CPUs a container is pinned to.

    The value of ``HostConfig.CpusetCpus`` is parsed as a comma-separated
    list whose items are single ids (``"3"``, ``"10"``) or inclusive ranges
    (``"0-3"``). An empty or missing value yields ``(0, [])``.

    The string is parsed explicitly instead of being passed to ``eval``,
    which failed on multi-digit single ids, on ranges and on the empty
    string.

    Raises:
        ValueError: if an item is not an integer or an integer range.
    """
    cpu_text = cntr.attrs['HostConfig']['CpusetCpus'] or ''
    use_cpu_list = []
    for item in cpu_text.split(','):
        item = item.strip()
        if not item:
            continue
        if '-' in item:
            first, last = item.split('-', 1)
            use_cpu_list.extend(range(int(first), int(last) + 1))
        else:
            use_cpu_list.append(int(item))
    return len(use_cpu_list), use_cpu_list
# 輸入一個容器,獲取該容器使用的記憶體大小
def get_mem_limit(cntr):
    """Return the container's memory limit (``HostConfig.Memory``) in whole GiB, truncated."""
    limit_bytes = cntr.attrs["HostConfig"]["Memory"]
    return int(limit_bytes / 1024 / 1024 / 1024)
# 傳入一個容器,將這個容器加入到hadoop集群中作為資料節點
def join_hadoop(cntr):
    """Start the datanode and nodemanager daemons inside ``cntr``.

    Both commands run through ``exec_run`` with the shared ``environment``;
    their output is printed, then ``refresh_nodes()`` is called.
    Always returns True.
    """
    daemon_commands = (
        '/usr/local/hadoop/sbin/hadoop-daemon.sh start datanode',
        '/usr/local/hadoop/sbin/yarn-daemon.sh start nodemanager',
    )
    results = []
    for command in daemon_commands:
        results.append(cntr.exec_run(command, stdout=True, stderr=True,
                                     environment=environment, workdir='/'))
    print("out1:", results[0].output)
    print("out2:", results[1].output)
    refresh_nodes()
    return True
# 修改組態檔,用來增刪節點
# 引數: 物理主機名 容器名 容器內主機名 是添加還是洗掉,默認添加
def modify_file(host_name, cntr_name, inside_host_name, flag='add'):
    """Add or remove a node in ``hadoop_dict`` and rewrite the files derived from it.

    Args:
        host_name: physical host the container lives on.
        cntr_name: docker name of the container.
        inside_host_name: hostname configured inside the container.
        flag: ``'add'`` (default) registers the node; any other value removes it.

    After updating the dictionary this rewrites the JSON dump
    (``hadoop_dict_filename``), the shared ``hosts`` and ``workers`` files
    under ``prefix + '/share'`` and finally calls ``update_etc_hosts()``.
    """
    if flag == 'add':
        cntr = client_list[host_name].containers.get(cntr_name)
        ip = cntr.attrs['NetworkSettings']['IPAddress']
        hadoop_dict[inside_host_name] = {'host_name': host_name, 'ip': ip, 'name': cntr_name,
                                         'inside_host_name': inside_host_name}
    else:
        # Removing a node that was never registered is a no-op, not a KeyError.
        hadoop_dict.pop(inside_host_name, None)
    print('hadoop_dict:', hadoop_dict)

    hosts_text = localhost_text + ''.join(
        value['ip'] + ' ' + key + '\n' for key, value in hadoop_dict.items())
    workers_text = ''.join(key + '\n' for key in hadoop_dict)

    # ``with`` closes each file even if a write fails.
    with open(hadoop_dict_filename, 'w') as dict_file:
        json.dump(hadoop_dict, dict_file)
    with open(prefix + '/share/hosts', mode='w', encoding='utf-8') as hosts_file:
        hosts_file.write(hosts_text)
    with open(prefix + '/share/workers', mode='w', encoding='utf-8') as workers_file:
        workers_file.write(workers_text)
    update_etc_hosts()
def update_etc_hosts():
    """Rewrite ``/etc/hosts`` as the content of ``/etc/hosts_back`` plus one line per cluster node."""
    global hadoop_dict
    node_lines = ''.join(value['ip'] + ' ' + key + '\n'
                         for key, value in hadoop_dict.items())
    backup_path = '/etc/hosts_back'
    target_path = '/etc/hosts'
    with open(backup_path, mode='r', encoding='utf-8') as backup:
        base_text = backup.read()
    with open(target_path, mode='w', encoding='utf-8') as target:
        target.write(base_text + node_lines)
# master重繪datanode
def refresh_nodes():
    """Run ``hdfs dfsadmin -refreshNodes`` inside the master container and print its output."""
    master_client = client_list[hadoop_master['host_name']]
    master_cntr = master_client.containers.get(hadoop_master['name'])
    result = master_cntr.exec_run('/usr/local/hadoop/bin/hdfs dfsadmin -refreshNodes',
                                  stdout=True, stderr=True,
                                  environment=environment, workdir='/')
    print(result.output)
# 在master上remove一個datanode
# 引數 物理主機名 容器名 容器內部主機名
def remove_datanode(host_name, cntr_name, inside_host_name):
    """Take a datanode out of the cluster.

    Writes ``inside_host_name`` to the shared ``excludes`` file, calls
    ``refresh_nodes()``, empties the file again and then removes the node
    from the configuration via ``modify_file(..., 'remove')``.

    Args:
        host_name: physical host name.
        cntr_name: container name.
        inside_host_name: hostname inside the container.
    """
    excludes_file = prefix + '/share/excludes'
    # ``with`` guarantees the handle is closed before the master is asked to refresh.
    with open(excludes_file, mode='w', encoding='utf8') as excludes:
        excludes.write(inside_host_name)
    refresh_nodes()
    with open(excludes_file, mode='w', encoding='utf8') as excludes:
        excludes.write('')
    modify_file(host_name, cntr_name, inside_host_name, 'remove')
# 初始化
def init():
    """Rebuild the in-memory resource and cluster state.

    1. For every running container on every host, subtract its memory
       limit and pinned CPUs from that host's remaining budget.
    2. Load ``hadoop_dict`` from ``hadoop_dict_filename`` if the file
       exists; otherwise start with only the master node and create the file.
    3. Refresh ``/etc/hosts`` through ``update_etc_hosts()``.
    4. Drop every recorded node whose container is not running (or no
       longer exists) from ``hadoop_dict``.
    """
    global hadoop_dict
    running_cntr = dict()
    for host_name in hosts_list:
        # Query each host once so the bookkeeping and the later membership
        # test work on the same snapshot.
        running = get_running_cntr(client_list[host_name])
        running_cntr[host_name] = running
        for cntr in running:
            use_mem = get_mem_limit(cntr)
            use_cpu_num, use_cpu_list = get_cpu_num_list(cntr)
            reduce_men(host_name, use_mem)
            reduce_cpu(host_name, use_cpu_num, use_cpu_list)

    if os.path.exists(hadoop_dict_filename):
        with open(hadoop_dict_filename, 'r') as tf:
            hadoop_dict = json.load(tf)
    else:
        # No saved state: assume the cluster consists of the master only.
        hadoop_dict['master'] = hadoop_master
        with open(hadoop_dict_filename, 'w') as tf:
            json.dump(hadoop_dict, tf)
    update_etc_hosts()

    pop_list = []
    for key, value in hadoop_dict.items():
        temp_host_name = value['host_name']
        try:
            temp_cntr = client_list[temp_host_name].containers.get(value['name'])
        except docker.errors.NotFound:
            # The recorded container was deleted; treat it as not running.
            pop_list.append(key)
            continue
        if temp_cntr not in running_cntr[temp_host_name]:
            pop_list.append(key)
    for key in pop_list:
        hadoop_dict.pop(key)
    print(hadoop_dict)


init()  # initialise the program state when the module is imported
# cntr.attrs['HostConfig']['CpusetCpus'] cpu
# cntr = client_list[default_host].containers.list()[0]
# cntr.attrs['HostConfig']['Memory']/1024/1024/1024 memory
# cntr = client_list[default_host].containers.get('0989d8d4b549')
# a = get_closed_cntr(client_list[default_host])
# a = cntr.attrs["HostConfig"]["Memory"]/1024/1024/1024 # 要開啟的容器使用的cpu串列
# print(a[0].name)
# a = cntr.attrs
# print(a)
# print(json.dumps(a, indent=4, sort_keys=True))
# inside_host_name = get_cntr_hostname(cntr)
# print(inside_host_name)
# modify_file(default_host,'test11', inside_host_name)
# cntr = client_list[default_host].containers.get('042a5e10e305')
# stats = cntr.stats(decode=True)
# print(get_mem_limit(cntr))
# print(get_cpu_num_list(cntr))
# old_result = next(stats)
# new_result = cntr.attrs['NetworkSettings']['IPAddress']
# cpu_total_usage = new_result['cpu_stats']['cpu_usage']['total_usage'] - old_result['cpu_stats']['cpu_usage'][
# 'total_usage']
# cpu_system_uasge = new_result['cpu_stats']['system_cpu_usage'] - old_result['cpu_stats']['system_cpu_usage']
# cpu_num = len(old_result['cpu_stats']['cpu_usage']['percpu_usage'])
# cpu_percent = round((float(cpu_total_usage) / float(cpu_system_uasge)) * cpu_num * 100.0, 2)
#
# result = new_result['cpu_stats']['cpu_usage']['percpu_usage']
# num = sum(result)/max(result)
# num = round(num)
# print(num)
# print(json.dumps(new_result, indent=4, sort_keys=True))
# out = cntr.exec_run('cat /etc/hostname', stdout=True, stderr=True, workdir='/')
# print(str(out.output)[2:-3])
#
# print(cpu_percent)
# print(psutil.cpu_times())
#
#
# begin = time.time()
# pattern = re.compile("\d+")
# time2sleep = 2.5
# x = []
# y = []
# t = 0
# totaltime = float(sys.argv[1]) if len(sys.argv) > 1 else 10.0
# print(totaltime)
# while totaltime >= 0:
# # cpuInfo = os.popen('top -bi -n 2 -d 0.01').read().split('\n\n\n')[1].split('\n')[2]
# # items = pattern.findall(cpuInfo)
# # usage =float(items[0]) +float(items[1])+float(items[2])
# # top巨慢,果斷直接從/proc/stat搞起
#
# info1 = os.popen("cat /proc/stat").readline()
# ct1 = pattern.findall(info1)
# sum1 = 0
# for i in ct1:
# sum1 += int(i)
#
# time.sleep(0.1)
#
# info2 = os.popen("cat /proc/stat").readline()
# ct2 = pattern.findall(info2)
# sum2 = 0
# for i in ct2:
# sum2 += int(i)
#
# usage = (int(ct2[0]) + int(ct2[2]) - int(ct1[0]) - int(ct1[2])) * 1.0 / (sum2 - sum1) * 100
# print(usage, "%")
# y.append(usage)
#
# now = time.time() - begin
# x.append(now)
#
# totaltime = totaltime - 0.1
#
# end = time.time()
#
# print(end - begin)
check.py
import tkinter as tk
import tkinter.messagebox
from tkinter import ttk
from hosts import client_list, name_list, default_host, \
get_all_containers, get_cpu_num_list, get_cntr_hostname, get_mem_limit
import docker
import matplotlib.pyplot as plt
import time
# Loop flag for the live plots: ani_cpu/ani_memory set it to 1, handle_close resets it to 0.
ani = 0
# Names of the containers currently listed in the status window.
temp_list1 = []
def check_cntr():
    """Open the "container status" window and run its event loop.

    Widgets needed by the callbacks go2/get5 are published as the module
    globals c4 (host selector), c5 (container selector) and t2 (name list).
    """
    global c4, c5, t2, temp_list1
    window = tk.Tk()
    window.title('查看容器狀態')
    window.geometry('650x600')

    list_caption = tk.Label(window, text='當前選擇的物理機上所有的容器')
    host_caption = tk.Label(window, text='請選擇想要查詢的容器所在的物理機')
    host_var = tk.StringVar()
    c4 = ttk.Combobox(window, textvariable=host_var)
    c4["values"] = name_list  # physical host names
    c4.current(name_list.index(default_host))
    t2 = tk.Text(window)
    c4.bind("<<ComboboxSelected>>", go2)

    # Start with every container of the default host.
    temp_list1 = []
    for cntr in get_all_containers(client_list[default_host]):
        print(cntr.name)
        t2.insert('end', cntr.name + '\n')
        temp_list1.append(cntr.name)

    name_caption = tk.Label(window, text='請輸入要查看的容器的name')
    name_var = tk.StringVar()
    c5 = ttk.Combobox(window, textvariable=name_var)
    c5["values"] = temp_list1
    query_button = tk.Button(window, text='開始查詢')
    query_button.bind('<Button-1>', get5)

    list_caption.place(x=400, y=20)
    host_caption.place(x=20, y=20)
    c4.place(x=20, y=60)
    t2.place(x=400, y=60, width=100, height=150)
    name_caption.place(x=20, y=300)
    c5.place(x=400, y=300, width=100)
    query_button.place(x=400, y=460)
    window.mainloop()
def go2(*args):
    """Reload the container list after another host was picked in ``c4``."""
    temp_list1.clear()
    t2.delete('1.0', 'end')
    selected_client = client_list[c4.get()]
    for cntr in get_all_containers(selected_client):
        temp_list1.append(cntr.name)
        t2.insert('end', cntr.name + '\n')
    c5["values"] = temp_list1
def get5(self):
    """Click handler of the query button: show the status of the selected container.

    ``self`` receives the Tk event object and is not used.
    """
    status(c4.get(), c5.get())
def status(host_name, name):
    """Show a window with the state and resource usage of one container.

    Args:
        host_name: physical host the container lives on.
        name: container name.

    All data is collected before the window is created, so an unknown host
    or container only produces an error dialog and no empty window.
    For a container that is not running the usage fields show ``'none'``.
    """
    try:
        client = client_list[host_name]
    except KeyError:
        tkinter.messagebox.showerror(title='提示', message='容器引數資訊輸入錯誤')
        return
    try:
        container = client.containers.get(name)
        cntr_stats = str(container.attrs['State']['Status'])
        mem_limit = get_mem_limit(container)
        if cntr_stats == 'running':
            stats = container.stats(decode=True)
            old_result = next(stats)
            new_result = next(stats)
            old_cpu = old_result['cpu_stats']
            new_cpu = new_result['cpu_stats']
            cpu_total_usage = new_cpu['cpu_usage']['total_usage'] - old_cpu['cpu_usage']['total_usage']
            cpu_system_usage = new_cpu['system_cpu_usage'] - old_cpu['system_cpu_usage']
            # percpu_usage may be absent; fall back to online_cpus in that case.
            percpu = old_cpu['cpu_usage'].get('percpu_usage')
            cpu_num = len(percpu) if percpu else old_cpu.get('online_cpus', 0)
            if cpu_system_usage:
                cpu_value = round((float(cpu_total_usage) / float(cpu_system_usage)) * cpu_num * 100.0, 2)
            else:
                # No system time elapsed between the samples: avoid dividing by zero.
                cpu_value = 0.0
            cpu_percent = str(cpu_value) + ' %'
            mem_mib = new_result['memory_stats']['usage'] / 1024 / 1024
            if mem_limit:
                mem_percent = str(round(mem_mib / (float(mem_limit) * 1024) * 100.0, 2)) + ' %'
            else:
                # get_mem_limit() returned 0 (limit below 1 GiB or unset): no ratio to show.
                mem_percent = 'none'
            mem_usage = str(mem_mib) + 'MiB'
        else:
            mem_usage = 'none'
            mem_percent = 'none'
            cpu_percent = 'none'
        use_cpu_text = ','.join(str(cpu) for cpu in get_cpu_num_list(container)[1])
        inside_host_name = get_cntr_hostname(container)  # hostname inside the container
        cntr_ip = container.attrs['NetworkSettings']['IPAddress']
    except docker.errors.NotFound:
        tkinter.messagebox.showerror(title='提示', message='未找到該容器,請重新輸入')
        return

    root6 = tk.Tk()
    root6.title('容器狀態')
    root6.geometry('600x400')
    rows = (
        ('容器名稱', str(name)),
        ('容器狀態', cntr_stats),
        ('容器內部主機名', inside_host_name),
        ('容器ip', cntr_ip),
        ('使用的cpu', use_cpu_text),
        ('cpu使用率', str(cpu_percent)),
        ('使用記憶體', str(mem_usage)),
        ('總記憶體', str(mem_limit) + 'GiB'),
        ('記憶體使用率', str(mem_percent)),
    )
    # One caption/value pair per row, 40 px apart starting at y=20.
    for index, (caption, value) in enumerate(rows):
        y_pos = 20 + 40 * index
        tk.Label(root6, text=caption).place(x=20, y=y_pos)
        tk.Label(root6, text=value).place(x=200, y=y_pos)
    b1 = tk.Button(root6, text='實時顯示cpu使用率')
    b2 = tk.Button(root6, text='實時顯示記憶體使用率')
    b1.bind('<Button-1>', cur1)
    b2.bind('<Button-1>', cur2)
    b1.place(x=300, y=220)
    b2.place(x=300, y=340)
def cur1(self):
    """Click handler: start the live CPU plot for the selected container.

    Shows an info dialog instead when the container is not running.
    ``self`` receives the Tk event object and is not used.
    """
    container = client_list[c4.get()].containers.get(c5.get())
    if str(container.attrs['State']['Status']) == 'running':
        ani_cpu(container)
        return
    tk.messagebox.showinfo("提示", '該容器不在運行中!')
def cur2(self):
    """Click handler: start the live memory plot for the selected container.

    Shows an info dialog instead when the container is not running.
    ``self`` receives the Tk event object and is not used.
    """
    container = client_list[c4.get()].containers.get(c5.get())
    if str(container.attrs['State']['Status']) == 'running':
        ani_memory(container)
        return
    tk.messagebox.showinfo("提示", '該容器不在運行中!')
# 關閉 實時顯示cpu/記憶體的動態影像時 進行的操作
def handle_close(event):
    """Figure ``close_event`` callback: stop the live-plot loop and leave interactive mode."""
    global ani
    print("stop!")
    ani = 0  # the while-loops in ani_cpu/ani_memory poll this flag
    plt.ioff()
def ani_cpu(container):
    """Plot the CPU usage (%) of ``container`` live until the figure is closed.

    Each loop pass takes two consecutive samples from the stats stream,
    derives the usage from their difference and redraws the curve.
    The loop ends when ``handle_close`` resets the global ``ani`` flag.
    """
    global ani
    stats = container.stats(decode=True)
    begin = time.time()
    x = []
    y = []
    plt.ion()  # interactive mode
    ani = 1
    fig, ax = plt.subplots()
    fig.canvas.mpl_connect('close_event', handle_close)
    while ani:
        plt.cla()  # clear the previous curve
        plt.grid(True)
        old_result = next(stats)
        new_result = next(stats)
        old_cpu = old_result['cpu_stats']
        new_cpu = new_result['cpu_stats']
        cpu_total_usage = new_cpu['cpu_usage']['total_usage'] - old_cpu['cpu_usage']['total_usage']
        cpu_system_usage = new_cpu['system_cpu_usage'] - old_cpu['system_cpu_usage']
        # percpu_usage may be absent; fall back to online_cpus in that case.
        percpu = old_cpu['cpu_usage'].get('percpu_usage')
        cpu_num = len(percpu) if percpu else old_cpu.get('online_cpus', 0)
        if cpu_system_usage:
            cpu_percent = round((float(cpu_total_usage) / float(cpu_system_usage)) * cpu_num * 100.0, 2)
        else:
            # No system time elapsed between the samples: avoid dividing by zero.
            cpu_percent = 0.0
        y.append(cpu_percent)
        x.append(time.time() - begin)
        plt.plot(x, y)
        # The earlier placeholder title was always overwritten by this one
        # before anything was drawn, so only this call is kept.
        plt.title("CPU Usage Rate")
        plt.xlabel("time/s")
        plt.ylabel("percentage/%")
        plt.pause(1)
    plt.show()
def ani_memory(container):
    """Plot the memory usage (%) of ``container`` live until the figure is closed.

    One stats sample is read per loop pass; usage and limit both come from
    its ``memory_stats`` entry. The loop ends when ``handle_close`` resets
    the global ``ani`` flag.
    """
    global ani
    sample_stream = container.stats(decode=True)
    started = time.time()
    times = []
    percents = []
    plt.ion()  # interactive mode
    ani = 1
    fig, ax = plt.subplots()
    fig.canvas.mpl_connect('close_event', handle_close)
    while ani:
        plt.cla()  # clear the previous curve
        plt.title("動態曲線圖")
        plt.grid(True)
        new_result = next(sample_stream)
        mem_usage = new_result['memory_stats']['usage'] / 1024 / 1024
        mem_limit = new_result['memory_stats']['limit'] / 1024 / 1024 / 1024
        mem_percent = round(float(mem_usage) / (float(mem_limit) * 1024) * 100.0, 2)
        percents.append(mem_percent)
        times.append(time.time() - started)
        plt.plot(times, percents)
        plt.title("Memory Usage Rate")
        plt.xlabel("time/s")
        plt.ylabel("percentage/%")
        plt.pause(1)
    plt.ioff()
    plt.show()
create.py
import tkinter as tk
import tkinter.messagebox
from tkinter import ttk
from hosts import hosts_list, client_list, volumes, \
name_list, get_all_containers, default_host, \
reduce_cpu, reduce_men, join_hadoop,\
modify_file, hadoop_dict
# 剩余Mem與剩余Cpu
# Selectable memory sizes (1..remainMem) and CPU counts (1..remainCpu) for the
# default host, computed at import time; go1() rebinds both when another host is picked.
mem_list = [i for i in range(1, int(hosts_list[default_host]['remainMem'] + 1))]
cpu_list = [i for i in range(1, int(hosts_list[default_host]['remainCpu'] + 1))]
# client_number = len(client_list)
# 創建新容器
def create_cntr():
    """Open the "create container" window and run its event loop.

    Widgets needed by the callbacks go1/get1 are published as the module
    globals c1 (host), c2 (memory), c3 (CPU count), e1 (container name),
    e2 (inner hostname) and t1 (container list).
    """
    global c1, c2, c3, e1, e2, t1
    window = tk.Tk()
    window.title('創建容器')
    window.geometry('650x600')

    list_caption = tk.Label(window, text='當前物理機上所有的容器')
    host_caption = tk.Label(window, text='請選擇在哪臺物理機上創建容器')
    host_var = tk.StringVar()
    c1 = ttk.Combobox(window, textvariable=host_var)
    c1["values"] = name_list
    c1.current(name_list.index(default_host))
    t1 = tk.Text(window)
    c1.bind("<<ComboboxSelected>>", go1)
    for cntr in get_all_containers(client_list[default_host]):
        t1.insert('end', cntr.name + '\n')

    mem_caption = tk.Label(window, text='請選擇新建容器最大可使用的記憶體(單位GB)')
    mem_var = tk.StringVar()
    c2 = ttk.Combobox(window, textvariable=mem_var)
    c2["values"] = mem_list
    c2.current(0)

    cpu_caption = tk.Label(window, text='請選擇新建容器最多可使用的CPU數目')
    cpu_var = tk.StringVar()
    c3 = ttk.Combobox(window, textvariable=cpu_var)
    c3["values"] = cpu_list
    c3.current(0)

    name_caption = tk.Label(window, text='請輸入容器名')
    name_var = tk.StringVar()
    e1 = tk.Entry(window, textvariable=name_var)

    hostname_caption = tk.Label(window, text='請輸入容器內主機名')
    hostname_var = tk.StringVar()
    e2 = tk.Entry(window, textvariable=hostname_var)

    create_button = tk.Button(window, text='開始創建')
    create_button.bind('<Button-1>', get1)

    # (widget, place() options) in the original placement order.
    layout = (
        (list_caption, dict(x=400, y=20)),
        (host_caption, dict(x=20, y=20)),
        (c1, dict(x=20, y=60)),
        (t1, dict(x=400, y=60, width=100, height=150)),
        (mem_caption, dict(x=20, y=300)),
        (c2, dict(x=400, y=300, width=100)),
        (cpu_caption, dict(x=20, y=340)),
        (c3, dict(x=400, y=340, width=100)),
        (name_caption, dict(x=20, y=380)),
        (e1, dict(x=400, y=380, width=100)),
        (hostname_caption, dict(x=20, y=420)),
        (e2, dict(x=400, y=420, width=100)),
        (create_button, dict(x=400, y=460)),
    )
    for widget, options in layout:
        widget.place(**options)
    window.mainloop()
def go1(*args):
    """Refresh the container list and the memory/CPU choices for the host selected in ``c1``."""
    global mem_list, cpu_list
    t1.delete('1.0', 'end')
    host_name = c1.get()  # physical host name
    for cntr in get_all_containers(client_list[host_name]):
        t1.insert('end', cntr.name + '\n')
    host_entry = hosts_list[host_name]
    mem_list = list(range(1, int(host_entry['remainMem']) + 1))
    cpu_list = list(range(1, int(host_entry['remainCpu']) + 1))
    c2["values"] = mem_list
    c3["values"] = cpu_list
# 獲取用戶為新容器輸入的引數
def get1(self):
    """Click handler of the create button: read the form and call ``create_new``.

    ``self`` receives the Tk event object and is not used.
    """
    form_values = (c1.get(), c2.get(), c3.get(), e1.get(), e2.get())
    create_new(*form_values)
# 創建新容器
def create_new(host_name, in_mem, cpu_num, cntr_name, inside_host_name):
    """Create a container on ``host_name`` and add it to the hadoop cluster.

    Args:
        host_name: physical host to create the container on.
        in_mem: memory limit in GB, as text from the form.
        cpu_num: number of CPUs to pin, as text from the form.
        cntr_name: docker name of the new container.
        inside_host_name: hostname inside the container; must not already
            be used by a cluster node.

    Invalid input is reported through an error dialog and nothing is
    created. The host's remaining memory/CPU budget is reduced only after
    docker accepted the container, so a failed ``run`` does not leak
    bookkeeping.
    """
    try:
        client = client_list[host_name]  # DockerClient of the chosen host
        host_info = hosts_list[host_name]
    except KeyError:
        tkinter.messagebox.showerror(title='提示', message='虛擬機引數資訊輸入錯誤')
        return

    # The comboboxes are editable, so parse with int() rather than eval().
    try:
        mem_num = int(in_mem)
    except ValueError:
        tkinter.messagebox.showerror(title='提示', message='虛擬機引數資訊輸入錯誤')
        return
    if mem_num > host_info['remainMem']:
        tkinter.messagebox.showerror(title='提示', message='選擇記憶體過大!所有容器可使用記憶體超過主機最大記憶體的80%')
        return
    try:
        cpu_number = int(cpu_num)
    except ValueError:
        tkinter.messagebox.showerror(title='提示', message='虛擬機引數資訊輸入錯誤')
        return
    if cpu_number > host_info['remainCpu']:
        tkinter.messagebox.showerror(title='提示', message='選擇cpu數量過大!所有容器可使用cpu超過主機cpu數量')
        return
    if mem_num < 1 or cpu_number < 1:
        # Zero or negative sizes would create an unlimited / unpinned container.
        tkinter.messagebox.showerror(title='提示', message='虛擬機引數資訊輸入錯誤')
        return
    if inside_host_name in hadoop_dict:
        tkinter.messagebox.showerror(title='提示', message='創建失敗!hadoop集群中已有節點使用了該主機名')
        return

    get_cpu = host_info['avi_cpu'][:cpu_number]  # first cpu_number free CPU ids
    cpu_set = ','.join(str(cpu) for cpu in get_cpu)
    print('cpu_set:', cpu_set)
    new_container = client.containers.run(image='ubuntu/hadoop', name=cntr_name,
                                          mem_limit=str(mem_num) + 'g', tty=True,
                                          privileged=True,
                                          detach=True, stdin_open=True, cpuset_cpus=cpu_set,
                                          hostname=inside_host_name,
                                          volumes=volumes[host_name]
                                          )
    reduce_cpu(host_name, cpu_number, get_cpu)  # book the CPUs just assigned
    reduce_men(host_name, mem_num)  # book the memory just assigned
    go1()
    modify_file(host_name, cntr_name, inside_host_name)  # update the config files
    join_hadoop(new_container)  # join the existing hadoop cluster
    tkinter.messagebox.showinfo(title='提示', message='容器創建成功')
delete.py
import tkinter as tk
from tkinter import ttk
from hosts import name_list, get_closed_cntr,default_host,client_list
import docker
# Names of the not-running containers currently listed in the delete window.
temp_list4=[]
def delete_cntr():
    """Open the "delete container" window and run its event loop.

    Widgets needed by the callbacks go5/get4 are published as the module
    globals c10 (host selector), c11 (container selector) and t5 (name list).
    """
    global t5, c10, c11, temp_list4
    window = tk.Tk()
    window.title('洗掉容器')
    window.geometry('650x600')

    list_caption = tk.Label(window, text='當前選擇的物理機上不在運行中的容器')
    host_caption = tk.Label(window, text='請選擇洗掉哪臺物理機上的容器')
    host_var = tk.StringVar()
    c10 = ttk.Combobox(window, textvariable=host_var)
    c10["values"] = name_list
    c10.current(name_list.index(default_host))
    t5 = tk.Text(window)
    c10.bind("<<ComboboxSelected>>", go5)

    # Start with the not-running containers of the default host.
    stopped = get_closed_cntr(client_list[default_host])
    temp_list4 = []
    for cntr in stopped:
        print(cntr.name)
        t5.insert('end', cntr.name + '\n')
        temp_list4.append(cntr.name)

    name_caption = tk.Label(window, text='請輸入要洗掉的容器的name')
    name_var = tk.StringVar()
    c11 = ttk.Combobox(window, textvariable=name_var)
    c11["values"] = temp_list4
    delete_button = tk.Button(window, text='洗掉容器')
    delete_button.bind('<Button-1>', get4)

    list_caption.place(x=400, y=20)
    host_caption.place(x=20, y=20)
    c10.place(x=20, y=60)
    t5.place(x=400, y=60, width=100, height=150)
    name_caption.place(x=20, y=300)
    c11.place(x=400, y=300, width=100)
    delete_button.place(x=400, y=460)
    window.mainloop()
def go5(*args):
    """Reload the list of not-running containers for the host selected in ``c10``."""
    temp_list4.clear()
    t5.delete('1.0', 'end')
    selected_client = client_list[c10.get()]
    for cntr in get_closed_cntr(selected_client):
        temp_list4.append(cntr.name)
        t5.insert('end', cntr.name + '\n')
    c11["values"] = temp_list4
def get4(self):
    """Click handler of the delete button: delete the selected container.

    ``self`` receives the Tk event object and is not used.
    """
    del_container(c10.get(), c11.get())
def del_container(host_name,name):
    """Remove container ``name`` on host ``host_name`` and refresh the list.

    An unknown host or container is reported through an error dialog.
    """
    # This file only imports ``tkinter`` itself, so the messagebox submodule
    # is imported here instead of relying on another module having loaded it.
    from tkinter import messagebox
    try:
        client = client_list[host_name]
    except KeyError:
        messagebox.showerror(title='提示',message='虛擬機引數資訊輸入錯誤')
        return
    try:
        container = client.containers.get(name)
        container.remove()
        go5()
        messagebox.showinfo("提示", '成功洗掉虛擬機')
    except docker.errors.NotFound:
        messagebox.showerror(title='提示', message='未找到該虛擬機,請重新輸入')
init.py
import os
import tkinter as tk
import hdfs
from hdfs.client import Client
from tkinter import ttk
from hosts import name_list, get_closed_cntr, client_list, default_host, \
join_hadoop, modify_file, get_cntr_hostname, reduce_men, reduce_cpu, \
hosts_list, hadoop_dict, get_cpu_num_list,hadoop_master,environment,prefix
import docker
# Working directory passed to exec_run when analyze() runs its script in the master container.
workdir = '/root/earthquake/'
# URL handed to the hdfs Client below, built from the master node's ip and port 50070.
host='http://'+hadoop_master['ip']+':50070/'
print(host)
# HDFS client used by upload_file() and download_file().
client = Client(host, root="/", timeout=10000, session=False)
host_name = hadoop_master['host_name']
print(host_name)
# Container object of the hadoop master node, looked up once at import time.
container = client_list[host_name].containers.get(hadoop_master['name'])
print(hadoop_master['name'])
def init():
    """Open the data-management window (upload, download, analyse, visualise).

    First runs ``hadoop fs -chmod 777 /`` in the master container and prints
    the result. The four entry widgets are published as the module globals
    e11/e21 (upload: local path, hdfs path) and e12/e22 (download).
    """
    global e11, e21, e12, e22
    chmod_result = container.exec_run('/usr/local/hadoop/bin/hadoop fs -chmod 777 /',
                                      stdout=True, stderr=True,
                                      environment=environment, workdir='/')
    print(chmod_result)

    window = tk.Tk()
    window.title('DataManage')
    window.geometry('600x600')

    upload_local_var = tk.StringVar()
    upload_hdfs_var = tk.StringVar()
    download_local_var = tk.StringVar()
    download_hdfs_var = tk.StringVar()

    upload_title = tk.Label(window, text='上傳檔案:', bg='white')
    upload_local_caption = tk.Label(window, text='請輸入上傳檔案的本地路徑')
    upload_hdfs_caption = tk.Label(window, text='請輸入上傳檔案的hdfs路徑')
    download_title = tk.Label(window, text='下載檔案:', bg='white')
    download_local_caption = tk.Label(window, text='請輸入下載檔案的本地路徑')
    download_hdfs_caption = tk.Label(window, text='請輸入下載檔案的hdfs路徑')

    e11 = tk.Entry(window, textvariable=upload_local_var)
    e21 = tk.Entry(window, textvariable=upload_hdfs_var)
    e12 = tk.Entry(window, textvariable=download_local_var)
    e22 = tk.Entry(window, textvariable=download_hdfs_var)

    upload_button = tk.Button(window, text='upload', command=get11)
    download_button = tk.Button(window, text='download', command=get12)
    analyze_button = tk.Button(window, text='資料分析', command=get2)
    plot_button = tk.Button(window, text='可視化', command=get3)

    # (widget, place() options): left column is upload, right column is download.
    layout = (
        (upload_title, dict(x=20, y=20, width=100)),
        (upload_local_caption, dict(x=20, y=50)),
        (e11, dict(x=20, y=90, width=200)),
        (upload_hdfs_caption, dict(x=20, y=130)),
        (e21, dict(x=20, y=170, width=200)),
        (upload_button, dict(x=20, y=210, width=100)),
        (download_title, dict(x=300, y=20, width=100)),
        (download_local_caption, dict(x=300, y=50)),
        (e12, dict(x=300, y=90, width=200)),
        (download_hdfs_caption, dict(x=300, y=130)),
        (e22, dict(x=300, y=170, width=200)),
        (download_button, dict(x=300, y=210, width=100)),
        (analyze_button, dict(x=20, y=300, width=100)),
        (plot_button, dict(x=300, y=300, width=100)),
    )
    for widget, options in layout:
        widget.place(**options)
    window.mainloop()
def get11():
    """Upload-button handler: read both upload fields, print them and start the upload."""
    source, target = e11.get(), e21.get()
    print(source, target)
    upload_file(source, target)
def get12():
    """Download-button handler: read both download fields, print them and start the download."""
    target, source = e12.get(), e22.get()
    print(target, source)
    download_file(target, source)
def upload_file(local_path,hdfs_path):
    """Upload ``local_path`` to ``hdfs_path`` and report the outcome in a dialog.

    Any ``HdfsError`` raised by the client is shown as an error dialog.
    """
    # This file only imports ``tkinter`` itself, so the messagebox submodule
    # is imported here instead of relying on another module having loaded it.
    from tkinter import messagebox
    try:
        client.upload(hdfs_path, local_path)
        messagebox.showinfo(title='提示', message='檔案上傳成功')
    except hdfs.util.HdfsError:
        messagebox.showerror(title='提示', message='hdfs已存在該資料路徑,請重新輸入')
def download_file(local_path,hdfs_path):
    """Download ``hdfs_path`` to ``local_path`` and report the outcome in a dialog.

    Any ``HdfsError`` raised by the client is shown as an error dialog.
    """
    # This file only imports ``tkinter`` itself, so the messagebox submodule
    # is imported here instead of relying on another module having loaded it.
    from tkinter import messagebox
    try:
        client.download(hdfs_path, local_path)
        messagebox.showinfo(title='提示', message='檔案下載成功')
    except hdfs.util.HdfsError:
        messagebox.showerror(title='提示', message='該檔案不存在,請重新輸入')
def get2():
    """Handler of the data-analysis button; delegates to ``analyze``."""
    analyze()
def get3():
    """Handler of the visualisation button; delegates to ``plot``."""
    plot()
def analyze():
    """Run ``analyze.py`` with python3 inside the master container, then notify the user.

    The command runs in ``workdir`` with the shared ``environment``; its
    output is printed to the console.
    """
    # This file only imports ``tkinter`` itself, so the messagebox submodule
    # is imported here instead of relying on another module having loaded it.
    from tkinter import messagebox
    python_file = 'analyze.py'
    out1 = container.exec_run('/usr/bin/python3 ' + python_file, stdout=True, stderr=True,
                              environment=environment, workdir=workdir)
    print(out1.output)
    messagebox.showinfo(title='提示', message='資料分析完成,分析結果已保存在~/earthquake/temp_file中')
def plot():
    """Run ``visualization.py`` with python3 in ``prefix + '/earthquake/'`` on the local machine.

    The script is started with an argument list and ``cwd`` instead of a
    concatenated shell string. Failure to start it is printed, not raised.
    """
    import subprocess
    python_path = '/usr/bin/python3'
    path = prefix + '/earthquake/'
    python_file = 'visualization.py'
    try:
        subprocess.run([python_path, python_file], cwd=path)
    except OSError as exc:
        # Missing directory or interpreter: report it without crashing the GUI callback.
        print(exc)
start.py
import tkinter as tk
from tkinter import ttk
from hosts import name_list, get_closed_cntr, client_list, default_host, \
join_hadoop, modify_file, get_cntr_hostname, reduce_men, reduce_cpu, \
hosts_list, hadoop_dict, get_cpu_num_list
import docker
temp_list2 = []  # names of the not-running containers shown in the start window
def start_cntr():
    """Open the "start container" window and run its event loop.

    Widgets needed by the callbacks go3/get2 are published as the module
    globals c6 (host selector), c7 (container selector) and t3 (name list).
    """
    global c6, c7, t3, temp_list2
    window = tk.Tk()
    window.title('開啟容器')
    window.geometry('650x600')

    list_caption = tk.Label(window, text='當前選中物理機上未開啟的容器')
    host_caption = tk.Label(window, text='請選擇想要開啟的容器所在的物理機')
    host_var = tk.StringVar()
    c6 = ttk.Combobox(window, textvariable=host_var)
    c6["values"] = name_list
    c6.current(name_list.index(default_host))
    t3 = tk.Text(window)
    c6.bind("<<ComboboxSelected>>", go3)

    # Start with the not-running containers of the default host.
    stopped = get_closed_cntr(client_list[default_host])
    temp_list2 = []
    for cntr in stopped:
        print(cntr.name)
        t3.insert('end', cntr.name + '\n')
        temp_list2.append(cntr.name)

    name_caption = tk.Label(window, text='請輸入要開啟容器的name')
    name_var = tk.StringVar()
    c7 = ttk.Combobox(window, textvariable=name_var)
    c7["values"] = temp_list2
    start_button = tk.Button(window, text='開啟容器')
    start_button.bind('<Button-1>', get2)

    list_caption.place(x=400, y=20)
    host_caption.place(x=20, y=20)
    c6.place(x=20, y=60)
    t3.place(x=400, y=60, width=100, height=150)
    name_caption.place(x=20, y=300)
    c7.place(x=400, y=300, width=100)
    start_button.place(x=400, y=460)
    window.mainloop()
# 更新顯示內容
def go3(*args):
    """Reload the list of not-running containers for the host selected in ``c6``."""
    temp_list2.clear()
    t3.delete('1.0', 'end')
    selected_client = client_list[c6.get()]
    for cntr in get_closed_cntr(selected_client):
        temp_list2.append(cntr.name)
        t3.insert('end', cntr.name + '\n')
    c7["values"] = temp_list2
def get2(self):
    """Click handler of the start button: start the selected container.

    ``self`` receives the Tk event object and is not used.
    """
    start_container(c6.get(), c7.get())
def start_container(host_name, name):
    """Start container ``name`` on ``host_name`` and add it to the hadoop cluster.

    The container is started only if the host still has enough free
    memory and CPUs, the CPUs it is pinned to are free, and its inner
    hostname is not already used by a cluster node. Each failed check is
    reported through a dialog. The host's budget is reduced only after
    ``container.start()`` succeeded.
    """
    # This file only imports ``tkinter`` itself, so the messagebox submodule
    # is imported here instead of relying on another module having loaded it.
    from tkinter import messagebox
    try:
        client = client_list[host_name]
        host_info = hosts_list[host_name]
    except KeyError:
        messagebox.showerror(title='提示', message='容器引數資訊輸入錯誤')
        return
    try:
        container = client.containers.get(name)
        inside_host_name = get_cntr_hostname(container)  # hostname inside the container
        # memory limit of the container in whole GiB
        mem_limit = int(container.attrs['HostConfig']['Memory'] / 1024 / 1024 / 1024)
        cpu_num, use_cpu_list = get_cpu_num_list(container)
        avi_cpu_list = host_info['avi_cpu']  # free CPU ids
        # CPUs the container needs that are not free
        busy_cpu_list = [cpu for cpu in use_cpu_list if cpu not in avi_cpu_list]
        # Strict comparisons: a container that fits exactly is allowed, as the
        # dialog text ("less than") and create_new() both imply.
        if mem_limit > host_info['remainMem']:
            messagebox.showinfo("提示", '該物理主機上可用記憶體小于待開啟容器可用的最大記憶體,開啟失敗!')
        elif cpu_num > host_info['remainCpu']:
            messagebox.showinfo("提示", 'cpu數量不足,開啟失敗!')
        elif inside_host_name in hadoop_dict:
            messagebox.showinfo("提示", '待開啟容器的主機名在hadoop集群中已經存在,開啟失敗 !')
        elif busy_cpu_list:
            text = ' ' + ''.join(str(cpu) + ' ' for cpu in busy_cpu_list)
            messagebox.showinfo("提示", '待開啟容器需要的cpu:'+text+'正在被其他容器使用,開啟失敗!')
        else:
            container.start()
            reduce_cpu(host_name, cpu_num, use_cpu_list)  # book the CPUs
            reduce_men(host_name, mem_limit)  # book the memory
            modify_file(host_name, name, inside_host_name)  # update the config files
            join_hadoop(container)  # join the existing hadoop cluster
            go3()
            messagebox.showinfo("提示", '成功開啟容器')
    except docker.errors.NotFound:
        messagebox.showerror(title='提示', message='未找到該容器,請重新輸入')
stop.py
import tkinter as tk
from tkinter import ttk
from hosts import name_list, get_closed_cntr, client_list, default_host, \
join_hadoop, modify_file, get_cntr_hostname, reduce_men, reduce_cpu, \
hosts_list, hadoop_dict, get_cpu_num_list
import docker
temp_list2 = []  # names of the not-running containers shown in the window
def start_cntr():
    """Open the "start container" window and run its event loop.

    Widgets needed by the callbacks go3/get2 are published as the module
    globals c6 (host selector), c7 (container selector) and t3 (name list).

    NOTE(review): this code sits under the "stop.py" label but is identical
    to start.py; main.py imports ``stop_cntr`` from ``buttons.stop``, which
    is not defined here. Confirm against the real stop.py.
    """
    global c6, c7, t3, temp_list2
    window = tk.Tk()
    window.title('開啟容器')
    window.geometry('650x600')

    list_caption = tk.Label(window, text='當前選中物理機上未開啟的容器')
    host_caption = tk.Label(window, text='請選擇想要開啟的容器所在的物理機')
    host_var = tk.StringVar()
    c6 = ttk.Combobox(window, textvariable=host_var)
    c6["values"] = name_list
    c6.current(name_list.index(default_host))
    t3 = tk.Text(window)
    c6.bind("<<ComboboxSelected>>", go3)

    # Start with the not-running containers of the default host.
    stopped = get_closed_cntr(client_list[default_host])
    temp_list2 = []
    for cntr in stopped:
        print(cntr.name)
        t3.insert('end', cntr.name + '\n')
        temp_list2.append(cntr.name)

    name_caption = tk.Label(window, text='請輸入要開啟容器的name')
    name_var = tk.StringVar()
    c7 = ttk.Combobox(window, textvariable=name_var)
    c7["values"] = temp_list2
    start_button = tk.Button(window, text='開啟容器')
    start_button.bind('<Button-1>', get2)

    list_caption.place(x=400, y=20)
    host_caption.place(x=20, y=20)
    c6.place(x=20, y=60)
    t3.place(x=400, y=60, width=100, height=150)
    name_caption.place(x=20, y=300)
    c7.place(x=400, y=300, width=100)
    start_button.place(x=400, y=460)
    window.mainloop()
# 更新顯示內容
def go3(*args):
    """Reload the list of not-running containers for the host selected in ``c6``."""
    temp_list2.clear()
    t3.delete('1.0', 'end')
    selected_client = client_list[c6.get()]
    for cntr in get_closed_cntr(selected_client):
        temp_list2.append(cntr.name)
        t3.insert('end', cntr.name + '\n')
    c7["values"] = temp_list2
def get2(self):
    """Click handler of the button: pass the selected host and container to ``start_container``.

    ``self`` receives the Tk event object and is not used.
    """
    start_container(c6.get(), c7.get())
def start_container(host_name, name):
    """Start container ``name`` on ``host_name`` and add it to the hadoop cluster.

    The container is started only if the host still has enough free
    memory and CPUs, the CPUs it is pinned to are free, and its inner
    hostname is not already used by a cluster node. Each failed check is
    reported through a dialog. The host's budget is reduced only after
    ``container.start()`` succeeded.
    """
    # This file only imports ``tkinter`` itself, so the messagebox submodule
    # is imported here instead of relying on another module having loaded it.
    from tkinter import messagebox
    try:
        client = client_list[host_name]
        host_info = hosts_list[host_name]
    except KeyError:
        messagebox.showerror(title='提示', message='容器引數資訊輸入錯誤')
        return
    try:
        container = client.containers.get(name)
        inside_host_name = get_cntr_hostname(container)  # hostname inside the container
        # memory limit of the container in whole GiB
        mem_limit = int(container.attrs['HostConfig']['Memory'] / 1024 / 1024 / 1024)
        cpu_num, use_cpu_list = get_cpu_num_list(container)
        avi_cpu_list = host_info['avi_cpu']  # free CPU ids
        # CPUs the container needs that are not free
        busy_cpu_list = [cpu for cpu in use_cpu_list if cpu not in avi_cpu_list]
        # Strict comparisons: a container that fits exactly is allowed, as the
        # dialog text ("less than") implies.
        if mem_limit > host_info['remainMem']:
            messagebox.showinfo("提示", '該物理主機上可用記憶體小于待開啟容器可用的最大記憶體,開啟失敗!')
        elif cpu_num > host_info['remainCpu']:
            messagebox.showinfo("提示", 'cpu數量不足,開啟失敗!')
        elif inside_host_name in hadoop_dict:
            messagebox.showinfo("提示", '待開啟容器的主機名在hadoop集群中已經存在,開啟失敗 !')
        elif busy_cpu_list:
            text = ' ' + ''.join(str(cpu) + ' ' for cpu in busy_cpu_list)
            messagebox.showinfo("提示", '待開啟容器需要的cpu:'+text+'正在被其他容器使用,開啟失敗!')
        else:
            container.start()
            reduce_cpu(host_name, cpu_num, use_cpu_list)  # book the CPUs
            reduce_men(host_name, mem_limit)  # book the memory
            modify_file(host_name, name, inside_host_name)  # update the config files
            join_hadoop(container)  # join the existing hadoop cluster
            go3()
            messagebox.showinfo("提示", '成功開啟容器')
    except docker.errors.NotFound:
        messagebox.showerror(title='提示', message='未找到該容器,請重新輸入')
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/289206.html
標籤:其他
