This repository has been archived by the owner on Jan 27, 2020. It is now read-only.

Build your own SPQR Operator (direct response)

Christian Kreutzfeldt edited this page Apr 8, 2015 · 5 revisions

This tutorial shows you how to implement, annotate, and deploy your own direct response operator component. The snippets below show how to write a simple content filter. You can find the complete listing in our repository.

The simple stuff

As with all component implementations, you must provide getter and setter methods for the component identifier and its type. Additionally, each operator must provide a getter for reading the total number of processed messages.
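The bookkeeping described above could look roughly like the following. This is a minimal sketch, not the SPQR interface itself: the class name, the method names, and the use of a plain String for the type are assumptions made for illustration.

```java
// Hypothetical stand-in for an operator's bookkeeping. The real SPQR
// interface and its exact method names are not shown in this tutorial.
class OperatorBookkeepingSketch {

  private String id;                    // component identifier
  private String type;                  // assumed to be a plain label here
  private long totalNumOfMessages = 0;  // messages seen so far

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }

  public String getType() { return type; }
  public void setType(String type) { this.type = type; }

  // read-only from the outside: only a getter is exposed
  public long getTotalNumOfMessages() { return totalNumOfMessages; }

  // called once per incoming message by the processing method
  void countMessage() { totalNumOfMessages++; }
}
```

The counter has no setter because it is meant to be updated only by the message processing code.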

Lifecycle

Each component provides two lifecycle methods that are, as their names suggest, invoked on component initialization and during shutdown.

initialize(Properties)

The initialize method does what its name says: it initializes the component instance. It receives all key/value pairs from the component configuration, which lives inside a pipeline definition.

In this case we retrieve the paths and expressions to apply to the message content:

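public void initialize(Properties properties) throws RequiredInputMissingException,
     ComponentInitializationFailedException {

  // read field settings numbered 1..n; stop at the first index without an expression
  for(int i = 1; i < Integer.MAX_VALUE; i++) {
    String expression = properties.getProperty(CFG_FIELD_PREFIX + i + ".expression");
    if(StringUtils.isBlank(expression))
      break;

    // a path is required for every expression; fail early instead of
    // running into a NullPointerException on path.split below
    String path = properties.getProperty(CFG_FIELD_PREFIX + i + ".path");
    if(StringUtils.isBlank(path))
      throw new RequiredInputMissingException("Missing required path for field " + i);
    String valueType = properties.getProperty(CFG_FIELD_PREFIX + i + ".type");

    // any type other than STRING (case-insensitive) is treated as NUMERICAL
    this.fields.add(new JsonContentFilterFieldSetting(
        path.split("\\."), expression,
        StringUtils.equalsIgnoreCase("STRING", valueType) ?
            JsonContentType.STRING : JsonContentType.NUMERICAL));
  }

  if(logger.isDebugEnabled())
    logger.debug("json content filter [id="+id+"] initialized");
}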
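To make the expected configuration layout more concrete, here is a small self-contained sketch of the same numbered-key loop using only java.util.Properties. The prefix value "field." and the class and method names are assumptions for illustration; the real value of CFG_FIELD_PREFIX is not shown in this tutorial.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class FieldConfigSketch {

  // Assumed value: the real CFG_FIELD_PREFIX constant is not shown here.
  static final String CFG_FIELD_PREFIX = "field.";

  // Collects "path|expression|type" entries, stopping at the first
  // index that has no expression (same loop shape as initialize).
  static List<String> readFields(Properties properties) {
    List<String> result = new ArrayList<>();
    for (int i = 1; i < Integer.MAX_VALUE; i++) {
      String expression = properties.getProperty(CFG_FIELD_PREFIX + i + ".expression");
      if (expression == null || expression.trim().isEmpty())
        break;
      String path = properties.getProperty(CFG_FIELD_PREFIX + i + ".path");
      String type = properties.getProperty(CFG_FIELD_PREFIX + i + ".type");
      result.add(path + "|" + expression + "|" + type);
    }
    return result;
  }

  // Example configuration with a gap in the numbering.
  static Properties exampleProperties() {
    Properties p = new Properties();
    p.setProperty(CFG_FIELD_PREFIX + "1.path", "user.name");
    p.setProperty(CFG_FIELD_PREFIX + "1.expression", "a.*");
    p.setProperty(CFG_FIELD_PREFIX + "1.type", "STRING");
    // index 2 is missing on purpose, so index 3 is never read
    p.setProperty(CFG_FIELD_PREFIX + "3.path", "user.age");
    p.setProperty(CFG_FIELD_PREFIX + "3.expression", "[0-9]+");
    p.setProperty(CFG_FIELD_PREFIX + "3.type", "NUMERICAL");
    return p;
  }
}
```

Because the loop stops at the first index without an expression, field settings have to be numbered contiguously starting at 1. In the example, only the first field is picked up.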

shutdown

The shutdown method is invoked by the surrounding micro pipeline. Typically this happens when the micro pipeline itself is shut down. However, if the micro pipeline handles error situations itself, it may also shut down and restart selected components.

When the shutdown method is triggered, it must ensure that all consumed resources are released and, where necessary, notified about the shutdown.

Our content filter holds no resources that need to be freed, so its shutdown method looks as follows:

public boolean shutdown() {
  return true;
}
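For comparison, a component that does hold a resource would release it here. The following is only a sketch: the worker thread pool is a made-up resource, and treating the boolean return value as "shutdown completed" is an assumption, since the tutorial does not spell out its meaning.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

  // Hypothetical resource owned by the component.
  private final ExecutorService worker = Executors.newSingleThreadExecutor();

  // Stops accepting new tasks and waits briefly for running ones to finish.
  // Returns true if the pool terminated within the timeout (assumed meaning).
  public boolean shutdown() {
    worker.shutdown();
    try {
      return worker.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // restore the interrupt flag and report that shutdown did not complete
      Thread.currentThread().interrupt();
      return false;
    }
  }

  boolean isStopped() {
    return worker.isShutdown();
  }
}
```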

Message processing

The operator is invoked for each incoming message which must be evaluated against the configured expressions for specific paths. To fetch the content for given paths, a retrieval method is required.

Content retrieval

In order to evaluate content against a pre-assigned expression, the content must be extracted from the JSON object representation. The following method will do the job:

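protected String getTextFieldValue(final JsonNode jsonNode, final String[] fieldPath) {
  int fieldAccessStep = 0;
  JsonNode contentNode = jsonNode;
  // walk down the path one field name at a time; get() returns null
  // for a missing field, in which case the walk stops early
  while(contentNode != null && fieldAccessStep < fieldPath.length) {
    contentNode = contentNode.get(fieldPath[fieldAccessStep]);
    fieldAccessStep++;
  }
  // textValue() returns null for non-textual nodes
  return (contentNode != null ? contentNode.textValue() : null);
}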
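The path walk can be tried out without any JSON library. The sketch below uses nested java.util.Map instances as a stand-in for Jackson's JsonNode, which is an assumption made purely so the example runs on its own. Like the retrieval method, it yields null for a missing path or a non-text value.

```java
import java.util.HashMap;
import java.util.Map;

class PathLookupSketch {

  // Follows fieldPath through nested maps. Returns the String found at
  // the end of the path, or null if a step is missing or the final
  // value is not a String.
  static String getTextFieldValue(Map<String, Object> root, String[] fieldPath) {
    Object current = root;
    for (String step : fieldPath) {
      if (!(current instanceof Map))
        return null; // cannot descend any further
      current = ((Map<?, ?>) current).get(step);
    }
    return (current instanceof String) ? (String) current : null;
  }

  // Represents the document {"user": {"name": "alice"}}.
  static Map<String, Object> sampleDocument() {
    Map<String, Object> user = new HashMap<>();
    user.put("name", "alice");
    Map<String, Object> root = new HashMap<>();
    root.put("user", user);
    return root;
  }
}
```

A configured path such as "user.name" is split on dots into the steps "user" and "name", matching what initialize does with path.split("\\.").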

Content evaluation

Each incoming message is handed over to onMessage, which evaluates the content at every configured path against the corresponding expression:

public StreamingDataMessage[] onMessage(StreamingDataMessage message) {

  // increment number of messages processed so far
  this.totalNumOfMessages++;

  // do nothing if either the message or its body is empty
  if(message == null || message.getBody() == null || message.getBody().length < 1)
    return EMPTY_MESSAGES_ARRAY;

  JsonNode jsonNode = null;
  try {
    jsonNode = jsonMapper.readTree(message.getBody());
  } catch(IOException e) {
    logger.error("Failed to read message body to json node. Ignoring message. Error: " + e.getMessage());
  }

  // return an empty array in case the message could not be parsed
  // into an object representation, so that nothing is forwarded
  if(jsonNode == null)
    return EMPTY_MESSAGES_ARRAY;

  // step through fields considered to be relevant, extract values and apply filtering function
  for(final JsonContentFilterFieldSetting fieldSettings : fields) {
    // read value into string representation for further investigation
    String value = getTextFieldValue(jsonNode, fieldSettings.getPath());

    // drop the message if the value is missing or does not match the whole expression
    if(value == null || !fieldSettings.getExpression().matcher(StringUtils.trim(value)).matches())
      return EMPTY_MESSAGES_ARRAY;
  }

  // all configured fields matched: forward the message unchanged
  return new StreamingDataMessage[]{message};
}
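One detail worth keeping in mind when writing expressions: Matcher.matches() only succeeds if the pattern covers the entire (trimmed) value, not just a part of it. The following sketch isolates that check using java.util.regex only; the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

class MatchSemanticsSketch {

  // Mirrors the per-field check: trim the value, then require the
  // expression to match the whole string. A null value never passes.
  static boolean passes(Pattern expression, String value) {
    if (value == null)
      return false;
    return expression.matcher(value.trim()).matches();
  }
}
```

With this check, the expression "err" does not accept the value "error", while "err.*" does. An expression meant to match a substring therefore needs leading and trailing ".*".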