Skip to content

Commit

Permalink
feat(pubsub): support new forms of topic ingestion (#11537)
Browse files Browse the repository at this point in the history
* feat(pubsub): support new forms of topic ingestion

* add test for kafka topics

* rename msk state, remove output only field
  • Loading branch information
hongalex authored Jan 31, 2025
1 parent 0dd7d3d commit 46d6ed4
Show file tree
Hide file tree
Showing 2 changed files with 314 additions and 1 deletion.
246 changes: 245 additions & 1 deletion pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,189 @@ func (i *IngestionDataSourceCloudStoragePubSubAvroFormat) isCloudStorageIngestio
return true
}

// EventHubsState denotes the possible states for ingestion from Event Hubs.
type EventHubsState int

const (
// EventHubsStateUnspecified is the default value. This value is unused.
EventHubsStateUnspecified = iota

// EventHubsStateActive means the state is active.
EventHubsStateActive

// EventHubsStatePermissionDenied indicates encountered permission denied error
// while consuming data from Event Hubs.
// This can happen when `client_id`, or `tenant_id` are invalid. Or the
// right permissions haven't been granted.
EventHubsStatePermissionDenied

// EventHubsStatePublishPermissionDenied indicates permission denied encountered
// while publishing to the topic.
EventHubsStatePublishPermissionDenied

// EventHubsStateNamespaceNotFound indicates the provided Event Hubs namespace couldn't be found.
EventHubsStateNamespaceNotFound

// EventHubsStateNotFound indicates the provided Event Hub couldn't be found.
EventHubsStateNotFound

// EventHubsStateSubscriptionNotFound indicates the provided Event Hubs subscription couldn't be found.
EventHubsStateSubscriptionNotFound

// EventHubsStateResourceGroupNotFound indicates the provided Event Hubs resource group couldn't be found.
EventHubsStateResourceGroupNotFound
)

// IngestionDataSourceAzureEventHubs are ingestion settings for Azure Event Hubs.
type IngestionDataSourceAzureEventHubs struct {
// Output only field that indicates the state of the Event Hubs ingestion source.
State EventHubsState

// Name of the resource group within the Azure subscription
ResourceGroup string

// Name of the Event Hubs namespace
Namespace string

// Rame of the Event Hub.
EventHub string

// Client ID of the Azure application that is being used to authenticate Pub/Sub.
ClientID string

// Tenant ID of the Azure application that is being used to authenticate Pub/Sub.
TenantID string

// The Azure subscription ID
SubscriptionID string

// GCPServiceAccount is the GCP service account to be used for Federated Identity
// authentication.
GCPServiceAccount string
}

var _ IngestionDataSource = (*IngestionDataSourceAzureEventHubs)(nil)

func (i *IngestionDataSourceAzureEventHubs) isIngestionDataSource() bool {
return true
}

// AmazonMSKState denotes the possible states for ingestion from Amazon MSK.
type AmazonMSKState int

const (
// AmazonMSKStateUnspecified is the default value. This value is unused.
AmazonMSKStateUnspecified = iota

// AmazonMSKActive indicates MSK topic is active.
AmazonMSKActive

// AmazonMSKPermissionDenied indicates permission denied encountered while consuming data from Amazon MSK.
AmazonMSKPermissionDenied

// AmazonMSKPublishPermissionDenied indicates permission denied encountered while publishing to the topic.
AmazonMSKPublishPermissionDenied

// AmazonMSKClusterNotFound indicates the provided Msk cluster wasn't found.
AmazonMSKClusterNotFound

// AmazonMSKTopicNotFound indicates the provided topic wasn't found.
AmazonMSKTopicNotFound
)

// IngestionDataSourceAmazonMSK are ingestion settings for Amazon MSK.
type IngestionDataSourceAmazonMSK struct {
// An output-only field that indicates the state of the Amazon
// MSK ingestion source.
State AmazonMSKState

// The Amazon Resource Name (ARN) that uniquely identifies the
// cluster.
ClusterARN string

// The name of the topic in the Amazon MSK cluster that Pub/Sub
// will import from.
Topic string

// AWS role ARN to be used for Federated Identity authentication
// with Amazon MSK. Check the Pub/Sub docs for how to set up this role and
// the required permissions that need to be attached to it.
AWSRoleARN string

// The GCP service account to be used for Federated Identity
// authentication with Amazon MSK (via a `AssumeRoleWithWebIdentity` call
// for the provided role). The `aws_role_arn` must be set up with
// `accounts.google.com:sub` equals to this service account number.
GCPServiceAccount string
}

