我認為自己對 asio 有相當的經驗,但無法弄清楚如何正確執行async_read和async_write在boost::asio::ssl::stream<boost::asio::ip::tcp::socket>. 我創建了以下最小示例https://github.com/ladnir/asio-ssl-stackoverflow,我將在下面進行解釋。
我的目標很簡單,在ssl_stream. 檔案很清楚,您需要從我所做的一個鏈中執行async_readandasync_write 呼叫。
我的設定是有一個io_context多執行緒。資料在套接字上不斷發送和接收。發送和接收資料都有自己的回呼鏈。在每個的完成處理程式中,我只是安排另一個發送或接收操作。所有這些都是在一個鏈中執行的。下面是代碼的主要部分
std::function<void(bool, ssl::stream<tcp::socket>&, io_context::strand&, u64)> f =
[&](bool send, ssl::stream<tcp::socket>& sock, io_context::strand& strand, u64 t) {
strand.dispatch([&, send, t]() {
std::vector<u8> buffer(10000);
auto bb = mutable_buffer(buffer.data(), buffer.size());
auto callback = [&, send, t, buffer = std::move(buffer), moveOnly = std::unique_ptr<int>{}](boost::system::error_code error, std::size_t n) mutable {
if (error) {
std::cout << error.message() << std::endl;
std::terminate();
}
// perform another operation or complete.
if (t)
f(send, sock, strand, t - 1);
else
--spinLock;
};
if (send)
async_write(sock, bb, std::move(callback));
else
async_read(sock, bb, std::move(callback));
});
};
然后為服務器和客戶端套接字啟動發送和接收回呼鏈。
// launch our callback chains.
f(true, srvSocket, srvStrand, trials);
f(false, srvSocket, srvStrand, trials);
f(true, cliSocket, cliStrand, trials);
f(false, cliSocket, cliStrand, trials);
似乎盡管使用了 strand,但 OpenSSL 內部的某些內容并未以執行緒安全的方式執行。當我運行代碼時,有時會出現解密失敗,有時它只是在 OpenSSL 的某個地方崩潰。
如果我使用tcp::socket此代碼可以正常作業。如果我制作io_context單執行緒,那么它作業正常。我已經在 ubuntu 和 windows 上測驗過了。
從相關問題(例如this )看來,只要您將其包裹在一條鏈中,全雙工就應該可以作業。
有誰看到我做錯了什么?也許在另一個已經安排好的情況下執行async_read/是不安全的?async_write
Sehe 解決方案的注意事項:在將我的代碼與 Sehe 的解決方案進行比較后,我確定我的代碼存在一個主要錯誤。的執行背景關系ssl::stream<tcp::socket>是多執行緒io_context。因此,不能保證ssl::stream在我的鏈上執行作業。而且,怎么可能?不知道我的ssl::stream鏈,并且僅在初始呼叫期間恰好在其上執行。當ssl::stream從底層套接字回呼時,它將/可能會安排更多的讀/寫操作,但不會在我選擇的鏈上。
唯一可以保證的是ssl::stream它在構造它的執行背景關系中。一個簡單的解決方法是使用 strand 作為執行背景關系來構造流。
我更新了上面的 github repo 以包含我的解決方案。
謝謝,Sehe的洞察力。
uj5u.com熱心網友回復:
這是那些日子之一。找不到您的代碼的問題,我感到非常愚蠢。即使使用簡化的代碼(簡化的方式),我也不斷遇到釋放后使用和堆疊抖動的變體。
在我看來,明顯的罪魁禍首是在移動向量后使用向量成員資料。我無法證明這一點(手動斷言值data()并size()在整個移動程序中保持穩定并沒有失敗),但請參見下文。
所以...我去了第一方,只是按照我寫的方式寫了整個程式。也就是說,
- 使用執行器而不是服務參考 顯式處理程式系結
asio::strand<>而不是io_context::strand(已棄用)asio::thread_pool而不是io_context加上原始執行緒(和作業警衛)
我認為這些都不重要,但它確實簡化了代碼。
在做心智體操以知道是否function<>可以擁有自己的副本時,我決定它需要共享所有權。
auto make_loop = [&](auto& stream, bool sending) {
auto shared_loop = std::make_shared<std::function<void()>>();
auto port = stream.lowest_layer().remote_endpoint().port();
auto mode = [=](auto t) {
return std::string(sending ? "write" : "read")
" t=" std::to_string(t) " port:" std::to_string(port);
};
auto s = std::make_shared<sentinel>("shared_loop " mode(trials));
*shared_loop = [wl = /*std::weak_ptr*/(shared_loop), mode, sending, &stream,
t = trials, s_ = s]() mutable {
if (!t--) {
return;
}
auto data = std::make_shared<std::vector<uint8_t>>(10'000);
auto buf = asio::buffer(*data);
auto handler = [sl = my_lock(wl), mode, data_ = std::move(data), t] //
(error_code ec, size_t n) {
trace("Handler ", mode(t), " n=", n, " ", ec.message());
if (t && sl)
(*sl)();
};
trace("Initiating ", mode(t));
if (sending)
async_write(stream, buf, handler);
else
async_read(stream, buf, handler);
};
return *shared_loop;
};
post(srv.get_executor(), make_loop(srv, true));
post(srv.get_executor(), make_loop(srv, false));
post(cli.get_executor(), make_loop(cli, true));
post(cli.get_executor(), make_loop(cli, false));
好訊息是,這個程式運行沒有問題(在 ASAN UBSAN 下):
(幾乎)住在科利魯
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <iostream>
namespace asio = boost::asio;
namespace ssl = asio::ssl;
using asio::ip::tcp;
using ssl::context;
using stream = ssl::stream<tcp::socket>;
using boost::system::error_code;
static constexpr auto trials = 10;
static inline void trace(auto const&... args) {
static std::mutex mx;
std::lock_guard lk(mx);
(std::cout << ... << args) << std::endl;
}
struct sentinel {
std::string msg;
~sentinel() {
trace("~sentinel: ", msg);
} // trace when shared loop is freed
};
template <typename... Ts> static inline auto my_lock(std::shared_ptr<Ts...> sp) { return sp; }
template <typename... Ts> static inline auto my_lock(std::weak_ptr<Ts...> wp) { return wp.lock(); }
int main() try {
asio::thread_pool ioc;
ssl::context sctx{ssl::context::tlsv13_server};
ssl::context cctx{ssl::context::tlsv13_client};
sctx.set_default_verify_paths();
sctx.set_password_callback([](auto&&...) { return "test"; });
sctx.use_certificate_file("server.pem", context::pem);
sctx.use_private_key_file("server.pem", context::pem);
tcp::acceptor acc(make_strand(ioc), tcp::v4());
trace("before set_option");
acc.set_option(tcp::acceptor::reuse_address(true));
acc.bind({{}, 7878});
acc.listen();
trace("listening");
stream srv(acc.get_executor(), sctx);
auto fut = std::async([&] {
acc.accept(srv.lowest_layer());
trace("Handshaking ", srv.lowest_layer().remote_endpoint());
srv.handshake(stream::server);
});
stream cli(make_strand(ioc), cctx);
cli.lowest_layer().connect({{}, 7878});
cli.handshake(stream::client);
fut.get();
trace("connected");
auto make_loop = [&](auto& stream, bool sending) {
auto shared_loop = std::make_shared<std::function<void()>>();
auto port = stream.lowest_layer().remote_endpoint().port();
auto mode = [=](auto t) {
return std::string(sending ? "write" : "read")
" t=" std::to_string(t) " port:" std::to_string(port);
};
auto s = std::make_shared<sentinel>("shared_loop " mode(trials));
*shared_loop = [wl = /*std::weak_ptr*/(shared_loop), mode, sending, &stream,
t = trials, s_ = s]() mutable {
if (!t--) {
return;
}
auto data = std::make_shared<std::vector<uint8_t>>(10'000);
auto buf = asio::buffer(*data);
auto handler = [sl = my_lock(wl), mode, data_ = std::move(data), t] //
(error_code ec, size_t n) {
trace("Handler ", mode(t), " n=", n, " ", ec.message());
if (t && sl)
(*sl)();
};
trace("Initiating ", mode(t));
if (sending)
async_write(stream, buf, handler);
else
async_read(stream, buf, handler);
};
return *shared_loop;
};
post(srv.get_executor(), make_loop(srv, true));
post(srv.get_executor(), make_loop(srv, false));
post(cli.get_executor(), make_loop(cli, true));
post(cli.get_executor(), make_loop(cli, false));
trace("waiting");
ioc.join();
sentinel atdone{"done"};
} catch(boost::system::system_error const& se) {
std::cout << se.what() << " from " << se.code().location() << "\n";
}
出于演示目的在本地運行它:https ://i.imgur.com/q5e7ENI.mp4
麻煩
有兩個值得注意的問題:
如您所見,共享回圈功能已泄漏。這是因為它回圈地堅持自己。如果我們試圖通過使用weak_ptr(例如注釋)來打破回圈,那么異步鏈將無法在第一次迭代之后繼續進行。我認為這里沒有解決方案,除了將所有權完全移出處理程式。
如果在我的示例中,我將緩沖區替換
std::vector為您所擁有的緩沖區:auto data = std::vector<uint8_t>(10'000); auto buf = asio::buffer(data);(無需進一步更改),ASAN 將反對 heap-use-after-free,無論資料地址應該有多穩定......我不會打賭
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/520786.html
