Skip to content

Pipeline

The Pipeline class is the core builder for image and array processing pipelines.

Overview

from polars_cv import Pipeline

pipe = (
    Pipeline()
    .source("image_bytes")
    .resize(height=224, width=224)
    .grayscale()
)

API Reference

Pipeline

Modular pipeline builder for image and array operations.

A pipeline defines a sequence of operations that can be applied to a Polars expression using the .cv.pipe() accessor. The pipeline is executed when .sink() is called on the resulting expression.

All operations accept either literal values or Polars expressions. Expressions are resolved at execution time per row.

Example
>>> from polars_cv import Pipeline
>>> import polars as pl
>>>
>>> # Define a reusable pipeline (without a sink)
>>> preprocess = (
...     Pipeline()
...     .source("image_bytes")
...     .resize(height=224, width=224)
...     .grayscale()
... )
>>>
>>> # Apply to a DataFrame and choose the output format at the sink
>>> df = pl.DataFrame({"image": [img_bytes]})
>>> result = df.with_columns(
...     processed=pl.col("image").cv.pipe(preprocess).sink("numpy")
... )

Pipelines support typed domain tracking for transitions between images, geometry, and numeric results: - buffer: Image/array data (default) - contour: Polygon geometry - scalar: Single numeric values - vector: Multiple numeric values (e.g., bounding boxes)

__init__

__init__() -> None

Initialize an empty pipeline.

current_domain

current_domain() -> str

Get the current data domain of the pipeline.

Returns:

Type Description
str

Current domain: "buffer", "contour", "scalar", or "vector".

output_dtype

output_dtype() -> str

Get the expected output dtype of the pipeline.

This is the dtype of the buffer after all operations have been applied. Used for static type inference in list/array sinks. May be "auto" if the dtype has not yet been determined (e.g. an image source with no dtype-fixing operation applied).

Returns:

Type Description
str

Output dtype string: "u8", "f32", "f64", "auto", etc.

source

source(
    format: str = "image_bytes",
    *,
    dtype: str | None = None,
    width: IntOrExpr | None = None,
    height: IntOrExpr | None = None,
    shape: "LazyPipelineExpr | None" = None,
    fill_value: int = 255,
    background: int = 0,
    cloud_options: "CloudOptions | dict[str, Any] | None" = None,
    require_contiguous: bool = False,
    on_error: str = "raise",
) -> "Pipeline"

Define the input source format.

Image sources ("image_bytes" and "file_path") auto-detect the format and preserve native dtype. PNG/JPEG decode to u8, 16-bit PNG to u16, and TIFF may produce u8, u16, f32, or f64. All decoded images are always 3D [H, W, C].

Because the dtype is not known until runtime, it starts as "auto" in the contract system. Operations with deterministic output dtypes (e.g. normalize -> f32, threshold -> u8, cast) resolve it. If you sink to "list" or "array", the dtype must be known at planning time — either via an explicit dtype here, a cast() in the pipeline, or an operation that fixes the output dtype.

Parameters:

Name Type Description Default
format str

