Skip to content

Commit 10ac156

Browse files
colinsurprenantjordansissel
authored andcommitted
add robustness to tweets stream handling
Fixes #1450
1 parent bf11118 commit 10ac156

File tree

1 file changed

+36
-22
lines changed

1 file changed

+36
-22
lines changed

lib/logstash/inputs/twitter.rb

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -65,27 +65,41 @@ def register
6565
public
6666
def run(queue)
6767
@logger.info("Starting twitter tracking", :keywords => @keywords)
68-
@client.filter(:track => @keywords.join(",")) do |tweet|
69-
@logger.info? && @logger.info("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
70-
if @full_tweet
71-
event = LogStash::Event.new(LogStash::Util.stringify_symbols(tweet.to_hash))
72-
event.timestamp = LogStash::Timestamp.new(tweet.created_at)
73-
else
74-
event = LogStash::Event.new(
75-
LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
76-
"message" => tweet.full_text,
77-
"user" => tweet.user.screen_name,
78-
"client" => tweet.source,
79-
"retweeted" => tweet.retweeted?,
80-
"source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}"
81-
)
82-
end
83-
decorate(event)
84-
event["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply?
85-
unless tweet.urls.empty?
86-
event["urls"] = tweet.urls.map(&:expanded_url).map(&:to_s)
87-
end
88-
queue << event
89-
end # client.filter
68+
begin
69+
@client.filter(:track => @keywords.join(",")) do |tweet|
70+
if tweet.is_a?(Twitter::Tweet)
71+
@logger.debug? && @logger.debug("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
72+
if @full_tweet
73+
event = LogStash::Event.new(LogStash::Util.stringify_symbols(tweet.to_hash))
74+
event.timestamp = LogStash::Timestamp.new(tweet.created_at)
75+
else
76+
event = LogStash::Event.new(
77+
LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
78+
"message" => tweet.full_text,
79+
"user" => tweet.user.screen_name,
80+
"client" => tweet.source,
81+
"retweeted" => tweet.retweeted?,
82+
"source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}"
83+
)
84+
event["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply?
85+
unless tweet.urls.empty?
86+
event["urls"] = tweet.urls.map(&:expanded_url).map(&:to_s)
87+
end
88+
end
89+
90+
decorate(event)
91+
queue << event
92+
end
93+
end # client.filter
94+
rescue LogStash::ShutdownSignal
95+
return
96+
rescue Twitter::Error::TooManyRequests => e
97+
@logger.warn("Twitter too many requests error, sleeping for #{e.rate_limit.reset_in}s")
98+
sleep(e.rate_limit.reset_in)
99+
retry
100+
rescue => e
101+
@logger.warn("Twitter client error", :message => e.message, :exception => e, :backtrace => e.backtrace)
102+
retry
103+
end
90104
end # def run
91105
end # class LogStash::Inputs::Twitter

0 commit comments

Comments
 (0)