在此前的文章中,我已經向你介紹了Kubeflow,這是一個為團隊設定的機器學習平臺,需要構建機器學習流水線,
在本文中,我們將了解如何采用現有的機器學習詳細并將其變成Kubeflow的機器學習流水線,進而可以部署在Kubernetes上,在進行本次練習的時候,請考慮你該如何將現有的機器學習專案轉換到Kubeflow上,
我將使用Fashion MNIST作為例子,因為在本次練習中模型的復雜性并不是我們需要解決的主要目標,對于這一簡單的例子,我將流水線分為3個階段:
-
Git clone代碼庫
-
下載并重新處理訓練和測驗資料
-
訓練評估
當然,你可以根據自己的用例將流水線以任意形式拆分,并且可以隨意擴展流水線,
獲取代碼
你可以從Github上獲取代碼:
% git clone https://github.com/benjamintanweihao/kubeflow-mnist.git
以下是我們用來創建流水線的完整清單,實際上,你的代碼很可能跨多個庫和檔案,在我們的例子中,我們將代碼分為兩個腳本,preprocessing.py和train.py,
from tensorflow import keras
import argparse
import os
import pickle
def preprocess(data_dir: str):
fashion_mnist = keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
train_images = train_images / 255.0
test_images = test_images / 255.0
os.makedirs(data_dir, exist_ok=True)
with open(os.path.join(data_dir, 'train_images.pickle'), 'wb') as f:
pickle.dump(train_images, f)
with open(os.path.join(data_dir, 'train_labels.pickle'), 'wb') as f:
pickle.dump(train_labels, f)
with open(os.path.join(data_dir, 'test_images.pickle'), 'wb') as f:
pickle.dump(test_images, f)
with open(os.path.join(data_dir, 'test_labels.pickle'), 'wb') as f:
pickle.dump(test_labels, f)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Kubeflow MNIST training script')
parser.add_argument('--data_dir', help='path to images and labels.')
args = parser.parse_args()
preprocess(data_dir=args.data_dir)
處理腳本采用單個引數data_dir,它下載并預處理資料,并將pickled版本保存在data_dir中,在生產代碼中,這可能是TFRecords的存盤目錄,
train.py
import calendar
import os
import time
import tensorflow as tf
import pickle
import argparse
from tensorflow import keras
from constants import PROJECT_ROOT
def train(data_dir: str):
# Training
model = keras.Sequential([
keras.layers.Flatten(input_shape=(28, 28)),
keras.layers.Dense(128, activation='relu'),
keras.layers.Dense(10)])
model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy'])
with open(os.path.join(data_dir, 'train_images.pickle'), 'rb') as f:
train_images = pickle.load(f)
with open(os.path.join(data_dir, 'train_labels.pickle'), 'rb') as f:
train_labels = pickle.load(f)
model.fit(train_images, train_labels, epochs=10)
with open(os.path.join(data_dir, 'test_images.pickle'), 'rb') as f:
test_images = pickle.load(f)
with open(os.path.join(data_dir, 'test_labels.pickle'), 'rb') as f:
test_labels = pickle.load(f)
# Evaluation
test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=2)
print(f'Test Loss: {test_loss}')
print(f'Test Acc: {test_acc}')
# Save model
ts = calendar.timegm(time.gmtime())
model_path = os.path.join(PROJECT_ROOT, f'mnist-{ts}.h5')
tf.saved_model.save(model, model_path)
with open(os.path.join(PROJECT_ROOT, 'output.txt'), 'w') as f:
f.write(model_path)
print(f'Model written to: {model_path}')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Kubeflow FMNIST training script')
parser.add_argument('--data_dir', help='path to images and labels.')
args = parser.parse_args()
train(data_dir=args.data_dir)
在train.py中,將建立模型,并使用data_dir指定訓練和測驗資料的位置,模型訓練完畢并且開始執行評估后,將模型寫入帶有時間戳的路徑,請注意,該路徑也已寫入output.txt,稍后將對此進行參考,
開發Kubeflow流水線
為了開始創建Kubeflow流水線,我們需要拉取一些依賴項,我準備了一個environment.yml,其中包括了kfp 0.5.0、tensorflow以及其他所需的依賴項,
你需要安裝Conda,然后執行以下步驟:
% conda env create -f environment.yml
% source activate kubeflow-mnist
% python preprocessing.py --data_dir=/path/to/data
% python train.py --data_dir=/path/to/data
現在我們來回顧一下我們流水線中的幾個步驟:
-
Git clone代碼庫
-
下載并預處理訓練和測驗資料
-
訓練并進行評估
在我們開始寫代碼之前,需要從宏觀上了解Kubeflow流水線,
流水線由連接組件構成,一個組件的輸出成為另一個組件的輸入,每個組件實際上都在容器中執行(在本例中為Docker),將發生的情況是,我們會執行一個我們稍后將要指定的Docker鏡像,它包含了我們運行preprocessing.py和train.py所需的一切,當然,這兩個階段會有它們的組件,
我們還需要額外的一個鏡像以git clone專案,我們需要將專案bake到Docker鏡像,但在實際專案中,這可能會導致Docker鏡像的大小膨脹,
說到Docker鏡像,我們應該先創建一個,
Step0:創建一個Docker鏡像
如果你只是想進行測驗,那么這個步驟不是必須的,因為我已經在Docker Hub上準備了一個鏡像,這是Dockerfile的全貌:
FROM tensorflow/tensorflow:1.14.0-gpu-py3
LABEL MAINTAINER "Benjamin Tan <[email protected]>"
SHELL ["/bin/bash", "-c"]
# Set the locale
RUN echo 'Acquire {http::Pipeline-Depth "0";};' >> /etc/apt/apt.conf
RUN DEBIAN_FRONTEND="noninteractive"
RUN apt-get update && apt-get -y install --no-install-recommends locales && locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8
RUN apt-get install -y --no-install-recommends \
wget \
git \
python3-pip \
openssh-client \
python3-setuptools \
google-perftools && \
rm -rf /var/lib/apt/lists/*
# install conda
WORKDIR /tmp
RUN wget --quiet https://repo.anaconda.com/miniconda/Miniconda3-4.7.12-Linux-x86_64.sh -O ~/miniconda.sh && \
/bin/bash ~/miniconda.sh -b -p /opt/conda && \
rm ~/miniconda.sh && \
ln -s /opt/conda/etc/profile.d/conda.sh /etc/profile.d/conda.sh && \
echo ". /opt/conda/etc/profile.d/conda.sh" >> ~/.bashrc
# build conda environments
COPY environment.yml /tmp/kubeflow-mnist/conda/
RUN /opt/conda/bin/conda update -n base -c defaults conda
RUN /opt/conda/bin/conda env create -f /tmp/kubeflow-mnist/conda/environment.yml
RUN /opt/conda/bin/conda clean -afy
# Cleanup
RUN rm -rf /workspace/{nvidia,docker}-examples && rm -rf /usr/local/nvidia-examples && \
rm /tmp/kubeflow-mnist/conda/environment.yml
# switch to the conda environment
RUN echo "conda activate kubeflow-mnist" >> ~/.bashrc
ENV PATH /opt/conda/envs/kubeflow-mnist/bin:$PATH
RUN /opt/conda/bin/activate kubeflow-mnist
# make /bin/sh symlink to bash instead of dash:
RUN echo "dash dash/sh boolean false" | debconf-set-selections && \
DEBIAN_FRONTEND=noninteractive dpkg-reconfigure dash
# Set the new Allocator
ENV LD_PRELOAD /usr/lib/x86_64-linux-gnu/libtcmalloc.so.
關于Dockerfile值得關注的重要一點是Conda環境是否設定完成并準備就緒,要構建鏡像:
% docker build -t your-user-name/kubeflow-mnist . -f Dockerfile
% docker push your-user-name/kubeflow-mnist
那么,現在讓我們來創建第一個組件!
在pipeline.py中可以找到以下代碼片段,
Step1:Git Clone
在這一步中,我們將從遠程的Git代碼庫中執行一個git clone,特別是,我想要向你展示如何從私有倉庫中進行git clone,因為這是大多數企業的專案所在的位置,當然,這也是一個很好的機會來演示Rancher中一個很棒的功能,它能簡單地添加諸如SSH密鑰之類的密鑰,
使用Rancher添加密鑰
訪問Rancher界面,在左上角,選擇local,然后選擇二級選單的Default:

然后,選擇Resources下的Secrets

你應該看到一個密鑰的串列,它們正在被你剛剛選擇的集群所使用,點擊Add Secret:

使用你在下圖中所看到的值來填寫該頁面,如果kubeflow沒有在命名空間欄下展示出來,你可以通過選擇Add to a new namespace并且輸入kubeflow簡單地創建一個,
確保Scope僅是個命名空間,如果將Scope設定為所有命名空間,那么將使得在Default專案中的任意作業負載都能夠使用你的ssh密鑰,
在Secret Values中,key是id_rsa,值是id_rsa的內容,完成之后,點擊Save,

如果一些進展順利,你將會看到下圖的內容,現在你已經成功地在kubeflow命名空間中添加了你的SSH密鑰,并且無需使用kubectl!

既然我們已經添加了我們的SSH key,那么是時候回到代碼,我們如何利用新添加的SSH密鑰來訪問私有git倉庫?
def git_clone_darkrai_op(repo_url: str):
volume_op = dsl.VolumeOp(
name="create pipeline volume",
resource_name="pipeline-pvc",
modes=["ReadWriteOnce"],
size="3Gi"
)
image = 'alpine/git:latest'
commands = [
"mkdir ~/.ssh",
"cp /etc/ssh-key/id_rsa ~/.ssh/id_rsa",
"chmod 600 ~/.ssh/id_rsa",
"ssh-keyscan bitbucket.org >> ~/.ssh/known_hosts",
f"git clone {repo_url} {PROJECT_ROOT}",
f"cd {PROJECT_ROOT}"]
op = dsl.ContainerOp(
name='git clone',
image=image,
command=['sh'],
arguments=['-c', ' && '.join(commands)],
container_kwargs={'image_pull_policy': 'IfNotPresent'},
pvolumes={"/workspace": volume_op.volume}
)
# Mount Git Secrets
op.add_volume(V1Volume(name='ssh-key-volume',
secret=V1SecretVolumeSource(secret_name='ssh-key-secret')))
op.add_volume_mount(V1VolumeMount(mount_path='/etc/ssh-key', name='ssh-key-volume', read_only=True))
return op
首先,創建一個Kubernetes volume,預定義大小為3Gi,其次,將image變數指定為我們將要使用的alpine/git Docker鏡像,之后是在Docker容器中執行的命令串列,這些命令實質上是設定SSH密鑰的,以便于流水線可以從私有倉庫git clone,或者使用git://URL來代替 https://,
該函式的核心是下面一行,回傳一個dsl.ContainerOp,
command和arguments指定了執行鏡像之后需要執行的命令,
最后一個變數十分有趣,是pvolumes,它是Pipeline Volumes簡稱,它創建一個Kubernetes volume并允許流水線組件來共享單個存盤,該volume被掛載在/workspace上,那么這個組件要做的就是把倉庫git clone到/workspace中,
使用Secrets
再次查看命令和復制SSH密鑰的位置,
流水線volume在哪里創建呢?當我們將所有組件都整合到一個流水線中時,就會看到創建好的volume,我們在/etc/ssh-key/上安裝secrets:
op.add_volume_mount(V1VolumeMount(mount_path='/etc/ssh-key', name='ssh-key-volume', read_only=True))
請記得我們將secret命名為ssh-key-secret:
op.add_volume(V1Volume(name='ssh-key-volume',
secret=V1SecretVolumeSource(secret_name='ssh-key-secret')))
通過使用相同的volume名稱ssh-key-volume,我們可以把一切系結在一起,
Step2:預處理
def preprocess_op(image: str, pvolume: PipelineVolume, data_dir: str):
return dsl.ContainerOp(
name='preprocessing',
image=image,
command=[CONDA_PYTHON_CMD, f"{PROJECT_ROOT}/preprocessing.py"],
arguments=["--data_dir", data_dir],
container_kwargs={'image_pull_policy': 'IfNotPresent'},
pvolumes={"/workspace": pvolume}
)
正如你所看到的, 預處理步驟看起來十分相似,
image指向我們在Step0中創建的Docker鏡像,
這里的command使用指定的conda python簡單地執行了preprocessing.py腳本,變數data_dir被用于執行preprocessing.py腳本,
在這一步驟中pvolume將在/workspace里有倉庫,這意味著我們所有的腳本在這一階段都是可用的,并且在這一步中預處理資料會存盤在/workspace下的data_dir中,
Step3:訓練和評估
def train_and_eval_op(image: str, pvolume: PipelineVolume, data_dir: str, ):
return dsl.ContainerOp(
name='training and evaluation',
image=image,
command=[CONDA_PYTHON_CMD, f"{PROJECT_ROOT}/train.py"],
arguments=["--data_dir", data_dir],
file_outputs={'output': f'{PROJECT_ROOT}/output.txt'},
container_kwargs={'image_pull_policy': 'IfNotPresent'},
pvolumes={"/workspace": pvolume}
)
最后,是時候進行訓練和評估這一步驟,這一步唯一的區別在于file_outputs變數,如果我們再次查看train.py,則有以下代碼段:
with open(os.path.join(PROJECT_ROOT, 'output.txt'), 'w') as f:
f.write(model_path)
print(f'Model written to: {model_path}')
我們正在將模型路徑寫入名為output.txt的文本檔案中,通常,可以將其發送到下一個流水線組件,在這種情況下,該引數將包含模型的路徑,
將一切放在一起
要指定流水線,你需要使用dsl.pipeline來注釋流水線功能:
@dsl.pipeline(
name='Fashion MNIST Training Pipeline',
description='Fashion MNIST Training Pipeline to be executed on KubeFlow.'
)
def training_pipeline(image: str = 'benjamintanweihao/kubeflow-mnist',
repo_url: str = 'https://github.com/benjamintanweihao/kubeflow-mnist.git',
data_dir: str = '/workspace'):
git_clone = git_clone_darkrai_op(repo_url=repo_url)
preprocess_data = https://www.cnblogs.com/rancherlabs/p/preprocess_op(image=image,
pvolume=git_clone.pvolume,
data_dir=data_dir)
_training_and_eval = train_and_eval_op(image=image,
pvolume=preprocess_data.pvolume,
data_dir=data_dir)
if __name__ =='__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(training_pipeline, __file__ + '.tar.gz')
還記得流水線組件的輸出是另一個組件的輸入嗎?在這里,git clone、container_op的pvolume將傳遞到preprocess_cp,
最后一部分將pipeline.py轉換為可執行腳本,最后一步是編譯流水線:
% dsl-compile --py pipeline.py --output pipeline.tar.gz
上傳并執行流水線
現在要進行最有趣的部分啦!第一步,上傳流水線,點擊Upload a pipeline:
接下來,填寫Pipeline Name和Pipeline Description,然后選擇Choose file并且指向pipeline.tar.gz以上傳流水線,
下一頁將會展示完整的流水線,我們所看到的是一個流水線的有向無環圖,在本例中這意味著依賴項會通往一個方向并且它不包含回圈,點擊藍色按鈕Create run 以開始訓練,
大部分欄位已經已經填寫完畢,請注意,Run parameters與使用@ dsl.pipeline注釋的training_pipeline函式中指定的引數相同:
最后,當你點擊藍色的Start按鈕時,整個流水線就開始運轉了!你點擊每個組件并查看日志就能夠知道發生了什么,當整個流水線執行完畢時,在所有組件的右方會有一個綠色的確認標志,如下所示:
結論
如果你從上一篇文章開始就一直在關注,那么你應該已經安裝了Kubeflow,并且應該能體會到大規模管理機器學習專案的復雜性,
在這篇文章中,我們先介紹了為Kubeflow準備一個機器學習專案的程序,然后是構建一個Kubeflow流水線,最后是使用Kubeflow介面上傳并執行流水線,這種方法的奇妙之處在于,你的機器學習專案可以是簡單的,也可以是復雜的,只要你愿意,你就可以使用相同的技術,
因為Kubeflow使用Docker容器作為組件,你可以自由地加入任何你喜歡的工具,而且由于Kubeflow運行在Kubernetes上,你可以讓Kubernetes處理機器學習作業負載的調度,
我們還了解了一個我喜歡的Rancher功能,它十分方便,可以輕松添加secrets,立刻,你就可以輕松地組織secrets(如SSH密鑰),并選擇將其分配到哪個命名空間,而無需為Base64編碼而煩惱,就像Rancher的應用商店一樣,這些便利性使Kubernetes的作業更加愉快,更不容易出錯,
當然,Rancher提供的服務遠不止這些,我鼓勵你自己去做一些探索,我相信你會偶然發現一些讓你大吃一驚的功能,Rancher作為一個開源的企業級Kubernetes管理平臺,Run Kubernetes Everywhere一直是我們的愿景和宗旨,開源和無廠商鎖定的特性,可以讓用戶輕松地在不同的基礎設施部署和使用Rancher,此外,Rancher極簡的操作體驗也可以讓用戶在不同的場景中利用Rancher提升效率,幫助開發人員專注于創新,而無需在繁瑣的小事中浪費精力,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/3912.html
標籤:其他
