Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(ci): replace toxiproxy client dep #2593

Merged
merged 2 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
"testing"
"time"

toxiproxy "github.com/Shopify/toxiproxy/v2/client"
"github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/require"

"github.com/IBM/sarama/internal/toxiproxy"
)

const TestBatchSize = 1000
Expand Down
4 changes: 2 additions & 2 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"testing"
"time"

toxiproxy "github.com/Shopify/toxiproxy/v2/client"
"github.com/IBM/sarama/internal/toxiproxy"
)

const uncommittedTopic = "uncommitted-topic-test-4"
Expand Down Expand Up @@ -456,7 +456,7 @@ func resetProxies(t testing.TB) {
}

func SaveProxy(t *testing.T, px string) {
if err := FunctionalTestEnv.Proxies[px].Save(); err != nil {
if _, err := FunctionalTestEnv.Proxies[px].Save(); err != nil {
t.Fatal(err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions internal/toxiproxy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# toxiproxy

A minimal client implementation to setup proxies and toxics in toxiproxy as used in the FV suite.
We have our own minimal client implementation to avoid having to pull in the toxiproxy repo which carries a number of transitive dependencies.
94 changes: 94 additions & 0 deletions internal/toxiproxy/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package toxiproxy

import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"
)

type Client struct {
httpClient *http.Client
endpoint string
}

func NewClient(endpoint string) *Client {
return &Client{
httpClient: &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
},
endpoint: endpoint,
}
}

func (c *Client) CreateProxy(
name string,
listenAddr string,
targetAddr string,
) (*Proxy, error) {
proxy := &Proxy{
Name: name,
ListenAddr: listenAddr,
TargetAddr: targetAddr,
Enabled: true,
client: c,
}
return proxy.Save()
}

func (c *Client) Proxy(name string) (*Proxy, error) {
req, err := http.NewRequest("GET", c.endpoint+"/proxies/"+name, nil)
if err != nil {
return nil, fmt.Errorf("failed to make proxy request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to http get proxy: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("error getting proxy %s: %s %s", name, resp.Status, body)
}

var p Proxy
if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
return nil, fmt.Errorf("error decoding json for proxy %s: %w", name, err)
}
p.client = c

return &p, nil
}

func (c *Client) ResetState() error {
req, err := http.NewRequest("POST", c.endpoint+"/reset", http.NoBody)
if err != nil {
return fmt.Errorf("failed to make reset request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to http post reset: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 204 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("error resetting proxies: %s %s", resp.Status, body)
}

return nil
}
114 changes: 114 additions & 0 deletions internal/toxiproxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package toxiproxy

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
)

type Proxy struct {
client *Client
Name string `json:"name"`
ListenAddr string `json:"listen"`
TargetAddr string `json:"upstream"`
Enabled bool `json:"enabled"`
}

type Attributes map[string]int

func (p *Proxy) AddToxic(
name string,
toxicType string,
stream string,
toxicity float32,
attributes Attributes,
) (*Toxic, error) {
toxic := &Toxic{
Name: name,
Type: toxicType,
Stream: stream,
Toxicity: toxicity,
Attributes: attributes,
}
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(&toxic); err != nil {
return nil, fmt.Errorf("failed to json encode toxic: %w", err)
}
body := bytes.NewReader(b.Bytes())

c := p.client
req, err := http.NewRequest("POST", c.endpoint+"/proxies/"+p.Name+"/toxics", body)
if err != nil {
return nil, fmt.Errorf("failed to make post toxic request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to http post toxic: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("error creating toxic %s: %s %s", name, resp.Status, body)
}

return toxic, nil
}

func (p *Proxy) Enable() error {
p.Enabled = true
_, err := p.Save()
return err
}

func (p *Proxy) Disable() error {
p.Enabled = false
_, err := p.Save()
return err
}

func (p *Proxy) Save() (*Proxy, error) {
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(&p); err != nil {
return nil, fmt.Errorf("failed to json encode proxy: %w", err)
}
body := bytes.NewReader(b.Bytes())

c := p.client
req, err := http.NewRequest("POST", c.endpoint+"/proxies/"+p.Name, body)
if err != nil {
return nil, fmt.Errorf("failed to make post proxy request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to http post proxy: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode == 404 {
if _, err := body.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("failed to rewind post body: %w", err)
}
req, err = http.NewRequest("POST", c.endpoint+"/proxies", body)
if err != nil {
return nil, fmt.Errorf("failed to make post proxy request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err = c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to http post proxy: %w", err)
}
defer resp.Body.Close()
}

if resp.StatusCode != 200 && resp.StatusCode != 201 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("error saving proxy: %s %s", resp.Status, body)
}

return p, nil
}
9 changes: 9 additions & 0 deletions internal/toxiproxy/toxic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package toxiproxy

type Toxic struct {
Name string `json:"name"`
Type string `json:"type"`
Stream string `json:"stream,omitempty"`
Toxicity float32 `json:"toxicity"`
Attributes Attributes `json:"attributes"`
}