主頁 > 後端開發 > ray-分布式計算框架-集群與異步Job管理

ray-分布式計算框架-集群與異步Job管理

2023-04-26 07:28:44 後端開發

0. ray 簡介

ray是開源分布式計算框架,為并行處理提供計算層,用于擴展AI與Python應用程式,是ML作業負載統一工具包
img

  • Ray AI Runtime

ML應用程式庫集

  • Ray Core

通用分布式計算庫

  • Task -- Ray允許任意Python函式在單獨的Python worker上運行,這些異步Python函式稱為任務
  • Actor -- 從函式擴展到類,是一個有狀態的作業者,當一個Actor被創建,一個新的worker被創建,并且actor的方法被安排到那個特定的worker上,并且可以訪問和修改那個worker的狀態
  • Object -- Task與Actor在物件上創建與計算,被稱為遠程物件,被存盤在ray的分布式共享記憶體物件存盤上,通過物件參考來參考遠程物件,集群中每個節點都有一個物件存盤,遠程物件存盤在何處(一個或多個節點上)與遠程物件參考的持有者無關
  • Placement Groups -- 允許用戶跨多個節點原子性的保留資源組,以供后續Task與Actor使用
  • Environment Dependencies -- 當Ray在遠程機器上執行Task或Actor時,它們的依賴環境項(Python包、本地檔案、環境變數)必須可供代碼運行,解決環境依賴的方式有兩種,一種是在集群啟動前準備好對集群的依賴,另一種是在ray的運行時環境動態安裝
  • Ray cluster

一組連接到公共 Ray 頭節點的作業節點,通過 kubeRay operator管理運行在k8s上的ray集群

  • 關聯鏈接
    • Ray Doc: https://docs.ray.io/en/latest/ray-overview/index.html
    • Ray Github: https://ray-project.github.io/kuberay/deploy/helm-cluster/
    • Python raycluster 管理API: https://github.com/ray-project/kuberay/tree/master/clients/python-client
    • Ray Job Python SDK Doc: https://docs.ray.io/en/latest/cluster/running-applications/job-submission/jobs-package-ref.html#ray-job-submission-sdk-ref

1. ray 集群管理

ray版本:2.3.0

  • Kind 創建測驗k8s集群

1主3從集群

# 組態檔 -- 一主兩從(默認單主),檔案名:k8s-3nodes.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker

創建k8s集群

kind create cluster --config k8s-3nodes.yaml
  • KubeRay 部署ray集群
# helm方式安裝
# 添加Charts倉庫
helm repo add kuberay https://ray-project.github.io/kuberay-helm/

# 安裝default名稱空間
# 安裝 kubeRay operator
# 下載離線的chart包: helm pull kuberay/kuberay-operator --version 0.5.0
# 本地安裝: helm install kuberay-operator 
helm install kuberay-operator kuberay/kuberay-operator --version 0.5.0

# 創建ray示例集群,若通過sdk管理則跳過
# 下載離線的ray集群自定義資源:helm pull kuberay/ray-cluster  --version 0.5.0
helm install raycluster kuberay/ray-cluster --version 0.5.0

# 獲取ray集群對應的CR
kubectl get raycluster

# 查詢pod的狀態
kubectl get pods

# 轉發svc 8265埠到本地8265埠
kubectl port-forward --address 0.0.0.0 svc/raycluster-kuberay-head-svc 8265:8265

# 登錄ray head節點,并執行一個job
kubectl exec -it ${RAYCLUSTER_HEAD_POD} -- bash
python -c "import ray; ray.init(); print(ray.cluster_resources())" # (in Ray head Pod)

# 洗掉ray集群
helm uninstall raycluster

# 洗掉kubeRay
helm uninstall kuberay-operator

# 查詢helm管理的資源
helm ls --all-namespaces
  • Ray 集群管理

前置要求:

  1. 安裝 KubeRay
  2. 安裝 k8s sdk: pip install kubernetes
  3. 將python_client拷貝到PYTHONPATH路徑下或者直接安裝python_client, 該庫路徑為:https://github.com/ray-project/kuberay/tree/master/clients/python-client/python_client
from python_client import kuberay_cluster_api
from python_client.utils import kuberay_cluster_utils, kuberay_cluster_builder


