Skip to content

Commit 8f22c97

Browse files
itamarst and johann8384 authored
If response code from OpenTSDB is 400, don't retry. (#435)
Co-authored-by: Jonathan Creasy <[email protected]>
1 parent b5c50d0 commit 8f22c97

File tree

5 files changed

+99
-8
lines changed

5 files changed

+99
-8
lines changed

.travis.yml

+1 -1
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@ python:
77
- "3.6"
88
- "3.7"
99
install:
10-
- pip install pylint pylint_runner ordereddict mysqlclient requests feedparser prometheus_client
10+
- pip install pylint pylint_runner ordereddict mysqlclient requests feedparser prometheus_client flask
1111
script:
1212
- pylint_runner --rcfile .pylintrc
1313
- ./tests.py

CHANGELOG.md

+6
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,12 @@ This project adheres to [Semantic Versioning](http://semver.org/).
88
- A namespace prefix can be added to all metrics.
99
- An optional status monitoring API, serving JSON over HTTP
1010

11+
### Bugfixes
12+
13+
- If response code from OpenTSDB is 400, don't retry sending since this means we
14+
sent bad data.
15+
16+
1117
## [1.3.1](https://github.com/OpenTSDB/tcollector/issues?utf8=%E2%9C%93&q=milestone%3A1.3.1+)
1218
### Collectors Added
1319
- docker.py - Pulls metrics from a local Docker instance, tries /var/run/docker.sock, then localhost API

fake_opentsdb.py

+20
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,20 @@
1+
"""
2+
A fake OpenTSDB server using Flask.
3+
4+
You can modify the response code using the FAKE_OPENTSDB_RESPONSE environment
5+
variable.
6+
"""
7+
8+
import os
9+
10+
from flask import Flask
11+
app = Flask(__name__)
12+
13+
RESPONSE_CODE = int(os.environ.get("FAKE_OPENTSDB_RESPONSE", 204))
14+
15+
16+
@app.route('/api/put', methods=["PUT", "POST"])
17+
def put_message():
18+
# This could be extended to write the messages to disk, for use by
19+
# additional tests...
20+
return "", RESPONSE_CODE

tcollector.py

+16 -6
Original file line number | Diff line number | Diff line change
@@ -44,11 +44,11 @@
4444
import importlib
4545
from queue import Queue, Empty, Full # pylint: disable=import-error
4646
from urllib.request import Request, urlopen # pylint: disable=maybe-no-member,no-name-in-module,import-error
47-
from urllib.error import HTTPError # pylint: disable=maybe-no-member,no-name-in-module,import-error
47+
from urllib.error import HTTPError, URLError # pylint: disable=maybe-no-member,no-name-in-module,import-error
4848
from http.server import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error
4949
else:
5050
from Queue import Queue, Empty, Full # pylint: disable=maybe-no-member,no-name-in-module,import-error
51-
from urllib2 import Request, urlopen, HTTPError # pylint: disable=maybe-no-member,no-name-in-module,import-error
51+
from urllib2 import Request, urlopen, HTTPError, URLError # pylint: disable=maybe-no-member,no-name-in-module,import-error
5252
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error
5353

5454
# global variables.
@@ -846,7 +846,10 @@ def send_data_via_http(self):
846846
% base64.b64encode("%s:%s" % (self.http_username, self.http_password)))
847847
req.add_header("Content-Type", "application/json")
848848
try:
849-
response = urlopen(req, json.dumps(metrics))
849+
body = json.dumps(metrics)
850+
if not isinstance(body, bytes):
851+
body = body.encode("utf-8")
852+
response = urlopen(req, body)
850853
LOG.debug("Received response %s %s", response.getcode(), response.read().rstrip('\n'))
851854

