From 5522ae6ffce076dd508446da6b6f3bce80584b05 Mon Sep 17 00:00:00 2001 From: Azeem Sajid Date: Tue, 9 Jun 2020 16:26:36 +0500 Subject: [PATCH] Initial commit. --- .gitignore | 56 +++++ Gemfile | 3 + LICENSE | 202 +++++++++++++++++ README.md | 61 ++++++ Rakefile | 13 ++ fluent-plugin-protobuf-http.gemspec | 28 +++ lib/fluent/plugin/in_protobuf_http.rb | 299 ++++++++++++++++++++++++++ test/helper.rb | 8 + test/plugin/test_in_protobuf_http.rb | 18 ++ 9 files changed, 688 insertions(+) create mode 100644 .gitignore create mode 100644 Gemfile create mode 100644 LICENSE create mode 100644 README.md create mode 100644 Rakefile create mode 100644 fluent-plugin-protobuf-http.gemspec create mode 100644 lib/fluent/plugin/in_protobuf_http.rb create mode 100644 test/helper.rb create mode 100644 test/plugin/test_in_protobuf_http.rb diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e3200e0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,56 @@ +*.gem +*.rbc +/.config +/coverage/ +/InstalledFiles +/pkg/ +/spec/reports/ +/spec/examples.txt +/test/tmp/ +/test/version_tmp/ +/tmp/ + +# Used by dotenv library to load environment variables. +# .env + +# Ignore Byebug command history file. +.byebug_history + +## Specific to RubyMotion: +.dat* +.repl_history +build/ +*.bridgesupport +build-iPhoneOS/ +build-iPhoneSimulator/ + +## Specific to RubyMotion (use of CocoaPods): +# +# We recommend against adding the Pods directory to your .gitignore. However +# you should judge for yourself, the pros and cons are mentioned at: +# https://guides.cocoapods.org/using/using-cocoapods.html#should-i-check-the-pods-directory-into-source-control +# +# vendor/Pods/ + +## Documentation cache and generated files: +/.yardoc/ +/_yardoc/ +/doc/ +/rdoc/ + +## Environment normalization: +/.bundle/ +/vendor/bundle +/lib/bundler/man/ + +# for a library or gem, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# Gemfile.lock +# .ruby-version +# .ruby-gemset + +# unless supporting rvm < 1.11.0 or doing something fancy, ignore this: +.rvmrc + +# Used by RuboCop. Remote config files pulled in from inherit_from directive. +# .rubocop-https?--* diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..fa75df1 --- /dev/null +++ b/Gemfile @@ -0,0 +1,3 @@ +source 'https://rubygems.org' + +gemspec diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..9a19e4e --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2020 Azeem Sajid + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a9bacb3 --- /dev/null +++ b/README.md @@ -0,0 +1,61 @@ +# fluent-plugin-protobuf-http + +[Fluentd](https://fluentd.org/) HTTP input plugin for Protocol Buffers. + +## Installation + +### RubyGems + +``` +$ gem install fluent-plugin-protobuf-http +``` + +### Bundler + +Add following line to your Gemfile: +```ruby +gem "fluent-plugin-protobuf-http" +``` + +And then execute: +``` +$ bundle +``` + +## Configuration + +* **bind** (string) (optional): The address to listen to. + * Default value: `0.0.0.0`. +* **port** (integer) (optional): The port to listen to. + * Default value: `8080`. +* **proto_dir** (string) (required): The directory path that contains the .proto files. +* **in_mode** (enum) (optional): The mode of incoming (supported) events. + * Available values: binary, json + * Default value: `binary`. +* **out_mode** (enum) (optional): The mode of outgoing (emitted) events. + * Available values: binary, json + * Default value: `binary`. +* **tag** (string) (required): The tag for the event. + +### Example + +``` + + @type protobuf_http + @id protobuf_http_input + + bind 0.0.0.0 + port 8080 + tag debug.test + + proto_dir ~/fluent/protos + in_mode binary + out_mode json + +``` + +## Copyright + +* Copyright(c) 2020 [Azeem Sajid](https://www.linkedin.com/in/az33msajid/) +* License + * Apache License, Version 2.0 diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..7229d36 --- /dev/null +++ b/Rakefile @@ -0,0 +1,13 @@ +require 'bundler' +Bundler::GemHelper.install_tasks + +require 'rake/testtask' + +Rake::TestTask.new(:test) do |t| + t.libs.push('lib', 'test') + t.test_files = FileList['test/**/test_*.rb'] + t.verbose = true + t.warning = true +end + +task default: [:test] diff --git a/fluent-plugin-protobuf-http.gemspec b/fluent-plugin-protobuf-http.gemspec new file mode 100644 index 0000000..feee5bd --- /dev/null +++ b/fluent-plugin-protobuf-http.gemspec @@ -0,0 +1,28 @@ +lib = File.expand_path('../lib', __dir__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) + +Gem::Specification.new do |spec| + spec.name = 'fluent-plugin-protobuf-http' + spec.version = '0.1.0' + spec.authors = ['Azeem Sajid'] + spec.email = ['azeem.sajid@gmail.com'] + + spec.summary = 'fluentd HTTP Input Plugin for Protocol Buffers' + spec.description = 'fluentd HTTP Input Plugin for Protocol Buffers with Single and Batch Messages Support]' + spec.homepage = 'https://github.com/iamAzeem/fluent-plugin-protobuf-http' + spec.license = 'Apache-2.0' + + test_files, files = `git ls-files -z`.split("\x0").partition do |f| + f.match(%r{^(test|spec|features)/}) + end + spec.files = files + spec.executables = files.grep(%r{^bin/}) { |f| File.basename(f) } + spec.test_files = test_files + spec.require_paths = ['lib'] + + spec.add_development_dependency 'bundler', '~> 1.14' + spec.add_development_dependency 'rake', '~> 12.0' + spec.add_development_dependency 'test-unit', '~> 3.0' + spec.add_runtime_dependency 'fluentd', ['>= 0.14.10', '< 2'] + spec.add_runtime_dependency 'google-protobuf', '~> 3.12', '>= 3.12.2' +end diff --git a/lib/fluent/plugin/in_protobuf_http.rb b/lib/fluent/plugin/in_protobuf_http.rb new file mode 100644 index 0000000..05cc648 --- /dev/null +++ b/lib/fluent/plugin/in_protobuf_http.rb @@ -0,0 +1,299 @@ +# +# Copyright 2020 Azeem Sajid +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'fluent/plugin/input' +require 'fluent/config/error' +require 'fluent/plugin_helper/http_server' +require 'webrick/httputils' +require 'json' + +module Fluent + module Plugin + class ProtobufHttpInput < Fluent::Plugin::Input + Fluent::Plugin.register_input('protobuf_http', self) + + helpers :http_server, :event_emitter + + desc 'The address to listen to.' + config_param :bind, :string, default: '0.0.0.0' + desc 'The port to listen to.' + config_param :port, :integer, default: 8080 + + desc 'The directory path that contains the .proto files.' + config_param :proto_dir, :string + + desc 'The mode of incoming (supported) events.' + config_param :in_mode, :enum, list: %i[binary json], default: :binary + desc 'The mode of outgoing (emitted) events.' + config_param :out_mode, :enum, list: %i[binary json], default: :binary + + desc 'The tag for the event.' + config_param :tag, :string + + def initialize + super + + @protos = [] # list of *.proto files + @compiled_protos = [] # list of compiled protos i.e. *_pb.rb files + @msgclass_lookup = {} # Lookup Hash: { msgtype => msgclass } + end + + def compile_protos + log.debug("Checking proto_dir [#{@proto_dir}]...") + + path = File.expand_path(@proto_dir).freeze + raise Fluent::ConfigError, "protos_dir does not exist! [#{path}]" unless Dir.exist?(path) + + @protos = Dir["#{path}/*.proto"].freeze + raise Fluent::ConfigError, "Empty proto_dir! [#{path}]" unless @protos.any? + + log.info("Compiling .proto files [#{@protos.length}]...") + + `protoc --ruby_out=#{path} --proto_path=#{path} #{path}/*.proto` + raise Fluent::ConfigError, 'Could not compile! See error(s) above.' unless $?.success? + + log.info("Compiled successfully:\n- #{@protos.join("\n- ")}") + + @protos.each do |proto| + @compiled_protos.push(get_compiled_proto(proto)) + end + + log.info("Compiled .proto files:\n- #{@compiled_protos.join("\n- ")}") + end + + def get_compiled_proto(proto) + proto_suffix = '.proto'.freeze + compiled_proto_suffix = '_pb.rb'.freeze + + compiled_proto = proto.chomp(proto_suffix) + compiled_proto_suffix + raise Fluent::ConfigError, "Compiled proto not found! [#{compiled_proto}]" unless File.file?(compiled_proto) + + compiled_proto + end + + def populate_msgclass_lookup + @compiled_protos.each do |compiled_proto| + msg_types = get_msg_types(compiled_proto) + next unless msg_types.any? + + begin + require compiled_proto + rescue LoadError => e + raise Fluent::ConfigError, "Possible 'import' issue! Use a single self-contianed .proto file! #{e}" + end + + msg_types.each do |msg_type| + @msgclass_lookup[msg_type] = get_msg_class(msg_type) + end + end + + raise Fluent::ConfigError, "No message types found! Check proto_dir [#{@proto_dir}]!" if @msgclass_lookup.empty? + + log.info("Registered messages [#{@msgclass_lookup.keys.length}]:\n- #{@msgclass_lookup.keys.join("\n- ")}") + end + + def get_msg_types(compiled_proto) + log.debug("Extracting message types [#{compiled_proto}]...") + msg_types = [] + File.foreach(compiled_proto) do |line| + if line.lstrip.start_with?('add_message') + msg_type = line[/"([^"]*)"/, 1].freeze # regex: 'msg_type' + msg_types.push(msg_type) unless msg_type.nil? + end + end + + if msg_types.any? + log.info("Total [#{msg_types.length}] message types in [#{compiled_proto}]:\n- #{msg_types.join("\n- ")}") + else + log.warn("No message types found! [#{compiled_proto}]") + end + + msg_types + end + + def get_msg_class(msg_type) + msg = Google::Protobuf::DescriptorPool.generated_pool.lookup(msg_type) + raise Fluent::ConfigError, "Message type ['#{msg_type}'] not registered!'" if msg.nil? + + msg.msgclass + end + + def start + super + + compile_protos + populate_msgclass_lookup + + log.info("Starting protobuf server [#{@bind}:#{@port}]...") + + http_server_create_http_server(:protobuf_server, addr: @bind, port: @port, logger: log) do |server| + server.post("/#{tag}") do |req| + peeraddr = "#{req.peeraddr[2]}:#{req.peeraddr[1]}".freeze # ip:port + serialized_msg = req.body.freeze + + log.info("[R] {#{@in_mode}} [#{peeraddr}, size: #{serialized_msg.length} bytes]") + log.debug("Dumping serialized message [#{serialized_msg.length} bytes]:\n#{serialized_msg}") + + content_type = req.header['content-type'][0] + + unless valid_content_type?(content_type) + status = "Invalid 'Content-Type' header! [#{content_type}]".freeze + log.warn("[X] Message rejected! [#{peeraddr}] #{status}") + next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] + end + + log.debug("[>] Content-Type: #{content_type}") + + msgtype, batch = get_query_params(req.query_string) + unless @msgclass_lookup.key?(msgtype) + status = "Invalid 'msgtype' in 'query_string'! [#{msgtype}]".freeze + log.warn("[X] Message rejected! [#{peeraddr}] #{status}") + next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] + end + + log.debug("[>] Query parameters: [msgtype: #{content_type}, batch: #{batch}]") + + deserialized_msg = deserialize_msg(msgtype, serialized_msg) + + if deserialized_msg.nil? + status = "Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]".freeze + log.warn("[X] Message rejected! [#{peeraddr}] #{status}") + next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] + end + + is_batch = !batch.nil? && batch == 'true' + log.debug("[>] Message validated! [msgtype: #{content_type}, is_batch: #{is_batch}]") + + # Log single message + + unless is_batch + log.info("[S] {#{@in_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]") + + time = Fluent::Engine.now + event_msg = serialize_msg(msgtype, deserialized_msg) + record = { 'message' => event_msg } + router.emit(@tag, time, record) + + log.info("[S] {#{@out_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{event_msg.length} bytes]") + next [200, { 'Content-Type' => 'text/plain' }, nil] + end + + # Log batch messages + + log.info("[B] {#{@in_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]") + + if deserialized_msg.type.nil? || deserialized_msg.batch.nil? || deserialized_msg.batch.empty? + status = "Invalid 'batch' message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]".freeze + log.warn("[X] Message rejected! [#{peeraddr}] #{status}") + next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] + end + + batch_type = deserialized_msg.type + batch_msgs = deserialized_msg.batch + batch_size = batch_msgs.length + + log.info("[B] Emitting message stream/batch [batch_size: #{batch_size} messages]...") + + stream = MultiEventStream.new + batch_msgs.each do |batch_msg| + time = Fluent::Engine.now + record = { 'message' => serialize_msg(batch_type, batch_msg) } + stream.add(time, record) + end + + router.emit_stream(@tag, stream) + + status = "Batch received! [batch_type: #{batch_type}, batch_size: #{batch_size} messages]".freeze + log.info("[B] {#{@out_mode}} [#{peeraddr}, msgtype: #{msgtype}] #{status}") + [200, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] + end + end + end + + def valid_content_type?(content_type) + hdr_binary = 'application/octet-stream'.freeze + hdr_json = 'application/json'.freeze + + case @in_mode + when :binary + content_type == hdr_binary + when :json + content_type == hdr_json + when :binary_and_json + content_type == hdr_binary || content_type == hdr_json + end + end + + def get_query_params(query_string) + if query_string.nil? + log.warn("Empty query string! 'msgtype' is required!") + return nil + end + + query = WEBrick::HTTPUtils.parse_query(query_string) + msgtype = query['msgtype'] + log.warn("'msgtype' not found in 'query_string' [#{query_string}]") if msgtype.nil? + + batch = query['batch'] + log.warn("'batch' not found in 'query_string' [#{query_string}]") if batch.nil? + + [msgtype, batch] + end + + def deserialize_msg(msgtype, serialized_msg) + msgclass = @msgclass_lookup[msgtype] + log.debug("Deserializing {#{@in_mode}} message of type [#{msgclass}]...") + begin + case @in_mode + when :binary + msgclass.decode(serialized_msg) + when :json + msgclass.decode_json(serialized_msg) + end + rescue Google::Protobuf::ParseError => e + log.error("Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes] #{e}") + nil + rescue => e + log.error("Deserializaton failed! Error: #{e}") + nil + end + end + + def serialize_msg(msgtype, deserialized_msg) + msgclass = @msgclass_lookup[msgtype] + log.debug("Serializing [#{@in_mode} > #{@out_mode}]...") + begin + case @out_mode + when :binary + msgclass.encode(deserialized_msg) + when :json + msgclass.encode_json(deserialized_msg) + end + rescue => e + log.error("Serialization failed! [msgtype: #{msgtype}, msg: #{deserialized_msg}] Error: #{e}") + nil + end + end + + def shutdown + @compiled_protos.each do |compiled_proto| + File.delete(compiled_proto) + end + + super + end + end + end +end diff --git a/test/helper.rb b/test/helper.rb new file mode 100644 index 0000000..1562063 --- /dev/null +++ b/test/helper.rb @@ -0,0 +1,8 @@ +$LOAD_PATH.unshift(File.expand_path("../../", __FILE__)) +require "test-unit" +require "fluent/test" +require "fluent/test/driver/input" +require "fluent/test/helpers" + +Test::Unit::TestCase.include(Fluent::Test::Helpers) +Test::Unit::TestCase.extend(Fluent::Test::Helpers) diff --git a/test/plugin/test_in_protobuf_http.rb b/test/plugin/test_in_protobuf_http.rb new file mode 100644 index 0000000..8b946d3 --- /dev/null +++ b/test/plugin/test_in_protobuf_http.rb @@ -0,0 +1,18 @@ +require "helper" +require "fluent/plugin/in_protobuf_http.rb" + +class ProtobufHttpInputTest < Test::Unit::TestCase + setup do + Fluent::Test.setup + end + + test "failure" do + flunk + end + + private + + def create_driver(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::ProtobufHttpInput).configure(conf) + end +end