Skip to content

Required Zarr Filters

This file implements all the filters in my Zarr.jl PR, so that we can register Kerchunk and users can use it without Zarr being an issue.

julia
import Zarr: Filter, zencode, zdecode, JSON, getfilter, filterdict, typestr

Delta compression

julia
"""
    DeltaFilter(; DecodingType, [EncodingType = DecodingType])

Delta-based compression for Zarr arrays.  (Delta encoding is Julia `diff`, decoding is Julia `cumsum`).
"""
struct DeltaFilter{T, TENC} <: Filter{T, TENC}
end

function DeltaFilter(; DecodingType = Float16, EncodingType = DecodingType)
    return DeltaFilter{DecodingType, EncodingType}()
end

DeltaFilter{T}() where T = DeltaFilter{T, T}()

function zencode(data::AbstractArray, filter::DeltaFilter{DecodingType, EncodingType}) where {DecodingType, EncodingType}
    arr = reinterpret(DecodingType, vec(data))

    enc = similar(arr, EncodingType)

perform the delta operation

julia
    enc[begin] = arr[begin]
    enc[begin+1:end] .= diff(arr)
    return enc
end

function zdecode(data::AbstractArray, filter::DeltaFilter{DecodingType, EncodingType}) where {DecodingType, EncodingType}
    encoded = reinterpret(EncodingType, vec(data))
    decoded = DecodingType.(cumsum(encoded))
    return decoded
end

function JSON.lower(filter::DeltaFilter{T, Tenc}) where {T, Tenc}
    return Dict("id" => "delta", "dtype" => typestr(T), "atype" => typestr(Tenc))
end

function getfilter(::Type{<: DeltaFilter}, d)
    return DeltaFilter{typestr(d["dtype"], haskey(d, "atype") ? typestr(d["atype"]) : d["dtype"])}()
end


"""
    FixedScaleOffsetFilter{T,TENC}(scale, offset)

A compressor that scales and offsets the data.

!!! note
    The geographic CF standards define scale/offset decoding as `x * scale + offset`,
    but this filter defines it as `x / scale + offset`.  Constructing a `FixedScaleOffsetFilter`
    from CF data means `FixedScaleOffsetFilter(1/cf_scale_factor, cf_add_offset)`.
"""
struct FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc} <: Filter{T, Tenc}
    scale::ScaleOffsetType
    offset::ScaleOffsetType
end

FixedScaleOffsetFilter{T}(scale::ScaleOffsetType, offset::ScaleOffsetType) where {T, ScaleOffsetType} = FixedScaleOffsetFilter{T, ScaleOffsetType}(scale, offset)
FixedScaleOffsetFilter(scale::ScaleOffsetType, offset::ScaleOffsetType) where {ScaleOffsetType} = FixedScaleOffsetFilter{ScaleOffsetType, ScaleOffsetType}(scale, offset)

function FixedScaleOffsetFilter(; scale::ScaleOffsetType, offset::ScaleOffsetType, T, Tenc = T) where ScaleOffsetType
    return FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc}(scale, offset)
end

function zencode(a::AbstractArray, c::FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc}) where {T, Tenc, ScaleOffsetType}
    return @. convert(Tenc, # convert to the encoding type after applying the scale and offset
        round((a - c.offset) * c.scale) # apply scale and offset, and round to nearest integer
    )
end

function zdecode(a::AbstractArray, c::FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc}) where {T, Tenc, ScaleOffsetType}
    return _reinterpret(Base.nonmissingtype(T), @. a / c.scale + c.offset)
end


function getfilter(::Type{<: FixedScaleOffsetFilter}, d::Dict)
    scale = d["scale"]
    offset = d["offset"]

Types must be converted from strings to the actual Julia types they represent.

julia
    string_T = d["dtype"]
    string_Tenc = get(d, "atype", string_T)
    T = typestr(string_T)
    Tenc = typestr(string_Tenc)
    return FixedScaleOffsetFilter{Tenc, T, Tenc}(scale, offset)
end

function JSON.lower(c::FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc}) where {ScaleOffsetType, T, Tenc}
    return Dict("id" => "fixedscaleoffset", "scale" => c.scale, "offset" => c.offset, "dtype" => typestr(T), "atype" => typestr(Tenc))
end

Fletcher32 filter

This "filter" basically injects a 4-byte checksum at the end of the data, to ensure data integrity.

The implementation is based on the numcodecs implementation here and the original C implementation for NetCDF linked therein.

julia
"""
    Fletcher32Filter()

A compressor that uses the Fletcher32 checksum algorithm to compress and uncompress data.

Note that this goes from UInt8 to UInt8, and is effectively only checking
the checksum and cropping the last 4 bytes of the data during decoding.
"""
struct Fletcher32Filter <: Filter{UInt8, UInt8}
end

getfilter(::Type{<: Fletcher32Filter}, d::Dict) = Fletcher32Filter()
JSON.lower(::Fletcher32Filter) = Dict("id" => "fletcher32")

function _checksum_fletcher32(data::AbstractVector{UInt8})
    len = length(data) / 2 # length in 16-bit words
    sum1::UInt32 = 0
    sum2::UInt32 = 0
    data_idx = 1

    #=
    Compute the checksum for pairs of bytes.
    The magic `360` value is the largest number of sums that can be performed without overflow in UInt32.
    =#
    while len > 0
        tlen = len > 360 ? 360 : len
        len -= tlen
        while tlen > 0
            sum1 += begin # create a 16 bit word from two bytes, the first one shifted to the end of the word
                (UInt16(data[data_idx]) << 8) | UInt16(data[data_idx + 1])
            end
            sum2 += sum1
            data_idx += 2
            tlen -= 1
            if tlen < 1
                break
            end
        end
        sum1 = (sum1 & 0xffff) + (sum1 >> 16)
        sum2 = (sum2 & 0xffff) + (sum2 >> 16)
    end

