Skip to content

Commit

Permalink
feat: Add support for MergeToCell API (#2258)
Browse files Browse the repository at this point in the history
* feat: Add support for MergeToCell API

* feat: Add support for MergeToCell API

* fix build

* fix build

* fix format

* fix build

* fix build

* fix build

* fix format

* fix test

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Update WriteAggregate.java

* Update WriteAggregate.java

* Update WriteAggregate.java

* Update WriteAggregate.java

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and lqiu96 committed Jul 26, 2024
1 parent 4cecb0c commit 95d48e6
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ Builder addToCell(@Nonnull String familyName, Value qualifier, Value timestamp,
return this;
}

Builder mergeToCell(@Nonnull String familyName, Value qualifier, Value timestamp, Value input) {
this.entriesBuilder().add(MergeToCell.create(familyName, qualifier, timestamp, input));
return this;
}

abstract ChangeStreamMutation build();
}

Expand Down Expand Up @@ -210,6 +215,13 @@ public RowMutation toRowMutation(@Nonnull String tableId) {
addToCell.getQualifier(),
addToCell.getTimestamp(),
addToCell.getInput());
} else if (entry instanceof MergeToCell) {
MergeToCell mergeToCell = (MergeToCell) entry;
rowMutation.mergeToCell(
mergeToCell.getFamily(),
mergeToCell.getQualifier(),
mergeToCell.getTimestamp(),
mergeToCell.getInput());
} else {
throw new IllegalArgumentException("Unexpected Entry type.");
}
Expand Down Expand Up @@ -242,6 +254,13 @@ public RowMutationEntry toRowMutationEntry() {
addToCell.getQualifier(),
addToCell.getTimestamp(),
addToCell.getInput());
} else if (entry instanceof MergeToCell) {
MergeToCell mergeToCell = (MergeToCell) entry;
rowMutationEntry.mergeToCell(
mergeToCell.getFamily(),
mergeToCell.getQualifier(),
mergeToCell.getTimestamp(),
mergeToCell.getInput());
} else {
throw new IllegalArgumentException("Unexpected Entry type.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ void addToCell(
@Nonnull Value timestamp,
@Nonnull Value value);

void mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value value);

/**
* Called to start a SetCell.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ public void addToCell(
this.changeStreamMutationBuilder.addToCell(familyName, qualifier, timestamp, input);
}

@Override
public void mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input) {
this.changeStreamMutationBuilder.mergeToCell(familyName, qualifier, timestamp, input);
}

/** {@inheritDoc} */
@Override
public void startCell(String family, ByteString qualifier, long timestampMicros) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nonnull;

/** Representation of an MergeToCell mod in a data change. */
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@AutoValue
public abstract class MergeToCell implements Entry, Serializable {
public static MergeToCell create(
@Nonnull String family,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input) {
return new AutoValue_MergeToCell(family, qualifier, timestamp, input);
}

@Nonnull
public abstract String getFamily();

@Nonnull
public abstract Value getQualifier();

@Nonnull
public abstract Value getTimestamp();

@Nonnull
public abstract Value getInput();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.bigtable.v2.Mutation.DeleteFromColumn;
import com.google.bigtable.v2.Mutation.DeleteFromFamily;
import com.google.bigtable.v2.Mutation.DeleteFromRow;
import com.google.bigtable.v2.Mutation.MergeToCell;
import com.google.bigtable.v2.Mutation.SetCell;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -308,6 +309,24 @@ public Mutation addToCell(
return this;
}

@Override
public Mutation mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value value) {
com.google.bigtable.v2.Mutation.Builder builder = com.google.bigtable.v2.Mutation.newBuilder();
MergeToCell.Builder mergeToCellBuilder = builder.getMergeToCellBuilder();
mergeToCellBuilder.setFamilyName(familyName);

qualifier.buildTo(mergeToCellBuilder.getColumnQualifierBuilder());
timestamp.buildTo(mergeToCellBuilder.getTimestampBuilder());
value.buildTo(mergeToCellBuilder.getInputBuilder());

addMutation(builder.build());
return this;
}

