主頁 > 軟體工程 > 服務器在發送大量資料時被中斷

服務器在發送大量資料時被中斷

2022-04-06 16:09:36 軟體工程

當我優雅地關閉連接到它的客戶端時,我的服務器崩潰了,而客戶端正在接收大量資料。我正在考慮一個可能的終身錯誤,就像 boost ASIO 中的大多數錯誤一樣,但是我自己無法指出我的錯誤。

每個客戶端與服務器建立 2 個連接,其中一個用于同步,另一個是長連接,用于接收持續更新。在“同步階段”客戶端接收大資料以與服務器狀態同步(“狀態”基本上是 JSON 格式的 DB 資料)。同步后,同步連接關閉。客戶端通過另一個連接接收對資料庫的更新(與“同步資料”相比,這些資料當然是非常小的資料)。

這些是相關檔案:

連接.h

#pragma once

#include <array>
#include <memory>
#include <string>
#include <boost/asio.hpp>

class ConnectionManager;

/// Represents a single connection from a client.
class Connection : public std::enable_shared_from_this<Connection>
{
public:
  Connection(const Connection&) = delete;
  Connection& operator=(const Connection&) = delete;

  /// Construct a connection with the given socket.
  explicit Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager);

  /// Start the first asynchronous operation for the connection.
  void start();

  /// Stop all asynchronous operations associated with the connection.
  void stop();

  /// Perform an asynchronous write operation.
  void do_write(const std::string& buffer);

  int getNativeHandle();

  ~Connection();

private:
  /// Perform an asynchronous read operation.
  void do_read();

  /// Socket for the connection.
  boost::asio::ip::tcp::socket socket_;

  /// The manager for this connection.
  ConnectionManager& connection_manager_;

  /// Buffer for incoming data.
  std::array<char, 8192> buffer_;

  std::string outgoing_buffer_;
};

typedef std::shared_ptr<Connection> connection_ptr;

連接.cpp

#include "connection.h"

#include <utility>
#include <vector>
#include <iostream>
#include <thread>

#include "connection_manager.h"

Connection::Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager)
    : socket_(std::move(socket))
    , connection_manager_(manager)
{
}

void Connection::start()
{
  do_read();
}

void Connection::stop()
{
  socket_.close();
}

Connection::~Connection()
{
}

void Connection::do_read()
{
  auto self(shared_from_this());
  socket_.async_read_some(boost::asio::buffer(buffer_), [this, self](boost::system::error_code ec, std::size_t bytes_transferred) {
        if (!ec) {
            std::string buff_str = std::string(buffer_.data(), bytes_transferred);
            const auto& tokenized_buffer = split(buff_str, ' ');
            
            if(!tokenized_buffer.empty() && tokenized_buffer[0] == "sync") {
                /// "syncing connection" sends a specific text
                /// hence I can separate between sycing and long-lived connections here and act accordingly.

                const auto& exec_json_strs = getExecutionJsons();
                const auto& order_json_strs = getOrdersAsJsons();
                const auto& position_json_strs = getPositionsAsJsons();
                const auto& all_json_strs = exec_json_strs   order_json_strs   position_json_strs   createSyncDoneJson();
                
                /// this is potentially a very large data.
                do_write(all_json_strs);
            }

            do_read();
        } else {
          connection_manager_.stop(shared_from_this());
        }
      });
}

void Connection::do_write(const std::string& write_buffer)
{
  outgoing_buffer_ = write_buffer;

  auto self(shared_from_this());
  boost::asio::async_write(socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()), [this, self](boost::system::error_code ec, std::size_t transfer_size) {
        if (!ec) {
           /// everything is fine.
        } else {
           /// what to do here?
           /// server crashes once I get error code 32 (EPIPE) here.
        }
      });
}

連接管理器.h

#pragma once

#include <set>
#include "connection.h"

/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
class ConnectionManager
{
public:
  ConnectionManager(const ConnectionManager&) = delete;
  ConnectionManager& operator=(const ConnectionManager&) = delete;

  /// Construct a connection manager.
  ConnectionManager();

  /// Add the specified connection to the manager and start it.
  void start(connection_ptr c);

  /// Stop the specified connection.
  void stop(connection_ptr c);

  /// Stop all connections.
  void stop_all();

