feat(api): Support Parquet as a query response format. #15408

Open
wants to merge 6 commits into
base: main
29 changes: 29 additions & 0 deletions docs/sources/reference/loki-http-api.md
@@ -438,6 +438,35 @@ curl -u "User:$API_TOKEN" \
--data-urlencode 'query=sum(rate({job="varlogs"}[10m])) by (level)' | jq
```

### Response Format

Parquet can be requested as the response format by setting the `Accept` header to `application/vnd.apache.parquet`.

```bash
curl -G -s "http://localhost:3100/loki/api/v1/query" \
-H "Accept: application/vnd.apache.parquet" \
--data-urlencode 'query=sum(rate({job="varlogs"}[10m])) by (level)' \
-o result.parquet
```

The Parquet schema for streams is:

| column_name | column_type              |
|-------------|--------------------------|
| timestamp   | TIMESTAMP WITH TIME ZONE |
| labels      | MAP(VARCHAR, VARCHAR)    |
| line        | VARCHAR                  |

and for metrics:

| column_name | column_type              |
|-------------|--------------------------|
| timestamp   | TIMESTAMP WITH TIME ZONE |
| labels      | MAP(VARCHAR, VARCHAR)    |
| value       | DOUBLE                   |
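
The same request can be issued from Go. The following is a minimal sketch that only builds the request and sets the `Accept` header. The helper name `newParquetQueryRequest` and the `baseURL` parameter are assumptions made for illustration and are not part of Loki.

```go
package main

import (
	"net/http"
	"net/url"
)

// parquetMediaType mirrors the media type documented above.
const parquetMediaType = "application/vnd.apache.parquet"

// newParquetQueryRequest is a hypothetical helper (not part of Loki). It builds
// a GET request for /loki/api/v1/query that asks for a Parquet response.
// baseURL is assumed to be something like "http://localhost:3100".
func newParquetQueryRequest(baseURL, query string) (*http.Request, error) {
	u, err := url.Parse(baseURL)
	if err != nil {
		return nil, err
	}
	u.Path = "/loki/api/v1/query"
	// url.Values takes care of percent-encoding the LogQL expression.
	u.RawQuery = url.Values{"query": {query}}.Encode()

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", parquetMediaType)
	return req, nil
}
```

Passing the returned request to `http.DefaultClient.Do` and copying the response body to a file would give the same `result.parquet` as the `curl` call.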

## Query logs within a range of time

```bash
4 changes: 4 additions & 0 deletions go.mod
@@ -129,6 +129,7 @@ require (
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
github.com/ncw/swift/v2 v2.0.3
github.com/parquet-go/parquet-go v0.24.0
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/common/sigv4 v0.1.0
github.com/richardartoul/molecule v1.0.0
@@ -161,6 +162,7 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.48.1 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/ebitengine/purego v0.8.1 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
@@ -173,9 +175,11 @@ require (
github.com/imdario/mergo v0.3.16 // indirect
github.com/kamstrup/intmap v0.5.0 // indirect
github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
github.com/pkg/xattr v0.4.10 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
7 changes: 7 additions & 0 deletions go.sum
@@ -941,6 +941,8 @@ github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0=
github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI=
@@ -2142,6 +2144,7 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
@@ -2271,6 +2274,8 @@ github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNs
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -2335,6 +2340,8 @@ github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI=
github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c=
github.com/packethost/packngo v0.1.1-0.20180711074735-b9cb5096f54c/go.mod h1:otzZQXgoO96RTzDB/Hycg0qZcXZsWJGJRSXbmEIJ+4M=
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/parquet-go/parquet-go v0.24.0 h1:VrsifmLPDnas8zpoHmYiWDZ1YHzLmc7NmNwPGkI2JM4=
github.com/parquet-go/parquet-go v0.24.0/go.mod h1:OqBBRGBl7+llplCvDMql8dEKaDqjaFA/VAPw+OJiNiw=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
1 change: 1 addition & 0 deletions pkg/loki/modules.go
@@ -1126,6 +1126,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
		frontendHandler = gziphandler.GzipHandler(frontendHandler)
	}

	// TODO: add SerializeHTTPHandler
	toMerge := []middleware.Interface{
		httpreq.ExtractQueryTagsMiddleware(),
		httpreq.PropagateHeadersMiddleware(httpreq.LokiActorPathHeader, httpreq.LokiEncodingFlagsHeader, httpreq.LokiDisablePipelineWrappersHeader),
2 changes: 2 additions & 0 deletions pkg/querier/queryrange/codec.go
@@ -1282,6 +1282,8 @@ func decodeResponseProtobuf(r *http.Response, req queryrangebase.Request) (query
func (Codec) EncodeResponse(ctx context.Context, req *http.Request, res queryrangebase.Response) (*http.Response, error) {
	if req.Header.Get("Accept") == ProtobufType {
		return encodeResponseProtobuf(ctx, res)
	} else if req.Header.Get("Accept") == ParquetType {
		return encodeResponseParquet(ctx, res)
	}

	// Default to JSON.
1 change: 1 addition & 0 deletions pkg/querier/queryrange/marshal.go
@@ -32,6 +32,7 @@ import (

const (
	JSONType     = `application/json; charset=utf-8`
	ParquetType  = `application/vnd.apache.parquet`
	ProtobufType = `application/vnd.google.protobuf`
)

112 changes: 112 additions & 0 deletions pkg/querier/queryrange/parquet.go
@@ -0,0 +1,112 @@
package queryrange

import (
	"bytes"
	"context"
	"io"
	"net/http"

	"github.com/opentracing/opentracing-go"
	"github.com/parquet-go/parquet-go"
	"github.com/prometheus/prometheus/promql/parser"

	serverutil "github.com/grafana/loki/v3/pkg/util/server"

	"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
)

// encodeResponseParquet encodes res as Parquet into an in-memory buffer and
// wraps the buffer in an HTTP response with the Parquet content type.
func encodeResponseParquet(ctx context.Context, res queryrangebase.Response) (*http.Response, error) {
	sp, _ := opentracing.StartSpanFromContext(ctx, "codec.EncodeResponse")
	defer sp.Finish()

	var buf bytes.Buffer

	err := encodeResponseParquetTo(ctx, res, &buf)
	if err != nil {
		return nil, err
	}

	resp := http.Response{
		Header: http.Header{
			"Content-Type": []string{ParquetType},
		},
		Body:       io.NopCloser(&buf),
		StatusCode: http.StatusOK,
	}
	return &resp, nil
}

// encodeResponseParquetTo dispatches on the concrete response type. Only
// metric and log responses are supported; any other type yields a UserError.
func encodeResponseParquetTo(_ context.Context, res queryrangebase.Response, w io.Writer) error {
	switch response := res.(type) {
	case *LokiPromResponse:
		return encodeMetricsParquetTo(response, w)
	case *LokiResponse:
		return encodeLogsParquetTo(response, w)
	default:
		return serverutil.UserError("request does not support Parquet responses")
	}
}

// MetricRowType is one Parquet row per sample. Timestamp is in Unix milliseconds.
type MetricRowType struct {
	Timestamp int64             `parquet:"timestamp,timestamp(millisecond),delta"`
	Labels    map[string]string `parquet:"labels"`
	Value     float64           `parquet:"value"`
}

// LogStreamRowType is one Parquet row per log entry. Timestamp is in Unix
// nanoseconds and the line column is LZ4-compressed.
type LogStreamRowType struct {
	Timestamp int64             `parquet:"timestamp,timestamp(nanosecond),delta"`
	Labels    map[string]string `parquet:"labels"`
	Line      string            `parquet:"line,lz4"`
}

// encodeMetricsParquetTo writes every sample of every series as its own row.
func encodeMetricsParquetTo(response *LokiPromResponse, w io.Writer) error {
	schema := parquet.SchemaOf(new(MetricRowType))
	writer := parquet.NewGenericWriter[MetricRowType](w, schema)

	for _, stream := range response.Response.Data.Result {
		// Build the label map once per series and reuse it for all samples.
		lbls := make(map[string]string)
		for _, keyValue := range stream.Labels {
			lbls[keyValue.Name] = keyValue.Value
		}
		for _, sample := range stream.Samples {
			row := MetricRowType{
				Timestamp: sample.TimestampMs,
				Labels:    lbls,
				Value:     sample.Value,
			}
			if _, err := writer.Write([]MetricRowType{row}); err != nil {
				return err
			}
		}
	}
	return writer.Close()
}

// encodeLogsParquetTo writes every entry of every stream as its own row.
func encodeLogsParquetTo(response *LokiResponse, w io.Writer) error {
	schema := parquet.SchemaOf(new(LogStreamRowType))
	writer := parquet.NewGenericWriter[LogStreamRowType](w, schema)

	for _, stream := range response.Data.Result {
		// Stream labels arrive as a single string and must be parsed first.
		lbls, err := parser.ParseMetric(stream.Labels)
		if err != nil {
			return err
		}
		lblsMap := make(map[string]string)
		for _, keyValue := range lbls {
			lblsMap[keyValue.Name] = keyValue.Value
		}

		for _, entry := range stream.Entries {
			row := LogStreamRowType{
				Timestamp: entry.Timestamp.UnixNano(),
				Labels:    lblsMap,
				Line:      entry.Line,
			}
			if _, err := writer.Write([]LogStreamRowType{row}); err != nil {
				return err
			}
		}
	}

	return writer.Close()
}
63 changes: 63 additions & 0 deletions pkg/querier/queryrange/parquet_test.go
@@ -0,0 +1,63 @@
package queryrange

import (
	"os"
	"testing"

	"github.com/parquet-go/parquet-go"

	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/loghttp"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
)

func TestEncodeMetricsParquet(t *testing.T) {
	resp := &LokiPromResponse{
		Response: &queryrangebase.PrometheusResponse{
			Status: loghttp.QueryStatusSuccess,
			Data: queryrangebase.PrometheusData{
				ResultType: loghttp.ResultTypeMatrix,
				Result:     sampleStreams,
			},
		},
	}

	// Create the file under t.TempDir() so it is removed when the test ends,
	// and check the error before deferring Close on the returned file.
	f, err := os.CreateTemp(t.TempDir(), "metrics-*.parquet")
	require.NoError(t, err)
	defer f.Close()

	err = encodeMetricsParquetTo(resp, f)
	require.NoError(t, err)

	rows, err := parquet.ReadFile[MetricRowType](f.Name())
	require.NoError(t, err)

	require.Len(t, rows, 3)
}

func TestEncodeLogsParquet(t *testing.T) {
	resp := &LokiResponse{
		Status:    loghttp.QueryStatusSuccess,
		Direction: logproto.FORWARD,
		Limit:     100,
		Version:   uint32(loghttp.VersionV1),
		Data: LokiData{
			ResultType: loghttp.ResultTypeStream,
			Result:     logStreams,
		},
	}

	// Same pattern as above: temp dir cleanup and error check before defer.
	f, err := os.CreateTemp(t.TempDir(), "logs-*.parquet")
	require.NoError(t, err)
	defer f.Close()

	err = encodeLogsParquetTo(resp, f)
	require.NoError(t, err)

	rows, err := parquet.ReadFile[LogStreamRowType](f.Name())
	require.NoError(t, err)

	require.Len(t, rows, 3)
}
Empty file.
8 changes: 8 additions & 0 deletions pkg/querier/queryrange/serialize.go
@@ -70,6 +70,14 @@ func (rt *serializeHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
		return
	}

	// TODO(karsten): use rt.codec.EncodeResponse(ctx, r, response) which is the central encoding logic instead.
	if r.Header.Get("Accept") == ParquetType {
		w.Header().Add("Content-Type", ParquetType)
		if err := encodeResponseParquetTo(ctx, response, w); err != nil {
			serverutil.WriteError(err, w)
		}
		return
	}
	version := loghttp.GetVersion(r.RequestURI)
	encodingFlags := httpreq.ExtractEncodingFlags(r)
	if err := encodeResponseJSONTo(version, response, w, encodingFlags); err != nil {
9 changes: 9 additions & 0 deletions pkg/util/server/error.go
@@ -27,6 +27,12 @@ const (
	ErrDeadlineExceeded = "Request timed out, decrease the duration of the request or add more label matchers (prefer exact match over regex match) to reduce the amount of data processed."
)

type UserError string

func (e UserError) Error() string {
	return string(e)
}

func ClientGrpcStatusAndError(err error) error {
	if err == nil {
		return nil
@@ -55,6 +61,7 @@ func ClientHTTPStatusAndError(err error) (int, error) {
	var (
		queryErr storage_errors.QueryError
		promErr  promql.ErrStorage
		userErr  UserError
	)

	me, ok := err.(util.MultiError)
@@ -91,6 +98,8 @@ func ClientHTTPStatusAndError(err error) (int, error) {
		return http.StatusBadRequest, err
	case errors.Is(err, user.ErrNoOrgID):
		return http.StatusBadRequest, err
	case errors.As(err, &userErr):
		return http.StatusBadRequest, err
	default:
		if grpcErr, ok := httpgrpc.HTTPResponseFromError(err); ok {
			return int(grpcErr.Code), errors.New(string(grpcErr.Body))
2 changes: 1 addition & 1 deletion tools/dev/loki-tsdb-storage-s3/compose-down.sh
@@ -2,4 +2,4 @@

SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd)

-docker-compose -f "${SCRIPT_DIR}"/docker-compose.yml down --remove-orphans
+docker compose -f "${SCRIPT_DIR}"/docker-compose.yml down --remove-orphans
4 changes: 2 additions & 2 deletions tools/dev/loki-tsdb-storage-s3/compose-up.sh
@@ -18,7 +18,7 @@ CGO_ENABLED=0 GOOS=linux go build -mod=vendor -gcflags "all=-N -l" -o "${SCRIPT_
# ## install loki driver to send logs
docker plugin install grafana/loki-docker-driver:latest --alias loki-compose --grant-all-permissions || true
# build the compose image
-docker-compose -f "${SCRIPT_DIR}"/docker-compose.yml build distributor
+docker compose -f "${SCRIPT_DIR}"/docker-compose.yml build distributor
# cleanup sources
rm -Rf "${SRC_DEST}"
-docker-compose -f "${SCRIPT_DIR}"/docker-compose.yml up "$@"
+docker compose -f "${SCRIPT_DIR}"/docker-compose.yml up "$@"
