Skip to content

Commit

Permalink
Retry to insert after creating a new table
Browse files Browse the repository at this point in the history
to avoid reaching the retry limit

cf. fluent/fluentd#2123
  • Loading branch information
abicky committed Sep 9, 2018
1 parent 39df63c commit da35e19
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 16 deletions.
31 changes: 29 additions & 2 deletions lib/fluent/plugin/bigquery/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,19 @@ def fetch_schema(project, dataset, table_id)
nil
end

def insert_rows(project, dataset, table_id, rows, template_suffix: nil)
def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
body = {
rows: rows,
skip_invalid_rows: @options[:skip_invalid_rows],
ignore_unknown_values: @options[:ignore_unknown_values],
}
body.merge!(template_suffix: template_suffix) if template_suffix
res = client.insert_all_table_data(project, dataset, table_id, body, {})

if @options[:auto_create_table]
res = insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
else
res = client.insert_all_table_data(project, dataset, table_id, body, {})
end
log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size

if res.insert_errors && !res.insert_errors.empty?
Expand Down Expand Up @@ -312,6 +317,28 @@ def time_partitioning
@time_partitioning
end
end

# Streams +body+ into the given table and, when the API answers with a
# 404 "Not Found: Table" client error, creates the table from +schema+
# and retries the insert.
#
# The table is created only on the first failed attempt. Every further
# "table not found" failure just waits and retries, because a freshly
# created table is not visible from streaming inserts for a little while.
# cf. https://cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts
#
# Returns whatever client.insert_all_table_data returns.
# Raises RuntimeError once the table is still missing after the retries
# are used up; any other Google::Apis::ClientError is re-raised untouched.
def insert_all_table_data_with_create_table(project, dataset, table_id, body, schema)
  # `retry` re-runs the method body, so `||=` keeps the running count.
  attempts ||= 1
  client.insert_all_table_data(project, dataset, table_id, body, {})
rescue Google::Apis::ClientError => error
  table_missing = error.status_code == 404 && /Not Found: Table/i =~ error.message
  # Anything other than a missing table is not ours to handle.
  raise unless table_missing

  if attempts > 10
    raise "A new table was created but it is not found."
  elsif attempts == 1
    # Table Not Found: Auto Create Table
    create_table(project, dataset, table_id, schema)
  end

  attempts += 1
  sleep 5
  log.debug "Retry to insert rows", project_id: project, dataset: dataset, table: table_id
  retry
end
end
end
end
8 changes: 1 addition & 7 deletions lib/fluent/plugin/out_bigquery_insert.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,8 @@ def write(chunk)
end

def insert(project, dataset, table_id, rows, schema, template_suffix)
writer.insert_rows(project, dataset, table_id, rows, template_suffix: template_suffix)
writer.insert_rows(project, dataset, table_id, rows, schema, template_suffix: template_suffix)
rescue Fluent::BigQuery::Error => e
if @auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
# Table Not Found: Auto Create Table
writer.create_table(project, dataset, table_id, schema)
raise "table created. send rows next time."
end

raise if e.retryable?

if @secondary
Expand Down
27 changes: 20 additions & 7 deletions test/plugin/test_out_bigquery_insert.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def test_write
driver = create_driver

stub_writer do |writer|
mock.proxy(writer).insert_rows('yourproject_id', 'yourdataset_id', 'foo', [{json: hash_including(entry)}], template_suffix: nil)
mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', {
rows: [{json: hash_including(entry)}],
skip_invalid_rows: false,
Expand Down Expand Up @@ -346,9 +345,16 @@ def test_auto_create_table_by_bigquery_api
CONFIG

stub_writer do |writer|
mock(writer).insert_rows('yourproject_id', 'yourdataset_id', 'foo', [{json: Fluent::BigQuery::Helper.deep_symbolize_keys(message)}], template_suffix: nil) do
raise Fluent::BigQuery::RetryableError.new(nil, Google::Apis::ServerError.new("Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404, body: "Not found: Table yourproject_id:yourdataset_id.foo"))
end
body = {
rows: [{json: Fluent::BigQuery::Helper.deep_symbolize_keys(message)}],
skip_invalid_rows: false,
ignore_unknown_values: false,
}
mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', body, {}) do
raise Google::Apis::ClientError.new("notFound: Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404)
end.at_least(1)
mock(writer).sleep(instance_of(Numeric)) { nil }.at_least(1)

mock(writer).create_table('yourproject_id', 'yourdataset_id', 'foo', driver.instance.instance_variable_get(:@table_schema))
end

Expand Down Expand Up @@ -406,9 +412,16 @@ def test_auto_create_partitioned_table_by_bigquery_api
CONFIG

stub_writer do |writer|
mock(writer).insert_rows('yourproject_id', 'yourdataset_id', 'foo', [message], template_suffix: nil) do
raise Fluent::BigQuery::RetryableError.new(nil, Google::Apis::ServerError.new("Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404, body: "Not found: Table yourproject_id:yourdataset_id.foo"))
end
body = {
rows: [message],
skip_invalid_rows: false,
ignore_unknown_values: false,
}
mock(writer.client).insert_all_table_data('yourproject_id', 'yourdataset_id', 'foo', body, {}) do
raise Google::Apis::ClientError.new("notFound: Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404)
end.at_least(1)
mock(writer).sleep(instance_of(Numeric)) { nil }.at_least(1)

mock(writer).create_table('yourproject_id', 'yourdataset_id', 'foo', driver.instance.instance_variable_get(:@table_schema))
end

Expand Down

0 comments on commit da35e19

Please sign in to comment.