Quick Start

Installation

From PyPI:

pip install openzl

From source:

git clone https://github.com/facebook/openzl
cd openzl/py
pip install .

Running the example

This example is self-contained and can be run after installing OpenZL and NumPy. The full source can be found in examples/py/quick_start.py.

python3 examples/py/quick_start.py

Imports

import random
import struct
from typing import List

import numpy as np
import openzl.ext as zl

Generate test data

In this example, we'll work with very simple test data: samples of little-endian 64-bit integers, each of which is either low cardinality or sorted.

NUM_SAMPLES = 100
SAMPLE_SIZE = 100000


def generate_low_cardinality_sample(rng: random.Random) -> bytes:
    alphabet_size = rng.randint(1, 1000)
    alphabet = [rng.randint(0, 2**64 - 1) for _ in range(alphabet_size)]
    # Draw from the seeded rng (not the global random module) so samples are reproducible.
    values = rng.choices(alphabet, k=SAMPLE_SIZE)
    return b"".join(struct.pack("<Q", v) for v in values)


def generate_sorted_sample(rng: random.Random) -> bytes:
    MAX_INCREASE = 10000
    start = rng.randint(0, 2**63 - 1)
    values = [start]
    for _ in range(SAMPLE_SIZE - 1):
        # Use the seeded rng (not the global random module) so samples are reproducible.
        values.append(values[-1] + rng.randint(0, MAX_INCREASE))
    return b"".join(struct.pack("<Q", v) for v in values)


def generate_test_data() -> List[bytes]:
    rng = random.Random(0)
    samples = []
    for _ in range(NUM_SAMPLES):
        if rng.random() < 0.5:
            s = generate_low_cardinality_sample(rng)
        else:
            s = generate_sorted_sample(rng)
        samples.append(s)
    return samples


TEST_DATA_SET = generate_test_data()
TEST_DATA_SET_SIZE = sum(len(s) for s in TEST_DATA_SET)
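
To sanity-check the encoding used by the generators above, a sample can be decoded back into Python integers with the standard library alone. The sketch below is illustrative: the helper names decode_le_u64 and is_sorted are hypothetical and are not part of OpenZL or of quick_start.py. It uses two tiny hand-built samples rather than the full data set.

```python
import struct
from typing import List


def decode_le_u64(sample: bytes) -> List[int]:
    """Decode packed little-endian unsigned 64-bit integers (struct format "<Q")."""
    if len(sample) % 8 != 0:
        raise ValueError("sample length must be a multiple of 8 bytes")
    count = len(sample) // 8
    return list(struct.unpack(f"<{count}Q", sample))


def is_sorted(values: List[int]) -> bool:
    """Return True if the values are in non-decreasing order."""
    return all(a <= b for a, b in zip(values, values[1:]))


# Tiny hand-built samples in the same encoding as the generators above.
sorted_sample = b"".join(struct.pack("<Q", v) for v in [10, 12, 12, 40])
low_card_sample = b"".join(struct.pack("<Q", v) for v in [7, 3, 7, 7, 3])

sorted_values = decode_le_u64(sorted_sample)
low_card_values = decode_le_u64(low_card_sample)

# Report whether each sample is sorted and how many distinct values it holds.
print(is_sorted(sorted_values), len(set(sorted_values)))
print(is_sorted(low_card_values), len(set(low_card_values)))
```

The same two properties, sortedness and the number of distinct values, are what the later sections of this guide exploit.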

Setting up a simple Compressor

The Compressor tells OpenZL how to compress the data it receives. This is how OpenZL is specialized to build a format-specific compressor for a particular use case. For now, we will just tell OpenZL to use the generic compression backend, graphs.Compress. Later on, after we get through the basics, we'll build more complex compressors.

def build_simple_compressor() -> zl.Compressor:
    compressor = zl.Compressor()

    # Set the compression graph to pass the input data to
    # OpenZL's generic compression backend.
    graph = zl.graphs.Compress()(compressor)
    compressor.select_starting_graph(graph)

    return compressor

Bytes compression & decompression

OpenZL's compression interface can accept more than just a single input of bytes, but for now we will keep it simple. The compression function takes the compressor we've built. The decompression function doesn't need it, because the steps needed to decompress the data are written into the compressed representation.

def compress_bytes(compressor: zl.Compressor, data: bytes) -> bytes:
    cctx = zl.CCtx()

    # Tell the cctx to use the compressor we've built.
    cctx.ref_compressor(compressor)

    # Select the OpenZL format version to encode with.
    # This should be the latest version that the decompressor supports.
    cctx.set_parameter(zl.CParam.FormatVersion, zl.MAX_FORMAT_VERSION)

    # Compress a single input of serial data.
    return cctx.compress([zl.Input(zl.Type.Serial, data)])


def decompress_bytes(compressed: bytes) -> bytes:
    dctx = zl.DCtx()

    # Decompress the compressed data.
    # OpenZL compressed data can decompress to multiple outputs of several types.
    # Here, we expect to receive a single output of serial data.
    outputs = dctx.decompress(compressed)
    if len(outputs) != 1 or outputs[0].type != zl.Type.Serial:
        raise RuntimeError("Only one serial output supported")

    # Convert the OpenZL output to bytes.
    # This costs a copy, but is simple. The output data can also be accessed as a NumPy array with zero copies.
    return outputs[0].content.as_bytes()

Putting it all together

We can now build a function that tells us the compressed size when using the simple Compressor that we've built. The same testing function can be reused for all future compressors.

def test_compressor(compressor: zl.Compressor, dataset: List[bytes]) -> int:
    compressed_size = 0
    for data in dataset:
        compressed = compress_bytes(compressor, data)
        decompressed = decompress_bytes(compressed)
        if decompressed != data:
            raise RuntimeError("Corruption!")
        compressed_size += len(compressed)
    return compressed_size


compressor = build_simple_compressor()
compressed_size = test_compressor(compressor, TEST_DATA_SET)
print(f"Simple bytes compressor ratio: {TEST_DATA_SET_SIZE / compressed_size:.2f}")
Simple bytes compressor ratio: 4.14

Building a simple Int64 compressor

So far, we haven't told OpenZL anything about the data we're compressing. All it knows is that it is compressing bytes. However, OpenZL excels when it knows the format of the data it is compressing.

The simplest case is just telling OpenZL that the data is numeric data with a certain width. In this case, we'll compress little-endian int64 data. The only difference from build_simple_compressor() is that we add a node that converts from serial data to numeric data. The Compress graph accepts any input type and handles it appropriately.

def build_simple_int64_compressor() -> zl.Compressor:
    compressor = zl.Compressor()
    graph = zl.nodes.ConvertSerialToNumLE64()(
        compressor, successor=zl.graphs.Compress()
    )
    compressor.select_starting_graph(graph)
    return compressor


compressor = build_simple_int64_compressor()
compressed_size = test_compressor(compressor, TEST_DATA_SET)
print(f"Simple Int64 compressor ratio: {TEST_DATA_SET_SIZE / compressed_size:.2f}")
Simple Int64 compressor ratio: 5.18

Expanding on the Int64 compressor

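Just telling OpenZL that the data is numeric improved the compression ratio from 4.14 to 5.18. However, if you know more about your data, you can do even better. Here we know that each sample is either sorted or low cardinality, so we build a dedicated graph for each case and a selector that inspects the input and picks between them.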

def build_sorted_graph(compressor: zl.Compressor) -> zl.GraphID:
    """
    Run a delta on the input and pass it to the generic compression graph.
    """
    return zl.nodes.DeltaInt()(compressor, zl.graphs.Compress())


def build_low_cardinality_graph(compressor: zl.Compressor) -> zl.GraphID:
    """
    Tokenize the input data, sorting the alphabet in ascending order.
    Send the alphabet to the sorted graph, and send the indices to
    the generic compression graph.
    """
    tokenize = zl.nodes.Tokenize(type=zl.Type.Numeric, sort=True)
    return tokenize(
        compressor,
        alphabet=build_sorted_graph(compressor),
        indices=zl.graphs.Compress(),
    )


class QuickStartSelector(zl.Selector):
    """
    If the data is sorted, pass the input to the sorted_graph.
    Otherwise, pass the input to the low_cardinality_graph.
    """

    def __init__(self, sorted_graph: zl.GraphID, low_cardinality_graph: zl.GraphID):
        super().__init__()
        self._sorted_graph = sorted_graph
        self._low_cardinality_graph = low_cardinality_graph

    def selector_description(self) -> zl.SelectorDescription:
        return zl.SelectorDescription(
            name="quick_start_selector",
            input_type_mask=zl.TypeMask.Numeric,
        )

    def select(self, state: zl.SelectorState, input: zl.Input) -> zl.GraphID:
        data = input.content.as_nparray()
        is_sorted = np.all(data[:-1] <= data[1:])
        if is_sorted:
            return self._sorted_graph
        else:
            return self._low_cardinality_graph


def build_better_int64_compressor() -> zl.Compressor:
    compressor = zl.Compressor()

    # Build the two backend numeric compression graphs.
    sorted_graph = build_sorted_graph(compressor)
    low_cardinality_graph = build_low_cardinality_graph(compressor)

    # Set up the selector graph which inspects the input data and selects the
    # correct backend graph.
    graph = compressor.register_selector_graph(
        QuickStartSelector(sorted_graph, low_cardinality_graph)
    )

    # Add a conversion to int64 in front of the selector.
    graph = zl.nodes.ConvertSerialToNumLE64()(compressor, graph)

    compressor.select_starting_graph(graph)
    return compressor


compressor = build_better_int64_compressor()
compressed_size = test_compressor(compressor, TEST_DATA_SET)
print(f"Better Int64 compressor ratio: {TEST_DATA_SET_SIZE / compressed_size:.2f}")
Better Int64 compressor ratio: 5.26
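
To make the two backend graphs more concrete, the following plain-Python sketch shows what a delta transform and a sorted tokenization do to small inputs. It is a conceptual illustration only, not OpenZL's implementation. The helper names (delta_encode, delta_decode, tokenize_sorted, detokenize) are hypothetical, and Python's unbounded integers sidestep the fixed-width wraparound that a real 64-bit delta has to handle.

```python
from typing import List, Tuple


def delta_encode(values: List[int]) -> List[int]:
    """Keep the first value, then store each value minus its predecessor."""
    if not values:
        return []
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]


def delta_decode(deltas: List[int]) -> List[int]:
    """Invert delta_encode with a running sum."""
    values = []
    total = 0
    for d in deltas:
        total += d
        values.append(total)
    return values


def tokenize_sorted(values: List[int]) -> Tuple[List[int], List[int]]:
    """Split values into a sorted alphabet of distinct values plus per-element indices."""
    alphabet = sorted(set(values))
    index_of = {v: i for i, v in enumerate(alphabet)}
    indices = [index_of[v] for v in values]
    return alphabet, indices


def detokenize(alphabet: List[int], indices: List[int]) -> List[int]:
    """Invert tokenize_sorted by looking each index up in the alphabet."""
    return [alphabet[i] for i in indices]


sorted_input = [1000, 1003, 1003, 1010]
deltas = delta_encode(sorted_input)
print(deltas)  # [1000, 3, 0, 7]

low_card_input = [900, 5, 900, 900, 5]
alphabet, indices = tokenize_sorted(low_card_input)
print(alphabet, indices)  # [5, 900] [1, 0, 1, 1, 0]
```

For sorted input, the deltas after the first element are small non-negative numbers, which are easier to compress than the original large values. For low-cardinality input, the indices are small, and because the alphabet comes out sorted it can itself go through the delta path, which mirrors how build_low_cardinality_graph sends the alphabet to the sorted graph.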

Compressing native-endian numeric data

OpenZL can also compress native-endian numeric data from a NumPy array, a PyTorch tensor, a dlpack tensor, or any object that implements the Buffer Protocol. The compressor you build is exactly the same; it just starts with numeric data instead of serial data.

This example shows that OpenZL correctly interprets the NumPy array as uint32 values, since it is able to compress the data with the constant graph.

def compress_numpy_array(compressor: zl.Compressor, data: np.ndarray) -> bytes:
    """
    Compresses a single 1-D array of numeric data.
    """
    cctx = zl.CCtx()
    cctx.ref_compressor(compressor)
    cctx.set_parameter(zl.CParam.FormatVersion, zl.MAX_FORMAT_VERSION)

    # Compress a single input of numeric data.
    return cctx.compress([zl.Input(zl.Type.Numeric, data)])


def decompress_numpy_array(compressed: bytes) -> np.ndarray:
    dctx = zl.DCtx()
    outputs = dctx.decompress(compressed)
    if len(outputs) != 1 or outputs[0].type != zl.Type.Numeric:
        raise RuntimeError("Only one numeric output supported")

    # This is a zero-copy operation because it uses the buffer protocol
    return outputs[0].content.as_nparray()


def build_constant_compressor() -> zl.Compressor:
    compressor = zl.Compressor()
    graph = zl.graphs.Constant()(compressor)
    compressor.select_starting_graph(graph)
    return compressor


data = np.array([42] * 1000, dtype=np.uint32)
compressed = compress_numpy_array(build_constant_compressor(), data)
decompressed = decompress_numpy_array(compressed)
if not np.array_equal(data, decompressed):
    raise RuntimeError("Corruption")
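
As background on the Buffer Protocol mentioned above, the sketch below uses only the standard library to show what metadata a buffer exposes (element format and width) and that a memoryview shares memory with its source instead of copying it. How OpenZL inspects buffers internally is not described in this guide, so treat this as an illustration of the protocol itself rather than of OpenZL's behavior.

```python
import array

# array.array implements the buffer protocol. Type code "I" is a native
# unsigned int; its width is platform-dependent (commonly 4 bytes).
values = array.array("I", [42] * 1000)
view = memoryview(values)

# The view exposes the element format, element width, and total size in bytes.
print(view.format, view.itemsize, view.nbytes)

# The view shares memory with the array: writing through it changes the array.
view[0] = 7
print(values[0])  # 7

# A bytes object is also a buffer, with one-byte elements.
serial_view = memoryview(b"\x2a\x00\x00\x00")
print(serial_view.format, serial_view.itemsize)  # B 1
```

A consumer of the protocol can therefore tell a buffer of multi-byte numeric elements apart from a plain byte string without the caller describing the element width separately.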