Skip to content

Commit

Permalink
Fixing big query writer (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed May 12, 2020
1 parent 7667509 commit 13b5e10
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 12 deletions.
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ github.com/kelindar/loader v0.0.10 h1:af6iHps0yk20BnRnYAOu+hLarZubIgEaZUcbyHCWJE
github.com/kelindar/loader v0.0.10/go.mod h1:zhlDtwZnZ2I0Pq5tYScI+VWvvhItHGGJT5tLsAY1RQs=
github.com/kelindar/lua v0.0.5 h1:VqJQGl74ub7fiZcSygHV4aZw/IbFKXA6vzk4pEUu4mk=
github.com/kelindar/lua v0.0.5/go.mod h1:zCY2muj80Y5c3odwn4Ql6kjzpk3GAq9V4JwuVGLOsdA=
github.com/kelindar/lua v0.0.6 h1:UeM5wedaRI5IYvXBHi2yVu8Kc2IJH6tSB7tqT191FBU=
github.com/kelindar/lua v0.0.6/go.mod h1:zCY2muj80Y5c3odwn4Ql6kjzpk3GAq9V4JwuVGLOsdA=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down
2 changes: 1 addition & 1 deletion internal/column/computed.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (c *timestamp) Name() string {

// Type returns the type of the column
func (c *timestamp) Type() typeof.Type {
return typeof.Int64
return typeof.Timestamp
}

// Value computes the column value for the row
Expand Down
2 changes: 1 addition & 1 deletion internal/column/computed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Test_Timestamp(t *testing.T) {
})

assert.Equal(t, "ts", c.Name())
assert.Equal(t, typeof.Int64, c.Type())
assert.Equal(t, typeof.Timestamp, c.Type())
assert.NotNil(t, out)
assert.NoError(t, err)
assert.NotZero(t, out.(int64))
Expand Down
11 changes: 5 additions & 6 deletions internal/presto/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func (b *PrestoThriftVarchar) Range(from int, until int, f func(int, interface{}
continue
}

v := b.Bytes[offset:offset+size]
v := b.Bytes[offset : offset+size]
f(i, binaryToString(&v))
offset += size
}
Expand Down Expand Up @@ -584,13 +584,13 @@ func (b *PrestoThriftTimestamp) Append(v interface{}) int {
switch v := v.(type) {
case int64:
b.Nulls = append(b.Nulls, false)
b.Timestamps = append(b.Timestamps, v)
b.Timestamps = append(b.Timestamps, v*1000) // Always assume it's in UNIX seconds
case time.Time:
b.Nulls = append(b.Nulls, false)
b.Timestamps = append(b.Timestamps, v.UnixNano()/1000000) // UNIX time in millisecond
default:
b.Nulls = append(b.Nulls, false)
b.Timestamps = append(b.Timestamps, reflect.ValueOf(v).Int())
b.Timestamps = append(b.Timestamps, reflect.ValueOf(v).Int()*1000)
}

return size
Expand Down Expand Up @@ -677,7 +677,7 @@ func (b *PrestoThriftTimestamp) Range(from int, until int, f func(int, interface
continue
}

f(i, b.Timestamps[i])
f(i, time.Unix(b.Timestamps[i]/1000, 0))
}
}

Expand Down Expand Up @@ -805,7 +805,7 @@ func (b *PrestoThriftJson) Range(from int, until int, f func(int, interface{}))
continue
}

v := b.Bytes[offset:offset+size]
v := b.Bytes[offset : offset+size]
f(i, binaryToString(&v))
offset += size
}
Expand All @@ -815,4 +815,3 @@ func (b *PrestoThriftJson) Range(from int, until int, f func(int, interface{}))
func binaryToString(b *[]byte) string {
return *(*string)(unsafe.Pointer(b))
}

4 changes: 2 additions & 2 deletions internal/presto/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,9 @@ func TestAppend_Timestamp(t *testing.T) {
input: int64(321),
output: &PrestoThriftTimestamp{
Nulls: []bool{false, false},
Timestamps: []int64{123, 321},
Timestamps: []int64{123, 321000},
},
last: int64(321),
last: int64(321000),
outputRes: 10,
size: 20,
count: 2,
Expand Down
8 changes: 7 additions & 1 deletion internal/storage/writer/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ func New(project, dataset, table string) (*Writer, error) {
// Write writes the data to the sink.
func (w *Writer) Write(key key.Key, val []byte) error {
source := bigquery.NewReaderSource(bytes.NewReader(val))
loader := w.table.LoaderFrom(source)
source.FileConfig = bigquery.FileConfig{
SourceFormat: bigquery.DataFormat("ORC"),
AutoDetect: false,
IgnoreUnknownValues: true,
MaxBadRecords: 0,
}

// Run the loader
loader := w.table.LoaderFrom(source)
ctx := context.Background()
job, err := loader.Run(ctx)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion internal/storage/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func New(config *config.Compaction, monitor monitor.Monitor, store storage.Stora

nameFunc := func(row map[string]interface{}) (s string, e error) {
return fmt.Sprintf("%s-%x.orc",
time.Now().UTC().Format("year=2006/month=01/day=02/15-04-05"),
time.Now().UTC().Format("year=2006/month=1/day=2/15-04-05"),
hashOfRow(row),
), nil
}
Expand All @@ -50,6 +50,11 @@ func New(config *config.Compaction, monitor monitor.Monitor, store storage.Stora
if fn, err := column.NewComputed("nameFunc", typeof.String, config.NameFunc, loader); err == nil {
nameFunc = func(row map[string]interface{}) (s string, e error) {
val, err := fn.Value(row)
if err != nil {
monitor.Error(err)
return "", err
}

return val.(string), err
}
}
Expand Down

0 comments on commit 13b5e10

Please sign in to comment.