client: httpproxy

This commit is contained in:
mmatczuk 2017-01-18 08:58:53 +01:00
parent 248b29c947
commit e5736fb1f3
10 changed files with 350 additions and 150 deletions

View file

@ -1,7 +1,6 @@
1. `ClientState` changes channel, on both client and server
1. WebSockets proxing
1. UDP and IP proxing
1. Default proxy functions
1. Dynamic `AllowedClient` management
1. Client driven configuration, on connect client sends it's configuration, server just needs to know the certificate id
1. URL prefix based routing, like urlprefix tag in fabio https://github.com/eBay/fabio/wiki/Quickstart
@ -9,4 +8,5 @@
1. Stream compression
1. `ControlMessage` `String()` function for better logging
1. Use of `sync.Pool` to avoid allocations of `ControlMessage`
1. Client and server commands (hcl configuration?)
1. Client and server commands (hcl configuration?)
1. Server as RoundTripper

View file

@ -14,8 +14,8 @@ import (
)
var (
// DefaultDialTimeout specifies who long client should wait for server or
// local service connection.
// DefaultDialTimeout specifies how long client should wait for tunnel
// server or local service connection.
DefaultDialTimeout = 10 * time.Second
)
@ -142,7 +142,7 @@ func (c *Client) serveHTTP(w http.ResponseWriter, r *http.Request) {
return
}
c.log.Debug("Start proxying %v", msg)
c.config.Proxy(flushWriter{w}, r.Body, msg)
c.config.Proxy(w, r.Body, msg)
c.log.Debug("Done proxying %v", msg)
}

151
httpproxy.go Normal file
View file

@ -0,0 +1,151 @@
package tunnel
import (
"bufio"
"context"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"github.com/koding/logging"
"github.com/mmatczuk/tunnel/proto"
)
// HTTPProxy forwards HTTP traffic.
type HTTPProxy struct {
httputil.ReverseProxy
// Log is the proxy logger.
Log logging.Logger
// localURL specifies default base URL of local service.
localURL *url.URL
// localURLMap specifies mapping from ControlMessage ForwardedBy to
// local service URL, keys may contain host and port, only host or
// only port. The order of precedence is the following
// * host and port
// * port
// * host
localURLMap map[string]*url.URL
}
// NewHTTPProxy creates a new direct HTTPProxy, everything will be proxied to
// localURL.
func NewHTTPProxy(localURL *url.URL) *HTTPProxy {
if localURL == nil {
panic("Missing URL")
}
p := &HTTPProxy{
Log: logging.NewLogger("httpproxy"),
localURL: localURL,
}
p.ReverseProxy.Director = p.Director
return p
}
// NewMultiHTTPProxy creates a new dispatching HTTPProxy, requests may go to
// different backends based on localURLMap, see HTTPProxy localURLMap docs for
// more details.
func NewMultiHTTPProxy(localURLMap map[string]*url.URL) *HTTPProxy {
if localURLMap == nil || len(localURLMap) == 0 {
panic("Missing URL map")
}
p := &HTTPProxy{
Log: logging.NewLogger("httpproxy"),
localURLMap: localURLMap,
}
p.ReverseProxy.Director = p.Director
return p
}
// Proxy is a ProxyFunc.
func (p *HTTPProxy) Proxy(w io.Writer, r io.ReadCloser, msg *proto.ControlMessage) {
if msg.Protocol != proto.HTTP {
panic(fmt.Sprintf("Expected proxy protocol, got %s", msg.Protocol))
}
rw, ok := w.(http.ResponseWriter)
if !ok {
panic(fmt.Sprintf("Expected http.ResponseWriter got %t", w))
}
req, err := http.ReadRequest(bufio.NewReader(r))
if err != nil {
p.Log.Warning("Failed to read request: %s", err)
return
}
req.URL.Host = msg.ForwardedBy
req.URL.Path = msg.URLPath
p.ServeHTTP(rw, req)
}
// Director is ReverseProxy Director it changes request URL so that the request
// is correctly routed based on localURL and localURLMap. If no URL can be found
// the request is canceled.
func (p *HTTPProxy) Director(req *http.Request) {
target := p.localURLFor(req.URL)
if target == nil {
_, cancel := context.WithCancel(req.Context())
cancel()
return
}
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
targetQuery := target.RawQuery
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
// explicitly disable User-Agent so it's not set to default value
req.Header.Set("User-Agent", "")
}
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
func (p *HTTPProxy) localURLFor(u *url.URL) *url.URL {
if p.localURLMap == nil {
return p.localURL
}
// try host and port
hostPort := u.Host
if addr := p.localURLMap[hostPort]; addr != nil {
return addr
}
// try port
host, port, _ := net.SplitHostPort(hostPort)
if addr := p.localURLMap[port]; addr != nil {
return addr
}
// try host
if addr := p.localURLMap[host]; addr != nil {
return addr
}
return p.localURL
}

