paint-brush
gRPC 秘密:掌握截止期限、超时和自定义上下文经过@gultm
548 讀數
548 讀數

gRPC 秘密:掌握截止期限、超时和自定义上下文

经过 Tatyana9m2024/08/01
Read on Terminal Reader

太長; 讀書

gRPC 是一个开源远程过程调用 (RPC) 框架。它支持服务之间高效且可扩展的通信。其中一个关键方面是管理截止时间、请求超时和上下文传播。了解这些机制有助于确保服务及时响应。
featured image - gRPC 秘密:掌握截止期限、超时和自定义上下文
Tatyana HackerNoon profile picture

gRPC 是一个开源远程过程调用 (RPC) 框架,可实现服务之间的高效且可扩展的通信。gRPC 的一个重要方面是管理截止时间、请求超时和上下文传播(包括自定义结构)。


了解这些机制有助于确保服务及时响应,资源不会浪费在超出合理时间范围的操作上,并且自定义元数据得到有效传输。

了解截止期限和请求超时

截止日期

gRPC 中的截止期限指定了操作必须完成的最长时间。如果操作未在此时间范围内完成,它将自动终止。截止期限对于确保系统资源不会因服务无响应或缓慢而无限期地被占用至关重要。

请求超时

请求超时是客户端愿意等待服务器响应的时间。如果服务器在此时间内没有响应,则请求将被中止。此机制可防止客户端无限期地等待响应。

在 gRPC 中设置截止期限和请求超时

gRPC 提供了灵活的选项,用于在客户端和服务器端设置截止时间并请求超时。以下是在 Go 中执行此操作的方法:

客户端设置截止日期


