Skip to content

Service Processor for trigger like functionality

edwardcapriolo edited this page Jan 22, 2013 · 1 revision

Terminology

  • CREATESERVICEPROCESS: Creates and loads a service processor
  • SERVICEPROCESS: Executes a service processor

Hive level description

A ServiceProcess is a high level API. ServiceProcess has access to Cassandra's internals as well as all of IntraVert's internals. This makes it the most complex interface to utilize and the most powerful. Because of this it is better to use a Filter, Process, or MultiProcess instead of ServiceProcess if possible. A service process can be used to:

  • Perform complex multi-step operations server side
  • Examine and change request objects
  • Interface directly with IntraVert
  • Interface directly with Cassandra (StorageProxy, CFMetaData, etc)

API

package org.usergrid.vx.experimental;
import org.vertx.java.core.Vertx;
/* You asked for it. You are basically a first class IntraOp, and can
 * do anything. With great power comes great responsibility. */
public interface ServiceProcessor {
    public void process(IntraReq req, IntraRes res, IntraState state,
		int i, Vertx vertx, IntraService is);
    }
}

Example

An application wishes to insert data to a user table. However the user table has other related index tables name userbystate and userbylastname. Rather then have application side logic determine how many inserts are required a ServiceProcessor can handle this.

Our user data is below:

Map reqObj = new HashMap();
reqObj.put("userid", "bsmith");
reqObj.put("fname", "bob");
reqObj.put("lname", "smith");
reqObj.put("city", "NYC");

Create ServiceProcess using groovy that will do multiple inserts on the server side.

req.add( Operations.createServiceProcess("buildMySecondary", "groovy",
    // Removing the quoting and escaped newlines for readability.
    "import org.usergrid.vx.experimental.*;
    public class TwoExBuilder implements ServiceProcessor {
		public void process(IntraReq req, IntraRes res, IntraState state, int i,
		Vertx vertx, IntraService is) {
	IntraOp op = req.getE().get(i);
	Map params = (Map) op.getOp().get("params");
	String uid = (String) params.get("userid");
	String fname = (String) params.get("fname");
	String lname = (String) params.get("lname");
	String city = (String) params.get("city");
	
	IntraReq innerReq = new IntraReq();
	innerReq.add( Operations.setColumnFamilyOp("users") );
	innerReq.add( Operations.setOp(uid, "fname", fname) );
	innerReq.add( Operations.setOp(uid, "lname", lname) );
	innerReq.add( Operations.setColumnFamilyOp("usersbycity") );
	innerReq.add( Operations.setOp(city, uid, "") );
	innerReq.add( Operations.setColumnFamilyOp("usersbylast") );
	innerReq.add( Operations.setOp(lname, uid, "") );
	
	IntraRes innerRes = new IntraRes();
	is.executeReq(innerReq, innerRes, state, vertx);
	
	if (innerRes.getException() != null){
		res.setExceptionAndId(innerRes.getException(), i);
	} else {
		res.getOpsRes().put(i, "OK");
	}
} 
    ");

Next we will call the service passing it the request object. (See step 6)

req.add( Operations.setKeyspaceOp("myks") );//1
req.add( Operations.setAutotimestampOp() );//2
req.add( Operations.serviceProcess("buildMySecondary", reqObj) );//6

If successful the column family usersbycity should have a column in NYC rowkey. We can confirm this by reading the data back.

req.add( Operations.setColumnFamilyOp("usersbycity")); //7
req.add( Operations.sliceOp("NYC", "a", "z", 5)); //8
...
List<Map> r = (List<Map>) res.getOpsRes().get(8);
Assert.assertEquals("bsmith", ByteBufferUtil.string( (ByteBuffer)r.get(0).get("name")));

ServiceProcessor allows us to move our application logic server side. Rather then sending N mutations in a batch, which uses more network IO, the user sends the data across the wire only once.