Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2555,4 +2555,12 @@ List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, Server
* Flush master local region
*/
void flushMasterStore() throws IOException;

/**
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return <code>true</code> if replication peer is enabled
* @throws IOException if a remote or network exception occurs
*/
boolean isReplicationPeerEnabled(String peerId) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better put this method together with other replication related method :)

}
Original file line number Diff line number Diff line change
Expand Up @@ -1084,4 +1084,9 @@ public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
public void flushMasterStore() throws IOException {
get(admin.flushMasterStore());
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
return get(admin.isReplicationPeerEnabled(peerId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1776,4 +1776,12 @@ CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, Str
* Flush master local region
*/
CompletableFuture<Void> flushMasterStore();

/**
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return true if replication peer is enabled. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -959,4 +959,9 @@ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNam
public CompletableFuture<Void> flushMasterStore() {
return wrap(rawAdmin.flushMasterStore());
}

@Override
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
Expand Down Expand Up @@ -4284,4 +4286,16 @@ Void> call(controller, stub, request.build(),
(s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
.call();
}

@Override
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
request.setPeerId(peerId);
return this.<Boolean> newMasterCaller()
.action((controller, stub) -> this.<GetReplicationPeerStateRequest,
GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
(s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
resp -> resp.getIsEnabled()))
.call();
}
}
10 changes: 10 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,13 @@ message ModifyColumnStoreFileTrackerResponse {
message FlushMasterStoreRequest {}
message FlushMasterStoreResponse {}

message GetReplicationPeerStateRequest {
required string peer_id = 1;
}
message GetReplicationPeerStateResponse {
required bool is_enabled = 1;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1203,6 +1210,9 @@ service MasterService {

rpc FlushMasterStore(FlushMasterStoreRequest)
returns(FlushMasterStoreResponse);

rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
returns(GetReplicationPeerStateResponse);
}

// HBCK Service definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
Expand Down Expand Up @@ -3491,4 +3493,16 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller,
}
return FlushMasterStoreResponse.newBuilder().build();
}

@Override
public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller,
GetReplicationPeerStateRequest request) throws ServiceException {
boolean isEnabled;
try {
isEnabled = server.getReplicationPeerManager().getPeerState(request.getPeerId());
} catch (ReplicationException ioe) {
throw new ServiceException(ioe);
}
return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ private void setPeerState(String peerId, boolean enabled) throws ReplicationExce
desc.getSyncReplicationState()));
}

public boolean getPeerState(String peerId) throws ReplicationException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add a method to get the ReplicationPeerDescription? Anyway, not a big problem since this class is IA.Private.

ReplicationPeerDescription desc = peers.get(peerId);
if (desc != null) {
return desc.isEnabled();
} else {
throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
}
}

public void enablePeer(String peerId) throws ReplicationException {
setPeerState(peerId, true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestGetReplicationPeerState extends TestReplicationBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestGetReplicationPeerState.class);

@Test
public void testGetReplicationPeerState() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this to another existing test classes?


// Test disable replication peer
hbaseAdmin.disableReplicationPeer("2");
assertFalse(hbaseAdmin.isReplicationPeerEnabled("2"));

// Test enable replication peer
hbaseAdmin.enableReplicationPeer("2");
assertTrue(hbaseAdmin.isReplicationPeerEnabled("2"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -939,4 +939,9 @@ public Future<Void> modifyTableStoreFileTrackerAsync(TableName tableName, String
public void flushMasterStore() throws IOException {
admin.flushMasterStore();
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
return admin.isReplicationPeerEnabled(peerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1314,4 +1314,9 @@ public Future<Void> modifyTableStoreFileTrackerAsync(TableName tableName, String
public void flushMasterStore() throws IOException {
throw new NotImplementedException("flushMasterStore not supported in ThriftAdmin");
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
throw new NotImplementedException("isReplicationPeerEnabled not supported in ThriftAdmin");
}
}