Skip to content

Commit

Permalink
Merge pull request #1423 from fluent/support-tls-by-server-plugin-helper
Browse files Browse the repository at this point in the history
Support TLS by socket/server plugin helper
  • Loading branch information
tagomoris authored Jan 30, 2017
2 parents 7a10f03 + 9a011dd commit c95fa22
Show file tree
Hide file tree
Showing 15 changed files with 1,628 additions and 117 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: ruby
cache: bundler

# script: bundle exec rake test TESTOPTS=-v
script: bundle exec rake test TESTOPTS=-v

# http://rubies.travis-ci.org/
# See here for osx_image -> OSX versions: https://docs.travis-ci.com/user/languages/objective-c
Expand Down Expand Up @@ -43,6 +43,7 @@ branches:
- v0.14

sudo: false
dist: trusty # for TLSv1.2 support

addons:
apt:
Expand Down
14 changes: 14 additions & 0 deletions example/in_forward_tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<source>
@type forward
port 24224
<transport tls>
insecure true
</transport>
</source>

<match test>
@type stdout
# <buffer>
# flush_interval 10s
# </buffer>
</match>
18 changes: 18 additions & 0 deletions example/out_forward_tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<source>
@type dummy
tag test
</source>

<match test>
@type forward
transport tls
tls_insecure_mode true
<server>
# first server
host 127.0.0.1
port 24224
</server>
<buffer>
flush_interval 0
</buffer>
</match>
4 changes: 4 additions & 0 deletions lib/fluent/config/section.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def [](key)
@params[key.to_sym]
end

def []=(key, value)
@params[key.to_sym] = value
end

def respond_to?(symbol, include_all=false)
case symbol
when :inspect, :nil?, :to_h, :+, :instance_of?, :kind_of?, :[], :respond_to?, :respond_to_missing?
Expand Down
36 changes: 31 additions & 5 deletions lib/fluent/configurable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,8 @@ def initialize
end
end

def configure(conf)
@config = conf

logger = self.respond_to?(:log) ? log : (defined?($log) ? $log : nil)
def configure_proxy_generate
proxy = self.class.merged_configure_proxy
conf.corresponding_proxies << proxy

if self.respond_to?(:owner) && self.owner
owner_proxy = owner.class.merged_configure_proxy
Expand All @@ -63,6 +59,36 @@ def configure(conf)
proxy.overwrite_defaults(owner_proxy) if owner_proxy
end

proxy
end

def configured_section_create(name, conf = nil)
conf ||= Fluent::Config::Element.new(name.to_s, '', {}, [])
root_proxy = configure_proxy_generate
proxy = if name.nil? # root
root_proxy
else
root_proxy.sections[name]
end
# take care to raise Fluent::ConfigError if conf mismatched to proxy
Fluent::Config::SectionGenerator.generate(proxy, conf, nil, nil)
end

def configure(conf)
@config = conf

logger = if self.respond_to?(:log)
self.log
elsif self.respond_to?(:owner) && self.owner.respond_to?(:log)
self.owner.log
elsif defined?($log)
$log
else
nil
end
proxy = configure_proxy_generate
conf.corresponding_proxies << proxy

# In the nested section, can't get plugin class through proxies so get plugin class here
plugin_class = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s)
root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def start

shared_socket = system_config.workers > 1

