Airframe

Airframe

  • Docs
  • Blog
  • Release Notes
  • GitHub

›Framework

Resources

  • Overview
  • Articles
  • Release Notes
  • Logos

Framework

  • airframe-di: Dependency Injection
  • Airframe RPC
  • airframe-http: Creating REST Service
  • airframe-rx: ReactiveX interface
  • AirSpec: Testing Framework

Core Modules

  • airframe-codec: Schema-On-Read Object Serializer
  • airframe-config: Application Config Flow
  • airframe-control: Retry/Rate Control
  • airframe-log: Application Logger
  • airframe-metrics: Human-Friendly Measures for Time and Data Size
  • airframe-surface: Object Shape Inspector

Utilities

  • airframe-benchmark: JMH Benchmark
  • airframe-canvas: Off-Heap Memory Manager
  • airframe-fluentd: Fluentd Logger
  • airframe-http-recorder: Web Request/Response Recorder
  • airframe-jdbc: JDBC Connection Pool
  • airframe-jmx: JMX Application Monitor
  • airframe-json: Pure-Scala JSON Parser
  • airframe-launcher: Command-Line Program Launcher
  • airframe-msgpack: Pure-Scala MessagePack Parser
  • airframe-parquet: Parquet Columnar File Reader and Writer
  • airframe-sql: SQL Parser
  • airframe-ulid: ULID Generator

airframe-http: Creating REST Service

airframe-http is a library for creating REST HTTP web servers at ease. airframe-http-netty is an extension of airframe-http to use Netty as a backend HTTP server.

  • Blog article: Airframe HTTP: Building Low-Friction Web Services Over Finagle

build.sbt

Maven Central

libraryDependencies += "org.wvlet.airframe" %% "airframe-http-netty" % (version)

Defining HTTP Endpoints

MyApi.scala


import wvlet.airframe.http.*
import wvlet.airframe.http.HttpMessage.{Request, Response}
import scala.concurrent.Future

object MyApi {
  case class User(name: String)
  case class NewUserRequest(name:String)
  case class ServerInfo(version:String, ua:Option[String])
}

// [Optional] Specify a common prefix for all endpoints
@Endpoint(path="/v1")
class MyApi {
  import MyApi._

  // Binding http path parameters (e.g., :name) to method arguments
  @Endpoint(method = HttpMethod.GET, path = "/user/:name")
  def getUser(name: String): User = User(name)

  // Receive a JSON request body {"user":"leo"} to generate NewUserRequest instance
  @Endpoint(method = HttpMethod.POST, path = "/user")
  def createNewUser(request:NewUserRequest): User = User(request.name)

  // To read http request headers, get it from RPCContext
  @Endpoint(method = HttpMethod.GET, path = "/info")
  def getInfo(): ServerInfo = {
    val request = RPCContext.current.httpRequest
    ServerInfo("1.0", request.userAgent)
  }

  // Returning Future[X] is also possible.
  // This style is convenient when you need to call another service that returns Future response.
  @Endpoint(method = HttpMethod.GET, path = "/info_f")
  def getInfoFuture(): Future[ServerInfo] = {
    val request = RPCContext.current.httpRequest
    Future.apply(ServerInfo("1.0", request.userAgent))
  }

  // It is also possible to return custom HTTP responses
  @EndPoint(method = HttpMethod.GET, path = "/custom_response")
  def customResponse: Response = {
    val response = Http.response().withContent("hello airframe-http")
    response
  }

  import com.twitter.io.{Buf,Reader}
  // [finagle-backend only] If you return a Reader, the response will be streamed (i.e., it uses less memory)
  @EndPoint(method = HttpMethod.GET, path = "/stream_response")
  def streamingResponse: Reader[User] = {
     Reader.fromSeq(Seq(User("leo"), User("yui")))
  }
}

This MyApi defines these http end points:

GET  /v1/user/:name    returns {"name":"..."}
POST /v1/user          returns {"name":"..."}
GET  /v1/info          returns {"version":"1.0", "ua":"...."}
GET  /v1/info2         returns {"version":"1.0", "ua":"...."}
GET  /v1/info_f        returns {"version":"1.0", "ua":"...."}
...

Mapping between JSON values and Scala objects will be handled automatically.

Path Parameter Types

