version: "1.0"
specification:
name: postgres
summary: Conduit connector for PostgreSQL
description: |
## Source
The Postgres Source Connector connects to a database with the provided `url` and
starts creating records for each change detected in the provided tables.
Upon starting, the source takes a snapshot of the provided tables in the database,
then switches into CDC mode. In CDC mode, the plugin reads from a buffer of CDC events.
### Snapshot
When the connector first starts, snapshot mode is enabled. The connector acquires
a read-only lock on the tables, and then reads all rows of the tables into Conduit.
Once all rows in that initial snapshot are read, the connector releases its lock and
switches into CDC mode.
This behavior is enabled by default, but can be turned off by adding
`"snapshotMode": "never"` to the Source configuration.
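A minimal sketch of source settings that skip the initial snapshot. Only the `settings` block of the source connector is shown, and the connection URL and table name are placeholders rather than values required by the connector:

```yaml
# Settings of the Postgres source connector (placeholder URL and table name).
settings:
  url: "postgres://exampleuser:examplepass@localhost:5433/exampledb?sslmode=disable"
  tables: "users"
  # Skip the initial snapshot and start directly in CDC mode.
  snapshotMode: "never"
```

With this setting, rows that already exist in the tables are not read; only changes detected after the connector starts become records.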
### Change Data Capture
This connector implements Change Data Capture (CDC) features for PostgreSQL by
creating a logical replication slot and a publication that listens to changes in the
configured tables. Every detected change is converted into a record. If there are no
records available, the connector blocks until a record is available or the connector
receives a stop signal.
#### Logical Replication Configuration
When the connector switches to CDC mode, it attempts to run the initial setup commands
to create its logical replication slot and publication. It will connect to an existing
slot if one with the configured name exists.
The Postgres user specified in the connection URL must have sufficient privileges to
run all of these setup commands, or the setup will fail.
Example pipeline configuration that's using logical replication:
```yaml
version: 2.2
pipelines:
  - id: pg-to-log
    status: running
    connectors:
      - id: pg
        type: source
        plugin: builtin:postgres
        settings:
          url: "postgres://exampleuser:examplepass@localhost:5433/exampledb?sslmode=disable"
          tables: "users"
          cdcMode: "logrepl"
          logrepl.publicationName: "examplepub"
          logrepl.slotName: "exampleslot"
      - id: log
        type: destination
        plugin: builtin:log
        settings:
          level: info
```
:warning: When the connector or pipeline is deleted, the connector will automatically
attempt to delete the replication slot and publication. This is the default behaviour
and can be disabled by setting `logrepl.autoCleanup` to `false`.
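As an illustration, the source settings below keep the replication slot and publication in place when the connector is deleted. This is a sketch that shows only the `settings` block; the URL, table, publication, and slot names are placeholders:

```yaml
# Settings of the Postgres source connector (placeholder values).
settings:
  url: "postgres://exampleuser:examplepass@localhost:5433/exampledb?sslmode=disable"
  tables: "users"
  cdcMode: "logrepl"
  logrepl.publicationName: "examplepub"
  logrepl.slotName: "exampleslot"
  # Do not drop the slot and publication when the connector is deleted.
  logrepl.autoCleanup: "false"
```

Note that Postgres retains WAL for a replication slot that is no longer consumed, so a slot left behind this way would need to be dropped manually once it is no longer needed.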
### Key Handling
The connector automatically looks up the primary key columns of the specified tables
and uses them as the record key. If the primary key can't be determined, the connector
returns an error.
## Destination
The Postgres Destination takes a Conduit record and stores it using a SQL statement.
The Destination is designed to handle different payloads and keys. Because of this,
each record is individually parsed and upserted.
### Handling record operations
Based on the `Operation` field in the record, the destination will either insert,
update or delete the record in the target table. Snapshot records are always inserted.
If the target table already contains a record with the same key as a record being
inserted, the record will be updated (upserted). This can overwrite and thus potentially
lose data, so keys should be assigned correctly from the Source.
If the target table does not contain a record with the same key as a record being
deleted, the record will be ignored.
If the record has no key, it is simply appended.
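A sketch of a destination connector entry that upserts by key, assuming it is placed in the `connectors` list of a pipeline configuration file. The connector ID, URL, table name, and key column are hypothetical placeholders:

```yaml
# Hypothetical destination entry; ID, URL, table, and key column are placeholders.
- id: pg-dest
  type: destination
  plugin: builtin:postgres
  settings:
    url: "postgres://exampleuser:examplepass@localhost:5433/exampledb?sslmode=disable"
    # Fixed target table. If omitted, the default template takes the table
    # name from the "opencdc.collection" metadata field of each record.
    table: "users_copy"
    # Column used to identify and update existing rows.
    key: "id"
```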
version: v0.11.0
author: Meroxa, Inc.
source:
parameters:
- name: url
description: URL is the connection string for the Postgres database.
type: string
default: ""
validations:
- type: required
value: ""
- name: cdcMode
description: CDCMode determines how the connector should listen to changes.
type: string
default: auto
validations:
- type: inclusion
value: auto,logrepl
- name: logrepl.autoCleanup
description: |-
LogreplAutoCleanup determines if the replication slot and publication should be
removed when the connector is deleted.
type: bool
default: "true"
validations: []
- name: logrepl.publicationName
description: |-
LogreplPublicationName determines the publication name in case the
connector uses logical replication to listen to changes (see CDCMode).
type: string
default: conduitpub
validations: []
- name: logrepl.slotName
description: |-
LogreplSlotName determines the replication slot name in case the
connector uses logical replication to listen to changes (see CDCMode).
type: string
default: conduitslot
validations: []
- name: logrepl.withAvroSchema
description: |-
WithAvroSchema determines whether the connector should attach an Avro schema to each
record.
type: bool
default: "true"
validations: []
- name: sdk.batch.delay
description: Maximum delay before an incomplete batch is read from the source.
type: duration
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.batch.size
description: Maximum size of batch before it gets read from the source.
type: int
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.schema.context.enabled
description: |-
Specifies whether to use a schema context name. If set to false, no schema context name will
be used, and schemas will be saved with the subject name specified in the connector
(not safe because of name conflicts).
type: bool
default: "true"
validations: []
- name: sdk.schema.context.name
description: |-
Schema context name to be used. Used as a prefix for all schema subject names.
If empty, defaults to the connector ID.
type: string
default: ""
validations: []
- name: sdk.schema.extract.key.enabled
description: Whether to extract and encode the record key with a schema.
type: bool
default: "false"
validations: []
- name: sdk.schema.extract.key.subject
description: |-
The subject of the key schema. If the record metadata contains the field
"opencdc.collection" it is prepended to the subject name and separated
with a dot.
type: string
default: key
validations: []
- name: sdk.schema.extract.payload.enabled
description: Whether to extract and encode the record payload with a schema.
type: bool
default: "false"
validations: []
- name: sdk.schema.extract.payload.subject
description: |-
The subject of the payload schema. If the record metadata contains the
field "opencdc.collection" it is prepended to the subject name and
separated with a dot.
type: string
default: payload
validations: []
- name: sdk.schema.extract.type
description: The type of the payload schema.
type: string
default: avro
validations:
- type: inclusion
value: avro
- name: snapshot.fetchSize
description: Snapshot fetcher size determines the number of rows to retrieve at a time.
type: int
default: "50000"
validations: []
- name: snapshotMode
description: SnapshotMode determines whether the plugin takes a snapshot of the entire table before starting CDC mode.
type: string
default: initial
validations:
- type: inclusion
value: initial,never
- name: table
description: 'Deprecated: use `tables` instead.'
type: string
default: ""
validations: []
- name: tables
description: |-
Tables is a list of table names to read from, separated by commas, e.g. "table1,table2".
Use "*" if you'd like to listen to all tables.
type: string
default: ""
validations: []
destination:
parameters:
- name: url
description: URL is the connection string for the Postgres database.
type: string
default: ""
validations:
- type: required
value: ""
- name: key
description: Key represents the column name for the key used to identify and update existing rows.
type: string
default: ""
validations: []
- name: sdk.batch.delay
description: Maximum delay before an incomplete batch is written to the destination.
type: duration
default: "0"
validations: []
- name: sdk.batch.size
description: Maximum size of batch before it gets written to the destination.
type: int
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.rate.burst
description: |-
Allow bursts of at most X records (0 or less means that bursts are not
limited). Only takes effect if a rate limit per second is set. Note that
if `sdk.batch.size` is bigger than `sdk.rate.burst`, the effective batch
size will be equal to `sdk.rate.burst`.
type: int
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.rate.perSecond
description: Maximum number of records written per second (0 means no rate limit).
type: float
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.record.format
description: |-
The format of the output record. See the Conduit documentation for a full
list of supported formats (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
type: string
default: opencdc/json
validations: []
- name: sdk.record.format.options
description: |-
Options to configure the chosen output record format. Options are normally
key=value pairs separated by commas (e.g. opt1=val1,opt2=val2), except
for the `template` record format, where options are a Go template.
type: string
default: ""
validations: []
- name: sdk.schema.extract.key.enabled
description: Whether to extract and decode the record key with a schema.
type: bool
default: "true"
validations: []
- name: sdk.schema.extract.payload.enabled
description: Whether to extract and decode the record payload with a schema.
type: bool
default: "true"
validations: []
- name: table
description: Table is used as the target table into which records are inserted.
type: string
default: '{{ index .Metadata "opencdc.collection" }}'
validations: []