Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2E: cover kamu-cli commands #882

Merged
merged 13 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,31 @@ containerized = { max-threads = 8 } # Don't use too much memory
engine = { max-threads = 2 } # Engine tests are very memory-hungry
database = { max-threads = 8 } # Don't use too much memory

# NOTE: --> There is an incompatibility between nextest and sqlx:
# - nextest implies multiprocessing,
# - while sqlx has a lock on cleanup within the current process
# (https://github.com/launchbadge/sqlx/pull/2640#issuecomment-1659455042).

# TODO: Delete this workaround when this PR is merged:
# - Fix: nextest cleanup race condition by bonega
# https://github.com/launchbadge/sqlx/pull/3334
#
# NOTE: There is an incompatibility between nextest and sqlx:
# - nextest implies multiprocessing,
# - while sqlx has a lock on cleanup within the current process
# (https://github.com/launchbadge/sqlx/pull/2640#issuecomment-1659455042).
[[profile.default.overrides]]
filter = "test(::mysql::)"
test-group = "mysql"
retries = { count = 3, backoff = "exponential", delay = "3s" }

[[profile.default.overrides]]
filter = "test(::postgres::)"
retries = { count = 3, backoff = "exponential", delay = "3s" }
# NOTE: <-- There is an incompatibility between nextest and sqlx

# NOTE: Periodic missing rows when the system is under load
# https://github.com/kamu-data/kamu-engine-risingwave/issues/7
[[profile.default.overrides]]
filter = "test(::risingwave::)"
retries = { count = 3, backoff = "exponential", delay = "3s" }

[[profile.default.overrides]]
filter = "test(::setup::)"
test-group = "setup"
Expand Down
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,32 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Added
- Added (or expanded) E2E tests for:
- `kamu config` command
- `kamu init` command
- `kamu add` command
- `kamu rename` command
- `kamu ingest` command
- `kamu inspect` command
- `kamu log` command
- `kamu new` command
- `kamu reset` command
- `kamu search` command
- `kamu sql` command
- `kamu system gc` command
- `kamu system info` command
- `kamu system diagnose` command
- `kamu tail` command
- `kamu login` command
- `kamu logout` command
- `kamu push` command
- `kamu pull` command
- E2E: HTTP middleware is implemented, which improves stability of E2E tests
### Fixed
- `kamu add`: fixed behavior when using `--stdin` and `--name` arguments

## [0.205.0] - 2024-10-15
### Changed
- `kamu push <dataset>` command now can be called without `--to` reference and Alias or Remote dataset repository will be used as destination
Expand Down
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,4 @@ debug = "line-tables-only"
# datafusion-odata = { git = 'https://github.com/kamu-data/datafusion-odata.git', branch = '42.0.0-axum-0.6' }
# datafusion-ethers = { git = "https://github.com/kamu-data/datafusion-ethers.git", tag = "42.0.0" }
# object_store = { git = 'https://github.com/s373r/arrow-rs', branch = 'add-debug-logs', package = "object_store" }
assert_cmd = { git = 'https://github.com/kamu-data/assert_cmd', branch = "deactivate-output-truncation" }
4 changes: 2 additions & 2 deletions resources/cli-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ To regenerate this schema from existing code, use the following command:
* `inspect` — Group of commands for exploring dataset metadata
* `list [ls]` — List all datasets in the workspace
* `log` — Shows dataset metadata history
* `login` — Authentiates with a remote ODF server interactively
* `login` — Authenticates with a remote ODF server interactively
* `logout` — Logs out from a remote Kamu server
* `new` — Creates a new dataset manifest from a template
* `notebook` — Starts the notebook server for exploring the data in the workspace
Expand Down Expand Up @@ -508,7 +508,7 @@ Using a filter to inspect blocks containing query changes of a derivative datase

## `kamu login`

Authentiates with a remote ODF server interactively
Authenticates with a remote ODF server interactively

**Usage:** `kamu login [OPTIONS] [SERVER] [COMMAND]`

