Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Add new workflow metadata endpoint #240

Merged
merged 3 commits into from
Jul 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ lib/
build/
*/build/

# asdf tool versions
.tool-versions

2 changes: 1 addition & 1 deletion dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Common place to define all the version dependencies
*/
ext {
revConductor = '3.13.7'
revConductor = '3.14.0-SNAPSHOT'
revActivation = '2.0.0'
revAmqpClient = '5.13.0'
revAwaitility = '3.1.6'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class AMQPConstants {

/** this when set will create a rabbitmq queue */
public static String AMQP_QUEUE_TYPE = "amqp_queue";

/** this when set will create a rabbitmq exchange */
public static String AMQP_EXCHANGE_TYPE = "amqp_exchange";

Expand Down Expand Up @@ -53,19 +54,22 @@ public class AMQPConstants {
* <p>{@see <a href="https://www.rabbitmq.com/queues.html">RabbitMQ</a>}.
*/
public static boolean DEFAULT_AUTO_DELETE = false;

/**
* default rabbitmq delivery mode This is a property of the message When set to 1 the will be
* non persistent and 2 will be persistent {@see <a
* href="https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.5.4/rabbitmq-java-client-javadoc-3.5.4/com/rabbitmq/client/MessageProperties.html>
* Message Properties</a>}.
*/
public static int DEFAULT_DELIVERY_MODE = 2;

/**
* default rabbitmq delivery mode This is a property of the channel limit to get the number of
* unacknowledged messages. {@see <a
* href="https://www.rabbitmq.com/consumer-prefetch.html>Consumer Prefetch</a>}.
*/
public static int DEFAULT_BATCH_SIZE = 1;

/**
* default rabbitmq delivery mode This is a property of the amqp implementation which sets teh
* polling time to drain the in-memory queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public String getExchangeBoundQueueName() {
return exchangeBoundQueueName;
}


public String getExchangeType() {
return exchangeType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,12 @@ void runObserve(
}

@Test
public void testGetMessagesFromExistingExchangeWithDefaultConfiguration() throws IOException, TimeoutException {
public void testGetMessagesFromExistingExchangeWithDefaultConfiguration()
throws IOException, TimeoutException {
// Mock channel and connection
Channel channel = mockBaseChannel();
Connection connection = mockGoodConnection(channel);
testGetMessagesFromExchangeAndDefaultConfiguration(
channel, connection, true, true);
testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
@ConfigurationProperties("conductor.event-queues.jsm")
public class JetStreamProperties {
private String listenerQueuePrefix = "";

/** The durable subscriber name for the subscription */
private String durableName = "defaultQueue";

private String streamStorageType = "file";

/** The NATS connection url */
private String url = Options.DEFAULT_URL;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
*/
package com.netflix.conductor.postgres.config;

import com.netflix.conductor.core.utils.IDGenerator;
import java.util.Map;

import javax.annotation.PostConstruct;
Expand All @@ -27,6 +26,7 @@
import org.springframework.context.annotation.Import;

import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.postgres.storage.PostgresPayloadStorage;

@Configuration(proxyBeanMethods = false)
Expand Down Expand Up @@ -76,6 +76,7 @@ public Flyway flywayForExternalDb() {
@DependsOn({"flywayForExternalDb"})
public ExternalPayloadStorage postgresExternalPayloadStorage(
PostgresPayloadProperties properties) {
return new PostgresPayloadStorage(properties, dataSource, idGenerator, DEFAULT_MESSAGE_TO_USER);
return new PostgresPayloadStorage(
properties, dataSource, idGenerator, DEFAULT_MESSAGE_TO_USER);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -12,7 +11,6 @@
*/
package com.netflix.conductor.postgres.storage;

import com.netflix.conductor.core.exception.NonTransientException;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Connection;
Expand All @@ -30,6 +28,7 @@

import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.postgres.config.PostgresPayloadProperties;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -12,7 +11,6 @@
*/
package com.netflix.conductor.postgres.storage;

import com.netflix.conductor.core.utils.IDGenerator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -40,6 +38,7 @@

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.utils.IDGenerator;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -75,7 +74,8 @@ public void setup() {
new PostgresPayloadStorage(
testPostgres.getTestProperties(),
testPostgres.getDataSource(),
new IDGenerator(), errorMessage);
new IDGenerator(),
errorMessage);
}

@Test
Expand Down Expand Up @@ -268,22 +268,22 @@ private String getKey(String input) {

private void assertCount(int expected) throws SQLException {
try (PreparedStatement stmt =
testPostgres
.getDataSource()
.getConnection()
.prepareStatement(
"SELECT count(id) FROM external.external_payload");
ResultSet rs = stmt.executeQuery()) {
testPostgres
.getDataSource()
.getConnection()
.prepareStatement(
"SELECT count(id) FROM external.external_payload");
ResultSet rs = stmt.executeQuery()) {
rs.next();
assertEquals(expected, rs.getInt(1));
}
}

private String getCreatedOn(String key) throws SQLException {
try (Connection conn = testPostgres.getDataSource().getConnection();
PreparedStatement stmt =
conn.prepareStatement(
"SELECT created_on FROM external.external_payload WHERE id = ?")) {
PreparedStatement stmt =
conn.prepareStatement(
"SELECT created_on FROM external.external_payload WHERE id = ?")) {
stmt.setString(1, key);
try (ResultSet rs = stmt.executeQuery()) {
rs.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public RestClientBuilder restClientBuilder(ElasticSearchProperties properties) {
return builder;
}

@Primary // If you are including this project, it's assumed you want ES to be your indexing mechanism
@Primary // If you are including this project, it's assumed you want ES to be your indexing
// mechanism
@Bean
public IndexDAO es7IndexDAO(
RestClientBuilder restClientBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ private void addIndex(String index, final String mappingFilename) throws IOExcep
logger.info("Index '{}' already exists", index);
}
}

/**
* Adds an index to elasticsearch if it does not exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ private String readNumber(InputStream is) throws Exception {
String numValue = sb.toString().trim();
return numValue;
}

/**
* Reads an escaped string
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ public List<WorkflowDef> getAllWorkflowDefs() {
GET_ALL_WORKFLOW_DEF_QUERY, q -> q.executeAndFetch(WorkflowDef.class));
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
final String GET_ALL_WORKFLOW_DEF_LATEST_VERSIONS_QUERY =
"SELECT json_data FROM meta_workflow_def wd WHERE wd.version = (SELECT MAX(version) FROM meta_workflow_def wd2 WHERE wd2.name = wd.name)";
return queryWithTransaction(
GET_ALL_WORKFLOW_DEF_LATEST_VERSIONS_QUERY,
q -> q.executeAndFetch(WorkflowDef.class));
}

public List<WorkflowDef> getAllLatest() {
final String GET_ALL_LATEST_WORKFLOW_DEF_QUERY =
"SELECT json_data FROM meta_workflow_def WHERE version = " + "latest_version";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.builder.EqualsBuilder;
Expand Down Expand Up @@ -277,4 +279,43 @@ public void testEventHandlers() {
assertNotNull(byEvents);
assertEquals(1, byEvents.size());
}

@Test
public void testGetAllWorkflowDefsLatestVersions() {
WorkflowDef def = new WorkflowDef();
def.setName("test1");
def.setVersion(1);
def.setDescription("description");
def.setCreatedBy("unit_test");
def.setCreateTime(1L);
def.setOwnerApp("ownerApp");
def.setUpdatedBy("unit_test2");
def.setUpdateTime(2L);
metadataDAO.createWorkflowDef(def);

def.setName("test2");
metadataDAO.createWorkflowDef(def);
def.setVersion(2);
metadataDAO.createWorkflowDef(def);

def.setName("test3");
def.setVersion(1);
metadataDAO.createWorkflowDef(def);
def.setVersion(2);
metadataDAO.createWorkflowDef(def);
def.setVersion(3);
metadataDAO.createWorkflowDef(def);

// Placed the values in a map because they might not be stored in order of defName.
// To test, needed to confirm that the versions are correct for the definitions.
Map<String, WorkflowDef> allMap =
metadataDAO.getAllWorkflowDefsLatestVersions().stream()
.collect(Collectors.toMap(WorkflowDef::getName, Function.identity()));

assertNotNull(allMap);
assertEquals(3, allMap.size());
assertEquals(1, allMap.get("test1").getVersion());
assertEquals(2, allMap.get("test2").getVersion());
assertEquals(3, allMap.get("test3").getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,15 @@ public List<WorkflowDef> getAllWorkflowDefs() {
GET_ALL_WORKFLOW_DEF_QUERY, q -> q.executeAndFetch(WorkflowDef.class));
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
final String GET_ALL_WORKFLOW_DEF_LATEST_VERSIONS_QUERY =
"SELECT json_data FROM meta_workflow_def wd WHERE wd.version = (SELECT MAX(version) FROM meta_workflow_def wd2 WHERE wd2.name = wd.name)";
return queryWithTransaction(
GET_ALL_WORKFLOW_DEF_LATEST_VERSIONS_QUERY,
q -> q.executeAndFetch(WorkflowDef.class));
}

public List<WorkflowDef> getAllLatest() {
final String GET_ALL_LATEST_WORKFLOW_DEF_QUERY =
"SELECT json_data FROM meta_workflow_def WHERE version = " + "latest_version";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.builder.EqualsBuilder;
Expand Down Expand Up @@ -280,4 +282,43 @@ public void testEventHandlers() {
assertNotNull(byEvents);
assertEquals(1, byEvents.size());
}

@Test
public void testGetAllWorkflowDefsLatestVersions() {
WorkflowDef def = new WorkflowDef();
def.setName("test1");
def.setVersion(1);
def.setDescription("description");
def.setCreatedBy("unit_test");
def.setCreateTime(1L);
def.setOwnerApp("ownerApp");
def.setUpdatedBy("unit_test2");
def.setUpdateTime(2L);
metadataDAO.createWorkflowDef(def);

def.setName("test2");
metadataDAO.createWorkflowDef(def);
def.setVersion(2);
metadataDAO.createWorkflowDef(def);

def.setName("test3");
def.setVersion(1);
metadataDAO.createWorkflowDef(def);
def.setVersion(2);
metadataDAO.createWorkflowDef(def);
def.setVersion(3);
metadataDAO.createWorkflowDef(def);

// Placed the values in a map because they might not be stored in order of defName.
// To test, needed to confirm that the versions are correct for the definitions.
Map<String, WorkflowDef> allMap =
metadataDAO.getAllWorkflowDefsLatestVersions().stream()
.collect(Collectors.toMap(WorkflowDef::getName, Function.identity()));

assertNotNull(allMap);
assertEquals(3, allMap.size());
assertEquals(1, allMap.get("test1").getVersion());
assertEquals(2, allMap.get("test2").getVersion());
assertEquals(3, allMap.get("test3").getVersion());
}
}