Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigtable: add CRUD for clusters #3612

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-clients/google-cloud-bigtable-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<classifier>testlib</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,30 @@
*/
package com.google.cloud.bigtable.admin.v2;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.bigtable.admin.v2.ClusterName;
import com.google.bigtable.admin.v2.DeleteClusterRequest;
import com.google.bigtable.admin.v2.GetClusterRequest;
import com.google.bigtable.admin.v2.InstanceName;
import com.google.bigtable.admin.v2.ListClustersRequest;
import com.google.bigtable.admin.v2.ListClustersResponse;
import com.google.bigtable.admin.v2.LocationName;
import com.google.bigtable.admin.v2.ProjectName;
import com.google.cloud.bigtable.admin.v2.models.Cluster;
import com.google.cloud.bigtable.admin.v2.models.CreateClusterRequest;
import com.google.cloud.bigtable.admin.v2.models.PartialListClustersException;
import com.google.cloud.bigtable.admin.v2.stub.BigtableInstanceAdminStub;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.protobuf.Empty;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nonnull;

/**
Expand Down Expand Up @@ -105,4 +126,304 @@ public ProjectName getProjectName() {
public void close() {
stub.close();
}

/**
* Creates a new cluster in the specified instance.
*
* <p>Sample code:
*
* <pre>{@code
* Cluster cluster = client.createCluster(
* CreateClusterRequest.of("my-instance", "my-new-cluster")
* .setZone("us-east1-c")
* .setServeNodes(3)
* .setStorageType(StorageType.SSD)
* );
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public Cluster createCluster(CreateClusterRequest request) {
return awaitFuture(createClusterAsync(request));
}

/**
* Asynchronously creates a new cluster in the specified instance.
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<Cluster> clusterFuture = client.createClusterAsync(
* CreateClusterRequest.of("my-instance", "my-new-cluster")
* .setZone("us-east1-c")
* .setServeNodes(3)
* .setStorageType(StorageType.SSD)
* );
*
* Cluster cluste = clusterFuture.get();
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<Cluster> createClusterAsync(CreateClusterRequest request) {
return ApiFutures.transform(
stub.createClusterOperationCallable().futureCall(request.toProto(projectName)),
new ApiFunction<com.google.bigtable.admin.v2.Cluster, Cluster>() {
@Override
public Cluster apply(com.google.bigtable.admin.v2.Cluster proto) {
return Cluster.fromProto(proto);
}
},
MoreExecutors.directExecutor()
);
}

/**
* Get the cluster representation by ID.
*
* <p>Sample code:
*
* <pre>{@code
* Cluster cluster = client.getCluster("my-instance", "my-cluster");
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public Cluster getCluster(String instanceId, String clusterId) {
return awaitFuture(getClusterAsync(instanceId, clusterId));
}

/**
* Asynchronously gets the cluster representation by ID.
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<Cluster> clusterFuture = client.getClusterAsync("my-instance", "my-cluster");
* Cluster cluster = clusterFuture.get();
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<Cluster> getClusterAsync(String instanceId, String clusterId) {
ClusterName name = ClusterName.of(projectName.getProject(), instanceId, clusterId);

GetClusterRequest request = GetClusterRequest.newBuilder()
.setName(name.toString())
.build();

return ApiFutures.transform(
stub.getClusterCallable().futureCall(request),
new ApiFunction<com.google.bigtable.admin.v2.Cluster, Cluster>() {
@Override
public Cluster apply(com.google.bigtable.admin.v2.Cluster proto) {
return Cluster.fromProto(proto);
}
},
MoreExecutors.directExecutor()
);
}

/**
* Lists all clusters in the specified instance.
*
* <p>This method will throw a {@link PartialListClustersException} when any zone is
* unavailable. If partial listing are ok, the exception can be caught and inspected.
*
* <p>Sample code:
*
* <pre>{@code
* try {
* List<Cluster> clusters = clister.listClusters("my-instance");
* } catch (PartialListClustersException e) {
* System.out.println("The following zones are unavailable: " + e.getUnavailableZones());
* System.out.println("But the following clusters are reachable: " + e.getClusters())
* }
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public List<Cluster> listClusters(String instanceId) {
return awaitFuture(listClustersAsync(instanceId));
}

/**
* Asynchronously lists all clusters in the specified instance.
*
* <p>This method will throw a {@link PartialListClustersException} when any zone is
* unavailable. If partial listing are ok, the exception can be caught and inspected.
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<Cluster> clustersFuture = client.listClustersAsync("my-instance");
*
* ApiFutures.addCallback(clustersFuture, new ApiFutureCallback<List<Cluster>>() {
* public void onFailure(Throwable t) {
* if (t instanceof PartialListClustersException) {
* PartialListClustersException partialError = (PartialListClustersException)t;
* System.out.println("The following zones are unavailable: " + partialError.getUnavailableZones());
* System.out.println("But the following clusters are reachable: " + partialError.getClusters());
* } else {
* t.printStackTrace();
* }
* }
*
* public void onSuccess(List<Cluster> result) {
* System.out.println("Found a complete set of instances: " + result);
* }
* }, MoreExecutors.directExecutor());
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<List<Cluster>> listClustersAsync(String instanceId) {
InstanceName name = InstanceName.of(projectName.getProject(), instanceId);
ListClustersRequest request = ListClustersRequest.newBuilder()
.setParent(name.toString())
.build();

return ApiFutures.transform(
stub.listClustersCallable().futureCall(request),
new ApiFunction<ListClustersResponse, List<Cluster>>() {
@Override
public List<Cluster> apply(ListClustersResponse proto) {
// NOTE: pagination is intentionally ignored. The server does not implement it and never

This comment was marked as spam.

// will.
Verify.verify(proto.getNextPageToken().isEmpty(),
"Server returned an unexpected paginated response");

ImmutableList.Builder<Cluster> clusters = ImmutableList.builder();
for (com.google.bigtable.admin.v2.Cluster cluster : proto.getClustersList()) {
clusters.add(Cluster.fromProto(cluster));
}

ImmutableList.Builder<String> failedZones = ImmutableList.builder();
for (String locationStr : proto.getFailedLocationsList()) {
LocationName fullLocation = Objects.requireNonNull(LocationName.parse(locationStr));
failedZones.add(fullLocation.getLocation());
}

if (!failedZones.build().isEmpty()) {
throw new PartialListClustersException(failedZones.build(), clusters.build());
}

return clusters.build();
}
},
MoreExecutors.directExecutor()
);
}

/**
* Resizes the cluster's node count. Please note that only clusters that belong to a PRODUCTION
* instance can be resized.
*
* <p>Sample code:
*
* <pre>{@code
* Cluster cluster = clister.resizeCluster("my-instance", "my-cluster", 30);
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public Cluster resizeCluster(String instanceId, String clusterId, int numServeNodes) {
return awaitFuture(resizeClusterAsync(instanceId, clusterId, numServeNodes));
}

/**
* Asynchronously resizes the cluster's node count. Please note that only clusters that belong to
* a PRODUCTION instance can be resized.
*
* <pre>{@code
* ApiFuture<Cluster> clusterFuture = clister.resizeCluster("my-instance", "my-cluster", 30);
* Cluster cluster = clusterFuture.get();
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<Cluster> resizeClusterAsync(String instanceId, String clusterId,
int numServeNodes) {

ClusterName name = ClusterName.of(projectName.getProject(), instanceId, clusterId);

com.google.bigtable.admin.v2.Cluster request = com.google.bigtable.admin.v2.Cluster.newBuilder()
.setName(name.toString())
.setServeNodes(numServeNodes)
.build();

return ApiFutures.transform(
stub.updateClusterOperationCallable().futureCall(request),
new ApiFunction<com.google.bigtable.admin.v2.Cluster, Cluster>() {
@Override
public Cluster apply(com.google.bigtable.admin.v2.Cluster proto) {
return Cluster.fromProto(proto);
}
},
MoreExecutors.directExecutor()
);
}

/**
* Deletes the specified cluster. Please note that an instance must have at least 1 cluster. To
* remove the last cluster, please use {@link BigtableInstanceAdminClient#deleteInstance(String)}.
*
* <p>Sample code:
*
* <pre>{@code
* client.deleteCluster("my-instance", "my-cluster");
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public void deleteCluster(String instanceId, String clusterId) {
awaitFuture(deleteClusterAsync(instanceId, clusterId));
}

/**
* Asynchronously deletes the specified cluster. Please note that an instance must have at least 1
* cluster. To remove the last cluster, please use {@link BigtableInstanceAdminClient#deleteInstanceAsync(String)}.
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<Void> future = client.deleteClusterAsync("my-instance", "my-cluster");
* future.get();
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<Void> deleteClusterAsync(String instanceId, String clusterId) {
ClusterName name = ClusterName.of(projectName.getProject(), instanceId, clusterId);

DeleteClusterRequest request = DeleteClusterRequest.newBuilder()
.setName(name.toString())
.build();

return ApiFutures.transform(
stub.deleteClusterCallable().futureCall(request),
new ApiFunction<Empty, Void>() {
@Override
public Void apply(Empty input) {
return null;
}
},
MoreExecutors.directExecutor()
);
}

/**
* Awaits the result of a future, taking care to propagate errors while maintaining the call site
* in a suppressed exception. This allows semantic errors to be caught across threads, while
* preserving the call site in the error. The caller's stacktrace will be made available as a
* suppressed exception.
*/
// TODO(igorbernstein2): try to move this into gax
private <T> T awaitFuture(ApiFuture<T> future) {
RuntimeException error;
try {
return Futures.getUnchecked(future);
} catch (UncheckedExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
error = (RuntimeException) e.getCause();
} else {
error = e;
}
} catch (RuntimeException e) {
error = e;
}
// Add the caller's stack as a suppressed exception
error.addSuppressed(new RuntimeException("Encountered error while awaiting future"));
throw error;
}
}
Loading