0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Stream的核心概念

科技绿洲 来源:TinyZ 作者:TinyZ 2023-09-19 16:19 次阅读

Stream 是 Rust 语言中的一种迭代器,它可以使得我们在处理数据时更加高效、灵活。Stream 不仅可以处理大量数据,还可以进行异步操作,这使得它在处理网络请求等 IO 操作时非常有用。

Stream 的核心概念是将数据视为流,每次处理一个元素,而不是将整个数据集加载到内存中。这样可以避免内存占用过大的问题,同时也能够提高程序的效率。

基础用法

创建 Stream

在 Rust 中,我们可以使用iter方法来创建 Stream。例如,我们可以使用以下代码来创建一个包含 1 到 5 的 Stream:

let stream = (1..5).into_iter();

这里使用了into_iter方法将一个范围转换为 Stream。

遍历 Stream

遍历 Stream 可以使用for_each方法,例如:

stream.for_each(|x| println!("{}", x));

这里使用了闭包来打印每个元素。

过滤 Stream

我们可以使用filter方法来过滤 Stream 中的元素,例如:

let stream = (1..5).into_iter().filter(|x| x % 2 == 0);

这里使用了闭包来判断元素是否为偶数。

映射 Stream

我们可以使用map方法来对 Stream 中的元素进行映射,例如:

let stream = (1..5).into_iter().map(|x| x * 2);

这里使用了闭包来将每个元素乘以 2。

合并 Stream

我们可以使用chain方法来合并多个 Stream,例如:

let stream1 = (1..3).into_iter();
let stream2 = (4..6).into_iter();
let stream = stream1.chain(stream2);

这里使用了chain方法将两个 Stream 合并为一个。

排序 Stream

我们可以使用sorted方法来对 Stream 中的元素进行排序,例如:

let stream = vec![3, 1, 4, 1, 5, 9].into_iter().sorted();

这里使用了sorted方法将 Stream 中的元素按照升序排序。

取前 n 个元素

我们可以使用take方法来取 Stream 中的前 n 个元素,例如:

let stream = (1..5).into_iter().take(3);

这里使用了take方法取 Stream 中的前 3 个元素。

跳过前 n 个元素

我们可以使用skip方法来跳过 Stream 中的前 n 个元素,例如:

let stream = (1..5).into_iter().skip(2);

这里使用了skip方法跳过 Stream 中的前 2 个元素。

统计元素个数

我们可以使用count方法来统计 Stream 中的元素个数,例如:

let stream = (1..5).into_iter();
let count = stream.count();
println!("{}", count);

这里使用了count方法统计 Stream 中的元素个数,并打印出来。

进阶用法

异步 Stream

在 Rust 中,我们可以使用futures库来创建异步 Stream。例如,我们可以使用以下代码来创建一个异步 Stream:

use futures::stream::StreamExt;

let stream = futures::stream::iter(vec![1, 2, 3]);

这里使用了iter方法来创建一个包含 1 到 3 的异步 Stream。

并行 Stream

在 Rust 中,我们可以使用rayon库来创建并行 Stream。例如,我们可以使用以下代码来创建一个并行 Stream:

rayon = "1.7"
use rayon::iter::ParallelIterator;

let stream = (1..5).into_par_iter();

这里使用了into_par_iter方法将一个范围转换为并行 Stream。

处理 Stream 中的错误

在处理 Stream 时,有时候会出现错误。我们可以使用Result来处理这些错误。例如,我们可以使用以下代码来处理 Stream 中的错误:

let stream = vec![1, 2, "a", 3].into_iter().map(|x| {
    if let Some(y) = x.downcast_ref::< i32 >() {
        Ok(*y)
    } else {
        Err("not a number")
    }
});

for item in stream {
    match item {
        Ok(x) = > println!("{}", x),
        Err(e) = > println!("{}", e),
    }
}

这里使用了downcast_ref方法将元素转换为i32类型,如果转换失败则返回错误。

无限 Stream

在 Rust 中,我们可以使用repeat方法来创建一个无限 Stream。例如,我们可以使用以下代码来创建一个包含无限个 1 的 Stream:

let stream = std::iter::repeat(1);

这里使用了repeat方法将 1 重复无限次。

处理 Stream 中的重复元素

在处理 Stream 时,有时候会出现重复元素的情况。我们可以使用dedup方法来去除 Stream 中的重复元素。例如:

let stream = vec![1, 2, 2, 3, 3, 3].into_iter().dedup();

这里使用了dedup方法去除 Stream 中的重复元素。

处理 Stream 中的空元素

在处理 Stream 时,有时候会出现空元素的情况。我们可以使用filter方法来过滤掉 Stream 中的空元素。例如:

let stream = vec![1, 2, "", 3, "", ""].into_iter().filter(|x| !x.is_empty());

