Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added: Customizable publishers #5

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/main/java/com/gnip/core/endpoint/GnipStreamingEndpoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.gnip.core.endpoint;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.twitter.hbc.core.HttpConstants;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;

public abstract class GnipStreamingEndpoint implements StreamingEndpoint {
private static final String BASE_PATH = "/accounts/%s/publishers/%s/streams/%s/%s.json";
protected final String account;
protected final String publisher;
protected final String product;
protected final String label;
protected final ConcurrentMap<String, String> queryParameters = Maps.newConcurrentMap();

public GnipStreamingEndpoint(String account, String publisher, String product, String label) {
this(account, publisher, product, label, 0);
}

public GnipStreamingEndpoint(String account, String publisher, String product, String label, int clientId) {
this.account = Preconditions.checkNotNull(account);
this.publisher = Preconditions.checkNotNull(publisher);
this.product = Preconditions.checkNotNull(product);
this.label = Preconditions.checkNotNull(label);

if (clientId > 0) {
addQueryParameter("client", String.valueOf(clientId));
}
}

@Override
public String getURI() {
String uri = String.format(BASE_PATH, account.trim(), publisher.trim(), product.trim(), label.trim());

if (queryParameters.isEmpty()) {
return uri;
} else {
return uri + "?" + generateParamString(queryParameters);
}
}

protected String generateParamString(Map<String, String> params) {
return Joiner.on("&")
.withKeyValueSeparator("=")
.join(params);
}

@Override
public String getHttpMethod() {
return HttpConstants.HTTP_GET;
}

@Override
public String getPostParamString() {
return null;
}

@Override
public String getQueryParamString() {
return generateParamString(queryParameters);
}

@Override
public void addQueryParameter(String param, String value) {
queryParameters.put(param, value);
}

@Override
public void removeQueryParameter(String param) {
queryParameters.remove(param);
}

// These don't do anything
@Override
public void setBackfillCount(int count) { }

@Override
public void setApiVersion(String apiVersion) { }

@Override
public void addPostParameter(String param, String value) { }

@Override
public void removePostParameter(String param) { }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.gnip.core.endpoint;

public class RealTimeGnipStreamingEndpoint extends GnipStreamingEndpoint {

public RealTimeGnipStreamingEndpoint(String account, String publisher, String product, String label) {
super(account, publisher, product, label);
}

public RealTimeGnipStreamingEndpoint(String account, String publisher, String product, String label, int clientId) {
super(account, publisher, product, label, clientId);
}
}
9 changes: 5 additions & 4 deletions src/main/java/com/twitter/kinesis/ConnectorApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.gnip.core.endpoint.GnipStreamingEndpoint;
import com.gnip.core.endpoint.RealTimeGnipStreamingEndpoint;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.EnterpriseStreamingEndpoint;
import com.twitter.hbc.core.endpoint.RealTimeEnterpriseStreamingEndpoint;
import com.twitter.hbc.core.processor.LineStringProcessor;
import com.twitter.hbc.httpclient.auth.BasicAuth;
import com.twitter.kinesis.metrics.HBCStatsTrackerMetric;
Expand Down Expand Up @@ -88,10 +88,11 @@ private void start() throws InterruptedException {
client.connect();
}

private EnterpriseStreamingEndpoint endpoint() {
private GnipStreamingEndpoint endpoint() {
String account = this.environment.accountName();
String publisher = this.environment.publisher();
String label = this.environment.streamLabel();
String product = this.environment.product();
return new RealTimeEnterpriseStreamingEndpoint(account, product, label);
return new RealTimeGnipStreamingEndpoint(account, publisher, product, label);
}
}
1 change: 1 addition & 0 deletions src/main/resources/config.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ gnip.user.name=YOUR_GNIP_USERNAME
gnip.user.password=YOUR_GNIP_PASSWORD
gnip.account.name=YOUR_GNPI_ACCOUNT_NAME
gnip.product=YOUR_GNIP_PRODUCT
gnip.publisher=YOUR_GNIP_PUBLISHER
gnip.stream.label=YOUR_GNIP_STREAM_LABEL

#AWS Account Information
Expand Down