Skip to content

Commit

Permalink
use the term "bundling" instead of "batching" (#1479)
Browse files Browse the repository at this point in the history
  • Loading branch information
pongad authored Dec 15, 2016
1 parent daaf061 commit 663d6cf
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/publisher">publisher</a>, that is
* associated with a specific topic at creation.
*
* <p>A {@link Publisher} provides built-in capabilities to automatically handle batching of
* <p>A {@link Publisher} provides built-in capabilities to automatically handle bundling of
* messages, controlling memory utilization, and retrying API calls on transient errors.
*
* <p>With customizable options that control:
*
* <ul>
* <li>Message batching: such as number of messages or max batch byte size.
* <li>Message bundling: such as number of messages or max bundle byte size.
* <li>Flow control: such as max outstanding messages and maximum outstanding bytes.
* <li>Retries: such as the maximum duration of retries for a failing batch of messages.
* <li>Retries: such as the maximum duration of retries for a failing bundle of messages.
* </ul>
*
* <p>If no credentials are provided, the {@link Publisher} will use application default credentials
Expand All @@ -51,7 +51,7 @@
* <pre>
* Publisher publisher =
* Publisher.Builder.newBuilder(MY_TOPIC)
* .setMaxBatchDuration(new Duration(10 * 1000))
* .setMaxBundleDuration(new Duration(10 * 1000))
* .build();
* List<ListenableFuture<String>> results = new ArrayList<>();
*
Expand Down Expand Up @@ -81,23 +81,23 @@ public interface Publisher {
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";

// API limits.
int MAX_BATCH_MESSAGES = 1000;
int MAX_BATCH_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
int MAX_BUNDLE_MESSAGES = 1000;
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)

// Meaningful defaults.
int DEFAULT_MAX_BATCH_MESSAGES = 100;
int DEFAULT_MAX_BATCH_BYTES = 1000; // 1 kB
Duration DEFAULT_MAX_BATCH_DURATION = new Duration(1); // 1ms
int DEFAULT_MAX_BUNDLE_MESSAGES = 100;
int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
Duration MIN_SEND_BATCH_DURATION = new Duration(10 * 1000); // 10 seconds
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds

/** Topic to which the publisher publishes to. */
String getTopic();

/**
* Schedules the publishing of a message. The publishing of the message may occur immediately or
* be delayed based on the publisher batching options.
* be delayed based on the publisher bundling options.
*
* <p>Depending on chosen flow control {@link #failOnFlowControlLimits option}, the returned
* future might immediately fail with a {@link CloudPubsubFlowControlException} or block the
Expand All @@ -109,13 +109,13 @@ public interface Publisher {
ListenableFuture<String> publish(PubsubMessage message);

/** Maximum amount of time to wait until scheduling the publishing of messages. */
Duration getMaxBatchDuration();
Duration getMaxBundleDuration();

/** Maximum number of bytes to batch before publishing. */
long getMaxBatchBytes();
/** Maximum number of bytes to bundle before publishing. */
long getMaxBundleBytes();

/** Maximum number of messages to batch before publishing. */
long getMaxBatchMessages();
/** Maximum number of messages to bundle before publishing. */
long getMaxBundleMessages();

/**
* Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced.
Expand Down Expand Up @@ -155,18 +155,18 @@ public interface Publisher {
final class Builder {
String topic;

// Batching options
int maxBatchMessages;
int maxBatchBytes;
Duration maxBatchDuration;
// Bundling options
int maxBundleMessages;
int maxBundleBytes;
Duration maxBundleDuration;

// Client-side flow control options
Optional<Integer> maxOutstandingMessages;
Optional<Integer> maxOutstandingBytes;
boolean failOnFlowControlLimits;

// Send batch deadline
Duration sendBatchDeadline;
// Send bundle deadline
Duration sendBundleDeadline;

// RPC options
Duration requestTimeout;
Expand All @@ -192,11 +192,11 @@ private void setDefaults() {
channelBuilder = Optional.absent();
maxOutstandingMessages = Optional.absent();
maxOutstandingBytes = Optional.absent();
maxBatchMessages = DEFAULT_MAX_BATCH_MESSAGES;
maxBatchBytes = DEFAULT_MAX_BATCH_BYTES;
maxBatchDuration = DEFAULT_MAX_BATCH_DURATION;
maxBundleMessages = DEFAULT_MAX_BUNDLE_MESSAGES;
maxBundleBytes = DEFAULT_MAX_BUNDLE_BYTES;
maxBundleDuration = DEFAULT_MAX_BUNDLE_DURATION;
requestTimeout = DEFAULT_REQUEST_TIMEOUT;
sendBatchDeadline = MIN_SEND_BATCH_DURATION;
sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
failOnFlowControlLimits = false;
executor = Optional.absent();
}
Expand Down Expand Up @@ -224,16 +224,16 @@ public Builder setChannelBuilder(
return this;
}

// Batching options
// Bundling options

/**
* Maximum number of messages to send per publish call.
*
* <p>It also sets a target to when to trigger a publish.
*/
public Builder setMaxBatchMessages(int messages) {
public Builder setMaxBundleMessages(int messages) {
Preconditions.checkArgument(messages > 0);
maxBatchMessages = messages;
maxBundleMessages = messages;
return this;
}

Expand All @@ -244,19 +244,19 @@ public Builder setMaxBatchMessages(int messages) {
*
* <p>This will not be honored if a single message is published that exceeds this maximum.
*/
public Builder setMaxBatchBytes(int bytes) {
public Builder setMaxBundleBytes(int bytes) {
Preconditions.checkArgument(bytes > 0);
maxBatchBytes = bytes;
maxBundleBytes = bytes;
return this;
}

/**
* Time to wait, since the first message is kept in memory for batching, before triggering a
* Time to wait, since the first message is kept in memory for bundling, before triggering a
* publish call.
*/
public Builder setMaxBatchDuration(Duration duration) {
public Builder setMaxBundleDuration(Duration duration) {
Preconditions.checkArgument(duration.getMillis() >= 0);
maxBatchDuration = duration;
maxBundleDuration = duration;
return this;
}

Expand Down Expand Up @@ -289,10 +289,10 @@ public Builder setFailOnFlowControlLimits(boolean fail) {
return this;
}

/** Maximum time to attempt sending (and retrying) a batch of messages before giving up. */
public Builder setSendBatchDeadline(Duration deadline) {
Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BATCH_DURATION) >= 0);
sendBatchDeadline = deadline;
/** Maximum time to attempt sending (and retrying) a bundle of messages before giving up. */
public Builder setSendBundleDeadline(Duration deadline) {
Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BUNDLE_DURATION) >= 0);
sendBundleDeadline = deadline;
return this;
}

Expand Down Expand Up @@ -329,14 +329,14 @@ public MaxOutstandingMessagesReachedException(int currentMaxMessages) {
this.currentMaxMessages = currentMaxMessages;
}

public int getCurrentMaxBatchMessages() {
public int getCurrentMaxBundleMessages() {
return currentMaxMessages;
}

@Override
public String toString() {
return String.format(
"The maximum number of batch messages: %d have been reached.", currentMaxMessages);
"The maximum number of bundle messages: %d have been reached.", currentMaxMessages);
}
}

Expand All @@ -351,14 +351,14 @@ public MaxOutstandingBytesReachedException(int currentMaxBytes) {
this.currentMaxBytes = currentMaxBytes;
}

public int getCurrentMaxBatchBytes() {
public int getCurrentMaxBundleBytes() {
return currentMaxBytes;
}

@Override
public String toString() {
return String.format(
"The maximum number of batch bytes: %d have been reached.", currentMaxBytes);
"The maximum number of bundle bytes: %d have been reached.", currentMaxBytes);
}
}
}
Loading

0 comments on commit 663d6cf

Please sign in to comment.