  void sendAllConnections(const std::string& buffer);

private:
  /// The managed connections.
  std::set<connection_ptr> connections_;
};

連接管理器.cpp

#include "connection_manager.h"

ConnectionManager::ConnectionManager()
{
}

void ConnectionManager::start(connection_ptr c)
{
  connections_.insert(c);
  c->start();
}

void ConnectionManager::stop(connection_ptr c)
{
    connections_.erase(c);
    c->stop();
}

void ConnectionManager::stop_all()
{
  for (auto c: connections_)
    c->stop();

  connections_.clear();
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void ConnectionManager::sendAllConnections(const std::string& buffer)
{
  for (auto c: connections_)
      c->do_write(buffer);
}

服務器.h

#pragma once

#include <boost/asio.hpp>
#include <string>
#include "connection.h"
#include "connection_manager.h"

class Server
{
public:
  Server(const Server&) = delete;
  Server& operator=(const Server&) = delete;

  /// Construct the server to listen on the specified TCP address and port, and
  /// serve up files from the given directory.
  explicit Server(const std::string& address, const std::string& port);

  /// Run the server's io_service loop.
  void run();

  void deliver(const std::string& buffer);

private:
  /// Perform an asynchronous accept operation.
  void do_accept();

  /// Wait for a request to stop the server.
  void do_await_stop();

  /// The io_service used to perform asynchronous operations.
  boost::asio::io_service io_service_;

  /// The signal_set is used to register for process termination notifications.
  boost::asio::signal_set signals_;

  /// Acceptor used to listen for incoming connections.
  boost::asio::ip::tcp::acceptor acceptor_;

  /// The connection manager which owns all live connections.
  ConnectionManager connection_manager_;

  /// The *NEXT* socket to be accepted.
  boost::asio::ip::tcp::socket socket_;
};

服務器.cpp

#include "server.h"
#include <signal.h>
#include <utility>

Server::Server(const std::string& address, const std::string& port)
    : io_service_()
    , signals_(io_service_)
    , acceptor_(io_service_)
    , connection_manager_()
    , socket_(io_service_)
{
  // Register to handle the signals that indicate when the server should exit.
  // It is safe to register for the same signal multiple times in a program,
  // provided all registration for the specified signal is made through Asio.
  signals_.add(SIGINT);
  signals_.add(SIGTERM);
#if defined(SIGQUIT)
  signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)

  do_await_stop();

  // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
  boost::asio::ip::tcp::resolver resolver(io_service_);
  boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve({address, port});
  acceptor_.open(endpoint.protocol());
  acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
  acceptor_.bind(endpoint);
  acceptor_.listen();

  do_accept();
}

void Server::run()
{
  // The io_service::run() call will block until all asynchronous operations
  // have finished. While the server is running, there is always at least one
  // asynchronous operation outstanding: the asynchronous accept call waiting
  // for new incoming connections.
  io_service_.run();
}

void Server::do_accept()
{
  acceptor_.async_accept(socket_,
      [this](boost::system::error_code ec)
      {
        // Check whether the server was stopped by a signal before this
        // completion handler had a chance to run.
        if (!acceptor_.is_open())
        {
          return;
        }

        if (!ec)
        {
          connection_manager_.start(std::make_shared<Connection>(
              std::move(socket_), connection_manager_));
        }

        do_accept();
      });
}

