我目前正在使用wsdl-to-java生成異步客戶端代碼,該代碼被用于查詢一個 SOAP Web 服務。下面是生成的異步方法的一個片段:
@WebMethod(operatingName = "GetSession")
public Future<? > getSessionAsync(
@WebParam(partName = "引數", name = "GetSessionRequest")
mynamespace.GetSessionRequest引數。
@WebParam(partName = "ResponseHeader", mode = WebParam.Mode.OUT, name = "ResponseHeader", header = true)
javax.xml.ws.Holder<mydatacontract.ResponseHeader> responseHeader。
@WebParam(name = "asyncHandler", targetNamespace = "")
AsyncHandler<myservice.GetSessionResponse> asyncHandler
);
我在一個封裝類中呼叫上述生成的代碼:
getSession(GetSessionRequest request) {
Future<?> response = generatedClient.getSessionAsync(request, responseHeader, handler)
}
handler(Response<GetSessionResponse> response) {
//在這一點上沒有訪問SOAP XML的權限? }
根據我的理解,生成的代碼負責序列化/反序列化,我無法訪問原始的SOAP回應。有一些方法可以記錄 SOAP XML 回應,如此處所述,但我需要在代碼中訪問該回應,因為該回應需要被轉儲到資料庫中。
是否有任何方法可以在處理程式中訪問這一點而不觸及生成的客戶端代碼?
更新:
我能夠讀取ResponseContext,其型別為java.util.Map<String, Object>。但是這并沒有回傳我所尋找的原始 SOAP XML。
此外,使用入站的 攔截器將意味著我失去呼叫函式的背景關系。這對于在資料庫中存盤與每個呼叫相關的 XML 回應是必需的。
更新2:
Future 回傳一個型別為 Response 的物件,可以在 Response.java 找到。該jdoc說明如下:
interface provides methods used to obtain the payload and context of a message sent.
回應操作呼叫而發送的訊息的有效載荷和背景關系的方法。
呼叫的訊息的有效載荷和背景關系。
然而,我只能檢索到Context,而沒有訪問有效載荷的屬性。
我找到了一個 SO 答案,其中有一個針對 Axis 的解決方案這里。在 cxf 中是否有可能有類似的東西?
uj5u.com熱心網友回復:
看看這個代碼片段(來源: https://github.com/apache/cxf/blob/master/core/src/main/java/org/apache/cxf/interceptor/LoggingInInterceptor.java)
import org.apache.cxf.message.Message。
...
protected void logging(Logger記錄器,Message訊息) {
if (message.containsKey(LoggingMessage.ID_KEY)) {
return;
}
String id = (String)message.getExchange().get(LoggingMessage.ID_KEY)。
if (id == null) {
id = LoggingMessage.nextId();
message.getExchange().put(LoggingMessage.ID_KEY, id)。
}
message.put(LoggingMessage.ID_KEY, id)。
final LoggingMessage buffer<
= new LoggingMessage("Inbound Message
----------------------------", id);
if(!Boolean.TRUE.equals(message.deCOUPLED_CHANNEL_MESSAGE)) {
//避免記錄解耦回應的默認回應代碼200。
Integer responseCode = (Integer)message.get(Message.RESPONSE_CODE)。
if (responseCode != null) {
buffer.getResponseCode().append( responseCode)。
}
}
String encoding = (String)message.get(Message.ENCODING)。
if (encoding != null) {
buffer.getEncoding().append(encoding)。
}
String httpMethod = (String)message.get(Message.HTTP_REQUEST_METHOD) 。
if (httpMethod != null) {
buffer.getHttpMethod().append(httpMethod)。
}
String ct = (String)message.get(Message.CONTENT_TYPE)。
if (ct != null) {
buffer.getContentType().append(ct)。
}
Object headers = message.get(Message.PROTOCOL_HEADERS)。
if (headers != null) {
buffer.getHeader().append(headers)。
}
String uri = (String)message.get(Message.REQUEST_URL);
if (uri == null) {
String address = (String)message.get(Message.ENDPOINT_ADDRESS)。
uri = (String)message.get(Message.REQUEST_URI);
if (uri != null && uri.startsWith("/") {
if (address != null && !address.startedWith(uri)) {
if (address.endWith("/") && address.length() > 1) {
address = address.substring(0, address.length() - 1) 。
}
uri = address uri;
}
} else {
uri = 地址。
}
}
if (uri != null) {
buffer.getAddress().append(uri)。
String query = (String) message.get(Message.QUERY_STRING);
if (query != null) {
buffer.getAddress().append('? ').append(query)。
}
}
if (!isShowBinaryContent() && isBinaryContent(ct) ) {
buffer.getMessage().append(BINARY_CONTENT_MESSAGE).append('
')。
log(logger, buffer.toString())。
return;
}
if (!isShowMultipartContent() && isMultipartContent(ct) ) {
buffer.getMessage().append(MULTIPART_CONTENT_MESSAGE).append('
')。
log(logger, buffer.toString())。
return;
}
//// 重要的一行
。 InputStream is = message.getContent(InputStream.class)。
////
if (is != null) {
logInputStream(message, is, buffer, encoding, ct)。
} else {
Reader reader = message.getContent(Reader.class)。
if (reader != null) {
logReader(message, reader, buffer)。
}
}
log(logger, formatLoggingMessage(buffer))。
}
因此,你可以創建一個攔截器并使用org.apache.cxf.message.Message
uj5u.com熱心網友回復:
問題:SOAP訊息可以被攔截,但在與原始呼叫 場地。要將 SOAP 訊息傳回原始呼叫站點是很困難的,尤其是在 特別是在多執行緒或異步環境中。
我所看到的唯一解決方案是明確規定每個請求只有一個 JAX-WS 代理,它有一個處理程式。 請求。一個應用程式只有一個代理,這將是一個瓶頸,因此它需要使用 多執行緒工具來實作并行和異步執行。
。下面是我的想法,用代碼表示。首先,我將一步一步地完成它,最后是所有代碼的轉儲。
更新:我已經用一個靜態的ThreadLocal<SoapApiWrapper>替換了LinkedBlockingQueen。
實體,并且用newWorkStealingPool()替換了執行器。
我已經將其設定為使用http://www.dneonline.com/calculator.asmx。它可以編譯和運行,但我還沒有 花了很多時間來確保它能正確作業或達到最佳狀態。我確信有一些問題(我的CPU風扇 即使我沒有運行代碼,我的CPU風扇也在努力作業)。) 請注意!
(有沒有人知道一個更好的公共 SOAP API,我可以在本地運行或用請求淹沒它?
(有沒有人知道一個更好的公共 SOAP API,我可以在本地運行或用請求淹沒它?)
如果你想測驗,這里有一些公共的SOAP APIs。
https://documenter.getpostman.com/view/8854915/Szf26WHn
一步一步來
實作SOAPHandler類,該類將捕獲訊息,稱為SoapMessageHandler。
public class SoapMessageHandler implements SOAPHandler< SOAPMessageContext> {
//在一個串列中捕獲訊息。
private final List<SOAPMessageContext> messages = new ArrayList<> ()。
// get & clear messages
public List<SOAPMessageContext> collectMessages() {
var m = new ArrayList<>(messages)。
messages.clear()。
return m。
}
@Override
public boolean handleMessage(SOAPMessageContext context) {
messages.add(context); //collect message
return true。
}
@Override; }
public boolean handleFault(SOAPMessageContext context) {
messages.add(context); //collect error
return true。
}
定義一個SoapApiWrapper類,它
- 創建一個
SoapMessageHandler, - 創建一個 JAX-WS 代理,
- 并添加處理程式。
- 并將處理程式添加到代理中。
class SoapApiWrapper {
//1.創建一個處理程式。
private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler() 。
private final CalculatorSoap connection;
public SoapApiWrapper() {
//2.創建一個連接。
var factoryBean = new JaxWsProxyFactoryBean() 。
factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
factoryBean.setServiceClass(CalculatorSoap.class)。
//3.添加處理程式。
factoryBean.setHandlers(Collections.singletonList(soapMessageHandler))。
connection = factoryBean.create(CalculatorSoap.class)。
}
}
定義一個SoapApiManager,它具有
- 一個
ExecutorService,它將管理SOAP請求和回應 。
- 一個
ThreadLocal<SoapApiWrapper>,所以每個執行緒都有一個JAX-WS Proxy(理念 from https://stackoverflow.com/a/16680215/4161471)
public class SoapApiManager {
//1.請求執行者
private static final ExecutorService executorService = Executors. newWorkStealingPool(THREAD_LIMIT)。
private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new)。
}
SoapApiManager有一個方法,submitRequest(...)。它將回傳SOAP API回應**和**的SOAP訊息。
和**的SOAP訊息。
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT> > submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) {
//...。
}
引數是一個SoapRequestRunner,一個接受JAX-WS Proxy并回傳SOAP
回應。
@FunctionalInterface
interface SoapRequestRunner< ResponseT> {
ResponseT sendRequest(CalculatorSoap calculatorSoap)。
}
當被呼叫時,submitRequest(...)執行以下操作:
- 封裝
submitRequest(...)。 - 用
CompleteableFuture.supplyAsync(...)包裝SoapRequestRunner,并使用 我們的ExectutorService。
- 從
ThreadLocal獲取一個SoapApiWrapper, - 呼叫SOAP API(通過應用
SoapRequestRunner到SoapApiWrapper的JAX-WS 代理) - 等待SOAP的結果,
SoapApiWrapper的SOAPHandler中提取SOAP訊息,SoapResponseHolder public <ResponseT> CompletableFuture<SoapResponseHolder< ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) { //1. 使用CompletableFuture & executorService。
return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService) 。
}
private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall (
SoapRequestRunner<ResponseT> requestRunner
) {
return ( ) -> {
SoapApiWrapper api = null。
try {
api = soapApiWrapperQueue.get(); // 2.取一個API包裝器。
var response = requestRunner.sendRequest(api.connection); // 3& 4. request & response
var messages = api.soapMessageHandler.collectMessages(); // 5. 提取原始SOAP訊息
return new SoapResponseHolder<>(response, messages); // 6. bundle into DTO
} catch (InterruptedException e) {
throw new RuntimeException(e)。
} finally {
if (api != null) {
soapApiWrapperQueue.offer(api)。
}
}
};
}
示例使用方法
public class Main {
public static void main(String[] args){
SoapApiManager apiManager = new SoapApiManager() 。
apiManager
.submitRequest((soapApi) -> soapApi.add(5, 4)
.thenAccept(response -> {
//我們可以得到SOAP API回應。
var sum = response.getResponse();
//也是截獲的訊息!
var messages = response.getMessages() 。
var allXml = messages.stream().map(Main::getRawXml).collectors.join("
---
"))。)
System.out.println("sum: "/span> sum ",
" allXml)。)
});
}
public static String getRawXml(SOAPMessageContext context) {
try {
ByteArrayOutputStream byteOS = new ByteArrayOutputStream() 。
context.getMessage().writeTo(byteOS)。
return byteOS.toString(StandardCharsets.UTF_8)。
} catch (SOAPException | IOException e) {
throw new RuntimeException(e)。
}
}
輸出
sum: 105,
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<Add xmlns="http://tempuri.org/">
<intA>73</intA>
<intB>32</intB>
</Add>
</soap:Body>
</soap:Envelope>
---
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >
<soap:Header/>
<soap:Body>
<AddResponse xmlns="http://tempuri.org/">
<AddResult>105</AddResult>
</AddResponse>
</soap:Body>
</soap:Envelope>
所有代碼
下面是一個作業實體,包括對回應的驗證。它創建了大量(REQUESTS_COUNT)的請求,并將它們全部提交給SoapApiManager。
每個請求都會列印出執行緒的名稱,以及JAX-WS代理的哈希代碼(我想檢查一下
它們被重復使用),以及基本的輸入/輸出(例如:-9 - 99 = -108)。
還有驗證以確保每個 SoapResponseHolder 都有正確的結果和原始 SOAP
訊息,并且發送了正確數量的請求。
Main.java
import com.github.underscore.lodash.Xml。
import com.github.underscore.lodash.Xml.XmlStringBuilder.Step。
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.soap.SOAPException;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class Main implements AutoCloseable{
private final SoapApiManager apiManager = new SoapApiManager()。
private static final int THREAD_COUNT = 4;
private static final int REQUESTS_COUNT = 500;
private final AtomicInteger i = new AtomicInteger() 。
public static void main(String[] args)
throws InterruptedException {
try (var m = new Main()) {
m.run()。
}
}
private void run() throws InterruptedException {
var executor = Executors.newFixedThreadPool(THREAD_COUNT);
var tasks = Stream.generate(() -> Map.entry(randomInt(), randomInt() )
.limit(REQUESTS_COUNT)
.map(intA -> (Callable<Boolean> ) () -> {
sendAndValidateRequest(intA.getKey(), intA.getValue())。
i.incrementAndGet()。
return true。
})
.collect(Collectors.toList())。
執行器.invokeAll(tasks)。
var waiter = Executors.newSingleThreadScheduledExecutor()。
waiter.scheduleWithFixedDelay(
() -> {
var size = i.get()。
System.out.println(">waiting... (size" size ")")。)
if (size >= REQUESTS_COUNT) {
System.out.println(">完成等待!" size)。
waiter.shutdownNow()。
}
},
3, 3, TimeUnit.SECONDS
);
System.out.println("完成發送任務" waiter.awaitTermination(10, TimeUnit.SECONDS) )。
waiter.shutdownNow()。
Thread.sleep(TimeUnit.SECONDS.toMillis(5))。
Executor.shutdown()。
系統.out.println(
"executor.awaitTermination" executor.awaitTermination(10, TimeUnit.SECONDS) )。
if (!executor.isTerminated() ) {
System.out.println("executor.shutdownNow" executor.shutdownNow())。
}
if (i.get() != REQUESTS_COUNT) {
throw new RuntimeException(
"測驗沒有執行" REQUESTS_COUNT " 次,實際:" i.get()
);
}
}
private int randomInt() {
return ThreadLocalRandom.current().nextInt(-100, 100) 。
}
private void sendAndValidateRequest(int a, int b) {
apiManager
.submitRequest((soapApi) -> {
var response = soapApi.add(a, b)。
系統輸出列印輸出(
" [%-12s / %-18s] M %s = = M
"。
soapApi.hashCode()。
Thread.currentThread().getName()。
a,
(b >= 0 ? " "/span> : "-"/span>)。)
Math.abs(b)。
回應
);
return 回應。
})
.thenAcceptAsync(response -> {
var sum = response.getResponse();
var messages = response.getMessages() 。
var allXml = messages.stream() .map(Main::getRawXml)
.collect(Collectors.join("
---
"))。)
if (sum != a b) {
throw new RuntimeException(
"Bad sum, sent " a " " b , result: " sum ", xml: " allXml
);
}
if (messages.size() != 2) {
throw new RuntimeException(
"壞訊息,預計1個請求和1個回應,但得到" messages.size()
", xml: "/span> allXml
);
}
if (!allXml.contains("<AddResult>" (a b) "< /AddResult>")) {
throw new RuntimeException(
"結果不好,不包含AddResult=" (a b) ",實際:" allXml
);
}
});
}
public static String getRawXml(SOAPMessageContext context) {
try(var byteOS = new ByteArrayOutputStream() ) {
context.getMessage().writeTo(byteOS)。
var rawSoap = byteOS.toString(StandardCharsets.UTF_8)。
return Xml.formatXml(rawSoap, Step.TWO_SPACES)。
} catch (SOAPException | IOException e) {
throw new RuntimeException(e)。
}
}
@Override; }
public void close() {
apiManager.close()。
}
}
SoapApiManager.java
import java.util.Collections。
import java.util.List;
import java.util.concurrent.BlockingQueue。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import javax.xml.ws.handler.soap.SOAPMessageContext;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.tempuri.CalculatorSoap;
public class SoapApiManager implementsAutoCloseable {
private static final int THREAD_LIMIT = Math. min(Runtime.getRuntime().availableProcessors(), 5) 。
private static final ExecutorService executorService = Executors. newWorkStealingPool(THREAD_LIMIT)。
private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new)。
@Override
public void close() {
executorService.shutdown()。
}
private static class SoapApiWrapper {
private final CalculatorSoap connection;
private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler() 。
public SoapApiWrapper() {
var factoryBean = new JaxWsProxyFactoryBean() 。
factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
factoryBean.setServiceClass(CalculatorSoap.class)。
factoryBean.setHandlers(Collections.singletonList(soapMessageHandler))。
connection = factoryBean.create(CalculatorSoap.class)。
}
}
public <ResponseT> CompletableFuture<SoapResponseHolder< ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) {
return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService)。
}
private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall (
SoapRequestRunner<ResponseT> requestRunner
) {
return ( ) -> {
SoapApiWrapper api = soapApiWrapper.get()。
var response = requestRunner.sendRequest(api.connection)。
var messages = api.soapMessageHandler.collectMessages()。
return new SoapResponseHolder<>(response, messages)。
};
}
@FunctionalInterface
interface SoapRequestRunner<ResponseT> {
ResponseT sendRequest(CalculatorSoap calculatorSoap)。
}
public static class SoapResponseHolder< ResponseT> {
private final List<SOAPMessageContext> messages;
private final ResponseT response;
SoapResponseHolder(
回應T回應。
List<SOAPMessageContext> messages
) {
this.response = response;
this.messages = messages;
}
public ResponseT getResponse() {
return 回應。
}
public List<SOAPMessageContext> getMessages() {
return messages;
}
}
SoapMessageHandler.java
package org.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.xml.namespace.QName。
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.handler.soap.SOAPHandler;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class SoapMessageHandler implements SOAPHandler< SOAPMessageContext> {
private final List<SOAPMessageContext> messages = new ArrayList<>()。
public List<SOAPMessageContext> collectMessages() {
var m = new ArrayList<>(messages)。
messages.clear()。
return m。
}
@Override
public Set<QName> getHeaders() {
return Collections.emptySet()。
}
@Override
public boolean handleMessage(SOAPMessageContext context) {
messages.add(context)。
return true。
}
@Override; }
public boolean handleFault(SOAPMessageContext context) {
messages.add(context)。
return true。
}
@Override; }
public void close(MessageContext context) {
}
}
build.gradle.kts
plugins {
java
id("com.github.bjornvester.wsdl2java") version"1.2"。
}
group = "org.example"/span>
版本 = "1.0-SNAPSHOT"/span>
資源庫 {
mavenCentral()
}
依賴性 {
implementation(enforcedPlatform("org.apache.cxf:cxf-bom:3.4.4")
implementation("org.apache.cxf:cxf-core")
implementation("org.apache.cxf:cxf-rt-frontend-jaxws")
implementation("org.apache.cxf:cxf-rt-transports-http")
implementation("org.apache.cxf:cxf-rt-databinding-jaxb")
// implementation("org.apache.cxf:cxf-rt-transports-http-jetty")
implementation("org.apache.cxf:cxf-rt-transports-http-hc")
implementation("com.sun.activation:javax.activation:1.2.0")
實作("javax.annotation:javax.annotation-api:1.3.2")
implementation("com.sun.xml.messaging.saaj:saaj-impl:1.5.1")
implementation("com.github.javadev:underscore:1.68")
//<editor-fold desc="JAXB" >
implementation("org.jvnet.jaxb2_commons:jaxb2-basics-runtime:1.11.1"/span>)
xjcPlugins("org.jvnet.jaxb2_commons:jaxb2-basics:1.11.1")
//</editor-fold>
//<editor-fold desc="Test">
testImplementation(forcedPlatform("org.junit:junit-bom:5.7.2"/span>) //JUnit 5 BOM。
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0" )
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
//</editor-fold>。
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(11)
}
}
wsdl2java {
cxfVersion.set("3.4.4"/span>)
options.addAll("-xjc-Sequals", "-xjc-XhashCode")
}
tasks.test {
useJUnitPlatform()
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/312425.html
標籤:
