Create a new table via insert_job in create_load_job
to avoid reaching the retry limit

cf. fluent/fluentd#2123
abicky committed Sep 9, 2018
1 parent f53a897 commit c2f9ea0
Showing 2 changed files with 79 additions and 46 deletions.
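
The gist of the change: create_load_job now checks for the destination table up front; when it is missing, auto_create_table is enabled, and a schema is available, the schema (and any time partitioning) is merged into the load configuration so that the load job itself creates the table. Previously the table was created by a separate tables.insert call inside the error handler, followed by raise "table created. send rows next time.", so every missing table cost at least one failed flush, and the extra retries could push a chunk toward fluentd's retry limit (cf. fluent/fluentd#2123). A minimal sketch of the new flow, using the google-api-client classes that appear in the diff but with simplified option handling and hypothetical names (build_load_configuration, options) that are not the plugin's own:

# Sketch only, not the plugin's exact code. Assumes a configured
# Google::Apis::BigqueryV2::BigqueryService is passed in as `client`.
require "google/apis/bigquery_v2"

def build_load_configuration(client, project, dataset, table_id, fields, options)
  load_config = {
    destination_table: {project_id: project, dataset_id: dataset, table_id: table_id},
    write_disposition: "WRITE_APPEND",
    source_format: "NEWLINE_DELIMITED_JSON",
  }

  begin
    # If the table already exists, send no schema so that schema changes made
    # directly on the table are not overridden by every load job.
    client.get_table(project, dataset, table_id)
  rescue Google::Apis::ClientError => e
    raise unless e.status_code == 404 && options[:auto_create_table]
    # Table is missing: ship the schema (and any time partitioning) with the
    # load job so BigQuery creates the table as part of the load, instead of
    # calling tables.insert and raising to force another buffer flush.
    load_config[:schema] = {fields: fields}
    load_config[:time_partitioning] = {type: "DAY"} if options[:time_partitioning_type]
  end

  {configuration: {load: load_config}}
end

The hash returned here is what would then be handed to client.insert_job together with the buffered chunk as upload_source, as in the diff below.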
51 changes: 23 additions & 28 deletions lib/fluent/plugin/bigquery/writer.rb
@@ -34,13 +34,7 @@ def create_table(project, dataset, table_id, record_schema)
}
}

if @options[:time_partitioning_type]
definition[:time_partitioning] = {
type: @options[:time_partitioning_type].to_s.upcase,
field: @options[:time_partitioning_field] ? @options[:time_partitioning_field].to_s : nil,
expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil
}.select { |_, value| !value.nil? }
end
definition.merge!(time_partitioning: time_partitioning) if time_partitioning
client.insert_table(project, dataset, definition, {})
log.debug "create table", project_id: project, dataset: dataset, table: table_id
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
@@ -132,9 +126,6 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
dataset_id: dataset,
table_id: table_id,
},
schema: {
fields: fields.to_a,
},
write_disposition: "WRITE_APPEND",
source_format: source_format,
ignore_unknown_values: @options[:ignore_unknown_values],
@@ -144,17 +135,18 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
}

job_id = create_job_id(chunk_id_hex, dataset, table_id, fields.to_a) if @options[:prevent_duplicate_load]
configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if @options[:time_partitioning_type]
configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id

# If target table is already exist, omit schema configuration.
# Because schema changing is easier.
begin
if client.get_table(project, dataset, table_id)
configuration[:configuration][:load].delete(:schema)
# Check table existance
client.get_table(project, dataset, table_id)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404 && /Not Found: Table/i =~ e.message
raise Fluent::BigQuery::UnRetryableError.new("Table is not found") unless @options[:auto_create_table]
raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
configuration[:configuration][:load].merge!(schema: {fields: fields.to_a})
configuration[:configuration][:load].merge!(time_partitioning: time_partitioning) if time_partitioning
end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError
raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
end

res = client.insert_job(
@@ -170,17 +162,6 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
reason = e.respond_to?(:reason) ? e.reason : nil
log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

if @options[:auto_create_table] && e.status_code == 404 && /Not Found: Table/i =~ e.message
# Table Not Found: Auto Create Table
create_table(
project,
dataset,
table_id,
fields,
)
raise "table created. send rows next time."
end

if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id)
end
@@ -317,6 +298,20 @@ def source_format
"NEWLINE_DELIMITED_JSON"
end
end

def time_partitioning
return @time_partitioning if instance_variable_defined?(:@time_partitioning)

if @options[:time_partitioning_type]
@time_partitioning = {
type: @options[:time_partitioning_type].to_s.upcase,
field: @options[:time_partitioning_field] ? @options[:time_partitioning_field].to_s : nil,
expiration_ms: @options[:time_partitioning_expiration] ? @options[:time_partitioning_expiration] * 1000 : nil
}.compact
else
@time_partitioning
end
end
end
end
end
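
A side note on the time_partitioning helper introduced above: it memoizes with an instance_variable_defined? guard rather than @time_partitioning ||= ..., so a result that is nil or false can still be told apart from "not computed yet". A standalone sketch of that idiom (illustrative class and names, not the plugin's code; here the result is assigned in both branches so even a nil outcome is cached):

# Memoization via instance_variable_defined?: unlike `@value ||= compute`,
# the guard also short-circuits when the cached value is nil or false,
# so the computation runs at most once per instance.
class PartitioningSettings
  def initialize(options)
    @options = options
  end

  def partitioning
    return @partitioning if instance_variable_defined?(:@partitioning)

    @partitioning =
      if @options[:type]
        {type: @options[:type].to_s.upcase}
      end # nil when no type is configured, and that nil is cached as well
  end
end

settings = PartitioningSettings.new({})
p PartitioningSettings.new(type: :day).partitioning # hash with type "DAY"
p settings.partitioning                             # nil, computed here
p settings.partitioning                             # nil, served from the cache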
74 changes: 56 additions & 18 deletions test/plugin/test_out_bigquery_load.rb
@@ -39,10 +39,8 @@ def stub_writer(stub_auth: true)
writer
end
end

def test_write
schema_fields = Fluent::BigQuery::Helper.deep_symbolize_keys(MultiJson.load(File.read(SCHEMA_PATH)))

def test_write
response_stub = stub!

driver = create_driver
@@ -60,9 +58,6 @@ def test_write
dataset_id: 'yourdataset_id',
table_id: 'foo',
},
schema: {
fields: schema_fields,
},
write_disposition: "WRITE_APPEND",
source_format: "NEWLINE_DELIMITED_JSON",
ignore_unknown_values: false,
@@ -99,7 +94,6 @@ def test_write_with_prevent_duplicate_load
schema_path #{SCHEMA_PATH}
prevent_duplicate_load true
CONFIG
schema_fields = Fluent::BigQuery::Helper.deep_symbolize_keys(MultiJson.load(File.read(SCHEMA_PATH)))

response_stub = stub!
stub_writer do |writer|
@@ -116,9 +110,6 @@ def test_write_with_prevent_duplicate_load
dataset_id: 'yourdataset_id',
table_id: 'foo',
},
schema: {
fields: schema_fields,
},
write_disposition: "WRITE_APPEND",
source_format: "NEWLINE_DELIMITED_JSON",
ignore_unknown_values: false,
@@ -138,7 +129,6 @@ def test_write_with_prevent_duplicate_load

def test_write_with_retryable_error
driver = create_driver
schema_fields = Fluent::BigQuery::Helper.deep_symbolize_keys(MultiJson.load(File.read(SCHEMA_PATH)))

driver.instance_start
tag, time, record = "tag", Time.now.to_i, {"a" => "b"}
@@ -158,9 +148,6 @@ def test_write_with_retryable_error
dataset_id: 'yourdataset_id',
table_id: 'foo',
},
schema: {
fields: schema_fields,
},
write_disposition: "WRITE_APPEND",
source_format: "NEWLINE_DELIMITED_JSON",
ignore_unknown_values: false,
@@ -225,7 +212,6 @@ def test_write_with_not_retryable_error
utc
</secondary>
CONFIG
schema_fields = Fluent::BigQuery::Helper.deep_symbolize_keys(MultiJson.load(File.read(SCHEMA_PATH)))

driver.instance_start
tag, time, record = "tag", Time.now.to_i, {"a" => "b"}
@@ -245,9 +231,6 @@ def test_write_with_not_retryable_error
dataset_id: 'yourdataset_id',
table_id: 'foo',
},
schema: {
fields: schema_fields,
},
write_disposition: "WRITE_APPEND",
source_format: "NEWLINE_DELIMITED_JSON",
ignore_unknown_values: false,
@@ -289,6 +272,61 @@ def test_write_with_not_retryable_error
driver.instance_shutdown
end

def test_write_with_auto_create_table
driver = create_driver(<<-CONFIG)
table foo
email [email protected]
private_key_path /path/to/key
project yourproject_id
dataset yourdataset_id
<buffer>
@type memory
</buffer>
<inject>
time_format %s
time_key time
</inject>
auto_create_table true
schema_path #{SCHEMA_PATH}
CONFIG

schema_fields = Fluent::BigQuery::Helper.deep_symbolize_keys(MultiJson.load(File.read(SCHEMA_PATH)))

stub_writer do |writer|
mock(writer.client).get_table('yourproject_id', 'yourdataset_id', 'foo') do
raise Google::Apis::ClientError.new("notFound: Not found: Table yourproject_id:yourdataset_id.foo", status_code: 404)
end

mock(writer.client).insert_job('yourproject_id', {
configuration: {
load: {
destination_table: {
project_id: 'yourproject_id',
dataset_id: 'yourdataset_id',
table_id: 'foo',
},
write_disposition: "WRITE_APPEND",
source_format: "NEWLINE_DELIMITED_JSON",
ignore_unknown_values: false,
max_bad_records: 0,
schema: {
fields: schema_fields,
},
}
}
}, {upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream"}) do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
end
end

driver.run do
driver.feed("tag", Time.now.to_i, {"a" => "b"})
end
end

private

def create_response_stub(response)
