From 13b5e10412af75651c2ba288f53f2f0a4527e152 Mon Sep 17 00:00:00 2001 From: Roman Atachiants Date: Tue, 12 May 2020 09:53:35 +0800 Subject: [PATCH] Fixing big query writer (#21) --- go.sum | 2 ++ internal/column/computed.go | 2 +- internal/column/computed_test.go | 2 +- internal/presto/columns.go | 11 +++++------ internal/presto/columns_test.go | 4 ++-- internal/storage/writer/bigquery/bigquery.go | 8 +++++++- internal/storage/writer/writer.go | 7 ++++++- 7 files changed, 24 insertions(+), 12 deletions(-) diff --git a/go.sum b/go.sum index e233029d..bd0babf3 100644 --- a/go.sum +++ b/go.sum @@ -198,6 +198,8 @@ github.com/kelindar/loader v0.0.10 h1:af6iHps0yk20BnRnYAOu+hLarZubIgEaZUcbyHCWJE github.com/kelindar/loader v0.0.10/go.mod h1:zhlDtwZnZ2I0Pq5tYScI+VWvvhItHGGJT5tLsAY1RQs= github.com/kelindar/lua v0.0.5 h1:VqJQGl74ub7fiZcSygHV4aZw/IbFKXA6vzk4pEUu4mk= github.com/kelindar/lua v0.0.5/go.mod h1:zCY2muj80Y5c3odwn4Ql6kjzpk3GAq9V4JwuVGLOsdA= +github.com/kelindar/lua v0.0.6 h1:UeM5wedaRI5IYvXBHi2yVu8Kc2IJH6tSB7tqT191FBU= +github.com/kelindar/lua v0.0.6/go.mod h1:zCY2muj80Y5c3odwn4Ql6kjzpk3GAq9V4JwuVGLOsdA= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= diff --git a/internal/column/computed.go b/internal/column/computed.go index 8d142d91..310d66a9 100644 --- a/internal/column/computed.go +++ b/internal/column/computed.go @@ -176,7 +176,7 @@ func (c *timestamp) Name() string { // Type returns the type of the column func (c *timestamp) Type() typeof.Type { - return typeof.Int64 + return typeof.Timestamp } // Value computes the column value for the row diff --git a/internal/column/computed_test.go b/internal/column/computed_test.go index bfced7d2..1d93b6df 100644 --- a/internal/column/computed_test.go +++ b/internal/column/computed_test.go @@ -47,7 +47,7 @@ func Test_Timestamp(t *testing.T) { }) assert.Equal(t, "ts", c.Name()) - assert.Equal(t, typeof.Int64, c.Type()) + assert.Equal(t, typeof.Timestamp, c.Type()) assert.NotNil(t, out) assert.NoError(t, err) assert.NotZero(t, out.(int64)) diff --git a/internal/presto/columns.go b/internal/presto/columns.go index 8335c4de..2b6188c8 100644 --- a/internal/presto/columns.go +++ b/internal/presto/columns.go @@ -463,7 +463,7 @@ func (b *PrestoThriftVarchar) Range(from int, until int, f func(int, interface{} continue } - v := b.Bytes[offset:offset+size] + v := b.Bytes[offset : offset+size] f(i, binaryToString(&v)) offset += size } @@ -584,13 +584,13 @@ func (b *PrestoThriftTimestamp) Append(v interface{}) int { switch v := v.(type) { case int64: b.Nulls = append(b.Nulls, false) - b.Timestamps = append(b.Timestamps, v) + b.Timestamps = append(b.Timestamps, v*1000) // Always assume it's in UNIX seconds case time.Time: b.Nulls = append(b.Nulls, false) b.Timestamps = append(b.Timestamps, v.UnixNano()/1000000) // UNIX time in millisecond default: b.Nulls = append(b.Nulls, false) - b.Timestamps = append(b.Timestamps, reflect.ValueOf(v).Int()) + b.Timestamps = append(b.Timestamps, reflect.ValueOf(v).Int()*1000) } return size @@ -677,7 +677,7 @@ func (b *PrestoThriftTimestamp) Range(from int, until int, f func(int, interface continue } - f(i, b.Timestamps[i]) + f(i, time.Unix(b.Timestamps[i]/1000, 0)) } } @@ -805,7 +805,7 @@ func (b *PrestoThriftJson) Range(from int, until int, f func(int, interface{})) continue } - v := b.Bytes[offset:offset+size] + v := b.Bytes[offset : offset+size] f(i, binaryToString(&v)) offset += size } @@ -815,4 +815,3 @@ func (b *PrestoThriftJson) Range(from int, until int, f func(int, interface{})) func binaryToString(b *[]byte) string { return *(*string)(unsafe.Pointer(b)) } - diff --git a/internal/presto/columns_test.go b/internal/presto/columns_test.go index 9e0b76e2..b2e54635 100644 --- a/internal/presto/columns_test.go +++ b/internal/presto/columns_test.go @@ -440,9 +440,9 @@ func TestAppend_Timestamp(t *testing.T) { input: int64(321), output: &PrestoThriftTimestamp{ Nulls: []bool{false, false}, - Timestamps: []int64{123, 321}, + Timestamps: []int64{123, 321000}, }, - last: int64(321), + last: int64(321000), outputRes: 10, size: 20, count: 2, diff --git a/internal/storage/writer/bigquery/bigquery.go b/internal/storage/writer/bigquery/bigquery.go index e81c69e6..58752fbe 100644 --- a/internal/storage/writer/bigquery/bigquery.go +++ b/internal/storage/writer/bigquery/bigquery.go @@ -37,9 +37,15 @@ func New(project, dataset, table string) (*Writer, error) { // Write writes the data to the sink. func (w *Writer) Write(key key.Key, val []byte) error { source := bigquery.NewReaderSource(bytes.NewReader(val)) - loader := w.table.LoaderFrom(source) + source.FileConfig = bigquery.FileConfig{ + SourceFormat: bigquery.DataFormat("ORC"), + AutoDetect: false, + IgnoreUnknownValues: true, + MaxBadRecords: 0, + } // Run the loader + loader := w.table.LoaderFrom(source) ctx := context.Background() job, err := loader.Run(ctx) if err != nil { diff --git a/internal/storage/writer/writer.go b/internal/storage/writer/writer.go index 0759fdce..a5a4b284 100644 --- a/internal/storage/writer/writer.go +++ b/internal/storage/writer/writer.go @@ -34,7 +34,7 @@ func New(config *config.Compaction, monitor monitor.Monitor, store storage.Stora nameFunc := func(row map[string]interface{}) (s string, e error) { return fmt.Sprintf("%s-%x.orc", - time.Now().UTC().Format("year=2006/month=01/day=02/15-04-05"), + time.Now().UTC().Format("year=2006/month=1/day=2/15-04-05"), hashOfRow(row), ), nil } @@ -50,6 +50,11 @@ func New(config *config.Compaction, monitor monitor.Monitor, store storage.Stora if fn, err := column.NewComputed("nameFunc", typeof.String, config.NameFunc, loader); err == nil { nameFunc = func(row map[string]interface{}) (s string, e error) { val, err := fn.Value(row) + if err != nil { + monitor.Error(err) + return "", err + } + return val.(string), err } }