DiskArrays.jl

Fabian Gans, Felix Cremer

DiskArrays.jl in a nutshell

  • for data stored on disk or accessed remotely
  • representing n-dimensional arrays
  • with (relatively) fast bulk access and slow random access
  • for larger-than-memory datasets
  • provides abstractions to talk about data chunking
  • helps data format package authors implement the Julia array interface
  • helps downstream analysis tools by providing abstractions for working with on-disk data across formats

Why DiskArrays.jl

using NetCDF, Makie, CairoMakie
data = NetCDF.open("tos_O1_2001-2002.nc","tos")
### Here we access the on-disk data like a normal Julia array
timestep10 = data[:,10:end-10,10]
heatmap(timestep10)

What we want to achieve

Treat arrays on disk as if they were regular Julia arrays

  • getindex/setindex!
  • map and broadcast
  • reduce and mapreduce
  • iterate, collect, zip
  • that's what AbstractArrays are for (example below)
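
For example, with the sea surface temperature variable opened on the previous slide, all of the following should just work (a sketch; the Kelvin conversion is purely illustrative and missing values are ignored for brevity):

using NetCDF
data = NetCDF.open("tos_O1_2001-2002.nc", "tos")
data[:, :, 1]        # getindex: read a single time step
data .- 273.15       # broadcast: e.g. convert from Kelvin to degrees Celsius
sum(data, dims = 3)  # reduction over the time dimension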

Reminder: AbstractArray Interface

struct AccessCountArray{T,N,P<:AbstractArray{T,N}} <: AbstractArray{T,N}
    parent::P
    accesscount::Base.RefValue
end
Base.size(a::AccessCountArray) = size(a.parent)
Base.IndexStyle(a::Type{<:AccessCountArray}) = IndexLinear()
function Base.getindex(a::AccessCountArray, i::Int) 
    a.accesscount[] += 1
    a.parent[i]
end
AccessCountArray(a::AbstractArray) = AccessCountArray(a,Ref(0))

ca = AccessCountArray(reshape(1:50,5,10))
5×10 AccessCountArray{Int64, 2, Base.ReshapedArray{Int64, 2, UnitRange{Int64}, Tuple{}}}:
 1   6  11  16  21  26  31  36  41  46
 2   7  12  17  22  27  32  37  42  47
 3   8  13  18  23  28  33  38  43  48
 4   9  14  19  24  29  34  39  44  49
 5  10  15  20  25  30  35  40  45  50
ca.accesscount[]
200

Reminder: AbstractArray Interface

ca .+ 3.0
5×10 Matrix{Float64}:
 4.0   9.0  14.0  19.0  24.0  29.0  34.0  39.0  44.0  49.0
 5.0  10.0  15.0  20.0  25.0  30.0  35.0  40.0  45.0  50.0
 6.0  11.0  16.0  21.0  26.0  31.0  36.0  41.0  46.0  51.0
 7.0  12.0  17.0  22.0  27.0  32.0  37.0  42.0  47.0  52.0
 8.0  13.0  18.0  23.0  28.0  33.0  38.0  43.0  48.0  53.0
sum(ca,dims = 1)
1×10 Matrix{Int64}:
 15  40  65  90  115  140  165  190  215  240
ca*[1,2,3,4,5,6,7,8,9,10]
5-element Vector{Int64}:
 1705
 1760
 1815
 1870
 1925
ca.accesscount[]
360

Reminder: AbstractArray Interface

ca[5:-2:1,[1,2,4]]
3×3 Matrix{Int64}:
 5  10  20
 3   8  18
 1   6  16
ca[[true,true,false,false,true],3,1]
3-element Vector{Int64}:
 11
 12
 15
ca[[CartesianIndex(3,1),CartesianIndex(1,5)]]
2-element Vector{Int64}:
  3
 21
ca.accesscount[]
374

AbstractArray Interface

Assumption: Access to single elements is efficient

The problem

  • random access to a disk-based array typically has high latency
  • every access also carries significant call overhead
  • we typically want to read larger portions of a file at once
  • Solutions:
    1. Mmap -> not possible for compressed data (sketch below)
    2. Caching -> adds the overhead of one lookup per random access
    3. Reinvent the AbstractArray Interface, but based on large chunk access
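
For the uncompressed case, option 1 can be as simple as the following sketch using the Mmap standard library (the file name and layout are hypothetical: a headerless binary file holding a 180×170 Float64 array):

using Mmap
io = open("data.bin")                           # hypothetical raw binary file
A = Mmap.mmap(io, Matrix{Float64}, (180, 170))  # maps the file; pages are loaded lazily by the OS
A[10, 10]                                       # random access without reading the whole file
close(io)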

Solution: The AbstractDiskArray interface

Example implementation for NetCDF.jl

# Implement the DiskArrays interface
import DiskArrays: AbstractDiskArray, readblock!, writeblock!
mutable struct NcVar{T,N,M} <: AbstractDiskArray{T,N}
    ...
end
function readblock!(v::NcVar{<:Any,N}, aout, r::Vararg{AbstractUnitRange,N}) where N
  @assert length(r) == ndims(aout)
  @assert size(aout) == length.(r)
  ...
end
function writeblock!(v::NcVar{<:Any,N}, a, r::Vararg{AbstractUnitRange,N}) where N
  ...
end

Let’s make a DiskArray

using DiskArrays: DiskArrays, AbstractDiskArray
struct AccessCountDiskArray{T,N,P<:AbstractArray{T,N}} <: AbstractDiskArray{T,N}
    parent::P
    accesscount::Base.RefValue
end
Base.size(a::AccessCountDiskArray) = size(a.parent)
function DiskArrays.readblock!(a::AccessCountDiskArray, aout, i::AbstractUnitRange...) 
    aout .= view(a.parent,i...)
    a.accesscount[] += 1
end
AccessCountDiskArray(a::AbstractArray) = AccessCountDiskArray(a,Ref(0))

da = AccessCountDiskArray(reshape(1:50,5,10))
5×10 AccessCountDiskArray{Int64, 2, Base.ReshapedArray{Int64, 2, UnitRange{Int64}, Tuple{}}}

Unchunked
da.accesscount[]
0
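
A writeblock! method could be added in the same spirit, so that setindex! also works block-wise (a sketch only; it assumes the wrapped parent array is mutable, which the reshaped range above is not):

function DiskArrays.writeblock!(a::AccessCountDiskArray, ain, i::AbstractUnitRange...)
    # write one rectangular block into the parent and count it as a single access
    view(a.parent, i...) .= ain
    a.accesscount[] += 1
end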

The AbstractDiskArray Interface

da[5:-2:1,[1,2,4]]
3×3 Matrix{Int64}:
 5  10  20
 3   8  18
 1   6  16
da[[true,true,false,false,true],3,1]
3-element Vector{Int64}:
 11
 12
 15
da[[CartesianIndex(3,1),CartesianIndex(1,5)]]
2-element Vector{Int64}:
  3
 21
da.accesscount[]
3

The AbstractDiskArray Interface


da .+ 3.0
5×10 DiskArrays.BroadcastDiskArray{Float64, 2, Base.Broadcast.Broadcasted{DiskArrays.ChunkStyle{2}, Tuple{Base.OneTo{Int64}, Base.OneTo{Int64}}, typeof(+), Tuple{AccessCountDiskArray{Int64, 2, Base.ReshapedArray{Int64, 2, UnitRange{Int64}, Tuple{}}}, Float64}}}

Chunked: (
    [5]
    [10]
)
da.accesscount[]
3

Broadcasting for DiskArrays is lazy

copy(da .+ 3.0)
5×10 Matrix{Float64}:
 4.0   9.0  14.0  19.0  24.0  29.0  34.0  39.0  44.0  49.0
 5.0  10.0  15.0  20.0  25.0  30.0  35.0  40.0  45.0  50.0
 6.0  11.0  16.0  21.0  26.0  31.0  36.0  41.0  46.0  51.0
 7.0  12.0  17.0  22.0  27.0  32.0  37.0  42.0  47.0  52.0
 8.0  13.0  18.0  23.0  28.0  33.0  38.0  43.0  48.0  53.0
da.accesscount[]
4

Reductions for AbstractDiskArrays

sum(da,dims = 1)
1×10 Matrix{Int64}:
 15  40  65  90  115  140  165  190  215  240
sum(da .+ 3.0, dims=1)
1×10 Matrix{Float64}:
 30.0  55.0  80.0  105.0  130.0  155.0  180.0  205.0  230.0  255.0
da.accesscount[]
6

Summary of the AbstractDiskArray Interface

  • packages implement readblock! and writeblock! to read and write n-dimensional rectangles of their data
  • in return they get indexing behavior according to the AbstractArray interface
  • while minimizing the number of calls into readblock!
  • is used by Zarr.jl, NetCDF.jl, NCDatasets.jl, ArchGDAL.jl, GRIBDatasets.jl, HDF5Utils.jl
  • maybe by HDF5.jl in the future
  • in addition, they get simple lazy broadcasting and reductions

Summary of DiskArrays Broadcast and Reductions

  • data is accessed chunk by chunk
  • in a serial way
  • built-in methods suitable for medium-sized out-of-core arrays
  • distributed computing can be built on DiskArrays abstractions
  • DiskArrays provides an interface to talk about chunking structure

Chunking

Implementation for NetCDF.jl

import DiskArrays: haschunks, eachchunk, Chunked, Unchunked, GridChunks, SubRanges, estimate_chunksize
haschunks(v::NcVar) = all(iszero,v.chunksize) ? Unchunked(SubRanges()) : Chunked()
eachchunk(v::NcVar) = getchunksize(v)
getchunksize(v::NcVar) = getchunksize(haschunks(v),v)
getchunksize(::Chunked, v::NcVar) = GridChunks(v,reverse(map(Int64,v.chunksize)))
getchunksize(::Unchunked, v::NcVar) = estimate_chunksize(v)

Chunking

  • Most large datasets have some chunking structure
    • to optimize access of subsets along different dimensions
    • to facilitate compression
  • if haschunks is not defined for an AbstractDiskArray
    • it defaults to Unchunked, and data is split along the last dimension(s) into parts that fit into memory (see the sketch below)
  • when haschunks returns Chunked, a method for eachchunk needs to be implemented
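
For example, the AccessCountDiskArray from above never defined haschunks, so it uses exactly this fallback (a sketch; the fallback chunk sizes come from a size heuristic):

DiskArrays.haschunks(da)  # falls back to Unchunked
DiskArrays.eachchunk(da)  # a GridChunks object; the tiny 5×10 array ends up as a single chunk (1:5, 1:10)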

DiskArray types for chunks

using DiskArrays: IrregularChunks, RegularChunks, GridChunks
dayinmonth = [31,28,31,30,31,30,31,31,30,31,30,31]
monthlychunks = IrregularChunks(chunksizes = dayinmonth)
otherchunks = RegularChunks(200,0,200)
chunks = GridChunks(otherchunks,monthlychunks)
1×12 GridChunks{2, Tuple{RegularChunks, IrregularChunks}}:
 (1:200, 1:31)  (1:200, 32:59)  …  (1:200, 305:334)  (1:200, 335:365)
for c in chunks
    println("Doing something with chunk $c")
end
Doing something with chunk (1:200, 1:31)
Doing something with chunk (1:200, 32:59)
Doing something with chunk (1:200, 60:90)
Doing something with chunk (1:200, 91:120)
Doing something with chunk (1:200, 121:151)
Doing something with chunk (1:200, 152:181)
Doing something with chunk (1:200, 182:212)
Doing something with chunk (1:200, 213:243)
Doing something with chunk (1:200, 244:273)
Doing something with chunk (1:200, 274:304)
Doing something with chunk (1:200, 305:334)
Doing something with chunk (1:200, 335:365)

Examples for requesting chunks of Arrays

using NetCDF, DiskArrays
data = NetCDF.open("tos_O1_2001-2002.nc","tos")
DiskArrays.eachchunk(data)
1×1×1 GridChunks{3, Tuple{RegularChunks, RegularChunks, RegularChunks}}:
[:, :, 1] =
 (1:180, 1:170, 1:24)

Examples for requesting chunks of Arrays

using Zarr
url = "https://s3.bgc-jena.mpg.de:9000/pyramids/ESACCI-BIOMASS-L4-AGB-MERGED-100m-2020-fv4.0.zarr"
array = zopen(url)["agb"]
DiskArrays.eachchunk(array)
396×154×1 GridChunks{3, Tuple{RegularChunks, RegularChunks, RegularChunks}}:
[:, :, 1] =
 (1:1024, 1:1024, 1:1)         …  (1:1024, 156673:157500, 1:1)
 (1025:2048, 1:1024, 1:1)         (1025:2048, 156673:157500, 1:1)
 (2049:3072, 1:1024, 1:1)         (2049:3072, 156673:157500, 1:1)
 (3073:4096, 1:1024, 1:1)         (3073:4096, 156673:157500, 1:1)
 (4097:5120, 1:1024, 1:1)         (4097:5120, 156673:157500, 1:1)
 (5121:6144, 1:1024, 1:1)      …  (5121:6144, 156673:157500, 1:1)
 (6145:7168, 1:1024, 1:1)         (6145:7168, 156673:157500, 1:1)
 (7169:8192, 1:1024, 1:1)         (7169:8192, 156673:157500, 1:1)
 (8193:9216, 1:1024, 1:1)         (8193:9216, 156673:157500, 1:1)
 (9217:10240, 1:1024, 1:1)        (9217:10240, 156673:157500, 1:1)
 ⋮                             ⋱  
 (396289:397312, 1:1024, 1:1)     (396289:397312, 156673:157500, 1:1)
 (397313:398336, 1:1024, 1:1)     (397313:398336, 156673:157500, 1:1)
 (398337:399360, 1:1024, 1:1)     (398337:399360, 156673:157500, 1:1)
 (399361:400384, 1:1024, 1:1)  …  (399361:400384, 156673:157500, 1:1)
 (400385:401408, 1:1024, 1:1)     (400385:401408, 156673:157500, 1:1)
 (401409:402432, 1:1024, 1:1)     (401409:402432, 156673:157500, 1:1)
 (402433:403456, 1:1024, 1:1)     (402433:403456, 156673:157500, 1:1)
 (403457:404480, 1:1024, 1:1)     (403457:404480, 156673:157500, 1:1)
 (404481:405000, 1:1024, 1:1)  …  (404481:405000, 156673:157500, 1:1)

Summary on chunking

  • Downstream analysis packages can query the chunks of a DiskArray
  • and optimize their data processing pipelines accordingly (see the sketch below)
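
A minimal sketch of such a chunk-by-chunk pipeline (chunkwise_sum is a hypothetical helper, not part of DiskArrays.jl):

using DiskArrays

function chunkwise_sum(a)
    s = zero(eltype(a))
    for c in DiskArrays.eachchunk(a)  # c is a tuple of index ranges, one per dimension
        s += sum(a[c...])             # each chunk is read with a single readblock! call
    end
    return s
end

chunkwise_sum(da)  # same result as sum(da), reading the data one chunk at a time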

Examples

  • YAXArrays.jl for easy parallel mapslices-like operations on large datasets
  • DiskArrayEngine.jl as a prototype library for moving-window operations, reductions and more, with distributed execution using Dagger.jl; still a work in progress

Alternative low-hanging fruit

  • use Dagger.jl and map chunks of an AbstractDiskArray to chunks of a Dagger DArray
  • any DiskArray could easily become a lazy DArray; this would immediately increase the number of data sources available to Dagger (a task-based sketch follows below)
  • an initial implementation from some years ago exists here
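
Even without the DArray mapping, the chunk structure already allows simple per-chunk parallelism; below is a minimal sketch using plain Dagger tasks (illustrative only, and it assumes the array handle can be serialized to the workers):

using Dagger, DiskArrays

# a is any AbstractDiskArray, e.g. the da defined earlier
readsum(a, c) = sum(a[c...])  # hypothetical helper: read one chunk and reduce it inside the task
tasks = [Dagger.@spawn readsum(a, c) for c in DiskArrays.eachchunk(a)]
total = sum(fetch.(tasks))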

Sparse data access optimization

  • DiskArrays translates getindex calls into dense readblock! calls
  • how should we translate this for sparse getindex calls?
  • e.g. a[[1,2,3,6,7,8]]
  • possible strategies (illustrated below):
    1. minimize the number of readblock! calls: read the bounding box
    2. minimize the number of accesses for every chunk
    3. minimize the number of readblock! calls while not transferring unnecessary data
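
To make the trade-offs concrete, consider a one-dimensional array with chunks 1:5 and 6:10 and the request a[[1,2,3,6,7,8]]. One possible reading of the three strategies (the readblock! ranges below are illustrative only, not what DiskArrays currently does):

# Strategy 1: a single readblock! over the bounding box of the requested indices
#   readblock!(a, buf, 1:8)                              # 1 call, elements 4 and 5 read unnecessarily
# Strategy 2: one readblock! per touched chunk
#   readblock!(a, buf1, 1:5); readblock!(a, buf2, 6:10)  # 2 calls, whole chunks transferred
# Strategy 3: one readblock! per contiguous run of requested indices
#   readblock!(a, buf1, 1:3); readblock!(a, buf2, 6:8)   # 2 calls, no unnecessary data transferred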

When is this important?

Remote array structures

Sparse index optimization

Chunk indices

Summary

  • As a data format package author:
    • use DiskArrays.jl to get the full AbstractArray behavior shown above almost for free
  • As a big data analysis framework author:
    • use the abstractions defined in DiskArrays.jl to make your out-of-core processing library compatible with many data sources

A big thank you to all contributors

Rough edges

  • still a list of broken tests
  • iteration over DiskArrays is fundamentally broken
  • ideally, iterators would pass over the array chunk by chunk
    • this leads to strange behavior when zipping DiskArrays with different chunks
  • setindex! not as well tested as getindex (yet)