base.py依賴的python包(Queue,threading,os,signal,subprocess/subprocess32,sys,time,warnings,paramiko,getpass),依賴的gp包(gplog,gpsubprocess,pygresql),pygresql匯入陳述句的是from pygresql.pg import DB,主要使用的DB是SQLCommand類,這個類先不用關注,gpsubprocess是對subprocess的封裝,可以看到這里使用了兩個子行程包gpsubprocess和subprocess,
代碼分析
WorkerPool類
先看WorkPool類定義,類實體包含了存放worker實體的串列、存放帶執行Command的work_queue佇列、存放執行完Command的completed_queue佇列,WorkerPool中的worker實體的數量是在建構式就給定的,初始化后worker實體會一直運行,圖中的start就是在建構式中完成的,Worker實體從work_queue佇列中取作業項Command的函式是getNetWorkItem,
def getNextWorkItem(self):
return self.work_queue.get(block=True)
Worker實體處理完成work_queue中所有命令之后,取不到命令或取到的命令是halt_command,或者任務池標志了should_stop之后,使用markTaskDone函式告知WorkPool該任務完成,
def markTaskDone(self):
self.work_queue.task_done()
clsSystemState.py的GpSystemStateProgram類中run函式中有base檔案中類的使用示例,簡化如下,通過這些示例來學習任務池的使用:
dispatchCount = 0
pool = base.WorkerPool(parallelDegree) #parallelDegree給定worker個數
for hostName, segments in ...:
cmd = ...
hostNameToCmd[hostName] = cmd
pool.addCommand(cmd)
dispatchCount+=1
pool.wait_and_printdots(dispatchCount)
hostNameToResults = {}
for hostName, cmd in hostNameToCmd.iteritems():
hostNameToResults[hostName] = cmd.decodeResults() #取出結果集
pool.haltWork()
主要流程:通過addCommand函式向佇列(work_queue)中添加作業負載(WorkerPool類可以通過建構式向佇列中添加多個命令(串列形式),或者通過addCommand函式添加單個命令),針對work_queue佇列由三種join的方法,代碼如下,hostNameCmd是一個字典,鍵為hostName,值為cmd,通過cmd.decodeResults函式取出結果集,
def join(self):
self.work_queue.join()
return True
def _join_work_queue_with_timeout(self, timeout):
"""
Queue.join() unfortunately doesn't take a timeout (see
https://bugs.python.org/issue9634). Fake it here, with a solution
inspired by notes on that bug report.
XXX This solution uses undocumented Queue internals (though they are not
underscore-prefixed...).
"""
done_condition = self.work_queue.all_tasks_done
done_condition.acquire()
try:
while self.work_queue.unfinished_tasks:
if (timeout <= 0):
# Timed out.
return
start_time = time.time()
done_condition.wait(timeout)
timeout -= (time.time() - start_time)
finally:
done_condition.release()
def wait_and_printdots(self, command_count, quiet=True):
while self.completed_queue.qsize() < command_count:
time.sleep(1)
if not quiet:
sys.stdout.write(".")
sys.stdout.flush()
if not quiet:
print " "
self.join()

