Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def follow(job, count, items):
job.refresh()
continue
stream = items(offset+1)
for event in results.ResultsReader(stream):
for event in results.JSONResultsReader(stream):
pprint(event)
offset = total

Expand Down Expand Up @@ -72,10 +72,10 @@ def main():

if job['reportSearch'] is not None: # Is it a transforming search?
count = lambda: int(job['numPreviews'])
items = lambda _: job.preview()
items = lambda _: job.preview(output_mode='json')
else:
count = lambda: int(job['eventCount'])
items = lambda offset: job.events(offset=offset)
items = lambda offset: job.events(offset=offset, output_mode='json')

try:
follow(job, count, items)
Expand Down
4 changes: 2 additions & 2 deletions examples/oneshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"(e.g., export PYTHONPATH=~/splunk-sdk-python.")

def pretty(response):
reader = results.ResultsReader(response)
reader = results.JSONResultsReader(response)
for result in reader:
if isinstance(result, dict):
pprint(result)
Expand All @@ -46,7 +46,7 @@ def main():
search = opts.args[0]
service = connect(**opts.kwargs)
socket.setdefaulttimeout(None)
response = service.jobs.oneshot(search)
response = service.jobs.oneshot(search, output_mode='json')

pretty(response)

Expand Down
7 changes: 5 additions & 2 deletions examples/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@
"""A script that reads XML search results from stdin and pretty-prints them
back to stdout. The script is designed to be used with the search.py
example, eg: './search.py "search 404" | ./results.py'"""

from __future__ import absolute_import
from pprint import pprint
import sys, os

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

import splunklib.results as results


def pretty():
    """Read JSON search results from stdin and pretty-print each event.

    ``JSONResultsReader`` wraps its input stream in ``io.BufferedReader``,
    which requires a *binary* stream; ``sys.stdin`` is a text stream, so we
    read from the underlying binary buffer ``sys.stdin.buffer`` instead.
    """
    reader = results.JSONResultsReader(sys.stdin.buffer)
    for event in reader:
        pprint(event)


# Script entry point: pretty-print search results piped in on stdin.
if __name__ == "__main__":
    pretty()
4 changes: 2 additions & 2 deletions examples/search_modes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def modes(argv):
while not job.is_ready():
time.sleep(0.5)
pass
reader = results.ResultsReader(job.events())
reader = results.JSONResultsReader(job.events(output_mode='json'))
# Events found: 0
print('Events found with adhoc_search_level="smart": %s' % len([e for e in reader]))

Expand All @@ -33,7 +33,7 @@ def modes(argv):
while not job.is_ready():
time.sleep(0.5)
pass
reader = results.ResultsReader(job.events())
reader = results.JSONResultsReader(job.events(output_mode='json'))
# Events found: 10
print('Events found with adhoc_search_level="verbose": %s' % len([e for e in reader]))

Expand Down
7 changes: 4 additions & 3 deletions examples/stail.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from pprint import pprint

from splunklib.client import connect
from splunklib.results import ResultsReader
from splunklib.results import ResultsReader, JSONResultsReader

try:
import utils
Expand All @@ -49,9 +49,10 @@ def main():
search=search,
earliest_time="rt",
latest_time="rt",
search_mode="realtime")
search_mode="realtime",
output_mode="json")

for result in ResultsReader(result.body):
for result in JSONResultsReader(result.body):
if result is not None:
print(pprint(result))

Expand Down
39 changes: 18 additions & 21 deletions splunklib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2767,9 +2767,8 @@ def pause(self):
return self

def results(self, **query_params):
"""Returns a streaming handle to this job's search results. To get a
nice, Pythonic iterator, pass the handle to :class:`splunklib.results.ResultsReader`,
as in::
"""Returns a streaming handle to this job's search results. To get a nice, Pythonic iterator, pass the handle
to :class:`splunklib.results.JSONResultsReader` along with the query param "output_mode='json'", as in::