Expand Down
2 changes: 2 additions & 0 deletions src/adapter/graphql/src/mutations/datasets_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ impl DatasetsMut {
}

// TODO: Multi-tenancy
// https://github.com/kamu-data/kamu-cli/issues/891

// TODO: Multi-tenant resolution for derivative dataset inputs (should it only
// work by ID?)
#[allow(unused_variables)]
Expand Down
11 changes: 11 additions & 0 deletions src/adapter/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ workspace = true
doctest = false


[features]
default = []

e2e = ["dep:messaging-outbox", "dep:http-body-util"]


[dependencies]
database-common = { workspace = true }
database-common-macros = { workspace = true }
Expand Down Expand Up @@ -86,6 +92,11 @@ tracing = "0.1"
url = { version = "2", features = ["serde"] }
uuid = { version = "1", default-features = false, features = ["v4"] }

# Optional
messaging-outbox = { optional = true, workspace = true }

http-body-util = { optional = true, version = "0.1" }


[dev-dependencies]
container-runtime = { workspace = true }
Expand Down
105 changes: 105 additions & 0 deletions src/adapter/http/src/e2e/e2e_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright Kamu Data, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use axum::body::{Body, Bytes};
use axum::extract::Request;
use axum::middleware::Next;
use axum::response::Response;
use http::Method;
use http_common::ApiError;
use serde::Deserialize;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Middleware that invokes Outbox messages processing for mutable requests
pub async fn e2e_middleware_fn(request: Request, next: Next) -> Result<Response, ApiError> {
let base_catalog = request
.extensions()
.get::<dill::Catalog>()
.cloned()
.expect("Catalog not found in http server extensions");

let (is_mutable_request, request) = analyze_request_for_mutability(request).await?;
let response = next.run(request).await;

if is_mutable_request && response.status().is_success() {
let outbox_executor = base_catalog
.get_one::<messaging_outbox::OutboxExecutor>()
.unwrap();

outbox_executor.run_while_has_tasks().await?;
}

Ok(response)
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

async fn analyze_request_for_mutability(request: Request) -> Result<(bool, Request), ApiError> {
{
let is_not_modifying_requests = request.method() != Method::POST;

if is_not_modifying_requests {
return Ok((false, request));
}
}
{
let is_rest_api_post_request = request.uri().path() != "/graphql";

if is_rest_api_post_request {
return Ok((true, request));
}
}
{
// In the case of GQL, we check whether the query is mutable or not
let (request_parts, request_body) = request.into_parts();
let buffered_request_body = buffer_request_body(request_body).await?;

let is_mutating_gql = if let Ok(body) = std::str::from_utf8(&buffered_request_body) {
let gql_request = serde_json::from_str::<SimplifiedGqlRequest>(body)
.map_err(ApiError::bad_request)?;

gql_request.query.starts_with("mutation")
} else {
false
};

let request = Request::from_parts(request_parts, Body::from(buffered_request_body));

Ok((is_mutating_gql, request))
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

async fn buffer_request_body<B>(request_body: B) -> Result<Bytes, ApiError>
where
B: axum::body::HttpBody<Data = Bytes>,
B::Error: std::error::Error + Send + Sync + 'static,
{
use http_body_util::BodyExt;

let body_bytes = match request_body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return Err(ApiError::bad_request(e));
}
};

Ok(body_bytes)
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Deserialize)]
struct SimplifiedGqlRequest {
query: String,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
3 changes: 3 additions & 0 deletions src/adapter/http/src/e2e/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod e2e_middleware;
mod e2e_router;

pub use e2e_middleware::*;
pub use e2e_router::*;
1 change: 1 addition & 0 deletions src/adapter/http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod access_token;
pub use access_token::*;
mod axum_utils;
pub mod data;
#[cfg(feature = "e2e")]
pub mod e2e;
mod simple_protocol;
pub mod smart_protocol;
Expand Down
2 changes: 1 addition & 1 deletion src/app/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ kamu-data-utils = { workspace = true }
kamu-adapter-auth-oso = { workspace = true }
kamu-adapter-flight-sql = { optional = true, workspace = true }
kamu-adapter-graphql = { workspace = true }
kamu-adapter-http = { workspace = true }
kamu-adapter-http = { workspace = true, features = ["e2e"], default-features = false }
kamu-adapter-oauth = { workspace = true }
kamu-adapter-odata = { workspace = true }
kamu-datafusion-cli = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/app/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ pub struct Log {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Authentiates with a remote ODF server interactively
/// Authenticates with a remote ODF server interactively
#[derive(Debug, clap::Args)]
pub struct Login {
#[command(subcommand)]
Expand Down
2 changes: 1 addition & 1 deletion src/app/cli/src/cli_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub fn get_command(
}
}
cli::Command::Inspect(c) => match c.subcommand {
cli::InspectSubCommand::Lineage(sc) => Box::new(LineageCommand::new(
cli::InspectSubCommand::Lineage(sc) => Box::new(InspectLineageCommand::new(
cli_catalog.get_one()?,
cli_catalog.get_one()?,
cli_catalog.get_one()?,
Expand Down
3 changes: 2 additions & 1 deletion src/app/cli/src/commands/add_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ impl Command for AddCommand {
"No manifest references or paths were provided",
));
}
if self.name.is_some() && (self.recursive || self.snapshot_refs.len() != 1) {
if self.name.is_some() && (self.recursive || !(self.snapshot_refs.len() == 1 || self.stdin))
{
return Err(CLIError::usage_error(
"Name override can be used only when adding a single manifest",
));
Expand Down
2 changes: 2 additions & 0 deletions src/app/cli/src/commands/ingest_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ impl Command for IngestCommand {
_ => Ok(()),
}?;

// TODO: `kamu ingest`: implement `--recursive` mode
// https://github.com/kamu-data/kamu-cli/issues/886
if self.recursive {
unimplemented!("Sorry, recursive ingest is not yet implemented")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum LineageOutputFormat {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct LineageCommand {
pub struct InspectLineageCommand {
dataset_repo: Arc<dyn DatasetRepository>,
provenance_svc: Arc<dyn ProvenanceService>,
workspace_layout: Arc<WorkspaceLayout>,
Expand All @@ -41,7 +41,7 @@ pub struct LineageCommand {
output_config: Arc<OutputConfig>,
}

impl LineageCommand {
impl InspectLineageCommand {
pub fn new<I>(
dataset_repo: Arc<dyn DatasetRepository>,
provenance_svc: Arc<dyn ProvenanceService>,
Expand Down Expand Up @@ -94,7 +94,7 @@ impl LineageCommand {

// TODO: Support temporality and evolution
#[async_trait::async_trait(?Send)]
impl Command for LineageCommand {
impl Command for InspectLineageCommand {
async fn run(&mut self) -> Result<(), CLIError> {
use futures::{StreamExt, TryStreamExt};
let mut dataset_handles: Vec<_> = if self.dataset_refs.is_empty() {
Expand Down
2 changes: 2 additions & 0 deletions src/app/cli/src/commands/log_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use crate::output::OutputConfig;
pub enum MetadataLogOutputFormat {
Shell,
Yaml,
// TODO: `kamu log`: support `--output-format json`
// https://github.com/kamu-data/kamu-cli/issues/887
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
3 changes: 3 additions & 0 deletions src/app/cli/src/commands/login_silent_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ use crate::{odf_server, CLIError, Command};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub enum LoginSilentMode {
OAuth(LoginSilentModeOAuth),
Password(LoginSilentModePassword),
}

#[derive(Debug)]
pub struct LoginSilentModeOAuth {
pub provider: String,
pub access_token: String,
}

#[derive(Debug)]
pub struct LoginSilentModePassword {
pub login: String,
pub password: String,
Expand Down
Loading
Loading