本文是我翻譯《JavaScript Concurrency》書籍的第七章 抽取并發邏輯,該書主要以Promises、Generator、Web workers等技術來講解JavaScript并發編程方面的實踐,
完整書籍翻譯地址:https://github.com/yzsunlei/javascript_concurrency_translation ,由于能力有限,肯定存在翻譯不清楚甚至翻譯錯誤的地方,歡迎朋友們提issue指出,感謝,
到本書這里,我們已經在代碼中明確地模擬了并發問題,使用promises,我們同步化了兩個或更多異步操作,使用生成器,我們按需創建資料,避免不必要的記憶體分配,最后,我們了解到Web worker是利用多個CPU內核的主要工具,
在本章中,我們將采用所有這些方法并將它們放入應用程式代碼的背景關系中,也就是說,如果并發是默認的,那么我們需要使并發盡可能不那么明顯,我們將首先探索各種技術,這些技術將幫助我們在使用的組件中封裝并發機制,然后,我們將通過使用promises來幫助worker通信,直接改進前兩章的代碼,
一旦我們能夠使用promises抽象worker通信,我們將嘗試在生成器的幫助下實作惰性的worker,我們還將使用Parallel.js庫來介紹worker抽象,然后是worker執行緒池的概念,
撰寫并發代碼
并發編程很難做到,即使是人為的示例應用程式,大部分復雜性來自并發代碼,我們顯然希望我們的代碼可讀性好,同時保持并發的好處,我們希望充分利用系統上的每個CPU,我們只想在需要時計算我們需要的東西,我們不希望意大利面條式的代碼將多個異步操作混在一起,在開發應用程式的同時關注并發編程的所有這些方面會削弱我們應該關注的內容 - 提供應用程式有價值的功能,
在本節中,我們將介紹可能用于將我們的應用程式的其余部分與棘手的并發隔離的方法,這通常意味著將并發作為默認模式 - 即使在引擎下沒有發生真正的并發時也是如此,最后,我們不希望我們的代碼包含90%的并發處理技巧,而只有10%的功能,
隱藏并發機制
在我們所有的代碼中暴露并發機制的難度是,他們每一個都稍微不同于另一個,這擴大了我們可能已經發現所在的回呼地獄的情況,例如,不是所有的并發操作都是從一些遠程資源獲取資料的網路請求,異步資料可能來自一個worker或一些本身就是異步的回呼,想象一下場景我們使用了三個不同的資料源來計算一個我們需要的值,所有的這些都是異步的,這里是這個問題的示圖:
此圖中的資料是我們在應用程式代碼中關注的內容,從我們正在構建的功能的角度來看,我們并不關心上述任何事情,因此,我們的前端架構需要封裝與并發相關的復雜性,這意味著我們的每個組件都應該能夠以相同的方式訪問資料,除了我們所有的異步資料源之外,還有另一個要考慮的復雜因素 - 當資料不是異步的并且來自本地資料源呢?那么同步本地資料源和HTTP請求呢?我們將在下一節中介紹這些,
沒有并發性
僅僅因為我們正在撰寫并發JavaScript應用程式,并非每個操作本身都是并發的,例如,如果一個組件向另一個組件詢問它已經在記憶體中的資料,則它不是異步操作并會立即回傳,我們的應用程式可能到處都是這些操作,其中并發性根本就沒有意義,其中存在的挑戰 - 我們如何將異步操作與同步操作無縫混合?
簡單的答案是我們在每處做出并發的默認假設,promise使這個問題易于處理,以下是使用promise來封裝異步和同步操作的示圖說明:
這看起來很像前面的那個圖,但有兩個重要區別,我們添加了一個synchronous()操作; 這沒有回呼函式,因為它不需要回呼函式,它不是在等待其他任何東西,所以它會直接地回傳,其他兩個函式就像在上圖中一樣;兩者都依賴回呼函式將資料提供給我們的應用程式,第二個重要區別是有一個promise物件,這取代了sync()操作和資料概念,或者更確切地說,它將它們融合到同一個概念中,
這是promise的關鍵作用 - 它們為我們抽象同步問題提供能力,這不僅適用于網路請求,還適用于Web worker訊息或依賴于回呼的任何其他異步操作,它需要一些調整來考慮下我們的資料,因為我們得保證它最侄訓到達這里,但是,一旦我們消除了這種心理差距,默認情況下就會啟用并發,就我們的功能而言,并發是默認的,而我們在操作背后所做的事情并不是最具破壞性的,
現在讓我們看一些代碼,我們將創建兩個函式:一個是異步的,另一個是簡單回傳值的普通函式,這里的目標是使運行這些函式的代碼相同,盡管生成值的方式有很大不同:
//一個異步“fetch”函式,我們使用“setTimeout()”
//在1秒后通過“callback()”回傳一些資料,
function fetchAsync(callback) {
setTimeout(() => {
callback({hello: 'world'});
}, 1000);
}
//同步操作只簡單地回傳資料,
function fetchSync() {
return {hello: 'world'};
}
//對“fetchAsync()”呼叫的promise,
//我們通過了“resolve”函式作為回呼,
var asyncPromise = new Promise((resolve, reject) => {
fetchAsync(resolve);
});
//對“fetchSync()”呼叫的promise,
//這個promise立即完成使用回傳值,
var syncPromise = new Promise((resolve, reject) => {
resolve(fetchSync());
});
//創建一個等待兩個promise完成的promise,
//這讓我們無縫混合同步值和異步值,
Promise.all([
asyncPromise,
syncPromise
]).then((results) => {
var [asyncResult, syncResult] = results;
console.log('async', asyncResult);
//→async {hello: 'world'}
});
console.log('sync', syncResult);
//→sync {hello:'world'}
在這里權衡的是增加promise的復雜性,包裹它們而不是讓簡單的回傳值函式馬上回傳,但在現實中,封裝promise的復雜性中,如果我們不是寫一個并發應用,我們顯然需要關心這類問題本身,這些的好處是巨大的,當一切都是promise的值時,我們可以安全地排除令人討厭的導致不一致的并發錯誤,
worker與promise通信
我們現在已經知道了為什么將原始值視為promise有益于我們的代碼,是時候將這個概念應用于web workers了,在前兩章中,我們的代碼同步來自Web worker的回應看起來有點棘手,這是因為我們基本上試圖模仿許多promise善于處理的樣板作業,我們首先嘗試通過創建輔助函式來解決這些問題,這些函式為我們包裝worker通信,回傳promise,然后我們將嘗試另一種涉及在較低級別擴展Web worker的方法,最后,我們將介紹一些涉及多個worker的更復雜的同步方案,例如上一章中的那些worker方案,
輔助函式
如果我們能夠以promise解決的形式獲得Web worker回應,那將是理想的,但是,我們需要首先創造promise - 我們該怎么做這個?好吧,我們可以手動創建promise,其中發送給worker的訊息是從promise executor函式中發送的,但是,如果我們采用這種方法,我們就不會比引入promise之前好多少了,
技巧是在單個輔助函式中封裝發布到worker的訊息和從worker接收的任何訊息,如下所示:
我們來看一個實作這種模式的輔助函式示例,首先,我們需要一個執行某項任務的worker - 我們將從這開始:
//吃掉一些CPU回圈...
//源自http://adambom.github.io/parallel.js/
function work(n) {
var i = 0;
while (++i < n * n) {}
return i;
}
//當我們收到訊息時,我們會發布一條訊息id,
//以及在“number”上執行“work()”的結果,
addEventListener('message', (e) => {
postMessage({
id: e.data.id,
result: work(e.data.number)
});
});
在這里,我們有一個worker,它會對我們傳遞的任何數字進行平方,這個work()函式特意很慢,以便我們可以看到我們的應用程式作為一個整體在Web worker花費比平時更長的時間來完成任務時的表現,它還使用我們在之前的Web worker示例中看到的id,因此它可以與發送訊息的代碼協調,讓我們現在實作使用此worker的輔助函式:
//這將生成唯一ID,
//我們需要它們將Web worker執行的任務
//映射到更大的創建它們的操作,
function* genID() {
var id = 0;
while (true) {
yield id++;
}
}
//創建全域“id”生成器,
var id = genID();
//這個物件包含promises的決議器函式,
//當結果從worker那里回傳時,我們通過id在這里查看,
var resolvers = {};
//開始我們的worker...
var worker = new Worker('worker.js');
worker.addEventListener('message', (e) => {
//找到合適的決議器函式,
var resolver = resolvers[e.data.id];
//從“resolvers”物件中洗掉它,
delete resolvers[e.data.id];
//通過呼叫決議器函式將worker資料傳遞給promise,
resolver(e.data.result);
});
//這是我們的輔助函式,
//它處理向worker發送訊息,
//并將promise系結到worker的回應,
function square(number) {
return new Promise((resolve, reject) => {
//用于將Web worker回應和決議器函式系結在一起的id,
var msgId = id.next().value;
//存盤決議器以便以后在Web worker訊息回呼中可以使用,
resolvers[msgId] = resolve;
//發布訊息 - id和number引數
worker.postMessage({
id: msgId,
number: number
});
});
}
square(10).then((result) => {
console.log('square(10)', result);
//→square(10) 100
});
square(100).then((result) => {
console.log('square(100)', result);
//→square(100) 10000
});
square(1000).then((result) => {
console.log('square(1000)', result);
//→square(1000) 1000000
});
如果我們關注square()函式的使用方式,傳遞一個數字引數并將一個promise作為回傳值,我們可以看到這符合我們之前關于默認情況下使代碼并發的討論,例如,我們可以從這個場景中完全洗掉worker,只需更改輔助函式決議它回傳的promise的方式,我們的其余代碼將繼續保持不變,
輔助函式策略只是一種使用promises簡化worker通信的方法,也許我們可以決定我們不一定要維護一堆輔助函式,接下來,我們將看一個比輔助函式更細粒度的方法,
擴展postMessage()
我們可以采用更通用的方法,而不是積聚大量輔助功能,輔助函式本身沒有什么問題;他們是直接而且重要的,如果我們達到了數百個這樣的函式,它們的作用就會開始大打折扣了,更通用的方法是繼續使用worker.postMessage(),
所以讓我們看看是否可以使這個方法回傳一個promise,就像我們上一節中的helper函式一樣,這樣,我們繼續使用細粒度postMessage()方法,但改進了我們的同步語意,首先,看看這里的worker代碼:
addEventListener('message', (e) => {
//我們將發回主執行緒的結果,
//它應該始終包含訊息id,
var result = {id: e.data.id};
//基于“action”,計算回應值“value”,
//選項是單獨保留文本,
//將其轉換為大寫,或將其轉換為小寫,
if (e.data.action === 'echo') {
result.value = https://www.cnblogs.com/yzsunlei/p/e.data.value;
} else if (e.data.action ==='upper') {
result.value = https://www.cnblogs.com/yzsunlei/p/e.data.value.toUpperCase();
} else if (e.data.action ==='lower') {
result.value = https://www.cnblogs.com/yzsunlei/p/e.data.value.toLowerCase();
}
});
//通過等待延時模擬一個運行時間很長的worker,
//它在1秒后回傳結果,
setTimeout(() => {
postMessage(result);
}, 1000);
這與我們迄今為止在Web worker代碼中看到的完全不同,現在,在主執行緒中,我們必須弄清楚如何改變Worker的介面,我們現在就這樣做,然后,我們將嘗試向此worker發布一些訊息并將處理promises作為回應:
//這個物件包含promises的決議器函式,
//當結果從worker那里回傳時,我們通過id在這里查看,
var resolvers = {};
//保持“postMessage()”的原始實作,
//所以我們可以稍后在我們的自定義“postMessage()”中呼叫它,
var postMessage = Worker.prototype.postMessage;
//用我們的自定義實作替換“postMessage()”,
Worker.prototype.postMessage = function(data) {
return new Promise((resolve, reject) => {
//用于將Web worker回應和決議器函式系結在一起的id,
var msgId = id.next().value;
//存盤決議器以便以后可以在Web worker訊息回呼使用,
resolvers[msgId] = resolve;
//運行原始的“Worker.postMessage()”實作,
//實際上負責將訊息發布到worker執行緒,
postMessage.call(this, Object.assign({
id: msgId
}, data));
});
};
//開始我們的worker...
var worker = new Worker('worker.js');
worker.addEventListener('message', (e) => {
//找到合適的決議器函式,
var resolver = resolvers[e.data.id];
//從“resolvers”物件中洗掉它,
delete resolvers[e.data.id];
//通過呼叫決議器函式將worker資料傳遞給promise,
resolver(e.data.value);
});
worker.postMessage({
action: 'echo',
value: 'Hello World'
}).then((value) => {
console.log('echo', `"${value}"`);
//→echo “Hello World”
});
worker.postMessage({
action: 'upper',
value: 'Hello World'
}).then((value) => {
console.log('upper', `"${value}"`);
//→upper “HELLO WORLD”
});
worker.postMessage({
action: 'lower',
value: 'Hello World'
}).then((value) => {
console.log('lower',`"${value}"`);
//→lower “hello world”
});
嗯,這正是我們需要的,對吧?我們可以直接將訊息資料發布給worker,并通過promise決議將回應資料發送給我們,作為一個額外的好處,如果我們如此傾向,我們實際上可以圍繞這個新的postMessage()函式實作包裝輔助函式,主要參與完成這項作業的技巧是存盤對原始postMessage()的參考,然后,我們覆寫web worker屬性postMessage,而不是函式本身,最后,我們可以復用它并添加必要的協調來保證好用,
同步worker結果
該代碼在最后2段已經充分降低了web workers回呼地獄到可接受的水平,在事實上,現在我們已經有了一個方法處理如何封裝web workers通信由具有的postMessage()回傳一個promise,我們準備要開始簡化一些未使用這種方法的混亂的worker代碼,我們已經了解了這些例子的,所以到目前為止,已經從promise中獲益良多,他們是簡單的; 沒有這些抽象不會是世界末日,
那么我們映射資料集合然后映射和迭代集合的場景呢?我們可以回顧map/reduce代碼在“第6章,實用的并行”,這主要是由于所有worker通信模板代碼與嘗試執行map/reduce操作的代碼混合在一起,讓我們看看使用promise技術是否更好,首先,我們將創建一個非常基本的worker:
//回傳一個輸入陣列的映射,
//它通過平方陣列中的每個數字,
addEventListener('message', (e) => {
postMessage({
id: e.data.id,
value: e.data.value.map(v => v * v)
});
});
我們可以使用此worker傳遞陣列進行映射,因此,我們將創建其中兩個并在兩個worker之間拆分作業負載,如下所示:
function onMessage(e) {
//找到合適的決議器函式,
var resolver = resolvers[e.data.id];
//從“resolvers”物件中洗掉它,
delete resolvers[e.data.id];
//通過呼叫決議器函式將worker資料傳遞給promise
resolver(e.data.value);
}
//開始我們的worker...
var worker1 = new Worker('worker.js'),
worker2 = new Worker('worker.js');
//創建一些要處理的資料,
var array = new Array(50000).fill(null).map((v, i) => i);
//當worker回傳資料時,找到適當的決議器函式來呼叫,
worker1.addEventListener('message', onMessage);
worker2.addEventListener('message', onMessage);
//將輸入資料拆分為2,給出前半部分到第一個worker,
//給出后一部分到第二個worker,在這一點上,我們有兩個promises,
var promise1 = worker1.postMessage({
value: array.slice(0, Math.floor(array.length / 2))
});
var promise2 = worker2.postMessage({
value: array.slice(Math.floor(array.length / 2))
});
//使用“Promise.all()”來同步workers
//比手動嘗試協調整個worker回呼函式要容易得多,
Promise.all([promise1, promise2]).then((values) => {
console.log('reduced', [].concat(...values).reduce((r, v) => r + v));
//→reduced 41665416675000
});
這就是我們需要向worker發布資料以及同步來自兩個或更多worker的資料時,我們實際上就有動力撰寫并發代碼 - 它看起來與現在的其他應用程式代碼相同,
惰性workers
現在是我們從不同角度看待web workers的時候了,我們使用worker的根本原??因是我們想要在相同的時間內計算比過去更多的資料,正如我們現在所知,這樣做涉及訊息傳遞錯綜復雜,可以說是分而治之的策略,我們必須通過將資料輸入和輸出worker,通常使用陣列,
生成器幫助我們實作惰性地計算,也就是說,我們不想在記憶體中計算內容或分配資料,直到我們確實需要它,web workers難以實作這一目標嗎?或者我們可以利用生成器來惰性地并行計算嗎?
在本節中,我們將探討有關在Web worker中使用生成器的方法,首先,我們將研究與Web worker相關的開銷問題,然后,我們將撰寫一些代碼通過使用生成器來將資料輸入或者輸出worker,最后,我們將看看我們是否可以惰性地通過一個生成器鏈在web worker上傳遞所有資料,
減少開銷
主執行緒可以拆分開銷大的Web workers操作,在另一個執行緒中運行它們,這意味著DOM能夠渲染掛起的更新并處理掛起的用戶事件,但是,我們仍然面臨分配大型陣列的開銷和更新UI所需的時間,盡管與Web worker并行處理,但我們的用戶仍然可能面臨運行緩慢,因為在處理完整個資料集之前,UI沒有更新,這是常見的模式的示圖:
這是具有單個worker的資料所采用的通用路徑; 當有多個worker時,同樣的方法也適用,使用這種方法,我們無法避免需要將資料序列化兩次這一事實,我們必須分配兩次,這些開銷僅僅是為了促進worker的通信,而與我們試圖實作的應用程式功能幾乎沒有關系,
worker通信所需的陣列和序列化開銷通常不是什么大問題,但是,對于更大的集合,我們可能會面臨真正的性能問題,這源于我們用于提高性能的機制,因此,從另一個角度看worker通信不會受到損失,即使最初沒有必要,
這是大多數worker采用的通用路徑的變體,不是預先分配和序列化大量資料,而是將單個項傳入和傳出worker,這使得UI有機會在所有處理的資料到達之前使用已處理的資料進行更新,
在workers中生成值
如果我們想要在workers生成結果時更新UI,那么他們無法將結果集打包為陣列,以便在完成所有計算后發送回主執行緒,當發生這種情況時,UI就停在那里而不回應用戶,我們希望一個惰性的方法,其中值是在一段時間產生一個,這樣UI就可以越快被更新,讓我們建立一個例子,將輸入發送到該web workers,然后將結果以一個比我們之前在這本書已經看到的更細微的水平發送回來:
首先,我們將創造一個worker; 它的代碼如下:
//消耗一些CPU回圈...
//源自http://adambom.github.io/parallel.js/
function work(n) {
var i = 0;
while(++i < n * n) {}
return i;
}
//將呼叫“work()”的結果發回給主執行緒
addEventListener('message', (e) => {
postMessage(work(e.data));
});
這里沒有什么可大不了的,它與我們已經習慣的通過低效率地對數字進行減慢運行的代碼的work()函式相同,worker內部沒有使用實際的生成器,這是因為我們真的不需要,我們馬上就會明白為什么:
//創建一個“update()”協程,
//在生成結果時持續更新UI,
var update = coroutine(function* () {
var input;
while (true) {
input = yield;
console.log('result', input.data);
}
});
//創建worker,并指定“update()”協程
//作為“message”回呼處理程式,
var worker = new Worker('worker.js');
worker.addEventListener('message', update);
//一個數字逐漸變大的陣列
var array = new Array(10).fill(null).map((v, i) => i * 10000);
//迭代陣列,將每個數字作為私有訊息傳遞給worker,
for(let item of array) {
worker.postMessage(item);
}
//→
//result 1
//result 100000000
//result 400000000
//result 900000000
//result 1600000000
//result 2500000000
//result 3600000000
//result 4900000000
//result 6400000000
//result 8100000000
傳遞給我們worker的每個數字的處理成本都比前一個數字要高,總的來說,在向用戶顯示任何內容之前處理整個輸入陣列會覺得應用程式掛起或出錯了,但是,這不是這種情況,因為雖然每個數字的處理開銷很高,但我們會在結果可用時將結果發布回來,
我們通過傳入一個陣列來執行和將陣列作為輸出回傳來執行有著相同的作業量,然而,這種方法只是改變了事情發生的順序,我們在演示中引入了協作式多任務 - 在一個任務中計算一些資料并在另一個任務中更新UI,完成作業所花費的總時間是相同的,但對于用戶來說,感覺要快得多,總得說來,用戶可感知的應用程式性能是唯一重要的性能指標,
我們將輸入作為單獨的訊息傳遞,我們可以將輸入作為陣列傳遞,單獨發布結果,并獲得相同的效果,但是,這可能
僅僅是不必要的復雜性,對于模式有自然的對應關系,因為它是 - 專案輸入,專案輸出,如果你不需要就不要改變它,
惰性worker鏈
正如我們在“第4章,使用Generator實作惰性計算”看到,我們可以組裝生成器鏈,這就是我們惰性地實作復雜函式的方式;一個項流經一系列生成器函式,這些函式在生成之前將項轉換為下一個生成器,直到它到達呼叫者,沒有生成器,我們必須分配大量的中間資料結構,只是為了將資料從一個函式傳遞到下一個函式,
在本文之前的部分中,我們看到Web worker可以使用類似于生成器的模式,由于我們在這里面臨類似的問題,我們不希望分配大型資料結構,我們可以通過在更細粒度級別傳遞專案來避免這樣做,這具有保持UI回應的額外好處,因為我們能夠在最后一個專案從worker到達之前更新它,鑒于我們可以與worker做很多事情,我們難道不能基于在這個想法并組裝更復雜的worker處理節點鏈嗎?
例如,假設我們有一組數字和幾個轉換,我們在UI中顯示這些轉換之前,我們需要按特定順序進行這些轉換,理想情況下,我們會設定一個worker鏈,每個worker負責執行其指定的轉換,然后將輸出傳遞給下一個worker,最終,主執行緒獲得一個可以在DOM中顯示的值,
這個目標的問題在于它所涉及的很棘手的通信,由于專用worker只與創建它們的主執行緒進行通信,因此將結果發送回主執行緒,然后發送到鏈中的下一個worker執行緒,這幾乎沒有什么益處,好吧,事實證明,專用worker可以直接通信而不涉及主執行緒,我們可以在這里使用稱為頻道訊息的東西,這個想法很簡單; 它涉及創建一個頻道,它有兩個埠 - 訊息在一個埠上發布并在另一個埠上接收,
我們一直在使用訊息傳遞頻道和埠,他們被卷入web workers,這是訊息事件和postMessage()方法模式的來源,以下是我們如何使用頻道和埠連接我們的Web worker的示圖:
我們可以看到,每個頻道使用兩個訊息傳遞埠,第一埠是用于發布訊息,而所述第二埠被使用來接收訊息事件,主執行緒唯一一次使用是當所述處理鏈首先被用于發布一個訊息給第一信道和當該訊息從第三信道被接收到的訊息,不要讓worker通信所需的六個埠嚇倒我們,讓我們寫一些代碼; 也許,那里看起來會更易于理解,首先,我們將創建鏈中使用的worker,實際上,他們是同一個worker的兩個實體,下面是代碼:
addEventListener('message', (e) => {
//獲取用于發送和接收訊息的埠,
var [port1, port2] = e.ports;
//偵聽第一個埠的傳入訊息,
port1.addEventListener('message', (e) => {
//在第二個埠上回應,結果為呼叫“work()”,
port2.postMessage(work(e.data));
});
//啟動兩個埠,
port1.start();
port2.start();
});
這是很有趣的,在這個worker中,我們有訊息埠可以使用,第一個埠用于接收輸入,第二個埠用于發送輸出,該work()函式簡單地使用我們現在熟悉的平方數消耗CPU周期來看workers如何表現,我們在主執行緒中想要做的是創建這個worker的兩個實體,這樣我們就可以傳遞第一個平方數的實體,然后,在不將結果傳遞回主執行緒的情況下,它將結果傳遞給下一個worker,并再次對數字進行平方,通信路線應該與前面的圖表非常相似,讓我們看一下使用訊息傳遞通道連接worker的一些代碼:
//開始我們的worker...
var worker1 = new Worker('worker.js');
var worker2 = new Worker('worker.js');
//創建通信所需的在兩個worker之間的訊息通道,
var channel1 = new MessageChannel();
var channel2 = new MessageChannel();
var channel3 = new MessageChannel();
//我們的“update()”協程會記錄worker的回應
var update = coroutine(function* () {
var input;
while (true) {
input = yield;
console.log('result', input.data);
}
});
//使用“worker1”連接“channel1”和“channel2”,
worker1.postMessage(null, [
channel1.port2,
channel2.port1
]);
//使用“worker2”連接“channel2”和“channel3”,
worker2.postMessage(null, [
channel2.port2,
channel3.port1
]);
//將我們的協程“update()”連接到收到“channel3”任何訊息,
channel3.port2.addEventListener('message', update);
channel3.port2.start();
//我們的輸入資料 - 一組數字,
var array = new array(25)
.fill(null)
.map((v, i) => i*10);
//將每個陣列項發布到“channel1”,
for (let item of array) {
channel1.port1.postMessage(item);
}
除了我們要發送給worker的資料之外,我們還可以發送一個訊息埠串列,我們希望將這些訊息埠傳輸到worker背景關系,這就是我們對發送給worker的前兩條訊息的處理方式,訊息資料為空,因為我們沒有對它做任何事情,實際上,這些是我們發送的唯一訊息直接給worker,通信的其余部分通過我們創建的訊息通道進行,開銷大的計算發生在worker上,因為那是訊息處理程式所在的位置,
使用Parallel.js
使用Parallel.js庫的目的是為了使與Web worker互動盡可能的無縫,在事實上,它完成了這本書的一個關鍵目標,它隱藏并發機制,并讓我們能夠專注于我們正在構建的應用程式,
在本節中,我們將介紹Parallel.js對worker通信采取的方法以及將代碼傳遞給worker的通用方法,然后,我們將介紹一些使用Parallel.js生成新worker執行緒的代碼,最后,我們將探索這個庫已經提供的內置map/reduce功能,
它怎么作業的
在本書中到目前為止我們使用的所有worker都是我們自己創造的,我們在worker中實作了訊息事件處理,計算某些值,然后發布回應,使用Parallel.js,我們不實作worker,相反,我們實作函式,然后將函式傳遞給由庫管理的workers,
這給我們帶來了一些麻煩,我們所有的代碼都在主執行緒中實作,這意味著更容易使用在主執行緒中實作的函式,因為我們不需要使用importScripts()將它們匯入到Web worker中,我們也不需要通過腳本目錄創建Web worker并手動啟動它們,相反,我們讓Parallel.js為我們生成新的worker,然后我們可以通過將函式和資料傳遞給他們來告訴worker該做什么,那么,這究竟是如何作業的呢?
workers需要一個腳本引數,沒有有效的腳本,worker根本無法作業,Parallel.js有一個簡單的eval腳本,這是傳遞給庫創建的worker的內容,然后,主執行緒中的API將在worker中進行評估代碼,并在需要與workers通信時將其發送,
這是可行的,因為Parallel.js的目的不是暴露worker支持的大量功能,相反,目標是使worker通信機制盡可能無縫,同時提供最小的功能,這樣可以輕松構建與我們的應用程式相關的并發功能,而不是我們永遠不會使用的許多其他功能,
以下是我們如何使用Parallel.js和它的eval腳本將資料和代碼傳遞給worker的說明:
生成workers
Parallel.js庫有一個作業的概念,作業的主要輸入是作業要處理的資料,作業的創建并不直接與后臺worker的創建聯系在一起,workers與Parallel.js中的作業不同;使用庫時,我們不直接與worker互動,一旦我們有了作業實體,并且它提供了我們的資料,我們就會使用一個作業方法來呼叫workers,
最基本的方法是spawn(),它將一個函式作為引數并在Web worker中運行它,我們傳遞給它一個函式作為引數并且在web worker中運行,我們傳遞給它的函式可以回傳結果,然后將它們決議為一個thenable物件被spawn()函式回傳,讓我們看一下使用Parallel.js生成由一個web worker回傳的新作業的代碼:
//一個數字輸入陣列,
var array = new Array(2500)
.fill(null)
.map((v, i) => i);
//創建一個新的并行作業,
//在這里沒有worker的創建 -
//我們只傳遞我們正在使用的構造資料,
var job = new Parallel(array);
//為我們的“spawn()”作業啟動一個定時器,
console.time(`${array.length} items`);
//創建一個新的Web worker,并將我們的資料和這個函式傳遞給它,
//我們正在慢慢映射陣列的每個數字到它的平方,
job.spawn((coll) => {
return coll.map((n) => {
var i = 0;
while(++i < n*n) {}
return i;
});
//“spawn()”的回傳值是thenable,含義
//我們可以分配一個“then()”回呼函式,
//就像回傳的promise那樣,
}).then((result) => {
console.timeEnd(`${array.length} items`);
//→2500 items:3408.078ms
});
那么現在,這很不錯; 我們不必擔心任何單調的Web worker生命周期任務,我們有一些資料和一些我們想要應用于資料的函式,我們希望與頁面上發生的其他作業并行運行,最吸引人的是熟悉的thenable,從那里回傳的spawn()方法,它適用于我們的并發應用程式,其中所有其他應用程式都被視為promise,
我們記錄處理我們提供的輸入資料的函式所需的時間,我們只為這個任務生成一個Web worker,因此在主執行緒中計算得到的結果與原來的時間相同,除了釋放主執行緒來處理DOM事件和重繪之外,沒有實際的性能提升,我們將看看是否可以使用一個不同的方法來提升并發級別,
當我們完成后,spawn()創建的worker立即終止,這為我們釋放了記憶體,但是,沒有并發級別來管理
spawn()的使用,如果我們愿意,我們可以連續呼叫它100次,
Mapping and reducing
在上一節中,我們使用spawn()方法生成了一個worker執行緒,Parallel.js還有一個map()方法和一個reduce()方法,這個方法是讓事情變得更輕松,通過傳遞map()函式,庫將自動將其應用于作業資料中的每個項,類似的語意適用于reduce()方法,讓我們通過撰寫一些代碼來看看它是如何作業的:
//一個數字輸入陣列,
var array = new Array(2500)
.fill(null)
.map((v, i) => i);
//創建一個新的并行作業,
//這里不會創建workers - 我們只傳遞我們正在使用的構造資料,
var job1 = new Parallel(array);
//為我們的“spawn()”作業啟動一個計時器,
console.time('JOB1');
//這里的問題是Parallel.js會為每個陣列元素創建一個新的worker,
//導致并行減速,
job1.map((n) => {
var i = 0;
while (++i < n*n) {}
return i;
}).reduce((pair) => {
//將陣列項reduce為一個總和,
return pair[0] + pair[1];
}).then((data) => {
console.log('job1 reduced', data);
//→job1 reduced 5205208751
console.timeEnd('job1');
//→job1:59443.863ms
});
哎喲! 這是一個非常重要的性能 - 這里發生了什么?我們在這里看到的是一種稱為并行減速的現象,當并行通信開銷過多時,會發生這種減速,在這個特定示例中發生這種情況的原因是由于Parallel.js在map()中處理陣列的方式,每個陣列項都通過一個worker,這并不意味著創建了2500個worker - 一個worker用于陣列中的每個元素,創建的worker數量最多只能達到4或者我們在本書前面看到的navigator.hardwareConcurrency值,
在真正的開銷來自于發送的訊息并收到了worker-5000個訊息!這顯然不是最優的,因為由代碼中的定時器給證明,讓我們看看是否能夠做出一個對這些數字的大幅改善,同時保持大致相同的代碼結構:
//更快的執行,
var job2 = new Parallel(array);
console.time('job2');
//在映射陣列之前,將陣列拆分為較小的陣列塊,
//這樣,每個Parallel.js worker都是處理陣列而不是陣列項,
//這避免了發送數千個Web worker訊息,
job2.spawn((data) => {
var index = 0,
size = 1000,
results = [];
while (true) {
let chunk = data.slice(index, index + size);
if (chunk.length) {
results.push(chunk);
index += size;
} else {
return result;
}
}
}).map((array) => {
//回傳陣列塊的映射,
return array.map((n) => {
var i = 0;
while(++i < n * n) {}
return i;
});
}).reduce((pair) => {
//將陣列塊或數字reduce為一個總和,
return(Array.isArray(pair[0]) ?
pair[0].reduce((r, v) => r + v) :
pair[0]) + (Array.isArray(pair[1]) ?
pair[1].reduce((r, v) => r + v) :
pair[1]);
}).then((data) => {
console.log('job2 reduced', data);
//→job2 resuced 5205208751
});
console.timeEnd('job2');
//→job2:2723.661ms
在這里,我們可以看到的是在同樣的結果被產生,并且快得多,不同之處在于我們開始作業之前將陣列切片成的陣列較小的陣列塊,這些陣列就是傳遞給workers的項,而不是單個的數,所以映射作業略微有好的改變,而平方一個數字,它映射一個較小的陣列到平方的陣列,該reduce的邏輯是稍微復雜一些,但總體來說,我們的做法是仍然是相同的,最重要的是,我們已經洗掉了大量的訊息傳遞瓶頸,他們在第一次執行造成不可接受的性能缺陷,
就像spawn()方法在回傳時清理worker一樣,Parallel.js中的map()和reduce()方法也是如此,
釋放worker的缺點是,無論何時呼叫這些方法,都需要重新創建它們,我們將在下一節討論這個挑戰,
worker執行緒池
本章的最后一節介紹了worker執行緒池的概念,在上一節關于Parallel.js的介紹中,我們遇到了經常創建和終止worker的問題,這需要很多開銷,如果知道我們能夠運行的并發級別,那么為什么不分配一個可以承擔作業的靜態大小的worker執行緒池?
創建worker執行緒池的第一個設計任務是分配worker,下一步是通過將作業分發給池中的可用worker來計劃作業,最后,當所有worker都在運行時,我們需要考慮忙碌狀態,讓我們開始吧,
分配池
在考慮分配worker執行緒池之前,我們需要查看總體worker抽象池,我們如何希望它的外觀和行為是怎樣的?理想情況下,我們希望抽象池的外觀和行為類似于普通的專用worker,我們可以向執行緒池發布訊息并獲得promise作為回應,因此,雖然我們無法直接擴展Worker原型,但我們可以創建一個與Worker API非常相似的新的抽象,
我們現在來看一些代碼吧,這是我們將使用的初始抽象:
//表示Web worker執行緒的“池”,
//隱藏在后面單個Web worker介面的介面,
function WorkerPool(script) {
//并發級別,或者Web worker要創造的數量,
//這使用了“hardwareConcurrency”屬性(如果存在),
//否則,默認為4,
//因為這是對最常見的CPU結構進行的合理猜測,
var concurrency = navigator.hardwareConcurrency || 4;
//worker實體本身存盤在Map中,作為鍵,
//我們馬上就會明白為什么,
var workers = this.workers = new Map();
//對于發布的訊息存在佇列,所有worker都很忙,
//所以這可能永遠不會被用到的,
var queue = this.queue = [];
//用于下面創建worker程式實體,
//以及添加事件監聽器,
var worker;
for (var i = 0; i < concurrency; i++) {
worker = new Worker(script);
worker.addEventListener('message', function(e) {
//我們使用“get()”方法來查找promise的“resolve()”函式,
//該worker是關鍵,我們呼叫的從worker回傳的資料的決議器
//并且可以將其重置為null,
//這個很重要,因為它表示worker是空閑的,
//可以承擔更多作業,
workers.get(this)(e.data);
workers.set(this, null);
//如果有排隊的資料,我們得到第一個
//佇列中的“data”和“resolver”,
//我們用資料呼叫“postMessage()”之前,
//我們使用新的“resolve()”函式更新“workers”映射,
if (queue.length) {
var [data, resolver] = queue.shift();
workers.set(this, resolver);
this.postMessage(data);
}
//這是worker的初始設定,作為在“worker”映射中的鍵,
//它的值為null,意味著沒有決議函式,它可以承擔作業,
this.workers.set(worker, null);
}.bind(worker));
}
}
創建新的WorkerPool時,給定的腳本用于生成執行緒池中的所有worker,該worker屬性是一個Map實體,worker實體本身是作為鍵,我們將worker存盤為映射鍵的原因是我們可以輕松地查找適當的決議器函式來呼叫,
當給定的worker程式回應時,呼叫我們添加到每個worker的訊息事件處理程式,這就是我們找的等待呼叫的決議器函式的地方,我們不可能呼叫錯誤的決議器,因為給定的worker在完成當前任務之前不會接受新的任務,
調度任務
現在我們將實作postMessage()方法,這是呼叫者用于將訊息發布到池中的一個worker,呼叫者不知道哪個worker滿足了他們的要求,他們也不關心,他們將promise作為回傳值,并以worker回應作為決議值:
WorkerPool.prototype.postMessage = function(data) {
//“workers”Map映射實體,其中包含所有存盤的Web worker,
var workers = this.workers;
//當所有worker都很忙時訊息被放在“queue”佇列中
var queue = this.queue;
//嘗試找一個可用的worker,
var worker = this.getWorker();
//promise會立即回傳給呼叫者,
//即使沒有worker可用,
return new Promise(function(resolve) {
//如果找到了worker,我們可以更新Map映射,
//使用worker作為鍵,并使用“resolve()”函式作為值,
//如果沒有worker,那么訊息資料以及“resolve()”函式被推送到“queue”佇列,
if (worker) {
workers.set(worker, resolve);
worker.postMessage(data);
} else {
queue.push([data, resolve]);
}
});
};
它是promise執行器函式,實際負責查找第一個可用的worker并在那里發布我們的訊息,當找到可用的worker時,我們還在我們的worker映射中設定了worker的決議器函式,如果池中沒有可用的worker程式,已發布的訊息則將進入佇列,此佇列在訊息事件處理程式中清空,這是因為當worker回傳訊息時,這意味著worker是空閑的可以承擔更多作業,并且在回傳空閑狀態之前檢查是否有任何worker排隊,
該getWorker()方法是一個簡單的輔助函式為我們查找下一個可用的worker,我們知道如果一個worker在workers映射中將其決議器函式設定為null,則可以執行該任務,最后,讓我們看看這個worker執行緒池的應用:
//創建一個新的執行緒池和一個負載計數器,
var pool = new WorkerPool('worker.js');
var workload = 0;
document.getElementById('work').addEventListener('click', function(e) {
//獲取我們要傳遞給worker的資料,
//并為此負載創建計數器,
var amount = +document.getElementById('amount').value,
timer = 'Workload' + (++workload);
console.time(timer);
//將訊息傳遞給執行緒池,并在promise完成時,停止計時器,
pool.postMessage(amount).then(function(result) {
console.timeEnd(timer);
});
//如果訊息開始排隊,
//我們的執行緒池就是過載并顯示警告,
if (pool.queue.length) {
console.warn('worker pool is getting busy...');
}
});
在這種使用場景中,我們有幾個表單控制元件將引數化作業發送給worker,數字越大,作業時間越長; 它使用標準的work()函式來緩慢地對數字作平方,如果我們使用大量數字并頻繁單擊按鈕將訊息發布到執行緒池中,那么最終我們將耗盡執行緒池中可用的資源,如果是這種情況,我們將顯示警告,但是,這僅用于故障排除,當執行緒池繁忙時,發布的訊息不會丟失,它們只是排隊等候,
小結
本章的重點是從代碼中洗掉突兀的并發語法,它只是提高了我們應用程式成功運行的可能性,因為我們將擁有易于維護和構建的代碼,我們解決的第一個問題是通過使所有內容都是并發的方式來撰寫并發代碼,當沒有所涉及的猜測成分時,我們的代碼就是一致的,不易受并發錯誤的影響,
然后,我們研究了抽象Web worker通信可以采取的各種方法,輔助函式是一個選項,因此擴展了postMessage()方法,然后,當我們需要UI回應時,我們解決了Web workers的一些限制,即使我們的大型資料集處理速度更快,我們仍然存在更新UI的問題,這是通過將Web worker作為生成器處理來完成的,
我們不必自己撰寫所有這些JavaScript并發工具方法,我們花了一些時間來研究Parallel.js庫的各種功能和限制,我們以介紹Web worker執行緒池結束了本章,這些消除了與worker創建和終止相關的大量開銷,并且它們極大地簡化了任務的分配和結果的協調,
這些都是適用于前端的并發話題,現在是時候切換一下,使用NodeJS查看后端的JavaScript并發性,
最后補充下書籍章節目錄
- 《JavaScript并發編程》第一章 JavaScript并發簡介
- 《JavaScript并發編程》第二章 JavaScript運行模型
- 《JavaScript并發編程》第三章 使用Promises實作同步
- 《JavaScript并發編程》第四章 使用Generators實作惰性計算
- 《JavaScript并發編程》第五章 使用Web Workers
- 《JavaScript并發編程》第六章 實用的并發
- 《JavaScript并發編程》第七章 抽取并發邏輯
另外還有講解兩章nodeJs后端并發方面的,和一章專案實戰方面的,這里就不再貼了,有興趣可轉向https://github.com/yzsunlei/javascript_concurrency_translation查看,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/180444.html
標籤:JavaScript
