Skip to content

Commit

Permalink
Revert "feat: use unix domain socket"
Browse files Browse the repository at this point in the history
This reverts commit 44b321e.
  • Loading branch information
alexec committed Sep 15, 2021
1 parent d7060b3 commit 7db2d38
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 47 deletions.
35 changes: 9 additions & 26 deletions runner/sidecar/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
Expand All @@ -17,21 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
// https://www.loginradius.com/blog/async/tune-the-go-http-client-for-high-performance/
httpTransport = http.DefaultTransport.(*http.Transport).Clone()
httpClient = &http.Client{
Transport: httpTransport,
Timeout: 10 * time.Second,
}
)

func init() {
httpTransport.MaxIdleConns = 32
httpTransport.MaxConnsPerHost = 32
httpTransport.MaxIdleConnsPerHost = 32
}

func connectIn(ctx context.Context, sink func(context.Context, []byte) error) (func(context.Context, []byte) error, error) {
inFlight := promauto.NewGauge(prometheus.GaugeOpts{
Subsystem: "input",
Expand Down Expand Up @@ -80,14 +64,20 @@ func connectIn(ctx context.Context, sink func(context.Context, []byte) error) (f
return nil, err
}
addStopHook(waitUnready)
// https://www.loginradius.com/blog/async/tune-the-go-http-client-for-high-performance/
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConns = 100
t.MaxConnsPerHost = 100
t.MaxIdleConnsPerHost = 100
httpClient := &http.Client{Timeout: 10 * time.Second, Transport: t}
return func(ctx context.Context, data []byte) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "messages")
defer span.Finish()
inFlight.Inc()
defer inFlight.Dec()
start := time.Now()
defer func() { messageTimeSeconds.Observe(time.Since(start).Seconds()) }()
req, err := http.NewRequestWithContext(ctx, "POST", "http://127.0.0.1:8080/messages", bytes.NewBuffer(data))
req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost:8080/messages", bytes.NewBuffer(data))
if err != nil {
return err
}
Expand Down Expand Up @@ -117,20 +107,13 @@ func connectIn(ctx context.Context, sink func(context.Context, []byte) error) (f
}

func waitReady(ctx context.Context) error {
const ipcSockPath = "/var/run/argo-dataflow/ipc.sock"
for {
select {
case <-ctx.Done():
return fmt.Errorf("failed to wait for ready: %w", ctx.Err())
default:
if _, err := os.Stat(ipcSockPath); err == nil {
logger.Info("switching to Unix socket", "path", ipcSockPath)
httpTransport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", ipcSockPath)
}
}
logger.Info("waiting for HTTP in interface to be ready")
if resp, err := httpClient.Get("http://127.0.0.1:8080/ready"); err == nil && resp.StatusCode < 300 {
if resp, err := http.Get("http://localhost:8080/ready"); err == nil && resp.StatusCode < 300 {
logger.Info("HTTP in interface ready")
return nil
}
Expand All @@ -146,7 +129,7 @@ func waitUnready(ctx context.Context) error {
return fmt.Errorf("failed to wait for un-ready: %w", ctx.Err())
default:
logger.Info("waiting for HTTP in interface to be unready")
if resp, err := httpClient.Get("http://127.0.0.1:8080/ready"); err != nil || resp.StatusCode >= 300 {
if resp, err := http.Get("http://localhost:8080/ready"); err != nil || resp.StatusCode >= 300 {
logger.Info("HTTP in interface unready")
return nil
}
Expand Down
15 changes: 8 additions & 7 deletions runner/sidecar/sink/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -36,15 +37,15 @@ func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInte
header.Add(h.Name, string(secret.Data[r.Key]))
}
}
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConns = 32
t.MaxConnsPerHost = 32
t.MaxIdleConnsPerHost = 32
t.TLSClientConfig.InsecureSkipVerify = x.InsecureSkipVerify
return httpSink{
sinkName,
header,
&http.Client{Timeout: 10 * time.Second, Transport: t},
&http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: x.InsecureSkipVerify},
},
},
x.URL,
}, nil
}
Expand All @@ -66,8 +67,8 @@ func (h httpSink) Sink(ctx context.Context, msg []byte) error {
if resp, err := h.client.Do(req); err != nil {
return fmt.Errorf("failed to send HTTP request: %w", err)
} else {
defer func() { _ = resp.Body.Close() }()
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
if resp.StatusCode >= 300 {
return fmt.Errorf("failed to send HTTP request: %q", resp.Status)
}
Expand Down
9 changes: 5 additions & 4 deletions runner/sidecar/source/loadbalanced/loadbalanced.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package loadbalanced
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -53,10 +54,10 @@ func New(ctx context.Context, r NewReq) (source.HasPending, error) {
if r.LeadReplica {
endpoint := "https://" + r.PipelineName + "-" + r.StepName + "/sources/" + r.SourceName
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConns = 32
t.MaxConnsPerHost = 32
t.MaxIdleConnsPerHost = 32
t.TLSClientConfig.InsecureSkipVerify = true
t.MaxIdleConns = 100
t.MaxConnsPerHost = 100
t.MaxIdleConnsPerHost = 100
t.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
httpClient := &http.Client{Timeout: 10 * time.Second, Transport: t}

logger.Info("starting lead replica's workers", "source", r.SourceName, "endpoint", endpoint)
Expand Down
14 changes: 4 additions & 10 deletions sdks/golang/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"io/ioutil"
"log"
"net"
"net/http"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
Expand All @@ -23,10 +22,9 @@ func StartWithContext(ctx context.Context, handler func(ctx context.Context, msg
})
http.HandleFunc("/messages", func(w http.ResponseWriter, r *http.Request) {
ctx := dfv1.MetaExtract(r.Context(), r.Header)
defer func() { _ = r.Body.Close() }()
out, err := func() ([]byte, error) {
in, err := ioutil.ReadAll(r.Body)
_ = r.Body.Close()
if err != nil {
if in, err := ioutil.ReadAll(r.Body); err != nil {
return nil, err
} else {
return handler(ctx, in)
Expand All @@ -44,19 +42,15 @@ func StartWithContext(ctx context.Context, handler func(ctx context.Context, msg
})
// https://medium.com/honestbee-tw-engineer/gracefully-shutdown-in-go-http-server-5f5e6b83da5a
server := &http.Server{Addr: ":8080"}
listener, err := net.Listen("unix", "/var/run/argo-dataflow/ipc.sock")
if err != nil {
return err
}
defer func() { _ = listener.Close() }()

go func() {
defer func() {
r := recover()
if r != nil {
println(r)
}
}()
if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
panic(err)
}
}()
Expand Down

0 comments on commit 7db2d38

Please sign in to comment.