Queue/queue模塊的類
屬性 描述
Queue(maxsize=0) 創建一個先入先出佇列,如果給定最大值,則在佇列沒有空間時阻塞;否則,為無限佇列
LifoQueue(maxsize=0) 創建一個后入先出佇列,如果給定最大值,則在佇列沒有空間時阻塞;否則,為無限佇列
PriorityQueue(maxsize=0) 創建一個優先級佇列,如果給定最大值,則在佇列沒有空間時阻塞,否則,為無限佇列
Queue/queue例外
屬性 描述
Empty
當對孔佇列呼叫get*()方法時拋出例外
Full 當對已滿的佇列呼叫put*()方法時拋出例外
Worker類
worker類繼承自threading模塊中的Thread類,run函式先使用getNextWorkItem函式取得command,總共有四種情況:任務池中沒有command,該Worker示例需要向任務池標記任務完成;如果取得的命令是pool.halt_command,該Worker示例需要向任務池標記任務完成;如果任務池標記了should_stop,該Worker示例需要向任務池標記任務完成;下面是正常流程,執行命令,并將命令放入任務池完成佇列,
class Woker(Thread):
...
def run(self):
while True:
try:
try:
self.cmd = self.pool.getNextWorkItem()
except TypeError:
# misleading exception raised during interpreter shutdown
return
# we must have got a command to run here
if self.cmd is None:
self.logger.debug("[%s] got a None cmd" % self.name)
self.pool.markTaskDone()
elif self.cmd is self.pool.halt_command:
self.logger.debug("[%s] got a halt cmd" % self.name)
self.pool.markTaskDone()
self.cmd = None
return
elif self.pool.should_stop:
self.logger.debug("[%s] got cmd and pool is stopped: %s" % (self.name, self.cmd))
self.pool.markTaskDone()
self.cmd = None
else:
self.logger.debug("[%s] got cmd: %s" % (self.name, self.cmd.cmdStr))
self.cmd.run()
self.logger.debug("[%s] finished cmd: %s" % (self.name, self.cmd))
self.pool.addFinishedWorkItem(self.cmd)
self.cmd = None
except Exception, e:
self.logger.exception(e)
if self.cmd:
self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, self.cmd))
self.pool.addFinishedWorkItem(self.cmd)
self.cmd = None
def haltWork(self):
self.logger.debug("[%s] haltWork" % self.name)
c = self.cmd
if c is not None and isinstance(c, Command):
c.interrupt()
c.cancel()
threading模塊的Thread類實體化表示一個執行執行緒的物件,擁有的資料屬性name(執行緒名)、ident(執行緒的識別符號)、daemon(布爾標志,表示這個執行緒是否是守護執行緒),方法如下:
Thread物件方法描述
_ini_(group=None, target=None, name=None, args=(), kwargs={}, verbose=None, daemon=None) 實體化一個執行緒物件,需要有一個可呼叫的target,以及其引數args或kwargs,還可以傳遞name或group引數,不過后者還未實作,此外,verbose標志也是可以接受的,而daemon的值將會設定thread.daemon屬性/標志
| 成員 | 描述 |
|---|---|
| start() | 開始執行該執行緒 |
| run() | 定義執行緒功能的方法(通常在子類中被應用開發者重寫) |
| join(timeout=None) | 直至啟動的執行緒終止之前一直掛起;除非給出timeout(秒),否則會一直阻塞 |
| getName() | 回傳執行緒名 |
| setName(name) | 設定執行緒名 |
| isAlivel/is_alive() | 布爾標志,表示這個執行緒是否還存活 |
| isDaemon() | 如果是守護執行緒,則回傳True;否則,回傳False |
| setDaemon(daemonic) | 把執行緒的守護標志設定為布林值daemonic(必須在執行緒start()之前呼叫) |
使用Thread類可以有很多方法創建執行緒,這里介紹三種方法:1. 創建Thread的實體,傳給它一個函式;2. 創建Thread的實體,傳給它一個可呼叫的類實體;3.派生Thread的子類,并創建子類的實體,
Command類
Command類有兩個執行函式runNoWait和run函式,runNoWait函式通過呼叫exec_context.execute(self,wait=False)函式執行命令,并回傳proc;run函式直接呼叫exec_context.execute(self)函式,
def runNoWait(self):
faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')
if not faultPoint or (self.name and not self.name.startswith(faultPoint)):
self.exec_context.execute(self, wait=False)
return self.exec_context.proc
def run(self, validateAfter=False):
faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')
if not faultPoint or (self.name and not self.name.startswith(faultPoint)):
self.exec_context.execute(self)
else:
# simulate error
self.results = CommandResult(1, 'Fault Injection', 'Fault Injection', False, True)
if validateAfter:
self.validate()
pass
ExecutionContext類
以RemoteExecutionContext執行背景關系類為例,引數是以以下方法處理的
keys = sorted(cmd.propagate_env_map.keys(), reverse=True) for k in keys: cmd.cmdStr = "%s=%s && %s" % (k, cmd.propagate_env_map[k], cmd.cmdStr),將引數序列化到cmdStr中,對于LocalExecutionContext來說,呼叫如下命令執行命令:self.proc = gpsubprocess.Popen(cmd.cmdStr, env=None, shell=True,executable='/bin/bash',stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE, close_fds=True),對GP封裝的subprocess模式請參見相應其他系列的博客,如果需要等待子行程,則呼叫(rc, stdout_value, stderr_value) = self.proc.communicate2(input=self.stdin),然后使用cmd.set_results(CommandResult( rc, "".join(stdout_value), "".join(stderr_value), self.completed, self.halt)) def cancel(self, cmd):封裝命令回傳的結果,
class LocalExecutionContext(ExecutionContext):
proc = None
halt = False
completed = False
def __init__(self, stdin):
ExecutionContext.__init__(self)
self.stdin = stdin
pass
def execute(self, cmd, wait=True):
# prepend env. variables from ExcecutionContext.propagate_env_map
# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."
# also propagate env from command instance specific map
keys = sorted(cmd.propagate_env_map.keys(), reverse=True)
for k in keys:
cmd.cmdStr = "%s=%s && %s" % (k, cmd.propagate_env_map[k], cmd.cmdStr)
# executable='/bin/bash' is to ensure the shell is bash. bash isn't the
# actual command executed, but the shell that command string runs under.
self.proc = gpsubprocess.Popen(cmd.cmdStr, env=None, shell=True,executable='/bin/bash',stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE, close_fds=True)
cmd.pid = self.proc.pid
if wait:
(rc, stdout_value, stderr_value) = self.proc.communicate2(input=self.stdin)
self.completed = True
cmd.set_results(CommandResult(
rc, "".join(stdout_value), "".join(stderr_value), self.completed, self.halt))
def cancel(self, cmd):
if self.proc:
try:
os.kill(self.proc.pid, signal.SIGTERM)
except OSError:
pass
def interrupt(self, cmd):
self.halt = True
if self.proc:
self.proc.cancel()
class RemoteExecutionContext(LocalExecutionContext):
trail = set()
def __init__(self, targetHost, stdin, gphome=None):
LocalExecutionContext.__init__(self, stdin)
self.targetHost = targetHost
if gphome:
self.gphome = gphome
else:
self.gphome = GPHOME
def execute(self, cmd):
# prepend env. variables from ExcecutionContext.propagate_env_map
# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."
self.__class__.trail.add(self.targetHost)
# also propagate env from command instance specific map
keys = sorted(cmd.propagate_env_map.keys(), reverse=True)
for k in keys:
cmd.cmdStr = "%s=%s && %s" % (k, cmd.propagate_env_map[k], cmd.cmdStr)
# Escape " for remote execution otherwise it interferes with ssh
cmd.cmdStr = cmd.cmdStr.replace('"', '\\"')
cmd.cmdStr = "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=60 " \
"{targethost} \"{gphome} {cmdstr}\"".format(targethost=self.targetHost,
gphome=". %s/greenplum_path.sh;" % self.gphome,
cmdstr=cmd.cmdStr)
LocalExecutionContext.execute(self, cmd)
if (cmd.get_results().stderr.startswith('ssh_exchange_identification: Connection closed by remote host')):
self.__retry(cmd)
pass
def __retry(self, cmd, count=0):
if count == SSH_MAX_RETRY:
return
time.sleep(SSH_RETRY_DELAY)
LocalExecutionContext.execute(self, cmd)
if (cmd.get_results().stderr.startswith('ssh_exchange_identification: Connection closed by remote host')):
self.__retry(cmd, count + 1)
而RemoteExecutionContext先對引數進行格式化后,還需要對雙引號使用雙斜線進行替換后,然后添加ssh相關命令選項,代碼如下所示:cmd.cmdStr = cmd.cmdStr.replace('"', '\\"') cmd.cmdStr = "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=60 {targethost} \"{gphome} {cmdstr}\"".format(targethost=self.targetHost, gphome=". %s/greenplum_path.sh;" % self.gphome, cmdstr=cmd.cmdStr),然后呼叫LocalExecutionContext父類的execute函式,如果回傳的結果包含ssh_exchange_identification: Connection closed by remote host,則需要進行等待相應的時間,然后進行重試,

Controller-Worker架構模式

輔助說明:
Controller-Worker是一種組合架構模式,Controller基于Client的引數動態生成Woker數量,并控制Woker的生命周期,如創建和終止,
Controller屬性:
Controller事先知道自身擁有的Woker型別,
Controller依賴一個作業任務池,通過作業任務池Controller監控整體任務執行情況,
Worker屬性:
Worker并行消費作業任務池中任務,并把執行結果回傳到任務池中,
Worker彼此間沒有任何耦合,

輔組說明:
Controller通過WorkerPool和Worker進行命令傳遞,
Controller通過超時機制,保證最后一定有命令結果回傳給Client
Controller通過halt命令,停止所有的Woker
Worker采用Thread方式來實作,
Worker1、Worker2、WorkerN無差別,根據獲取的Cmd,通過ssh方式在對應的Host執行命令,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/250199.html
標籤:其他
