Skip to content

Commit

Permalink
E2E: cover kamu-cli commands (#882)
Browse files Browse the repository at this point in the history
* E2E: add (or expand) `config`/`init`/`add`/`rename` command tests (#878)

* KamuCliApiServerHarnessOptions::with_kamu_config(): introduce

* E2E: cover "kamu config" command

* E2E, test_init_in_an_existing_workspace(): add

* E2E, test_config_set_value(): respect potential CI agent podman configuration

* Updated yanked crate: futures-channel

* E2E, test_config_get_with_default(): respect potential CI agent podman configuration

* kamu-cli-e2e-common: extract "player-scores" & "leaderboard" datasets into statics

* assert_cmd: add patch

* E2E, test_add_dataset_from_stdin(): add

* E2E, test_add_dataset_from_stdin(): add

* AddCommand::validate_args(): fix "name" arg processing

* E2E, test_add_dataset_with_name(): mix add commands

* E2E, test_add_dataset_with_replace(): add

* Interact: fix if is_tty

* CHANGELOG.md: update

* E2E, test_delete_dataset(): add

* E2E, test_delete_dataset_recursive(): add

* E2E, test_delete_dataset_all(): add

* E2E, test_rename_dataset(): add

* assert_cmd: use patch (now from the kamu-data org)

* kamu-cli-puppet: remove unused dep

* Extracted to the separate PR

* E2E: add (or expand) `ingest`/`inspect`/`log`/`new`/`reset`/`search`/etc command tests (#883)

* E2E, ingest_data_to_player_scores_from_stdio(): add

* clippy fixes

* E2E, test_ingest_from_stdin(): also check data via the tail command

* E2E, ingest_data_to_player_scores_from_stdio(): check data for all ingest steps

* E2E, test_ingest_recursive(): add... a part of the test

* E2E, test_ingest_with_source_name(): add

* E2E, test_inspect_lineage(): add

* IngestCommand: add a todo

* DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_{1,2,3}: stabilize tests

* Rename: LineageCommand -> InspectLineageCommand

* E2E: stabilize ingest command tests

* KamuCliPuppetExt::list_blocks(): add

* E2E, test_inspect_query(): add

* E2E, test_inspect_schema(): add

* kamu-cli, Login: fix a doc string typo

* E2E, test_log(): add

* E2E, test_new_{root,derivative}(): add

* E2E, test_reset(): add

* cli-reference.md: update

* test_repo_alias_command -> test_repo_command

* E2E, KamuApiServerClientExt::ingest_data(): add

* E2E, test_search_multi_user(): add

* E2E, test_search_by_name(): add

* E2E, test_search_by_repo(): add

* E2E, test_sql_command(): add

* E2E, test_gc(): add

* E2E, test_system_info(): add

* E2E, test_system_diagnose(): add

* CI: test fixes

* assert_ingest_data_to_player_scores_from_stdio(): stabilize rows order

* assert_ingest_data_to_player_scores_from_stdio(): stabilize rows order [2]

* test_sql_command(): use the default name

* assert_ingest_data_to_player_scores_from_stdio(): stabilize rows order [3]

* CHANGELOG.md: update

* CI: windows build fixes

* E2E, test_tail(): add

* assert_ingest_data_to_player_scores_from_stdio(): stabilize rows order [4]

* assert_ingest_data_to_player_scores_from_stdio(): stabilize rows order [5]

* pretty_assertions::assert_eq!(): expected first, actual after

* Remove extra files

* E2E: cover "kamu login" / "kamu logout" commands

* `Push, pull, compact, verify` commands e2e tests (#893)

* Add query commitments example

* Release (patch) `0.204.5`: fix `--yes / -y` flag: fixed when working from a TTY (#881)

* Interact: fix if is_tty

* CHANGELOG.md: update

* Release (patch): 0.204.5

* Updated yanked crate (futures-util to 0.3.31)

* CI: Fixes `kamu-base-with-data-mt` image builds (#885)

* Images, kamu-base-with-data-mt: add "kamu" to predefined users

* Images, kamu-base-with-data-mt: init-workspace.py use .kamuconfig

* CHANGELOG.md: update

* Add push pull tests

* Add s3 tests

* Remove unused code

* Test transform engine error

* Revert check

* Add container extra groups

* Add verify and compact commands

* Fix grammar

* Fix review commets

* Revert "Merge branch 'master' into chore/push-and-pul-command-e2e-tests"

This reverts commit a4ec39c, reversing
changes made to 1130fa5.

* Clean code

* Fix review comments. Iter 2

* Revert ToDo comment

---------

Co-authored-by: Sergii Mikhtoniuk <[email protected]>
Co-authored-by: Dima Pristupa <[email protected]>

* E2E: add Outbox processing middleware  (#901)

* E2E, e2e_middleware_fn(): implement

* kamu-adapter-http: hide E2E things behind the "e2e" feature gate

* query_handler_post_v2(): fix a typo

* kamu-cli: integrate the E2E middleware

* CHANGELOG.md: update

* Updates after rebasing

* CHANGELOG.md: add missed commands

* CHANGELOG.md: add a note about a "kamu add" fix

* KamuCliPuppetExt::ingest_data(): simplify

* E2E: activate new tests for SQLite

* E2E: activate new tests for PostgreSQL

* E2E: remove "kamu config" tests for SQLite & PostgreSQL

* E2E: test_smart_pull_derivative(): use the tail command + stabilization (#903)

* E2E: test_smart_pull_derivative(): use the tail command

* E2E: run postgres tests one by one

* E2E: test_smart_pull_derivative(): stabilize order

* E2E: wait_for_flows_to_finish(): increase takes count

* E2E, test_ingest_command: stabilize order without the "match_time" changing

* E2E, test_log_command: stabilize order without the "match_time" changing

* E2E, test_sql_command: stabilize order without the "match_time" changing

* E2E, test_tail_command: stabilize order without the "match_time" changing

* E2E, test_dataset_trigger_flow(): stabilize order without the "match_time" changing

* E2E, test_smart_transfer_protocol: stabilize order without the "match_time" changing

* KamuCliPuppetExt: absorb helper methods (#906)

* E2E, test_rest_api_request_dataset_tail(): use a const instead of a hardcoded value

* E2E, test_verify_command: use a consts instead of hardcoded values

* E2E: KamuApiServerClient::get_node_url(): absorb

* CI: mark postgres as flaky but run with 8 threads

* Tests: add flaky "risingwave" group

* E2E, test_dataset_trigger_flow(): add "risingwave" test-group

* CI: go not specify threads for postgres

* E2E, KamuCliPuppetExt::assert_player_scores_dataset_data(): extract

* E2E, KamuCliPuppetExt::assert_success_command_execution(): absorb

* E2E, KamuCliPuppetExt::assert_failure_command_execution(): absorb

* E2E, assert_ingest_data_to_player_scores_from_stdio(): use assert_success_command_execution_with_input()

* KamuCliPuppetExt::assert_player_scores_dataset_data(): use assert_success_command_execution()

* E2E, test_config_command: use new methods

* E2E, test_inspect_command: use new methods

* E2E, test_search_command: use new methods

* E2E, test_sql_command: use new methods

* E2E, test_tail_command: use new methods

* E2E, test_system_api_server_gql_query: use new methods

* E2E, test_add_command: use new methods

* KamuCliPuppetExt: use iter for maybe_expected_stderr

* E2E, test_compact_command: use new methods

* E2E, test_delete_command: use new methods

* E2E, test_ingest_command: use new methods

* E2E, test_init_command: use new methods

* E2E, test_login_command: use new methods

* E2E, test_new_command: use new methods

* E2E, test_rename_command: use new methods

* E2E, test_reset_command: use new methods

* E2E, test_smart_transfer_protocol: use new methods

* E2E, test_system_gc_command: use new methods

* E2E, test_verify_command: use new methods

* E2E, test_workspace_svc: use new methods

* KamuCliPuppetExt::assert_failure_command_execution_with_input(): implement

---------

Co-authored-by: Roman Boiko <[email protected]>
Co-authored-by: Sergii Mikhtoniuk <[email protected]>
  • Loading branch information
3 people authored Oct 16, 2024
1 parent b84630a commit 1ba2367
Show file tree
Hide file tree
Showing 131 changed files with 6,434 additions and 285 deletions.
20 changes: 16 additions & 4 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,31 @@ containerized = { max-threads = 8 } # Don't use too much memory
engine = { max-threads = 2 } # Engine tests are very memory-hungry
database = { max-threads = 8 } # Don't use too much memory

# NOTE: --> There is an incompatibility between nextest and sqlx:
# - nextest implies multiprocessing,
# - while sqlx has a lock on cleanup within the current process
# (https://github.com/launchbadge/sqlx/pull/2640#issuecomment-1659455042).

# TODO: Delete this workaround when this PR is merged:
# - Fix: nextest cleanup race condition by bonega
# https://github.com/launchbadge/sqlx/pull/3334
#
# NOTE: There is an incompatibility between nextest and sqlx:
# - nextest implies multiprocessing,
# - while sqlx has a lock on cleanup within the current process
# (https://github.com/launchbadge/sqlx/pull/2640#issuecomment-1659455042).
[[profile.default.overrides]]
filter = "test(::mysql::)"
test-group = "mysql"
retries = { count = 3, backoff = "exponential", delay = "3s" }

[[profile.default.overrides]]
filter = "test(::postgres::)"
retries = { count = 3, backoff = "exponential", delay = "3s" }
# NOTE: <-- There is an incompatibility between nextest and sqlx

# NOTE: Periodic missing rows when the system is under load
# https://github.com/kamu-data/kamu-engine-risingwave/issues/7
[[profile.default.overrides]]
filter = "test(::risingwave::)"
retries = { count = 3, backoff = "exponential", delay = "3s" }

[[profile.default.overrides]]
filter = "test(::setup::)"
test-group = "setup"
Expand Down
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,32 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Added
- Added (or expanded) E2E tests for:
- `kamu config` command
- `kamu init` command
- `kamu add` command
- `kamu rename` command
- `kamu ingest` command
- `kamu inspect` command
- `kamu log` command
- `kamu new` command
- `kamu reset` command
- `kamu search` command
- `kamu sql` command
- `kamu system gc` command
- `kamu system info` command
- `kamu system diagnose` command
- `kamu tail` command
- `kamu login` command
- `kamu logout` command
- `kamu push` command
- `kamu pull` command
- E2E: HTTP middleware is implemented, which improves stability of E2E tests
### Fixed
- `kamu add`: fixed behavior when using `--stdin` and `--name` arguments

## [0.205.0] - 2024-10-15
### Changed
- `kamu push <dataset>` command now can be called without `--to` reference and Alias or Remote dataset repository will be used as destination
Expand Down
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,4 @@ debug = "line-tables-only"
# datafusion-odata = { git = 'https://github.com/kamu-data/datafusion-odata.git', branch = '42.0.0-axum-0.6' }
# datafusion-ethers = { git = "https://github.com/kamu-data/datafusion-ethers.git", tag = "42.0.0" }
# object_store = { git = 'https://github.com/s373r/arrow-rs', branch = 'add-debug-logs', package = "object_store" }
assert_cmd = { git = 'https://github.com/kamu-data/assert_cmd', branch = "deactivate-output-truncation" }
4 changes: 2 additions & 2 deletions resources/cli-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ To regenerate this schema from existing code, use the following command:
* `inspect` — Group of commands for exploring dataset metadata
* `list [ls]` — List all datasets in the workspace
* `log` — Shows dataset metadata history
* `login`Authentiates with a remote ODF server interactively
* `login`Authenticates with a remote ODF server interactively
* `logout` — Logs out from a remote Kamu server
* `new` — Creates a new dataset manifest from a template
* `notebook` — Starts the notebook server for exploring the data in the workspace
Expand Down Expand Up @@ -508,7 +508,7 @@ Using a filter to inspect blocks containing query changes of a derivative datase

## `kamu login`

Authentiates with a remote ODF server interactively
Authenticates with a remote ODF server interactively

**Usage:** `kamu login [OPTIONS] [SERVER] [COMMAND]`

Expand Down
2 changes: 2 additions & 0 deletions src/adapter/graphql/src/mutations/datasets_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ impl DatasetsMut {
}

// TODO: Multi-tenancy
// https://github.com/kamu-data/kamu-cli/issues/891

// TODO: Multi-tenant resolution for derivative dataset inputs (should it only
// work by ID?)
#[allow(unused_variables)]
Expand Down
11 changes: 11 additions & 0 deletions src/adapter/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ workspace = true
doctest = false


[features]
default = []

e2e = ["dep:messaging-outbox", "dep:http-body-util"]


[dependencies]
database-common = { workspace = true }
database-common-macros = { workspace = true }
Expand Down Expand Up @@ -86,6 +92,11 @@ tracing = "0.1"
url = { version = "2", features = ["serde"] }
uuid = { version = "1", default-features = false, features = ["v4"] }

# Optional
messaging-outbox = { optional = true, workspace = true }

http-body-util = { optional = true, version = "0.1" }


[dev-dependencies]
container-runtime = { workspace = true }
Expand Down
105 changes: 105 additions & 0 deletions src/adapter/http/src/e2e/e2e_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright Kamu Data, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use axum::body::{Body, Bytes};
use axum::extract::Request;
use axum::middleware::Next;
use axum::response::Response;
use http::Method;
use http_common::ApiError;
use serde::Deserialize;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Middleware that invokes Outbox messages processing for mutable requests
pub async fn e2e_middleware_fn(request: Request, next: Next) -> Result<Response, ApiError> {
let base_catalog = request
.extensions()
.get::<dill::Catalog>()
.cloned()
.expect("Catalog not found in http server extensions");

let (is_mutable_request, request) = analyze_request_for_mutability(request).await?;
let response = next.run(request).await;

if is_mutable_request && response.status().is_success() {
let outbox_executor = base_catalog
.get_one::<messaging_outbox::OutboxExecutor>()
.unwrap();

outbox_executor.run_while_has_tasks().await?;
}

Ok(response)
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

async fn analyze_request_for_mutability(request: Request) -> Result<(bool, Request), ApiError> {
{
let is_not_modifying_requests = request.method() != Method::POST;

if is_not_modifying_requests {
return Ok((false, request));
}
}
{
let is_rest_api_post_request = request.uri().path() != "/graphql";

if is_rest_api_post_request {
return Ok((true, request));
}
}
{
// In the case of GQL, we check whether the query is mutable or not
let (request_parts, request_body) = request.into_parts();
let buffered_request_body = buffer_request_body(request_body).await?;

let is_mutating_gql = if let Ok(body) = std::str::from_utf8(&buffered_request_body) {
let gql_request = serde_json::from_str::<SimplifiedGqlRequest>(body)
.map_err(ApiError::bad_request)?;

gql_request.query.starts_with("mutation")
} else {
false
};

let request = Request::from_parts(request_parts, Body::from(buffered_request_body));

Ok((is_mutating_gql, request))
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

async fn buffer_request_body<B>(request_body: B) -> Result<Bytes, ApiError>
where
B: axum::body::HttpBody<Data = Bytes>,
B::Error: std::error::Error + Send + Sync + 'static,
{
use http_body_util::BodyExt;

let body_bytes = match request_body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return Err(ApiError::bad_request(e));
}
};

Ok(body_bytes)
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Deserialize)]
struct SimplifiedGqlRequest {
query: String,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
3 changes: 3 additions & 0 deletions src/adapter/http/src/e2e/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod e2e_middleware;
mod e2e_router;

pub use e2e_middleware::*;
pub use e2e_router::*;
1 change: 1 addition & 0 deletions src/adapter/http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod access_token;
pub use access_token::*;
mod axum_utils;
pub mod data;
#[cfg(feature = "e2e")]
pub mod e2e;
mod simple_protocol;
pub mod smart_protocol;
Expand Down
2 changes: 1 addition & 1 deletion src/app/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ kamu-data-utils = { workspace = true }
kamu-adapter-auth-oso = { workspace = true }
kamu-adapter-flight-sql = { optional = true, workspace = true }
kamu-adapter-graphql = { workspace = true }
kamu-adapter-http = { workspace = true }
kamu-adapter-http = { workspace = true, features = ["e2e"], default-features = false }
kamu-adapter-oauth = { workspace = true }
kamu-adapter-odata = { workspace = true }
kamu-datafusion-cli = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/app/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ pub struct Log {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Authentiates with a remote ODF server interactively
/// Authenticates with a remote ODF server interactively
#[derive(Debug, clap::Args)]
pub struct Login {
#[command(subcommand)]
Expand Down
2 changes: 1 addition & 1 deletion src/app/cli/src/cli_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub fn get_command(
}
}
cli::Command::Inspect(c) => match c.subcommand {
cli::InspectSubCommand::Lineage(sc) => Box::new(LineageCommand::new(
cli::InspectSubCommand::Lineage(sc) => Box::new(InspectLineageCommand::new(
cli_catalog.get_one()?,
cli_catalog.get_one()?,
cli_catalog.get_one()?,
Expand Down
3 changes: 2 additions & 1 deletion src/app/cli/src/commands/add_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ impl Command for AddCommand {
"No manifest references or paths were provided",
));
}
if self.name.is_some() && (self.recursive || self.snapshot_refs.len() != 1) {
if self.name.is_some() && (self.recursive || !(self.snapshot_refs.len() == 1 || self.stdin))
{
return Err(CLIError::usage_error(
"Name override can be used only when adding a single manifest",
));
Expand Down
2 changes: 2 additions & 0 deletions src/app/cli/src/commands/ingest_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ impl Command for IngestCommand {
_ => Ok(()),
}?;

// TODO: `kamu ingest`: implement `--recursive` mode
// https://github.com/kamu-data/kamu-cli/issues/886
if self.recursive {
unimplemented!("Sorry, recursive ingest is not yet implemented")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum LineageOutputFormat {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct LineageCommand {
pub struct InspectLineageCommand {
dataset_repo: Arc<dyn DatasetRepository>,
provenance_svc: Arc<dyn ProvenanceService>,
workspace_layout: Arc<WorkspaceLayout>,
Expand All @@ -41,7 +41,7 @@ pub struct LineageCommand {
output_config: Arc<OutputConfig>,
}

impl LineageCommand {
impl InspectLineageCommand {
pub fn new<I>(
dataset_repo: Arc<dyn DatasetRepository>,
provenance_svc: Arc<dyn ProvenanceService>,
Expand Down Expand Up @@ -94,7 +94,7 @@ impl LineageCommand {

// TODO: Support temporality and evolution
#[async_trait::async_trait(?Send)]
impl Command for LineageCommand {
impl Command for InspectLineageCommand {
async fn run(&mut self) -> Result<(), CLIError> {
use futures::{StreamExt, TryStreamExt};
let mut dataset_handles: Vec<_> = if self.dataset_refs.is_empty() {
Expand Down
2 changes: 2 additions & 0 deletions src/app/cli/src/commands/log_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use crate::output::OutputConfig;
pub enum MetadataLogOutputFormat {
Shell,
Yaml,
// TODO: `kamu log`: support `--output-format json`
// https://github.com/kamu-data/kamu-cli/issues/887
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
3 changes: 3 additions & 0 deletions src/app/cli/src/commands/login_silent_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ use crate::{odf_server, CLIError, Command};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub enum LoginSilentMode {
OAuth(LoginSilentModeOAuth),
Password(LoginSilentModePassword),
}

#[derive(Debug)]
pub struct LoginSilentModeOAuth {
pub provider: String,
pub access_token: String,
}

#[derive(Debug)]
pub struct LoginSilentModePassword {
pub login: String,
pub password: String,
Expand Down
Loading

0 comments on commit 1ba2367

Please sign in to comment.