diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java index d49328ad7486..effa3b59ef0c 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java @@ -21,6 +21,7 @@ import com.google.cloud.pubsub.PubSub.MessageConsumer; import com.google.cloud.pubsub.PubSub.MessageProcessor; import com.google.cloud.pubsub.PubSub.PullOption; +import com.google.common.base.Function; import java.io.IOException; import java.io.ObjectInputStream; @@ -103,7 +104,11 @@ public boolean equals(Object obj) { return false; } Subscription other = (Subscription) obj; - return Objects.equals(toPb(), other.toPb()) && Objects.equals(options, other.options); + return Objects.equals(topic(), other.topic()) + && Objects.equals(name(), other.name()) + && Objects.equals(pushConfig(), other.pushConfig()) + && ackDeadlineSeconds() == other.ackDeadlineSeconds() + && Objects.equals(options, other.options); } public PubSub pubSub() { @@ -155,4 +160,14 @@ static Subscription fromPb(PubSub storage, com.google.pubsub.v1.Subscription sub SubscriptionInfo subscriptionInfo = SubscriptionInfo.fromPb(subscriptionPb); return new Subscription(storage, new BuilderImpl(subscriptionInfo)); } + + static Function fromPbFunction( + final PubSub pubsub) { + return new Function() { + @Override + public Subscription apply(com.google.pubsub.v1.Subscription subscriptionPb) { + return fromPb(pubsub, subscriptionPb); + } + }; + } } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java index 97e7b35becd9..4ec97ca7d3f7 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java @@ -18,13 +18,35 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.cloud.pubsub.spi.v1.PublisherApi; +import com.google.cloud.pubsub.spi.v1.SubscriberApi; import com.google.common.base.MoreObjects; import java.io.Serializable; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** - * Pub/Sub subscription information. + * A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a + * single, specific topic, to be delivered to the subscribing application. Pub/Sub subscriptions + * support both push and pull message delivery. + * + *

In a push subscription, the Pub/Sub server sends a request to the subscriber application, at a + * preconfigured endpoint (see {@link PushConfig}). The subscriber's HTTP response serves as an + * implicit acknowledgement: a success response indicates that the message has been succesfully + * processed and the Pub/Sub system can delete it from the subscription; a non-success response + * indicates that the Pub/Sub server should resend it (implicit "nack"). + * + *