852855
# clear out the sendq
@@ -857,9 +860,16 @@ def send_data_via_http(self):
857860
# print line,
858861
# print
859862
except HTTPError as e:
860-
LOG.error("Got error %s %s", e, e.read().rstrip('\n'))
861-
# for line in http_error:
862-
# print line,
863+
if e.code == 400:
864+
LOG.error("Some data was bad, so not going to resend it.")
865+
# This means one or more of the data points were bad
866+
# (http://opentsdb.net/docs/build/html/api_http/put.html#response).
867+
# As such, there's no point resending them.
868+
self.sendq = []
869+
870+
LOG.error("Got error %s %s", e, e.read())
871+
except URLError as e:
872+
LOG.error("Got error URL %s", e)
863873

864874

865875
def setup_logging(logfile=DEFAULT_LOG, max_bytes=None, backup_count=None):

tests.py

+56 -1
Original file line number | Diff line number | Diff line change
@@ -14,13 +14,19 @@
1414

1515
import os
1616
import sys
17+
import time
1718
from stat import S_ISDIR, S_ISREG, ST_MODE
1819
import unittest
20+
import subprocess
1921

2022
import mocks
2123
import tcollector
2224
import json
2325
import threading
26+
try:
27+
import flask
28+
except ImportError:
29+
flask = None
2430

2531
PY3 = sys.version_info[0] > 2
2632

@@ -101,6 +107,54 @@ def test_endtoend(self):
101107
self.assertEqual(json.loads(r), [c.to_json() for c in collectors.values()])
102108

103109

110+
@unittest.skipUnless(flask, "Flask not installed")
111+
class SenderThreadHTTPTests(unittest.TestCase):
112+
"""Tests for HTTP sending."""
113+
114+
def run_fake_opentsdb(self, response_code):
115+
env = os.environ.copy()
116+
env["FLASK_APP"] = "fake_opentsdb.py"
117+
env["FAKE_OPENTSDB_RESPONSE"] = str(response_code)
118+
flask = subprocess.Popen(["flask", "run", "--port", "4242"], env=env)
119+
time.sleep(1) # wait for it to start
120+
self.addCleanup(flask.terminate)
121+
122+
def send_query_with_response_code(self, response_code):
123+
"""
124+
Send a HTTP query using SenderThread, with server that returns given
125+
response code.
126+
"""
127+
self.run_fake_opentsdb(response_code)
128+
reader = tcollector.ReaderThread(1, 10, True)
129+
sender = tcollector.SenderThread(
130+
reader, False, [("localhost", 4242)], False, {},
131+
http=True, http_api_path="api/put"
132+
)
133+
sender.sendq.append("mymetric 123 12 a=b")
134+
sender.send_data()
135+
return sender
136+
137+
def test_normal(self):
138+
"""If response is OK, sendq is cleared."""
139+
sender = self.send_query_with_response_code(204)
140+
self.assertEqual(len(sender.sendq), 0)
141+
142+
def test_error(self):
143+
"""
144+
If response is unexpected, e.g. 500 error, sendq is not cleared so we
145+
can retry.
146+
"""
147+
sender = self.send_query_with_response_code(500)
148+
self.assertEqual(len(sender.sendq), 1)
149+
150+
def test_bad_messages(self):
151+
"""
152+
If response is 400, sendq is cleared since there's no point retrying
153+
bad messages.
154+
"""
155+
sender = self.send_query_with_response_code(400)
156+
self.assertEqual(len(sender.sendq), 0)
157+
104158
class NamespacePrefixTests(unittest.TestCase):
105159
"""Tests for metric namespace prefix."""
106160

@@ -114,7 +168,6 @@ def test_prefix_added(self):
114168
self.assertEqual(collector.lines_received, 1)
115169
self.assertEqual(collector.lines_invalid, 0)
116170

117-
118171
class TSDBlacklistingTests(unittest.TestCase):
119172
"""
120173
Tests of TSD blacklisting logic
@@ -379,6 +432,8 @@ def test_multi_empty_line_put_cond(self):
379432
self.assertEqual(stderr, [])
380433

381434
if __name__ == '__main__':
435+
import logging
436+
logging.basicConfig()
382437
cdir = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])),
383438
'collectors')
384439
tcollector.setup_python_path(cdir) # pylint: disable=maybe-no-member

0 commit comments

Comments
 (0)