src/datachain/query/dataset.py (6 changes: 4 additions & 2 deletions)
@@ -1438,6 +1438,7 @@ def cleanup(self) -> None:
         # This is needed to always use a new connection with all metastore and warehouse
         # implementations, as errors may close or render unusable the existing
         # connections.
+        assert len(self.temp_table_names) == len(set(self.temp_table_names))
Contributor Author
C: trying to add this assert instead of tests (we should not be getting duplicates).

In tests we already have an additional check that no tables are left behind.

Contributor Author
cc @dreadatour not sure if it makes sense to add a specific, complicated test ... this should work better now

         with self.catalog.metastore.clone(use_new_connection=True) as metastore:
             metastore.cleanup_tables(self.temp_table_names)
         with self.catalog.warehouse.clone(use_new_connection=True) as warehouse:
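For context on the new assert, a minimal illustration (the table names below are made up, not from this PR): comparing the list length with the set length is a cheap uniqueness check, so a duplicate registration fails loudly here instead of silently asking cleanup_tables() to drop the same table twice.

names = ["tmp_a1b2", "tmp_c3d4"]
assert len(names) == len(set(names))   # passes: all names unique

names = ["tmp_a1b2", "tmp_c3d4", "tmp_a1b2"]
assert len(names) == len(set(names))   # raises AssertionError: "tmp_a1b2" appears twice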
@@ -1531,6 +1532,7 @@ def clone(self, new_table=True) -> "Self":
         obj.steps = obj.steps.copy()
         if new_table:
             obj.table = self.get_table()
+        obj.temp_table_names = []
         return obj

     @detach
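A rough sketch of the aliasing problem that obj.temp_table_names = [] avoids, assuming clone() starts from a shallow copy (which obj.steps = obj.steps.copy() in the surrounding code suggests); the class below is a simplified stand-in, not the real DatasetQuery:

import copy

class Query:
    def __init__(self):
        self.temp_table_names = []

    def clone(self):
        obj = copy.copy(self)       # shallow copy: attributes still reference the same objects
        obj.temp_table_names = []   # without this, the clone and the original share one list
        return obj

q = Query()
c = q.clone()
c.temp_table_names.append("tmp_a1b2")   # hypothetical temp table registered by the clone
print(q.temp_table_names)               # [] -- the clone's temp tables no longer leak into the original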
@@ -1870,11 +1872,11 @@ def _add_dependencies(self, dataset: "DatasetRecord", version: str):

     def exec(self) -> "Self":
         """Execute the query."""
+        query = self.clone()
         try:
-            query = self.clone()
             query.apply_steps()
         finally:
-            self.cleanup()
+            query.cleanup()
         return query

     def save(
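To spell out why the finally block now cleans up the clone (a simplified reading of the change, inferred from the diff rather than verified against the rest of the module): once clone() resets temp_table_names, any temp tables created while applying steps are registered on query only, so the old self.cleanup() would miss them; query.cleanup() drops exactly the tables this execution created, even when apply_steps() raises.

query = self.clone()        # the clone starts with its own, empty temp_table_names
try:
    query.apply_steps()     # temp tables get registered on the clone
finally:
    query.cleanup()         # previously self.cleanup(), which would not cover those tables
return query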