private void addMutation(com.google.bigtable.v2.Mutation mutation) {
Preconditions.checkState(numMutations + 1 <= MAX_MUTATIONS, "Too many mutations per row");
Preconditions.checkState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ default T addToCell(
return addToCell(familyName, ByteString.copyFromUtf8(qualifier), timestamp, value);
}

/**
* Merges a ByteString accumulator value to a cell in an aggregate column family.
*
* <p>This is a convenience override that converts Strings to ByteStrings.
*
* <p>Note: The timestamp values are in microseconds but must match the granularity of the
* table(defaults to `MILLIS`). Therefore, the given value must be a multiple of 1000 (millisecond
* granularity). For example: `1571902339435000`.
*/
default T mergeToCell(
@Nonnull String familyName, @Nonnull String qualifier, long timestamp, ByteString value) {
return mergeToCell(familyName, ByteString.copyFromUtf8(qualifier), timestamp, value);
}

/**
* Adds an int64 value to an aggregate cell. The column family must be an aggregate family and
* have an "int64" input type or this mutation will be rejected.
Expand All @@ -155,6 +169,22 @@ default T addToCell(
Value.IntValue.create(input));
}

/**
* Merges a ByteString accumulator value to a cell in an aggregate column family.
*
* <p>Note: The timestamp values are in microseconds but must match the granularity of the
* table(defaults to `MILLIS`). Therefore, the given value must be a multiple of 1000 (millisecond
* granularity). For example: `1571902339435000`.
*/
default T mergeToCell(
@Nonnull String familyName, @Nonnull ByteString qualifier, long timestamp, ByteString input) {
return mergeToCell(
familyName,
Value.RawValue.create(qualifier),
Value.RawTimestamp.create(timestamp),
Value.RawValue.create(input));
}

/**
* Adds a {@link Value} to an aggregate cell. The column family must be an aggregate family and
* have an input type matching the type of {@link Value} or this mutation will be rejected.
Expand All @@ -168,4 +198,18 @@ T addToCell(
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input);

/**
* Merges a {@link Value} accumulator to an aggregate cell. The column family must be an aggregate
* family or this mutation will be rejected.
*
* <p>Note: The timestamp values are in microseconds but must match the granularity of the
* table(defaults to `MILLIS`). Therefore, the given value must be a multiple of 1000 (millisecond
* granularity). For example: `1571902339435000`.
*/
T mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ public RowMutation addToCell(
return this;
}

@Override
public RowMutation mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input) {
mutation.mergeToCell(familyName, qualifier, timestamp, input);
return this;
}

@InternalApi
public MutateRowRequest toProto(RequestContext requestContext) {
MutateRowRequest.Builder builder = MutateRowRequest.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ public RowMutationEntry addToCell(
return this;
}

@Override
public RowMutationEntry mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input) {
mutation.mergeToCell(familyName, qualifier, timestamp, input);
return this;
}

@InternalApi
public MutateRowsRequest.Entry toProto() {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,16 @@ State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
Value.fromProto(mod.getAddToCell().getColumnQualifier()),
Value.fromProto(mod.getAddToCell().getTimestamp()),
Value.fromProto(mod.getAddToCell().getInput()));
continue;
}
// Case 5: MergeToCell
if (mod.hasMergeToCell()) {
builder.mergeToCell(
mod.getMergeToCell().getFamilyName(),
Value.fromProto(mod.getMergeToCell().getColumnQualifier()),
Value.fromProto(mod.getMergeToCell().getTimestamp()),
Value.fromProto(mod.getMergeToCell().getInput()));
continue;
}
throw new IllegalStateException(
"Received unknown mod type. You may need to upgrade your Bigtable client.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti
Value.rawValue(ByteString.copyFromUtf8("col1")),
Value.rawTimestamp(1000),
Value.intValue(1234))
.mergeToCell(
"agg-family",
Value.rawValue(ByteString.copyFromUtf8("col2")),
Value.rawTimestamp(1000),
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();
Expand Down Expand Up @@ -150,6 +155,11 @@ public void toRowMutationTest() {
Value.rawValue(ByteString.copyFromUtf8("qual1")),
Value.rawTimestamp(1000),
Value.intValue(1234))
.mergeToCell(
"agg-family",
Value.rawValue(ByteString.copyFromUtf8("qual2")),
Value.rawTimestamp(1000),
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();
Expand All @@ -161,7 +171,7 @@ public void toRowMutationTest() {
NameUtil.formatTableName(
REQUEST_CONTEXT.getProjectId(), REQUEST_CONTEXT.getInstanceId(), TABLE_ID);
assertThat(mutateRowRequest.getTableName()).isEqualTo(tableName);
assertThat(mutateRowRequest.getMutationsList()).hasSize(4);
assertThat(mutateRowRequest.getMutationsList()).hasSize(5);
assertThat(mutateRowRequest.getMutations(0).getSetCell().getValue())
.isEqualTo(ByteString.copyFromUtf8("fake-value"));
assertThat(mutateRowRequest.getMutations(1).getDeleteFromFamily().getFamilyName())
Expand All @@ -178,6 +188,14 @@ public void toRowMutationTest() {
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(Value.intValue(1234).toProto())
.build());
assertThat(mutateRowRequest.getMutations(4).getMergeToCell())
.isEqualTo(
Mutation.MergeToCell.newBuilder()
.setFamilyName("agg-family")
.setColumnQualifier(Value.rawValue(ByteString.copyFromUtf8("qual2")).toProto())
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))).toProto())
.build());
}

