Skip to content
This repository was archived by the owner on Apr 13, 2018. It is now read-only.

Update to support new namespace for Storm 1.0.0 #32

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@


<properties>
<aws-java-sdk.version>1.7.13</aws-java-sdk.version>
<storm.version>0.9.2-incubating</storm.version>
<curator-framework.version>1.1.3</curator-framework.version>
<guava.version>13.0-final</guava.version>
<commons-lang3.version>3.0</commons-lang3.version>
<aws-java-sdk.version>1.10.74</aws-java-sdk.version>
<storm.version>1.0.0</storm.version>
<curator-framework.version>1.3.3</curator-framework.version>
<guava.version>19.0</guava.version>
<commons-lang3.version>3.4</commons-lang3.version>
</properties>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Fields;
import org.apache.storm.tuple.Fields;

import com.amazonaws.services.kinesis.model.Record;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.util.List;

import backtype.storm.tuple.Fields;
import org.apache.storm.tuple.Fields;

import com.amazonaws.services.kinesis.model.Record;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import java.util.concurrent.Callable;

import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.Config;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public static byte[] kryoSerializeObject(final Object obj) {
final ByteArrayOutputStream os = new ByteArrayOutputStream();
final Output output = new Output(os);

kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
// Better initialization to keep a fallback and prevent crash
((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.writeClassAndObject(output, obj);

output.flush();
Expand All @@ -70,7 +71,9 @@ public static Object kryoDeserializeObject(final byte[] ser) {
final Kryo kryo = new Kryo();
final Input input = new Input(new ByteArrayInputStream(ser));

kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
// Better initialization to keep a fallback and prevent crash
((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

return kryo.readClassAndObject(input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ class ZookeeperShardState {
try {
zk = CuratorFrameworkFactory.newClient(config.getZookeeperConnectionString(),
new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_NUM_RETRIES));
} catch (IOException e) {
zk.start();
} catch (Exception e) {
LOG.error("Could not connect to ZooKeeper", e);
throw new KinesisSpoutException(e);
}
zk.start();
}

/**
Expand Down
10 changes: 5 additions & 5 deletions src/main/samples/SampleBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import com.amazonaws.services.kinesis.model.Record;

Expand Down
2 changes: 1 addition & 1 deletion src/main/samples/SampleKinesisRecordScheme.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Fields;
import org.apache.storm.tuple.Fields;

import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.stormspout.IKinesisRecordScheme;
Expand Down
17 changes: 9 additions & 8 deletions src/main/samples/SampleTopology.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
import java.io.IOException;
import java.util.Properties;

import org.apache.storm.generated.AuthorizationException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
Expand All @@ -46,7 +47,7 @@ public class SampleTopology {
private static String zookeeperEndpoint;
private static String zookeeperPrefix;

public static void main(String[] args) throws IllegalArgumentException, KeeperException, InterruptedException, AlreadyAliveException, InvalidTopologyException, IOException {
public static void main(String[] args) throws IllegalArgumentException, KeeperException, InterruptedException, AlreadyAliveException, InvalidTopologyException, IOException, AuthorizationException {
String propertiesFile = null;
String mode = null;

Expand Down