目錄
- Future
- Wake & Context
- 為什么需要 executor ?
- 什么是 waker ?
- async/await
- Executor
- Waker struct 到 ArcWake trait
- FuturesUnordered
- 單執行緒 executor
- 執行緒池 executor
- 總結
異步編程在 Rust 中的地位非常高,很多 crate 尤其是多IO操作的都使用了 async/await.
首先弄清楚異步編程的幾個基本概念:
Future
Future 代表一個可在未來某個時候獲取回傳值的 task,為了獲取這個 task 的執行狀況,Future 提供了一個函式用于判斷該 task 是否執行回傳,
trait Future {
type Output;
fn poll(self: Pin<&mut self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
poll 函式就是一個 Future 用于檢查自己的 task 是否已經完成,例如我可以創建一個與某個 IP 建立 TCP 連接的 struct,在構建時完成建立連接的作業,然后實作 Future trait 時檢查連接是否已經建立完成,根據建立情況回傳 enum Poll 中的兩個元素之一:
- Poll::Pending: task 還在等待
- Poll::Ready(result): task 攜帶 result 回傳
實際上,基于 async 定義的函式和代碼塊也會被編譯器編譯為 Future,但是 async 函式或代碼塊無法顯式地回傳 Pending,因此一般只能完成一些簡單的呼叫其他 Future 的作業,復雜的異步程序通常還是交由實作了 Future trait 的型別完成,
Wake & Context
你可能會好奇上面 poll 函式簽名里的 cx 引數的作用,在 Rust 官方檔案的定義中,Context 暫時只用于獲取 Waker,而 Waker 的作用是用于提醒 executor 該 task 已經準備好運行了,
為什么需要 executor ?
同樣以上面的建立 TCP 連接的例子來說,在網路卡頓時,進行一次 poll 可能都沒有建立連接,如果沒有設定 timeout 之類的東西的話,就需要進行多次 poll,這樣的 Future 多了以后,我們可能會想,不妨將所有的 Future 都存盤在一起,然后另起一個執行緒用于回圈遍歷所有的 Future 是否已經 ready,如果 ready 則回傳結果,這就是一個非常簡單的單執行緒 executor 的雛形,
也就是說,executor 是一個托管運行 task 的工具,類似于多執行緒,多執行緒要成功運行需要一個調度器進行調度,但是多執行緒至少需要語言層面甚至作業系統層面的支持,而 executor,如果你翻看 Rust 的官方檔案的話,會發現沒有任何關于 executor 的實作,實際上,Rust 選擇將 executor 的實作交給第三方,自己只保留相關的互動介面(我在隔壁C++看了看,似乎也是一樣的做法,并沒有一個官方的 executor 實作,我唯一所知的在語言層面提供支持的只有Golang 的 goroutine),
什么是 waker ?
上面講述的輪詢所有的 Future 是否已經完成實際是最低效的一種做法,當 Future 多了以后會帶來相當多的 CPU 損耗,考慮到這點,Rust 還提供了一種機制可以用于通知 executor 某個 Future 是否應該被輪詢,當然這只是其中的一種解決方式,實際上 Waker 的 wake 函式可以被實作為任何邏輯,取決于 executor,
在我看來,Waker 的內部定義相當不簡潔,相當不 Rust,Waker 內部定義有一個 RawWaker,RawWaker 包含一個 RawWakerVTable,RawWakerVTable 定義了四個函式指標,executor 要實作 Waker 就需要定義這四種型別的函式然后賦值給 RawWakerVTable,
struct Waker {
waker: RawWaker
}
struct RawWaker {
data: *const (),
vtable: &'static RawWakerVTable
}
struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ())
}
之所以沒有設計為 trait 形式,主要是 clone 函式,受限于 Rust 的 trait object safety,trait 中的任何函式的引數或回傳值如果包含 Self 且有 type bound Sized,則不符合 trait object safe 規范,這樣的 trait 可以被定義,可以被實作,但是無法與 dyn 一起進行動態系結,
而 clones 函式又是必須的,因為 future 可能還會接著呼叫 future 的 poll 方法,就需要再 clone 一個 context 傳入,
或許可以用 Box<dyn Waker> 或者 Arc<dyn Waker> 之類的,但是這些都不比 raw pointer 靈活,所以最終 Rust 還是選擇定義一個包含函式指標的 struct,
async/await
這兩個關鍵字可以說是異步編程領域的標志,,但在 Rust 中這兩個關鍵字只是起到語法糖的作用,并不是異步的核心,
async 用于快速創建 Future,不管是函式還是代碼塊或者lambda運算式,都可以在前面加上 async 關鍵字快速變成 Future,對于
async fn bar() {
foo().await;
}
編譯器會自動生成類似下面的代碼
fn bar() -> impl Future {
std::future::from_generator(move |mut _task_context| {
let _t = {
match std::future::IntoFuture::into_future(foo()) {
mut __awaitee => loop {
match unsafe {
std::future::Future::poll(
std::pin::Pin::new_unchecked(&mut __awaitee),
std::future::get_context(_task_context),
)
} {
std::task::Poll::Ready { 0: result } => break result,
std::task::Poll::Pending {} => {}
}
_task_context = (yield ());
},
};
};
_t
})
}
Tips:上面的代碼可以在 Rust Playground 里面點生成 HIR 看到,
Executor
前面講到 wake 的時候,其實作與具體的 executor 相關,但是我覺得如果不從 executor 的實作角度看一下比較難以理解,只能淺顯地知道 wake 是告訴 executor 準備再 poll 一遍,
Rust 中我知道的 async runtime lib 就是 futures-rs 和 tokio,前者在 GitHub 上是 rust-lang 官方組織推出的 repo,而后者雖然不清楚是否有官方參與,但是功能明顯比前者豐富,據我所知使用異步的專案大部分都是使用 tokio,
我這里選擇更簡單的 futures-rs 講一下其 executor 的實作,雖然其更加輕量但起碼也是官方推出的,有質量保證,
Waker struct 到 ArcWake trait
futures-rs 還是將標準庫里面的 Waker 封裝成了 ArcWake trait,并且是 pub 的,和 raw pointer 打交道畢竟是 unsafe 的,與其滿篇的 unsafe 亂飛,不如將 unsafe 限制在一定的范圍內,
Waker 本質上是一個變數的指標(data)帶著四個函式指標的結構體(RawWakerVTable),因此在定義函式指標時只需要將指標強轉成實作某個 trait 的泛型,再呼叫該 trait 的對應方法不就可以了,以 wake 函式為例:
trait Wake {
fn wake(self) {
Wake::wake_by_ref(&self);
}
fn wake_by_ref(&self);
}
unsafe fn wake<T: WakeTrait>(data: *const ()) {//對應RawWakerVTable里的函式指標
let v = data.cast::<T>();
v.wake();
}
這樣就實作了 Waker struct 到 Waker trait 的轉換,盡管如此,我們還需要一個結構體用來表示 Waker,滿足下列條件:
- 實作 Deref trait,在參考時回傳 &std::task::Waker
- 為了滿足 Rust 的 safety rules,需要手動管理data的記憶體,顯然某個實作了 Wake 的型別不會為了創建 waker 就交出自己的擁有權,因此只能通過傳入的參考轉成指標來創建 ManuallyDrop 實體,并考慮到 Deref trait 和后續的 Context 創建,需要通過 PhantomData 來管理 lifetime annotation
從而創建 WakeRef 結構體:
use std::mem::ManuallyDrop;
use std::task::Waker;
use std::marker::PhantomData;
struct WakeRef<'a> {
waker: ManuallyDrop<Waker>,
_marker: PhantomData<&'a ()>
}
如何根據參考創建 WakeRef 實體:
use std::task::{Waker, RawWaker};
fn get_waker<W: Wake>(wake: &W) -> WakeRef<'_> {
let ptr = wake as *const _ as *const ();
WakeRef {
waker: ManuallyDrop::new(unsafe {Waker::from_raw(RawWaker::new(ptr, ...))}),//...省略的是創建RawWakerVTable的程序
_marker: PhantomData
}
}
實作 Deref
use std::task::Waker;
impl std::ops::Deref for WakeRef<'_> {
type Target = Waker;
fn deref(&self) -> &Waker {
&self.waker
}
}
因此對于某個實作 Wake 的型別來說,只需要傳入參考就可以用 Context::from_waker(&waker) 來創建 context 了,
在 futures-rs 中,由于涉及到多執行緒,所以上述的其實并不安全,需要將普通參考改成 Arc 用于在多執行緒之間傳遞,Wake trait 也變成了 ArcWake,
trait ArcWake: Send + Sync {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}
fn wake_by_ref(arc_self: &Arc<Self>);
}
但是道理差不多,RawWakerVTable 的四個函式也與這個有關,以 wake 函式為例:
unsafe fn wake_arc_raw<T: ArcWake>(data: *const ()) {
let arc: Arc<T> = Arc::from_raw(data.cast::<T>());
ArcWake::wake(arc);
}
FuturesUnordered
FuturesUnordered 是一個 Future 的托管容器,其有一條鏈表維護所有的 Future,再通過一個佇列維護所有需要運行的 Future(當然這里都不是 collections 里面那種普通的鏈表和佇列,由于 FuturesUnordered 其實要與單執行緒和執行緒池 executor 共用,所以這兩個資料結構其實還涉及很多原子化操作,在保證原子化且無鎖的前提下要設計一個鏈表還挺麻煩的),
struct FuturesUnordered<Fut> {
ready_to_run_queue: Arc<ReadyToRunQueue<Fut>>,//需要運行的Future佇列
head_all: AtomicPtr<Task<Fut>>,//所有Future組成的鏈表
is_terminated: AtomicBool
}
這里重點看 FuturesUnordered 如何實作 Waker,FuturesUnordered 將 Future 看作一個個 Task ,
struct Task<Fut> {
future: UnsafeCell<Option<Fut>>,
next_all: AtomicPtr<Task<Fut>>,//下一個Task節點
len_all: UnsafeCell<usize>,//鏈表長度
next_ready_to_run: AtomicPtr<Task<Fut>>,//下一個要運行的Task
ready_to_run_queue: Weak<ReadyToRunQueue<Fut>>,
queued: AtomicBool,//是否在Task鏈表內(Task運行時需要從鏈表上摘下)
woken: AtomicBool//是否已經呼叫wake函式
}
為 Task 實作 ArcWake
impl<Fut> ArcWake for Task<Fut> {
fn wake_by_ref(arc_self: &Arc<Self>) {
let inner = match arc_self.ready_to_run_queue.upgrade() {
Some(inner) => inner,
None => return,
};
arc_self.woken.store(true, Relaxed);
let prev = arc_self.queued.swap(true, SeqCst);
if !prev {
inner.enqueue(Arc::as_ptr(arc_self));
inner.waker.wake();
}
}
}
當一個 Task 運行(被poll)時,其被從 FuturesUnordered 的 ready_to_run_queue 上摘下來,而在 wake 中又會重新放回去,因此,如果 Future 內部呼叫了 wake,則 Task 會再被放到 ready_to_run_queue 上運行,如果沒有則不會,
所以每個 Future 使用的 context 其實是來自于 Task:
let waker = Task::waker_ref(task);
let mut cx = Context::from_waker(&waker);
future.poll(&mut cx);
FuturesUnordered 本身實作了 Stream trait
trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
FuturesUnordered 輪流 poll ready_to_run_queue 里面的 Future,根據回傳結果回傳:
- Poll::Pending: ready_to_run_queue 為慷訓所有 Future 已經 poll 了一遍
- Poll::Ready(Some(res)): 某個 Future 回傳 Ready(res)
- Poll::Ready(None): Task 鏈表為空,所有 Task 都已經結束回傳
值得注意的是,在第一種情況下,所有的 Future 都 poll 了一遍,FuturesUnordered 會呼叫一次 wake,告訴 executor FuturesUnordered 已經運行了一個輪回,wake 具體的實作則取決于 executor,
單執行緒 executor
單執行緒 executor 允許在單執行緒上復用任意數量的 task,官方建議盡量在多I/O、只需要在 I/O 操作之間完成很少的作業的場景下使用,
struct LocalPool {
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
incoming: Rc<Incoming>
}
單執行緒 executor 將 Waker 的 wake 與執行緒的 wake 系結,當呼叫 wake 時,如果 executor 執行緒處于 park(即阻塞) 狀態,則 unpark 執行緒,
struct ThreadNotify {
thread: std::thread::Thread,
unparked: AtomicBool
}
impl ArcWake for ThreadNotify {
fn wake_by_ref(arc_self: &Arc<Self>) {
let unparked = arc_self.unparked.swap(true, Ordering::Release);
if !unparked {
arc_self.thread.unpark();
}
}
}
先看 LocalPool 如何定義 run 操作:
fn run_executor<T, F>(mut f: F) -> T
where
F: FnMut(&mut Context<'_>) -> Poll<T>
{
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f(&mut cx) {//f決定了executor的運行方式,只要回傳Ready就表明executor結束運行,
return t;
}
while !thread_notify.unparked.swap(false, Ordering::Acquire) {
thread::park();
}
}
})
}
從 FutureUnordered 的角度來看,在 poll 一遍之后,如果需要繼續運行,則呼叫 wake,將 unparked token 置為 true,此時執行緒不會陷入阻塞;否則 executor 執行緒會主動陷入阻塞,由于 FutureUnordered 和 executor 實際處于同一執行緒,因此此時 executor 只能從其他執行緒 unpark,
這種設計節省了 CPU 資源,使得執行緒只在有 Future 需要 poll 時需要運行,沒有則掛起,再有了就又可以繼續運行,
執行緒池 executor
執行緒池顯然要比單執行緒 executor 更加復雜,隨便一想就想到其至少要實作以下幾點:
- 新 spawn 一個 Future,如何分配到某個執行緒
- 類似于單執行緒,在執行緒沒有被呼叫 wake 時主動阻塞
對于第一點,使用多生產者單消費者管道 mpsc 進行 Future 的分發,實際的模型其實應該是多消費者單生產者,但是 Rust 并不提供這種管道,所以這里使用管道配合 mutex 使用,
struct PoolState {
tx: Mutex<mpsc::Sender<Message>>,
rx: Mutex<mpsc::Receiver<Message>>,
cnt: AtomicUsize,//clone size
size: usize//pool size
}
將 PoolState 包在 Arc 下就變成了 ThreadPool
struct ThreadPool {
state: Arc<PoolState>
}
當 executor spawn 一個新的 future 時,只需要將其封裝為一個 Task,然后傳入管道:
fn spwan_obj_ok(&self, future: FutureObj<'static, ()>) {
let task = Task {
future,
wake_handle: Arc::new(WakeHandl {exec: self.clone(), mutex: UnparkMutex::new()}),
exec: self.clone()
};
self.state.send(Message::Run(task));
}
ThreadPool 也有自定義的 Task:
struct Task {
future: FutureObj<'static ()>,
exec: ThreadPool,
wake_handle: Arc<WakeHandle>
}
struct WakeHandle {
mutex: UnparkMutex<Task>,
exec: ThreadPool
}
Task 主要分為以下狀態:
- POLLING: 正在poll
- REPOLL: 正在 poll 的 Task 如果呼叫 wake 會變成 REPOLL 狀態
- WAITING: Task 正在等待
- COMPLETE:Task 已經完成