@Test
Expand Down Expand Up @@ -220,6 +238,11 @@ public void toRowMutationEntryTest() {
Value.rawValue(ByteString.copyFromUtf8("qual1")),
Value.rawTimestamp(1000),
Value.intValue(1234))
.mergeToCell(
"agg-family",
Value.rawValue(ByteString.copyFromUtf8("qual2")),
Value.rawTimestamp(1000),
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();
Expand All @@ -228,7 +251,7 @@ public void toRowMutationEntryTest() {
RowMutationEntry rowMutationEntry = changeStreamMutation.toRowMutationEntry();
MutateRowsRequest.Entry mutateRowsRequestEntry = rowMutationEntry.toProto();
assertThat(mutateRowsRequestEntry.getRowKey()).isEqualTo(ByteString.copyFromUtf8("key"));
assertThat(mutateRowsRequestEntry.getMutationsList()).hasSize(4);
assertThat(mutateRowsRequestEntry.getMutationsList()).hasSize(5);
assertThat(mutateRowsRequestEntry.getMutations(0).getSetCell().getValue())
.isEqualTo(ByteString.copyFromUtf8("fake-value"));
assertThat(mutateRowsRequestEntry.getMutations(1).getDeleteFromFamily().getFamilyName())
Expand All @@ -245,6 +268,14 @@ public void toRowMutationEntryTest() {
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(Value.intValue(1234).toProto())
.build());
assertThat(mutateRowsRequestEntry.getMutations(4).getMergeToCell())
.isEqualTo(
Mutation.MergeToCell.newBuilder()
.setFamilyName("agg-family")
.setColumnQualifier(Value.rawValue(ByteString.copyFromUtf8("qual2")).toProto())
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))).toProto())
.build());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.bigtable.v2.Mutation.DeleteFromColumn;
import com.google.bigtable.v2.Mutation.DeleteFromFamily;
import com.google.bigtable.v2.Mutation.DeleteFromRow;
import com.google.bigtable.v2.Mutation.MergeToCell;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -195,6 +196,21 @@ public void addToCellTest() {
assertThat(actual).containsExactly(builder.build());
}

@Test
public void mergeToCellTest() {
mutation.mergeToCell("cf1", "q", 10000, ByteString.copyFrom(Longs.toByteArray(1234L)));
List<com.google.bigtable.v2.Mutation> actual = mutation.getMutations();

com.google.bigtable.v2.Mutation.Builder builder = com.google.bigtable.v2.Mutation.newBuilder();
MergeToCell.Builder mergeToCellBuilder = builder.getMergeToCellBuilder();
mergeToCellBuilder.setFamilyName("cf1");
mergeToCellBuilder.getColumnQualifierBuilder().setRawValue(ByteString.copyFromUtf8("q"));
mergeToCellBuilder.getTimestampBuilder().setRawTimestampMicros(10000);
mergeToCellBuilder.getInputBuilder().setRawValue(ByteString.copyFrom(Longs.toByteArray(1234L)));

assertThat(actual).containsExactly(builder.build());
}

@Test
public void serializationTest() throws IOException, ClassNotFoundException {
Mutation expected = Mutation.create().setCell("cf", "q", "val");
Expand Down Expand Up @@ -281,7 +297,8 @@ public void fromProtoTest() {
ByteString.copyFromUtf8("fake-value"))
.deleteCells("fake-family", ByteString.copyFromUtf8("fake-qualifier"))
.deleteFamily("fake-family2")
.addToCell("agg-family", "qual1", 1000, 1234);
.addToCell("agg-family", "qual1", 1000, 1234)
.mergeToCell("agg-family", "qual2", 1000, ByteString.copyFrom(Longs.toByteArray(1234L)));

List<com.google.bigtable.v2.Mutation> protoMutation = mutation.getMutations();

Expand Down
Loading

0 comments on commit 95d48e6

Please sign in to comment.