Generating schemas from arbitrary map[string]interface{} (parquet, avro) #1353

Open
loicalleyne opened this issue Jul 30, 2022 · 8 comments
Labels
enhancement outputs Any tasks or issues relating specifically to outputs

Comments

@loicalleyne
Contributor

loicalleyne commented Jul 30, 2022

When reading from a source (e.g. avro-ocf, parquet) where the reader outputs a map[string]interface{}, it would be great if it weren't necessary to redefine the output schema; instead, the input schema could be converted to the equivalent schema for the output writer.

Could this be done by iterating over the map and using type assertions to assemble the schema for different writers? Configurable filters/regexes could then map combinations of field name and primitive type to logical types (e.g. field-name: (event[[:graph:]]*) type:INT64 logical-type:TIMESTAMP unit:MILLIS).
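
To make the type-assertion idea concrete, here is a minimal sketch of walking a map[string]interface{} with a type switch. The function name `inferFields`, the dotted-path keys, and the Go-to-Parquet type mapping are all assumptions chosen for illustration; they are not taken from Benthos or from any Parquet library.

```go
package main

import (
	"fmt"
	"sort"
)

// inferFields walks a decoded message and records a Parquet primitive type
// name for every leaf, keyed by dotted path. The type mapping is an
// assumption made for illustration, not the behaviour of any real encoder.
func inferFields(prefix string, m map[string]interface{}, out map[string]string) {
	for key, value := range m {
		path := key
		if prefix != "" {
			path = prefix + "." + key
		}
		switch v := value.(type) {
		case map[string]interface{}:
			inferFields(path, v, out) // recurse into nested groups
		case bool:
			out[path] = "BOOLEAN"
		case int, int32:
			out[path] = "INT32" // note: a Go int can exceed 32 bits
		case int64:
			out[path] = "INT64"
		case float32:
			out[path] = "FLOAT"
		case float64:
			out[path] = "DOUBLE"
		default:
			// strings, nil, slices and unknown types use a catch-all
			out[path] = "BYTE_ARRAY"
		}
	}
}

func main() {
	msg := map[string]interface{}{
		"geo":      map[string]interface{}{"country": "NL", "latitude": 51.95},
		"schedule": map[string]interface{}{"id": 18604307},
	}
	fields := map[string]string{}
	inferFields("", msg, fields)

	paths := make([]string, 0, len(fields))
	for p := range fields {
		paths = append(paths, p)
	}
	sort.Strings(paths) // map iteration order is random, so sort for stable output
	for _, p := range paths {
		fmt.Println(p, fields[p])
	}
}
```

One caveat with this approach: values decoded from JSON by encoding/json arrive as float64, so integer columns would be inferred as DOUBLE unless the reader preserves narrower types.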

Use case brainstorm:
data stream sinks to object storage
transferring OLTP DB data to an OLAP DB using federated tables (e.g. BigQuery external tables)
converting from row-based to column-based format

@Jeffail
Collaborator

Jeffail commented Jul 30, 2022

Hey @loicalleyne, there's potential here but we'd need to be very specific about how we handle a range of edge cases.

@Jeffail Jeffail added enhancement outputs Any tasks or issues relating specifically to outputs labels Jul 30, 2022
@loicalleyne
Contributor Author

loicalleyne commented Jul 30, 2022

At a high level, I'm thinking the first step is that the output writer would need a new configuration option for the schema, so that the writer accepts a schema passed by the input or processor preceding it.
For the schema itself, are the Go native types conserved in the message?

@loicalleyne
Contributor Author

At a high level, I'm thinking the first step is that the output writer would need a new configuration option for the schema, so that the writer accepts a schema passed by the input or processor preceding it.
Are the Go native types conserved in the message? In other words, do we need to parse the schema for each type of input, or can we rely on the output of the reader libraries to do most of the work?

@loicalleyne
Contributor Author

I wrote a proof of concept that can take a map and output what I think is a compliant YAML schema for the parquet encoder. It also contains the beginning of an Avro schema to Parquet schema converter. It looks OK for primitive types; logical type support would need some more work, but as far as I can make out they aren't supported in the encoder yet. I would appreciate your feedback if you have time to look at it.
https://github.com/loicalleyne/map-to-parquet-schema
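
For the logical type side, the name-pattern rule proposed at the top of the thread could be sketched roughly as follows. The `LogicalRule` and `Annotation` types and the `defaultRules` and `annotate` functions are hypothetical names invented for this sketch; only the pattern, primitive type, logical type, and unit come from the original example. The pattern is anchored here so that it matches field names starting with "event" rather than any name containing it.

```go
package main

import (
	"fmt"
	"regexp"
)

// LogicalRule maps a field-name pattern plus a primitive type to a logical
// type. All names in this sketch are hypothetical.
type LogicalRule struct {
	Name      *regexp.Regexp
	Primitive string
	Logical   string
	Unit      string
}

// Annotation is the logical type information attached to a field.
type Annotation struct {
	Logical string
	Unit    string
}

// defaultRules returns the single example rule from the thread.
func defaultRules() []LogicalRule {
	return []LogicalRule{{
		Name:      regexp.MustCompile(`^event[[:graph:]]*$`),
		Primitive: "INT64",
		Logical:   "TIMESTAMP",
		Unit:      "MILLIS",
	}}
}

// annotate returns the first rule matching both the field name and its
// inferred primitive type, or false when no rule applies.
func annotate(field, primitive string, rules []LogicalRule) (Annotation, bool) {
	for _, r := range rules {
		if r.Primitive == primitive && r.Name.MatchString(field) {
			return Annotation{Logical: r.Logical, Unit: r.Unit}, true
		}
	}
	return Annotation{}, false
}

func main() {
	for _, f := range []string{"event_time", "id"} {
		a, ok := annotate(f, "INT64", defaultRules())
		fmt.Println(f, ok, a.Logical, a.Unit)
	}
}
```

Taking the first matching rule keeps the behaviour predictable when patterns overlap, at the cost of making rule order significant.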

@mihaitodor
Collaborator

mihaitodor commented Aug 17, 2022

Without looking at the code, just curious, how are you planning to handle cases when you need to infer the schema for something like this:

{
  "foo": [
    1,
    {"bar": {"x": "y"}},
    2,
    "z"
  ]
}

> are the Go native types conserved in the message?

Yeah, the supported types are listed here: https://www.benthos.dev/docs/guides/bloblang/methods#type. Message objects have the AsStructured() method which returns an object of type interface{} containing the structured data.

@loicalleyne
Contributor Author

As far as I know, Parquet only supports a union of ["primitive_type", "null"] or ["null", "primitive_type"]:
apache/parquet-format#44
In the scenario you provided, my code would infer the following:
- { "name":"foo","type":"BYTE_ARRAY", repeated: true }
using BYTE_ARRAY as a catch-all. I'm not sure whether this works in practice; I would have to test it.
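
One way such a catch-all could be decided is sketched below: keep the element type when every item in the array agrees, and fall back to BYTE_ARRAY otherwise. This is an illustrative assumption, not the logic of the proof of concept, and the function names and the "GROUP" label for nested objects are hypothetical. Mixed values would presumably still need to be serialised (for example as JSON) before they could be written as bytes.

```go
package main

import "fmt"

// scalarType names the type of a single array element. "GROUP" is a
// hypothetical label for nested objects used only in this sketch.
func scalarType(v interface{}) string {
	switch v.(type) {
	case bool:
		return "BOOLEAN"
	case int, int32:
		return "INT32"
	case int64:
		return "INT64"
	case float32:
		return "FLOAT"
	case float64:
		return "DOUBLE"
	case map[string]interface{}:
		return "GROUP"
	default:
		return "BYTE_ARRAY"
	}
}

// elementType returns the type shared by all items, or BYTE_ARRAY when the
// array is empty or its items disagree.
func elementType(items []interface{}) string {
	common := ""
	for _, item := range items {
		t := scalarType(item)
		if common == "" {
			common = t
		} else if t != common {
			return "BYTE_ARRAY" // heterogeneous array: use the catch-all
		}
	}
	if common == "" {
		return "BYTE_ARRAY" // empty array: nothing to infer from
	}
	return common
}

func main() {
	mixed := []interface{}{
		1,
		map[string]interface{}{"bar": map[string]interface{}{"x": "y"}},
		2,
		"z",
	}
	fmt.Println(elementType(mixed))
	fmt.Println(elementType([]interface{}{132, 453535, 13412341}))
}
```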

@loicalleyne
Contributor Author

loicalleyne commented Aug 17, 2022

As an example of what gets inferred from an arbitrary map, here are the input and output examples:
Input:

var exampleMap = map[string]interface{}{
	"request": map[string]interface{}{
		"datetime":    "2021-07-27 02:59:59",
		"ip":          "172.222.233.111",
		"host":        "www.domain.com",
		"uri":         "/api/v1/",
		"request_uri": "/api/v1/",
		"referer":     "",
		"useragent":   "",
	},
	"metadata": map[string]interface{}{
		"compression_ratio": 1,
		"coroutine_uuid":    "46c45675-5c10-4094-a124-f8615f2b10db",
		"hostname":          "server.domain.com",
	},
	"incoming_request": map[string]interface{}{
		"id":                "1d25bb6a-3e09-4a8e-8b0f-93a0e9383904",
		"external_id":       "afwef",
		"direct_connection": true,
		"remote_ip":         "67.49.160.53",
	},
	"entity": map[string]interface{}{
		"id":                      39,
		"api_key":                 "1d6492bb-490e-4cb9-1d24-8wr4db8d07e7",
		"tracking_protocol":       "emulator",
		"event_proxy":             nil,
		"feature_support_enabled": true,
		"primary_currency":        "EUR",
		"rate_percent":            10,
	},
	"screen": map[string]interface{}{
		"id": 86233,
		"ids": []interface{}{
			132, 453535, 13412341,
		},
		"external_id": "com.domain:215426709",
		"width":       1080,
		"height":      1920,
	},
	"geo": map[string]interface{}{
		"ip":        "0.0.0.0",
		"country":   "NL",
		"region":    "06",
		"city":      "Noordhoek",
		"dma":       "1234",
		"zip":       "1345-A",
		"latitude":  51.958441162109375,
		"longitude": 5.8780440068664551,
	},
	"schedule": map[string]interface{}{
		"id": 18604307,
	},
	"audience": map[string]interface{}{
		"origin":    "file",
		"SOMEFLOAT": 5.255000114440918,
	},
	"provider": map[string]interface{}{
		"id":               42,
		"protocol":         "openrtb",
		"feature1_enabled": false,
		"feature2_enabled": true,
		"primary_currency": "USD",
	},
	"outgoing_request": map[string]interface{}{
		"string_array": []interface{}{
			"P0OW42XDHA",
			"KLTFJYG9FX",
		},
		"id": "9abbb401-d29a-4025-bd45-416b2ebf13e3",
	},
}

Output:

- name: screen
  optional: true
  fields:
  - { "name":"ids","type":"BYTE_ARRAY", repeated: true }
  - { "name":"external_id","type":"BYTE_ARRAY" }
  - { "name":"width","type":"INT32" }
  - { "name":"height","type":"INT32" }
  - { "name":"id","type":"INT32" }
- name: geo
  optional: true
  fields:
  - { "name":"zip","type":"BYTE_ARRAY" }
  - { "name":"latitude","type":"DOUBLE" }
  - { "name":"longitude","type":"DOUBLE" }
  - { "name":"ip","type":"BYTE_ARRAY" }
  - { "name":"country","type":"BYTE_ARRAY" }
  - { "name":"region","type":"BYTE_ARRAY" }
  - { "name":"city","type":"BYTE_ARRAY" }
  - { "name":"dma","type":"BYTE_ARRAY" }
- name: outgoing_request
  optional: true
  fields:
  - { "name":"id","type":"BYTE_ARRAY" }
  - { "name":"string_array","type":"BYTE_ARRAY", repeated: true }
- name: request
  optional: true
  fields:
  - { "name":"useragent","type":"BYTE_ARRAY" }
  - { "name":"datetime","type":"BYTE_ARRAY" }
  - { "name":"ip","type":"BYTE_ARRAY" }
  - { "name":"host","type":"BYTE_ARRAY" }
  - { "name":"uri","type":"BYTE_ARRAY" }
  - { "name":"request_uri","type":"BYTE_ARRAY" }
  - { "name":"referer","type":"BYTE_ARRAY" }
- name: metadata
  optional: true
  fields:
  - { "name":"compression_ratio","type":"INT32" }
  - { "name":"coroutine_uuid","type":"BYTE_ARRAY" }
  - { "name":"hostname","type":"BYTE_ARRAY" }
- name: incoming_request
  optional: true
  fields:
  - { "name":"direct_connection","type":"BOOLEAN" }
  - { "name":"remote_ip","type":"BYTE_ARRAY" }
  - { "name":"id","type":"BYTE_ARRAY" }
  - { "name":"external_id","type":"BYTE_ARRAY" }
- name: entity
  optional: true
  fields:
  - { "name":"event_proxy","type":"BYTE_ARRAY" }
  - { "name":"id","type":"INT32" }
  - { "name":"api_key","type":"BYTE_ARRAY" }
  - { "name":"tracking_protocol","type":"BYTE_ARRAY" }
  - { "name":"feature_support_enabled","type":"BOOLEAN" }
  - { "name":"primary_currency","type":"BYTE_ARRAY" }
  - { "name":"rate_percent","type":"INT32" }
- name: schedule
  optional: true
  fields:
  - { "name":"id","type":"INT32" }
- name: audience
  optional: true
  fields:
  - { "name":"origin","type":"BYTE_ARRAY" }
  - { "name":"SOMEFLOAT","type":"DOUBLE" }
- name: provider
  optional: true
  fields:
  - { "name":"feature1_enabled","type":"BOOLEAN" }
  - { "name":"feature2_enabled","type":"BOOLEAN" }
  - { "name":"primary_currency","type":"BYTE_ARRAY" }
  - { "name":"id","type":"INT32" }
  - { "name":"protocol","type":"BYTE_ARRAY" }

@loicalleyne
Contributor Author

Just saw 07ed81b. I'd like to work on enabling parquet output without having to specify a schema in the config file. I was wondering:

If input messages are structured, is the structure guaranteed to stay the same across all messages from that input? Does it depend on the input source?

Is it preferable to get an Avro or other schema from the input and pass that as metadata to be dynamically converted to a parquet schema? If so, in the case where there's a Bloblang processor, would the metadata of the input be mutated to add new mappings or to change field types?

My standalone parquet YAML schema generator proof of concept tries to cover both the arbitrary map and the defined schema scenarios. Which one dovetails better with the way Benthos works?
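
If the structure turns out not to be guaranteed across messages, an inference-based writer would need some way to notice when a later message disagrees with the schema inferred from an earlier one. A minimal sketch of such a check, assuming each schema has already been flattened to a field-name to type-name map (the `schemaDrift` name and the report format are hypothetical):

```go
package main

import (
	"fmt"
	"sort"
)

// schemaDrift compares two flat field-to-type maps and reports fields that
// were removed, added, or changed type between them.
func schemaDrift(prev, next map[string]string) []string {
	var diffs []string
	for name, prevType := range prev {
		nextType, ok := next[name]
		switch {
		case !ok:
			diffs = append(diffs, "removed: "+name)
		case nextType != prevType:
			diffs = append(diffs, "changed: "+name+" "+prevType+" -> "+nextType)
		}
	}
	for name := range next {
		if _, ok := prev[name]; !ok {
			diffs = append(diffs, "added: "+name)
		}
	}
	sort.Strings(diffs) // stable report order regardless of map iteration
	return diffs
}

func main() {
	first := map[string]string{"id": "INT32", "ip": "BYTE_ARRAY"}
	second := map[string]string{"id": "INT64", "city": "BYTE_ARRAY"}
	for _, d := range schemaDrift(first, second) {
		fmt.Println(d)
	}
}
```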

3 participants