Skip to content

Commit

Permalink
Add support for incomplete entities to Datastore.put (#1040)
Browse files Browse the repository at this point in the history
Add support for entities with incomplete key to Datastore.put
  • Loading branch information
mziccard committed Jun 8, 2016
1 parent d3a49ea commit eb6f182
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,65 @@ public final void update(Entity... entities) {
}
}

@SafeVarargs
private void putInternal(FullEntity<Key> entity) {
Key key = entity.key();
toAdd.remove(key);
toUpdate.remove(key);
toDelete.remove(key);
toPut.put(key, entity);
}

@Override
public final void put(Entity... entities) {
public final Entity put(FullEntity<?> entity) {
return DatastoreHelper.put(this, entity);
}

@SuppressWarnings("unchecked")
@Override
public final void putWithDeferredIdAllocation(FullEntity<?>... entities) {
validateActive();
for (Entity entity : entities) {
Key key = entity.key();
toAdd.remove(key);
toUpdate.remove(key);
toDelete.remove(key);
toPut.put(key, entity);
for (FullEntity<?> entity : entities) {
IncompleteKey key = entity.key();
Preconditions.checkArgument(key != null, "Entity must have a key");
if (key instanceof Key) {
putInternal(Entity.convert((FullEntity<Key>) entity));
} else {
toAddAutoId.add((FullEntity<IncompleteKey>) entity);
}
}
}

@SuppressWarnings("unchecked")
@Override
public final List<Entity> put(FullEntity<?>... entities) {
validateActive();
List<IncompleteKey> incompleteKeys = Lists.newArrayListWithExpectedSize(entities.length);
for (FullEntity<?> entity : entities) {
IncompleteKey key = entity.key();
Preconditions.checkArgument(key != null, "Entity must have a key");
if (!(key instanceof Key)) {
incompleteKeys.add(key);
}
}
Iterator<Key> allocated;
if (!incompleteKeys.isEmpty()) {
IncompleteKey[] toAllocate = Iterables.toArray(incompleteKeys, IncompleteKey.class);
allocated = datastore().allocateId(toAllocate).iterator();
} else {
allocated = Collections.emptyIterator();
}
List<Entity> answer = Lists.newArrayListWithExpectedSize(entities.length);
for (FullEntity<?> entity : entities) {
if (entity.key() instanceof Key) {
putInternal((FullEntity<Key>) entity);
answer.add(Entity.convert((FullEntity<Key>) entity));
} else {
Entity entityWithAllocatedId = Entity.builder(allocated.next(), entity).build();
putInternal(entityWithAllocatedId);
answer.add(entityWithAllocatedId);
}
}
return answer;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,28 +77,35 @@ interface TransactionCallable<T> {
* @throws DatastoreException upon failure
* @see #allocateId(IncompleteKey)
*/
List<Key> allocateId(IncompleteKey... key);
List<Key> allocateId(IncompleteKey... keys);

/**
* {@inheritDoc}
* @throws DatastoreException upon failure
*/
@Override
void update(Entity... entity);
void update(Entity... entities);

/**
* {@inheritDoc}
* @throws DatastoreException upon failure
*/
@Override
void put(Entity... entity);
Entity put(FullEntity<?> entity);

/**
* {@inheritDoc}
* @throws DatastoreException upon failure
*/
@Override
void delete(Key... key);
List<Entity> put(FullEntity<?>... entities);

/**
* {@inheritDoc}
* @throws DatastoreException upon failure
*/
@Override
void delete(Key... keys);

/**
* Returns a new KeyFactory for this service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,72 @@
import java.util.List;

/**
* An interface to represent a batch of write operations.
* All write operation for a batch writer will be applied to the Datastore in one RPC call.
* An interface to represent a batch of write operations. All write operation for a batch writer
* will be applied to the Datastore in one RPC call.
*/
interface DatastoreBatchWriter extends DatastoreWriter {

/**
* Datastore add operation.
* This method will also allocate id for any entity with an incomplete key.
* As oppose to {@link #add(FullEntity)}, this method will defer any necessary id allocation
* Datastore add operation. This method will also allocate id for any entity with an incomplete
* key. As opposed to {@link #add(FullEntity)}, this method will defer any necessary id allocation
* to submit time.
*
* @throws IllegalArgumentException if any of the given entities is missing a key
* @throws DatastoreException if a given entity with a
* complete key was already added to this writer or if not active
* @throws DatastoreException if a given entity with a complete key was already added to this
* writer or if not active
*/
void addWithDeferredIdAllocation(FullEntity<?>... entity);
void addWithDeferredIdAllocation(FullEntity<?>... entities);

/**
* {@inheritDoc}
* For entities with complete keys that were marked for deletion in this writer the operation
* will be changed to {@link #put}.
* @throws DatastoreException if a given entity with the
* same complete key was already added to this writer, if writer is not active or
* if id allocation for an entity with an incomplete key failed.
*
* @throws DatastoreException if a given entity with the same complete key was already added to
* this writer, if writer is not active or if id allocation for an entity with an incomplete
* key failed.
*/
@Override
List<Entity> add(FullEntity<?>... entity);
List<Entity> add(FullEntity<?>... entities);

/**
* {@inheritDoc}
* This operation will be converted to {@link #put} operation for entities that were already
* added or put in this writer
* @throws DatastoreException if an entity is marked for
* deletion in this writer or if not active
* added or put in this writer.
*
* @throws DatastoreException if an entity is marked for deletion in this writer or if not active
*/
@Override
void update(Entity... entity);
void update(Entity... entities);

/**
* {@inheritDoc}
* This operation will also remove from this batch any prior writes for entities with the same
* keys
* keys
*
* @throws DatastoreException if not active
*/
@Override
void delete(Key... key);
void delete(Key... keys);

/**
* Datastore put operation. This method will also allocate id for any entity with an incomplete
* key. As opposed to {@link #put(FullEntity[])}, this method will defer any necessary id
* allocation to submit time.
*
* @throws IllegalArgumentException if any of the given entities is missing a key
* @throws DatastoreException if not active
*/
void putWithDeferredIdAllocation(FullEntity<?>... entities);

/**
* {@inheritDoc}
* This operation will also remove from this writer any prior writes for the same entities.
*
* @throws DatastoreException if not active
*/
@Override
void put(Entity... entity);
List<Entity> put(FullEntity<?>... entities);

/**
* Returns {@code true} if still active (write operations were not sent to the Datastore).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ static Entity add(DatastoreWriter writer, FullEntity<?> entity) {
return writer.add(new FullEntity<?>[] {entity}).get(0);
}

static Entity put(DatastoreWriter writer, FullEntity<?> entity) {
return writer.put(new FullEntity<?>[] {entity}).get(0);
}

static KeyFactory newKeyFactory(DatastoreOptions options) {
return new KeyFactory(options.projectId(), options.namespace());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ com.google.datastore.v1beta3.RunQueryResponse runQuery(
try {
return RetryHelper.runWithRetries(
new Callable<com.google.datastore.v1beta3.RunQueryResponse>() {
@Override public com.google.datastore.v1beta3.RunQueryResponse call()
throws DatastoreException {
return datastoreRpc.runQuery(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
@Override
public com.google.datastore.v1beta3.RunQueryResponse call()
throws DatastoreException {
return datastoreRpc.runQuery(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
} catch (RetryHelperException e) {
throw DatastoreException.translateAndThrow(e);
}
Expand Down Expand Up @@ -125,11 +126,12 @@ com.google.datastore.v1beta3.AllocateIdsResponse allocateIds(
try {
return RetryHelper.runWithRetries(
new Callable<com.google.datastore.v1beta3.AllocateIdsResponse>() {
@Override public com.google.datastore.v1beta3.AllocateIdsResponse call()
throws DatastoreException {
return datastoreRpc.allocateIds(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
@Override
public com.google.datastore.v1beta3.AllocateIdsResponse call()
throws DatastoreException {
return datastoreRpc.allocateIds(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
} catch (RetryHelperException e) {
throw DatastoreException.translateAndThrow(e);
}
Expand Down Expand Up @@ -166,7 +168,7 @@ public List<Entity> add(FullEntity<?>... entities) {
"Duplicate entity with the key %s", entity.key());
}
} else {
Preconditions.checkArgument(entity.hasKey(), "entity %s is missing a key", entity);
Preconditions.checkArgument(entity.hasKey(), "Entity %s is missing a key", entity);
}
mutationsPb.add(com.google.datastore.v1beta3.Mutation.newBuilder()
.setInsert(entity.toPb()).build());
Expand Down Expand Up @@ -281,19 +283,19 @@ com.google.datastore.v1beta3.LookupResponse lookup(
try {
return RetryHelper.runWithRetries(
new Callable<com.google.datastore.v1beta3.LookupResponse>() {
@Override public com.google.datastore.v1beta3.LookupResponse call()
throws DatastoreException {
return datastoreRpc.lookup(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
@Override
public com.google.datastore.v1beta3.LookupResponse call()
throws DatastoreException {
return datastoreRpc.lookup(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
} catch (RetryHelperException e) {
throw DatastoreException.translateAndThrow(e);
}
}

@SafeVarargs
@Override
public final void update(Entity... entities) {
public void update(Entity... entities) {
if (entities.length > 0) {
List<com.google.datastore.v1beta3.Mutation> mutationsPb =
new ArrayList<>();
Expand All @@ -309,22 +311,47 @@ public final void update(Entity... entities) {
}
}

@SafeVarargs
@Override
public final void put(Entity... entities) {
if (entities.length > 0) {
List<com.google.datastore.v1beta3.Mutation> mutationsPb =
new ArrayList<>();
Map<Key, Entity> dedupEntities = new LinkedHashMap<>();
for (Entity entity : entities) {
dedupEntities.put(entity.key(), entity);
}
for (Entity e : dedupEntities.values()) {
public Entity put(FullEntity<?> entity) {
return DatastoreHelper.put(this, entity);
}

@SuppressWarnings("unchecked")
@Override
public List<Entity> put(FullEntity<?>... entities) {
if (entities.length == 0) {
return Collections.emptyList();
}
List<com.google.datastore.v1beta3.Mutation> mutationsPb = new ArrayList<>();
Map<Key, Entity> dedupEntities = new LinkedHashMap<>();
for (FullEntity<?> entity : entities) {
Preconditions.checkArgument(entity.hasKey(), "Entity %s is missing a key", entity);
if (entity.key() instanceof Key) {
Entity completeEntity = Entity.convert((FullEntity<Key>) entity);
dedupEntities.put(completeEntity.key(), completeEntity);
} else {
mutationsPb.add(
com.google.datastore.v1beta3.Mutation.newBuilder().setUpsert(e.toPb()).build());
com.google.datastore.v1beta3.Mutation.newBuilder().setUpsert(entity.toPb()).build());
}
}
for (Entity entity : dedupEntities.values()) {
mutationsPb.add(
com.google.datastore.v1beta3.Mutation.newBuilder().setUpsert(entity.toPb()).build());
}
com.google.datastore.v1beta3.CommitResponse commitResponse = commitMutation(mutationsPb);
Iterator<com.google.datastore.v1beta3.MutationResult> mutationResults =
commitResponse.getMutationResultsList().iterator();
ImmutableList.Builder<Entity> responseBuilder = ImmutableList.builder();
for (FullEntity<?> entity : entities) {
Entity completeEntity = dedupEntities.get(entity.key());
if (completeEntity != null) {
responseBuilder.add(completeEntity);
} else {
responseBuilder.add(
Entity.builder(Key.fromPb(mutationResults.next().getKey()), entity).build());
}
commitMutation(mutationsPb);
}
return responseBuilder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface DatastoreReader {
* @throws DatastoreException upon failure
* @see #get(Key)
*/
Iterator<Entity> get(Key... key);
Iterator<Entity> get(Key... keys);

/**
* Returns a list with a value for each given key (ordered by input). {@code null} values are
Expand Down
Loading

0 comments on commit eb6f182

Please sign in to comment.