最近,我在玩nodejs中的異步函陣列合,并意識到如果我可以將它們包裝成管道,以便在其他地方重用它們,那就太好了。前一個異步函式的每個輸出都將成為下一個函式的輸入。對于基本的實作,我使用了這樣的代碼:
where : 這就是基本的方案,在大多數時候都能滿足我的需求。然而,我得出的結論是,當某些條件得到滿足時,我很可能需要提前退出(也就是提前完成管道的最終確定)。我找不到一個干凈利落地實作這一目標的方法,唯一對我有效的是用一些錯誤來拒絕承諾,這將作為一個斷路器。這是我使用的方法: 所以我的任務會收到額外的函式作為引數,所以我可以取消一些管道階段: 我的問題是:是否有一些干凈的方法來實作提前退出,并在滿足某些條件時完成管道,而不拋出一個錯誤? 另外,我發現這個專案看起來很有希望( 注意,我不是一個有經驗的nodejs開發人員。 謝謝! uj5u.com熱心網友回復: 你忘記了將 讓我們看看我們如何解決這個問題。我們將添加一個簡單的 這里是 你可以通過使用 或者你可以使用 uj5u.com熱心網友回復: 你可以使用節點流api。有一些方法可以停止/恢復/管道/什么的流。
https://nodejs.org/api/stream.html
標籤:pipeline
pipeline.js。
function run(tasks, input) {
let context = input;
const reducer = (acc, task) => acc. then((result) =>; {
context = result;
return task(context)。
};
return tasks.reduce(reducer, Promise.resolve(context)) 。
}
tasks.js/* 不做任何事情,回傳輸入 */
const task1 = async (text) => {
console.log(`previous output: ${context}`)。)
return context。
}
/* 不做任何事情,回傳輸入 */。
const task2 = async (text) => {
console.log(`previous output: ${context}`)。)
return context。
}
...
test.js/* import tasks -> tas1, task2, ... */
const tasks = require('tasks.js'/span>)。
const pipeline = require('pipeline.js') 。
pipeline.run(tasks, null)
.then((output) => console. log('done')
.catch((err) => console.error(err) )。
pipeline.js : pipeline實作tasks.js :函式定義,我可以從中組成管道test.js:測驗一切的代碼pipeline
pipeline.jsfunction run(tasks, input){
let context = input;
/* 添加了取消關閉 */。
const cancel = (/span>) => {
throw new PipelineCancelError()。
}
const reducer = (acc, task) => acc. then((result) =>; {
context = result;
return task(context)。
};
return tasks.reduce(reducer, Promise.resolve(context)) 。
}
tasks.js/* 不做任何事情,回傳輸入 */
const task3 = async (text, cancel) => {
console.log(`previous output: ${context}`)。)
if (someCondition) {
/* 取消管道 */
cancel()。
}
return context。
}
express風格的處理程式),但是當任務拋出未處理的錯誤時,我難以處理錯誤。promise-pipelinecancel傳遞給你的task -function run(tasks, input){
let context = input;
/* 添加了取消關閉 */。
const cancel = (/span>) => {
throw new PipelineCancelError()。
}
const reducer = (acc, task) => acc. then((result) =>; {
context = result;
return task(context); // <- pass cancel to task
};
return tasks.reduce(reducer, Promise.resolve(context) )。
}
sleep作為示范 -
<iframe name="sif1" sandbox="allow-forms allow-modals allow-scripts" class="snippet-box-edit snippet-box-result" frameborder="0"></iframe>
function run(tasks, input) {
const cancel = e => { throw e }
const reducer = (acc, task) => acc. then(result => task(結果,取消))
return tasks.reduce(reducer, Promise.resolve(input) )
}
function sleep (ms) {
return new Promise(r =>。 setTimeout(r, ms))
}
async function task1 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context.toUpperCase()
}
async function task2 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context "!"
}
async function task3 (context, cancel){
await sleep(1000)
console.log(`previous output: ${context}`)
cancel(Error("task3 cancelled")
return context "?"。
}
run([task1, task2, task2], "hello").then(console. log, console.error) previous output: hello
以前的輸出。HELLO
以前的輸出。HELLO!
HELLO!!!
cancel效果的演示 -
<iframe name="sif2" sandbox="allow-forms allow-modals allow-scripts" class="snippet-box-edit snippet-box-result" frameborder="0"></iframe>
function run(tasks, input) {
const cancel = e => { throw e }
const reducer = (acc, task) => acc. then(result => task(結果,取消))
return tasks.reduce(reducer, Promise.resolve(input) )
}
function sleep (ms) {
return new Promise(r =>。 setTimeout(r, ms))
}
async function task1 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context.toUpperCase()
}
async function task2 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context "!"
}
async function task3 (context, cancel){
await sleep(1000)
console.log(`previous output: ${context}`)
cancel(Error("task3 cancelled")
return context "?"。
}
run([task1, task2, task3, task2], "hello").then(console. log, console.error) previous output: hello
以前的輸出。HELLO
以前的輸出。HELLO!
錯誤。"task3被取消"
.then的第二個引數來應對取消的情況 -run([task1, task2, task3, task2], "hello"/span>)
.then(console.log,console.error) // <-。
.catch -run([task1, task2, task3, task2], "hello"/span>)
.then(console.log)
.catch(console.error) // <-.
