Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void start(CoprocessorEnvironment env) throws IOException {
groupInfoManager = RSGroupInfoManagerImpl.getInstance(master);
groupAdminServer = new RSGroupAdminServer(master, groupInfoManager);
Class<?> clazz =
master.getConfiguration().getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, null);
master.getConfiguration().getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, null);
if (!RSGroupableBalancer.class.isAssignableFrom(clazz)) {
throw new IOException("Configured balancer does not support RegionServer groups.");
}
Expand Down Expand Up @@ -108,85 +109,101 @@ RSGroupAdminServiceImpl getGroupAdminService() {

@Override
public void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<ServerName> servers, List<ServerName> notClearedServers) throws IOException {
List<ServerName> servers, List<ServerName> notClearedServers) throws IOException {
Set<Address> clearedServer =
servers.stream().filter(server -> !notClearedServers.contains(server))
.map(ServerName::getAddress).collect(Collectors.toSet());
servers.stream().filter(server -> !notClearedServers.contains(server))
.map(ServerName::getAddress).collect(Collectors.toSet());
if (!clearedServer.isEmpty()) {
groupAdminServer.removeServers(clearedServer);
}
}

private void checkGroupExists(Optional<String> optGroupName) throws IOException {
private RSGroupInfo checkGroupExists(Optional<String> optGroupName, Supplier<String> forWhom)
throws IOException {
if (optGroupName.isPresent()) {
String groupName = optGroupName.get();
if (groupAdminServer.getRSGroupInfo(groupName) == null) {
throw new ConstraintException("Region server group " + groupName + " does not exit");
RSGroupInfo group = groupAdminServer.getRSGroupInfo(groupName);
if (group == null) {
throw new ConstraintException(
"Region server group " + groupName + " for " + forWhom.get() + " does not exit");
}
return group;
}
return null;
}

private boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException {
RSGroupInfo rsGroupInfo;
Optional<String> optGroupName = desc.getRegionServerGroup();
if (optGroupName.isPresent()) {
String groupName = optGroupName.get();
if (groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
// do not check for default group
return true;
}
rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName);
if (rsGroupInfo == null) {
throw new ConstraintException(
"RSGroup " + groupName + " for table " + desc.getTableName() + " does not exist");
}
} else {
NamespaceDescriptor nd =
master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString());
String groupNameOfNs = nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
if (groupNameOfNs == null || groupNameOfNs.equals(RSGroupInfo.DEFAULT_GROUP)) {
// do not check for default group
return true;
}
rsGroupInfo = groupAdminServer.getRSGroupInfo(groupNameOfNs);
if (rsGroupInfo == null) {
throw new ConstraintException("RSGroup " + groupNameOfNs + " for table " +
desc.getTableName() + "(inherit from namespace) does not exist");
}
private Optional<String> getNamespaceGroup(NamespaceDescriptor namespaceDesc) {
return Optional
.ofNullable(namespaceDesc.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP));
}

// Do not allow creating new tables/namespaces which has an empty rs group, expect the default rs
// group. Notice that we do not check for online servers, as this is not stable because region
// servers can die at any time.
private void checkGroupNotEmpty(RSGroupInfo rsGroupInfo, Supplier<String> forWhom)
throws ConstraintException {
if (rsGroupInfo == null || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
// we do not have a rs group config or we explicitly set the rs group to default, then no need
// to check.
return;
}
if (rsGroupInfo.getServers().isEmpty()) {
throw new ConstraintException(
"No servers in the rsgroup " + rsGroupInfo.getName() + " for " + forWhom.get());
}
return master.getServerManager().createDestinationServersList().stream()
.anyMatch(onlineServer -> rsGroupInfo.containsServer(onlineServer.getAddress()));
}

@Override
public void preCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableDescriptor desc, RegionInfo[] regions) throws IOException {
checkGroupExists(desc.getRegionServerGroup());
if (!desc.getTableName().isSystemTable() && !rsgroupHasServersOnline(desc)) {
throw new HBaseIOException("No online servers in the rsgroup for " + desc);
TableDescriptor desc, RegionInfo[] regions) throws IOException {
if (desc.getTableName().isSystemTable()) {
// do not check for system tables as we may block the bootstrap.
return;
}
Supplier<String> forWhom = () -> "table " + desc.getTableName();
RSGroupInfo rsGroupInfo = checkGroupExists(desc.getRegionServerGroup(), forWhom);
if (rsGroupInfo == null) {
// we do not set rs group info on table, check if we have one on namespace
String namespace = desc.getTableName().getNamespaceAsString();
NamespaceDescriptor nd = master.getClusterSchema().getNamespace(namespace);
forWhom = () -> "table " + desc.getTableName() + "(inherit from namespace)";
rsGroupInfo = checkGroupExists(getNamespaceGroup(nd), forWhom);
}
checkGroupNotEmpty(rsGroupInfo, forWhom);
}

@Override
public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor)
throws IOException {
checkGroupExists(newDescriptor.getRegionServerGroup());
TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor)
throws IOException {
if (!currentDescriptor.getRegionServerGroup().equals(newDescriptor.getRegionServerGroup())) {
Supplier<String> forWhom = () -> "table " + newDescriptor.getTableName();
RSGroupInfo rsGroupInfo = checkGroupExists(newDescriptor.getRegionServerGroup(), forWhom);
checkGroupNotEmpty(rsGroupInfo, forWhom);
}
return MasterObserver.super.preModifyTable(ctx, tableName, currentDescriptor, newDescriptor);
}

