Rust 的连接池不能在测试之间共享
在本文中,我将写一个我在用 Rust 编写数据库连接测试时迷上的故事。
运行示例
例如,考虑测试这样的函数:
use sqlx::{Poxtgres, Row, Executor};
use chrono::{DateTime, Utc};
pub async fn select_now<'c, E>(executor: E) -> DateTime<Utc>
where
E: Executor<'c, Database = Postgres>,
{
let row = sqlx::query("SELECT NOW()")
.fetch_one(executor)
.await
.unwrap();
let now: DateTime<Utc> = row.try_get("now").unwrap();
now
}
天真测试
让我们从编写最简单的测试开始。
use super::*;
use std::ops::{Add, Sub};
use sqlx::{Connection, PgConnection}
use chrono::Duration;
#[tokio::test]
async fn test_naive() {
let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
let mut conn = PgConnection::connect(&db_url).await.unwrap();
let now = select_now(&mut conn).await;
let expected = Utc::now();
let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
assert!(range.contains(&now));
}
确认测试成功。
$ cargo test --lib --tests tests::test_naive
running 1 test
test tests::test_naive ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 4 filtered out; finished in 0.12s
重用连接池中的连接
但是,此测试会为每个测试创建一个新连接。让我们共享连接池。
在测试之间共享数据async_once_cell使用 crate 初始化连接池。
use super::*;
use std::ops::{Add, Sub};
use async_once_cell::OnceCell;
use sqlx::{Connection, PgConnection, PgPool}
use chrono::Duration;
async fn get_pool() -> &'static PgPool {
static POOL: OnceCell<PgPool> = OnceCell::new();
POOL.get_or_init(async {
let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
PgPool::connect(&db_url).await.unwrap()
})
.await
}
#[tokio::test]
async fn test_pool() {
let pool = get_pool().await;
let now = select_now(pool).await;
let expected = Utc::now();
let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
assert!(range.contains(&now));
}
让我们运行测试并确保它们通过。
$ cargo test --lib --tests tests::test_pool
running 1 test
test tests::test_pool ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 5 filtered out; finished in 0.13s
看起来这样没有问题,但实际上,如果你用这种写法增加测试次数,测试会随机失败,并出现一个神秘的错误IO driver has terminated
。
#[tokio::test]
async fn test_pool2() {
let pool = get_pool().await;
let now = select_now(pool).await;
for _ in 1..10 { // 複雑なSQLのシミュレーションとしてfor文で何回も実行する。
select_now(pool).await;
}
let expected = Utc::now();
let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
assert!(range.contains(&now));
}
#[tokio::test]
async fn test_pool3() {
let pool = get_pool().await;
let now = select_now(pool).await;
for _ in 1..10 { // 複雑なSQLのシミュレーションとしてfor文で何回も実行する。
select_now(pool).await;
}
let expected = Utc::now();
let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
assert!(range.contains(&now));
}
$ cargo test --lib --tests tests::test_pool
running 3 tests
test tests::test_pool ... ok
test tests::test_pool3 ... ok
test tests::test_pool2 ... FAILED
failures:
---- tests::test_pool2 stdout ----
thread 'tests::test_pool2' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Custom { kind: Other, error: "IO driver has terminated" })', examples/async-share/src/lib.rs:12:10
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
failures:
tests::test_pool2
test result: FAILED. 2 passed; 1 failed; 0 ignored; 0 measured; 5 filtered out; finished in 0.37s
错误原因调查
现在让我们调试这个神秘的错误。首先,让我们输出日志。
+use env_logger;
async fn get_pool() -> &'static PgPool {
static POOL: OnceCell<PgPool> = OnceCell::new();
POOL.get_or_init(async {
+ std::env::set_var("RUST_LOG", "info");
+ env_logger::init();
let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
PgPool::connect(&db_url).await.unwrap()
})
.await
}
当我运行它时,我注意到即使测试成功,也有一个可疑的日志。
[2022-09-27T05:37:56Z WARN sqlx_core::pool::connection] error occurred while testing the connection on-release: error communicating with database: IO driver has terminated
[2022-09-27T05:37:56Z INFO sqlx::query] SELECT NOW(); rows affected: 0, rows returned: 1, elapsed: 428.833µs
[2022-09-27T05:37:56Z INFO sqlx::query] SELECT NOW(); rows affected: 0, rows returned: 1, elapsed: 139.333µs
test tests::test_pool3 ... ok
[2022-09-27T05:37:56Z INFO sqlx_core::pool::inner] ping on idle connection returned error: error communicating with database: IO driver has terminated
显然,从池中重新获取连接时连接断开。
if options.test_before_acquire {
// Check that the connection is still live
if let Err(e) = conn.ping().await {
// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
log::info!("ping on idle connection returned error: {}", e);
// connection is broken so don't try to close nicely
return Err(conn.close_hard().await);
}
}
如果连接断开,则有可能由于违反协议而中断通信。
不过我也用wireshark分析过抓包,但是中间好像没有因为未经授权的通讯断开。
问题似乎在 Rust 一方。
经过调试器的进一步调查,我发现发生错误的地方能够找到
问题似乎是下面代码中的self.handle.inner.is_shutdown()
返回true
。
fn poll_ready(
&self,
cx: &mut Context<'_>,
direction: Direction,
) -> Poll<io::Result<ReadyEvent>> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));
if self.handle.inner.is_shutdown() {
return Poll::Ready(Err(gone()));
}
coop.made_progress();
Poll::Ready(Ok(ev))
}
为什么is_shutdown
变成了true
?当我在此字段设置为true
的位置设置断点时,我能够确认当异步运行时被销毁时is_shutdown
变为true
。
为什么我不能共享连接
至此,我可以解释为什么会发生错误。关键在于运行测试时异步运行时的生命周期。
async
函数测试具有 #[tokio::test]
属性,它将测试代码转换为:
// 変換前
#[tokio::test]
async fn test_pool() {
...
}
// 変換後
#[test]
fn test_pool() {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(async {
...
});
}
换句话说,问题在以下流程中表现出来。
Runtime
在第一个测试用例开始时生成。- 连接池在测试执行期间创建新连接时,会将资源绑定到
Runtime
IO Driver。 - 在测试结束时
Runtime
被丢弃,IO 驱动程序is_shutdown
设置为true
- 在第二个测试用例开始时生成一个新的
Runtime
- 从连接池中重用连接,但由于链接了丢弃的IO Driver,确定连接断开。
由于连接池在获取连接的时候会对连接进行连接检查,如果失败则重新建立连接,即使您使用连接池,也会为每个测试用例重新建立连接, 它的意思是。
在这种情况下(几乎)没有必要使用连接池。
解决方法
这个问题的本质是为每个测试用例生成和销毁异步运行时有一件事。所以共享运行时避免了这个问题。
fn runtime() -> &'static Runtime { static RUNTIME: once_cell::sync::OnceCell<Runtime> = once_cell::sync::OnceCell::new(); RUNTIME.get_or_init(|| tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap()) } #[test] fn test_pool_shared_runtime() { runtime().block_on(async { let pool = get_pool().await; let now = select_now(pool).await; let expected = Utc::now(); let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1)); assert!(range.contains(&now)); }); }
概括
- 异步 IO 资源由异步运行时管理。
- 在使用
#[tokio::test]
的测试用例中,会为每个测试用例生成并销毁一个异步运行时 - 如果要共享连接池,还需要共享异步运行时。
原创声明:本文系作者授权爱码网发表,未经许可,不得转载;
原文地址:https://www.likecs.com/show-308627169.html
- 上一篇 »python中进程池和回调函数
- 下一篇 »java 数据库连接池常见分类和使用