Skip to content

Commit

Permalink
Merge pull request #24 from Open-EO/danlooo/issue23
Browse files Browse the repository at this point in the history
Enforce broadcasting
  • Loading branch information
danlooo authored Jan 15, 2024
2 parents 8d21293 + 73fc164 commit b2e9c64
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 49 deletions.
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Markdown = "d6f4376e-aef5-505a-96c1-9c027394607a"
OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"

[compat]
Expand Down
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ using Pkg
Pkg.add(url="https://github.com/Open-EO/openeo-julia-client.git")
```

Connect to an openEO backend server and list available collections of raster data image datasets:
Connect to an openEO backend server and list available collections of raster image datasets:

```julia
using OpenEOClient
Expand All @@ -45,11 +45,13 @@ con.collections
# LANDSAT8_L2: Landsat 8 level 2 ARD, European Coverage
```

Further computations require a free registration at the [Copernicus Data Space](https://dataspace.copernicus.eu).
Further computations require a free registration at an openEO backend.
Here, we use the [Copernicus Data Space](https://dataspace.copernicus.eu).
Calculate the enhanced vegetation index (EVI) analog to this [tutorial](https://documentation.dataspace.copernicus.eu/APIs/openEO/Python_Client/Python.html):

```julia
using OpenEOClient
using Statistics
con = connect("openeo.dataspace.copernicus.eu/openeo", "", OpenEOClient.oidc_auth)
cube = DataCube(con, "SENTINEL2_L2A",
BoundingBox(west=5.14, south=51.17, east=5.17, north=51.19),
Expand All @@ -58,15 +60,17 @@ cube = DataCube(con, "SENTINEL2_L2A",
blue = cube["B02"] * 0.0001
red = cube["B04"] * 0.0001
nir = cube["B08"] * 0.0001
evi = 2.5 * (nir - red) / (nir + 6.0 * red - 7.5 * blue + 1.0)
evi = @. 2.5 * (nir - red) / (nir + 6.0 * red - 7.5 * blue + 1.0)
mean_evi = mean(evi, dims = "t")
# openEO DataCube
# collection: SENTINEL2_L2A
# bands: Single band
# dimensions: ["t", "x", "y"]
# bands: Unknown
# spatial extent: BoundingBox{Float64}(5.14, 51.17, 5.17, 51.19)
# temporal extent: ("2021-02-01", "2021-02-10")
# license: proprietary
# connection: https://openeo.dataspace.copernicus.eu/openeo/
```

Up to now, the analysis workflow is just being constructed on the client.
It can be executed on the server using `compute_result(evi)` which returns the file name of the downloaded result.
It can be executed on the server using `compute_result(mean_evi)` which returns the file name of the downloaded result.
134 changes: 115 additions & 19 deletions src/Cubes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
# - convertsion chain: DataCube -> ProcessCall -> openEO JSON
#

import Base: convert, promote, promote_rule
import Base: +, -, *, /, cos, sqrt, abs
import Base: broadcasted, +, -, *, /, cos, sqrt, abs, ==, !, !=, maximum, reduce
using Statistics

"""
openEO n-dimensional array of ratser data
Expand All @@ -15,8 +15,8 @@ This process graph can be grown iterativeley by applying functions and operators
struct DataCube
connection::Connection
call::ProcessCall

bands
dimensions
spatial_extent
temporal_extent
description
Expand All @@ -26,7 +26,7 @@ end

function Base.show(io::IO, ::MIME"text/plain", c::DataCube)
bands_str = if isnothing(c.bands)
"Single band"
"Unknown"
elseif length(c.bands) == 1
c.bands[1]
elseif length(c.bands) <= 5
Expand All @@ -35,19 +35,24 @@ function Base.show(io::IO, ::MIME"text/plain", c::DataCube)
c.bands[1:min(length(c.bands), 5)] |> x -> vcat(x, ["..."]) |> x -> join(x, ", ")
end

dimensions_str = isnothing(c.dimensions) ? "Unknown" : c.dimensions
collection_str = isnothing(c.collection) ? "Unknown" : c.collection["id"]

println(io, "openEO DataCube")
println(io, " collection: $collection_str")
println(io, " dimensions: $dimensions_str")
println(io, " bands: $bands_str")
println(io, " spatial extent: $(c.spatial_extent)")
println(io, " temporal extent: $(c.temporal_extent)")
println(io, " license: $(c.license)")
print(io, " connection: https://$(c.connection.credentials.host)/$(c.connection.credentials.version)")
end

print_json(cube::DataCube) = cube.call |> ProcessGraph |> print_json

function DataCube(connection::Connection, collection_id::String, spatial_extent::BoundingBox, temporal_extent::Tuple{String,String}, bands::Vector{String})
collection = describe_collection(connection.credentials, collection_id)
dimensions = collection[Symbol("cube:dimensions")] |> keys .|> String

call = ProcessCall("load_collection", Dict(
:id => collection_id,
Expand All @@ -57,7 +62,7 @@ function DataCube(connection::Connection, collection_id::String, spatial_extent:
))

return DataCube(
connection, call, bands,
connection, call, bands, dimensions,
spatial_extent, temporal_extent,
collection.description,
collection.license,
Expand All @@ -67,6 +72,7 @@ end

function DataCube(connection::Connection, collection_id)
collection = describe_collection(connection.credentials, collection_id)
dimensions = collection[Symbol("cube:dimensions")] |> keys .|> String

bands = try
collection["cube:dimensions"].bands.values |> Vector{String}
Expand Down Expand Up @@ -96,16 +102,18 @@ function DataCube(connection::Connection, collection_id)
))

return DataCube(
connection, call, bands,
connection, call, bands, dimensions,
spatial_extent, temporal_extent,
collection.description,
collection.license,
collection
)
end

function get_band(cube::DataCube, band::String)
band in cube.bands || error("Band $get_band must be one of $(cube.arguments[:bands])")
function to_band(cube::DataCube, band::String)
band in cube.bands || error("Band $to_band must be one of $(cube.arguments[:bands])")

dimensions = setdiff(cube.dimensions, ["bands"])

args = Dict(
:data => cube.call,
Expand All @@ -116,10 +124,10 @@ function get_band(cube::DataCube, band::String)
)) |> ProcessGraph
)
call = ProcessCall("reduce_dimension", args)
return DataCube(cube.connection, call, nothing, cube.spatial_extent, cube.temporal_extent, cube.description, cube.license, cube.collection)
return DataCube(cube.connection, call, nothing, dimensions, cube.spatial_extent, cube.temporal_extent, cube.description, cube.license, cube.collection)
end

Base.getindex(cube::DataCube, band_name) = get_band(cube, band_name)
Base.getindex(cube::DataCube, band_name) = to_band(cube, band_name)

compute_result(cube::DataCube) = cube.call |> ProcessGraph |> cube.connection.compute_result
ProcessGraph(cube::DataCube) = ProcessGraph(cube.call)
Expand Down Expand Up @@ -174,7 +182,7 @@ function binary_operator(cube::DataCube, number::Real, openeo_process::String, r
end

return DataCube(
cube.connection, call, nothing,
cube.connection, call, cube.bands, cube.dimensions,
cube.spatial_extent, cube.temporal_extent,
cube.collection.description,
cube.collection.license,
Expand All @@ -200,7 +208,9 @@ function binary_operator(cube1::DataCube, cube2::DataCube, openeo_process::Strin
))

return DataCube(
cube1.connection, call, nothing,
cube1.connection, call,
merge(cube1.bands, cube2.bands),
merge(cube1.dimensions, cube2.dimensions),
merge(cube1.spatial_extent, cube2.spatial_extent),
merge(cube1.temporal_extent, cube2.temporal_extent),
merge(cube1.description, cube2.description),
Expand All @@ -209,17 +219,103 @@ function binary_operator(cube1::DataCube, cube2::DataCube, openeo_process::Strin
)
end

+(cube::DataCube, number::Real) = binary_operator(cube, number, "add")
+(number::Real, cube::DataCube) = binary_operator(cube, number, "add", true)
-(cube::DataCube, number::Real) = binary_operator(cube, number, "subtract")
-(number::Real, cube::DataCube) = binary_operator(cube, number, "subtract", true)
function reduce_dimension(cube::DataCube, openeo_process::String, dimension::String)
Symbol(openeo_process) in keys(cube.connection.processes) || error("Reducer process not found on backend")
Symbol(dimension) in keys(cube.collection[Symbol("cube:dimensions")]) || error("Dimension not found")

call = ProcessCall("reduce_dimension", Dict(
:data => cube.call,
:dimension => dimension,
:reducer => ProcessCall(openeo_process, Dict(:data => ProcessCallParameter("data"))) |> ProcessGraph
))

bands = dimension == "bands" ? nothing : cube.bands
dimensions = isnothing(cube.dimensions) ? nothing : setdiff(cube.dimensions, [dimension])

return DataCube(
cube.connection, call,
bands, dimensions, nothing, nothing,
cube.collection.description,
cube.collection.license,
cube.collection
)
end

function unary_operator(cube::DataCube, openeo_process::String)
if cube.call.process_id in ["apply", "reduce_dimension"]
# can append operation to last existing process call
argument = Dict("apply" => :process, "reduce_dimension" => :reducer)[cube.call.process_id]
last_call = cube.call.arguments[argument].process_graph |> last

new_steps = cube.call.arguments[argument].process_graph
# Mark only last step as a result node
for call in new_steps
call.result = false
end

new_call = ProcessCall(openeo_process, Dict(:x => ProcessCallParameter("x")); result=true)
push!(new_steps, new_call)

call = cube.call
call.arguments[argument] = ProcessGraph(new_steps)
else
call = ProcessCall("apply", Dict(
:data => cube.call,
:process => ProcessCall(openeo_process, Dict(:x => ProcessCallParameter("x")); result=true) |> ProcessGraph
))
end

DataCube(
cube.connection,
call,
cube.bands,
cube.dimensions,
cube.spatial_extent,
cube.temporal_extent,
cube.description,
cube.license,
cube.collection
)
end

# element wise operations of cube and number
*(cube::DataCube, number::Real) = binary_operator(cube, number, "multiply")
*(number::Real, cube::DataCube) = binary_operator(cube, number, "multiply", true)
/(cube::DataCube, number::Real) = binary_operator(cube, number, "divide")
/(number::Real, cube::DataCube) = binary_operator(cube, number, "divide", true)
broadcasted(::typeof(+), cube::DataCube, number::Real) = binary_operator(cube, number, "add")
broadcasted(::typeof(+), number::Real, cube::DataCube) = binary_operator(cube, number, "add", true)
broadcasted(::typeof(-), cube::DataCube, number::Real) = binary_operator(cube, number, "subtract")
broadcasted(::typeof(-), number::Real, cube::DataCube) = binary_operator(cube, number, "subtract", true)
broadcasted(::typeof(*), cube::DataCube, number::Real) = binary_operator(cube, number, "multiply")
broadcasted(::typeof(*), number::Real, cube::DataCube) = binary_operator(cube, number, "multiply", true)
broadcasted(::typeof(/), cube::DataCube, number::Real) = binary_operator(cube, number, "divide")
broadcasted(::typeof(/), number::Real, cube::DataCube) = binary_operator(cube, number, "divide", true)

broadcasted(::typeof(==), cube::DataCube, number::Real) = binary_operator(cube, number, "eq")
broadcasted(::typeof(==), number::DataCube, cube::Real) = binary_operator(cube, number, "eq", true)

# element wise operations of two data cubes
+(cube1::DataCube, cube2::DataCube) = binary_operator(cube1, cube2, "add")
-(cube1::DataCube, cube2::DataCube) = binary_operator(cube1, cube2, "subtract")
*(cube1::DataCube, cube2::DataCube) = binary_operator(cube1, cube2, "multiply")
/(cube1::DataCube, cube2::DataCube) = binary_operator(cube1, cube2, "divide")
broadcasted(::typeof(+), cube1::DataCube, cube2::DataCube) = binary_operator(cube1, cube2, "add")
broadcasted(::typeof(-), cube1::DataCube, cube2::DataCube) = binary_operator(cube1, cube2, "subtract")
broadcasted(::typeof(*), cube1::DataCube, cube2::DataCube) = binary_operator(cube1, cube2, "multiply")
broadcasted(::typeof(/), cube1::DataCube, cube2::DataCube) = binary_operator(cube1, cube2, "divide")

# element wise unary operations
broadcasted(::typeof(sqrt), cube::DataCube) = unary_operator(cube, "sqrt")
broadcasted(::typeof(abs), cube::DataCube) = unary_operator(cube, "abs")
broadcasted(::typeof(sin), cube::DataCube) = unary_operator(cube, "sin")
broadcasted(::typeof(cos), cube::DataCube) = unary_operator(cube, "cos")
broadcasted(::typeof(!), cube::DataCube) = unary_operator(cube, "not")

# reduce operations
function reduce(op::Process, cube::DataCube; dims::String)
("data" in op.parameters .|> x -> x.name) || error("Process must have argument data.")
dims in cube.dimensions || error("Dimension $dims not present in the data cube.")

reduce_dimension(cube::DataCube, op.id, dims)
end

maximum(cube::DataCube; dims::String) = reduce_dimension(cube::DataCube, "max", dims)
Statistics.mean(cube::DataCube; dims::String) = reduce_dimension(cube::DataCube, "mean", dims)
18 changes: 9 additions & 9 deletions src/OpenEOClient.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ include("API.jl")
include("Cubes.jl")

export
Band,
BoundingBox,
compute_result,
connect,
list_collections,
DataCube,
describe_collection,
to_band,
list_collections,
list_jobs,
list_processes,
register_processes,
print_json,
ProcessCall,
Processes,
ProcessGraph,
ProcessGraph,
BoundingBox,
DataCube,
compute_result,
print_json,
get_band,
Band
reduce_dimension,
register_processes
end
61 changes: 45 additions & 16 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ password = ENV["OPENEO_PASSWORD"]
@test "api_version" in keys(response)
@test "backend_version" in keys(response)

list_processes(unauth_con)
list_collections(unauth_con)
@test list_processes(unauth_con) |> typeof == Vector{OpenEOClient.Process}
@test list_collections(unauth_con) |> typeof == Vector{OpenEOClient.Collection}

c1 = connect(host, version)
c2 = connect(host, version, username, password)
Expand All @@ -40,19 +40,48 @@ password = ENV["OPENEO_PASSWORD"]
("2020-01-20", "2020-01-30"), ["B01", "B02"]
)

@test_throws MethodError abs(cube)
@test abs.(cube) |> typeof == DataCube

cube["B01"]
cube["B01"] + 1
1 + cube["B01"]
cube["B01"] + 1 + 2
cube + 1
1 + cube + 1
cube["B01"] + cube["B02"]
cube + cube
cube + 1
cube + 1 + 2
1 + cube
@test (cube["B01"]+1+1).call.arguments[:reducer] |> length == 3
@test (cube["B01"] + cube["B02"]).call.process_id == "merge_cubes"
@test (cube+2*3).call.arguments[:process] |> length == 1
@test cube2 = cube + 2 * 3 |> typeof == DataCube
cube["B01"] .+ 1
1 .+ cube["B01"]
cube["B01"] .+ 1 .+ 2
1 .+ cube .+ 1
cube["B01"] .+ cube["B02"]
cube .+ cube
cube .+ 1
cube .+ 1 .+ 2
1 .+ cube
@test (cube["B01"].+1 .+ 1).call.arguments[:reducer] |> length == 3
@test (cube["B01"] .+ cube["B02"]).call.process_id == "merge_cubes"
@test (cube .+ 2*3).call.arguments[:process] |> length == 1
@test cube2 = cube .+ 2 * 3 |> typeof == DataCube

# OpenEO applies processes per element thus enforce broadcasting
@test sin.(cube) |> typeof == DataCube
@test_throws MethodError sin(cube)
@test cube ./ 2 |> typeof == DataCube
@test cube / 2 |> typeof == DataCube
@test_throws MethodError 2 / cube
@test 2 ./ cube |> typeof == DataCube
@test_throws MethodError cube / cube
@test cube ./ cube |> typeof == DataCube
@test cube + cube |> typeof == DataCube
@test cube .+ cube |> typeof == DataCube

@test_throws ErrorException reduce_dimension(cube, "xxx", "t")
@test_throws ErrorException reduce_dimension(cube, "min", "xxx")
@test reduce_dimension(cube["B02"], "min", "t") |> typeof == DataCube
@test reduce_dimension(cube, "min", "t") |> typeof == DataCube

@test maximum(cube["B01"], dims="t") |> typeof == DataCube
@test reduce(cube.connection.max, cube["B01"], dims="t") |> typeof == DataCube
@test_throws ErrorException maximum(cube["B01"], dims="xx")
@test_throws ErrorException reduce(cube.connection.sin, cube, dims="t")

@test cube.dimensions == ["x", "y", "t", "bands"]
@test cube["B01"].dimensions == ["x", "y", "t"]
@test maximum(cube["B01"], dims = "t").dimensions == ["x", "y"]
@test maximum(cube, dims = "bands").dimensions == ["x", "y", "t"]
end

0 comments on commit b2e9c64

Please sign in to comment.