private void checkNamespaceGroup(NamespaceDescriptor nd) throws IOException {
Supplier<String> forWhom = () -> "namespace " + nd.getName();
RSGroupInfo rsGroupInfo = checkGroupExists(getNamespaceGroup(nd), forWhom);
checkGroupNotEmpty(rsGroupInfo, forWhom);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought these method can be refacotred to 3 methods:

  1. getRSGroupForTable()
  2. getRSGroupForNamespace()
  3. checkRSGroup() : it will check RSGroup exists and not empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here is not that straight-forward. If we are modifying table, we do not need to check the group if the group config is not changed or we just removed the group config. So does namespace...

@Override
public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
NamespaceDescriptor ns) throws IOException {
checkGroupExists(
Optional.ofNullable(ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP)));
NamespaceDescriptor ns) throws IOException {
checkNamespaceGroup(ns);
}

@Override
public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
NamespaceDescriptor currentNsDescriptor, NamespaceDescriptor newNsDescriptor)
throws IOException {
checkGroupExists(Optional
.ofNullable(newNsDescriptor.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP)));
NamespaceDescriptor currentNsDescriptor, NamespaceDescriptor newNsDescriptor)
throws IOException {
if (!Objects.equals(
currentNsDescriptor.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP),
newNsDescriptor.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP))) {
checkNamespaceGroup(newNsDescriptor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,14 @@ public void testNamespaceConstraint() throws Exception {
String nsName = tablePrefix + "_foo";
String groupName = tablePrefix + "_foo";
LOG.info("testNamespaceConstraint");
rsGroupAdmin.addRSGroup(groupName);
addGroup(groupName, 1);
assertTrue(observer.preAddRSGroupCalled);
assertTrue(observer.postAddRSGroupCalled);

admin.createNamespace(NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName).build());
RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(groupName);
rsGroupAdmin.moveServers(rsGroupInfo.getServers(), RSGroupInfo.DEFAULT_GROUP);
// test removing a referenced group
try {
rsGroupAdmin.removeRSGroup(groupName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -136,45 +133,6 @@ public boolean evaluate() throws Exception {
Assert.assertEquals(1, admin.getRegions(targetServer).size());
}

@Test
public void testCreateWhenRsgroupNoOnlineServers() throws Exception {
LOG.info("testCreateWhenRsgroupNoOnlineServers");

// set rsgroup has no online servers and test create table
final RSGroupInfo appInfo = addGroup("appInfo", 1);
Iterator<Address> iterator = appInfo.getServers().iterator();
List<ServerName> serversToDecommission = new ArrayList<>();
ServerName targetServer = getServerName(iterator.next());
assertTrue(master.getServerManager().getOnlineServers().containsKey(targetServer));
serversToDecommission.add(targetServer);
admin.decommissionRegionServers(serversToDecommission, true);
assertEquals(1, admin.listDecommissionedRegionServers().size());

final TableName tableName = TableName.valueOf(tablePrefix + "_ns", name.getMethodName());
admin.createNamespace(NamespaceDescriptor.create(tableName.getNamespaceAsString())
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, appInfo.getName()).build());
final TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
try {
admin.createTable(desc);
fail("Shouldn't create table successfully!");
} catch (Exception e) {
LOG.debug("create table error", e);
}

// recommission and test create table
admin.recommissionRegionServer(targetServer, null);
assertEquals(0, admin.listDecommissionedRegionServers().size());
admin.createTable(desc);
// wait for created table to be assigned
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return getTableRegionMap().get(desc.getTableName()) != null;
}
});
}

@Test
public void testDefaultNamespaceCreateAndAssign() throws Exception {
LOG.info("testDefaultNamespaceCreateAndAssign");
Expand Down