Skip to content

Commit

Permalink
Merge pull request #162 from conglisc/master
Browse files Browse the repository at this point in the history
Bring in changes on data governance for messaging toolkit
  • Loading branch information
conglisc committed Nov 20, 2015
2 parents dc3c612 + 298c93c commit 6343d19
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.ibm.streamsx.messaging.common;

import java.util.HashMap;
import java.util.Map;

import com.ibm.streams.operator.AbstractOperator;

public class DataGovernanceUtil {

public static void registerForDataGovernance(AbstractOperator operator, String assetName, String assetType, String parentAssetName, String parentAssetType, boolean isInput, String operatorType) {
Map<String, String> properties = new HashMap<String, String>();
if(isInput) {
properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_INPUT);
properties.put(IGovernanceConstants.PROPERTY_INPUT_OPERATOR_TYPE, operatorType);
} else {
properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_OUTPUT);
properties.put(IGovernanceConstants.PROPERTY_OUTPUT_OPERATOR_TYPE, operatorType);
}
properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, assetName);
properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_STREAMS_PREFIX + assetType);
if(parentAssetName != null) {
properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, IGovernanceConstants.PROPERTY_PARENT_PREFIX);
properties.put(IGovernanceConstants.PROPERTY_PARENT_PREFIX + IGovernanceConstants.PROPERTY_SRC_NAME, parentAssetName);
properties.put(IGovernanceConstants.PROPERTY_PARENT_PREFIX + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_STREAMS_PREFIX + parentAssetType);
properties.put(IGovernanceConstants.PROPERTY_PARENT_PREFIX + IGovernanceConstants.PROPERTY_PARENT_TYPE, "$" + parentAssetType);
}

operator.setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.ibm.streamsx.messaging.common;

public interface IGovernanceConstants {
public static final String TAG_OPERATOR_IGC = "OperatorIGC";
public static final String TAG_REGISTER_TYPE = "registerType";
public static final String TAG_REGISTER_TYPE_INPUT = "input";
public static final String TAG_REGISTER_TYPE_OUTPUT = "output";

public static final String ASSET_STREAMS_PREFIX = "$Streams-";

public static final String ASSET_JMS_SERVER_TYPE = "JMSServer";

public static final String ASSET_JMS_MESSAGE_TYPE = "JMS";

public static final String ASSET_KAFKA_TOPIC_TYPE = "KafkaTopic";

public static final String ASSET_MQTT_TOPIC_TYPE = "MQTT";
public static final String ASSET_MQTT_SERVER_TYPE = "MQServer";

public static final String PROPERTY_SRC_NAME = "srcName";
public static final String PROPERTY_SRC_TYPE = "srcType";

public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParent";
public static final String PROPERTY_PARENT_TYPE = "parentType";

public static final String PROPERTY_PARENT_PREFIX = "p1";

public static final String PROPERTY_INPUT_OPERATOR_TYPE = "inputOperatorType";
public static final String PROPERTY_OUTPUT_OPERATOR_TYPE = "outputOperatorType";

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import com.ibm.streams.operator.compile.OperatorContextChecker;
import com.ibm.streams.operator.logging.LogLevel;
import com.ibm.streams.operator.logging.LoggerNames;
import com.ibm.streams.operator.logging.TraceLevel;
import com.ibm.streams.operator.metrics.Metric;
import com.ibm.streams.operator.model.CustomMetric;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.state.Checkpoint;
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;


//The JMSSink operator publishes data from Streams to a JMS Provider queue or a topic.
Expand Down Expand Up @@ -553,6 +556,16 @@ public synchronized void initialize(OperatorContext context)
throw new RuntimeException("No valid message class is specified.");
}

// register for data governance
registerForDataGovernance(connectionDocumentParser.getProviderURL(), connectionDocumentParser.getDestination());

}

private void registerForDataGovernance(String providerURL, String destination) {
logger.log(TraceLevel.INFO, "JMSSink - Registering for data governance with providerURL: " + providerURL
+ " destination: " + destination);
DataGovernanceUtil.registerForDataGovernance(this, destination, IGovernanceConstants.ASSET_JMS_MESSAGE_TYPE,
providerURL, IGovernanceConstants.ASSET_JMS_SERVER_TYPE, false, "JMSSink");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@

import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.OperatorContext.ContextCheck;
import com.ibm.streams.operator.Type.MetaType;
import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.StreamSchema;
import com.ibm.streams.operator.StreamingOutput;
import com.ibm.streams.operator.Type;
import com.ibm.streams.operator.Type.MetaType;
import com.ibm.streams.operator.compile.OperatorContextChecker;
import com.ibm.streams.operator.logging.LogLevel;
import com.ibm.streams.operator.logging.LoggerNames;
import com.ibm.streams.operator.logging.TraceLevel;
import com.ibm.streams.operator.metrics.Metric;
import com.ibm.streams.operator.model.CustomMetric;
import com.ibm.streams.operator.model.Parameter;
Expand All @@ -38,6 +39,8 @@
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;
import com.ibm.streams.operator.types.RString;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;

//The JMSSource operator converts a message JMS queue or topic to stream
public class JMSSource extends ProcessTupleProducer implements StateHandler{
Expand Down Expand Up @@ -551,6 +554,17 @@ public synchronized void initialize(OperatorContext context)
default:
throw new RuntimeException("No valid message class is specified.");
}

// register for data governance
registerForDataGovernance(connectionDocumentParser.getProviderURL(), connectionDocumentParser.getDestination());

}

private void registerForDataGovernance(String providerURL, String destination) {
logger.log(TraceLevel.INFO, "JMSSource - Registering for data governance with providerURL: " + providerURL
+ " destination: " + destination);
DataGovernanceUtil.registerForDataGovernance(this, destination, IGovernanceConstants.ASSET_JMS_MESSAGE_TYPE,
providerURL, IGovernanceConstants.ASSET_JMS_SERVER_TYPE, true, "JMSSource");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;

@InputPorts(@InputPortSet(cardinality=1, optional=false,
description="The tuples arriving on this port are expected to contain three attributes \\\"key\\\", \\\"topic\\\" and \\\"message\\\". " +
Expand Down Expand Up @@ -81,6 +83,24 @@ public void initialize(OperatorContext context)
//TODO: check for minimum properties
trace.log(TraceLevel.INFO, "Initializing producer");
client.initProducer();

// register for data governance
// only register user specified topic in param
registerForDataGovernance();
}

private void registerForDataGovernance() {
trace.log(TraceLevel.INFO, "KafkaSink -- Registering for data governance");

if (!topics.isEmpty()) {
for (String topic : topics) {
trace.log(TraceLevel.INFO, OPER_NAME + " -- data governance - topic to register: " + topic);
DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_KAFKA_TOPIC_TYPE,
null, null, false, "KafkaSink");
}
} else {
trace.log(TraceLevel.INFO, "KafkaSink -- Registering for data governance -- topics is empty");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@


import java.io.IOException;
import java.util.List;
import java.util.logging.Logger;

import com.ibm.streams.operator.OperatorContext;
import java.util.List;
import java.util.logging.Logger;

import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.OperatorContext.ContextCheck;
import com.ibm.streams.operator.compile.OperatorContextChecker;
import com.ibm.streams.operator.logging.TraceLevel;
import com.ibm.streams.operator.logging.TraceLevel;
import com.ibm.streams.operator.model.Icons;
import com.ibm.streams.operator.model.OutputPortSet;
import com.ibm.streams.operator.model.OutputPorts;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.model.OutputPortSet;
import com.ibm.streams.operator.model.OutputPorts;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.state.Checkpoint;
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;

@OutputPorts(@OutputPortSet(cardinality=1, optional=false,
description="Messages received from Kafka are sent on this output port."))
Expand Down Expand Up @@ -100,6 +102,23 @@ public void initialize(OperatorContext context)
}

}

// register for data governance
registerForDataGovernance();

}

private void registerForDataGovernance() {
trace.log(TraceLevel.INFO, "KafkaSource - Registering for data governance");
if (topics != null) {
for (String topic : topics) {
trace.log(TraceLevel.INFO, "KafkaSource - data governance - topic: " + topic);
DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_KAFKA_TOPIC_TYPE,
null, null, true, "KafkaSource");
}
} else {
trace.log(TraceLevel.INFO, "KafkaSource - Registering for data governance -- topics is empty");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import com.ibm.streams.operator.state.StateHandler;
import com.ibm.streams.operator.types.Blob;
import com.ibm.streams.operator.types.RString;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;

/**
* Class for an operator that consumes tuples and does not produce an output stream.
Expand Down Expand Up @@ -461,7 +463,44 @@ public synchronized void initialize(OperatorContext context)
initRelaunching(context);
// do not connect here... connection is done on the publish thread when a message
// is ready to be published
}

// register for data governance
// if static topic, then register topic, else only register the server
if (topicAttributeName == null) {
registerForDataGovernance();
} else {
// register the "server" for governance
registerServerForDataGovernance();
}
}

private void registerForDataGovernance() {
String uri = getServerUri();
String topic = getTopics();
TRACE.log(TraceLevel.INFO,
"MQTTSink - Registering for data governance with server uri: " + uri + " and topic: " + topic);

if (topic != null && !topic.isEmpty() && uri != null && !uri.isEmpty()) {
DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_MQTT_TOPIC_TYPE, uri,
IGovernanceConstants.ASSET_MQTT_SERVER_TYPE, false, "MQTTSink");
} else {
TRACE.log(TraceLevel.INFO,
"MQTTSink - Registering for data governance -- aborted. topic and/or url is null");
}
}

private void registerServerForDataGovernance() {
String uri = getServerUri();
TRACE.log(TraceLevel.INFO, "MQTTSource - Registering only server for data governance with server uri: " + uri);

if (uri != null && !uri.isEmpty()) {
DataGovernanceUtil.registerForDataGovernance(this, uri, IGovernanceConstants.ASSET_MQTT_SERVER_TYPE, null,
null, false, "MQTTSink");
} else {
TRACE.log(TraceLevel.INFO,
"MQTTSource - Registering only server for data governance -- aborted. uri is null");
}
}

/**
* Notification that initialization is complete and all input and output ports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.types.RString;
import com.ibm.streams.operator.types.ValueFactory;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;
import com.ibm.streamsx.messaging.mqtt.MqttClientRequest.MqttClientRequestType;

/**
Expand Down Expand Up @@ -306,6 +308,9 @@ public synchronized void initialize(OperatorContext context)
mqttWrapper.setClientID(getClientID());
mqttWrapper.setCommandTimeout(getCommandTimeout());
mqttWrapper.setKeepAliveInterval(getKeepAliveInterval());

// register for data governance
registerForDataGovernance();

/*
* Create the thread for producing tuples.
Expand Down Expand Up @@ -356,6 +361,21 @@ public void run() {
clientRequestThread.setDaemon(true);
}

private void registerForDataGovernance() {
String uri = getServerUri();
List<String> topics = getTopics();
TRACE.log(TraceLevel.INFO, "MQTTSource - Registering for data governance with server uri: " + uri + " and topics: " + topics.toArray().toString());

if(topics != null && uri != null && !uri.isEmpty()) {
for (String topic : topics) {
TRACE.log(TraceLevel.INFO, "MQTTSource - Registering for data governance with server uri: " + uri + " and topic: " + topic);
DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_MQTT_TOPIC_TYPE, uri, IGovernanceConstants.ASSET_MQTT_SERVER_TYPE, true, "MQTTSource");
}
} else {
TRACE.log(TraceLevel.INFO, "MQTTSource - Registering for data governance -- aborted. topic and/or uri is null");
}
}

protected void handleClientRequests() {
while (!shutdown)
{
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.messaging/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ The &lt;attribute&gt; element has three possible attributes:
* xml
</info:description>
<info:version>3.0.0</info:version>
<info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
<info:requiredProductVersion>4.1.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
<info:resources>
Expand Down

0 comments on commit 6343d19

Please sign in to comment.