我正在嘗試使用 Actix 撰寫 Udp 客戶端 Actor。我遵循了這個例子,UDP-Echo,但我似乎無法使用 UdpFramed tokio 結構向服務器發送訊息。這是我到目前為止所擁有的,這是 Udp Client Actor 實作
use std::collections::HashMap;
use std::net::{SocketAddr};
use actix_rt::net::UdpSocket;
use actix::{Actor, Addr, AsyncContext, Context, Handler, StreamHandler, Message};
use actix::io::SinkWrite;
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::{SplitSink};
use futures_util::StreamExt;
use log::info;
use serde_json::Value;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
use crate::rosclient::messages::Subscribe;
use std::io::Result;
mod messages;
type SinkItem = (Bytes, SocketAddr);
type UdpSink = SplitSink<UdpFramed<BytesCodec, UdpSocket>, SinkItem>;
pub struct UdpClientActor {
pub address: SocketAddr,
pub sink: SinkWrite<SinkItem, UdpSink>,
}
impl UdpClientActor {
pub fn start(udp: UdpSocket, address: SocketAddr) -> Addr<UdpClientActor> {
let framed = UdpFramed::new(udp, BytesCodec::new());
let (split_sink, split_stream) = framed.split();
UdpClientActor::create(|ctx| {
ctx.add_stream(split_stream.filter_map(
|item: Result<(BytesMut, SocketAddr)>| async {
item.map(|(data, sender)| UdpPacket(data, sender)).ok()
},
));
UdpClientActor {
address,
sink: SinkWrite::new(split_sink, ctx),
}
})
}
}
impl Actor for UdpClientActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let mut hashmap = HashMap::new();
hashmap.insert(String::from("topic"), Value::String(String::from("/client_count")));
let subscription = Subscribe {
id: Default::default(),
op: "subscribe".to_string(),
extra: hashmap
};
ctx.notify(subscription);
}
}
#[derive(Message)]
#[rtype(result = "()")]
struct UdpPacket(BytesMut, SocketAddr);
impl StreamHandler<UdpPacket> for
UdpClientActor {
fn handle(&mut self, item: UdpPacket, _ctx: &mut Self::Context) {
println!("Received: ({:?}, {:?})", item.0, item.1);
self.sink.write((item.0.into(), item.1)).unwrap();
}
}
impl actix::io::WriteHandler<std::io::Error> for UdpClientActor {}
impl Handler<Subscribe> for UdpClientActor {
type Result = ();
fn handle(&mut self, msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
let js = serde_json::json!(msg).to_string();
let _ = self.sink.write((Bytes::from(msg.to_string()), self.address));
info!("Subscribing to topic {}", js);
}
}
我的主要功能創建 udp 套接字并生成演員。
fn main() {
////////////////////////////////////////////////////////////////////////////
let fut = async {
////////////////////////////////////////////////////////////////////////////
/////////// UDP_ACTOR
let sock = tokio::net::UdpSocket::bind("0.0.0.0:9091").await.unwrap();
let remote_addr = "172.30.89.169:9091".parse::<SocketAddr>().unwrap();
// let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";
let _ = sock.connect(remote_addr).await;
// sock.send(message).await.unwrap();
let _udp_client = UdpClientActor::start(sock, remote_addr);
};
actix_rt::Arbiter::new().spawn(fut);
// system.block_on(fut);
system.run().unwrap();
}
如果我洗掉評論
let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";
和
sock.send(message).await.unwrap();
我至少可以檢查服務器是否可以實際接收訊息。所以我知道問題一定出在我對actor的實作上。我確實有另一個使用 LinesCodec 而不是 BytesCodec,它遵循完全相同的實作。唯一的區別是 SinkWrite 變成了這樣:
SinkWrite<(String, SocketAddr), SplitSink<UdpFramed<codec::LinesCodec>,
(String, SocketAddr)>>
這是我的 Cargo.toml 供參考。
[package]
name = "local_websocket_client"
version = "0.1.0"
edition = "2018"
[dependencies]
actix="0.12"
actix-codec = "0.4"
actix-rt = "2.5"
bytestring = "1.0"
serde = {version="1.0", features=["serde_derive"]}
log = "0.4"
env_logger = "0.9.0"
chrono = "0.4"
dashmap = "4.0"
futures = "0.3"
openssl = "0.10"
tokio = { version = "1", features = ["full"] }
actix-web = "4.0.0-beta.15"
futures-util = "0.3"
tokio-util = { version="0.6", features=["net", "codec"] }
tokio-udp = "0.1.6"
bytes= { version="0.6", features=["serde"] }
[dependencies.awc]
features = ["openssl"]
version = "3.0.0-beta.9"
[dependencies.serde_json]
features = ["default"]
version = "1.0"
[dependencies.uuid]
features = ["v4", "serde", "v5"]
version = "0.8"
那里有一些額外的板條箱,因為我在同一個應用程式上運行 2 個其他 websocket 客戶端。
我真的很感激在這個問題上的一些幫助。謝謝
uj5u.com熱心網友回復:
通過將 UdpSocket 包裝在 Arc 中并將參考保存在 actor 中以供以后使用來解決它。使用套接字寫入訊息是可行的。用于流處理程式的拆分流不需要更改,因為它按預期作業。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/407211.html
標籤:
