Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #240 from alexmay48/alexmay48/new-workflow-metadat…
Browse files Browse the repository at this point in the history
…a-endpoint

Add new workflow metadata endpoint
  • Loading branch information
v1r3n authored Jul 23, 2023
2 parents c0fe98c + b298484 commit 9871961
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ lib/
build/
*/build/

# asdf tool versions
.tool-versions

2 changes: 1 addition & 1 deletion dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Common place to define all the version dependencies
*/
ext {
revConductor = '3.13.7'
revConductor = '3.14.0-SNAPSHOT'
revActivation = '2.0.0'
revAmqpClient = '5.13.0'
revAwaitility = '3.1.6'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class AMQPConstants {

/** this when set will create a rabbitmq queue */
public static String AMQP_QUEUE_TYPE = "amqp_queue";

/** this when set will create a rabbitmq exchange */
public static String AMQP_EXCHANGE_TYPE = "amqp_exchange";

Expand Down Expand Up @@ -53,19 +54,22 @@ public class AMQPConstants {
* <p>{@see <a href="https://www.rabbitmq.com/queues.html">RabbitMQ</a>}.
*/
public static boolean DEFAULT_AUTO_DELETE = false;

/**
* default rabbitmq delivery mode This is a property of the message When set to 1 the will be
* non persistent and 2 will be persistent {@see <a
* href="https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.5.4/rabbitmq-java-client-javadoc-3.5.4/com/rabbitmq/client/MessageProperties.html>
* Message Properties</a>}.
*/
public static int DEFAULT_DELIVERY_MODE = 2;

/**
* default rabbitmq delivery mode This is a property of the channel limit to get the number of
* unacknowledged messages. {@see <a
* href="https://www.rabbitmq.com/consumer-prefetch.html>Consumer Prefetch</a>}.
*/
public static int DEFAULT_BATCH_SIZE = 1;

/**
* default rabbitmq delivery mode This is a property of the amqp implementation which sets teh
* polling time to drain the in-memory queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public String getExchangeBoundQueueName() {
return exchangeBoundQueueName;
}


public String getExchangeType() {
return exchangeType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,12 @@ void runObserve(
}

@Test
public void testGetMessagesFromExistingExchangeWithDefaultConfiguration() throws IOException, TimeoutException {
public void testGetMessagesFromExistingExchangeWithDefaultConfiguration()
throws IOException, TimeoutException {
// Mock channel and connection
Channel channel = mockBaseChannel();
Connection connection = mockGoodConnection(channel);
testGetMessagesFromExchangeAndDefaultConfiguration(
channel, connection, true, true);
testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
@ConfigurationProperties("conductor.event-queues.jsm")
public class JetStreamProperties {
private String listenerQueuePrefix = "";

/** The durable subscriber name for the subscription */
private String durableName = "defaultQueue";

private String streamStorageType = "file";

/** The NATS connection url */
private String url = Options.DEFAULT_URL;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
*/
package com.netflix.conductor.postgres.config;

import com.netflix.conductor.core.utils.IDGenerator;
import java.util.Map;

import javax.annotation.PostConstruct;
Expand All @@ -27,6 +26,7 @@
import org.springframework.context.annotation.Import;

import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.postgres.storage.PostgresPayloadStorage;

@Configuration(proxyBeanMethods = false)
Expand Down Expand Up @@ -76,6 +76,7 @@ public Flyway flywayForExternalDb() {
@DependsOn({"flywayForExternalDb"})
public ExternalPayloadStorage postgresExternalPayloadStorage(
PostgresPayloadProperties properties) {
return new PostgresPayloadStorage(properties, dataSource, idGenerator, DEFAULT_MESSAGE_TO_USER);
return new PostgresPayloadStorage(
properties, dataSource, idGenerator, DEFAULT_MESSAGE_TO_USER);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -12,7 +11,6 @@
*/
package com.netflix.conductor.postgres.storage;

import com.netflix.conductor.core.exception.NonTransientException;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Connection;
Expand All @@ -30,6 +28,7 @@

import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.postgres.config.PostgresPayloadProperties;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -12,7 +11,6 @@
*/
package com.netflix.conductor.postgres.storage;

import com.netflix.conductor.core.utils.IDGenerator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -40,6 +38,7 @@

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.utils.IDGenerator;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -75,7 +74,8 @@ public void setup() {
new PostgresPayloadStorage(
testPostgres.getTestProperties(),
testPostgres.getDataSource(),
new IDGenerator(), errorMessage);
new IDGenerator(),
errorMessage);
}

@Test
Expand Down Expand Up @@ -268,22 +268,22 @@ private String getKey(String input) {

private void assertCount(int expected) throws SQLException {
try (PreparedStatement stmt =
testPostgres
.getDataSource()
.getConnection()
.prepareStatement(
"SELECT count(id) FROM external.external_payload");
ResultSet rs = stmt.executeQuery()) {
testPostgres
.getDataSource()
.getConnection()
.prepareStatement(
"SELECT count(id) FROM external.external_payload");
ResultSet rs = stmt.executeQuery()) {
rs.next();
assertEquals(expected, rs.getInt(1));
}
}

private String getCreatedOn(String key) throws SQLException {
try (Connection conn = testPostgres.getDataSource().getConnection();
PreparedStatement stmt =
conn.prepareStatement(
"SELECT created_on FROM external.external_payload WHERE id = ?")) {
PreparedStatement stmt =
conn.prepareStatement(
"SELECT created_on FROM external.external_payload WHERE id = ?")) {
stmt.setString(1, key);
try (ResultSet rs = stmt.executeQuery()) {
rs.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public RestClientBuilder restClientBuilder(ElasticSearchProperties properties) {
return builder;
}

@Primary // If you are including this project, it's assumed you want ES to be your indexing mechanism
@Primary // If you are including this project, it's assumed you want ES to be your indexing
// mechanism
@Bean
public IndexDAO es7IndexDAO(
RestClientBuilder restClientBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ private void addIndex(String index, final String mappingFilename) throws IOExcep
logger.info("Index '{}' already exists", index);
}
}

/**
* Adds an index to elasticsearch if it does not exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ private String readNumber(InputStream is) throws Exception {
String numValue = sb.toString().trim();
return numValue;
}

/**
* Reads an escaped string
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ public List<WorkflowDef> getAllWorkflowDefs() {
GET_ALL_WORKFLOW_DEF_QUERY, q -> q.executeAndFetch(WorkflowDef.class));
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
final String GET_ALL_WORKFLOW_DEF_LATEST_VERSIONS_QUERY =
"SELECT json_data FROM meta_workflow_def wd WHERE wd.version = (SELECT MAX(version) FROM meta_workflow_def wd2 WHERE wd2.name = wd.name)";
return queryWithTransaction(
GET_ALL_WORKFLOW_DEF_LATEST_VERSIONS_QUERY,
q -> q.executeAndFetch(WorkflowDef.class));
}

public List<WorkflowDef> getAllLatest() {
final String GET_ALL_LATEST_WORKFLOW_DEF_QUERY =
"SELECT json_data FROM meta_workflow_def WHERE version = " + "latest_version";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.builder.EqualsBuilder;
Expand Down Expand Up @@ -277,4 +279,43 @@ public void testEventHandlers() {
assertNotNull(byEvents);
assertEquals(1, byEvents.size());
}

@Test
public void testGetAllWorkflowDefsLatestVersions() {
WorkflowDef def = new WorkflowDef();
def.setName("test1");
def.setVersion(1);
def.setDescription("description");
def.setCreatedBy("unit_test");
def.setCreateTime(1L);
def.setOwnerApp("ownerApp");
def.setUpdatedBy("unit_test2");
def.setUpdateTime(2L);
metadataDAO.createWorkflowDef(def);

def.setName("test2");
metadataDAO.createWorkflowDef(def);
def.setVersion(2);
metadataDAO.createWorkflowDef(def);

def.setName("test3");
def.setVersion(1);
metadataDAO.createWorkflowDef(def);
def.setVersion(2);
metadataDAO.createWorkflowDef(def);
def.setVersion(3);
metadataDAO.createWorkflowDef(def);

// Placed the values in a map because they might not be stored in order of defName.
// To test, needed to confirm that the versions are correct for the definitions.
Map<String, WorkflowDef> allMap =
metadataDAO.getAllWorkflowDefsLatestVersions().stream()
.collect(Collectors.toMap(WorkflowDef::getName, Function.identity()));

assertNotNull(allMap);
assertEquals(3, allMap.size());
assertEquals(1, allMap.get("test1").getVersion());
assertEquals(2, allMap.get("test2").getVersion());
assertEquals(3, allMap.get("test3").getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,15 @@ public List<WorkflowDef> getAllWorkflowDefs() {
GET_ALL_WORKFLOW_DEF_QUERY, q -> q.executeAndFetch(WorkflowDef.class));
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
final String GET_ALL_WORKFLOW_DEF_LATEST_VERSIONS_QUERY =
"SELECT json_data FROM meta_workflow_def wd WHERE wd.version = (SELECT MAX(version) FROM meta_workflow_def wd2 WHERE wd2.name = wd.name)";
return queryWithTransaction(
GET_ALL_WORKFLOW_DEF_LATEST_VERSIONS_QUERY,
q -> q.executeAndFetch(WorkflowDef.class));
}

public List<WorkflowDef> getAllLatest() {
final String GET_ALL_LATEST_WORKFLOW_DEF_QUERY =
"SELECT json_data FROM meta_workflow_def WHERE version = " + "latest_version";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.builder.EqualsBuilder;
Expand Down Expand Up @@ -280,4 +282,43 @@ public void testEventHandlers() {
assertNotNull(byEvents);
assertEquals(1, byEvents.size());
}

@Test
public void testGetAllWorkflowDefsLatestVersions() {
WorkflowDef def = new WorkflowDef();
def.setName("test1");
def.setVersion(1);
def.setDescription("description");
def.setCreatedBy("unit_test");
def.setCreateTime(1L);
def.setOwnerApp("ownerApp");
def.setUpdatedBy("unit_test2");
def.setUpdateTime(2L);
metadataDAO.createWorkflowDef(def);

def.setName("test2");
metadataDAO.createWorkflowDef(def);
def.setVersion(2);
metadataDAO.createWorkflowDef(def);

def.setName("test3");
def.setVersion(1);
metadataDAO.createWorkflowDef(def);
def.setVersion(2);
metadataDAO.createWorkflowDef(def);
def.setVersion(3);
metadataDAO.createWorkflowDef(def);

// Placed the values in a map because they might not be stored in order of defName.
// To test, needed to confirm that the versions are correct for the definitions.
Map<String, WorkflowDef> allMap =
metadataDAO.getAllWorkflowDefsLatestVersions().stream()
.collect(Collectors.toMap(WorkflowDef::getName, Function.identity()));

assertNotNull(allMap);
assertEquals(3, allMap.size());
assertEquals(1, allMap.get("test1").getVersion());
assertEquals(2, allMap.get("test2").getVersion());
assertEquals(3, allMap.get("test3").getVersion());
}
}

0 comments on commit 9871961

Please sign in to comment.