0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何在同步的Rust方法中调用异步代码呢?

jf_wN0SrCdH 来源:GreptimeDB 2023-03-17 09:18 次阅读

在同步的 Rust 方法中调用异步代码经常会导致一些问题,特别是对于不熟悉异步 Rust runtime 底层原理的初学者。在本文中,我们将讨论我们遇到的一个特殊问题,并分享我们采取的解决方法的经验。

背景和问题

在做GreptimeDB项目的时候,我们遇到一个关于在同步 Rust 方法中调用异步代码的问题。经过一系列故障排查后,我们弄清了问题的原委,这大大加深了对异步 Rust 的理解,因此在这篇文章中分享给大家,希望能给被相似问题困扰的 Rust 开发者一些启发。

我们的整个项目是基于Tokio这个异步 Rust runtime 的,它将协作式的任务运行和调度方便地封装在.await调用中,非常简洁优雅。但是这样也让不熟悉 Tokio 底层原理的用户一不小心就掉入到坑里。

我们遇到的问题是,需要在一个第三方库的 trait 实现中执行一些异步代码,而这个 trait 是同步的,我们无法修改这个 trait 的定义。

traitSequencer{
fngenerate(&self)->Vec;
}

我们用一个PlainSequencer来实现这个 trait ,而在实现generate方法的时候依赖一些异步的调用(比如这里的PlainSequencer::generate_async):

implPlainSequencer{
asyncfngenerate_async(&self)->Vec{
letmutres=vec![];
foriin0..self.bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}
}

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
self.generate_async().await
}
}

这样就会出现问题,因为generate是一个同步方法,里面是不能直接 await 的。

error[E0728]:`await`isonlyallowedinside`async`functionsandblocks
-->src/common/tt.rs30
|
31|/fngenerate(&self)->Vec{
32||self.generate_async().await
||^^^^^^onlyallowedinside`async`functionsandblocks
33||}
||_____-thisisnot`async`

我们首先想到的是,Tokio 的 runtime 有一个Runtime::block_on方法,可以同步地等待一个 future 完成。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
RUNTIME.block_on(async{
self.generate_async().await
})
}
}

#[cfg(test)]
modtests{
#[tokio::test]
asyncfntest_sync_method(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}
}

编译可以通过,但是运行时直接报错:

Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.
thread'tests::test_sync_method'panickedat'Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.',/Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs9

提示不能从一个执行中的 runtime 直接启动另一个异步 runtime。看来 Tokio 为了避免这种情况特地在Runtime::block_on入口做了检查。既然不行那我们就再看看其他的异步库是否有类似的异步转同步的方法。

果然找到一个futures::block_on。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}
}

编译同样没问题,但是运行时代码直接直接 hang 住不返回了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Compilingtokio-demov0.1.0(/Users/lei/Workspace/Rust/learning/tokio-demo) Finishedtest[unoptimized+debuginfo]target(s)in0.39s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) {"type":"suite","event":"started","test_count":1} {"type":"test","event":"started","name":"tests::test_sync_method"} #theexecutionjusthangshere:(

明明generate_async方法里面只有一个简单的sleep()调用,但是为什么 future 一直没完成呢?

并且吊诡的是,同样的代码,在tokio::test里面会 hang 住,但是在tokio::main中则可以正常执行完毕:

#[tokio::main]
pubasyncfnmain(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}

执行结果:

cargorun--color=always--packagetokio-demo--bintt
Finisheddev[unoptimized+debuginfo]target(s)in0.05s
Running`target/debug/tt`
vec:[0,1,2]

其实当初真正遇到这个问题的时候定位到具体在哪里 hang 住并没有那么容易。真实代码中 async 执行的是一个远程的 gRPC 调用,当初怀疑过是否是 gRPC server 的问题,动用了网络抓包等等手段最终发现是 client 侧的问题。

这也提醒了我们在出现 bug 的时候,抽象出问题代码的执行模式并且做出一个最小可复现的样例(Minimal Reproducible Example)是非常重要的。

Catchup

在 Rust 中,一个异步的代码块会被make_async_expr编译为一个实现了std::Future的 generator。

#[tokio::test]
asyncfntest_future(){
letfuture=async{
println!("hello");
};

//theaboveasyncblockwon'tgetexecuteduntilweawaitit.
future.await;
}

而.await本质上是一个语法糖,则会被lower_expr_await编译成类似于下面的一个语法结构:

