Skip to content

Commit

Permalink
HBase 1.x compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
HexyinUESTC committed Aug 21, 2024
1 parent 70113f6 commit 6966c4f
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 70 deletions.
106 changes: 53 additions & 53 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

<properties>
<hadoop.version>1.2.1</hadoop.version>
<hbase.version>0.98.24-hadoop1</hbase.version>
<hbase.version>1.3.6</hbase.version>
<java.source.version>1.8</java.source.version>
<java.target.version>1.8</java.target.version>
<junit.version>4.13.1</junit.version>
Expand All @@ -58,6 +58,58 @@
</properties>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>hsqldb</artifactId>
<groupId>hsqldb</groupId>
</exclusion>
<exclusion>
<artifactId>kfs</artifactId>
<groupId>net.sf.kosmosfs</groupId>
</exclusion>
<exclusion>
<artifactId>core</artifactId>
<groupId>org.eclipse.jdt</groupId>
</exclusion>
<exclusion>
<artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
</exclusion>
<exclusion>
<artifactId>oro</artifactId>
<groupId>oro</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-compiler</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-runtime</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jettison</artifactId>
<groupId>org.codehaus.jettison</groupId>
</exclusion>
<exclusion>
<artifactId>commons-collections</artifactId>
<groupId>commons-collections</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId>
<version>${table.client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
Expand Down Expand Up @@ -117,58 +169,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>hsqldb</artifactId>
<groupId>hsqldb</groupId>
</exclusion>
<exclusion>
<artifactId>kfs</artifactId>
<groupId>net.sf.kosmosfs</groupId>
</exclusion>
<exclusion>
<artifactId>core</artifactId>
<groupId>org.eclipse.jdt</groupId>
</exclusion>
<exclusion>
<artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
</exclusion>
<exclusion>
<artifactId>oro</artifactId>
<groupId>oro</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-compiler</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-runtime</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jettison</artifactId>
<groupId>org.codehaus.jettison</groupId>
</exclusion>
<exclusion>
<artifactId>commons-collections</artifactId>
<groupId>commons-collections</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId>
<version>${table.client.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
86 changes: 73 additions & 13 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
import com.alipay.oceanbase.rpc.property.Property;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
Expand All @@ -39,7 +40,6 @@
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import com.alipay.oceanbase.hbase.exception.OperationTimeoutException;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
Expand Down Expand Up @@ -368,8 +368,31 @@ public boolean exists(Get get) throws IOException {
return !r.isEmpty();
}

@Override
public boolean[] existsAll(List<Get> list) throws IOException {
if (list.isEmpty()) {
return new boolean[]{};
}
if (list.size() == 1) {
return new boolean[]{exists(list.get(0))};
}

// todo: Optimize after CheckExistenceOnly is finished
Result[] r = get(list);
boolean[] ret = new boolean[r.length];
for (int i = 0; i < r.length; ++i){
ret[i] = !r[i].isEmpty();
}
return ret;
}

public Boolean[] exists(List<Get> gets) throws IOException {
throw new FeatureNotSupportedException("not supported yet'");
Boolean[] result = new Boolean[gets.size()];
boolean[] exists = existsAll(gets);
for (int i = 0; i < gets.size(); ++i) {
result[i] = exists[i];
}
return result;
}

public void batch(List<? extends Row> actions, Object[] results) {
Expand Down Expand Up @@ -460,9 +483,14 @@ public Result call() throws IOException {

filter = buildObHTableFilter(get.getFilter(), get.getTimeRange(),
get.getMaxVersions(), entry.getValue());

obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true, -1);
if (get.isClosestRowBefore()) {
obTableQuery = buildObTableQuery(filter, null, false,
get.getRow(), true, 1);
obTableQuery.setScanOrder(ObScanOrder.Reverse);
} else {
obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true, -1);
}

request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
Expand Down Expand Up @@ -535,6 +563,9 @@ public ResultScanner call() throws IOException {
if (scan.isReversed()) { // reverse scan 时设置为逆序
obTableQuery.setScanOrder(ObScanOrder.Reverse);
}
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
Expand All @@ -558,7 +589,9 @@ public ResultScanner call() throws IOException {
}

// no support set maxResultSize.
// obTableQuery.setMaxResultSize(scan.getMaxResultSize());
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));

request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
Expand Down Expand Up @@ -655,7 +688,12 @@ private void validatePut(Put put) {
*/
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
throws IOException {
return checkAndMutation(row, family, qualifier, value, put);
return checkAndPut(row, family, qualifier, CompareFilter.CompareOp.EQUAL, value, put);
}

@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
return checkAndMutation(row, family, qualifier, compareOp, value, put);
}

private void innerDelete(Delete delete) throws IOException {
Expand Down Expand Up @@ -708,10 +746,15 @@ public void delete(List<Delete> deletes) throws IOException {
*/
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value,
Delete delete) throws IOException {
return checkAndMutation(row, family, qualifier, value, delete);
return checkAndDelete(row, family, qualifier, CompareFilter.CompareOp.EQUAL, value, delete);
}

private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, byte[] value,
@Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
return checkAndMutation(row, family, qualifier, compareOp, value, delete);
}

private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value,
Mutation mutation) throws IOException {
try {
checkArgument(row != null, "row is null");
Expand All @@ -721,7 +764,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, by

checkArgument(!mutation.isEmpty(), "mutation is empty");

String filterString = buildCheckAndMutateFilterString(family, qualifier, value);
String filterString = buildCheckAndMutateFilterString(family, qualifier, compareOp, value);

ObHTableFilter filter = buildObHTableFilter(filterString, null, 1, qualifier);

Expand Down Expand Up @@ -1120,13 +1163,30 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
throw new FeatureNotSupportedException("not supported yet'");
}

// operationTimeout means operation max wait time in client
@Override
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
this.operationExecuteInPool = this.configuration.getBoolean(
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
}

@Override
public int getOperationTimeout() {
return operationTimeout;
}

// rpcTimeout means server max execute time, equal Table API rpc_execute_time, it must be set before OHTable init; please pass this parameter through conf
@Override
public void setRpcTimeout(int rpcTimeout) {
}

@Override
public int getRpcTimeout() {
return Integer.parseInt(configuration.get(Property.RPC_EXECUTE_TIMEOUT.getKey()));
}

public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor);
}
Expand Down Expand Up @@ -1215,13 +1275,13 @@ private ObHTableFilter buildObHTableFilter(Filter filter, TimeRange timeRange, i
return obHTableFilter;
}

