Skip to content

Commit

Permalink
Merge pull request #900 from cosmo0920/add-suspend-option-to-in_dummy…
Browse files Browse the repository at this point in the history
…-plugin

Add suspend option to in dummy plugin
  • Loading branch information
tagomoris authored Aug 8, 2016
2 parents c8f5b7a + dcdebeb commit a97ef73
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 3 deletions.
11 changes: 9 additions & 2 deletions lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ class DummyInput < Input
helpers :thread, :storage

BIN_NUM = 10
DEFAULT_STORAGE_TYPE = 'local'

desc "The value is the tag assigned to the generated events."
config_param :tag, :string
desc "It configures how many events to generate per second."
config_param :rate, :integer, default: 1
desc "If specified, each generated event has an auto-incremented key field."
config_param :auto_increment_key, :string, default: nil
desc "The boolean to suspend-and-resume incremental value after restart"
config_param :suspend, :bool, default: false
desc "The dummy data to be generated. An array of JSON hashes or a single JSON hash."
config_param :dummy, default: [{"message"=>"dummy"}] do |val|
begin
Expand All @@ -58,12 +61,16 @@ def initialize
def configure(conf)
super
@dummy_index = 0
config = conf.elements.select{|e| e.name == 'storage' }.first
@storage = storage_create(usage: 'suspend', conf: config, default_type: DEFAULT_STORAGE_TYPE)
end

def start
super

@storage = storage_create(type: 'local')
@storage.put(:increment_value, 0) unless @storage.get(:increment_value)
@storage.put(:dummy_index, 0) unless @storage.get(:dummy_index)

if @auto_increment_key && !@storage.get(:auto_increment_value)
@storage.put(:auto_increment_value, -1)
end
Expand Down Expand Up @@ -100,7 +107,7 @@ def generate
d = @dummy[@dummy_index]
unless d
@dummy_index = 0
d = @dummy[0]
d = @dummy[@dummy_index]
end
@dummy_index += 1
if @auto_increment_key
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/storage_local.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def configure(conf)
raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'"
end
else
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.writable?(@path)
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{dir}'" unless File.writable?(dir)
end
end
end
Expand Down
95 changes: 95 additions & 0 deletions test/plugin/test_in_dummy.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require_relative '../helper'
require 'fluent/test/driver/input'
require 'fluent/plugin/in_dummy'
require 'fileutils'

class DummyTest < Test::Unit::TestCase
def setup
Expand Down Expand Up @@ -90,4 +91,98 @@ def create_driver(conf)
end
end
end

TEST_PLUGIN_STORAGE_PATH = File.join( File.dirname(File.dirname(__FILE__)), 'tmp', 'in_dummy', 'store' )
FileUtils.mkdir_p TEST_PLUGIN_STORAGE_PATH

sub_test_case "doesn't suspend internal counters in default" do
config1 = {
'tag' => 'dummy',
'rate' => '2',
'dummy' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]',
'auto_increment_key' => 'id',
'suspend' => false,
}
conf1 = config_element('ROOT', '', config1, [])
test "value of auto increment key is not suspended after stop-and-start" do
assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))

d1 = create_driver(conf1)
d1.run(timeout: 0.5) do
d1.instance.emit(4)
end

first_id1 = d1.events.first[2]['id']
assert_equal 0, first_id1

last_id1 = d1.events.last[2]['id']
assert { last_id1 > 0 }

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))

d2 = create_driver(conf1)
d2.run(timeout: 0.5) do
d2.instance.emit(4)
end

first_id2 = d2.events.first[2]['id']
assert_equal 0, first_id2

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))
end
end

sub_test_case "suspend internal counters if suspend is true" do
setup do
FileUtils.rm_rf(TEST_PLUGIN_STORAGE_PATH)
FileUtils.mkdir_p(File.join(TEST_PLUGIN_STORAGE_PATH, 'json'))
FileUtils.chmod_R(0755, File.join(TEST_PLUGIN_STORAGE_PATH, 'json'))
end

config2 = {
'@id' => 'test-02',
'tag' => 'dummy',
'rate' => '2',
'dummy' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]',
'auto_increment_key' => 'id',
'suspend' => true,
}
conf2 = config_element('ROOT', '', config2, [
config_element(
'storage', '',
{'@type' => 'local',
'@id' => 'test-02',
'path' => File.join(TEST_PLUGIN_STORAGE_PATH,
'json', 'test-02.json'),
'persistent' => true,
})
])
test "value of auto increment key is suspended after stop-and-start" do
assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json'))

d1 = create_driver(conf2)
d1.run(timeout: 0.5) do
d1.instance.emit(4)
end

first_id1 = d1.events.first[2]['id']
assert_equal 0, first_id1

last_id1 = d1.events.last[2]['id']
assert { last_id1 > 0 }

assert File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json'))

d2 = create_driver(conf2)
d2.run(timeout: 0.5) do
d2.instance.emit(4)
end
d2.events

first_id2 = d2.events.first[2]['id']
assert_equal last_id1 + 1, first_id2

assert File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json'))
end
end
end

0 comments on commit a97ef73

Please sign in to comment.