Onnx¶
Runs ONNX model inference for general machine learning tasks. This processor can either
use an existing model reference defined in the resources section or
create a new model instance using the provided URI.
The model's input and output tensor names can be inspected using https://netron.app/.
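For illustration, here is a minimal sketch of both options using the Java DSL shown further down this page; the file path is a hypothetical placeholder, and myDeclaredModelResource stands in for however the model resource is obtained from the resources section.

// Option 1: instantiate a new model from a URI (illustrative local path).
OnnxProcessorConfigBuilder.builder()
        .withModelUri("file:///opt/models/classifier.onnx");

// Option 2: reuse an existing model resource (hypothetical reference).
OnnxProcessorConfigBuilder.builder()
        .withModelResource(myDeclaredModelResource);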
Input¶
The onnx processor accepts float arrays (tensors) and uses the inputTensorName
setting to present them to the model as a named input.
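As a minimal sketch (the complete pipeline example at the end of this page shows the full flow), an input record implementing InferenceRequest exposes its float array to the processor; the record name here is hypothetical.

// Hypothetical feature record; getInputVector() supplies the values that the
// processor feeds to the model under the configured inputTensorName.
public record Features(float[] values) implements InferenceRequest {
    @Override
    public float[] getInputVector() {
        return values();
    }
}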
Outputs¶
The inference result is post-processed to convert ONNX-specific data types (e.g. OnnxSequence) to plain Java values.
Note that most models already output data as a map of named tensors and rarely return an OnnxMap as a single field.
The processing result is encapsulated in an InferenceResult object that contains the original input request
and the output values.
When outputTensorNames is provided, the results are filtered by those names, which improves performance:
ONNX Runtime stores result data off-heap, and accessing each result value comes at a cost.
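For example, a downstream processor can read the filtered output map, as in this sketch based on the accessors used in the full example at the end of this page; the tensor name and element type are illustrative.

.processWith(result -> {
    // Output values are keyed by tensor name; when outputTensorNames is set,
    // only the listed tensors are materialized here.
    Map<String, Object> output = result.outputTensor();
    float[] scores = (float[]) output.get("numbers_out"); // illustrative name and type
    return scores[0];
})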
Models¶
Supports standard ONNX models (.onnx files) for various machine learning tasks including
classification, regression, and anomaly detection.
Downloads¶
The modelUri can point to a location on local disk (using the file:// scheme) or to remote storage.
Remote storage support depends on the available plugins; for example, the S3 plugin allows downloads from an S3-compatible bucket.
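For example, a local model and a remote model served through the S3 plugin could be configured as in this sketch; the paths are illustrative and s3-models-storage refers to the resource defined in the usage example below.

// Local model file, no download involved.
OnnxProcessorConfigBuilder.builder()
        .withModelUri("file:///opt/models/intrusion-detection.onnx");

// Remote model fetched through the S3 plugin, reporting download progress.
OnnxProcessorConfigBuilder.builder()
        .withModelUri("s3-models-storage://models/intrusion-detection.onnx")
        .withPrintDownloadProgress(true);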
.processWith(OnnxProcessorConfigBuilder.builder()
.withModelResource(value)
.withModelUri(value)
.withInputTensorName(value)
.withOutputTensorNames(value)
.withPrintDownloadProgress(value)
.withCache(builder -> builder
.withDirectory(value)
.withMaxCacheSize(value)
.withExpirationTime(value)
.withCleanupOnStart(value)
)
.withSessionOptions(builder -> builder
.withIntraOpNumThreads(value)
.withInterOpNumThreads(value)
.withGraphOptimizationLevel(value)
.withExecutionMode(value)
.withMemoryPatternOptimization(value)
.withEnableCpuMemArena(value)
.withProfilerFilePath(value)
.withEnableCuda(value)
.withCudaDeviceId(value)
.withCudaExecutionProviderOptions(value)
.withEnableCpu(value)
.withEnableMemoryReuse(value)
.withLogSeverityLevel(value)
.withLogId(value)
.withOnnxSessionConfigurator(value)
)
)
processor:
  onnx:
    modelResource: value
    modelUri: value
    inputTensorName: value
    outputTensorNames: value
    printDownloadProgress: value
    cache:
      directory: value
      maxCacheSize: value
      expirationTime: value
      cleanupOnStart: value
    sessionOptions:
      intraOpNumThreads: value
      interOpNumThreads: value
      graphOptimizationLevel: value
      executionMode: value
      memoryPatternOptimization: value
      enableCpuMemArena: value
      profilerFilePath: value
      enableCuda: value
      cudaDeviceId: value
      cudaExecutionProviderOptions: value
      enableCpu: value
      enableMemoryReuse: value
      logSeverityLevel: value
      logId: value
      onnxSessionConfigurator: value
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-onnx-api</artifactId>
<version>1.6.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-onnx-api', version: '1.6.0'
Properties¶
modelResource¶
Reference to an existing ONNX model resource. If specified, modelUri is ignored.
Type: object
modelUri¶
URI to the ONNX model file. Required if modelResource is not specified.
Type: string
inputTensorName¶
Name of the input tensor in the ONNX model. Required.
Type: string
outputTensorNames¶
Specifies the names of output tensors to retrieve from the ONNX model.
When provided, only the tensors matching these names are included in the results.
If omitted, all available output tensors are passed to the next operator.
Type: array
printDownloadProgress¶
Whether to display progress information during model file downloads.
Type: boolean
Default value: false
cache¶
This configuration controls how model files are cached locally, including the cache location,
size limits, expiration policy, and cleanup behavior. If not provided, files are cached in the /tmp directory.
A configuration sketch follows the field descriptions below.
Type: object
Fields of cache:
cache.directory¶
Directory where files will be cached. If not specified, a temporary directory will be created.
Type: string
cache.maxCacheSize¶
Maximum size of the cache in bytes. Files will be evicted when the cache exceeds this size. Use 0 for unlimited.
Type: number
Default value: 0
cache.expirationTime¶
Duration after which cached files are considered stale and will not be used by the system.
Type: object
cache.cleanupOnStart¶
Whether to clean up expired or invalid cache entries when the cache is initialized.
Type: boolean
Default value: false
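Putting these fields together, a minimal sketch using the Java DSL; the values are illustrative, and expirationTime is left out because its exact value format is schema-specific.

OnnxProcessorConfigBuilder.builder()
        .withModelUri("s3-models-storage://models/intrusion-detection.onnx")
        .withCache(builder -> builder
                .withDirectory("/tmp/models/")      // keep downloads across restarts
                .withMaxCacheSize(1_073_741_824L)   // evict once the cache exceeds ~1 GiB
                .withCleanupOnStart(true));         // drop expired or invalid entries at startup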
sessionOptions¶
Configuration options for OrtSession.SessionOptions. A tuning sketch follows the field descriptions below.
Type: object
Fields of sessionOptions:
sessionOptions.intraOpNumThreads¶
Number of threads used to parallelize the execution within nodes.
Type: number
sessionOptions.interOpNumThreads¶
Number of threads used to parallelize the execution of the graph (across nodes).
Type: number
sessionOptions.graphOptimizationLevel¶
Optimization level enum. Allowed values are:
- NO_OPT - disable all optimizations
- BASIC_OPT - enable basic optimizations
- EXTENDED_OPT - enable basic and extended optimizations
- ALL_OPT - enable all available optimizations
Type: object
Supported values: no_opt, basic_opt, extended_opt, all_opt.
Default value: ALL_OPT
sessionOptions.executionMode¶
Execution mode. Supported values are: sequential, parallel.
Type: object
Supported values: sequential, parallel.
Default value: PARALLEL
sessionOptions.memoryPatternOptimization¶
Enable memory pattern optimization.
Type: boolean
Default value: true
sessionOptions.enableCpuMemArena¶
Enable CPU memory arena. Default is true.
Type: boolean
Default value: true
sessionOptions.profilerFilePath¶
The file to write profile information to. Enables profiling.
Type: string
sessionOptions.enableCuda¶
Enable CUDA execution provider.
Type: boolean
Default value: false
sessionOptions.cudaDeviceId¶
CUDA device ID to use.
Type: number
Default value: 0
sessionOptions.cudaExecutionProviderOptions¶
CUDA execution provider options as key-value pairs.
Type: object
sessionOptions.enableCpu¶
Enable CPU execution provider. Default is true.
Type: boolean
Default value: true
sessionOptions.enableMemoryReuse¶
Enable memory reuse. Default is true.
Type: boolean
Default value: true
sessionOptions.logSeverityLevel¶
Log severity level. 0 = verbose, 1 = info, 2 = warning, 3 = error, 4 = fatal.
Type: number
sessionOptions.logId¶
Log ID.
Type: string
sessionOptions.onnxSessionConfigurator¶
This consumer is optional and complements the existing options exposed by the VoltSp builder.
It enables full configuration of the ONNX session, including memory management, execution behavior, and optimization settings.
Type: object
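A tuning sketch using the session options exposed by the DSL; the values are illustrative and the right settings depend on the model, the hardware, and whether the CUDA provider is available.

OnnxProcessorConfigBuilder.builder()
        .withInputTensorName("float_input")
        .withSessionOptions(builder -> builder
                .withIntraOpNumThreads(4)     // parallelism within a single node
                .withInterOpNumThreads(1)     // parallelism across nodes in the graph
                .withEnableCuda(true)         // offload to the GPU when CUDA is enabled
                .withCudaDeviceId(0)
                .withLogSeverityLevel(2));    // 2 = warning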
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Usage Examples¶
version: 1
name: NetworkIntrusionDetection

resources:
  - name: "s3-models-storage"
    s3:
      credentials:
        accessKey: "..."
        secretKey: "..."

source:
  stdin: {}

processors:
  - onnx:
      modelUri: "s3-models-storage://models/intrusion-detection.onnx"
      inputTensorName: "input"
      outputTensorNames:
        - "numbers_out"
      printDownloadProgress: true
      cache:
        directory: "/tmp/models/"

sink:
  stdout: {}
From the pipeline’s perspective, ONNX requires an input request type that implements org.voltdb.stream.plugin.onnx.api.InferenceRequest.
Implementations of InferenceRequest provide access to the underlying tensor values used during model inference.
The example model expects float_input as the input tensor name and, among other outputs, returns output_label as a String[].
Let's examine the pseudo code.
import java.util.Map;

import org.voltdb.stream.plugin.onnx.api.InferenceRequest;

// Domain event carrying the raw feature vector.
public record Event(long id, float[] numbers) {}

// Classification result produced after inference.
public record ClassifiedEvent(long id, String label) {}

// Wraps an Event so the ONNX processor can read its input tensor values.
public record EventInferenceRequest(Event event) implements InferenceRequest {
    @Override
    public float[] getInputVector() {
        return event.numbers();
    }
}

public class EventConsumer implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder builder) {
        builder
                .consumeFromSource(...)
                // Adapt each event into an InferenceRequest.
                .processWith(event -> new EventInferenceRequest(event))
                // Run ONNX inference, keeping only the "output_label" tensor.
                .processWith(OnnxProcessorConfigBuilder.<EventInferenceRequest>builder()
                        .withInputTensorName("float_input")
                        .withOutputTensorNames("output_label"))
                // Post-process the InferenceResult into a domain object.
                .processWith(result -> {
                    EventInferenceRequest request = result.request();
                    Event event = request.event();
                    Map<String, Object> output = result.outputTensor();
                    String[] ol = (String[]) output.get("output_label");
                    String label = ol[0];
                    return new ClassifiedEvent(event.id(), label);
                })
                ...
    }
}