如圖為 Task 在不同狀態間的轉換,有些轉換是自動的,比如 poll 回傳 Ready 時自動進入 COMPLETE 狀態,在 REPOLL 狀態會通過呼叫 wait 函式再次進入 POLLING 狀態重復運行一次 poll 函式;有些轉換則需要呼叫函式,比如從 WAITING 進入 POLLING 需要呼叫 Task 的 run 函式才能運行,poll 回傳 Pending 時根據 Future 是否呼叫 wake 函式分別進入 REPOLL 和 WAITING 狀態,
impl Task {
fn run(self) {
let Self { mut future, wake_handle, mut exec } = self;
let waker = waker_ref(&wake_handle);
let mut cx = Context::from_waker(&waker);
unsafe {
wake_handle.mutex.start_poll();
loop {
let res = future.poll_unpin(&mut cx);
match res {
Poll::Pending => {}
Poll::Ready(()) => return wake_handle.mutex.complete(),
}
let task = Self { future, wake_handle: wake_handle.clone(), exec };
match wake_handle.mutex.wait(task) {
Ok(()) => return, // we've waited
Err(task) => {
// someone's notified us
future = task.future;
exec = task.exec;
}
}
}
}
}
}
執行緒池 executor 和單執行緒 executor 對待 Pending 的方式,相同點在于如果 Future 沒有呼叫 wake,則放棄 Future,Future 要運行只能重新 spawn,不同點:
- 執行緒池:如果 Future 呼叫 wake,所在的執行緒阻塞式呼叫 poll 直到回傳 Ready 或者 Future 放棄呼叫 wake
- 單執行緒:呼叫 wake 不會立刻再屌用 poll,但加入到 ready_to_run_queue 里面在下一次回圈中被 poll
總結
本文只是一篇介紹 Rust 異步編程的原理,并通過具體的倉庫稍微深挖一下實作的程序,具體的原因還是官方檔案的介紹非常模糊,以我來說,第一次看到 Waker 完全不知道怎么用,底層到底是干了什么,"Future be ready to run again" 又是什么意思,如果不稍微看一下 runtime lib 的原始碼,有些東西很難理解,
本文只是簡單介紹了一個 futures-rs 的實作,executor 方面都忽略了很多細節,而 futures-rs 還有大量的擴展代碼藏在 util 目錄下,但是這些東西一般看看檔案就知道大概做了什么,懂得異步的實作原理就知道大概是怎么實作的,如果實在不懂還是可以去看原始碼,
我愿瀟灑如鷹,遠離地上宿命轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/504244.html
標籤:其他
上一篇:Caffeine快取框架入門學習
