Skip to content

Commit 82521ee

Browse files
authored
feat(internal): copy pubsub Message and PublishResult to internal/pubsub (#3351)
Message and PublishResult will be used by the pubsub and pubsublite packages, which require access to internal fields that should not be made public.
1 parent 28fbb9a commit 82521ee

File tree

2 files changed

+154
-0
lines changed

2 files changed

+154
-0
lines changed

internal/pubsub/message.go

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
14+
package pubsub
15+
16+
import (
17+
"time"
18+
)
19+
20+
// AckHandler implements ack/nack handling.
21+
type AckHandler interface {
22+
// OnAck processes a message ack.
23+
OnAck()
24+
25+
// OnNack processes a message nack.
26+
OnNack()
27+
}
28+
29+
// Message represents a Pub/Sub message.
30+
type Message struct {
31+
// ID identifies this message. This ID is assigned by the server and is
32+
// populated for Messages obtained from a subscription.
33+
//
34+
// This field is read-only.
35+
ID string
36+
37+
// Data is the actual data in the message.
38+
Data []byte
39+
40+
// Attributes represents the key-value pairs the current message is
41+
// labelled with.
42+
Attributes map[string]string
43+
44+
// PublishTime is the time at which the message was published. This is
45+
// populated by the server for Messages obtained from a subscription.
46+
//
47+
// This field is read-only.
48+
PublishTime time.Time
49+
50+
// DeliveryAttempt is the number of times a message has been delivered.
51+
// This is part of the dead lettering feature that forwards messages that
52+
// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
53+
// If dead lettering is enabled, this will be set on all attempts, starting
54+
// with value 1. Otherwise, the value will be nil.
55+
// This field is read-only.
56+
DeliveryAttempt *int
57+
58+
// OrderingKey identifies related messages for which publish order should
59+
// be respected. If empty string is used, message will be sent unordered.
60+
OrderingKey string
61+
62+
// ackh handles Ack() or Nack().
63+
ackh AckHandler
64+
}
65+
66+
// Ack indicates successful processing of a Message passed to the Subscriber.Receive callback.
67+
// It should not be called on any other Message value.
68+
// If message acknowledgement fails, the Message will be redelivered.
69+
// Client code must call Ack or Nack when finished for each received Message.
70+
// Calls to Ack or Nack have no effect after the first call.
71+
func (m *Message) Ack() {
72+
if m.ackh != nil {
73+
m.ackh.OnAck()
74+
}
75+
}
76+
77+
// Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback.
78+
// It should not be called on any other Message value.
79+
// Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
80+
// Client code must call Ack or Nack when finished for each received Message.
81+
// Calls to Ack or Nack have no effect after the first call.
82+
func (m *Message) Nack() {
83+
if m.ackh != nil {
84+
m.ackh.OnNack()
85+
}
86+
}
87+
88+
// NewMessage creates a message with an AckHandler implementation, which should
89+
// not be nil.
90+
func NewMessage(ackh AckHandler) *Message {
91+
return &Message{ackh: ackh}
92+
}
93+
94+
// MessageAckHandler provides access to the internal field Message.ackh.
95+
func MessageAckHandler(m *Message) AckHandler {
96+
return m.ackh
97+
}

internal/pubsub/publish.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
14+
package pubsub
15+
16+
import "context"
17+
18+
// A PublishResult holds the result from a call to Publish.
19+
type PublishResult struct {
20+
ready chan struct{}
21+
serverID string
22+
err error
23+
}
24+
25+
// Ready returns a channel that is closed when the result is ready.
26+
// When the Ready channel is closed, Get is guaranteed not to block.
27+
func (r *PublishResult) Ready() <-chan struct{} { return r.ready }
28+
29+
// Get returns the server-generated message ID and/or error result of a Publish call.
30+
// Get blocks until the Publish call completes or the context is done.
31+
func (r *PublishResult) Get(ctx context.Context) (serverID string, err error) {
32+
// If the result is already ready, return it even if the context is done.
33+
select {
34+
case <-r.Ready():
35+
return r.serverID, r.err
36+
default:
37+
}
38+
select {
39+
case <-ctx.Done():
40+
return "", ctx.Err()
41+
case <-r.Ready():
42+
return r.serverID, r.err
43+
}
44+
}
45+
46+
// NewPublishResult creates a PublishResult.
47+
func NewPublishResult() *PublishResult {
48+
return &PublishResult{ready: make(chan struct{})}
49+
}
50+
51+
// SetPublishResult sets the server ID and error for a publish result and closes
52+
// the Ready channel.
53+
func SetPublishResult(r *PublishResult, sid string, err error) {
54+
r.serverID = sid
55+
r.err = err
56+
close(r.ready)
57+
}

0 commit comments

Comments
 (0)