How to interpret input data. - "image_bytes": Decode PNG/JPEG/TIFF (auto-detect format and dtype; always 3D [H, W, C]) - "blob": VIEW protocol binary (self-describing) - "raw": Raw bytes (requires dtype) - "list": Polars nested List column - "array": Polars fixed-size Array column - "file_path": Read from path (local, s3://, gs://, az://, http://); decodes like "image_bytes" - "contour": Rasterize contour struct to binary mask

'image_bytes'
dtype str | None

For "raw": required data type of the raw bytes. For "image_bytes" / "file_path": asserts the expected dtype — at runtime, images with a different dtype are cast to this type (no-op if already matching). For "list" / "array": override for the inferred column element type.

None
width IntOrExpr | None

Output mask width for "contour" format.

None
height IntOrExpr | None

Output mask height for "contour" format.

None
shape 'LazyPipelineExpr | None'

Infer dimensions from another pipeline for "contour" format.

None
fill_value int

Value for pixels inside contour (default 255).

255
background int

Value for pixels outside contour (default 0).

0
cloud_options 'CloudOptions | dict[str, Any] | None'

Credentials for cloud storage (S3, GCS, Azure).

None
require_contiguous bool

For "list"/"array", whether to require rectangular data.

False
on_error str

Error handling strategy for source decoding. - "raise" (default): propagate decode errors (fails the entire batch). - "null": treat decode errors as null output for that row, allowing the rest of the batch to succeed.

'raise'
Example
>>> # Decode PNG/JPEG bytes from a column
>>> pipe = Pipeline().source("image_bytes").resize(height=224, width=224)
>>>
>>> # Read from file paths or URLs
>>> df = pl.DataFrame({"url": ["https://example.com/image.png"]})
>>> pipe = Pipeline().source("file_path").grayscale()
>>> expr = pl.col("url").cv.pipe(pipe).sink("numpy")
>>>
>>> # Assert dtype for list sink (cast if needed at runtime)
>>> pipe = Pipeline().source("image_bytes", dtype="f32").resize(height=224, width=224)
>>> expr = pl.col("img").cv.pipe(pipe).sink("list")
>>>
>>> # Gracefully handle corrupt images as null
>>> pipe = Pipeline().source("image_bytes", on_error="null").resize(height=224, width=224)
>>> expr = pl.col("img").cv.pipe(pipe).sink("png")

assert_shape

assert_shape(
    *,
    height: IntOrExpr | None = None,
    width: IntOrExpr | None = None,
    channels: IntOrExpr | None = None,
    batch: IntOrExpr | None = None,
) -> "Pipeline"

Provide shape hints for the pipeline.

Expressions are resolved per-row at execution time. Literal values help the planner optimize.

Parameters:

Name Type Description Default
height IntOrExpr | None

Image height (literal or expression).

None
width IntOrExpr | None

Image width (literal or expression).

None
channels IntOrExpr | None

Number of channels (literal or expression).

None
batch IntOrExpr | None

Batch size (literal or expression).

None

Returns:

Type Description
'Pipeline'

Self for chaining.

transpose

transpose(axes: list[int]) -> 'Pipeline'

Transpose dimensions.

Parameters:

Name Type Description Default
axes list[int]

New order of axes.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

reshape

reshape(shape: list[int | Expr]) -> 'Pipeline'

Reshape array to new dimensions.

Parameters:

Name Type Description Default
shape list[int | Expr]

New shape (list of ints or expressions).

required

Returns:

Type Description
'Pipeline'

Self for chaining.

flip

flip(axes: list[int]) -> 'Pipeline'

Flip along specified axes.

Parameters:

Name Type Description Default
axes list[int]

Axes to flip.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

flip_h

flip_h() -> 'Pipeline'

Flip horizontally (along width axis).

Returns:

Type Description
'Pipeline'

Self for chaining.

flip_v

flip_v() -> 'Pipeline'

Flip vertically (along height axis).

Returns:

Type Description
'Pipeline'

Self for chaining.

crop

crop(
    *,
    top: IntOrExpr = 0,
    left: IntOrExpr = 0,
    height: IntOrExpr | None = None,
    width: IntOrExpr | None = None,
) -> "Pipeline"

Extract a rectangular region.

Parameters:

Name Type Description Default
top IntOrExpr

Top offset.

0
left IntOrExpr

Left offset.

0
height IntOrExpr | None

Crop height (None = to end).

None
width IntOrExpr | None

Crop width (None = to end).

None

cast

cast(dtype: str) -> 'Pipeline'

Cast to a different data type.

Parameters:

Name Type Description Default
dtype str

Target data type (e.g., "f32", "u8").

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If dtype is invalid or domain is not buffer.

scale

scale(
    factor: FloatOrExpr, out_dtype: str | None = None
) -> "Pipeline"

Multiply all values by a factor.

Parameters:

Name Type Description Default
factor FloatOrExpr

Scale factor.

required
out_dtype str | None

Output type (promotes to f32 if None and input is int).

None

Raises:

Type Description
ValueError

If domain is not buffer.

normalize

normalize(
    method: str = "minmax",
    mean: list[float] | None = None,
    std: list[float] | None = None,
    out_dtype: str | None = None,
) -> "Pipeline"

Normalize values to a standard range.

Parameters:

Name Type Description Default
method str

Normalization method. One of: - "minmax": Scale values to [0, 1] range using per-element min/max. Output dtype is f32 by default. - "zscore": Standardize to mean=0, std=1 using per-element statistics. Output dtype is f32 by default. - "preset": Apply ImageNet-style channel-wise normalization using provided mean and std values. Each channel is normalized as (x - mean[c]) / std[c].

'minmax'
mean list[float] | None

Per-channel mean values. Required when method="preset". Common preset: [0.485, 0.456, 0.406] (ImageNet).

None
std list[float] | None

Per-channel standard deviation values. Required when method="preset". Common preset: [0.229, 0.224, 0.225] (ImageNet).

None
out_dtype str | None

Output type (default "f32").

None

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If method is invalid or preset is missing mean/std.

Example

Pipeline().source().normalize(method="minmax") Pipeline().source().normalize( ... method="preset", ... mean=[0.485, 0.456, 0.406], ... std=[0.229, 0.224, 0.225], ... )

clamp

clamp(
    min_val: FloatOrExpr,
    max_val: FloatOrExpr,
    out_dtype: str | None = None,
) -> "Pipeline"

Clamp values to a range.

This operation accepts any numeric input dtype and automatically handles type promotion. Integers are promoted to float32; floats are preserved.

Parameters:

Name Type Description Default
min_val FloatOrExpr

Minimum value (literal or expression).

required
max_val FloatOrExpr

Maximum value (literal or expression).

required
out_dtype str | None

Output dtype. Options: - None: Promote integers to f32, preserve floats - "f32": Output float32 - "f64": Output float64 - "preserve": Keep input dtype (floats preserved, integers -> f32)

None

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If domain is not buffer.

relu

relu() -> 'Pipeline'

Apply ReLU activation (max(0, x)).

All negative values are set to zero, positive values are unchanged. Works on any numeric dtype.

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If domain is not buffer.

Example
>>> pipe = Pipeline().source("image_bytes").relu()

channel_select

channel_select(*, index: IntOrExpr) -> 'Pipeline'

Extract a single channel from a multi-channel image.

Produces a 2D [H, W] buffer from a [H, W, C] input.

Domain: buffer → buffer

Parameters:

Name Type Description Default
index IntOrExpr

Channel index to extract (0-based). Accepts a Polars expression for per-row dynamic selection.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> pipe = Pipeline().source("image_bytes").channel_select(index=0)  # Red channel

channel_swap

channel_swap(*, order: list[int]) -> 'Pipeline'

Reorder channels in a multi-channel image.

Domain: buffer → buffer

Parameters:

Name Type Description Default
order list[int]

New channel ordering, e.g. [2, 1, 0] for RGB-to-BGR.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> pipe = Pipeline().source("image_bytes").channel_swap(order=[2, 1, 0])

adjust_contrast

adjust_contrast(*, factor: FloatOrExpr) -> 'Pipeline'

Adjust image contrast.

Scales pixel deviation from the mean: (pixel - mean) * factor + mean.

Domain: buffer → buffer

Parameters:

Name Type Description Default
factor FloatOrExpr

Contrast factor. 1.0 = no change, >1 = more contrast, <1 = less.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> pipe = Pipeline().source("image_bytes").adjust_contrast(factor=1.5)

adjust_gamma

adjust_gamma(*, gamma: FloatOrExpr) -> 'Pipeline'

Apply gamma (power-law) correction.

Normalizes to [0,1], applies pixel^gamma, then denormalizes.

Domain: buffer → buffer

Parameters:

Name Type Description Default
gamma FloatOrExpr

Gamma value. <1 = brighter, >1 = darker, 1.0 = no change.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> pipe = Pipeline().source("image_bytes").adjust_gamma(gamma=0.5)

adjust_brightness

adjust_brightness(*, factor: FloatOrExpr) -> 'Pipeline'

Adjust image brightness by scaling pixel values.

Convenience method equivalent to .scale(factor).clamp(min_val=0, max_val=255).

Domain: buffer → buffer

Parameters:

Name Type Description Default
factor FloatOrExpr

Brightness factor. 1.0 = no change, >1 = brighter, <1 = darker.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> pipe = Pipeline().source("image_bytes").adjust_brightness(factor=1.2)

invert

invert() -> 'Pipeline'

Invert pixel values.

For u8: 255 - pixel. For float [0,1]: 1.0 - pixel.

Domain: buffer → buffer

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> pipe = Pipeline().source("image_bytes").invert()

cvt_color

cvt_color(from_space: str, to_space: str) -> 'Pipeline'

Convert between color spaces.

Domain: buffer → buffer

Parameters:

Name Type Description Default
from_space str

Source color space (rgb, bgr, hsv, lab, ycbcr, gray).

required
to_space str

Target color space (rgb, bgr, hsv, lab, ycbcr, gray).

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> pipe = Pipeline().source("image_bytes").cvt_color("rgb", "hsv")

to_hsv

to_hsv() -> 'Pipeline'

Convert from RGB to HSV color space.

Returns:

Type Description
'Pipeline'

Self for chaining.

to_lab

to_lab() -> 'Pipeline'

Convert from RGB to CIE LAB color space.

Output dtype is promoted to f32 (L=[0,100], a/b~[-128,127]).

Returns:

Type Description
'Pipeline'

Self for chaining.

to_bgr

to_bgr() -> 'Pipeline'

Convert from RGB to BGR channel order.

Returns:

Type Description
'Pipeline'

Self for chaining.

to_ycbcr

to_ycbcr() -> 'Pipeline'

Convert from RGB to YCbCr color space.

Returns:

Type Description
'Pipeline'

Self for chaining.

convolve2d

convolve2d(
    kernel: list[float],
    ksize: IntOrExpr,
    *,
    normalize: bool = False,
    border: str = "replicate",
) -> "Pipeline"

Apply generic 2D convolution with an arbitrary kernel.

Domain: buffer → buffer

Parameters:

Name Type Description Default
kernel list[float]

Flattened kernel values (row-major, ksize × ksize).

required
ksize IntOrExpr

Kernel dimension (must be odd; kernel is ksize × ksize). Accepts a Polars expression for per-row dynamic values.

required
normalize bool

If True, divide output by the sum of absolute kernel values.

False
border str

Border handling mode ("replicate", "zero", "reflect").

'replicate'

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> edge = Pipeline().source("image_bytes").convolve2d(
...     kernel=[-1, -1, -1, -1, 8, -1, -1, -1, -1],
...     ksize=3,
... )

sobel

sobel(*, axis: str = 'x', ksize: int = 3) -> 'Pipeline'

Sobel gradient operator.

Convenience method that delegates to :meth:convolve2d with standard Sobel kernels.

Domain: buffer → buffer

Parameters:

Name Type Description Default
axis str

Gradient direction — "x" (horizontal) or "y" (vertical).

'x'
ksize int

Kernel size (currently only 3 is supported).

3

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> gx = Pipeline().source("image_bytes").grayscale().sobel(axis="x")

laplacian

laplacian(*, ksize: int = 3) -> 'Pipeline'

Laplacian second-derivative operator.

Convenience method that delegates to :meth:convolve2d with a standard Laplacian kernel.

Domain: buffer → buffer

Parameters:

Name Type Description Default
ksize int

Kernel size (currently only 3 is supported).

3

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> lap = Pipeline().source("image_bytes").grayscale().laplacian()

sharpen

sharpen(*, strength: float = 1.0) -> 'Pipeline'

Sharpen using an unsharp-mask-style kernel.

The kernel sum is 1 (brightness-preserving) with strength controlling how aggressively edges are enhanced. strength=0 produces the identity; higher values increase edge emphasis.

Domain: buffer → buffer

Parameters:

Name Type Description Default
strength float

Sharpening strength (default 1.0).

1.0

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> sharp = Pipeline().source("image_bytes").sharpen(strength=1.5)

canny

canny(
    *,
    low_threshold: FloatOrExpr = 50.0,
    high_threshold: FloatOrExpr = 150.0,
) -> "Pipeline"

Canny edge detection.

Applies Gaussian blur, computes Sobel gradients, performs non-maximum suppression, and applies double-threshold hysteresis. Output is a U8 binary edge map (0 or 255).

Domain: buffer → buffer

Parameters:

Name Type Description Default
low_threshold FloatOrExpr

Lower hysteresis threshold.

50.0
high_threshold FloatOrExpr

Upper hysteresis threshold.

150.0

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> edges = Pipeline().source("image_bytes").canny(low_threshold=50, high_threshold=150)

erode

erode(
    *, ksize: IntOrExpr = 3, iterations: IntOrExpr = 1
) -> "Pipeline"

Morphological erosion (local minimum filter).

Shrinks bright regions / grows dark regions by computing the minimum value in a ksize × ksize rectangular neighborhood. Requires single-channel input (e.g., after .grayscale() or .threshold()).

Domain: buffer → buffer

Parameters:

Name Type Description Default
ksize IntOrExpr

Size of the square structuring element. Must be odd and >= 1. Accepts a Polars expression for per-row dynamic values.

3
iterations IntOrExpr

Number of times the erosion is applied. Accepts a Polars expression for per-row dynamic values.

1

Returns:

Type Description
'Pipeline'

New Pipeline with erosion applied.

Example
>>> mask = Pipeline().source("image_bytes").grayscale().threshold(128).erode(ksize=3)

dilate

dilate(
    *, ksize: IntOrExpr = 3, iterations: IntOrExpr = 1
) -> "Pipeline"

Morphological dilation (local maximum filter).

Grows bright regions / shrinks dark regions by computing the maximum value in a ksize × ksize rectangular neighborhood. Requires single-channel input (e.g., after .grayscale() or .threshold()).

Domain: buffer → buffer

Parameters:

Name Type Description Default
ksize IntOrExpr

Size of the square structuring element. Must be odd and >= 1. Accepts a Polars expression for per-row dynamic values.

3
iterations IntOrExpr

Number of times the dilation is applied. Accepts a Polars expression for per-row dynamic values.

1

Returns:

Type Description
'Pipeline'

New Pipeline with dilation applied.

Example
>>> mask = Pipeline().source("image_bytes").grayscale().threshold(128).dilate(ksize=3)

morphology_open

morphology_open(*, ksize: IntOrExpr = 3) -> 'Pipeline'

Morphological opening (erode then dilate).

Removes small bright spots while preserving larger structures. Equivalent to .erode(ksize=ksize).dilate(ksize=ksize).

Domain: buffer → buffer

Parameters:

Name Type Description Default
ksize IntOrExpr

Size of the square structuring element. Must be odd and >= 1. Accepts a Polars expression for per-row dynamic values.

3

Returns:

Type Description
'Pipeline'

New Pipeline with opening applied.

Example
>>> cleaned = Pipeline().source("image_bytes").grayscale().threshold(128).morphology_open(ksize=3)

morphology_close

morphology_close(*, ksize: IntOrExpr = 3) -> 'Pipeline'

Morphological closing (dilate then erode).

Fills small dark holes while preserving larger structures. Equivalent to .dilate(ksize=ksize).erode(ksize=ksize).

Domain: buffer → buffer

Parameters:

Name Type Description Default
ksize IntOrExpr

Size of the square structuring element. Must be odd and >= 1. Accepts a Polars expression for per-row dynamic values.

3

Returns:

Type Description
'Pipeline'

New Pipeline with closing applied.

Example
>>> filled = Pipeline().source("image_bytes").grayscale().threshold(128).morphology_close(ksize=3)

morphology_gradient

morphology_gradient(*, ksize: IntOrExpr = 3) -> 'Pipeline'

Morphological gradient (dilate - erode).

Produces an edge outline by computing the difference between dilation and erosion on the same input. Requires single-channel input.

Domain: buffer → buffer

Parameters:

Name Type Description Default
ksize IntOrExpr

Size of the square structuring element. Must be odd and >= 1. Accepts a Polars expression for per-row dynamic values.

3

Returns:

Type Description
'Pipeline'

New Pipeline with morphological gradient applied.

Example
>>> edges = Pipeline().source("image_bytes").grayscale().threshold(128).morphology_gradient(ksize=3)

equalize_histogram

equalize_histogram() -> 'Pipeline'

Apply histogram equalization for contrast enhancement.

Computes the cumulative histogram and maps each pixel through the normalized CDF. Operates per-channel on multi-channel images. Output is U8.

Domain: buffer → buffer

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> eq = Pipeline().source("image_bytes").grayscale().equalize_histogram()

resize

resize(
    *,
    height: IntOrExpr,
    width: IntOrExpr,
    filter: str = "lanczos3",
) -> "Pipeline"

Resize image to specified dimensions.

Parameters:

Name Type Description Default
height IntOrExpr

Target height.

required
width IntOrExpr

Target width.

required
filter str

Interpolation: "nearest", "bilinear", "lanczos3" (default).

'lanczos3'
Example

Pipeline().source("image_bytes").resize(height=224, width=224)

resize_scale

resize_scale(
    *,
    scale: FloatOrExpr | None = None,
    scale_x: FloatOrExpr | None = None,
    scale_y: FloatOrExpr | None = None,
    filter: str = "lanczos3",
) -> "Pipeline"

Resize image by scale factor.

Target dimensions are computed at runtime as: - new_width = input_width * scale_x - new_height = input_height * scale_y

Domain: buffer → buffer

Parameters:

Name Type Description Default
scale FloatOrExpr | None

Uniform scale factor (applies to both x and y).

None
scale_x FloatOrExpr | None

X (width) scale factor. If None, uses scale.

None
scale_y FloatOrExpr | None

Y (height) scale factor. If None, uses scale.

None
filter str

Resize filter ("nearest", "bilinear", "lanczos3").

'lanczos3'

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If neither scale nor scale_x/scale_y specified.

ValueError

If filter is invalid or current domain is not buffer.

Example
>>> # Uniform 50% downscale
>>> pipe = Pipeline().source("image_bytes").resize_scale(scale=0.5)
>>>
>>> # Non-uniform: half width, double height
>>> pipe = Pipeline().source("image_bytes").resize_scale(scale_x=0.5, scale_y=2.0)
>>>
>>> # Dynamic scale from column
>>> pipe = Pipeline().source("image_bytes").resize_scale(scale=pl.col("zoom"))

resize_to_height

resize_to_height(
    height: IntOrExpr, *, filter: str = "lanczos3"
) -> "Pipeline"

Resize image to target height, preserving aspect ratio.

Width is computed at runtime as: new_width = height * (input_width / input_height)

Domain: buffer → buffer

Parameters:

Name Type Description Default
height IntOrExpr

Target height (literal or expression).

required
filter str

Resize filter ("nearest", "bilinear", "lanczos3").

'lanczos3'

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If filter is invalid or current domain is not buffer.

Example
>>> pipe = Pipeline().source("image_bytes").resize_to_height(224)

resize_to_width

resize_to_width(
    width: IntOrExpr, *, filter: str = "lanczos3"
) -> "Pipeline"

Resize image to target width, preserving aspect ratio.

Height is computed at runtime as: new_height = width * (input_height / input_width)

Domain: buffer → buffer

Parameters:

Name Type Description Default
width IntOrExpr

Target width (literal or expression).

required
filter str

Resize filter ("nearest", "bilinear", "lanczos3").

'lanczos3'

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If filter is invalid or current domain is not buffer.

Example
>>> pipe = Pipeline().source("image_bytes").resize_to_width(224)

resize_max

resize_max(
    max_size: IntOrExpr, *, filter: str = "lanczos3"
) -> "Pipeline"

Resize image so the maximum dimension equals target, preserving aspect ratio.

If input is 200x100 and max_size=50, output is 50x25 (width was max, now 50).

Domain: buffer → buffer

Parameters:

Name Type Description Default
max_size IntOrExpr

Target for the maximum dimension (literal or expression).

required
filter str

Resize filter ("nearest", "bilinear", "lanczos3").

'lanczos3'

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If filter is invalid or current domain is not buffer.

Example
>>> # Ensure no dimension exceeds 224
>>> pipe = Pipeline().source("image_bytes").resize_max(224)

resize_min

resize_min(
    min_size: IntOrExpr, *, filter: str = "lanczos3"
) -> "Pipeline"

Resize image so the minimum dimension equals target, preserving aspect ratio.

If input is 200x100 and min_size=50, output is 100x50 (height was min, now 50).

Domain: buffer → buffer

Parameters:

Name Type Description Default
min_size IntOrExpr

Target for the minimum dimension (literal or expression).

required
filter str

Resize filter ("nearest", "bilinear", "lanczos3").

'lanczos3'

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If filter is invalid or current domain is not buffer.

Example
>>> # Ensure min dimension is at least 224
>>> pipe = Pipeline().source("image_bytes").resize_min(224)

pad

pad(
    *,
    top: IntOrExpr = 0,
    bottom: IntOrExpr = 0,
    left: IntOrExpr = 0,
    right: IntOrExpr = 0,
    value: FloatOrExpr = 0.0,
    mode: str = "constant",
) -> "Pipeline"

Add padding to the image.

Domain: buffer → buffer

Parameters:

Name Type Description Default
top IntOrExpr

Padding on top edge.

0
bottom IntOrExpr

Padding on bottom edge.

0
left IntOrExpr

Padding on left edge.

0
right IntOrExpr

Padding on right edge.

0
value FloatOrExpr

Fill value for "constant" mode (default 0). Accepts a Polars expression for per-row dynamic values.

0.0
mode str

Padding mode - "constant", "edge", "reflect", "symmetric".

'constant'

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If mode is invalid or current domain is not buffer.

Example
>>> pipe = Pipeline().source("image_bytes").pad(top=10, bottom=10)
>>> pipe = Pipeline().source("image_bytes").pad(left=20, right=20, value=128)

pad_to_size

pad_to_size(
    *,
    height: IntOrExpr,
    width: IntOrExpr,
    position: str = "center",
    value: FloatOrExpr = 0.0,
) -> "Pipeline"

Pad image to exact target size.

Dimensions are computed at runtime. If image is larger than target, it will NOT be cropped - use resize first if needed.

Domain: buffer → buffer

Parameters:

Name Type Description Default
height IntOrExpr

Target height.

required
width IntOrExpr

Target width.

required
position str

Where to place original content: - "center": Center content in padded area (default) - "top-left": Place at top-left corner - "bottom-right": Place at bottom-right corner

'center'
value FloatOrExpr

Fill value for padding (default 0). Accepts a Polars expression for per-row dynamic values.

0.0

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If position is invalid or current domain is not buffer.

Example
>>> # Pad 50x100 image to 100x200, centered
>>> pipe = Pipeline().source("image_bytes").pad_to_size(height=100, width=200)

letterbox

letterbox(
    *,
    height: IntOrExpr,
    width: IntOrExpr,
    value: FloatOrExpr = 0.0,
) -> "Pipeline"

Resize image maintaining aspect ratio and pad to exact target size.

This is a composed operation that: 1. Resizes the image so it fits within the target dimensions 2. Pads to reach exact target size with centered positioning

Domain: buffer → buffer

Parameters:

Name Type Description Default
height IntOrExpr

Target height.

required
width IntOrExpr

Target width.

required
value FloatOrExpr

Fill value for padding (default 0, typically black). Accepts a Polars expression for per-row dynamic values.

0.0

Returns:

Type Description
'Pipeline'

Self for chaining.

Example
>>> # Letterbox any image to 224x224 for VLM input
>>> pipe = Pipeline().source("image_bytes").letterbox(height=224, width=224)

grayscale

grayscale() -> 'Pipeline'

Convert to grayscale.

Uses standard luminance formula: 0.299R + 0.587G + 0.114B.

threshold

threshold(value: 'IntOrExpr | FloatOrExpr') -> 'Pipeline'

Apply binary threshold.

Each element is compared against the threshold; the output is a U8 binary mask (255 if element > value, 0 otherwise).

The threshold value range depends on the input dtype: - For u8 input: typically 0-255. - For float input (e.g., normalized [0, 1]): use a float value like 0.5.

Parameters:

Name Type Description Default
value 'IntOrExpr | FloatOrExpr'

Threshold value (int or float, or Polars expression).

required

blur

blur(sigma: FloatOrExpr) -> 'Pipeline'

Apply Gaussian blur.

Parameters:

Name Type Description Default
sigma FloatOrExpr

Standard deviation for Gaussian kernel.

required

rotate

rotate(
    angle: FloatOrExpr,
    *,
    expand: bool = False,
    interpolation: str = "bilinear",
    border_value: float = 0.0,
) -> "Pipeline"

Rotate image by specified angle.

For angles of 90, 180, or 270 degrees, this uses zero-copy view operations (interpolation and border_value are ignored). For arbitrary angles, the rotation is performed via an affine transformation using the specified interpolation and border value.

This is a convenience wrapper around the affine transform family. For more control (e.g., combined rotation + scale, or explicit output sizing), use :meth:rotate_and_scale or :meth:warp_affine.

Domain: buffer -> buffer

Parameters:

Name Type Description Default
angle FloatOrExpr

Rotation angle in degrees (positive = clockwise). Can be a literal float or Polars expression.

required
expand bool

If True, expand output dimensions to fit rotated image. If False (default), keep original dimensions (corners may be cropped).

False
interpolation str

Interpolation method for arbitrary angles -- "bilinear" (default) or "nearest". Ignored for 90/180/270 degree rotations.

'bilinear'
border_value float

Fill value for out-of-bounds pixels (default 0). Ignored for 90/180/270 degree rotations.

0.0

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not buffer.

Example
>>> # Zero-copy 90-degree rotation
>>> pipe = Pipeline().source("image_bytes").rotate(90)
>>>
>>> # Arbitrary angle with expansion
>>> pipe = Pipeline().source("image_bytes").rotate(45, expand=True)
>>>
>>> # Dynamic angle from column
>>> pipe = Pipeline().source("image_bytes").rotate(pl.col("angle"))
>>>
>>> # Nearest-neighbor interpolation for pixel-art
>>> pipe = Pipeline().source("image_bytes").rotate(30, interpolation="nearest")

warp_affine

warp_affine(
    matrix: list[float],
    output_size: tuple[IntOrExpr, IntOrExpr],
    *,
    interpolation: str = "bilinear",
    border_value: float = 0.0,
) -> "Pipeline"

Apply a 2x3 affine transformation matrix.

The matrix [a, b, tx, c, d, ty] is a forward mapping from source to destination (same convention as OpenCV warpAffine)::

x_dst = a * x_src + b * y_src + tx
y_dst = c * x_src + d * y_src + ty

The kernel inverts this matrix internally for interpolation.

Domain: buffer → buffer

Parameters:

Name Type Description Default
matrix list[float]

Six-element list representing the 2x3 affine matrix [a, b, tx, c, d, ty] (forward mapping).

required
output_size tuple[IntOrExpr, IntOrExpr]

(height, width) of the output image. Each element accepts a Polars expression for per-row dynamic values.

required
interpolation str

Interpolation method -- "bilinear" (default) or "nearest".

'bilinear'
border_value float

Pixel value for out-of-bounds regions (default 0).

0.0

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If matrix does not have 6 elements or domain is wrong.

Example
>>> # Translate image by (50, 30)
>>> pipe = Pipeline().source("image_bytes").warp_affine(
...     matrix=[1.0, 0.0, 50.0, 0.0, 1.0, 30.0],
...     output_size=(224, 224),
... )

shear

shear(
    *,
    sx: float = 0.0,
    sy: float = 0.0,
    output_size: tuple[int, int] | None = None,
) -> "Pipeline"

Apply a shear transformation.

Convenience wrapper that builds a shear matrix and delegates to :meth:warp_affine.

Domain: buffer → buffer

Parameters:

Name Type Description Default
sx float

Horizontal shear factor.

0.0
sy float

Vertical shear factor.

0.0
output_size tuple[int, int] | None

(height, width) of the output. Required (auto-sizing not yet implemented).

None

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If output_size is not provided.

Example
>>> pipe = Pipeline().source("image_bytes").shear(sx=0.2, output_size=(100, 100))

rotate_and_scale

rotate_and_scale(
    *,
    angle: float,
    scale: float = 1.0,
    center: tuple[float, float] | None = None,
    output_size: tuple[int, int] | None = None,
) -> "Pipeline"

Combined rotation and scaling around a center point.

Convenience wrapper that builds a rotation+scale matrix and delegates to :meth:warp_affine.

Domain: buffer → buffer

Parameters:

Name Type Description Default
angle float

Rotation angle in degrees (positive = clockwise).

required
scale float

Scale factor (default 1.0).

1.0
center tuple[float, float] | None

(cx, cy) center of rotation. Required (auto-compute not yet implemented).

None
output_size tuple[int, int] | None

(height, width) of the output. Required (auto-sizing not yet implemented).

None

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If center or output_size is not provided.

Example
>>> pipe = Pipeline().source("image_bytes").rotate_and_scale(
...     angle=45.0, scale=1.2, center=(112, 112), output_size=(224, 224)
... )

perceptual_hash

perceptual_hash(
    algorithm: HashAlgorithm
    | str = HashAlgorithm.PERCEPTUAL,
    hash_size: int = 64,
) -> "Pipeline"

Compute a perceptual hash fingerprint.

Parameters:

Name Type Description Default
algorithm HashAlgorithm | str

"perceptual" (pHash), "average" (aHash), "difference" (dHash).

PERCEPTUAL
hash_size int

Number of bits in the hash (must be power of 2).

64
Example

Pipeline().source("image_bytes").perceptual_hash()

rasterize

rasterize(
    *,
    width: IntOrExpr | None = None,
    height: IntOrExpr | None = None,
    shape: "LazyPipelineExpr | None" = None,
    fill_value: IntOrExpr = 255,
    background: IntOrExpr = 0,
    anti_alias: bool = False,
) -> "Pipeline"

Rasterize contour to a binary mask.

Parameters:

Name Type Description Default
width IntOrExpr | None

Mask width.

None
height IntOrExpr | None

Mask height.

None
shape 'LazyPipelineExpr | None'

Match dimensions from another pipeline.

None
fill_value IntOrExpr

Inside value (default 255). Accepts a Polars expression for per-row dynamic values.

255
background IntOrExpr

Outside value (default 0). Accepts a Polars expression for per-row dynamic values.

0

Domain transition: contour → buffer

extract_contours

extract_contours(
    *,
    mode: str = "external",
    method: str = "simple",
    min_area: float | None = None,
) -> "Pipeline"

Extract contours from binary mask.

Parameters:

Name Type Description Default
mode str

"external" (outer only), "tree" (full hierarchy), "all".

'external'
method str

"simple" (remove redundant), "none" (all points), "approx".

'simple'
min_area float | None

Filter small contours.

None

Domain transition: buffer → contour

reduce_sum

reduce_sum() -> 'Pipeline'

Sum all elements in the buffer.

Domain transition: buffer → scalar

reduce_percentile

reduce_percentile(q: FloatOrExpr) -> 'Pipeline'

Compute the q-th percentile of all values.

Uses linear interpolation matching numpy.percentile default behavior.

Parameters:

Name Type Description Default
q FloatOrExpr

Percentile to compute, in [0, 100]. Accepts a Polars expression for per-row dynamic values.

required

Domain transition: buffer -> scalar

reduce_popcount

reduce_popcount() -> 'Pipeline'

Count set bits (1s) in the buffer.

Domain transition: buffer → scalar

reduce_max

reduce_max(axis: int | None = None) -> 'Pipeline'

Reduce buffer by computing the maximum value.

When axis is None, computes the global maximum across all elements, returning a single scalar. When axis is specified, reduces along that axis, returning a buffer with one fewer dimension.

Domain transition
  • axis=None: buffer → scalar
  • axis=N: buffer → buffer (reduced shape)

Parameters:

Name Type Description Default
axis int | None

Axis to reduce along. None for global reduction.

None

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not buffer.

Example
>>> # Global maximum
>>> pipe = Pipeline().source("image_bytes").grayscale().reduce_max()
>>> df.with_columns(max_val=pl.col("image").cv.pipe(pipe).sink("native"))
>>>
>>> # Maximum along height axis (returns 1D array per column)
>>> pipe = Pipeline().source("image_bytes").reduce_max(axis=0)

reduce_min

reduce_min(axis: int | None = None) -> 'Pipeline'

Reduce buffer by computing the minimum value.

When axis is None, computes the global minimum across all elements, returning a single scalar. When axis is specified, reduces along that axis, returning a buffer with one fewer dimension.

Domain transition
  • axis=None: buffer → scalar
  • axis=N: buffer → buffer (reduced shape)

Parameters:

Name Type Description Default
axis int | None

Axis to reduce along. None for global reduction.

None

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not buffer.

Example
>>> # Global minimum
>>> pipe = Pipeline().source("image_bytes").grayscale().reduce_min()
>>> df.with_columns(min_val=pl.col("image").cv.pipe(pipe).sink("native"))
>>>
>>> # Minimum along width axis
>>> pipe = Pipeline().source("image_bytes").reduce_min(axis=1)

reduce_mean

reduce_mean(axis: int | None = None) -> 'Pipeline'

Compute arithmetic mean.

Parameters:

Name Type Description Default
axis int | None

Axis to reduce along. If None, computes global mean.

None
Domain transition
  • axis=None: buffer → scalar
  • axis=N: buffer → buffer (reduced shape)

reduce_std

reduce_std(
    axis: int | None = None, ddof: IntOrExpr = 0
) -> "Pipeline"

Reduce buffer by computing the standard deviation.

When axis is None, computes the global standard deviation across all elements, returning a single scalar. When axis is specified, reduces along that axis, returning a buffer with one fewer dimension.

Domain transition
  • axis=None: buffer -> scalar
  • axis=N: buffer -> buffer (reduced shape)

Parameters:

Name Type Description Default
axis int | None

Axis to reduce along. None for global reduction.

None
ddof IntOrExpr

Delta degrees of freedom. 0 for population std (default), 1 for sample std. Accepts a Polars expression for per-row dynamic values.

0

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not buffer.

Example
>>> # Global standard deviation
>>> pipe = Pipeline().source("image_bytes").grayscale().reduce_std()
>>> df.with_columns(std=pl.col("image").cv.pipe(pipe).sink("native"))
>>>
>>> # Sample std (ddof=1)
>>> pipe = Pipeline().source("image_bytes").reduce_std(ddof=1)

reduce_argmax

reduce_argmax(axis: int) -> 'Pipeline'

Reduce buffer by finding the index of the maximum value along an axis.

Unlike other reductions, argmax always requires an axis since the global argmax would be ambiguous for multi-dimensional arrays.

Domain transition: buffer → buffer (reduced shape, i64 dtype)

Parameters:

Name Type Description Default
axis int

Axis along which to find the maximum index.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not buffer.

Example
>>> # Find column with max value per row
>>> pipe = Pipeline().source("image_bytes").grayscale().reduce_argmax(axis=1)
>>> df.with_columns(max_col=pl.col("image").cv.pipe(pipe).sink("list"))

reduce_argmin

reduce_argmin(axis: int) -> 'Pipeline'

Reduce buffer by finding the index of the minimum value along an axis.

Unlike other reductions, argmin always requires an axis since the global argmin would be ambiguous for multi-dimensional arrays.

Domain transition: buffer → buffer (reduced shape, i64 dtype)

Parameters:

Name Type Description Default
axis int

Axis along which to find the minimum index.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not buffer.

Example
>>> # Find column with min value per row
>>> pipe = Pipeline().source("image_bytes").grayscale().reduce_argmin(axis=1)
>>> df.with_columns(min_col=pl.col("image").cv.pipe(pipe).sink("list"))

extract_shape

extract_shape() -> 'Pipeline'

Extract buffer shape as a struct {height, width, channels}.

Domain transition: buffer → vector

label_reduce

label_reduce(
    *,
    contours: Expr,
    reduction: str = "max",
    region_mode: str = "interior",
) -> "Pipeline"

Score contour regions against the current buffer values.

This is the buffer-space variant of label reduction. It accepts contours via a Polars expression and returns one score per contour.

Domain transition: buffer -> vector

Parameters:

Name Type Description Default
contours Expr

Contour-set expression (List[Contour]) to score.

required
reduction str

Reduction over contour region values ("max", "mean", "sum").

'max'
region_mode str

Region selection mode. "interior" — only pixels strictly inside the contour polygon. "boundary" — interior pixels plus pixels on the contour boundary (avoids zero-score artifacts for sub-pixel contours). "bbox" — all pixels within the bounding box.

'interior'

Returns:

Type Description
'Pipeline'

New pipeline with label reduction appended.

Raises:

Type Description
ValueError

If current domain is not buffer or args are invalid.

TypeError

If contours is not a Polars expression.

histogram

histogram(
    bins: IntOrExpr | list[float] = 256,
    range: tuple[float, float] | None = None,
    closed: str = "left",
    output: str = "buckets",
) -> "Pipeline"

Compute pixel value histogram.

Parameters:

Name Type Description Default
bins IntOrExpr | list[float]

Number of bins (default 256), a Polars expression for per-row dynamic bin count, or an explicit list of bin edges.

256
range tuple[float, float] | None

(min, max) tuple. Auto-detected if None.

None
closed str

"left" or "right" interval inclusiveness (default "left").

'left'
output str

"buckets" (list of structs), "counts" (bin counts), "normalized" (sum to 1.0), "quantized" (pixel indices), "edges" (bin edges).

'buckets'
Example

Pipeline().source("image_bytes").grayscale().histogram(bins=8)

area

area(*, signed: bool = False) -> 'Pipeline'

Compute the area of the contour using the Shoelace formula.

Domain transition: contour → scalar

Parameters:

Name Type Description Default
signed bool

If True, return signed area (negative for CW winding).

False

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not contour.

perimeter

perimeter() -> 'Pipeline'

Compute the perimeter (arc length) of the contour.

Domain transition: contour → scalar

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not contour.

centroid

centroid() -> 'Pipeline'

Compute the centroid (center of mass) of the contour.

Domain transition: contour → vector (returns [x, y])

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not contour.

bounding_box

bounding_box() -> 'Pipeline'

Compute the axis-aligned bounding box of the contour.

Domain transition: contour → vector (returns [x, y, width, height])

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not contour.

translate

translate(
    *, dx: FloatOrExpr, dy: FloatOrExpr
) -> "Pipeline"

Translate the contour by an offset.

Domain: contour → contour

Parameters:

Name Type Description Default
dx FloatOrExpr

X offset (horizontal translation).

required
dy FloatOrExpr

Y offset (vertical translation).

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not contour.

scale_contour

scale_contour(
    *, sx: FloatOrExpr, sy: FloatOrExpr
) -> "Pipeline"

Scale the contour relative to its centroid.

Domain: contour → contour

Parameters:

Name Type Description Default
sx FloatOrExpr

X scale factor.

required
sy FloatOrExpr

Y scale factor.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not contour.

simplify

simplify(*, tolerance: FloatOrExpr) -> 'Pipeline'

Simplify the contour using Douglas-Peucker algorithm.

Domain: contour → contour

Parameters:

Name Type Description Default
tolerance FloatOrExpr

Maximum distance from original contour.

required

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not contour.

convex_hull

convex_hull() -> 'Pipeline'

Compute the convex hull of the contour.

Domain: contour → contour

Returns:

Type Description
'Pipeline'

Self for chaining.

Raises:

Type Description
ValueError

If current domain is not contour.

validate

validate() -> None

Validate that the pipeline is well-formed.

Raises:

Type Description
ValueError

If pipeline is invalid.

has_source

has_source() -> bool

Check if the pipeline has a source defined.

Returns:

Type Description
bool

True if the pipeline has a source defined.

to_graph

to_graph(column: Expr | None = None) -> 'PipelineGraph'

Convert this linear pipeline to a graph representation.

This is the unified execution path - all pipelines are converted to graphs before execution. A Pipeline becomes a single node in the graph.

For multi-output with intermediate checkpoints, use LazyPipelineExpr composition with .pipe() and .alias() instead.

Parameters:

Name Type Description Default
column Expr | None

The input column expression. If None, must be set later via graph.set_root_column().

None

Returns:

Type Description
'PipelineGraph'

PipelineGraph representation of this pipeline.

Example
>>> pipe = Pipeline().source("image_bytes").resize(height=100, width=200)
>>> graph = pipe.to_graph(pl.col("image"))
>>> expr = graph.to_expr()

__repr__

__repr__() -> str

Return string representation of pipeline.