Skip to content

Commit

Permalink
Add version to config and contract
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker committed Aug 29, 2023
1 parent c9462bf commit 258d1c9
Show file tree
Hide file tree
Showing 39 changed files with 87 additions and 45 deletions.
3 changes: 2 additions & 1 deletion config/tests/test.debezium-with-schema-registry.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: dozer-kafka-with-schema-registry-test
version: 1

api:
rest:
Expand Down Expand Up @@ -36,4 +37,4 @@ endpoints:
sql: select id from products;
index:
primary_key:
- id
- id
1 change: 1 addition & 0 deletions config/tests/test.debezium.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: dozer-kafka-test
version: 1

api:
rest:
Expand Down
3 changes: 2 additions & 1 deletion config/tests/test.postgres.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: postgres-test
version: 1

api:
rest:
Expand Down Expand Up @@ -38,4 +39,4 @@ endpoints:
sql: select id from products_test;
index:
primary_key:
- id
- id
1 change: 1 addition & 0 deletions config/tests/test.snowflake.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: dozer-snowflake-test
version: 1

api:
rest:
Expand Down
5 changes: 4 additions & 1 deletion dozer-cli/src/cli/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ pub fn generate_config_repl() -> Result<(), OrchestrationError> {
let mut rl = Editor::<InitHelper, DefaultHistory>::new()
.map_err(|e| OrchestrationError::CliError(CliError::ReadlineError(e)))?;
rl.set_helper(Some(InitHelper {}));
let mut default_config = Config::default();
let mut default_config = Config {
version: 1,
..Default::default()
};
let default_app_name = "quick-start-app";
let questions: Vec<Question> = vec![
(
Expand Down
5 changes: 5 additions & 0 deletions dozer-cli/src/cli/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fn test_yml_content_full() -> (&'static str, Config) {
});
let config = Config {
app_name: "dozer-config-sample".to_owned(),
version: 1,
home_dir: DEFAULT_HOME_DIR.to_owned(),
api: Some(api_config),
connections: vec![test_connection],
Expand All @@ -35,6 +36,7 @@ fn test_yml_content_full() -> (&'static str, Config) {
(
r#"
app_name: dozer-config-sample
version: 1,
home_dir: './.dozer'
api:
rest:
Expand Down Expand Up @@ -91,6 +93,7 @@ fn test_yml_content_full() -> (&'static str, Config) {
fn test_yml_content_missing_api_config() -> &'static str {
r#"
app_name: dozer-config-sample
version: 1,
connections:
- db_type: Postgres
authentication: !Postgres
Expand Down Expand Up @@ -121,6 +124,7 @@ fn test_yml_content_missing_api_config() -> &'static str {
fn test_yml_content_missing_internal_config() -> &'static str {
r#"
app_name: dozer-config-sample
version: 1,
api:
rest:
port: 8080
Expand Down Expand Up @@ -228,6 +232,7 @@ fn test_config() -> Config {
let api_config = test_api_config();
Config {
app_name: "dozer-config-sample".to_owned(),
version: 1,
home_dir: DEFAULT_HOME_DIR.to_owned(),
api: Some(api_config),
connections: vec![test_connection],
Expand Down
2 changes: 2 additions & 0 deletions dozer-cli/src/live/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,10 @@ fn get_contract(dozer_and_contract: &Option<DozerAndContract>) -> Result<&Contra

async fn create_contract(dozer: SimpleOrchestrator) -> Result<Contract, OrchestrationError> {
let dag = create_dag(&dozer).await?;
let version = dozer.config.version;
let schemas = DagSchemas::new(dag)?;
let contract = Contract::new(
version as usize,
&schemas,
&dozer.config.endpoints,
// We don't care about API generation options here. They are handled in `run_all`.
Expand Down
1 change: 1 addition & 0 deletions dozer-cli/src/pipeline/tests/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fn get_default_config() -> Config {

Config {
app_name: "multi".to_string(),
version: 1,
api: Default::default(),
flags: Default::default(),
connections: vec![grpc_conn.clone()],
Expand Down
39 changes: 7 additions & 32 deletions dozer-cli/src/simple/build/contract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ pub struct EdgeType {

pub type PipelineContract = daggy::Dag<NodeType, EdgeType>;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(crate = "dozer_types::serde")]
pub struct Contract {
pub version: usize,
pub pipeline: PipelineContract,
pub endpoints: BTreeMap<String, EndpointSchema>,
}

impl Contract {
pub fn new(
version: usize,
dag_schemas: &DagSchemas,
endpoints: &[ApiEndpoint],
enable_token: bool,
Expand Down Expand Up @@ -111,47 +114,19 @@ impl Contract {
);

Ok(Self {
version,
pipeline,
endpoints: endpoint_schemas,
})
}

pub fn serialize(&self, build_path: &BuildPath) -> Result<(), BuildError> {
serde_json_to_path(&build_path.dag_path, &self.pipeline)?;

for (endpoint_name, schema) in &self.endpoints {
let endpoint_path = build_path.get_endpoint_path(endpoint_name);
serde_json_to_path(&endpoint_path.schema_path, schema)?;
}

serde_json_to_path(&build_path.dag_path, &self)?;
Ok(())
}

pub fn deserialize(build_path: &BuildPath) -> Result<Self, BuildError> {
let pipeline: daggy::Dag<NodeType, EdgeType> = serde_json_from_path(&build_path.dag_path)?;

let mut endpoints = BTreeMap::new();
for (node_index, node) in pipeline.node_references() {
// Endpoint must have zero out degree.
if pipeline
.edges_directed(node_index, Direction::Outgoing)
.count()
> 0
{
continue;
}

// `NodeHandle::id` is the endpoint name.
let endpoint_name = node.handle.id.clone();
let endpoint_path = build_path.get_endpoint_path(&endpoint_name);
let schema: EndpointSchema = serde_json_from_path(&endpoint_path.schema_path)?;
endpoints.insert(endpoint_name, schema);
}

Ok(Self {
pipeline,
endpoints,
})
serde_json_from_path(&build_path.dag_path)
}
}

Expand Down
14 changes: 10 additions & 4 deletions dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use dozer_types::indicatif::MultiProgress;

use dozer_types::models::connection::Connection;

use crate::errors::OrchestrationError;
use crate::errors::{BuildError, OrchestrationError};

use super::Contract;

pub struct Executor<'a> {
connections: &'a [Connection],
Expand Down Expand Up @@ -123,9 +125,13 @@ async fn create_log_endpoint(
) -> Result<LogEndpoint, OrchestrationError> {
let endpoint_path = build_path.get_endpoint_path(endpoint_name);

let schema_string = tokio::fs::read_to_string(&endpoint_path.schema_path)
.await
.map_err(|e| OrchestrationError::FileSystem(endpoint_path.schema_path.into(), e))?;
let contract = Contract::deserialize(build_path)?;
let schema = contract
.endpoints
.get(endpoint_name)
.ok_or_else(|| BuildError::MissingEndpoint(endpoint_name.to_owned()))?;
let schema_string =
dozer_types::serde_json::to_string(schema).map_err(BuildError::SerdeJson)?;

let descriptor_bytes = tokio::fs::read(&build_path.descriptor_path)
.await
Expand Down
3 changes: 3 additions & 0 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,10 @@ impl SimpleOrchestrator {
.as_ref()
.map(|flags| flags.push_events)
.unwrap_or_else(default_push_events);
let version = self.config.version as usize;

let contract = build::Contract::new(
version,
&dag_schemas,
&self.config.endpoints,
enable_token,
Expand Down
7 changes: 6 additions & 1 deletion dozer-cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ fn test_sql_merge_in_config() {
let yaml = format!(
r#"
app_name: dozer-config-sample
version: 1
sql:
{}
"#,
Expand All @@ -31,7 +32,10 @@ fn test_sql_merge_in_config() {
fn test_sql_from_single_sql_source_in_config() {
let query = "select * from table_b";

let yaml = r#"app_name: dozer-config-sample"#;
let yaml = r#"
app_name: dozer-config-sample
version: 1
"#;

let mut combined_yaml = serde_yaml::Value::Mapping(Mapping::new());

Expand All @@ -50,6 +54,7 @@ fn test_sql_from_single_yaml_source_in_config() {
let yaml = format!(
r#"
app_name: dozer-config-sample
version: 1
sql:
{}
"#,
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/src/tests/cases/postgres/dozer-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: 1-hypercharge-postgres-sample
version: 1
connections:
- config: !Postgres
user: postgres
Expand Down
3 changes: 2 additions & 1 deletion dozer-ingestion/src/tests/cases/snowflake/dozer-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: snowflake-test
version: 1
connections:
- config: !Snowflake
server: "{{SN_SERVER}}"
Expand Down Expand Up @@ -28,4 +29,4 @@ endpoints:
table_name: customers_data
index:
primary_key:
- C_CUSTKEY
- C_CUSTKEY
2 changes: 2 additions & 0 deletions dozer-ingestion/tests/test_suite/connectors/dozer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ async fn create_nested_dozer_server(
let cache_dir = dozer_dir.join("cache");
std::fs::create_dir_all(&cache_dir).unwrap();
let config = dozer_types::models::config::Config {
version: 1,
app_name: "nested-dozer-connector-test".to_owned(),
home_dir: dozer_dir.to_str().unwrap().to_owned(),
cache_dir: cache_dir.to_str().unwrap().to_owned(),
Expand Down Expand Up @@ -288,6 +289,7 @@ impl Drop for DozerConnectorTest {

static DOZER_CONFIG: &str = r#"
app_name: dozer-nested
version: 1
connections:
- config: !Grpc
schemas: !Path ./schema.json
Expand Down
3 changes: 0 additions & 3 deletions dozer-log/src/home_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,9 @@ pub struct BuildPath {

impl BuildPath {
pub fn get_endpoint_path(&self, endpoint_name: &str) -> EndpointPath {
let schema_path = self.contracts_dir.join(format!("{}.json", endpoint_name));
let log_dir_relative_to_data_dir = self.log_dir_relative_to_data_dir.join(endpoint_name);
EndpointPath {
build_id: self.id.clone(),
schema_path,
log_dir_relative_to_data_dir,
}
}
Expand All @@ -193,6 +191,5 @@ impl BuildPath {
#[derive(Debug, Clone)]
pub struct EndpointPath {
pub build_id: BuildId,
pub schema_path: Utf8PathBuf,
pub log_dir_relative_to_data_dir: Utf8PathBuf,
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: dozer-samples-connectors-local-storage
version: 1

connections:
- config : !LocalStorage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: dozer-samples-connectors-postgres
version: 1

connections:
- name: pagila_conn
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: working_app
version: 1
connections:
- config: !Ethereum
provider: !Log
Expand Down
1 change: 1 addition & 0 deletions dozer-tests/src/e2e_tests/cases/eth-logs/dozer-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: working_app
version: 1
connections:
- config: !Ethereum
provider: !Log
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: working_app
version: 1
connections:
- config: !Ethereum
provider: !Log
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: dozer-ingest-users
version: 1
connections:
- config: !Grpc
schemas: !Inline |
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: dozer-eth-dashboard
version: 1
connections:
- config: !Ethereum
provider: !Trace
Expand Down
1 change: 1 addition & 0 deletions dozer-tests/src/e2e_tests/cases/mongodb/dozer-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: mongodb-e2e-test
version: 1
connections:
- config: !MongoDB
connection_string: mongodb://localhost/sample_mflix
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: chinook-mysql
version: 1

connections:
- name: chinook_mysql
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: pg_nested_aggregation
version: 1
connections:
- config: !Postgres
user: postgres
Expand Down
1 change: 1 addition & 0 deletions dozer-tests/src/e2e_tests/cases/postgres/dozer-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: postgres-e2e-test
version: 1
connections:
- config: !Postgres
user: postgres
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: sql-aggregations
version: 1

connections:
- config : !LocalStorage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: sql-join-sample
version: 1

connections:
- config : !LocalStorage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: sql-window-functions-sample
version: 1

connections:
- config : !LocalStorage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
app_name: flight-microservices
version: 1
cache_max_map_size: 2147483648
connections:
- config: !Postgres
Expand Down Expand Up @@ -118,4 +119,4 @@ endpoints:
index:
primary_key:
- flight_no
- days_of_week
- days_of_week
Loading

0 comments on commit 258d1c9

Please sign in to comment.