Skip to content

Parsing

Installation

From PyPI:

pip install openzl

From source:

git clone https://github.com/facebook/openzl
cd openzl/py
pip install .

Running the example

This example is self-contained and can be run after installing OpenZL and NumPy. The full source can be found in examples/py/parsing.py.

python3 examples/py/parsing.py

Imports

import random
import struct
from typing import List

import numpy as np
import openzl.ext as zl

Generate test data

We'll generate simple data in the following format.

[4-byte number of elements]
[1-byte element width]
[(number-of-elements * element-width) bytes of data]
...

It is a trivial example, but it is enough to show that you can improve compression by parsing the data, especially if the data is numeric.

NUM_SAMPLES = 10


def generate_test_data() -> List[bytes]:
    FMT = {
        1: "B",
        2: "H",
        4: "I",
        8: "Q",
    }

    rng = random.Random(0)
    samples = []
    for _ in range(NUM_SAMPLES):
        sample = []
        for _ in range(100):
            elt_width = rng.choice([1, 2, 4, 8])
            num_elts = rng.randint(1000, 10000)
            max_value = rng.randint(127, 255)

            values = [rng.randint(0, max_value) for _ in range(num_elts)]
            sample.append(
                struct.pack(
                    f"<IB{num_elts}{FMT[elt_width]}", num_elts, elt_width, *values
                )
            )
        samples.append(b"".join(sample))

    return samples


TEST_DATA_SET = generate_test_data()
TEST_DATA_SET_SIZE = sum(len(s) for s in TEST_DATA_SET)

Setting up the parser

OpenZL allows constructing graphs at runtime, similar to the Selector that we saw in the quick start example, but more powerful. This is done through FunctionGraphs. Inside a function graph you can:

  • Inspect the input edge, and the data attached to that edge.
  • Run a node on any edge, and get the resulting edges back.
  • Send an edge to a destination graph.
  • Inspect parameters.
  • Try compression with [tryGraph][openzl.ext.GraphState.tryGraph] and see the compressed size.

At the end of a function graph, every incoming edge, and newly created edge has to be consumed by running a node on it, or sending it to a destination graph.

In this example, we'll be parsing the input and separating data sections by their element width. Then we'll convert the outputs into numeric data, and compress them using OpenZL's generic numeric compression graph.

class ParsingFunctionGraph(zl.FunctionGraph):
    """
    Parses data in the format:
    [4-byte number of elements]
    [1-byte element width]
    [(num_elts * element_width) bytes of data]
    """

    def __init__(self) -> None:
        super().__init__()

    def function_graph_description(self) -> zl.FunctionGraphDescription:
        return zl.FunctionGraphDescription(
            name="parsing_function_graph",
            input_type_masks=[zl.TypeMask.Serial],
        )

    def graph(self, state: zl.GraphState) -> None:
        # Get the input data to the graph
        data = state.edges[0].input.content.as_bytes()

        # We'll split the data into different outputs.
        # Each metadata field gets its own output
        NUM_ELTS_TAG = 0
        ELT_WIDTH_TAG = 1
        # Each integer width gets its own output
        UINT8_TAG = 2
        UINT16_TAG = 3
        UINT32_TAG = 4
        UINT64_TAG = 5
        NUM_TAGS = 6

        WIDTH_TO_TAG = {
            1: UINT8_TAG,
            2: UINT16_TAG,
            4: UINT32_TAG,
            8: UINT64_TAG,
        }

        # Lex the input into tags identifying each segment, and the size of that segment.
        tags = []
        sizes = []

        while len(data) > 0:
            if len(data) < 5:
                raise ValueError("Invalid data: bad header")
            num_elts, elt_width = struct.unpack("<IB", data[:5])
            data = data[5:]
            if elt_width not in {1, 2, 4, 8}:
                raise ValueError(f"Invalid element width: {elt_width}")
            if num_elts * elt_width > len(data):
                raise ValueError(
                    f"Invalid number of elements: {num_elts} * {elt_width} > {len(data)}"
                )

            # Lex the metadata
            tags.append(NUM_ELTS_TAG)
            sizes.append(4)
            tags.append(ELT_WIDTH_TAG)
            sizes.append(1)

            # Lex the data
            tags.append(WIDTH_TO_TAG[elt_width])
            sizes.append(num_elts * elt_width)

            data = data[num_elts * elt_width :]

        # Run the dispatch node. This creates two singleton streams: the tags and sizes.
        # It also creates NUM_TAGS streams, one for each tag following those.
        dispatch = zl.nodes.DispatchSerial(
            segment_tags=tags, segment_sizes=sizes, num_tags=NUM_TAGS
        )
        (
            tags_edge,
            sizes_edge,
            num_elts_edge,
            elt_width_edge,
            uint8_edge,
            uint16_edge,
            uint32_edge,
            uint64_edge,
        ) = dispatch.run(state.edges[0])

        # We've handled the input edge. Now every newly created edge needs to be handled.

        # The tags and sizes edges are already numeric, so we just send them to the numeric graph.
        self._numeric_graph(tags_edge)
        self._numeric_graph(sizes_edge)

        # The dispatched edges are all serial. They need to be converted to numeric first.
        self._conversion_graph(num_elts_edge, width=4)
        self._conversion_graph(elt_width_edge, width=1)
        self._conversion_graph(uint8_edge, width=1)
        self._conversion_graph(uint16_edge, width=2)
        self._conversion_graph(uint32_edge, width=4)
        self._conversion_graph(uint64_edge, width=8)

    def _numeric_graph(self, edge: zl.Edge) -> None:
        # Just send the data to the generic numeric compression graph
        zl.graphs.Compress().set_destination(edge)

    def _conversion_graph(self, edge: zl.Edge, width: int) -> None:
        # Convert the data to numeric in little-endian format with the given width.
        conversion = zl.nodes.ConvertSerialToNumLE(int_size_bytes=width)
        edge = conversion.run(edge)[0]
        # Send the converted edge to the numeric graph
        self._numeric_graph(edge)

