Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6853bab
feat: Add existing parquet files
jonathanc-n Feb 9, 2025
c574c5f
Merge branch 'main' into add-existing-files
jonathanc-n Feb 9, 2025
3f258ad
clippy fix
jonathanc-n Feb 9, 2025
39ce23d
Merge branch 'add-existing-files' of https://github.com/jonathanc-n/i…
jonathanc-n Feb 9, 2025
dd4abb7
Merge branch 'main' into add-existing-files
jonathanc-n Feb 10, 2025
65abfc3
Merge branch 'main' into add-existing-files
jonathanc-n Feb 12, 2025
4cb5b1e
change data file builder
jonathanc-n Feb 12, 2025
909d098
Merge branch 'add-existing-files' of https://github.com/jonathanc-n/i…
jonathanc-n Feb 12, 2025
e1dd355
fmt fix
jonathanc-n Feb 12, 2025
8756a71
clippy fix
jonathanc-n Feb 12, 2025
afbc642
Merge branch 'main' into add-existing-files
jonathanc-n Feb 12, 2025
a9c6b94
switch to unpartitioned
jonathanc-n Feb 13, 2025
e01ead8
code organization fixes
jonathanc-n Feb 18, 2025
96aabfe
Merge branch 'main' into add-existing-files
jonathanc-n Feb 18, 2025
b38496e
Update crates/iceberg/src/transaction.rs
jonathanc-n Feb 18, 2025
267124d
Merge branch 'main' into add-existing-files
jonathanc-n Feb 18, 2025
33cac26
Merge branch 'main' into add-existing-files
jonathanc-n Feb 21, 2025
86ea8eb
fixes
jonathanc-n Feb 21, 2025
c0cfb56
clippy
jonathanc-n Feb 21, 2025
175d4ce
Merge branch 'main' into add-existing-files
jonathanc-n Feb 24, 2025
0c7caaa
Merge branch 'main' into add-existing-files
jonathanc-n Feb 25, 2025
6f0bc0b
fixes
jonathanc-n Feb 28, 2025
d6cf198
Update crates/iceberg/src/writer/file_writer/parquet_writer.rs
jonathanc-n Mar 3, 2025
2665ebc
Update crates/iceberg/src/writer/file_writer/parquet_writer.rs
jonathanc-n Mar 3, 2025
d90871a
Update crates/iceberg/src/transaction.rs
jonathanc-n Mar 3, 2025
0b5d78c
Merge branch 'main' into add-existing-files
jonathanc-n Mar 3, 2025
43a6c85
clippy fix
jonathanc-n Mar 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,14 +1129,14 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
struct ArrowFileReader<R: FileRead> {
pub struct ArrowFileReader<R: FileRead> {
meta: FileMetadata,
r: R,
}

impl<R: FileRead> ArrowFileReader<R> {
/// Create a new ArrowFileReader
fn new(meta: FileMetadata, r: R) -> Self {
pub fn new(meta: FileMetadata, r: R) -> Self {
Self { meta, r }
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ pub mod tests {
use crate::TableIdent;

pub struct TableTestFixture {
table_location: String,
pub table_location: String,
pub table: Table,
}

Expand Down
23 changes: 23 additions & 0 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::ops::Index;
use std::sync::{Arc, OnceLock};

use ::serde::de::{MapAccess, Visitor};
use ordered_float::OrderedFloat;
use serde::de::{Error, IntoDeserializer};
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -266,6 +267,28 @@ impl PrimitiveType {
| (PrimitiveType::Binary, PrimitiveLiteral::Binary(_))
)
}

/// Used to convert `PrimitiveType` to `Literal`.
pub fn type_to_literal(&self) -> Literal {
Literal::Primitive(match self {
PrimitiveType::Boolean => PrimitiveLiteral::Boolean(false),
PrimitiveType::Int => PrimitiveLiteral::Int(0),
PrimitiveType::Long => PrimitiveLiteral::Long(0),
PrimitiveType::Float => PrimitiveLiteral::Float(OrderedFloat(0.0)),
PrimitiveType::Double => PrimitiveLiteral::Double(OrderedFloat(0.0)),
PrimitiveType::Decimal { .. } => PrimitiveLiteral::Int128(0),
PrimitiveType::Date => PrimitiveLiteral::Int(0),
PrimitiveType::Time => PrimitiveLiteral::Long(0),
PrimitiveType::Timestamp => PrimitiveLiteral::Long(0),
PrimitiveType::Timestamptz => PrimitiveLiteral::Long(0),
PrimitiveType::TimestampNs => PrimitiveLiteral::Long(0),
PrimitiveType::TimestamptzNs => PrimitiveLiteral::Long(0),
PrimitiveType::String => PrimitiveLiteral::String(String::new()),
PrimitiveType::Uuid => PrimitiveLiteral::UInt128(0),
PrimitiveType::Fixed(_size) => PrimitiveLiteral::Binary(Vec::new()),
PrimitiveType::Binary => PrimitiveLiteral::Binary(Vec::new()),
})
}
}

impl Serialize for Type {
Expand Down
260 changes: 255 additions & 5 deletions crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,27 @@
//! This module contains transaction api.

use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::mem::discriminant;
use std::ops::RangeFrom;
use std::sync::Arc;

use parquet::arrow::async_reader::AsyncFileReader;
use parquet::file::metadata::ParquetMetaData;
use uuid::Uuid;

use crate::arrow::ArrowFileReader;
use crate::error::Result;
use crate::io::OutputFile;
use crate::io::{FileIO, OutputFile};
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH,
visit_schema, DataContentType, DataFile, DataFileBuilder, DataFileFormat, FormatVersion,
ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriterBuilder, NullOrder, Operation,
SchemaRef, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
Struct, StructType, Summary, TableMetadata, Transform, MAIN_BRANCH,
};
use crate::table::Table;
use crate::writer::file_writer::parquet_writer::{IndexByParquetPathName, MinMaxColAggregator};
use crate::TableUpdate::UpgradeFormatVersion;
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};

Expand Down Expand Up @@ -169,6 +175,172 @@ impl<'a> Transaction<'a> {

catalog.update_table(table_commit).await
}

/// Adds existing parquet files
pub async fn add_parquet_files(
Comment thread
jonathanc-n marked this conversation as resolved.
Outdated
self,
file_paths: Vec<String>,
check_duplicate_files: bool,
Comment thread
jonathanc-n marked this conversation as resolved.
Outdated
) -> Result<Transaction<'a>> {
if check_duplicate_files {
Comment thread
jonathanc-n marked this conversation as resolved.
Outdated
let unique_paths: HashSet<_> = file_paths.iter().collect();
if unique_paths.len() != file_paths.len() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Duplicate file paths provided",
));
}
}
let table_metadata = self.table.metadata();

let data_files = Transaction::parquet_files_to_data_files(
&self,
self.table.file_io(),
file_paths,
table_metadata,
)
.await?;

let mut fast_append_action = self.fast_append(Some(Uuid::new_v4()), Vec::new())?;
fast_append_action.add_data_files(data_files)?;

fast_append_action.apply().await
}

async fn parquet_files_to_data_files(
Comment thread
jonathanc-n marked this conversation as resolved.
Outdated
&self,
file_io: &FileIO,
file_paths: Vec<String>,
table_metadata: &TableMetadata,
) -> Result<Vec<DataFile>> {
let mut data_files: Vec<DataFile> = Vec::new();
let partition_value =
self.create_default_partition_value(&table_metadata.default_partition_type)?;

for file_path in file_paths {
let input_file = file_io.new_input(&file_path)?;
if !input_file.exists().await? {
return Err(Error::new(
ErrorKind::DataInvalid,
"File does not exist".to_string(),
));
}
Comment thread
jonathanc-n marked this conversation as resolved.
Outdated
let file_metadata = input_file.metadata().await?;
let file_size_in_bytes = file_metadata.size as usize;
let reader = input_file.reader().await?;

let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
let parquet_metadata = parquet_reader.get_metadata().await.map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
format!("Error reading Parquet metadata: {}", err),
)
})?;
let builder = self.parquet_to_data_file_builder(
table_metadata.current_schema().clone(),
parquet_metadata,
&partition_value,
file_size_in_bytes,
file_path,
)?;
let data_file = builder.build().unwrap();
data_files.push(data_file);
}
Ok(data_files)
}

/// `ParquetMetadata` to data file builder
pub fn parquet_to_data_file_builder(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some suggestion for this method:

  1. It has a lot of duplicated with parquet file writer, we should reuse them.
  2. This method should not be part of Transaction, it should be parquet module.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think i mentioned the problem earlier with reusing the function here. Writer returns raw metadata which is what the original ParquetWriter::to_data_file_builder. In this case we are doing a read with the arrowfilereader which can only return the parsed metadata. This can be fixed if there is some conversion function to convert raw to parsed but I haven't seemed to be able to find one 🤔

&self,
schema: SchemaRef,
metadata: Arc<ParquetMetaData>,
partition: &Struct,
written_size: usize,
file_path: String,
) -> Result<DataFileBuilder> {
let index_by_parquet_path = {
let mut visitor = IndexByParquetPathName::new();
visit_schema(&schema, &mut visitor)?;
visitor
};

let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
let mut per_col_size: HashMap<i32, u64> = HashMap::new();
let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
let mut min_max_agg = MinMaxColAggregator::new(schema);

for row_group in metadata.row_groups() {
for column_chunk_metadata in row_group.columns() {
let parquet_path = column_chunk_metadata.column_descr().path().string();

let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
continue;
};

*per_col_size.entry(field_id).or_insert(0) +=
column_chunk_metadata.compressed_size() as u64;
*per_col_val_num.entry(field_id).or_insert(0) +=
column_chunk_metadata.num_values() as u64;

if let Some(statistics) = column_chunk_metadata.statistics() {
if let Some(null_count) = statistics.null_count_opt() {
*per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
}

min_max_agg.update(field_id, statistics.clone())?;
}
}
}
(
per_col_size,
per_col_val_num,
per_col_null_val_num,
min_max_agg.produce(),
)
};

let mut builder = DataFileBuilder::default();
builder
.content(DataContentType::Data)
.file_path(file_path)
.file_format(DataFileFormat::Parquet)
.partition(partition.clone())
.record_count(metadata.file_metadata().num_rows() as u64)
.file_size_in_bytes(written_size as u64)
.column_sizes(column_sizes)
.value_counts(value_counts)
.null_value_counts(null_value_counts)
.lower_bounds(lower_bounds)
.upper_bounds(upper_bounds)
.split_offsets(
metadata
.row_groups()
.iter()
.filter_map(|group| group.file_offset())
.collect(),
);

Ok(builder)
}

fn create_default_partition_value(&self, partition_type: &StructType) -> Result<Struct> {
let literals = partition_type
.fields()
.iter()
.map(|field| {
let primitive_type = field.field_type.as_primitive_type().ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"Partition field should only be a primitive type.",
)
})?;
Ok(Some(primitive_type.type_to_literal()))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the type_to_literal function, I think this is correct. We don't want to set default values in the partition spec. I think there are two options:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this makes sense, we can go with first option and add a todo on the check if partition exist.

})
.collect::<Result<Vec<_>>>()?;

Ok(Struct::from_iter(literals))
}
}

/// FastAppendAction is a transaction action for fast append data files to the table.
Expand Down Expand Up @@ -607,6 +779,7 @@ mod tests {
use std::io::BufReader;

use crate::io::FileIOBuilder;
use crate::scan::tests::TableTestFixture;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct,
TableMetadata,
Expand Down Expand Up @@ -847,6 +1020,7 @@ mod tests {
.sequence_number()
.expect("Inherit sequence number by load manifest")
);

assert_eq!(
new_snapshot.snapshot_id(),
manifest.entries()[0].snapshot_id().unwrap()
Expand All @@ -869,4 +1043,80 @@ mod tests {
"Should not allow to do same kinds update in same transaction"
);
}

#[tokio::test]
async fn test_add_existing_parquet_files() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let tx = crate::transaction::Transaction::new(&fixture.table);

let file_paths = vec![
format!("{}/1.parquet", &fixture.table_location),
format!("{}/2.parquet", &fixture.table_location),
format!("{}/3.parquet", &fixture.table_location),
];

// attempt to add the existing Parquet files with fast append
let new_tx = tx
.add_parquet_files(file_paths.clone(), true)
.await
.expect("Adding existing Parquet files should succeed");

let mut found_add_snapshot = false;
let mut found_set_snapshot_ref = false;
for update in new_tx.updates.iter() {
match update {
TableUpdate::AddSnapshot { .. } => {
found_add_snapshot = true;
}
TableUpdate::SetSnapshotRef {
ref_name,
reference,
} => {
found_set_snapshot_ref = true;
assert_eq!(ref_name, crate::transaction::MAIN_BRANCH);
assert!(reference.snapshot_id > 0);
}
_ => {}
}
}
assert!(found_add_snapshot);
assert!(found_set_snapshot_ref);

let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] {
snapshot
} else {
panic!("Expected the first update to be an AddSnapshot update");
};

let manifest_list = new_snapshot
.load_manifest_list(fixture.table.file_io(), fixture.table.metadata())
.await
.expect("Failed to load manifest list");

assert_eq!(manifest_list.entries().len(), 2);

// Load the manifest from the manifest list
let manifest = manifest_list.entries()[0]
.load_manifest(fixture.table.file_io())
.await
.expect("Failed to load manifest");

// Since we added three files with add_parquet_files, check that the manifest contains three entries
assert_eq!(manifest.entries().len(), 3);

// Verify each file path appears in manifest
let manifest_paths: Vec<String> = manifest
.entries()
.iter()
.map(|entry| entry.data_file().file_path.clone())
.collect();
for path in file_paths {
assert!(
manifest_paths.contains(&path),
"Manifest does not contain expected file path: {}",
path
);
}
}
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/writer/file_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::CurrentFileStatus;
use crate::spec::DataFileBuilder;
use crate::Result;

mod parquet_writer;
pub mod parquet_writer;
Comment thread
jonathanc-n marked this conversation as resolved.
Outdated
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
mod track_writer;

Expand Down
Loading