---
layout: default
title: rds
parent: Sources
grand_parent: Pipelines
nav_order: 95
---

# rds

The `rds` source enables change data capture (CDC) on [Amazon Relational Database Service (RDS)](https://aws.amazon.com/rds/) and [Amazon Aurora](https://aws.amazon.com/aurora/) databases. It receives database events, such as `INSERT`, `UPDATE`, or `DELETE`, from database replication logs and supports an initial load using RDS exports to Amazon S3.

The source supports the following database engines:
- Aurora MySQL and Aurora PostgreSQL
- RDS MySQL and RDS PostgreSQL

The source provides two options for ingesting data from Aurora/RDS:

1. Export: A full initial export of the database to Amazon S3, using the Aurora/RDS export feature, captures the current state of the database.
2. Stream: Streams change events from the database replication logs (the MySQL binlog or the PostgreSQL write-ahead log (WAL)).

## Usage

The following example pipeline specifies an `rds` source that ingests data from an Aurora MySQL cluster:

```yaml
version: "2"
rds-pipeline:
  source:
    rds:
      db_identifier: "my-rds-instance"
      engine: "aurora-mysql"
      database: "mydb"
      authentication:
        username: "myuser"
        password: "mypassword"
      s3_bucket: "my-export-bucket"
      s3_region: "us-west-2"
      s3_prefix: "rds-exports"
      export:
        kms_key_id: "arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012"
        export_role_arn: "arn:aws:iam::123456789012:role/rds-export-role"
      stream: true
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-pipeline-role"
```
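
To stream change events only, without running an initial export, omit the `export` block. The following is a minimal sketch using placeholder names; because `export` is optional and `stream` defaults to `false`, only stream ingestion runs here:

```yaml
version: "2"
rds-stream-pipeline:
  source:
    rds:
      db_identifier: "my-rds-instance"
      engine: "mysql"
      database: "mydb"
      authentication:
        username: "myuser"
        password: "mypassword"
      s3_bucket: "my-staging-bucket"  # events are staged in this bucket before processing
      stream: true                    # enable streaming; no export block, so no initial export
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-pipeline-role"
```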

## Configuration options

The following tables describe the configuration options for the `rds` source.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`db_identifier` | Yes | String | The identifier for the RDS instance or Aurora cluster.
`cluster` | No | Boolean | Whether the `db_identifier` refers to a cluster (`true`) or an instance (`false`). Default is `false`. For Aurora engines, this option is always `true`.
`engine` | Yes | String | The database engine type. Must be one of: `mysql`, `postgresql`, `aurora-mysql`, or `aurora-postgresql`.
`database` | Yes | String | The name of the database to connect to.
`tables` | No | Object | Configuration for filtering tables to include or exclude. See [tables](#tables) for more information.
`authentication` | Yes | Object | Database authentication credentials. See [authentication](#authentication) for more information.
`aws` | Yes | Object | The AWS configuration. See [aws](#aws) for more information.
`acknowledgments` | No | Boolean | When `true`, enables the source to receive [end-to-end acknowledgments]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/pipelines#end-to-end-acknowledgments) when events are received by OpenSearch sinks. Default is `true`.
`s3_data_file_acknowledgment_timeout` | No | Duration | The amount of time that elapses before the data read from an RDS export expires when used with acknowledgments. Default is 30 minutes.
`stream_acknowledgment_timeout` | No | Duration | The amount of time that elapses before the data read from database streams expires when used with acknowledgments. Default is 10 minutes.
`s3_bucket` | Yes | String | The S3 bucket name where RDS export data will be stored.
`s3_prefix` | No | String | The prefix for S3 objects in the export bucket.
`s3_region` | No | String | The AWS Region for the S3 bucket. If not specified, the Region from the [aws](#aws) configuration is used.
`partition_count` | No | Integer | The number of folder partitions in the S3 buffer. Must be between 1 and 1000. Default is 100.
`export` | No | Object | Configuration for RDS export operations. See [export](#export-options) for more information.
`stream` | No | Boolean | Whether to enable streaming of database change events. Default is `false`.
`tls` | No | Object | TLS configuration for database connections. See [tls](#tls-options) for more information.
`disable_s3_read_for_leader` | No | Boolean | Whether to disable S3 read operations for the leader node. Default is `false`.

### aws

Use the following options in the AWS configuration.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`region` | No | String | The AWS Region to use for credentials. Defaults to [standard SDK behavior to determine the Region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
`sts_role_arn` | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon RDS and Amazon S3. Defaults to `null`, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
`sts_external_id` | No | String | The external ID to use when assuming the STS role. Must be between 2 and 1224 characters.
`sts_header_overrides` | No | Map | A map of header overrides that the AWS Identity and Access Management (IAM) role assumes for the source plugin. Maximum of 5 headers.
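
For example, the following is a minimal sketch of an `aws` block that assumes a role using an external ID. All ARN and ID values are placeholders:

```yaml
aws:
  region: "us-west-2"
  sts_role_arn: "arn:aws:iam::123456789012:role/my-pipeline-role"
  sts_external_id: "my-external-id"  # hypothetical ID; must be between 2 and 1224 characters
```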

### authentication

Use the following options for database authentication.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`username` | Yes | String | The database username for authentication.
`password` | Yes | String | The database password for authentication.
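
Plaintext credentials in pipeline YAML are easy to leak. As an alternative, Data Prepper can reference secrets stored in AWS Secrets Manager through its `aws_secrets` extension. The following is a sketch that assumes the extension is configured in `data-prepper-config.yaml` under a hypothetical secret configuration ID of `rds-secret-config`, with `username` and `password` keys in the secret:

```yaml
authentication:
  username: ${{aws_secrets:rds-secret-config:username}}  # resolved from AWS Secrets Manager
  password: ${{aws_secrets:rds-secret-config:password}}
```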

### tables

Use the following options to filter which tables to include in the data capture.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`include` | No | List | A list of table names to include in data capture. Maximum of 1000 tables. If specified, only these tables will be processed.
`exclude` | No | List | A list of table names to exclude from data capture. Maximum of 1000 tables. These tables will be ignored even if they match include patterns.
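
For example, the following sketch restricts capture to two hypothetical tables and explicitly excludes an audit table. Because `exclude` takes precedence, `audit_log` is ignored even if an include pattern matches it:

```yaml
tables:
  include:
    - "customers"
    - "orders"
  exclude:
    - "audit_log"
```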

### export options

The following options let you customize the RDS export functionality.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`kms_key_id` | Yes | String | The AWS KMS key ID or ARN to use for encrypting the export data.
`export_role_arn` | Yes | String | The ARN of the IAM role that RDS will assume to perform the export operation.

### tls options

The following options let you configure TLS for database connections.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`insecure` | No | Boolean | Whether to disable TLS encryption for database connections. Default is `false` (TLS enabled).
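
For example, the following sketch disables TLS for a local test database. Do not use this setting in production:

```yaml
tls:
  insecure: true  # disables TLS encryption for database connections
```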

## Exposed metadata attributes

The following metadata is added to each event processed by the `rds` source. You can access these metadata attributes using the [expression syntax `getMetadata` function]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/get-metadata/).

* `primary_key`: The primary key of the database record. For tables with composite primary keys, values are concatenated with a `|` separator.
* `event_timestamp`: The timestamp, in epoch milliseconds, at which the database change occurred. For export events, this is the export time. For stream events, this is the transaction commit time.
* `document_version`: A long integer generated from the event timestamp that is used as the document version.
* `opensearch_action`: The bulk action used to send the event to OpenSearch, such as `index` or `delete`.
* `change_event_type`: The stream event type. Valid values are `insert`, `update`, and `delete`.
* `table_name`: The name of the database table that the event came from.
* `schema_name`: The name of the schema that the event came from. For MySQL, `schema_name` is the same as `database_name`.
* `database_name`: The name of the database that the event came from.
* `ingestion_type`: Indicates whether the event comes from an export or a stream. Valid values are `EXPORT` and `STREAM`.
* `s3_partition_key`: Events are staged in an S3 bucket before processing. This attribute indicates the S3 location where the event is staged.
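
These attributes are typically consumed by the sink so that inserts, updates, and deletes are applied to the correct documents. The following `opensearch` sink sketch is modeled on other Data Prepper CDC sources and assumes that the sink accepts `${getMetadata(...)}` format expressions; the host and index names are placeholders:

```yaml
sink:
  - opensearch:
      hosts: ["https://my-domain.us-west-2.es.amazonaws.com"]
      index: "my-rds-index"
      # Use CDC metadata to route each event to the right document and action
      document_id: "${getMetadata(\"primary_key\")}"
      action: "${getMetadata(\"opensearch_action\")}"
      document_version: "${getMetadata(\"document_version\")}"
      document_version_type: "external"
```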

## Permissions

The following are the required permissions for using the `rds` source:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "allowReadingFromS3Buckets",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:DeleteObject",
        "s3:GetBucketLocation",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::s3_bucket",
        "arn:aws:s3:::s3_bucket/*"
      ]
    },
    {
      "Sid": "AllowDescribeInstances",
      "Effect": "Allow",
      "Action": [
        "rds:DescribeDBInstances"
      ],
      "Resource": [
        "arn:aws:rds:region:account-id:db:*"
      ]
    },
    {
      "Sid": "AllowDescribeClusters",
      "Effect": "Allow",
      "Action": [
        "rds:DescribeDBClusters"
      ],
      "Resource": [
        "arn:aws:rds:region:account-id:cluster:*"
      ]
    },
    {
      "Sid": "AllowSnapshots",
      "Effect": "Allow",
      "Action": [
        "rds:DescribeDBClusterSnapshots",
        "rds:CreateDBClusterSnapshot",
        "rds:DescribeDBSnapshots",
        "rds:CreateDBSnapshot",
        "rds:AddTagsToResource"
      ],
      "Resource": [
        "arn:aws:rds:region:account-id:cluster:*",
        "arn:aws:rds:region:account-id:cluster-snapshot:*",
        "arn:aws:rds:region:account-id:db:*",
        "arn:aws:rds:region:account-id:snapshot:*"
      ]
    },
    {
      "Sid": "AllowExport",
      "Effect": "Allow",
      "Action": [
        "rds:StartExportTask"
      ],
      "Resource": [
        "arn:aws:rds:region:account-id:cluster:*",
        "arn:aws:rds:region:account-id:cluster-snapshot:*",
        "arn:aws:rds:region:account-id:snapshot:*"
      ]
    },
    {
      "Sid": "AllowDescribeExports",
      "Effect": "Allow",
      "Action": [
        "rds:DescribeExportTasks"
      ],
      "Resource": "*"
    },
    {
      "Sid": "AllowAccessToKmsForExport",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt",
        "kms:Encrypt",
        "kms:DescribeKey",
        "kms:RetireGrant",
        "kms:CreateGrant",
        "kms:ReEncrypt*",
        "kms:GenerateDataKey*"
      ],
      "Resource": [
        "arn:aws:kms:region:account-id:key/export-key-id"
      ]
    },
    {
      "Sid": "AllowPassingExportRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": [
        "arn:aws:iam::account-id:role/export-role"
      ]
    }
  ]
}
```

## Metrics

The `rds` source includes the following metrics.

* `exportJobSuccess`: The number of RDS export tasks that have succeeded.
* `exportJobFailure`: The number of RDS export tasks that have failed.
* `exportS3ObjectsTotal`: The total number of export data files found in S3.
* `exportS3ObjectsProcessed`: The total number of export data files that have been processed successfully from S3.
* `exportS3ObjectsErrors`: The total number of export data files that fail to be processed from S3.
* `exportRecordsTotal`: The total number of records found in the export.
* `exportRecordsProcessed`: The total number of export records that have been processed successfully.
* `exportRecordsProcessingErrors`: The number of export record processing errors.
* `changeEventsProcessed`: The number of change events processed from database streams.
* `changeEventsProcessingErrors`: The number of processing errors for change events from database streams.
* `bytesReceived`: The total number of bytes received by the source from both exports and streams.
* `bytesProcessed`: The total number of bytes processed by the source.
* `positiveAcknowledgementSets`: The number of acknowledgement sets that are positively acknowledged in stream processing.
* `negativeAcknowledgementSets`: The number of acknowledgement sets that are negatively acknowledged in stream processing.
* `checkpointCount`: The total number of checkpoints in stream processing.
* `noDataExtendLeaseCount`: The number of times that the lease is extended on a partition with no new data processed since the last checkpoint.
* `giveupPartitionCount`: The number of times a partition is given up.
* `replicationLogEntryProcessingTime`: The time taken to process a replication log event.
* `replicationLogEntryProcessingErrors`: The number of replication log events that have failed to process.