Skip to content

Commit

Permalink
fix scan options (#117)
Browse files Browse the repository at this point in the history
* fix scan options

* fix scan options
  • Loading branch information
patkivikram authored Feb 28, 2021
1 parent 7d29cad commit 750e3ad
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 22 deletions.
13 changes: 3 additions & 10 deletions faust/stores/aerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,7 @@ def _iterkeys(self) -> Iterator[bytes]:
scan: aerospike.Scan = self.client.scan(
namespace=self.namespace, set=self.table_name
)
scan_opts = {
"concurrent": True,
"nobins": True,
"priority": 2,
}
for result in scan.results(policy=scan_opts):
for result in scan.results():
yield result[0][2]
except Exception as ex:
self.log.error(
Expand All @@ -150,11 +145,10 @@ def _iterkeys(self) -> Iterator[bytes]:

def _itervalues(self) -> Iterator[bytes]:
try:
scan_opts = {"concurrent": True, "priority": aerospike.SCAN_PRIORITY_MEDIUM}
scan: aerospike.Scan = self.client.scan(
namespace=self.namespace, set=self.table_name
)
for result in scan.results(policy=scan_opts):
for result in scan.results():
(key, meta, bins) = result
if bins:
yield bins[self.BIN_KEY]
Expand All @@ -168,12 +162,11 @@ def _itervalues(self) -> Iterator[bytes]:

def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
try:
scan_opts = {"concurrent": True, "priority": aerospike.SCAN_PRIORITY_MEDIUM}

scan: aerospike.Scan = self.client.scan(
namespace=self.namespace, set=self.table_name
)
for result in scan.results(policy=scan_opts):
for result in scan.results():
(key_data, meta, bins) = result
(ns, set, policy, key) = key_data

Expand Down
17 changes: 5 additions & 12 deletions tests/unit/stores/test_aerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,6 @@ def test_iterkeys_error(self, store):
def test_iterkeys_success(self, store):
scan = MagicMock()
store.client.scan = MagicMock(return_value=scan)
scan_opts = {
"concurrent": True,
"nobins": True,
"priority": 2,
}
scan_result = {
(("UUID1", "t1", "key1"), MagicMock(), MagicMock()),
(("UUID2", "t2", "key2"), MagicMock(), MagicMock()),
Expand All @@ -144,16 +139,15 @@ def test_iterkeys_success(self, store):
a = {"key1", "key2"}
result = a - set(store._iterkeys())
assert len(result) == 0
scan.results.assert_called_with(policy=scan_opts)
assert scan.results.called
store.client.scan.assert_called_with(
namespace=store.namespace, set=store.table_name
)

def test_itervalues_success(self, store):
with patch("faust.stores.aerospike.aerospike", MagicMock()) as aero:
with patch("faust.stores.aerospike.aerospike", MagicMock()):
scan = MagicMock()
store.client.scan = MagicMock(return_value=scan)
scan_opts = {"concurrent": True, "priority": aero.SCAN_PRIORITY_MEDIUM}
scan_result = [
(MagicMock(), {"ttl": 4294967295, "gen": 4}, {"value_key": "value1"}),
(MagicMock(), {"ttl": 4294967295, "gen": 4}, None),
Expand All @@ -162,7 +156,7 @@ def test_itervalues_success(self, store):
a = {None, "value1"}
result = a - set(store._itervalues())
assert len(result) == 0
scan.results.assert_called_with(policy=scan_opts)
scan.results.assert_called_with()
store.client.scan.assert_called_with(
namespace=store.namespace, set=store.table_name
)
Expand All @@ -178,11 +172,10 @@ def test_iteritems_error(self, store):
set(store._iteritems())

def test_iteritems_success(self, store):
with patch("faust.stores.aerospike.aerospike", MagicMock()) as aero:
with patch("faust.stores.aerospike.aerospike", MagicMock()):

scan = MagicMock()
store.client.scan = MagicMock(return_value=scan)
scan_opts = {"concurrent": True, "priority": aero.SCAN_PRIORITY_MEDIUM}
scan_result = [
(
("UUID1", "t1", MagicMock(), "key1"),
Expand All @@ -199,7 +192,7 @@ def test_iteritems_success(self, store):
a = {("key1", "value1"), ("key2", "value2")}
result = a - set(store._iteritems())
assert len(result) == 0
scan.results.assert_called_with(policy=scan_opts)
assert scan.results.assert_called
store.client.scan.assert_called_with(
namespace=store.namespace, set=store.table_name
)
Expand Down

0 comments on commit 750e3ad

Please sign in to comment.