Formats supported when reading data from Kafka:

- AVRO
- JSON
- PROTO

Note that for the proto-based Kafka source, it is mandatory to provide the proto schema class name, which is specified by
the `hoodie.deltastreamer.schemaprovider.proto.class.name` config. A few other configurations that can be useful when using
the proto class-based schema provider are as follows:

|Config| Default | Description | Scope | Since Version |
|---|---|---|---|---|
| hoodie.deltastreamer.schemaprovider.proto.flatten.wrappers | false | When set to true, wrapped primitives like Int64Value are translated to a record with a single 'value' field. | ProtoKafkaSource | 0.13.0 |
| hoodie.deltastreamer.schemaprovider.proto.timestamps.as.records | false | When set to true, Timestamp fields are translated to a record with a seconds field and a nanos field. | ProtoKafkaSource | 0.13.0 |
| hoodie.deltastreamer.schemaprovider.proto.max.recursion.depth | 5 | The max depth to unravel the proto schema when translating it into an Avro schema. Setting this depth allows the user to convert a schema that is recursive in proto into something that can be represented in their lake format, such as Parquet. | ProtoKafkaSource | 0.13.0 |
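
For illustration, a minimal sketch of a `spark-submit` command wiring `ProtoKafkaSource` to a proto class-based schema
provider is shown below. The topic name, bootstrap servers, record key/partition fields, the generated proto class
`com.example.protos.MyEvent`, and the schema provider class `org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider`
are assumptions to adapt (and verify) for your setup; the generated proto class must be available on the driver and
executor classpath.

```bash
# com.example.protos.MyEvent is a placeholder for your generated proto message class.
export TOPIC_NAME=proto_events
./bin/spark-submit \
  --master 'local[2]' \
  --deploy-mode client \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer <hudi.jar> \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider \
  --source-ordering-field ts \
  --target-base-path file:///data/tables/$TOPIC_NAME \
  --target-table $TOPIC_NAME \
  --hoodie-conf bootstrap.servers=localhost:9092 \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=$TOPIC_NAME \
  --hoodie-conf hoodie.datasource.write.recordkey.field=key \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=date \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.proto.class.name=com.example.protos.MyEvent
```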

### Pulsar
[Apache Pulsar](https://pulsar.apache.org/) is an open-source, distributed messaging and streaming platform built for
the cloud. `PulsarSource` supports ingesting from Apache Pulsar through the deltastreamer. To use this source, you can
use the following command as an example:

```bash
export TOPIC_NAME=stonks
./bin/spark-submit \
--master 'local[2]' \
--deploy-mode client \
--packages io.streamnative.connectors:pulsar-spark-connector_2.12:3.1.1.4 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer <hudi.jar> \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.PulsarSource \
--source-ordering-field ts \
--target-base-path file:///data/tables/$TOPIC_NAME \
--target-table $TOPIC_NAME \
--hoodie-conf hoodie.datasource.write.recordkey.field=key \
--hoodie-conf hoodie.datasource.write.partitionpath.field=date \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf hoodie.deltastreamer.source.pulsar.topic=$TOPIC_NAME \
--hoodie-conf hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy=EARLIEST \
--hoodie-conf hoodie.deltastreamer.source.pulsar.endpoint.service.url=pulsar://localhost:6650 \
--hoodie-conf hoodie.deltastreamer.source.pulsar.endpoint.admin.url=http://localhost:8080

```

**Configurations for PulsarSource:**

|Config| Default | Description | Scope | Since Version |
|---|---|---|---|---|
| hoodie.deltastreamer.source.pulsar.topic | | Name of the target Pulsar topic to source data from. This is a mandatory config. | PulsarSource | 0.13.0 |
| hoodie.deltastreamer.source.pulsar.endpoint.service.url | pulsar://localhost:6650 | URL of the target Pulsar endpoint (of the form 'pulsar://host:port'). | PulsarSource | 0.13.0 |
| hoodie.deltastreamer.source.pulsar.endpoint.admin.url | http://localhost:8080 | URL of the Pulsar admin endpoint. | PulsarSource | 0.13.0 |
| hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy | LATEST | Policy determining how offsets are automatically reset when no checkpoint information is present. Other options: EARLIEST and FAIL. | PulsarSource | 0.13.0 |
| hoodie.deltastreamer.source.pulsar.maxRecords | 5,000,000 | Max number of records obtained in each batch. | PulsarSource | 0.13.0 |

### S3 Events
AWS S3 storage provides an event notification service which will post notifications when certain events happen in your S3 bucket:
Hudi can consume these notifications to trigger processing of new or changed data as soon as it is available on S3.

Insert code sample from this blog: https://hudi.apache.org/blog/2021/08/23/s3-events-source/#configuration-and-setup
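
In the meantime, a hedged sketch of an `S3EventsSource` command, modeled on the GCS commands in the next section, is
shown below. The SQS queue configs (`hoodie.deltastreamer.s3.source.queue.url`,
`hoodie.deltastreamer.s3.source.queue.region`), the record key/ordering/partition fields, and the target paths are
assumptions here; confirm them against the linked blog post before use.

```bash
# Sketch only: ingest S3 event notifications (delivered via SQS) into a Hudi metadata table.
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hudi-utilities-bundle_2.12-0.13.0.jar \
  --source-class org.apache.hudi.utilities.sources.S3EventsSource \
  --op INSERT \
  --source-ordering-field eventTime \
  --hoodie-conf hoodie.datasource.write.recordkey.field=s3.object.key \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=s3.bucket.name \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
  --hoodie-conf hoodie.deltastreamer.s3.source.queue.url=https://sqs.us-west-2.amazonaws.com/ACCOUNT_ID/QUEUE_NAME \
  --hoodie-conf hoodie.deltastreamer.s3.source.queue.region=us-west-2 \
  --target-base-path /base/path/of/s3/event/metadata/table \
  --target-table s3_meta_table
```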

### GCS Events

Google Cloud Storage (GCS) service provides an event notification mechanism which will post notifications when certain
events happen in your GCS bucket. You can read more
at [Pub/Sub Notifications](https://cloud.google.com/storage/docs/pubsub-notifications/). GCS will put these events in a
Cloud Pub/Sub topic. Apache Hudi provides a `GcsEventsSource` that can read from Cloud Pub/Sub to trigger processing of
new or changed data as soon as it is available on GCS. The architecture is very similar to that
of [S3 events sources](https://hudi.apache.org/blog/2021/08/23/s3-events-source).

#### Setup

1. Configure Cloud Storage Pub/Sub notifications for the bucket. Follow Google’s
documentation [here](https://cloud.google.com/storage/docs/reporting-changes).
2. Create a Pub/Sub subscription corresponding to the topic.
3. Note the GCP project ID and the Pub/Sub subscription ID, and use them for the following Hudi configurations:
   1. `hoodie.deltastreamer.source.gcs.project.id=GCP_PROJECT_ID`
   2. `hoodie.deltastreamer.source.gcs.subscription.id=SUBSCRIPTION_ID`
4. Start the `GcsEventsSource` using the `HoodieDeltaStreamer` utility with the `--source-class` parameter set to
`org.apache.hudi.utilities.sources.GcsEventsSource` and `hoodie.deltastreamer.source.cloud.meta.ack=true`, along with
the path-related configs described in the detailed guide mentioned above.
5. Start the `GcsEventsHoodieIncrSource` using the `HoodieDeltaStreamer` utility with the `--source-class` parameter set to
`org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource` and the other parameters mentioned in the detailed guide
above.

Below are sample `spark-submit` commands for the two sources. Note that apart from hudi-utilities-bundle, you also need
to provide [hudi-gcp-bundle](https://mvnrepository.com/artifact/org.apache.hudi/hudi-gcp-bundle) for GCP Pub/Sub and GCS
connector dependencies.

<details>
<summary>GcsEventsSource</summary>
<p>

```bash
spark-submit \
--jars "/path/to/hudi-utilities-bundle_2.12-0.13.0.jar,/path/to/hudi-gcp-bundle-0.13.0.jar" \
--driver-memory 4g \
--executor-memory 4g \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/path/to/hudi-utilities-bundle_2.12-0.13.0.jar \
--source-class org.apache.hudi.utilities.sources.GcsEventsSource \
--op INSERT \
--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--source-ordering-field timeCreated \
--hoodie-conf hoodie.index.type=GLOBAL_BLOOM \
--filter-dupes \
--allow-commit-on-no-checkpoint-change \
--hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
--hoodie-conf hoodie.combine.before.insert=true \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf hoodie.datasource.write.partitionpath.field=bucket \
--hoodie-conf hoodie.deltastreamer.source.gcs.project.id=GCP_PROJECT_ID \
--hoodie-conf hoodie.deltastreamer.source.gcs.subscription.id=SUBSCRIPTION_ID \
--hoodie-conf hoodie.deltastreamer.source.cloud.meta.ack=true \
--target-base-path /base/path/of/event/metadata/table \
--target-table gcs_meta_hive \
--continuous \
--source-limit 100 \
--min-sync-interval-seconds 100
```

</p>
</details>

<details>
<summary>GcsEventsHoodieIncrSource</summary>
<p>

```bash
spark-submit \
--jars "/path/to/hudi-utilities-bundle_2.12-0.13.0.jar,/path/to/hudi-gcp-bundle-0.13.0.jar" \
--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
--conf spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS \
--driver-memory 4g \
--executor-memory 4g \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/path/to/hudi-utilities-bundle_2.12-0.13.0.jar \
--source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
--op INSERT \
--hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format=parquet \
--hoodie-conf hoodie.deltastreamer.source.cloud.data.select.file.extension=jsonl \
--hoodie-conf hoodie.deltastreamer.source.cloud.data.datafile.format=json \
--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--hoodie-conf hoodie.datasource.write.partitionpath.field= \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=/base/path/of/event/metadata/table \
--hoodie-conf hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
--hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
--hoodie-conf hoodie.combine.before.insert=true \
--filter-dupes \
--source-ordering-field id \
--target-base-path /base/path/of/data/table \
--target-table gcs_data_hive \
--continuous \
--source-limit 100 \
--min-sync-interval-seconds 60
```

</p>
</details>

### JDBC Source
Hudi can read from a JDBC source with a full fetch of a table, or it can read incrementally with checkpointing from the same source.

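As a rough, hedged sketch (not taken from an official JdbcSource reference), an incremental JDBC ingestion command
could look like the following; the `hoodie.deltastreamer.jdbc.*` keys, the Postgres URL/driver, table and column names,
and credentials are all assumptions to verify against the JdbcSource configuration docs for your Hudi version.

```bash
# Sketch only: incrementally pull rows from a JDBC table, using updated_at as the checkpoint column.
spark-submit \
  --jars "/path/to/postgresql-driver.jar" \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hudi-utilities-bundle_2.12-0.13.0.jar \
  --source-class org.apache.hudi.utilities.sources.JdbcSource \
  --source-ordering-field updated_at \
  --target-base-path file:///data/tables/orders \
  --target-table orders \
  --hoodie-conf hoodie.datasource.write.recordkey.field=order_id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=order_date \
  --hoodie-conf hoodie.deltastreamer.jdbc.url=jdbc:postgresql://localhost:5432/shop \
  --hoodie-conf hoodie.deltastreamer.jdbc.driver.class=org.postgresql.Driver \
  --hoodie-conf hoodie.deltastreamer.jdbc.user=hudi \
  --hoodie-conf hoodie.deltastreamer.jdbc.password=hudi \
  --hoodie-conf hoodie.deltastreamer.jdbc.table.name=public.orders \
  --hoodie-conf hoodie.deltastreamer.jdbc.incr.pull=true \
  --hoodie-conf hoodie.deltastreamer.jdbc.table.incr.column.name=updated_at
```

With incremental pull disabled, the source would instead do a full fetch of the table on each run, as described at the
top of this section.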