本系列是关于用Rust构建一个KV Server的系列文章,内容包括用tokio做底层异步网络通讯、使用toml文件做配置、protobuf做传输协议、内存/RockDB做数据存储、事件通知、优雅关机、并发连接限制及测量监控等。
在上一篇文章中,我们用tokio实现了客户端和服务器的基本框架并设置了toml格式的配置文件。在这一篇文章中,我们参考Redis的命令:GET、SET、PUBLISH和SUBSCRIBE,使用Protobuf来实现客户端与服务器之间的通信协议层。为了处理Protobuf,我们加入了post库。同时加入了tracing库用于日志处理。Cargo.toml如下:
在上一篇文章中,我们用tokio实现了客户端和服务器的基本框架并设置了toml格式的配置文件。在这一篇文章中,我们参考Redis的命令:GET、SET、PUBLISH和SUBSCRIBE,使用Protobuf来实现客户端与服务器之间的通信协议层。为了处理Protobuf,我们加入了post库。同时加入了tracing库用于日志处理。Cargo.toml如下:
Protobuf在项目根目录下新建cmd.proto,加入如下代码:[dependencies]
anyhow = "1"
tokio = { version = "1.19", features = ["full"] }
serde = { version = "1", features = ["derive"] }
toml = "0.5"
tracing = "0.1"
tracing-subscriber = "0.3"
bytes = "1"
prost = "0.11"
[build-dependencies]
prost-build = "0.11"
1syntax="proto3";
2
3packagecmd;
4
5//命令请求
6messageCmdRequest{
7oneofreq_data{
8Getget=1;
9Setset=2;
10Publishpublish=3;
11Subscribesubscribe=4;
12Unsubscribeunsubscribe=5;
13}
14}
15
16//服务器的响应
17messageCmdResponse{
18uint32status=1;
19stringmessage=2;
20bytesvalue=3;
21}
22
23//请求值命令
24messageGet{
25stringkey=1;
26}
27
28//存储值命令
29messageSet{
30stringkey=1;
31bytesvalue=2;
32uint32expire=3;
33}
34
35//向Topic发布值命令
36messagePublish{
37stringtopic=1;
38bytesvalue=2;
39}
40
41//订阅Topic命令
42messageSubscribe{
43stringtopic=1;
44}
45
46//取消订阅命令
47messageUnsubscribe{
48stringtopic=1;
49uint32id=2;
50}
在src目录下创建pb目录,在根目录下创建build.rs文件,加入如下代码:
1fnmain(){
2letmutconf=prost_build::new();
3conf.bytes(&["."]);
4conf.type_attribute(".","#[derive(PartialOrd)]");
5conf.out_dir("src/pb")
6.compile_protos(&["cmd.proto"],&["."])
7.unwrap();
8}
在src/pb目录下已经自动生成了cmd.rs文件。在src/pb目录下创建mod.rs文件,加入如下代码:
1usebytes::Bytes;
2
3usecrate::{cmd_request::ReqData,CmdRequest,Get,Publish,Set,Subscribe,Unsubscribe};
4
5pubmodcmd;
6
7implCmdRequest{
8//GET命令
9pubfnget(key:implInto) ->Self{
10Self{
11req_data:Some(ReqData::Get(Get{key:key.into()})),
12}
13}
14
15//SET命令
16pubfnset(key:implInto,value:Bytes,expire:u32) ->Self{
17Self{
18req_data:Some(ReqData::Set(Set{
19key:key.into(),
20value,
21expire,
22})),
23}
24}
25
26//PUBLISH命令
27pubfnpublish(topic:implInto,value:Bytes) ->Self{
28Self{
29req_data:Some(ReqData::Publish(Publish{
30topic:topic.into(),
31value,
32})),
33}
34}
35
36//订阅命令
37pubfnsubscribe(topic:implInto) ->Self{
38Self{
39req_data:Some(ReqData::Subscribe(Subscribe{
40topic:topic.into(),
41})),
42}
43}
44
45//解除订阅命令
46pubfnunsubscribe(topic:implInto,id:u32) ->Self{
47Self{
48req_data:Some(ReqData::Unsubscribe(Unsubscribe{
49topic:topic.into(),
50id,
51})),
52}
53}
54}
55
56implCmdResponse{
57pubfnnew(status:u32,message:String,value:Bytes)->Self{
58Self{
59status,
60message,
61value,
62}
63}
64}
在 src/lib.rs 中,引入pb模块:
1modpb;
2pubusepb::*;
客户端 & 服务器我们使用tokio-util库的Frame里的LengthDelimitedCodec(根据长度进行编解码)对protobuf协议进行封包解包。在Cargo.toml里加入tokio-util依赖:修改src/bin/kv_server.rs代码:[dependencies]
......
futures = "0.3"
tokio-util = {version = "0.7", features = ["codec"]}
......
1#[tokio::main]
2asyncfnmain()->Result<(),Box> {
3tracing_subscriber::init();
4
5......
6
7loop{
8......
9
10tokio::spawn(asyncmove{
11//使用Frame的LengthDelimitedCodec进行编解码操作
12letmutstream=Framed::new(stream,LengthDelimitedCodec::new());
13whileletSome(Ok(mutbuf))=stream.next().await{
14//对客户端发来的protobuf请求命令进行拆包
15letcmd_req=CmdRequest::decode(&buf[..]).unwrap();
16info!("Receiveacommand:{:?}",cmd_req);
17
18buf.clear();
19
20//对protobuf的请求响应进行封包,然后发送给客户端。
21letcmd_res=CmdResponse::new(200,"success".to_string(),Bytes::default());
22cmd_res.encode(&mutbuf).unwrap();
23stream.send(buf.freeze()).await.unwrap();
24}
25info!("Client{:?}disconnected",addr);
26});
27}
28}
修改src/bin/kv_client.rs代码:
1#[tokio::main]
2asyncfnmain()->Result<(),Box> {
3tracing_subscriber::init();
4
5......
6
7//使用Frame的LengthDelimitedCodec进行编解码操作
8letmutstream=Framed::new(stream,LengthDelimitedCodec::new());
9letmutbuf=BytesMut::new();
10
11//创建GET命令
12letcmd_get=CmdRequest::get("mykey");
13cmd_get.encode(&mutbuf).unwrap();
14
15//发送GET命令
16stream.send(buf.freeze()).await.unwrap();
17info!("Send info successed!");
18
19//接收服务器返回的响应
20whileletSome(Ok(buf))=stream.next().await{
21letcmd_res=CmdResponse::decode(&buf[..]).unwrap();
22info!("Receivearesponse:{:?}",cmd_res);
23}
24
25Ok(())
26}
我们打开二个终端,分别输入以下命令:服务器执行结果:RUST_LOG=info cargo run --bin kv_server
RUST_LOG=info cargo run --bin kv_client
客户端执行结果:INFO kv_server: Listening on 127.0.0.1:19999 ......
INFO kv_server: Client: 127.0.0.1:50655 connected
INFO kv_server: Receive a command: CmdRequest { req_data: Some(Get(Get { key: "mykey" })) }
INFO kv_client: Send info successed!
INFO kv_client: Receive a response: CmdResponse { status: 200, message: "success", value: b"" }
服务器和客户端都正常处理了收到的请求和响应。 下一篇文章我们将在服务器端使用内存来存储客户端发送过来的数据。 完整代码:https://github.com/Justin02180218/kv_server_rust
审核编辑:汤梓红
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。
举报投诉
-
通信协议
+关注
关注
28文章
857浏览量
40254 -
服务器
+关注
关注
12文章
9017浏览量
85182 -
客户端
+关注
关注
1文章
290浏览量
16661
原文标题:用Rust实现KV Server-2 协议层
文章出处:【微信号:Rust语言中文社区,微信公众号:Rust语言中文社区】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
相关推荐
怎么使用Paho来实现和MQTT服务器的基本通信?
MQTT协议的全称叫“消息队列遥测传输”协议。它是一个轻量级的通信协议。旨在为低带宽、高延时、不稳定网络中的物联网设备提供消息传输服务。它运行在TCP/IP
发表于 09-11 11:52
BTC设备服务器的系统搭建
?:Node.js通信协议:HTTP、Socket3. Node.js 实现 Socket 通信________________________________________Socket 主要作?是
发表于 09-24 09:05
4412开发板Qt网络编程-TCP实现服务器和客户端
Protocol)是一种面向连接的,可靠的,基于字节流的传输层通信协议,传输数据稳定可靠。在 help 索引中搜索到如图 两个重要类:服务器编程中两个类都会用到,客户端编程中只会用到
发表于 04-28 15:33
当WiFi信号变低时,服务器和客户端之间的TCP通信丢失,如何使客户端重新连接?
大家好,
当 WiFi 信号变低时,服务器和客户端之间的 TCP 通信丢失,比如超过 -80dBm。一旦客户端断开连接,它就无法重新连接并正
发表于 05-15 07:31
服务器和客户端之间的TCP通信丢失怎么处理?
嗨,
当 WiFi 信号变低时,比如超过 -80dBm,我面临服务器和客户端之间的 TCP 通信丢失。一旦客户端断开连接,它就无法重新连接并
发表于 05-16 08:19
瘦客户端的服务器和网络技术
基于服务器的计算代表着一种应用程序部署方法。 对以下基于服务器的计算环境而言,瘦客户端是理想的客户端设备: 借助 Windows 2000 或 Windows Server 2003
发表于 07-02 16:46
•17次下载
TCP通信服务器端和客户端同机互传的简单示例程序免费下载
本文档的主要内容详细介绍的是TCP通信服务器端和客户端同机互传的简单示例程序免费下载初学者学习。本例子是本计算机labview程序之间的通信,如果要想
发表于 10-25 08:00
•9次下载
MQTT中服务端和客户端
MQTT 是一种基于客户端-服务端架构(C/S)的消息传输协议,所以在 MQTT 协议通信中,有两个最为重要的角色,它们便是
服务器Server和客户端Client的区别
例如在使用TCP通讯建立连接时采用客户端服务器模式,这种模式又常常被称为主从式架构,简称为C/S结构,属于一种网络通讯架构,将通讯的双方以客户端(Client )与服务器 (Serve
评论