Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public class DeltaLakeMetadata
private final boolean hideNonDeltaLakeTables;
private final boolean unsafeWritesEnabled;
private final JsonCodec<DataFileInfo> dataFileInfoCodec;
private final JsonCodec<DeltaLakeUpdateResult> deleteResultJsonCodec;
private final JsonCodec<DeltaLakeUpdateResult> updateResultJsonCodec;
private final TransactionLogWriterFactory transactionLogWriterFactory;
private final String nodeVersion;
private final String nodeId;
Expand All @@ -247,7 +247,7 @@ public DeltaLakeMetadata(
boolean hideNonDeltaLakeTables,
boolean unsafeWritesEnabled,
JsonCodec<DataFileInfo> dataFileInfoCodec,
JsonCodec<DeltaLakeUpdateResult> deleteResultJsonCodec,
JsonCodec<DeltaLakeUpdateResult> updateResultJsonCodec,
TransactionLogWriterFactory transactionLogWriterFactory,
NodeManager nodeManager,
CheckpointWriterManager checkpointWriterManager,
Expand All @@ -263,7 +263,7 @@ public DeltaLakeMetadata(
this.hideNonDeltaLakeTables = hideNonDeltaLakeTables;
this.unsafeWritesEnabled = unsafeWritesEnabled;
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
this.deleteResultJsonCodec = requireNonNull(deleteResultJsonCodec, "deleteResultJsonCodec is null");
this.updateResultJsonCodec = requireNonNull(updateResultJsonCodec, "updateResultJsonCodec is null");
this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
this.nodeVersion = nodeManager.getCurrentNode().getVersion();
this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
Expand Down Expand Up @@ -1369,7 +1369,7 @@ private void finishWrite(ConnectorSession session, ConnectorTableHandle tableHan

List<DeltaLakeUpdateResult> updateResults = fragments.stream()
.map(Slice::getBytes)
.map(deleteResultJsonCodec::fromJson)
.map(updateResultJsonCodec::fromJson)
.collect(toImmutableList());

String tableLocation = metastore.getTableLocation(handle.getSchemaTableName(), session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class DeltaLakeMetadataFactory
private final TransactionLogAccess transactionLogAccess;
private final TypeManager typeManager;
private final JsonCodec<DataFileInfo> dataFileInfoCodec;
private final JsonCodec<DeltaLakeUpdateResult> deleteResultJsonCodec;
private final JsonCodec<DeltaLakeUpdateResult> updateResultJsonCodec;
private final TransactionLogWriterFactory transactionLogWriterFactory;
private final NodeManager nodeManager;
private final CheckpointWriterManager checkpointWriterManager;
Expand All @@ -63,7 +63,7 @@ public DeltaLakeMetadataFactory(
DeltaLakeConfig deltaLakeConfig,
@HideNonDeltaLakeTables boolean hideNonDeltaLakeTables,
JsonCodec<DataFileInfo> dataFileInfoCodec,
JsonCodec<DeltaLakeUpdateResult> deleteResultJsonCodec,
JsonCodec<DeltaLakeUpdateResult> updateResultJsonCodec,
TransactionLogWriterFactory transactionLogWriterFactory,
NodeManager nodeManager,
CheckpointWriterManager checkpointWriterManager,
Expand All @@ -75,7 +75,7 @@ public DeltaLakeMetadataFactory(
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
this.deleteResultJsonCodec = requireNonNull(deleteResultJsonCodec, "deleteResultJsonCodec is null");
this.updateResultJsonCodec = requireNonNull(updateResultJsonCodec, "updateResultJsonCodec is null");
this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.checkpointWriterManager = requireNonNull(checkpointWriterManager, "checkpointWriterManager is null");
Expand Down Expand Up @@ -109,7 +109,7 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
hideNonDeltaLakeTables,
unsafeWritesEnabled,
dataFileInfoCodec,
deleteResultJsonCodec,
updateResultJsonCodec,
transactionLogWriterFactory,
nodeManager,
checkpointWriterManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public void setup(Binder binder)
Multibinder<SystemTableProvider> systemTableProviders = newSetBinder(binder, SystemTableProvider.class);
systemTableProviders.addBinding().to(PropertiesSystemTableProvider.class).in(Scopes.SINGLETON);

binder.bind(DeltaLakeSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(DeltaLakeTableProperties.class).in(Scopes.SINGLETON);
binder.bind(DeltaLakeAnalyzeProperties.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ public DeltaLakePageSinkProvider(
HdfsEnvironment hdfsEnvironment,
JsonCodec<DataFileInfo> dataFileInfoCodec,
DeltaLakeWriterStats stats,
DeltaLakeConfig deltalakeConfig,
DeltaLakeConfig deltaLakeConfig,
TypeManager typeManager,
NodeVersion nodeVersion)
{
this.pageIndexerFactory = pageIndexerFactory;
this.hdfsEnvironment = hdfsEnvironment;
this.dataFileInfoCodec = dataFileInfoCodec;
this.stats = stats;
this.maxPartitionsPerWriter = deltalakeConfig.getMaxPartitionsPerWriter();
this.maxPartitionsPerWriter = deltaLakeConfig.getMaxPartitionsPerWriter();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ public TransactionLogAccess(
FileFormatDataSourceStats fileFormatDataSourceStats,
HdfsEnvironment hdfsEnvironment,
ParquetReaderConfig parquetReaderConfig,
DeltaLakeConfig deltalakeConfig)
DeltaLakeConfig deltaLakeConfig)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.checkpointSchemaManager = requireNonNull(checkpointSchemaManager, "checkpointSchemaManager is null");
this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.parquetReaderOptions = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
requireNonNull(deltalakeConfig, "deltalakeConfig is null");
this.checkpointRowStatisticsWritingEnabled = deltalakeConfig.isCheckpointRowStatisticsWritingEnabled();
requireNonNull(deltaLakeConfig, "deltaLakeConfig is null");
this.checkpointRowStatisticsWritingEnabled = deltaLakeConfig.isCheckpointRowStatisticsWritingEnabled();

tableSnapshots = EvictableCacheBuilder.newBuilder()
.expireAfterWrite(config.getMetadataCacheTtl().toMillis(), TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ public void testInitialSplits()
{
long fileSize = 20_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
DeltaLakeConfig deltalakeConfig = new DeltaLakeConfig()
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(1000)
.setMaxInitialSplitSize(DataSize.ofBytes(5_000));

DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltalakeConfig);
List<DeltaLakeSplit> splits = getSplits(splitManager, deltalakeConfig);
DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 5_000, fileSize),
Expand All @@ -107,13 +107,13 @@ public void testNonInitialSplits()
{
long fileSize = 50_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
DeltaLakeConfig deltalakeConfig = new DeltaLakeConfig()
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(5)
.setMaxInitialSplitSize(DataSize.ofBytes(5_000))
.setMaxSplitSize(DataSize.ofBytes(20_000));

DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltalakeConfig);
List<DeltaLakeSplit> splits = getSplits(splitManager, deltalakeConfig);
DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 5_000, fileSize),
Expand All @@ -134,14 +134,14 @@ public void testSplitsFromMultipleFiles()
long firstFileSize = 1_000;
long secondFileSize = 20_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(firstFileSize), addFileEntryOfSize(secondFileSize));
DeltaLakeConfig deltalakeConfig = new DeltaLakeConfig()
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(3)
.setMaxInitialSplitSize(DataSize.ofBytes(2_000))
.setMaxSplitSize(DataSize.ofBytes(10_000));

DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltalakeConfig);
DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);

List<DeltaLakeSplit> splits = getSplits(splitManager, deltalakeConfig);
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 1_000, firstFileSize),
makeSplit(0, 2_000, secondFileSize),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,13 @@ public static Connector createConnector(String catalogName, Map<String, String>
return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty());
}

public static Connector createConnector(String catalogName, Map<String, String> config, ConnectorContext context, Module module, Optional<HiveMetastore> metastore, Optional<CachingDirectoryLister> cachingDirectoryLister)
public static Connector createConnector(
String catalogName,
Map<String, String> config,
ConnectorContext context,
Module module,
Optional<HiveMetastore> metastore,
Optional<CachingDirectoryLister> cachingDirectoryLister)
{
requireNonNull(config, "config is null");

Expand Down