import ( "context" "log" "time" "google.golang.org/grpc" pb "path/to/your/protobuf/package" ) func main() { conn, err := grpc.Dial("server_address", grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() client := pb.NewYourServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() resp, err := client.YourMethod(ctx, &pb.YourRequest{}) if err != nil { log.Fatalf("could not call method: %v", err) } log.Printf("Response: %v", resp) }

服务器端处理

在服务器端,gRPC 允许您强制执行截止时间并处理超出客户端指定的截止时间的情况:


 import ( "context" "log" "net" "time" "google.golang.org/grpc" pb "path/to/your/protobuf/package" ) type server struct { pb.UnimplementedYourServiceServer } func (s *server) YourMethod(ctx context.Context, req *pb.YourRequest) (*pb.YourResponse, error) { select { case <-time.After(10 * time.Second): return &pb.YourResponse{}, nil case <-ctx.Done(): return nil, ctx.Err() } } func main() { lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterYourServiceServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }


在上下文中传播自定义结构

若要通过 gRPC 中的上下文发送自定义结构,您需要在将数据附加到上下文之前对其进行序列化,然后在接收端对其进行反序列化。这涉及将自定义结构转换为可通过网络传输的格式(例如 JSON 或协议缓冲区),然后将此序列化数据添加到上下文元数据。

逐步流程

  1. 定义您的自定义结构:定义您想要发送的自定义结构。
  2. 序列化结构:将自定义结构转换为字符串或字节数组。
  3. 附加到上下文:将序列化数据添加到上下文元数据。
  4. 传输:发送带有上下文的 gRPC 调用。
  5. 在服务器上提取和反序列化:从服务器端的上下文中提取元数据并将其反序列化回自定义结构。

步骤 1:定义您的自定义结构


type CustomStruct struct { Field1 string Field2 int }


第 2 步:序列化结构


import ( "context" "encoding/json" "fmt" "google.golang.org/grpc/metadata" ) func serializeCustomStruct(customStruct CustomStruct) (string, error) { data, err := json.Marshal(customStruct) if err != nil { return "", err } return string(data), nil }


步骤 3:附加到上下文


func attachCustomStructToContext(ctx context.Context, customStruct CustomStruct) (context.Context, error) { serializedData, err := serializeCustomStruct(customStruct) if err != nil { return nil, err } md := metadata.Pairs("custom-struct", serializedData) ctx = metadata.NewOutgoingContext(ctx, md) return ctx, nil }


步骤 4:传输


func main() { conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() client := pb.NewYourServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() customStruct := CustomStruct{Field1: "value1", Field2: 42} ctx, err = attachCustomStructToContext(ctx, customStruct) if err != nil { log.Fatalf("could not attach custom struct to context: %v", err) } resp, err := client.YourMethod(ctx, &pb.YourRequest{}) if err != nil { log.Fatalf("could not call method: %v", err) } log.Printf("Response: %v", resp) }


步骤 5:在服务器上提取并反序列化


func deserializeCustomStruct(data string) (CustomStruct, error) { var customStruct CustomStruct err := json.Unmarshal([]byte(data), &customStruct) if err != nil { return CustomStruct{}, err } return customStruct, nil } func extractCustomStructFromContext(ctx context.Context) (CustomStruct, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { return CustomStruct{}, fmt.Errorf("no metadata found in context") } serializedData := md["custom-struct"] if len(serializedData) == 0 { return CustomStruct{}, fmt.Errorf("no custom struct found in metadata") } return deserializeCustomStruct(serializedData[0]) } func (s *server) YourMethod(ctx context.Context, req *pb.YourRequest) (*pb.YourResponse, error) { customStruct, err := extractCustomStructFromContext(ctx) if err != nil { return nil, err } log.Printf("Received custom struct: %+v", customStruct) select { case <-time.After(10 * time.Second): return &pb.YourResponse{}, nil case <-ctx.Done(): return nil, ctx.Err() } }


为所有 gRPC 调用实现中间件

要在所有 gRPC 调用中一致地处理上下文传播(包括自定义结构),您可以使用拦截器。拦截器是处理请求和响应的中间件,可添加日志记录、监控和上下文元数据处理等功能。

一元拦截器和流式拦截器

您需要一元拦截器和流式拦截器来管理不同类型的 RPC 调用:


  • 一元拦截器:处理单个请求-响应周期。


  • 流拦截器:处理请求和响应流,包括客户端流、服务器端流和双向流。

一元拦截器实现

客户端一元拦截器:


 func unaryClientInterceptor( ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, ) error { customStruct, ok := ctx.Value("customStruct").(CustomStruct) if ok { ctx, err := attachCustomStructToContext(ctx, customStruct) if err != nil { return err } } return invoker(ctx, method, req, reply, cc, opts...) }


服务器端一元拦截器:


 func unaryServerInterceptor( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (interface{}, error) { customStruct, err := extractCustomStructFromContext(ctx) if err != nil { return nil, err } ctx = context.WithValue(ctx, "customStruct", customStruct) return handler(ctx, req) }

流式拦截器实现

客户端流拦截器:


 func streamClientInterceptor( ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { customStruct, ok := ctx.Value("customStruct").(CustomStruct) if ok { ctx, err := attachCustomStructToContext(ctx, customStruct) if err != nil { return nil, err } } return


服务器端流拦截器:


 import ( "context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) // StreamServerInterceptor handles server-side streaming func streamServerInterceptor( srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, ) error { ctx := ss.Context() customStruct, err := extractCustomStructFromContext(ctx) if err != nil { return err } // Add custom struct to context for server handling newCtx := context.WithValue(ctx, "customStruct", customStruct) wrapped := grpc_middleware.WrapServerStream(ss) wrapped.WrappedContext = newCtx // Handle the request return handler(srv, wrapped) } // Example of using the interceptor in a gRPC server setup func main() { lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatalf("failed to listen: %v", err) } // Register the interceptors server := grpc.NewServer( grpc.UnaryInterceptor(unaryServerInterceptor), grpc.StreamInterceptor(streamServerInterceptor), ) // Register your gRPC service implementations here pb.RegisterYourServiceServer(server, &yourServiceServer{}) if err := server.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }


通过创建和注册一元和流式拦截器,您可以确保在所有 gRPC 调用中一致地处理上下文传播(包括自定义结构)。这种方法可确保您的自定义元数据得到正确管理和传播,从而让您能够构建强大而灵活的 gRPC 服务。