Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote-storage service #3836

Merged
merged 9 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/remote-storage/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
ARG base_image
ARG debug_image

ARG SVC=remote-storage

FROM $base_image AS release
ARG TARGETARCH
COPY $SVC-linux-$TARGETARCH /go/bin/$SVC-linux
EXPOSE 16686/tcp
ENTRYPOINT ["/go/bin/$SVC-linux"]

FROM $debug_image AS debug
ARG TARGETARCH=amd64
COPY $SVC-debug-linux-$TARGETARCH /go/bin/$SVC-linux
EXPOSE 12345/tcp 16686/tcp
ENTRYPOINT ["/go/bin/dlv", "exec", "/go/bin/$SVC-linux", "--headless", "--listen=:12345", "--api-version=2", "--accept-multiclient", "--log", "--"]
64 changes: 64 additions & 0 deletions cmd/remote-storage/app/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"flag"
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
)

const (
flagGRPCHostPort = "grpc.host-port"
)

var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "grpc",
}

// Options holds configuration for remote-storage service.
type Options struct {
// GRPCHostPort is the host:port address for gRPC server
GRPCHostPort string
// TLSGRPC configures secure transport
TLSGRPC tlscfg.Options
// Tenancy configuration
Tenancy tenancy.Options
}

// AddFlags adds flags to flag set.
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(flagGRPCHostPort, ports.PortToHostPort(ports.RemoteStorageGRPC), "The host:port (e.g. 127.0.0.1:17271 or :17271) of the gRPC server")
tlsGRPCFlagsConfig.AddFlags(flagSet)
tenancy.AddFlags(flagSet)
}

// InitFromViper initializes Options with properties from CLI flags.
func (o *Options) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Options, error) {
o.GRPCHostPort = v.GetString(flagGRPCHostPort)
if tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v); err == nil {
o.TLSGRPC = tlsGrpc
} else {
return o, fmt.Errorf("failed to process gRPC TLS options: %w", err)
}
o.Tenancy = tenancy.InitFromViper(v)
return o, nil
}
47 changes: 47 additions & 0 deletions cmd/remote-storage/app/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
)

func TestFlags(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--grpc.host-port=127.0.0.1:8081",
})
qOpts, err := new(Options).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.Equal(t, "127.0.0.1:8081", qOpts.GRPCHostPort)
}

func TestFailedTLSFlags(t *testing.T) {
v, command := config.Viperize(AddFlags)
err := command.ParseFlags([]string{
"--grpc.tls.enabled=false",
"--grpc.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = new(Options).InitFromViper(v, zap.NewNop())
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to process gRPC TLS options")
}
151 changes: 151 additions & 0 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"fmt"
"net"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// Server runs a gRPC server
type Server struct {
logger *zap.Logger
opts *Options

grpcConn net.Listener
grpcServer *grpc.Server
unavailableChannel chan healthcheck.Status // used to signal to admin server that gRPC server is unavailable
}

// NewServer creates and initializes Server.
func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.TenancyManager, logger *zap.Logger) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, logger)
if err != nil {
return nil, err
}

grpcServer, err := createGRPCServer(options, tm, handler, logger)
if err != nil {
return nil, err
}

return &Server{
logger: logger,
opts: options,
grpcServer: grpcServer,
unavailableChannel: make(chan healthcheck.Status),
}, nil
}

func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandler, error) {
reader, err := f.CreateSpanReader()
if err != nil {
return nil, err
}
writer, err := f.CreateSpanWriter()
if err != nil {
return nil, err
}
depReader, err := f.CreateDependencyReader()
if err != nil {
return nil, err
}

impl := &shared.GRPCHandlerStorageImpl{
SpanReader: func() spanstore.Reader { return reader },
SpanWriter: func() spanstore.Writer { return writer },
DependencyReader: func() dependencystore.Reader { return depReader },
StreamingSpanWriter: func() spanstore.Writer { return nil },
}

// borrow code from Query service for archive storage
qOpts := &querysvc.QueryServiceOptions{}
// when archive storage not initialized (returns false), the reader/writer will be nil
_ = qOpts.InitArchiveStorage(f, logger)
impl.ArchiveSpanReader = func() spanstore.Reader { return qOpts.ArchiveSpanReader }
impl.ArchiveSpanWriter = func() spanstore.Writer { return qOpts.ArchiveSpanWriter }

handler := shared.NewGRPCHandler(impl)
return handler, nil
}

// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(opts *Options, tm *tenancy.TenancyManager, handler *shared.GRPCHandler, logger *zap.Logger) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if opts.TLSGRPC.Enabled {
tlsCfg, err := opts.TLSGRPC.Config(logger)
if err != nil {
return nil, fmt.Errorf("invalid TLS config: %w", err)
}
creds := credentials.NewTLS(tlsCfg)
grpcOpts = append(grpcOpts, grpc.Creds(creds))
}
if tm.Enabled {
grpcOpts = append(grpcOpts,
grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)),
grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)),
)
}

server := grpc.NewServer(grpcOpts...)
reflection.Register(server)
handler.Register(server)

return server, nil
}

// Start gRPC server concurrently
func (s *Server) Start() error {
listener, err := net.Listen("tcp", s.opts.GRPCHostPort)
if err != nil {
return err
}
s.logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr()))
s.grpcConn = listener
go func() {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("GRPC server exited", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()

return nil
}

// Close stops http, GRPC servers and closes the port listener.
func (s *Server) Close() error {
s.grpcServer.Stop()
s.grpcConn.Close()
s.opts.TLSGRPC.Close()
return nil
}
Loading