这里使用了filter方法过滤掉 Stream 中的空元素。

处理 Stream 中的 None 值

在处理 Stream 时,有时候会出现 None 值的情况。我们可以使用filter_map方法来过滤掉 Stream 中的 None 值。例如:

let stream = vec![Some(1), None, Some(2), None, Some(3)].into_iter().filter_map(|x| x);

这里使用了filter_map方法过滤掉 Stream 中的 None 值。

处理 Stream 中的重复元素

在处理 Stream 时,有时候会出现重复元素的情况。我们可以使用dedup_by方法来去除 Stream 中的重复元素。例如:

let stream = vec!["a", "b", "bc", "cd", "de", "ef"].into_iter().dedup_by(|a, b| a.chars().next() == b.chars().next());

这里使用了dedup_by方法去除 Stream 中的重复元素,去重条件是元素的首字母相同。

最佳实践

在使用 Stream 时,我们应该注意以下几点:

  • • 尽量使用异步 Stream 来处理 IO 操作,这样可以避免阻塞线程。
  • • 在处理大量数据时,应该使用并行 Stream 来提高程序的效率。
  • • 在处理错误时,应该使用Result来处理错误,避免程序崩溃。
  • • 在处理无限 Stream 时,应该使用take方法限制 Stream 的大小,避免程序无限运行。
  • • 在处理重复元素时,应该使用dedupdedup_by方法去除重复元素,避免重复计算。

示例代码

下面是一个完整的示例代码,演示了如何使用 Stream 来处理数据:

itertools = "0.10.5"
rayon = "1.7"
futures = "0.3.28"
use futures::stream::StreamExt;
use itertools::Itertools;
use rayon::iter::ParallelIterator;

fn main() {
    // 创建Stream
    let stream = (1..5).into_iter();

    // 遍历Stream
    stream.for_each(|x| println!("{}", x));

    // 过滤Stream
    let stream = (1..5).into_iter().filter(|x| x % 2 == 0);
    stream.for_each(|x| println!("{}", x));

    // 映射Stream
    let stream = (1..5).into_iter().map(|x| x * 2);
    stream.for_each(|x| println!("{}", x));

    // 合并Stream
    let stream1 = (1..3).into_iter();
    let stream2 = (4..6).into_iter();
    let stream = stream1.chain(stream2);
    stream.for_each(|x| println!("{}", x));

    // 排序Stream
    let stream = vec![3, 1, 4, 1, 5, 9].into_iter().sorted();
    stream.for_each(|x| println!("{}", x));

    // 取前n个元素
    let stream = (1..5).into_iter().take(3);
    stream.for_each(|x| println!("{}", x));

    // 跳过前n个元素
    let stream = (1..5).into_iter().skip(2);
    stream.for_each(|x| println!("{}", x));

    // 统计元素个数
    let stream = (1..5).into_iter();
    let count = stream.count();
    println!("{}", count);

    // 异步Stream
    let stream = futures::stream::iter(vec![1, 2, 3]);
    futures::executor::block_on(async {
        stream.for_each(|x| async move {
            println!("{}", x);
        }).await;
    });

    // 并行Stream
    let stream = (1..5).into_par_iter();
    stream.for_each(|x| println!("{}", x));

    // 处理Stream中的错误
    let stream = vec![1, 2, "a", 3].into_iter().map(|x| {
        if let Some(y) = x.downcast_ref::< i32 >() {
            Ok(*y)
        } else {
            Err("not a number")
        }
    });

    for item in stream {
        match item {
            Ok(x) = > println!("{}", x),
            Err(e) = > println!("{}", e),
        }
    }

    // 无限Stream
    let stream = std::iter::repeat(1).take(5);
    stream.for_each(|x| println!("{}", x));

    // 处理Stream中的重复元素
    let stream = vec![1, 2, 2, 3, 3, 3].into_iter().dedup();
    stream.for_each(|x| println!("{}", x));

    // 处理Stream中的空元素
    let stream = vec![1, 2, "", 3, "", ""].into_iter().filter(|x| !x.is_empty());
    stream.for_each(|x| println!("{}", x));

    // 处理Stream中的None值
    let stream = vec![Some(1), None, Some(2), None, Some(3)].into_iter().filter_map(|x| x);
    stream.for_each(|x| println!("{}", x));

    // 处理Stream中的重复元素
    let stream = vec!["a", "b", "bc", "cd", "de", "ef"].into_iter().dedup_by(|a, b| a.chars().next() == b.chars().next());
    stream.for_each(|x| println!("{}", x));
}

总结