log.info "listening a tcp port", port: @port, bind: @bind
log.info "listening port", port: @port, bind: @bind
server_create_connection(
:in_forward_server, @port,
bind: @bind,
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def start
super

@buffer = ''
server_create(:in_tcp_server, @port, proto: :tcp, bind: @bind) do |data, conn|
server_create(:in_tcp_server, @port, bind: @bind) do |data, conn|
@buffer << data
begin
pos = 0
Expand Down
96 changes: 81 additions & 15 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,17 @@ class ConnectionClosedError < Error; end

LISTEN_PORT = 24224

desc 'The transport protocol.'
config_param :transport, :enum, list: [:tcp, :tls], default: :tcp
# TODO: TLS session cache/tickets
# TODO: Connection keepalive

desc 'The timeout time when sending event logs.'
config_param :send_timeout, :time, default: 60
# TODO: add linger_timeout, recv_timeout

desc 'The transport protocol to use for heartbeats.(udp,tcp,none)'
config_param :heartbeat_type, :enum, list: [:tcp, :udp, :none], default: :tcp
desc 'The protocol to use for heartbeats (default is the same with "transport").'
config_param :heartbeat_type, :enum, list: [:transport, :tcp, :udp, :none], default: :transport
desc 'The interval of the heartbeat packer.'
config_param :heartbeat_interval, :time, default: 1
desc 'The wait time before accepting a server fault recovery.'
Expand Down Expand Up @@ -75,6 +80,19 @@ class ConnectionClosedError < Error; end
desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text

desc 'The default version of TLS transport.'
config_param :tls_version, :enum, list: Fluent::PluginHelper::Socket::TLS_SUPPORTED_VERSIONS, default: Fluent::PluginHelper::Socket::TLS_DEFAULT_VERSION
desc 'The cipher configuration of TLS transport.'
config_param :tls_ciphers, :string, default: Fluent::PluginHelper::Socket::CIPHERS_DEFAULT
desc 'Skip all verification of certificates or not.'
config_param :tls_insecure_mode, :bool, default: false
desc 'Allow self signed certificates or not.'
config_param :tls_allow_self_signed_cert, :bool, default: false
desc 'Verify hostname of servers and certificates or not in TLS transport.'
config_param :tls_verify_hostname, :bool, default: true
desc 'The additional CA certificate path for TLS.'
config_param :tls_cert_path, :array, value_type: :string, default: nil

config_section :security, required: false, multi: false do
desc 'The hostname'
config_param :self_hostname, :string
Expand All @@ -85,7 +103,7 @@ class ConnectionClosedError < Error; end
config_section :server, param_name: :servers do
desc "The IP address or host name of the server."
config_param :host, :string
desc "The name of the server. Used in log messages."
desc "The name of the server. Used for logging and certificate verification in TLS transport (when host is address)."
config_param :name, :string, default: nil
desc "The port number of the host."
config_param :port, :integer, default: LISTEN_PORT
Expand Down Expand Up @@ -136,9 +154,29 @@ def configure(conf)
@read_interval = @read_interval_msec / 1000.0
@recover_sample_size = @recover_wait / @heartbeat_interval

if @heartbeat_type == :tcp
log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
@heartbeat_type = :transport
end

if @dns_round_robin
if @heartbeat_type == :udp
raise Fluent::ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
end
end

if @transport == :tls
if @tls_cert_path && !@tls_cert_path.empty?
@tls_cert_path.each do |path|
raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
end
end

if @tls_insecure_mode
log.warn "TLS transport is configured in insecure way"
@tls_verify_hostname = false
@tls_allow_self_signed_cert = true
end
end

Expand Down Expand Up @@ -275,14 +313,34 @@ def select_a_healthy_node
raise NoNodesAvailable, "no nodes are available"
end

def create_transfer_socket(host, port, &block)
socket_create_tcp(
host, port,
linger_timeout: @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
&block
)
def create_transfer_socket(host, port, hostname, &block)
case @transport
when :tls
socket_create_tls(
host, port,
version: @tls_version,
ciphers: @tls_ciphers,
insecure: @tls_insecure_mode,
verify_fqdn: @tls_verify_hostname,
fqdn: hostname,
allow_self_signed_cert: @tls_allow_self_signed_cert,
cert_paths: @tls_cert_path,
linger_timeout: @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
&block
)
when :tcp
socket_create_tcp(
host, port,
linger_timeout: @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
&block
)
else
raise "BUG: unknown transport protocol #{@transport}"
end
end

# MessagePack FixArray length is 3
Expand Down Expand Up @@ -458,6 +516,14 @@ def initialize(sender, server, failure:)
@available = true
@state = nil

# @hostname is used for certificate verification & TLS SNI
host_is_hostname = !(IPAddr.new(@host) rescue false)
@hostname = case
when host_is_hostname then @host
when @name then @name
else nil
end

@usock = nil

@username = server.username
Expand Down Expand Up @@ -557,7 +623,7 @@ def send_data_actual(sock, tag, chunk)
end

def send_data(tag, chunk)
sock = @sender.create_transfer_socket(resolved_host, port)
sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
begin
send_data_actual(sock, tag, chunk)
rescue
Expand Down Expand Up @@ -589,8 +655,8 @@ def send_heartbeat
end

case @sender.heartbeat_type
when :tcp
@sender.create_transfer_socket(dest_addr, port) do |sock|
when :transport
@sender.create_transfer_socket(dest_addr, port, @hostname) do |sock|
## don't send any data to not cause a compatibility problem
# sock.write FORWARD_TCP_HEARTBEAT_DATA

Expand Down
Loading

0 comments on commit c95fa22

Please sign in to comment.