參考鏈接 :
http://esprog.hatenablog.com/entry/2018/05/19/150313
https://blogs.unity3d.com/2018/10/22/what-is-a-job-system/
Job系統作為一個多執行緒系統, 它因為跟ECS有天生的融合關系所以比較重要的樣子, 我也按照使用型別的分類來看看Job System到底怎么樣.
Job說實話就是一套封裝的多執行緒系統, 我相信所有開發人員都能自己封裝一套, 所以Unity推出這個的時候跟著ECS一起推出, 因為單獨推出來的話肯定推不動, 多執行緒, 執行緒安全, 執行緒鎖, 執行緒共享資源, 這些都沒什么區別, 我從一個簡單串列的功能來說吧.
先來一個普通的多執行緒 :
using System.Collections; using System.Collections.Generic; using UnityEngine; using System; using System.Threading; public class NormalListAccessTest01 : MonoBehaviour { public class RunData { public List<int> datas = new List<int>(); public float speed; public float deltaTime; } public static void RunOnThread<T>(System.Action<T> call, T obj, System.Action endCall = null) { System.Threading.ThreadPool.QueueUserWorkItem((_obj) => { call.Invoke(obj); if(endCall != null) { ThreadMaster.Instance.CallFromMainThread(endCall); } }); } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Run Test")) { ThreadMaster.GetOrCreate(); var data = https://www.cnblogs.com/tiancaiwrk/p/new RunData(); data.deltaTime = Time.deltaTime; data.speed = 100.0f; for(int i = 0; i < 10000; i++) { data.datas.Add(i); } RunOnThread<RunData>((_data) => { // 這是在作業執行緒里 Debug.Log("Start At : " + System.DateTime.Now.ToString("HH:mm:ss fff")); var move = _data.deltaTime * _data.speed; for(int i = 0; i < _data.datas.Count; i++) { var val = _data.datas[i] + 1; _data.datas[i] = val; } }, data, () => { // 這是在主執行緒里 Debug.Log(data.datas[0]); Debug.Log("End At : " + System.DateTime.Now.ToString("HH:mm:ss fff")); }); } } }
執行緒轉換的一個簡單封裝ThreadMaster :
using System.Collections; using System.Collections.Generic; using UnityEngine; public class ThreadMaster : MonoBehaviour { private static ThreadMaster _instance; public static ThreadMaster Instance { get { return GetOrCreate(); } } private volatile List<System.Action> _calls = new List<System.Action>(); public static ThreadMaster GetOrCreate() { if(_instance == false) { _instance = new GameObject("ThreadMaster").AddComponent<ThreadMaster>(); } return _instance; } public void CallFromMainThread(System.Action call) { _calls.Add(call); } void Update() { if(_calls.Count > 0) { for(int i = 0; i < _calls.Count; i++) { var call = _calls[i]; call.Invoke(); } _calls.Clear(); } } }
沒有加什么鎖, 簡單運行沒有問題, 下面來個Job的跑一下:
using UnityEngine; using Unity.Collections; using Unity.Jobs; public class JobSystemSample00 : MonoBehaviour { struct VelocityJob : IJob { public NativeArray<int> datas; public void Execute() { for(var i = 0; i < datas.Length; i++) { datas[i] = datas[i] + 1; } } } public void Test() { var datas = new NativeArray<int>(100, Allocator.Persistent); var job = new VelocityJob() { datas = datas }; JobHandle jobHandle = job.Schedule(); JobHandle.ScheduleBatchedJobs(); //Debug.Log(datas[0]); // Error : You must call JobHandle.Complete() jobHandle.Complete(); Debug.Log(datas[0]); datas.Dispose(); } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test")) { Test(); } } }
這里就有一個大問題了, 在有注釋的地方 // Error : You must call JobHandle.Complete(), 是說在Job沒有呼叫Complete()時, 去獲取相關陣列內容是非法的! 而這個jobHandle.Complete(); 無法通過作業執行緒去呼叫, 也就是說Job的運行它是無法自行結束的, 無法發出運行結束的通知的, 對比上面封裝的普通多執行緒弱爆了. 而這個Complete()函式如果在作業執行緒執行完成前呼叫, 會強制立即執行(檔案也是寫 Wait for the job to complete), 也就是說它只能在主執行緒呼叫并且會阻塞主執行緒, 這樣就可以定性了, 它的Job System不是為了提供一般使用的多執行緒封裝給我們用的, 可是它又是很強大的, 因為它能使用高效的記憶體結構, 能保證資料訪問安全, 能在需要的時候呼叫Complete方法強制等待作業執行緒執行完畢(如果沒猜錯的話, 引擎對這個做了很大優化, 并不是簡單等待), 還有BurstCompile等, 如果我們封裝成功了的話, 就是很好的多執行緒庫了.
PS : 打個比方一個mesh的渲染, 在渲染之前必須計算完所有坐標轉換, Job的好處就是可以進行多執行緒并行的計算, 然后還能被主執行緒強制執行完畢, 比在主執行緒中單獨計算強多了. 而這個強制執行才是核心邏輯.
經過幾次測驗, 幾乎沒有辦法簡單擴展Job系統來讓它成為像上面一樣擁有自動完成通知的系統, 如下 :
1. 添加JobHandle變數到IJob中, 在Execute結束時呼叫
struct VelocityJob : IJob { public NativeArray<int> datas; [Unity.Collections.LowLevel.Unsafe.NativeDisableUnsafePtrRestriction] public JobHandle selfHandle; // 是這個IJob呼叫Schedule的句柄 public void Execute() { for(var i = 0; i < datas.Length; i++) { datas[i] = datas[i] + 1; } selfHandle.Complete(); } }
報錯, InvalidOperationException: VelocityJob.selfHandle.jobGroup uses unsafe Pointers which is not allowed. 無法解決, 直接就無法在IJob結構體中添加JobHandle變數. 并且無法在作業執行緒中呼叫Complete方法.
2. 添加回呼函式進去
struct VelocityJob : IJob { public NativeArray<int> datas; public System.Action endCall; public void Execute() { for(var i = 0; i < datas.Length; i++) { datas[i] = datas[i] + 1; } if(endCall != null) { endCall.Invoke(); } } }
報錯, Job系統的struct里面只能存在值型別的變數 !!-_-
3. 使用全域的參考以及執行緒轉換邏輯來做成自動回呼的形式, 雖然可以使用了可是非常浪費資源 :
using UnityEngine; using Unity.Collections; using Unity.Jobs; using System.Collections.Generic; public class JobSystemSample01 : MonoBehaviour { private static int _id = 0; public static int NewID => _id++; public static Dictionary<int, IJobCall> ms_handleRef = new Dictionary<int, IJobCall>(); public class IJobCall { public JobHandle jobHandle; public System.Action endCall; } struct VelocityJob : IJob { public NativeArray<int> datas; public int refID; public void Execute() { for(var i = 0; i < datas.Length; i++) { datas[i] = datas[i] + 1; } var handle = ms_handleRef[refID]; ThreadMaster.Instance.CallFromMainThread(() => { handle.jobHandle.Complete(); if(handle.endCall != null) { handle.endCall.Invoke(); } }); } } public void Test() { ThreadMaster.GetOrCreate(); var datas = new NativeArray<int>(100, Allocator.Persistent); int id = NewID; var job = new VelocityJob() { refID = id, datas = datas }; ms_handleRef[id] = new IJobCall() { jobHandle = job.Schedule(), endCall = () => { Debug.Log(datas[0]); datas.Dispose(); } }; } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test")) { Test(); } } }
通過上面封裝就可以作為一般多執行緒使用了, 并且我們獲得了引擎提供的資料安全和高效邏輯性, 再加上利用BurstCpmpile和只讀屬性, 能夠提升一些計算效率吧. ECS on Job已經在另外一篇中說過了, 這里忽略了.
----------------------------------------------
當我測驗到IJobParallelFor的時候, 發現并行并不像GPU那樣的并行那么美好, 因為GPU它本身就是全并行的, 像卷積之類的, 它跟像素的處理順序本身就沒有關系, 可是我們的邏輯有些會受順序的影響. 先看看下面的代碼 :
using UnityEngine; using Unity.Collections; using Unity.Jobs; public class IJobParallelForSample01 : MonoBehaviour { struct VelocityJob : IJobParallelFor { public NativeArray<int> datas; public void Execute(int index) { if(index == 0) { index = datas.Length - 1; } datas[index] = datas[index - 1] + 1; } } public void Test() { var datas = new NativeArray<int>(100, Allocator.Persistent); for(int i = 0; i < datas.Length; i++) { datas[i] = i; } var job = new VelocityJob() { datas = datas }; var jobHandle = job.Schedule(datas.Length, 20); JobHandle.ScheduleBatchedJobs(); jobHandle.Complete(); Debug.Log(datas[0]); datas.Dispose(); } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test")) { Test(); } } }
主要的是Schedule的方法上 : public static JobHandle Schedule<T>(this T jobData, int arrayLength, int innerloopBatchCount, JobHandle dependsOn = default) where T : struct, IJobParallelFor;
第二個引數innerloopBatchCount表示的是分塊的大小, 比如我們陣列長度是100, 每20個元素分成一塊, 一共可以分5塊, 如果你的CPU核心數大于等于5它就能開5個執行緒來處理, 可是你不能去獲取這個塊之外的Index的資料:

顯然這里資料每20個一組被分為了5組, 在5個執行緒里, 然后跨組獲取資料就報錯了.
測驗一下執行緒數是否5個 :
struct VelocityJob : IJobParallelFor { public NativeArray<int> datas; public void Execute(int index) { throw new System.Exception(index + " ERROR"); } }

5個執行緒報錯, 應該每個執行緒內的處理也是按照for的順序來的.
把每個塊改成5的大小, 看看它能開幾個執行緒:
var jobHandle = job.Schedule(datas.Length, 5);

恩開了8個, 我的機器確實是8核的, 不過它的分塊不是我想的0-5-10-15, 或者0-12-24-36 而是整10的, 不知道為什么, 因為按照我設定每個分組是5, 而整體平均100/8=12.5而不應該是整10的, 具體不詳.
如果我們要跟其它元素進行互動, 就只能把處理單元設定到跟陣列一樣大, 才能在一個塊中處理:
using UnityEngine; using Unity.Collections; using Unity.Jobs; public class IJobParallelForSample01 : MonoBehaviour { struct VelocityJob : IJobParallelFor { public NativeArray<int> datas; public void Execute(int index) { if(index > 0 && index < datas.Length - 1) { datas[index] = datas[datas.Length - 1]; } } } public void Test() { var datas = new NativeArray<int>(10, Allocator.Persistent); for(int i = 0; i < datas.Length; i++) { datas[i] = i; } var job = new VelocityJob() { datas = datas }; var jobHandle = job.Schedule(datas.Length, datas.Length); JobHandle.ScheduleBatchedJobs(); jobHandle.Complete(); Debug .Log(datas[0]); datas.Dispose(); } private void OnGUI() { if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test")) { Test(); } } }

順便測驗一下各個執行緒的分配情況:
private volatile static Dictionary<int, List<int>> ms_threads = new Dictionary<int, List<int>>(); struct VelocityJob : IJobParallelFor { public NativeArray<int> datas; public void Execute(int index) { Debug.Log(index + " : " + System.Threading.Thread.CurrentThread.ManagedThreadId); lock(ms_threads) { List<int> val = null; ms_threads.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out val); if(val == null) { val = new List<int>(); ms_threads[System.Threading.Thread.CurrentThread.ManagedThreadId] = val; } val.Add(index); } } }
var jobHandle = job.Schedule(100, 5);
結果是分為8個執行緒, 4個執行緒的塊為10, 4個為15


所以不能想當然的去獲取其它Index的內容, 畢竟分塊邏輯不一定.
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/30980.html
標籤:其他
上一篇:肥豬流碼農的逆襲之路(1)
下一篇:計算機網路