patterndescriptionexampleinput examplebinding
:argsingle match/v1/user/:id/v1/user/1id = 1
*argtail match/v1/entry/*key/v1/entry/config/versionkey = config/version

*arg can be used only at the end of the path.

MessagePack Support

If an HTTP POST request has Content-Type: application/msgpack header, airframe-http will read the content body of the request as MessagePack data, and bind it to the method arguments using airframe-codec, which will manage data type conversions (e.g, from msgpack into Int or objects) automatically.

If an HTTP request has Accept: application/msgpack header, the response body will be encoded with MessagePack format. This is useful for reducing the response size and sending data to the client as is. For example, JSON cannot represent precise double values and binary data without some transformations. With MessagePack, you can send the data to the client more naturally.

Starting A Netty HTTP Server

To start a server, create a netty server configuration with Netty.server, and

import wvlet.airframe.http.*
import wvlet.airframe.http.netty.Netty

// Define API routes. This will read all @Endpoint annotations in MyApi
// You can add more routes by using `.add[X]` method.
val router = RxRouter.of[MyApi]

Netty.server
  .withPort(8080)
  .withRouter(router)
  .start { server =>
    // Netty http server will start here
    // Keep running the server 
    server.awaitTermination
  }

Customizing Netty

To customize Netty, use Netty.server.withXXX methods. For example, you can customize server name, port, logging, etc.:

import wvlet.airframe.http.*
improt wvlet.airframe.http.netty.Netty

val router = RxRouter.add[MyApi]

val server = Netty.server
  .withName("my server")
  .withRouter(router)
  .withPort(8080)
  // [optional] Add custom log entries
  .withExtraLogEntries { () => 
    val m = ListMap.newBuilder[String, Any]
    // Add a custom log entry
    m += "application_version" -> "1.0"
    // Add a thread-local parameter to the log
    RPCContext.current.getThreadLocal("user_id").map { uid =>
      m += "user_id" -> uid
    }
    m.result
  }
  // [optional] Disable server-side logging (log/http_server.json)
  .noLogging
  // [optional] Add a custom MessageCodec mapping
  .withCustomCodec{ case s: Surface.of[MyClass] => ... }

server.start { server =>
  // The customized server will start here
  server.waitServerTermination
}

Integration with Airframe DI

By calling .design, you can get a design for Airframe DI:

val design = Netty.server
 .withName("my-server")
 .withRouter(router)
 .withPort(8080)
 .design

design.build[NettyServer] { server =>
   // A server will start here
   
   // Wait until the server is terminated
   server.waitServerTermination
}
// The server will terminate after exiting the session

Running Multiple Netty Servers with Airframe DI

To run multiple HTTP servers with Airframe DI, wrapping NettyServer within different classes is recommended:

import wvlet.airframe.*
import wvlet.airframe.http.netty.Netty

class MyAppServer(server: NettyServer):
  export server.waitServerTermination

class AdminServer(server: NettyServer):
  export server.waitServerTermination

class MyService(myAppServer: MyAppServer, adminServer: AdminServer):
  def waitServerTermination: Unit = {
    myAppServer.waitServerTermination
    adminServer.waitServerTermination
  }

  // Explicitely stop the servers   
  def stop: Unit = {
    myAppServer.stop
    adminServer.stop
  }

case class ServiceConfig(port:Int, adminPort:Int)

val design = newDesign
  .bind[ServiceConfig].toInstance(ServiceConfig(8080, 8081))  
  .bind[MyAppServer].toProvider { (config: ServiceConfig, session: Session) =>
    Netty.server
     .withName("myapp")
     .withRouter(router1)
     .withPort(config.port) // port 8080
     .newServer(session)
  }
  .bind[AdminServer].toProvider { (config: ServiceConfig, session: Session) =>
    Netty.server
     .withName("admin")
     .withRouter(router2)
     .withPort(config.adminPort) // port 8081
     .newServer(session)
  }
  

design.build[MyService] { service =>
  // Two servers will start here
  
  // Await the MyApp server termination
  service.waitServerTermination
}

// After existing the scope, the servers will be stopped automatically (via AutoCloseable.close() method).

Static Content

To return static contents (e.g., html, image files, etc.), use StaticContent.fromResource(resourceBasePath, relativePath). This method finds a file from your class paths (e.g., files in dependency jar files or resource files).

import wvlet.airframe.http.*

class StaticContentServer {
  @Endpoint(path="/content/*path")
  def content(path:String) = StaticContent.fromResource(basePath = "/your/resource/package/path", path)
}

You can also add multiple resource paths or local directories to the search paths:

val sc = StaticContent
  .fromResource("/resource/path1")
  .fromResource("/resource/path2")
  .fromDirectory("/path/to/directory")

sc(path) // Create an HTTP response

Reporting Errors

To report server-side errors, you can throw HttpServerException with a custom HttpStatus code.

import wvlet.airframe.http.Http

// This will return 403 http response to the client
throw Http.serverException(HttpStatus.Forbidden_403)

If the endpoint returns Future type, returning just Future[Throwable] (will produce 500 response code) or Future[HttpServerException] to customize the response code by yourself will also work.

Throwing an RPCStatus as an exception will also work. An appropriate HTTP status code will be set automatically:

throw RPCStatus.INVALID_REQUEST_U1.newException("Unexpected message")

Returning Custom Error Responses

To return JSON or MsgPack responses, use Http.serverException(request, status, object, (codec factory)?):

import wvlet.airframe.http.Http

case class ErrorResponse(code:Int, message:String)

// The error response object will be converted into JSON by using airframe-codec:
throw Http.serverException(request, HttpStatus.Forbidden_403, ErrorResponse(100, "forbidden"))

// This will return {"code":100,"message":"forbidden"}

If the input request has Accept: application/x-msgpack header, the same code will translate the object into MessagePack format.

To fully customize the error response, use .withXXX methods:

val e = Http.serverException(HttpStatus.BadRequest_400)
    .withHeader(...)
    .withJson(...)

throw e

Filters

Router supports nesting HTTP request filters (RxHttpFilter) for authentication, logging, etc. before processing the final @EndPoint method:

import wvlet.airframe.*
import wvlet.airframe.http.*
import wvlet.airframe.rx.Rx

// Your endpoint definition
class MyApp {
  @Endpoint(method = HttpMethod.GET, path = "/user/:name")
  def getUser(name: String): User = User(name)
}

// Implement RxHttpFilter to define a custom filter 
// that will be applied before the endpoint processing.

class AuthFilter extends RxHttpFilter {
  def apply(request: Request, next: RxHttpEndpoint): Rx[Response] = {
    if (isValidAuth(request.authorization)) {
      // Call the next filter chain
      next(request)
    }
    else {
      // Reject the request
      throw RPCStatus.UNAUTHENTICATED_U13.newException("Invalid user")
    }
  }
}

// Add a filter and chain to the endpoint .andThen[X]:
val router = RxRouter
 .filter[AuthFilter]
 .andThen[MyApp]

Thread-Local Storage

To pass data between filters and applications, you can use thread-local storage in the context:

object AuthFilter extends RxHttpFilter {
  def apply(request: Request, next: RxHttpEndpoint): Rx[Response] = {
    if(authorize(request)) {
      request.getParam("user_id").map { uid =>
        // Pass a thread-local parameter to the parent response handler
        RPCContext.current.setThreadLocal("user_id", uid)
      }
    }
    next(request)
  }
}

object AuthLogFilter extends RxHttpFilter with LogSupport {
  def apply(request: Request, next: RxHttpEndpoint): Rx[Response] = {
    next(request).map { response =>
      // Read the thread-local parameter set in the context(request)
      RPCContext.current.getThreadLocal("user_id").map { uid =>
        info(s"user_id: ${uid}")
      }
      response
    }
  }
}


val router = RxRouter
  .filter(AuthLogFilter)
  .andThen(AuthFilter)
  .andThen[MyApp]

Using local variables inside filters will not work because the request processing will happen when Future[X] is evaluated, so we must use thead-local parmeter holder, which will be prepared for each request call.

Access Logs

airframe-http stores HTTP access logs at log/http-server.json by default in JSON format. When the log file becomes large, it will be compressed with gz and rotated automatically.

The default logger will record request parameters, request headers (except Authorization headers), response parameters, and response headers.

Example JSON logs:

{"time":1589319681,"event_time":"2020-05-12T14:41:21.567-0700","method":"GET","path":"/user/1","uri":"/user/1","request_size":0,"remote_host":"127.0.0.1","remote_port":52786,"host":"localhost:52785","connection":"Keep-Alive","user_agent":"okhttp/3.12.11","x_request_id":"10","content_length":"0","accept_encoding":"gzip","response_time_ms":714,"status_code":200,"status_code_name":"OK","response_content_type":"application/json;charset=utf-8"}
{"time":1589319681,"event_time":"2020-05-12T14:41:21.573-0700","method":"GET","path":"/user/info","uri":"/user/info?id=2&name=kai","query_string":"id=2&name=kai","request_size":0,"remote_host":"127.0.0.1","remote_port":52786,"host":"localhost:52785","connection":"Keep-Alive","user_agent":"okhttp/3.12.11","x_request_id":"10","content_length":"0","accept_encoding":"gzip","response_time_ms":921,"status_code":200,"status_code_name":"OK","response_content_type":"application/json;charset=utf-8"}

For most of the cases, using the default logger is sufficient. If necessary, you can customize the logging by using your own request/response loggers:

import wvlet.airframe.http.netty.*

Netty
  .server
  .withHttpLoggerConfig {
    _.withLogFilter { (m: Map[String, Any]) =>
      // You can customize the log entries here
      m
    }
  }

Reading Access Logs with Fluentd

The generated HTTP access log files can be processed in Fluentd. For example, if you want to store access logs to Treasure Data, add the following in_tail fluentd configuration:

<source>
  @type tail
  # Your log file location and position file
  path     /var/log/http_server.json
  pos_file /var/log/td-agent/http_server.json.pos
  # [Optional] Append tags to the log (For using td-agent)
  tag      td.(your database name).http_access
  format   json
  time_key time
</source>

HTTP Clients

airframe-http has several HTTP client implementations (Java's http client, OkHttp client, URLConnection client, etc.). The http client has a built-in retry logic (for 5xx http status code, connection failures) and circuit breakers.

Default Http Client

import wvlet.airframe.http._
import wvlet.airframe.http.HttpMessage.{Request,Response}

// Creating a default HTTP client
val client = Http.client.newSyncClient("http://localhost:8080")

// Send a new GET request
// Note: Use client.send(Http.xxx) for throwing HttpClientException upon 4xx, 5xx errors
val response = client.sendSafe(Http.GET("/v1/info"))

// Simple a request and receives a raw http response. You can customize headers too:
val r: Response = client.send(Http.GET("/path").withHeader(....))

// Send POST request with JSON body and read the response as an MyObj object
case class MyObj(id: Int, name: String)
val myobj = client.readAs[MyObj](Http.POST("/path2").withJson(...))

// Send an object data as the request body, and receive the response as ResponseType object
// JSON/MessagePack data will be transformed internally
val resp: ResponseType = client.call[RequestType, ResponseType](Http.POST("/path3"), requestDataObj) 

Customizing HTTP clients

You can customize various configuration of the http clients:

Http.client
  // Add an HTTP header
  .withRequestFilter(_.withAuthorization("Bearer xxx"))
  // Change the number of retry
  .withRetryContext(_.withMaxRetry(10)) 
  // Set a connection timeout
  .withConnectionTimeout(Duration(60, TimeUnit.SECONDS))
  .newSyncClient("http://localhost:8080")

Rx-based async client

An async-http client is a new addition since 23.5.0 and is useful when chaining responses from remote servers or rendering DOM in Scala.js using RPC responses.

val client = Http.client.newAsyncClient("https://...") 
val rx = client.send(Http.GET("...")) // Returns Rx[HttpMessage.Response]
rx.toRxStream.map(x => x.contentString) // Returns Rx[String]

Rx[A] value will not trigger any execution until it is evaluated by the other framework (e.g., airframe-http RPC, airframe-rx-http, AirSpec test runner, etc.)

In case you need to explicitly extract a value from the response, you can use rx.run { event => ...}

URLConnection client for Java 8

For compatibility with Java 8, you can use URLConnection-based client:

import wvlet.airframe.http.Http

val client = Http.client
  .withBackend(URLConnectionClientBackend)
  .newSyncClient("http://localhost:8080")

Note: URLConnection-based client cannot send PATCH requests due to a bug of JDK

OkHttp Client

libraryDependencies += "org.wvlet.airframe" %% "airframe-http-okhttp" % (version)
import wvlet.airframe.http.okhttp.OkHttp
// Create an OkHttp-based sync http client
val client = OkHttp.client.newSyncClient(host_name)

Finagle Http Client

(deprecated. Use Http.client instead)

import wvlet.airframe.http.finagle.Finagle

// Asynchronous HTTP client backed by Finagle (Using twitter-util Future)
Finagle.client.newClient("http://localhost:8080")

// a Finagle-based sync client
Finagle.client.newSyncClient(host_name)
← Airframe RPCairframe-rx: ReactiveX interface →
  • Defining HTTP Endpoints
    • Path Parameter Types
    • MessagePack Support
  • Starting A Netty HTTP Server
  • Customizing Netty
  • Integration with Airframe DI
    • Running Multiple Netty Servers with Airframe DI
  • Static Content
  • Reporting Errors
    • Returning Custom Error Responses
  • Filters
    • Thread-Local Storage
  • Access Logs
    • Reading Access Logs with Fluentd
  • Default Http Client
  • Customizing HTTP clients
  • Rx-based async client
    • URLConnection client for Java 8
  • OkHttp Client
  • Finagle Http Client
Airframe
Docs
Documentation
Community
Gitter Chat
More
GitHubStar
airframe logo
Copyright © 2024 wvlet.org