26
26
27
27
Based on the elastichsearch collector
28
28
29
- """
29
+ """
30
+
31
+ from __future__ import print_function
30
32
31
33
import errno
32
- import httplib
33
34
try :
34
35
import json
35
36
except ImportError :
45
46
except ImportError :
46
47
flume_conf = None
47
48
49
+ try :
50
+ from http .client import HTTPConnection , OK
51
+ except ImportError :
52
+ from httplib import HTTPConnection , OK
53
+
48
54
COLLECTION_INTERVAL = 15 # seconds
49
55
DEFAULT_TIMEOUT = 10.0 # seconds
50
56
FLUME_HOST = "localhost"
54
60
EXCLUDE = [ 'StartTime' , 'StopTime' , 'Type' ]
55
61
56
62
def err (msg ):
57
- print >> sys .stderr , msg
63
+ print ( msg , file = sys .stderr )
58
64
59
65
class FlumeError (RuntimeError ):
60
66
"""Exception raised if we don't get a 200 OK from Flume webserver."""
@@ -66,7 +72,7 @@ def request(server, uri):
66
72
"""Does a GET request of the given uri on the given HTTPConnection."""
67
73
server .request ("GET" , uri )
68
74
resp = server .getresponse ()
69
- if resp .status != httplib . OK :
75
+ if resp .status != OK :
70
76
raise FlumeError (resp )
71
77
return json .loads (resp .read ())
72
78
@@ -94,11 +100,11 @@ def main(argv):
94
100
95
101
utils .drop_privileges ()
96
102
socket .setdefaulttimeout (DEFAULT_TIMEOUT )
97
- server = httplib . HTTPConnection (FLUME_HOST , FLUME_PORT )
103
+ server = HTTPConnection (FLUME_HOST , FLUME_PORT )
98
104
try :
99
105
server .connect ()
100
- except socket .error , ( erno , e ) :
101
- if erno == errno .ECONNREFUSED :
106
+ except socket .error as exc :
107
+ if exc . errno == errno .ECONNREFUSED :
102
108
return 13 # No Flume server available, ask tcollector to not respawn us.
103
109
raise
104
110
if json is None :
@@ -108,22 +114,22 @@ def main(argv):
108
114
def printmetric (metric , value , ** tags ):
109
115
if tags :
110
116
tags = " " + " " .join ("%s=%s" % (name , value )
111
- for name , value in tags .iteritems ())
117
+ for name , value in tags .items ())
112
118
else :
113
119
tags = ""
114
- print ( "flume.%s %d %s %s" % (metric , ts , value , tags ))
120
+ print (( "flume.%s %d %s %s" % (metric , ts , value , tags ) ))
115
121
116
122
while True :
117
123
# Get the metrics
118
124
ts = int (time .time ()) # In case last call took a while.
119
125
stats = flume_metrics (server )
120
126
121
127
for metric in stats :
122
- (component , name ) = metric .split ("." )
123
- tags = {component .lower (): name }
124
- for key ,value in stats [metric ].items ():
125
- if key not in EXCLUDE :
126
- printmetric (key .lower (), value , ** tags )
128
+ (component , name ) = metric .split ("." )
129
+ tags = {component .lower (): name }
130
+ for key ,value in stats [metric ].items ():
131
+ if key not in EXCLUDE :
132
+ printmetric (key .lower (), value , ** tags )
127
133
128
134
time .sleep (COLLECTION_INTERVAL )
129
135
0 commit comments