//pseudo-rustcode
match::into_future(){
mut__awaitee=>loop{
matchunsafe{::poll(
<::Pin>::new_unchecked(&mut__awaitee),
::get_context(task_context),
)}{
::Ready(result)=>breakresult,
::Pending=>{}
}
task_context=yield();
}
}

在上面这个去掉了语法糖的伪代码中,可以看到有一个循环不停地检查 generator 的状态是否为已完成(std::poll)。

自然地,必然存在一个组件来做这件事,这里就是 Tokio 和async-std这类异步运行时发挥作用的地方了。Rust 在设计之初就特意将异步的语法(async/await)和异步运行时的实现分开,在上述的示例代码中,poll 的操作是由 Tokio 的 executor 执行的。

问题分析

回顾完背景知识,我们再看一眼方法的实现:

fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}

调用generate方法的肯定是 Tokio 的 executor,那么 block_on 里面的self.generate_async().await这个 future 又是谁在 poll 呢?

一开始我以为,futures::block_on会有一个内部的 runtime 去负责generate_async的 poll。于是查看了代码(主要是futures_executor::run_executor这个方法):

fnrun_executor)->Poll>(mutf:F)->T{
let_enter=enter().expect(
"cannotexecute`LocalPool`executorfromwithin
anotherexecutor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify|{
letwaker=waker_ref(thread_notify);
letmutcx=Context::from_waker(&waker);
loop{
ifletPoll::Ready(t)=f(&mutcx){
returnt;
}
letunparked=thread_notify.unparked.swap(false,Ordering::Acquire);
if!unparked{
thread::park();
thread_notify.unparked.store(false,Ordering::Release);
}
}
})
}

立刻嗅到了一丝不对的味道,虽然这个方法名为run_executor,但是整个方法里面貌似没有任何 spawn 的操作,只是在当前线程不停的循环判断用户提交的 future 的状态是否为 ready 啊!

这意味着,当 Tokio 的 runtime 线程执行到这里的时候,会立刻进入一个循环,在循环中不停地判断用户的的 future 是否 ready。如果还是 pending 状态,则将当前线程 park 住。

假设,用户 future 的异步任务也是交给了当前线程去执行,futures::block_on等待用户的 future ready,而用户 future 等待futures::block_on释放当前的线程资源,那么不就死锁了?

这个推论听起来很有道理,让我们来验证一下。既然不能在当前 runtime 线程 block,那就重新开一个 runtime block:

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
letbound=self.bound;
futures::block_on(asyncmove{
RUNTIME.spawn(asyncmove{
letmutres=vec![];
foriin0..bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}).await.unwrap()
})
}
}

果然可以了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Finishedtest[unoptimized+debuginfo]target(s)in0.04s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) vec:[0,1,2]

值得注意的是,在futures::block_on里面,额外使用了一个RUNTIME来 spawn 我们的异步代码。其原因还是刚刚所说的,这个异步任务需要一个 runtime 来驱动状态的变化。

如果我们删除 RUNTIME,而为 futures::block_on 生成一个新的线程,虽然死锁问题得到了解决,但tokio::sleep 方法的调用会报错"no reactor is running",这是因为 Tokio 的功能运作需要一个 runtime:

called`Result::unwrap()`onan`Err`value:Any{..}
thread''panickedat'thereisnoreactorrunning,mustbecalledfromthecontextofaTokio1.xruntime',
...

tokio::main和tokio::test

在分析完上面的原因之后,“为什么tokio::main中不会 hang 住而tokio::test会 hang 住?“ 这个问题也很清楚了,他们两者所使用的的 runtime 并不一样。tokio::main使用的是多线程的 runtime,而tokio::test使用的是单线程的 runtime,而在单线程的 runtime 下,当前线程被futures::block_on卡死,那么用户提交的异步代码是一定没机会执行的,从而必然形成上面所说的死锁。

Best practice

经过上面的分析,结合 Rust 基于 generator 的协作式异步特性,我们可以总结出 Rust 下桥接异步代码和同步代码的一些注意事项:

•将异步代码与同步代码结合使用可能会导致阻塞,因此不是一个明智的选择。

•在同步的上下文中调用异步代码时,请使用 futures::block_on 并将异步代码 spawn 到另一个专用的 runtime 中执行 ,因为前者会阻塞当前线程。

•如果必须从异步的上下文中调用有可能阻塞的同步代码(比如文件 IO 等),则建议使用 tokio::spawn_blocking 在专门处理阻塞操作的 executor 上执行相应的代码。





