Skip to content

Commit

Permalink
feat: modify tsbs to support using in ceresdb integration test (#6)
Browse files Browse the repository at this point in the history
* support creating partitioned table, you can make by setting `--parition-keys xxx`.

* support setting access mode of ceresdb client(--access-mode direct/proxy).

* support output query responses to defined file.

* remove useless prints, and refactor create sql's building.

* fix naming style.

* fix naming style again...

* Extract client's build to a function for eliminating duplicated codes.

* make queries result printing more pretty.

* Support setting update mode.

* support setting partition num.
  • Loading branch information
Rachelint authored Sep 6, 2023
1 parent 4d5fcf4 commit 61dc11d
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 21 deletions.
8 changes: 8 additions & 0 deletions cmd/tsbs_load_ceresdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,20 @@ func initProgramOptions() (*ceresdb.SpecificConfig, load.BenchmarkRunner, *load.
storageFormat := viper.GetString("storage-format")
primaryKeys := viper.GetString("primary-keys")
rowGroupSize := viper.GetInt64("row-group-size")
partitionKeys := viper.GetString("partition-keys")
partitionNum := viper.GetUint32("partition-num")
accessMode := viper.GetString("access-mode")
updateMode := viper.GetString("update-mode")
loader := load.GetBenchmarkRunner(loaderConf)
return &ceresdb.SpecificConfig{
CeresdbAddr: ceresdbAddr,
StorageFormat: storageFormat,
RowGroupSize: rowGroupSize,
PrimaryKeys: primaryKeys,
PartitionKeys: partitionKeys,
PartitionNum: partitionNum,
AccessMode: accessMode,
UpdateMode: updateMode,
}, loader, &loaderConf
}

Expand Down
54 changes: 47 additions & 7 deletions cmd/tsbs_run_queries_ceresdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,27 @@ package main
import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/CeresDB/ceresdb-client-go/ceresdb"
ceresdbSdk "github.com/CeresDB/ceresdb-client-go/ceresdb"
"github.com/blagojts/viper"
"github.com/spf13/pflag"
"github.com/timescale/tsbs/internal/utils"
"github.com/timescale/tsbs/pkg/query"
"github.com/timescale/tsbs/pkg/targets/ceresdb"
)

// Program option vars:
var (
ceresdbAddr string

showExplain bool

accessMode string

responsesFile string
)

// Global vars:
Expand All @@ -39,6 +46,8 @@ func init() {
"ceresdb gRPC endpoint",
)
pflag.Bool("show-explain", false, "Print out the EXPLAIN output for sample query")
pflag.String("access-mode", "direct", "Access mode of ceresdb client")
pflag.String("responses-file", "", "Write responses to this file if enable responses printing")
pflag.Parse()

err := utils.SetupConfigFile()
Expand All @@ -53,7 +62,8 @@ func init() {

ceresdbAddr = viper.GetString("ceresdb-addr")
showExplain = viper.GetBool("show-explain")

accessMode = viper.GetString("access-mode")
responsesFile = viper.GetString("responses-file")
runner = query.NewBenchmarkRunner(config)
}