Stream 是 Rust 语言中非常重要的一个概念,它可以使得我们在处理数据时更加高效、灵活。在使用 Stream 时,我们应该注意异步、并行、错误处理、无限 Stream、重复元素等问题,这样才能写出高效、健壮的程序。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    6876

    浏览量

    88806
  • 代码
    +关注

    关注

    30

    文章

    4742

    浏览量

    68330
  • Stream
    +关注

    关注

    0

    文章

    20

    浏览量

    7960
  • rust语言
    +关注

    关注

    0

    文章

    57

    浏览量

    3006
收藏 人收藏

    评论

    相关推荐

    Stream模块的基础用法和进阶用法

    在 Rust 语言中,Tokio 是一个非常流行的异步编程框架。它提供了一系列的模块,其中最常用的就是 Stream 模块。Stream 模块允许我们以异步的方式处理数据流,这在很多情况下非常
    的头像 发表于 09-19 15:33 1149次阅读

    Stream API原理介绍

    原理介绍 Stream API 的核心Stream 接口,它表示一组元素的序列,可以按需进行计算。Stream 接口提供了大量的中间操作和终端操作,可以用于过滤、映射、排序、聚合
    的头像 发表于 09-30 15:31 672次阅读

    Redis Stream应用案例

    摘要: Redis Stream Redis最新的大版本5.0已经RC1了,其中最重要的Feature莫过于Redis Stream了,关于Redis Stream的基本使用介绍和设计理念可以看我
    发表于 06-26 17:15

    请问BF60x中PVP编程stream是一个什么概念

    1:stream 是一个什么概念。是不是特指PVP的输入和输出流?一个pvp的初始化过程只需要调用两次adi_pvp_OpenStream?一次给输入一次给输出? 比如如下函数
    发表于 08-27 11:51

    请问AXI4-Stream到Video核心的技巧有什么?

    大家好。我遇到了xilinx视频内核的问题,并试图解决这个问题好几周但都失败了。有人能给我一些关于AXI4-Stream到Video核心的技巧吗?我试图在我的项目中实现Video Scaler核心
    发表于 11-08 09:53

    git的三个核心概念详解

    git的三个核心概念(工作区,版本库stage,版本库master)
    发表于 12-24 07:17

    AXI-stream数据传输过程

      AXI4-Stream跟AXI4的区别在于AXI4-Stream没有ADDR接口,这样就不涉及读写数据的概念了,只有简单的发送与接收说法,减少了延时,允许无限制的数据突发传输规模
    发表于 01-08 16:52

    Media Stream Processor Environ

    MSP ConsortiumM.100 Revision 1.6Media Stream Processor EnvironmentSpecification This document
    发表于 06-26 10:10 15次下载

    AXI-Stream代码

    AXI-Stream代码详解 AXI4-Stream跟AXI4的区别在于AXI4-Stream没有ADDR接口,这样就不涉及读写数据的概念了,只有简单的发送与接收说法,减少了延时,允许
    的头像 发表于 11-05 17:40 3497次阅读
    AXI-<b class='flag-5'>Stream</b>代码

    Kafka的核心概念

    Kafka 是主流的消息流系统,其中的概念还是比较多的,下面通过图示的方式来梳理一下 Kafka 的核心概念,以便在我们的头脑中有一个清晰的认识。
    的头像 发表于 06-20 14:24 926次阅读

    关于AXI4-Stream协议总结分享

    XI4-Stream跟AXI4的区别就是AXI4-Stream去除了地址线,这样就不涉及读写数据的概念了,只有简单的发送与接收说法,减少了延时。由于AXI4-Stream协议(amba
    的头像 发表于 06-23 10:08 2260次阅读

    ARM SMMU Data structures之Stream Table

    incoming transaction的StreamID可以找到一个STE。SMMU支持两种Stream table格式,格式由Stream table base registers设置。
    的头像 发表于 05-11 09:22 1274次阅读
    ARM SMMU Data structures之<b class='flag-5'>Stream</b> Table

    浅析Stream里的隐式转换

    Stream、Flow是在电路描述里经常用到的对象。
    的头像 发表于 05-15 17:36 453次阅读
    浅析<b class='flag-5'>Stream</b>里的隐式转换

    LogiCORE IP AXI4-Stream FIFO内核解决方案

    LogiCORE IP AXI4-Stream FIFO内核允许以内存映射方式访问一个AXI4-Stream接口。该内核可用于与AXI4-Stream IP接口,类似于LogiCORE IP AXI以太网内核,而无需使用完整的D
    的头像 发表于 09-25 10:55 1388次阅读
    LogiCORE IP AXI4-<b class='flag-5'>Stream</b> FIFO内核解决方案

    Java的Stream的常用知识

    什么是Stream 生产线 Stream就像处理生产流水线一样去工作,传送带就是Stream的管道,每个工厂关注直接的生产,将上游产品加工成下游需要的产品。为什么Stream比传统的处
    的头像 发表于 10-11 15:45 433次阅读
    Java的<b class='flag-5'>Stream</b>的常用知识