Rust 的连接池不能在测试之间共享

在本文中,我将写一个我在用 Rust 编写数据库连接测试时迷上的故事。

运行示例

例如,考虑测试这样的函数:

use sqlx::{Poxtgres, Row, Executor};
use chrono::{DateTime, Utc};

pub async fn select_now<'c, E>(executor: E) -> DateTime<Utc>
where
    E: Executor<'c, Database = Postgres>,
{
    let row = sqlx::query("SELECT NOW()")
        .fetch_one(executor)
        .await
        .unwrap();
    let now: DateTime<Utc> = row.try_get("now").unwrap();
    now
}

天真测试

让我们从编写最简单的测试开始。

use super::*;

use std::ops::{Add, Sub};
use sqlx::{Connection, PgConnection}
use chrono::Duration;

#[tokio::test]
async fn test_naive() {
    let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
    let mut conn = PgConnection::connect(&db_url).await.unwrap();
    let now = select_now(&mut conn).await;
    let expected = Utc::now();
    let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
    assert!(range.contains(&now));
}

确认测试成功。

$ cargo test --lib --tests tests::test_naive

running 1 test
test tests::test_naive ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 4 filtered out; finished in 0.12s

重用连接池中的连接

但是,此测试会为每个测试创建一个新连接。让我们共享连接池。

在测试之间共享数据async_once_cell使用 crate 初始化连接池。

use super::*;

use std::ops::{Add, Sub};
use async_once_cell::OnceCell;
use sqlx::{Connection, PgConnection, PgPool}
use chrono::Duration;

async fn get_pool() -> &'static PgPool {
    static POOL: OnceCell<PgPool> = OnceCell::new();
    POOL.get_or_init(async {
        let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
        PgPool::connect(&db_url).await.unwrap()
    })
    .await
}

#[tokio::test]
async fn test_pool() {
    let pool = get_pool().await;
    let now = select_now(pool).await;
    let expected = Utc::now();
    let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
    assert!(range.contains(&now));
}

让我们运行测试并确保它们通过。

$ cargo test --lib --tests tests::test_pool
running 1 test
test tests::test_pool ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 5 filtered out; finished in 0.13s

看起来这样没有问题,但实际上,如果你用这种写法增加测试次数,测试会随机失败,并出现一个神秘的错误IO driver has terminated

#[tokio::test]
async fn test_pool2() {
    let pool = get_pool().await;
    let now = select_now(pool).await;
    for _ in 1..10 { // 複雑なSQLのシミュレーションとしてfor文で何回も実行する。
        select_now(pool).await;
    }
    let expected = Utc::now();
    let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
    assert!(range.contains(&now));
}

#[tokio::test]
async fn test_pool3() {
    let pool = get_pool().await;
    let now = select_now(pool).await;
    for _ in 1..10 { // 複雑なSQLのシミュレーションとしてfor文で何回も実行する。
        select_now(pool).await;
    }
    let expected = Utc::now();
    let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
    assert!(range.contains(&now));
}
$ cargo test --lib --tests tests::test_pool
running 3 tests
test tests::test_pool ... ok
test tests::test_pool3 ... ok
test tests::test_pool2 ... FAILED

failures:

---- tests::test_pool2 stdout ----
thread 'tests::test_pool2' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Custom { kind: Other, error: "IO driver has terminated" })', examples/async-share/src/lib.rs:12:10
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    tests::test_pool2

test result: FAILED. 2 passed; 1 failed; 0 ignored; 0 measured; 5 filtered out; finished in 0.37s

错误原因调查

现在让我们调试这个神秘的错误。首先,让我们输出日志。

+use env_logger;

 async fn get_pool() -> &'static PgPool {
     static POOL: OnceCell<PgPool> = OnceCell::new();
     POOL.get_or_init(async {
+        std::env::set_var("RUST_LOG", "info");
+        env_logger::init();
         let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
         PgPool::connect(&db_url).await.unwrap()
     })
     .await
}

当我运行它时,我注意到即使测试成功,也有一个可疑的日志。