var _ IngestionDataSource = (*IngestionDataSourceAmazonMSK)(nil)

func (i *IngestionDataSourceAmazonMSK) isIngestionDataSource() bool {
return true
}

// ConfluentCloudState denotes state of ingestion topic with confluent cloud
type ConfluentCloudState int

const (
// ConfluentCloudStateUnspecified is the default value. This value is unused.
ConfluentCloudStateUnspecified = iota

// ConfluentCloudActive indicates the state is active.
ConfluentCloudActive = 1

// ConfluentCloudPermissionDenied indicates permission denied encountered
// while consuming data from Confluent Cloud.
ConfluentCloudPermissionDenied = 2

// ConfluentCloudPublishPermissionDenied indicates permission denied encountered
// while publishing to the topic.
ConfluentCloudPublishPermissionDenied = 3

// ConfluentCloudUnreachableBootstrapServer indicates the provided bootstrap
// server address is unreachable.
ConfluentCloudUnreachableBootstrapServer = 4

// ConfluentCloudClusterNotFound indicates the provided cluster wasn't found.
ConfluentCloudClusterNotFound = 5

// ConfluentCloudTopicNotFound indicates the provided topic wasn't found.
ConfluentCloudTopicNotFound = 6
)

// IngestionDataSourceConfluentCloud are ingestion settings for confluent cloud.
type IngestionDataSourceConfluentCloud struct {
// An output-only field that indicates the state of the
// Confluent Cloud ingestion source.
State ConfluentCloudState

// The address of the bootstrap server. The format is url:port.
BootstrapServer string

// The id of the cluster.
ClusterID string

// The name of the topic in the Confluent Cloud cluster that
// Pub/Sub will import from.
Topic string

// The id of the identity pool to be used for Federated Identity
// authentication with Confluent Cloud. See
// https://docs.confluent.io/cloud/current/security/authenticate/workload-identities/identity-providers/oauth/identity-pools.html#add-oauth-identity-pools.
IdentityPoolID string

// The GCP service account to be used for Federated Identity
// authentication with `identity_pool_id`.
GCPServiceAccount string
}

var _ IngestionDataSource = (*IngestionDataSourceConfluentCloud)(nil)

func (i *IngestionDataSourceConfluentCloud) isIngestionDataSource() bool {
return true
}

func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings {
if pbs == nil {
return nil
Expand Down Expand Up @@ -625,6 +808,34 @@ func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *In
MinimumObjectCreateTime: cs.GetMinimumObjectCreateTime().AsTime(),
MatchGlob: cs.GetMatchGlob(),
}
} else if e := pbs.GetAzureEventHubs(); e != nil {
s.Source = &IngestionDataSourceAzureEventHubs{
State: EventHubsState(e.GetState()),
ResourceGroup: e.GetResourceGroup(),
Namespace: e.GetNamespace(),
EventHub: e.GetEventHub(),
ClientID: e.GetClientId(),
TenantID: e.GetTenantId(),
SubscriptionID: e.GetSubscriptionId(),
GCPServiceAccount: e.GetGcpServiceAccount(),
}
} else if m := pbs.GetAwsMsk(); m != nil {
s.Source = &IngestionDataSourceAmazonMSK{
State: AmazonMSKState(m.GetState()),
ClusterARN: m.GetClusterArn(),
Topic: m.GetTopic(),
AWSRoleARN: m.GetAwsRoleArn(),
GCPServiceAccount: m.GetGcpServiceAccount(),
}
} else if c := pbs.GetConfluentCloud(); c != nil {
s.Source = &IngestionDataSourceConfluentCloud{
State: ConfluentCloudState(c.GetState()),
BootstrapServer: c.GetBootstrapServer(),
ClusterID: c.GetClusterId(),
Topic: c.GetTopic(),
IdentityPoolID: c.GetIdentityPoolId(),
GCPServiceAccount: c.GetGcpServiceAccount(),
}
}

