Skip to content

Commit

Permalink
Support UNIX domain socket in the example servers (grpc-ecosystem#609)
Browse files Browse the repository at this point in the history
* Lets example servers gracefully shutdown
* Support UNIX domain sockets in the example servers
  • Loading branch information
yugui authored and achew22 committed Apr 22, 2018
1 parent bb00f27 commit b9de75e
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 69 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ $(ABE_EXAMPLE_SRCS): $(ABE_EXAMPLE_SPEC)
examples: $(EXAMPLE_SVCSRCS) $(EXAMPLE_GWSRCS) $(EXAMPLE_DEPSRCS) $(EXAMPLE_SWAGGERSRCS) $(EXAMPLE_CLIENT_SRCS)
test: examples
go test -race $(PKG)/...
go test -race $(PKG)/examples -args -network=unix -endpoint=test.sock

lint:
golint --set_exit_status $(PKG)/runtime
Expand Down
5 changes: 3 additions & 2 deletions examples/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ go_library(
srcs = ["main.go"],
importpath = "github.com/grpc-ecosystem/grpc-gateway/examples",
deps = [
"//examples/examplepb:go_default_library",
"//examples/gateway:go_default_library",
"//runtime:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)

Expand All @@ -36,10 +35,12 @@ go_test(
"//examples/server:go_default_library",
"//examples/sub:go_default_library",
"//runtime:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_golang_protobuf//jsonpb:go_default_library",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_golang_protobuf//ptypes/empty:go_default_library",
"@org_golang_google_genproto//googleapis/rpc/status:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_x_net//context:go_default_library",
],
)
13 changes: 13 additions & 0 deletions examples/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["gateway.go"],
importpath = "github.com/grpc-ecosystem/grpc-gateway/examples/gateway",
visibility = ["//visibility:public"],
deps = [
"//examples/examplepb:go_default_library",
"//runtime:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)
71 changes: 71 additions & 0 deletions examples/gateway/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package gateway

import (
"context"
"net"
"net/http"
"time"

"github.com/grpc-ecosystem/grpc-gateway/examples/examplepb"
gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
)

type optSet struct {
mux []gwruntime.ServeMuxOption
dial []grpc.DialOption

echoEndpoint, abeEndpoint, flowEndpoint string
}

// newGateway returns a new gateway server which translates HTTP into gRPC.
func newGateway(ctx context.Context, opts optSet) (http.Handler, error) {
mux := gwruntime.NewServeMux(opts.mux...)

err := examplepb.RegisterEchoServiceHandlerFromEndpoint(ctx, mux, opts.echoEndpoint, opts.dial)
if err != nil {
return nil, err
}
err = examplepb.RegisterStreamServiceHandlerFromEndpoint(ctx, mux, opts.abeEndpoint, opts.dial)
if err != nil {
return nil, err
}
err = examplepb.RegisterABitOfEverythingServiceHandlerFromEndpoint(ctx, mux, opts.abeEndpoint, opts.dial)
if err != nil {
return nil, err
}
err = examplepb.RegisterFlowCombinationHandlerFromEndpoint(ctx, mux, opts.flowEndpoint, opts.dial)
if err != nil {
return nil, err
}
return mux, nil
}

// NewTCPGateway returns a new gateway server which connect to the gRPC service with TCP.
// "addr" must be a valid TCP address with a port number.
func NewTCPGateway(ctx context.Context, addr string, opts ...gwruntime.ServeMuxOption) (http.Handler, error) {
return newGateway(ctx, optSet{
mux: opts,
dial: []grpc.DialOption{grpc.WithInsecure()},
echoEndpoint: addr,
abeEndpoint: addr,
flowEndpoint: addr,
})
}

// NewUnixGatway returns a new gateway server which connect to the gRPC service with a unix domain socket.
// "addr" must be a valid path to the socket.
func NewUnixGateway(ctx context.Context, addr string, opts ...gwruntime.ServeMuxOption) (http.Handler, error) {
return newGateway(ctx, optSet{
mux: opts,
dial: []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
},
echoEndpoint: addr,
abeEndpoint: addr,
flowEndpoint: addr,
})
}
5 changes: 5 additions & 0 deletions examples/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,13 @@ func TestEcho(t *testing.T) {
}

func TestForwardResponseOption(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

go func() {
if err := Run(
ctx,
":8081",
runtime.WithForwardResponseOption(
func(_ context.Context, w http.ResponseWriter, _ proto.Message) error {
Expand Down
71 changes: 37 additions & 34 deletions examples/main.go
Original file line number Diff line number Diff line change
@@ -1,49 +1,24 @@
package main

import (
"context"
"flag"
"fmt"
"net/http"
"path"
"strings"

"context"
"github.com/golang/glog"
"github.com/grpc-ecosystem/grpc-gateway/examples/examplepb"
"github.com/grpc-ecosystem/grpc-gateway/examples/gateway"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
)

var (
echoEndpoint = flag.String("echo_endpoint", "localhost:9090", "endpoint of EchoService")
abeEndpoint = flag.String("more_endpoint", "localhost:9090", "endpoint of ABitOfEverythingService")
flowEndpoint = flag.String("flow_endpoint", "localhost:9090", "endpoint of FlowCombination")

endpoint = flag.String("endpoint", "localhost:9090", "endpoint of the gRPC service")
network = flag.String("network", "tcp", `one of "tcp" or "unix". Must be consistent to -endpoint`)
swaggerDir = flag.String("swagger_dir", "examples/examplepb", "path to the directory which contains swagger definitions")
)

// newGateway returns a new gateway server which translates HTTP into gRPC.
func newGateway(ctx context.Context, opts ...runtime.ServeMuxOption) (http.Handler, error) {
mux := runtime.NewServeMux(opts...)
dialOpts := []grpc.DialOption{grpc.WithInsecure()}
err := examplepb.RegisterEchoServiceHandlerFromEndpoint(ctx, mux, *echoEndpoint, dialOpts)
if err != nil {
return nil, err
}
err = examplepb.RegisterStreamServiceHandlerFromEndpoint(ctx, mux, *abeEndpoint, dialOpts)
if err != nil {
return nil, err
}
err = examplepb.RegisterABitOfEverythingServiceHandlerFromEndpoint(ctx, mux, *abeEndpoint, dialOpts)
if err != nil {
return nil, err
}
err = examplepb.RegisterFlowCombinationHandlerFromEndpoint(ctx, mux, *flowEndpoint, dialOpts)
if err != nil {
return nil, err
}
return mux, nil
}

func serveSwagger(w http.ResponseWriter, r *http.Request) {
if !strings.HasSuffix(r.URL.Path, ".swagger.json") {
glog.Errorf("Not Found: %s", r.URL.Path)
Expand Down Expand Up @@ -81,9 +56,19 @@ func preflightHandler(w http.ResponseWriter, r *http.Request) {
return
}

func newGateway(ctx context.Context, opts ...runtime.ServeMuxOption) (http.Handler, error) {
switch *network {
case "tcp":
return gateway.NewTCPGateway(ctx, *endpoint, opts...)
case "unix":
return gateway.NewUnixGateway(ctx, *endpoint, opts...)
default:
return nil, fmt.Errorf("unsupported network type %q:", *network)
}
}

// Run starts a HTTP server and blocks forever if successful.
func Run(address string, opts ...runtime.ServeMuxOption) error {
ctx := context.Background()
func Run(ctx context.Context, address string, opts ...runtime.ServeMuxOption) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -96,14 +81,32 @@ func Run(address string, opts ...runtime.ServeMuxOption) error {
}
mux.Handle("/", gw)

return http.ListenAndServe(address, allowCORS(mux))
s := &http.Server{
Addr: address,
Handler: allowCORS(mux),
}
go func() {
<-ctx.Done()
glog.Infof("Shutting down the http server")
if err := s.Shutdown(context.Background()); err != nil {
glog.Errorf("Failed to shutdown http server: %v", err)
}
}()

glog.Infof("Starting listening at %s", address)
if err := s.ListenAndServe(); err != http.ErrServerClosed {
glog.Errorf("Failed to listen and serve: %v", err)
return err
}
return nil
}

func main() {
flag.Parse()
defer glog.Flush()

if err := Run(":8080"); err != nil {
ctx := context.Background()
if err := Run(ctx, ":8080"); err != nil {
glog.Fatal(err)
}
}
15 changes: 11 additions & 4 deletions examples/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import (
"testing"
"time"

"github.com/golang/glog"
server "github.com/grpc-ecosystem/grpc-gateway/examples/server"
"golang.org/x/net/context"
)

func runServers() <-chan error {
func runServers(ctx context.Context) <-chan error {
ch := make(chan error, 2)
go func() {
if err := server.Run(); err != nil {
if err := server.Run(ctx, *network, *endpoint); err != nil {
ch <- fmt.Errorf("cannot run grpc service: %v", err)
}
}()
go func() {
if err := Run(":8080"); err != nil {
if err := Run(ctx, ":8080"); err != nil {
ch <- fmt.Errorf("cannot run gateway service: %v", err)
}
}()
Expand All @@ -27,7 +29,11 @@ func runServers() <-chan error {

func TestMain(m *testing.M) {
flag.Parse()
errCh := runServers()
defer glog.Flush()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := runServers(ctx)

ch := make(chan int, 1)
go func() {
Expand All @@ -40,6 +46,7 @@ func TestMain(m *testing.M) {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
case status := <-ch:
cancel()
os.Exit(status)
}
}
Loading

0 comments on commit b9de75e

Please sign in to comment.