From da35e1909210c8e12c1e2016b9bfc1665498529b Mon Sep 17 00:00:00 2001
From: abicky
Date: Sun, 9 Sep 2018 21:59:14 +0900
Subject: [PATCH] Retry to insert after creating a new table to avoid reaching
 the retry limit

cf. https://github.com/fluent/fluentd/issues/2123
---
 lib/fluent/plugin/bigquery/writer.rb     | 31 ++++++++++++++++++++++--
 lib/fluent/plugin/out_bigquery_insert.rb |  8 +-----
 test/plugin/test_out_bigquery_insert.rb  | 27 +++++++++++++++------
 3 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/lib/fluent/plugin/bigquery/writer.rb b/lib/fluent/plugin/bigquery/writer.rb
index 367fc92..56fe37b 100644
--- a/lib/fluent/plugin/bigquery/writer.rb
+++ b/lib/fluent/plugin/bigquery/writer.rb
@@ -71,14 +71,19 @@ def fetch_schema(project, dataset, table_id)
         nil
       end
 
-      def insert_rows(project, dataset, table_id, rows, template_suffix: nil)
+      def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
         body = {
           rows: rows,
           skip_invalid_rows: @options[:skip_invalid_rows],
           ignore_unknown_values: @options[:ignore_unknown_values],
         }
         body.merge!(template_suffix: template_suffix) if template_suffix
-        res = client.insert_all_table_data(project, dataset, table_id, body, {})
+
+        if @options[:auto_create_table]
+          res = insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
+        else
+          res = client.insert_all_table_data(project, dataset, table_id, body, {})
+        end
         log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size
 
         if res.insert_errors && !res.insert_errors.empty?
@@ -312,6 +317,28 @@ def time_partitioning
           @time_partitioning
         end
       end
+
+      def insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
+        try_count ||= 1
+        res = client.insert_all_table_data(project, dataset, table_id, body, {})
+      rescue Google::Apis::ClientError => e
+        if e.status_code == 404 && /Not Found: Table/i =~ e.message
+          if try_count == 1
+            # Table Not Found: Auto Create Table
+            create_table(project, dataset, table_id, schema)
+          elsif try_count > 10
+            raise "A new table was created but it is not found."
+          end
+
+          # Retry to insert several times because the created table is not visible from Streaming insert for a little while
+          # cf. https://cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts
+          try_count += 1
+          sleep 5
+          log.debug "Retry to insert rows", project_id: project, dataset: dataset, table: table_id
+          retry
+        end
+        raise
+      end
     end
   end
 end
diff --git a/lib/fluent/plugin/out_bigquery_insert.rb b/lib/fluent/plugin/out_bigquery_insert.rb
index 2b7c4a2..db0ba48 100644
--- a/lib/fluent/plugin/out_bigquery_insert.rb
+++ b/lib/fluent/plugin/out_bigquery_insert.rb
@@ -96,14 +96,8 @@ def write(chunk)
     end
 
     def insert(project, dataset, table_id, rows, schema, template_suffix)
-      writer.insert_rows(project, dataset, table_id, rows, template_suffix: template_suffix)
+      writer.insert_rows(project, dataset, table_id, rows, schema, template_suffix: template_suffix)
     rescue Fluent::BigQuery::Error => e
-      if @auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
-        # Table Not Found: Auto Create Table
-        writer.create_table(project, dataset, table_id, schema)
-        raise "table created. send rows next time."
-      end
-
       raise if e.retryable?
 
       if @secondary
diff --git a/test/plugin/test_out_bigquery_insert.rb b/test/plugin/test_out_bigquery_insert.rb
index 9305db5..1e38013 100644
--- a/test/plugin/test_out_bigquery_insert.rb
+++ b/test/plugin/test_out_bigquery_insert.rb
@@ -121,7 +121,6 @@ def test_write
     driver = create_driver
 
     stub_writer do |writer|
-      mock.proxy(writer).insert_rows('yourproject_id', 'yourdataset_id', 'foo', [{json: hash_including(entry)}], template_suffix: nil)
       mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', {
         rows: [{json: hash_including(entry)}],
         skip_invalid_rows: false,
@@ -346,9 +345,16 @@ def test_auto_create_table_by_bigquery_api
     CONFIG
 
     stub_writer do |writer|
-      mock(writer).insert_rows('yourproject_id', 'yourdataset_id', 'foo', [{json: Fluent::BigQuery::Helper.deep_symbolize_keys(message)}], template_suffix: nil) do
-        raise Fluent::BigQuery::RetryableError.new(nil, Google::Apis::ServerError.new("Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404, body: "Not found: Table yourproject_id:yourdataset_id.foo"))
-      end
+      body = {
+        rows: [{json: Fluent::BigQuery::Helper.deep_symbolize_keys(message)}],
+        skip_invalid_rows: false,
+        ignore_unknown_values: false,
+      }
+      mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', body, {}) do
+        raise Google::Apis::ClientError.new("notFound: Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404)
+      end.at_least(1)
+      mock(writer).sleep(instance_of(Numeric)) { nil }.at_least(1)
+      mock(writer).create_table('yourproject_id', 'yourdataset_id', 'foo', driver.instance.instance_variable_get(:@table_schema))
     end
@@ -406,9 +412,16 @@ def test_auto_create_partitioned_table_by_bigquery_api
     CONFIG
 
     stub_writer do |writer|
-      mock(writer).insert_rows('yourproject_id', 'yourdataset_id', 'foo', [message], template_suffix: nil) do
-        raise Fluent::BigQuery::RetryableError.new(nil, Google::Apis::ServerError.new("Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404, body: "Not found: Table yourproject_id:yourdataset_id.foo"))
-      end
+      body = {
+        rows: [message],
+        skip_invalid_rows: false,
+        ignore_unknown_values: false,
+      }
+      mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', body, {}) do
+        raise Google::Apis::ClientError.new("notFound: Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404)
+      end.at_least(1)
+      mock(writer).sleep(instance_of(Numeric)) { nil }.at_least(1)
+      mock(writer).create_table('yourproject_id', 'yourdataset_id', 'foo', driver.instance.instance_variable_get(:@table_schema))
     end
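
Review note: the patch moves table auto-creation out of out_bigquery_insert.rb, where raising "table created. send rows next time." made every not-yet-visible table count against Fluentd's buffer retry limit (the problem reported in fluent/fluentd#2123), and into the writer, which now creates the missing table itself and retries the streaming insert internally. Reduced to a minimal standalone Ruby sketch, the new retry loop works roughly as below; `insert_with_auto_create`, the `client` object, and `TableMissingError` are hypothetical stand-ins for illustration (the plugin itself rescues `Google::Apis::ClientError` from the google-api-client gem):

```ruby
# Minimal sketch of the create-then-retry pattern the patch introduces.
# TableMissingError is a hypothetical stand-in for the 404
# "Not Found: Table" ClientError the plugin actually rescues.
class TableMissingError < StandardError; end

MAX_TRIES      = 10 # the patch gives up once try_count exceeds 10
RETRY_INTERVAL = 5  # seconds, matching the patch's `sleep 5`

def insert_with_auto_create(client, table_id, rows, schema)
  try_count = 1
  begin
    client.insert(table_id, rows)
  rescue TableMissingError
    # First failure: the table does not exist yet, so create it exactly once.
    client.create_table(table_id, schema) if try_count == 1
    raise "A new table was created but it is not found." if try_count > MAX_TRIES

    # A freshly created table is not immediately visible to streaming
    # inserts, so retry the insert a bounded number of times.
    try_count += 1
    sleep RETRY_INTERVAL
    retry
  end
end
```

In the worst case the loop blocks a flush thread for roughly 10 retries x 5 s = 50 s before raising, but that raise reaches Fluentd as a single failed flush instead of consuming one buffer retry per table-creation attempt, which is what could exhaust the retry limit before.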