Added built-in identifier and timestamp (#18)
kelindar committed Apr 30, 2020
1 parent 108e2a3 commit 3efa207
Showing 11 changed files with 147 additions and 27 deletions.
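
The change introduces two built-in computed columns selected by URI instead of Lua code: make://identifier produces a 32-character hex event ID, and make://timestamp produces the ingestion time in Unix seconds. Below is a minimal usage sketch — a hypothetical example file, not part of the commit, assumed to live inside internal/column since Go internal packages cannot be imported from outside the repository. The nil loader is safe because the make:// cases in NewComputed return before the loader is used.

package column

import (
	"fmt"

	"github.com/kelindar/talaria/internal/encoding/typeof"
)

// ExampleNewComputed_builtins shows how the two built-ins are constructed
// and what their Value method returns. Column names are illustrative.
func ExampleNewComputed_builtins() {
	id, _ := NewComputed("event_id", typeof.String, "make://identifier", nil)
	ts, _ := NewComputed("ingested_at", typeof.Int64, "make://timestamp", nil)

	row := map[string]interface{}{"a": 1, "b": "hello"}
	eventID, _ := id.Value(row)    // 32-character hex string
	ingestedAt, _ := ts.Value(row) // Unix seconds as int64

	fmt.Println(len(eventID.(string)) == 32, ingestedAt.(int64) > 0)
	// Output: true true
}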
108 changes: 99 additions & 9 deletions internal/column/computed.go
@@ -5,13 +5,17 @@ package column

import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"fmt"
"sync/atomic"
"time"

"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/scripting"
"github.com/kelindar/loader"
"github.com/kelindar/lua"
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/scripting"
)

// Loader is the default loader to use for loading computed columns.
@@ -22,37 +26,53 @@ const emptyScript = `function main(row)
return null
end`

// Computed represents a computed column
type Computed interface {
Name() string
Type() typeof.Type
Value(map[string]interface{}) (interface{}, error)
}
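
Turning Computed into an interface (previously the concrete struct shown below) is what lets the Lua-scripted column and the two new built-ins share every call site: any type with Name, Type and Value satisfies it. The signature changes in the block and server packages further down follow mechanically, swapping *column.Computed for column.Computed.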

// NewComputed creates a new script from a string
func NewComputed(name string, typ typeof.Type, uriOrCode string, loader *script.Loader) (*Computed, error) {
func NewComputed(name string, typ typeof.Type, uriOrCode string, loader *script.Loader) (Computed, error) {
switch uriOrCode {
case "make://identifier":
return newIdentifier(name), nil
case "make://timestamp":
return newTimestamp(name), nil
}

s, err := loader.Load(name, uriOrCode)
if err != nil {
return nil, err
}

return &Computed{
return &scripted{
code: s,
typ: typ,
}, nil
}

// Computed represents a computed column
type Computed struct {
// ------------------------------------------------------------------------------------------------------------

// scripted represents a computed column evaluated through a Lua script
type scripted struct {
code *lua.Script // The script associated with the column
typ typeof.Type // The type of the column
}

// Name returns the name of the column
func (c *Computed) Name() string {
func (c *scripted) Name() string {
return c.code.Name()
}

// Type returns the type of the column
func (c *Computed) Type() typeof.Type {
func (c *scripted) Type() typeof.Type {
return c.typ
}

// Value computes the column value for the row
func (c *Computed) Value(row map[string]interface{}) (interface{}, error) {
func (c *scripted) Value(row map[string]interface{}) (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

@@ -93,3 +113,73 @@ func (c *Computed) Value(row map[string]interface{}) (interface{}, error) {
// Type mismatch
return nil, fmt.Errorf("script expects %s type but got %T", c.typ.String(), out)
}

// ------------------------------------------------------------------------------------------------------------

// identifier represents a computed column that generates an event ID
type identifier struct {
seq uint32 // Sequence counter
rnd uint32 // Random component
name string // Name of the column
}

// newIdentifier creates a new ID generator column
func newIdentifier(name string) *identifier {
b := make([]byte, 4)
rand.Read(b)
uniq := binary.BigEndian.Uint32(b)

return &identifier{
seq: 0,
rnd: uniq,
name: name,
}
}

// Name returns the name of the column
func (c *identifier) Name() string {
return c.name
}

// Type returns the type of the column
func (c *identifier) Type() typeof.Type {
return typeof.String
}

// Value computes the column value for the row
func (c *identifier) Value(row map[string]interface{}) (interface{}, error) {
id := make([]byte, 16)
binary.BigEndian.PutUint64(id[0:8], uint64(time.Now().UTC().UnixNano()))
binary.BigEndian.PutUint32(id[8:12], atomic.AddUint32(&c.seq, 1))
binary.BigEndian.PutUint32(id[12:16], c.rnd)
return hex.EncodeToString(id), nil
}
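
Each ID packs 16 bytes into 32 hex characters: 8 bytes of UTC nanoseconds, 4 bytes of a process-wide atomic sequence, and 4 bytes of randomness drawn once per column instance. Because the timestamp occupies the most significant bytes, IDs sort roughly by event time. A hypothetical decoding sketch (not part of the commit) that inverts the layout above, using the same imports computed.go already has (encoding/binary, encoding/hex, fmt, time):

// decodeEventID recovers the three components packed by identifier.Value.
func decodeEventID(id string) (t time.Time, seq, rnd uint32, err error) {
	raw, err := hex.DecodeString(id)
	if err != nil || len(raw) != 16 {
		return time.Time{}, 0, 0, fmt.Errorf("malformed id %q", id)
	}
	nanos := binary.BigEndian.Uint64(raw[0:8]) // event time in UnixNano
	seq = binary.BigEndian.Uint32(raw[8:12])   // atomic sequence counter
	rnd = binary.BigEndian.Uint32(raw[12:16])  // per-instance random component
	return time.Unix(0, int64(nanos)).UTC(), seq, rnd, nil
}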

// ------------------------------------------------------------------------------------------------------------

// timestamp represents a timestamp computed column
type timestamp struct {
name string // Name of the column
}

// newTimestamp creates a new timestamp column
func newTimestamp(name string) *timestamp {
return &timestamp{
name: name,
}
}

// Name returns the name of the column
func (c *timestamp) Name() string {
return c.name
}

// Type returns the type of the column
func (c *timestamp) Type() typeof.Type {
return typeof.Int64
}

// Value computes the column value for the row
func (c *timestamp) Value(row map[string]interface{}) (interface{}, error) {
return time.Now().UTC().Unix(), nil
}
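
Note the asymmetry between the two built-ins: the timestamp column returns whole Unix seconds (time.Now().UTC().Unix(), matching its typeof.Int64 type), while the identifier embeds nanosecond precision in its leading 8 bytes.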
36 changes: 33 additions & 3 deletions internal/column/computed_test.go
@@ -12,31 +12,61 @@ import (
)

func Test_Computed(t *testing.T) {

c := newDataColumn(t)
out, err := c.Value(map[string]interface{}{
"a": 1,
"b": "hello",
})

assert.NotNil(t, out)
assert.NoError(t, err)
assert.Equal(t, `{"a":1,"b":"hello"}`, out)
}

func Test_Identifier(t *testing.T) {
c, err := NewComputed("id", typeof.String, "make://identifier", nil)
assert.NoError(t, err)
out, err := c.Value(map[string]interface{}{
"a": 1,
"b": "hello",
})

assert.Equal(t, "id", c.Name())
assert.Equal(t, typeof.String, c.Type())
assert.NotNil(t, out)
assert.NoError(t, err)
assert.Equal(t, 32, len(out.(string)))
}

func Test_Timestamp(t *testing.T) {
c, err := NewComputed("ts", typeof.String, "make://timestamp", nil)
assert.NoError(t, err)
out, err := c.Value(map[string]interface{}{
"a": 1,
"b": "hello",
})

assert.Equal(t, "ts", c.Name())
assert.Equal(t, typeof.Int64, c.Type())
assert.NotNil(t, out)
assert.NoError(t, err)
assert.NotZero(t, out.(int64))
}

func Test_Download(t *testing.T) {
l := script.NewLoader(nil)
c, err := NewComputed("data", typeof.JSON, "https://raw.githubusercontent.com/kelindar/lua/master/fixtures/json.lua", l)
out, err := c.Value(map[string]interface{}{
"a": 1,
"b": "hello",
})

assert.NotNil(t, out)
assert.NoError(t, err)
assert.Equal(t, `{"a":1,"b":"hello"}`, out)

}

func newDataColumn(t *testing.T) *Computed {
func newDataColumn(t *testing.T) Computed {
l := script.NewLoader(nil)
c, err := NewComputed("data", typeof.JSON, `
local json = require("json")
2 changes: 1 addition & 1 deletion internal/encoding/block/from_batch.go
@@ -16,7 +16,7 @@ import (

// FromBatchBy creates a block from a talaria protobuf-encoded batch. It
// repartitions the batch by a given partition key at the same time.
func FromBatchBy(batch *talaria.Batch, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) {
func FromBatchBy(batch *talaria.Batch, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) {
if batch == nil || batch.Strings == nil || batch.Events == nil {
return nil, errEmptyBatch
}
10 changes: 5 additions & 5 deletions internal/encoding/block/from_batch_test.go
@@ -73,10 +73,10 @@ func TestBlock_FromBatch(t *testing.T) {

// The schema to filter
filter := typeof.Schema{
"a": typeof.Int64,
"b": typeof.Timestamp,
"d": typeof.String,
"data": typeof.JSON,
"a": typeof.Int64,
"b": typeof.Timestamp,
"d": typeof.String,
"data": typeof.JSON,
"another": typeof.Int64,
}

@@ -107,7 +107,7 @@
assert.Contains(t, string(row["data"].(json.RawMessage)), "event3")
}

func newDataColumn() (*column.Computed, error) {
func newDataColumn() (column.Computed, error) {
return column.NewComputed("data", typeof.JSON, `
local json = require("json")
2 changes: 1 addition & 1 deletion internal/encoding/block/from_csv.go
@@ -13,7 +13,7 @@ import (
)

// FromCSVBy creates a block from a comma-separated file. It repartitions the batch by a given partition key at the same time.
func FromCSVBy(input []byte, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) {
func FromCSVBy(input []byte, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) {
const max = 10000000 // 10MB

rdr := csv.NewReader(bytes.NewReader(input))
2 changes: 1 addition & 1 deletion internal/encoding/block/from_orc.go
@@ -17,7 +17,7 @@ import (

// FromOrcBy decodes a set of blocks from an orc file and repartitions
// it by the specified partition key.
func FromOrcBy(payload []byte, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) {
func FromOrcBy(payload []byte, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) {
const max = 10000000 // 10MB

iter, err := orc.FromBuffer(payload)
2 changes: 1 addition & 1 deletion internal/encoding/block/from_request.go
@@ -13,7 +13,7 @@ import (

// FromRequestBy creates a block from a talaria protobuf-encoded request. It
// repartitions the batch by a given partition key at the same time.
func FromRequestBy(request *talaria.IngestRequest, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) {
func FromRequestBy(request *talaria.IngestRequest, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) {
switch data := request.GetData().(type) {
case *talaria.IngestRequest_Batch:
return FromBatchBy(data.Batch, partitionBy, filter, computed...)
4 changes: 2 additions & 2 deletions internal/encoding/block/from_url.go
@@ -15,8 +15,8 @@ import (
)

// FromURLBy creates a block from a remote url which should be loaded. It repartitions the batch by a given partition key at the same time.
func FromURLBy(uri string, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) {
var handler func([]byte, string, *typeof.Schema, ...*column.Computed) ([]Block, error)
func FromURLBy(uri string, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) {
var handler func([]byte, string, *typeof.Schema, ...column.Computed) ([]Block, error)
switch strings.ToLower(filepath.Ext(uri)) {
case ".orc":
handler = FromOrcBy
2 changes: 1 addition & 1 deletion internal/encoding/block/transform.go
@@ -66,7 +66,7 @@ func (r row) AppendTo(cols column.Columns) (size int) {
}

// Transform runs the computed columns and overwrites/appends them to the set.
func (r row) Transform(computed []*column.Computed, filter *typeof.Schema) row {
func (r row) Transform(computed []column.Computed, filter *typeof.Schema) row {

// Create a new output row and copy the column values from the input
schema := make(typeof.Schema, len(r.schema))
4 changes: 2 additions & 2 deletions internal/encoding/block/transform_test.go
@@ -37,7 +37,7 @@ func TestTransform(t *testing.T) {
in.Set("c", 123)

// Run a transformation
out := in.Transform([]*column.Computed{dataColumn}, &filter)
out := in.Transform([]column.Computed{dataColumn}, &filter)
assert.NotNil(t, out)

// Make sure input is not changed
@@ -67,7 +67,7 @@ func TestTransform_NoFilter(t *testing.T) {
in.Set("c", 123)

// Run a transformation
out := in.Transform([]*column.Computed{dataColumn}, nil)
out := in.Transform([]column.Computed{dataColumn}, nil)
assert.NotNil(t, out)

// Make sure input is not changed
2 changes: 1 addition & 1 deletion internal/server/server.go
@@ -86,7 +86,7 @@ type Server struct {
monitor monitor.Monitor // The monitoring layer
cancel context.CancelFunc // The cancellation function for the server
tables map[string]table.Table // The list of tables
computed []*column.Computed // The set of computed columns
computed []column.Computed // The set of computed columns
s3sqs *s3sqs.Ingress // The S3SQS Ingress (optional)
}