if the length of the data is odd, add the first byte to the checksum again (?!)

julia
    if length(data) % 2 == 1
        sum1 += UInt16(data[1]) << 8
        sum2 += sum1
        sum1 = (sum1 & 0xffff) + (sum1 >> 16)
        sum2 = (sum2 & 0xffff) + (sum2 >> 16)
    end
    return (sum2 << 16) | sum1
end

function zencode(data, ::Fletcher32Filter)
    bytes = reinterpret(UInt8, data)
    checksum = _checksum_fletcher32(bytes)
    result = copy(bytes)
    append!(result, reinterpret(UInt8, [checksum])) # TODO: decompose this without the extra allocation of wrapping in Array
    return result
end

function zdecode(data, ::Fletcher32Filter)
    bytes = reinterpret(UInt8, data)
    checksum = _checksum_fletcher32(view(bytes, 1:length(bytes) - 4))
    stored_checksum = only(reinterpret(UInt32, view(bytes, (length(bytes) - 3):length(bytes))))
    if checksum != stored_checksum
        throw(ErrorException("""
        Checksum mismatch in Fletcher32 decoding.

        The computed value is $(checksum) and the stored value is $(stored_checksum).
        This might be a sign that the data is corrupted.
        """)) # TODO: make this a custom error type
    end
    return view(bytes, 1:length(bytes) - 4)
end

Quantize compression

julia
"""
    QuantizeFilter(; digits, DecodingType, [EncodingType = DecodingType])

Quantization based compression for Zarr arrays.
"""
struct QuantizeFilter{T, TENC} <: Filter{T, TENC}
    digits::Int32
end

function QuantizeFilter(; digits = 10, T = Float16, Tenc = T)
    return QuantizeFilter{T, Tenc}(digits)
end

QuantizeFilter{T, Tenc}(; digits = 10) where {T, Tenc} = QuantizeFilter{T, Tenc}(digits)
QuantizeFilter{T}(; digits = 10) where T = QuantizeFilter{T, T}(digits)

function zencode(data::AbstractArray, filter::QuantizeFilter{DecodingType, EncodingType}) where {DecodingType, EncodingType}
    arr = reinterpret(DecodingType, vec(data))

    precision = 10.0^(-filter.digits)

    _exponent = log(10, precision) # log 10 in base `precision`
    exponent = _exponent < 0 ? floor(Int, _exponent) : ceil(Int, _exponent)

    bits = ceil(log(2, 10.0^(-exponent)))
    scale = 2.0^bits

    enc = @. round(scale * arr) / scale

    if EncodingType == DecodingType
        return enc
    else
        return reinterpret(EncodingType, enc)
    end
end

Decoding is a no-op; quantization is a lossy filter but data is encoded directly.

julia
function zdecode(data::AbstractArray, filter::QuantizeFilter{DecodingType, EncodingType}) where {DecodingType, EncodingType}
    return data
end

function JSON.lower(filter::QuantizeFilter{T, Tenc}) where {T, Tenc}
    return Dict("type" => "quantize", "digits" => filter.digits, "dtype" => typestr(T), "atype" => typestr(Tenc))
end

function getfilter(::Type{<: QuantizeFilter}, d)
    return QuantizeFilter{typestr(d["dtype"], typestr(d["atype"]))}(; digits = d["digits"])
end

Shuffle compression

This file implements the shuffle compressor.

julia
struct ShuffleFilter <: Filter{UInt8, UInt8}
    elementsize::Csize_t
end

ShuffleFilter(; elementsize = 4) = ShuffleFilter(elementsize)

function _do_shuffle!(dest::AbstractVector{UInt8}, source::AbstractVector{UInt8}, elementsize::Csize_t)
    count = fld(length(source), elementsize) # elementsize is in bytes, so this works
    for i in 0:(count-1)
        offset = i * elementsize
        for byte_index in 0:(elementsize-1)
            j = byte_index * count + i
            dest[j+1] = source[offset + byte_index+1]
        end
    end
end

function _do_unshuffle!(dest::AbstractVector{UInt8}, source::AbstractVector{UInt8}, elementsize::Csize_t)
    count = fld(length(source), elementsize) # elementsize is in bytes, so this works
    for i in 0:(elementsize-1)
        offset = i * count
        for byte_index in 0:(count-1)
            j = byte_index * elementsize + i
            dest[j+1] = source[offset + byte_index+1]
        end
    end
end

function zencode(a::AbstractArray, c::ShuffleFilter)
    if c.elementsize <= 1 # no shuffling needed if elementsize is 1
        return a
    end
    source = reinterpret(UInt8, vec(a))
    dest = Vector{UInt8}(undef, length(source))
    _do_shuffle!(dest, source, c.elementsize)
    return dest
end

function zdecode(a::AbstractArray, c::ShuffleFilter)
    if c.elementsize <= 1 # no shuffling needed if elementsize is 1
        return a
    end
    source = reinterpret(UInt8, vec(a))
    dest = Vector{UInt8}(undef, length(source))
    _do_unshuffle!(dest, source, c.elementsize)
    return dest
end

function getfilter(::Type{ShuffleFilter}, d::Dict)
    return ShuffleFilter(d["elementsize"])
end

function JSON.lower(c::ShuffleFilter)
    return Dict("id" => "shuffle", "elementsize" => Int64(c.elementsize))
end

This page was generated using Literate.jl.