View file

@ -5,11 +5,11 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
@ -18,19 +18,6 @@ import (
"github.com/mmatczuk/tunnel/tunneltest"
)
// echo accepts connections and echos results.
func echo(l net.Listener) {
for {
conn, err := l.Accept()
if err != nil {
return
}
go func() {
io.Copy(conn, conn)
}()
}
}
const (
payloadInitialSize = 32
payloadLen = 10
@ -38,10 +25,10 @@ const (
// testContext stores state shared between sub tests.
type testContext struct {
// handler is entry point for HTTP tests.
handler http.Handler
// listener is entry point for TCP tests.
listener net.Listener
// httpAddr is address for HTTP tests.
httpAddr net.Addr
// listener is address for TCP tests.
tcpAddr net.Addr
// payload is pre generated random data.
payload [][]byte
}
@ -50,22 +37,21 @@ var ctx testContext
func TestMain(m *testing.M) {
// prepare server TCP listener
l, err := net.Listen("tcp", ":0")
serverTCPListener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
defer l.Close()
cert, id := selfSignedCert()
defer serverTCPListener.Close()
// prepare tunnel server
cert, id := selfSignedCert()
s, err := tunnel.NewServer(&tunnel.ServerConfig{
TLSConfig: tunneltest.TLSConfig(cert),
AllowedClients: []*tunnel.AllowedClient{
{
ID: id,
Host: "foobar.com",
Listeners: []net.Listener{l},
Host: "localhost",
Listeners: []net.Listener{serverTCPListener},
},
},
})
@ -75,30 +61,58 @@ func TestMain(m *testing.M) {
s.Start()
defer s.Close()
// prepare local TCP echo service
e, err := net.Listen("tcp", ":0")
// run server HTTP interface
serverHTTPListener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
go echo(e)
defer serverHTTPListener.Close()
go http.Serve(serverHTTPListener, s)
// prepare local TCP echo service
echoTCPListener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
defer echoTCPListener.Close()
go tunneltest.EchoTCP(echoTCPListener)
// prepare local HTTP echo service
echoHTTPListener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
defer echoHTTPListener.Close()
go tunneltest.EchoHTTP(echoHTTPListener)
// prepare proxy
httpproxy := tunnel.NewMultiHTTPProxy(map[string]*url.URL{
"localhost:" + port(serverHTTPListener.Addr()): {
Scheme: "http",
Host: echoHTTPListener.Addr().String(),
},
})
tcpproxy := tunnel.NewMultiTCPProxy(map[string]string{
port(serverTCPListener.Addr()): echoTCPListener.Addr().String(),
})
proxy := tunnel.Proxy(tunnel.ProxyFuncs{
HTTP: httpproxy.Proxy,
TCP: tcpproxy.Proxy,
})
// prepare tunnel client
proxy := &tunnel.TCPProxy{
LocalAddrMap: map[string]string{port(l.Addr()): e.Addr().String()},
}
c := tunnel.NewClient(&tunnel.ClientConfig{
ServerAddr: s.Addr(),
TLSClientConfig: tunneltest.TLSConfig(cert),
Proxy: proxy.Proxy,
Proxy: proxy,
})
if err := c.Start(); err != nil {
panic(err)
}
defer c.Stop()
ctx.handler = s
ctx.listener = l
ctx.httpAddr = serverHTTPListener.Addr()
ctx.tcpAddr = serverTCPListener.Addr()
ctx.payload = randPayload(payloadInitialSize, payloadLen)
m.Run()
@ -136,12 +150,12 @@ func TestProxying(t *testing.T) {
name string
seq []uint
}{
//{"http", "small", []uint{100, 80, 60, 40, 20, 10}},
//{"http", "mid", []uint{20, 40, 60, 80, 100}},
//{"http", "big", []uint{0, 0, 0, 0, 0, 0, 0, 0, 0, 100}},
{"tcp", "small", []uint{100, 80, 60, 40, 20, 10}},
{"tcp", "mid", []uint{20, 40, 60, 80, 100}},
{"tcp", "big", []uint{0, 0, 0, 0, 0, 0, 0, 0, 0, 100}},
{"http", "small", []uint{200, 160, 120, 80, 40, 20}},
{"http", "mid", []uint{40, 80, 120, 160, 200}},
{"http", "big", []uint{0, 0, 0, 0, 0, 0, 0, 0, 0, 200}},
{"tcp", "small", []uint{200, 160, 120, 80, 40, 20}},
{"tcp", "mid", []uint{40, 80, 120, 160, 200}},
{"tcp", "big", []uint{0, 0, 0, 0, 0, 0, 0, 0, 0, 200}},
}
for _, tt := range data {
@ -162,24 +176,30 @@ func TestProxying(t *testing.T) {
}
func testHTTP(t *testing.T, seq []uint) {
var buf = bytes.NewBuffer(bigBuffer())
for idx, s := range seq {
for s > 0 {
r, err := http.NewRequest(http.MethodPost, "http://foobar.com/some/path", bytes.NewReader(ctx.payload[idx]))
url := fmt.Sprintf("http://localhost:%s/some/path", port(ctx.httpAddr))
r, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(ctx.payload[idx]))
if err != nil {
panic("Failed to create request")
}
buf.Reset()
w := &httptest.ResponseRecorder{
HeaderMap: make(http.Header),
Body: buf,
Code: 200,
resp, err := http.DefaultClient.Do(r)
if err != nil {
panic(fmt.Sprintf("HTTP error %s", err))
}
ctx.handler.ServeHTTP(w, r)
if w.Code != http.StatusOK {
t.Error("Unexpected status code", w)
if resp.StatusCode != http.StatusOK {
t.Error("Unexpected status code", resp)
}
n, m := w.Body.Len(), len(ctx.payload[idx])
if resp.Body == nil {
t.Error("No body")
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Error("Read error")
}
n, m := len(b), len(ctx.payload[idx])
if n != m {
t.Log("Read mismatch", n, m)
}
@ -189,7 +209,7 @@ func testHTTP(t *testing.T, seq []uint) {
}
func testTCP(t *testing.T, seq []uint) {
conn, err := net.Dial("tcp", ctx.listener.Addr().String())
conn, err := net.Dial("tcp", ctx.tcpAddr.String())
if err != nil {
t.Fatal("Dial failed", err)
}

View file

@ -20,13 +20,15 @@ const (
ForwardedHeader = "Forwarded"
)
// Additional protocols, base protocols are net.Dial networks.
//
// Known networks are "tcp", "tcp4" (IPv4-only), "tcp6" (IPv6-only), "udp",
// "udp4" (IPv4-only), "udp6" (IPv6-only), "ip", "ip4" (IPv4-only), "ip6" (IPv6-only),
// "unix", "unixgram" and "unixpacket".
const (
HTTPProtocol = "http"
HTTP = "http"
TCP = "tcp"
TCP4 = "tcp4"
TCP6 = "tcp6"
UNIX = "unix"
)
// ControlMessage is sent from server to client to establish tunneled connection.

View file

@ -3,9 +3,38 @@ package tunnel
import (
"io"
"github.com/koding/logging"
"github.com/mmatczuk/tunnel/proto"
)
// ProxyFunc is responsible for forwarding a remote connection to local server
// and writing the response.
type ProxyFunc func(w io.Writer, r io.ReadCloser, msg *proto.ControlMessage)
// ProxyFuncs is a collection of ProxyFunc.
type ProxyFuncs struct {
// HTTP is custom implementation of HTTP proxing.
HTTP ProxyFunc
// TCP is custom implementation of TCP proxing.
TCP ProxyFunc
}
// Proxy returns a ProxyFunc that uses custom function if provided.
func Proxy(p ProxyFuncs) ProxyFunc {
return func(w io.Writer, r io.ReadCloser, msg *proto.ControlMessage) {
var f ProxyFunc
switch msg.Protocol {
case proto.HTTP:
f = p.HTTP
case proto.TCP, proto.TCP4, proto.TCP6, proto.UNIX:
f = p.TCP
}
if f == nil {
logging.Error("Could not determine proxy function for %v", msg)
return
}
f(w, r, msg)
}
}

View file

@ -1,7 +1,6 @@
package tunnel
import (
"bufio"
"crypto/tls"
"fmt"
"io"
@ -135,7 +134,7 @@ func (s *Server) handleClient(conn net.Conn) {
goto reject
}
req, err = http.NewRequest(http.MethodConnect, url(client.Host), nil)
req, err = http.NewRequest(http.MethodConnect, clientURL(client.Host), nil)
if err != nil {
s.log.Error("Invalid host %q for client %q", client.Host, client.ID)
goto reject
@ -227,7 +226,7 @@ func (s *Server) listen(l net.Listener, client *AllowedClient) {
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := &proto.ControlMessage{
Action: proto.RequestClientSession,
Protocol: proto.HTTPProtocol,
Protocol: proto.HTTP,
ForwardedFor: r.RemoteAddr,
ForwardedBy: r.Host,
URLPath: r.URL.Path,
@ -255,7 +254,7 @@ func (s *Server) proxyHTTP(host string, w http.ResponseWriter, r *http.Request,
defer pr.Close()
defer pw.Close()
req, err := http.NewRequest(http.MethodPut, url(host), pr)
req, err := http.NewRequest(http.MethodPut, clientURL(host), pr)
if err != nil {
return fmt.Errorf("request creation error: %s", err)
}
@ -277,14 +276,10 @@ func (s *Server) proxyHTTP(host string, w http.ResponseWriter, r *http.Request,
return fmt.Errorf("proxy request error: %s", err)
}
inner, err := http.ReadResponse(bufio.NewReader(resp.Body), r)
if err != nil {
return fmt.Errorf("reading response error: %s", err)
}
copyHeader(w.Header(), inner.Header)
w.WriteHeader(inner.StatusCode)
if inner.Body != nil {
transfer("remote to local", w, inner.Body)
copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
if resp.Body != nil {
transfer("remote to local", w, resp.Body)
}
<-done
@ -300,7 +295,7 @@ func (s *Server) proxyConn(host string, c net.Conn, msg *proto.ControlMessage) e
defer pr.Close()
defer pw.Close()
req, err := http.NewRequest(http.MethodPut, url(host), pr)
req, err := http.NewRequest(http.MethodPut, clientURL(host), pr)
if err != nil {
return fmt.Errorf("request creation error: %s", err)
}

View file

@ -9,73 +9,88 @@ import (
"github.com/mmatczuk/tunnel/proto"
)
var (
tpcLog = logging.NewLogger("tcp")
)
// TCPProxy forwards TCP streams.
type TCPProxy struct {
// LocalAddr defines TCP address of the local server.
LocalAddr string
// LocalAddrMap specifies a mapping from ControlMessage.ForwardedBy port
// to local server. If port is not found then if LocalAddr is not empty
// it will be used as a default otherwise connection will be closed.
LocalAddrMap map[string]string
// Log specifies the logger. If nil a default logging.Logger is used.
// Log is the proxy logger.
Log logging.Logger
// localAddr specifies default TCP address of the local server.
localAddr string
// localAddrMap specifies mapping from ControlMessage ForwardedBy to
// local server address, keys may contain host and port, only host or
// only port. The order of precedence is the following
// * host and port
// * port
// * host
localAddrMap map[string]string
}
// NewTCPProxy creates new direct TCPProxy, everything will be proxied to
// localAddr.
func NewTCPProxy(localAddr string) *TCPProxy {
return &TCPProxy{
Log: logging.NewLogger("tcpproxy"),
localAddr: localAddr,
}
}
// NewMultiTCPProxy creates a new dispatching TCPProxy, connections may go to
// different backends based on localAddrMap, see TCPProxy localAddrMap docs for
// more details.
func NewMultiTCPProxy(localAddrMap map[string]string) *TCPProxy {
return &TCPProxy{
Log: logging.NewLogger("tcpproxy"),
localAddrMap: localAddrMap,
}
}
// Proxy is a ProxyFunc.
func (p *TCPProxy) Proxy(w io.Writer, r io.ReadCloser, msg *proto.ControlMessage) {
w = flushWriter{w}
if msg.Protocol != "tcp" {
panic(fmt.Sprintf("Expected proxy protocol, got %s", msg.Protocol))
}
var log = p.log()
_, port, err := net.SplitHostPort(msg.ForwardedBy)
if err != nil {
log.Error("Failed to parse input address: %s", err)
target := p.localAddrFor(msg.ForwardedBy)
if target == "" {
p.Log.Warning("Failed to get local address")
return
}
localAddr := p.localAddr(port)
if localAddr == "" {
log.Warning("Failed to get local address for port %q", port)
return
}
log.Debug("Dialing local server: %q", localAddr)
local, err := net.DialTimeout("tcp", localAddr, DefaultDialTimeout)
local, err := net.DialTimeout("tcp", target, DefaultDialTimeout)
if err != nil {
log.Error("Dialing local server %q failed: %s", localAddr, err)
p.Log.Error("Dialing local server %q failed: %s", target, err)
return
}
done := make(chan struct{})
go func() {
transfer("local to remote", local, r)
transfer("local to remote", w, local)
close(done)
}()
transfer("remote to local", w, local)
transfer("remote to local", local, r)
}
func (p *TCPProxy) localAddr(port string) string {
if p.LocalAddrMap == nil {
return p.LocalAddr
func (p *TCPProxy) localAddrFor(hostPort string) string {
if p.localAddrMap == nil {
return p.localAddr
}
addr, ok := p.LocalAddrMap[port]
if !ok {
return p.LocalAddr
// try host and port
if addr := p.localAddrMap[hostPort]; addr != "" {
return addr
}
return addr
}
func (p *TCPProxy) log() logging.Logger {
if p.Log != nil {
return p.Log
}
return tpcLog
// try port
host, port, _ := net.SplitHostPort(hostPort)
if addr := p.localAddrMap[port]; addr != "" {
return addr
}
// try host
if addr := p.localAddrMap[host]; addr != "" {
return addr
}
return p.localAddr
}

View file

@ -3,13 +3,13 @@
package tunneltest
import (
"bufio"
"bytes"
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"mime"
"net"
"net/http"
"os"
"path"
@ -20,44 +20,32 @@ import (
"github.com/mmatczuk/tunnel/proto"
)
// EchoProxyFunc pipes reader with writer.
func EchoProxyFunc(w io.Writer, r io.ReadCloser, msg *proto.ControlMessage) {
switch msg.Protocol {
case proto.HTTPProtocol:
EchoHTTPProxyFunc(w, r, msg)
default:
io.Copy(w, r)
}
// EchoHTTP starts serving HTTP requests on listener l, it accepts connections,
// reads request body and writes is back in response.
func EchoHTTP(l net.Listener) {
http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if r.Body != nil {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
}
w.Write(body)
}
}))
}
// EchoHTTPProxyFunc is a special case of EchoProxyFunc that handles HTTP
// request response model.
func EchoHTTPProxyFunc(w io.Writer, r io.ReadCloser, msg *proto.ControlMessage) {
req, err := http.ReadRequest(bufio.NewReader(r))
if err != nil {
panic(err)
}
resp := &http.Response{
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.0",
ProtoMajor: 1,
ProtoMinor: 0,
Request: req,
Header: make(http.Header),
}
if req.Body != nil {
body, err := ioutil.ReadAll(req.Body)
// EchoTCP accepts connections and copies back received bytes.
func EchoTCP(l net.Listener) {
for {
conn, err := l.Accept()
if err != nil {
panic(err)
return
}
resp.ContentLength = int64(len(body))
resp.Body = ioutil.NopCloser(bytes.NewReader(body))
go func() {
io.Copy(conn, conn)
}()
}
resp.Write(w)
}
// InMemoryFileServer scans directory dir, loads all files to memory and returns

View file

@ -8,7 +8,7 @@ import (
"github.com/koding/logging"
)
func url(host string) string {
func clientURL(host string) string {
return fmt.Sprint("https://", host)
}