diff --git a/go.mod b/go.mod index 5151ff01e0..ec482f745d 100644 --- a/go.mod +++ b/go.mod @@ -10,12 +10,14 @@ require ( github.com/c-bata/go-prompt v0.2.2 github.com/cespare/xxhash v1.1.0 github.com/dave/jennifer v1.2.0 + github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/go-sql-driver/mysql v1.4.1 github.com/golang/geo v0.0.0-20190916061304-5b978397cfec github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect github.com/google/flatbuffers v1.11.0 github.com/google/go-cmp v0.3.0 + github.com/google/uuid v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e github.com/influxdata/pkg-config v0.2.0 @@ -30,6 +32,7 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.1 github.com/opentracing/opentracing-go v1.0.2 github.com/pierrec/lz4 v2.0.5+incompatible // indirect + github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect github.com/pkg/errors v0.9.1 github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5 // indirect github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 @@ -37,8 +40,10 @@ require ( github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b github.com/segmentio/kafka-go v0.1.0 github.com/sergi/go-diff v1.0.0 // indirect + github.com/snowflakedb/gosnowflake v1.3.4 github.com/spf13/cobra v0.0.3 go.uber.org/zap v1.9.1 + golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 // indirect golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522 golang.org/x/net v0.0.0-20190620200207-3b0461eec859 golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 diff --git a/go.sum b/go.sum index adb76cec24..9f7aaaeb9f 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhr github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -71,6 +73,8 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -119,6 +123,8 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98= +github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -147,6 +153,8 @@ github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/snowflakedb/gosnowflake v1.3.4 h1:Gyoi6g4lMHsilEwW9+KV+bgYkJTgf5pVfvL7Utus920= +github.com/snowflakedb/gosnowflake v1.3.4/go.mod h1:NsRq2QeiMUuoNUJhp5Q6xGC4uBrsS9g6LwZVEkTWgsE= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= @@ -180,6 +188,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de h1:xSjD6HQTqT0H/k60N5yYBtnN1OEkVy7WIo/DYyxKRO0= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/stdlib/sql/from.go b/stdlib/sql/from.go index d149809cb8..b69724d2a0 100644 --- a/stdlib/sql/from.go +++ b/stdlib/sql/from.go @@ -119,6 +119,8 @@ func createFromSQLSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex newRowReader = NewSqliteRowReader case "postgres", "sqlmock": newRowReader = NewPostgresRowReader + case "snowflake": + newRowReader = NewSnowflakeRowReader default: return nil, errors.Newf(codes.Invalid, "sql driver %s not supported", spec.DriverName) } diff --git a/stdlib/sql/from_private_test.go b/stdlib/sql/from_private_test.go index 668f0c6425..4894b086d2 100644 --- a/stdlib/sql/from_private_test.go +++ b/stdlib/sql/from_private_test.go @@ -25,6 +25,14 @@ func TestFromSqlUrlValidation(t *testing.T) { Query: "", }, ErrMsg: "", + }, { + Name: "ok snowflake", + Spec: &FromSQLProcedureSpec{ + DriverName: "snowflake", + DataSourceName: "username:password@accountname.us-east-1/dbname", + Query: "", + }, + ErrMsg: "", }, { Name: "invalid driver", Spec: &FromSQLProcedureSpec{ diff --git a/stdlib/sql/snowflake.go b/stdlib/sql/snowflake.go new file mode 100644 index 0000000000..51ed27c05d --- /dev/null +++ b/stdlib/sql/snowflake.go @@ -0,0 +1,202 @@ +package sql + +import ( + "database/sql" + "strconv" + "time" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/codes" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/internal/errors" + "github.com/influxdata/flux/values" +) + +// Snowflake DB support. +// Notes: +// * type mapping +// - see https://pkg.go.dev/github.com/snowflakedb/gosnowflake +// - current mappings are valid for v1.3.4 + +type SnowflakeRowReader struct { + Cursor *sql.Rows + columns []interface{} + columnTypes []flux.ColType + columnNames []string + sqlTypes []*sql.ColumnType + NextFunc func() bool + CloseFunc func() error +} + +const ( + layoutDate = "2006-01-02" + layoutTime = "15:04:05" + layoutTimeStampNtz = "2006-01-02T15:04:05.0000000000" +) + +// Next prepares SnowflakeRowReader to return rows +func (m *SnowflakeRowReader) Next() bool { + if m.NextFunc != nil { + return m.NextFunc() + } + next := m.Cursor.Next() + if next { + columnNames, err := m.Cursor.Columns() + if err != nil { + return false + } + m.columns = make([]interface{}, len(columnNames)) + columnPointers := make([]interface{}, len(columnNames)) + for i := 0; i < len(columnNames); i++ { + columnPointers[i] = &m.columns[i] + } + if err := m.Cursor.Scan(columnPointers...); err != nil { + return false + } + } + return next +} + +func (m *SnowflakeRowReader) GetNextRow() ([]values.Value, error) { + row := make([]values.Value, len(m.columns)) + for i, column := range m.columns { + switch value := column.(type) { + case bool, int64, float64: // never happens with scan into []*interface{} + row[i] = values.New(value) + case string: + switch m.columnTypes[i] { + case flux.TFloat: + f, err := strconv.ParseFloat(value, 64) + if err != nil { + return nil, err + } + row[i] = values.NewFloat(f) + case flux.TInt: + d, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return nil, err + } + row[i] = values.NewInt(d) + case flux.TBool: + b, err := strconv.ParseBool(value) + if err != nil { + return nil, err + } + row[i] = values.NewBool(b) + default: + row[i] = values.New(value) + } + case time.Time: + // DATE, TIME and TIMESTAMP_NTZ types get scanned to time.Time by the driver, + // but they have no counterpart in Flux therefore will be represented as string + switch m.sqlTypes[i].DatabaseTypeName() { + case "DATE": + row[i] = values.NewString(value.Format(layoutDate)) + case "TIME": + row[i] = values.NewString(value.Format(layoutTime)) + case "TIMESTAMP_NTZ": + row[i] = values.NewString(value.Format(layoutTimeStampNtz)) + default: + row[i] = values.NewTime(values.ConvertTime(value)) + } + case nil: + row[i] = values.NewNull(flux.SemanticType(m.columnTypes[i])) + default: + execute.PanicUnknownType(flux.TInvalid) + } + } + return row, nil +} + +func (m *SnowflakeRowReader) InitColumnNames(names []string) { + m.columnNames = names +} + +func (m *SnowflakeRowReader) InitColumnTypes(types []*sql.ColumnType) { + fluxTypes := make([]flux.ColType, len(types)) + for i := 0; i < len(types); i++ { + switch types[i].DatabaseTypeName() { + case "FIXED", "NUMBER": // FIXED is reported by Snowflake driver + _, scale, ok := types[i].DecimalSize() + if ok && scale > 0 { + fluxTypes[i] = flux.TFloat + } else { + fluxTypes[i] = flux.TInt + } + case "REAL", "FLOAT": // REAL is reported by Snowflake driver + fluxTypes[i] = flux.TFloat + case "BOOLEAN": + fluxTypes[i] = flux.TBool + case "TIMESTAMP_TZ", "TIMESTAMP_LTZ": // "TIMESTAMP_NTZ", "DATE" and "TIME" will be represented as string + fluxTypes[i] = flux.TTime + default: + fluxTypes[i] = flux.TString + } + } + m.columnTypes = fluxTypes + m.sqlTypes = types +} + +func (m *SnowflakeRowReader) ColumnNames() []string { + return m.columnNames +} + +func (m *SnowflakeRowReader) ColumnTypes() []flux.ColType { + return m.columnTypes +} + +func (m *SnowflakeRowReader) SetColumnTypes(types []flux.ColType) { + m.columnTypes = types +} + +func (m *SnowflakeRowReader) SetColumns(i []interface{}) { + m.columns = i +} + +func (m *SnowflakeRowReader) Close() error { + if m.CloseFunc != nil { + return m.CloseFunc() + } + if err := m.Cursor.Err(); err != nil { + return err + } + return m.Cursor.Close() +} + +func NewSnowflakeRowReader(r *sql.Rows) (execute.RowReader, error) { + reader := &SnowflakeRowReader{ + Cursor: r, + } + cols, err := r.Columns() + if err != nil { + return nil, err + } + reader.InitColumnNames(cols) + + types, err := r.ColumnTypes() + if err != nil { + return nil, err + } + reader.InitColumnTypes(types) + + return reader, nil +} + +var fluxToSnowflake = map[flux.ColType]string{ + flux.TFloat: "FLOAT", + flux.TInt: "NUMBER", + flux.TString: "TEXT", + flux.TBool: "BOOLEAN", + flux.TTime: "TIMESTAMP_LTZ", +} + +// SnowflakeTranslateColumn translates flux colTypes into their corresponding Snowflake column type +func SnowflakeColumnTranslateFunc() translationFunc { + return func(f flux.ColType, colName string) (string, error) { + s, found := fluxToSnowflake[f] + if !found { + return "", errors.Newf(codes.Internal, "Snowflake does not support column type %s", f.String()) + } + return colName + " " + s, nil + } +} diff --git a/stdlib/sql/source_validator.go b/stdlib/sql/source_validator.go index 1fc7d044b4..3b0e10c917 100644 --- a/stdlib/sql/source_validator.go +++ b/stdlib/sql/source_validator.go @@ -8,9 +8,10 @@ import ( "github.com/influxdata/flux/codes" "github.com/influxdata/flux/dependencies/url" "github.com/influxdata/flux/internal/errors" + "github.com/snowflakedb/gosnowflake" ) -// helper function to validate the data source url (postgres, sqlmock) / dsn (mysql) using the URLValidator. +// helper function to validate the data source url (postgres, sqlmock) / dsn (mysql, snowflake) using the URLValidator. func validateDataSource(validator url.Validator, driverName string, dataSourceName string) error { /* @@ -60,6 +61,17 @@ func validateDataSource(validator url.Validator, driverName string, dataSourceNa if err != nil { return errors.Newf(codes.Invalid, "invalid data source url: %v", err) } + case "snowflake": + // an example is: username:password@accountname/dbname/testschema?warehouse=mywh + cfg, err := gosnowflake.ParseDSN(dataSourceName) + if err != nil { + return errors.Newf(codes.Invalid, "invalid data source dsn: %v", err) + } + u = &neturl.URL{ + Scheme: cfg.Protocol, + User: neturl.UserPassword(cfg.User, cfg.Password), + Host: cfg.Host, + } default: return errors.Newf(codes.Invalid, "sql driver %s not supported", driverName) } diff --git a/stdlib/sql/to.go b/stdlib/sql/to.go index a4bd48e63f..b10bb8f81e 100644 --- a/stdlib/sql/to.go +++ b/stdlib/sql/to.go @@ -266,10 +266,11 @@ func getTranslationFunc(driverName string) (func() translationFunc, error) { return PostgresColumnTranslateFunc, nil case "mysql": return MysqlColumnTranslateFunc, nil + case "snowflake": + return SnowflakeColumnTranslateFunc, nil default: return nil, errors.Newf(codes.Internal, "invalid driverName: %s", driverName) } - } func CreateInsertComponents(t *ToSQLTransformation, tbl flux.Table) (colNames []string, valStringArray [][]string, valArgsArray [][]interface{}, err error) { diff --git a/stdlib/sql/to_privates_test.go b/stdlib/sql/to_privates_test.go index ebe8634adb..0eda5e7d6b 100644 --- a/stdlib/sql/to_privates_test.go +++ b/stdlib/sql/to_privates_test.go @@ -66,6 +66,13 @@ func TestTranslationDriverReturn(t *testing.T) { t.Fail() } + // verify that valid returns expected happiness for Snowflake + _, err = getTranslationFunc("snowflake") + if !cmp.Equal(nil, err) { + t.Log(cmp.Diff(nil, err)) + t.Fail() + } + } func TestSqliteTranslation(t *testing.T) { @@ -147,7 +154,7 @@ func TestMysqlTranslation(t *testing.T) { } columnLabel := "apples" - // verify that valid returns expected happiness for postgres + // verify that valid returns expected happiness for mysql sqlT, err := getTranslationFunc("mysql") if !cmp.Equal(nil, err) { t.Log(cmp.Diff(nil, err)) @@ -166,3 +173,33 @@ func TestMysqlTranslation(t *testing.T) { } } } + +func TestSnowflakeTranslation(t *testing.T) { + snowflakeTypeTranslations := map[string]flux.ColType{ + "FLOAT": flux.TFloat, + "NUMBER": flux.TInt, + "TEXT": flux.TString, + "TIMESTAMP_LTZ": flux.TTime, + "BOOLEAN": flux.TBool, + } + + columnLabel := "apples" + // verify that valid returns expected happiness for snowflake + sqlT, err := getTranslationFunc("snowflake") + if !cmp.Equal(nil, err) { + t.Log(cmp.Diff(nil, err)) + t.Fail() + } + + for dbTypeString, fluxType := range snowflakeTypeTranslations { + v, err := sqlT()(fluxType, columnLabel) + if !cmp.Equal(nil, err) { + t.Log(cmp.Diff(nil, err)) + t.Fail() + } + if !cmp.Equal(columnLabel+" "+dbTypeString, v) { + t.Log(cmp.Diff(columnLabel+" "+dbTypeString, v)) + t.Fail() + } + } +}