审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • RPC
    RPC
    +关注

    关注

    0

    文章

    111

    浏览量

    11510
  • Asynchrono
    +关注

    关注

    0

    文章

    4

    浏览量

    6519
  • Rust
    +关注

    关注

    1

    文章

    228

    浏览量

    6570

原文标题:如何在同步的 Rust 方法中调用异步代码 | Tokio 使用中的几点教训

文章出处:【微信号:Rust语言中文社区,微信公众号:Rust语言中文社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    如何使用Rust语言和rumqttc模块实现MQTT协议的异步API

    的系统编程语言,非常适合开发物联网设备和后端服务。本教程将介绍如何使用Rust语言和rumqttc模块实现MQTT协议的异步API,并提供几个相关的代码示例,最佳实践和教程总结。 本篇内容主要围绕
    的头像 发表于 09-19 14:45 2321次阅读

    何在Rust连接和使用MySQL数据库

    何在Rust连接和使用MySQL数据库。 安装 mysql 模块 这里我们假设你已经安装了Rust编程语言工具链,在本教程,我们将使用
    的头像 发表于 09-30 17:05 1614次阅读

    何在Rust读写文件

    见的内存安全问题和数据竞争问题。 在Rust,读写文件是一项非常常见的任务。本教程将介绍如何在Rust读写文件,包括基础用法和进阶用法。
    的头像 发表于 09-20 10:57 1973次阅读

    Rust的多线程编程概念和使用方法

    Rust是一种强类型、高性能的系统编程语言,其官方文档强调了Rust的标准库具有良好的并发编程支持。Thread是Rust的一种并发编程
    的头像 发表于 09-20 11:15 924次阅读

    异步复位同步撤离是什么意思?如何做到异步复位同步撤离

    复位消抖之后的下一件事,[异步复位]()同步撤离。这句话什么意思
    的头像 发表于 12-04 13:57 4681次阅读
    <b class='flag-5'>异步</b>复位<b class='flag-5'>同步</b>撤离是什么意思?如何做到<b class='flag-5'>异步</b>复位<b class='flag-5'>同步</b>撤离<b class='flag-5'>呢</b>?

    何在电路解决EMC的问题?有什么方法

    何在电路解决EMC的问题?有什么方法
    发表于 09-08 15:28

    USART异步通信同步异步有什么区别

    USART异步通信同步异步有什么区别异步通信怎样连线?
    发表于 12-10 07:34

    同步复位和异步复位到底孰优孰劣

    异步复位,同步释放的理解目录目录同步复位和异步复位异步复位 同步复位 那么
    发表于 01-17 07:01

    如何利用C语言去调用rust静态库

    这篇文章: c语言调用rust库函数按步骤做完,倒是挺顺利,增强了信心。编译arm版静态库上面测试都是在x86上面进行的,嵌入式基本是使用arm和riscv等芯片。考虑到上手门槛,我这里选择了
    发表于 06-21 10:27

    Rust代码中加载静态库时,出现错误 ` rust-lld: error: undefined symbol: malloc `怎么解决?

    我正在 MCUXpresso IDE 创建一个静态库。我正在使用 redlib 在我的代码中导入 ` [i]stdlib.h`。它成功地构建了一个静态库。但是,静态库未定义一些标准库函数,例如
    发表于 06-09 08:44

    FPGA设计异步复位同步释放问题

    异步复位同步释放 首先要说一下同步复位与异步复位的区别。 同步复位是指复位信号在时钟的上升沿或者下降沿才能起作用,而
    发表于 06-07 02:46 2140次阅读

    【FPGA】异步复位,同步释放的理解

    异步复位,同步释放的理解目录目录 同步复位和异步复位 异步复位 同步复位 那么
    发表于 01-17 12:53 4次下载
    【FPGA】<b class='flag-5'>异步</b>复位,<b class='flag-5'>同步</b>释放的理解

    异步信号与同步电路交互的问题及其解决方法

    异步信号与同步电路交互的问题及其解决方法  异步信号和同步电路的交互问题是指在使用异步信号与
    的头像 发表于 12-07 10:53 660次阅读

    何在同步Rust 方法调用异步代码 | Tokio 使用的几点教训

    同步Rust 方法调用异步代码经常会导致一些
    的头像 发表于 12-24 16:23 1289次阅读

    异步电路的时钟同步处理方法

    异步电路的时钟同步处理方法  时钟同步异步电路
    的头像 发表于 01-16 14:42 1079次阅读