configurable sub-second precision with no time key
For input plugins that do not provide a time key as part of the
record but do provide a `time` to the router, allow the degree
of sub-second time precision to be configurable.

Some sources (such as AWS CloudWatch) may provide an accurate time
source but do not include a time portion for an otherwise free-form
record: there is nothing to parse.

In this case the casting of a `DateTime` to a `String` loses any
sub-second precision.
Sam Pointer committed Mar 28, 2017
1 parent 83f978f commit e3650a7
Showing 3 changed files with 50 additions and 11 deletions.
9 changes: 8 additions & 1 deletion README.md
@@ -21,6 +21,7 @@ Note: For Amazon Elasticsearch Service please consider using [fluent-plugin-aws-
+ [logstash_prefix](#logstash_prefix)
+ [logstash_dateformat](#logstash_dateformat)
+ [time_key_format](#time_key_format)
+ [time_precision](#time_precision)
+ [time_key](#time_key)
+ [time_key_exclude_timestamp](#time_key_exclude_timestamp)
+ [utc_index](#utc_index)
@@ -140,6 +141,12 @@ For example to parse ISO8601 times with sub-second precision:
time_key_format %Y-%m-%dT%H:%M:%S.%N%z
```

### time_precision

If the record does not include a `time_key`, this setting defines the degree of sub-second time precision to preserve from the `time` portion of the routed event.

For example, if your input plugin does not include a `time_key` in the record but is able to pass a `time` to the router when emitting the event (AWS CloudWatch events are an example of this), this setting allows you to preserve the sub-second time resolution of those events. This is the case for [fluent-plugin-cloudwatch-ingest](https://github.com/sampointer/fluent-plugin-cloudwatch-ingest).
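A minimal configuration sketch enabling millisecond precision (the match pattern and the other settings shown are illustrative):

```
<match **>
  @type elasticsearch
  logstash_format true
  time_precision 3
</match>
```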

### time_key

By default, when inserting records in [Logstash](https://www.elastic.co/products/logstash) format, `@timestamp` is dynamically created with the time at log ingestion. If you'd like to use a custom time, include an `@timestamp` with your record.
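For instance, a record emitted with its own `@timestamp` (the field values here are illustrative) would look like:

```
{"message": "login failed", "@timestamp": "2017-03-28T11:20:00.123+00:00"}
```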
@@ -360,7 +367,7 @@ remove_keys a_parent, a_routing # a_parent and a_routing fields won't be sent to

### remove_keys_on_update

`remove_keys_on_update` will not update the configured keys in Elasticsearch when a record is being updated.
This setting only has any effect if the write operation is update or upsert.
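A hedged configuration sketch (the key names and the use of upsert are illustrative):

```
remove_keys_on_update foo,bar
write_operation upsert
```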

If the write setting is upsert then these keys are only removed if the record is being
3 changes: 2 additions & 1 deletion lib/fluent/plugin/out_elasticsearch.rb
@@ -27,6 +27,7 @@ class ConnectionFailure < StandardError; end
config_param :target_index_key, :string, :default => nil
config_param :target_type_key, :string, :default => nil
config_param :time_key_format, :string, :default => nil
config_param :time_precision, :integer, :default => 0
config_param :logstash_format, :bool, :default => false
config_param :logstash_prefix, :string, :default => "logstash"
config_param :logstash_dateformat, :string, :default => "%Y.%m.%d"
@@ -299,7 +300,7 @@ def write_objects(tag, chunk)
record[TIMESTAMP_FIELD] = rts unless @time_key_exclude_timestamp
else
dt = Time.at(time).to_datetime
record[TIMESTAMP_FIELD] = dt.to_s
record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision)
end
dt = dt.new_offset(0) if @utc_index
target_index = "#{@logstash_prefix}-#{dt.strftime(@logstash_dateformat)}"
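The one-line change above swaps `DateTime#to_s` for `DateTime#iso8601` with a configurable precision. A small sketch of the difference (the timestamp value is illustrative, and the zone offset depends on the local environment):

```ruby
require 'date'

# An event time with sub-second detail, as a router might supply it.
dt = Time.at(1490700000, 123456).to_datetime  # 123456 microseconds

dt.to_s        # sub-second detail is lost, e.g. "2017-03-28T11:20:00+00:00"
dt.iso8601     # precision 0 (the plugin's default) behaves the same way
dt.iso8601(3)  # keeps milliseconds, e.g. "2017-03-28T11:20:00.123+00:00"
```

With `time_precision 0` the new code therefore matches the old `to_s` output exactly, so the default behaviour is unchanged.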
49 changes: 40 additions & 9 deletions test/plugin/test_out_elasticsearch.rb
@@ -120,8 +120,8 @@ def test_template_create
to_return(:status => 200, :body => "", :headers => {})

driver('test', config)
end
end


def test_template_create_invalid_filename
config = %{
@@ -177,14 +177,14 @@ def test_templates_create
to_return(:status => 200, :body => "", :headers => {})
stub_request(:put, "https://john:[email protected]:777/es//_template/logstash3").
to_return(:status => 200, :body => "", :headers => {})

driver('test', config)

assert_requested( :put, "https://john:[email protected]:777/es//_template/logstash1", times: 1)
assert_requested( :put, "https://john:[email protected]:777/es//_template/logstash2", times: 1)
assert_not_requested(:put, "https://john:[email protected]:777/es//_template/logstash3") #exists
end

def test_templates_not_used
cwd = File.dirname(__FILE__)
template_file = File.join(cwd, 'test_template.json')
@@ -199,7 +199,7 @@ def test_templates_not_used
template_name logstash
template_file #{template_file}
templates {"logstash1":"#{template_file}", "logstash2":"#{template_file}" }
}
}
# connection start
stub_request(:head, "https://john:[email protected]:777/es//").
to_return(:status => 200, :body => "", :headers => {})
@@ -254,7 +254,7 @@ def test_templates_can_be_partially_created_if_error_occurs
assert_raise(RuntimeError) {
driver('test', config)
}

assert_requested(:put, "https://john:[email protected]:777/es//_template/logstash1", times: 1)
assert_not_requested(:put, "https://john:[email protected]:777/es//_template/logstash2")
end
@@ -770,6 +770,37 @@ def test_uses_custom_time_key_format_obscure_format
assert_equal(index_cmds[1]['@timestamp'], ts)
end

def test_uses_no_subsecond_precision_by_default
driver.configure("logstash_format true\n")
stub_elastic_ping
stub_elastic
begin
time = Fluent::EventTime.new(Time.now.to_i, 000000000)
rescue
time = Fluent::Engine.now
end
driver.emit(sample_record, time)
driver.run
assert(index_cmds[1].has_key? '@timestamp')
assert_equal(index_cmds[1]['@timestamp'], Time.at(time).iso8601)
end

def test_uses_subsecond_precision_when_configured
driver.configure("logstash_format true
time_precision 3\n")
stub_elastic_ping
stub_elastic
begin
time = Fluent::EventTime.new(Time.now.to_i, 000000000)
rescue
time = Fluent::Engine.now
end
driver.emit(sample_record, time)
driver.run
assert(index_cmds[1].has_key? '@timestamp')
assert_equal(index_cmds[1]['@timestamp'], Time.at(time).iso8601(3))
end

def test_doesnt_add_tag_key_by_default
stub_elastic_ping
stub_elastic
@@ -931,7 +962,7 @@ def test_reconnect_on_error_enabled
stub_request(:post, "http://localhost:9200/_bulk").with do |req|
raise ZeroDivisionError, "any not host_unreachable_exceptions exception"
end

driver.configure("reconnect_on_error true\n")
driver.emit(sample_record)

@@ -955,7 +986,7 @@ def test_reconnect_on_error_disabled
stub_request(:post, "http://localhost:9200/_bulk").with do |req|
raise ZeroDivisionError, "any not host_unreachable_exceptions exception"
end

driver.configure("reconnect_on_error false\n")
driver.emit(sample_record)