def main():
    
    # ray集群管理的api 獲取集群串列、創建集群、更新集群、洗掉集群
    kuberay_api = kuberay_cluster_api.RayClusterApi()

    # CR 構建器,構建ray集群對應的字典格式的CR
    cr_builder = kuberay_cluster_builder.ClusterBuilder()

    # CR資源物件操作工具,更新cr資源
    cluster_utils = kuberay_cluster_utils.ClusterUtils()

    # 構建集群的CR,是一個字典物件,可以修改、洗掉、添加額外的屬性
    # 可以指定包含特定環境依賴的人ray鏡像
    cluster = (
        cr_builder.build_meta(name="new-cluster1", labels={"demo-cluster": "yes"}) # 輸入ray群名稱、名稱空間、資源標簽、ray版本資訊
        .build_head(cpu_requests="0", memory_requests="0")   # ray集群head資訊: ray鏡像名稱、對應service型別、cpu memory的requests與limits、ray head啟動引數
        .build_worker(group_name="workers", cpu_requests="0", memory_requests="0") # ray集群worker資訊: worker組名稱、 ray鏡像名稱、ray啟動命令、cpu memory的requests與limits、默認副本個數、最大與最小副本個數
        .get_cluster()
    )
    
    # 檢查CR是否構建成功
    if not cr_builder.succeeded:
        print("error building the cluster, aborting...")
        return

    # 創建ray集群
    kuberay_api.create_ray_cluster(body=cluster)

    # 更新ray集群CR中的worker副本集合
    cluster_to_patch, succeeded = cluster_utils.update_worker_group_replicas(
        cluster, group_name="workers", max_replicas=4, min_replicas=1, replicas=2
    )

    if succeeded:
        # 更新ray集群
        kuberay_api.patch_ray_cluster(
            name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
        )

    # 在原來的集群的CR中的作業組添加新的作業組
    cluster_to_patch, succeeded = cluster_utils.duplicate_worker_group(
        cluster, group_name="workers", new_group_name="duplicate-workers"
    )

    if succeeded:
        kuberay_api.patch_ray_cluster(
            name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
        )

    # 列出所有創建的集群
    kube_ray_list = kuberay_api.list_ray_clusters(k8s_namespace="default", label_selector='demo-cluster=yes')
    if "items" in kube_ray_list:
        for cluster in kube_ray_list["items"]:
            print(cluster["metadata"]["name"], cluster["metadata"]["namespace"])

    # 洗掉集群
    if "items" in kube_ray_list:
        for cluster in kube_ray_list["items"]:
            print("deleting raycluster = {}".format(cluster["metadata"]["name"]))
            
            # 通過指定名稱洗掉ray集群
            kuberay_api.delete_ray_cluster(
                name=cluster["metadata"]["name"],
                k8s_namespace=cluster["metadata"]["namespace"],
            )


if __name__ == "__main__":
    main()

2. ray Job 管理

前置: pip install -U "ray[default]"

  • 創建一個job任務
# 檔案名稱: test_job.py
# python 標準庫
import json
import ray
import sys

# 已經在ray節點安裝的庫
import redis

# 通過job提交時傳遞的模塊依賴 runtime_env 配置 py_modules,通過 py_nodules傳遞過來就可以直接在job中匯入
from test_module import test_1
import stk12

# 創建一個連接redeis物件,通過redis作為中轉向job傳遞輸入并獲取job的輸出
redis_cli = redis.Redis(host='192.168.6.205', port=6379,  decode_responses=True)

# 通過redis獲取傳入過來的引數
input_params_value = https://www.cnblogs.com/2bjiujiu/p/None
if len(sys.argv) > 1:
    input_params_key = sys.argv[1]
    input_params_value = json.loads(redis_cli.get(input_params_key))


# 執行遠程任務
@ray.remote
def hello_world(value):
    return [v + 100 for v in value]

ray.init()

# 輸出傳遞過來的引數
print("input_params_value:", input_params_value, type(input_params_key))

# 執行遠程函式
result = ray.get(hello_world.remote(input_params_value))

# 獲取輸出key
output_key = input_params_key.split(":")[0] + ":output"

# 將輸出結果放入redis
redis_cli.set(output_key, json.dumps(result))

# 測驗傳遞過來的Python依賴庫是否能正常匯入
print(test_1.test_1())
print(stk12.__dir__())
  • 創建測驗自定義模塊
# 模塊路徑: test_module/test_1.py
def test_1():
    return "test_1"
  • 創建一個job提交物件
import json

from ray.job_submission import JobSubmissionClient, JobStatus
import time
import uuid
import redis

