Skip to content

Commit

Permalink
[pinpoint-apm#10131] Migrate HBase1 API to HBase2 API
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jul 19, 2023
1 parent e27b1ad commit 90097de
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@

package com.navercorp.pinpoint.common.hbase;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import com.navercorp.pinpoint.common.util.ArrayUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import com.navercorp.pinpoint.common.util.BytesUtils;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.logging.log4j.Logger;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* @author emeroad
Expand Down Expand Up @@ -78,46 +76,42 @@ public boolean createNamespaceIfNotExists(String namespace, Map<String, String>
}

@Override
public List<HTableDescriptor> getTableDescriptors(String namespace) {
public List<TableDescriptor> getTableDescriptors(String namespace) {
return execute(admin -> {
HTableDescriptor[] htds = admin.listTableDescriptorsByNamespace(namespace);
if (ArrayUtils.isEmpty(htds)) {
return Collections.emptyList();
}
return Arrays.asList(htds);
return admin.listTableDescriptorsByNamespace(BytesUtils.toBytes(namespace));
});
}

@Override
public HTableDescriptor getTableDescriptor(TableName tableName) {
return execute(admin -> admin.getTableDescriptor(tableName));
public TableDescriptor getTableDescriptor(TableName tableName) {
return execute(admin -> admin.getDescriptor(tableName));
}

@Override
public void createTable(HTableDescriptor htd) {
public void createTable(TableDescriptor tableDescriptor) {
execute(admin -> {
admin.createTable(htd);
logger.info("{} table created, htd : {}", htd.getTableName(), htd);
admin.createTable(tableDescriptor);
logger.info("{} table created, tableDescriptor : {}", tableDescriptor.getTableName(), tableDescriptor);
return null;
});
}

@Override
public void createTable(HTableDescriptor htd, byte[][] splitKeys) {
public void createTable(TableDescriptor tableDescriptor, byte[][] splitKeys) {
execute(admin -> {
admin.createTable(htd, splitKeys);
logger.info("{} table created with {} split keys, htd : {}", htd.getTableName(), splitKeys.length + 1, htd);
admin.createTable(tableDescriptor, splitKeys);
logger.info("{} table created with {} split keys, tableDescriptor : {}", tableDescriptor.getTableName(), splitKeys.length + 1, tableDescriptor);
return null;
});
}

@Override
public boolean createTableIfNotExists(HTableDescriptor htd) {
public boolean createTableIfNotExists(TableDescriptor tableDescriptor) {
return execute(admin -> {
TableName tableName = htd.getTableName();
TableName tableName = tableDescriptor.getTableName();
if (!admin.tableExists(tableName)) {
admin.createTable(htd);
logger.info("{} table created, htd : {}", htd.getTableName(), htd);
admin.createTable(tableDescriptor);
logger.info("{} table created, tableDescriptor : {}", tableDescriptor.getTableName(), tableDescriptor);
return true;
}
return false;
Expand Down Expand Up @@ -166,20 +160,19 @@ public void dropTable(TableName tableName) {
}

@Override
public void modifyTable(HTableDescriptor htd) {
final TableName tableName = htd.getTableName();
public void modifyTable(TableDescriptor tableDescriptor) {
execute(admin -> {
admin.modifyTable(tableName, htd);
logger.info("{} table modified, htd : {}", tableName, htd);
admin.modifyTable(tableDescriptor);
logger.info("table modified, tableDescriptor : {}", tableDescriptor);
return null;
});
}

@Override
public void addColumn(TableName tableName, HColumnDescriptor hcd) {
public void addColumn(TableName tableName, ColumnFamilyDescriptor columnDescriptor) {
execute(admin -> {
admin.addColumn(tableName, hcd);
logger.info("{} table added column : {}", tableName, hcd);
admin.addColumnFamily(tableName, columnDescriptor);
logger.info("{} table added column : {}", tableName, columnDescriptor);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.navercorp.pinpoint.common.hbase;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;

import java.util.List;
import java.util.Map;
Expand All @@ -32,15 +32,15 @@ public interface HbaseAdminOperation {

boolean createNamespaceIfNotExists(String namespace, Map<String, String> configurations);

List<HTableDescriptor> getTableDescriptors(String namespace);
List<TableDescriptor> getTableDescriptors(String namespace);

HTableDescriptor getTableDescriptor(TableName tableName);
TableDescriptor getTableDescriptor(TableName tableName);

void createTable(HTableDescriptor hTableDescriptor);
void createTable(TableDescriptor tableDescriptor);

void createTable(HTableDescriptor hTableDescriptor, byte[][] splitKeys);
void createTable(TableDescriptor tableDescriptor, byte[][] splitKeys);

boolean createTableIfNotExists(HTableDescriptor hTableDescriptor);
boolean createTableIfNotExists(TableDescriptor tableDescriptor);

boolean tableExists(TableName tableName);

Expand All @@ -50,9 +50,9 @@ public interface HbaseAdminOperation {

void dropTable(TableName tableName);

void modifyTable(HTableDescriptor hTableDescriptor);
void modifyTable(TableDescriptor tableDescriptor);

void addColumn(TableName tableName, HColumnDescriptor hColumnDescriptor);
void addColumn(TableName tableName, ColumnFamilyDescriptor columnDescriptor);

<T> T execute(AdminCallback<T> action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import com.navercorp.pinpoint.common.hbase.AdminCallback;
import com.navercorp.pinpoint.common.hbase.HbaseAdminOperation;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.logging.log4j.Logger;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -60,33 +60,33 @@ public boolean createNamespaceIfNotExists(String namespace, Map<String, String>
}

@Override
public List<HTableDescriptor> getTableDescriptors(String namespace) {
public List<TableDescriptor> getTableDescriptors(String namespace) {
return delegate.getTableDescriptors(namespace);
}

@Override
public HTableDescriptor getTableDescriptor(TableName tableName) {
public TableDescriptor getTableDescriptor(TableName tableName) {
return delegate.getTableDescriptor(tableName);
}

@Override
public void createTable(HTableDescriptor htd) {
logger.info("Creating table : {}.", htd);
public void createTable(TableDescriptor tableDescriptor) {
logger.info("Creating table : {}.", tableDescriptor);
}

@Override
public void createTable(HTableDescriptor htd, byte[][] splitKeys) {
logger.info("Creating table : {} with {} splitKeys.", htd, splitKeys.length);
public void createTable(TableDescriptor tableDescriptor, byte[][] splitKeys) {
logger.info("Creating table : {} with {} splitKeys.", tableDescriptor, splitKeys.length);
}

@Override
public boolean createTableIfNotExists(HTableDescriptor htd) {
TableName tableName = htd.getTableName();
public boolean createTableIfNotExists(TableDescriptor tableDescriptor) {
TableName tableName = tableDescriptor.getTableName();
boolean tableExists = delegate.tableExists(tableName);
if (tableExists) {
return false;
}
this.createTable(htd);
this.createTable(tableDescriptor);
return true;
}

Expand Down Expand Up @@ -127,13 +127,13 @@ public void dropTable(TableName tableName) {
}

@Override
public void modifyTable(HTableDescriptor htd) {
logger.info("Modifying table : {}, desc : {}", htd.getTableName(), htd);
public void modifyTable(TableDescriptor tableDescriptor) {
logger.info("Modifying table : {}, desc : {}", tableDescriptor.getTableName(), tableDescriptor);
}

@Override
public void addColumn(TableName tableName, HColumnDescriptor hcd) {
logger.info("Adding column to table : {}, column : {}", tableName, hcd);
public void addColumn(TableName tableName, ColumnFamilyDescriptor columnDescriptor) {
logger.info("Adding column to table : {}, column : {}", tableName, columnDescriptor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import com.navercorp.pinpoint.common.hbase.HbaseAdminOperation;
import com.navercorp.pinpoint.common.util.ArrayUtils;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Objects;
Expand All @@ -37,13 +37,13 @@ public class CreateTableCommand extends TableCommand {
private final byte[][] splitKeys;

CreateTableCommand(TableName tableName, Compression.Algorithm compressionAlgorithm, byte[][] splitKeys) {
super(new HTableDescriptor(Objects.requireNonNull(tableName, "tableName")), compressionAlgorithm);
super(tableName, compressionAlgorithm);
this.splitKeys = Objects.requireNonNull(splitKeys, "splitKeys");
}

@Override
public boolean execute(HbaseAdminOperation hbaseAdminOperation) {
HTableDescriptor htd = getHtd();
TableDescriptor htd = buildDescriptor();
TableName tableName = htd.getTableName();
if (hbaseAdminOperation.tableExists(tableName)) {
return false;
Expand All @@ -61,11 +61,9 @@ public boolean execute(HbaseAdminOperation hbaseAdminOperation) {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("CreateTableCommand{");
sb.append("htd=").append(getHtd());
sb.append("compressionAlgorithm=").append(getCompressionAlgorithm().getName());
sb.append(", splitKeys=").append(Arrays.toString(splitKeys));
sb.append('}');
return sb.toString();
return "CreateTableCommand{htd=" + buildDescriptor() +
"compressionAlgorithm=" + getCompressionAlgorithm().getName() +
", splitKeys=" + Arrays.toString(splitKeys) +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.navercorp.pinpoint.hbase.schema.reader.core.ChangeSet;
import com.navercorp.pinpoint.hbase.schema.reader.core.ChangeType;
import com.navercorp.pinpoint.hbase.schema.reader.core.TableChange;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.springframework.util.CollectionUtils;

Expand Down Expand Up @@ -51,10 +51,10 @@ public HbaseSchemaCommandManager(String namespace, String compression) {
this(namespace, compression, Collections.emptyList());
}

public HbaseSchemaCommandManager(String namespace, String compression, List<HTableDescriptor> currentHtds) {
public HbaseSchemaCommandManager(String namespace, String compression, List<TableDescriptor> currentHtds) {
this.namespace = StringUtils.defaultIfEmpty(namespace, NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR);
this.compressionAlgorithm = getCompressionAlgorithm(compression);
for (HTableDescriptor htd : filterTablesByNamespace(currentHtds)) {
for (TableDescriptor htd : filterTablesByNamespace(currentHtds)) {
tableCommandMap.put(htd.getTableName(), new ModifyTableCommand(htd, this.compressionAlgorithm));
}
}
Expand All @@ -71,12 +71,12 @@ private Compression.Algorithm getCompressionAlgorithm(String compression) {
throw new IllegalArgumentException("Unknown compression option : " + compression);
}

private List<HTableDescriptor> filterTablesByNamespace(List<HTableDescriptor> htds) {
private List<TableDescriptor> filterTablesByNamespace(List<TableDescriptor> htds) {
if (CollectionUtils.isEmpty(htds)) {
return Collections.emptyList();
}
List<HTableDescriptor> filteredHtds = new ArrayList<>();
for (HTableDescriptor htd : htds) {
List<TableDescriptor> filteredHtds = new ArrayList<>();
for (TableDescriptor htd : htds) {
TableName tableName = htd.getTableName();
String namespace = tableName.getNamespaceAsString();
if (this.namespace.equalsIgnoreCase(namespace)) {
Expand Down Expand Up @@ -138,12 +138,11 @@ public List<TableCommand> getCommands() {
.collect(Collectors.toList());
}

public List<HTableDescriptor> getSchemaSnapshot() {
public List<TableDescriptor> getSchemaSnapshot() {
return tableCommandMap.entrySet().stream()
.filter(e -> affectedTables.contains(e.getKey()))
.map(Map.Entry::getValue)
.map(TableCommand::getHtd)
.map(HTableDescriptor::new)
.map(TableCommand::buildDescriptor)
.collect(Collectors.toList());
}
}
Loading

0 comments on commit 90097de

Please sign in to comment.