Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

mt-gateway: support ingesting data into kafka #1608

Merged
merged 24 commits into from
Jan 23, 2020
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f748bf3
initial import of "publish" package from raintank/tsdb-gw (doesn't co…
fitzoh Jan 15, 2020
66a9481
update imports from raintank/tsdb-gw -> grafana/metrictank (still no …
fitzoh Jan 15, 2020
6b76e24
copy-paste metrics_client package from raintank/tsdb_gw to fix compil…
fitzoh Jan 15, 2020
e89cc6e
pull in util/flags package from raintank/tsdb-gw to fix compilation i…
fitzoh Jan 15, 2020
85e9c4f
import BufferPool33 struct from raintank/tsdb-gw to fix compilation e…
fitzoh Jan 15, 2020
d955422
fix compilation error
fitzoh Jan 15, 2020
e37ea38
add missing publisher test dependency (github.com/Shopify/sarama/mocks)
fitzoh Jan 15, 2020
0b5c3fc
go fmt
fitzoh Jan 15, 2020
c2ccac6
get rid of unused variable and the dependency it pulled in
fitzoh Jan 16, 2020
3f7baea
remove unnecessary function/file
fitzoh Jan 16, 2020
8674c57
initial ingest of metrics ingestion files
fitzoh Jan 21, 2020
d50c88e
update imports
fitzoh Jan 21, 2020
3a270f8
drop rate limiting
fitzoh Jan 21, 2020
d4d0b7d
Replace macaron ctx object with base http objects
fitzoh Jan 21, 2020
60a6788
There's no auth in this version, so we should assume admin = true
fitzoh Jan 21, 2020
7adf01d
Wire up new ingest handler
fitzoh Jan 21, 2020
82e377e
go fmt
fitzoh Jan 21, 2020
94c2112
configure kafka for ingestor endpoint
fitzoh Jan 21, 2020
b6887d9
sync tool docs
fitzoh Jan 21, 2020
87cb16c
fix message + make more descriptive
fitzoh Jan 22, 2020
186fab2
update imports
fitzoh Jan 22, 2020
1abc592
Refactor responses/logging for failed ingest requests
fitzoh Jan 22, 2020
c90e7a7
remove unused function
fitzoh Jan 22, 2020
3237276
Fprintf -> Fprint
fitzoh Jan 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 21 additions & 34 deletions cmd/mt-gateway/ingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ func Metrics(w http.ResponseWriter, r *http.Request) {
case "application/json":
metricsJson(w, r)
default:
w.WriteHeader(400)
fmt.Fprintf(w, "unknown content-type: %s", contentType)
writeErrorResponse(w, 400, "unknown content-type: %s", contentType)
}
}

Expand Down Expand Up @@ -108,29 +107,24 @@ func prepareIngest(in []*schema.MetricData, toPublish []*schema.MetricData) ([]*

func metricsJson(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
w.WriteHeader(400)
fmt.Fprintf(w, "no data included in request.")
writeErrorResponse(w, 400, "no data included in request.")
return
}
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
select {
case <-r.Context().Done():
w.WriteHeader(499)
fmt.Fprintf(w, "request canceled")
writeErrorResponse(w, 499, "request canceled")
default:
log.Errorf("unable to read request body. %s", err)
w.WriteHeader(500)
fmt.Fprintf(w, "unable to read request body. %s", err)
writeErrorResponse(w, 500, "unable to read request body. %s", err)
}
return
}
metrics := make([]*schema.MetricData, 0)
err = json.Unmarshal(body, &metrics)
if err != nil {
w.WriteHeader(400)
fmt.Fprintf(w, "unable to parse request body. %s", err)
writeErrorResponse(w, 400, "unable to parse request body. %s", err)
return
}

Expand All @@ -139,17 +133,14 @@ func metricsJson(w http.ResponseWriter, r *http.Request) {

select {
case <-r.Context().Done():
w.WriteHeader(499)
fmt.Fprintf(w, "request canceled")
writeErrorResponse(w, 499, "request canceled")
return
default:
}

err = publish.Publish(toPublish)
if err != nil {
log.Errorf("failed to publish metrics. %s", err)
w.WriteHeader(500)
fmt.Fprintf(w, "failed to publish metrics. %s", err)
writeErrorResponse(w, 500, "failed to publish metrics. %s", err)
return
}

Expand All @@ -161,8 +152,7 @@ func metricsJson(w http.ResponseWriter, r *http.Request) {

func metricsBinary(w http.ResponseWriter, r *http.Request, compressed bool) {
if r.Body == nil {
w.WriteHeader(400)
fmt.Fprintf(w, "no data included in request.")
writeErrorResponse(w, 400, "no data included in request.")
return
}
var bodyReadCloser io.ReadCloser
Expand All @@ -177,29 +167,22 @@ func metricsBinary(w http.ResponseWriter, r *http.Request, compressed bool) {
if err != nil {
select {
case <-r.Context().Done():
w.WriteHeader(499)
fmt.Fprintf(w, "request canceled")
writeErrorResponse(w, 499, "request canceled")
default:
log.Errorf("unable to read request body. %s", err)
w.WriteHeader(500)
fmt.Fprintf(w, "unable to read request body. %s", err)
writeErrorResponse(w, 500, "unable to read request body. %s", err)
}
return
}
metricData := new(msg.MetricData)
err = metricData.InitFromMsg(body)
if err != nil {
log.Errorf("payload not metricData. %s", err)
w.WriteHeader(400)
fmt.Fprintf(w, "payload not metricData. %s", err)
writeErrorResponse(w, 400, "payload not metricData. %s", err)
return
}

err = metricData.DecodeMetricData()
if err != nil {
log.Errorf("failed to unmarshal metricData. %s", err)
w.WriteHeader(400)
fmt.Fprintf(w, "failed to unmarshal metricData. %s", err)
writeErrorResponse(w, 400, "failed to unmarshal metricData. %s", err)
return
}

Expand All @@ -208,17 +191,14 @@ func metricsBinary(w http.ResponseWriter, r *http.Request, compressed bool) {

select {
case <-r.Context().Done():
w.WriteHeader(499)
fmt.Fprintf(w, "request canceled")
writeErrorResponse(w, 499, "request canceled")
return
default:
}

err = publish.Publish(toPublish)
if err != nil {
log.Errorf("failed to publish metrics. %s", err)
w.WriteHeader(500)
fmt.Fprintf(w, "failed to publish metrics. %s", err)
writeErrorResponse(w, 500, "failed to publish metrics. %s", err)
return
}

Expand All @@ -227,3 +207,10 @@ func metricsBinary(w http.ResponseWriter, r *http.Request, compressed bool) {
w.WriteHeader(200)
json.NewEncoder(w).Encode(resp)
}

func writeErrorResponse(w http.ResponseWriter, status int, msg string, fmtArgs ...interface{}) {
w.WriteHeader(status)
formatted := fmt.Sprint(msg, fmtArgs)
log.Error(formatted)
fmt.Fprintf(w, formatted)
}