# 上傳un到ray集群供job使用的模塊
import test_module
from agi import stk12

# 創建一個連接redeis物件
redis_cli = redis.Redis(host='192.168.6.205', port=6379,  decode_responses=True)

# 創建一個client,指定遠程ray集群的head地址
client = JobSubmissionClient("http://127.0.0.1:8265")

# 創建任務的ID
id = uuid.uuid4().hex
input_params_key = f"{id}:input"
input_params_value = https://www.cnblogs.com/2bjiujiu/p/[1, 2, 3, 4, 5]

# 將輸入引數存入redis,供遠程函式job使用
redis_cli.set(input_params_key, json.dumps(input_params_value))


# 提交一個ray job 是一個獨立的ray應用程式
job_id = client.submit_job(
    # 執行該job的入口腳本
    entrypoint=f"python test_job.py {input_params_key}",

    # 將本地檔案上傳到ray集群
    runtime_env={
        "working_dir": "./",
        "py_modules": [test_module, stk12],
        "env_vars": {"testenv": "test-1"}
    },

    # 自定義任務ID
    submission_id=f"{id}"
)

# 輸出job ID
print("job_id:", job_id)


def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
    """輪詢獲取Job的狀態,當完成時獲取任務的的日志輸出"""
    start = time.time()
    while time.time() - start <= timeout_seconds:
        # 獲取任務的狀態
        status = client.get_job_status(job_id)
        print(f"status: {status}")

        # 檢查任務的狀態
        if status in status_to_wait_for:
            break
        time.sleep(1)


wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})

# 輸出job日志
logs = client.get_job_logs(job_id)
print(logs)

# 輸出從job中獲取的任務
output_key = job_id + ":output"
output_value = https://www.cnblogs.com/2bjiujiu/p/redis_cli.get(output_key)
print("output:", output_value)
  • job 管理
from ray.job_submission import JobSubmissionClient, JobDetails, JobInfo, JobType, JobStatus
# 創建一個job提交客戶端,如果管理多個ray集群的Job則切換或者創建多個連接ray head節點的客戶端
job_cli = JobSubmissionClient("http://127.0.0.1:8265")

# Job資訊,對應Job中submission_id屬性
job_id = "b9ad6ff9ada445a29fb54307f1394594"
job_info = job_cli.get_job_info(job_id)

# 獲取提交的所有job
jobs = job_cli.list_jobs()

for job in jobs:

    # 獲取job的狀態
    job_status = job_cli.get_job_status(job.submission_id)
    print(f"job_id: {job.submission_id}, job_status: {job_status}")

    # 輸出job的json格式詳情
    print("job:", job.json())

# 停止Job
job_cli.stop_job(job_id)

# 洗掉 job
# job_cli.delete_job(job_id)

# 提交 Job
# job_cli.submit_job()

# 獲取版本資訊
print("version:", job_cli.get_version())

3. 產品場景

  • 將周期、耗時任務異步化

鏡像檔案打包下載、檔案同步、運維腳本、資料匯出與同步、鏡像同步、服務啟停、TATC衛星專案中演算法任務的執行、批量同型別任務的計算(如衛星專案中衛星軌跡的計算)、備份任務

  • k8s中每個租戶可以創建與洗掉自己的ray集群實體,在線IDE中將計算型任務交給ray來執行,不消耗IED所在環境的計算資源

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/551163.html

標籤:Python

上一篇:【NLP教程】用python呼叫百度AI開放平臺進行情感傾向分析

下一篇:返回列表

