我在 Node 中有一個帶有許多端點的 API。它作業得很好,但是有一個可以讓大型請求運行長達 1 小時的處理,這經常會中斷。我正在考慮只在請求中發回一個 url,您可以在其中檢查請求的狀態,然后在準備好后下載它。為此,在 Node 中處理作業佇列的最佳方法是什么?
當前端點的示例代碼如下
const router = express.Router();
const schema = joi.object().keys // This is a schema to validate the json input
router.post('/', async (req, res) => {
let conn = await connect(); // Util method that connects to a Q/KDB server
let request = req.body;
joi.validate(request, schema, (err, _result) => {
if (err) {
res.status(400).send({ error: err['details'][0]['message'] });
}
else {
let qRequest = buildRequest(request); // Util function to build request
// Connect to Q/KDB server with node-q package and process request
conn.k('api.process', qRequest, function(err, resp) {
if (err) {
log // Write to log
res.status(400).send({ error: err['details'][0]['message']
}
else {
res.status(200).send(resp);
}
});
}
});
});
編輯:我發現我基本上只需要建立一個作業佇列,其中作業 ID 對應于它們。Bull 包似乎不錯,但我不想添加其他依賴項,例如 Redis。
uj5u.com熱心網友回復:
從概念上講,有幾種方法可以解決這樣的問題:
- 您可以回傳一個 jobID 并讓客戶端使用包含 jobID 的 URL 定期查詢該 jobID,直到他們得到結果(這聽起來像您的設想)
- 您可以讓客戶端將 webSocket 或 socket.io 連接連接到服務器,當結果完成后,服務器可以直接通過 websocket/socket.io 連接發送結果。
- 完成后,您可以使用服務器發送事件(SSE) 將結果“推送”到客戶端。
以下是上述第一個選項的方案大綱:
- 為要處理的每個傳入作業創建一個唯一的 jobID
- 創建一個查詢jobID狀態的路由
- 有一個
Map物件,其中包含由 jobID 索引的行程中的作業串列 - 有一個
setInterval()掃描作業映射中的作業以洗掉任何過期的作業(客戶端從未回來獲取它們的作業)。您可以設定該掃描的頻率以及您保留該作業的時間。 - 當一個請求進來時,它會生成一個新的 jobID,將一個“待處理”的作業添加到 Map 并回傳給客戶端一個他們可以查詢作業狀態的 URL。
- 當您最終完成處理作業時,結果將添加到
job物件中,并且其狀態更改為"complete"。 - 添加了一條路由來查詢作業狀態,其中包括
jobID. - 如果在查詢時,作業狀態為
"complete",則回傳結果并將作業從 Map 中洗掉。 - 如果在查詢時,作業狀態為
"error",則回傳錯誤并將作業從 Map 中洗掉。 - 如果查詢時 jobID 不存在,則回傳 404 狀態
- 如果在查詢時,作業狀態不是
"complete"or"error",則回傳job.statusand 可選job.progress。如果您愿意,這允許您長時間運行的行程反饋任何進度,并且您可以根據需要使用多個狀態值。
這是說明這個概念的代碼:
// A map of objects,
// the key is the jobID
// data is an object {status: "complete", result: someResult, timeStarted: someTime}
// If the job is not yet complete, status will be something other than "complete"
// and result will not yet exist
const jobs = new Map();
// check for expired jobs
const expirationInterval = 60 * 60 * 1000; // run expiration check once an hour
const expirationTime = 12 * 60 * 60 * 1000; // let jobs stay here for 12 hours
setInterval(() => {
// accumulate an array of items to remove so we aren't modifying while iterating
const expired = [];
const now = Date.now();
for (let [key, job] of jobs) {
if (now - job.timeStarted > expirationTime) {
expired.push(key);
}
}
// now remove all expired jobs
for (let key of expired) {
jobs.delete(key);
}
}, expirationInterval);
// make a job id that consists of current time (in ms) plus random number
// jobs can then be sorted or aged by time also
function makeJobID() {
const base = Date.now().toString();
const random = Math.random().toFixed(6).toString().slice(2);
return base "_" random;
}
// fetch data for a jobID
// The job may either not exist any more,
// may still be "pending" (or have some other status)
// or may be "complete"
// Note: if this router is not at the top level, you will have to make
// this path line up with the URL you sent back to the client
router.get("/jobstatus/:jobID", (req, res) => {
let job = jobs.get(req.params.jobID);
if (!job) {
res.sendStatus(404);
return;
}
if (job.status === "complete") {
// remove it from the jobs Map and send the data
jobs.delete(req.params.jobID);
res.send({
status: "complete",
result: job.result
});
} else if (job.status === "error") {
// remove it from the jobs Map and send the data
jobs.delete(req.params.jobID);
res.send({
status: "error",
error: job.error
});
} else {
// optional job.progress can also be communicated back. This can be
// a number, a string or an object of other data
if (job.progress) {
res.send({ status: job.status, progress: job.progress });
} else {
res.send({ status: job.status });
}
}
});
router.post('/', async (req, res) => {
let conn;
try {
conn = await connect(); // Util method that connects to a Q/KDB server
} catch (e) {
console.log(e);
res.sendStatus(500);
}
let request = req.body;
joi.validate(request, schema, (err, _result) => {
if (err) {
res.status(400).send({ error: err['details'][0]['message'] });
} else {
// coin job id and add a job object to the jobs map
const jobID = makeJobID();
const job = {
timeStarted: Date.now(),
status: "pending"
};
jobs.set(jobID, job);
// send response now that gives them a URL to query
res.status(202).send({
status: "Job submitted",
url: `https://yourdomain.com/jobstatus/${jobID}` // pick whatever URL you want here
});
let qRequest = buildRequest(request); // Util function to build request
// Connect to Q/KDB server with node-q package and process request
conn.k('api.process', qRequest, function(err, resp) {
if (err) {
// set job status to "error"
job.status = "error";
job.timeCompleted = Date.now();
try {
job.error = err['details'][0]['message'];
} catch (e) {
console.log(e);
job.error = "known";
}
} else {
// job has finished, update the job
// we can update the job object directly because the job Map
// points at this same object
job.status = "complete";
job.timeCompleted = Date.now();
job.result = resp;
}
});
}
});
});
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/428078.html
