diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java index 4e7d412b7759..a7bec6424cfe 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java @@ -285,40 +285,6 @@ public String params() { } } - /** - * This class demonstrates how to publish messages to a Pub/Sub topic. - * - * @see Publish - * messages to a topic - */ - private static class PublishMessagesAction extends PubSubAction>> { - @Override - public void run(PubSub pubsub, Tuple> params) { - String topic = params.x(); - List messages = params.y(); - pubsub.publish(topic, messages); - System.out.printf("Published %d messages to topic %s%n", messages.size(), topic); - } - - @Override - Tuple> parse(String... args) throws Exception { - if (args.length < 2) { - throw new IllegalArgumentException("Missing required topic and messages"); - } - String topic = args[0]; - List messages = new ArrayList<>(); - for (String payload : Arrays.copyOfRange(args, 1, args.length)) { - messages.add(Message.of(payload)); - } - return Tuple.of(topic, messages); - } - - @Override - public String params() { - return " +"; - } - } - private abstract static class SubscriptionAction extends PubSubAction { @Override String parse(String... args) throws Exception { @@ -643,7 +609,6 @@ public void run(PubSub pubsub, Tuple> param) throws Excepti ACTIONS.put("get-policy", new ParentAction(GET_IAM_ACTIONS)); ACTIONS.put("add-identity", new ParentAction(REPLACE_IAM_ACTIONS)); ACTIONS.put("test-permissions", new ParentAction(TEST_IAM_ACTIONS)); - ACTIONS.put("publish", new PublishMessagesAction()); ACTIONS.put("replace-push-config", new ReplacePushConfigAction()); } diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java deleted file mode 100644 index e57475a845e8..000000000000 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2016 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.examples.pubsub.snippets; - -import com.google.cloud.pubsub.Message; -import com.google.cloud.pubsub.PubSub; -import com.google.cloud.pubsub.PubSubOptions; -import com.google.cloud.pubsub.Topic; -import com.google.cloud.pubsub.TopicInfo; - -/** - * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub topic and asynchronously - * publish messages to it. - */ -public class CreateTopicAndPublishMessages { - - public static void main(String... args) throws Exception { - try (PubSub pubsub = PubSubOptions.getDefaultInstance().getService()) { - Topic topic = pubsub.create(TopicInfo.of("test-topic")); - Message message1 = Message.of("First message"); - Message message2 = Message.of("Second message"); - topic.publishAsync(message1, message2); - } - } -} diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PubSubSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PubSubSnippets.java index 6eb9048b205f..bac2e1d3d999 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PubSubSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PubSubSnippets.java @@ -186,99 +186,6 @@ public boolean deleteTopicAsync(String topicName) return deleted; } - /** - * Example of publishing one message to a topic. - */ - // [TARGET publish(String, Message)] - // [VARIABLE "my_topic_name"] - public String publishOneMessage(String topicName) { - // [START publishOneMessage] - Message message = Message.of("payload"); - String messageId = pubsub.publish(topicName, message); - // [END publishOneMessage] - return messageId; - } - - /** - * Example of asynchronously publishing one message to a topic. - */ - // [TARGET publishAsync(String, Message)] - // [VARIABLE "my_topic_name"] - public String publishOneMessageAsync(String topicName) - throws ExecutionException, InterruptedException { - // [START publishOneMessageAsync] - Message message = Message.of("payload"); - Future future = pubsub.publishAsync(topicName, message); - // ... - String messageId = future.get(); - // [END publishOneMessageAsync] - return messageId; - } - - /** - * Example of publishing a list of messages to a topic. - */ - // [TARGET publish(String, Iterable)] - // [VARIABLE "my_topic_name"] - public List publishMessageList(String topicName) { - // [START publishMessageList] - List messages = new LinkedList<>(); - messages.add(Message.of("payload1")); - messages.add(Message.of("payload2")); - List messageIds = pubsub.publish(topicName, messages); - // [END publishMessageList] - return messageIds; - } - - /** - * Example of asynchronously publishing a list of messages to a topic. - */ - // [TARGET publishAsync(String, Iterable)] - // [VARIABLE "my_topic_name"] - public List publishMessageListAsync(String topicName) - throws ExecutionException, InterruptedException { - // [START publishMessageListAsync] - List messages = new LinkedList<>(); - messages.add(Message.of("payload1")); - messages.add(Message.of("payload2")); - Future> future = pubsub.publishAsync(topicName, messages); - // ... - List messageIds = future.get(); - // [END publishMessageListAsync] - return messageIds; - } - - /** - * Example of publishing some messages to a topic. - */ - // [TARGET publish(String, Message, Message...)] - // [VARIABLE "my_topic_name"] - public List publishMessages(String topicName) { - // [START publishMessages] - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - List messageIds = pubsub.publish(topicName, message1, message2); - // [END publishMessages] - return messageIds; - } - - /** - * Example of asynchronously publishing some messages to a topic. - */ - // [TARGET publishAsync(String, Message, Message...)] - // [VARIABLE "my_topic_name"] - public List publishMessagesAsync(String topicName) - throws ExecutionException, InterruptedException { - // [START publishMessagesAsync] - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - Future> future = pubsub.publishAsync(topicName, message1, message2); - // ... - List messageIds = future.get(); - // [END publishMessagesAsync] - return messageIds; - } - /** * Example of creating a pull subscription for a topic. */ diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/TopicSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/TopicSnippets.java index 368481266a5f..6d0c4548da7c 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/TopicSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/TopicSnippets.java @@ -113,94 +113,6 @@ public boolean deleteAsync() throws ExecutionException, InterruptedException { return deleted; } - /** - * Example of publishing one message to the topic. - */ - // [TARGET publish(Message)] - public String publishOneMessage() { - // [START publishOneMessage] - Message message = Message.of("payload"); - String messageId = topic.publish(message); - // [END publishOneMessage] - return messageId; - } - - /** - * Example of asynchronously publishing one message to the topic. - */ - // [TARGET publishAsync(Message)] - public String publishOneMessageAsync() - throws ExecutionException, InterruptedException { - // [START publishOneMessageAsync] - Message message = Message.of("payload"); - Future future = topic.publishAsync(message); - // ... - String messageId = future.get(); - // [END publishOneMessageAsync] - return messageId; - } - - - /** - * Example of publishing a list of messages to the topic. - */ - // [TARGET publish(Iterable)] - public List publishMessageList() { - // [START publishMessageList] - List messages = new LinkedList<>(); - messages.add(Message.of("payload1")); - messages.add(Message.of("payload2")); - List messageIds = topic.publish(messages); - // [END publishMessageList] - return messageIds; - } - - /** - * Example of asynchronously publishing a list of messages to the topic. - */ - // [TARGET publishAsync(Iterable)] - public List publishMessageListAsync() - throws ExecutionException, InterruptedException { - // [START publishMessageListAsync] - List messages = new LinkedList<>(); - messages.add(Message.of("payload1")); - messages.add(Message.of("payload2")); - Future> future = topic.publishAsync(messages); - // ... - List messageIds = future.get(); - // [END publishMessageListAsync] - return messageIds; - } - - /** - * Example of publishing some messages to the topic. - */ - // [TARGET publish(Message, Message...)] - public List publishMessages() { - // [START publishMessages] - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - List messageIds = topic.publish(message1, message2); - // [END publishMessages] - return messageIds; - } - - /** - * Example of asynchronously publishing some messages to the topic. - */ - // [TARGET publishAsync(Message, Message...)] - public List publishMessagesAsync() - throws ExecutionException, InterruptedException { - // [START publishMessagesAsync] - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - Future> future = topic.publishAsync(message1, message2); - // ... - List messageIds = future.get(); - // [END publishMessagesAsync] - return messageIds; - } - /** * Example of listing subscriptions for the topic, specifying the page size. */ diff --git a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITTopicSnippets.java b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITTopicSnippets.java index 57c3c70d04aa..62c88175daff 100644 --- a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITTopicSnippets.java +++ b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITTopicSnippets.java @@ -70,12 +70,6 @@ public void testTopic() throws ExecutionException, InterruptedException { assertEquals(topic, updatedTopic); updatedTopic = topicSnippets.reloadAsync(); assertEquals(topic, updatedTopic); - assertNotNull(topicSnippets.publishOneMessage()); - assertNotNull(topicSnippets.publishOneMessageAsync()); - assertEquals(2, topicSnippets.publishMessageList().size()); - assertEquals(2, topicSnippets.publishMessageListAsync().size()); - assertEquals(2, topicSnippets.publishMessages().size()); - assertEquals(2, topicSnippets.publishMessagesAsync().size()); } @Test diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index a2978998426f..9439e4665baf 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -282,137 +282,7 @@ public static PullOption executorFactory(ExecutorFactory executorFactory) { */ Future> listTopicsAsync(ListOption... options); - /** - * Publishes a message to the provided topic. This method returns a service-generated id for the - * published message. Service-generated ids are guaranteed to be unique within the topic. - * - *

