在前几篇文章中,我们已经掌握了 Protobuf 的基础语法、高级特性和序列化反序列化操作。本篇文章将深入讲解 gRPC 与 Protobuf 的集成,重点介绍如何通过 .proto
文件定义服务接口,并在 Go 和 Java 中实现 gRPC 服务与客户端的完整交互流程。我们将通过详细代码示例和分步解析,帮助你彻底掌握微服务架构中的通信设计。
一、gRPC 简介与核心概念
1. 什么是 gRPC?
gRPC 是一个高性能、开源的远程过程调用(RPC)框架,基于 HTTP/2 协议 和 Protobuf 数据格式 构建。它支持多种语言,并提供了同步/异步调用、流式通信等特性。
2. gRPC 的核心优势
特性 | 描述 |
---|---|
高效通信 | 基于二进制协议(Protobuf),比 JSON 更快、更小 |
多语言支持 | 支持 Go、Java、Python、C++、Rust 等 |
双向流式通信 | 支持客户端/服务端流式数据传输 |
自动代码生成 | 通过 .proto 文件自动生成客户端和服务端代码 |
强大的工具链 | 提供调试工具(如 grpcurl )、插件系统 |
二、通过 .proto
定义 gRPC 服务
1. 示例 .proto
文件
syntax = "proto3";
package user;//新版本有了下面的option go_package 这里的pacage就可以去掉了(当然留着也不影响)
option go_package = "/user;user"; // 指定生成的 Go 包路径(生成源码的路径和包名,前面是路径后面是包名,可以自己定义)
//option go_package = ".;user"; //这个可以生成在当前目录下
// 定义服务接口
service UserService {
// 1. 单向调用(Unary RPC)
rpc GetUser (GetUserRequest) returns (UserResponse);
// 2. 服务端流式调用(Server Streaming)
rpc ListUsers (ListUsersRequest) returns (stream UserResponse);
// 3. 客户端流式调用(Client Streaming)
rpc CreateUsers (stream CreateUserRequest) returns (CreateUsersResponse);
// 4. 双向流式调用(Bidirectional Streaming)
rpc UpdateUsers (stream UpdateUserRequest) returns (stream UserResponse);
}
// 消息定义
message GetUserRequest {
int32 id = 1;
}
message UserResponse {
int32 id = 1;
string name = 2;
string email = 3;
}
message ListUsersRequest {
string filter = 1;
}
message CreateUserRequest {
string name = 1;
string email = 2;
}
message CreateUsersResponse {
int32 count = 1;
}
message UpdateUserRequest {
int32 id = 1;
string name = 2;
}
要注意下面这里有了变化(以后会讲解为什么要用option go_package):
package user;//新版本有了下面的option go_package 这里的pacage就可以去掉了(当然留着也不影响)
option go_package = "/user;user"; // 指定生成的 Go 包路径(生成源码的路径和包名,前面是路径后面是包名,可以自己定义)
//option go_package = ".;user"; //这个可以生成在当前目录下
三、生成 gRPC 代码
1. 安装 gRPC 工具
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Java
protoc --java_out=. \
--grpc-java_out=. \
--plugin=protoc-gen-grpc-java=protoc-gen-grpc-java \
user.proto
2. 生成代码命令
Go
protoc --go_out=. --go-grpc_out=. user.proto
//protoc --go_out=. --go-grpc_out=. user.proto
//这个命令使用了两个输出插件:--go_out=. 和 --go-grpc_out=.。它分别调用了 Go 相关的 Protobuf 插件和 gRPC Go 插件来生成对应的目标文件。其中:
//--go_out=. 表示使用 Go 的 Protobuf 编译插件生成对应的 Go 文件。
//--go-grpc_out=. 表示使用 Go 的 gRPC 编译插件生成 gRPC 服务相关的 Go 文件。
Java
protoc --java_out=. --grpc-java_out=. user.proto
四、Go 实现 gRPC 服务端与客户端
1. 服务端代码详解
package main
import (
"context"
"fmt"
"log"
"net"
pb "./user_go_proto"
"google.golang.org/grpc"
)
type userService struct {
pb.UnimplementedUserServiceServer
}
// 单向调用
func (s *userService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.UserResponse, error) {
return &pb.UserResponse{
Id: req.Id,
Name: "Alice",
Email: "alice@example.com",
}, nil
}
// 服务端流式调用
func (s *userService) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
users := []*pb.UserResponse{
{Id: 1, Name: "Alice", Email: "alice@example.com"},
{Id: 2, Name: "Bob", Email: "bob@example.com"},
}
for _, user := range users {
if err := stream.Send(user); err != nil {
return err
}
}
return nil
}
// 客户端流式调用
func (s *userService) CreateUsers(stream pb.UserService_CreateUsersServer) error {
count := 0
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
count++
}
return stream.SendAndClose(&pb.CreateUsersResponse{Count: int32(count)})
}
// 双向流式调用
func (s *userService) UpdateUsers(stream pb.UserService_UpdateUsersServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
resp := &pb.UserResponse{
Id: req.Id,
Name: req.Name,
}
if err := stream.Send(resp); err != nil {
return err
}
}
return nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterUserServiceServer(s, &userService{})
log.Printf("Server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
代码解析
- 服务端实现:通过
pb.RegisterUserServiceServer
注册服务。 - 流式处理:通过
stream
接口处理双向通信。 - 错误处理:捕获
io.EOF
结束流式调用。
2. 客户端代码详解
package main
import (
"context"
"fmt"
"log"
pb "./user_go_proto"
"google.golang.org/grpc"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewUserServiceClient(conn)
// 单向调用
resp, err := client.GetUser(context.Background(), &pb.GetUserRequest{Id: 1})
if err != nil {
log.Fatalf("could not get user: %v", err)
}
fmt.Printf("User: %v\n", resp)
// 服务端流式调用
stream, err := client.ListUsers(context.Background(), &pb.ListUsersRequest{Filter: "IT"})
if err != nil {
log.Fatalf("could not list users: %v", err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("error receiving user: %v", err)
}
fmt.Printf("Received: %v\n", user)
}
// 客户端流式调用
stream2, err := client.CreateUsers(context.Background())
if err != nil {
log.Fatalf("could not create users: %v", err)
}
for i := 0; i < 3; i++ {
if err := stream2.Send(&pb.CreateUserRequest{
Name: fmt.Sprintf("User %d", i),
Email: fmt.Sprintf("user%d@example.com", i),
}); err != nil {
log.Fatalf("error sending user: %v", err)
}
}
resp2, err := stream2.CloseAndRecv()
if err != nil {
log.Fatalf("error closing stream: %v", err)
}
fmt.Printf("Created %d users\n", resp2.Count)
// 双向流式调用
stream3, err := client.UpdateUsers(context.Background())
if err != nil {
log.Fatalf("could not update users: %v", err)
}
for i := 0; i < 3; i++ {
if err := stream3.Send(&pb.UpdateUserRequest{
Id: int32(i),
Name: fmt.Sprintf("Updated User %d", i),
}); err != nil {
log.Fatalf("error sending update: %v", err)
}
resp3, err := stream3.Recv()
if err != nil {
log.Fatalf("error receiving update: %v", err)
}
fmt.Printf("Updated: %v\n", resp3)
}
}
代码解析
- 连接建立:通过
grpc.Dial
连接服务端。 - 流式调用:通过
stream.Send()
和stream.Recv()
实现双向通信。 - 错误处理:捕获
io.EOF
结束流式调用。
五、Java 实现 gRPC 服务端与客户端
1. 服务端代码详解
import user.UserServiceGrpc;
import user.GetUserRequest;
import user.UserResponse;
import user.ListUsersRequest;
import user.CreateUserRequest;
import user.CreateUsersResponse;
import user.UpdateUserRequest;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UserServiceServer {
public static void main(String[] args) throws IOException, InterruptedException {
final Server server = ServerBuilder.forPort(50051)
.addService(new UserServiceImpl())
.build();
server.start();
System.out.println("Server started at port 50051");
final CountDownLatch latch = new CountDownLatch(1);
server.awaitTermination();
}
static class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {
// 单向调用
@Override
public void getUser(GetUserRequest request, StreamObserver<UserResponse> responseObserver) {
UserResponse response = UserResponse.newBuilder()
.setId(request.getId())
.setName("Alice")
.setEmail("alice@example.com")
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
// 服务端流式调用
@Override
public void listUsers(ListUsersRequest request, StreamObserver<UserResponse> responseObserver) {
UserResponse user1 = UserResponse.newBuilder()
.setId(1)
.setName("Alice")
.setEmail("alice@example.com")
.build();
UserResponse user2 = UserResponse.newBuilder()
.setId(2)
.setName("Bob")
.setEmail("bob@example.com")
.build();
responseObserver.onNext(user1);
responseObserver.onNext(user2);
responseObserver.onCompleted();
}
// 客户端流式调用
@Override
public StreamObserver<CreateUserRequest> createUsers(StreamObserver<CreateUsersResponse> responseObserver) {
return new StreamObserver<>() {
int count = 0;
@Override
public void onNext(CreateUserRequest request) {
count++;
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
CreateUsersResponse response = CreateUsersResponse.newBuilder()
.setCount(count)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
}
// 双向流式调用
@Override
public StreamObserver<UpdateUserRequest> updateUsers(StreamObserver<UserResponse> responseObserver) {
return new StreamObserver<>() {
@Override
public void onNext(UpdateUserRequest request) {
UserResponse response = UserResponse.newBuilder()
.setId(request.getId())
.setName(request.getName())
.build();
responseObserver.onNext(response);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}
代码解析
- 服务端实现:通过继承
UserServiceGrpc.UserServiceImplBase
实现接口。 - 流式处理:通过
StreamObserver
处理双向通信。 - 错误处理:通过
onError
捕获异常。
2. 客户端代码详解
import user.UserServiceGrpc;
import user.GetUserRequest;
import user.UserResponse;
import user.ListUsersRequest;
import user.CreateUserRequest;
import user.CreateUsersResponse;
import user.UpdateUserRequest;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
public class UserServiceClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
UserServiceGrpc.UserServiceBlockingStub blockingStub = UserServiceGrpc.newBlockingStub(channel);
// 单向调用
GetUserRequest request = GetUserRequest.newBuilder().setId(1).build();
try {
UserResponse response = blockingStub.getUser(request);
System.out.println("User: " + response.getName());
} catch (StatusRuntimeException e) {
e.printStackTrace();
}
// 服务端流式调用
ListUsersRequest listRequest = ListUsersRequest.newBuilder().setFilter("IT").build();
UserServiceGrpc.UserServiceStub asyncStub = UserServiceGrpc.newStub(channel);
asyncStub.listUsers(listRequest, new StreamObserver<>() {
@Override
public void onNext(UserResponse user) {
System.out.println("Received: " + user.getName());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Stream completed.");
}
});
// 客户端流式调用
UserServiceGrpc.UserServiceStub createStub = UserServiceGrpc.newStub(channel);
createStub.createUsers(new StreamObserver<>() {
@Override
public void onNext(CreateUsersResponse response) {
System.out.println("Created " + response.getCount() + " users");
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Create stream completed.");
}
}).forEachRemaining(user -> {
if (user != null) {
System.out.println("Sending: " + user.getName());
}
});
// 双向流式调用
UserServiceGrpc.UserServiceStub updateStub = UserServiceGrpc.newStub(channel);
StreamObserver<UpdateUserRequest> requestStream = updateStub.updateUsers(new StreamObserver<>() {
@Override
public void onNext(UserResponse response) {
System.out.println("Updated: " + response.getName());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Update stream completed.");
}
});
for (int i = 0; i < 3; i++) {
UpdateUserRequest updateRequest = UpdateUserRequest.newBuilder()
.setId(i)
.setName("Updated User " + i)
.build();
requestStream.onNext(updateRequest);
}
requestStream.onCompleted();
try {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
代码解析
- 连接建立:通过
ManagedChannelBuilder
连接服务端。 - 流式调用:通过
StreamObserver
实现双向通信。 - 错误处理:通过
onError
捕获异常。
六、多语言交互的最佳实践
1. 保持 .proto
文件统一
- 所有语言共享同一个
.proto
文件,确保接口定义一致。 - 使用
protoc
生成对应语言的代码。
2. 版本控制
- 在
.proto
文件中添加版本注释:// Version 1.0.0 message User { string name = 1; }
3. 依赖管理
- 使用
go mod
或Maven
管理依赖,确保不同语言的代码版本一致。
注意:
这篇文章中使用的Go和Java 实现 gRPC 服务端与客户端的例子是二者分开用的,而不是混合语言,其实在这里我更想做的是Go和Java放在一起使用,比如Go做服务端,Java做客户端。原因是我觉得Go更适合grpc,所以大家着重看Go的讲解即可。如果要混合的话也是以Go为主,Java为辅。
这次没有使用多语言的原因是,突然混合在一起的话怕大家不好理解,我在其他文章中也有讲解跨语言使用的例子,大家有兴趣的可以去看看。
七、总结
在本文中,我们详细讲解了 gRPC 与 Protobuf 的深度集成,包括:
- 通过
.proto
文件定义服务接口 - 在 Go 和 Java 中实现服务端与客户端
- 单向、流式通信的完整代码示例
- 多语言交互的最佳实践
通过这些内容,你已经能够构建高性能、可扩展的微服务系统,并在不同语言之间实现无缝通信。gRPC 与 Protobuf 的结合是现代分布式系统的基石,希望这篇文章能帮助你更自信地在项目中应用这些技术。