Add registered clients integration test

This commit is contained in:
Moises P. Sena 2018-10-15 19:02:30 -03:00
parent 9b0eb8c40b
commit e09981d3c3
4 changed files with 121 additions and 12 deletions

View file

@ -89,12 +89,13 @@ func makeEcho(t testing.TB) (http net.Listener, tcp net.Listener) {
return
}
func makeTunnelServer(t testing.TB) *tunnel.Server {
func makeTunnelServer(t testing.TB, clientsProvider tunnel.RegisteredClientsProvider) *tunnel.Server {
s, err := tunnel.NewServer(&tunnel.ServerConfig{
Addr: ":0",
AutoSubscribe: true,
AutoSubscribe: clientsProvider == nil,
TLSConfig: tlsConfig(),
Logger: log.NewStdLogger(),
RegisteredClientsProvider: clientsProvider,
})
if err != nil {
t.Fatal(err)
@ -118,14 +119,14 @@ func makeTunnelClient(t testing.TB, serverAddr string, httpLocalAddr, httpAddr,
tunnels := map[string]*proto.Tunnel{
proto.HTTP: {
Protocol: proto.HTTP,
Host: "localhost",
Auth: "user:password",
Protocol: proto.HTTP,
Host: "localhost",
Auth: "user:password",
LocalAddr: httpLocalAddr.String(),
},
proto.TCP: {
Protocol: proto.TCP,
LocalAddr: tcpLocalAddr.String(),
Protocol: proto.TCP,
LocalAddr: tcpLocalAddr.String(),
},
}
@ -158,7 +159,7 @@ func TestIntegration(t *testing.T) {
defer tcp.Close()
// server
s := makeTunnelServer(t)
s := makeTunnelServer(t, nil)
defer s.Stop()
h := httptest.NewServer(s)
defer h.Close()
@ -197,7 +198,7 @@ func TestIntegration(t *testing.T) {
}()
wg.Add(1)
go func() {
testTCP(t, tcpLocalAddr, p, r)
testTCP(t, tcpLocalAddr, p, r, false)
wg.Done()
}()
}
@ -205,6 +206,94 @@ func TestIntegration(t *testing.T) {
wg.Wait()
}
func makeTunnelRegisteredClient(t testing.TB, serverAddr string, tcpLocalAddr net.Addr) *tunnel.Client {
tcpProxy := tunnel.NewTCPProxy(tcpLocalAddr.String(), log.NewStdLogger())
tunnels := map[string]*proto.Tunnel{
proto.TCP: {
Protocol: proto.TCP,
LocalAddr: tcpLocalAddr.String(),
},
}
c, err := tunnel.NewClient(&tunnel.ClientConfig{
ServerAddr: serverAddr,
TLSClientConfig: tlsConfig(),
Tunnels: tunnels,
Registered: true,
Proxy: tunnel.Proxy(tunnel.ProxyFuncs{
TCP: tcpProxy.Proxy,
}),
Logger: log.NewStdLogger(),
})
if err != nil {
t.Fatal(err)
}
go func() {
if err := c.Start(); err != nil {
t.Log(err)
}
}()
return c
}
func TestRegisteredClientIntegration(t *testing.T) {
// local services
tcp, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
go echoTCP(tcp)
defer tcp.Close()
tcpRemoteAddr := freeAddr()
numConnections := 3
// server
s := makeTunnelServer(t, &registeredClientsProvider{&tunnel.RegisteredClientConfig{
Connections: numConnections,
Tunnels: map[string]*proto.Tunnel{
proto.TCP: {
Protocol: proto.TCP,
RemoteAddr: tcpRemoteAddr.String(),
},
},
}})
defer s.Stop()
// controller
c := makeTunnelRegisteredClient(t, s.Addr(), tcp.Addr())
// FIXME: replace sleep with controller state change watch when ready
time.Sleep(500 * time.Millisecond)
defer c.Stop()
payload := randPayload(payloadInitialSize, payloadLen)
table := []struct {
S []uint
}{
{[]uint{200, 160, 120, 80, 40, 20}},
{[]uint{40, 80, 120, 160, 200}},
{[]uint{0, 0, 0, 0, 0, 200}},
}
var wg sync.WaitGroup
for _, test := range table {
for i, repeat := range test.S {
p := payload[i]
r := repeat
for i := 0; i <= numConnections; i++ {
wg.Add(1)
go func() {
testTCP(t, tcpRemoteAddr, p, r, true)
wg.Done()
}()
}
}
}
wg.Wait()
}
func testHTTP(t testing.TB, addr net.Addr, payload []byte, repeat uint) {
url := fmt.Sprintf("http://localhost:%s/some/path", port(addr))
@ -234,13 +323,17 @@ func testHTTP(t testing.TB, addr net.Addr, payload []byte, repeat uint) {
}
}
func testTCP(t testing.TB, addr net.Addr, payload []byte, repeat uint) {
func testTCP(t testing.TB, addr net.Addr, payload []byte, repeat uint, sleep bool) {
conn, err := net.Dial("tcp", addr.String())
if err != nil {
t.Fatal("Dial failed", err)
}
defer conn.Close()
if sleep {
time.Sleep(50 * time.Millisecond)
}
var buf = make([]byte, 10*1024*1024)
var read, write int
for repeat > 0 {

View file

@ -150,8 +150,9 @@ func (p *connPool) GetClientConn(req *http.Request, addr string) (*http2.ClientC
if cp, ok := p.conns[addr]; ok {
conn := cp.next()
setter := req.Context().Value(reqPool)
setter.(*clientConnectionSetter).conn = conn
if setter := req.Context().Value(reqPool); setter != nil {
setter.(*clientConnectionSetter).conn = conn
}
cp.controller.logger.Log("level", 3, "client_conn", fmt.Sprintf("#%d", conn.id), "addr", addr)
return conn.ClientConn, nil
}

View file

@ -0,0 +1,14 @@
package tunnel_test
import (
"github.com/mmatczuk/go-http-tunnel"
"github.com/mmatczuk/go-http-tunnel/id"
)
type registeredClientsProvider struct {
cfg *tunnel.RegisteredClientConfig
}
func (p registeredClientsProvider) Get(clientID id.ID) (client *tunnel.RegisteredClientConfig, err error) {
return p.cfg, nil
}

View file

@ -336,6 +336,7 @@ func (s *Server) handleClient(conn net.Conn, main bool) {
}
if cfg == nil {
cfg = &RegisteredClientConfig{}
if err = json.NewDecoder(&io.LimitedReader{R: resp.Body, N: 126976}).Decode(cfg); err != nil {
logger.Log(
"level", 2,