private String buildCheckAndMutateFilterString(byte[] family, byte[] qualifier, byte[] value) {
private String buildCheckAndMutateFilterString(byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value) {
if (value != null) {
return ("CheckAndMutateFilter(=, 'binary:" + Bytes.toString(value) + "', '"
return ("CheckAndMutateFilter("+ HBaseFilterUtils.toParseableString(compareOp) +", 'binary:" + Bytes.toString(value) + "', '"
+ Bytes.toString(family) + "', '"
+ (qualifier == null ? "" : Bytes.toString(qualifier)) + "', false)");
} else {
return ("CheckAndMutateFilter(=, 'binary:', '" + Bytes.toString(family) + "', '"
return ("CheckAndMutateFilter("+ HBaseFilterUtils.toParseableString(compareOp) +", 'binary:', '" + Bytes.toString(family) + "', '"
+ (qualifier == null ? "" : Bytes.toString(qualifier)) + "', true)");
}
}
Expand Down
43 changes: 42 additions & 1 deletion src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.alipay.oceanbase.hbase.core.Lifecycle;
import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
import com.alipay.oceanbase.rpc.property.Property;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
Expand Down Expand Up @@ -195,6 +196,28 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
throw new FeatureNotSupportedException("not supported yet'");
}

@Override
public void setOperationTimeout(int operationTimeout) {
checkStatus();
ohTable.setOperationTimeout(operationTimeout);
}

@Override
public int getOperationTimeout() {
checkStatus();
return ohTable.getOperationTimeout();
}

@Override
public void setRpcTimeout(int rpcTimeout) {
conf.set(Property.RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(rpcTimeout));
}

@Override
public int getRpcTimeout() {
return Integer.parseInt(conf.get(Property.RPC_EXECUTE_TIMEOUT.getKey()));
}

@Override
public byte[] getTableName() {
return tableName;
Expand Down Expand Up @@ -224,9 +247,15 @@ public boolean exists(Get get) throws IOException {
return ohTable.exists(get);
}

@Override
public boolean[] existsAll(List<Get> list) throws IOException {
checkStatus();
return ohTable.existsAll(list);
}

@Override
public Boolean[] exists(List<Get> gets) throws IOException {
throw new FeatureNotSupportedException("not supported yet'");
return ohTable.exists(gets);
}

// Not support.
Expand Down Expand Up @@ -314,6 +343,12 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] v
return ohTable.checkAndPut(row, family, qualifier, value, put);
}

@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
checkStatus();
return ohTable.checkAndPut(row, family, qualifier, compareOp, value, put);
}

@Override
public void delete(Delete delete) throws IOException {
checkStatus();
Expand All @@ -333,6 +368,12 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[
return ohTable.checkAndDelete(row, family, qualifier, value, delete);
}

@Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
checkStatus();
return ohTable.checkAndDelete(row, family, qualifier, compareOp, value, delete);
}

// Not support.
@Override
public void mutateRow(RowMutations rm) throws IOException {
Expand Down
Loading

0 comments on commit 6966c4f

Please sign in to comment.