configurable sub-second precision with no time key #249

Merged 1 commit on May 5, 2017
README.md (8 additions, 1 deletion)
@@ -21,6 +21,7 @@ Note: For Amazon Elasticsearch Service please consider using [fluent-plugin-aws-
+ [logstash_prefix](#logstash_prefix)
+ [logstash_dateformat](#logstash_dateformat)
+ [time_key_format](#time_key_format)
+ [time_precision](#time_precision)
+ [time_key](#time_key)
+ [time_key_exclude_timestamp](#time_key_exclude_timestamp)
+ [utc_index](#utc_index)
@@ -140,6 +141,12 @@ For example, to parse ISO8601 times with sub-second precision:
time_key_format %Y-%m-%dT%H:%M:%S.%N%z
```

### time_precision

If the record does not include a `time_key`, this setting defines the degree of sub-second time precision to preserve from the `time` portion of the routed event.

For example, if your input plugin does not include a `time_key` in the record but is able to pass a `time` to the router when emitting the event (AWS CloudWatch events are an example of this), this setting allows you to preserve the sub-second time resolution of those events. This is the case for [fluent-plugin-cloudwatch-ingest](https://github.com/sampointer/fluent-plugin-cloudwatch-ingest).
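
For instance, to keep millisecond resolution from the event's `time` when the record carries no `time_key`, the match section might include the following (an illustrative sketch; the surrounding `<match>` block and any other parameters are assumed):

```
logstash_format true
time_precision 3
```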

### time_key

By default, when inserting records in [Logstash](https://www.elastic.co/products/logstash) format, `@timestamp` is dynamically created with the time at log ingestion. If you'd like to use a custom time, include an `@timestamp` with your record.
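
For instance, a record that supplies its own timestamp might look like the following (field names and values are purely illustrative):

```
{"@timestamp": "2017-05-05T12:34:56.789Z", "message": "my custom-timestamped event"}
```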
@@ -360,7 +367,7 @@ remove_keys a_parent, a_routing # a_parent and a_routing fields won't be sent to

### remove_keys_on_update

Remove keys on update will not update the configured keys in elasticsearch when a record is being updated.
This setting only has any effect if the write operation is update or upsert.

If the write setting is upsert then these keys are only removed if the record is being
lib/fluent/plugin/out_elasticsearch.rb (2 additions, 1 deletion)
@@ -27,6 +27,7 @@ class ConnectionFailure < StandardError; end
config_param :target_index_key, :string, :default => nil
config_param :target_type_key, :string, :default => nil
config_param :time_key_format, :string, :default => nil
config_param :time_precision, :integer, :default => 0
config_param :logstash_format, :bool, :default => false
config_param :logstash_prefix, :string, :default => "logstash"
config_param :logstash_dateformat, :string, :default => "%Y.%m.%d"
@@ -299,7 +300,7 @@ def write_objects(tag, chunk)
record[TIMESTAMP_FIELD] = rts unless @time_key_exclude_timestamp
else
dt = Time.at(time).to_datetime
record[TIMESTAMP_FIELD] = dt.to_s
record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision)
end
dt = dt.new_offset(0) if @utc_index
target_index = "#{@logstash_prefix}-#{dt.strftime(@logstash_dateformat)}"
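
The replacement line relies on `DateTime#iso8601` accepting a count of fractional digits; a standalone Ruby sketch of that behaviour (the timestamp value is illustrative):

```ruby
require 'date'

dt = DateTime.parse("2017-05-05T12:34:56.789123+00:00")
dt.iso8601     # => "2017-05-05T12:34:56+00:00"        default: seconds only
dt.iso8601(3)  # => "2017-05-05T12:34:56.789+00:00"    time_precision 3
dt.iso8601(6)  # => "2017-05-05T12:34:56.789123+00:00" time_precision 6
```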
test/plugin/test_out_elasticsearch.rb (40 additions, 9 deletions)
@@ -120,8 +120,8 @@ def test_template_create
to_return(:status => 200, :body => "", :headers => {})

driver('test', config)
end


def test_template_create_invalid_filename
config = %{
@@ -177,14 +177,14 @@ def test_templates_create
to_return(:status => 200, :body => "", :headers => {})
stub_request(:put, "https://john:[email protected]:777/es//_template/logstash3").
to_return(:status => 200, :body => "", :headers => {})

driver('test', config)

assert_requested( :put, "https://john:[email protected]:777/es//_template/logstash1", times: 1)
assert_requested( :put, "https://john:[email protected]:777/es//_template/logstash2", times: 1)
assert_not_requested(:put, "https://john:[email protected]:777/es//_template/logstash3") #exists
end

def test_templates_not_used
cwd = File.dirname(__FILE__)
template_file = File.join(cwd, 'test_template.json')
@@ -199,7 +199,7 @@ def test_templates_not_used
template_name logstash
template_file #{template_file}
templates {"logstash1":"#{template_file}", "logstash2":"#{template_file}" }
}
# connection start
stub_request(:head, "https://john:[email protected]:777/es//").
to_return(:status => 200, :body => "", :headers => {})
@@ -254,7 +254,7 @@ def test_templates_can_be_partially_created_if_error_occurs
assert_raise(RuntimeError) {
driver('test', config)
}

assert_requested(:put, "https://john:[email protected]:777/es//_template/logstash1", times: 1)
assert_not_requested(:put, "https://john:[email protected]:777/es//_template/logstash2")
end
@@ -770,6 +770,37 @@ def test_uses_custom_time_key_format_obscure_format
assert_equal(index_cmds[1]['@timestamp'], ts)
end

def test_uses_no_subsecond_precision_by_default
driver.configure("logstash_format true\n")
stub_elastic_ping
stub_elastic
begin
time = Fluent::EventTime.new(Time.now.to_i, 000000000)
rescue
time = Fluent::Engine.now
end
driver.emit(sample_record, time)
driver.run
assert(index_cmds[1].has_key? '@timestamp')
assert_equal(index_cmds[1]['@timestamp'], Time.at(time).iso8601)
end

def test_uses_subsecond_precision_when_configured
driver.configure("logstash_format true
time_precision 3\n")
stub_elastic_ping
stub_elastic
begin
time = Fluent::EventTime.new(Time.now.to_i, 000000000)
rescue
time = Fluent::Engine.now
end
driver.emit(sample_record, time)
driver.run
assert(index_cmds[1].has_key? '@timestamp')
assert_equal(index_cmds[1]['@timestamp'], Time.at(time).iso8601(3))
end

def test_doesnt_add_tag_key_by_default
stub_elastic_ping
stub_elastic
@@ -931,7 +962,7 @@ def test_reconnect_on_error_enabled
stub_request(:post, "http://localhost:9200/_bulk").with do |req|
raise ZeroDivisionError, "any not host_unreachable_exceptions exception"
end

driver.configure("reconnect_on_error true\n")
driver.emit(sample_record)

@@ -955,7 +986,7 @@ def test_reconnect_on_error_disabled
stub_request(:post, "http://localhost:9200/_bulk").with do |req|
raise ZeroDivisionError, "any not host_unreachable_exceptions exception"
end

driver.configure("reconnect_on_error false\n")
driver.emit(sample_record)
