diff --git a/aerospike-stubs/aerospike.pyi b/aerospike-stubs/aerospike.pyi index 276ba9b145..01229c4b3b 100644 --- a/aerospike-stubs/aerospike.pyi +++ b/aerospike-stubs/aerospike.pyi @@ -1,4 +1,4 @@ -from typing import Any, Callable, Union, final, Literal, Optional, Final +from typing import Any, Callable, Union, final, Literal, Optional from aerospike_helpers.batch.records import BatchRecords from aerospike_helpers.metrics import MetricsPolicy @@ -463,3 +463,9 @@ def set_log_handler(callback: Callable = ...) -> None: ... def set_log_level(log_level: int) -> None: ... def set_serializer(callback: Callable) -> None: ... def unset_serializers() -> None: ... + +class PartitionStatus: + def __getitem__(self, attr): ... + +class PartitionsStatus: + def __getitem__(self, attr): ... diff --git a/src/include/conversions.h b/src/include/conversions.h index 0a54a87117..eb58305182 100644 --- a/src/include/conversions.h +++ b/src/include/conversions.h @@ -209,16 +209,11 @@ as_status convert_exp_list(AerospikeClient *self, PyObject *py_exp_list, as_status convert_partition_filter(AerospikeClient *self, PyObject *py_partition_filter, as_partition_filter *partition_filter, - as_partitions_status **ps, as_error *err); + as_error *err); as_status get_int_from_py_int(as_error *err, PyObject *py_long, int *int_pointer, const char *py_object_name); -as_status -as_partitions_status_to_pyobject(as_error *err, - const as_partitions_status *parts_status, - PyObject **py_dict); - as_status as_partition_status_to_pyobject( as_error *err, const as_partition_status *part_status, PyObject **py_tuple); diff --git a/src/include/partitions_status.h b/src/include/partitions_status.h new file mode 100644 index 0000000000..8560c2689e --- /dev/null +++ b/src/include/partitions_status.h @@ -0,0 +1,11 @@ +#include + +#include + +PyTypeObject *AerospikePartitionsStatusObject_Ready(); +PyTypeObject *AerospikePartitionStatusObject_Ready(); + +PyObject *create_py_partitions_status_object(as_error *err, + as_partitions_status *parts_all); + +extern PyTypeObject AerospikePartitionsStatusObject_Type; diff --git a/src/include/types.h b/src/include/types.h index 7f1176699c..124da69814 100644 --- a/src/include/types.h +++ b/src/include/types.h @@ -103,6 +103,14 @@ typedef struct { PyDictObject dict; } AerospikeKeyOrderedDict; +typedef struct { + PyObject_HEAD as_partitions_status *parts_all; +} AerospikePartitionsStatusObject; + +typedef struct { + PyObject_HEAD as_partition_status *part_status; +} AerospikePartitionStatusObject; + typedef struct { PyObject_HEAD /* Type-specific fields go here. */ diff --git a/src/main/aerospike.c b/src/main/aerospike.c index e0d3376a1b..785e5906b6 100644 --- a/src/main/aerospike.c +++ b/src/main/aerospike.c @@ -32,6 +32,7 @@ #include "module_functions.h" #include "nullobject.h" #include "cdt_types.h" +#include "partitions_status.h" #include "transaction.h" #include "config_provider.h" @@ -556,6 +557,8 @@ static struct type_name_to_creation_method py_module_types[] = { {"null", AerospikeNullObject_Ready}, {"CDTWildcard", AerospikeWildcardObject_Ready}, {"CDTInfinite", AerospikeInfiniteObject_Ready}, + {"PartitionsStatus", AerospikePartitionsStatusObject_Ready}, + {"PartitionStatus", AerospikePartitionStatusObject_Ready}, {"Transaction", AerospikeTransaction_Ready}, {"ConfigProvider", AerospikeConfigProvider_Ready}, }; diff --git a/src/main/conversions.c b/src/main/conversions.c index 071f78d8c6..cbadab048d 100644 --- a/src/main/conversions.c +++ b/src/main/conversions.c @@ -356,65 +356,6 @@ as_status as_partition_status_to_pyobject( return err->code; } -// creates a python dict of tuples from an as_partitions_status -// EX: {id:(id, init, done, digest, bval) for id in range (1000, 1004,1)} -// returns and empty dict if parts_status == NULL -as_status as_partitions_status_to_pyobject( - as_error *err, const as_partitions_status *parts_status, PyObject **py_dict) -{ - as_error_reset(err); - - PyObject *new_dict = PyDict_New(); - if (new_dict == NULL) { - as_error_update(err, AEROSPIKE_ERR_CLIENT, "failed to create new_dict"); - goto END; - } - - if (parts_status == NULL) { - // If parts_status is NULL return an empty dict because - // the query/scan is not tracking its partitions. - *py_dict = new_dict; - goto END; - } - - PyObject *py_done = PyBool_FromLong(parts_status->done); - PyDict_SetItemString(new_dict, PARTITIONS_STATUS_KEY_DONE, py_done); - Py_DECREF(py_done); - - PyObject *py_retry = PyBool_FromLong(parts_status->retry); - PyDict_SetItemString(new_dict, PARTITIONS_STATUS_KEY_RETRY, py_retry); - Py_DECREF(py_retry); - - for (int i = 0; i < parts_status->part_count; ++i) { - - const as_partition_status *part = &parts_status->parts[i]; - - PyObject *new_py_tuple = NULL; - if (as_partition_status_to_pyobject(err, part, &new_py_tuple) != - AEROSPIKE_OK) { - Py_DECREF(new_dict); - goto END; - } - - PyObject *py_id = PyLong_FromUnsignedLong((unsigned long)part->part_id); - - if (PyDict_SetItem(new_dict, py_id, new_py_tuple) != 0) { - as_error_update(err, AEROSPIKE_ERR_CLIENT, - "failed set item in new_dict"); - Py_DECREF(new_dict); - Py_DECREF(new_py_tuple); - Py_XDECREF(py_id); - goto END; - } - Py_DECREF(py_id); - } - - *py_dict = new_dict; - -END: - return err->code; -} - as_status as_user_info_to_pyobject(as_error *err, as_user *user, PyObject **py_as_user) { diff --git a/src/main/convert_partition_filter.c b/src/main/convert_partition_filter.c index a5ff85f8f6..17d2d9447d 100644 --- a/src/main/convert_partition_filter.c +++ b/src/main/convert_partition_filter.c @@ -24,37 +24,7 @@ #include "client.h" #include "conversions.h" - -as_partitions_status *parts_setup(uint16_t part_begin, uint16_t part_count, - const as_digest *digest) -{ - as_partitions_status *parts_all = - cf_malloc(sizeof(as_partitions_status) + - (sizeof(as_partition_status) * part_count)); - - memset(parts_all, 0, - sizeof(as_partitions_status) + - (sizeof(as_partition_status) * part_count)); - parts_all->ref_count = 1; - parts_all->part_begin = part_begin; - parts_all->part_count = part_count; - parts_all->done = false; - parts_all->retry = true; - - for (uint16_t i = 0; i < part_count; i++) { - as_partition_status *ps = &parts_all->parts[i]; - ps->part_id = part_begin + i; - ps->retry = true; - ps->digest.init = false; - ps->bval = 0; - } - - if (digest && digest->init) { - parts_all->parts[0].digest = *digest; - } - - return parts_all; -} +#include "partitions_status.h" /* * convert_partition_filter @@ -64,19 +34,15 @@ as_partitions_status *parts_setup(uint16_t part_begin, uint16_t part_count, */ as_status convert_partition_filter(AerospikeClient *self, PyObject *py_partition_filter, - as_partition_filter *filter, - as_partitions_status **pss, as_error *err) + as_partition_filter *filter, as_error *err) { - as_partitions_status *parts_all = NULL; - as_partition_status *ps = NULL; - // TODO what if py_partition_filter is NULL? if (!PyDict_Check(py_partition_filter)) { as_error_update( err, AEROSPIKE_ERR_PARAM, "invalid partition_filter policy, partition_filter must be a dict"); - goto ERROR_CLEANUP; + goto EXIT; } PyObject *begin = PyDict_GetItemString(py_partition_filter, "begin"); @@ -85,11 +51,16 @@ as_status convert_partition_filter(AerospikeClient *self, PyObject *parts_stat = PyDict_GetItemString(py_partition_filter, "partition_status"); - if (parts_stat && !PyDict_Check(parts_stat)) { - as_error_update( - err, AEROSPIKE_ERR_PARAM, - "invalid partition_filter policy, partition_status must be a dict"); - goto ERROR_CLEANUP; + if (parts_stat == Py_None) { + parts_stat = NULL; + } + + if (parts_stat && !PyObject_TypeCheck( + parts_stat, &AerospikePartitionsStatusObject_Type)) { + as_error_update(err, AEROSPIKE_ERR_PARAM, + "invalid partition_filter policy, partition_status " + "must be of type aerospike.PartitionsStatus"); + goto EXIT; } long tmp_begin = 0; @@ -101,15 +72,15 @@ as_status convert_partition_filter(AerospikeClient *self, "invalid partition_filter policy begin, begin must \ be an int between 0 and %d inclusive", CLUSTER_NPARTITIONS - 1); - goto ERROR_CLEANUP; + goto EXIT; } if (PyErr_Occurred() && PyErr_ExceptionMatches(PyExc_OverflowError)) { as_error_update(err, AEROSPIKE_ERR_PARAM, "invalid begin for partition id: %d, \ begin must fit in long", - ps->part_id); - goto ERROR_CLEANUP; + tmp_begin); + goto EXIT; } if (tmp_begin >= CLUSTER_NPARTITIONS || tmp_begin < 0) { @@ -117,7 +88,7 @@ as_status convert_partition_filter(AerospikeClient *self, "invalid partition_filter policy begin, begin must \ be an int between 0 and %d inclusive", CLUSTER_NPARTITIONS - 1); - goto ERROR_CLEANUP; + goto EXIT; } filter->begin = tmp_begin; @@ -131,15 +102,15 @@ as_status convert_partition_filter(AerospikeClient *self, "invalid partition_filter policy count, count must \ be an int between 1 and %d inclusive", CLUSTER_NPARTITIONS); - goto ERROR_CLEANUP; + goto EXIT; } if (PyErr_Occurred() && PyErr_ExceptionMatches(PyExc_OverflowError)) { as_error_update(err, AEROSPIKE_ERR_PARAM, "invalid count for partition id: %d, \ count must fit in long", - ps->part_id); - goto ERROR_CLEANUP; + tmp_count); + goto EXIT; } if (tmp_count > CLUSTER_NPARTITIONS || tmp_count < 1) { @@ -147,7 +118,7 @@ as_status convert_partition_filter(AerospikeClient *self, "invalid partition_filter policy count, count must \ be an int between 1 and %d inclusive", CLUSTER_NPARTITIONS); - goto ERROR_CLEANUP; + goto EXIT; } filter->count = tmp_count; @@ -157,7 +128,7 @@ as_status convert_partition_filter(AerospikeClient *self, "invalid partition filter range,\ begin: %u count: %u, valid range when begin + count <= %d", filter->begin, filter->count, CLUSTER_NPARTITIONS); - goto ERROR_CLEANUP; + goto EXIT; } filter->digest.init = 0; @@ -176,133 +147,12 @@ as_status convert_partition_filter(AerospikeClient *self, } } - parts_all = - parts_setup(filter->begin, filter->count, //cluster->n_partitions, - &filter->digest); - - if (parts_stat && PyDict_Check(parts_stat)) { - - PyObject *py_done = - PyDict_GetItemString(parts_stat, PARTITIONS_STATUS_KEY_DONE); - if (!py_done) { - as_error_update(err, AEROSPIKE_ERR_PARAM, - "partition_status dict missing key '%s'", - PARTITIONS_STATUS_KEY_DONE); - goto ERROR_CLEANUP; - } - - if (PyLong_Check(py_done)) { - parts_all->done = (bool)PyLong_AsLong(py_done); - } - else { - as_error_update(err, AEROSPIKE_ERR_PARAM, - "partition_status dict key '%s' must be an int", - PARTITIONS_STATUS_KEY_DONE); - goto ERROR_CLEANUP; - } - - PyObject *py_retry = - PyDict_GetItemString(parts_stat, PARTITIONS_STATUS_KEY_RETRY); - if (!py_retry) { - as_error_update(err, AEROSPIKE_ERR_PARAM, - "partition_status dict missing key '%s'", - PARTITIONS_STATUS_KEY_RETRY); - goto ERROR_CLEANUP; - } - - if (PyLong_Check(py_retry)) { - parts_all->retry = (bool)PyLong_AsLong(py_retry); - } - else { - as_error_update(err, AEROSPIKE_ERR_PARAM, - "partition_status dict key '%s' must be an int", - PARTITIONS_STATUS_KEY_RETRY); - goto ERROR_CLEANUP; - } - - for (uint16_t i = 0; i < parts_all->part_count; i++) { - ps = &parts_all->parts[i]; - - PyObject *key = PyLong_FromLong(ps->part_id); - PyObject *status_dict = PyDict_GetItem(parts_stat, key); - Py_DECREF(key); - - if (!status_dict || !PyTuple_Check(status_dict)) { - as_log_debug("invalid id for part_id: %d", ps->part_id); - continue; - } - - PyObject *init = PyTuple_GetItem(status_dict, 1); - if (init && PyLong_Check(init)) { - ps->digest.init = PyLong_AsLong(init); - } - else if (init) { - as_error_update(err, AEROSPIKE_ERR_PARAM, - "invalid init for part_id: %d", ps->part_id); - goto ERROR_CLEANUP; - } - - PyObject *retry = PyTuple_GetItem(status_dict, 2); - if (retry && PyLong_Check(retry)) { - ps->retry = (bool)PyLong_AsLong(retry); - } - else if (retry) { - as_error_update(err, AEROSPIKE_ERR_PARAM, - "invalid retry for part_id: %d", ps->part_id); - goto ERROR_CLEANUP; - } - - PyObject *value = PyTuple_GetItem(status_dict, 3); - if (value && PyByteArray_Check(value)) { - uint8_t *bytes_array = (uint8_t *)PyByteArray_AsString(value); - //uint32_t bytes_array_len = (uint32_t)PyByteArray_Size(value); - memcpy(ps->digest.value, bytes_array, AS_DIGEST_VALUE_SIZE); - } - else if (value) { - as_error_update(err, AEROSPIKE_ERR_PARAM, - "invalid digest value for part_id: %d", - ps->part_id); - goto ERROR_CLEANUP; - } - - PyObject *py_bval = PyTuple_GetItem(status_dict, 4); - - // NOTE this is done to maintain backwards compatibility with old 4 elemnt tuples - // used when only partition scans were supported. - if (PyErr_Occurred() && PyErr_ExceptionMatches(PyExc_IndexError)) { - PyErr_Clear(); - } - - if (py_bval && PyLong_Check(py_bval)) { - ps->bval = PyLong_AsUnsignedLongLong(py_bval); - if (PyErr_Occurred() && - PyErr_ExceptionMatches(PyExc_OverflowError)) { - as_error_update(err, AEROSPIKE_ERR_PARAM, - "invalid bval for partition id: %d, bval " - "must fit in unsigned long long", - ps->part_id); - goto ERROR_CLEANUP; - } - } - else if (py_bval) { - as_error_update(err, AEROSPIKE_ERR_PARAM, - "invalid bval for part_id: %d", ps->part_id); - goto ERROR_CLEANUP; - } - } - } - - if (parts_all) { - *pss = parts_all; + if (parts_stat) { + filter->parts_all = + ((AerospikePartitionsStatusObject *)parts_stat)->parts_all; } - return err->code; - -ERROR_CLEANUP: - - if (parts_all) { - free(parts_all); - } +EXIT: return err->code; } diff --git a/src/main/partitions_status/type.c b/src/main/partitions_status/type.c new file mode 100644 index 0000000000..fabf51e67f --- /dev/null +++ b/src/main/partitions_status/type.c @@ -0,0 +1,261 @@ +/******************************************************************************* + * Copyright 2013-2021 Aerospike, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +#include +#include +#include +// TODO: Don't need to include all types +#include "types.h" +#include "partitions_status.h" + +// Partition status object + +PyObject *AerospikePartitionStatusObject_Type_New(PyTypeObject *type, + PyObject *args, + PyObject *kwds) +{ + AerospikePartitionStatusObject *self = + (AerospikePartitionStatusObject *)type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; + } + return (PyObject *)self; +} + +// TODO: make sure memory garbage collection works as intended +static void AerospikePartitionStatusObject_Type_Dealloc( + AerospikePartitionStatusObject *self) +{ + if (self->part_status) { + free(self->part_status); + } + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject *AerospikePartitionStatus__getitem__(PyObject *self, + PyObject *py_key) +{ + AerospikePartitionStatusObject *py_partition_status = + (AerospikePartitionStatusObject *)self; + bool get_bval = false; + if (PyUnicode_Check(py_key)) { + const char *key = PyUnicode_AsUTF8(py_key); + if (!key) { + return NULL; + } + if (!strcmp(key, "bval")) { + get_bval = true; + } + } + else if (PyLong_Check(py_key)) { + unsigned long long index = PyLong_AsUnsignedLongLong(py_key); + if (index == (unsigned long long)-1 && PyErr_Occurred()) { + return NULL; + } + switch (index) { + case 4: + get_bval = true; + break; + } + } + + if (get_bval) { + uint64_t bval = py_partition_status->part_status->bval; + PyObject *py_bval = + PyLong_FromUnsignedLongLong((unsigned long long)bval); + if (!py_bval) { + return NULL; + } + return py_bval; + } + + PyErr_SetNone(PyExc_KeyError); + return NULL; +} + +static PyMethodDef AerospikePartitionStatus_Type_Methods[] = { + // {.ml_name = "__getitem__", + // .ml_meth = AerospikePartitionStatus__getitem__, + // .ml_flags = METH_O}, + {NULL}}; + +static PyMappingMethods AerospikePartitionStatus_Type_AsMapping = { + .mp_subscript = AerospikePartitionStatus__getitem__}; + +PyTypeObject AerospikePartitionStatusObject_Type = { + PyVarObject_HEAD_INIT(NULL, 0).tp_name = + FULLY_QUALIFIED_TYPE_NAME("PartitionStatus"), + .tp_basicsize = sizeof(AerospikePartitionStatusObject), + .tp_dealloc = (destructor)AerospikePartitionStatusObject_Type_Dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, + .tp_new = AerospikePartitionStatusObject_Type_New, + .tp_methods = AerospikePartitionStatus_Type_Methods, + .tp_as_mapping = &AerospikePartitionStatus_Type_AsMapping}; + +PyTypeObject *AerospikePartitionStatusObject_Ready() +{ + return PyType_Ready(&AerospikePartitionStatusObject_Type) == 0 + ? &AerospikePartitionStatusObject_Type + : NULL; +} + +static PyObject * +create_py_partition_status_object(as_partition_status *part_status) +{ + AerospikePartitionStatusObject *py_part_status = + (AerospikePartitionStatusObject *)PyObject_CallObject( + (PyObject *)&AerospikePartitionStatusObject_Type, NULL); + if (py_part_status == NULL) { + return NULL; + } + + as_partition_status *part_status_copy = malloc(sizeof(as_partition_status)); + memcpy(part_status_copy, part_status, sizeof(as_partition_status)); + py_part_status->part_status = part_status_copy; + + return (PyObject *)py_part_status; +} + +// Partitions status object + +static void AerospikePartitionsStatusObject_Type_Dealloc( + AerospikePartitionsStatusObject *self) +{ + if (self->parts_all != NULL) { + as_partitions_status_release(self->parts_all); + } + Py_TYPE(self)->tp_free((PyObject *)self); +} + +PyObject *AerospikePartitionsStatusObject_Type_New(PyTypeObject *type, + PyObject *args, + PyObject *kwds) +{ + AerospikePartitionsStatusObject *self = + (AerospikePartitionsStatusObject *)type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; + } + return (PyObject *)self; +} + +// We don't want the user to define a PartitionsStatus object in the public API +// This object should only be created internally by an API method +// parts_all may be NULL if the partitions status isn't being tracked by the C client +// Returns Optional[aerospike.PartitionsStatus] or sets as_error on error +PyObject *create_py_partitions_status_object(as_error *err, + as_partitions_status *parts_all) +{ + if (parts_all == NULL) { + Py_RETURN_NONE; + } + + AerospikePartitionsStatusObject *py_parts_all = + (AerospikePartitionsStatusObject *)PyObject_CallObject( + (PyObject *)&AerospikePartitionsStatusObject_Type, NULL); + if (py_parts_all == NULL) { + PyErr_Clear(); + as_error_update( + err, AEROSPIKE_ERR_CLIENT, + "Unable to create new aerospike.PartitionsStatus object"); + return NULL; + } + + parts_all = as_partitions_status_reserve(parts_all); + py_parts_all->parts_all = parts_all; + return (PyObject *)py_parts_all; +} + +static PyObject *AerospikePartitionsStatus__getitem__(PyObject *self, + PyObject *py_key) +{ + AerospikePartitionsStatusObject *py_partitions_status = + (AerospikePartitionsStatusObject *)self; + if (PyUnicode_Check(py_key)) { + const char *key = PyUnicode_AsUTF8(py_key); + if (!key) { + return NULL; + } + + bool *bool_attr = NULL; + if (!strcmp(key, "retry")) { + bool_attr = &py_partitions_status->parts_all->retry; + } + else if (!strcmp(key, "done")) { + bool_attr = &py_partitions_status->parts_all->done; + } + + PyObject *py_attr = NULL; + if (bool_attr) { + py_attr = PyBool_FromLong(*bool_attr); + if (!py_attr) { + return NULL; + } + } + } + else if (PyLong_Check(py_key)) { + unsigned long partition_id = PyLong_AsUnsignedLong(py_key); + if (partition_id == (unsigned long)-1 && PyErr_Occurred()) { + return NULL; + } + unsigned long partition_idx = + partition_id - py_partitions_status->parts_all->part_begin; + // Check this is a valid partition index + if (partition_idx >= py_partitions_status->parts_all->part_count) { + PyErr_Format(PyExc_ValueError, + "Partition index must be between 0 and %d exclusive", + py_partitions_status->parts_all->part_count); + return NULL; + } + as_partition_status *part_status = + &py_partitions_status->parts_all->parts[partition_idx]; + PyObject *py_partition_status = + create_py_partition_status_object(part_status); + if (py_partition_status == NULL) { + return NULL; + } + return py_partition_status; + } + + PyErr_SetNone(PyExc_KeyError); + return NULL; +} + +static PyMethodDef AerospikePartitionsStatus_Type_Methods[] = { + // {.ml_name = "__getitem__", + // .ml_meth = AerospikePartitionsStatus__getitem__, + // .ml_flags = METH_O}, + {NULL}}; + +static PyMappingMethods AerospikePartitionsStatus_Type_AsMapping = { + .mp_subscript = AerospikePartitionsStatus__getitem__}; + +PyTypeObject AerospikePartitionsStatusObject_Type = { + PyVarObject_HEAD_INIT(NULL, 0).tp_name = + FULLY_QUALIFIED_TYPE_NAME("PartitionsStatus"), + .tp_basicsize = sizeof(AerospikePartitionsStatusObject), + .tp_dealloc = (destructor)AerospikePartitionsStatusObject_Type_Dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, + .tp_new = AerospikePartitionsStatusObject_Type_New, + .tp_methods = AerospikePartitionsStatus_Type_Methods, + .tp_as_mapping = &AerospikePartitionsStatus_Type_AsMapping}; + +PyTypeObject *AerospikePartitionsStatusObject_Ready() +{ + return PyType_Ready(&AerospikePartitionsStatusObject_Type) == 0 + ? &AerospikePartitionsStatusObject_Type + : NULL; +} diff --git a/src/main/query/foreach.c b/src/main/query/foreach.c index 19d996f8f2..da19a1eea2 100644 --- a/src/main/query/foreach.c +++ b/src/main/query/foreach.c @@ -161,7 +161,6 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args, as_partition_filter partition_filter = {0}; as_partition_filter *partition_filter_p = NULL; - as_partitions_status *ps = NULL; // Initialize error as_error_init(&err); @@ -190,7 +189,7 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args, PyDict_GetItemString(py_policy, "partition_filter"); if (py_partition_filter) { if (convert_partition_filter(self->client, py_partition_filter, - &partition_filter, &ps, + &partition_filter, &err) == AEROSPIKE_OK) { partition_filter_p = &partition_filter; data.partition_query = 1; @@ -209,17 +208,9 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args, // Invoke operation if (partition_filter_p) { - if (ps) { - as_partition_filter_set_partitions(partition_filter_p, ps); - } - aerospike_query_partitions(self->client->as, &data.error, query_policy_p, &self->query, partition_filter_p, each_result, &data); - - if (ps) { - as_partitions_status_release(ps); - } } else { aerospike_query_foreach(self->client->as, &err, query_policy_p, diff --git a/src/main/query/get_parts.c b/src/main/query/get_parts.c index aced6d02fe..424645b847 100644 --- a/src/main/query/get_parts.c +++ b/src/main/query/get_parts.c @@ -22,11 +22,12 @@ #include "exceptions.h" #include "query.h" #include "conversions.h" +#include "partitions_status.h" PyObject *AerospikeQuery_Get_Partitions_status(AerospikeQuery *self) { PyObject *py_parts = NULL; - const as_partitions_status *all_parts = NULL; + as_partitions_status *all_parts = NULL; as_error err; as_error_init(&err); @@ -36,8 +37,7 @@ PyObject *AerospikeQuery_Get_Partitions_status(AerospikeQuery *self) } all_parts = self->query.parts_all; - - as_partitions_status_to_pyobject(&err, all_parts, &py_parts); + py_parts = create_py_partitions_status_object(&err, all_parts); CLEANUP: if (err.code != AEROSPIKE_OK) { diff --git a/src/main/query/results.c b/src/main/query/results.c index bc0994cb42..d6c084309a 100644 --- a/src/main/query/results.c +++ b/src/main/query/results.c @@ -93,7 +93,6 @@ PyObject *AerospikeQuery_Results(AerospikeQuery *self, PyObject *args, as_partition_filter partition_filter = {0}; as_partition_filter *partition_filter_p = NULL; - as_partitions_status *ps = NULL; if (!self || !self->client->as) { as_error_update(&err, AEROSPIKE_ERR_PARAM, "Invalid aerospike object"); @@ -123,7 +122,7 @@ PyObject *AerospikeQuery_Results(AerospikeQuery *self, PyObject *args, PyDict_GetItemString(py_policy, "partition_filter"); if (py_partition_filter) { if (convert_partition_filter(self->client, py_partition_filter, - &partition_filter, &ps, + &partition_filter, &err) == AEROSPIKE_OK) { partition_filter_p = &partition_filter; } @@ -140,17 +139,9 @@ PyObject *AerospikeQuery_Results(AerospikeQuery *self, PyObject *args, Py_BEGIN_ALLOW_THREADS if (partition_filter_p) { - if (ps) { - as_partition_filter_set_partitions(partition_filter_p, ps); - } - aerospike_query_partitions(self->client->as, &err, query_policy_p, &self->query, partition_filter_p, each_result, &data); - - if (ps) { - as_partitions_status_release(ps); - } } else { aerospike_query_foreach(self->client->as, &err, query_policy_p, diff --git a/src/main/scan/foreach.c b/src/main/scan/foreach.c index b90c8c759c..6124d39b45 100644 --- a/src/main/scan/foreach.c +++ b/src/main/scan/foreach.c @@ -140,7 +140,6 @@ PyObject *AerospikeScan_Foreach(AerospikeScan *self, PyObject *args, as_partition_filter partition_filter = {0}; as_partition_filter *partition_filter_p = NULL; - as_partitions_status *ps = NULL; // Python Function Keyword Arguments static char *kwlist[] = {"callback", "policy", "options", "nodename", NULL}; @@ -186,7 +185,7 @@ PyObject *AerospikeScan_Foreach(AerospikeScan *self, PyObject *args, PyDict_GetItemString(py_policy, "partition_filter"); if (py_partition_filter) { if (convert_partition_filter(self->client, py_partition_filter, - &partition_filter, &ps, + &partition_filter, &data.error) == AEROSPIKE_OK) { partition_filter_p = &partition_filter; } @@ -217,15 +216,9 @@ PyObject *AerospikeScan_Foreach(AerospikeScan *self, PyObject *args, Py_BEGIN_ALLOW_THREADS // Invoke operation if (partition_filter_p) { - if (ps) { - as_partition_filter_set_partitions(partition_filter_p, ps); - } aerospike_scan_partitions(self->client->as, &data.error, scan_policy_p, &self->scan, partition_filter_p, each_result, &data); - if (ps) { - as_partitions_status_release(ps); - } } else if (nodename) { aerospike_scan_node(self->client->as, &data.error, scan_policy_p, diff --git a/src/main/scan/get_parts.c b/src/main/scan/get_parts.c index 54bc88e32f..d02de0627f 100644 --- a/src/main/scan/get_parts.c +++ b/src/main/scan/get_parts.c @@ -22,11 +22,12 @@ #include "exceptions.h" #include "scan.h" #include "conversions.h" +#include "partitions_status.h" PyObject *AerospikeScan_Get_Partitions_status(AerospikeScan *self) { PyObject *py_parts = NULL; - const as_partitions_status *all_parts = NULL; + as_partitions_status *all_parts = NULL; as_error err; as_error_init(&err); @@ -36,8 +37,7 @@ PyObject *AerospikeScan_Get_Partitions_status(AerospikeScan *self) } all_parts = self->scan.parts_all; - - as_partitions_status_to_pyobject(&err, all_parts, &py_parts); + py_parts = create_py_partitions_status_object(&err, all_parts); CLEANUP: if (err.code != AEROSPIKE_OK) { diff --git a/src/main/scan/results.c b/src/main/scan/results.c index aa53032544..939df8b694 100644 --- a/src/main/scan/results.c +++ b/src/main/scan/results.c @@ -89,7 +89,6 @@ PyObject *AerospikeScan_Results(AerospikeScan *self, PyObject *args, as_partition_filter partition_filter = {0}; as_partition_filter *partition_filter_p = NULL; - as_partitions_status *ps = NULL; if (PyArg_ParseTupleAndKeywords(args, kwds, "|OO:results", kwlist, &py_policy, &py_nodename) == false) { @@ -123,7 +122,7 @@ PyObject *AerospikeScan_Results(AerospikeScan *self, PyObject *args, PyDict_GetItemString(py_policy, "partition_filter"); if (py_partition_filter) { if (convert_partition_filter(self->client, py_partition_filter, - &partition_filter, &ps, + &partition_filter, &err) == AEROSPIKE_OK) { partition_filter_p = &partition_filter; } @@ -151,15 +150,9 @@ PyObject *AerospikeScan_Results(AerospikeScan *self, PyObject *args, Py_BEGIN_ALLOW_THREADS if (partition_filter_p) { - if (ps) { - as_partition_filter_set_partitions(partition_filter_p, ps); - } aerospike_scan_partitions(self->client->as, &err, scan_policy_p, &self->scan, partition_filter_p, each_result, &data); - if (ps) { - as_partitions_status_release(ps); - } } else if (nodename) { aerospike_scan_node(self->client->as, &err, scan_policy_p, &self->scan, diff --git a/test/new_tests/test_query_get_partitions_status.py b/test/new_tests/test_query_get_partitions_status.py index f054cdac29..308fdfa533 100644 --- a/test/new_tests/test_query_get_partitions_status.py +++ b/test/new_tests/test_query_get_partitions_status.py @@ -2,10 +2,11 @@ import pytest from .test_base_class import TestBaseClass +import aerospike class TestQueryGetPartitionsStatus(TestBaseClass): - @pytest.fixture(autouse=True) + @pytest.fixture(autouse=True, params=[aerospike.Client.scan, aerospike.Client.query]) def setup(self, request, as_connection): if self.server_version < [6, 0]: pytest.mark.xfail(reason="Servers older than 6.0 do not support partition queries.") @@ -47,6 +48,8 @@ def setup(self, request, as_connection): } as_connection.put(key, rec) + self.query_creation_method = request.param + def teardown(): for i in range(1, 100000): put = 0 @@ -70,16 +73,22 @@ def teardown(): request.addfinalizer(teardown) - def test_query_get_partitions_status_no_tracking(self): - query_obj = self.as_connection.query(self.test_ns, self.test_set) + def test_get_partitions_status_after_running_paginated_and_partitioned_query(self): + query_obj = self.query_creation_method(self.as_connection, self.test_ns, self.test_set) + query_obj.paginate() + ids = [] + + def callback(part_id, input_tuple): + ids.append(part_id) + + policy = {"partition_filter": {"begin": 1001, "count": 1}} + query_obj.foreach(callback, policy) + assert len(ids) == self.partition_1001_count stats = query_obj.get_partitions_status() - assert stats == {} + assert type(stats) == aerospike.PartitionsStatus - def test_get_partitions_status_after_foreach(self): - """ - Resume a query using foreach. - """ + def test_resume_terminated_paginated_query(self): records = 0 resumed_records = 0 @@ -89,67 +98,58 @@ def callback(part_id, input_tuple): return False records += 1 - query_obj = self.as_connection.query(self.test_ns, self.test_set) - + query_obj = self.query_creation_method(self.as_connection, self.test_ns, self.test_set) + query_obj.paginate() query_obj.foreach(callback, {"partition_filter": {"begin": 1001, "count": 1}}) assert records == 5 partition_status = query_obj.get_partitions_status() + assert type(partition_status) == aerospike.PartitionsStatus def resume_callback(part_id, input_tuple): nonlocal resumed_records resumed_records += 1 - query_obj2 = self.as_connection.query(self.test_ns, self.test_set) + query_obj2 = self.query_creation_method(self.as_connection, self.test_ns, self.test_set) policy = { "partition_filter": {"begin": 1001, "count": 1, "partition_status": partition_status}, } query_obj2.foreach(resume_callback, policy) - assert records + resumed_records == self.partition_1001_count - def test_query_get_partitions_status_results(self): - query_obj = self.as_connection.query(self.test_ns, self.test_set) + @pytest.mark.parametrize("policy", [ + {}, + {"partition_filter": {"begin": 1001, "count": 1}} + ]) + def test_get_partitions_status_after_finishing_paginated_query(self, policy): + query_obj = self.query_creation_method(self.as_connection, self.test_ns, self.test_set) - # policy = {'partition_filter': {'begin': 1001, 'count': 1}} query_obj.paginate() - query_obj.results() + results = query_obj.results(policy) stats = query_obj.get_partitions_status() - assert stats + assert type(stats) == aerospike.PartitionsStatus - def test_query_get_partitions_status_results_no_tracking(self): - query_obj = self.as_connection.query(self.test_ns, self.test_set) + if "partition_filter" in policy: + assert len(results) == self.partition_1001_count - # policy = {'partition_filter': {'begin': 1001, 'count': 1}} - query_obj.results() - - stats = query_obj.get_partitions_status() - assert not stats + # Negative tests - def test_query_get_partitions_status_results_parts(self): - query_obj = self.as_connection.query(self.test_ns, self.test_set) - - policy = {"partition_filter": {"begin": 1001, "count": 1}} - results = query_obj.results(policy) - assert len(results) == self.partition_1001_count + def test_get_partitions_status_without_running_query(self): + # Non-paginated queries don't support getting partitions status + query_obj = self.query_creation_method(self.as_connection, self.test_ns, self.test_set) stats = query_obj.get_partitions_status() - assert stats - - def test_query_get_partitions_status_foreach_parts(self): - query_obj = self.as_connection.query(self.test_ns, self.test_set) - ids = [] + assert stats is None - def callback(part_id, input_tuple): - ids.append(part_id) + def test_get_partitions_status_after_finishing_nonpaginated_query(self): + query_obj = self.query_creation_method(self.as_connection, self.test_ns, self.test_set) - policy = {"partition_filter": {"begin": 1001, "count": 1}} - query_obj.foreach(callback, policy) - assert len(ids) == self.partition_1001_count + # policy = {'partition_filter': {'begin': 1001, 'count': 1}} + query_obj.results() stats = query_obj.get_partitions_status() - assert stats + assert stats is None diff --git a/test/new_tests/test_query_pagination.py b/test/new_tests/test_query_pagination.py index 989b2e8033..a4a520f458 100644 --- a/test/new_tests/test_query_pagination.py +++ b/test/new_tests/test_query_pagination.py @@ -5,6 +5,7 @@ from aerospike import exception as e import aerospike from .as_status_codes import AerospikeStatus +import math class TestQueryPagination(TestBaseClass): @@ -137,13 +138,13 @@ def callback(part_id, input_tuple): + self.partition_1003_count ) self.partition_1000_count / num_populated_partitions + query_obj.max_records = math.ceil(all_records / num_populated_partitions) for i in range(num_populated_partitions): query_obj.foreach( callback, { "partition_filter": {"begin": 1000, "count": num_populated_partitions}, - "max_records": all_records / num_populated_partitions, }, ) @@ -175,12 +176,14 @@ def test_query_pagination_with_results_method(self): query_obj = self.as_connection.query(ns, st) - max_records = self.partition_1001_count / 2 + query_obj.max_records = math.ceil(self.partition_1001_count / 2) - for i in range(2): - records = query_obj.results({"partition_filter": {"begin": 1001, "count": 1}, "max_records": max_records}) + part_filter = {"begin": 1001, "count": 1} + for i in range(2): + records = query_obj.results({"partition_filter": part_filter}) all_recs += len(records) + part_filter = {"partition_status": query_obj.get_partitions_status()} assert all_recs == self.partition_1001_count assert query_obj.is_done() diff --git a/test/new_tests/test_query_partition.py b/test/new_tests/test_query_partition.py index 05a5ffbe9a..0de0ab643f 100644 --- a/test/new_tests/test_query_partition.py +++ b/test/new_tests/test_query_partition.py @@ -138,6 +138,7 @@ def callback(part_id, input_tuple): query_obj = self.as_connection.query(self.test_ns, self.test_set) query_obj.max_records = 1000 + query_obj.paginate() query_obj.records_per_second = 4000 query_obj.where(p.equals("s", "xyz")) @@ -343,7 +344,7 @@ def callback(part_id, input_tuple): records += 1 query_obj = self.as_connection.query(self.test_ns, self.test_set) - + query_obj.paginate() query_obj.foreach(callback, {"partition_filter": {"begin": 1001, "count": 2}}) assert records == 5 @@ -378,7 +379,7 @@ def callback(part_id, input_tuple): records += 1 query_obj = self.as_connection.query(self.test_ns, self.test_set) - + query_obj.paginate() query_obj.foreach(callback, {"partition_filter": {"begin": 1001, "count": 1}}) assert records == 5 @@ -511,123 +512,9 @@ def callback(part_id, input_tuple): assert err_code == AerospikeStatus.AEROSPIKE_ERR_PARAM assert "invalid partition filter range" in err_info.value.msg - @pytest.mark.parametrize( - "p_stats, expected, msg", - [ - ( - { - "done": False, - "retry": True, - 1001: (1001, True, False, bytearray(b"\xe9\xe31\x01sS\xedafw\x00W\xcdM\x80\xd0L\xee\\d"), 0), - 1002: ( - 1002, - "bad_init", - False, - bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"), - 0, - ), - }, - e.ParamError, - "invalid init for part_id: 1002", - ), - ( - { - "done": False, - "retry": True, - 1002: ( - 1002, - False, - "bad_done", - bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"), - 0, - ), - }, - e.ParamError, - "invalid retry for part_id: 1002", - ), - ( - {"done": False, "retry": True, 1003: (1003, False, False, "bad_digest", 0)}, - e.ParamError, - "invalid digest value for part_id: 1003", - ), - ( - { - "done": False, - "retry": True, - 1004: ( - 1004, - False, - False, - bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"), - "bad_bval", - ), - }, - e.ParamError, - "invalid bval for part_id: 1004", - ), - ( - { - "done": "bad_done", - "retry": True, - 1004: ( - 1004, - False, - False, - bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"), - 0, - ), - }, - e.ParamError, - "partition_status dict key 'done' must be an int", - ), - ( - { - "done": False, - "retry": "bad_retry", - 1004: ( - 1004, - False, - False, - bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"), - 0, - ), - }, - e.ParamError, - "partition_status dict key 'retry' must be an int", - ), - ( - { - "retry": True, - 1004: ( - 1004, - False, - False, - bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"), - 0, - ), - }, - e.ParamError, - "partition_status dict missing key 'done'", - ), - ( - { - "done": False, - 1004: ( - 1004, - False, - False, - bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"), - 0, - ), - }, - e.ParamError, - "partition_status dict missing key 'retry'", - ), - ], - ) - def test_query_partition_with_bad_status(self, p_stats, expected, msg): + def test_query_partition_with_bad_status(self): records = [] - policy = {"partition_filter": {"begin": 1000, "count": 5, "partition_status": p_stats}} + policy = {"partition_filter": {"begin": 1000, "count": 5, "partition_status": None}} query_obj = self.as_connection.query(self.test_ns, self.test_set) def callback(part_id, input_tuple): @@ -635,7 +522,7 @@ def callback(part_id, input_tuple): records.append(record) # query_obj.foreach(callback, policy) - with pytest.raises(expected) as exc: + with pytest.raises(e.ParamError) as exc: query_obj.foreach(callback, policy) - - assert msg in exc.value.msg + assert exc.value.msg == "invalid partition_filter policy, " \ + "partition_status must be of type aerospike.PartitionsStatus" diff --git a/test/new_tests/test_scan_get_partitions_status.py b/test/new_tests/test_scan_get_partitions_status.py deleted file mode 100644 index 52d6589140..0000000000 --- a/test/new_tests/test_scan_get_partitions_status.py +++ /dev/null @@ -1,151 +0,0 @@ -# -*- coding: utf-8 -*- - -import pytest -from .test_base_class import TestBaseClass - - -class TestScanGetPartitionsStatus(TestBaseClass): - @pytest.fixture(autouse=True) - def setup(self, request, as_connection): - self.test_ns = "test" - self.test_set = "demo" - - self.partition_1000_count = 0 - self.partition_1001_count = 0 - self.partition_1002_count = 0 - self.partition_1003_count = 0 - - as_connection.truncate(self.test_ns, None, 0) - - for i in range(1, 100000): - put = 0 - key = (self.test_ns, self.test_set, str(i)) - rec_partition = as_connection.get_key_partition_id(self.test_ns, self.test_set, str(i)) - - if rec_partition == 1000: - self.partition_1000_count += 1 - put = 1 - if rec_partition == 1001: - self.partition_1001_count += 1 - put = 1 - if rec_partition == 1002: - self.partition_1002_count += 1 - put = 1 - if rec_partition == 1003: - self.partition_1003_count += 1 - put = 1 - if put: - rec = { - "i": i, - "s": "xyz", - "l": [2, 4, 8, 16, 32, None, 128, 256], - "m": {"partition": rec_partition, "b": 4, "c": 8, "d": 16}, - } - as_connection.put(key, rec) - - def teardown(): - for i in range(1, 100000): - put = 0 - key = ("test", "demo", str(i)) - rec_partition = as_connection.get_key_partition_id(self.test_ns, self.test_set, str(i)) - - if rec_partition == 1000: - self.partition_1000_count += 1 - put = 1 - if rec_partition == 1001: - self.partition_1001_count += 1 - put = 1 - if rec_partition == 1002: - self.partition_1002_count += 1 - put = 1 - if rec_partition == 1003: - self.partition_1003_count += 1 - put = 1 - if put: - as_connection.remove(key) - - request.addfinalizer(teardown) - - def test_scan_get_partitions_status_no_tracking(self): - scan_obj = self.as_connection.scan(self.test_ns, self.test_set) - - stats = scan_obj.get_partitions_status() - assert stats == {} - - def test_get_partitions_status_after_foreach(self): - """ - Resume a scan using foreach. - """ - records = 0 - resumed_records = 0 - - def callback(part_id, input_tuple): - nonlocal records - if records == 5: - return False - records += 1 - - scan_obj = self.as_connection.scan(self.test_ns, self.test_set) - - scan_obj.foreach(callback, {"partition_filter": {"begin": 1001, "count": 1}}) - - assert records == 5 - - partition_status = scan_obj.get_partitions_status() - - def resume_callback(part_id, input_tuple): - nonlocal resumed_records - resumed_records += 1 - - scan_obj2 = self.as_connection.scan(self.test_ns, self.test_set) - - policy = { - "partition_filter": {"begin": 1001, "count": 1, "partition_status": partition_status}, - } - - scan_obj2.foreach(resume_callback, policy) - - assert records + resumed_records == self.partition_1001_count - - def test_scan_get_partitions_status_results(self): - scan_obj = self.as_connection.scan(self.test_ns, self.test_set) - - # policy = {'partition_filter': {'begin': 1001, 'count': 1}} - scan_obj.paginate() - scan_obj.results() - - stats = scan_obj.get_partitions_status() - assert stats - - def test_scan_get_partitions_status_results_no_tracking(self): - scan_obj = self.as_connection.scan(self.test_ns, self.test_set) - - # policy = {'partition_filter': {'begin': 1001, 'count': 1}} - scan_obj.results() - - stats = scan_obj.get_partitions_status() - assert not stats - - def test_scan_get_partitions_status_results_parts(self): - scan_obj = self.as_connection.scan(self.test_ns, self.test_set) - - policy = {"partition_filter": {"begin": 1001, "count": 1}} - results = scan_obj.results(policy) - assert len(results) == self.partition_1001_count - - stats = scan_obj.get_partitions_status() - assert stats - - def test_scan_get_partitions_status_foreach_parts(self): - scan_obj = self.as_connection.scan(self.test_ns, self.test_set) - ids = [] - - def callback(part_id, input_tuple): - ids.append(part_id) - - policy = {"partition_filter": {"begin": 1001, "count": 1}} - scan_obj.foreach(callback, policy) - assert len(ids) == self.partition_1001_count - - stats = scan_obj.get_partitions_status() - assert stats diff --git a/test/new_tests/test_scan_pagination.py b/test/new_tests/test_scan_pagination.py index bf3c505108..a015bba47c 100644 --- a/test/new_tests/test_scan_pagination.py +++ b/test/new_tests/test_scan_pagination.py @@ -270,7 +270,7 @@ def callback(part_id, input_tuple): def test_scan_pagination_with_multiple_results_call_on_same_scan_object(self): scan_obj = self.as_connection.scan(self.test_ns, self.test_set) - + scan_obj.paginate() records = scan_obj.results({"partition_filter": {"begin": 1002, "count": 1}}) assert len(records) == self.partition_1002_count diff --git a/test/new_tests/test_scan_partition.py b/test/new_tests/test_scan_partition.py index 7381d7785e..977ebdc821 100644 --- a/test/new_tests/test_scan_partition.py +++ b/test/new_tests/test_scan_partition.py @@ -325,7 +325,7 @@ def callback(part_id, input_tuple): records += 1 scan_obj = self.as_connection.scan(self.test_ns, self.test_set) - + scan_obj.paginate() scan_obj.foreach(callback, {"partition_filter": {"begin": 1001, "count": 1}}) assert records == 5 @@ -360,7 +360,7 @@ def callback(part_id, input_tuple): records += 1 scan_obj = self.as_connection.scan(self.test_ns, self.test_set) - + scan_obj.paginate() scan_obj.foreach(callback, {"partition_filter": {"begin": 1001, "count": 1}}) assert records == 5