[2022-09-27T05:37:56Z WARN  sqlx_core::pool::connection] error occurred while testing the connection on-release: error communicating with database: IO driver has terminated
[2022-09-27T05:37:56Z INFO  sqlx::query] SELECT NOW(); rows affected: 0, rows returned: 1, elapsed: 428.833µs
[2022-09-27T05:37:56Z INFO  sqlx::query] SELECT NOW(); rows affected: 0, rows returned: 1, elapsed: 139.333µs
test tests::test_pool3 ... ok
[2022-09-27T05:37:56Z INFO  sqlx_core::pool::inner] ping on idle connection returned error: error communicating with database: IO driver has terminated

显然,从池中重新获取连接时连接断开。

此日志的原始来源

    if options.test_before_acquire {
        // Check that the connection is still live
        if let Err(e) = conn.ping().await {
            // an error here means the other end has hung up or we lost connectivity
            // either way we're fine to just discard the connection
            // the error itself here isn't necessarily unexpected so WARN is too strong
            log::info!("ping on idle connection returned error: {}", e);
            // connection is broken so don't try to close nicely
            return Err(conn.close_hard().await);
        }
    }

如果连接断开,则有可能由于违反协议而中断通信。

不过我也用wireshark分析过抓包,但是中间好像没有因为未经授权的通讯断开。

问题似乎在 Rust 一方。

经过调试器的进一步调查,我发现发生错误的地方能够找到

问题似乎是下面代码中的self.handle.inner.is_shutdown() 返回true

    fn poll_ready(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<io::Result<ReadyEvent>> {
        // Keep track of task budget
        let coop = ready!(crate::coop::poll_proceed(cx));
        let ev = ready!(self.shared.poll_readiness(cx, direction));

        if self.handle.inner.is_shutdown() {
            return Poll::Ready(Err(gone()));
        }

        coop.made_progress();
        Poll::Ready(Ok(ev))
    }

为什么is_shutdown 变成了true?当我在此字段设置为true 的位置设置断点时,我能够确认当异步运行时被销毁时is_shutdown 变为true

为什么我不能共享连接

至此,我可以解释为什么会发生错误。关键在于运行测试时异步运行时的生命周期。

async 函数测试具有 #[tokio::test] 属性,它将测试代码转换为:

// 変換前
#[tokio::test]
async fn test_pool() {
  ...
}

// 変換後
#[test]
fn test_pool() {
  let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
  runtime.block_on(async {
    ...
  });
}

换句话说,问题在以下流程中表现出来。

  1. Runtime 在第一个测试用例开始时生成。
  2. 连接池在测试执行期间创建新连接时,会将资源绑定到Runtime IO Driver。
  3. 在测试结束时Runtime 被丢弃,IO 驱动程序is_shutdown 设置为true
  4. 在第二个测试用例开始时生成一个新的Runtime
  5. 从连接池中重用连接,但由于链接了丢弃的IO Driver,确定连接断开。

    由于连接池在获取连接的时候会对连接进行连接检查,如果失败则重新建立连接,即使您使用连接池,也会为每个测试用例重新建立连接, 它的意思是。

    在这种情况下(几乎)没有必要使用连接池。

    解决方法

    这个问题的本质是为每个测试用例生成和销毁异步运行时有一件事。所以共享运行时避免了这个问题。

    fn runtime() -> &'static Runtime {
        static RUNTIME: once_cell::sync::OnceCell<Runtime> = once_cell::sync::OnceCell::new();
        RUNTIME.get_or_init(|| tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap())
    }
    
    #[test]
    fn test_pool_shared_runtime() {
        runtime().block_on(async {
            let pool = get_pool().await;
            let now = select_now(pool).await;
            let expected = Utc::now();
            let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
            assert!(range.contains(&now));
        });
    }
    

    概括

    • 异步 IO 资源由异步运行时管理。
    • 在使用#[tokio::test] 的测试用例中,会为每个测试用例生成并销毁一个异步运行时
    • 如果要共享连接池,还需要共享异步运行时。

原创声明:本文系作者授权爱码网发表,未经许可,不得转载;

原文地址:https://www.likecs.com/show-308627169.html