我有以下從控制器呼叫的代碼。
public async Task Execute()
{
var collections= await _repo.GetCollections(); // This gets 500 items
List<Object1> coolCollections= new List<Object1>();
List<Object2> uncoolCollections= new List<Object2>();
foreach (var collection in collections)
{
if(collection == "Something")
{
var specialObject = TurnObjectIntoSpecialObject(collection);
uncoolCollections.Add(specialObject);
}
else
{
var anotherObject = TurnObjectIntoAnotherObject(object);
coolCollections.Add(anotherObject);
}
}
var list1Async = coolCollections.Select(async obj => await restService.PostObject1(obj)); //each call takes 200 -> 2000ms
var list2Async = uncoolCollections.Select(async obj => await restService.PostObject2(obj));//each call takes 300 -> 3000ms
var asyncTasks = list1Async.Concat<Task>(list2Async);
await Task.WhenAll(asyncTasks); //As 500 'tasks'
}
不幸的是,在大約 300 次左右的請求后,我收到了 504 錯誤。我無法更改 RestService 呼叫的 API,因此我一直在嘗試使上述代碼更高效。
更改Task.WhenAll為foreach回圈確實有效,并且確實解決了超時問題,但速度非常慢。
我的問題是如何確保上述代碼在 x 次請求后不會超時?
uj5u.com熱心網友回復:
對遠程站點或資料庫進行更多并發呼叫并不會提高吞吐量,恰恰相反。并發操作之間的沖突意味著超過某個點,一切都將開始花費更多時間,直到服務崩潰。現在你有一個可能的 300 路阻塞問題。
所有服務處理這個問題的方式是限制并發連接數。事實上,許多服務會限制客戶端,這樣如果有人……發送 500 個并發請求,它們就不會崩潰。有些人甚至可能會告訴您他們正在用 429 回應來限制您。
另一種方法是使用批量請求,因此您可以發送一批 500 個操作,而不是進行 300 或 500 次呼叫,從而允許服務以有效的方式處理它們。
您可以使用Parallel.ForEachAsync以指定的并行度執行多個呼叫:
ParallelOptions parallelOptions = new()
{
MaxDegreeOfParallelism = 30
};
await Parallel.ForEachAsync(object1, parallelOptions, async obj =>
{
await restService.PostObject1(obj);
});
await Parallel.ForEachAsync(object2, parallelOptions, async obj =>
{
await restService.PostObject2(obj);
});
您可以調整 DOP 以找到最有效的方法,而不會減慢遠程服務的速度。
如果服務可以處理它,您可以同時啟動兩個管道并等待它們完成:
ParallelOptions parallelOptions = new()
{
MaxDegreeOfParallelism = 30
};
var task1=Parallel.ForEachAsync(object1, parallelOptions, async obj =>
{
await restService.PostObject1(obj);
});
var task2=Parallel.ForEachAsync(object2, parallelOptions, async obj =>
{
await restService.PostObject2(obj);
});
await Task.WhenAll(task1,task2);
在限制 DOP 之前重試這些操作沒有意義。重試 500 次失敗的請求只會導致另一次失敗。以隨機或交錯的延遲重試與從一開始就限制 DOP 本質上是一樣的,只是需要更長的時間才能完成。
uj5u.com熱心網友回復:
由于您無法更改其余服務,因此 504 網關超時將保持不變。更好的解決方案是使用重試機制,如果您收到 504 錯誤代碼,那么您將在“x”秒后重試。
為什么要在“x”秒后重試?504的原因可能有很多,可能是服務器沒有處理更多請求,因為它目前處于最大作業負載。
Polly是一個經過實戰考驗的重試機制庫。
您還可以根據回傳型別撰寫自己的函式或操作。
根據 happyflow 用例,可以使用其他方法,但在這種情況下,我想到即使發生例外也要上傳所有資料。
需要考慮的是,如果此服務是由第三方供應商提供的,請查看檔案。它很可能會有一個關于服務的最大并發連接數的部分。
uj5u.com熱心網友回復:
此答案假定您使用的是 .NET 6 或更高版本。您可以將 投影objects到可列舉的object元素,然后使用該Parallel.ForEachAsync方法并行處理投影物件。此方法允許配置MaxDegreeOfParallelism,以便不是所有投影物件都被一次處理。結果,遠程服務器不會受到超出其處理能力的請求的轟炸,網路帶寬也不會飽和。MaxDegreeOfParallelism可以通過實驗找到最優值。從較小的數字開始,例如 5,然后逐漸增加,直到找到提供最佳性能的最佳點。
public async Task Execute()
{
var objects = await _repo.GetObjects();
IEnumerable<object> projected = objects.Select(obj =>
{
if (obj == "Something")
{
return (object)TurnObjectIntoSpecialObject(obj);
}
else
{
return (object)TurnObjectIntoAnotherObject(obj);
}
});
ParallelOptions options = new()
{
MaxDegreeOfParallelism = 5
};
await Parallel.ForEachAsync(projected, options, async (item, ct) =>
{
switch (item)
{
case Object1 obj1: await restService.PostObject1(obj1); break;
case Object2 obj2: await restService.PostObject2(obj2); break;
default: throw new NotImplementedException();
}
});
}
上面的代碼按照序列中出現的相同順序處理物件objects。PostObject1/操作是并行的PostObject2,但TurnObjectIntoSpecialObject/TurnObjectIntoAnotherObject操作不是。如果你也想將它們并行化,那么你可以將Parallel.ForEachAsync與 一起提供objects,并在并行回圈內進行投影。
如果出現錯誤,只會傳播第一個例外。如果您想傳播所有錯誤,可以在此處找到解決方案。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/533692.html
標籤:C#异步.net核心
