diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index bfcc18706312..597ec2091d6c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -447,6 +447,24 @@ default CompletableFuture deleteAll(List deletes) { return allOf(delete(deletes)).thenApply(r -> null); } + /** + * Batch checkAndRowMutates the specified data into the table. + * @param checkAndRowMutates The list of rows to apply. + * @return A list of {@link CompletableFuture}s that represent the result for + * each checkAndRowMutate. + */ + List> checkAndRowMutate(List checkAndRowMutates); + + /** + * A simple version of batch checkAndRowMutate. It will fail if there are any failures. + * @param checkAndRowMutates list of things to checkAndRowMutate. + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + default CompletableFuture> checkAndRowMutateAll( + List checkAndRowMutates) { + return allOf(checkAndRowMutate(checkAndRowMutates)); + } + /** * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 2256a4c1a891..1b7c67da449d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -222,6 +222,13 @@ public List> delete(List deletes) { return rawTable.delete(deletes).stream().map(this::wrap).collect(toList()); } + @Override + public List> checkAndRowMutate( + List checkAndRowMutates) { + return rawTable.checkAndRowMutate(checkAndRowMutates) + .stream().map(this::wrap).collect(toList()); + } + @Override public List> batch(List actions) { return rawTable. batch(actions).stream().map(this::wrap).collect(toList()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndRowMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndRowMutate.java new file mode 100644 index 000000000000..61090ac6e4a2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndRowMutate.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +/** + * Used to perform CheckAndRowMutate operations on a single row. + */ +@InterfaceAudience.Public +public class CheckAndRowMutate implements Row { + private final byte[] row; + private final byte[] family; + private byte[] qualifier; + private TimeRange timeRange = null; + private CompareOperator op; + private byte[] value; + private Mutation mutation = null; + + /** + * Create a CheckAndRowMutate operation for the specified row. + * + * @param row row key + * @param family family + */ + public CheckAndRowMutate(byte[] row, byte[] family) { + this.row = Bytes.copy(Mutation.checkRow(row)); + this.family = Preconditions.checkNotNull(family, "family is null"); + } + + /** + * Create a CheckAndRowMutate operation for the specified row, + * and an existing row lock. + * + * @param row row key + * @param family family + * @param qualifier qualifier + * @param value value + * @param mutation mutation + */ + public CheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp, + byte[] value, Mutation mutation) { + this.row = Bytes.copy(Mutation.checkRow(row)); + this.family = Preconditions.checkNotNull(family, "family is null"); + this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null"); + this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); + this.value = Preconditions.checkNotNull(value, "value is null"); + this.mutation = mutation; + } + + public CheckAndRowMutate qualifier(byte[] qualifier) { + this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + + " an empty byte array, or just do not call this method if you want a null qualifier"); + return this; + } + + public CheckAndRowMutate timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + + public CheckAndRowMutate ifNotExists() { + this.op = CompareOperator.EQUAL; + this.value = null; + return this; + } + + public CheckAndRowMutate ifMatches(CompareOperator compareOp, byte[] value) { + this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); + this.value = Preconditions.checkNotNull(value, "value is null"); + return this; + } + + public CheckAndRowMutate addMutation(Mutation mutation) throws IOException { + this.mutation = mutation; + return this; + } + + @Override + public int compareTo(Row i) { + return Bytes.compareTo(this.getRow(), i.getRow()); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof CheckAndRowMutate) { + CheckAndRowMutate other = (CheckAndRowMutate)obj; + return compareTo(other) == 0; + } + return false; + } + + @Override + public int hashCode(){ + return Arrays.hashCode(row); + } + /** + * Method for retrieving the delete's row + * + * @return row + */ + @Override + public byte[] getRow() { + return this.row; + } + + public byte[] getFamily() { + return family; + } + + public byte[] getQualifier() { + return qualifier; + } + + public TimeRange getTimeRange() { + return timeRange; + } + + public CompareOperator getOp() { + return op; + } + + public byte[] getValue() { + return value; + } + + public Mutation getMutation() { + return mutation; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index c357b1f761dc..33f39feff0b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -484,6 +484,14 @@ public List> delete(List deletes) { return voidMutate(deletes); } + @Override + public List> checkAndRowMutate( + List checkAndRowMutates) { + return batch(checkAndRowMutates, readRpcTimeoutNs).stream() + .> map(f -> f.thenApply(r -> ((Result)r).getExists())) + .collect(toList()); + } + @Override public List> batch(List actions) { return batch(actions, rpcTimeoutNs); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 41b0e47c0758..5df364d92d2c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -293,6 +293,19 @@ default void delete(List deletes) throws IOException { throw new NotImplementedException("Add an implementation!"); } + /** + * Batch checkAndRowMutates the specified data into the table. + * + * @param checkAndRowMutates The list of rows to apply. + * @throws IOException if a remote or network exception occurs. + */ + + default boolean[] checkAndRowMutate(final List checkAndRowMutates) + throws IOException { + throw new NotImplementedException("Add an implementation!"); + } + + /** * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it * adds the Put/Delete/RowMutations. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index 0a2a66eecaeb..aa05a6c3765e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -220,6 +220,12 @@ public void delete(List deletes) throws IOException { FutureUtils.get(table.deleteAll(deletes)); } + @Override + public boolean [] checkAndRowMutate(final List checkAndRowMutates) + throws IOException { + return Booleans.toArray(FutureUtils.get(table.checkAndRowMutateAll(checkAndRowMutates))); + } + private static final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { private final AsyncTable.CheckAndMutateBuilder builder; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 81082174bc0a..4b03d4332dbc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndRowMutate; import org.apache.hadoop.hbase.client.ClientUtil; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; @@ -3348,4 +3350,30 @@ public static RegionStatesCount toTableRegionStatesCount( .build(); } + public static ClientProtos.CheckAndRowMutate toCheckAndRowMutate( + final CheckAndRowMutate checkAndRowMutate) throws IOException { + ClientProtos.CheckAndRowMutate.Builder builder = + ClientProtos.CheckAndRowMutate.newBuilder(); + builder.setRow(UnsafeByteOperations.unsafeWrap(checkAndRowMutate.getRow())) + .setFamily(UnsafeByteOperations.unsafeWrap(checkAndRowMutate.getFamily())) + .setQualifier(UnsafeByteOperations.unsafeWrap(checkAndRowMutate.getQualifier() == null ? + HConstants.EMPTY_BYTE_ARRAY : checkAndRowMutate.getQualifier())) + .setComparator(ProtobufUtil.toComparator( + new BinaryComparator(checkAndRowMutate.getValue()))) + .setCompareType(HBaseProtos.CompareType.valueOf(checkAndRowMutate.getOp().name())) + .setTimeRange(ProtobufUtil.toTimeRange(checkAndRowMutate.getTimeRange())); + MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); + MutationType mutateType = null; + Mutation mutation = checkAndRowMutate.getMutation(); + if (mutation instanceof Put) { + mutateType = MutationType.PUT; + } else if (mutation instanceof Delete) { + mutateType = MutationType.DELETE; + } else { + throw new DoNotRetryIOException(mutation.getClass().getName() + " is not support"); + } + builder.setMutation(ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder)); + return builder.build(); + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index d45423c95b01..2713a82a7d64 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndRowMutate; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -697,6 +698,9 @@ public static void buildRegionActions(final byte[] regionName, .setRequest(value))); } else if (row instanceof RowMutations) { rowMutationsList.add(action); + } else if (row instanceof CheckAndRowMutate) { + builder.addAction(actionBuilder.setCheckAndRowMutate( + ProtobufUtil.toCheckAndRowMutate((CheckAndRowMutate)row))); } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } @@ -820,6 +824,9 @@ public static void buildNoDataRegionActions(final byte[] regionName, .setRequest(value))); } else if (row instanceof RowMutations) { rowMutationsList.add(action); + } else if (row instanceof CheckAndRowMutate) { + builder.addAction(actionBuilder.setCheckAndRowMutate( + ProtobufUtil.toCheckAndRowMutate((CheckAndRowMutate)row))); } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index a22c6237bc76..c0f071c6799d 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -92,6 +92,16 @@ message Get { optional bool load_column_families_on_demand = 14; /* DO NOT add defaults to load_column_families_on_demand. */ } +message CheckAndRowMutate { + required bytes row = 1; + required bytes family = 2; + required bytes qualifier = 3; + required CompareType compare_type = 4; + required Comparator comparator = 5; + optional TimeRange time_range = 6; + required MutationProto mutation = 7; +} + message Result { // Result includes the Cells or else it just has a count of Cells // that are carried otherwise. @@ -444,6 +454,7 @@ message Action { optional MutationProto mutation = 2; optional Get get = 3; optional CoprocessorServiceCall service_call = 4; + optional CheckAndRowMutate checkAndRowMutate = 5; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 661311e3e70c..713dafbb774b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -898,6 +898,23 @@ private List doNonAtomicRegionMutation(final HRegion region, default: throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); } + } else if (action.hasCheckAndRowMutate()) { + ClientProtos.CheckAndRowMutate checkAndRowMutate = action.getCheckAndRowMutate(); + byte[] row = checkAndRowMutate.getRow().toByteArray(); + byte[] family = checkAndRowMutate.getFamily().toByteArray(); + byte[] qualifier = checkAndRowMutate.getQualifier().toByteArray(); + CompareOperator compareOp = + CompareOperator.valueOf(checkAndRowMutate.getCompareType().name()); + ByteArrayComparable comparator = + ProtobufUtil.toComparator(checkAndRowMutate.getComparator()); + TimeRange timeRange = checkAndRowMutate.hasTimeRange() ? + ProtobufUtil.toTimeRange(checkAndRowMutate.getTimeRange()) : + TimeRange.allTime(); + Mutation mutation = ProtobufUtil.toMutation(checkAndRowMutate.getMutation()); + boolean result = region.checkAndMutate(row, family, + qualifier, compareOp, comparator, timeRange, mutation); + r = new Result(); + r.setExists(result); } else { throw new HBaseIOException("Unexpected Action type"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java index 2e9bb74eca1c..782e30fc2ba8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java @@ -139,6 +139,12 @@ public List> delete(List deletes) { return null; } + @Override + public List> checkAndRowMutate( + List checkAndRowMutates) { + return null; + } + @Override public List> batch(List actions) { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndRowMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndRowMutate.java new file mode 100644 index 000000000000..14cf95de4dd9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndRowMutate.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(MediumTests.class) +public class TestCheckAndRowMutate { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCheckAndRowMutate.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] ROWKEY = Bytes.toBytes("12345"); + private static final byte[] FAMILY = Bytes.toBytes("cf"); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private Table createTable() throws IOException, InterruptedException { + final TableName tableName = TableName.valueOf(name.getMethodName()); + Table table = TEST_UTIL.createTable(tableName, FAMILY); + TEST_UTIL.waitTableAvailable(tableName.getName(), 5000); + return table; + } + + private void putOneRow(Table table) throws IOException { + Put put = new Put(ROWKEY); + put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); + put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); + table.put(put); + } + + private void getOneRowAndAssertAllExist(final Table table) throws IOException { + Get get = new Get(ROWKEY); + Result result = table.get(get); + assertEquals("Column A value should be a", "a", + Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + assertEquals("Column B value should be b", "b", + Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + assertEquals("Column C value should be c", "c", + Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + } + + private void deleteOneRow(Table table) throws IOException { + Delete delete = new Delete(ROWKEY); + table.delete(delete); + } + + private List makeCheckAndRowMutatesWithPut() throws IOException { + List checkAndRowMutates = new ArrayList<>(); + Put put = new Put(ROWKEY); + put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("aa")); + CheckAndRowMutate checkAndRowMutate = new CheckAndRowMutate(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifMatches(CompareOperator.EQUAL, Bytes.toBytes("a")) + .addMutation(put); + checkAndRowMutates.add(checkAndRowMutate); + + put = new Put(ROWKEY); + put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")); + checkAndRowMutate = new CheckAndRowMutate(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifMatches(CompareOperator.EQUAL, Bytes.toBytes("b")) + .addMutation(put); + checkAndRowMutates.add(checkAndRowMutate); + + put = new Put(ROWKEY); + put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("cc")); + checkAndRowMutate = new CheckAndRowMutate(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("C")) + .ifMatches(CompareOperator.GREATER, Bytes.toBytes("c")) + .addMutation(put); + checkAndRowMutates.add(checkAndRowMutate); + return checkAndRowMutates; + } + + private void getOneRowAndAssertCNotChanged(final Table table) throws IOException { + Get get = new Get(ROWKEY); + Result result = table.get(get); + assertEquals("Column A value should be aa", "aa", + Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + assertEquals("Column B value should be bb", "bb", + Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + assertEquals("Column B value should be c", "c", + Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + } + + private List makeCheckAndRowMutatesWithDelete() throws IOException { + List checkAndRowMutates = new ArrayList<>(); + Delete delete = new Delete(ROWKEY); + delete.addColumns(FAMILY, Bytes.toBytes("A")); + CheckAndRowMutate checkAndRowMutate = new CheckAndRowMutate(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifMatches(CompareOperator.EQUAL, Bytes.toBytes("a")) + .addMutation(delete); + checkAndRowMutates.add(checkAndRowMutate); + + delete = new Delete(ROWKEY); + delete.addColumns(FAMILY, Bytes.toBytes("B")); + checkAndRowMutate = new CheckAndRowMutate(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifMatches(CompareOperator.EQUAL, Bytes.toBytes("b")) + .addMutation(delete); + checkAndRowMutates.add(checkAndRowMutate); + + delete = new Delete(ROWKEY); + delete.addColumns(FAMILY, Bytes.toBytes("C")); + checkAndRowMutate = new CheckAndRowMutate(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("C")) + .ifMatches(CompareOperator.GREATER, Bytes.toBytes("c")) + .addMutation(delete); + checkAndRowMutates.add(checkAndRowMutate); + return checkAndRowMutates; + } + + private void getOneRowAndAssertCNotDelete(final Table table) throws IOException { + Get get = new Get(ROWKEY); + Result result = table.get(get); + assertNull("Column A should not exist", result.getValue(FAMILY, Bytes.toBytes("A"))); + assertNull("Column B should not exist", result.getValue(FAMILY, Bytes.toBytes("B"))); + assertEquals("Column C value should be c", "c", + Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + } + + private List makeCheckAndRowMutatesWithPutAndDelete() throws IOException { + List checkAndRowMutates = new ArrayList<>(); + Put put = new Put(ROWKEY); + put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("aa")); + CheckAndRowMutate checkAndRowMutate = new CheckAndRowMutate(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("A")) + .ifMatches(CompareOperator.EQUAL, Bytes.toBytes("a")) + .addMutation(put); + checkAndRowMutates.add(checkAndRowMutate); + + put = new Put(ROWKEY); + put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")); + checkAndRowMutate = new CheckAndRowMutate(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("B")) + .ifMatches(CompareOperator.GREATER, Bytes.toBytes("b")) + .addMutation(put); + checkAndRowMutates.add(checkAndRowMutate); + + Delete delete = new Delete(ROWKEY); + delete.addColumns(FAMILY, Bytes.toBytes("C")); + checkAndRowMutate = new CheckAndRowMutate(ROWKEY, FAMILY) + .qualifier(Bytes.toBytes("C")) + .ifMatches(CompareOperator.EQUAL, Bytes.toBytes("c")) + .addMutation(delete); + checkAndRowMutates.add(checkAndRowMutate); + return checkAndRowMutates; + } + + private void getOneRowAndAssertAChangedAndCDelete(final Table table) throws IOException { + Get get = new Get(ROWKEY); + Result result = table.get(get); + assertEquals("Column A value should be aa", "aa", + Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + assertEquals("Column B value should be bb", "b", + Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + assertNull("Column C should not exist", result.getValue(FAMILY, Bytes.toBytes("C"))); + } + + @Test + public void testCheckAndRowMutateWithPut() throws Throwable { + try (Table table = createTable()) { + // put one row + putOneRow(table); + // get row back and assert the values + getOneRowAndAssertAllExist(table); + + // put the same row again with C column not changed + List carm = makeCheckAndRowMutatesWithPut(); + boolean[] result = table.checkAndRowMutate(carm); + assertTrue(result[0]); + assertTrue(result[1]); + assertFalse(result[2]); + getOneRowAndAssertCNotChanged(table); + + // delete one row + deleteOneRow(table); + // put one row + putOneRow(table); + // get row back and assert the values + getOneRowAndAssertAllExist(table); + + // put the same row again with C column not deleted + carm = makeCheckAndRowMutatesWithDelete(); + result = table.checkAndRowMutate(carm); + assertTrue(result[0]); + assertTrue(result[1]); + assertFalse(result[2]); + getOneRowAndAssertCNotDelete(table); + + // delete one row + deleteOneRow(table); + // put one row + putOneRow(table); + // get row back and assert the values + getOneRowAndAssertAllExist(table); + + // put the same row again with A column changed and C column deleted + carm = makeCheckAndRowMutatesWithPutAndDelete(); + result = table.checkAndRowMutate(carm); + assertTrue(result[0]); + assertFalse(result[1]); + assertTrue(result[2]); + getOneRowAndAssertAChangedAndCDelete(table); + } + } +}