Skip to content

Commit

Permalink
Merge pull request #37 from splichy/tlsServerName
Browse files Browse the repository at this point in the history
add TLS options: serverName & skip verify
  • Loading branch information
nikepan authored Dec 15, 2020
2 parents a0e67a0 + 42c0dd4 commit 7fd0911
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 12 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# ClickHouse-Bulk

[![Build Status](https://travis-ci.org/nikepan/clickhouse-bulk.svg?branch=master)](https://travis-ci.org/nikepan/clickhouse-bulk)
[![codecov](https://codecov.io/gh/nikepan/clickhouse-bulk/branch/master/graph/badge.svg)](https://codecov.io/gh/nikepan/clickhouse-bulk)
[![download binaries](https://img.shields.io/badge/binaries-download-blue.svg)](https://github.com/nikepan/clickhouse-bulk/releases)
Expand Down Expand Up @@ -35,7 +35,7 @@ go build
- Supports query in query parameters and in body
- Supports other query parameters like username, password, database
- Supports basic authentication


For example:
```sql
Expand All @@ -55,15 +55,19 @@ INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6')
### Configuration file
```javascript
{
"listen": ":8124",
"listen": ":8124",
"flush_count": 10000, // check by \n char
"flush_interval": 1000, // milliseconds
"clean_interval": 0, // how often cleanup internal tables - e.g. inserts to different temporary tables, or as workaround for query_id etc. milliseconds
"remove_query_id": true, // some drivers sends query_id which prevents inserts to be batched
"dump_check_interval": 300, // interval for try to send dumps (seconds); -1 to disable
"debug": false, // log incoming requests
"dump_dir": "dumps", // directory for dump unsended data (if clickhouse errors)
"clickhouse": {
"down_timeout": 60, // wait if server in down (seconds)
"connect_timeout": 10, // wait for server connect (seconds)
"tls_server_name": "", // override TLS serverName for certificate verification (e.g. in cases you share same "cluster" certificate across multiple nodes)
"insecure_tls_skip_verify": false, // INSECURE - skip certificate verification at all
"servers": [
"http://127.0.0.1:8123"
]
Expand All @@ -73,12 +77,16 @@ INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6')

### Environment variables (used for docker image)

* `CLICKHOUSE_BULK_DEBUG` - enable debug logging
* `CLICKHOUSE_SERVERS` - comma separated list of servers
* `CLICKHOUSE_FLUSH_COUNT` - count of rows for insert
* `CLICKHOUSE_FLUSH_INTERVAL` - insert interval
* `CLICKHOUSE_CLEAN_INTERVAL` - internal tables clean interval
* `DUMP_CHECK_INTERVAL` - interval of resend dumps
* `CLICKHOUSE_DOWN_TIMEOUT` - wait time if server is down
* `CLICKHOUSE_CONNECT_TIMEOUT` - clickhouse server connect timeout
* `CLICKHOUSE_TLS_SERVER_NAME` - server name for TLS certificate verification
* `CLICKHOUSE_INSECURE_TLS_SKIP_VERIFY` - skip certificate verification at all

### Quickstart

Expand Down
17 changes: 15 additions & 2 deletions clickhouse.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -30,6 +31,7 @@ type Clickhouse struct {
ConnectTimeout int
Dumper Dumper
wg sync.WaitGroup
Transport *http.Transport
}

// ClickhouseRequest - request struct for queue
Expand All @@ -48,7 +50,15 @@ var ErrServerIsDown = errors.New("server is down")
var ErrNoServers = errors.New("No working clickhouse servers")

// NewClickhouse - get clickhouse object
func NewClickhouse(downTimeout int, connectTimeout int) (c *Clickhouse) {
func NewClickhouse(downTimeout int, connectTimeout int, tlsServerName string, tlsSkipVerify bool) (c *Clickhouse) {
tlsConfig := &tls.Config{}
if tlsServerName != "" {
tlsConfig.ServerName = tlsServerName
}
if tlsSkipVerify == true {
tlsConfig.InsecureSkipVerify = tlsSkipVerify
}

c = new(Clickhouse)
c.DownTimeout = downTimeout
c.ConnectTimeout = connectTimeout
Expand All @@ -57,6 +67,9 @@ func NewClickhouse(downTimeout int, connectTimeout int) (c *Clickhouse) {
}
c.Servers = make([]*ClickhouseServer, 0)
c.Queue = queue.New(1000)
c.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
go c.Run()
return c
}
Expand All @@ -66,7 +79,7 @@ func (c *Clickhouse) AddServer(url string) {
c.mu.Lock()
defer c.mu.Unlock()
c.Servers = append(c.Servers, &ClickhouseServer{URL: url, Client: &http.Client{
Timeout: time.Second * time.Duration(c.ConnectTimeout),
Timeout: time.Second * time.Duration(c.ConnectTimeout), Transport: c.Transport,
}})
}

Expand Down
8 changes: 4 additions & 4 deletions clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestClickhouse_GetNextServer(t *testing.T) {
c := NewClickhouse(300, 10)
c := NewClickhouse(300, 10, "", false)
c.AddServer("")
c.AddServer("http://127.0.0.1:8124")
c.AddServer("http://127.0.0.1:8125")
Expand All @@ -29,7 +29,7 @@ func TestClickhouse_GetNextServer(t *testing.T) {
}

func TestClickhouse_Send(t *testing.T) {
c := NewClickhouse(300, 10)
c := NewClickhouse(300, 10, "", false)
c.AddServer("")
c.Send(&ClickhouseRequest{})
for !c.Queue.Empty() {
Expand All @@ -38,7 +38,7 @@ func TestClickhouse_Send(t *testing.T) {
}

func TestClickhouse_SendQuery(t *testing.T) {
c := NewClickhouse(300, 10)
c := NewClickhouse(300, 10, "", false)
c.AddServer("")
c.GetNextServer()
c.Servers[0].Bad = true
Expand All @@ -48,7 +48,7 @@ func TestClickhouse_SendQuery(t *testing.T) {
}

func TestClickhouse_SendQuery1(t *testing.T) {
c := NewClickhouse(-1, 10)
c := NewClickhouse(-1, 10, "", false)
c.AddServer("")
c.GetNextServer()
c.Servers[0].Bad = true
Expand Down
2 changes: 2 additions & 0 deletions config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"clickhouse": {
"down_timeout": 60,
"connect_timeout": 10,
"tls_server_name": "",
"insecure_tls_skip_verify": false,
"servers": [
"http://127.0.0.1:8123"
]
Expand Down
2 changes: 1 addition & 1 deletion dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestDump_Dump(t *testing.T) {
c := NewClickhouse(-1, 10)
c := NewClickhouse(-1, 10, "", false)
dumpDir := "dumptest"
dumper := NewDumper(dumpDir)
c.Dumper = dumper
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func SafeQuit(collect *Collector, sender Sender) {
func RunServer(cnf Config) {
InitMetrics()
dumper := NewDumper(cnf.DumpDir)
sender := NewClickhouse(cnf.Clickhouse.DownTimeout, cnf.Clickhouse.ConnectTimeout)
sender := NewClickhouse(cnf.Clickhouse.DownTimeout, cnf.Clickhouse.ConnectTimeout, cnf.Clickhouse.tlsServerName, cnf.Clickhouse.tlsSkipVerify)
sender.Dumper = dumper
for _, url := range cnf.Clickhouse.Servers {
sender.AddServer(url)
Expand Down
10 changes: 9 additions & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ func TestRunServer(t *testing.T) {
status, _ = request("GET", "/metrics", "", server.echo)
assert.Equal(t, status, http.StatusOK)

server.echo.GET("/debug/gc", server.gcHandler)
status, resp = request("GET", "/debug/gc", "", server.echo)
assert.Equal(t, status, http.StatusOK)

server.echo.GET("/debug/freemem", server.freeMemHandler)
status, resp = request("GET", "/debug/freemem", "", server.echo)
assert.Equal(t, status, http.StatusOK)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
server.Shutdown(ctx)
Expand Down Expand Up @@ -97,7 +105,7 @@ func TestServer_MultiServer(t *testing.T) {
}))
defer s2.Close()

sender := NewClickhouse(10, 10)
sender := NewClickhouse(10, 10, "", false)
sender.AddServer(s1.URL)
sender.AddServer(s2.URL)
collect := NewCollector(sender, 1000, 1000, 0, true)
Expand Down
8 changes: 8 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const sampleConfig = "config.sample.json"

type clickhouseConfig struct {
Servers []string `json:"servers"`
tlsServerName string `json:"tls_server_name"`
tlsSkipVerify bool `json:"insecure_tls_skip_verify"`
DownTimeout int `json:"down_timeout"`
ConnectTimeout int `json:"connect_timeout"`
}
Expand Down Expand Up @@ -87,12 +89,18 @@ func ReadConfig(configFile string) (Config, error) {
readEnvInt("DUMP_CHECK_INTERVAL", &cnf.DumpCheckInterval)
readEnvInt("CLICKHOUSE_DOWN_TIMEOUT", &cnf.Clickhouse.DownTimeout)
readEnvInt("CLICKHOUSE_CONNECT_TIMEOUT", &cnf.Clickhouse.ConnectTimeout)
readEnvBool("CLICKHOUSE_INSECURE_TLS_SKIP_VERIFY", &cnf.Clickhouse.tlsSkipVerify)

serversList := os.Getenv("CLICKHOUSE_SERVERS")
if serversList != "" {
cnf.Clickhouse.Servers = strings.Split(serversList, ",")
}
log.Printf("use servers: %+v\n", strings.Join(cnf.Clickhouse.Servers, ", "))

tlsServerName := os.Getenv("CLICKHOUSE_TLS_SERVER_NAME")
if tlsServerName != "" {
cnf.Clickhouse.tlsServerName = tlsServerName
}

return cnf, err
}

0 comments on commit 7fd0911

Please sign in to comment.