Example of publishing one message to a topic. - *

 {@code
-   * String topicName = "my_topic_name";
-   * Message message = Message.of("payload");
-   * String messageId = pubsub.publish(topicName, message);
-   * }
- * - * @param topic the topic where the message is published - * @param message the message to publish - * @return a unique service-generated id for the message - * @throws PubSubException upon failure, if the topic does not exist or if the message has empty - * payload and no attributes - */ - String publish(String topic, Message message); - - /** - * Sends a request for publishing a message to the provided topic. This method returns a - * {@code Future} object to consume the result. {@link Future#get()} returns a service-generated - * id for the published message. Service-generated ids are guaranteed to be unique within the - * topic. - * - *

Example of asynchronously publishing one message to a topic. - *

 {@code
-   * String topicName = "my_topic_name";
-   * Message message = Message.of("payload");
-   * Future future = pubsub.publishAsync(topicName, message);
-   * // ...
-   * String messageId = future.get();
-   * }
- * - * @param topic the topic where the message is published - * @param message the message to publish - * @return a {@code Future} for the unique service-generated id for the message - */ - Future publishAsync(String topic, Message message); - - /** - * Publishes a number of messages to the provided topic. This method returns a list of - * service-generated ids for the published messages. Service-generated ids are guaranteed to be - * unique within the topic. - * - *

Example of publishing some messages to a topic. - *

 {@code
-   * String topicName = "my_topic_name";
-   * Message message1 = Message.of("payload1");
-   * Message message2 = Message.of("payload2");
-   * List messageIds = pubsub.publish(topicName, message1, message2);
-   * }
- * - * @param topic the topic where the message is published - * @param message the first message to publish - * @param messages other messages to publish - * @return a list of unique, service-generated, ids. Ids are in the same order as the messages. - * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has - * empty payload and no attributes - */ - List publish(String topic, Message message, Message... messages); - - /** - * Sends a request to publish a number of messages to the provided topic. This method returns a - * {@code Future} object to consume the result. {@link Future#get()} returns a list of - * service-generated ids for the published messages. Service-generated ids are guaranteed to be - * unique within the topic. - * - *

Example of asynchronously publishing some messages to a topic. - *

 {@code
-   * String topicName = "my_topic_name";
-   * Message message1 = Message.of("payload1");
-   * Message message2 = Message.of("payload2");
-   * Future> future = pubsub.publishAsync(topicName, message1, message2);
-   * // ...
-   * List messageIds = future.get();
-   * }
- * - * @param topic the topic where the message is published - * @param message the first message to publish - * @param messages other messages to publish - * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as - * the messages. - */ - Future> publishAsync(String topic, Message message, Message... messages); - - /** - * Publishes a number of messages to the provided topic. This method returns a list of - * service-generated ids for the published messages. Service-generated ids are guaranteed to be - * unique within the topic. - * - *

Example of publishing a list of messages to a topic. - *

 {@code
-   * String topicName = "my_topic_name";
-   * List messages = new LinkedList<>();
-   * messages.add(Message.of("payload1"));
-   * messages.add(Message.of("payload2"));
-   * List messageIds = pubsub.publish(topicName, messages);
-   * }
- * - * @param topic the topic where the message is published - * @param messages the messages to publish - * @return a list of unique, service-generated, ids. Ids are in the same order as the messages. - * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has - * empty payload and no attributes - */ - List publish(String topic, Iterable messages); - - /** - * Sends a request to publish a number of messages to the provided topic. This method returns a - * {@code Future} object to consume the result. {@link Future#get()} returns a list of - * service-generated ids for the published messages. Service-generated ids are guaranteed to be - * unique within the topic. - * - *

Example of asynchronously publishing a list of messages to a topic. - *

 {@code
-   * String topicName = "my_topic_name";
-   * List messages = new LinkedList<>();
-   * messages.add(Message.of("payload1"));
-   * messages.add(Message.of("payload2"));
-   * Future> future = pubsub.publishAsync(topicName, messages);
-   * // ...
-   * List messageIds = future.get();
-   * }
- * - * @param topic the topic where the message is published - * @param messages the messages to publish - * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as - * the messages - */ - Future> publishAsync(String topic, Iterable messages); + Publisher getPublisher(TopicInfo topic) throws IOException; /** * Creates a new subscription. @@ -672,7 +542,7 @@ public static PullOption executorFactory(ExecutorFactory executorFactory) { */ Future> listSubscriptionsAsync(String topic, ListOption... options); - Subscriber subscriber(SubscriptionInfo subscription, Subscriber.MessageReceiver receiver) + Subscriber getSubscriber(SubscriptionInfo subscription, Subscriber.MessageReceiver receiver) throws IOException; /** diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 67a9e5f4b3d0..dc0d56b7cfdb 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -34,7 +34,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; @@ -55,10 +54,7 @@ import com.google.pubsub.v1.ListTopicsRequest; import com.google.pubsub.v1.ListTopicsResponse; import com.google.pubsub.v1.ModifyPushConfigRequest; -import com.google.pubsub.v1.PublishRequest; -import com.google.pubsub.v1.PublishResponse; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -289,54 +285,13 @@ public Future> listTopicsAsync(ListOption... options) { } @Override - public String publish(String topic, Message message) { - return get(publishAsync(topic, message)); - } - - private static PublishRequest publishRequest(PubSubOptions serviceOptions, String topic, - Iterable messages) { - PublishRequest.Builder builder = PublishRequest.newBuilder(); - builder.setTopic(PublisherClient.formatTopicName(serviceOptions.getProjectId(), topic)); - builder.addAllMessages(Iterables.transform(messages, Message.TO_PB_FUNCTION)); - return builder.build(); - } - - @Override - public Future publishAsync(String topic, Message message) { - return transform( - rpc.publish(publishRequest(getOptions(), topic, Collections.singletonList(message))), - new Function() { - @Override - public String apply(PublishResponse publishResponse) { - return publishResponse.getMessageIdsList().get(0); - } - }); - } - - @Override - public List publish(String topic, Message message, Message... messages) { - return publish(topic, Lists.asList(message, messages)); - } - - @Override - public Future> publishAsync(String topic, Message message, Message... messages) { - return publishAsync(topic, Lists.asList(message, messages)); - } - - @Override - public List publish(String topic, Iterable messages) { - return get(publishAsync(topic, messages)); - } - - @Override - public Future> publishAsync(String topic, Iterable messages) { - return transform(rpc.publish(publishRequest(getOptions(), topic, messages)), - new Function>() { - @Override - public List apply(PublishResponse publishResponse) { - return publishResponse.getMessageIdsList(); - } - }); + public Publisher getPublisher(TopicInfo topic) throws IOException { + // TODO(pongad): Provide a way to pass in the rest of the options. + String topicName = + PublisherClient.formatTopicName(getOptions().getProjectId(), topic.getName()); + return Publisher.Builder.newBuilder(topicName) + .setCredentials(getOptions().getCredentials()) + .build(); } @Override @@ -493,8 +448,8 @@ public Future> listSubscriptionsAsync(String topic, } @Override - public Subscriber subscriber(SubscriptionInfo subscription, Subscriber.MessageReceiver receiver) - throws IOException { + public Subscriber getSubscriber( + SubscriptionInfo subscription, Subscriber.MessageReceiver receiver) throws IOException { // TODO(pongad): Provide a way to pass in the rest of the options. String subName = SubscriberClient.formatSubscriptionName( diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java index 3a5a8dd7c58c..c995294877ff 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java @@ -202,138 +202,6 @@ public Future reloadAsync() { return pubsub.getTopicAsync(getName()); } - /** - * Publishes a message to this topic. This method returns a service-generated id for the published - * message. Service-generated ids are guaranteed to be unique within the topic. - * - *

Example of publishing one message to the topic. - *

 {@code
-   * Message message = Message.of("payload");
-   * String messageId = topic.publish(message);
-   * }
- * - * @param message the message to publish - * @return a unique service-generated id for the message - * @throws PubSubException upon failure, if the topic does not exist or if the message has empty - * payload and no attributes - */ - public String publish(Message message) { - return pubsub.publish(getName(), message); - } - - /** - * Sends a request for publishing a message to the this topic. This method returns a - * {@code Future} object to consume the result. {@link Future#get()} returns a service-generated - * id for the published message. Service-generated ids are guaranteed to be unique within the - * topic. - * - *

Example of asynchronously publishing one message to the topic. - *

 {@code
-   * Message message = Message.of("payload");
-   * Future future = topic.publishAsync(message);
-   * // ...
-   * String messageId = future.get();
-   * }
- * - * @param message the message to publish - * @return a {@code Future} for the unique service-generated id for the message - */ - public Future publishAsync(Message message) { - return pubsub.publishAsync(getName(), message); - } - - /** - * Publishes a number of messages to this topic. This method returns a list of service-generated - * ids for the published messages. Service-generated ids are guaranteed to be unique within the - * topic. - * - *

Example of publishing some messages to the topic. - *

 {@code
-   * Message message1 = Message.of("payload1");
-   * Message message2 = Message.of("payload2");
-   * List messageIds = topic.publish(message1, message2);
-   * }
- * - * @param message the first message to publish - * @param messages other messages to publish - * @return a list of unique, service-generated, ids. Ids are in the same order as the messages. - * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has - * empty payload and no attributes - */ - public List publish(Message message, Message... messages) { - return pubsub.publish(getName(), message, messages); - } - - /** - * Sends a request to publish a number of messages to this topic. This method returns a - * {@code Future} object to consume the result. {@link Future#get()} returns a list of - * service-generated ids for the published messages. Service-generated ids are guaranteed to be - * unique within the topic. - * - *

Example of asynchronously publishing some messages to the topic. - *

 {@code
-   * Message message1 = Message.of("payload1");
-   * Message message2 = Message.of("payload2");
-   * Future> future = topic.publishAsync(message1, message2);
-   * // ...
-   * List messageIds = future.get();
-   * }
- * - * @param message the first message to publish - * @param messages other messages to publish - * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as - * the messages. - */ - public Future> publishAsync(Message message, Message... messages) { - return pubsub.publishAsync(getName(), message, messages); - } - - /** - * Publishes a number of messages to this topic. This method returns a list ofservice-generated - * ids for the published messages. Service-generated ids are guaranteed to be unique within the - * topic. - * - *

Example of publishing a list of messages to the topic. - *

 {@code
-   * List messages = new LinkedList<>();
-   * messages.add(Message.of("payload1"));
-   * messages.add(Message.of("payload2"));
-   * List messageIds = topic.publish(messages);
-   * }
- * - * @param messages the messages to publish - * @return a list of unique, service-generated, ids. Ids are in the same order as the messages. - * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has - * empty payload and no attributes - */ - public List publish(Iterable messages) { - return pubsub.publish(getName(), messages); - } - - /** - * Sends a request to publish a number of messages to this topic. This method returns a - * {@code Future} object to consume the result. {@link Future#get()} returns a list of - * service-generated ids for the published messages. Service-generated ids are guaranteed to be - * unique within the topic. - * - *

Example of asynchronously publishing a list of messages to the topic. - *

 {@code
-   * List messages = new LinkedList<>();
-   * messages.add(Message.of("payload1"));
-   * messages.add(Message.of("payload2"));
-   * Future> future = topic.publishAsync(messages);
-   * // ...
-   * List messageIds = future.get();
-   * }
- * - * @param messages the messages to publish - * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as - * the messages. - */ - public Future> publishAsync(Iterable messages) { - return pubsub.publishAsync(getName(), messages); - } - /** * Lists the identities of the subscriptions for this topic. This method returns a {@link Page} * object that can be used to consume paginated results. Use {@link ListOption} to specify the diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index cdafc29b8556..eac041476f34 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -138,78 +138,6 @@ public void testListTopicsAsync() throws ExecutionException, InterruptedExceptio assertTrue(topic3.delete()); } - @Test - public void testPublishOneMessage() { - String topic = formatForTest("test-publish-one-message-topic"); - pubsub().create(TopicInfo.of(topic)); - Message message = Message.of("payload"); - assertNotNull(pubsub().publish(topic, message)); - assertTrue(pubsub().deleteTopic(topic)); - } - - @Test - public void testPublishNonExistingTopic() { - String topic = formatForTest("test-publish-non-existing-topic"); - Message message = Message.of("payload"); - thrown.expect(PubSubException.class); - pubsub().publish(topic, message); - } - - @Test - public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException { - String topic = formatForTest("test-publish-one-message-async-topic"); - pubsub().create(TopicInfo.of(topic)); - Message message = Message.of("payload"); - Future publishFuture = pubsub().publishAsync(topic, message); - assertNotNull(publishFuture.get()); - assertTrue(pubsub().deleteTopic(topic)); - } - - @Test - public void testPublishMoreMessages() { - String topic = formatForTest("test-publish-more-messages-topic"); - pubsub().create(TopicInfo.of(topic)); - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - List messageIds = pubsub().publish(topic, message1, message2); - assertEquals(2, messageIds.size()); - assertTrue(pubsub().deleteTopic(topic)); - } - - @Test - public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException { - String topic = formatForTest("test-publish-more-messages-topic-async-topic"); - pubsub().create(TopicInfo.of(topic)); - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - Future> publishFuture = pubsub().publishAsync(topic, message1, message2); - assertEquals(2, publishFuture.get().size()); - assertTrue(pubsub().deleteTopic(topic)); - } - - @Test - public void testPublishMessageList() { - String topic = formatForTest("test-publish-message-list-topic"); - pubsub().create(TopicInfo.of(topic)); - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); - assertEquals(2, messageIds.size()); - assertTrue(pubsub().deleteTopic(topic)); - } - - @Test - public void testPublishMessagesListAsync() throws ExecutionException, InterruptedException { - String topic = formatForTest("test-publish-message-list-async-topic"); - pubsub().create(TopicInfo.of(topic)); - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - Future> publishFuture = - pubsub().publishAsync(topic, ImmutableList.of(message1, message2)); - assertEquals(2, publishFuture.get().size()); - assertTrue(pubsub().deleteTopic(topic)); - } - @Test public void testCreateGetAndDeleteSubscription() { String topic = formatForTest("test-create-get-delete-subscription-topic"); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index 23beb76c8b06..3f02a7628805 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -482,104 +482,6 @@ public void testListTopicsAsyncWithOptions() throws ExecutionException, Interrup assertArrayEquals(topicList.toArray(), Iterables.toArray(page.getValues(), Topic.class)); } - @Test - public void testPublishOneMessage() { - PublishRequest request = PublishRequest.newBuilder() - .setTopic(TOPIC_NAME_PB) - .addAllMessages(ImmutableList.of(MESSAGE.toPb())) - .build(); - String messageId = "messageId"; - PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build(); - Future responseFuture = Futures.immediateFuture(response); - EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = new PubSubImpl(options); - assertEquals(messageId, pubsub.publish(TOPIC, MESSAGE)); - } - - @Test - public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException { - PublishRequest request = PublishRequest.newBuilder() - .setTopic(TOPIC_NAME_PB) - .addMessages(MESSAGE.toPb()) - .build(); - String messageId = "messageId"; - PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build(); - Future responseFuture = Futures.immediateFuture(response); - EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = new PubSubImpl(options); - assertEquals(messageId, pubsub.publishAsync(TOPIC, MESSAGE).get()); - } - - @Test - public void testPublishMoreMessages() { - PublishRequest request = PublishRequest.newBuilder() - .setTopic(TOPIC_NAME_PB) - .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb())) - .build(); - List messageIds = ImmutableList.of("messageId1", "messageId2"); - PublishResponse response = PublishResponse.newBuilder() - .addAllMessageIds(messageIds) - .build(); - Future responseFuture = Futures.immediateFuture(response); - EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = new PubSubImpl(options); - assertEquals(messageIds, pubsub.publish(TOPIC, MESSAGE, MESSAGE)); - } - - @Test - public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException { - PublishRequest request = PublishRequest.newBuilder() - .setTopic(TOPIC_NAME_PB) - .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb())) - .build(); - List messageIds = ImmutableList.of("messageId1", "messageId2"); - PublishResponse response = PublishResponse.newBuilder() - .addAllMessageIds(messageIds) - .build(); - Future responseFuture = Futures.immediateFuture(response); - EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = new PubSubImpl(options); - assertEquals(messageIds, pubsub.publishAsync(TOPIC, MESSAGE, MESSAGE).get()); - } - - @Test - public void testPublishMessageList() { - PublishRequest request = PublishRequest.newBuilder() - .setTopic(TOPIC_NAME_PB) - .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb())) - .build(); - List messageIds = ImmutableList.of("messageId1", "messageId2"); - PublishResponse response = PublishResponse.newBuilder() - .addAllMessageIds(messageIds) - .build(); - Future responseFuture = Futures.immediateFuture(response); - EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = new PubSubImpl(options); - assertEquals(messageIds, pubsub.publish(TOPIC, ImmutableList.of(MESSAGE, MESSAGE))); - } - - @Test - public void testPublishMessageListAsync() throws ExecutionException, InterruptedException { - PublishRequest request = PublishRequest.newBuilder() - .setTopic(TOPIC_NAME_PB) - .addAllMessages(ImmutableList.of(MESSAGE.toPb(), MESSAGE.toPb())) - .build(); - List messageIds = ImmutableList.of("messageId1", "messageId2"); - PublishResponse response = PublishResponse.newBuilder() - .addAllMessageIds(messageIds) - .build(); - Future responseFuture = Futures.immediateFuture(response); - EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = new PubSubImpl(options); - assertEquals(messageIds, pubsub.publishAsync(TOPIC, ImmutableList.of(MESSAGE, MESSAGE)).get()); - } - @Test public void testCreateSubscription() { com.google.pubsub.v1.Subscription subscriptionPb = SUBSCRIPTION_INFO.toPb(PROJECT); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java index 438759b73184..9c909be203e7 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java @@ -188,87 +188,6 @@ public void testDeleteAsyncFalse() throws ExecutionException, InterruptedExcepti assertFalse(topic.deleteAsync().get()); } - @Test - public void testPublishOneMessage() { - initializeExpectedTopic(1); - expect(pubsub.getOptions()).andReturn(mockOptions); - Message message = Message.of("payload1"); - String messageId = "messageId"; - expect(pubsub.publish(NAME, message)).andReturn(messageId); - replay(pubsub); - initializeTopic(); - assertEquals(messageId, topic.publish(message)); - } - - @Test - public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException { - initializeExpectedTopic(1); - expect(pubsub.getOptions()).andReturn(mockOptions); - Message message = Message.of("payload1"); - String messageId = "messageId"; - expect(pubsub.publishAsync(NAME, message)) - .andReturn(Futures.immediateFuture(messageId)); - replay(pubsub); - initializeTopic(); - assertEquals(messageId, topic.publishAsync(message).get()); - } - - @Test - public void testPublishMoreMessages() { - initializeExpectedTopic(1); - expect(pubsub.getOptions()).andReturn(mockOptions); - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - List messageIds = ImmutableList.of("messageId1", "messageId2"); - expect(pubsub.publish(NAME, message1, message2)).andReturn(messageIds); - replay(pubsub); - initializeTopic(); - assertEquals(messageIds, topic.publish(message1, message2)); - } - - @Test - public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException { - initializeExpectedTopic(1); - expect(pubsub.getOptions()).andReturn(mockOptions); - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - List messageIds = ImmutableList.of("messageId1", "messageId2"); - expect(pubsub.publishAsync(NAME, message1, message2)) - .andReturn(Futures.immediateFuture(messageIds)); - replay(pubsub); - initializeTopic(); - assertEquals(messageIds, topic.publishAsync(message1, message2).get()); - } - - @Test - public void testPublishMessageList() { - initializeExpectedTopic(1); - expect(pubsub.getOptions()).andReturn(mockOptions); - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - List messages = ImmutableList.of(message1, message2); - List messageIds = ImmutableList.of("messageId1", "messageId2"); - expect(pubsub.publish(NAME, messages)).andReturn(messageIds); - replay(pubsub); - initializeTopic(); - assertEquals(messageIds, topic.publish(messages)); - } - - @Test - public void testPublishMessageListAsync() throws ExecutionException, InterruptedException { - initializeExpectedTopic(1); - expect(pubsub.getOptions()).andReturn(mockOptions); - Message message1 = Message.of("payload1"); - Message message2 = Message.of("payload2"); - List messages = ImmutableList.of(message1, message2); - List messageIds = ImmutableList.of("messageId1", "messageId2"); - expect(pubsub.publishAsync(NAME, messages)) - .andReturn(Futures.immediateFuture(messageIds)); - replay(pubsub); - initializeTopic(); - assertEquals(messageIds, topic.publishAsync(messages).get()); - } - @Test public void testListSubscriptions() { initializeExpectedTopic(1);