Airframe

airframe-parquet: Parquet Columnar File Reader and Writer

airframe-parquet is a library for reading and writing Scala objects using the Parquet columnar data format.

Usage

libraryDependencies ++= Seq(
  "org.wvlet.airframe" %% "airframe-parquet" % "(version)",
  // Use your own hadoop version
  "org.apache.hadoop"  % "hadoop-client"  % "3.4.0",
  // [Optional] For supporting S3
  "org.apache.hadoop"  % "hadoop-aws"  % "3.4.0",
  // [Optional] For using custom AWS credential provider
  "software.amazon.awssdk" % "auth" % "2.25.13"
)

import wvlet.airframe.parquet.Parquet

case class MyEntry(id: Int, name:String)

// Writing objects to file
val writer = Parquet.newWriter[MyEntry](path = "data.parquet")
writer.write(MyEntry(1, "leo"))
writer.write(MyEntry(2, "yui"))
// Ensure writing entries to the file
writer.close()

// Reading Parquet data as objects
val reader = Parquet.newReader[MyEntry](path = "data.parquet")
val e1 = reader.read() // MyEntry(1,"leo")
val e2 = reader.read() // MyEntry(2,"yui")
reader.read() // null
reader.close()

// Reading records as Map[String, Any]
val mapReader = Parquet.newReader[Map[String, Any]](path = "data.parquet")
val m1 = mapReader.read() // Map("id"->1, "name" -> "leo")
val m2 = mapReader.read() // Map("id"->2, "name" -> "yui")
mapReader.read() // null
mapReader.close()

// Reading records as Json
import wvlet.airframe.json.Json
val jsonReader = Parquet.newReader[Json](path = "data.parquet")
val j1 = jsonReader.read() // {"id":1,"name":"leo"}
val j2 = jsonReader.read() // {"id":2,"name":"yui"} 
jsonReader.read() // null
jsonReader.close()

// Writing dynamically generated records
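import org.apache.parquet.schema._
// PrimitiveTypeName is nested in PrimitiveType, and stringType is a static
// method of LogicalTypeAnnotation, so both need explicit imports
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.LogicalTypeAnnotation.stringType
// Create a Parquet schema
val schema = new MessageType(
  "MyEntry",
  Types.required(PrimitiveTypeName.INT32).named("id"),
  Types.optional(PrimitiveTypeName.BINARY).as(stringType()).named("name")
)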
// Create a record writer for the given schema
val recordWriter = Parquet.newRecordWriter(path = "record.parquet", schema = schema)
// Write a record using Map (column name -> value)
recordWriter.write(Map("id" -> 1, "name" -> "leo"))
// Write a record using JSON object
recordWriter.write("""{"id":2, "name":"yui"}""")
// Write a record using Array
recordWriter.write(Seq(3, "aina"))
// Write a record using JSON array
recordWriter.write("""[4, "xxx"]""")
recordWriter.close()


// If you need to write dynamic records containing case classes,
// register the Surfaces of these classes
import wvlet.airframe.surface.Surface

case class Nested(id:Int, entry:MyEntry)
val nestedRecordWriter = Parquet.newRecordWriter(
  path = "nested.parquet",
  // You can build a Parquet schema that matches a Surface
  schema = Parquet.toParquetSchema(Surface.of[Nested]),
  knownSurfaces = Seq(Surface.of[MyEntry]) // required to serialize MyEntry
)

// Write dynamic records
nestedRecordWriter.write(Map("id" -> 1, "entry" -> MyEntry(1, "yyy")))
nestedRecordWriter.write(Map("id" -> 2, "entry" -> MyEntry(2, "zzz")))
nestedRecordWriter.close()
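
Each reader in the examples above returns null once the file is exhausted, so reading a whole file amounts to calling read() until null comes back. A minimal sketch of that loop follows. ReaderDrain and drain are hypothetical names, not part of airframe-parquet, and the helper accepts any zero-argument function so that it assumes nothing about the reader type beyond the null-at-end behavior shown above.

```scala
// Hypothetical helper (not part of airframe-parquet): turns a null-terminated
// read() call into a lazy Scala Iterator.
object ReaderDrain {
  // readNext is expected to return null when no more records are available
  def drain[A <: AnyRef](readNext: () => A): Iterator[A] =
    Iterator.continually(readNext()).takeWhile(_ != null)
}
```

With the reader from the example above, this could be used as ReaderDrain.drain(() => reader.read()).foreach(println), followed by reader.close().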

Using with AWS S3

airframe-parquet uses HadoopFileSystem for reading data from S3. hadoopConf needs to be configured for AWS authentication.

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Option 1: Using AWS keys
conf.set("fs.s3a.access.key", "...")
conf.set("fs.s3a.secret.key", "...")

// Option 2: Using a custom AWS credential provider implementing com.amazonaws.auth.AWSCredentialsProvider
conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

// Use s3a:// prefix to specify an S3 path, and pass hadoopConf
Parquet.newReader[MyEntry](path = "s3a://my-bucket/data.parquet", hadoopConf = conf)

For other configuration parameters, see the hadoop-aws documentation.
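
Hard-coding keys in source is rarely desirable, so Option 1 is often fed from the environment instead. The sketch below collects the two fs.s3a.* settings shown above from an environment-like Map. S3aSettings and fromEnv are hypothetical names, not part of airframe-parquet or hadoop-aws, and the variable names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are the conventional AWS ones, assumed here rather than taken from this page.

```scala
// Hypothetical helper (not part of airframe-parquet or hadoop-aws): collects
// the S3A key settings from an environment-like Map.
object S3aSettings {
  def fromEnv(env: Map[String, String]): Map[String, String] = {
    val settings = for {
      accessKey <- env.get("AWS_ACCESS_KEY_ID")
      secretKey <- env.get("AWS_SECRET_ACCESS_KEY")
    } yield Map(
      "fs.s3a.access.key" -> accessKey,
      "fs.s3a.secret.key" -> secretKey
    )
    // Return no settings unless both keys are present
    settings.getOrElse(Map.empty[String, String])
  }
}
```

The result can then be applied to the Hadoop configuration with S3aSettings.fromEnv(sys.env).foreach { case (k, v) => conf.set(k, v) }.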

Querying Parquet with A Simple SQL

To apply column projection and predicate filtering, you can use a SQL statement of the form select column1, column2, ... from _ where (column condition). The input table name must be a single _ (underscore). The where clause supports only a limited set of predicates (=, !=, <, >, <=, >=, BETWEEN, OR, AND, IS NULL, IS NOT NULL, etc.), and the left operand of each comparison must be a column name.

Projecting columns:

// Selecting a subset of columns with SQL
val reader = Parquet.query[Json](path = "data.parquet", sql = "select id from _")
reader.read() // {"id":1}
reader.read() // {"id":2}
reader.read() // null

Filtering records:

// Filtering records with a SQL where clause
val reader = Parquet.query[Json](path = "data.parquet", sql = "select * from _ where id = 2")
reader.read() // {"id":2,"name":"yui"}
reader.read() // null
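
When the column list or condition is only known at runtime, the query string can be assembled programmatically. The following is a minimal sketch of the statement shape described in this section. ParquetSql and select are hypothetical names, not part of airframe-parquet, and the helper only concatenates strings, so it does not check that the condition uses a supported predicate.

```scala
// Hypothetical helper (not part of airframe-parquet): assembles a query string
// in the shape "select ... from _ where ...".
object ParquetSql {
  def select(columns: Seq[String] = Seq.empty, where: Option[String] = None): String = {
    // An empty column list means "all columns"
    val projection = if (columns.isEmpty) "*" else columns.mkString(", ")
    val filter     = where.map(cond => s" where ${cond}").getOrElse("")
    // The input table name must be a single underscore
    s"select ${projection} from _${filter}"
  }
}
```

For example, ParquetSql.select(Seq("id"), Some("id >= 2")) yields a string that could be passed as the sql argument of Parquet.query.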

Column Projection with Model Classes

If you need to read only a subset of columns, use a model class that has fewer parameters than the original model class. The Parquet reader will access only the column blocks of the columns named in the model class parameters:

case class MyRecord(p1:Int, p2: String, p3:Boolean)

val writer = Parquet.newWriter[MyRecord](path = "record.parquet")
writer.write(...)
writer.close()

case class MyRecordProjection(p1:Int, p3:Boolean)
val reader = Parquet.newReader[MyRecordProjection](path = "record.parquet")

// Only p1 and p3 columns will be read from the Parquet file
reader.read() // MyRecordProjection(p1, p3)
reader.close()

Applying Row Group Filter with Parquet FilterApi

Parquet can skip reading row groups that cannot contain matching records by using row group filters. You can use the FilterApi of parquet-mr to build such a filter:

import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi

// Describing filter condition using parquet-mr FilterApi
val filter = FilterCompat.get(
  FilterApi.eq(
    FilterApi.intColumn("id"),
    Integer.valueOf(100) // Need to use Java boxed values (e.g., java.lang.Integer)
  )
)

val reader = Parquet.newReader[MyEntry](
  path = "data.parquet",
  // Set your filter here
  config = _.withFilter(filter)
)
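
Row group filtering generally works by comparing the predicate with per-row-group column statistics, such as minimum and maximum values, and skipping the row groups that cannot contain a match. The sketch below is a simplified model of that decision for an equality predicate on an Int column. It is an illustration only, not parquet-mr's implementation, and RowGroupStats and RowGroupPruning are hypothetical names.

```scala
// Simplified model of min/max-based row group pruning (illustration only;
// this is not how parquet-mr is implemented internally).
final case class RowGroupStats(min: Int, max: Int)

object RowGroupPruning {
  // An equality predicate can only match inside the [min, max] range
  def mightContain(stats: RowGroupStats, value: Int): Boolean =
    stats.min <= value && value <= stats.max

  // Keep only the row groups that have to be scanned for `value`
  def rowGroupsToScan(groups: Seq[RowGroupStats], value: Int): Seq[RowGroupStats] =
    groups.filter(mightContain(_, value))
}
```

In this model, a filter such as id = 100 only needs to scan the row groups whose id range includes 100, which is why filters are most effective when the data is sorted or clustered on the filtered column.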

Read Column Statistics

// Read Parquet metadata to get column statistics
val stats: Map[String, ColumnStatistics] = Parquet.readStatistics("data.parquet")
// Map(id -> ColumnStatistics(numNulls = Some(0), uncompressedSize = Some(..), .., minValue = Some(1), maxValue = Some(2)), ... )
Copyright © 2024 wvlet.org