diff --git a/channel/channel_test.go b/channel/channel_test.go index 755b380..0090ce8 100644 --- a/channel/channel_test.go +++ b/channel/channel_test.go @@ -200,46 +200,3 @@ func TestHeaderTypeMismatch(t *testing.T) { } } } - -func TestWithTrigger(t *testing.T) { - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - r, w := io.Pipe() - triggered := false - ch := WithTrigger(test.framing(r, w), func() { - triggered = true - }) - - // Send a message to the channel, then close it. - const message = `["fools", "rush", "in"]` - done := make(chan struct{}) - go func() { - defer close(done) - t.Log("Sending...") - if err := ch.Send([]byte(message)); err != nil { - t.Errorf("Send failed: %v", err) - } - t.Logf("Close: err=%v", ch.Close()) - }() - - // Read messages from the channel till it closes, then check that - // the trigger was correctly invoked. - for { - msg, err := ch.Recv() - if err == io.EOF { - t.Log("Recv: returned io.EOF") - break - } else if err != nil { - t.Errorf("Recv: unexpected error: %v", err) - break - } - t.Logf("Recv: msg=%q", string(msg)) - } - - <-done - if !triggered { - t.Error("After channel close: trigger not called") - } - }) - } -} diff --git a/channel/types.go b/channel/types.go index 20e71fc..43eef01 100644 --- a/channel/types.go +++ b/channel/types.go @@ -9,30 +9,6 @@ import ( // message-framing discipline. type Framing func(io.Reader, io.WriteCloser) Channel -// WithTrigger returns a Channel that delegates I/O operations to ch, and when -// a Recv operation on ch returns io.EOF it synchronously calls the trigger. -func WithTrigger(ch Channel, trigger func()) Channel { - return triggered{ch: ch, trigger: trigger} -} - -type triggered struct { - ch Channel - trigger func() -} - -// Recv implements part of the channel.Channel interface. It delegates to the -// wrapped channel and calls the trigger when the delegate returns io.EOF. -func (c triggered) Recv() ([]byte, error) { - msg, err := c.ch.Recv() - if err == io.EOF { - c.trigger() - } - return msg, err -} - -func (c triggered) Send(msg []byte) error { return c.ch.Send(msg) } -func (c triggered) Close() error { return c.ch.Close() } - type direct struct { send chan<- []byte recv <-chan []byte