Skip to content
This repository has been archived by the owner on Apr 13, 2018. It is now read-only.

Update to support new namespace for Storm 1.0.0 #31

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apply plugin: 'java'
apply plugin: 'maven'

group = 'com.amazonaws'
version = '1.1.1'

description = """Amazon Kinesis Storm Spout for Java"""

ext.stormVersion = '1.0.0'
ext.AWSSdkVersion = '1.10.69'

repositories {

maven { url "http://repo.maven.apache.org/maven2" }
}
dependencies {
// Storm
compile group: 'org.apache.storm', name: 'storm-core', version: stormVersion
// AWS SDK
compile group: 'com.amazonaws', name: 'aws-java-sdk', version: AWSSdkVersion

compile group: 'org.apache.commons', name: 'commons-lang3', version:'3.0'
compile group: 'com.netflix.curator', name: 'curator-framework', version:'1.1.3'
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Fields;
import org.apache.storm.tuple.Fields;

import com.amazonaws.services.kinesis.model.Record;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.util.List;

import backtype.storm.tuple.Fields;
import org.apache.storm.tuple.Fields;

import com.amazonaws.services.kinesis.model.Record;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import java.util.concurrent.Callable;

import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.Config;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public static byte[] kryoSerializeObject(final Object obj) {
final ByteArrayOutputStream os = new ByteArrayOutputStream();
final Output output = new Output(os);

kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
// Better initialization to keep a fallback and prevent crash
((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.writeClassAndObject(output, obj);

output.flush();
Expand All @@ -70,7 +71,9 @@ public static Object kryoDeserializeObject(final byte[] ser) {
final Kryo kryo = new Kryo();
final Input input = new Input(new ByteArrayInputStream(ser));

kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
// Better initialization to keep a fallback and prevent crash
((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

return kryo.readClassAndObject(input);
}
}
10 changes: 5 additions & 5 deletions src/main/samples/SampleBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import com.amazonaws.services.kinesis.model.Record;

Expand Down
2 changes: 1 addition & 1 deletion src/main/samples/SampleKinesisRecordScheme.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Fields;
import org.apache.storm.tuple.Fields;

import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.stormspout.IKinesisRecordScheme;
Expand Down
17 changes: 9 additions & 8 deletions src/main/samples/SampleTopology.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
import java.io.IOException;
import java.util.Properties;

import org.apache.storm.generated.AuthorizationException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
Expand All @@ -46,7 +47,7 @@ public class SampleTopology {
private static String zookeeperEndpoint;
private static String zookeeperPrefix;

public static void main(String[] args) throws IllegalArgumentException, KeeperException, InterruptedException, AlreadyAliveException, InvalidTopologyException, IOException {
public static void main(String[] args) throws IllegalArgumentException, KeeperException, InterruptedException, AlreadyAliveException, InvalidTopologyException, IOException, AuthorizationException {
String propertiesFile = null;
String mode = null;

Expand Down