
Onnx

Runs ONNX model inference for general machine learning tasks. This processor can either use an existing model reference defined in the resources section or create a new model instance using the provided URI.

Both input and output tensor names can be found using https://netron.app/.

Input

The onnx processor accepts float arrays (tensors) and uses the inputTensorName setting to present them to the model as a named input.

Outputs

The inference result is post-processed to convert some of the ONNX-specific data types (e.g. OnnxSequence) to plain Java types. Note that most models already output data as a map and rarely return an OnnxMap as a single field. The processing result is encapsulated in an InferenceResult object that contains the original input tensor and the output values.

When outputTensorNames is provided, the results are filtered by those names, which improves performance. This is because ONNX stores result data off-heap, and accessing result values comes at a cost.
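
For example, a configuration that keeps only the outputs the pipeline actually reads might look like the following sketch. The model path and tensor names are illustrative, and passing several names in one call is an assumption about the builder's signature:

.processWith(OnnxProcessorConfigBuilder.builder()
    .withModelUri("file:///opt/models/classifier.onnx")   // illustrative path
    .withInputTensorName("float_input")
    // Only these tensors are materialized from the off-heap result;
    // any other model outputs are not copied into the InferenceResult.
    .withOutputTensorNames("output_label", "output_probability"))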

Models

Supports standard ONNX models (.onnx files) for various machine learning tasks including classification, regression, and anomaly detection.

Downloads

The modelUri can point to a location on a local disk (using the file:// scheme) or on remote storage. Remote storage support depends on the available plugins; e.g., the S3 plugin allows downloads from an S3-compatible bucket.
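
For instance, the withModelUri(...) call shown in the configuration template below could take either of the following forms, assuming the corresponding storage plugin is available (the paths and the resource name are illustrative):

    // Local file on disk
    .withModelUri("file:///opt/models/intrusion-detection.onnx")

    // Remote object storage, resolved through an S3 resource named "s3-models-storage"
    // (see the full pipeline example further below)
    .withModelUri("s3-models-storage://models/intrusion-detection.onnx")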

.processWith(OnnxProcessorConfigBuilder.builder()
    .withModelResource(value)
    .withModelUri(value)
    .withInputTensorName(value)
    .withOutputTensorNames(value)
    .withPrintDownloadProgress(value)
    .withCache(builder -> builder
        .withDirectory(value)
        .withMaxCacheSize(value)
        .withExpirationTime(value)
        .withCleanupOnStart(value)
    )
    .withSessionOptions(builder -> builder
        .withIntraOpNumThreads(value)
        .withInterOpNumThreads(value)
        .withGraphOptimizationLevel(value)
        .withExecutionMode(value)
        .withMemoryPatternOptimization(value)
        .withEnableCpuMemArena(value)
        .withProfilerFilePath(value)
        .withEnableCuda(value)
        .withCudaDeviceId(value)
        .withCudaExecutionProviderOptions(value)
        .withEnableCpu(value)
        .withEnableMemoryReuse(value)
        .withLogSeverityLevel(value)
        .withLogId(value)
        .withOnnxSessionConfigurator(value)
    )
)
processor:
  onnx:
    modelResource: value
    modelUri: value
    inputTensorName: value
    outputTensorNames: value
    printDownloadProgress: value
    cache:
      directory: value
      maxCacheSize: value
      expirationTime: value
      cleanupOnStart: value
    sessionOptions:
      intraOpNumThreads: value
      interOpNumThreads: value
      graphOptimizationLevel: value
      executionMode: value
      memoryPatternOptimization: value
      enableCpuMemArena: value
      profilerFilePath: value
      enableCuda: value
      cudaDeviceId: value
      cudaExecutionProviderOptions: value
      enableCpu: value
      enableMemoryReuse: value
      logSeverityLevel: value
      logId: value
      onnxSessionConfigurator: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-onnx-api</artifactId>
    <version>1.6.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-onnx-api', version: '1.6.0'

Properties

modelResource

Reference to an existing ONNX model resource. If specified, modelUri is ignored. Type: object

modelUri

URI to the ONNX model file. Required if modelResource is not specified. Type: string

inputTensorName

Name of the input tensor in the ONNX model. Required.

Type: string

outputTensorNames

Specifies the names of output tensors to retrieve from the ONNX model. When provided, only the tensors matching these names are included in the results. If omitted, all available output tensors are passed to the next operator. Type: array

printDownloadProgress

Whether to display progress information during model file downloads. Type: boolean

Default value: false

cache

This configuration controls how model files are cached locally, including the cache location, size limits, expiration policy, and cleanup behavior. If not provided, files will be cached in the /tmp directory. Type: object

Fields of cache:

cache.directory

Directory where files will be cached. If not specified, a temporary directory will be created. Type: string

cache.maxCacheSize

Maximum size of the cache in bytes. Files will be evicted when the cache exceeds this size. Use 0 for unlimited. Type: number

Default value: 0

cache.expirationTime

Duration after which cached files are considered stale and will not be used by the system. Type: object

cache.cleanupOnStart

Whether to clean up expired or invalid cache entries when the cache is initialized. Type: boolean

Default value: false
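
Putting the cache fields together, a builder-side sketch could look like this. The values are illustrative, and passing the expiration time as a java.time.Duration is an assumption about the accepted type:

.withCache(builder -> builder
    .withDirectory("/var/cache/onnx-models")
    .withMaxCacheSize(1_073_741_824L)           // 1 GiB, in bytes; 0 means unlimited
    .withExpirationTime(Duration.ofHours(24))   // assumed java.time.Duration
    .withCleanupOnStart(true)                   // purge stale entries on initialization
)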

sessionOptions

Configuration options for OrtSession.SessionOptions. Type: object

Fields of sessionOptions:

sessionOptions.intraOpNumThreads

Number of threads used to parallelize the execution within nodes. Type: number

sessionOptions.interOpNumThreads

Number of threads used to parallelize the execution of the graph (across nodes). Type: number

sessionOptions.graphOptimizationLevel

Optimization level enum. Allowed values are:

- NO_OPT - disable all optimizations
- BASIC_OPT - enable basic optimizations
- EXTENDED_OPT - enable basic and extended optimizations
- ALL_OPT - enable all available optimizations, including layout optimizations

Type: object

Supported values: no_opt, basic_opt, extended_opt, all_opt.

Default value: ALL_OPT

sessionOptions.executionMode

Execution mode, controlling whether the nodes of the graph are executed sequentially or in parallel. Type: object

Supported values: sequential, parallel.

Default value: PARALLEL
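
As a sketch, constraining the session's threading and scheduling could look like this (thread counts are illustrative, and passing ONNX Runtime's OrtSession.SessionOptions.ExecutionMode enum is an assumption about the accepted type):

.withSessionOptions(builder -> builder
    .withIntraOpNumThreads(4)                      // threads used inside a single node
    .withInterOpNumThreads(2)                      // threads used across independent nodes
    .withExecutionMode(ExecutionMode.PARALLEL)     // schedule independent nodes concurrently
)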

sessionOptions.memoryPatternOptimization

Enable memory pattern optimization. Type: boolean

Default value: true

sessionOptions.enableCpuMemArena

Enable CPU memory arena. Type: boolean

Default value: true

sessionOptions.profilerFilePath

The file to write profile information to. Enables profiling. Type: string

sessionOptions.enableCuda

Enable CUDA execution provider. Type: boolean

Default value: false

sessionOptions.cudaDeviceId

CUDA device ID to use. Type: number

Default value: 0

sessionOptions.cudaExecutionProviderOptions

CUDA execution provider options as key-value pairs. Type: object
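
A CUDA-enabled configuration might look like the following sketch. The option keys shown are standard ONNX Runtime CUDA execution provider options, and passing them as a Map is an assumption about the builder's signature:

.withSessionOptions(builder -> builder
    .withEnableCuda(true)
    .withCudaDeviceId(0)
    .withCudaExecutionProviderOptions(Map.of(
        "arena_extend_strategy", "kSameAsRequested",   // grow the GPU arena only as needed
        "cudnn_conv_algo_search", "HEURISTIC"))        // cheaper cuDNN algorithm selection
)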

sessionOptions.enableCpu

Enable CPU execution provider. Type: boolean

Default value: true

sessionOptions.enableMemoryReuse

Enable memory reuse. Type: boolean

Default value: true

sessionOptions.logSeverityLevel

Log severity level. 0 = verbose, 1 = info, 2 = warning, 3 = error, 4 = fatal. Type: number

sessionOptions.logId

Log ID. Type: string

sessionOptions.onnxSessionConfigurator

This consumer is optional and complements the existing options exposed by the VoltSp builder. It enables full configuration of the ONNX session, including memory management, execution behavior, and optimization settings. Type: object
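
A sketch of such a configurator, assuming it receives the underlying ai.onnxruntime OrtSession.SessionOptions instance before the session is created (the configuration key shown is illustrative):

.withSessionOptions(builder -> builder
    .withOnnxSessionConfigurator(sessionOptions -> {
        try {
            // Pass a raw ONNX Runtime session configuration entry that the
            // VoltSp builder does not expose directly.
            sessionOptions.addConfigEntry("session.intra_op.allow_spinning", "0");
        } catch (OrtException e) {
            throw new RuntimeException(e);
        }
    })
)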

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

version: 1
name: NetworkIntrusionDetection

resources:
  - name: "s3-models-storage"
    s3:
      credentials:
        accessKey: "..."
        secretKey: "..."

source:
  stdin: {}

processors:
  - onnx:
      modelUri: "s3-models-storage://models/intrusion-detection.onnx"
      inputTensorName: "input"
      outputTensorNames:
        - "numbers_out"
      printDownloadProgress: true
      cache:
        directory: "/tmp/models/"

sink:
  stdout: {}

From the pipeline’s perspective, ONNX requires an input request type that implements org.voltdb.stream.plugin.onnx.api.InferenceRequest. Implementations of InferenceRequest provide access to the underlying tensor values used during model inference.

The example model expects float_input as an input tensor name and, among other outputs, produces output_label as a String[].

Let's examine some pseudocode.

public record Event(long id, float[] numbers) {}

public record EventInferenceRequest(Event event) implements InferenceRequest {

    @Override
    public float[] getInputVector() {
        return event.numbers();
    }
}

public class EventConsumer implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder builder) {
        builder
            .consumeFromSource(...)
            .processWith(event -> new EventInferenceRequest(event))
            .processWith(OnnxProcessorConfigBuilder.<EventInferenceRequest>builder()
                // a modelUri or modelResource is also required; omitted here for brevity
                .withInputTensorName("float_input")
                .withOutputTensorNames("output_label"))
            .processWith(result -> {
                // InferenceResult carries the original request alongside the model outputs
                EventInferenceRequest request = result.request();
                Event event = request.event();

                // output map keyed by tensor name; only the requested tensors are present
                Map<String, Object> output = result.outputTensor();
                String[] ol = (String[]) output.get("output_label");
                String label = ol[0];

                return new ClassifiedEvent(event.id(), label);
            })
            ...
    }
}