Skip to content

Commit

Permalink
Added an ability to sink to multiple writers (#23)
Browse files Browse the repository at this point in the history
* Added an ability to sink to multiple writers

* utc

* fixed timezone in test

* dep upgrade
  • Loading branch information
kelindar committed May 13, 2020
1 parent d956737 commit 3888dee
Show file tree
Hide file tree
Showing 9 changed files with 510 additions and 74 deletions.
54 changes: 34 additions & 20 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,58 @@ module github.com/kelindar/talaria
go 1.14

require (
cloud.google.com/go/bigquery v1.4.0
cloud.google.com/go/storage v1.6.0
github.com/Azure/azure-sdk-for-go v41.2.0+incompatible
github.com/Azure/go-autorest/autorest v0.10.0 // indirect
cloud.google.com/go v0.57.0 // indirect
cloud.google.com/go/bigquery v1.7.0
cloud.google.com/go/storage v1.7.0
github.com/Azure/azure-sdk-for-go v42.1.0+incompatible
github.com/Azure/go-autorest/autorest v0.10.1 // indirect
github.com/Azure/go-autorest/autorest/adal v0.8.3 // indirect
github.com/Azure/go-autorest/autorest/to v0.3.0 // indirect
github.com/DataDog/datadog-go v2.2.0+incompatible
github.com/DataDog/datadog-go v3.7.1+incompatible
github.com/DataDog/zstd v1.4.5 // indirect
github.com/Knetic/govaluate v3.0.0+incompatible
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/aws/aws-sdk-go v1.25.31
github.com/armon/go-metrics v0.3.3 // indirect
github.com/aws/aws-sdk-go v1.30.25
github.com/crphang/orc v0.0.3
github.com/dgraph-io/badger/v2 v2.0.2
github.com/dgryski/go-farm v0.0.0-20191112170834-c2139c5d712b // indirect
github.com/dgraph-io/badger/v2 v2.0.3
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dnaeon/go-vcr v1.0.1 // indirect
github.com/emitter-io/address v1.0.0
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.1 // indirect
github.com/golang/snappy v0.0.1
github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d // indirect
github.com/gorilla/mux v1.7.4
github.com/grab/async v0.0.5
github.com/hako/durafmt v0.0.0-20191009132224-3f39dc1ed9f4
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-immutable-radix v1.2.0 // indirect
github.com/hashicorp/go-msgpack v1.1.5 // indirect
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/memberlist v0.1.5
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/memberlist v0.2.2
github.com/imroc/req v0.3.0 // indirect
github.com/kelindar/binary v1.0.8
github.com/kelindar/loader v0.0.10
github.com/kelindar/lua v0.0.6
github.com/miekg/dns v1.1.22 // indirect
github.com/miekg/dns v1.1.29 // indirect
github.com/myteksi/hystrix-go v1.1.3
github.com/samuel/go-thrift v0.0.0-20190219015601-e8b6b52668fe
github.com/samuel/go-thrift v0.0.0-20191111193933-5165175b40af
github.com/satori/go.uuid v1.2.0 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.5.1
github.com/twmb/murmur3 v1.1.3
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
google.golang.org/grpc v1.27.1
gopkg.in/yaml.v2 v2.2.4
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb // indirect
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 // indirect
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f // indirect
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25 // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
golang.org/x/tools v0.0.0-20200512001501-aaeff5de670a // indirect
google.golang.org/api v0.24.0 // indirect
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380 // indirect
google.golang.org/grpc v1.29.1
gopkg.in/yaml.v2 v2.2.8
)
218 changes: 184 additions & 34 deletions go.sum

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions internal/scripting/net/net.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package net

import (
"errors"
"net"
"strings"

"github.com/emitter-io/address"
"github.com/kelindar/lua"
"github.com/kelindar/talaria/internal/monitor"
)

const ctxTag = "net"

// New creates a new lua module exposing the stats
func New(monitor monitor.Monitor) lua.Module {
l := &libnet{monitor}
m := &lua.NativeModule{
Name: "net",
Version: "1.0",
}

m.Register("getmac", l.GetMac)
m.Register("addr", l.Addr)
return m
}

type libnet struct {
monitor monitor.Monitor
}

// GetMac returns the mac address
func (l *libnet) GetMac() (lua.String, error) {
ifas, err := net.Interfaces()
if err != nil {
return "", err
}

for _, ifa := range ifas {
if addr := ifa.HardwareAddr.String(); addr != "" {
return lua.String(addr), nil
}
}
return "", errors.New("no valid mac address found")
}

// Addr returns the address of a specific host
func (l *libnet) Addr(host lua.String) (lua.String, error) {
if !strings.Contains(string(host), ":") {
host = host + ":80"
}

addr, err := address.Parse(string(host), 0)
if err != nil {
return "", err
}

return lua.String(addr.IP.String()), nil
}
49 changes: 49 additions & 0 deletions internal/scripting/net/net_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package net

import (
"context"
"testing"

"github.com/kelindar/lua"
"github.com/kelindar/talaria/internal/monitor"
"github.com/stretchr/testify/assert"
)

func Test_GetMac(t *testing.T) {
m := New(monitor.NewNoop())
assert.NotNil(t, m)

// Create a new script
s, err := lua.FromString("test", `
local net = require("net")
function main()
local addr = net.getmac()
return addr
end`, m)
assert.NoError(t, err)
assert.NotNil(t, s)

addr, err := s.Run(context.Background())
assert.NoError(t, err)
assert.Contains(t, []rune(addr.(lua.String)), rune(':'))
}

func Test_Addr(t *testing.T) {
m := New(monitor.NewNoop())
assert.NotNil(t, m)

// Create a new script
s, err := lua.FromString("test", `
local net = require("net")
function main()
return net.addr("localhost")
end`, m)
assert.NoError(t, err)
assert.NotNil(t, s)

addr, err := s.Run(context.Background())
assert.NoError(t, err)
assert.Equal(t, "127.0.0.1", string(addr.(lua.String)))
}
62 changes: 61 additions & 1 deletion internal/storage/flush/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (
"testing"

eorc "github.com/crphang/orc"
"github.com/kelindar/binary"
"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/encoding/block"
"github.com/kelindar/talaria/internal/encoding/orc"
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/scripting"
"github.com/kelindar/talaria/internal/storage/writer/noop"
"github.com/kelindar/binary"
"github.com/stretchr/testify/assert"
)

const testBlockFile = "../../../test/testBlocks"
Expand Down Expand Up @@ -198,3 +199,62 @@ func BenchmarkFlush(b *testing.B) {
})

}

