Airframe RPC

Airframe RPC is a framework for building RPC services by using Scala as a unified interface between servers and clients.

Airframe RPC Features:

  • Use plain Scala functions as RPC endpoints.
  • Support Finagle (HTTP/1) or gRPC (HTTP/2) backends.
  • sbt-airframe plugin to generate RPC clients. No need to make HTTP requests by yourself.
  • Scala.js support for building interactive web browser applications.
  • Open API schema generation.

Why Airframe RPC?

Airframe RPC enables calling Scala methods at remote servers. You don’t need to worry about how to encode your data into JSON, nor how to define HTTP REST endpoints. Airframe RPC abstracts away these details; the framework generates the code for serializing your data objects into JSON or MessagePack and calls appropriate HTTP endpoints on your behalf.

To define RPC services, we need to define functions and data structures for representing HTTP requests and responses. Airframe RPC leverages the nature of Scala as a functional and object-oriented programming language: you can use plain Scala functions as RPC endpoints and case classes for modeling complex data. Here is an example of an Airframe RPC interface:

// An example RPC definition using Airframe RPC
@RPC
trait Greeter {
  // RPC endpoint definition
  def hello(name: String): GreeterResponse = GreeterResponse(s"Hello ${name}!")
}

// A model class defined with case class.
// This will be encoded into JSON {"message":"...."} or its MessagePack representation
case class GreeterResponse(message: String)

While gRPC has been a popular approach for building RPC services, its ecosystem often uses ProtocolBuffers for defining data structures and RPC methods. To start using gRPC in Scala, we need to bridge the gap between ProtocolBuffers and Scala (e.g., with ScalaPB). gRPC itself, however, is a data-format agnostic framework. So, in order to use Scala naturally for gRPC, we extended gRPC to support MessagePack (a compact binary alternative to JSON) and used airframe-codec for message serialization so that we can create RPC services without introducing ProtocolBuffers.

Airframe RPC also supports building HTTP/1 services backed by Finagle. Supporting both HTTP/1 and HTTP/2 is important because gRPC heavily uses HTTP/2 features, but HTTP/1-based web clients including web browsers still don't fully support HTTP/2.

In 2020, Scala.js, which can compile Scala code into JavaScript, finally reached 1.0.0 after 7 years of development. This has also paved the way for using Scala both for servers (Scala JVM) and clients (Scala.js with Ajax calls over HTTP/1). We explored the approach of using Scala's functional interfaces as RPC endpoint definitions, and successfully created Airframe RPC, which works both for Scala JVM and Scala.js, on top of a technology stack of 20+ Airframe modules.

Although Airframe RPC is a relatively new project, started in March 2020 inside Treasure Data, it has already shown several advantages. For example:

  • Free from REST. We can just use Scala's functional interface for defining servers. Google's REST API Design Guide has been a useful resource for defining clear REST API endpoints, but we've found that using the programming language's native interface is much easier.
  • No more web-framework wars. In Scala, there are many web frameworks, such as Finatra, Finch, Akka HTTP, and our own airframe-http. Each of them has its own pros and cons, and choosing one of them has been a hard choice for us. Now, we can just start from Airframe RPC using plain Scala interfaces. If necessary, we can use airframe-http for adding custom HTTP endpoints.
  • Seamless integration with Scala.js. Writing web browser applications in JavaScript that interact with servers is not easy. You may need to learn about existing frameworks like React.js and Vue.js, and a lot of techniques for using them. By using Scala both for server and client code, an engineer who had just joined the company could write an RPC application using Scala and Scala.js in a few days.

Airframe RPC: Overview

To use Airframe RPC, first define your RPC service interface using regular Scala functions and add the @RPC annotation. All public methods in this trait will be your RPC endpoints. For the method arguments and return types, you can use arbitrary types (see Object Serialization for the list of available types). To pass complex messages, you can use case classes.

package hello.api.v1

import wvlet.airframe.http._

// A model class. This will be serialized into JSON or MessagePack
case class Person(id: Int, name: String)

// RPC interface definition
@RPC
trait MyService {
  def hello(person: Person): String
}

Next, implement this service interface in Scala:

package hello.api.v1

import wvlet.airframe.http._

class MyServiceImpl extends MyService {
  override def hello(person: Person): String = s"Hello ${person.name} (id=${person.id})!"
}

To start an RPC web server, Airframe RPC provides a Finagle-based web server implementation. The following code starts an RPC web server at http://localhost:8080/:

// Create a Router
val router = Router.add[MyServiceImpl]

// Starting a new RPC server.
Finagle
  .server
  .withRouter(router)
  .withPort(8080)
  .start { server =>
     server.waitForTermination
  }

To access the RPC server, we need an RPC client generated from the RPC interface definition. sbt-airframe reads the RPC interface code and generates HTTP client code for calling the RPC methods; here, it generates the hello.api.v1.ServiceRPC client.

Now, you are ready to call remote Scala methods:

import hello.api.v1._
import wvlet.airframe.http.Http

// Create an RPC client
val client = ServiceRPC.newRPCSyncClient(Http.client.newSyncClient("localhost:8080"))

// Your first RPC call!
client.myService.hello(Person(id = 1, name = "leo")) // "Hello leo (id=1)!"

That’s it! Now you can call remote Scala methods as if they were regular Scala functions. Airframe RPC also supports asynchronous clients using Future.

Usage

The basic flow of using Airframe RPC is as follows:

  1. Define RPC interfaces with @RPC annotation
  2. Implement the RPC interfaces in Scala
  3. Create wvlet.airframe.http.Router by adding the RPC interface implementation classes.
  4. Generate RPC client code with sbt-airframe plugin

Basic Project Structure

Here is an example build configuration for using Airframe RPC with Scala and Scala.js.

project/plugins.sbt

// For RPC client generation
addSbtPlugin("org.wvlet.airframe" % "sbt-airframe" % "(version)")

// [optional] For Scala.js
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.1.0")
addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.0.0")

build.sbt

val AIRFRAME_VERSION = "(version)"

// Common build settings
val buildSettings = Seq(
  organization := "(your organization)",
  scalaVersion := "2.12.10"
  // Add your own settings here
)

// RPC API definition. This project should contain only RPC interfaces
lazy val api =
  crossProject(JSPlatform, JVMPlatform)
    .crossType(CrossType.Pure)
    .in(file("myapp-api"))
    .settings(
       buildSettings,
       libraryDependencies += "org.wvlet.airframe" %%% "airframe-http" % AIRFRAME_VERSION
    )

lazy val apiJVM = api.jvm
lazy val apiJS = api.js

// RPC server project
lazy val server =
  project
    .in(file("myapp-server"))
    .settings(
      buildSettings,
      libraryDependencies ++= Seq(
        "org.wvlet.airframe" %% "airframe-http-finagle" % AIRFRAME_VERSION,
        // [For gRPC] Use airframe-http-grpc instead of Finagle
        "org.wvlet.airframe" %% "airframe-http-grpc" % AIRFRAME_VERSION
      )
    )
    .dependsOn(apiJVM)

// RPC client project
lazy val client =
  project
    .in(file("myapp-client"))
    .enablePlugins(AirframeHttpPlugin)
    .settings(
      buildSettings,
      // Generate an RPC client for myapp.app.v1 package
      airframeHttpClients := Seq("myapp.app.v1:rpc"),
      // Enable debug logging of sbt-airframe
      airframeHttpGeneratorOption := "-l debug",
      libraryDependencies ++= Seq(
        // Add this for using gRPC
        "org.wvlet.airframe" %% "airframe-http-grpc" % AIRFRAME_VERSION
      )
    )
    .dependsOn(apiJVM)

// [optional] Scala.js UI using RPC
lazy val ui =
  project
    .in(file("myapp-ui"))
    .enablePlugins(ScalaJSPlugin, AirframeHttpPlugin)
    .settings(
      buildSettings,
      // Scala.js only supports async clients
      airframeHttpClients := Seq("myapp.app.v1:rpc"),
      // Enable debug logging of sbt-airframe
      airframeHttpGeneratorOption := "-l debug"
    )
    .dependsOn(apiJS)

sbt-airframe plugin

The sbt-airframe plugin generates HTTP clients for making RPC calls. It can generate async, sync, or Scala.js HTTP clients.

Add the following plugin settings:

project/plugins.sbt

addSbtPlugin("org.wvlet.airframe" % "sbt-airframe" % "(version)")

To generate HTTP clients, add the airframeHttpClients setting to your build.sbt. You need to specify which API package to use for generating RPC clients. The format is <RPC package name>:<client type>(:<target package name>(.<target class name>)?)?. For example:

build.sbt

enablePlugins(AirframeHttpPlugin)

airframeHttpClients := Seq("hello.api.v1:rpc")

With this setting, sbt-airframe generates the hello.api.v1.ServiceRPC class. You can create RPC clients from this class with .newSyncClient(...) and .newAsyncClient(...). Sync clients are blocking RPC clients, which wait until the method receives the RPC response from the RPC server. Async clients return a Future[_] response type so that you can do other work while waiting for the response. The generated client code can be found in the target/scala-(scala version)/src_managed/(api package)/ folder.

To rename the generated client, append a colon followed by the desired class name:

airframeHttpClients := Seq("hello.api.v1:rpc:HelloRPC")

This example generates hello.api.v1.HelloRPC class.

Supported RPC Client Types

Supported client types are:

  • rpc : Create a default RPC client class for Scala JVM (sync and async) and Scala.js (async-only)
  • grpc: Create gRPC client (ServiceGrpc: SyncClient, AsyncClient)
  • sync: (legacy client. Use rpc instead) Create a sync HTTP client (ServiceSyncClient) for Scala (JVM)
  • async: (legacy client. Use rpc instead) Create an async HTTP client (ServiceClient) for Scala (JVM) using Future abstraction (F). The F can be scala.concurrent.Future or twitter-util's Future.
  • scalajs: (legacy client. Use rpc instead) Create an RPC client (ServiceClientJS)

Internally, sbt-airframe generates these clients using HTTP code generators. These generators read a Router definition of RPC interfaces and generate client code for calling RPC endpoints. Currently, we only support generating HTTP clients for Scala. In the near future, we would like to add an Open API spec generator so that many programming languages can be used with Airframe RPC.

sbt-airframe commands

When you change your API interface, run airframeHttpReload command to update your RPC client:

# Regenerate the generated client code. Use this if the RPC interface has changed
> airframeHttpReload

# Generating RPC clients manually
> airframeHttpGenerateClients

# Clean the generated code
> airframeHttpClean

Open API

The sbt-airframe plugin also supports generating an Open API specification from Airframe RPC interfaces. To generate an Open API spec from an RPC definition, add the airframeHttpOpenAPIPackages configuration to your build.sbt:

// [Required] RPC packages to use for generating Open API specification
airframeHttpOpenAPIPackages := Seq("hello.api")
// [Optional] Specify target directory to generate openapi.yaml. The default is target directory
airframeHttpOpenAPITargetDir := target.value
// [Optional] Additional configurations (e.g., title, version, etc.)
airframeHttpOpenAPIConfig := OpenAPIConfig(
  title = "My API", // default is project name
  version = "1.0.0", // default is project version
  format = "yaml", // yaml (default) or json
  filePrefix = "openapi" // Output file name: (filePrefix).(format)
)

With this configuration, the Open API spec will be generated when running the package task:

> package

Or you can manually trigger OpenAPI file generation:

> airframeHttpOpenAPIGenerate

This generates the target/openapi.yaml file.

RPC Logging

Airframe RPC stores HTTP access logs in log/http-access.json by default. These JSON logs contain HTTP request related parameters and the RPC-specific fields described below:

  • rpc_interface: RPC interface class name
  • rpc_class: The actual RPC implementation class name
  • rpc_method: The RPC method name
  • rpc_args: The RPC call argument parameters described in JSON

These parameters can be used for debugging your RPC requests.

See also airframe-http: Access Logs for more details.

RPC Filters

(This feature is not available for gRPC backend)

Airframe RPC can chain arbitrary HTTP request filters before processing HTTP requests. A typical use case is adding an authentication filter for RPC calls:

import wvlet.airframe.http._
import wvlet.airframe.http.HttpMessage.{Request,Response}

object AuthFilter extends Http.Filter {
  def apply(request: Request, context: Context): Future[Response] = {
    val auth = request.authorization
    if (isValidAuth(auth)) {
      // Call the next filter chain
      context(request)
    }
    else {
      // Reject the request
      throw RPCStatus.UNAUTHENTICATED_U13.newException("Invalid user")
    }
  }
}

// Router for RPC
val rpcRouter = Router.add[MyApp]

// Add a filter before processing RPC requests
val router = Router
  .add(AuthFilter)
  .andThen(rpcRouter)

DI Integration

Airframe RPC natively supports Airframe DI for dependency injection so that you can inject necessary components for running your web service using bind[X] syntax or constructor injection. DI is useful when building web applications requiring many components and if you need to decouple component implementations from the service implementations. Airframe DI also supports switching component implementations between production and tests for the convenience of module tests.

Here is an example of using Airframe DI for starting an RPC server:

import wvlet.airframe._

trait MyAPIImpl extends MyAPI {
  // Inject your component
  private val myService = bind[MyService]

  override def hello(...) = ...
}

val router = Router.add[MyAPIImpl]

// Define the component implementation to use
val design = newDesign
  .bind[MyService].toInstance(new MyServiceImpl(...))
  .add(Finagle.server.withRouter(router).design)

// Launch a Finagle Server
design.build[FinagleServer] { server =>
  server.waitForTermination
}

Object Serialization

The Airframe @RPC interface supports almost all commonly used data types in Scala (and Scala.js). Note that some Java-specific classes (e.g., ZonedDateTime) are not supported in Scala.js.

Here is the list of available data types:

  • case classes whose parameter types (including generic types) are described in this list.
  • Primitive types (Int, Long, String, Double, Float, Boolean, etc)
  • java.util.UUID
  • java.time.Instant (recommended because it can be used for Scala.js too)
    • (JVM only) ZonedDateTime, java.util.Date. These types cannot be used in Scala.js.
  • Collection types: Seq, IndexedSeq, List, Set, Array, Map, Tuple (up to 21 parameters), Option, Either.
  • Exception, Throwable
    • Exception types will be serialized as GenericException for safety.
  • airframe-metrics types: ElapsedTime, DataSize, Count, etc.
  • Raw Json, JSONValue, MsgPack values.
  • Enum-like case object class, which has object X { def unapply(s:String): Option[X] } definition. String representation of enum-like classes will be used. Scala's native Enumeration classes are not supported.
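
As an illustration of the last item, here is a minimal sketch of an enum-like case object class in plain Scala. The names Color, Red, and Blue are hypothetical and chosen only for this example; the part that matters is the companion object's unapply(s: String): Option[X] method, which the list above requires.

```scala
// Hypothetical enum-like type for illustration only
sealed trait Color

object Color {
  case object Red  extends Color
  case object Blue extends Color

  // All known values of this enum-like type
  private val all: Seq[Color] = Seq(Red, Blue)

  // Resolve a value from its string representation ("Red", "blue", ...).
  // A case object's toString is its name, so we match on that, ignoring case.
  def unapply(s: String): Option[Color] =
    all.find(_.toString.equalsIgnoreCase(s))
}
```

With this definition, Color.unapply("red") resolves to the Red value, and an unknown name such as "green" yields None.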

Airframe RPC internally uses schema-on-read functionality of airframe-codec for serializing messages between server and clients. Even if the data type is slightly different from the target type, for example, if the input data is "100", but the target type is Int, the input String "100" will be translated into an Int value 100 automatically.
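
The idea of schema-on-read can be sketched in plain Scala as follows. This is not airframe-codec's implementation; readAsInt is a hypothetical helper that only illustrates how a reader can coerce slightly different input types into the target type Int.

```scala
import scala.util.Try

// Hypothetical helper: read an arbitrary input value as the target type Int.
// Returns None when the input cannot be interpreted as an Int.
def readAsInt(value: Any): Option[Int] = value match {
  case i: Int                  => Some(i)                     // already the target type
  case l: Long if l.isValidInt => Some(l.toInt)               // narrow when it fits
  case s: String               => Try(s.trim.toInt).toOption  // parse "100" into 100
  case _                       => None
}
```

For example, readAsInt("100") and readAsInt(100L) both produce Some(100), while readAsInt("abc") produces None.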

RPCContext

Airframe 22.8.0 introduced RPCContext in airframe-rpc for reading and writing the thread-local storage, and for referencing the original HTTP request:

import wvlet.airframe.http._

@RPC
trait MyAPI {
  def hello: String = {
    // Read the thread-local storage
    val userName = RPCContext.current.getThreadLocal[String]("context_user")
    s"Hello ${userName}"
  } 
  
  def authTest: String = {
    // Read the original http request
    RPCContext.current.httpRequest.authorization match {
      case Some(auth) if isValidAuth(auth) =>
        "Ok"
      case _ =>
        throw RPCStatus.PERMISSION_DENIED_U14.newException("invalid user")
    }
  }
}

RPCContext is available for both Finagle and gRPC backends.

Receiving Raw HTTP Responses

⚠️ Since Airframe 22.8.0, we no longer recommend adding Request as an RPC parameter. Use RPCContext.current.httpRequest instead.

If you need to manage HTTP request specific parameters (e.g., HTTP headers), you can add a Request object to the RPC arguments.

import wvlet.airframe.http._
import wvlet.airframe.http.HttpMessage.{Request, Response}

@RPC
trait MyAPI {
  def rpc1(p1: String, p2: Int, request: Request): Response
}

Reporting Errors with RPCStatus

Airframe RPC provides predefined RPCStatus codes for reporting application errors with ease. In your RPC implementation, use one of the RPCStatus codes and create an exception with the .newException(...) method to report an error:

import wvlet.airframe.http._

@RPC
trait MyApp {
  def helloRPC(msg: String): String = {
    // This will report the error code, error message, and the stack trace inside the response body.
    // See the table below to see the http status code that will be returned to the client.
    throw RPCStatus.INVALID_REQUEST_U1.newException("Unexpected message")
  }
}

If necessary, you can pass an application-specific error code appErrorCode and more detailed metadata in the form of Map[String, Any] using the .newException arguments. Note: This metadata needs to be serializable. See the Object Serialization section. These error details will be reported to the RPC server log and to the HTTP response body. The stack trace of the exception will be reported as well. If you need to hide the stack trace from the error message for security reasons (e.g., when rejecting requests in an authentication filter), call .newException(...).noStackTrace.

Exceptions created from RPCStatus will be mapped to an appropriate HTTP status code. If the backend is gRPC, they will be mapped to the corresponding gRPC status code as well. RPCStatus covers all existing gRPC status codes and frequently used HTTP status codes. The mapping table between RPCStatus and gRPC/HTTP status codes is shown below:

| RPCStatus | Type | gRPC Status | Http Status |
|---|---|---|---|
| SUCCESS_S0 | SUCCESS | OK_0 | 200: OK |
| USER_ERROR_U0 | USER_ERROR | INVALID_ARGUMENT_3 | 400: Bad Request |
| INVALID_REQUEST_U1 | USER_ERROR | INVALID_ARGUMENT_3 | 400: Bad Request |
| INVALID_ARGUMENT_U2 | USER_ERROR | INVALID_ARGUMENT_3 | 400: Bad Request |
| SYNTAX_ERROR_U3 | USER_ERROR | INVALID_ARGUMENT_3 | 400: Bad Request |
| OUT_OF_RANGE_U4 | USER_ERROR | OUT_OF_RANGE_11 | 400: Bad Request |
| NOT_FOUND_U5 | USER_ERROR | NOT_FOUND_5 | 404: Not Found |
| ALREADY_EXISTS_U6 | USER_ERROR | ALREADY_EXISTS_6 | 409: Conflict |
| NOT_SUPPORTED_U7 | USER_ERROR | UNIMPLEMENTED_12 | 405: Method Not Allowed |
| UNIMPLEMENTED_U8 | USER_ERROR | UNIMPLEMENTED_12 | 405: Method Not Allowed |
| UNEXPECTED_STATE_U9 | USER_ERROR | FAILED_PRECONDITION_9 | 400: Bad Request |
| INCONSISTENT_STATE_U10 | USER_ERROR | FAILED_PRECONDITION_9 | 400: Bad Request |
| CANCELLED_U11 | USER_ERROR | CANCELLED_1 | 499: Client Closed Request |
| ABORTED_U12 | USER_ERROR | ABORTED_10 | 409: Conflict |
| UNAUTHENTICATED_U13 | USER_ERROR | UNAUTHENTICATED_16 | 401: Unauthorized |
| PERMISSION_DENIED_U14 | USER_ERROR | PERMISSION_DENIED_7 | 403: Forbidden |
| INTERNAL_ERROR_I0 | INTERNAL_ERROR | INTERNAL_13 | 500: Internal Server Error |
| UNKNOWN_I1 | INTERNAL_ERROR | UNKNOWN_2 | 500: Internal Server Error |
| UNAVAILABLE_I2 | INTERNAL_ERROR | UNAVAILABLE_14 | 503: Service Unavailable |
| TIMEOUT_I3 | INTERNAL_ERROR | DEADLINE_EXCEEDED_4 | 504: Gateway Timeout |
| DEADLINE_EXCEEDED_I4 | INTERNAL_ERROR | DEADLINE_EXCEEDED_4 | 504: Gateway Timeout |
| INTERRUPTED_I5 | INTERNAL_ERROR | INTERNAL_13 | 500: Internal Server Error |
| SERVICE_STARTING_UP_I6 | INTERNAL_ERROR | UNAVAILABLE_14 | 503: Service Unavailable |
| SERVICE_SHUTTING_DOWN_I7 | INTERNAL_ERROR | UNAVAILABLE_14 | 503: Service Unavailable |
| DATA_LOSS_I8 | INTERNAL_ERROR | DATA_LOSS_15 | 500: Internal Server Error |
| RESOURCE_EXHAUSTED_R0 | RESOURCE_EXHAUSTED | RESOURCE_EXHAUSTED_8 | 429: Too Many Requests |
| OUT_OF_MEMORY_R1 | RESOURCE_EXHAUSTED | RESOURCE_EXHAUSTED_8 | 429: Too Many Requests |
| EXCEEDED_RATE_LIMIT_R2 | RESOURCE_EXHAUSTED | RESOURCE_EXHAUSTED_8 | 429: Too Many Requests |
| EXCEEDED_CPU_LIMIT_R3 | RESOURCE_EXHAUSTED | RESOURCE_EXHAUSTED_8 | 429: Too Many Requests |
| EXCEEDED_MEMORY_LIMIT_R4 | RESOURCE_EXHAUSTED | RESOURCE_EXHAUSTED_8 | 429: Too Many Requests |
| EXCEEDED_TIME_LIMIT_R5 | RESOURCE_EXHAUSTED | RESOURCE_EXHAUSTED_8 | 429: Too Many Requests |
| EXCEEDED_DATA_SIZE_LIMIT_R6 | RESOURCE_EXHAUSTED | RESOURCE_EXHAUSTED_8 | 429: Too Many Requests |
| EXCEEDED_STORAGE_LIMIT_R7 | RESOURCE_EXHAUSTED | RESOURCE_EXHAUSTED_8 | 429: Too Many Requests |
| EXCEEDED_BUDGET_R8 | RESOURCE_EXHAUSTED | RESOURCE_EXHAUSTED_8 | 429: Too Many Requests |

RPC Request Retry

Generally speaking, the RPC client can retry the request upon receiving an RPCStatus of INTERNAL_ERROR type. If a RESOURCE_EXHAUSTED error type is returned, the client should wait a bit until the server-side resource becomes available. USER_ERROR is not retryable in general. The generated RPC clients have a built-in request retry mechanism (the default is Jitter retry with up to 15 retries) based on the HTTP status code returned from the RPC server.

You can configure the retry method (e.g., retry count) when building an HTTP client. For example, you can increase the number of retries like this:

val httpClient = Http.client.withRetryContext(_.withMaxRetry(100)).newSyncClient("localhost:8080")
val rpcClient = ServiceRPC.newSyncClient(httpClient)
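
The rule of thumb at the start of this section (retry INTERNAL_ERROR, wait on RESOURCE_EXHAUSTED, do not retry USER_ERROR) can be sketched in plain Scala. This is only an illustration and not the retry logic of the generated clients, which, as noted above, works from the returned HTTP status code. RetrySketch and all names inside it are hypothetical; the sketch assumes the naming convention visible in the RPCStatus table, where the letter in the suffix (S, U, I, R) indicates the status type.

```scala
// Hypothetical sketch; none of these names are part of Airframe
object RetrySketch {
  sealed trait StatusType
  case object Success           extends StatusType
  case object UserError         extends StatusType
  case object InternalError     extends StatusType
  case object ResourceExhausted extends StatusType

  sealed trait RetryDecision
  case object RetryNow       extends RetryDecision
  case object RetryAfterWait extends RetryDecision
  case object DoNotRetry     extends RetryDecision

  // Matches the trailing type letter and number, e.g. "_U5" in NOT_FOUND_U5
  private val Suffix = """_([SUIR])\d+$""".r

  // Derive the status type from an RPCStatus name such as "UNAVAILABLE_I2"
  def statusTypeOf(statusName: String): Option[StatusType] =
    Suffix.findFirstMatchIn(statusName).map(_.group(1)).map {
      case "S" => Success
      case "U" => UserError
      case "I" => InternalError
      case "R" => ResourceExhausted
    }

  // Apply the general guidance described in this section
  def decide(statusType: StatusType): RetryDecision = statusType match {
    case InternalError       => RetryNow
    case ResourceExhausted   => RetryAfterWait
    case UserError | Success => DoNotRetry
  }
}
```

For instance, RetrySketch.statusTypeOf("EXCEEDED_RATE_LIMIT_R2").map(RetrySketch.decide) yields Some(RetryAfterWait), while NOT_FOUND_U5 maps to DoNotRetry.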

Reading RPCException at the client

To read the RPCStatus and error details at the RPC client side, catch RPCException:

try {
  rpcClient.hello(...)
}
catch {
  case e: RPCException =>
    // Read the error message from the RPC server
}

Other Tips

Airframe RPC is built on top of the Airframe HTTP framework. See the Airframe HTTP documentation for other features and advanced configurations.

Airframe gRPC

(This is an experimental feature available since Airframe 20.8.0)

Airframe gRPC is a gRPC and HTTP2-based implementation of Airframe RPC, which can make thousands of RPC calls per second. Airframe gRPC requires no Protobuf definitions. You can use plain Scala interface and case classes to define gRPC services.

Example gRPC projects can be found here.

build.sbt

"org.wvlet.airframe" %% "airframe-http-grpc" % AIRFRAME_VERSION

Defining gRPC API

As in Airframe RPC, just use a Scala trait annotated with @RPC. All public methods in this trait will be RPC endpoints:

package example.api

import wvlet.airframe.http.RPC

@RPC
trait GreeterApi {
  def sayHello(message: String): String
}

Generating gRPC client

Add the following build setting to generate a gRPC client by using the sbt-airframe plugin:

airframeHttpClients := Seq("example.api:grpc")

With this setting, gRPC client stubs example.api.ServiceGrpc will be generated. You can create a new sync or async client with ServiceGrpc.newSyncClient or newAsyncClient methods.

Starting An Airframe gRPC Server

import wvlet.airframe.http.Router
import wvlet.airframe.http.grpc.gRPC

// API implementation
class GreeterApiImpl extends example.api.GreeterApi {
  def sayHello(message: String): String = s"Hello ${message}!"
}

// Create a Router definition in the same manner with Airframe RPC
val router = Router.add[GreeterApiImpl]

gRPC.server
  .withRouter(router)
  .withPort(8080)
  // [optional] You can add gRPC interceptors here
  //.withInterceptor(...)
  // [optional] you can customize gRPC server here
  //.withServerInitializer{ x: ServerBuilder => x.addMethod(...); x }
  // [optional] Disable the default logging to log/http-access.json file
  //.noRequestLogging
  .start { server =>
    // gRPC server (based on Netty) starts at localhost:8080
    server.awaitTermination
  }

gRPC Client

sbt-airframe generates a ServiceGrpc.scala file in the target API package. You can create sync (blocking) or async (non-blocking) gRPC clients using this class.

gRPC Sync Client

import example.api.ServiceGrpc
import io.grpc.ManagedChannelBuilder

// Create a client channel
val channel = ManagedChannelBuilder.forTarget("localhost:8080").usePlaintext().build()

// Create a gRPC blocking client (SyncClient)
val client = ServiceGrpc.newSyncClient(channel)
try {
  // Call gRPC server
  val ret = client.GreeterApi.sayHello("Airframe gRPC") // Hello Airframe gRPC!
}
finally {
  client.close()
}

gRPC Async Client

import example.api.ServiceGrpc
import io.grpc.stub.StreamObserver

// Create an async gRPC client
val client = ServiceGrpc.newAsyncClient(channel)

// Call gRPC server
client.GreeterApi.sayHello("Airframe gRPC", new StreamObserver[String] {
  def onNext(v: String): Unit = {
    // v == Hello Airframe gRPC!
  }
  def onError(t: Throwable): Unit = {
    // report the error
  }
  def onCompleted(): Unit = {
    // RPC call completion
  }
})

gRPC Streaming

To implement server/client/bi-directional streaming, define RPC endpoints with RxStream[A] argument or return types of airframe-rx. RxStream[A] describes reactive-streaming data from the client or server side. It is basically the same as Seq[A], except that chained operators of RxStream[A], such as map and flatMap, are evaluated each time a new streaming input of A arrives (i.e., reactive evaluation).

import wvlet.airframe.http.RPC
import wvlet.airframe.rx.{Rx, RxStream}

@RPC
trait GreeterStreaming {
  // Server streaming returns RxStream[X] value
  def serverStreaming(name: String): RxStream[String] = {
    Rx.sequence("Hello", "See you").map { x => s"${x} ${name}!" }
  }

  // Client streaming receives only one RxStream[X] argument
  def clientStreaming(names: RxStream[String]): String = {
    names
      .map { x => s"Hello ${x}!" }
      .toSeq // Rx[X].toSeq materialize the streaming inputs as a concrete Seq[X]
      .mkString(", ")
  }

  // Bidirectional streaming receives only one RxStream[X] argument and returns RxStream[Y] response
  def bidirectionalStreaming(names: RxStream[String]): RxStream[String] = {
    names.map { x => s"Hello ${x}!" }
  }
}

Launching Multiple gRPC Servers

To launch multiple gRPC servers, use GrpcServerFactory:

import wvlet.airframe._
import wvlet.airframe.http.grpc.{gRPC, GrpcServerFactory}

val d = Design.newDesign

d.build { f: GrpcServerFactory =>
  val s1 = f.newGrpcServer(
    gRPC.server.withName("grpc1").withPort(8080).withRouter(...)
  )
  val s2 = f.newGrpcServer(
    gRPC.server.withName("grpc2").withPort(8081).withRouter(...)
  )

  // Wait until all servers terminate
  f.awaitTermination
}

All gRPC servers created by the factory will be closed when the factory is closed.

Reading gRPC Metadata

To access gRPC Metadata containing HTTP2 headers, use GrpcContext.current method:

import wvlet.airframe.http.RPC
import wvlet.airframe.http.grpc.GrpcContext

@RPC
trait MyApi {
  def hello: String = {
    // Get the current context
    val ctx: Option[GrpcContext] = GrpcContext.current
    // Read gRPC Metadata
    ctx.map(_.metadata) // Option[Metadata]
    // ...
  }
}

RPC Internals

(This section describes the internals of the Airframe RPC protocol. If you only want to use Airframe RPC, you can skip this section.)

Airframe RPC uses MessagePack (or JSON) for data transfer. All requests must use the HTTP POST method with a URL path of the form /(RPC interface package name).(RPC interface name)/(method name). The request message body of an RPC request is a MessagePack Map representation of a sequence of key-value pairs of (method argument name) -> (method argument value).

When gRPC backend is used, the client must use HTTP/2 and the message body must be encoded as Length-Prefixed-Message defined in gRPC protocol.

RPC Protocol

Airframe RPC maps function calls to HTTP POST requests.

Let's see how RPC calls will be translated into HTTP requests using the following RPC interface example:

package hello.api.v1

import wvlet.airframe.http.RPC

@RPC
trait MyService {
  def hello(request: HelloRequest): HelloResponse
}

case class HelloRequest(name: String)

case class HelloResponse(message: String)

  • Method: POST
  • Path: /(package name).(RPC interface name)/(method name)
    • ex. POST /hello.api.v1.MyService/hello
  • Content-Type: application/msgpack (default), application/json, or application/grpc (gRPC backend with HTTP/2)
  • Request body: JSON or MessagePack (default) representation of the method arguments. Each method parameter name and its argument value must form a key-value pair in the JSON object.
    • For an RPC method def m(p1:T1, p2:T2, ...), the request body will have the structure of {"p1":(json representation of T1), "p2":(json representation of T2), ...}. For example, the request to the above hello(request:HelloRequest) method will require the following JSON body:
{
  "request": {
    "name": "leo"
  }
}
  • Accept: "application/json" or "application/msgpack" (default)
  • Response body: JSON or MessagePack (default) representation of the method return type:
{
  "message": "..."
}
  • Http Status
    • 200 (Ok) for successful responses.
    • 400 (Bad Request) if some request parameters are invalid.
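
The path and request body rules listed above can be sketched with two small helper functions in plain Scala. The helpers rpcPath and jsonRequestBody are hypothetical names used only for this illustration and are not part of Airframe; the argument values passed to jsonRequestBody are assumed to be already rendered as JSON.

```scala
// Hypothetical helper: build the RPC path /(package name).(RPC interface name)/(method name)
def rpcPath(packageName: String, interfaceName: String, methodName: String): String =
  s"/${packageName}.${interfaceName}/${methodName}"

// Hypothetical helper: build a JSON request body from
// (parameter name -> JSON representation of the argument) pairs
def jsonRequestBody(args: Seq[(String, String)]): String =
  args
    .map { case (name, json) => "\"" + name + "\":" + json }
    .mkString("{", ",", "}")

// Path and body for calling hello(request: HelloRequest) of hello.api.v1.MyService
val path = rpcPath("hello.api.v1", "MyService", "hello")
val body = jsonRequestBody(Seq("request" -> """{"name":"leo"}"""))
```

Here path is /hello.api.v1.MyService/hello and body is {"request":{"name":"leo"}}, which would be sent with the POST method and a Content-Type of application/json.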

RPC Protocol (HTTP/2 for gRPC)

For gRPC backend, see also gRPC over HTTP2 protocol for the other HTTP headers.

// Save Length-Prefixed-Message representation of MessagePack Map value of {"name":"hello"}
// The first byte (0x00) means no compression
// The next 4 bytes (0x00 0x00 0x00 0x0c) represent the message body size = 12 bytes in big-endian
// Then, followed by MessagePack binary value
$ echo -n $'\x00\x00\x00\x00\x0c\x81\xa4\x6e\x61\x6d\x65\xa5\x68\x65\x6c\x6c\x6f' > hello.mspack
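
The same byte sequence can be reproduced in plain Scala, which may help when checking the layout described in the comments above. This is a minimal sketch and not Airframe's encoder: fixStr, fixMap, and lengthPrefixed are hypothetical helpers, and the MessagePack part only handles short strings (under 32 bytes) and small maps (under 16 entries) using the fixstr and fixmap formats.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// MessagePack fixstr: one header byte (0xa0 | length) followed by the UTF-8 bytes
def fixStr(s: String): Array[Byte] = {
  val bytes = s.getBytes(UTF_8)
  require(bytes.length < 32, "fixstr holds at most 31 bytes")
  (0xa0 | bytes.length).toByte +: bytes
}

// MessagePack fixmap: one header byte (0x80 | number of entries) followed by key-value pairs
def fixMap(entries: Seq[(String, String)]): Array[Byte] = {
  require(entries.size < 16, "fixmap holds at most 15 entries")
  val body = entries.flatMap { case (k, v) => fixStr(k) ++ fixStr(v) }
  ((0x80 | entries.size).toByte +: body).toArray
}

// gRPC Length-Prefixed-Message: 1-byte compression flag (0 = no compression),
// 4-byte big-endian message length, then the message bytes
def lengthPrefixed(payload: Array[Byte]): Array[Byte] =
  ByteBuffer
    .allocate(5 + payload.length) // ByteBuffer is big-endian by default
    .put(0.toByte)
    .putInt(payload.length)
    .put(payload)
    .array()

val payload = fixMap(Seq("name" -> "hello"))
val framed  = lengthPrefixed(payload)
val hex     = framed.map(b => f"${b & 0xff}%02x").mkString(" ")
```

The payload is 12 bytes long, so the frame starts with 00 00 00 00 0c, and hex matches the bytes written by the echo command above.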

An example of calling an Airframe gRPC method with curl:

$ curl -v --raw --http2-prior-knowledge -X POST  -H "content-type: application/grpc" -H "TE: trailers" --data-binary @hello.mspack --output - http://localhost:8080/greeter.api.GreeterApi/hello

With the accept: application/json header, a JSON string can be passed instead of MessagePack. In this case, the response message format will be {"response":(RPC result)}.