if pbs.PlatformLogsSettings != nil {
Expand Down Expand Up @@ -681,7 +892,6 @@ func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings
case *IngestionDataSourceCloudStorageAvroFormat:
pbs.Source = &pb.IngestionDataSourceSettings_CloudStorage_{
CloudStorage: &pb.IngestionDataSourceSettings_CloudStorage{
State: pb.IngestionDataSourceSettings_CloudStorage_State(cs.State),
Bucket: cs.Bucket,
InputFormat: &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_{
AvroFormat: &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat{},
Expand All @@ -704,6 +914,40 @@ func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings
}
}
}
if e, ok := out.(*IngestionDataSourceAzureEventHubs); ok {
pbs.Source = &pb.IngestionDataSourceSettings_AzureEventHubs_{
AzureEventHubs: &pb.IngestionDataSourceSettings_AzureEventHubs{
ResourceGroup: e.ResourceGroup,
Namespace: e.Namespace,
EventHub: e.EventHub,
ClientId: e.ClientID,
TenantId: e.TenantID,
SubscriptionId: e.SubscriptionID,
GcpServiceAccount: e.GCPServiceAccount,
},
}
}
if m, ok := out.(*IngestionDataSourceAmazonMSK); ok {
pbs.Source = &pb.IngestionDataSourceSettings_AwsMsk_{
AwsMsk: &pb.IngestionDataSourceSettings_AwsMsk{
ClusterArn: m.ClusterARN,
Topic: m.Topic,
AwsRoleArn: m.AWSRoleARN,
GcpServiceAccount: m.GCPServiceAccount,
},
}
}
if c, ok := out.(*IngestionDataSourceConfluentCloud); ok {
pbs.Source = &pb.IngestionDataSourceSettings_ConfluentCloud_{
ConfluentCloud: &pb.IngestionDataSourceSettings_ConfluentCloud{
BootstrapServer: c.BootstrapServer,
ClusterId: c.ClusterID,
Topic: c.Topic,
IdentityPoolId: c.IdentityPoolID,
GcpServiceAccount: c.GCPServiceAccount,
},
}
}
}
return pbs
}
Expand Down
69 changes: 69 additions & 0 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,75 @@ func TestTopic_IngestionCloudStorage(t *testing.T) {
}
}

func TestTopic_Ingestion(t *testing.T) {
c, srv := newFake(t)
defer c.Close()
defer srv.Close()
id := "test-topic-3p-ingestion"
gcpSA := "[email protected]"
azureIngestion := &IngestionDataSourceAzureEventHubs{
ResourceGroup: "fake-resource-group",
Namespace: "fake-namespace",
EventHub: "fake-event-hub",
ClientID: "11111111-1111-1111-1111-111111111111",
TenantID: "22222222-2222-2222-2222-222222222222",
SubscriptionID: "33333333-3333-3333-3333-333333333333",
GCPServiceAccount: gcpSA,
}
mskIngestion := &IngestionDataSourceAmazonMSK{
ClusterARN: "arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1",
Topic: "fake-msk-topic-name",
AWSRoleARN: "arn:aws:iam::111111111111:role/fake-role-name",
GCPServiceAccount: gcpSA,
}
confluentCloud := &IngestionDataSourceConfluentCloud{
BootstrapServer: "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092",
ClusterID: "fake-cluster-id",
Topic: "fake-confluent-topic-name",
IdentityPoolID: "fake-pool-id",
GCPServiceAccount: gcpSA,
}
want := TopicConfig{
IngestionDataSourceSettings: &IngestionDataSourceSettings{
Source: azureIngestion,
},
}
topic := mustCreateTopicWithConfig(t, c, id, &want)
got, err := topic.Config(context.Background())
if err != nil {
t.Fatalf("error getting topic config: %v", err)
}
want.State = TopicStateActive
opt := cmpopts.IgnoreUnexported(TopicConfig{})
if !testutil.Equal(got, want, opt) {
t.Errorf("got %v, want %v", got, want)
}

// Update ingestion settings to use MSK
ctx := context.Background()
settings := &IngestionDataSourceSettings{
Source: mskIngestion,
}
config2, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(config2.IngestionDataSourceSettings, settings, opt) {
t.Errorf("\ngot %+v\nwant %+v", config2.IngestionDataSourceSettings, settings)
}

settings = &IngestionDataSourceSettings{
Source: confluentCloud,
}
config3, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(config3.IngestionDataSourceSettings, settings, opt) {
t.Errorf("\ngot %+v\nwant %+v", config3.IngestionDataSourceSettings, settings)
}
}

func TestListTopics(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
Expand Down

0 comments on commit 46d6ed4

Please sign in to comment.