
Parquet-encoder

The encoder consumes a stream of events and encodes them into Parquet records. Records are written to local files in the configured directory. When a file reaches the configured size limit or record-count limit, the encoder closes it and emits the local file to the next operator for further processing.

.processWith(ParquetEncoderConfigBuilder.builder()
    .withDirectory(value)
    .withPrefix(value)
    .withMaxFileSize(builder -> builder
        .withBytes(value)
    )
    .withMaxRecords(value)
    .withCloseOnIdleTimeout(value)
    .withMessageTypeProvider(value)
    .withMessageWriterFactory(value)
)

processor:
  parquet-encoder:
    directory: value
    prefix: value
    maxFileSize:
      bytes: value
    maxRecords: value
    closeOnIdleTimeout: value
    messageTypeProvider: value
    messageWriterFactory: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-parquet-api</artifactId>
    <version>1.7.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-parquet-api', version: '1.7.0'

Properties

directory

Directory where Parquet files are stored.

Type: string

prefix

Prefix for Parquet file names.

Type: string

Default value: data

maxFileSize

Maximum file size, in bytes, before the file is closed and emitted.

Type: object

Default value: 128mb

Fields of maxFileSize:

maxFileSize.bytes

Type: number

maxRecords

Maximum number of records per file before the file is closed and emitted (rolled).

Type: number

Default value: 1000000
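Taken together, maxFileSize and maxRecords act as a "whichever limit is reached first" rule. The following is a minimal Java sketch of that rule, not the plugin's implementation: the RollPolicy class and its simulate helper are hypothetical, and the sketch approximates file size by summing per-record sizes, whereas a real Parquet writer buffers, encodes, and compresses data, so the on-disk size is not a simple sum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical roll policy (not the plugin's API): a file is closed and emitted
// as soon as either the byte limit or the record limit is reached.
final class RollPolicy {
    private final long maxBytes;
    private final long maxRecords;
    private long bytes;
    private long records;

    RollPolicy(long maxBytes, long maxRecords) {
        this.maxBytes = maxBytes;
        this.maxRecords = maxRecords;
    }

    // Accounts for one encoded record; returns true if the current file should roll now.
    boolean append(long encodedSize) {
        bytes += encodedSize;
        records++;
        if (bytes >= maxBytes || records >= maxRecords) {
            bytes = 0;   // counters restart for the next file
            records = 0;
            return true;
        }
        return false;
    }

    // Simulates equally sized records; returns the record count of each emitted file.
    // Records in a file that never reaches a limit are not included (that file stays open).
    static List<Long> simulate(long maxBytes, long maxRecords, long recordSize, int totalRecords) {
        RollPolicy policy = new RollPolicy(maxBytes, maxRecords);
        List<Long> emitted = new ArrayList<>();
        long inCurrentFile = 0;
        for (int i = 0; i < totalRecords; i++) {
            inCurrentFile++;
            if (policy.append(recordSize)) {
                emitted.add(inCurrentFile);
                inCurrentFile = 0;
            }
        }
        return emitted;
    }
}
```

With the limits from the usage example (96 bytes, 100 records) and an assumed 10 bytes per record, `RollPolicy.simulate(96, 100, 10, 50)` emits five files of 10 records each, because the byte limit is reached long before the record limit.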

closeOnIdleTimeout

When the stream is idle, the encoder closes the current Parquet file after the configured timeout. A zero or negative value means files are never closed by a timer.

Type: object

Default value: PT0S
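The default PT0S is an ISO 8601 duration of zero seconds, which `java.time.Duration.parse` accepts. Below is a minimal sketch of the documented idle rule; IdleCloser is a hypothetical class, not part of the plugin, and it only shows how a zero or negative timeout disables timer-based closing and when a positive timeout would fire.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not the plugin's API) mirroring the documented closeOnIdleTimeout semantics.
final class IdleCloser {
    private final Duration timeout;

    IdleCloser(Duration timeout) {
        this.timeout = timeout;
    }

    // Zero or negative timeouts, including the default PT0S, disable timer-based closing.
    boolean enabled() {
        return !timeout.isZero() && !timeout.isNegative();
    }

    // True if the open file has received no records for at least the configured timeout.
    boolean shouldClose(Instant lastRecordAt, Instant now) {
        if (!enabled()) {
            return false;
        }
        return Duration.between(lastRecordAt, now).compareTo(timeout) >= 0;
    }
}
```

With `Duration.ofMinutes(1)`, as in the usage example, a file whose last record arrived 59 seconds ago stays open, while one idle for 60 seconds would be closed.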

messageTypeProvider

Provides the Parquet message type, that is, the schema of the records being written. Required.
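The usage example passes `new ReflectiveMessageTypeProvider(Event.class)`, which suggests the schema can be derived from a Java class; the exact type mapping that provider uses is not documented on this page. The sketch below assumes Java 16+ records and a hypothetical mapping (primitives to required Parquet primitives, `String` to optional UTF-8 binary) and builds the textual form of a Parquet message type. SchemaSketch and its Event record are illustrative names only.

```java
import java.lang.reflect.RecordComponent;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical schema builder (not the plugin's provider): derives a Parquet
// message type, in Parquet's textual schema syntax, from a Java record.
final class SchemaSketch {
    // Assumed mapping: Java primitives cannot be null, so they become required fields.
    private static final Map<Class<?>, String> REQUIRED = Map.of(
        boolean.class, "boolean",
        int.class, "int32",
        long.class, "int64",
        float.class, "float",
        double.class, "double");

    static String messageType(Class<? extends Record> type) {
        StringJoiner fields = new StringJoiner(" ", "message " + type.getSimpleName() + " { ", " }");
        // Record components are returned in declaration order.
        for (RecordComponent c : type.getRecordComponents()) {
            String primitive = REQUIRED.get(c.getType());
            if (primitive != null) {
                fields.add("required " + primitive + " " + c.getName() + ";");
            } else if (c.getType() == String.class) {
                // Strings may be null: optional binary annotated as a UTF-8 string.
                fields.add("optional binary " + c.getName() + " (STRING);");
            } else {
                throw new IllegalArgumentException("Unsupported type: " + c.getType());
            }
        }
        return fields.toString();
    }

    // Illustrative event type; not part of the plugin.
    record Event(long id, String name, double value) {}
}
```

For the illustrative `Event` record, `SchemaSketch.messageType(SchemaSketch.Event.class)` returns `message Event { required int64 id; optional binary name (STRING); required double value; }`.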

Type: object

messageWriterFactory

Factory that creates a Parquet message writer. Required.
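A message writer turns each event into a sequence of field writes that match the message type. The sketch below illustrates that idea under stated assumptions: FieldSink is a hypothetical callback interface loosely modeled on the start/field/end shape of Parquet's `RecordConsumer`, and ReflectiveWriterSketch is a hypothetical stand-in for what a reflective writer such as `ReflectiveMessageWriterFactory` might produce; it is not the plugin's API.

```java
import java.lang.reflect.RecordComponent;
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface, loosely modeled on Parquet's RecordConsumer.
interface FieldSink {
    void startMessage();
    void field(String name, int index, Object value);
    void endMessage();
}

// Hypothetical reflective writer: one message per record, fields in declaration order.
final class ReflectiveWriterSketch {
    static void write(Record record, FieldSink sink) {
        RecordComponent[] components = record.getClass().getRecordComponents();
        sink.startMessage();
        for (int i = 0; i < components.length; i++) {
            Object value;
            try {
                value = components[i].getAccessor().invoke(record);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot read " + components[i].getName(), e);
            }
            // Null values are skipped: Parquet represents a missing optional field by not writing it.
            if (value != null) {
                sink.field(components[i].getName(), i, value);
            }
        }
        sink.endMessage();
    }

    // Sink that records each callback as text, making the call sequence visible.
    static final class TraceSink implements FieldSink {
        final List<String> calls = new ArrayList<>();
        @Override public void startMessage() { calls.add("start"); }
        @Override public void field(String name, int index, Object value) { calls.add(index + ":" + name + "=" + value); }
        @Override public void endMessage() { calls.add("end"); }
    }

    // Illustrative event type; not part of the plugin.
    record Event(long id, String name) {}

    // Returns the callback trace produced for a single record.
    static List<String> trace(Record record) {
        TraceSink sink = new TraceSink();
        write(record, sink);
        return sink.calls;
    }
}
```

Writing `new ReflectiveWriterSketch.Event(7L, null)` produces the trace `start`, `0:id=7`, `end`: the null `name` field is simply omitted, which is why such a field must be optional in the message type.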

Type: object

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

builder
    .consumeFromSource(...)
    .processWith(ParquetEncoderConfigBuilder.builder()
        .withDirectory("/parquet/output/path")
        .withMaxFileSize(MemoryUnit.ofBytes(96))
        .withMaxRecords(100)
        .withCloseOnIdleTimeout(Duration.ofMinutes(1))
        .withPrefix("events")
        .withMessageTypeProvider(new ReflectiveMessageTypeProvider(Event.class))
        .withMessageWriterFactory(new ReflectiveMessageWriterFactory<>(Event.class))
    )
    .processWith(path -> { ... })