@@ -156,123 +156,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(snapshotDF1.count() - inputDF2.count(), snapshotDF2.count())
   }

-
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testCopyOnWriteStorage(isMetadataEnabled: Boolean) {
-    // Insert Operation
-    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
-    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-    inputDF1.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
-    val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-
-    // Snapshot query
-    val snapshotDF1 = spark.read.format("org.apache.hudi")
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
-      .load(basePath + "/*/*/*")
-    assertEquals(100, snapshotDF1.count())
-
-    // Upsert based on the written table with Hudi metadata columns
-    val verificationRowKey = snapshotDF1.limit(1).select("_row_key").first.getString(0)
-    val updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
-
-    updateDf.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
-      .mode(SaveMode.Append)
-      .save(basePath)
-    val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-
-    val snapshotDF2 = spark.read.format("hudi")
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
-      .load(basePath + "/*/*/*")
-    assertEquals(100, snapshotDF2.count())
-    assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
-
-    // Upsert Operation without Hudi metadata columns
-    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
-    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
-    val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
-
-    inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
-      .mode(SaveMode.Append)
-      .save(basePath)
-
-    val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
-
-    // Snapshot Query
-    val snapshotDF3 = spark.read.format("org.apache.hudi")
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
-      .load(basePath + "/*/*/*")
-    assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
-
-    // Read Incremental Query
-    // we have 2 commits, try pulling the first commit (which is not the latest)
-    val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0)
-    val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit)
-      .load(basePath)
-    assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled
-    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(firstCommit, countsPerCommit(0).get(0))
-
-    // Test incremental query has no instant in range
-    val emptyIncDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME.key, "001")
-      .load(basePath)
-    assertEquals(0, emptyIncDF.count())
-
-    // Upsert an empty dataFrame
-    val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList
-    val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
-    emptyDF.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
-      .mode(SaveMode.Append)
-      .save(basePath)
-
-    // pull the latest commit
-    val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
-      .load(basePath)
-
-    assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
-    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime3, countsPerCommit(0).get(0))
-
-    // pull the latest commit within certain partitions
-    val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB.key, "/2016/*/*/*")
-      .load(basePath)
-    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
-
-    val timeTravelDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit)
-      .load(basePath)
-    assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
-  }
-
   @Test def testOverWriteModeUseReplaceAction(): Unit = {
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))