func TestNameFunc(t *testing.T) {
fileNameFunc := func(row map[string]interface{}) (string, error) {
lua, _ := column.NewComputed("fileName", typeof.String, `
function main(row)
-- Convert the time to a lua date
local ts = row["col1"]
local tz = timezone()
local dt = os.date('*t', ts - tz)
-- Format the filename
return string.format("year=%d/month=%d/day=%d/ns=%s/%d-%d-%d-%s.orc",
dt.year,
dt.month,
dt.day,
row["col0"],
dt.hour,
dt.min,
dt.sec,
"127.0.0.1")
end
function timezone()
local utcdate = os.date("!*t")
local localdate = os.date("*t")
--localdate.isdst = false
return os.difftime(os.time(localdate), os.time(utcdate))
end
`, script.NewLoader(nil))

output, err := lua.Value(row)
return output.(string), err
}
flusher := New(monitor.NewNoop(), noop.New(), fileNameFunc)

schema := typeof.Schema{
"col0": typeof.String,
"col1": typeof.Timestamp,
"col2": typeof.Float64,
}

orcSchema, err := orc.SchemaFor(schema)
if err != nil {
t.Fatal(err)
}

orcBuffer := &bytes.Buffer{}
writer, _ := eorc.NewWriter(orcBuffer,
eorc.SetSchema(orcSchema))
_ = writer.Write("eventName", 1, 1.0)
_ = writer.Close()

blocks, err := block.FromOrcBy(orcBuffer.Bytes(), "col0", nil)
fileName, _ := flusher.Merge(blocks, schema)

assert.Equal(t, "year=46970/month=3/day=29/ns=eventName/0-0-0-127.0.0.1.orc", string(fileName))

}
26 changes: 26 additions & 0 deletions internal/storage/writer/multi/multi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package multi

import (
"testing"

"github.com/kelindar/talaria/internal/encoding/key"
"github.com/stretchr/testify/assert"
)

type MockWriter func(key key.Key, val []byte) error

func (w MockWriter) Write(key key.Key, val []byte) error {
return w(key, val)
}

func TestMulti(t *testing.T) {
var count int
sub := MockWriter(func(key key.Key, val []byte) error {
count++
return nil
})

multiWriter := New(sub, sub, sub)
assert.NoError(t, multiWriter.Write(nil, nil))
assert.Equal(t, 3, count)
}
33 changes: 33 additions & 0 deletions internal/storage/writer/multi/mutli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package multi

import (
"github.com/kelindar/talaria/internal/encoding/key"
)

// SubWriter represents the sub-writer
type SubWriter interface {
Write(key.Key, []byte) error
}

// Writer represents a writer that writes into multiple sub-writers.
type Writer struct {
writers []SubWriter
}

// New ...
func New(writers ...SubWriter) *Writer {
return &Writer{
writers: writers,
}
}

// Write writes the data to the sink.
func (w *Writer) Write(key key.Key, val []byte) error {
for _, w := range w.writers {
if err := w.Write(key, val); err != nil {
return err
}
}

return nil
}
Loading

0 comments on commit 3888dee

Please sign in to comment.