import splunklib.client as client
import splunklib.results as results
Expand All @@ -2778,7 +2777,7 @@ def results(self, **query_params):
job = service.jobs.create("search * | head 5")
while not job.is_done():
sleep(.2)
rr = results.ResultsReader(job.results())
rr = results.JSONResultsReader(job.results(output_mode='json'))
for result in rr:
if isinstance(result, results.Message):
# Diagnostic messages may be returned in the results
Expand Down Expand Up @@ -2808,19 +2807,17 @@ def results(self, **query_params):
def preview(self, **query_params):
"""Returns a streaming handle to this job's preview search results.

Unlike :class:`splunklib.results.ResultsReader`, which requires a job to
be finished to
return any results, the ``preview`` method returns any results that have
been generated so far, whether the job is running or not. The
returned search results are the raw data from the server. Pass
the handle returned to :class:`splunklib.results.ResultsReader` to get a
nice, Pythonic iterator over objects, as in::
Unlike the ``results`` method, which requires a job to be finished to return any results, the
``preview`` method returns any results that have been generated so far, whether the job is
running or not. The returned search results are the raw data from the server. Pass the handle
returned to :class:`splunklib.results.JSONResultsReader` (along with the query param
"output_mode='json'") to get a nice, Pythonic iterator over objects, as in::

import splunklib.client as client
import splunklib.results as results
service = client.connect(...)
job = service.jobs.create("search * | head 5")
rr = results.ResultsReader(job.preview())
rr = results.JSONResultsReader(job.preview(output_mode='json'))
for result in rr:
if isinstance(result, results.Message):
# Diagnostic messages may be returned in the results
Expand Down Expand Up @@ -2975,15 +2972,15 @@ def create(self, query, **kwargs):
return Job(self.service, sid)

def export(self, query, **params):
"""Runs a search and immediately starts streaming preview events.
This method returns a streaming handle to this job's events as an XML
document from the server. To parse this stream into usable Python objects,
pass the handle to :class:`splunklib.results.ResultsReader`::
"""Runs a search and immediately starts streaming preview events. This method returns a streaming
handle to this job's events (an XML document by default; JSON when the query param
"output_mode='json'" is passed). To parse the JSON stream into usable Python objects, pass the
handle to :class:`splunklib.results.JSONResultsReader` along with the query param
"output_mode='json'"::

import splunklib.client as client
import splunklib.results as results
service = client.connect(...)
rr = results.ResultsReader(service.jobs.export("search * | head 5"))
rr = results.JSONResultsReader(service.jobs.export("search * | head 5",output_mode='json'))
for result in rr:
if isinstance(result, results.Message):
# Diagnostic messages may be returned in the results
Expand Down Expand Up @@ -3032,14 +3029,14 @@ def itemmeta(self):
def oneshot(self, query, **params):
"""Run a oneshot search and returns a streaming handle to the results.

The ``InputStream`` object streams XML fragments from the server. To
parse this stream into usable Python objects,
pass the handle to :class:`splunklib.results.ResultsReader`::
The ``InputStream`` object streams fragments from the server (XML by default; JSON when the query
param "output_mode='json'" is passed). To parse a JSON stream into usable Python objects, pass the
handle to :class:`splunklib.results.JSONResultsReader` along with the query param
"output_mode='json'" ::

import splunklib.client as client
import splunklib.results as results
service = client.connect(...)
rr = results.ResultsReader(service.jobs.oneshot("search * | head 5"))
rr = results.JSONResultsReader(service.jobs.oneshot("search * | head 5",output_mode='json'))
for result in rr:
if isinstance(result, results.Message):
# Diagnostic messages may be returned in the results
Expand Down
99 changes: 88 additions & 11 deletions splunklib/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@

from __future__ import absolute_import

from io import BytesIO
from io import BufferedReader, BytesIO

from splunklib import six

from splunklib.six import deprecated

try:
import xml.etree.cElementTree as et
except:
import xml.etree.ElementTree as et

from collections import OrderedDict
from json import loads as json_loads

try:
from splunklib.six.moves import cStringIO as StringIO
Expand All @@ -54,6 +58,7 @@
"Message"
]


class Message(object):
"""This class represents informational messages that Splunk interleaves in the results stream.

Expand All @@ -64,6 +69,7 @@ class Message(object):

m = Message("DEBUG", "There's something in that variable...")
"""

def __init__(self, type_, message):
self.type = type_
self.message = message
Expand All @@ -77,6 +83,7 @@ def __eq__(self, other):
def __hash__(self):
return hash((self.type, self.message))


class _ConcatenatedStream(object):
"""Lazily concatenate zero or more streams into a stream.

Expand All @@ -89,6 +96,7 @@ class _ConcatenatedStream(object):
s = _ConcatenatedStream(StringIO("abc"), StringIO("def"))
assert s.read() == "abcdef"
"""

def __init__(self, *streams):
self.streams = list(streams)

Expand All @@ -107,6 +115,7 @@ def read(self, n=None):
del self.streams[0]
return response


class _XMLDTDFilter(object):
"""Lazily remove all XML DTDs from a stream.

Expand All @@ -120,6 +129,7 @@ class _XMLDTDFilter(object):
s = _XMLDTDFilter("<?xml abcd><element><?xml ...></element>")
assert s.read() == "<element></element>"
"""

def __init__(self, stream):
self.stream = stream

Expand Down Expand Up @@ -150,6 +160,8 @@ def read(self, n=None):
n -= 1
return response


@deprecated("Use the JSONResultsReader function instead in conjuction with the 'output_mode' query param set to 'json'")
class ResultsReader(object):
"""This class returns dictionaries and Splunk messages from an XML results
stream.
Expand Down Expand Up @@ -177,6 +189,7 @@ class ResultsReader(object):
print "Message: %s" % result
print "is_preview = %s " % reader.is_preview
"""

