在同步的 Rust 方法中调用异步代码经常会导致一些问题,特别是对于不熟悉异步 Rust runtime 底层原理的初学者。在本文中,我们将讨论我们遇到的一个特殊问题,并分享我们采取的解决方法的经验。
背景和问题
在做GreptimeDB项目的时候,我们遇到一个关于在同步 Rust 方法中调用异步代码的问题。经过一系列故障排查后,我们弄清了问题的原委,这大大加深了对异步 Rust 的理解,因此在这篇文章中分享给大家,希望能给被相似问题困扰的 Rust 开发者一些启发。
我们的整个项目是基于Tokio这个异步 Rust runtime 的,它将协作式的任务运行和调度方便地封装在.await调用中,非常简洁优雅。但是这样也让不熟悉 Tokio 底层原理的用户一不小心就掉入到坑里。
我们遇到的问题是,需要在一个第三方库的 trait 实现中执行一些异步代码,而这个 trait 是同步的,我们无法修改这个 trait 的定义。
traitSequencer{ fngenerate(&self)->Vec; }
我们用一个PlainSequencer来实现这个 trait ,而在实现generate方法的时候依赖一些异步的调用(比如这里的PlainSequencer::generate_async):
implPlainSequencer{ asyncfngenerate_async(&self)->Vec{ letmutres=vec![]; foriin0..self.bound{ res.push(i); tokio::sleep(Duration::from_millis(100)).await; } res } } implSequencerforPlainSequencer{ fngenerate(&self)->Vec { self.generate_async().await } }
这样就会出现问题,因为generate是一个同步方法,里面是不能直接 await 的。
error[E0728]:`await`isonlyallowedinside`async`functionsandblocks -->src/common/tt.rs30 | 31|/fngenerate(&self)->Vec{ 32||self.generate_async().await ||^^^^^^onlyallowedinside`async`functionsandblocks 33||} ||_____-thisisnot`async`
我们首先想到的是,Tokio 的 runtime 有一个Runtime::block_on方法,可以同步地等待一个 future 完成。
implSequencerforPlainSequencer{ fngenerate(&self)->Vec{ RUNTIME.block_on(async{ self.generate_async().await }) } } #[cfg(test)] modtests{ #[tokio::test] asyncfntest_sync_method(){ letsequencer=PlainSequencer{ bound:3 }; letvec=sequencer.generate(); println!("vec:{:?}",vec); } }
编译可以通过,但是运行时直接报错:
Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks. thread'tests::test_sync_method'panickedat'Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.',/Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs9
提示不能从一个执行中的 runtime 直接启动另一个异步 runtime。看来 Tokio 为了避免这种情况特地在Runtime::block_on入口做了检查。既然不行那我们就再看看其他的异步库是否有类似的异步转同步的方法。
果然找到一个futures::block_on。
implSequencerforPlainSequencer{ fngenerate(&self)->Vec{ futures::block_on(async{ self.generate_async().await }) } }
编译同样没问题,但是运行时代码直接直接 hang 住不返回了。
cargotest--color=always--packagetokio-demo
--bintttests::test_sync_method
--no-fail-fast----format=json
--exact-Zunstable-options--show-output
Compilingtokio-demov0.1.0(/Users/lei/Workspace/Rust/learning/tokio-demo) Finishedtest[unoptimized+debuginfo]target(s)in0.39s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) {"type":"suite","event":"started","test_count":1} {"type":"test","event":"started","name":"tests::test_sync_method"} #theexecutionjusthangshere:(
明明generate_async方法里面只有一个简单的sleep()调用,但是为什么 future 一直没完成呢?
并且吊诡的是,同样的代码,在tokio::test里面会 hang 住,但是在tokio::main中则可以正常执行完毕:
#[tokio::main] pubasyncfnmain(){ letsequencer=PlainSequencer{ bound:3 }; letvec=sequencer.generate(); println!("vec:{:?}",vec); }
执行结果:
cargorun--color=always--packagetokio-demo--bintt Finisheddev[unoptimized+debuginfo]target(s)in0.05s Running`target/debug/tt` vec:[0,1,2]
其实当初真正遇到这个问题的时候定位到具体在哪里 hang 住并没有那么容易。真实代码中 async 执行的是一个远程的 gRPC 调用,当初怀疑过是否是 gRPC server 的问题,动用了网络抓包等等手段最终发现是 client 侧的问题。
这也提醒了我们在出现 bug 的时候,抽象出问题代码的执行模式并且做出一个最小可复现的样例(Minimal Reproducible Example)是非常重要的。
Catchup
在 Rust 中,一个异步的代码块会被make_async_expr编译为一个实现了std::Future的 generator。
#[tokio::test] asyncfntest_future(){ letfuture=async{ println!("hello"); }; //theaboveasyncblockwon'tgetexecuteduntilweawaitit. future.await; }
而.await本质上是一个语法糖,则会被lower_expr_await编译成类似于下面的一个语法结构:
//pseudo-rustcode match::into_future(){ mut__awaitee=>loop{ matchunsafe{::poll( <::Pin>::new_unchecked(&mut__awaitee), ::get_context(task_context), )}{ ::Ready(result)=>breakresult, ::Pending=>{} } task_context=yield(); } }
在上面这个去掉了语法糖的伪代码中,可以看到有一个循环不停地检查 generator 的状态是否为已完成(std::poll)。
自然地,必然存在一个组件来做这件事,这里就是 Tokio 和async-std这类异步运行时发挥作用的地方了。Rust 在设计之初就特意将异步的语法(async/await)和异步运行时的实现分开,在上述的示例代码中,poll 的操作是由 Tokio 的 executor 执行的。
问题分析
回顾完背景知识,我们再看一眼方法的实现:
fngenerate(&self)->Vec{ futures::block_on(async{ self.generate_async().await }) }
调用generate方法的肯定是 Tokio 的 executor,那么 block_on 里面的self.generate_async().await这个 future 又是谁在 poll 呢?
一开始我以为,futures::block_on会有一个内部的 runtime 去负责generate_async的 poll。于是查看了代码(主要是futures_executor::run_executor这个方法):
fnrun_executor)->Poll >(mutf:F)->T{ let_enter=enter().expect( "cannotexecute`LocalPool`executorfromwithin anotherexecutor", ); CURRENT_THREAD_NOTIFY.with(|thread_notify|{ letwaker=waker_ref(thread_notify); letmutcx=Context::from_waker(&waker); loop{ ifletPoll::Ready(t)=f(&mutcx){ returnt; } letunparked=thread_notify.unparked.swap(false,Ordering::Acquire); if!unparked{ thread::park(); thread_notify.unparked.store(false,Ordering::Release); } } }) }
立刻嗅到了一丝不对的味道,虽然这个方法名为run_executor,但是整个方法里面貌似没有任何 spawn 的操作,只是在当前线程不停的循环判断用户提交的 future 的状态是否为 ready 啊!
这意味着,当 Tokio 的 runtime 线程执行到这里的时候,会立刻进入一个循环,在循环中不停地判断用户的的 future 是否 ready。如果还是 pending 状态,则将当前线程 park 住。
假设,用户 future 的异步任务也是交给了当前线程去执行,futures::block_on等待用户的 future ready,而用户 future 等待futures::block_on释放当前的线程资源,那么不就死锁了?
这个推论听起来很有道理,让我们来验证一下。既然不能在当前 runtime 线程 block,那就重新开一个 runtime block:
implSequencerforPlainSequencer{ fngenerate(&self)->Vec{ letbound=self.bound; futures::block_on(asyncmove{ RUNTIME.spawn(asyncmove{ letmutres=vec![]; foriin0..bound{ res.push(i); tokio::sleep(Duration::from_millis(100)).await; } res }).await.unwrap() }) } }
果然可以了。
cargotest--color=always--packagetokio-demo
--bintttests::test_sync_method
--no-fail-fast----format=json
--exact-Zunstable-options--show-output
Finishedtest[unoptimized+debuginfo]target(s)in0.04s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) vec:[0,1,2]
值得注意的是,在futures::block_on里面,额外使用了一个RUNTIME来 spawn 我们的异步代码。其原因还是刚刚所说的,这个异步任务需要一个 runtime 来驱动状态的变化。
如果我们删除 RUNTIME,而为 futures::block_on 生成一个新的线程,虽然死锁问题得到了解决,但tokio::sleep 方法的调用会报错"no reactor is running",这是因为 Tokio 的功能运作需要一个 runtime:
called`Result::unwrap()`onan`Err`value:Any{..} thread''panickedat'thereisnoreactorrunning,mustbecalledfromthecontextofaTokio1.xruntime', ...
tokio::main和tokio::test
在分析完上面的原因之后,“为什么tokio::main中不会 hang 住而tokio::test会 hang 住?“ 这个问题也很清楚了,他们两者所使用的的 runtime 并不一样。tokio::main使用的是多线程的 runtime,而tokio::test使用的是单线程的 runtime,而在单线程的 runtime 下,当前线程被futures::block_on卡死,那么用户提交的异步代码是一定没机会执行的,从而必然形成上面所说的死锁。
Best practice
经过上面的分析,结合 Rust 基于 generator 的协作式异步特性,我们可以总结出 Rust 下桥接异步代码和同步代码的一些注意事项:
•将异步代码与同步代码结合使用可能会导致阻塞,因此不是一个明智的选择。
•在同步的上下文中调用异步代码时,请使用 futures::block_on 并将异步代码 spawn 到另一个专用的 runtime 中执行 ,因为前者会阻塞当前线程。
•如果必须从异步的上下文中调用有可能阻塞的同步代码(比如文件 IO 等),则建议使用 tokio::spawn_blocking 在专门处理阻塞操作的 executor 上执行相应的代码。
审核编辑:刘清
-
RPC
+关注
关注
0文章
111浏览量
11510 -
Asynchrono
+关注
关注
0文章
4浏览量
6519 -
Rust
+关注
关注
1文章
228浏览量
6570
原文标题:如何在同步的 Rust 方法中调用异步代码 | Tokio 使用中的几点教训
文章出处:【微信号:Rust语言中文社区,微信公众号:Rust语言中文社区】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
相关推荐
评论