This example solution demonstrates the use of the EDK npm package to create various pipeline operations. Run `npm install` to install the package dependencies, then run `edk build` to create the `schema.json`.
The project will involve creating a pipeline for each pipeline operation, then visualising the outputs.
In order to have some data to use in the pipelines, we will first add two JSON datasources:
edk add datasource json --name "Test One" --uri file://files/test_one.json --def_dir src/datasource
edk add datasource json --name "Test Two" --uri file://files/test_two.json --def_dir src/datasource
This will create two empty datasources; we can then populate their output expressions with the following commands:
edk-io detect json --asset test_one.source --defaults
edk-io detect json --asset test_two.source --defaults
The generated datasources will contain some complex nested data so that we can exercise a broad range of Expressions, as described in `test_one.source`:
import * as ELARA from "@elaraai/edk/lib"
const test_one_struct_type = ELARA.StructType({
string: 'string',
date: 'datetime',
number: 'float',
integer: 'integer',
'boolean': 'boolean',
struct: ELARA.StructType({
string: 'string',
date: 'datetime',
number: 'float',
integer: 'integer',
'boolean': 'boolean',
}),
array: 'set',
});
export default ELARA.JsonSourceSchema({
name: "Test One",
uri: "file://files/test_one.jsonl",
primary_key: ELARA.Variable("string", 'string'),
selections: {
string: ELARA.Parse(ELARA.Variable("string", 'string')),
date: ELARA.Parse(ELARA.Variable("date", 'datetime')),
number: ELARA.Parse(ELARA.Variable("number", 'float')),
integer: ELARA.Parse(ELARA.Variable("integer", 'integer')),
'boolean': ELARA.Parse(ELARA.Variable("boolean", 'boolean')),
array: ELARA.Parse(ELARA.Variable("array", 'set')),
Dict: ELARA.Parse(ELARA.Variable("Dict", ELARA.DictType('float'))),
struct: ELARA.Parse(ELARA.Variable("struct", test_one_struct_type)),
},
})
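For reference, a record in `files/test_one.json` that is compatible with the selections above might look like the following (the field values here are purely illustrative):
{
    "string": "alpha",
    "date": "2021-03-04T00:00:00Z",
    "number": 3.5,
    "integer": 7,
    "boolean": true,
    "array": ["a", "b"],
    "Dict": { "a": 1.5, "b": 2.5 },
    "struct": {
        "string": "beta",
        "date": "2021-03-05T00:00:00Z",
        "number": 1.25,
        "integer": 2,
        "boolean": false
    }
}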
Now we can add the pipelines, one for each operation, with the following EDK commands:
- select:
edk add pipeline --name Select --def_dir src/datasource
- filter:
edk add pipeline --name Filter --def_dir src/datasource
- aggregation:
edk add pipeline --name Aggregate --def_dir src/datasource
- disaggregate:
edk add pipeline --name "Disaggregate Dict" --def_dir src/datasource
- join:
edk add pipeline --name Join --def_dir src/datasource
- http:
edk add pipeline --name Http --def_dir src/datasource
This will create an empty pipeline for each operation, for example the aggregation pipeline:
import * as ELARA from "@elaraai/edk/lib"
export default ELARA.PipelineSchema({
name: 'Aggregate',
input_table: null,
operations: [],
})
In the next steps we will add an input table and operation to each pipeline.
The select operation allows us to select output expressions from input expressions; in this case we will create some new variables from `test_one.source`:
import * as ELARA from "@elaraai/edk/lib"
import { Multiply, SelectOperation, StringJoin, Variable } from "@elaraai/edk/lib"
import test_one from "../../gen/test_one.source"
export default ELARA.PipelineSchema({
name: 'Select',
input_table: test_one.output,
operations: [
SelectOperation({
// don't keep all the input expressions, only output the new ones
keep_all: false,
// the new expressions to create
selections: {
// output the `number` value multiplied by 2.
multiply: Multiply(test_one.output.fields.number, 2),
// combine several fields, including a formatted date, into a single string
'String Join': StringJoin`${test_one.output.fields.string}.${ELARA.Print(test_one.output.fields.date, 'DD/MM/YYYY')}`
},
// use the `String Join` value as the output key expression
primary_key: Variable('String Join', 'string')
})
],
})
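As a rough illustration (not actual tool output), a hypothetical input row with `string = "alpha"`, `date = 2021-03-04` and `number = 3.5` would produce an output row containing only the two new selections, keyed by `String Join`:
{
    "multiply": 7,
    "String Join": "alpha.04/03/2021"
}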
The filter operation allows you to filter an input by a predicate expression:
import * as ELARA from "@elaraai/edk/lib"
import { Greater } from "@elaraai/edk/lib"
import test_one from "../../gen/test_one.source"
export default ELARA.PipelineSchema({
name: 'Filter',
input_table: test_one.output,
operations: [
ELARA.FilterOperation({
predicate: Greater(test_one.output.fields.number, 10)
})
],
})
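For example (hypothetical values), a row with `number = 15.0` satisfies the predicate above and is kept, while a row with `number = 7.5` is removed from the output table.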
The aggregation operation allows you to aggregate (or group) the Rows of a Table into fewer Rows based on one or more Expressions. We can use `test_one.source` to group by the combination of the `integer` and `string` fields, since `group_value` can be any string Expression of the input table. For each group we can calculate a range of values:
import * as ELARA from "@elaraai/edk/lib"
import test_one from "../../gen/test_one.source"
export default ELARA.PipelineSchema({
name: 'Aggregate',
input_table: test_one.output,
operations: [
ELARA.AggregateOperation({
// Create a variable to place the group values into
group_field: ELARA.Variable("Aggregate.Group", 'string'),
// Group by the unique combinations of `integer` and `string`
group_value: ELARA.StringJoin`${test_one.output.fields.integer}.${test_one.output.fields.string}`,
aggregations: {
// Count the number of rows where `number` is not null.
count: ELARA.Count(test_one.output.fields.number),
// Count the number of distinct, non-null values of `string`
distinctcount: ELARA.DistinctCount(test_one.output.fields.string),
// Count the number of distinct, non-null values of the combination of `integer` and `string`.
unique: ELARA.Unique(ELARA.StringJoin`${test_one.output.fields.integer}.${test_one.output.fields.string}`),
// Find the sum of the non-null values of `number`.
sum: ELARA.Sum(test_one.output.fields.number),
// Find the mean of the non-null values of `number`.
mean: ELARA.Mean(test_one.output.fields.number),
// Find the standard deviation of the non-null values of `number`.
stddev: ELARA.StdDev(test_one.output.fields.number),
// return true if at least one value of `boolean` is true.
some: ELARA.Some(test_one.output.fields.boolean),
// return true if all values of `boolean` are true.
every: ELARA.Every(test_one.output.fields.boolean),
// Find the mode of the non-null values of `number`.
mode: ELARA.Mode(test_one.output.fields.number),
// Find the median of the non-null values of `number`.
median: ELARA.Median(test_one.output.fields.number),
// Find the minimum of the non-null values of `number`.
minimum: ELARA.Minimum(test_one.output.fields.number),
// Find the maximum of the non-null values of `number`.
maximum: ELARA.Maximum(test_one.output.fields.number),
// Find the difference between the largest and smallest non-null values of `number`.
span: ELARA.Span(test_one.output.fields.number),
// Return the `string` value for the record with the minimum value for `number`.
findminimum: ELARA.FindMinimum(test_one.output.fields.string, test_one.output.fields.number),
// Return the `string` value for the record with the maximum value for `number`.
findmaximum: ELARA.FindMaximum(test_one.output.fields.string, test_one.output.fields.number),
// find the set of the non-null string values of `string`
collectset: ELARA.CollectSet(test_one.output.fields.string),
// create a dictionary of distinct key-value pairs of `string` => `number`
collectdict: ELARA.CollectDict(test_one.output.fields.string, test_one.output.fields.number),
// create a dictionary of the count of non-null values for keys of `string`
collectdictcount: ELARA.CollectDictCount(test_one.output.fields.string),
// create a dictionary of the sum of non-null values for key-value pairs of `string` => `number`
collectdictsum: ELARA.CollectDictSum(test_one.output.fields.string, test_one.output.fields.number),
// calculate the sum of the values of `Dict` where missing elements are presumed to have value of zero.
sparsedictsum: ELARA.SparseDictSum(test_one.output.fields.Dict),
// calculate the average values of `Dict` where missing elements are presumed to have value of zero.
sparsedictmean: ELARA.SparseDictMean(test_one.output.fields.Dict),
// calculate the variance of the values of `Dict` where missing elements are presumed to have value of zero.
sparsedictvariance: ELARA.SparseDictVariance(test_one.output.fields.Dict),
// calculate the covariance of the values of a `Dict` where missing elements are presumed to have value of zero.
sparsedictcovariance: ELARA.SparseDictCovariance(test_one.output.fields.Dict),
}
})
],
})
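As a rough illustration (hypothetical data), two input rows with `integer = 1`, `string = "alpha"` and `number` values of `2.0` and `4.0` fall into the group `"1.alpha"` and would contribute aggregate values such as:
{
    "Aggregate.Group": "1.alpha",
    "count": 2,
    "sum": 6,
    "mean": 3,
    "minimum": 2,
    "maximum": 4,
    "span": 2
}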
The disaggregate operation is the opposite of aggregate, allowing us to "unroll" any collection expression into its values (or key-value pairs). We can use `test_one.source` to disaggregate the `Dict` Expression into its elements; we can output the collection values as well as some of the original input values:
import * as ELARA from "@elaraai/edk/lib"
import { DisaggregateOperation, StringJoin, Variable } from "@elaraai/edk/lib"
import test_one from "../../gen/test_one.source"
export default ELARA.PipelineSchema({
name: "Disaggregate Dict",
input_table: test_one.output,
operations: [
DisaggregateOperation({
// the collection we want to disaggregate
collection: test_one.output.fields.Dict,
// the variable containing each dictionary value
value: Variable("value", "float"),
// the variable containing each dictionary key
key: Variable("key", "string"),
// set to true because we want to keep all the original input expressions also
keep_all: true,
selections: {
// select the Key
"Key": Variable("key", "string"),
// select the value
"Value": Variable("value", "float"),
},
primary_key: StringJoin`${test_one.output.fields.string}. ${Variable("Key", "string")}`
}),
],
})
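For example (hypothetical data), an input row with `string = "alpha"` and `Dict = { "a": 1.5, "b": 2.5 }` would be unrolled into two output rows, one per dictionary element; because `keep_all` is true, the remaining original input fields are also preserved alongside the new `Key` and `Value` selections:
{ "Key": "a", "Value": 1.5, "string": "alpha" }
{ "Key": "b", "Value": 2.5, "string": "alpha" }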
The distribution operation allows us to calculate the probability distributions for samples within a stream, based on a group key; in this case we will calculate the distribution of the `number` variable from `test_one.source` within each value of `string`:
import * as ELARA from "@elaraai/edk/lib"
import { DistributionOperation } from "@elaraai/edk/lib"
import test_one from "../../gen/test_one.source"
export default ELARA.PipelineSchema({
name: "Distribution",
input_table: test_one.output,
operations: [
DistributionOperation({
samples: test_one.output.fields.number,
group_key: test_one.output.fields.string,
distribution: 'GaussianKDE',
normalization: 'ProbabilityDensity'
})
],
})
The join operation allows us to perform an expression-based relational join between two tables; in this case we will join `test_one.source` with `test_two.source`:
import * as ELARA from "@elaraai/edk/lib"
import { Variable } from "@elaraai/edk/lib"
import test_one from "../../gen/test_one.source"
import test_two from "../../gen/test_two.source"
export default ELARA.PipelineSchema({
name: 'Join',
input_table: test_one.output,
operations: [
ELARA.JoinOperation({
// the table being joined in
source_table: test_two.output,
// the join key expression for the source, could be any string expression
source_key: test_two.output.fields.string,
// the join key expression for the target, could be any string expression
target_key: test_one.output.fields.string,
// the selections to apply to the source
source_selections: {
string: test_two.output.fields.string,
date: test_two.output.fields.date,
number: test_two.output.fields.number,
integer: test_two.output.fields.integer,
'boolean': test_two.output.fields['boolean'],
},
// the selections to apply to the target
target_selections: {
target_string: test_one.output.fields.string,
target_date: test_one.output.fields.date,
target_number: test_one.output.fields.number,
target_integer: test_one.output.fields.integer,
target_boolean: test_one.output.fields['boolean']
},
// perform an inner join on the keys
join_type: 'Inner',
// use the string expression for the output primary key
output_key: Variable("string", 'string')
}),
],
})
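As an illustration (hypothetical data), a `test_one` row and a `test_two` row sharing the same `string` value would produce a single output row combining both sets of selections, with the unprefixed fields taken from `test_two` (the source) and the `target_` fields from `test_one` (the target); rows without a matching key in the other table are dropped because the join type is 'Inner':
{
    "string": "alpha",
    "date": "2021-01-02T00:00:00Z",
    "number": 9.0,
    "integer": 3,
    "boolean": false,
    "target_string": "alpha",
    "target_date": "2021-03-04T00:00:00Z",
    "target_number": 3.5,
    "target_integer": 7,
    "target_boolean": true
}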
The offset operation allows you to correlate neighbouring data points in a timestream or other sorted data set. In the general case, a table might consist of rows from multiple distinct timestreams (or other sorted data), so the operation allows you to specify both a group key and a sort key, in addition to the offset (the number of rows away you are interested in). We can use `test_one.source` to demonstrate its usage, first grouping by the `boolean` field and then sorting by the `date` field in order to identify the previous data point by the `string` field (which in this case is the table's primary key):
import * as ELARA from "@elaraai/edk/lib"
import { OffsetOperation, Variable } from "@elaraai/edk/lib"
import test_one from "../../gen/test_one.source"
export default ELARA.PipelineSchema({
name: 'Offset',
input_table: test_one.output,
operations: [
OffsetOperation({
// We are interested in the "previous" row in time
offset: -1,
// Separate time series based on the boolean field (optional)
group_key: test_one.output.fields.boolean,
// Sort each group in ascending order in time
sort_key: test_one.output.fields.date,
// Variable that specifies whether a row at the chosen offset exists or not - in this case, whether it is the first row or not (optional)
// (This variable is accessible inside offset_selections)
offset_exists: Variable("previous_exists", "boolean"),
// Additional fields to append to each row based on the previous row's data
// (All the existing fields are preserved through the operation, and the primary key remains the same)
offset_selections: {
// Whether the row is the first in the group or not
previous_exists: Variable("previous_exists", "boolean"),
// The string field (primary key)
previous_string: test_one.output.fields.string,
},
})
],
})
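As a sketch (hypothetical data), within the group where `boolean` is true, rows sorted by `date` would each gain the previous row's `string` value, with `previous_exists` set to false for the first row in the group:
{ "string": "a", "date": "2021-01-01T00:00:00Z", "previous_exists": false }
{ "string": "b", "date": "2021-01-02T00:00:00Z", "previous_exists": true, "previous_string": "a" }
{ "string": "c", "date": "2021-01-03T00:00:00Z", "previous_exists": true, "previous_string": "b" }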
The http operation allows us to interact with a RESTful API, both by making requests and by propagating the response. As an example we have configured a public-facing webhook endpoint to allow us to POST some data, then capture a response to propagate:
import * as ELARA from "@elaraai/edk/lib"
import { Const, Environment, Equal, HttpOperation, IfElse, mapValues, Parse, RestApiRequest, RestApiResponse, Struct, StructType, Variable } from "@elaraai/edk/lib"
import test_one_source from "../../gen/test_one.source"
const response_headers_type = StructType({
'content-type': 'string',
});
const test_one = test_one_source.output
export default ELARA.PipelineSchema({
name: "Http",
input_table: test_one,
operations: [
HttpOperation({
// the `REQUEST_URL` is specified as an environment variable
request: RestApiRequest({
url: Environment('REQUEST_URL'),
// POST some data
method: 'POST',
// send and accept json data
accept: 'application/json',
content: 'application/json',
// we could also add a header with some basic authentication (but no need in this case)
// headers: Struct({
// Authorization: StringJoin`Bearer ${Environment("TOKEN")}`
// })
// send all of the expressions in `test_one` as a struct
body: Struct(test_one.fields),
// delay for 62 seconds when the status code is 400, otherwise only 400ms
delay_ms: IfElse(
Equal(Variable('status_code', 'integer'), 400n),
Const(62000n),
Const(400n)
),
}),
response: RestApiResponse({
// place the status code in this variable - can be used in the output value
status_code_variable: Variable("status_code", 'integer'),
// place the status text in this variable - can be used in the output value
status_text_variable: Variable("status_text", 'string'),
// the expression to parse the response headers
headers: Parse(Variable("headers", response_headers_type)),
// the variable for the headers - can be used in the output value
headers_variable: Variable("headers", response_headers_type),
// the expression to parse the response body
body: Parse(Variable("body", StructType(mapValues(test_one.fields, f => f.type)))),
// the variable for the body - can be used in the output value
body_variable: Variable("body", StructType(mapValues(test_one.fields, f => f.type))),
// the expression for the output value produced on receiving a response
value: Struct({
body: Variable("body", StructType(mapValues(test_one.fields, f => f.type))),
headers: Variable("headers", response_headers_type),
status_code: Variable("status_code", 'integer'),
status_text: Variable("status_text", 'string'),
})
})
})
],
})
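Assuming the endpoint simply echoes the posted body back as JSON (a purely hypothetical response), the output `value` for a successful request would be a struct along these lines (remaining body fields omitted for brevity):
{
    "status_code": 200,
    "status_text": "OK",
    "headers": { "content-type": "application/json" },
    "body": { "string": "alpha", "number": 3.5, "integer": 7, "boolean": true }
}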
The application was added to the project with the following command: `edk add plugin --name Application --def_dir src/plugin`. After running `edk update`, the application contents were updated to display the pipeline outputs with the `PipelinePlugin` for a default `SuperUser`:
import * as ELARA from "@elaraai/edk/lib"
import { ApplicationPlugin, Const, EnvironmentVariable, SuperUser, PipelinePlugin } from "@elaraai/edk/lib"
import aggregate from "../../gen/aggregate.pipeline"
import disaggregate_dict from "../../gen/disaggregate_dict.pipeline"
import filter from "../../gen/filter.pipeline"
import http from "../../gen/http.pipeline"
import join from "../../gen/join.pipeline"
import select from "../../gen/select.pipeline"
export default ELARA.Schema(
ApplicationPlugin({
name: "Pipelines",
schemas: {
"Operations" : PipelinePlugin({
pipelines: [
aggregate,
disaggregate_dict,
filter,
http,
join,
select
]
})
},
users: [
SuperUser({
email: '[email protected]',
name: 'Admin',
password: Const('admin'),
})
],
environments: [
EnvironmentVariable({ name: 'REQUEST_URL' }),
]
})
)
General reference documentation for EDK usage is available via the following links: