diff --git a/src/common/storage/src/parquet_rs.rs b/src/common/storage/src/parquet_rs.rs index 3860d91af5bd5..ec728dc06a090 100644 --- a/src/common/storage/src/parquet_rs.rs +++ b/src/common/storage/src/parquet_rs.rs @@ -93,6 +93,7 @@ pub async fn read_metadata_async( None => operator.stat(path).await?.content_length(), Some(n) => n, }; + check_footer_size(file_size)?; // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 6511019d51fc6..047b1097548bf 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -159,6 +159,7 @@ impl FuseTable { None, ) .await; + if need_to_save_statistics { let table_statistics_location: String = table_statistics_location.unwrap(); match &res { @@ -166,13 +167,10 @@ impl FuseTable { table_statistics_location, Arc::new(table_statistics.unwrap()), ), - Err(e) => { - if Self::no_side_effects_in_meta_store(e) { - let _ = operator.delete(&table_statistics_location).await; - } - } + Err(e) => info!("update_table_meta failed. {}", e), } } + res } @@ -235,30 +233,13 @@ impl FuseTable { }; // 3. let's roll - let reply = catalog.update_table_meta(table_info, req).await; - match reply { - Ok(_) => { - TableSnapshot::cache().put(snapshot_location.clone(), Arc::new(snapshot)); - // try keep a hit file of last snapshot - Self::write_last_snapshot_hint(operator, location_generator, snapshot_location) - .await; - Ok(()) - } - Err(e) => { - // commit snapshot to meta server failed. - // figure out if the un-committed snapshot is safe to be removed. - if Self::no_side_effects_in_meta_store(&e) { - // currently, only in this case (TableVersionMismatched), we are SURE about - // that the table state insides meta store has NOT been changed. - info!( - "removing uncommitted table snapshot at location {}, of table {}, {}", - snapshot_location, table_info.desc, table_info.ident - ); - let _ = operator.delete(&snapshot_location).await; - } - Err(e) - } - } + catalog.update_table_meta(table_info, req).await?; + + // update_table_meta succeed, populate the snapshot cache item and try keeping a hit file of last snapshot + TableSnapshot::cache().put(snapshot_location.clone(), Arc::new(snapshot)); + Self::write_last_snapshot_hint(operator, location_generator, snapshot_location).await; + + Ok(()) } // Left a hint file which indicates the location of the latest snapshot diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 971bcfb3185f7..78e0b3951a469 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -506,10 +506,7 @@ where F: SnapshotGenerator + Send + 'static State::AbortOperation(e) => { let duration = self.start_time.elapsed(); metrics_inc_commit_aborts(); - // todo: use histogram when it ready metrics_inc_commit_milliseconds(duration.as_millis()); - let op = self.abort_operation.clone(); - op.abort(self.ctx.clone(), self.dal.clone()).await?; error!( "transaction aborted after {} retries, which took {} ms, cause: {:?}", self.retries,