Partition level exactly-once semantic for writing

Background
In the Spark and Hadoop ecosystem, we usually assume that every single node is unreliable and treat partial node failures as the norm, but this assumption does not hold when writing data to a third-party database that supports neither distributed transactions nor a deduplication mechanism.
To achieve exactly-once semantics for writing, we currently have to disable the Spark configurations for task retry and speculative execution to avoid data duplication while writing.
In a preemption-enabled YARN cluster, a single killed container would then cause the whole ClickHouse writing job to fail.
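A minimal sketch of such a setup, assuming the configurations in question are Spark's standard task-retry and speculation settings (the application name is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("clickhouse-exactly-once-write") // illustrative name
  // Fail the job on the first task failure instead of re-running the task,
  // so a partition is never written twice by a retried attempt.
  .config("spark.task.maxFailures", "1")
  // Disable speculative execution, so a slow task never gets a duplicate
  // speculative attempt that writes the same partition again.
  .config("spark.speculation", "false")
  .getOrCreate()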
Proposal
Briefly, since ClickHouse supports MOVE PARTITION TO TABLE, we can write data to temporary tables instead of writing to the target table directly, and move the data to the target table only if all partition writes succeed.
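For reference, the statement involved is ClickHouse's ALTER TABLE ... MOVE PARTITION ... TO TABLE ...; a minimal sketch of how the connector could build it, with table and partition identifiers as placeholders:

// Builds the ClickHouse statement that moves one partition of a temporary
// table into the target table.
def movePartitionSql(tmpTable: String, partition: String, targetTable: String): String =
  s"ALTER TABLE $tmpTable MOVE PARTITION $partition TO TABLE $targetTable"

// movePartitionSql("db.tgt_tmp_0_0", "'2022-06-20'", "db.tgt")
//   => ALTER TABLE db.tgt_tmp_0_0 MOVE PARTITION '2022-06-20' TO TABLE db.tgt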
Because of the retry and speculation mechanisms, one partition of a DataFrame/RDD may be written multiple times by different tasks. A successful stage means each partition of the DataFrame/RDD has been written successfully at least once, and the driver eventually knows that, so it can pick one successful attempt per partition and move its data to the target table.
Because a task can be identified by partition_id and attempt_id, and ClickHouse's MOVE PARTITION TO TABLE requires the following conditions:
Both tables must have the same structure.
Both tables must have the same partition key.
Both tables must be the same engine family (replicated or non-replicated).
Both tables must have the same storage policy.
we may need to create a temporary table per task using CREATE TABLE LIKE, with the name {may_another_db}.{original_table_name}_{application_id}_{stage_id}_{stage_attempt_id}_{task_id}_{task_attempt_id}.
After the stage succeeds, move the data to the target table and drop the other temporary tables.
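A minimal sketch of the naming scheme and the per-task table creation, assuming runSql is a hypothetical helper that executes a ClickHouse SQL statement; in ClickHouse, CREATE TABLE ... AS ... plays the role of CREATE TABLE LIKE here, copying the structure and (when no engine is specified) the engine of the original table:

// Temporary table name for one task attempt, following the scheme above.
def tmpTableName(db: String, table: String, appId: String,
                 stageId: Int, stageAttemptId: Int,
                 taskId: Long, taskAttemptId: Int): String =
  s"$db.${table}_${appId}_${stageId}_${stageAttemptId}_${taskId}_${taskAttemptId}"

// Executed by each writer task before it starts writing its partition.
def createTaskTable(runSql: String => Unit, tmpTable: String, targetTable: String): Unit =
  runSql(s"CREATE TABLE IF NOT EXISTS $tmpTable AS $targetTable")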
The Spark DataSource V2 API StagedTable is helpful for implementation.
public interface StagedTable extends Table {
/**
* Finalize the creation or replacement of this table.
*/
void commitStagedChanges();
/**
* Abort the changes that were staged, both in metadata and from temporary outputs of this
* table's writers.
*/
void abortStagedChanges();
}
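A hedged sketch (not the connector's actual implementation) of how the two hooks could map onto this proposal: commit moves the chosen attempts' partitions into the target table and drops all temporary tables, while abort only drops them. runSql, chosenTmpTables, and allTmpTables are hypothetical names.

import java.util
import org.apache.spark.sql.connector.catalog.{StagedTable, TableCapability}
import org.apache.spark.sql.types.StructType

class ClickHouseStagedWrite(
    target: String,
    writeSchema: StructType,
    runSql: String => Unit,
    chosenTmpTables: Map[String, Seq[String]], // chosen tmp table -> its partitions
    allTmpTables: Seq[String]) extends StagedTable {

  override def name(): String = target
  override def schema(): StructType = writeSchema
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_WRITE)

  // Driver side, after every partition has at least one successful attempt:
  // move the chosen data into the target table, then drop all temporary tables.
  override def commitStagedChanges(): Unit = {
    for ((tmp, partitions) <- chosenTmpTables; p <- partitions)
      runSql(s"ALTER TABLE $tmp MOVE PARTITION $p TO TABLE $target")
    allTmpTables.foreach(t => runSql(s"DROP TABLE IF EXISTS $t"))
  }

  // Driver side, when the write job fails: discard all staged data.
  override def abortStagedChanges(): Unit =
    allTmpTables.foreach(t => runSql(s"DROP TABLE IF EXISTS $t"))
}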
For a Distributed table that is sharded by rand(), we may need some strategy on the Spark side, e.g. calculating hash(*) for every row as the sharding key, to avoid writing the same data into different shards on task retry.
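A hedged sketch of that Spark-side strategy: derive a deterministic per-row shard index from all columns (a stand-in for hash(*)), so a retried task routes each row to the same shard as the original attempt; the column name and how the connector consumes it are assumptions:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Adds a deterministic shard index computed from every column of the row,
// so re-executed tasks route identical rows to identical shards.
def withShardKey(df: DataFrame, shardNum: Int): DataFrame =
  df.withColumn("__shard_key", pmod(hash(df.columns.map(col): _*), lit(shardNum)))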
For a Replicated table, the CREATE TABLE LIKE should also use a different ZooKeeper path for the temporary table.
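A minimal sketch of deriving such a path from the temporary table name; the path layout is illustrative, and the engine clause of the temporary table would have to be rewritten accordingly (e.g. ENGINE = ReplicatedMergeTree('<path>', '{replica}')):

// Derives a ZooKeeper path unique to the temporary table, so it does not
// collide with the target table's replication metadata. Layout is illustrative.
def tmpZkPath(cluster: String, tmpTable: String): String =
  s"/clickhouse/tables/$cluster/{shard}/${tmpTable.replace('.', '_')}"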
Limitations
The MOVE PARTITION operations issued in the commit phase for the tables generated by successful tasks are not atomic as a whole, so we may still get dirty data if something goes wrong during that phase, especially for Replicated tables.
Alternatives
Use ReplacingMergeTree instead of MergeTree
It was my first choice back when I began learning ClickHouse. It is a good fit for off-line batch loading into ClickHouse, especially when temporary data duplication is acceptable, because the deduplication happens asynchronously in the background or is triggered manually by OPTIMIZE ... SYNC. Furthermore, the deduplication can only handle data within the same instance and the same partition, which means it does not work well for a Distributed table that is sharded by rand().