void Server::do_await_stop()
{
  signals_.async_wait(
      [this](boost::system::error_code /*ec*/, int /*signo*/)
      {
        // The server is stopped by cancelling all outstanding asynchronous
        // operations. Once all operations have finished the io_service::run()
        // call will exit.
        acceptor_.close();
        connection_manager_.stop_all();
      });
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void Server::deliver(const std::string& buffer)
{
    connection_manager_.sendAllConnections(buffer);
}   

所以,我重復我的問題:當我優雅地關閉連接到它的客戶端時,我的服務器崩潰了,而客戶端正在接收大量資料而我不知道為什么。

編輯:一旦我收到 EPIPE 錯誤,async_write 函式就會發生崩潰。該應用程式是多執行緒的。有 4 個執行緒在生成時使用自己的資料呼叫 Server::deliver。Deliver() 用于使客戶端保持最新狀態,它與初始同步無關:同步是使用從 db 獲取的持久資料完成的。

我有一個 io_service,所以我認為我不需要 strands。io_service::run 在主執行緒上被呼叫,所以主執行緒是阻塞的。

uj5u.com熱心網友回復:

審查,添加一些缺失的代碼位:

namespace /*missing code stubs*/ {
    auto split(std::string_view input, char delim) {
        std::vector<std::string_view> result;
        boost::algorithm::split(result, input,
                                boost::algorithm::is_from_range(delim, delim));
        return result;
    }

    std::string getExecutionJsons()   { return ""; }
    std::string getOrdersAsJsons()    { return ""; }
    std::string getPositionsAsJsons() { return ""; }
    std::string createSyncDoneJson()  { return ""; }
}

現在我注意到的事情是:

  1. 你有一個單一的io_service,所以一個執行緒。好的,所以除非您的其他代碼中有執行緒(main例如?),否則不需要任何執行緒。

  2. 懷疑執行緒在起作用的一個特殊原因是沒有人可以呼叫Server::deliver,因為run()它是阻塞的。這意味著無論何時您deliver()現在呼叫它都會導致資料競爭,從而導致未定義的行為

    隨意的評論

     /// this function is used to keep clients up to date with the changes,
     /// not used during syncing phase.
    

    消除這種擔憂并沒有多大作用。代碼需要防止濫用。評論不會被執行。讓它變得更好:

     void Server::deliver(const std::string& buffer) {
         post(io_context_,
              [this, buffer] { connection_manager_.broadcast(std::move(buffer)); });
     }
    
  3. 在接受“新”之前,您不會檢查先前的寫入是否已完成。這意味著呼叫Connection::do_write導致未定義行為有兩個原因:

    • outgoing_buffer_在使用該緩沖區的正在進行的異步操作期間進行修改是 UB

    • async_write在同一個 IO 物件上有兩個重疊是 UB(請參閱檔案

    解決這個問題的典型方法是使用一個傳出訊息佇列。

  4. usingasync_read_some很少是您想要的,特別是因為讀取不會累積到動態緩沖區中。這意味著,如果您的資料包在意外邊界處分離,您可能根本無法檢測到命令,或者檢測不正確。

    而是考慮asio::async_read_until使用動態緩沖區(例如

    • 直接讀入std::string,因此您不必將緩沖區復制到字串中
    • 讀入streambuf,以便您可以使用std::istream(&sbuf_)決議而不是標記化
  5. all_json_strs顯然必須擁有文本容器的連接是浪費的。相反,使用 const-buffer-sequence 將它們全部組合而不復制。

    Better yet, consider a streaming approach to JSON serialization so not all the JSON needs to be serialized in memory at any given time.

  6. Don't declare empty destructors (~Connection). They're pessimizations

  7. Likewise for empty constructors (ConnectionManager). If you must, consider

    ConnectionManager::ConnectionManager() = default;
    
  8. The getNativeHandle gives me more questions about other code that may interfere. E.g. it may indicate other libraries doing operations, which again can lead to overlapped reads/writes, or it could be a sign of more code living on threads (as Server::run() is by definition blocking)

  9. Connection manager should probably hold weak_ptr, so Connections could eventually terminate. Now, the last reference is by defintion held in the connection manager, meaning nothing ever gets destructed when the peer disconnects or the session fails for some other reason.

  10. This is not idiomatic:

    // Check whether the server was stopped by a signal before this
    // completion handler had a chance to run.
    if (!acceptor_.is_open()) {
        return;
    }
    

    If you closed the acceptor, the completion handler is called with error::operation_aborted anyways. Simply handle that, e.g. in the final version I'll post later:

    // separate strand for each connection - just in case you ever add threads
    acceptor_.async_accept(
        make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
            if (!ec) {
                connection_manager_.register_and_start(
                    std::make_shared<Connection>(std::move(sock),
                                                 connection_manager_));
                do_accept();
            }
        });
    
  11. I notice this comment:

    // The server is stopped by cancelling all outstanding asynchronous
    // operations. Once all operations have finished the io_service::run()
    // call will exit.
    

    In fact you never cancel() any operation on any IO object in your code. Again, comments aren't executed. It's better to indeed do as you say, and let the destructors close the resources. This prevents spurious errors when objects are used-after-close, and also prevents very annoying race conditions when e.g. you closed the handle, some other thread re-opened a new stream on the same filedescriptor and you had given out the handle to a third party (using getNativeHandle)... you see where this leads?

Reproducing The Problem?

Having reviewed this way, I tried to repro the issue, so I created fake data:

    std::string getExecutionJsons()   { return std::string(1024,  'E'); }
    std::string getOrdersAsJsons()    { return std::string(13312, 'O'); }
    std::string getPositionsAsJsons() { return std::string(8192,  'P'); }
    std::string createSyncDoneJson()  { return std::string(24576, 'C'); }

With some minor tweaks to the Connection class:

    std::string buff_str =
        std::string(buffer_.data(), bytes_transferred);
    const auto& tokenized_buffer = split(buff_str, ' ');

    if (!tokenized_buffer.empty() &&
        tokenized_buffer[0] == "sync") {
        std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
        /// "syncing connection" sends a specific text
        /// hence I can separate between sycing and long-lived
        /// connections here and act accordingly.

        const auto& exec_json_strs     = getExecutionJsons();
        const auto& order_json_strs    = getOrdersAsJsons();
        const auto& position_json_strs = getPositionsAsJsons();
        const auto& all_json_strs      = exec_json_strs  
            order_json_strs   position_json_strs  
            createSyncDoneJson();

        std::cerr << "All json length: " << all_json_strs.length() << std::endl;
        /// this is potentially a very large data.
        do_write(all_json_strs); // already on strand!
    }

We get the server outputting

sync detected on 127.0.0.1:43012
All json length: 47104
sync detected on 127.0.0.1:43044
All json length: 47104

And clients faked with netcat:

$ netcat localhost 8989 <<< 'sync me' > expected
^C
wc -c expected 
47104 expected

Good. Now let's cause premature disconnect:

netcat localhost 8989 -w0 <<< 'sync me' > truncated
$ wc -c truncated 
0 truncated

So, it does lead to early close, but server still says

sync detected on 127.0.0.1:44176
All json length: 47104

Let's instrument do_write as well:

    async_write( //
        socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()),
        [/*this,*/ self](error_code ec, size_t transfer_size) {
            std::cerr << "do_write completion: " << transfer_size << " bytes ("
                      << ec.message() << ")" << std::endl;

            if (!ec) {
                /// everything is fine.
            } else {
                /// what to do here?
                // FIXME: probably cancel the read loop so the connection
                // closes?
            }
        });

Now we see:

sync detected on 127.0.0.1:44494
All json length: 47104
do_write completion: 47104 bytes (Success)
sync detected on 127.0.0.1:44512
All json length: 47104
do_write completion: 32768 bytes (Operation canceled)

For one disconnected and one "okay" connection.

No sign of crashes/undefined behaviour. Let's check with -fsanitize=address,undefined: clean record, even adding a heartbeat:

int main() {
    Server s("127.0.0.1", "8989");

    std::thread yolo([&s] {
        using namespace std::literals;
        int i = 1;

        do {
            std::this_thread::sleep_for(5s);
        } while (s.deliver("HEARTBEAT DEMO "   std::to_string(i  )));
    });

    s.run();

    yolo.join();
}

Conclusion

The only problem highlighted above that weren't addressed were:

  • additional threading issues not shown (perhaps via getNativeHandle)

  • the fact that you can have overlapping writes in the Connection do_write. Fixing that:

     void Connection::write(std::string msg) { // public, might not be on the strand
         post(socket_.get_executor(),
              [self = shared_from_this(), msg = std::move(msg)]() mutable {
                  self->do_write(std::move(msg));
              });
     }
    
     void Connection::do_write(std::string msg) { // assumed on the strand
         outgoing_.push_back(std::move(msg));
    
         if (outgoing_.size() == 1)
             do_write_loop();
     }
    
     void Connection::do_write_loop() {
         if (outgoing_.size() == 0)
             return;
    
         auto self(shared_from_this());
         async_write( //
             socket_, boost::asio::buffer(outgoing_.front()),
             [this, self](error_code ec, size_t transfer_size) {
                 std::cerr << "write completion: " << transfer_size << " bytes ("
                           << ec.message() << ")" << std::endl;
    
                 if (!ec) {
                     outgoing_.pop_front();
                     do_write_loop();
                 } else {
                     socket_.cancel();
    
                     // This would ideally be enough to free the connection, but
                     // since `ConnectionManager` doesn't use `weak_ptr` you need to
                     // force the issue using kind of an "umbellical cord reflux":
                     connection_manager_.stop(self);
                 }
             });
     }
    

As you can see I also split write/do_write to prevent off-strand invocation. Same with stop.

Full Listing

A full listing with all the remarks/fixes from above:

  • File connection.h

     #pragma once
    
     #include <boost/asio.hpp>
    
     #include <array>
     #include <deque>
     #include <memory>
     #include <string>
     using boost::asio::ip::tcp;
    
     class ConnectionManager;
    
     /// Represents a single connection from a client.
     class Connection : public std::enable_shared_from_this<Connection> {
       public:
         Connection(const Connection&) = delete;
         Connection& operator=(const Connection&) = delete;
    
         /// Construct a connection with the given socket.
         explicit Connection(tcp::socket socket, ConnectionManager& manager);
    
         void start();
         void stop();
         void write(std::string msg);
    
       private:
         void do_stop();
         void do_write(std::string msg);
         void do_write_loop();
    
         /// Perform an asynchronous read operation.
         void do_read();
    
         /// Socket for the connection.
         tcp::socket socket_;
    
         /// The manager for this connection.
         ConnectionManager& connection_manager_;
    
         /// Buffer for incoming data.
         std::array<char, 8192> buffer_;
    
         std::deque<std::string> outgoing_;
     };
    
     using connection_ptr = std::shared_ptr<Connection>;
    
  • File connection_manager.h

     #pragma once
    
     #include <list>
     #include "connection.h"
    
     /// Manages open connections so that they may be cleanly stopped when the server
     /// needs to shut down.
     class ConnectionManager {
       public:
         ConnectionManager(const ConnectionManager&) = delete;
         ConnectionManager& operator=(const ConnectionManager&) = delete;
         ConnectionManager() = default; // could be split across h/cpp if you wanted
    
         void register_and_start(connection_ptr c);
         void stop(connection_ptr c);
         void stop_all();
    
         void broadcast(const std::string& buffer);
    
         // purge defunct connections, returns remaining active connections
         size_t garbage_collect();
    
       private:
         using handle = std::weak_ptr<connection_ptr::element_type>;
         std::list<handle> connections_;
     };
    
  • File server.h

     #pragma once
    
     #include <boost/asio.hpp>
     #include <string>
     #include "connection.h"
     #include "connection_manager.h"
    
     class Server {
       public:
         Server(const Server&) = delete;
         Server& operator=(const Server&) = delete;
    
         /// Construct the server to listen on the specified TCP address and port,
         /// and serve up files from the given directory.
         explicit Server(const std::string& address, const std::string& port);
    
         /// Run the server's io_service loop.
         void run();
    
         bool deliver(const std::string& buffer);
    
       private:
         void do_accept();
         void do_await_signal();
    
         boost::asio::io_context      io_context_;
         boost::asio::any_io_executor strand_{io_context_.get_executor()};
         boost::asio::signal_set      signals_{strand_};
         tcp::acceptor                acceptor_{strand_};
         ConnectionManager            connection_manager_;
     };
    
  • File connection.cpp

     #include "connection.h"
    
     #include <boost/algorithm/string.hpp>
     #include <iostream>
     #include <thread>
     #include <utility>
     #include <vector>
    
     #include "connection_manager.h"
     using boost::system::error_code;
    
     Connection::Connection(tcp::socket socket, ConnectionManager& manager)
         : socket_(std::move(socket))
         , connection_manager_(manager) {}
    
     void Connection::start() { // always assumed on the strand (since connection
                                // just constructed)
         do_read();
     }
    
     void Connection::stop() { // public, might not be on the strand
         post(socket_.get_executor(),
              [self = shared_from_this()]() mutable {
                  self->do_stop();
              });
     }
    
     void Connection::do_stop() { // assumed on the strand
         socket_.cancel(); // trust shared pointer to destruct
     }
    
     namespace /*missing code stubs*/ {
         auto split(std::string_view input, char delim) {
             std::vector<std::string_view> result;
             boost::algorithm::split(result, input,
                                     boost::algorithm::is_from_range(delim, delim));
             return result;
         }
    
         std::string getExecutionJsons()   { return std::string(1024,  'E'); }
         std::string getOrdersAsJsons()    { return std::string(13312, 'O'); }
         std::string getPositionsAsJsons() { return std::string(8192,  'P'); }
         std::string createSyncDoneJson()  { return std::string(24576, 'C'); }
     } // namespace
    
     void Connection::do_read() {
         auto self(shared_from_this());
         socket_.async_read_some(
             boost::asio::buffer(buffer_),
             [this, self](error_code ec, size_t bytes_transferred) {
                 if (!ec) {
                     std::string buff_str =
                         std::string(buffer_.data(), bytes_transferred);
                     const auto& tokenized_buffer = split(buff_str, ' ');
    
                     if (!tokenized_buffer.empty() &&
                         tokenized_buffer[0] == "sync") {
                         std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
                         /// "syncing connection" sends a specific text
                         /// hence I can separate between sycing and long-lived
                         /// connections here and act accordingly.
    
                         const auto& exec_json_strs     = getExecutionJsons();
                         const auto& order_json_strs    = getOrdersAsJsons();
                         const auto& position_json_strs = getPositionsAsJsons();
                         const auto& all_json_strs      = exec_json_strs  
                             order_json_strs   position_json_strs  
                             createSyncDoneJson();
    
                         std::cerr << "All json length: " << all_json_strs.length() << std::endl;
                         /// this is potentially a very large data.
                         do_write(all_json_strs); // already on strand!
                     }
    
                     do_read();
                 } else {
                     std::cerr << "do_read terminating: " << ec.message() << std::endl;
                     connection_manager_.stop(shared_from_this());
                 }
             });
     }
    
     void Connection::write(std::string msg) { // public, might not be on the strand
         post(socket_.get_executor(),
              [self = shared_from_this(), msg = std::move(msg)]() mutable {
                  self->do_write(std::move(msg));
              });
     }
    
     void Connection::do_write(std::string msg) { // assumed on the strand
         outgoing_.push_back(std::move(msg));
    
         if (outgoing_.size() == 1)
             do_write_loop();
     }
    
     void Connection::do_write_loop() {
         if (outgoing_.size() == 0)
             return;
    
         auto self(shared_from_this());
         async_write( //
             socket_, boost::asio::buffer(outgoing_.front()),
             [this, self](error_code ec, size_t transfer_size) {
                 std::cerr << "write completion: " << transfer_size << " bytes ("
                           << ec.message() << ")" << std::endl;
    
                 if (!ec) {
                     outgoing_.pop_front();
                     do_write_loop();
                 } else {
                     socket_.cancel();
    
                     // This would ideally be enough to free the connection, but
                     // since `ConnectionManager` doesn't use `weak_ptr` you need to
                     // force the issue using kind of an "umbellical cord reflux":
                     connection_manager_.stop(self);
                 }
             });
     }
    
  • File connection_manager.cpp

     #include "connection_manager.h"
    
     void ConnectionManager::register_and_start(connection_ptr c) {
         connections_.emplace_back(c);
         c->start();
     }
    
     void ConnectionManager::stop(connection_ptr c) {
         c->stop();
     }
    
     void ConnectionManager::stop_all() {
         for (auto h : connections_)
             if (auto c = h.lock())
                 c->stop();
     }
    
     /// this function is used to keep clients up to date with the changes, not used
     /// during syncing phase.
     void ConnectionManager::broadcast(const std::string& buffer) {
         for (auto h : connections_)
             if (auto c = h.lock())
                 c->write(buffer);
     }
    
     size_t ConnectionManager::garbage_collect() {
         connections_.remove_if(std::mem_fn(&handle::expired));
         return connections_.size();
     }
    
  • File server.cpp

     #include "server.h"
     #include <signal.h>
     #include <utility>
    
     using boost::system::error_code;
    
     Server::Server(const std::string& address, const std::string& port)
         : io_context_(1) // THREAD HINT: single threaded
         , connection_manager_()
     {
         // Register to handle the signals that indicate when the server should exit.
         // It is safe to register for the same signal multiple times in a program,
         // provided all registration for the specified signal is made through Asio.
         signals_.add(SIGINT);
         signals_.add(SIGTERM);
     #if defined(SIGQUIT)
         signals_.add(SIGQUIT);
     #endif // defined(SIGQUIT)
    
         do_await_signal();
    
         // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
         tcp::resolver resolver(io_context_);
         tcp::endpoint endpoint = *resolver.resolve({address, port});
         acceptor_.open(endpoint.protocol());
         acceptor_.set_option(tcp::acceptor::reuse_address(true));
         acceptor_.bind(endpoint);
         acceptor_.listen();
    
         do_accept();
     }
    
     void Server::run() {
         // The io_service::run() call will block until all asynchronous operations
         // have finished. While the server is running, there is always at least one
         // asynchronous operation outstanding: the asynchronous accept call waiting
         // for new incoming connections.
         io_context_.run();
     }
    
     void Server::do_accept() {
         // separate strand for each connection - just in case you ever add threads
         acceptor_.async_accept(
             make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
                 if (!ec) {
                     connection_manager_.register_and_start(
                         std::make_shared<Connection>(std::move(sock),
                                                      connection_manager_));
                     do_accept();
                 }
             });
     }
    
     void Server::do_await_signal() {
         signals_.async_wait([this](error_code /*ec*/, int /*signo*/) {
             // handler on the strand_ because of the executor on signals_
    
             // The server is stopped by cancelling all outstanding asynchronous
             // operations. Once all operations have finished the io_service::run()
             // call will exit.
             acceptor_.cancel();
             connection_manager_.stop_all();
         });
     }
    
     bool Server::deliver(const std::string& buffer) {
         if (io_context_.stopped()) {
             return false;
         }
         post(io_context_,
              [this, buffer] { connection_manager_.broadcast(std::move(buffer)); });
         return true;
     }
    
  • File test.cpp

     #include "server.h"
    
     int main() {
         Server s("127.0.0.1", "8989");
    
         std::thread yolo([&s] {
             using namespace std::literals;
             int i = 1;
    
             do {
                 std::this_thread::sleep_for(5s);
             } while (s.deliver("HEARTBEAT DEMO "   std::to_string(i  )));
         });
    
         s.run();
    
         yolo.join();
     }
    

轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/456316.html

標籤:C 插座 促进 升压-asio

上一篇:如何在std::pair中轉發不可移動物件

下一篇:為什么需要使用`MSG_WAITALL`FLAG而不是`0`FLAG?為什么要將它與UDP一起使用?

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • Git本地庫既關聯GitHub又關聯Gitee

    創建代碼倉庫 使用gitee舉例(github和gitee差不多) 1.在gitee右上角點擊+,選擇新建倉庫 ? 2.選擇填寫倉庫資訊,然后進行創建 ? 3.服務端已經準備好了,本地開始作準備 (1)Git 全域設定 git config --global user.name "成鈺" git c ......

    uj5u.com 2020-09-10 05:04:14 more
  • CODING DevOps 代碼質量實戰系列第二課,相約周三

    隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。**《DevOps 代碼質量實戰(PHP 版)》**為 CODING DevOps 代碼質量實戰系列的第二課,同時也是本系列的 PHP ......

    uj5u.com 2020-09-10 05:07:43 more
  • 推薦Scrum書籍

    推薦Scrum書籍 直接上干貨,推薦書籍清單如下(推薦有順序的哦) Scrum指南 Scrum精髓 Scrum敏捷軟體開發 Scrum捷徑 硝煙中的Scrum和XP : 我們如何實施Scrum 敏捷軟體開發:Scrum實戰指南 Scrum要素 大規模Scrum:大規模敏捷組織的設計 用戶故事地圖 用 ......

    uj5u.com 2020-09-10 05:07:45 more
  • CODING DevOps 代碼質量實戰系列最后一課,周四發車

    隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。 **《DevOps 代碼質量實戰(Java 版)》**為 CODING DevOps 代碼質量實戰系列的最后一課,同時也是本系列的 ......

    uj5u.com 2020-09-10 05:07:52 more
  • 敏捷軟體工程實踐書籍

    Scrum轉型想要做好,第一步先了解并真正落實Scrum,那么我推薦的Scrum書籍是要看懂并實踐的。第二步是團隊的工程實踐要做扎實。 下面推薦工程實踐書單: 重構:改善既有代碼的設計 決議極限編程 : 擁抱變化 代碼整潔代碼 程式員的職業素養 修改代碼的藝術 撰寫可讀代碼的藝術 測驗驅動開發 : ......

    uj5u.com 2020-09-10 05:07:55 more
  • Jenkins+svn+nginx實作windows環境自動部署vue前端專案

    前面文章介紹了Jenkins+svn+tomcat實作自動化部署,現在終于有空抽時間出來寫下Jenkins+svn+nginx實作自動部署vue前端專案。 jenkins的安裝和配置已經在前面文章進行介紹,下面介紹實作vue前端專案需要進行的哪些額外的步驟。 注意:在安裝jenkins和nginx的 ......

    uj5u.com 2020-09-10 05:08:49 more
  • CODING DevOps 微服務專案實戰系列第一課,明天等你

    CODING DevOps 微服務專案實戰系列第一課**《DevOps 微服務專案實戰:DevOps 初體驗》**將由 CODING DevOps 開發工程師 王寬老師 向大家介紹 DevOps 的基本理念,并探討為什么現代開發活動需要 DevOps,同時將以 eShopOnContainers 項 ......

    uj5u.com 2020-09-10 05:09:14 more
  • CODING DevOps 微服務專案實戰系列第二課來啦!

    近年來,工程專案的結構越來越復雜,需要接入合適的持續集成流水線形式,才能滿足更多變的需求,那么如何優雅地使用 CI 能力提升生產效率呢?CODING DevOps 微服務專案實戰系列第二課 《DevOps 微服務專案實戰:CI 進階用法》 將由 CODING DevOps 全堆疊工程師 何晨哲老師 向 ......

    uj5u.com 2020-09-10 05:09:33 more
  • CODING DevOps 微服務專案實戰系列最后一課,周四開講!

    隨著軟體工程越來越復雜化,如何在 Kubernetes 集群進行灰度發布成為了生產部署的”必修課“,而如何實作安全可控、自動化的灰度發布也成為了持續部署重點關注的問題。CODING DevOps 微服務專案實戰系列最后一課:**《DevOps 微服務專案實戰:基于 Nginx-ingress 的自動 ......

    uj5u.com 2020-09-10 05:10:00 more
  • CODING 儀表盤功能正式推出,實作作業資料可視化!

    CODING 儀表盤功能現已正式推出!該功能旨在用一張張統計卡片的形式,統計并展示使用 CODING 中所產生的資料。這意味著無需額外的設定,就可以收集歸納寶貴的作業資料并予之量化分析。這些海量的資料皆會以圖表或串列的方式躍然紙上,方便團隊成員隨時查看各專案的進度、狀態和指標,云端協作迎來真正意義上 ......

    uj5u.com 2020-09-10 05:11:01 more
