Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoujianxuan committed Jul 22, 2022
1 parent e80f89f commit d1de3fd
Show file tree
Hide file tree
Showing 10 changed files with 851 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/watcherTasks.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/ztil.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module ztil

go 1.17

require (
github.com/Shopify/sarama v1.34.1
github.com/go-sql-driver/mysql v1.6.0
github.com/urfave/cli/v2 v2.10.2
)

require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.15.6 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
)
539 changes: 539 additions & 0 deletions go.sum

Large diffs are not rendered by default.

78 changes: 78 additions & 0 deletions kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"fmt"
"log"

"github.com/urfave/cli/v2"

"github.com/Shopify/sarama"
)

func NewKafkaCommand() *cli.Command {
return &cli.Command{
Name: "kafka_send",
Aliases: []string{"ks"},
Usage: "Send kafka message",
UsageText: "ks <topic> <value> [url]",
Action: func(c *cli.Context) error {
if c.NArg() < 2 {
fmt.Println("Periodic incomplete")
return nil
}
hosts := []string{"192.168.3.54:9092", "192.168.3.53:9092", "192.168.3.37:9092"}
if c.NArg() == 3 {
hosts = []string{c.Args().Get(2)}
}

client, err := NewClient(hosts)
if err != nil {
return err
}

topic := c.Args().First()
value := c.Args().Get(1)

client.Send(topic, value)
return nil
},
}
}

type Client struct {
ProducerClient sarama.AsyncProducer
}

func NewClient(server []string) (*Client, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 构造一个消息
// 连接kafka
client, err := sarama.NewAsyncProducer(server, config)
if err != nil {
return nil, err
}
return &Client{ProducerClient: client}, nil
}

func (c *Client) Send(topic string, v string) {
msg := &sarama.ProducerMessage{}
msg.Topic = topic
msg.Value = sarama.StringEncoder(v)
c.ProducerClient.Input() <- msg
select {
case m := <-c.ProducerClient.Successes():
log.Println(m)
case m := <-c.ProducerClient.Errors():
log.Println(m)
}
}

func (c *Client) Close() {
err := c.ProducerClient.Close()
if err != nil {
return
}
}
36 changes: 36 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"fmt"
"log"
"os"
"sort"

"github.com/urfave/cli/v2"
)

func NewV50() *cli.Command {
return &cli.Command{Name: "v50", Action: func(c *cli.Context) error {
fmt.Println("KFC Crazy Thursday V me 50")
return nil
}}
}

func main() {
app := &cli.App{
Flags: []cli.Flag{},
Commands: []*cli.Command{
NewV50(),
NewKafkaCommand(),
NewMySQLCommand(),
},
}

sort.Sort(cli.FlagsByName(app.Flags))
sort.Sort(cli.CommandsByName(app.Commands))

err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}
125 changes: 125 additions & 0 deletions mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package main

import (
"database/sql"
"encoding/csv"
"fmt"
"log"
"os"
"time"

"github.com/urfave/cli/v2"

_ "github.com/go-sql-driver/mysql"
)

func getMysqlConnection(user, pass, url, database string) (*sql.DB, error) {
return sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s", user, pass, url, database))
}

type DatabaseResult struct {
Tp map[string]string
Value map[string][]byte
}

func RAW(user, pass, url, database, sqlStr string) ([][]string, []string, error) {
db, err := getMysqlConnection(user, pass, url, database)
if err != nil {
return nil, nil, err
}
defer db.Close()

rows, err := db.Query(sqlStr)
if err != nil {
return nil, nil, err
}

cols, _ := rows.Columns()
types, _ := rows.ColumnTypes()

values := make([][]byte, len(cols))
scans := make([]interface{}, len(cols))
for i, _ := range cols {
scans[i] = &values[i]
}

index := 0
result := make(map[int]*DatabaseResult)
for rows.Next() {
err = rows.Scan(scans...)
if err != nil {
log.Fatal(err)
}

tp := make(map[string]string)
row := make(map[string][]byte)
j := 0
for k, v := range values {
key := cols[k]
//这里把[]byte根据条件转换
row[key] = v
tp[key] = types[j].DatabaseTypeName()
j++
}

result[index] = &DatabaseResult{
Tp: tp,
Value: row,
}
index++
}

var data [][]string
for i := range result {
row := result[i]
var rowData []string
for _, col := range cols {
if v, ok := row.Value[col]; ok {
rowData = append(rowData, string(v))
}
}
data = append(data, rowData)
}
return data, cols, nil
}

func NewMySQLCommand() *cli.Command {
return &cli.Command{
Name: "mysql",
Aliases: []string{"sql"},
Usage: "select sql",
UsageText: "mysql <user> <password> <url> <database> <sql>",
Action: func(c *cli.Context) error {
if c.NArg() < 5 {
fmt.Println("Periodic incomplete")
return nil
}

user := c.Args().Get(0)
pass := c.Args().Get(1)
url := c.Args().Get(2)
database := c.Args().Get(3)
sqlStr := c.Args().Get(4)

list, cols, err := RAW(user, pass, url, database, sqlStr)
if err != nil {
return err
}

f, err := os.Create(fmt.Sprintf("%d%s", time.Now().UnixNano(), ".csv"))
if err != nil {
log.Fatal(err)
}
defer f.Close()

_, _ = f.WriteString("\xEF\xBB\xBF")
writer := csv.NewWriter(f)
_ = writer.Write(cols)
for _, data := range list {
_ = writer.Write(data)
}
writer.Flush()
return nil
},
}
}

0 comments on commit d1de3fd

Please sign in to comment.