Skip to content

Commit

Permalink
Added ingestion by CSV and URL (#14)
Browse files Browse the repository at this point in the history
* Added ingestion by CSV and URL

* path fix
  • Loading branch information
kelindar committed Apr 20, 2020
1 parent 396e850 commit 03af7e2
Show file tree
Hide file tree
Showing 11 changed files with 4,208 additions and 1,717 deletions.
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# MIT License

Copyright (c) 2019-2020 Grabtaxi Holdings PTE LTE (GRAB)
Copyright (c) 2020 Roman Atachiants (from commit #81390f0b48bd772d1048c698ecf4ccbfec4c4f56)

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
39 changes: 37 additions & 2 deletions client/golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,46 @@ func (c *Client) IngestBatch(ctx context.Context, batch []Event) error {
},
}

err := hystrix.Do(commandName, func() error {
return hystrix.Do(commandName, func() error {
_, err := c.ingress.Ingest(ctx, req)
return err
}, nil)
return err
}

// IngestURL sends a request to Talaria to ingest a file from a specific URL.
func (c *Client) IngestURL(ctx context.Context, url string) error {
return hystrix.Do(commandName, func() error {
_, err := c.ingress.Ingest(ctx, &pb.IngestRequest{
Data: &pb.IngestRequest_Url{
Url: url,
},
})
return err
}, nil)
}

// IngestCSV sends a set of comma-separated file to Talaria to ingest.
func (c *Client) IngestCSV(ctx context.Context, data []byte) error {
return hystrix.Do(commandName, func() error {
_, err := c.ingress.Ingest(ctx, &pb.IngestRequest{
Data: &pb.IngestRequest_Csv{
Csv: data,
},
})
return err
}, nil)
}

// IngestORC sends an ORC-encoded file to Talaria to ingest.
func (c *Client) IngestORC(ctx context.Context, data []byte) error {
return hystrix.Do(commandName, func() error {
_, err := c.ingress.Ingest(ctx, &pb.IngestRequest{
Data: &pb.IngestRequest_Orc{
Orc: data,
},
})
return err
}, nil)
}

// Close connection
Expand Down
87 changes: 87 additions & 0 deletions internal/encoding/block/from_csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package block

import (
"bytes"
"encoding/csv"
"io"

"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/encoding/typeof"
)

// FromCSVBy creates a block from a comma-separated file. It repartitions the batch by a given partition key at the same time.
func FromCSVBy(input []byte, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) {
const max = 10000000 // 10MB

rdr := csv.NewReader(bytes.NewReader(input))

// Read the header first
r, err := rdr.Read()
header := r

// Find the partition index
partitionIdx, ok := findString(header, partitionBy)
if !ok {
return nil, errPartitionNotFound
}

// The resulting set of blocks, repartitioned and chunked
blocks := make([]Block, 0, 128)

// Create presto columns and iterate
result, size := make(map[string]column.Columns, 16), 0
for {
r, err = rdr.Read()
if err == io.EOF {
break
} else if err != nil {
return nil, err
}

if size >= max {
pending, err := makeBlocks(result)
if err != nil {
return nil, err
}

size = 0 // Reset the size
blocks = append(blocks, pending...)
result = make(map[string]column.Columns, 16)
}

// Get the partition value, must be a string
partition, ok := convertToString(r[partitionIdx])
if !ok {
return nil, errPartitionNotFound
}

// Get the block for that partition
columns, exists := result[partition]
if !exists {
columns = column.MakeColumns(filter)
result[partition] = columns
}

// Prepare a row for transformation
row := newRow(filter.Clone(), len(r))
for i, v := range r {
row.Set(header[i], v)
}

// Append computed columns and fill nulls for the row
size += row.Transform(computed, filter).AppendTo(columns)
size += columns.FillNulls()
}

// Write the last chunk
last, err := makeBlocks(result)
if err != nil {
return nil, err
}

blocks = append(blocks, last...)
return blocks, nil
}
36 changes: 36 additions & 0 deletions internal/encoding/block/from_csv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package block

import (
"io/ioutil"
"testing"

"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/stretchr/testify/assert"
)

func TestFromCSV(t *testing.T) {
const testFile = "../../../test/test4.csv"

o, err := ioutil.ReadFile(testFile)
assert.NotEmpty(t, o)
assert.NoError(t, err)

b, err := FromCSVBy(o, "raisedCurrency", &typeof.Schema{
"raisedCurrency": typeof.String,
"raisedAmt": typeof.Float64,
})
assert.NoError(t, err)
assert.Equal(t, 3, len(b))

for _, v := range b {
assert.Contains(t, []string{"EUR", "CAD", "USD"}, string(v.Key))
}

v, err := b[0].Select(typeof.Schema{"raisedAmt": typeof.String})
assert.NoError(t, err)
assert.True(t, v["raisedAmt"].Size() > 0)
assert.Equal(t, typeof.Float64, v["raisedAmt"].Kind())
}
4 changes: 2 additions & 2 deletions internal/encoding/block/from_orc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"

orctype "github.com/crphang/orc"
"github.com/kelindar/talaria/internal/column"
Expand Down Expand Up @@ -80,7 +81,6 @@ func FromOrcBy(payload []byte, partitionBy string, filter *typeof.Schema, comput

// Append computed columns and fill nulls for the row
size += row.Transform(computed, filter).AppendTo(columns)

size += columns.FillNulls()
return false
}, cols...)
Expand All @@ -98,7 +98,7 @@ func FromOrcBy(payload []byte, partitionBy string, filter *typeof.Schema, comput
// Find the partition index
func findString(columns []string, partitionBy string) (int, bool) {
for i, k := range columns {
if k == partitionBy {
if strings.EqualFold(k, partitionBy) {
return i, true
}
}
Expand Down
4 changes: 4 additions & 0 deletions internal/encoding/block/from_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ func FromRequestBy(request *talaria.IngestRequest, partitionBy string, filter *t
return FromBatchBy(data.Batch, partitionBy, filter, computed...)
case *talaria.IngestRequest_Orc:
return FromOrcBy(data.Orc, partitionBy, filter, computed...)
case *talaria.IngestRequest_Csv:
return FromCSVBy(data.Csv, partitionBy, filter, computed...)
case *talaria.IngestRequest_Url:
return FromURLBy(data.Url, partitionBy, filter, computed...)
case nil: // The field is not set.
return nil, nil
default:
Expand Down
36 changes: 36 additions & 0 deletions internal/encoding/block/from_url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package block

import (
"context"
"path/filepath"
"strings"

"github.com/kelindar/loader"
"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/monitor/errors"
)

// FromURLBy creates a block from a remote url which should be loaded. It repartitions the batch by a given partition key at the same time.
func FromURLBy(uri string, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) {
var handler func([]byte, string, *typeof.Schema, ...*column.Computed) ([]Block, error)
switch strings.ToLower(filepath.Ext(uri)) {
case ".orc":
handler = FromOrcBy
case ".csv":
handler = FromCSVBy
default:
return nil, errors.Newf("block: unsupported file extension %s", filepath.Ext(uri))
}

l := loader.New()
b, err := l.Load(context.Background(), uri)
if err != nil {
return nil, err
}

return handler(b, partitionBy, filter, computed...)
}
33 changes: 33 additions & 0 deletions internal/encoding/block/from_url_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package block

import (
"path/filepath"
"testing"

"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/stretchr/testify/assert"
)

func TestFromURL(t *testing.T) {
p, err := filepath.Abs("../../../test/test4.csv")
assert.NoError(t, err)

b, err := FromURLBy("file:///"+p, "raisedCurrency", &typeof.Schema{
"raisedCurrency": typeof.String,
"raisedAmt": typeof.Float64,
})
assert.NoError(t, err)
assert.Equal(t, 3, len(b))

for _, v := range b {
assert.Contains(t, []string{"EUR", "CAD", "USD"}, string(v.Key))
}

v, err := b[0].Select(typeof.Schema{"raisedAmt": typeof.String})
assert.NoError(t, err)
assert.True(t, v["raisedAmt"].Size() > 0)
assert.Equal(t, typeof.Float64, v["raisedAmt"].Kind())
}
Loading

0 comments on commit 03af7e2

Please sign in to comment.