undefined method msgpack_each #1342

Closed
Ponyboy47 opened this issue Nov 30, 2016 · 4 comments

Comments

@Ponyboy47

I was testing the latest Fluentd version (0.14.9) when I ran into this issue. I'm running on CentOS 7 with Ruby 2.3.

The company I work for is using an output plugin that worked perfectly on 0.14.8, so I'm not sure why this issue is happening in 0.14.9.

When the plugin tries to flush the output I get this error:

2016-11-30 16:05:53 -0700 [warn]: plugin/output.rb:1031:block in try_flush: failed to flush the buffer. plugin_id="object:17d8f38" .... error_class=NoMethodError error="undefined method `msgpack_each' for #<Fluent::Plugin::Buffer::MemoryChunk:0x000000028fca08>"

The plugin's format(tag, time, record):

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

The plugin's write(chunk):

def write(chunk)
  data = []
  chunk.msgpack_each do |(tag, time, record)|
    # lots of other stuff
    ...
  end
end

I saw issue 1272, which reports the same error, but the version of Fluentd I'm running already includes the commit that fixed it.

Let me know what additional info you may need.

@repeatedly
Member

Which output plugin do you use?

@Ponyboy47
Author

This is the plugin. It was originally written for 0.10, but it worked on 0.14.8. I recently found the article about upgrading plugins to the 0.14 API and was making sure the plugin will be compatible with v1.0 when it is released. I haven't pushed my changes yet, but I haven't changed how the plugin works either, so I think the source on GitHub should be enough.

@Ponyboy47
Author

I think I figured it out. I needed to override #formatted_to_msgpack_binary to return true. After doing that, the plugin works properly.
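
For readers hitting the same error, a toy model of why the override matters may help. The sketch below does not use Fluentd at all: `ToyChunk`, `ToyStreamer`, `ToyOutput` and `flush` are hypothetical stand-ins, and `Marshal` replaces MessagePack so that it runs with only the standard library. It assumes, based on the behavior described in this thread, that a chunk gains `msgpack_each` only when a plugin with a custom `format` reports `formatted_to_msgpack_binary` as true.

```ruby
require 'stringio'

# Hypothetical stand-in for a buffer chunk: it only stores raw bytes.
class ToyChunk
  def initialize
    @data = String.new
  end

  def append(bytes)
    @data << bytes
  end

  def read
    @data
  end
end

# Hypothetical stand-in for the mixin that provides msgpack_each.
# Marshal replaces MessagePack here so the sketch needs no gems.
module ToyStreamer
  def msgpack_each
    io = StringIO.new(read)
    yield Marshal.load(io) until io.eof?
  end
end

# Hypothetical stand-in for the output base class (an assumption,
# not Fluentd's actual implementation).
class ToyOutput
  # Assumed default: a custom #format is not treated as msgpack binary.
  def formatted_to_msgpack_binary
    false
  end

  def flush(events)
    chunk = ToyChunk.new
    events.each { |tag, time, record| chunk.append(format(tag, time, record)) }
    # The chunk gains msgpack_each only when the plugin opts in.
    chunk.extend(ToyStreamer) if formatted_to_msgpack_binary
    write(chunk)
  end
end

# Mirrors the plugin in this issue: custom format, msgpack_each in write.
class LegacyStylePlugin < ToyOutput
  def format(tag, time, record)
    Marshal.dump([tag, time, record])
  end

  def write(chunk)
    rows = []
    chunk.msgpack_each { |tag, time, record| rows << [tag, time, record] }
    rows
  end
end

class FixedPlugin < LegacyStylePlugin
  # The fix reported above: declare that #format returns msgpack binary.
  def formatted_to_msgpack_binary
    true
  end
end
```

In this model, `LegacyStylePlugin.new.flush(events)` raises NoMethodError for `msgpack_each`, while `FixedPlugin.new.flush(events)` returns the decoded events. In an actual plugin, the change is only the small `formatted_to_msgpack_binary` method placed next to `format`.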

@mohit5540

@Ponyboy47
I am using the Fluentd ClickHouse plugin:

# -*- encoding: utf-8 -*-

module Fluent
  class ClickHouseOutput < Fluent::BufferedOutput
    Fluent::Plugin.register_output('clickhouse', self)

    include Fluent::SetTimeKeyMixin
    include Fluent::SetTagKeyMixin

    config_param :host,     :string, :default => 'localhost'
    config_param :port,     :string, :default => 1982
    config_param :urls,     :string, :default => nil
    config_param :database, :string, :default => "default"
    config_param :username, :string, :default => nil
    config_param :password, :string, :default => nil, :secret => true

    config_param :table, :string, :default => nil
    config_param :columns, :string, :default => nil

    logval = []

    attr_accessor :handler
    attr_accessor :path

    def initialize
      super
      require 'clickhouse'
    end

    unless method_defined?(:log)
      define_method("log") { $log }
    end

    def configure(conf)
      super
      if conf['columns']
        @columns = conf['columns'].split(",").map{|m| m.strip }
      end
      @config = {}
      if conf['username']
        @config['username'] = conf['username']
      end
      if conf['password']
        @config['password'] = conf['password']
      end

      if conf['urls']
        @urls = @urls.split(",").map{|m| m.strip }
      elsif conf['host']
        @config['host'] = conf['host']
      end
      if conf['port']
        @config['port'] = conf['port']
      end
    end

    def start
      super
      # init
    end

    def shutdown
      super
      # destroy
    end

    def format(tag, time, record)
      [tag, time, record].to_msgpack      
    end

    def write(chunk)
      Clickhouse.establish_connection(@config)
      Clickhouse.connection.insert_rows(@table, :names => @columns) { |rows|
        chunk.msgpack_each { |tag, time, record|
          rows << @columns.map{|m| record[m] }
        }
        rows
      }
    end
  end
end

I am getting the following error:

[db_clickhouse] got unrecoverable error in primary and no secondary error_class=NoMethodError error="undefined method `[]' for nil:NilClass"
2018-08-16 21:16:55 +0530 [warn]: #0 [db_clickhouse] bad chunk is moved to /tmp/fluent/backup/worker0/db_clickhouse/5738f56711a9a7f3a575ca2945974db2.log

This seems to be a msgpack-related error. Any idea what causes it?
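
One way to narrow this down: in the `write` method above, the only `[]` call is `record[m]`, so the error would be consistent with `msgpack_each` yielding entries whose third element is nil (it could also originate inside the ClickHouse client, which is not shown here). The sketch below is a Fluentd-free illustration of that failure mode and a guard against it. `rows_from` is a hypothetical helper, and the sample entries are made up for illustration.

```ruby
# Hypothetical helper: builds one row per entry, in column order.
# Entries whose record is not a Hash are collected instead of raising.
def rows_from(entries, columns)
  rows = []
  skipped = []
  entries.each do |tag, time, record|
    if record.is_a?(Hash)
      rows << columns.map { |name| record[name] }
    else
      skipped << [tag, time, record]
    end
  end
  [rows, skipped]
end

columns = ['host', 'status']
entries = [
  # Three elements, as produced by format(tag, time, record).
  ['web.access', 1534434415, { 'host' => 'a', 'status' => 200 }],
  # Two elements (made-up shape): destructuring leaves record as nil.
  [1534434415, { 'host' => 'b', 'status' => 500 }]
]

rows, skipped = rows_from(entries, columns)
```

Logging `skipped` from inside `write` would show whether the chunk really contains entries in a different shape from what `format` produces.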

3 participants