diff --git a/src/datachain/query/dataset.py b/src/datachain/query/dataset.py
index 8e5251be0..caddcbe41 100644
--- a/src/datachain/query/dataset.py
+++ b/src/datachain/query/dataset.py
@@ -1438,6 +1438,7 @@ def cleanup(self) -> None:
         # This is needed to always use a new connection with all metastore and warehouse
         # implementations, as errors may close or render unusable the existing
         # connections.
+        assert len(self.temp_table_names) == len(set(self.temp_table_names))
         with self.catalog.metastore.clone(use_new_connection=True) as metastore:
             metastore.cleanup_tables(self.temp_table_names)
         with self.catalog.warehouse.clone(use_new_connection=True) as warehouse:
@@ -1531,6 +1532,7 @@ def clone(self, new_table=True) -> "Self":
         obj.steps = obj.steps.copy()
         if new_table:
             obj.table = self.get_table()
+        obj.temp_table_names = []
         return obj
 
     @detach
@@ -1870,11 +1872,11 @@ def _add_dependencies(self, dataset: "DatasetRecord", version: str):
 
     def exec(self) -> "Self":
         """Execute the query."""
+        query = self.clone()
         try:
-            query = self.clone()
             query.apply_steps()
         finally:
-            self.cleanup()
+            query.cleanup()
         return query
 
     def save(