Set up the compressor

All you have to do is register the ParsingFunctionGraph with the compressor. This gives you a GraphID that can be used as the starting graph, or as a component in a larger graph.

def build_parsing_compressor() -> zl.Compressor:
    compressor = zl.Compressor()
    graph = compressor.register_function_graph(ParsingFunctionGraph())
    compressor.select_starting_graph(graph)
    return compressor

Putting it all together

This all looks very similar to the quick start example.

def build_zstd_compressor() -> zl.Compressor:
    compressor = zl.Compressor()
    graph = zl.graphs.Zstd()(compressor)
    compressor.select_starting_graph(graph)
    return compressor


def compress(compressor: zl.Compressor, data: bytes) -> bytes:
    cctx = zl.CCtx()

    # Tell the cctx to use the compressor we've built.
    cctx.ref_compressor(compressor)

    # Select the OpenZL format version to encode with.
    # This should be the latest version that the decompressor supports.
    cctx.set_parameter(zl.CParam.FormatVersion, zl.MAX_FORMAT_VERSION)

    # Compress a single input of serial data.
    return cctx.compress([zl.Input(zl.Type.Serial, data)])


def decompress(compressed: bytes) -> bytes:
    dctx = zl.DCtx()

    # Decompress the compressed data.
    # OpenZL compressed data can decompress to multiple outputs of several types.
    # Here, we expect to recieve a single output of serial data.
    outputs = dctx.decompress(compressed)
    if len(outputs) != 1 or outputs[0].type != zl.Type.Serial:
        raise RuntimeError("Only one serial output supported")

    # Convert the OpenZL output to bytes.
    # This costs a copy, but is simple. The output data can also be accessed as a NumPy array with zero copies.
    return outputs[0].content.as_bytes()


def test_compressor(compressor: zl.Compressor, dataset: List[bytes]) -> int:
    compressed_size = 0
    for data in dataset:
        compressed = compress(compressor, data)
        decompressed = decompress(compressed)
        if decompressed != data:
            raise RuntimeError("Corruption!")
        compressed_size += len(compressed)
    return compressed_size


compressor = build_zstd_compressor()
compressed_size = test_compressor(compressor, TEST_DATA_SET)
print(f"Zstd compressor ratio: {TEST_DATA_SET_SIZE / compressed_size:.2f}")

compressor = build_parsing_compressor()
compressed_size = test_compressor(compressor, TEST_DATA_SET)
print(f"Parsing compressor ratio: {TEST_DATA_SET_SIZE / compressed_size:.2f}")

In this trivial example, we get an immediate boost by parsing the input, and handling it as numeric data.

Zstd compressor ratio: 2.98
Parsing compressor ratio: 3.27