
Handling DynamoDB "unprocessed_items" from batch_write_item is too difficult #815

Closed

smeyfroi opened this issue May 14, 2015 · 9 comments

@smeyfroi

The DynamoDB batch_write_item API can fail partially and return "unprocessed_items". The docs say that this returned data can be fed into a subsequent batch_write_item call, but that's not quite true: you can't simply do client.batch_write_item(request_items: response.unprocessed_items), because the response is not the plain hash the params expect.

What I've ended up doing is something like

client.batch_write_item(request_items: unprocessed_items_to_request_items(response.unprocessed_items))

where that helper method munges the response structure like this (deep_transform_keys comes from ActiveSupport):

    def self.unprocessed_items_to_request_items(unprocessed_items)
      unprocessed_items.as_json.deep_transform_keys do |key|
        if %w{ item put_request n s b ss ns bs m l null bool delete_request }.include? key
          key.to_sym
        else
          key
        end
      end
    end

(This is required because the table-name key in the hash passed as request_items needs to remain a string. There may be other cases that I've not encountered yet.)

It would be quite good not to require dirty hacks like this, if possible. :-)

(This is SDKv2 by the way)
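To illustrate the mismatch with a minimal pure-Ruby sketch (this is not SDK code, just an analogy for why the response can't be passed straight back): the params expect plain Hash instances, but the response contains struct objects.

```ruby
# Hypothetical stand-in for the struct-like objects in the response.
PutRequest = Struct.new(:item)

req = PutRequest.new({ "id" => "1" })
req.is_a?(Hash) # => false: a struct fails a strict Hash check
req.to_h        # => { item: { "id" => "1" } }, a plain hash again
```

This is why some form of conversion back to hashes is needed before resubmitting.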

@trevorrowe
Member

I did the following and it worked as expected:

items = (1..25).map do |n|
  {
    put_request: {
      item: {
        "id" => "value#{n}"
      }
    }
  }
end

unprocessed_items = nil
100.times do
  # I created the target table with 1 write capacity unit to ensure batch writes get throttled
  r = dynamodb.batch_write_item(request_items: { "aws-sdk-slow": items })
  if r.unprocessed_items.count > 0
    unprocessed_items = r.unprocessed_items
    break
  end
end

dynamodb.batch_write_item(request_items: unprocessed_items)

Can you share an example of what you are doing that requires the customization above?

@smeyfroi
Author

Hmmm, that's odd. OK, here's the error, which includes output from the code below (excuse the mess; the working version is in the commented-out parts):

    # Note AWS API limit of 25 puts per request so we batch the put
    # and add a delay between batches
    # TODO: limit of 1MB per request in total not enforced here
    def self.put_items(table_name, items)
      __start_time = Time.now if DEBUG
      in_batches_of_25(items) do |items_batch, last_iteration|
        put_requests = items_batch.map do |item|
          {
            put_request: {
              item: item
            }
          }
        end
        request_items = {
          table_name => put_requests
        }
        loop do
          response = client.batch_write_item(request_items: request_items)
          sleep NICENESS_PAUSE unless last_iteration

          break if response.unprocessed_items.size == 0
          request_items = response.unprocessed_items
#          request_items = unprocessed_items_to_request_items(response.unprocessed_items)
          Rails.logger.info("request_items")
          Rails.logger.info(request_items)

          sleep NICENESS_PAUSE  * 2  # sleep again: we're not managing to store all items
        end
      end
    ensure
      ::Rails.logger.debug("put_items #{table_name} (#{items.length}): #{(Time.now() - __start_time)*1000}ms") if DEBUG
    end

#    def self.unprocessed_items_to_request_items(unprocessed_items)
#      unprocessed_items.as_json.deep_transform_keys do |key|
#        if %w{ item put_request n s b ss ns bs m l null bool delete_request }.include? key
#          key.to_sym
#        else
#          key
#        end
#      end
#    end
2015-05-15T08:03:48.914Z 23049 TID-ox45wrunc CreateHistoricGroupMetricsWorker JID-25a46a9675cc7fc03ec44445 INFO: request_items
2015-05-15T08:03:48.914Z 23049 TID-ox45wrunc CreateHistoricGroupMetricsWorker JID-25a46a9675cc7fc03ec44445 INFO: {"test_cc_group_metrics"=>[#<struct put_request=#<struct item={"timestamp"=>#<struct s=nil, n="1431406800000.0", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "_class"=>#<struct s="GroupMetric", n=nil, b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "interval"=>#<struct s=nil, n="900", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "group_id"=>#<struct s=nil, n="73", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "group_id_interval"=>#<struct s="73:900", n=nil, b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "organisation_id"=>#<struct s=nil, n="4", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "config_resource_statistics"=>#<struct s="{\"1\":{\"probe\":{\"l\":{\"Connect\":{\"Unit\":\"ms\",\"Avg\":58},\"FirstByte\":{\"Unit\":\"ms\",\"Avg\":216},\"Transfer\":{\"Unit\":\"ms\",\"Avg\":1},\"Total\":{\"Unit\":\"ms\",\"Avg\":275}},\"h\":{\"Unit\":\"%\",\"Avg\":99.979},\"u\":{\"Unit\":\"%\",\"Avg\":100.0},\"s\":{\"2XX\":{\"Unit\":\"\",\"Avg\":null},\"3XX\":{\"Unit\":\"\",\"Avg\":null},\"4XX\":{\"Unit\":\"\",\"Avg\":null},\"5XX\":{\"Unit\":\"\",\"Avg\":null}}}}}", n=nil, b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>}>, delete_request=nil>, #<struct put_request=#<struct item={"timestamp"=>#<struct s=nil, n="1431407700000.0", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "_class"=>#<struct s="GroupMetric", n=nil, b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "interval"=>#<struct s=nil, n="900", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "group_id"=>#<struct s=nil, n="73", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "group_id_interval"=>#<struct s="73:900", n=nil, b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, 
"organisation_id"=>#<struct s=nil, n="4", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "config_resource_statistics"=>#<struct s="{\"1\":{\"probe\":{\"l\":{\"Connect\":{\"Unit\":\"ms\",\"Avg\":61},\"FirstByte\":{\"Unit\":\"ms\",\"Avg\":224},\"Transfer\":{\"Unit\":\"ms\",\"Avg\":0},\"Total\":{\"Unit\":\"ms\",\"Avg\":285}},\"h\":{\"Unit\":\"%\",\"Avg\":99.983},\"u\":{\"Unit\":\"%\",\"Avg\":100.0},\"s\":{\"2XX\":{\"Unit\":\"\",\"Avg\":null},\"3XX\":{\"Unit\":\"\",\"Avg\":null},\"4XX\":{\"Unit\":\"\",\"Avg\":null},\"5XX\":{\"Unit\":\"\",\"Avg\":null}}}}}", n=nil, b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>}>, delete_request=nil>, #<struct put_request=#<struct item={"timestamp"=>#<struct s=nil, n="1431408600000.0", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "_class"=>#<struct s="GroupMetric", n=nil, b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "interval"=>#<struct s=nil, n="900", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "group_id"=>#<struct s=nil, n="73", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "group_id_interval"=>#<struct s="73:900", n=nil, b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "organisation_id"=>#<struct s=nil, n="4", b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>, "config_resource_statistics"=>#<struct s="{\"1\":{\"probe\":{\"l\":{\"Connect\":{\"Unit\":\"ms\",\"Avg\":56},\"FirstByte\":{\"Unit\":\"ms\",\"Avg\":228},\"Transfer\":{\"Unit\":\"ms\",\"Avg\":0},\"Total\":{\"Unit\":\"ms\",\"Avg\":284}},\"h\":{\"Unit\":\"%\",\"Avg\":99.98699999999999},\"u\":{\"Unit\":\"%\",\"Avg\":100.0},\"s\":{\"2XX\":{\"Unit\":\"\",\"Avg\":null},\"3XX\":{\"Unit\":\"\",\"Avg\":null},\"4XX\":{\"Unit\":\"\",\"Avg\":null},\"5XX\":{\"Unit\":\"\",\"Avg\":null}}}}}", n=nil, b=nil, ss=nil, ns=nil, bs=nil, m=nil, l=nil, null=nil, bool=nil>}>, delete_request=nil>]}
2015-05-15T08:03:49.931Z 23049 TID-ox45wrunc CreateHistoricGroupMetricsWorker JID-25a46a9675cc7fc03ec44445 ERROR: parameter validator found 3 errors:
  - expected params[:request_items]["test_cc_group_metrics"][0] to be a hash
  - expected params[:request_items]["test_cc_group_metrics"][1] to be a hash
  - expected params[:request_items]["test_cc_group_metrics"][2] to be a hash
2015-05-15T08:03:49.931Z 23049 TID-ox45wrunc CreateHistoricGroupMetricsWorker JID-25a46a9675cc7fc03ec44445 ERROR: parameter validator found 3 errors:
  - expected params[:request_items]["test_cc_group_metrics"][0] to be a hash
  - expected params[:request_items]["test_cc_group_metrics"][1] to be a hash
  - expected params[:request_items]["test_cc_group_metrics"][2] to be a hash
2015-05-15T08:03:49.931Z 23049 TID-ox45wrunc CreateHistoricGroupMetricsWorker JID-25a46a9675cc7fc03ec44445 ERROR: /geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/vendor/bundle/ruby/2.2.0/gems/aws-sdk-core-2.0.41/lib/seahorse/client/param_validator.rb:24:in `validate!'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/vendor/bundle/ruby/2.2.0/gems/aws-sdk-core-2.0.41/lib/seahorse/client/param_validator.rb:9:in `validate!'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/vendor/bundle/ruby/2.2.0/gems/aws-sdk-core-2.0.41/lib/seahorse/client/plugins/param_validation.rb:21:in `call'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/vendor/bundle/ruby/2.2.0/gems/aws-sdk-core-2.0.41/lib/seahorse/client/plugins/raise_response_errors.rb:14:in `call'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/vendor/bundle/ruby/2.2.0/gems/aws-sdk-core-2.0.41/lib/aws-sdk-core/plugins/response_paging.rb:10:in `call'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/vendor/bundle/ruby/2.2.0/gems/aws-sdk-core-2.0.41/lib/seahorse/client/plugins/response_target.rb:18:in `call'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/vendor/bundle/ruby/2.2.0/gems/aws-sdk-core-2.0.41/lib/seahorse/client/request.rb:70:in `send_request'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/vendor/bundle/ruby/2.2.0/gems/aws-sdk-core-2.0.41/lib/seahorse/client/base.rb:216:in `block (2 levels) in define_operation_methods'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/lib/dynamodb/driver.rb:73:in `block (2 levels) in put_items'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/lib/dynamodb/driver.rb:72:in `loop'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/lib/dynamodb/driver.rb:72:in `block in put_items'
/geome/releases/93ebf9aea0a4de93b442d41170a42a23877ec91e/lib/dynamodb/driver.rb:52:in `block in in_batches_of_25'

@trevorrowe
Member

Can you pretty-print or inspect the response.unprocessed_items in your loop once so that I can see what they look like?

@smeyfroi
Author

It's in the log output above: you have to scroll to the right a bit. Let me know if that's enough?

@trevorrowe
Member

Yup, that's what I was looking for. I didn't notice the wide scroll. Have you by any chance disabled parameter conversion when constructing the client?

Also, as a workaround, this should perform the nested conversion you require without having to white-list specific keys for the string maps:

request_items = response.data.to_h[:unprocessed_items]

I want to stress that this should not be required and if there is a bug when param conversion is disabled, then I'll want to address this.
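For reference, a full retry loop built around that #to_h workaround could look like the sketch below. StubDynamo, the backoff constants, and batch_write_with_retries are illustrative stand-ins, not part of the SDK; the only SDK-specific piece is round-tripping response.data.to_h[:unprocessed_items].

```ruby
# Illustrative stand-ins for the SDK's response objects.
Response     = Struct.new(:data)
ResponseData = Struct.new(:unprocessed_items)

# Fake client: pretends the first batch is throttled, the second succeeds.
class StubDynamo
  def initialize
    @calls = 0
  end

  def batch_write_item(request_items:)
    @calls += 1
    leftover =
      if @calls == 1
        { "my-table" => [{ put_request: { item: { "id" => "1" } } }] }
      else
        {}
      end
    Response.new(ResponseData.new(leftover))
  end
end

# Retries until no unprocessed items remain, converting each response's
# structs back into plain hashes via #to_h before resubmitting.
def batch_write_with_retries(client, request_items, max_attempts: 5)
  attempts = 0
  until request_items.empty?
    response = client.batch_write_item(request_items: request_items)
    request_items = response.data.to_h[:unprocessed_items] || {}
    attempts += 1
    break if attempts >= max_attempts
    sleep(0.05 * (2**attempts)) unless request_items.empty? # crude backoff
  end
  request_items # whatever is still unprocessed after max_attempts
end
```

With a real Aws::DynamoDB::Client in place of the stub, the same loop would drain throttled batches instead of failing validation.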

@smeyfroi
Author

Ah, my config is:

    convert_params: false,
    simple_attributes: false,
    simple_json: false,

Mostly because that was the easiest way to migrate from the v1 SDK if I remember right.

I'll try that workaround out: looks way better than my hack, even if it's an interim thing. :-)

@trevorrowe
Member

I've now been able to reproduce the issue you are experiencing by disabling param conversion. If you re-enable :convert_params (the default behavior), your code should work without the hash conversion.

Param conversion allows you to pass in a wider set of input values. For example, it allows you to pass strings for dates, which it will attempt to parse into Time objects when necessary. When you disable param conversion, you are required to supply the proper type, e.g. Time. In this case, param conversion is what was converting the structs down to simple hashes.

That said, the param validator should be updated to work with structs natively, without conversion. I'll take a look at this. For now, my recommendation is to set :convert_params to true and you should be good.
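Concretely, that recommendation amounts to leaving :convert_params at its default (or setting it explicitly) when constructing the client. A config sketch, assuming SDK v2; the region value is a placeholder:

```ruby
require "aws-sdk-core" # aws-sdk v2

# :convert_params is on by default, so simply removing
# `convert_params: false` from the existing config has the same effect
# as setting it to true here.
dynamodb = Aws::DynamoDB::Client.new(
  region: "us-east-1",
  convert_params: true
)
```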

trevorrowe added a commit that referenced this issue May 15, 2015
Resolved an issue where structure parameter values, such as those
returned in responses, could not be serialized as input parameters
when parameter conversion is disabled.

This fix resolves the issue by changing the serializers to enumerate
structure/hash values in a consistent manner, using #each_pair. Also
added a few minor helpers to `Aws::Structure`, making it quack more
like a Hash object.

Fixes #815
@trevorrowe
Member

The commit above is available in the version-2.1-release branch. This will be merged onto master and tagged as v2.1.0. I'm not sure when that will happen; there are a few other features outstanding for the 2.1 milestone.

The fix makes it possible to round-trip the response data structures as input without parameter conversion. Until then, enabling parameter conversion or calling #to_h on the response data will work around the issue.

Thank you for reporting the issue.

@smeyfroi
Author

Fantastic, thanks for that. (I'll take a look at my use of the param conversion a bit later this week, so thanks for the pointer there too.)
