Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;


import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;

/**
Expand Down Expand Up @@ -114,6 +116,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private boolean dropOnDeletedTables;
private boolean isSerial = false;

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating
* different Connection implementations, or initialize it in a different way,
* so defining createConnection as protected for possible overridings.
*/
protected Connection createConnection(Configuration conf) throws IOException {
return ConnectionFactory.createConnection(conf);
}

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating
* different ReplicationSinkManager implementations, or initialize it in a different way,
* so defining createReplicationSinkManager as protected for possible overridings.
*/
protected ReplicationSinkManager createReplicationSinkManager(Connection conn) {
return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: redundant cast to conn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compilation fails without this cast, as ReplicationSinkManager expects ClusterConnection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I meant pass ClusterConnection directly, but I guess that brings us back to your other comment as to why you want to use generic "Connection" object.

this, this.conf);
}

@Override
public void init(Context context) throws IOException {
super.init(context);
Expand All @@ -133,12 +154,16 @@ public void init(Context context) throws IOException {
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
Connection connection = createConnection(this.conf);
//Since createConnection method may be overridden by extending classes, we need to make sure
//it's indeed returning a ClusterConnection instance.
Preconditions.checkState(connection instanceof ClusterConnection);
this.conn = (ClusterConnection) connection;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
this.replicationSinkMgr = createReplicationSinkManager(conn);
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
Expand Down