Skip to content

Commit

Permalink
Add whenDone method and CompletionCallback to Job and Operation (#1016)
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 9, 2016
1 parent f176d81 commit 3b35a9e
Show file tree
Hide file tree
Showing 18 changed files with 852 additions and 330 deletions.
17 changes: 4 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,7 @@ if (table == null) {
}
System.out.println("Loading data into table " + tableId);
Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
while (!loadJob.isDone()) {
Thread.sleep(1000L);
}
loadJob = loadJob.waitFor();
if (loadJob.status().error() != null) {
System.out.println("Job completed with errors");
} else {
Expand Down Expand Up @@ -203,7 +201,6 @@ import com.google.cloud.compute.Compute;
import com.google.cloud.compute.ComputeOptions;
import com.google.cloud.compute.Disk;
import com.google.cloud.compute.DiskId;
import com.google.cloud.compute.Operation;
import com.google.cloud.compute.Snapshot;
Compute compute = ComputeOptions.defaultInstance().service();
Expand All @@ -212,12 +209,10 @@ Disk disk = compute.getDisk(diskId, Compute.DiskOption.fields());
if (disk != null) {
String snapshotName = "disk-name-snapshot";
Operation operation = disk.createSnapshot(snapshotName);
while (!operation.isDone()) {
Thread.sleep(1000L);
}
operation = operation.waitFor();
if (operation.errors() == null) {
// use snapshot
Snapshot snapshot = compute.getSnapshot("disk-name-snapshot");
Snapshot snapshot = compute.getSnapshot(snapshotName);
}
}
```
Expand All @@ -234,8 +229,6 @@ import com.google.cloud.compute.InstanceId;
import com.google.cloud.compute.InstanceInfo;
import com.google.cloud.compute.MachineTypeId;
import com.google.cloud.compute.NetworkId;
import com.google.cloud.compute.NetworkInterface;
import com.google.cloud.compute.Operation;
Compute compute = ComputeOptions.defaultInstance().service();
ImageId imageId = ImageId.of("debian-cloud", "debian-8-jessie-v20160329");
Expand All @@ -246,9 +239,7 @@ InstanceId instanceId = InstanceId.of("us-central1-a", "instance-name");
MachineTypeId machineTypeId = MachineTypeId.of("us-central1-a", "n1-standard-1");
Operation operation =
compute.create(InstanceInfo.of(instanceId, machineTypeId, attachedDisk, networkInterface));
while (!operation.isDone()) {
Thread.sleep(1000L);
}
operation = operation.waitFor();
if (operation.errors() == null) {
// use instance
Instance instance = compute.getInstance(instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.Clock;
import com.google.cloud.WaitForOption;
import com.google.cloud.WaitForOption.CheckingPeriod;
import com.google.cloud.WaitForOption.Timeout;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* A Google BigQuery Job.
Expand Down Expand Up @@ -143,6 +150,59 @@ public boolean isDone() {
return job == null || job.status().state() == JobStatus.State.DONE;
}

/**
* Blocks until this job completes its execution, either failing or succeeding. This method
* returns current job's latest information. If the job no longer exists, this method returns
* {@code null}. By default, the job status is checked every 500 milliseconds, to configure this
* value use {@link WaitForOption#checkEvery(long, TimeUnit)}. Use
* {@link WaitForOption#timeout(long, TimeUnit)} to set the maximum time to wait.
*
* <p>Example usage of {@code waitFor()}:
* <pre> {@code
* Job completedJob = job.waitFor();
* if (completedJob == null) {
* // job no longer exists
* } else if (completedJob.status().error() != null) {
* // job failed, handle error
* } else {
* // job completed successfully
* }}</pre>
*
* <p>Example usage of {@code waitFor()} with checking period and timeout:
* <pre> {@code
* Job completedJob = job.waitFor(WaitForOption.checkEvery(1, TimeUnit.SECONDS),
* WaitForOption.timeout(60, TimeUnit.SECONDS));
* if (completedJob == null) {
* // job no longer exists
* } else if (completedJob.status().error() != null) {
* // job failed, handle error
* } else {
* // job completed successfully
* }}</pre>
*
* @param waitOptions options to configure checking period and timeout
* @throws BigQueryException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
* to complete
* @throws TimeoutException if the timeout provided with
* {@link WaitForOption#timeout(long, TimeUnit)} is exceeded. If no such option is provided
* this exception is never thrown.
*/
public Job waitFor(WaitForOption... waitOptions) throws InterruptedException, TimeoutException {
Timeout timeout = Timeout.getOrDefault(waitOptions);
CheckingPeriod checkingPeriod = CheckingPeriod.getOrDefault(waitOptions);
long timeoutMillis = timeout.timeoutMillis();
Clock clock = options.clock();
long startTime = clock.millis();
while (!isDone()) {
if (timeoutMillis != -1 && (clock.millis() - startTime) >= timeoutMillis) {
throw new TimeoutException();
}
checkingPeriod.sleep();
}
return reload();
}

/**
* Fetches current job's latest information. Returns {@code null} if the job does not exist.
*
Expand All @@ -151,7 +211,7 @@ public boolean isDone() {
* @throws BigQueryException upon failure
*/
public Job reload(BigQuery.JobOption... options) {
return bigquery.getJob(jobId().job(), options);
return bigquery.getJob(jobId(), options);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
* }
* System.out.println("Loading data into table " + tableId);
* Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
* while (!loadJob.isDone()) {
* Thread.sleep(1000L);
* }
* loadJob = loadJob.waitFor();
* if (loadJob.status().error() != null) {
* System.out.println("Job completed with errors");
* } else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,18 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import com.google.cloud.Clock;
import com.google.cloud.WaitForOption;
import com.google.cloud.bigquery.JobStatistics.CopyStatistics;

import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JobTest {

Expand Down Expand Up @@ -66,6 +74,9 @@ public class JobTest {
private Job expectedJob;
private Job job;

@Rule
public final ExpectedException thrown = ExpectedException.none();

private void initializeExpectedJob(int optionsCalls) {
expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls);
replay(serviceMockReturnsOptions);
Expand Down Expand Up @@ -177,13 +188,113 @@ public void testIsDone_NotExists() throws Exception {
assertTrue(job.isDone());
}

@Test
public void testWaitFor() throws InterruptedException, TimeoutException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
replay(status, bigquery, mockOptions);
initializeJob();
assertSame(completedJob, job.waitFor());
verify(status, mockOptions);
}

@Test
public void testWaitFor_Null() throws InterruptedException, TimeoutException {
initializeExpectedJob(1);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery, mockOptions);
initializeJob();
assertNull(job.waitFor());
verify(mockOptions);
}

@Test
public void testWaitForWithCheckingPeriod() throws InterruptedException, TimeoutException {
initializeExpectedJob(3);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(42);
EasyMock.expectLastCall();
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
Job runningJob = expectedJob.toBuilder().status(status).build();
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
replay(status, bigquery, timeUnit, mockOptions);
initializeJob();
assertSame(completedJob, job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
verify(status, timeUnit, mockOptions);
}

@Test
public void testWaitForWithCheckingPeriod_Null() throws InterruptedException, TimeoutException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(42);
EasyMock.expectLastCall();
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
Job runningJob = expectedJob.toBuilder().status(new JobStatus(JobStatus.State.RUNNING)).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery, timeUnit, mockOptions);
initializeJob();
assertNull(job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
verify(bigquery, timeUnit, mockOptions);
}

@Test
public void testWaitForWithTimeout() throws InterruptedException, TimeoutException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(1);
EasyMock.expectLastCall();
Clock clock = createStrictMock(Clock.class);
expect(clock.millis()).andReturn(0L);
expect(clock.millis()).andReturn(1L);
expect(clock.millis()).andReturn(3L);
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(bigquery.options()).andReturn(mockOptions);
expect(mockOptions.clock()).andReturn(clock);
Job runningJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
replay(status, bigquery, timeUnit, clock, mockOptions);
initializeJob();
thrown.expect(TimeoutException.class);
job.waitFor(WaitForOption.checkEvery(1, timeUnit),
WaitForOption.timeout(3, TimeUnit.MILLISECONDS));
verify(status, timeUnit, clock, mockOptions);
}

@Test
public void testReload() throws Exception {
initializeExpectedJob(4);
JobInfo updatedInfo = JOB_INFO.toBuilder().etag("etag").build();
Job expectedJob = new Job(serviceMockReturnsOptions, new JobInfo.BuilderImpl(updatedInfo));
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId().job())).andReturn(expectedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(expectedJob);
replay(bigquery);
initializeJob();
Job updatedJob = job.reload();
Expand All @@ -194,7 +305,7 @@ public void testReload() throws Exception {
public void testReloadNull() throws Exception {
initializeExpectedJob(1);
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId().job())).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery);
initializeJob();
assertNull(job.reload());
Expand All @@ -206,7 +317,7 @@ public void testReloadWithOptions() throws Exception {
JobInfo updatedInfo = JOB_INFO.toBuilder().etag("etag").build();
Job expectedJob = new Job(serviceMockReturnsOptions, new JobInfo.BuilderImpl(updatedInfo));
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId().job(), BigQuery.JobOption.fields()))
expect(bigquery.getJob(JOB_INFO.jobId(), BigQuery.JobOption.fields()))
.andReturn(expectedJob);
replay(bigquery);
initializeJob();
Expand Down
Loading

0 comments on commit 3b35a9e

Please sign in to comment.