3. 进阶篇
3.1 高级服务实现
流式处理
gRPC 支持多种流式处理模式,包括 Server Streaming、Client Streaming 和 Bidirectional Streaming。
实现 Server Streaming
service Greeter {
rpc SayHelloStream (HelloRequest) returns (stream HelloReply);
}
#[tonic::async_trait]
impl Greeter for MyGreeter {
type SayHelloStreamStream = Pin<Box<dyn Stream<Item = Result<HelloReply, Status>> + Send>>;
async fn say_hello_stream(
&self,
request: Request<HelloRequest>,
) -> Result<Response<Self::SayHelloStreamStream>, Status> {
let name = request.into_inner().name;
let output = async_stream::stream! {
for _ in 0..5 {
let reply = HelloReply {
message: format!("Hello {}!", name),
};
yield Ok(reply);
}
};
Ok(Response::new(Box::pin(output)))
}
}
实现 Client Streaming
service Greeter {
rpc SayHelloClientStream (stream HelloRequest) returns (HelloReply);
}
#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello_client_stream(
&self,
request: Request<Streaming<HelloRequest>>,
) -> Result<Response<HelloReply>, Status> {
let mut stream = request.into_inner();
let mut names = Vec::new();
while let Some(req) = stream.next().await {
let req = req?;
names.push(req.name);
}
let reply = HelloReply {
message: format!("Hello {}!", names.join(", ")),
};
Ok(Response::new(reply))
}
}
处理双向流
service Greeter {
rpc SayHelloBidirectional (stream HelloRequest) returns (stream HelloReply);
}
#[tonic::async_trait]
impl Greeter for MyGreeter {
type SayHelloBidirectionalStream = Pin<Box<dyn Stream<Item = Result<HelloReply, Status>> + Send>>;
async fn say_hello_bidirectional(
&self,
request: Request<Streaming<HelloRequest>>,
) -> Result<Response<Self::SayHelloBidirectionalStream>, Status> {
let mut stream = request.into_inner();
let output = async_stream::stream! {
while let Some(req) = stream.next().await {
let req = req?;
let reply = HelloReply {
message: format!("Hello {}!", req.name),
};
yield Ok(reply);
}
};
Ok(Response::new(Box::pin(output)))
}
}
拦截器
拦截器用于在请求和响应的处理过程中插入自定义逻辑。
使用拦截器处理请求和响应
use tonic::{Interceptor, Request, Response, Status};
#[derive(Clone)]
struct MyInterceptor;
impl Interceptor for MyInterceptor {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
let metadata = request.metadata_mut();
metadata.insert("custom-header", "value".parse().unwrap());
Ok(request)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let greeter = MyGreeter::default();
let server = Server::builder()
.layer(tonic::service::interceptor(MyInterceptor))
.add_service(GreeterServer::new(greeter))
.serve(addr);
server.await?;
Ok(())
}
实现自定义拦截器
use tonic::{Interceptor, Request, Response, Status};
#[derive(Clone)]
struct MyInterceptor;
impl Interceptor for MyInterceptor {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
let metadata = request.metadata_mut();
metadata.insert("custom-header", "value".parse().unwrap());
Ok(request)
}
}
元数据
元数据用于在请求和响应中传递附加信息。
处理请求和响应的元数据
#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
let metadata = request.metadata();
if let Some(value) = metadata.get("custom-header") {
println!("Custom header: {:?}", value);
}
let name = request.into_inner().name;
let reply = HelloReply {
message: format!("Hello {}!", name),
};
let mut response = Response::new(reply);
response.metadata_mut().insert("custom-header", "value".parse().unwrap());
Ok(response)
}
}
自定义元数据
let mut request = tonic::Request::new(HelloRequest {
name: "Tonic".into(),
});
request.metadata_mut().insert("custom-header", "value".parse().unwrap());
let response = client.say_hello(request).await?;
3.2 性能优化
连接池
使用连接池可以有效管理客户端连接,提高性能。
使用连接池管理客户端连接
use tonic::transport::{Channel, ClientTlsConfig};
use tonic::transport::pool::Pool;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tls = ClientTlsConfig::new()
.domain_name("example.com");
let channel = Channel::from_static("https://example.com")
.tls_config(tls)?
.connect()
.await?;
let pool = Pool::new(channel);
let mut client = GreeterClient::new(pool.clone());
// 调用服务
}
配置连接池参数
use tonic::transport::pool::Pool;
let pool = Pool::new(channel)
.max_size(10)
.idle_timeout(Duration::from_secs(300));
压缩
启用压缩可以减少传输数据量,提高性能。
启用压缩以减少传输数据量
use tonic::transport::{Channel, ClientTlsConfig};
use tonic::codec::CompressionEncoding;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tls = ClientTlsConfig::new()
.domain_name("example.com");
let channel = Channel::from_static("https://example.com")
.tls_config(tls)?
.connect()
.await?;
let mut client = GreeterClient::new(channel)
.send_gzip()
.accept_gzip();
// 调用服务
}
选择合适的压缩算法
use tonic::codec::CompressionEncoding;
let mut client = GreeterClient::new(channel)
.send_compression_encoding(CompressionEncoding::Gzip)
.accept_compression_encoding(CompressionEncoding::Gzip);
TLS/SSL
配置 TLS/SSL 可以确保通信安全。
配置 TLS/SSL 以确保通信安全
use tonic::transport::{Channel, ClientTlsConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tls = ClientTlsConfig::new()
.domain_name("example.com");
let channel = Channel::from_static("https://example.com")
.tls_config(tls)?
.connect()
.await?;
let mut client = GreeterClient::new(channel);
// 调用服务
}
生成和使用证书
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes
use tonic::transport::{Server, Identity, ServerTlsConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let greeter = MyGreeter::default();
let cert = std::fs::read_to_string("cert.pem")?;
let key = std::fs::read_to_string("key.pem")?;
let identity = Identity::from_pem(cert, key);
let server = Server::builder()
.tls_config(ServerTlsConfig::new().identity(identity))?
.add_service(GreeterServer::new(greeter))
.serve(addr);
server.await?;
Ok(())
}
3.3 测试与调试
单元测试
编写单元测试代码可以帮助你验证服务的正确性。
编写单元测试代码
#[cfg(test)]
mod tests {
use super::*;
use tonic::Request;
#[tokio::test]
async fn test_say_hello() {
let greeter = MyGreeter::default();
let request = Request::new(HelloRequest {
name: "Tonic".into(),
});
let response = greeter.say_hello(request).await.unwrap();
assert_eq!(response.into_inner().message, "Hello Tonic!");
}
}
使用tonic::transport::Channel
进行测试
use tonic::transport::Channel;
#[tokio::test]
async fn test_client() {
let channel = Channel::from_static("http://[::1]:50051").connect().await.unwrap();
let mut client = GreeterClient::new(channel);
let request = tonic::Request::new(HelloRequest {
name: "Tonic".into(),
});
let response = client.say_hello(request).await.unwrap();
assert_eq!(response.into_inner().message, "Hello Tonic!");
}
集成测试
集成测试可以帮助你验证整个系统的正确性。
编写集成测试代码
#[tokio::test]
async fn test_integration() {
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();
let server = Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr);
tokio::spawn(server);
let channel = Channel::from_static("http://[::1]:50051").connect().await.unwrap();
let mut client = GreeterClient::new(channel);
let request = tonic::Request::new(HelloRequest {
name: "Tonic".into(),
});
let response = client.say_hello(request).await.unwrap();
assert_eq!(response.into_inner().message, "Hello Tonic!");
}
使用tonic::transport::Server
进行测试
use tonic::transport::Server;
#[tokio::test]
async fn test_server() {
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();
let server = Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr);
tokio::spawn(server);
// 其他测试代码
}
调试工具
使用调试工具可以帮助你更好地理解和调试异步代码。
使用tokio-console
进行异步调试
cargo install tokio-console
在代码中启用tokio-console
:
#[tokio::main]
async fn main() {
console_subscriber::init();
// 你的异步代码
}
使用tracing
进行日志记录
use tracing::{info, error};
fn main() {
tracing_subscriber::fmt::init();
info!("Application started");
// 你的代码
error!("An error occurred");
}
通过这篇进阶教程,你已经掌握了 Tonic 的高级技巧和优化方法。无论是处理复杂的服务实现、优化性能,还是进行测试与调试,Tonic 都提供了强大的工具和灵活的配置选项,帮助你在 Rust 项目中高效地使用 gRPC。
无论身在何处
有我不再孤单孤单
长按识别二维码关注我们