Commit
# This is a combination of 2 commits.
# This is the 1st commit message:

Out S3 change

so that users can use a minimal AWS policy to work with fluentd,
e.g. one allowing only s3:PutObject

# This is the commit message fluent#2:

Spacing fixed in the out_s3 plugin

# This is the commit message fluent#3:

Indentation modified

# This is the commit message fluent#4:

Introduce check_bucket parameter
to skip using auto_create_bucket for bucket existence checking

# This is the commit message fluent#5:

Fixes:

Test cases added.
README has been updated with check_bucket's usage and a sample AWS policy.

# This is the commit message fluent#6:

Fixes:

Extra spaces fixed.

# This is the commit message fluent#7:

Spacing issues

# This is the commit message fluent#8:

Update test_out_s3.rb

# This is the commit message fluent#9:

Time slicer modified to handle UTC as well

# This is the commit message fluent#10:

removing test cases

# This is the commit message fluent#1:

renaming time_slicer variable

# This is the commit message fluent#2:

Test cases added

Test cases for the out_s3 plugin have been added. This feature works with a hardened
S3 policy, e.g. one granting only the PutObject permission. It checks neither bucket
existence nor object existence, since each object is named with a timestamp and is
therefore unique every time.

The only assumption is that the bucket has already been created.
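For illustration, a generated object key looks like this (an example matching
the added test cases, using the default time slice format):

    log/20110102_131415.gz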

# This is the commit message fluent#3:

Some variables renamed

# This is the commit message fluent#2:

Clean up dependencies

# This is the commit message fluent#3:

Update README for placeholders

# This is the commit message fluent#2:

v0.8.0.rc1
Chiranjit Datta authored and chillaxd committed Dec 19, 2016
1 parent 410158b commit af5d40a
Showing 6 changed files with 129 additions and 40 deletions.
2 changes: 2 additions & 0 deletions Gemfile
@@ -1,5 +1,7 @@
source "http://rubygems.org"

gem 'json', '= 1.8.3'
gem 'uuidtools'
gem 'fluentd', '= 0.14.8'

gemspec
43 changes: 32 additions & 11 deletions README.md
@@ -47,7 +47,7 @@ Simply use RubyGems:
**aws_key_id**

AWS access key id. This parameter is required when your agent is not
running on an EC2 instance with an IAM Role. When using an IAM role, make
sure to configure `instance_profile_credentials`. Usage can be found below.

**aws_sec_key**
@@ -85,8 +85,9 @@ The format of S3 object keys. You can use several built-in variables:
* %{time_slice}
* %{index}
* %{file_extension}
* %{hex_random}
* %{uuid_flush}
* %{hostname}

to decide keys dynamically.

@@ -98,7 +99,8 @@ the sequential number starts from 0, increments when multiple files are
uploaded to S3 in the same time slice.
* %{file_extension} is always "gz" for now.
* %{uuid_flush} is a uuid that is replaced every time the buffer is flushed. If you want to use this placeholder, install the `uuidtools` gem first.
* %{hostname} is replaced with the `Socket.gethostname` result. This is the same as "#{Socket.gethostname}".
* %{hex_random} is a random hex string that is replaced for each buffer chunk, not
assured to be unique. This is used to follow the performance tuning tip `Add
a Hex Hash Prefix to Key Name`, written in [Request Rate and Performance
@@ -130,14 +132,6 @@ You get:
"log/events/ts=20130111-23/events_1.gz"
"log/events/ts=20130112-00/events_0.gz"

The
[fluent-mixin-config-placeholders](https://github.com/tagomoris/fluent-mixin-config-placeholders) mixin is also incorporated, so additional variables such
as %{hostname}, %{uuid}, etc. can be used in the s3_object_key_format. This
could prove useful in preventing filename conflicts when writing from multiple
servers.

s3_object_key_format %{path}/events/ts=%{time_slice}/events_%{index}-%{hostname}.%{file_extension}

**force_path_style**

:force_path_style (Boolean) — default: false — When set to true, the
@@ -215,6 +209,33 @@ You can change key name by "message_key" option.

Create the S3 bucket if it does not exist. Default is true.

**check_bucket**

Check the bucket's existence at startup (the ensure_bucket step). Default is true.

When it is false, fluentd will not check AWS S3 for the existence of the
specified bucket. Use this when the bucket is pre-created before running
fluentd.

**check_object**

Check whether an object with the same key already exists before writing. Default is true.

When it is false, s3_object_key_format is fixed to
%{path}/%{time_slice}_%{hms_slice}.%{file_extension}, where hms_slice is the
creation time in HHMMSS format, so that each object key is unique.
Example object name, assuming the object is created on 2016/11/16 at 3:30:54 PM:
20161116_153054.txt (the extension can be anything, as per the user's choice)

**Example when check_bucket=false and check_object=false**

With this configuration, fluentd works with a minimal IAM policy, like:

    "Statement": [{
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": ["*"]
    }]
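A matching fluentd configuration could look like the following sketch (the
match pattern, credentials, and bucket name are placeholders; the bucket must
be pre-created):

    <match pattern>
      @type s3
      aws_key_id YOUR_AWS_KEY_ID
      aws_sec_key YOUR_AWS_SECRET_KEY
      s3_bucket your-precreated-bucket
      path log
      check_bucket false
      check_object false
    </match>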

**check_apikey_on_start**

Check AWS key on start. Default is true.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.7.2
0.8.0.rc1
2 changes: 0 additions & 2 deletions fluent-plugin-s3.gemspec
@@ -19,8 +19,6 @@ Gem::Specification.new do |gem|

gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
gem.add_dependency "aws-sdk", [">= 2.3.22", "< 3"]
gem.add_dependency "yajl-ruby", "~> 1.0"
gem.add_dependency "fluent-mixin-config-placeholders", ">= 0.3.0"
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "rr", "= 1.1.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
65 changes: 42 additions & 23 deletions lib/fluent/plugin/out_s3.rb
@@ -102,6 +102,10 @@ def desc(description)
config_param :hex_random_length, :integer, :default => 4
desc "Overwrite already existing path"
config_param :overwrite, :bool, :default => false
desc "Check bucket before creation"
config_param :check_bucket, :bool, :default => true
desc "Check object before creation"
config_param :check_object, :bool, :default => true
desc "Specifies the AWS KMS key ID to use for object encryption"
config_param :ssekms_key_id, :string, :default => nil, :secret => true
desc "Specifies the algorithm to use to when encrypting the object"
@@ -148,10 +152,12 @@ def configure(conf)
@path_slicer = Proc.new {|path|
Time.now.strftime(path)
}
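# NOTE: the HHMMSS slice below is captured once, at configure time; it feeds
# the %{hms_slice} placeholder used in write when check_object is false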
@hms_slicer = Time.now.strftime("%H%M%S")
else
@path_slicer = Proc.new {|path|
Time.now.utc.strftime(path)
}
@hms_slicer = Time.now.utc.strftime("%H%M%S")
end

if @hex_random_length > MAX_HEX_RANDOM_LENGTH
@@ -179,7 +185,7 @@ def start
@bucket = @s3.bucket(@s3_bucket)

check_apikeys if @check_apikey_on_start
ensure_bucket
ensure_bucket if @check_bucket

super
end
@@ -192,35 +198,48 @@ def write(chunk)
i = 0
previous_path = nil

begin
path = @path_slicer.call(@path)
if @check_object
begin
path = @path_slicer.call(@path)

@values_for_s3_object_chunk[chunk.unique_id] ||= {
"hex_random" => hex_random(chunk),
}
values_for_s3_object_key = {
"path" => path,
"time_slice" => chunk.key,
"file_extension" => @compressor.ext,
"index" => i,
"uuid_flush" => uuid_random,
}.merge!(@values_for_s3_object_chunk[chunk.unique_id])

s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
values_for_s3_object_key[expr[2...expr.size-1]]
}
if (i > 0) && (s3path == previous_path)
if @overwrite
log.warn "#{s3path} already exists, but will overwrite"
break
else
raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
end
end

@values_for_s3_object_chunk[chunk.unique_id] ||= {
"hex_random" => hex_random(chunk),
}
i += 1
previous_path = s3path
end while @bucket.object(s3path).exists?
else
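# check_object is false: fall back to a fixed timestamp-based key format so
# each object key is unique without checking for an existing object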
@s3_object_key_format = "%{path}/%{time_slice}_%{hms_slice}.%{file_extension}"
values_for_s3_object_key = {
"path" => path,
"path" => @path_slicer.call(@path),
"time_slice" => chunk.key,
"file_extension" => @compressor.ext,
"index" => i,
"uuid_flush" => uuid_random,
}.merge!(@values_for_s3_object_chunk[chunk.unique_id])

"hms_slice" => @hms_slicer,
}
s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
values_for_s3_object_key[expr[2...expr.size-1]]
}
if (i > 0) && (s3path == previous_path)
if @overwrite
log.warn "#{s3path} already exists, but will overwrite"
break
else
raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
end
end

i += 1
previous_path = s3path
end while @bucket.object(s3path).exists?
end

tmp = Tempfile.new("s3-")
tmp.binmode
55 changes: 52 additions & 3 deletions test/test_out_s3.rb
@@ -52,6 +52,8 @@ def test_configure
assert_equal false, d.instance.force_path_style
assert_equal nil, d.instance.compute_checksums
assert_equal nil, d.instance.signature_version
assert_equal true, d.instance.check_bucket
assert_equal true, d.instance.check_object
end

def test_s3_endpoint_with_valid_endpoint
@@ -118,6 +120,14 @@ def test_configure_with_hex_random_length
create_driver(conf + "\nhex_random_length 16\n")
end
end

def test_configure_with_no_check_on_s3
conf = CONFIG.clone
conf << "\ncheck_bucket false\ncheck_object false\n"
d = create_driver(conf)
assert_equal false, d.instance.check_bucket
assert_equal false, d.instance.check_object
end

def test_path_slicing
config = CONFIG.clone.gsub(/path\slog/, "path log/%Y/%m/%d")
@@ -137,6 +147,14 @@ def test_path_slicing_utc
slice = path_slicer.call(path)
assert_equal slice, Time.now.utc.strftime("log/%Y/%m/%d")
end

def test_hms_slicing
assert_equal '131415', Time.parse("2011-01-02 13:14:15").strftime("%H%M%S")
end

def test_hms_slicing_utc
assert_equal '131415', Time.parse("2011-01-02 13:14:15 UTC").strftime("%H%M%S")
end

def test_format
d = create_driver
@@ -272,6 +290,36 @@ def check_apikeys
end.configure(conf)
d
end

def test_write_with_hardened_s3_policy
# Partial mock the S3Bucket, not to make an actual connection to Amazon S3
setup_mocks(true, false)
s3_local_file_path = "/tmp/s3-test.txt"
# @s3_object_key_format will be hard-coded with a timestamp only, as in this
# case it checks neither object existence nor bucket existence.
# Both the check_bucket and check_object config parameters should be false.
# @s3_object_key_format = "%{path}/%{time_slice}_%{hms_slice}.%{file_extension}"
s3path = "log/20110102_131415.gz"
setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: s3path, check_object: false)

# We must use TimeSlicedOutputTestDriver instead of BufferedOutputTestDriver,
# to make assertions on chunks' keys
d = create_time_sliced_driver

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
d.emit({"a"=>1}, time)
d.emit({"a"=>2}, time)

# Finally, the instance of S3Output is initialized and then invoked
d.run
Zlib::GzipReader.open(s3_local_file_path) do |gz|
data = gz.read
assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
%[2011-01-02T13:14:15Z\ttest\t{"a":2}\n],
data
end
FileUtils.rm_f(s3_local_file_path)
end

def test_write_with_custom_s3_object_key_format
# Partial mock the S3Bucket, not to make an actual connection to Amazon S3
@@ -364,14 +412,14 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde
FileUtils.rm_f(s3_local_file_path)
end

def setup_mocks(exists_return = false)
def setup_mocks(exists_return = false, check_bucket = true)
@s3_client = stub(Aws::S3::Client.new(:stub_responses => true))
mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
@s3_resource = mock(Aws::S3::Resource.new(:client => @s3_client))
mock(Aws::S3::Resource).new(:client => @s3_client) { @s3_resource }
@s3_bucket = mock(Aws::S3::Bucket.new(:name => "test",
:client => @s3_client))
@s3_bucket.exists? { exists_return }
@s3_bucket.exists? { exists_return } if check_bucket
@s3_object = mock(Aws::S3::Object.new(:bucket_name => "test_bucket",
:key => "test",
:client => @s3_client))
@@ -382,12 +430,13 @@ def setup_s3_object_mocks(params = {})
def setup_s3_object_mocks(params = {})
s3path = params[:s3path] || "log/events/ts=20110102-13/events_0-testing.node.local.gz"
s3_local_file_path = params[:s3_local_file_path] || "/tmp/s3-test.txt"
check_s3_object = params.fetch(:check_object, true) # `params[:check_object] || true` would always yield true

# Assert content of event logs which are being sent to S3
s3obj = stub(Aws::S3::Object.new(:bucket_name => "test_bucket",
:key => "test",
:client => @s3_client))
s3obj.exists? { false }
s3obj.exists? { false } if check_s3_object

tempfile = File.new(s3_local_file_path, "w")
stub(Tempfile).new("s3-") { tempfile }
