From 88a9869f9090f073e2cf4e8bc7a5350e708f86c5 Mon Sep 17 00:00:00 2001 From: Abhi Shah Date: Tue, 8 Mar 2022 16:59:43 +0530 Subject: [PATCH 1/5] JSONResultsReader changes --- examples/follow.py | 6 ++-- examples/oneshot.py | 4 +-- examples/search_modes.py | 4 +-- examples/stail.py | 7 +++-- splunklib/results.py | 65 +++++++++++++++++++++++++++++++++++++++- tests/test_job.py | 24 +++++++-------- tests/test_results.py | 2 +- 7 files changed, 88 insertions(+), 24 deletions(-) diff --git a/examples/follow.py b/examples/follow.py index 64b3e1ac6..cbb559deb 100755 --- a/examples/follow.py +++ b/examples/follow.py @@ -42,7 +42,7 @@ def follow(job, count, items): job.refresh() continue stream = items(offset+1) - for event in results.ResultsReader(stream): + for event in results.JSONResultsReader(stream): pprint(event) offset = total @@ -72,10 +72,10 @@ def main(): if job['reportSearch'] is not None: # Is it a transforming search? count = lambda: int(job['numPreviews']) - items = lambda _: job.preview() + items = lambda _: job.preview(output_mode='json') else: count = lambda: int(job['eventCount']) - items = lambda offset: job.events(offset=offset) + items = lambda offset: job.events(offset=offset, output_mode='json') try: follow(job, count, items) diff --git a/examples/oneshot.py b/examples/oneshot.py index dc34bb8cb..8429aedfb 100755 --- a/examples/oneshot.py +++ b/examples/oneshot.py @@ -32,7 +32,7 @@ "(e.g., export PYTHONPATH=~/splunk-sdk-python.") def pretty(response): - reader = results.ResultsReader(response) + reader = results.JSONResultsReader(response) for result in reader: if isinstance(result, dict): pprint(result) @@ -46,7 +46,7 @@ def main(): search = opts.args[0] service = connect(**opts.kwargs) socket.setdefaulttimeout(None) - response = service.jobs.oneshot(search) + response = service.jobs.oneshot(search, output_mode='json') pretty(response) diff --git a/examples/search_modes.py b/examples/search_modes.py index f3e05f362..66fa77cd4 100644 --- 
a/examples/search_modes.py +++ b/examples/search_modes.py @@ -24,7 +24,7 @@ def modes(argv): while not job.is_ready(): time.sleep(0.5) pass - reader = results.ResultsReader(job.events()) + reader = results.JSONResultsReader(job.events(output_mode='json')) # Events found: 0 print('Events found with adhoc_search_level="smart": %s' % len([e for e in reader])) @@ -33,7 +33,7 @@ def modes(argv): while not job.is_ready(): time.sleep(0.5) pass - reader = results.ResultsReader(job.events()) + reader = results.ResultsReader(job.events(output_mode='json')) # Events found: 10 print('Events found with adhoc_search_level="verbose": %s' % len([e for e in reader])) diff --git a/examples/stail.py b/examples/stail.py index 85f38a853..3df3f10d7 100755 --- a/examples/stail.py +++ b/examples/stail.py @@ -25,7 +25,7 @@ from pprint import pprint from splunklib.client import connect -from splunklib.results import ResultsReader +from splunklib.results import ResultsReader, JSONResultsReader try: import utils @@ -49,9 +49,10 @@ def main(): search=search, earliest_time="rt", latest_time="rt", - search_mode="realtime") + search_mode="realtime", + output_mode="json") - for result in ResultsReader(result.body): + for result in JSONResultsReader(result.body): if result is not None: print(pprint(result)) diff --git a/splunklib/results.py b/splunklib/results.py index 66e9ad7d1..19c8182df 100644 --- a/splunklib/results.py +++ b/splunklib/results.py @@ -34,7 +34,7 @@ from __future__ import absolute_import -from io import BytesIO +from io import BufferedReader, BytesIO from splunklib import six try: @@ -43,6 +43,7 @@ import xml.etree.ElementTree as et from collections import OrderedDict +from json import loads as json_loads try: from splunklib.six.moves import cStringIO as StringIO @@ -287,6 +288,68 @@ def __itertext(self): else: raise +class JSONResultsReader(object): + """This class returns dictionaries and Splunk messages from a JSON results + stream. 
+ ``JSONResultsReader`` is iterable, and returns a ``dict`` for results, or a + :class:`Message` object for Splunk messages. This class has one field, + ``is_preview``, which is ``True`` when the results are a preview from a + running search, or ``False`` when the results are from a completed search. + This function has no network activity other than what is implicit in the + stream it operates on. + :param `stream`: The stream to read from (any object that supports + ``.read()``). + **Example**:: + import results + response = ... # the body of an HTTP response + reader = results.JSONResultsReader(response) + for result in reader: + if isinstance(result, dict): + print "Result: %s" % result + elif isinstance(result, results.Message): + print "Message: %s" % result + print "is_preview = %s " % reader.is_preview + """ + # Be sure to update the docstrings of client.Jobs.oneshot, + # client.Job.results_preview and client.Job.results to match any + # changes made to JSONResultsReader. + # + # This wouldn't be a class, just the _parse_results function below, + # except that you cannot get the current generator inside the + # function creating that generator. Thus it's all wrapped up for + # the sake of one field. + def __init__(self, stream): + # The search/jobs/exports endpoint, when run with + # earliest_time=rt and latest_time=rt, output_mode=json, streams a sequence of + # JSON documents, each containing a result, as opposed to one + # results element containing lots of results. 
+ stream = BufferedReader(stream) + self.is_preview = None + self._gen = self._parse_results(stream) + def __iter__(self): + return self + def next(self): + return next(self._gen) + __next__ = next + + def _parse_results(self, stream): + """Parse results and messages out of *stream*.""" + for line in stream.readlines(): + strip_line = line.strip() + if strip_line.__len__() == 0 : continue + parsed_line = json_loads(strip_line) + if "preview" in parsed_line: + self.is_preview = parsed_line["preview"] + if "messages" in parsed_line and parsed_line["messages"].__len__() > 0: + for message in parsed_line["messages"]: + msg_type = message.get("type", "Unknown Message Type") + text = message.get("text") + yield Message(msg_type, text) + if "result" in parsed_line: + yield parsed_line["result"] + if "results" in parsed_line: + for result in parsed_line["results"]: + yield result \ No newline at end of file diff --git a/tests/test_job.py b/tests/test_job.py index 4de34b611..44326086b 100755 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -54,8 +54,8 @@ def test_oneshot_with_garbage_fails(self): def test_oneshot(self): jobs = self.service.jobs - stream = jobs.oneshot("search index=_internal earliest=-1m | head 3") - result = results.ResultsReader(stream) + stream = jobs.oneshot("search index=_internal earliest=-1m | head 3", output_mode='json') + result = results.JSONResultsReader(stream) ds = list(result) self.assertEqual(result.is_preview, False) self.assertTrue(isinstance(ds[0], dict) or \ @@ -69,8 +69,8 @@ def test_export_with_garbage_fails(self): def test_export(self): jobs = self.service.jobs - stream = jobs.export("search index=_internal earliest=-1m | head 3") - result = results.ResultsReader(stream) + stream = jobs.export("search index=_internal earliest=-1m | head 3", output_mode='json') + result = results.JSONResultsReader(stream) ds = list(result) self.assertEqual(result.is_preview, False) self.assertTrue(isinstance(ds[0], dict) or \ @@ -82,7 +82,7 @@ def 
test_export_docstring_sample(self): import splunklib.client as client import splunklib.results as results service = self.service # cheat - rr = results.ResultsReader(service.jobs.export("search * | head 5")) + rr = results.JSONResultsReader(service.jobs.export("search * | head 5", output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -98,7 +98,7 @@ def test_results_docstring_sample(self): job = service.jobs.create("search * | head 5") while not job.is_done(): sleep(0.2) - rr = results.ResultsReader(job.results()) + rr = results.JSONResultsReader(job.results(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -113,7 +113,7 @@ def test_preview_docstring_sample(self): import splunklib.results as results service = self.service # cheat job = service.jobs.create("search * | head 5") - rr = results.ResultsReader(job.preview()) + rr = results.JSONResultsReader(job.preview(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -130,7 +130,7 @@ def test_oneshot_docstring_sample(self): import splunklib.client as client import splunklib.results as results service = self.service # cheat - rr = results.ResultsReader(service.jobs.oneshot("search * | head 5")) + rr = results.JSONResultsReader(service.jobs.oneshot("search * | head 5", output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -295,12 +295,12 @@ def test_get_preview_and_events(self): self.assertEventuallyTrue(self.job.is_done) self.assertLessEqual(int(self.job['eventCount']), 3) - preview_stream = self.job.preview() - preview_r = results.ResultsReader(preview_stream) + preview_stream = self.job.preview(output_mode='json') + preview_r = results.JSONResultsReader(preview_stream) 
self.assertFalse(preview_r.is_preview) - events_stream = self.job.events() - events_r = results.ResultsReader(events_stream) + events_stream = self.job.events(output_mode='json') + events_r = results.JSONResultsReader(events_stream) n_events = len([x for x in events_r if isinstance(x, dict)]) n_preview = len([x for x in preview_r if isinstance(x, dict)]) diff --git a/tests/test_results.py b/tests/test_results.py index 52e290f25..5fdca2b91 100755 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -30,7 +30,7 @@ def test_read_from_empty_result_set(self): job = self.service.jobs.create("search index=_internal_does_not_exist | head 2") while not job.is_done(): sleep(0.5) - self.assertEqual(0, len(list(results.ResultsReader(io.BufferedReader(job.results()))))) + self.assertEqual(0, len(list(results.JSONResultsReader(io.BufferedReader(job.results(output_mode='json')))))) def test_read_normal_results(self): xml_text = """ From debd64cf863a71dcbf26149abadaa82f048b7958 Mon Sep 17 00:00:00 2001 From: Abhi Shah Date: Tue, 8 Mar 2022 17:08:25 +0530 Subject: [PATCH 2/5] deprecated annotation for ResultsReader --- splunklib/results.py | 38 ++++++++++++++++++++++++++------------ tox.ini | 1 + 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/splunklib/results.py b/splunklib/results.py index 19c8182df..1487486fc 100644 --- a/splunklib/results.py +++ b/splunklib/results.py @@ -36,7 +36,10 @@ from io import BufferedReader, BytesIO +import deprecation + from splunklib import six + try: import xml.etree.cElementTree as et except: @@ -55,6 +58,7 @@ "Message" ] + class Message(object): """This class represents informational messages that Splunk interleaves in the results stream. 
@@ -65,6 +69,7 @@ class Message(object): m = Message("DEBUG", "There's something in that variable...") """ + def __init__(self, type_, message): self.type = type_ self.message = message @@ -78,6 +83,7 @@ def __eq__(self, other): def __hash__(self): return hash((self.type, self.message)) + class _ConcatenatedStream(object): """Lazily concatenate zero or more streams into a stream. @@ -90,6 +96,7 @@ class _ConcatenatedStream(object): s = _ConcatenatedStream(StringIO("abc"), StringIO("def")) assert s.read() == "abcdef" """ + def __init__(self, *streams): self.streams = list(streams) @@ -108,6 +115,7 @@ def read(self, n=None): del self.streams[0] return response + class _XMLDTDFilter(object): """Lazily remove all XML DTDs from a stream. @@ -121,6 +129,7 @@ class _XMLDTDFilter(object): s = _XMLDTDFilter("") assert s.read() == "" """ + def __init__(self, stream): self.stream = stream @@ -151,6 +160,8 @@ def read(self, n=None): n -= 1 return response + +@deprecation.deprecated(deprecated_in="1.16.9", details="Use the JSONResultsReader function instead") class ResultsReader(object): """This class returns dictionaries and Splunk messages from an XML results stream. @@ -178,6 +189,7 @@ class ResultsReader(object): print "Message: %s" % result print "is_preview = %s " % reader.is_preview """ + # Be sure to update the docstrings of client.Jobs.oneshot, # client.Job.results_preview and client.Job.results to match any # changes made to ResultsReader. 
@@ -258,16 +270,16 @@ def _parse_results(self, stream): # So we'll define it here def __itertext(self): - tag = self.tag - if not isinstance(tag, six.string_types) and tag is not None: - return - if self.text: - yield self.text - for e in self: - for s in __itertext(e): - yield s - if e.tail: - yield e.tail + tag = self.tag + if not isinstance(tag, six.string_types) and tag is not None: + return + if self.text: + yield self.text + for e in self: + for s in __itertext(e): + yield s + if e.tail: + yield e.tail text = "".join(__itertext(elem)) values.append(text) @@ -288,6 +300,7 @@ def __itertext(self): else: raise + class JSONResultsReader(object): """This class returns dictionaries and Splunk messages from a JSON results stream. @@ -310,6 +323,7 @@ class JSONResultsReader(object): print "Message: %s" % result print "is_preview = %s " % reader.is_preview """ + # Be sure to update the docstrings of client.Jobs.oneshot, # client.Job.results_preview and client.Job.results to match any # changes made to JSONResultsReader. 
@@ -339,7 +353,7 @@ def _parse_results(self, stream): """Parse results and messages out of *stream*.""" for line in stream.readlines(): strip_line = line.strip() - if strip_line.__len__() == 0 : continue + if strip_line.__len__() == 0: continue parsed_line = json_loads(strip_line) if "preview" in parsed_line: self.is_preview = parsed_line["preview"] @@ -352,4 +366,4 @@ def _parse_results(self, stream): yield parsed_line["result"] if "results" in parsed_line: for result in parsed_line["results"]: - yield result \ No newline at end of file + yield result diff --git a/tox.ini b/tox.ini index 227be746c..58ee004ca 100644 --- a/tox.ini +++ b/tox.ini @@ -33,6 +33,7 @@ deps = pytest unittest2 unittest-xml-reporting python-dotenv + deprecation distdir = build commands = From af44b5e52053c79f48be860bc7c5b6d6bf392088 Mon Sep 17 00:00:00 2001 From: Abhi Shah Date: Thu, 10 Mar 2022 17:55:34 +0530 Subject: [PATCH 3/5] added deprecated function annotation --- examples/results.py | 7 +++++-- examples/search_modes.py | 2 +- splunklib/client.py | 39 ++++++++++++++++++--------------------- splunklib/results.py | 6 +++--- splunklib/six.py | 13 +++++++++++++ 5 files changed, 40 insertions(+), 27 deletions(-) diff --git a/examples/results.py b/examples/results.py index 9c0f18751..e18e8f567 100755 --- a/examples/results.py +++ b/examples/results.py @@ -17,18 +17,21 @@ """A script that reads XML search results from stdin and pretty-prints them back to stdout. 
The script is designed to be used with the search.py example, eg: './search.py "search 404" | ./results.py'""" - + from __future__ import absolute_import from pprint import pprint import sys, os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) import splunklib.results as results + def pretty(): - reader = results.ResultsReader(sys.stdin) + reader = results.JSONResultsReader(sys.stdin) for event in reader: pprint(event) + if __name__ == "__main__": pretty() diff --git a/examples/search_modes.py b/examples/search_modes.py index 66fa77cd4..f1d1687f2 100644 --- a/examples/search_modes.py +++ b/examples/search_modes.py @@ -33,7 +33,7 @@ def modes(argv): while not job.is_ready(): time.sleep(0.5) pass - reader = results.ResultsReader(job.events(output_mode='json')) + reader = results.JSONResultsReader(job.events(output_mode='json')) # Events found: 10 print('Events found with adhoc_search_level="verbose": %s' % len([e for e in reader])) diff --git a/splunklib/client.py b/splunklib/client.py index 7b0772f11..0979140c2 100644 --- a/splunklib/client.py +++ b/splunklib/client.py @@ -2767,9 +2767,8 @@ def pause(self): return self def results(self, **query_params): - """Returns a streaming handle to this job's search results. To get a - nice, Pythonic iterator, pass the handle to :class:`splunklib.results.ResultsReader`, - as in:: + """Returns a streaming handle to this job's search results. 
To get a nice, Pythonic iterator, pass the handle + to :class:`splunklib.results.JSONResultsReader` along with the query param "output_mode='json'", as in:: import splunklib.client as client import splunklib.results as results @@ -2778,7 +2777,7 @@ def results(self, **query_params): job = service.jobs.create("search * | head 5") while not job.is_done(): sleep(.2) - rr = results.ResultsReader(job.results()) + rr = results.JSONResultsReader(job.results(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -2808,19 +2807,17 @@ def results(self, **query_params): def preview(self, **query_params): """Returns a streaming handle to this job's preview search results. - Unlike :class:`splunklib.results.ResultsReader`, which requires a job to - be finished to - return any results, the ``preview`` method returns any results that have - been generated so far, whether the job is running or not. The - returned search results are the raw data from the server. Pass - the handle returned to :class:`splunklib.results.ResultsReader` to get a - nice, Pythonic iterator over objects, as in:: + Unlike :class:`splunklib.results.JSONResultsReader`along with the query param "output_mode='json'", + which requires a job to be finished to return any results, the ``preview`` method returns any results that + have been generated so far, whether the job is running or not. The returned search results are the raw data + from the server. Pass the handle returned to :class:`splunklib.results.JSONResultsReader` to get a nice, + Pythonic iterator over objects, as in:: import splunklib.client as client import splunklib.results as results service = client.connect(...) 
job = service.jobs.create("search * | head 5") - rr = results.ResultsReader(job.preview()) + rr = results.JSONResultsReader(job.preview(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -2975,15 +2972,15 @@ def create(self, query, **kwargs): return Job(self.service, sid) def export(self, query, **params): - """Runs a search and immediately starts streaming preview events. - This method returns a streaming handle to this job's events as an XML - document from the server. To parse this stream into usable Python objects, - pass the handle to :class:`splunklib.results.ResultsReader`:: + """Runs a search and immediately starts streaming preview events. This method returns a streaming handle to + this job's events as an XML document from the server. To parse this stream into usable Python objects, + pass the handle to :class:`splunklib.results.JSONResultsReader` along with the query param + "output_mode='json'":: import splunklib.client as client import splunklib.results as results service = client.connect(...) - rr = results.ResultsReader(service.jobs.export("search * | head 5")) + rr = results.JSONResultsReader(service.jobs.export("search * | head 5",output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -3032,14 +3029,14 @@ def itemmeta(self): def oneshot(self, query, **params): """Run a oneshot search and returns a streaming handle to the results. - The ``InputStream`` object streams XML fragments from the server. To - parse this stream into usable Python objects, - pass the handle to :class:`splunklib.results.ResultsReader`:: + The ``InputStream`` object streams XML fragments from the server. 
To parse this stream into usable Python + objects, pass the handle to :class:`splunklib.results.JSONResultsReader` along with the query param + "output_mode='json'" :: import splunklib.client as client import splunklib.results as results service = client.connect(...) - rr = results.ResultsReader(service.jobs.oneshot("search * | head 5")) + rr = results.JSONResultsReader(service.jobs.oneshot("search * | head 5",output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results diff --git a/splunklib/results.py b/splunklib/results.py index 1487486fc..5f3966859 100644 --- a/splunklib/results.py +++ b/splunklib/results.py @@ -36,10 +36,10 @@ from io import BufferedReader, BytesIO -import deprecation - from splunklib import six +from splunklib.six import deprecated + try: import xml.etree.cElementTree as et except: @@ -161,7 +161,7 @@ def read(self, n=None): return response -@deprecation.deprecated(deprecated_in="1.16.9", details="Use the JSONResultsReader function instead") +@deprecated("Use the JSONResultsReader function instead in conjunction with the 'output_mode' query param set to 'json'") class ResultsReader(object): """This class returns dictionaries and Splunk messages from an XML results stream. diff --git a/splunklib/six.py b/splunklib/six.py index 5fe9f8e14..d13e50c93 100644 --- a/splunklib/six.py +++ b/splunklib/six.py @@ -978,3 +978,16 @@ def python_2_unicode_compatible(klass): del i, importer # Finally, add the importer to the meta path import hook. sys.meta_path.append(_importer) + +import warnings + +def deprecated(message): + def deprecated_decorator(func): + def deprecated_func(*args, **kwargs): + warnings.warn("{} is a deprecated function. 
{}".format(func.__name__, message), + category=DeprecationWarning, + stacklevel=2) + warnings.simplefilter('default', DeprecationWarning) + return func(*args, **kwargs) + return deprecated_func + return deprecated_decorator \ No newline at end of file From 9ea866224273372ecb25d0c51bc169019485206e Mon Sep 17 00:00:00 2001 From: Abhi Shah Date: Thu, 10 Mar 2022 19:00:07 +0530 Subject: [PATCH 4/5] Update test_validators.py --- tests/searchcommands/test_validators.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/searchcommands/test_validators.py b/tests/searchcommands/test_validators.py index 38836c4aa..cc524b307 100755 --- a/tests/searchcommands/test_validators.py +++ b/tests/searchcommands/test_validators.py @@ -208,10 +208,9 @@ def test(integer): def test_float(self): # Float validator test - import random - maxsize = random.random() + 1 - minsize = random.random() - 1 + maxsize = 1.5 + minsize = -1.5 validator = validators.Float() From 55d46038b5b4c54e4191d11672db0ac748227492 Mon Sep 17 00:00:00 2001 From: Abhi Shah Date: Wed, 23 Mar 2022 15:56:58 +0530 Subject: [PATCH 5/5] Update stail.py --- examples/stail.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/stail.py b/examples/stail.py index 3df3f10d7..6ba4ee54e 100755 --- a/examples/stail.py +++ b/examples/stail.py @@ -25,7 +25,7 @@ from pprint import pprint from splunklib.client import connect -from splunklib.results import ResultsReader, JSONResultsReader +from splunklib.results import JSONResultsReader try: import utils