Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ public String getRsPath(ServerName sn) {
* @param suffix ending of znode name
* @return result of properly joining prefix with suffix
*/
public static String joinZNode(String prefix, String suffix) {
return prefix + ZNodePaths.ZNODE_PATH_SEPARATOR + suffix;
public static String joinZNode(String prefix, String... suffix) {
StringBuilder sb = new StringBuilder(prefix);
for (String s : suffix) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(s);
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram;
Expand All @@ -33,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
Expand Down Expand Up @@ -1011,6 +1013,19 @@ final void doReleaseLock(TEnvironment env, ProcedureStore store) {
releaseLock(env);
}

protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
throws ProcedureSuspendedException {
if (jitter) {
// 10% possible jitter
double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
timeoutMillis += add;
}
setTimeout(timeoutMillis);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}

@Override
public int compareTo(final Procedure<TEnvironment> other) {
return Long.compare(getProcId(), other.getProcId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,3 +719,15 @@ enum AssignReplicationQueuesState {
message AssignReplicationQueuesStateData {
required ServerName crashed_server = 1;
}

enum MigrateReplicationQueueFromZkToTableState {
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
}

message MigrateReplicationQueueFromZkToTableStateData {
repeated string disabled_peer_id = 1;
}
10 changes: 10 additions & 0 deletions hbase-replication/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -184,4 +185,22 @@ void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
* @return Whether the replication queue table exists
*/
boolean hasData() throws ReplicationException;

// the below 3 methods are used for migrating
/**
* Update the replication queue datas for a given region server.
*/
void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
throws ReplicationException;

/**
* Update last pushed sequence id for the given regions and peers.
*/
void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
throws ReplicationException;

/**
* Add the given hfile refs to the given peer.
*/
void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
Expand All @@ -46,6 +48,7 @@
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
Expand Down Expand Up @@ -74,12 +77,6 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {

private final TableName tableName;

@FunctionalInterface
private interface TableCreator {

void create() throws IOException;
}

public TableReplicationQueueStorage(Connection conn, TableName tableName) {
this.conn = conn;
this.tableName = tableName;
Expand Down Expand Up @@ -541,4 +538,60 @@ public boolean hasData() throws ReplicationException {
throw new ReplicationException("failed to get replication queue table", e);
}
}

@Override
public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
throws ReplicationException {
List<Put> puts = new ArrayList<>();
for (ReplicationQueueData data : datas) {
if (data.getOffsets().isEmpty()) {
continue;
}
Put put = new Put(Bytes.toBytes(data.getId().toString()));
data.getOffsets().forEach((walGroup, offset) -> {
put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
});
puts.add(put);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‘puts.add(put)’ should be moved after ‘put.addColumn’?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a single put, where it contains a lot column, but for the same row.

}
try (Table table = conn.getTable(tableName)) {
table.put(puts);
} catch (IOException e) {
throw new ReplicationException("failed to batch update queues", e);
}
}

@Override
public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
throws ReplicationException {
Map<String, Put> peerId2Put = new HashMap<>();
for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
peerId2Put
.computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
.addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
}
try (Table table = conn.getTable(tableName)) {
table
.put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList()));
} catch (IOException e) {
throw new ReplicationException("failed to batch update last pushed sequence ids", e);
}
}

@Override
public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
throws ReplicationException {
if (hfileRefs.isEmpty()) {
return;
}
Put put = new Put(Bytes.toBytes(peerId));
for (String ref : hfileRefs) {
put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
}
try (Table table = conn.getTable(tableName)) {
table.put(put);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‘table.put(put)’ should be moved into the for loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a single put, where it contains a lot column, but for the same row.

} catch (IOException e) {
throw new ReplicationException("failed to batch update hfile references", e);
}
}
}
Loading