In a pull subscription, the subscribing application must explicitly pull messages using one of + * {@link PubSub#pull(String, PubSub.PullOption...)}, + * {@link PubSub#pullAsync(String, PubSub.MessageProcessor)} or + * {@link PubSub#pullAsync(String, PubSub.PullOption...)}. The subscribing application must then + * explicitly acknowledge the messages using one of {@link PubSub#ack(String, Iterable)}, + * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or + * {@link PubSub#ackAsync(String, String, String...)}. + * + * @see Pub/Sub Data Model + * @see Subscriber Guide */ public class SubscriptionInfo implements Serializable { @@ -35,20 +57,47 @@ public class SubscriptionInfo implements Serializable { private final PushConfig pushConfig; private final int ackDeadlineSeconds; - /** - * Builder for Subscription. + * Builder for {@code SubscriptionInfo} objects. */ public abstract static class Builder { + /** + * Sets the name of the subscription. The name must start with a letter, and contain only + * letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores + * ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs + * ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the + * string {@code goog}. + */ public abstract Builder name(String name); + /** + * Sets the name of the topic the subscription refers to. + */ public abstract Builder topic(String name); + /** + * Sets the push configuration for the subscription. If set, the subscription will be in + * push mode and the {@code pushConfig} parameter provides the push endpoint. If not set, the + * subscription will be in pull mode. + */ public abstract Builder pushConfig(PushConfig pushConfig); + /** + * Sets the maximum time after a subscriber receives a message before the subscriber should + * acknowledge the message. After message delivery but before the ack deadline expires and + * before the message is acknowledged, it is an outstanding message and will not be delivered + * again during that time (on a best-effort basis). For pull subscriptions, this value is used + * as the initial value for the ack deadline. To override the ack deadline value for a given + * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push + * delivery, this value is used to set the request timeout for the call to the push endpoint. If + * not specified, the default value of 10 seconds is used. + */ public abstract Builder ackDeadLineSeconds(int ackDeadLineSeconds); + /** + * Creates a subscription object. + */ public abstract SubscriptionInfo build(); } @@ -108,31 +157,60 @@ public SubscriptionInfo build() { ackDeadlineSeconds = builder.ackDeadlineSeconds; } + /** + * Returns the name of the topic this subscription refers to. + */ public String topic() { return topic; } + /** + * Sets the name of the subscription. The name must start with a letter, and contain only + * letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores + * ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs + * ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the + * string {@code goog}. + */ public String name() { return name; } + /** + * Returns the push configuration for the subscription. If set, the subscription is in push mode + * and the returned value defines the push endpoint. If {@code null}, the subscription is in pull + * mode. + */ public PushConfig pushConfig() { return pushConfig; } + /** + * Returns the maximum time after a subscriber receives a message before the subscriber should + * acknowledge the message. After message delivery but before the ack deadline expires and + * before the message is acknowledged, it is an outstanding message and will not be delivered + * again during that time (on a best-effort basis). For pull subscriptions, this value is used + * as the initial value for the ack deadline. To override the ack deadline value for a given + * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push + * delivery, this value is used to set the request timeout for the call to the push endpoint. If + * not specified, the default value of 10 seconds is used. + */ public long ackDeadlineSeconds() { return ackDeadlineSeconds; } @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { + if (this == obj) { return true; } - if (o == null || getClass() != o.getClass()) { + if (obj == null || !obj.getClass().equals(this.getClass())) { return false; } - return Objects.equals(toPb(), ((SubscriptionInfo) o).toPb()); + SubscriptionInfo other = (SubscriptionInfo) obj; + return Objects.equals(topic, other.topic) + && Objects.equals(name, other.name) + && Objects.equals(pushConfig, other.pushConfig) + && ackDeadlineSeconds == other.ackDeadlineSeconds; } @Override @@ -150,11 +228,11 @@ public String toString() { .toString(); } - com.google.pubsub.v1.Subscription toPb() { + com.google.pubsub.v1.Subscription toPb(String projectId) { com.google.pubsub.v1.Subscription.Builder builder = com.google.pubsub.v1.Subscription.newBuilder(); - builder.setTopic(topic); - builder.setName(name); + builder.setTopic(PublisherApi.formatTopicName(projectId, topic)); + builder.setName(SubscriberApi.formatSubscriptionName(projectId, name)); builder.setAckDeadlineSeconds(ackDeadlineSeconds); if (pushConfig != null) { builder.setPushConfig(pushConfig.toPb()); @@ -163,26 +241,67 @@ com.google.pubsub.v1.Subscription toPb() { } static SubscriptionInfo fromPb(com.google.pubsub.v1.Subscription subscription) { - Builder builder = builder(subscription.getTopic(), subscription.getName()); + Builder builder = builder(PublisherApi.parseTopicFromTopicName(subscription.getTopic()), + SubscriberApi.parseSubscriptionFromSubscriptionName(subscription.getName())); builder.ackDeadLineSeconds(subscription.getAckDeadlineSeconds()); - if (subscription.hasPushConfig()) { + // A subscription with an "empty" push config is a pull subscription + if (subscription.hasPushConfig() + && !subscription.getPushConfig().getPushEndpoint().equals("")) { builder.pushConfig(PushConfig.fromPb(subscription.getPushConfig())); } return builder.build(); } + /** + * Returns a builder for the subscription object. + */ public Builder toBuilder() { return new BuilderImpl(this); } + /** + * Creates a pull {@code SubscriptionInfo} object given the name of the topic and the name of the + * subscription. + * + * @param topic the name of the topic the subscription refers to + * @param name the name of the subscription. The name must start with a letter, and contain only + * letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores + * ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs + * ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the + * string {@code goog} + */ public static SubscriptionInfo of(String topic, String name) { return builder(topic, name).build(); } + /** + * Creates a push {@code SubscriptionInfo} object given the name of the topic, the name of the + * subscription and the push endpoint. + * + * @param topic the name of the topic the subscription refers to + * @param name the name of the subscription. The name must start with a letter, and contain only + * letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores + * ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs + * ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the + * string {@code goog} + * @param endpoint a URL locating the endpoint to which messages should be pushed. For example, + * an endpoint might use {@code https://example.com/push}. + */ public static SubscriptionInfo of(String topic, String name, String endpoint) { return builder(topic, name).pushConfig(PushConfig.of(endpoint)).build(); } + /** + * Creates a builder for {@code SubscriptionInfo} objects given the name of the topic and the name + * of the subscription. + * + * @param topic the name of the topic the subscription refers to + * @param name the name of the subscription. The name must start with a letter, and contain only + * letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores + * ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs + * ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the + * string {@code goog} + */ public static Builder builder(String topic, String name) { return new BuilderImpl(topic, name); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java index d69cb92f3244..acc5fbbd41ee 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java @@ -126,6 +126,9 @@ static TopicInfo fromPb(com.google.pubsub.v1.Topic topicPb) { return builder(PublisherApi.parseTopicFromTopicName(topicPb.getName())).build(); } + /** + * Returns a builder for the topic object. + */ public Builder toBuilder() { return new BuilderImpl(this); } @@ -137,7 +140,7 @@ public Builder toBuilder() { * ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores ({@code _}), * periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs ({@code %}). * It must be between 3 and 255 characters in length and cannot begin with the string - * {@code goog}. + * {@code goog} */ public static TopicInfo of(String name) { return builder(name).build(); @@ -150,7 +153,7 @@ public static TopicInfo of(String name) { * ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores ({@code _}), * periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs ({@code %}). * It must be between 3 and 255 characters in length and cannot begin with the string - * {@code goog}. + * {@code goog} */ public static Builder builder(String name) { return new BuilderImpl(name); diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionInfoTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionInfoTest.java new file mode 100644 index 000000000000..574d40e39ce2 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionInfoTest.java @@ -0,0 +1,91 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.junit.Test; + +public class SubscriptionInfoTest { + + private static final String TOPIC = "topic"; + private static final String NAME = "subscription"; + private static final String ENDPOINT = "https://example.com/push"; + private static final PushConfig PUSH_CONFIG = PushConfig.of(ENDPOINT); + private static final int ACK_DEADLINE = 42; + private static final SubscriptionInfo SUBSCRIPTION_INFO = SubscriptionInfo.builder(TOPIC, NAME) + .pushConfig(PUSH_CONFIG) + .ackDeadLineSeconds(ACK_DEADLINE) + .build(); + + @Test + public void testToBuilder() { + compareSubscriptionInfo(SUBSCRIPTION_INFO, SUBSCRIPTION_INFO.toBuilder().build()); + SubscriptionInfo subscriptionInfo = SUBSCRIPTION_INFO.toBuilder() + .topic("newTopic") + .name("newSubscription") + .build(); + assertEquals("newTopic", subscriptionInfo.topic()); + assertEquals("newSubscription", subscriptionInfo.name()); + subscriptionInfo = subscriptionInfo.toBuilder().name(NAME).topic(TOPIC).build(); + compareSubscriptionInfo(SUBSCRIPTION_INFO, subscriptionInfo); + } + + @Test + public void testBuilder() { + assertEquals(TOPIC, SUBSCRIPTION_INFO.topic()); + assertEquals(NAME, SUBSCRIPTION_INFO.name()); + assertEquals(PUSH_CONFIG, SUBSCRIPTION_INFO.pushConfig()); + assertEquals(ACK_DEADLINE, SUBSCRIPTION_INFO.ackDeadlineSeconds()); + } + + @Test + public void testOf() { + SubscriptionInfo subscriptionInfo = SubscriptionInfo.of(TOPIC, NAME); + assertEquals(TOPIC, subscriptionInfo.topic()); + assertEquals(NAME, subscriptionInfo.name()); + assertNull(subscriptionInfo.pushConfig()); + assertEquals(0, subscriptionInfo.ackDeadlineSeconds()); + subscriptionInfo = SubscriptionInfo.of(TOPIC, NAME, ENDPOINT); + assertEquals(TOPIC, subscriptionInfo.topic()); + assertEquals(NAME, subscriptionInfo.name()); + assertEquals(PushConfig.of(ENDPOINT), subscriptionInfo.pushConfig()); + assertEquals(0, subscriptionInfo.ackDeadlineSeconds()); + } + + @Test + public void testToAndFromPb() { + compareSubscriptionInfo(SUBSCRIPTION_INFO, + SubscriptionInfo.fromPb(SUBSCRIPTION_INFO.toPb("project"))); + SubscriptionInfo subscriptionInfo = SubscriptionInfo.of(TOPIC, NAME); + compareSubscriptionInfo(subscriptionInfo, + SubscriptionInfo.fromPb(subscriptionInfo.toPb("project"))); + subscriptionInfo = SubscriptionInfo.of(TOPIC, NAME, ENDPOINT); + compareSubscriptionInfo(subscriptionInfo, + SubscriptionInfo.fromPb(subscriptionInfo.toPb("project"))); + } + + private void compareSubscriptionInfo(SubscriptionInfo expected, SubscriptionInfo value) { + assertEquals(expected, value); + assertEquals(expected.topic(), value.topic()); + assertEquals(expected.name(), value.name()); + assertEquals(expected.pushConfig(), value.pushConfig()); + assertEquals(expected.ackDeadlineSeconds(), value.ackDeadlineSeconds()); + assertEquals(expected.hashCode(), value.hashCode()); + } +}