環境:nodejs 17.2,expressjs 4.17
任務:資料以每秒大約 2 個請求的速度從不同的服務器到達“/user-actions”型別的 url。有必要將它們聚合并每秒發送一次到另一臺服務器。
例如:
- 請求 #1: {userId: 1, action: "hitOne"}
- 請求 #2: {userId: 2, action: "hitFive"}
- 請求 #3: {userId:1, action: "hitFive"}
需要獲取2個物件
const data = [{userId: 1, action: "hitOne"}, {userId: 2, action: "hitFive"}]
和
const data = [{userId: 1, action: "hitFive"}]
這些物件中的每一個每秒都會發送到另一臺服務器 1 次,就像這樣
http.post('http://newserver.url/user-actions', {data});
我正在考慮創建一個變數來記錄請求中出現的所有內容,并在計時器上每秒發送一次該變數到新服務器。但是有些東西告訴我:或者變數會出現問題(例如,由于并發請求)并且不會總是有我正在等待的資料,或者計時器會出現一些廢話。
如何正確實作這樣的場景?
uj5u.com熱心網友回復:
因此,您正在創建某種代理服務。您有兩個潛在的問題:
- 資料持久化和
- 重試和掛起的請求。
我認為你最好的選擇是做這樣的事情:
- 在此特定服務中(使用 API 路由),您只需接收請求,并將它們存盤在諸如 Redis、RabbitMQ 或 Amazon SQS 之類的地方。
- 在另一個服務中,您處理重試、發布等。
即使你不拆分成兩個服務,你仍然想把東西放在專門的存盤服務中。例如,您的行程崩潰了,您丟失了記憶體中保存的所有資料。它還簡化了所有管理細節。諸如存盤、排序最先出現的內容、待處理的請求之類的事情——使用 RabbitMQ 型別的服務非常容易處理。
但是讓我們簡化事情并將它們保存在記憶體中。現在你必須自己處理所有這些事情。
所以這里有一個天真的代理服務。
const axios = require('axios');
const axiosRetry = require('axios-retry');
const REQUEST_INTERVAL = 1000; // every second
const MAX_PARALLEL_REQUESTS = 3;
axiosRetry(axios, { retries: 3});
const bucket = [];
let exportingInterval;
let currentRequestsCount = 0;
const logRequest = (payload) => bucket.push(payload);
const makeRequest = (payload) => axios.post('http://remote-service/user-actions', payload);
const sendData = () => {
// first, make sure you don't make more then X parallel requests
if (currentRequestsCount > MAX_PARALLEL_REQUESTS) {
return
}
// clear the bucket
const data = bucket.splice(0, bucket.length);
if (!data.length) {
return;
}
// send the data, make sure you handle the failure.
currentRequestsCount = currentRequestsCount 1;
makeRequest()
.then(() => currentRequestsCount = currentRequestsCount - 1)
.catch(() => {
// what do do now? We failed three times.
// Let's put everything back in the bucket, try in the next request.
bucket.splice(bucket.length, 0, ...data);
currentRequestsCount = currentRequestsCount - 1;
});
}
const startExporting = () => exportingInterval = setInterval(() => sendData(), REQUEST_INTERVAL);
const stopExporting = () => clearInterval(exportingInterval)
module.exports = {
logRequest,
startExporting,
stopExporting,
}
現在,您將像這樣使用它:
const proxyService = require('./proxy-service');
const app = express();
proxyService.startExporting();
// ...
app.post('/user-data', (req, res) => {
proxyService.logRequest(req.body);
res.end();
});
現在,這只是一個簡單的例子:
- 您確實需要確保重試策略正常。您必須確保不會在發送資料的任何地方進行 DoS。
- 您要確保限制每次呼叫發送的物件數量。
- 也許 1 秒的間隔不是一件好事 - 如果發送資料持續更長時間怎么辦?
- 如果您開始堆積請求怎么辦?我的簡單計數器只計數到 3,也許它更復雜。
此外,呼叫它startExporting并stopExporting應該放在一些常見的地方,你啟動應用程式的地方,以及在正常關閉的情況下清理的地方。
但它讓您了解如何做到這一點。
uj5u.com熱心網友回復:
這是一個權衡:時間,資料
如果你想保證足夠的資料,你可以使用 Promise.all() 函式。當兩個請求都得到回應時,您將呼叫 api 發送它。這將確保有足夠的資料,但不會確保每秒將資料發送到另一臺服務器。
let pr1 = request1();
let pr2 = request2();
await data = promise.all([pr1,pr2]);
requestToAnotherServer(data);
如果要確保服務器每秒向另一臺服務器發送一次資料。您可以設定一個計時器,超時時,您發送服務器收到的資料。但這并不能確保足夠的資料
sendData = [];
setInterval(()=>{
let pr1 = request1().then(data=>{sendData.push(data)});
let pr2 = request2().then(data=>{sendData.push(data)});
requestToAnotherServer(sendData);
sendData = [];
},1000)
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/398917.html
