Skip to content

Commit e40f551

Browse files
committed
proxy#4
1 parent 0b45798 commit e40f551

File tree

1 file changed

+14
-25
lines changed

1 file changed

+14
-25
lines changed

proxy/kafkaproxy.go

+14-25
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"log"
77
"net"
88
"time"
9-
"sync"
109
)
1110

1211

@@ -37,34 +36,24 @@ func main() {
3736
}
3837

3938
go func() {
40-
dialConn, err := net.DialTimeout("tcp", *bootstrap, 10*time.Second)
41-
if err != nil {
42-
log.Fatalf("error establishing connection on bootstrap: %s", err)
43-
}
44-
handleProxiedRequest(conn, dialConn)
45-
}()
46-
}
47-
}
48-
49-
func handleProxiedRequest(conn net.Conn, dialConn net.Conn) {
39+
dialConn, err := net.DialTimeout("tcp", *bootstrap, 10*time.Second)
40+
if err == nil {
41+
log.Printf("Connection request from %q to %q", dialConn.LocalAddr(), dialConn.RemoteAddr())
42+
}
43+
if err != nil {
44+
log.Fatalf("error establishing connection on bootstrap: %s", err)
45+
}
5046

51-
log.Printf("connection started: client=%v destination=%v",conn.RemoteAddr().String(),dialConn.RemoteAddr().String())
52-
start := time.Now()
47+
go io.Copy(dialConn, conn)
48+
io.Copy(conn, dialConn)
5349

54-
var wg sync.WaitGroup
55-
wg.Add(1)
56-
go func() {
57-
defer wg.Done()
58-
io.Copy(conn, dialConn)
59-
}()
60-
io.Copy(dialConn, conn)
61-
wg.Wait()
62-
defer conn.Close()
63-
defer dialConn.Close()
50+
dialConn.Close()
51+
conn.Close()
6452

65-
elapsed := time.Now().Sub(start)
66-
log.Printf("connection ended: client=%v destination=%v duration=%v",conn.RemoteAddr().String(),dialConn.RemoteAddr().String(),elapsed.String())
53+
}()
54+
}
6755
}
6856

6957

7058

59+

0 commit comments

Comments
 (0)