最新发布
  • windows系統git使用ssh方式和gitee/github進行同步

    使用git來clone專案有兩種方式:HTTPS和SSH:
    HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
    SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

    uj5u.com 2023-04-19 08:41:12 more
  • windows系統git使用ssh方式和gitee/github進行同步

    使用git來clone專案有兩種方式:HTTPS和SSH:
    HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
    SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

    uj5u.com 2023-04-19 08:35:34 more
  • 2023年農牧行業6大CRM系統、5大場景盤點

    在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

    uj5u.com 2023-04-18 08:05:44 more
  • 2023年農牧行業6大CRM系統、5大場景盤點

    在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

    uj5u.com 2023-04-18 08:00:18 more
  • 計算機組成原理—存盤器

    計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

    uj5u.com 2023-04-17 08:20:31 more
  • 談一談我對協同開發的一些認識

    如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

    uj5u.com 2023-04-17 08:18:55 more
  • 專案管理PRINCE2核心知識點整理

    PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
    PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

    uj5u.com 2023-04-17 08:18:51 more
  • 談一談我對協同開發的一些認識

    如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

    uj5u.com 2023-04-17 08:18:00 more
  • 專案管理PRINCE2核心知識點整理

    PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
    PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

    uj5u.com 2023-04-17 08:17:55 more
  • 計算機組成原理—存盤器

    計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

    uj5u.com 2023-04-17 08:12:06 more