gRPCClient.jl
gRPCClient.jl aims to be a production-grade gRPC client that emphasizes performance and reliability.
Features
- Unary+Streaming RPC
- HTTP/2 connection multiplexing
- Synchronous and asynchronous interfaces
- Thread safe
- SSL/TLS
The client is missing a few features which will be added over time if there is sufficient interest:
- OAuth2
- Compression
Getting Started
Test gRPC Server
All examples in the documentation are run against a test server written in Go. You can run it by doing the following:
cd test/go
# Build
go build -o grpc_test_server
# Run
./grpc_test_server
Code Generation
Note: support for this is currently being upstreamed into ProtoBuf.jl. Until then, make sure you add the feature branch with gRPC code generation support:
pkg> add https://github.com/csvance/ProtoBuf.jl#external-service-support
gRPCClient.jl integrates with ProtoBuf.jl to automatically generate Julia client stubs for calling gRPC.
using ProtoBuf
using gRPCClient
# Register our service codegen with ProtoBuf.jl
grpc_register_service_codegen()
# Creates Julia bindings for the messages and RPC defined in test.proto
protojl("test/proto/test.proto", ".", "test/gen")
Example Usage
See here for examples covering all provided interfaces for both unary and streaming gRPC calls.
API
Package Initialization / Shutdown
gRPCClient.grpc_init — Method
grpc_init([grpc_curl::gRPCCURL])
Initializes the gRPCCURL object. This should be called once before making gRPC calls. There is no harm in calling this more than once (i.e., by different packages/dependencies). Typical usage looks like this:
grpc_init()
client = TestService_TestRPC_Client("172.238.177.88", 8001)
# Make some gRPC calls
# Shut down the global gRPC handle
grpc_shutdown()
Unless a gRPCCURL is specified, the global one provided by grpc_global_handle() is used. Each gRPCCURL state has its own connection pool and request semaphore, so you may sometimes want to manage your own, as shown below:
grpc_myapp = gRPCCURL()
grpc_init(grpc_myapp)
client = TestService_TestRPC_Client("172.238.177.88", 8001; grpc=grpc_myapp)
# Make some gRPC calls
# Only shuts down your gRPC handle
grpc_shutdown(grpc_myapp)
gRPCClient.grpc_shutdown — Method
grpc_shutdown([grpc_curl::gRPCCURL])
Shuts down the gRPCCURL. This neatly cleans up all active connections and requests. Useful for calling during development with Revise. Unless a gRPCCURL is specified, the global one provided by grpc_global_handle() is shut down.
gRPCClient.grpc_global_handle — Method
grpc_global_handle()
Returns the global gRPCCURL state which contains a libCURL multi handle. By default all gRPC clients use this multi in order to ensure that HTTP/2 multiplexing happens where possible.
Generated ServiceClient Constructors
When you generate service stubs using ProtoBuf.jl, a constructor method is automatically created for each RPC endpoint. These constructors create gRPCServiceClient instances that are used to make RPC calls.
Constructor Signature
For a service method named TestRPC in service TestService, the generated constructor will have the form:
TestService_TestRPC_Client(
    host, port;
    secure=false,
    grpc=grpc_global_handle(),
    deadline=10,
    keepalive=60,
    max_send_message_length = 4*1024*1024,
    max_recieve_message_length = 4*1024*1024,
)
Parameters
- host: The hostname or IP address of the gRPC server (e.g., "localhost", "api.example.com")
- port: The port number the gRPC server is listening on (e.g., 50051)
- secure: A Bool that controls whether HTTPS/gRPCS (when true) or HTTP/gRPC (when false) is used for the connection. Default: false
- grpc: The global gRPC handle obtained from grpc_global_handle(). This manages the underlying libcurl multi-handle for HTTP/2 multiplexing. Default: grpc_global_handle()
- deadline: The gRPC deadline in seconds. If a request takes longer than this time limit, it will be cancelled and raise an exception. Default: 10
- keepalive: The TCP keepalive interval in seconds. This sets both CURLOPT_TCP_KEEPINTVL (interval between keepalive probes) and CURLOPT_TCP_KEEPIDLE (time before first keepalive probe) to help detect broken connections. Default: 60
- max_send_message_length: The maximum size in bytes for messages sent to the server. Attempting to send messages larger than this will raise an exception. Default: 4*1024*1024 (4 MiB)
- max_recieve_message_length: The maximum size in bytes for messages received from the server. Receiving messages larger than this will raise an exception. Default: 4*1024*1024 (4 MiB)
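As a rough illustration of how the two message length limits relate to payload sizes, the sketch below compares raw array sizes against the documented 4 MiB default and a raised 10 MiB limit. The helper fits_within is hypothetical and not part of gRPCClient.jl. The sketch also assumes the limit is compared against a plain byte count, while the real check presumably applies to the encoded Protocol Buffers message, which adds some overhead.

```julia
# Documented default for max_send_message_length and max_recieve_message_length
default_limit = 4 * 1024 * 1024   # 4 MiB = 4_194_304 bytes

# Hypothetical helper (not part of gRPCClient.jl): does a payload of
# `nbytes` bytes fit under `limit`? Encoding overhead is ignored here.
fits_within(nbytes::Integer, limit::Integer) = nbytes <= limit

# A 32x224x224 UInt8 array occupies 1_605_632 bytes (about 1.5 MiB)
batch_bytes = sizeof(zeros(UInt8, 32, 224, 224))

# One million (2^20) UInt64 values occupy 8 MiB
big_bytes = sizeof(zeros(UInt64, 1024 * 1024))

# A limit raised to 10 MiB, as a caller might pass to the constructor
raised_limit = 10 * 1024 * 1024

println("batch fits default limit: ", fits_within(batch_bytes, default_limit))
println("big fits default limit:   ", fits_within(big_bytes, default_limit))
println("big fits raised limit:    ", fits_within(big_bytes, raised_limit))
```

Under these assumptions the first payload fits the default, while the second only fits once the limit is raised.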
Example
# Create a client for the TestRPC endpoint
client = TestService_TestRPC_Client(
    "localhost", 50051;
    secure=true,                              # Use HTTPS/gRPCS
    deadline=30,                              # 30 second timeout
    max_send_message_length=10*1024*1024,     # 10 MiB max send
    max_recieve_message_length=10*1024*1024   # 10 MiB max receive
)
RPC
Unary
gRPCClient.grpc_async_request — Method
grpc_async_request(client::gRPCServiceClient{TRequest,false,TResponse,false}, request::TRequest) where {TRequest<:Any,TResponse<:Any}
Initiate an asynchronous gRPC request: send the request to the server and then immediately return a gRPCRequest object without waiting for the response. In order to wait on / retrieve the result once it's ready, call grpc_async_await. This is ideal when you need to send many requests in parallel and waiting on each response before sending the next request would slow things down.
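The request / await split can be pictured with plain Julia tasks. The following is only an analogy built on Base, not the library's implementation: fake_rpc is a hypothetical stand-in for a slow remote call, @async plays the role of grpc_async_request, and fetch plays the role of grpc_async_await.

```julia
# Hypothetical stand-in for a remote call that takes about 50 ms
function fake_rpc(i::Int)
    sleep(0.05)
    return i^2
end

# "Request" phase: start every call without waiting for any result
pending = map(i -> @async(fake_rpc(i)), 1:10)

# "Await" phase: collect the results in the order the calls were issued
results = map(fetch, pending)

println(results)
```

Because all ten calls are in flight at once, the total wall time is close to that of a single call rather than ten calls run back to back.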
using gRPCClient
# ============================================================================
# Step 1: Initialize gRPC
# ============================================================================
# This must be called once before making any gRPC requests.
# It initializes the underlying libcurl multi handle and other resources.
grpc_init()
# ============================================================================
# Step 2: Include Generated Protocol Buffer Bindings
# ============================================================================
# These bindings define the message types (e.g., TestRequest, TestResponse)
# and client stubs for your gRPC service. They are generated from .proto files.
include("test/gen/test/test_pb.jl")
# ============================================================================
# Step 3: Create a Client for Your RPC Method
# ============================================================================
# The client is bound to a specific RPC method on your gRPC service.
# Arguments: hostname, port
client = TestService_TestRPC_Client("localhost", 8001)
# ============================================================================
# Step 4: Send Multiple Async Requests
# ============================================================================
# Use grpc_async_request when you want to send requests without blocking.
# This is useful for sending many requests in parallel.
# Send all requests without waiting for responses
requests = Vector{gRPCRequest}()
for i in 1:10
    # Each request is sent immediately and returns a gRPCRequest handle
    push!(
        requests,
        grpc_async_request(client, TestRequest(1, zeros(UInt64, 1)))
    )
end
# ============================================================================
# Step 5: Wait for and Process Responses
# ============================================================================
# Use grpc_async_await to retrieve the response when you need it.
for request in requests
    # This blocks until the specific request completes
    response = grpc_async_await(client, request)
    @info response
end
gRPCClient.grpc_async_request — Method
grpc_async_request(client::gRPCServiceClient{TRequest,false,TResponse,false}, request::TRequest, channel::Channel{gRPCAsyncChannelResponse{TResponse}}, index::Int64) where {TRequest<:Any,TResponse<:Any}
Initiate an asynchronous gRPC request: send the request to the server and then immediately return. When the request is complete, a background task will put the response in the provided channel. This has the advantage over the request / await pattern that you can handle responses immediately after they are received, in any order.
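To show why the index matters when responses arrive out of order, here is a Base-only sketch of the same pattern. DemoChannelResponse is a hypothetical stand-in that assumes only the three fields accessed in this section's example (index, response, ex). The background tasks deliberately finish in a different order than they were started.

```julia
# Hypothetical stand-in for gRPCAsyncChannelResponse; only the fields
# index, response and ex are assumed.
struct DemoChannelResponse{T}
    index::Int64
    response::Union{Nothing,T}
    ex::Union{Nothing,Exception}
end

N = 5
channel = Channel{DemoChannelResponse{String}}(N)

# Start N background tasks; later indices sleep less and so finish earlier
for index in 1:N
    @async begin
        sleep((N - index) * 0.02)
        put!(channel, DemoChannelResponse{String}(index, "reply $index", nothing))
    end
end

# Handle each response as soon as it is available
arrival_order = Int64[]
for _ in 1:N
    cr = take!(channel)
    # Surface any error that was captured for this request
    !isnothing(cr.ex) && throw(cr.ex)
    push!(arrival_order, cr.index)
end

println("arrival order: ", arrival_order)
```

Whatever order the responses arrive in, the index carried by each one identifies the request it belongs to.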
using gRPCClient
# ============================================================================
# Step 1: Initialize gRPC
# ============================================================================
# This must be called once before making any gRPC requests.
grpc_init()
# ============================================================================
# Step 2: Include Generated Protocol Buffer Bindings
# ============================================================================
include("test/gen/test/test_pb.jl")
# ============================================================================
# Step 3: Create a Client for Your RPC Method
# ============================================================================
client = TestService_TestRPC_Client("localhost", 8001)
# ============================================================================
# Step 4: Create a Channel to Receive Responses
# ============================================================================
# Use the channel-based pattern when you want to process responses as soon
# as they arrive, regardless of the order they were sent.
N = 10
channel = Channel{gRPCAsyncChannelResponse{TestResponse}}(N)
# ============================================================================
# Step 5: Send All Requests
# ============================================================================
# The index parameter allows you to track which request each response
# corresponds to, since responses may arrive out of order.
for (index, request) in enumerate([TestRequest(i, zeros(UInt64, i)) for i in 1:N])
    grpc_async_request(client, request, channel, index)
end
# ============================================================================
# Step 6: Process Responses as They Arrive
# ============================================================================
# Responses are pushed to the channel as they complete. You can process
# them immediately without waiting for all requests to finish first.
for i in 1:N
    cr = take!(channel)
    # Check if an exception was thrown during the request
    !isnothing(cr.ex) && throw(cr.ex)
    # Use the index to match responses to requests
    @assert length(cr.response.data) == cr.index
end
gRPCClient.grpc_async_await — Method
grpc_async_await(client::gRPCServiceClient{TRequest,false,TResponse,false}, request::gRPCRequest) where {TRequest<:Any,TResponse<:Any}
Wait for the request to complete and return the response when it is ready. Throws any exceptions that were encountered during handling of the request.
gRPCClient.grpc_sync_request — Method
grpc_sync_request(client::gRPCServiceClient{TRequest,false,TResponse,false}, request::TRequest) where {TRequest<:Any,TResponse<:Any}
Do a synchronous gRPC request: send the request and wait for the response before returning it. Under the hood this just calls grpc_async_request and grpc_async_await. Use this when you want the simplest possible interface for a single request.
using gRPCClient
# ============================================================================
# Step 1: Initialize gRPC
# ============================================================================
# This must be called once before making any gRPC requests.
grpc_init()
# ============================================================================
# Step 2: Include Generated Protocol Buffer Bindings
# ============================================================================
include("test/gen/test/test_pb.jl")
# ============================================================================
# Step 3: Create a Client for Your RPC Method
# ============================================================================
client = TestService_TestRPC_Client("localhost", 8001)
# ============================================================================
# Step 4: Make a Synchronous Request
# ============================================================================
# This blocks until the response is ready. It's the simplest way to make
# a single gRPC request when you don't need parallelism.
response = grpc_sync_request(client, TestRequest(1, zeros(UInt64, 1)))
@info response
Streaming
gRPCClient.grpc_async_request — Method
grpc_async_request(client::gRPCServiceClient{TRequest,true,TResponse,false}, request::Channel{TRequest}) where {TRequest<:Any,TResponse<:Any}
Start a client streaming gRPC request (multiple requests, single response).
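The reason the request channel has to be closed can be seen in a Base-only sketch. The consumer task below is a hypothetical stand-in for the receiving end of a client stream: it produces its single result only once the channel is closed, and any messages still buffered at that point are drained first.

```julia
# Buffered channel playing the role of the request stream
requests = Channel{Int}(16)

# Hypothetical stand-in for the receiving side: consume every request,
# then produce exactly one result once the stream has ended.
consumer = @async begin
    total = 0
    for x in requests   # this loop ends only after the channel is closed
        total += x
    end
    total
end

put!(requests, 1)
put!(requests, 2)
put!(requests, 3)

# The stream is still open, so the single result cannot be ready yet
still_running = !istaskdone(consumer)

# Closing the channel signals the end of the stream
close(requests)
result = fetch(consumer)

println("still running before close: ", still_running, ", result: ", result)
```

If close were never called, fetch would block indefinitely, which mirrors a client-streaming call whose response never arrives.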
using gRPCClient
# ============================================================================
# Step 1: Initialize gRPC
# ============================================================================
# This must be called once before making any gRPC requests.
grpc_init()
# ============================================================================
# Step 2: Include Generated Protocol Buffer Bindings
# ============================================================================
include("test/gen/test/test_pb.jl")
# ============================================================================
# Step 3: Create a Client for Your Streaming RPC Method
# ============================================================================
client = TestService_TestClientStreamRPC_Client("localhost", 8001)
# ============================================================================
# Step 4: Create a Request Channel and Send Requests
# ============================================================================
# The channel buffers requests that will be streamed to the server.
# Buffer size of 16 means up to 16 requests can be queued.
request_c = Channel{TestRequest}(16)
# Send one or more requests through the channel
put!(request_c, TestRequest(1, zeros(UInt64, 1)))
# ============================================================================
# Step 5: Initiate the Streaming Request
# ============================================================================
req = grpc_async_request(client, request_c)
# ============================================================================
# Step 6: Close the Request Channel When Done
# ============================================================================
# IMPORTANT: You must close the channel to signal that no more requests
# will be sent. The server won't send the response until the stream ends.
close(request_c)
# ============================================================================
# Step 7: Wait for the Single Response
# ============================================================================
# After all requests are sent and processed, the server returns one response.
test_response = grpc_async_await(client, req)
gRPCClient.grpc_async_request — Method
grpc_async_request(client::gRPCServiceClient{TRequest,false,TResponse,true}, request::TRequest, response::Channel{TResponse}) where {TRequest<:Any,TResponse<:Any}
Start a server streaming gRPC request (single request, multiple responses).
using gRPCClient
# ============================================================================
# Step 1: Initialize gRPC
# ============================================================================
# This must be called once before making any gRPC requests.
grpc_init()
# ============================================================================
# Step 2: Include Generated Protocol Buffer Bindings
# ============================================================================
include("test/gen/test/test_pb.jl")
# ============================================================================
# Step 3: Create a Client for Your Streaming RPC Method
# ============================================================================
client = TestService_TestServerStreamRPC_Client("localhost", 8001)
# ============================================================================
# Step 4: Create a Response Channel
# ============================================================================
# The channel will receive multiple responses from the server.
# Buffer size of 16 means up to 16 responses can be queued.
response_c = Channel{TestResponse}(16)
# ============================================================================
# Step 5: Initiate the Streaming Request
# ============================================================================
# Send a single request. The server will respond with multiple messages.
req = grpc_async_request(
    client,
    TestRequest(1, zeros(UInt64, 1)),
    response_c,
)
# ============================================================================
# Step 6: Process Streaming Responses
# ============================================================================
# Read responses from the channel as they arrive. The channel will be closed
# when the server finishes sending all responses.
for test_response in response_c
    @info test_response
end
# ============================================================================
# Step 7: Check for Exceptions
# ============================================================================
# Call grpc_async_await to raise any exceptions that occurred during the request.
grpc_async_await(req)
gRPCClient.grpc_async_request — Method
grpc_async_request(client::gRPCServiceClient{TRequest,true,TResponse,true}, request::Channel{TRequest}, response::Channel{TResponse}) where {TRequest<:Any,TResponse<:Any}
Start a bidirectional streaming gRPC request (multiple requests, multiple responses).
using gRPCClient
# ============================================================================
# Step 1: Initialize gRPC
# ============================================================================
# This must be called once before making any gRPC requests.
grpc_init()
# ============================================================================
# Step 2: Include Generated Protocol Buffer Bindings
# ============================================================================
include("test/gen/test/test_pb.jl")
# ============================================================================
# Step 3: Create a Client for Your Streaming RPC Method
# ============================================================================
client = TestService_TestBidirectionalStreamRPC_Client("localhost", 8001)
# ============================================================================
# Step 4: Create Request and Response Channels
# ============================================================================
# Both channels allow streaming in both directions simultaneously.
# Buffer size of 16 means up to 16 messages can be queued in each direction.
request_c = Channel{TestRequest}(16)
response_c = Channel{TestResponse}(16)
# ============================================================================
# Step 5: Initiate the Bidirectional Streaming Request
# ============================================================================
req = grpc_async_request(client, request_c, response_c)
# ============================================================================
# Step 6: Send Requests and Receive Responses Concurrently
# ============================================================================
# In bidirectional streaming, you can send and receive at the same time.
# This example shows a simple pattern, but you can use tasks for more
# complex concurrent communication patterns.
# Send a request
put!(request_c, TestRequest(1, zeros(UInt64, 1)))
# Receive responses as they arrive
for test_response in response_c
    @info test_response
    # Optionally send more requests based on responses
    # put!(request_c, ...)
    break # Exit after first response for this example
end
# ============================================================================
# Step 7: Close the Request Channel When Done
# ============================================================================
# IMPORTANT: You must close the request channel to signal that no more
# requests will be sent.
close(request_c)
# ============================================================================
# Step 8: Check for Exceptions
# ============================================================================
# Call grpc_async_await to raise any exceptions that occurred during the request.
grpc_async_await(req)
gRPCClient.grpc_async_await — Method
grpc_async_await(client::gRPCServiceClient{TRequest,true,TResponse,false}, request::gRPCRequest) where {TRequest<:Any,TResponse<:Any}
Raise any exceptions encountered during the streaming request.
Exceptions
gRPCClient.gRPCServiceCallException — Type
Exception type that is thrown when something goes wrong while calling an RPC. This can be triggered either by the server's response code or by the client when something fails.
This exception type has two fields:
- grpc_status::Int - See here for an in-depth explanation of each status.
- message::String
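A minimal sketch of handling such an exception follows. It is Base-only: DemoServiceCallException is a hypothetical stand-in that assumes only the two documented fields, and GRPC_STATUS_NAMES lists the standard gRPC status codes 0 through 16. With the real package, the catch branch would test for gRPCServiceCallException instead.

```julia
# Names of the standard gRPC status codes
const GRPC_STATUS_NAMES = Dict(
    0 => "OK", 1 => "CANCELLED", 2 => "UNKNOWN", 3 => "INVALID_ARGUMENT",
    4 => "DEADLINE_EXCEEDED", 5 => "NOT_FOUND", 6 => "ALREADY_EXISTS",
    7 => "PERMISSION_DENIED", 8 => "RESOURCE_EXHAUSTED",
    9 => "FAILED_PRECONDITION", 10 => "ABORTED", 11 => "OUT_OF_RANGE",
    12 => "UNIMPLEMENTED", 13 => "INTERNAL", 14 => "UNAVAILABLE",
    15 => "DATA_LOSS", 16 => "UNAUTHENTICATED",
)

# Hypothetical stand-in with the same two fields as gRPCServiceCallException
struct DemoServiceCallException <: Exception
    grpc_status::Int
    message::String
end

# Turn the numeric status into a readable description
describe(ex::DemoServiceCallException) =
    string(get(GRPC_STATUS_NAMES, ex.grpc_status, "UNRECOGNIZED"), ": ", ex.message)

result = try
    # Stand-in for an RPC call that fails because its deadline passed
    throw(DemoServiceCallException(4, "deadline of 10 s exceeded"))
catch ex
    # Only handle the RPC failure type; let anything else propagate
    ex isa DemoServiceCallException || rethrow()
    describe(ex)
end

println(result)
```

Branching on grpc_status rather than on the message text keeps the handling stable if the wording of the message changes.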
gRPCClientUtils.jl
A module for benchmarking and stress testing has been included in utils/gRPCClientUtils.jl. In order to add it to your test environment:
using Pkg
Pkg.add(path="utils/gRPCClientUtils.jl")
Benchmarks
All benchmarks run against the Test gRPC Server in test/go. See the relevant documentation for information on how to run this.
All Benchmarks w/ PrettyTables.jl
using gRPCClientUtils
benchmark_table()
╭──────────────────────────────────┬─────────────┬────────────────┬────────────┬──────────────┬─────────┬──────┬──────╮
│ Benchmark │ Avg Memory │ Avg Allocs │ Throughput │ Avg duration │ Std-dev │ Min │ Max │
│ │ KiB/message │ allocs/message │ messages/s │ μs │ μs │ μs │ μs │
├──────────────────────────────────┼─────────────┼────────────────┼────────────┼──────────────┼─────────┼──────┼──────┤
│ workload_smol │ 2.78 │ 67.5 │ 17744 │ 56 │ 3.3 │ 51 │ 66 │
│ workload_32_224_224_uint8 │ 636.8 │ 74.1 │ 578 │ 1731 │ 99.33 │ 1583 │ 1899 │
│ workload_streaming_request │ 0.87 │ 6.5 │ 339916 │ 3 │ 1.61 │ 2 │ 20 │
│ workload_streaming_response │ 13.0 │ 27.7 │ 65732 │ 15 │ 4.94 │ 6 │ 50 │
│ workload_streaming_bidirectional │ 1.45 │ 25.6 │ 105133 │ 10 │ 6.06 │ 4 │ 55 │
╰──────────────────────────────────┴─────────────┴────────────────┴────────────┴──────────────┴─────────┴──────┴──────╯
Stress Workloads
In addition to benchmarks, a number of workloads based on these are available:
- stress_workload_smol()
- stress_workload_32_224_224_uint8()
- stress_workload_streaming_request()
- stress_workload_streaming_response()
- stress_workload_streaming_bidirectional()
These run forever, and are useful to help identify any stability issues or resource leaks.