From 063b1c756c98a57ef1f06a4b8a22764d9ff94e96 Mon Sep 17 00:00:00 2001 From: Kenny Date: Sun, 11 Jun 2017 21:40:32 +1000 Subject: [PATCH] first working version --- .gitignore | 3 +- click.go | 191 ++++++++++++++++++++++++++++++++++++++++++ config/config.go | 56 ------------- glide.lock | 125 +++++++++++++++++++--------- glide.yaml | 20 ++++- log/log.go | 211 ----------------------------------------------- main.go | 110 ++++++++++++++++++++++-- srv.go | 125 ++++++++++++++++++++++++++++ 8 files changed, 526 insertions(+), 315 deletions(-) create mode 100644 click.go delete mode 100644 config/config.go delete mode 100644 log/log.go create mode 100644 srv.go diff --git a/.gitignore b/.gitignore index d7d73bf..ac879b4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ bin -vendor \ No newline at end of file +vendor +prom2click diff --git a/click.go b/click.go new file mode 100644 index 0000000..8a7cbae --- /dev/null +++ b/click.go @@ -0,0 +1,191 @@ +package main + +import ( + "database/sql" + "fmt" + "time" + + "sync" + + "github.com/kshvakov/clickhouse" + "github.com/prometheus/client_golang/prometheus" +) + +/* + Clickhouse SQL for expected tables - depending on metric volume and if you don't + need replicas/ha then a MergeTree engine may be all you need. Most likley you'll + want a ReplicatedMergeTree with at least one replica. + + For scaling use a Distributed table accross each MergeTree (or ReplicatedMergeTree). + Prom2click doesn't understand shards and so it depends on the distributed table + hashing to distribute writes accross the shards. + + create database if not exists metrics; + + -no replication - fine for testing: + + create table if not exists metrics.samples + ( + date Date DEFAULT toDate(0), + name String, + tags Array(String), + vals Array(String), + value Float64, + ts UInt32 + + ) ENGINE = MergeTree(date, (tags, ts), 8192); + + -or for replication - this is probably what you want: + + create table if not exists metrics.samples + ( + date Date DEFAULT toDate(0), + name String, + tags Array(String), + vals Array(String), + val Float64, + ts UInt32 + + ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/metrics.samples', '{replica}', date, (tags, ts), 8192); + + + -and if you have more than one shard - # shards depends on your metric volumes: + + create table if not exists metrics.dist + ( + date Date DEFAULT toDate(0), + name String, + tags Array(String), + vals Array(String), + val Float64, + ts UInt32 + ) ENGINE = Distributed(metrics, metrics, samples, intHash64(name)); + +*/ + +var insertSQL = `INSERT INTO %s.%s + (date, name, tags, vals, val, ts) + VALUES (?, ?, ?, ?, ?, ?)` + +type p2cWriter struct { + conf *config + requests chan *p2cRequest + wg sync.WaitGroup + db *sql.DB + tx prometheus.Counter + ko prometheus.Counter + timings prometheus.Histogram +} + +func NewP2CWriter(conf *config, reqs chan *p2cRequest) (*p2cWriter, error) { + var err error + w := new(p2cWriter) + w.conf = conf + w.requests = reqs + w.db, err = sql.Open("clickhouse", w.conf.ChDSN) + if err != nil { + fmt.Printf("Error connecting to clickhouse: %s\n", err.Error()) + return w, err + } + + w.tx = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "sent_samples_total", + Help: "Total number of processed samples sent to remote storage.", + }, + ) + + w.ko = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "failed_samples_total", + Help: "Total number of processed samples which failed on send to remote storage.", + }, + ) + + w.timings = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "sent_batch_duration_seconds", + Help: "Duration of sample batch send calls to the remote storage.", + Buckets: prometheus.DefBuckets, + }, + ) + prometheus.MustRegister(w.tx) + prometheus.MustRegister(w.ko) + prometheus.MustRegister(w.timings) + + return w, nil +} + +func (w *p2cWriter) Start() { + + go func() { + w.wg.Add(1) + fmt.Println("Clickhouse writer starting..") + sql := fmt.Sprintf(insertSQL, w.conf.ChDB, w.conf.ChTable) + ok := true + for ok { + // get next batch of requests + var reqs []*p2cRequest + + tstart := time.Now() + for i := 0; i < w.conf.ChBatch; i++ { + var req *p2cRequest + // get requet and also check if channel is closed + req, ok = <-w.requests + if !ok { + fmt.Println("clickhouse writer stopping..") + break + } + reqs = append(reqs, req) + } + + // ensure we have something to send.. + nmetrics := len(reqs) + if nmetrics < 1 { + continue + } + + // post them to db all at once + tx, err := w.db.Begin() + if err != nil { + fmt.Printf("Error: begin transaction: %s\n", err.Error()) + w.ko.Add(1.0) + continue + } + + // build statements + smt, err := tx.Prepare(sql) + for _, req := range reqs { + if err != nil { + fmt.Printf("Error: prepare statement: %s\n", err.Error()) + w.ko.Add(1.0) + continue + } + + _, err = smt.Exec(req.date, req.name, clickhouse.Array(req.tags), + clickhouse.Array(req.vals), req.val, req.ts) + + if err != nil { + fmt.Printf("Error: statement exec: %s\n", err.Error()) + w.ko.Add(1.0) + } + } + + // commit and record metrics + if err = tx.Commit(); err != nil { + fmt.Printf("Error: commit failed: %s\n", err.Error()) + w.ko.Add(1.0) + } else { + w.tx.Add(float64(nmetrics)) + w.timings.Observe(float64(time.Since(tstart))) + } + + } + fmt.Println("clickhouse writer stopped..") + w.wg.Done() + }() +} + +func (w *p2cWriter) Wait() { + w.wg.Wait() +} diff --git a/config/config.go b/config/config.go deleted file mode 100644 index 18ab4ef..0000000 --- a/config/config.go +++ /dev/null @@ -1,56 +0,0 @@ -package config - -import ( - "time" - - "github.com/spf13/viper" -) - -// Provider defines a set of read-only methods for accessing the application -// configuration params as defined in one of the config files. -type Provider interface { - ConfigFileUsed() string - Get(key string) interface{} - GetBool(key string) bool - GetDuration(key string) time.Duration - GetFloat64(key string) float64 - GetInt(key string) int - GetInt64(key string) int64 - GetSizeInBytes(key string) uint - GetString(key string) string - GetStringMap(key string) map[string]interface{} - GetStringMapString(key string) map[string]string - GetStringMapStringSlice(key string) map[string][]string - GetStringSlice(key string) []string - GetTime(key string) time.Time - InConfig(key string) bool - IsSet(key string) bool -} - -var defaultConfig *viper.Viper - -func Config() Provider { - return defaultConfig -} - -func LoadConfigProvider(appName string) Provider { - return readViperConfig(appName) -} - -func init() { - defaultConfig = readViperConfig("PROM2CLICK") -} - -func readViperConfig(appName string) *viper.Viper { - v := viper.New() - v.SetEnvPrefix(appName) - v.AutomaticEnv() - - // global defaults - - v.SetDefault("json_logs", false) - v.SetDefault("loglevel", "debug") - - - return v -} diff --git a/glide.lock b/glide.lock index 9223aa0..e17fba9 100644 --- a/glide.lock +++ b/glide.lock @@ -1,50 +1,95 @@ -hash: dcbd37804bea1f983ccb8c700220c48fb30398a08f7704600584e54bb115a561 -updated: 2017-06-10T18:11:54.309720259+10:00 +hash: 2936591cb6ef55dc5517f96d731851b2135281bd2200919933219089b837e517 +updated: 2017-06-11T21:38:09.459666062+10:00 imports: -- name: github.com/fsnotify/fsnotify - version: 4da3e2cfbabc9f751898f250b49f2439785783a1 -- name: github.com/hashicorp/hcl - version: 392dba7d905ed5d04a5794ba89f558b27e2ba1ca - subpackages: - - hcl/ast - - hcl/parser - - hcl/scanner - - hcl/strconv - - hcl/token - - json/parser - - json/scanner - - json/token -- name: github.com/magiconair/properties - version: 51463bfca2576e06c62a8504b5c0f06d61312647 -- name: github.com/mitchellh/mapstructure - version: d0303fe809921458f417bcf828397a65db30a7e4 -- name: github.com/pelletier/go-buffruneio - version: c37440a7cf42ac63b919c752ca73a85067e05992 -- name: github.com/pelletier/go-toml - version: fe7536c3dee2596cdd23ee9976a17c22bdaae286 +- name: github.com/beorn7/perks + version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 + subpackages: + - quantile +- name: github.com/golang/protobuf + version: 5a0f697c9ed9d68fef0116532c6e05cfeae00e55 + subpackages: + - proto +- name: github.com/golang/snappy + version: 553a641470496b2327abcac10b36396bd98e45c9 +- name: github.com/kshvakov/clickhouse + version: 1250f4e0c94a7f0083e6a4e483bf3344787de0dc +- name: github.com/matttproud/golang_protobuf_extensions + version: c12348ce28de40eed0136aa2b644d0ee0650e56c + subpackages: + - pbutil +- name: github.com/opentracing/opentracing-go + version: eaaf4e1eeb7a5373b38e70901270c83577dc6fb9 + subpackages: + - log +- name: github.com/prometheus/client_golang + version: c5b7fccd204277076155f10851dad72b76a49317 + subpackages: + - prometheus +- name: github.com/prometheus/client_model + version: 6f3806018612930941127f2a7c6c453ba2c527d2 + subpackages: + - go +- name: github.com/prometheus/common + version: 13ba4ddd0caa9c28ca7b7bffe1dfa9ed8d5ef207 + subpackages: + - expfmt + - internal/bitbucket.org/ww/goautoneg + - log + - model +- name: github.com/prometheus/procfs + version: a3bfc74126ea9e45ee5d5c6f7fc86191b7d488fb + subpackages: + - xfs +- name: github.com/prometheus/prometheus + version: bfa37c8ee39d11078662dce16c162a61dccf616c + subpackages: + - config + - relabel + - storage + - storage/local + - storage/local/chunk + - storage/local/codable + - storage/local/index + - storage/metric + - storage/remote + - util/flock + - util/httputil + - util/testutil - name: github.com/Sirupsen/logrus - version: 85b1699d505667d13f8ac4478c1debbf85d6c5de -- name: github.com/spf13/afero - version: 9be650865eab0c12963d8753212f4f9c66cdcf12 - subpackages: - - mem -- name: github.com/spf13/cast - version: acbeb36b902d72a7a4c18e8f3241075e7ab763e4 -- name: github.com/spf13/jwalterweatherman - version: 0efa5202c04663c757d84f90f5219c1250baf94f -- name: github.com/spf13/pflag - version: e57e3eeb33f795204c1ca35f56c44f83227c6e66 -- name: github.com/spf13/viper - version: a1ecfa6a20bd4ef9e9caded262ee1b1b26847675 + version: 202f25545ea4cf9b191ff7f846df5d87c9382c2b +- name: github.com/syndtr/goleveldb + version: 8c81ea47d4c41a385645e133e15510fc6a2a74b4 + subpackages: + - leveldb + - leveldb/cache + - leveldb/comparer + - leveldb/errors + - leveldb/filter + - leveldb/iterator + - leveldb/journal + - leveldb/memdb + - leveldb/opt + - leveldb/storage + - leveldb/table + - leveldb/util +- name: golang.org/x/net + version: 1a68b1313cf4ad7778376e82641197b60c02f65c + subpackages: + - context + - context/ctxhttp - name: golang.org/x/sys version: 0b25a408a50076fbbcae6b7ac0ea5fbb0b085e79 subpackages: - unix -- name: golang.org/x/text - version: 210eee5cf7323015d097341bcf7166130d001cd8 + - windows + - windows/registry + - windows/svc/eventlog +- name: golang.org/x/time + version: 8be79e1e0910c292df4e79c241bb7e8f7e725959 subpackages: - - transform - - unicode/norm + - rate +- name: gopkg.in/tylerb/graceful.v1 + version: 4654dfbb6ad53cb5e27f37d99b02e16c1872fbbb - name: gopkg.in/yaml.v2 version: cd8b52f8269e0feb286dfeef29f8fe4d5b397e0b testImports: [] diff --git a/glide.yaml b/glide.yaml index df4b20c..4d790d8 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,4 +1,20 @@ package: github.com/s4z/prom2click import: -- package: github.com/spf13/viper -- package: github.com/Sirupsen/logrus \ No newline at end of file +- package: github.com/golang/protobuf + subpackages: + - proto +- package: github.com/golang/snappy +- package: github.com/prometheus/client_golang + version: v0.8.0 + subpackages: + - prometheus +- package: github.com/prometheus/common + subpackages: + - log + - model +- package: github.com/prometheus/prometheus + subpackages: + - storage/remote +- package: gopkg.in/tylerb/graceful.v1 + version: v1.2.15 +- package: github.com/kshvakov/clickhouse diff --git a/log/log.go b/log/log.go deleted file mode 100644 index d7c7e0a..0000000 --- a/log/log.go +++ /dev/null @@ -1,211 +0,0 @@ -package log - -import ( - "os" - - "github.com/Sirupsen/logrus" - "github.com/s4z/prom2click/config" -) - -// Logger defines a set of methods for writing application logs. Derived from and -// inspired by logrus.Entry. -type Logger interface { - Debug(args ...interface{}) - Debugf(format string, args ...interface{}) - Debugln(args ...interface{}) - Error(args ...interface{}) - Errorf(format string, args ...interface{}) - Errorln(args ...interface{}) - Fatal(args ...interface{}) - Fatalf(format string, args ...interface{}) - Fatalln(args ...interface{}) - Info(args ...interface{}) - Infof(format string, args ...interface{}) - Infoln(args ...interface{}) - Panic(args ...interface{}) - Panicf(format string, args ...interface{}) - Panicln(args ...interface{}) - Print(args ...interface{}) - Printf(format string, args ...interface{}) - Println(args ...interface{}) - Warn(args ...interface{}) - Warnf(format string, args ...interface{}) - Warning(args ...interface{}) - Warningf(format string, args ...interface{}) - Warningln(args ...interface{}) - Warnln(args ...interface{}) -} - -var defaultLogger *logrus.Logger - -func init() { - defaultLogger = newLogrusLogger(config.Config()) -} - - -func NewLogger(cfg config.Provider) *logrus.Logger { - return newLogrusLogger(cfg) -} - - - -func newLogrusLogger(cfg config.Provider) *logrus.Logger { - - l := logrus.New() - - if cfg.GetBool("json_logs") { - l.Formatter = new(logrus.JSONFormatter) - } - l.Out = os.Stderr - - switch cfg.GetString("loglevel") { - case "debug": - l.Level = logrus.DebugLevel - case "warning": - l.Level = logrus.WarnLevel - case "info": - l.Level = logrus.InfoLevel - default: - l.Level = logrus.DebugLevel - } - - return l -} - -type Fields map[string]interface{} - -func (f Fields) With(k string, v interface{}) Fields { - f[k] = v - return f -} - -func (f Fields) WithFields(f2 Fields) Fields { - for k, v := range f2 { - f[k] = v - } - return f -} - -func WithFields(fields Fields) Logger { - return defaultLogger.WithFields(logrus.Fields(fields)) -} - -// Debug package-level convenience method. -func Debug(args ...interface{}) { - defaultLogger.Debug(args...) -} - -// Debugf package-level convenience method. -func Debugf(format string, args ...interface{}) { - defaultLogger.Debugf(format, args...) -} - -// Debugln package-level convenience method. -func Debugln(args ...interface{}) { - defaultLogger.Debugln(args...) -} - -// Error package-level convenience method. -func Error(args ...interface{}) { - defaultLogger.Error(args...) -} - -// Errorf package-level convenience method. -func Errorf(format string, args ...interface{}) { - defaultLogger.Errorf(format, args...) -} - -// Errorln package-level convenience method. -func Errorln(args ...interface{}) { - defaultLogger.Errorln(args...) -} - -// Fatal package-level convenience method. -func Fatal(args ...interface{}) { - defaultLogger.Fatal(args...) -} - -// Fatalf package-level convenience method. -func Fatalf(format string, args ...interface{}) { - defaultLogger.Fatalf(format, args...) -} - -// Fatalln package-level convenience method. -func Fatalln(args ...interface{}) { - defaultLogger.Fatalln(args...) -} - -// Info package-level convenience method. -func Info(args ...interface{}) { - defaultLogger.Info(args...) -} - -// Infof package-level convenience method. -func Infof(format string, args ...interface{}) { - defaultLogger.Infof(format, args...) -} - -// Infoln package-level convenience method. -func Infoln(args ...interface{}) { - defaultLogger.Infoln(args...) -} - -// Panic package-level convenience method. -func Panic(args ...interface{}) { - defaultLogger.Panic(args...) -} - -// Panicf package-level convenience method. -func Panicf(format string, args ...interface{}) { - defaultLogger.Panicf(format, args...) -} - -// Panicln package-level convenience method. -func Panicln(args ...interface{}) { - defaultLogger.Panicln(args...) -} - -// Print package-level convenience method. -func Print(args ...interface{}) { - defaultLogger.Print(args...) -} - -// Printf package-level convenience method. -func Printf(format string, args ...interface{}) { - defaultLogger.Printf(format, args...) -} - -// Println package-level convenience method. -func Println(args ...interface{}) { - defaultLogger.Println(args...) -} - -// Warn package-level convenience method. -func Warn(args ...interface{}) { - defaultLogger.Warn(args...) -} - -// Warnf package-level convenience method. -func Warnf(format string, args ...interface{}) { - defaultLogger.Warnf(format, args...) -} - -// Warning package-level convenience method. -func Warning(args ...interface{}) { - defaultLogger.Warning(args...) -} - -// Warningf package-level convenience method. -func Warningf(format string, args ...interface{}) { - defaultLogger.Warningf(format, args...) -} - -// Warningln package-level convenience method. -func Warningln(args ...interface{}) { - defaultLogger.Warningln(args...) -} - -// Warnln package-level convenience method. -func Warnln(args ...interface{}) { - defaultLogger.Warnln(args...) -} diff --git a/main.go b/main.go index 241a08d..52dd42d 100644 --- a/main.go +++ b/main.go @@ -3,21 +3,121 @@ package main import ( "flag" "fmt" + "os" + "time" +) + +// a lot of this borrows directly from: +// https://github.com/prometheus/prometheus/blob/master/documentation/examples/remote_storage/remote_storage_adapter/main.go + +type config struct { + //tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000 + ChDSN string + ChDB string + ChTable string + ChBatch int + ChanSize int + HTTPTimeout time.Duration + HTTPAddr string + HTTPWritePath string + HTTPMetricsPath string +} + +var ( + versionFlag bool ) func main() { + excode := 0 - versionFlag := flag.Bool("version", false, "Version") - flag.Parse() + conf := parseFlags() - if *versionFlag { + if versionFlag { fmt.Println("Git Commit:", GitCommit) fmt.Println("Version:", Version) if VersionPrerelease != "" { fmt.Println("Version PreRelease:", VersionPrerelease) } - return + os.Exit(excode) } - fmt.Println("Hello.") + fmt.Println("Starting up..") + + srv, err := NewP2CServer(conf) + if err != nil { + fmt.Printf("Error: could not create server: %s\n", err.Error()) + excode = 1 + os.Exit(excode) + } + err = srv.Start() + if err != nil { + fmt.Printf("Error: http server returned error: %s\n", err.Error()) + excode = 1 + } + + fmt.Println("Shutting down..") + srv.Shutdown() + fmt.Println("Exiting..") + os.Exit(excode) +} + +func parseFlags() *config { + cfg := new(config) + + // print version? + flag.BoolVar(&versionFlag, "version", false, "Version") + + // clickhouse dsn + ddsn := "tcp://127.0.0.1:9000?username=&password=&database=metrics&" + + "read_timeout=10&write_timeout=10&alt_hosts=" + flag.StringVar(&cfg.ChDSN, "dsn", ddsn, + "The clickhouse server DSN to write to eg. "+ + "tcp://host1:9000?username=user&password=qwerty&database=clicks&"+ + "read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000 "+ + "(see https://github.com/kshvakov/clickhouse).", + ) + + // clickhouse db + flag.StringVar(&cfg.ChDB, "ch.db", "metrics", + "The clickhouse database to write to.", + ) + + // clickhouse table + flag.StringVar(&cfg.ChTable, "ch.table", "samples", + "The clickhouse table to write to.", + ) + + // clickhouse insertion batch size + flag.IntVar(&cfg.ChBatch, "ch.batch", 8192, + "Clickhouse write batch size (n metrics).", + ) + + // channel buffer size between http server => clickhouse writer(s) + flag.IntVar(&cfg.ChanSize, "ch.buffer", 8192, + "Maximum internal channel buffer size (n requests).", + ) + + // http listen address + flag.StringVar(&cfg.HTTPAddr, "web.address", ":9201", + "Address to listen on for web endpoints.", + ) + + // http prometheus remote write endpoint + flag.StringVar(&cfg.HTTPWritePath, "web.write", "/write", + "Address to listen on for remote write requests.", + ) + + // http prometheus metrics endpoint + flag.StringVar(&cfg.HTTPMetricsPath, "web.metrics", "/metrics", + "Address to listen on for metric requests.", + ) + + // http shutdown and request timeout + flag.DurationVar(&cfg.HTTPTimeout, "web.timeout", 30*time.Second, + "The timeout to use for HTTP requests and server shutdown. Defaults to 30s.", + ) + + flag.Parse() + + return cfg } diff --git a/srv.go b/srv.go new file mode 100644 index 0000000..07e0ff9 --- /dev/null +++ b/srv.go @@ -0,0 +1,125 @@ +package main + +import ( + "io/ioutil" + "net/http" + "time" + + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/remote" + "gopkg.in/tylerb/graceful.v1" +) + +type p2cRequest struct { + date time.Time + name string + tags []string + vals []string + val float64 + ts int64 +} + +type p2cServer struct { + requests chan *p2cRequest + mux *http.ServeMux + conf *config + writer *p2cWriter + rx prometheus.Counter +} + +func NewP2CServer(conf *config) (*p2cServer, error) { + var err error + c := new(p2cServer) + c.requests = make(chan *p2cRequest, conf.ChanSize) + c.mux = http.NewServeMux() + c.conf = conf + + c.writer, err = NewP2CWriter(conf, c.requests) + if err != nil { + fmt.Printf("Error creating clickhouse writer: %s\n", err.Error()) + return c, err + } + + c.rx = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "received_samples_total", + Help: "Total number of received samples.", + }, + ) + prometheus.MustRegister(c.rx) + + c.mux.HandleFunc(c.conf.HTTPWritePath, func(w http.ResponseWriter, r *http.Request) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req remote.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + c.process(req) + }) + + c.mux.Handle(c.conf.HTTPMetricsPath, prometheus.InstrumentHandler( + c.conf.HTTPMetricsPath, prometheus.UninstrumentedHandler(), + )) + + return c, nil +} + +func (c *p2cServer) process(req remote.WriteRequest) { + for _, series := range req.Timeseries { + c.rx.Add(float64(len(series.Samples))) + var ( + name string + tags []string + vals []string + ) + + for _, label := range series.Labels { + if model.LabelName(label.Name) == model.MetricNameLabel { + name = label.Value + } + tags = append(tags, label.Name) + vals = append(vals, label.Value) + } + + for _, sample := range series.Samples { + p2c := new(p2cRequest) + p2c.name = name + p2c.ts = sample.TimestampMs / 1000 + p2c.date = time.Unix(p2c.ts, 0) + p2c.val = sample.Value + p2c.tags = tags + p2c.vals = vals + c.requests <- p2c + } + + } +} + +func (c *p2cServer) Start() error { + fmt.Println("Clickhouse http server starting...") + c.writer.Start() + return graceful.RunWithErr(c.conf.HTTPAddr, c.conf.HTTPTimeout, c.mux) +} + +func (c *p2cServer) Shutdown() { + close(c.requests) + c.writer.Wait() +}