gRPC 是一个开源远程过程调用 (RPC) 框架,可实现服务之间的高效且可扩展的通信。gRPC 的一个重要方面是管理截止时间、请求超时和上下文传播(包括自定义结构)。
了解这些机制有助于确保服务及时响应,资源不会浪费在超出合理时间范围的操作上,并且自定义元数据得到有效传输。
gRPC 中的截止期限指定了操作必须完成的最长时间。如果操作未在此时间范围内完成,它将自动终止。截止期限对于确保系统资源不会因服务无响应或缓慢而无限期地被占用至关重要。
请求超时是客户端愿意等待服务器响应的时间。如果服务器在此时间内没有响应,则请求将被中止。此机制可防止客户端无限期地等待响应。
gRPC 提供了灵活的选项,用于在客户端和服务器端设置截止时间并请求超时。以下是在 Go 中执行此操作的方法:
import ( "context" "log" "time" "google.golang.org/grpc" pb "path/to/your/protobuf/package" ) func main() { conn, err := grpc.Dial("server_address", grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() client := pb.NewYourServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() resp, err := client.YourMethod(ctx, &pb.YourRequest{}) if err != nil { log.Fatalf("could not call method: %v", err) } log.Printf("Response: %v", resp) }
在服务器端,gRPC 允许您强制执行截止时间并处理超出客户端指定的截止时间的情况:
import ( "context" "log" "net" "time" "google.golang.org/grpc" pb "path/to/your/protobuf/package" ) type server struct { pb.UnimplementedYourServiceServer } func (s *server) YourMethod(ctx context.Context, req *pb.YourRequest) (*pb.YourResponse, error) { select { case <-time.After(10 * time.Second): return &pb.YourResponse{}, nil case <-ctx.Done(): return nil, ctx.Err() } } func main() { lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterYourServiceServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
若要通过 gRPC 中的上下文发送自定义结构,您需要在将数据附加到上下文之前对其进行序列化,然后在接收端对其进行反序列化。这涉及将自定义结构转换为可通过网络传输的格式(例如 JSON 或协议缓冲区),然后将此序列化数据添加到上下文元数据。
type CustomStruct struct { Field1 string Field2 int }
第 2 步:序列化结构
import ( "context" "encoding/json" "fmt" "google.golang.org/grpc/metadata" ) func serializeCustomStruct(customStruct CustomStruct) (string, error) { data, err := json.Marshal(customStruct) if err != nil { return "", err } return string(data), nil }
步骤 3:附加到上下文
func attachCustomStructToContext(ctx context.Context, customStruct CustomStruct) (context.Context, error) { serializedData, err := serializeCustomStruct(customStruct) if err != nil { return nil, err } md := metadata.Pairs("custom-struct", serializedData) ctx = metadata.NewOutgoingContext(ctx, md) return ctx, nil }
步骤 4:传输
func main() { conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() client := pb.NewYourServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() customStruct := CustomStruct{Field1: "value1", Field2: 42} ctx, err = attachCustomStructToContext(ctx, customStruct) if err != nil { log.Fatalf("could not attach custom struct to context: %v", err) } resp, err := client.YourMethod(ctx, &pb.YourRequest{}) if err != nil { log.Fatalf("could not call method: %v", err) } log.Printf("Response: %v", resp) }
步骤 5:在服务器上提取并反序列化
func deserializeCustomStruct(data string) (CustomStruct, error) { var customStruct CustomStruct err := json.Unmarshal([]byte(data), &customStruct) if err != nil { return CustomStruct{}, err } return customStruct, nil } func extractCustomStructFromContext(ctx context.Context) (CustomStruct, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { return CustomStruct{}, fmt.Errorf("no metadata found in context") } serializedData := md["custom-struct"] if len(serializedData) == 0 { return CustomStruct{}, fmt.Errorf("no custom struct found in metadata") } return deserializeCustomStruct(serializedData[0]) } func (s *server) YourMethod(ctx context.Context, req *pb.YourRequest) (*pb.YourResponse, error) { customStruct, err := extractCustomStructFromContext(ctx) if err != nil { return nil, err } log.Printf("Received custom struct: %+v", customStruct) select { case <-time.After(10 * time.Second): return &pb.YourResponse{}, nil case <-ctx.Done(): return nil, ctx.Err() } }
要在所有 gRPC 调用中一致地处理上下文传播(包括自定义结构),您可以使用拦截器。拦截器是处理请求和响应的中间件,可添加日志记录、监控和上下文元数据处理等功能。
您需要一元拦截器和流式拦截器来管理不同类型的 RPC 调用:
客户端一元拦截器:
func unaryClientInterceptor( ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, ) error { customStruct, ok := ctx.Value("customStruct").(CustomStruct) if ok { ctx, err := attachCustomStructToContext(ctx, customStruct) if err != nil { return err } } return invoker(ctx, method, req, reply, cc, opts...) }
服务器端一元拦截器:
func unaryServerInterceptor( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (interface{}, error) { customStruct, err := extractCustomStructFromContext(ctx) if err != nil { return nil, err } ctx = context.WithValue(ctx, "customStruct", customStruct) return handler(ctx, req) }
客户端流拦截器:
func streamClientInterceptor( ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { customStruct, ok := ctx.Value("customStruct").(CustomStruct) if ok { ctx, err := attachCustomStructToContext(ctx, customStruct) if err != nil { return nil, err } } return
服务器端流拦截器:
import ( "context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) // StreamServerInterceptor handles server-side streaming func streamServerInterceptor( srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, ) error { ctx := ss.Context() customStruct, err := extractCustomStructFromContext(ctx) if err != nil { return err } // Add custom struct to context for server handling newCtx := context.WithValue(ctx, "customStruct", customStruct) wrapped := grpc_middleware.WrapServerStream(ss) wrapped.WrappedContext = newCtx // Handle the request return handler(srv, wrapped) } // Example of using the interceptor in a gRPC server setup func main() { lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatalf("failed to listen: %v", err) } // Register the interceptors server := grpc.NewServer( grpc.UnaryInterceptor(unaryServerInterceptor), grpc.StreamInterceptor(streamServerInterceptor), ) // Register your gRPC service implementations here pb.RegisterYourServiceServer(server, &yourServiceServer{}) if err := server.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
通过创建和注册一元和流式拦截器,您可以确保在所有 gRPC 调用中一致地处理上下文传播(包括自定义结构)。这种方法可确保您的自定义元数据得到正确管理和传播,从而让您能够构建强大而灵活的 gRPC 服务。