Skip to content

Commit

Permalink
remove more code related to synchronous and byte limit
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Jan 6, 2025
1 parent 3b8ffea commit 9b2cdbb
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 51 deletions.
23 changes: 0 additions & 23 deletions pubsub/v2/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ import (
"cloud.google.com/go/pubsub/internal/distribution"
vkit "cloud.google.com/go/pubsub/v2/apiv1"
pb "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
gax "github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protowire"
Expand Down Expand Up @@ -386,27 +384,6 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
return nil, nil
}

// Get messages using the Pull RPC.
// This may block indefinitely. It may also return zero messages, after some time waiting.
func (it *messageIterator) pullMessages(maxToPull int32) ([]*pb.ReceivedMessage, error) {
// Use it.ctx as the RPC context, so that if the iterator is stopped, the call
// will return immediately.
res, err := it.subc.Pull(it.ctx, &pb.PullRequest{
Subscription: it.subName,
MaxMessages: maxToPull,
}, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
switch {
case errors.Is(err, context.Canceled):
return nil, nil
case status.Code(err) == codes.Canceled:
return nil, nil
case err != nil:
return nil, err
default:
return res.ReceivedMessages, nil
}
}

func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) {
res, err := it.ps.Recv()
if err != nil {
Expand Down
21 changes: 0 additions & 21 deletions pubsub/v2/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,27 +402,6 @@ func TestIterator_ModifyAckContextDeadline(t *testing.T) {
}
}

func TestIterator_SynchronousPullCancel(t *testing.T) {
srv := pstest.NewServer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)

_, client, err := initConn(ctx, srv.Addr)
if err != nil {
t.Fatal(err)
}
iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{})

// Cancelling the iterator and pulling should not result in any errors.
iter.cancel()

if _, err := iter.pullMessages(100); err != nil {
t.Fatalf("Got error in pullMessages: %v", err)
}
}

func TestIterator_BoundedDuration(t *testing.T) {
// Use exported fields for time.Duration fields so they
// print nicely. Otherwise, they will print as integers.
Expand Down
7 changes: 0 additions & 7 deletions pubsub/v2/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ const (
MaxPublishRequestBytes = 1e7
)

const (
// TODO: math.MaxInt was added in Go 1.17. We should use that once 1.17
// becomes the minimum supported version of Go.
intSize = 32 << (^uint(0) >> 63)
maxInt = 1<<(intSize-1) - 1
)

// ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
var ErrOversizedMessage = bundler.ErrOversizedItem

Expand Down

0 comments on commit 9b2cdbb

Please sign in to comment.