Bugfix/1937/do not write append ref keys when staging incompletes (#1985)

#### Reference Issues/PRs
Fixes #1937 

#### What does this implement or fix?
See the ticket for details. Being able to create the append-ref linked-list structure is still useful for testing, so that functionality has been moved to the `LibraryTool`.
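In short: staging via `write(..., parallel=True)` / `write(..., incomplete=True)` no longer writes an APPEND_REF key, while `LibraryTool.append_incomplete` still builds the linked list on demand. A minimal sketch of the new behavior (assuming an LMDB-backed library; the `_nvs` attribute used to reach the underlying `NativeVersionStore` is an implementation detail, and `/tmp/arcticdb_demo` is a hypothetical path):

```python
import pandas as pd
from arcticdb import Arctic
from arcticdb_ext.storage import KeyType

ac = Arctic("lmdb:///tmp/arcticdb_demo")  # hypothetical local library
lib = ac.get_library("demo", create_if_missing=True)
nvs = lib._nvs  # underlying NativeVersionStore (implementation detail)
lib_tool = nvs.library_tool()

df = pd.DataFrame({"col": [0]}, index=pd.DatetimeIndex([pd.Timestamp("2024-01-01")]))

# After this fix, staging only creates APPEND_DATA keys...
nvs.write("sym", df, parallel=True)
assert not len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, "sym"))

# ...while the APPEND_REF linked-list structure can still be created
# explicitly for tests via the new LibraryTool helper.
lib_tool.append_incomplete("sym", df)
assert len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, "sym")) == 1
```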
alexowens90 authored Nov 6, 2024
1 parent f24449b commit a3b7545
Showing 6 changed files with 42 additions and 17 deletions.
12 changes: 11 additions & 1 deletion python/arcticdb/toolbox/library_tool.py
@@ -187,4 +187,14 @@ def update_append_data_column_type(self, key : VariantKey, column : str, to_type
         old_df = self.read_to_dataframe(key)
         assert column in old_df.columns
         new_df = old_df.astype({column: to_type})
-        return self.overwrite_append_data_with_dataframe(key, new_df)
+        return self.overwrite_append_data_with_dataframe(key, new_df)
+
+    def append_incomplete(self, symbol: str, df: pd.DataFrame, validate_index: bool = False):
+        """
+        Appends the given dataframe to the APPEND_DATA key linked list. Useful for testing, as staging segments through
+        either the V1 or V2 API only creates APPEND_DATA keys, not the APPEND_REF key or the linked-list structure that
+        streaming data does.
+        """
+        dynamic_strings = self._nvs._resolve_dynamic_strings({})
+        _, item, norm_meta = self._nvs._try_normalize(symbol, df, None, False, dynamic_strings, None)
+        self._nvs.version_store.append_incomplete(symbol, item, norm_meta, None, validate_index)
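Note on the implementation: the helper reuses the store's own normalization path (`_nvs._try_normalize`) and passes `None` for user metadata, so the staged segment matches what the native streaming path produces. The underlying `version_store.append_incomplete` call itself is unchanged; only its Python entry point moved out of `NativeVersionStore` and into the `LibraryTool`.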
7 changes: 2 additions & 5 deletions python/arcticdb/version_store/_store.py
@@ -610,12 +610,9 @@ def write(
        )
        # TODO: allow_sparse for write_parallel / recursive normalizers as well.
        if isinstance(item, NPDDataFrame):
-            if parallel:
+            if parallel or incomplete:
                self.version_store.write_parallel(symbol, item, norm_meta, validate_index, False, None)
                return None
-            elif incomplete:
-                self.version_store.append_incomplete(symbol, item, norm_meta, udm, validate_index)
-                return None
            else:
                vit = self.version_store.write_versioned_dataframe(
                    symbol, item, norm_meta, udm, prune_previous_version, sparsify_floats, validate_index
@@ -741,7 +738,7 @@ def append(
        if isinstance(item, NPDDataFrame):
            with _diff_long_stream_descriptor_mismatch(self):
                if incomplete:
-                    self.version_store.append_incomplete(symbol, item, norm_meta, udm, validate_index)
+                    self.version_store.write_parallel(symbol, item, norm_meta, validate_index, False, None)
                else:
                    vit = self.version_store.append(
                        symbol, item, norm_meta, udm, write_if_missing, prune_previous_version, validate_index
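After this change all three Python-level staging spellings funnel into the same native call, so none of them writes an APPEND_REF key. Continuing the sketch above (same `nvs` and `lib_tool`; the loop is illustrative, not from the source):

```python
# Each staging spelling now routes to version_store.write_parallel.
for stage in (
    lambda: nvs.write("sym2", df, parallel=True),
    lambda: nvs.write("sym2", df, incomplete=True),
    lambda: nvs.append("sym2", df, incomplete=True),
):
    stage()
    assert not len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, "sym2"))
    nvs.version_store.clear()  # wipe staged keys between iterations, as the new test does
```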
3 changes: 2 additions & 1 deletion python/tests/hypothesis/arcticdb/test_append.py
@@ -77,6 +77,7 @@ def test_append_partial_read(version_store_factory, colnum, periods, rownum, col
 def test_incomplete_append_partial_read(version_store_factory, colnum, periods, rownum, cols, tsbounds, append_point):
     tz = "America/New_York"
     version_store = version_store_factory(col_per_group=colnum, row_per_segment=rownum)
+    lib_tool = version_store.library_tool()
     dtidx = pd.date_range("2019-02-06 11:43", periods=6).tz_localize(tz)
     a = np.arange(dtidx.shape[0])
     tf = TimeFrame(dtidx.values, columns_names=["a", "b", "c"], columns_values=[a, a + a, a * 10])
@@ -86,7 +87,7 @@ def test_incomplete_append_partial_read(version_store_factory, colnum, periods,
     sid = "XXX"
     version_store.write(sid, tf1)
     tf2 = tf.tsloc[c2:]
-    version_store.append(sid, tf2, incomplete=True)
+    lib_tool.append_incomplete(sid, tf2)

     dtr = (dtidx[tsbounds[0]], dtidx[tsbounds[1]])
     vit = version_store.read(sid, date_range=dtr, columns=list(cols), incomplete=True)
13 changes: 7 additions & 6 deletions python/tests/integration/toolbox/test_library_tool.py
@@ -235,6 +235,7 @@ def iterate_through_version_chain(key)

 def test_overwrite_append_data(object_and_mem_and_lmdb_version_store):
     lib = object_and_mem_and_lmdb_version_store
+    lib_tool = lib.library_tool()
     if lib._lib_cfg.lib_desc.version.encoding_version == 1:
         # TODO: Fix the timeseries descriptor packing. Currently the [incomplete_segment_from_frame] function in cpp is
         # not encoding aware so all incomplete writes are broken with v2 encoding.
@@ -252,9 +253,9 @@ def get_df(num_rows, start_index, col_type)

     # Deliberately write mismatching incomplete types
     lib.write(sym, get_df(3, 0, np.int64))
-    lib.write(sym, get_df(1, 3, np.int64), incomplete=True)
-    lib.write(sym, get_df(1, 4, str), incomplete=True)
-    lib.write(sym, get_df(1, 5, np.int64), incomplete=True)
+    lib_tool.append_incomplete(sym, get_df(1, 3, np.int64))
+    lib_tool.append_incomplete(sym, get_df(1, 4, str))
+    lib_tool.append_incomplete(sym, get_df(1, 5, np.int64))

     def read_append_data_keys_from_ref(symbol):
         append_ref = lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, symbol)[0]
@@ -321,9 +322,9 @@ def read_type(key, column):
     assert_frame_equal(lib.read(sym).data, get_df(15, 0, np.int64))

     # Also try adding new incompletes all with wrong type and see that we again can't read or compact
-    lib.write(sym, get_df(1, 15, str), incomplete=True)
-    lib.write(sym, get_df(1, 16, str), incomplete=True)
-    lib.write(sym, get_df(1, 17, str), incomplete=True)
+    lib_tool.append_incomplete(sym, get_df(1, 15, str))
+    lib_tool.append_incomplete(sym, get_df(1, 16, str))
+    lib_tool.append_incomplete(sym, get_df(1, 17, str))
     append_keys = read_append_data_keys_from_ref(sym)
     assert len(append_keys) == 3
     assert [read_type(key, "col") for key in append_keys] == [str_dtype, str_dtype, str_dtype]
3 changes: 2 additions & 1 deletion python/tests/unit/arcticdb/version_store/test_append.py
@@ -633,10 +633,11 @@ def test_append_docs_example(lmdb_version_store):
 def test_read_incomplete_no_warning(s3_store_factory, sym, get_stderr):
     pytest.skip("This test is flaky due to trying to retrieve the log messages")
     lib = s3_store_factory(dynamic_strings=True, incomplete=True)
+    lib_tool = lib.library_tool()
     symbol = sym

     write_df = pd.DataFrame({"a": [1, 2, 3]}, index=pd.DatetimeIndex([1, 2, 3]))
-    lib.append(symbol, write_df, incomplete=True)
+    lib_tool.append_incomplete(symbol, write_df)
     # Need to compact so that the APPEND_REF points to a non-existent APPEND_DATA (intentionally)
     lib.compact_incomplete(symbol, True, False, False, True)
     set_log_level("DEBUG")
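For reference, a hedged sketch of the staging-then-compaction step this test exercises, continuing the setup from the first sketch above. The meaning of the positional flags is an assumption read off the call in the test (`append=True, convert_int_to_float=False, via_iteration=False, sparsify=True`); check the `NativeVersionStore.compact_incomplete` signature before relying on it:

```python
write_df = pd.DataFrame({"a": [1, 2, 3]}, index=pd.DatetimeIndex([1, 2, 3]))
lib_tool.append_incomplete("sym3", write_df)  # stage via the linked-list path
# Fold the staged APPEND_DATA into a readable version; flag names assumed as above.
nvs.compact_incomplete("sym3", True, False, False, True)
print(nvs.read("sym3").data)
```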
21 changes: 18 additions & 3 deletions python/tests/unit/arcticdb/version_store/test_parallel.py
@@ -21,13 +21,29 @@
 )
 from arcticdb.util._versions import IS_PANDAS_TWO
 from arcticdb_ext.storage import KeyType
-import arcticdb.toolbox.library_tool


 def get_append_keys(lib, sym):
     lib_tool = lib.library_tool()
-    keys = lib_tool.find_keys_for_symbol(arcticdb.toolbox.library_tool.KeyType.APPEND_DATA, sym)
+    keys = lib_tool.find_keys_for_symbol(KeyType.APPEND_DATA, sym)
     return keys


+def test_staging_doesnt_write_append_ref(lmdb_version_store_v1):
+    lib = lmdb_version_store_v1
+    lib_tool = lib.library_tool()
+    sym = "test_staging_doesnt_write_append_ref"
+    df = pd.DataFrame({"col": [0]})
+    lib.write(sym, df, parallel=True)
+    assert not len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym))
+    lib.version_store.clear()
+    lib.write(sym, df, incomplete=True)
+    assert not len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym))
+    lib.version_store.clear()
+    lib.append(sym, df, incomplete=True)
+    assert not len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym))
+
+
 def test_remove_incomplete(basic_store):
     lib = basic_store
     lib_tool = lib.library_tool()
@@ -565,7 +581,6 @@ def test_parallel_no_column_slicing(lmdb_version_store_tiny_segment):
 def test_parallel_write_static_schema_type_changing(lmdb_version_store_tiny_segment, rows_per_incomplete, delete_staged_data_on_failure):
     lib = lmdb_version_store_tiny_segment
     sym = "test_parallel_write_static_schema_type_changing"
-    lib_tool = lib.library_tool()
     df_0 = pd.DataFrame({"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, index=pd.date_range("2024-01-01", periods=rows_per_incomplete))
     df_1 = pd.DataFrame({"col": np.arange(rows_per_incomplete, 2 * rows_per_incomplete, dtype=np.uint16)}, index=pd.date_range("2024-01-03", periods=rows_per_incomplete))
     lib.write(sym, df_0, parallel=True)
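Putting it together, a hedged sketch of the round trip the new test protects, continuing the same setup as the earlier sketches (`compact_incomplete(sym, False, False)` is assumed to mean `append=False, convert_int_to_float=False` with the remaining flags defaulted):

```python
import numpy as np
from pandas.testing import assert_frame_equal

sym = "roundtrip"
df_0 = pd.DataFrame({"col": np.arange(3, dtype=np.int64)}, index=pd.date_range("2024-01-01", periods=3))
df_1 = pd.DataFrame({"col": np.arange(3, 6, dtype=np.int64)}, index=pd.date_range("2024-01-04", periods=3))
nvs.write(sym, df_0, parallel=True)  # stage
nvs.write(sym, df_1, parallel=True)  # stage
assert not len(lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym))  # staging leaves no ref key
nvs.compact_incomplete(sym, False, False)  # finalize staged segments into a readable version
assert_frame_equal(nvs.read(sym).data, pd.concat([df_0, df_1]))
```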
