|
10 | 10 | import random |
11 | 11 |
|
12 | 12 | import json |
| 13 | +import struct |
| 14 | +import datetime |
13 | 15 | import socket |
| 16 | +import msgpack |
14 | 17 | import requests |
15 | 18 | import requests.exceptions |
16 | 19 | from six.moves import xrange |
@@ -128,7 +131,7 @@ def __init__(self, |
128 | 131 |
|
129 | 132 | self._headers = { |
130 | 133 | 'Content-Type': 'application/json', |
131 | | - 'Accept': 'text/plain' |
| 134 | + 'Accept': 'application/x-msgpack' |
132 | 135 | } |
133 | 136 |
|
134 | 137 | @property |
@@ -277,13 +280,22 @@ def request(self, url, method='GET', params=None, data=None, |
277 | 280 | time.sleep((2 ** _try) * random.random() / 100.0) |
278 | 281 | if not retry: |
279 | 282 | raise |
| 283 | + |
| 284 | + def reformat_error(response): |
| 285 | + err = self._parse_msgpack(response) |
| 286 | + if err: |
| 287 | + return json.dumps(err, separators=(',', ':')) |
| 288 | + else: |
| 289 | + return response.content |
| 290 | + |
280 | 291 | # if there's not an error, there must have been a successful response |
281 | 292 | if 500 <= response.status_code < 600: |
282 | | - raise InfluxDBServerError(response.content) |
| 293 | + raise InfluxDBServerError(reformat_error(response)) |
283 | 294 | elif response.status_code == expected_response_code: |
284 | 295 | return response |
285 | 296 | else: |
286 | | - raise InfluxDBClientError(response.content, response.status_code) |
| 297 | + err_msg = reformat_error(response) |
| 298 | + raise InfluxDBClientError(err_msg, response.status_code) |
287 | 299 |
|
288 | 300 | def write(self, data, params=None, expected_response_code=204, |
289 | 301 | protocol='json'): |
@@ -342,6 +354,21 @@ def _read_chunked_response(response, raise_errors=True): |
342 | 354 | _key, []).extend(result[_key]) |
343 | 355 | return ResultSet(result_set, raise_errors=raise_errors) |
344 | 356 |
|
| 357 | + @staticmethod |
| 358 | + def _parse_msgpack(response): |
| 359 | + """Return the decoded response if it is encoded as msgpack.""" |
| 360 | + def hook(code, data): |
| 361 | + if code == 5: |
| 362 | + (epoch_s, epoch_ns) = struct.unpack(">QI", data) |
| 363 | + time = datetime.datetime.utcfromtimestamp(epoch_s) |
| 364 | + time += datetime.timedelta(microseconds=(epoch_ns / 1000)) |
| 365 | + return time.isoformat() + 'Z' |
| 366 | + return msgpack.ExtType(code, data) |
| 367 | + |
| 368 | + headers = response.headers |
| 369 | + if headers and headers["Content-Type"] == "application/x-msgpack": |
| 370 | + return msgpack.unpackb(response.content, ext_hook=hook, raw=False) |
| 371 | + |
345 | 372 | def query(self, |
346 | 373 | query, |
347 | 374 | params=None, |
@@ -434,10 +461,11 @@ def query(self, |
434 | 461 | expected_response_code=expected_response_code |
435 | 462 | ) |
436 | 463 |
|
437 | | - if chunked: |
438 | | - return self._read_chunked_response(response) |
439 | | - |
440 | | - data = response.json() |
| 464 | + data = self._parse_msgpack(response) |
| 465 | + if not data: |
| 466 | + if chunked: |
| 467 | + return self._read_chunked_response(response) |
| 468 | + data = response.json() |
441 | 469 |
|
442 | 470 | results = [ |
443 | 471 | ResultSet(result, raise_errors=raise_errors) |
|
0 commit comments