diff --git a/internal/column/computed.go b/internal/column/computed.go index ea58a692..8d142d91 100644 --- a/internal/column/computed.go +++ b/internal/column/computed.go @@ -5,13 +5,17 @@ package column import ( "context" + "crypto/rand" + "encoding/binary" + "encoding/hex" "fmt" + "sync/atomic" "time" - "github.com/kelindar/talaria/internal/encoding/typeof" - "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/loader" "github.com/kelindar/lua" + "github.com/kelindar/talaria/internal/encoding/typeof" + "github.com/kelindar/talaria/internal/scripting" ) // Loader is the default loader to use for loading computed columns. @@ -22,37 +26,53 @@ const emptyScript = `function main(row) return null end` +// Computed represents a computed column +type Computed interface { + Name() string + Type() typeof.Type + Value(map[string]interface{}) (interface{}, error) +} + // NewComputed creates a new script from a string -func NewComputed(name string, typ typeof.Type, uriOrCode string, loader *script.Loader) (*Computed, error) { +func NewComputed(name string, typ typeof.Type, uriOrCode string, loader *script.Loader) (Computed, error) { + switch uriOrCode { + case "make://identifier": + return newIdentifier(name), nil + case "make://timestamp": + return newTimestamp(name), nil + } + s, err := loader.Load(name, uriOrCode) if err != nil { return nil, err } - return &Computed{ + return &scripted{ code: s, typ: typ, }, nil } -// Computed represents a computed column -type Computed struct { +// ------------------------------------------------------------------------------------------------------------ + +// scripted represents a computed column computed through a lua script +type scripted struct { code *lua.Script // The script associated with the column typ typeof.Type // The type of the column } // Name returns the name of the column -func (c *Computed) Name() string { +func (c *scripted) Name() string { return c.code.Name() } // Type returns the type of the column -func (c *Computed) Type() typeof.Type { +func (c *scripted) Type() typeof.Type { return c.typ } // Value computes the column value for the row -func (c *Computed) Value(row map[string]interface{}) (interface{}, error) { +func (c *scripted) Value(row map[string]interface{}) (interface{}, error) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() @@ -93,3 +113,73 @@ func (c *Computed) Value(row map[string]interface{}) (interface{}, error) { // Type mismatch return nil, fmt.Errorf("script expects %s type but got %T", c.typ.String(), out) } + +// ------------------------------------------------------------------------------------------------------------ + +// identifier represents a computed column that generates an event ID +type identifier struct { + seq uint32 // Sequence counter + rnd uint32 // Random component + name string // Name of the column +} + +// newIdentifier creates a new ID generator column +func newIdentifier(name string) *identifier { + b := make([]byte, 4) + rand.Read(b) + uniq := binary.BigEndian.Uint32(b) + + return &identifier{ + seq: 0, + rnd: uniq, + name: name, + } +} + +// Name returns the name of the column +func (c *identifier) Name() string { + return c.name +} + +// Type returns the type of the column +func (c *identifier) Type() typeof.Type { + return typeof.String +} + +// Value computes the column value for the row +func (c *identifier) Value(row map[string]interface{}) (interface{}, error) { + id := make([]byte, 16) + binary.BigEndian.PutUint64(id[0:8], uint64(time.Now().UTC().UnixNano())) + binary.BigEndian.PutUint32(id[8:12], atomic.AddUint32(&c.seq, 1)) + binary.BigEndian.PutUint32(id[12:16], c.rnd) + return hex.EncodeToString(id), nil +} + +// ------------------------------------------------------------------------------------------------------------ + +// Timestamp represents a timestamp computed column +type timestamp struct { + name string // Name of the column +} + +// newIdentifier creates a new ID generator column +func newTimestamp(name string) *timestamp { + return ×tamp{ + name: name, + } +} + +// Name returns the name of the column +func (c *timestamp) Name() string { + return c.name +} + +// Type returns the type of the column +func (c *timestamp) Type() typeof.Type { + return typeof.Int64 +} + +// Value computes the column value for the row +func (c *timestamp) Value(row map[string]interface{}) (interface{}, error) { + return time.Now().UTC().Unix(), nil +} diff --git a/internal/column/computed_test.go b/internal/column/computed_test.go index f56e60d9..bfced7d2 100644 --- a/internal/column/computed_test.go +++ b/internal/column/computed_test.go @@ -12,17 +12,47 @@ import ( ) func Test_Computed(t *testing.T) { - c := newDataColumn(t) out, err := c.Value(map[string]interface{}{ "a": 1, "b": "hello", }) + assert.NotNil(t, out) assert.NoError(t, err) assert.Equal(t, `{"a":1,"b":"hello"}`, out) } +func Test_Identifier(t *testing.T) { + c, err := NewComputed("id", typeof.String, "make://identifier", nil) + assert.NoError(t, err) + out, err := c.Value(map[string]interface{}{ + "a": 1, + "b": "hello", + }) + + assert.Equal(t, "id", c.Name()) + assert.Equal(t, typeof.String, c.Type()) + assert.NotNil(t, out) + assert.NoError(t, err) + assert.Equal(t, 32, len(out.(string))) +} + +func Test_Timestamp(t *testing.T) { + c, err := NewComputed("ts", typeof.String, "make://timestamp", nil) + assert.NoError(t, err) + out, err := c.Value(map[string]interface{}{ + "a": 1, + "b": "hello", + }) + + assert.Equal(t, "ts", c.Name()) + assert.Equal(t, typeof.Int64, c.Type()) + assert.NotNil(t, out) + assert.NoError(t, err) + assert.NotZero(t, out.(int64)) +} + func Test_Download(t *testing.T) { l := script.NewLoader(nil) c, err := NewComputed("data", typeof.JSON, "https://raw.githubusercontent.com/kelindar/lua/master/fixtures/json.lua", l) @@ -30,13 +60,13 @@ func Test_Download(t *testing.T) { "a": 1, "b": "hello", }) + assert.NotNil(t, out) assert.NoError(t, err) assert.Equal(t, `{"a":1,"b":"hello"}`, out) - } -func newDataColumn(t *testing.T) *Computed { +func newDataColumn(t *testing.T) Computed { l := script.NewLoader(nil) c, err := NewComputed("data", typeof.JSON, ` local json = require("json") diff --git a/internal/encoding/block/from_batch.go b/internal/encoding/block/from_batch.go index fab94dda..2cc986dd 100644 --- a/internal/encoding/block/from_batch.go +++ b/internal/encoding/block/from_batch.go @@ -16,7 +16,7 @@ import ( // FromBatchBy creates a block from a talaria protobuf-encoded batch. It // repartitions the batch by a given partition key at the same time. -func FromBatchBy(batch *talaria.Batch, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) { +func FromBatchBy(batch *talaria.Batch, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) { if batch == nil || batch.Strings == nil || batch.Events == nil { return nil, errEmptyBatch } diff --git a/internal/encoding/block/from_batch_test.go b/internal/encoding/block/from_batch_test.go index e54853c4..2c722efd 100644 --- a/internal/encoding/block/from_batch_test.go +++ b/internal/encoding/block/from_batch_test.go @@ -73,10 +73,10 @@ func TestBlock_FromBatch(t *testing.T) { // The schema to filter filter := typeof.Schema{ - "a": typeof.Int64, - "b": typeof.Timestamp, - "d": typeof.String, - "data": typeof.JSON, + "a": typeof.Int64, + "b": typeof.Timestamp, + "d": typeof.String, + "data": typeof.JSON, "another": typeof.Int64, } @@ -107,7 +107,7 @@ func TestBlock_FromBatch(t *testing.T) { assert.Contains(t, string(row["data"].(json.RawMessage)), "event3") } -func newDataColumn() (*column.Computed, error) { +func newDataColumn() (column.Computed, error) { return column.NewComputed("data", typeof.JSON, ` local json = require("json") diff --git a/internal/encoding/block/from_csv.go b/internal/encoding/block/from_csv.go index 118638ec..223600c8 100644 --- a/internal/encoding/block/from_csv.go +++ b/internal/encoding/block/from_csv.go @@ -13,7 +13,7 @@ import ( ) // FromCSVBy creates a block from a comma-separated file. It repartitions the batch by a given partition key at the same time. -func FromCSVBy(input []byte, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) { +func FromCSVBy(input []byte, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) { const max = 10000000 // 10MB rdr := csv.NewReader(bytes.NewReader(input)) diff --git a/internal/encoding/block/from_orc.go b/internal/encoding/block/from_orc.go index 803833f2..760ca857 100644 --- a/internal/encoding/block/from_orc.go +++ b/internal/encoding/block/from_orc.go @@ -17,7 +17,7 @@ import ( // FromOrcBy decodes a set of blocks from an orc file and repartitions // it by the specified partition key. -func FromOrcBy(payload []byte, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) { +func FromOrcBy(payload []byte, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) { const max = 10000000 // 10MB iter, err := orc.FromBuffer(payload) diff --git a/internal/encoding/block/from_request.go b/internal/encoding/block/from_request.go index 9d268dd9..d931822a 100644 --- a/internal/encoding/block/from_request.go +++ b/internal/encoding/block/from_request.go @@ -13,7 +13,7 @@ import ( // FromRequestBy creates a block from a talaria protobuf-encoded request. It // repartitions the batch by a given partition key at the same time. -func FromRequestBy(request *talaria.IngestRequest, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) { +func FromRequestBy(request *talaria.IngestRequest, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) { switch data := request.GetData().(type) { case *talaria.IngestRequest_Batch: return FromBatchBy(data.Batch, partitionBy, filter, computed...) diff --git a/internal/encoding/block/from_url.go b/internal/encoding/block/from_url.go index 71eeb462..1012f9bb 100644 --- a/internal/encoding/block/from_url.go +++ b/internal/encoding/block/from_url.go @@ -15,8 +15,8 @@ import ( ) // FromURLBy creates a block from a remote url which should be loaded. It repartitions the batch by a given partition key at the same time. -func FromURLBy(uri string, partitionBy string, filter *typeof.Schema, computed ...*column.Computed) ([]Block, error) { - var handler func([]byte, string, *typeof.Schema, ...*column.Computed) ([]Block, error) +func FromURLBy(uri string, partitionBy string, filter *typeof.Schema, computed ...column.Computed) ([]Block, error) { + var handler func([]byte, string, *typeof.Schema, ...column.Computed) ([]Block, error) switch strings.ToLower(filepath.Ext(uri)) { case ".orc": handler = FromOrcBy diff --git a/internal/encoding/block/transform.go b/internal/encoding/block/transform.go index 94f42704..29a085ce 100644 --- a/internal/encoding/block/transform.go +++ b/internal/encoding/block/transform.go @@ -66,7 +66,7 @@ func (r row) AppendTo(cols column.Columns) (size int) { } // Transform runs the computed columns and overwrites/appends them to the set. -func (r row) Transform(computed []*column.Computed, filter *typeof.Schema) row { +func (r row) Transform(computed []column.Computed, filter *typeof.Schema) row { // Create a new output row and copy the column values from the input schema := make(typeof.Schema, len(r.schema)) diff --git a/internal/encoding/block/transform_test.go b/internal/encoding/block/transform_test.go index d67cb0c3..b7066925 100644 --- a/internal/encoding/block/transform_test.go +++ b/internal/encoding/block/transform_test.go @@ -37,7 +37,7 @@ func TestTransform(t *testing.T) { in.Set("c", 123) // Run a transformation - out := in.Transform([]*column.Computed{dataColumn}, &filter) + out := in.Transform([]column.Computed{dataColumn}, &filter) assert.NotNil(t, out) // Make sure input is not changed @@ -67,7 +67,7 @@ func TestTransform_NoFilter(t *testing.T) { in.Set("c", 123) // Run a transformation - out := in.Transform([]*column.Computed{dataColumn}, nil) + out := in.Transform([]column.Computed{dataColumn}, nil) assert.NotNil(t, out) // Make sure input is not changed diff --git a/internal/server/server.go b/internal/server/server.go index b293f2a8..98e8c7ca 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -86,7 +86,7 @@ type Server struct { monitor monitor.Monitor // The monitoring layer cancel context.CancelFunc // The cancellation function for the server tables map[string]table.Table // The list of tables - computed []*column.Computed // The set of computed columns + computed []column.Computed // The set of computed columns s3sqs *s3sqs.Ingress // The S3SQS Ingress (optional) }