# Be sure to update the docstrings of client.Jobs.oneshot,
# client.Job.results_preview and client.Job.results to match any
# changes made to ResultsReader.
Expand Down Expand Up @@ -257,16 +270,16 @@ def _parse_results(self, stream):
# So we'll define it here

def __itertext(self):
tag = self.tag
if not isinstance(tag, six.string_types) and tag is not None:
return
if self.text:
yield self.text
for e in self:
for s in __itertext(e):
yield s
if e.tail:
yield e.tail
tag = self.tag
if not isinstance(tag, six.string_types) and tag is not None:
return
if self.text:
yield self.text
for e in self:
for s in __itertext(e):
yield s
if e.tail:
yield e.tail

text = "".join(__itertext(elem))
values.append(text)
Expand All @@ -288,5 +301,69 @@ def __itertext(self):
raise


class JSONResultsReader(object):
    """This class returns dictionaries and Splunk messages from a JSON results
    stream.

    ``JSONResultsReader`` is iterable, and returns a ``dict`` for results, or a
    :class:`Message` object for Splunk messages. This class has one field,
    ``is_preview``, which is ``True`` when the results are a preview from a
    running search, or ``False`` when the results are from a completed search.

    This function has no network activity other than what is implicit in the
    stream it operates on.

    :param `stream`: The stream to read from (any object that supports
        ``.read()``).

    **Example**::

        import results
        response = ... # the body of an HTTP response
        reader = results.JSONResultsReader(response)
        for result in reader:
            if isinstance(result, dict):
                print "Result: %s" % result
            elif isinstance(result, results.Message):
                print "Message: %s" % result
        print "is_preview = %s " % reader.is_preview
    """

    # Be sure to update the docstrings of client.Jobs.oneshot,
    # client.Job.results_preview and client.Job.results to match any
    # changes made to JSONResultsReader.
    #
    # This wouldn't be a class, just the _parse_results function below,
    # except that you cannot get the current generator inside the
    # function creating that generator. Thus it's all wrapped up for
    # the sake of one field.
    def __init__(self, stream):
        # The search/jobs/exports endpoint, when run with
        # earliest_time=rt and latest_time=rt, output_mode=json, streams a
        # sequence of JSON documents, each containing a result, as opposed to
        # one results element containing lots of results.
        stream = BufferedReader(stream)
        self.is_preview = None
        self._gen = self._parse_results(stream)

    def __iter__(self):
        return self

    def next(self):
        return next(self._gen)

    __next__ = next

    def _parse_results(self, stream):
        """Parse results and messages out of *stream*.

        Yields a :class:`Message` for each server message and a ``dict`` for
        each result, updating ``self.is_preview`` as "preview" markers are
        encountered in the stream.
        """
        # Iterate the stream lazily rather than calling readlines(), which
        # would block until the stream closes and buffer everything in memory
        # -- that breaks the real-time (earliest_time=rt) exports described in
        # __init__, whose streams never terminate.
        for line in stream:
            stripped = line.strip()
            if not stripped:
                continue
            parsed_line = json_loads(stripped)
            if "preview" in parsed_line:
                self.is_preview = parsed_line["preview"]
            if "messages" in parsed_line and parsed_line["messages"]:
                for message in parsed_line["messages"]:
                    msg_type = message.get("type", "Unknown Message Type")
                    text = message.get("text")
                    yield Message(msg_type, text)
            if "result" in parsed_line:
                yield parsed_line["result"]
            if "results" in parsed_line:
                for result in parsed_line["results"]:
                    yield result
13 changes: 13 additions & 0 deletions splunklib/six.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,3 +978,16 @@ def python_2_unicode_compatible(klass):
del i, importer
# Finally, add the importer to the meta path import hook.
sys.meta_path.append(_importer)

import functools
import warnings

def deprecated(message):
    """Decorator factory that marks a function as deprecated.

    :param message: Extra text appended to the warning, typically naming the
        replacement API.
    :return: A decorator that wraps the target function so that each call
        emits a ``DeprecationWarning`` before delegating to the original.
    """
    def deprecated_decorator(func):
        # functools.wraps preserves __name__/__doc__ so introspection (and the
        # warning text, which uses func.__name__) keep pointing at the real
        # function rather than the wrapper.
        @functools.wraps(func)
        def deprecated_func(*args, **kwargs):
            # Enable DeprecationWarning display *before* warning, so the very
            # first call is not suppressed by Python's default filters.
            warnings.simplefilter('default', DeprecationWarning)
            warnings.warn("{} is a deprecated function. {}".format(func.__name__, message),
                          category=DeprecationWarning,
                          stacklevel=2)
            return func(*args, **kwargs)
        return deprecated_func
    return deprecated_decorator
Loading