Compressing Custom Data Formats
A simple way to enhance generic compressors is to parse the input data before compression. If the data is homogeneous, training can improve results further: after parsing into structured outputs, OpenZL offers a training tool that clusters those outputs to exploit correlations between them and tries multiple compression strategies per cluster to build a good compressor.
Running the example
First, generate some data using parsing_data_generator_correlated.py (found in the openzl/examples folder). This data is in its own custom format. We will use /tmp/openzl as the working directory for this exercise and generate a single file to compress. Training Orchestration will properly set up a test/train split for your data; for now, we will train and test on the single file. Inside /tmp/openzl, create a train directory and an empty file called correlated. The following command will populate the file with data:
Run:
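As a sketch, the setup looks like this (the generator invocation is an assumption for illustration; check the script itself for its actual arguments):

```shell
# Create the working directory and the (initially empty) training file.
mkdir -p /tmp/openzl/train
touch /tmp/openzl/train/correlated
# Populate it with generated data (hypothetical invocation; see the
# script's own usage for the real arguments):
# python3 openzl/examples/parsing_data_generator_correlated.py > /tmp/openzl/train/correlated
```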
Next, run cmake. If you used these cmake steps to build zli, you can run the following command to configure the build.
Then go to the cmakebuild/examples directory and run the training binary:
Now, if you run:
You should get the following output (note that training is non-deterministic, so results may vary):
We can compare this to zstd lv1 to lv10 where we get:
1#correlated : 854156 -> 478813 (x1.784), 288.0 MB/s, 727.3 MB/s
2#correlated : 854156 -> 354016 (x2.413), 253.4 MB/s, 789.9 MB/s
3#correlated : 854156 -> 344204 (x2.482), 168.0 MB/s, 780.2 MB/s
4#correlated : 854156 -> 337962 (x2.527), 134.1 MB/s, 774.5 MB/s
5#correlated : 854156 -> 327736 (x2.606), 86.8 MB/s, 743.6 MB/s
6#correlated : 854156 -> 327251 (x2.610), 69.5 MB/s, 795.9 MB/s
7#correlated : 854156 -> 325568 (x2.624), 67.1 MB/s, 787.6 MB/s
8#correlated : 854156 -> 324584 (x2.632), 57.9 MB/s, 785.7 MB/s
9#correlated : 854156 -> 324416 (x2.633), 54.4 MB/s, 786.7 MB/s
10#correlated : 854156 -> 324350 (x2.633), 50.3 MB/s, 795.1 MB/s
Training
The following diagram illustrates the flow of the data through the system to produce a trained compressor.
graph LR
classDef hidden display: none;
parser[Parser];
trainer[Trainer];
serializer[Compressor Serializer];
input1:::hidden -->|Training Data| parser;
parser --> |Parsed output|trainer;
parser --> |Parsed output|trainer;
parser --> |Parsed output|trainer;
trainer --> |ClusteringConfig| serializer;
serializer --> |Serialized Compressor|output:::hidden
In this three-step process of parsing, clustering, and compressing, the trainer handles the choice of how to cluster and compress. The parser produces structured outputs; the trainer then searches the space of compressors, grouping (clustering) the parsed outputs in different ways and drawing on the provided set of successors, to produce a good compressor.
graph LR
classDef hidden display: none;
parser[Parser];
cluster[Clustering Graph];
Succ1[Successor 1];
Succ2[Successor 2];
input:::hidden ---> |Serialized Input|parser
parser ---> |Parsed Output|cluster
parser ---> |Parsed Output|cluster
parser ---> |Parsed Output|cluster
parser ---> |Parsed Output|cluster
cluster ---> |Clustered outputs|Succ1
cluster ---> |Clustered outputs|Succ1
cluster ---> |Clustered outputs|Succ2
Note
The clustering graph is a standard graph in OpenZL that ingests configuration to perform clustering. The configuration drives the graph's choices of how parsed inputs are clustered and which successors the clustered inputs go to. The trainer works by parameterizing the clustering graph with different configurations and searching for the parameterization that compresses best. The ZL_Clustering_Config
struct is the configuration used for the clustering graph.
Since the search space is very large and testing a compressor's performance requires running compression, this operation is quite slow if run on the entire dataset, which motivates separating the data into training and test sets. As long as the training set has similar compressibility properties to the test set, searching on a small training set is sufficient to find correlations and the right set of successors for each output.
This page presents a toy example, built on a custom data format, of how to write a parser and integrate it with training. The full example source is here. We will highlight important snippets of the code to explain the process.
Parsing
The data format
This section describes the "numeric arrays" format assumed by the example. Conceptually, this is a concatenation of an unspecified number of 1, 2, 4, or 8-byte wide integers. The file consists of some number of array sectors, each with the following format:
- A 4-byte little-endian integer n representing the number of bytes in the data section of this array.
- A 1-byte number w specifying the width of the data in bytes.
- A 4-byte little-endian integer t representing the numeric tag of the following data block. This tag is an identifier for the data source of the block; using the same tag indicates that the data is similar in structure to other blocks with the same tag. We will explain in more detail later why this field is useful.
- A data block containing n bytes, representing an array of n / w integers of width w.
In this exercise, we use the generator script examples/parsing_data_generator_correlated.py
to generate random data that fits this data format. We generate a single file to do basic testing.
Writing a parser
The first step of this process is to write a parser for this data format. Parsing is a component of the compressor and a function graph is the way to implement this component in OpenZL.
The general strategy is:
- Create a ZL_FunctionGraphFn that takes the unparsed input.
- Lex the input into chunks based on semantic meaning.
- Use ZL_Edge_runDispatchNode() to separate the input into constituent outputs based on the lexed chunks.
- Send each constituent output to a successor graph.
Setting up the parser
OpenZL requires that every graph function match the signature of ZL_FunctionGraphFn.
Here, the unparsed input lives in the first input, so we grab pointers to the raw data.
static ZL_Report parsingCompressorGraphFn(
ZL_Graph* graph,
ZL_Edge* inputEdges[],
size_t numInputs) noexcept
{
// Sets up the error context for rich error messages.
ZL_RESULT_DECLARE_SCOPE_REPORT(graph);
assert(numInputs == 1);
const ZL_Input* const input = ZL_Edge_getData(inputEdges[0]);
const uint8_t* const inputData = (const uint8_t*)ZL_Input_ptr(input);
const size_t inputSize = ZL_Input_numElts(input);
Next, we want to split the input apart using dispatch. But before we can do that, we must lex the input so the dispatcher knows what to do. The dispatch is called with the following snippet:
const ZL_DispatchInstructions instructions = {
.segmentSizes = sizes.data(),
.tags = dispatchIdxs.data(),
.nbSegments = sizes.size(),
.nbTags = currentDispatchIdx,
};
ZL_TRY_LET(
ZL_EdgeList,
dispatchEdges,
ZL_Edge_runDispatchNode(inputEdges[0], &instructions));
assert(dispatchEdges.nbEdges == 2 + currentDispatchIdx);
Note
Under the hood, ZL_Edge_runDispatchNode() calls ZL_NODE_DISPATCH, which takes as input a list of segment sizes segmentSizes[] and dispatch indices tags[]. It iteratively copies chunks of size segmentSizes[i] to output tags[i] until the entire segment list has been processed.
Our next goal is to lex the input to populate these segment sizes and tags so the dispatch node splits the input properly.
Lexing the input
For this example, we aim to lex the input so that the same types of data are grouped together. We define a fixed tag each for the number-of-bytes, element-width, and numeric-tag header fields. We also need to track the tags of the data blocks.
We need to track the mapping from tags to dispatch indices, so we create a map for this, plus a counter currentDispatchIdx
tracking the total number of unique tags, each of which corresponds to an output. This ensures all inputs with the same tag are dispatched to the same output. We also store the reverse mapping, which we will later use to pass the necessary metadata to the clustering graph; this is explained in more detail below.
std::unordered_map<uint32_t, uint32_t> tagToDispatchIdx;
// Reverse mapping from dispatchIdxs to tag
std::unordered_map<uint32_t, uint32_t> dispatchIdxToTag;
// Current dispatch index
uint32_t currentDispatchIdx = 0;
constexpr unsigned kNumBytesTag = 100;
constexpr unsigned kEltWidthTag = 101;
constexpr unsigned kInputTag = 102;
dispatchIdxToTag[currentDispatchIdx] = kNumBytesTag;
tagToDispatchIdx[kNumBytesTag] = currentDispatchIdx++;
dispatchIdxToTag[currentDispatchIdx] = kEltWidthTag;
tagToDispatchIdx[kEltWidthTag] = currentDispatchIdx++;
dispatchIdxToTag[currentDispatchIdx] = kInputTag;
tagToDispatchIdx[kInputTag] = currentDispatchIdx++;
// A tag that tracks the current dispatch index
for (size_t inputPos = 0; inputPos < inputSize;) {
// Return an error if there aren't enough bytes for a full 9-byte header.
ZL_ERR_IF_LT(inputSize - inputPos, 9, srcSize_tooSmall);
const uint32_t numBytes = readLE32(inputData + inputPos);
const uint8_t eltWidth = inputData[inputPos + 4];
const uint32_t inputTag = readLE32(inputData + inputPos + 5);
// Increment dispatch index if there is an unseen tag
if (tagToDispatchIdx.count(inputTag) == 0) {
dispatchIdxToTag[currentDispatchIdx] = inputTag;
tagToDispatchIdx[inputTag] = currentDispatchIdx++;
}
eltWidths.push_back(eltWidth);
ZL_ERR_IF_NE(numBytes % eltWidth, 0, corruption);
inputPos += 9;
ZL_ERR_IF_LT(inputSize - inputPos, numBytes, srcSize_tooSmall);
inputPos += numBytes;
dispatchIdxs.push_back(tagToDispatchIdx[kNumBytesTag]);
sizes.push_back(4);
dispatchIdxs.push_back(tagToDispatchIdx[kEltWidthTag]);
sizes.push_back(1);
dispatchIdxs.push_back(tagToDispatchIdx[kInputTag]);
sizes.push_back(4);
dispatchIdxs.push_back(tagToDispatchIdx[inputTag]);
sizes.push_back(numBytes);
}
Setting output metadata
We can set metadata on the output to provide information to future graphs or nodes that will use that output. The clustering graph (implemented in src/openzl/compress/graphs/generic_clustering_graph.c
) is a standard graph in OpenZL that uses metadata of its inputs to figure out how it should be clustered. It is compulsory to tag each output if training is going to be run on the parsed outputs.
Tags exist so the trainer can identify homogeneous inputs. The trainer builds a compressor whose configuration always compresses data with the same tag and type using the same successor. For example, if a dataset contains a column of city names and is split across multiple files, it is the parser's responsibility to tag that column with the same tag in every file. This way, in a future unseen file, the column of city names can be compressed in the same manner. It is best to give separate tags to different 'columns' of the same file, because training makes clustering decisions at tag granularity.
In our case, we have five fixed outputs with different types of information. We give each of these outputs a different metadata tag and store all output ZL_Edge*
pointers in a unified std::vector
for convenience. When it is unknown whether outputs are similar, it is better to give them different metadata tags, since the trainer is able to group the data if it compresses better grouped.
// Send tags and sizes to compress generic
ZL_ERR_IF_ERR(ZL_Edge_setDestination(
dispatchEdges.edges[0], ZL_GRAPH_COMPRESS_GENERIC));
ZL_ERR_IF_ERR(ZL_Edge_setDestination(
dispatchEdges.edges[1], ZL_GRAPH_COMPRESS_GENERIC));
dispatchEdges.edges += 2;
// The outputs with indices [0, 5) are for the tags, sizes and the 3
// metadata fields. Set the metadata according to the mapping
for (size_t i = 0; i < 3; i++) {
ZL_ERR_IF_ERR(ZL_Edge_setIntMetadata(
dispatchEdges.edges[i],
ZL_CLUSTERING_TAG_METADATA_ID,
dispatchIdxToTag[i]));
outputEdges.push_back(dispatchEdges.edges[i]);
}
We then handle remaining numeric array outputs and set metadata for them too.
Type conversions
The dispatch node interprets outputs data as a ZL_Type_Serial
type, therefore to take advantage of the fact the data is inherently LE data of a specified fixed width, we need to run a conversion node on the output. ZL_Node_interpretAsLE
interprets serial input data as LE data with the specified number of bits, returning output data with ZL_Type_Numeric
type.
for (size_t i = 0; i < eltWidths.size(); i++) {
// Creates a node that interprets serial data as little-endian numeric
// and converts to the specified eltWidth output
ZL_NodeID node = ZL_Node_interpretAsLE(eltWidths[i] * 8);
auto dispatchIdx = i + 3;
ZL_TRY_LET_CONST(
ZL_EdgeList,
convertEdges,
ZL_Edge_runNode(dispatchEdges.edges[dispatchIdx], node));
assert(convertEdges.nbEdges == 1);
// The input format specifies that the input tag is written to the 4
// bytes specified. Assigns tag accordingly
ZL_ERR_IF_ERR(ZL_Edge_setIntMetadata(
convertEdges.edges[0],
ZL_CLUSTERING_TAG_METADATA_ID,
dispatchIdxToTag[dispatchIdx]));
outputEdges.push_back(convertEdges.edges[0]);
}
As before, we store the converted output edges in the unified std::vector for convenience.
Sending outputs to a successor
We set up a custom graph as the destination for all the parsed outputs. We check that a custom graph has been provided and send the outputs all to this graph.
// Get the custom graphs
ZL_GraphIDList customGraphs = ZL_Graph_getCustomGraphs(graph);
// Expect there to be one custom graph to handle the outputs produced by the
// parser
ZL_ERR_IF_NE(customGraphs.nbGraphIDs, 1, graphParameter_invalid);
// Expect the number of output edges are equal to the output edges produced
// by dispatch as conversion is single input single output
assert(outputEdges.size() == dispatchEdges.nbEdges - 2);
// Send to custom graph chosen. The clustering graph is the intended
// destination to be chosen here.
ZL_ERR_IF_ERR(ZL_Edge_setParameterizedDestination(
outputEdges.data(),
outputEdges.size(),
customGraphs.graphids[0],
NULL));
If we do not want to use training and just want to compress our output, we can simply choose ZL_GRAPH_COMPRESS_GENERIC
for this graph. This gives the following result:
Currently, ZL_GRAPH_COMPRESS_GENERIC
does not utilize any correlation across outputs and uses a fixed successor for all outputs of a given type. The generic clustering graph is the intended destination in this scenario.
Graph registration
A graph used in a compressor must be registered on the compressor first if it is a non-standard graph.
auto parsingCompressorGraph = compressor.getGraph("Parsing Compressor");
if (!parsingCompressorGraph) {
ZL_Type inputTypeMask = ZL_Type_serial;
ZL_FunctionGraphDesc parsingCompressor = {
.name = "!Parsing Compressor",
.graph_f = openzl::examples::parsingCompressorGraphFn,
.inputTypeMasks = &inputTypeMask,
.nbInputs = 1,
.customGraphs = NULL,
.nbCustomGraphs = 0,
.localParams = {},
};
parsingCompressorGraph =
compressor.registerFunctionGraph(parsingCompressor);
}
For the graph to be serializable, its base graph must be registered without LocalParams
and without customGraphs
. We use the registerFunctionGraph
function to register this graph.
The next step is to parameterize the graph with the required parameters. In our case, the graph requires the parameterized clustering graph to be passed in. Where the previous scenario used ZL_GRAPH_COMPRESS_GENERIC
to compress the parsed outputs, we instead pass in the graph registered via ZL_Clustering_registerGraph
as the clustering graph. We use the parameterizeGraph
function on the Compressor
object to do this parameterization.
std::vector<ZL_GraphID> customGraphs = { clusteringGraph };
openzl::GraphParameters parsingCompressorGraphParams = {
.customGraphs = std::move(customGraphs),
};
parsingCompressorGraph = compressor.parameterizeGraph(
parsingCompressorGraph.value(), parsingCompressorGraphParams);
After creating a generic registration function for the parser, we must create a specialized compressor profile for it. The compressor profile lets you specify successors and, optionally, clustering codecs. Which successors to pick here depends on the parsed data; for example, time-series data works well with compressors that include ZL_NODE_DELTA
. A reasonable set of successors can be found in custom_parsers/shared_components/clustering.cpp
. We have also built a graph builder (documentation under construction) that can be used at this stage of successor selection.
Running training and compression
After handling I/O, create a compressor, register the newly built compressor profile, and set the starting graph of the compressor to this graph. Then we can train directly by calling train
with the appropriate parameters.
openzl::Compressor compressor;
auto graphId = registerGraph_ParsingCompressor(compressor);
openzl::unwrap(
ZL_Compressor_selectStartingGraphID(compressor.get(), graphId),
"Failed to select starting graph ID",
compressor.get());
openzl::training::TrainParams trainParams = {
.compressorGenFunc = createCompressorFromSerialized,
.threads = 1,
.clusteringTrainer = openzl::training::ClusteringTrainer::Greedy,
};
auto multiInputs = openzl::training::inputSetToMultiInputs(*inputs);
auto serialized =
openzl::training::train(multiInputs, compressor, trainParams)[0];
compressorGenFunc
requires a function that creates a compressor from a serialized compressor, registering any dependencies.
static std::unique_ptr<openzl::Compressor> createCompressorFromSerialized(
openzl::poly::string_view serialized)
{
auto compressor = std::make_unique<openzl::Compressor>();
registerGraph_ParsingCompressor(*compressor);
compressor->deserialize(serialized);
return compressor;
}
The train
function returns a serialized compressor which can be saved. In this example, we ignore the serialized compressor and directly test the trained compressor on our data.
openzl::CCtx cctx;
std::string out;
auto testCompressor = createCompressorFromSerialized(*serialized);
// Try compressing every file provided
for (const auto& inputPtr : *inputs) {
cctx.setParameter(openzl::CParam::FormatVersion, ZL_MAX_FORMAT_VERSION);
cctx.refCompressor(*testCompressor);
// Allocate size for output buffer
out.resize(openzl::compressBound(inputPtr->contents().size()));
auto csize = cctx.compressSerial(out, inputPtr->contents());
std::cerr << "Compressed " << inputPtr->contents().size()
<< " bytes to " << csize << std::endl;
}
Training Orchestration
Read here for how to set up some basic training orchestration. So far, we have tested on a single file; that page provides more detail on how to use compressor serialization and set up a test/train split for your data.
Full example source
// Copyright (c) Meta Platforms, Inc. and affiliates.
#include <chrono>
#include <fstream>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>
#include "openzl/codecs/zl_clustering.h"
#include "openzl/codecs/zl_conversion.h"
#include "openzl/cpp/CCtx.hpp"
#include "openzl/cpp/Compressor.hpp"
#include "openzl/cpp/DCtx.hpp"
#include "openzl/zl_compressor.h"
#include "openzl/zl_graph_api.h"
#include "tools/io/InputFile.h"
#include "tools/io/InputSetBuilder.h"
#include "tools/training/train.h"
#include "tools/training/utils/utils.h"
#include "examples/example_utils.h"
#include "openzl/common/logging.h"
#include "openzl/shared/mem.h" // Cheat with a private header for ZL_readLE32
#include "openzl/zl_errors.h"
namespace openzl::examples {
static uint32_t readLE32(const void* ptr)
{
return ZL_readLE32(ptr);
}
static ZL_Report parsingCompressorGraphFn(
ZL_Graph* graph,
ZL_Edge* inputEdges[],
size_t numInputs) noexcept
{
// Sets up the error context for rich error messages.
ZL_RESULT_DECLARE_SCOPE_REPORT(graph);
assert(numInputs == 1);
const ZL_Input* const input = ZL_Edge_getData(inputEdges[0]);
const uint8_t* const inputData = (const uint8_t*)ZL_Input_ptr(input);
const size_t inputSize = ZL_Input_numElts(input);
std::vector<unsigned> dispatchIdxs;
std::vector<size_t> sizes;
// Field used for node conversion
std::vector<uint8_t> eltWidths;
std::unordered_map<uint32_t, uint32_t> tagToDispatchIdx;
// Reverse mapping from dispatchIdxs to tag
std::unordered_map<uint32_t, uint32_t> dispatchIdxToTag;
// Current dispatch index
uint32_t currentDispatchIdx = 0;
constexpr unsigned kNumBytesTag = 100;
constexpr unsigned kEltWidthTag = 101;
constexpr unsigned kInputTag = 102;
dispatchIdxToTag[currentDispatchIdx] = kNumBytesTag;
tagToDispatchIdx[kNumBytesTag] = currentDispatchIdx++;
dispatchIdxToTag[currentDispatchIdx] = kEltWidthTag;
tagToDispatchIdx[kEltWidthTag] = currentDispatchIdx++;
dispatchIdxToTag[currentDispatchIdx] = kInputTag;
tagToDispatchIdx[kInputTag] = currentDispatchIdx++;
// A tag that tracks the current dispatch index
// The format is:
//
// [4-byte little-endian num-bytes]
// [1-byte element-width]
// [4-byte input tag]
// [(num-bytes)-byte data]
// ...
//
// We're going to separate the number of elements and element width fields
// into their own outputs. Then we'll separate the numeric data into streams
// based on the element width.
for (size_t inputPos = 0; inputPos < inputSize;) {
// Return an error if there aren't enough bytes for a full 9-byte header.
ZL_ERR_IF_LT(inputSize - inputPos, 9, srcSize_tooSmall);
const uint32_t numBytes = readLE32(inputData + inputPos);
const uint8_t eltWidth = inputData[inputPos + 4];
const uint32_t inputTag = readLE32(inputData + inputPos + 5);
// Increment dispatch index if there is an unseen tag
if (tagToDispatchIdx.count(inputTag) == 0) {
dispatchIdxToTag[currentDispatchIdx] = inputTag;
tagToDispatchIdx[inputTag] = currentDispatchIdx++;
}
eltWidths.push_back(eltWidth);
ZL_ERR_IF_NE(numBytes % eltWidth, 0, corruption);
inputPos += 9;
ZL_ERR_IF_LT(inputSize - inputPos, numBytes, srcSize_tooSmall);
inputPos += numBytes;
dispatchIdxs.push_back(tagToDispatchIdx[kNumBytesTag]);
sizes.push_back(4);
dispatchIdxs.push_back(tagToDispatchIdx[kEltWidthTag]);
sizes.push_back(1);
dispatchIdxs.push_back(tagToDispatchIdx[kInputTag]);
sizes.push_back(4);
dispatchIdxs.push_back(tagToDispatchIdx[inputTag]);
sizes.push_back(numBytes);
}
// Dispatch each field of the input based on the tag & the size.
// We'll end up with numTags + 2 output streams.
// The first output stream is the tags stream (content of tags vector).
// The second output stream is the sizes stream (content of sizes vector).
// Then there is an output stream for each tag.
const ZL_DispatchInstructions instructions = {
.segmentSizes = sizes.data(),
.tags = dispatchIdxs.data(),
.nbSegments = sizes.size(),
.nbTags = currentDispatchIdx,
};
ZL_TRY_LET(
ZL_EdgeList,
dispatchEdges,
ZL_Edge_runDispatchNode(inputEdges[0], &instructions));
assert(dispatchEdges.nbEdges == 2 + currentDispatchIdx);
// A list to store all output edges without a destination
std::vector<ZL_Edge*> outputEdges;
outputEdges.reserve(dispatchEdges.nbEdges);
// Send tags and sizes to compress generic
ZL_ERR_IF_ERR(ZL_Edge_setDestination(
dispatchEdges.edges[0], ZL_GRAPH_COMPRESS_GENERIC));
ZL_ERR_IF_ERR(ZL_Edge_setDestination(
dispatchEdges.edges[1], ZL_GRAPH_COMPRESS_GENERIC));
dispatchEdges.edges += 2;
// The outputs with indices [0, 5) are for the tags, sizes and the 3
// metadata fields. Set the metadata according to the mapping
for (size_t i = 0; i < 3; i++) {
ZL_ERR_IF_ERR(ZL_Edge_setIntMetadata(
dispatchEdges.edges[i],
ZL_CLUSTERING_TAG_METADATA_ID,
dispatchIdxToTag[i]));
outputEdges.push_back(dispatchEdges.edges[i]);
}
// Convert the serial streams to numeric streams as specified by the format
assert(eltWidths.size() == dispatchEdges.nbEdges - 5);
for (size_t i = 0; i < eltWidths.size(); i++) {
// Creates a node that interprets serial data as little-endian numeric
// and converts to the specified eltWidth output
ZL_NodeID node = ZL_Node_interpretAsLE(eltWidths[i] * 8);
auto dispatchIdx = i + 3;
ZL_TRY_LET_CONST(
ZL_EdgeList,
convertEdges,
ZL_Edge_runNode(dispatchEdges.edges[dispatchIdx], node));
assert(convertEdges.nbEdges == 1);
// The input format specifies that the input tag is written to the 4
// bytes specified. Assigns tag accordingly
ZL_ERR_IF_ERR(ZL_Edge_setIntMetadata(
convertEdges.edges[0],
ZL_CLUSTERING_TAG_METADATA_ID,
dispatchIdxToTag[dispatchIdx]));
outputEdges.push_back(convertEdges.edges[0]);
}
// Get the custom graphs
ZL_GraphIDList customGraphs = ZL_Graph_getCustomGraphs(graph);
// Expect there to be one custom graph to handle the outputs produced by the
// parser
ZL_ERR_IF_NE(customGraphs.nbGraphIDs, 1, graphParameter_invalid);
// Expect the number of output edges are equal to the output edges produced
// by dispatch as conversion is single input single output
assert(outputEdges.size() == dispatchEdges.nbEdges - 2);
// Send to custom graph chosen. The clustering graph is the intended
// destination to be chosen here.
ZL_ERR_IF_ERR(ZL_Edge_setParameterizedDestination(
outputEdges.data(),
outputEdges.size(),
customGraphs.graphids[0],
NULL));
return ZL_returnSuccess();
}
} // namespace openzl::examples
static ZL_GraphID registerParsingCompressorGraph(
openzl::Compressor& compressor,
const ZL_GraphID clusteringGraph)
{
auto parsingCompressorGraph = compressor.getGraph("Parsing Compressor");
if (!parsingCompressorGraph) {
ZL_Type inputTypeMask = ZL_Type_serial;
ZL_FunctionGraphDesc parsingCompressor = {
.name = "!Parsing Compressor",
.graph_f = openzl::examples::parsingCompressorGraphFn,
.inputTypeMasks = &inputTypeMask,
.nbInputs = 1,
.customGraphs = NULL,
.nbCustomGraphs = 0,
.localParams = {},
};
parsingCompressorGraph =
compressor.registerFunctionGraph(parsingCompressor);
}
std::vector<ZL_GraphID> customGraphs = { clusteringGraph };
openzl::GraphParameters parsingCompressorGraphParams = {
.customGraphs = std::move(customGraphs),
};
parsingCompressorGraph = compressor.parameterizeGraph(
parsingCompressorGraph.value(), parsingCompressorGraphParams);
return parsingCompressorGraph.value();
}
static ZL_GraphID registerGraph_ParsingCompressor(
openzl::Compressor& compressor)
{
/* Use an empty default config if we don't know
* what data we are working with. nbClusters and nbTypeDefaults must be 0 if
* pointers are uninitialized. */
ZL_ClusteringConfig defaultConfig{
.nbClusters = 0,
.nbTypeDefaults = 0,
};
/* A set of successors we expect may be useful for our data set. */
std::vector<ZL_GraphID> successors = {
ZL_GRAPH_STORE,
ZL_GRAPH_ZSTD,
ZL_GRAPH_COMPRESS_GENERIC,
ZL_Compressor_registerStaticGraph_fromNode1o(
compressor.get(), ZL_NODE_DELTA_INT, ZL_GRAPH_FIELD_LZ),
};
/* Create the clustering graph */
ZL_GraphID clusteringGraph = ZL_Clustering_registerGraph(
compressor.get(),
&defaultConfig,
successors.data(),
successors.size());
return registerParsingCompressorGraph(compressor, clusteringGraph);
}
static std::unique_ptr<openzl::Compressor> createCompressorFromSerialized(
openzl::poly::string_view serialized)
{
auto compressor = std::make_unique<openzl::Compressor>();
registerGraph_ParsingCompressor(*compressor);
compressor->deserialize(serialized);
return compressor;
}
static void train_example(
const std::string& inputDir,
const std::string& outputPath)
{
openzl::tools::io::InputSetBuilder builder(true);
auto inputs =
openzl::tools::io::InputSetBuilder(true).add_path(inputDir).build();
openzl::Compressor compressor;
auto graphId = registerGraph_ParsingCompressor(compressor);
openzl::unwrap(
ZL_Compressor_selectStartingGraphID(compressor.get(), graphId),
"Failed to select starting graph ID",
compressor.get());
openzl::training::TrainParams trainParams = {
.compressorGenFunc = createCompressorFromSerialized,
.threads = 1,
.clusteringTrainer = openzl::training::ClusteringTrainer::Greedy,
};
auto multiInputs = openzl::training::inputSetToMultiInputs(*inputs);
auto serialized =
openzl::training::train(multiInputs, compressor, trainParams)[0];
openzl::CCtx cctx;
std::string out;
auto testCompressor = createCompressorFromSerialized(*serialized);
// Try compressing every file provided
for (const auto& inputPtr : *inputs) {
cctx.setParameter(openzl::CParam::FormatVersion, ZL_MAX_FORMAT_VERSION);
cctx.refCompressor(*testCompressor);
// Allocate size for output buffer
out.resize(openzl::compressBound(inputPtr->contents().size()));
auto csize = cctx.compressSerial(out, inputPtr->contents());
std::cerr << "Compressed " << inputPtr->contents().size()
<< " bytes to " << csize << std::endl;
}
// Saves a compressor to the designated path
std::ofstream output(outputPath);
if (!output) {
std::cerr << "Error opening file for writing: " << outputPath
<< std::endl;
}
output << *serialized;
output.close();
}
static void test_example(
const std::string& inputDir,
const std::string& compressorPath)
{
openzl::tools::io::InputSetBuilder builder(true);
auto inputs =
openzl::tools::io::InputSetBuilder(true).add_path(inputDir).build();
openzl::tools::io::InputFile compressorFile(compressorPath);
openzl::Compressor compressor;
// Register dependencies
registerGraph_ParsingCompressor(compressor);
// Deserialize the compressor
compressor.deserialize(compressorFile.contents());
// Statistics
size_t totalCompressedSize = 0;
size_t totalUncompressedSize = 0;
size_t cTimeUs = 0;
size_t dTimeUs = 0;
// Benchmark compressions
for (auto& inputPtr : *inputs) {
std::string out;
openzl::CCtx cctx;
cctx.refCompressor(compressor);
cctx.setParameter(openzl::CParam::FormatVersion, ZL_MAX_FORMAT_VERSION);
const auto& contents = inputPtr->contents();
const auto uncompressedSize = contents.size();
out.resize(openzl::compressBound(contents.size()));
auto start = std::chrono::high_resolution_clock::now();
auto csize = cctx.compressSerial(out, contents);
auto end = std::chrono::high_resolution_clock::now();
out.resize(csize);
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end - start);
cTimeUs += duration.count();
// Do a decompression to benchmark decompression and ensure data round
// trips
start = std::chrono::high_resolution_clock::now();
openzl::DCtx dctx;
auto regen = dctx.decompressSerial(out);
end = std::chrono::high_resolution_clock::now();
duration = std::chrono::duration_cast<std::chrono::microseconds>(
end - start);
dTimeUs += duration.count();
if (regen != contents) {
throw std::runtime_error(
"Data mismatch! Compression did not round trip.");
}
totalCompressedSize += csize;
totalUncompressedSize += uncompressedSize;
}
auto cMbps = totalUncompressedSize / (double)cTimeUs;
auto compressionRatio = totalUncompressedSize / (double)totalCompressedSize;
auto dMbps = totalUncompressedSize / (double)dTimeUs;
// Print out the statistics
fprintf(stderr,
"Compressed %zu bytes to %zu bytes(cMbps: %.2f MB/s, dMbps %.2f MB/s, %.2fx)\n",
totalUncompressedSize,
totalCompressedSize,
cMbps,
dMbps,
compressionRatio);
}
int main(int argc, char** argv)
{
// Use debug log level
ZL_g_logLevel = ZL_LOG_LVL_DEBUG;
{
if (argc != 4) {
fprintf(stderr,
"Usage: %s <train|test> <input folder> <output path>\n",
argv[0]);
fprintf(stderr,
"\tTrains data in the format: [32-bit num-bytes][8-bit element-width][32-bit tag][num-bytes data]...\n");
return 1;
}
}
const std::string command = argv[1];
const std::string inputDir = argv[2];
const std::string outputPath = argv[3];
if (command == "train") {
train_example(inputDir, outputPath);
} else if (command == "test") {
test_example(inputDir, outputPath);
}
return 0;
}