Expand All @@ -69,8 +79,9 @@ type queryExecutorOptions struct {

// query.Processor interface implementation
type processor struct {
db ceresdb.Client
opts *queryExecutorOptions
db ceresdbSdk.Client
opts *queryExecutorOptions
queryResultsFile *os.File
}

// query.Processor interface implementation
Expand All @@ -80,11 +91,20 @@ func newProcessor() query.Processor {

// query.Processor interface implementation
func (p *processor) Init(workerNumber int) {
client, err := ceresdb.NewClient(ceresdbAddr, ceresdb.Direct, ceresdb.WithDefaultDatabase("public"))
client, err := ceresdb.NewClient(ceresdbAddr, accessMode, ceresdbSdk.WithDefaultDatabase("public"))
if err != nil {
panic(err)
}
p.db = client

if responsesFile != "" {
queryResultsFile, err := os.Create(responsesFile)
if err != nil {
panic(err)
}
p.queryResultsFile = queryResultsFile
}

p.opts = &queryExecutorOptions{
showExplain: false,
debug: runner.DebugLevel() > 0,
Expand All @@ -110,7 +130,7 @@ func (p *processor) ProcessQuery(q query.Query, isWarm bool) ([]*query.Stat, err
}

// Main action - run the query
rows, err := p.db.SQLQuery(context.TODO(), ceresdb.SQLQueryRequest{
rows, err := p.db.SQLQuery(context.TODO(), ceresdbSdk.SQLQueryRequest{
Tables: []string{string(ceresdbQuery.Table)},
SQL: sql,
})
Expand All @@ -123,7 +143,13 @@ func (p *processor) ProcessQuery(q query.Query, isWarm bool) ([]*query.Stat, err
fmt.Println(sql)
}
if p.opts.printResponse {
fmt.Printf("request = %v\n", rows)
rowsStr := RowsToStr(rows.Rows)
query_res := fmt.Sprintf("###query\n sql: %v\naffected: %v\nrows: \n%s\n\n", rows.SQL, rows.AffectedRows, rowsStr)
if p.queryResultsFile != nil {
p.queryResultsFile.WriteString(query_res)
} else {
fmt.Print(query_res)
}
}

took := float64(time.Since(start).Nanoseconds()) / 1e6
Expand All @@ -133,3 +159,17 @@ func (p *processor) ProcessQuery(q query.Query, isWarm bool) ([]*query.Stat, err

return []*query.Stat{stat}, err
}

func RowsToStr(rows []ceresdbSdk.Row) string {
rowLen := len(rows)
if rowLen == 0 {
return ""
}

rowStrs := make([]string, 0, len(rows))
for _, row := range rows {
rowStrs = append(rowStrs, fmt.Sprintf("%v", row))
}

return strings.Join(rowStrs, "\n")
}
7 changes: 6 additions & 1 deletion pkg/targets/ceresdb/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type SpecificConfig struct {
StorageFormat string `yaml:"storageFormat" mapstructure:"storageFormat"`
RowGroupSize int64 `yaml:"rowGroupSize" mapstructure:"rowGroupSize"`
PrimaryKeys string `yaml:"primaryKeys" mapstructure:"primaryKeys"`
PartitionKeys string `yaml:"partitionKeys" mapstructure:"partitionKeys"`
PartitionNum uint32 `yaml:"partitionKeys" mapstructure:"partitionNum"`
AccessMode string `yaml:"accessMode" mapstructure:"accessMode"`
UpdateMode string `yaml:"updateMode" mapstructure:"updateMode"`
}

func parseSpecificConfig(v *viper.Viper) (*SpecificConfig, error) {
Expand All @@ -44,7 +48,8 @@ func NewBenchmark(config *SpecificConfig, dataSourceConfig *source.DataSourceCon
dataSource := &fileDataSource{
scanner: bufio.NewScanner(br),
}
client, err := ceresdb.NewClient(config.CeresdbAddr, ceresdb.Direct, ceresdb.WithDefaultDatabase("public"))

client, err := NewClient(config.CeresdbAddr, config.AccessMode, ceresdb.WithDefaultDatabase("public"))
if err != nil {
panic(err)
}
Expand Down
37 changes: 24 additions & 13 deletions pkg/targets/ceresdb/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (d *dbCreator) DBExists(dbName string) bool { return true }

// loader.DBCreator interface implementation
func (d *dbCreator) CreateDB(dbName string) error {
client, err := ceresdb.NewClient(d.config.CeresdbAddr, ceresdb.Direct, ceresdb.WithDefaultDatabase("public"))
client, err := NewClient(d.config.CeresdbAddr, d.config.AccessMode, ceresdb.WithDefaultDatabase("public"))
if err != nil {
return err
}
Expand Down Expand Up @@ -78,19 +78,30 @@ func (d *dbCreator) createTable(client ceresdb.Client, tableName string,
columnDefs = append(columnDefs, fmt.Sprintf("`%s` double", field))
}

tmpl := `
create table if not exists %s (
%s,
primary key(%s)
) with (
enable_ttl = 'false',
num_rows_per_row_group='%d',
storage_format = '%s'
);
// The sql can be divided into three parts:
// + Create part
// + Partition part
// + With part
crTmpl := `create table if not exists %s (
%s,
primary key(%s)
)`
partTmpl := `partition by key (%s) partitions %v`
withTmpl := `with (
enable_ttl = 'false',
num_rows_per_row_group='%d',
storage_format = '%s',
update_mode='%s'
);`

`
sql := fmt.Sprintf(tmpl, tableName, strings.Join(columnDefs, ","), d.config.PrimaryKeys, d.config.RowGroupSize, d.config.StorageFormat)
// fmt.Printf("sql = %s\n", sql)
// Make sql
sql := fmt.Sprintf(crTmpl, tableName, strings.Join(columnDefs, ","), d.config.PrimaryKeys) + "\n"
if d.config.PartitionKeys != "" {
sql = sql + fmt.Sprintf(partTmpl, d.config.PartitionKeys, d.config.PartitionNum) + "\n"
}
sql = sql + fmt.Sprintf(withTmpl, d.config.RowGroupSize, d.config.StorageFormat, d.config.UpdateMode)

// Execute
_, err := client.SQLQuery(context.TODO(), ceresdb.SQLQueryRequest{
Tables: []string{tableName},
SQL: sql,
Expand Down
20 changes: 20 additions & 0 deletions pkg/targets/ceresdb/implemented_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ func (vm vmTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.FlagSet
"tsid,timestamp",
"Primary keys used when create table",
)
flagSet.String(
flagPrefix+"partition-keys",
"",
"Partition keys used when create partitioned table",
)
flagSet.Uint32(
flagPrefix+"partition-num",
4,
"Partition keys used when create partitioned table",
)
flagSet.String(
flagPrefix+"access-mode",
"direct",
"Access mode of ceresdb client",
)
flagSet.String(
flagPrefix+"update-mode",
"OVERWRITE",
"Update mode when insert",
)
}

func (vm vmTarget) TargetName() string {
Expand Down
11 changes: 11 additions & 0 deletions pkg/targets/ceresdb/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ceresdb

import "github.com/CeresDB/ceresdb-client-go/ceresdb"

func NewClient(endpoint string, accessMode string, opts ...ceresdb.Option) (ceresdb.Client, error) {
mode := ceresdb.Direct
if accessMode == "proxy" {
mode = ceresdb.Proxy
}
return ceresdb.NewClient(endpoint, mode, opts...)
}

0 comments on commit 61dc11d

Please sign in to comment.