我正在試驗 Rocket、Rust 和 SQLx,我想測驗當兩個并行事務嘗試在我的表上插入重復記錄時會發生什么。
我的 insert fn 沒有什么特別的,它作業正常:
async fn insert_credentials<'ex, EX>(&self, executor: EX, credentials: &Credentials) -> Result<u64, Errors>
where
EX: 'ex Executor<'ex, Database = Postgres>,
{
sqlx::query!(
r#"INSERT INTO credentials (username, password)
VALUES ($1, crypt($2, gen_salt('bf')))"#,
credentials.username,
credentials.password,
)
.execute(executor)
.await
.map(|result| result.rows_affected())
.map_err(|err| err.into())
}
但是,我的測驗無限期掛起,因為它等待從未發生過的提交:
#[async_std::test]
async fn it_should_reject_duplicated_username_in_parallel() {
let repo = new_repo();
let db: Pool<Postgres> = connect().await;
let credentials = new_random_credentials();
println!("TX1 begins");
let mut tx1 = db.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx1, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("TX2 begins");
let mut tx2 = db.begin().await.unwrap();
println!("It hangs on the next line");
let rows_affected = repo.insert_credentials(&mut tx2, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("It never reaches this line");
tx1.commit().await.unwrap();
tx2.commit().await.unwrap();
}
如何并行創建和執行這些 TX,以便斷言通過但在嘗試提交第二個 TX 時測驗失敗?
供參考,這是我的Cargo.toml
[package]
name = "auth"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.52"
serde = "1.0.136"
thiserror = "1.0.30"
# TODO https://github.com/SergioBenitez/Rocket/issues/1893#issuecomment-1002393878
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }
[dependencies.redis]
version = "0.21.5"
features = ["tokio-comp"]
[dependencies.sqlx]
version = "0.5.11"
features = ["macros", "runtime-tokio-rustls", "postgres"]
[dependencies.uuid]
version = "1.0.0-alpha.1"
features = ["v4", "fast-rng", "macro-diagnostics"]
## DEV ##
[dev-dependencies]
mockall = "0.11.0"
[dev-dependencies.async-std]
version = "1.11.0"
features = ["attributes", "tokio1"]
uj5u.com熱心網友回復:
您可以使用async_std::future::timeout或tokio::time::timeout。使用 async_std 的示例:
use async_std::future;
use std::time::Duration;
let max_duration = Duration::from_millis(100);
assert!(timeout(max_duration, tx2.commit()).await.is_err());
如果你想tx2在完成之前繼續tx1,你可以先async_std::task::spawn或tokio::spawntx1:
async_std::task::spawn(async move {
assert!(tx1.commit().await.is_ok());
});
uj5u.com熱心網友回復:
@Mika 為我指出了正確的方向,我可以生成兩個事務并添加一些超時,以便給并發 TX 一些時間來執行。
let handle1 = tokio::spawn(async move {
let repo = new_repo();
let mut tx = db1.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx, &credentials1).await.unwrap();
assert_eq!(rows_affected, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
tx.commit().await.unwrap()
});
let handle2 = tokio::spawn(async move {
let repo = new_repo();
let mut tx = db2.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx, &credentials2).await.unwrap();
assert_eq!(rows_affected, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
tx.commit().await.unwrap()
});
let (_first, _second) = rocket::tokio::try_join!(handle1, handle2).unwrap();
我認為這樣兩個 TX 將并行執行,直到睡眠線,然后一個會提交,另一個會在提交線上失敗。但是不,實際上兩個 TX 并行執行,TX1 運行直到睡眠,TX2 在插入線上阻塞,直到 TX1 提交,然后 TX2 在插入線上失敗。
我想這就是 DB 在這種情況下的作業方式,也許我可以通過弄亂 TX 隔離來改變它,但這不是我的意圖。我只是為了了解更多資訊而玩,今天的學習就足夠了 :)
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/455838.html
