Skip to content

Commit

Permalink
API to read and write dataset flow configurations (#386)
Browse files Browse the repository at this point in the history
* GraphQL API to manipulate flow configurations
* Differentiating which type of flow requires which kind of configuration
* Covered new GraphQL API for flow config with unit tests
* Experiment: Applied @OneOf directive in GraphQL to create union input model for schedules
* Formatter fixes
  • Loading branch information
zaychenko-sergei authored Dec 27, 2023
1 parent f182f1a commit a1d128d
Show file tree
Hide file tree
Showing 21 changed files with 1,529 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed race condition during tests parallel execution (`tests::ingest::test_fetch::*` test group)

## [0.150.0] - 2023-12-27
### Added
- GraphQL API to configure automatic run of dataset flows:
- a schedule for main flows, like ingest of root datasets
- a batching condition for dependent flows, such as executing transforms
### Changed
- Changed logic in `SimpleTransferProtocol`: block data and checkpoints are now downloaded/uploaded
  in parallel. The default number of parallel tasks is 10, but it can be changed by changing
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 93 additions & 0 deletions resources/schema.gql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
directive @oneOf on INPUT_OBJECT

type Account {
"""
Unique and stable identifier of this account
Expand Down Expand Up @@ -141,6 +143,10 @@ type CreateDatasetResultSuccess implements CreateDatasetResult & CreateDatasetFr
message: String!
}

type CronExpression {
cronExpression: String!
}

type DataBatch {
format: DataBatchFormat!
content: String!
Expand Down Expand Up @@ -233,6 +239,10 @@ type Dataset {
"""
metadata: DatasetMetadata!
"""
Access to the flow configurations of this dataset
"""
flows: DatasetFlows!
"""
Creation time of the first metadata block in the chain
"""
createdAt: DateTime!
Expand Down Expand Up @@ -286,6 +296,35 @@ type DatasetEdge {
node: Dataset!
}

type DatasetFlowConfigs {
"""
Returns defined configuration for a flow of specified type
"""
byType(datasetFlowType: DatasetFlowType!): FlowConfiguration
}

type DatasetFlowConfigsMut {
setConfigSchedule(datasetFlowType: DatasetFlowType!, paused: Boolean!, schedule: ScheduleInput!): SetFlowConfigResult!
setConfigBatching(datasetFlowType: DatasetFlowType!, paused: Boolean!, throttlingPeriod: TimeDeltaInput, minimalDataBatch: Int): SetFlowConfigResult!
}

enum DatasetFlowType {
INGEST
EXECUTE_QUERY
COMPACTION
}

type DatasetFlows {
"""
Returns interface for flow configurations queries
"""
configs: DatasetFlowConfigs!
}

type DatasetFlowsMut {
configs: DatasetFlowConfigsMut!
}

scalar DatasetID

enum DatasetKind {
Expand Down Expand Up @@ -366,6 +405,10 @@ type DatasetMut {
"""
metadata: DatasetMetadataMut!
"""
Access to the mutable flow configurations of this dataset
"""
flows: DatasetFlowsMut!
"""
Rename the dataset
"""
rename(newName: DatasetName!): RenameResult!
Expand Down Expand Up @@ -526,6 +569,19 @@ type FetchStepUrl {
}


type FlowConfiguration {
paused: Boolean!
schedule: FlowConfigurationSchedule
batching: FlowConfigurationBatching
}

type FlowConfigurationBatching {
throttlingPeriod: TimeDelta
minimalDataBatch: Int
}

union FlowConfigurationSchedule = TimeDelta | CronExpression


type InputSlice {
datasetId: DatasetID!
Expand Down Expand Up @@ -828,6 +884,11 @@ type RequestHeader {
value: String!
}

input ScheduleInput @oneOf {
timeDelta: TimeDeltaInput
cronExpression: String
}

type Search {
"""
Perform search across all resources
Expand Down Expand Up @@ -870,6 +931,21 @@ type SetDataSchema {
schema: DataSchema!
}

type SetFlowConfigIncompatibleDatasetKind implements SetFlowConfigResult {
expectedDatasetKind: DatasetKind!
actualDatasetKind: DatasetKind!
message: String!
}

interface SetFlowConfigResult {
message: String!
}

type SetFlowConfigSuccess implements SetFlowConfigResult {
config: FlowConfiguration!
message: String!
}

type SetInfo {
description: String
keywords: [String!]
Expand Down Expand Up @@ -1056,6 +1132,23 @@ type TemporalTable {
primaryKey: [String!]!
}

type TimeDelta {
every: Int!
unit: TimeUnit!
}

input TimeDeltaInput {
every: Int!
unit: TimeUnit!
}

enum TimeUnit {
MINUTES
HOURS
DAYS
WEEKS
}

union Transform = TransformSql

type TransformInput {
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/graphql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ thiserror = { version = "1", default-features = false }

[dev-dependencies]
# TODO: Limit to mock or in-memory implementations only
event-bus = { workspace = true }
kamu = { workspace = true }
event-bus = { workspace = true }
kamu-flow-system-inmem = { workspace = true }

env_logger = "0.10"
mockall = "0.11"
Expand Down
Loading

0 comments on commit a1d128d

Please sign in to comment.