diff --git a/redisai/client.py b/redisai/client.py index aa6bd04..1411ad8 100644 --- a/redisai/client.py +++ b/redisai/client.py @@ -528,6 +528,66 @@ def tensorget( else processor.tensorget(res, as_numpy, as_numpy_mutable, meta_only) ) + def scriptstore( + self, key: AnyStr, device: str, script: str, entry_points: Union[str, Sequence[str]], tag: AnyStr = None + ) -> str: + """ + Set the script to RedisAI. The difference from scriptset is that in scriptstore + you must specify entry points within your script. These functions must have specific + signature: 'def entry_point(tensors: List[Tensor], keys: List[str], args: List[str])'. + RedisAI uses the TorchScript engine to execute the script. So the script should + have only TorchScript supported constructs. That being said, it's important to + mention that using redisai script to do post processing or pre processing for a + Tensorflow (or any other backend) is completely valid. For more details about + TorchScript and supported ops, checkout TorchScript documentation. + + Parameters + ---------- + key : AnyStr + Script key at the server + device : str + Device name. Allowed devices are CPU and GPU. If multiple GPUs are available. + it can be specified using the format GPU:. For example: GPU:0 + script : str + Script itself, as a Python string + entry_points : Union[str, Sequence[str]] + A list of functions in the script that may serve as entry point for the + execution. Each entry point must have the specify signature: + def entry_point(tensors: List[Tensor], keys: List[str], args: List[str])) + Note that the script may contain additional helper functions that doesn't + have to follow this signature. + tag : AnyStr + Any string that will be saved in RedisAI as tag for the script + + Returns + ------- + str + 'OK' if success, raise an exception otherwise + + Note + ---- + Even though ``script`` is pure Python code, it's a subset of Python language and not + all the Python operations are supported. For more details, checkout TorchScript + documentation. It's also important to note that that the script is executed on a high + performance C++ runtime instead of the Python interpreter. And hence ``script`` should + not have any import statements (A common mistake people make all the time) + + Example + ------- + >>> script = r''' + >>> def bar(tensors: List[Tensor], keys: List[str], args: List[str]): + >>> a = tensors[0] + >>> b = tensors[1] + >>> return a + b + >>>''' + >>> con.scriptstore('ket', 'cpu', script, 'bar') + 'OK' + """ + args = builder.scriptstore(key, device, script, entry_points, tag) + res = self.execute_command(*args) + return res if not self.enable_postprocess else processor.scriptstore(res) + + @deprecated(version="1.2.0", reason="Use scriptstore instead") def scriptset( self, key: AnyStr, device: str, script: str, tag: AnyStr = None ) -> str: @@ -622,10 +682,11 @@ def scriptdel(self, key: AnyStr) -> str: res = self.execute_command(*args) return res if not self.enable_postprocess else processor.scriptdel(res) + @deprecated(version="1.2.0", reason="Use scriptexecute instead") def scriptrun( self, key: AnyStr, - function: AnyStr, + function: str, inputs: Union[AnyStr, Sequence[AnyStr]], outputs: Union[AnyStr, Sequence[AnyStr]], ) -> str: @@ -636,13 +697,13 @@ def scriptrun( ---------- key : AnyStr Script key - function : AnyStr + function : str Name of the function in the ``script`` inputs : Union[AnyStr, List[AnyStr]] Tensor(s) which is already saved in the RedisAI using a tensorset call. These tensors will be used as the input for the modelrun outputs : Union[AnyStr, List[AnyStr]] - keys on which the outputs to be saved. If those keys exist already, modelrun + keys on which the outputs to be saved. If those keys exist already, scriptrun will overwrite them with new values Returns @@ -659,6 +720,62 @@ def scriptrun( res = self.execute_command(*args) return res if not self.enable_postprocess else processor.scriptrun(res) + def scriptexecute( + self, + key: AnyStr, + function: str, + keys: Union[AnyStr, Sequence[AnyStr]] = None, + inputs: Union[AnyStr, Sequence[AnyStr]] = None, + args: Union[AnyStr, Sequence[AnyStr]] = None, + outputs: Union[AnyStr, Sequence[AnyStr]] = None, + timeout: int = None, + ) -> str: + """ + Run an already set script. Similar to modelexecute. + Must specify keys or inputs. + + Parameters + ---------- + key : AnyStr + Script key + function : str + Name of the function in the ``script`` + keys : Union[AnyStr, Sequence[AnyStr]] + Denotes the list of Redis key names that the script will access to + during its execution, for both read and/or write operations. + inputs : Union[AnyStr, Sequence[AnyStr]] + Denotes the input tensors list. + args : Union[AnyStr, Sequence[AnyStr]] + Denotes the list of additional arguments that a user can send to the + script. All args are sent as strings, but can be casted to other types + supported by torch script, such as int, or float. + outputs : Union[AnyStr, List[AnyStr]] + Denotes the output tensors keys' list. If those keys exist already, + scriptexecute will overwrite them with new values. + timeout : int + The max number on milisecinds that may pass before the request is prossced + (meaning that the result will not be computed after that time and TIMEDOUT + is returned in that case). + + Returns + ------- + str + 'OK' if success, raise an exception otherwise + + Example + ------- + >>> con.scriptexecute('myscript', 'bar', inputs=['a', 'b'], outputs=['c']) + 'OK' + >>> con.scriptexecute('myscript{tag}', 'addn', + >>> inputs=['mytensor1{tag}', 'mytensor2{tag}', 'mytensor3{tag}'], + >>> args=['5.0'], + >>> outputs=['result{tag}']) + 'OK' + """ + args = builder.scriptexecute(key, function, keys, inputs, args, outputs, timeout) + res = self.execute_command(*args) + return res if not self.enable_postprocess else processor.scriptexecute(res) + def scriptscan(self) -> List[List[AnyStr]]: """ Returns the list of all the script in the RedisAI server. Scriptscan API is diff --git a/redisai/command_builder.py b/redisai/command_builder.py index 53e422b..200a4d3 100644 --- a/redisai/command_builder.py +++ b/redisai/command_builder.py @@ -204,6 +204,26 @@ def tensorget(key: AnyStr, as_numpy: bool = True, meta_only: bool = False) -> Se return args +def scriptstore( + name: AnyStr, + device: str, + script: str, + entry_points: Union[str, Sequence[str]], + tag: AnyStr = None +) -> Sequence: + if device.upper() not in utils.allowed_devices: + raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") + if name is None or script is None or entry_points is None: + raise ValueError("Missing required arguments for script store command") + args = ["AI.SCRIPTSTORE", name, device] + if tag: + args += ["TAG", tag] + args += ["ENTRY_POINTS", len(utils.listify(entry_points)), *utils.listify(entry_points)] + args.append("SOURCE") + args.append(script) + return args + + def scriptset(name: AnyStr, device: str, script: str, tag: AnyStr = None) -> Sequence: if device.upper() not in utils.allowed_devices: raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") @@ -228,10 +248,12 @@ def scriptdel(name: AnyStr) -> Sequence: def scriptrun( name: AnyStr, - function: AnyStr, + function: str, inputs: Union[AnyStr, Sequence[AnyStr]], outputs: Union[AnyStr, Sequence[AnyStr]], ) -> Sequence: + if name is None or function is None: + raise ValueError("Missing required arguments for script run command") args = ( "AI.SCRIPTRUN", name, @@ -244,6 +266,33 @@ def scriptrun( return args +def scriptexecute( + name: AnyStr, + function: str, + keys: Union[AnyStr, Sequence[AnyStr]], + inputs: Union[AnyStr, Sequence[AnyStr]], + input_args: Union[AnyStr, Sequence[AnyStr]], + outputs: Union[AnyStr, Sequence[AnyStr]], + timeout: int, +) -> Sequence: + if name is None or function is None or (keys is None and inputs is None): + raise ValueError("Missing required arguments for script execute command") + args = ["AI.SCRIPTEXECUTE", name, function] + + if keys is not None: + args += ["KEYS", len(utils.listify(keys)), *utils.listify(keys)] + if inputs is not None: + args += ["INPUTS", len(utils.listify(inputs)), *utils.listify(inputs)] + if input_args is not None: + args += ["ARGS", len(utils.listify(input_args)), *utils.listify(input_args)] + if outputs is not None: + args += ["OUTPUTS", len(utils.listify(outputs)), *utils.listify(outputs)] + if timeout is not None: + args += ["TIMEOUT", timeout] + + return args + + def scriptscan() -> Sequence: return ("AI._SCRIPTSCAN",) diff --git a/redisai/postprocessor.py b/redisai/postprocessor.py index 42bd141..ae93fab 100644 --- a/redisai/postprocessor.py +++ b/redisai/postprocessor.py @@ -70,8 +70,10 @@ def infoget(res): "modelrun", "tensorset", "scriptset", + "scriptstore", "scriptdel", "scriptrun", + "scriptexecute", "inforeset", ) for fn in decoding_functions: diff --git a/redisai/utils.py b/redisai/utils.py index ca8007f..ba41809 100644 --- a/redisai/utils.py +++ b/redisai/utils.py @@ -1,5 +1,4 @@ from typing import AnyStr, ByteString, Callable, List, Sequence, Union - import numpy as np dtype_dict = { diff --git a/test/test.py b/test/test.py index 569d9ea..4690117 100644 --- a/test/test.py +++ b/test/test.py @@ -28,9 +28,43 @@ def __exit__(self, *args): MODEL_DIR = os.path.dirname(os.path.abspath(__file__)) + "/testdata" TENSOR_DIR = MODEL_DIR -script = r""" +script_old = r""" def bar(a, b): return a + b + +def bar_variadic(a, args : List[Tensor]): + return args[0] + args[1] +""" + +script = r""" +def bar(tensors: List[Tensor], keys: List[str], args: List[str]): + a = tensors[0] + b = tensors[1] + return a + b + +def bar_variadic(tensors: List[Tensor], keys: List[str], args: List[str]): + a = tensors[0] + l = tensors[1:] + return a + l[0] +""" + +script_with_redis_commands = r""" +def redis_string_int_to_tensor(redis_value: Any): + return torch.tensor(int(str(redis_value))) + +def int_set_get(tensors: List[Tensor], keys: List[str], args: List[str]): + key = keys[0] + value = int(args[0]) + redis.execute("SET", key, str(value)) + res = redis.execute("GET", key) + return redis_string_int_to_tensor(res) + +def func(tensors: List[Tensor], keys: List[str], args: List[str]): + redis.execute("SET", keys[0], args[0]) + a = torch.stack(tensors).sum() + b = redis_string_int_to_tensor(redis.execute("GET", keys[0])) + redis.execute("DEL", keys[0]) + return b + a """ @@ -349,28 +383,97 @@ def test_run_tf_model(self): con.modeldel("m") self.assertRaises(ResponseError, con.modelget, "m") - def test_scripts(self): + # AI.SCRIPTRUN is deprecated by AI.SCRIPTEXECUTE + # and AI.SCRIPTSET is deprecated by AI.SCRIPTSTORE + def test_deprecated_scriptset_and_scriptrun(self): con = self.get_client() - self.assertRaises(ResponseError, con.scriptset, - "ket", "cpu", "return 1") - con.scriptset("ket", "cpu", script) + self.assertRaises(ResponseError, con.scriptset, "scr", "cpu", "return 1") + con.scriptset("scr", "cpu", script_old) con.tensorset("a", (2, 3), dtype="float") con.tensorset("b", (2, 3), dtype="float") + + # test bar(a, b) + con.scriptrun("scr", "bar", inputs=["a", "b"], outputs=["c"]) + tensor = con.tensorget("c", as_numpy=False) + self.assertEqual([4, 6], tensor["values"]) + + # test bar_variadic(a, args : List[Tensor]) + con.scriptrun("scr", "bar_variadic", inputs=["a", "$", "b", "b"], outputs=["c"]) + tensor = con.tensorget("c", as_numpy=False) + self.assertEqual([4, 6], tensor["values"]) + + def test_scriptstore(self): + con = self.get_client() # try with bad arguments: - self.assertRaises( - ResponseError, con.scriptrun, "ket", "bar", inputs=["a"], outputs=["c"] - ) - con.scriptrun("ket", "bar", inputs=["a", "b"], outputs=["c"]) + with self.assertRaises(ValueError) as e: + con.scriptstore("test", "cpu", script, entry_points=None) + self.assertEqual(str(e.exception), "Missing required arguments for script store command") + self.assertRaises(ValueError, con.scriptstore, "test", "cpu", script=None, entry_points="bar") + with self.assertRaises(ResponseError) as e: + con.scriptstore("test", "cpu", "return 1", "f") + self.assertEqual(str(e.exception), + "expected def but found 'return' here: File \"\", line 1 return 1 ~~~~~~ <--- HERE ") + + def test_scripts_execute(self): + con = self.get_client() + # try with bad arguments: + with self.assertRaises(ValueError) as e: + con.scriptexecute("test", function=None, keys=None, inputs=None) + self.assertEqual(str(e.exception), "Missing required arguments for script execute command") + with self.assertRaises(ResponseError) as e: + con.scriptexecute("test", "bar", inputs=["a"], outputs=["c"]) + self.assertEqual(str(e.exception), "script key is empty") + + con.scriptstore("test", "cpu", script, "bar") + con.tensorset("a", (2, 3), dtype="float") + con.tensorset("b", (2, 3), dtype="float") + con.scriptexecute("test", "bar", inputs=["a", "b"], outputs=["c"]) tensor = con.tensorget("c", as_numpy=False) self.assertEqual([4, 6], tensor["values"]) - script_det = con.scriptget("ket") + script_det = con.scriptget("test") self.assertTrue(script_det["device"] == "cpu") self.assertTrue(script_det["source"] == script) - script_det = con.scriptget("ket", meta_only=True) + script_det = con.scriptget("test", meta_only=True) self.assertTrue(script_det["device"] == "cpu") self.assertNotIn("source", script_det) - con.scriptdel("ket") - self.assertRaises(ResponseError, con.scriptget, "ket") + # delete the script + con.scriptdel("test") + self.assertRaises(ResponseError, con.scriptget, "test") + + # store new script + con.scriptstore("myscript{1}", "cpu", script, ["bar", "bar_variadic"], "version1") + con.tensorset("a{1}", [2, 3, 2, 3], shape=(2, 2), dtype="float") + con.tensorset("b{1}", [2, 3, 2, 3], shape=(2, 2), dtype="float") + con.scriptexecute("myscript{1}", "bar", inputs=["a{1}", "b{1}"], outputs=["c{1}"]) + values = con.tensorget("c{1}", as_numpy=False) + self.assertTrue(np.allclose(values["values"], [4.0, 6.0, 4.0, 6.0])) + + con.tensorset("b1{1}", [2, 3, 2, 3], shape=(2, 2), dtype="float") + con.scriptexecute("myscript{1}", 'bar_variadic', + inputs=["a{1}", "b1{1}", "b{1}"], + outputs=["c{1}"]) + + values = con.tensorget("c{1}", as_numpy=False)['values'] + self.assertEqual(values, [4.0, 6.0, 4.0, 6.0]) + + def test_scripts_redis_commands(self): + con = self.get_client() + con.scriptstore("myscript{1}", "cpu", script_with_redis_commands, ["int_set_get", "func"]) + con.scriptexecute("myscript{1}", "int_set_get", keys=["x{1}", "{1}"], args=["3"], outputs=["y{1}"]) + values = con.tensorget("y{1}", as_numpy=False) + self.assertTrue(np.allclose(values["values"], [3])) + + con.tensorset("mytensor1{1}", [40], dtype="float") + con.tensorset("mytensor2{1}", [10], dtype="float") + con.tensorset("mytensor3{1}", [1], dtype="float") + con.scriptexecute("myscript{1}", "func", + keys=["key{1}"], + inputs=["mytensor1{1}", "mytensor2{1}", "mytensor3{1}"], + args=["3"], + outputs=["my_output{1}"]) + values = con.tensorget("my_output{1}", as_numpy=False) + self.assertTrue(np.allclose(values["values"], [54])) + self.assertIsNone(con.get("key{1}")) def test_run_onnxml_model(self): mlmodel_path = os.path.join(MODEL_DIR, "boston.onnx")