ws: integration test WIP

This commit is contained in:
Michał Matczuk 2017-08-10 09:22:54 +02:00
parent c035cfcac5
commit 34c13cb387
2 changed files with 66 additions and 104 deletions

View file

@ -19,6 +19,8 @@ import (
"github.com/mmatczuk/go-http-tunnel"
"github.com/mmatczuk/go-http-tunnel/id"
"github.com/mmatczuk/go-http-tunnel/proto"
"golang.org/x/net/websocket"
)
const (
@ -26,8 +28,6 @@ const (
payloadLen = 10
)
// echoHTTP starts serving HTTP requests on listener l, it accepts connections,
// reads request body and writes is back in response.
func echoHTTP(l net.Listener) {
http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
@ -41,7 +41,16 @@ func echoHTTP(l net.Listener) {
}))
}
// echoTCP accepts connections and copies back received bytes.
func echoWS(l net.Listener) {
wsServer := &websocket.Server{Handler: func(ws *websocket.Conn) {
io.Copy(ws, ws)
}}
http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wsServer.ServeHTTP(w, r)
}))
}
func echoTCP(l net.Listener) {
for {
conn, err := l.Accept()
@ -54,16 +63,9 @@ func echoTCP(l net.Listener) {
}
}
func makeEcho(t *testing.T) (http net.Listener, tcp net.Listener) {
func makeEcho(t *testing.T) (http, ws, tcp net.Listener) {
var err error
// TCP echo
tcp, err = net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
go echoTCP(tcp)
// HTTP echo
http, err = net.Listen("tcp", ":0")
if err != nil {
@ -71,6 +73,20 @@ func makeEcho(t *testing.T) (http net.Listener, tcp net.Listener) {
}
go echoHTTP(http)
// WS echo
ws, err = net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
go echoWS(ws)
// TCP echo
tcp, err = net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
go echoTCP(tcp)
return
}
@ -89,17 +105,10 @@ func makeTunnelServer(t *testing.T) *tunnel.Server {
return s
}
func makeTunnelClient(t *testing.T, serverAddr string, httpLocalAddr, httpAddr, tcpLocalAddr, tcpAddr net.Addr) *tunnel.Client {
httpProxy := tunnel.NewMultiHTTPProxy(map[string]*url.URL{
"localhost:" + port(httpLocalAddr): {
Scheme: "http",
Host: "127.0.0.1:" + port(httpAddr),
},
}, nil)
tcpProxy := tunnel.NewMultiTCPProxy(map[string]string{
port(tcpLocalAddr): tcpAddr.String(),
}, nil)
func makeTunnelClient(t *testing.T, serverAddr string, tcpTunAddr, httpAddr, wsAddr, tcpAddr net.Addr) *tunnel.Client {
httpProxy := tunnel.NewHTTPProxy(&url.URL{Scheme: "http", Host: httpAddr.String()}, nil)
wsProxy := tunnel.NewWSProxy(&url.URL{Scheme: "ws", Host: wsAddr.String()}, nil)
tcpProxy := tunnel.NewTCPProxy(tcpAddr.String(), nil)
tunnels := map[string]*proto.Tunnel{
proto.HTTP: {
@ -107,9 +116,10 @@ func makeTunnelClient(t *testing.T, serverAddr string, httpLocalAddr, httpAddr,
Host: "localhost",
Auth: "user:password",
},
// TODO WS tunnel?
proto.TCP: {
Protocol: proto.TCP,
Addr: tcpLocalAddr.String(),
Addr: tcpTunAddr.String(),
},
}
@ -120,6 +130,7 @@ func makeTunnelClient(t *testing.T, serverAddr string, httpLocalAddr, httpAddr,
Tunnels: tunnels,
Proxy: tunnel.Proxy(tunnel.ProxyFuncs{
HTTP: httpProxy.Proxy,
WS: wsProxy.Proxy,
TCP: tcpProxy.Proxy,
}),
})
@ -130,7 +141,7 @@ func makeTunnelClient(t *testing.T, serverAddr string, httpLocalAddr, httpAddr,
func TestIntegration(t *testing.T) {
// local services
http, tcp := makeEcho(t)
http, ws, tcp := makeEcho(t)
defer http.Close()
defer tcp.Close()
@ -140,13 +151,11 @@ func TestIntegration(t *testing.T) {
h := httptest.NewServer(s)
defer h.Close()
httpLocalAddr := h.Listener.Addr()
tcpLocalAddr := freeAddr()
tcpTunAddr := freeAddr()
// client
c := makeTunnelClient(t, s.Addr(),
httpLocalAddr, http.Addr(),
tcpLocalAddr, tcp.Addr(),
c := makeTunnelClient(t, s.Addr(), tcpTunAddr,
http.Addr(), ws.Addr(), tcp.Addr(),
)
// FIXME: replace sleep with client state change watch when ready
time.Sleep(500 * time.Millisecond)
@ -174,7 +183,31 @@ func TestIntegration(t *testing.T) {
}()
wg.Add(1)
go func() {
testTCP(t, tcpLocalAddr, p, r)
config, err := websocket.NewConfig(
fmt.Sprintf("ws://localhost:%s/some/path", port(h.Listener.Addr())),
fmt.Sprintf("http://localhost:%s/", port(h.Listener.Addr())),
)
if err != nil {
panic("Invalid config")
}
config.Header.Set("Authorization", "Basic dXNlcjpwYXNzd29yZA==")
ws, err := websocket.DialConfig(config)
if err != nil {
t.Fatal("Dial failed", err)
}
defer ws.Close()
testConn(t, ws, p, r)
wg.Done()
}()
wg.Add(1)
go func() {
conn, err := net.Dial("tcp", tcpTunAddr.String())
if err != nil {
t.Fatal("Dial failed", err)
}
defer conn.Close()
testConn(t, conn, p, r)
wg.Done()
}()
}
@ -214,13 +247,7 @@ func testHTTP(t *testing.T, addr net.Addr, payload []byte, repeat uint) {
}
}
func testTCP(t *testing.T, addr net.Addr, payload []byte, repeat uint) {
conn, err := net.Dial("tcp", addr.String())
if err != nil {
t.Fatal("Dial failed", err)
}
defer conn.Close()
func testConn(t *testing.T, conn net.Conn, payload []byte, repeat uint) {
var buf = make([]byte, 10*1024*1024)
var read, write int
for repeat > 0 {

View file

@ -566,7 +566,7 @@ func (s *Server) proxyConn(identifier id.ID, conn net.Conn, msg *proto.ControlMe
defer pr.Close()
defer pw.Close()
req, err := s.proxyRequest(identifier, msg, pr)
req, err := s.connectRequest(identifier, msg, pr)
if err != nil {
return err
}
@ -616,24 +616,11 @@ func (s *Server) proxyWS(identifier id.ID, w http.ResponseWriter, r *http.Reques
defer pr.Close()
defer pw.Close()
req, err := s.proxyRequest(identifier, msg, pr)
req, err := s.connectRequest(identifier, msg, pr)
if err != nil {
return err
}
go func() {
err := r.Write(pw)
if err != nil {
s.logger.Log(
"level", 0,
"msg", "proxy error",
"identifier", identifier,
"ctrlMsg", msg,
"err", err,
)
}
}()
resp, err := s.httpClient.Do(req)
if err != nil {
return fmt.Errorf("io error: %s", err)
@ -691,58 +678,6 @@ func (s *Server) proxyWS(identifier id.ID, w http.ResponseWriter, r *http.Reques
return nil
}
func (s *Server) proxyConn(identifier id.ID, conn net.Conn, msg *proto.ControlMessage) error {
s.logger.Log(
"level", 2,
"action", "proxy",
"identifier", identifier,
"ctrlMsg", msg,
)
defer conn.Close()
pr, pw := io.Pipe()
defer pr.Close()
defer pw.Close()
req, err := s.connectRequest(identifier, msg, pr)
if err != nil {
return err
}
done := make(chan struct{})
go func() {
transfer(pw, conn, log.NewContext(s.logger).With(
"dir", "user to client",
"dst", identifier,
"src", conn.RemoteAddr(),
))
close(done)
}()
resp, err := s.httpClient.Do(req)
if err != nil {
return fmt.Errorf("io error: %s", err)
}
transfer(conn, resp.Body, log.NewContext(s.logger).With(
"dir", "client to user",
"dst", conn.RemoteAddr(),
"src", identifier,
))
<-done
s.logger.Log(
"level", 2,
"action", "proxy done",
"identifier", identifier,
"ctrlMsg", msg,
)
return nil
}
func (s *Server) proxyHTTP(identifier id.ID, r *http.Request, msg *proto.ControlMessage) (*http.Response, error) {
s.logger.Log(
"level", 2,