Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added ingestion by CSV and URL #14

Merged
merged 2 commits into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# MIT License

Copyright (c) 2019-2020 Grabtaxi Holdings PTE LTE (GRAB)
Copyright (c) 2020 Roman Atachiants (from commit #81390f0b48bd772d1048c698ecf4ccbfec4c4f56)

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
39 changes: 37 additions & 2 deletions client/golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,46 @@ func (c *Client) IngestBatch(ctx context.Context, batch []Event) error {
},
}

err := hystrix.Do(commandName, func() error {
return hystrix.Do(commandName, func() error {
_, err := c.ingress.Ingest(ctx, req)
return err
}, nil)
return err
}

// IngestURL sends a request to Talaria to ingest a file from a specific URL.
func (c *Client) IngestURL(ctx context.Context, url string) error {
return hystrix.Do(commandName, func() error {
_, err := c.ingress.Ingest(ctx, &pb.IngestRequest{
Data: &pb.IngestRequest_Url{
Url: url,
},
})
return err
}, nil)
}

// IngestCSV sends a set of comma-separated file to Talaria to ingest.
func (c *Client) IngestCSV(ctx context.Context, data []byte) error {
return hystrix.Do(commandName, func() error {
_, err := c.ingress.Ingest(ctx, &pb.IngestRequest{
Data: &pb.IngestRequest_Csv{
Csv: data,
},
})
return err
}, nil)
}

// IngestORC sends an ORC-encoded file to Talaria to ingest.
func (c *Client) IngestORC(ctx context.Context, data []byte) error {
return hystrix.Do(commandName, func() error {
_, err := c.ingress.Ingest(ctx, &pb.IngestRequest{
Data: &pb.IngestRequest_Orc{
Orc: data,
},
})
return err
}, nil)
}

// Close connection
Expand Down
87 changes: 87 additions & 0 deletions internal/encoding/block/from_csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package block

import (
"bytes"
"encoding/csv"
"io"

"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/encoding/typeof"
)

// FromCSVBy creates a block from a comma-separated file. It repartitions the batch by a given partition key at the same time.
func FromCSVBy(input []byte, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) {
const max = 10000000 // 10MB

rdr := csv.NewReader(bytes.NewReader(input))

// Read the header first
r, err := rdr.Read()
header := r

// Find the partition index
partitionIdx, ok := findString(header, partitionBy)
if !ok {
return nil, errPartitionNotFound
}

// The resulting set of blocks, repartitioned and chunked
blocks := make([]Block, 0, 128)

// Create presto columns and iterate
result, size := make(map[string]column.Columns, 16), 0
for {
r, err = rdr.Read()
if err == io.EOF {
break
} else if err != nil {
return nil, err
}

if size >= max {
pending, err := makeBlocks(result)
if err != nil {
return nil, err
}

size = 0 // Reset the size
blocks = append(blocks, pending...)
result = make(map[string]column.Columns, 16)
}

// Get the partition value, must be a string
partition, ok := convertToString(r[partitionIdx])
if !ok {
return nil, errPartitionNotFound
}

// Get the block for that partition
columns, exists := result[partition]
if !exists {
columns = column.MakeColumns(filter)
result[partition] = columns
}

// Prepare a row for transformation
row := newRow(filter.Clone(), len(r))
for i, v := range r {
row.Set(header[i], v)
}

// Append computed columns and fill nulls for the row
size += row.Transform(computed, filter).AppendTo(columns)
size += columns.FillNulls()
}

// Write the last chunk
last, err := makeBlocks(result)
if err != nil {
return nil, err
}

blocks = append(blocks, last...)
return blocks, nil
}
36 changes: 36 additions & 0 deletions internal/encoding/block/from_csv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package block

import (
"io/ioutil"
"testing"

"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/stretchr/testify/assert"
)

func TestFromCSV(t *testing.T) {
const testFile = "../../../test/test4.csv"

o, err := ioutil.ReadFile(testFile)
assert.NotEmpty(t, o)
assert.NoError(t, err)

b, err := FromCSVBy(o, "raisedCurrency", &typeof.Schema{
"raisedCurrency": typeof.String,
"raisedAmt": typeof.Float64,
})
assert.NoError(t, err)
assert.Equal(t, 3, len(b))

for _, v := range b {
assert.Contains(t, []string{"EUR", "CAD", "USD"}, string(v.Key))
}

v, err := b[0].Select(typeof.Schema{"raisedAmt": typeof.String})
assert.NoError(t, err)
assert.True(t, v["raisedAmt"].Size() > 0)
assert.Equal(t, typeof.Float64, v["raisedAmt"].Kind())
}
4 changes: 2 additions & 2 deletions internal/encoding/block/from_orc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"

orctype "github.com/crphang/orc"
"github.com/kelindar/talaria/internal/column"
Expand Down Expand Up @@ -80,7 +81,6 @@ func FromOrcBy(payload []byte, partitionBy string, filter *typeof.Schema, comput

// Append computed columns and fill nulls for the row
size += row.Transform(computed, filter).AppendTo(columns)

size += columns.FillNulls()
return false
}, cols...)
Expand All @@ -98,7 +98,7 @@ func FromOrcBy(payload []byte, partitionBy string, filter *typeof.Schema, comput
// Find the partition index
func findString(columns []string, partitionBy string) (int, bool) {
for i, k := range columns {
if k == partitionBy {
if strings.EqualFold(k, partitionBy) {
return i, true
}
}
Expand Down
4 changes: 4 additions & 0 deletions internal/encoding/block/from_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ func FromRequestBy(request *talaria.IngestRequest, partitionBy string, filter *t
return FromBatchBy(data.Batch, partitionBy, filter, computed...)
case *talaria.IngestRequest_Orc:
return FromOrcBy(data.Orc, partitionBy, filter, computed...)
case *talaria.IngestRequest_Csv:
return FromCSVBy(data.Csv, partitionBy, filter, computed...)
case *talaria.IngestRequest_Url:
return FromURLBy(data.Url, partitionBy, filter, computed...)
case nil: // The field is not set.
return nil, nil
default:
Expand Down
36 changes: 36 additions & 0 deletions internal/encoding/block/from_url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package block

import (
"context"
"path/filepath"
"strings"

"github.com/kelindar/loader"
"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/monitor/errors"
)

// FromURLBy creates a block from a remote url which should be loaded. It repartitions the batch by a given partition key at the same time.
func FromURLBy(uri string, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) {
var handler func([]byte, string, *typeof.Schema, ...*column.Computed) ([]Block, error)
switch strings.ToLower(filepath.Ext(uri)) {
case ".orc":
handler = FromOrcBy
case ".csv":
handler = FromCSVBy
default:
return nil, errors.Newf("block: unsupported file extension %s", filepath.Ext(uri))
}

l := loader.New()
b, err := l.Load(context.Background(), uri)
if err != nil {
return nil, err
}

return handler(b, partitionBy, filter, computed...)
}
33 changes: 33 additions & 0 deletions internal/encoding/block/from_url_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package block

import (
"path/filepath"
"testing"

"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/stretchr/testify/assert"
)

func TestFromURL(t *testing.T) {
p, err := filepath.Abs("../../../test/test4.csv")
assert.NoError(t, err)

b, err := FromURLBy("file:///"+p, "raisedCurrency", &typeof.Schema{
"raisedCurrency": typeof.String,
"raisedAmt": typeof.Float64,
})
assert.NoError(t, err)
assert.Equal(t, 3, len(b))

for _, v := range b {
assert.Contains(t, []string{"EUR", "CAD", "USD"}, string(v.Key))
}

v, err := b[0].Select(typeof.Schema{"raisedAmt": typeof.String})
assert.NoError(t, err)
assert.True(t, v["raisedAmt"].Size() > 0)
assert.Equal(t, typeof.Float64, v["raisedAmt"].Kind())
}
Loading