標籤雲
其他(158071) Python(38104) JavaScript(25391) Java(18001) C(15217) 區塊鏈(8260) C#(7972) AI(7469) 爪哇(7425) MySQL(7144) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5870) 数组(5741) R(5409) Linux(5329) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4561) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2431) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1960) Web開發(1951) HtmlCss(1926) python-3.x(1918) 弹簧靴(1913) C++(1911) xml(1889) PostgreSQL(1874) .NETCore(1855) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • ray-分布式計算框架-集群與異步Job管理

    0. ray 簡介 ray是開源分布式計算框架,為并行處理提供計算層,用于擴展AI與Python應用程式,是ML作業負載統一工具包 Ray AI Runtime ML應用程式庫集 Ray Core 通用分布式計算庫 Task -- Ray允許任意Python函式在單獨的Python worker上運 ......

    uj5u.com 2023-04-26 07:28:44 more
  • 【NLP教程】用python呼叫百度AI開放平臺進行情感傾向分析

    一、背景 Hi,大家!我是 @馬哥python說 ,一名10年程式猿。 今天我來演示一下:通過百度AI開放平臺,利用python呼叫百度介面進行中文情感傾向分析,并得出情感極性分為積極、消極還是中性以及置信度結果。 二、操作步驟 首先,打開百度AI首頁:百度AI開放平臺-全球領先的人工智能服務平臺 ......

    uj5u.com 2023-04-26 07:28:22 more
  • Python中文分詞庫——jieba的用法

    1.介紹 jieba是優秀的中文分詞第三方庫。由于中文文本之間每個漢字都是連續書寫的,我們需要通過特定的手段來獲得其中的每個單詞,這種手段就叫分詞。而jieba是Python計算生態中非常優秀的中文分詞第三方庫,需要通過安裝來使用它。 jieba庫提供了三種分詞模式,但實際上要達到分詞效果只要掌握一 ......

    uj5u.com 2023-04-26 07:27:56 more
  • [Python自動化]使用Python Pexpect模塊實作自動化互動腳本使用心

    使用Python Pexpect模塊實作自動化互動腳本使用心得 參考檔案:https://pexpect.readthedocs.io/en/stable/ 前言 在最近的作業中,需要使用DockerFile構建鏡像。在構建鏡像的程序中,有一些執行的命令是需要互動的。例如安裝tzdata(apt i ......

    uj5u.com 2023-04-26 07:27:52 more
  • Django 如何使用 Celery 完成異步任務或定時任務

    以前版本的 Celery 需要一個單獨的庫(django-celery)才能與 Django 一起作業, 但從 Celery 3.1 開始,情況便不再如此,我們可以直接通過 Celery 庫來完成在 Django 中的任務。 安裝 Redis 服務端 以 Docker 安裝為例,安裝一個密碼為 my ......

    uj5u.com 2023-04-26 07:27:48 more
  • 簡述PHP中trait的使用和同時引入多個trait時同名方法沖突的處理

    PHP的類是單一繼承模式,也就是每個類只能繼承一個父類(基類)。 但有時需要引入更多通用(共用)的方法,同時這些方法又不適合集成到基類。 那么這時,就需要使用其他方法來引入這些方法。其中trait,就是方法之一。 trait是PHP5.4之后出現的一種代碼復用方法,形式和Class非常相似,同時可以 ......

    uj5u.com 2023-04-26 07:21:26 more
  • 這可能是最全面的MySQL面試八股文了

    什么是MySQL MySQL是一個關系型資料庫,它采用表的形式來存盤資料。你可以理解成是Excel表格,既然是表的形式存盤資料,就有表結構(行和列)。行代表每一行資料,列代表該行中的每個值。列上的值是有資料型別的,比如:整數、字串、日期等等。 資料庫的三大范式 第一范式1NF 確保資料庫表欄位的原 ......

    uj5u.com 2023-04-25 09:31:53 more
  • 【Jmeter】按比例分配Api壓測

    先看 【Jmeter】基礎介紹-詳細 【Jmeter】Request1輸出作為Request2輸入-后置處理器 繼續聊提出的第二個問題,即 2.需要按比例分配API請求并發,以模擬真實的API壓力場景 做壓測的時候,一般的需求都是多個API同時壓,不然也看不出真正的tps是多少啊。 比如雖然介面a的 ......

    uj5u.com 2023-04-25 09:30:35 more
  • 這可能是最全面的MySQL面試八股文了

    什么是MySQL MySQL是一個關系型資料庫,它采用表的形式來存盤資料。你可以理解成是Excel表格,既然是表的形式存盤資料,就有表結構(行和列)。行代表每一行資料,列代表該行中的每個值。列上的值是有資料型別的,比如:整數、字串、日期等等。 資料庫的三大范式 第一范式1NF 確保資料庫表欄位的原 ......

    uj5u.com 2023-04-25 09:29:15 more
  • 【Jmeter】按比例分配Api壓測

    先看 【Jmeter】基礎介紹-詳細 【Jmeter】Request1輸出作為Request2輸入-后置處理器 繼續聊提出的第二個問題,即 2.需要按比例分配API請求并發,以模擬真實的API壓力場景 做壓測的時候,一般的需求都是多個API同時壓,不然也看不出真正的tps是多少啊。 比如雖然介面a的 ......

    uj5u.com 2023-04-25 09:22:32 more