TL;DR:我想對 REST-API 執行異步呼叫。標準呼叫會給我一個 CompleteableFuture<Response>,但是因為 API 對它在一定時間內允許的呼叫數量有限制,我希望能夠將呼叫排隊到 1. 按順序執行它們和 2。僅當我當時沒有超過 API 限制時才執行它們,否則請等待。
長版:
我正在使用 Retrofit 對 API 執行 Rest 呼叫,當我呼叫它時,Retrofit 回傳 CompleteableFuture<WhateverResponseClassIDeclare>。然而,由于我呼叫的 API 的限制,我想嚴格控制我呼叫的時間和順序。詳細地說,在某個時間段內呼叫太多會導致我被 IP 封禁。同樣,我想保持我的呼叫順序,即使它們不會立即執行。目標是呼叫 API 的 Wrapper,它像原始 API 一樣回傳 CompleteableFuture,但異步執行這些中間步驟。
我在玩 BlockingQueues、Functions、Callables、Suppliers 以及它們之間的所有東西,但我還不能讓它作業。
下面是我創建的當前非功能代碼作為模型來測驗這個概念。
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;
public class Sandbox2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MockApi mockApi = new MockApi();
CompletableFuture<Integer> result1 = mockApi.requestAThing("Req1");
CompletableFuture<Integer> result2 = mockApi.requestAThing("Req2");
CompletableFuture<Integer> result3 = mockApi.requestAThing("Req3");
System.out.println("Result1: " result1.get());
System.out.println("Result2: " result2.get());
System.out.println("Result3: " result3.get());
}
public static class MockApi {
ActualApi actualApi = new ActualApi();
BlockingDeque<Function<String, CompletableFuture<Integer>>> queueBlockingDeque = new LinkedBlockingDeque();
public CompletableFuture<Integer> requestAThing(String req1) {
Function<String, CompletableFuture<Integer>> function = new Function<String, CompletableFuture<Integer>>() {
@Override
public CompletableFuture<Integer> apply(String s) {
return actualApi.requestHandler(s);
}
};
return CompletableFuture
.runAsync(() -> queueBlockingDeque.addLast(function))
.thenRun(() -> waitForTheRightMoment(1000))
.thenCombine(function)
}
private void waitForTheRightMoment(int time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class ActualApi {
public CompletableFuture<Integer> requestHandler(String request) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.parseInt(request.substring(3));
});
}
}
}
uj5u.com熱心網友回復:
JDK 9 (JDK 1.8) 之前的版本
您可以利用ScheduledExecutor接受專案以預先固定的速率/延遲在預先配置的執行緒池上異步執行。
您可以通過以下方式獲得此類服務:
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService創建實體后,您可以開始提交要執行的專案(請求),如下所示:
executorService.schedule(
() -> actualApi.requestHandler(req),
delay,
unit
);
同時,使用直接呼叫想要導致 aCompletableFuture<Integer>但會導致 a ScheduledFuture<CompletableFuture<Integer>>,您必須阻止才能獲得包裝結果。
相反,您需要在內部阻止最終請求結果,ScheduledExecutorService然后將最終請求結果包裝在complete 中ComppletableFuture:
public <T> CompletableFuture<T> scheduleCompletableFuture(
final CompletableFuture<T> command,
final long delay,
final TimeUnit unit) {
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
this.executorService.schedule(
(() -> {
try {
return completableFuture.complete(command.get());
} catch (Throwable t) {
return completableFuture.completeExceptionally(t);
}
}),
delay,
unit
);
return completableFuture;
}
下面是您的實施的審查版本:
public class Sandbox2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MockApi mockApi = new MockApi();
CompletableFuture<Integer> result1 = mockApi.requestAThing("Req1");
CompletableFuture<Integer> result2 = mockApi.requestAThing("Req2");
CompletableFuture<Integer> result3 = mockApi.requestAThing("Req3");
System.out.println("Result1: " result1.get());
System.out.println("Result2: " result2.get());
System.out.println("Result3: " result3.get());
}
public static class MockApi {
private final AtomicLong delay = new AtomicLong(0);
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
public CompletableFuture<Integer> requestAThing(String req1) {
return this.scheduleCompletableFuture(new ActualApi().requestHandler(req1), delay.incrementAndGet(), TimeUnit.SECONDS);
}
public <T> CompletableFuture<T> scheduleCompletableFuture(
final CompletableFuture<T> command,
final long delay,
final TimeUnit unit) {
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
this.executorService.schedule(
(() -> {
try {
return completableFuture.complete(command.get());
} catch (Throwable t) {
return completableFuture.completeExceptionally(t);
}
}),
delay,
unit
);
return completableFuture;
}
}
public static class ActualApi {
public CompletableFuture<Integer> requestHandler(String request) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.parseInt(request.substring(3));
});
}
}
}
JDK 9 及更高版本
如果您使用的是JDK 9版本,則可以使用支持的延遲Executor:
CompletableFuture<String> future = new CompletableFuture<>();
future.completeAsync(() -> {
try {
// do something
} catch(Throwable e) {
// do something on error
}
}, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
MockApi#requestAThing然后你會更干凈,更短,你不再需要自定義ScheduledExecutor:
public static class MockApi {
private final AtomicLong delay = new AtomicLong(0);
public CompletableFuture<Integer> requestAThing(String req1) {
CompletableFuture<Void> future = new CompletableFuture<>();
return future.completeAsync(() -> null, CompletableFuture.delayedExecutor(delay.incrementAndGet(), TimeUnit.SECONDS))
.thenCombineAsync(new ActualApi().requestHandler(req1), (nil, result) -> result);
}
// ...
}
uj5u.com熱心網友回復:
您可能會考慮使用bucket4j
uj5u.com熱心網友回復:
我找到了一種方法來產生我想要的行為。通過將我的 Executor 限制為單個執行緒,我可以將呼叫排隊,它們將按照我將它們排隊的順序進行。
我將在下面為任何感興趣的人提供我的模擬課程的代碼:
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Sandbox2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MockApi mockApi = new MockApi();
CompletableFuture<Integer> result1 = mockApi.requestAThing("Req1");
System.out.println("Request1 queued up");
CompletableFuture<Integer> result2 = mockApi.requestAThing("Req2");
System.out.println("Request2 queued up");
CompletableFuture<Integer> result3 = mockApi.requestAThing("Req3");
System.out.println("Request3 queued up");
//Some other logic happens here
Thread.sleep(10000);
System.out.println("Result1: " result1.get());
System.out.println("Result2: " result2.get());
System.out.println("Result3: " result3.get());
System.exit(0);
}
public static class MockApi {
ActualApi actualApi = new ActualApi();
private ExecutorService executorService = Executors.newSingleThreadExecutor();
;
public CompletableFuture<Integer> requestAThing(String req1) {
CompletableFuture<Integer> completableFutureCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("Waiting with " req1);
waitForTheRightMoment(new Random().nextInt(1000) 1000);
System.out.println("Done Waiting with " req1);
return actualApi.requestHandler(req1).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}, executorService);
return completableFutureCompletableFuture;
}
private void waitForTheRightMoment(int time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class ActualApi {
public CompletableFuture<Integer> requestHandler(String request) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(new Random().nextInt(1000) 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Request Handled " request);
return Integer.parseInt(request.substring(3));
});
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/345417.html
