Merge pull request #8 from mmatczuk/mmt/logger

logging refactoring
This commit is contained in:
Michał Matczuk 2017-01-27 22:58:57 +01:00 committed by GitHub
commit fbbe155651
19 changed files with 620 additions and 144 deletions

View file

@ -24,4 +24,4 @@ script:
- gofmt -s -l . | ifne false
- go vet ./...
- $HOME/gopath/bin/golint ./...
- go test -cover -race . ./proto
- go test -race ./...

18
TODO.md
View file

@ -9,4 +9,20 @@
1. URL prefix based routing, like urlprefix tag in fabio https://github.com/eBay/fabio/wiki/Quickstart
1. Use of `sync.Pool` to avoid allocations of `ControlMessage`
1. Stream compression
1. UDP and IP proxing
1. UDP and IP proxing
1. Add prometheus.io integration, replace transfer logs with gauges
Not to forget README features
1. HTTP/2
1. server http.RoundTriper
1. extensible Proxy architecture
1. configurable HTTP proxy httputil.ReverseProxy
1. structured logs, go kit logger compatible
Log levels:
* 0 - Critical, error something went really wrong
* 1 - Info, something important happened
* 2 - Debug
* 3 - Trace, reserved for data transfer logs

139
client.go
View file

@ -8,9 +8,10 @@ import (
"sync"
"time"
"github.com/koding/logging"
"github.com/mmatczuk/tunnel/proto"
"golang.org/x/net/http2"
"github.com/mmatczuk/tunnel/log"
"github.com/mmatczuk/tunnel/proto"
)
var (
@ -34,8 +35,8 @@ type ClientConfig struct {
// Proxy is ProxyFunc responsible for transferring data between server
// and local services.
Proxy ProxyFunc
// Log specifies the logger. If nil a default logging.Logger is used.
Log logging.Logger
// Logger is optional logger. If nil no logs will be printed.
Logger log.Logger
}
// Backoff defines behavior of staggering reconnection retries.
@ -58,21 +59,31 @@ type Client struct {
conn net.Conn
connMu sync.Mutex
httpServer *http2.Server
log logging.Logger
logger log.Logger
}
// NewClient creates a new unconnected Client based on configuration. Caller
// must invoke Start() on returned instance in order to connect server.
func NewClient(config *ClientConfig) *Client {
log := logging.NewLogger("client")
if config.Log != nil {
log = config.Log
if config.ServerAddr == "" {
panic("Missing ServerAddr")
}
if config.TLSClientConfig == nil {
panic("Missing TLSClientConfig")
}
if config.Proxy == nil {
panic("Missing Proxy")
}
logger := config.Logger
if logger == nil {
logger = log.NewNopLogger()
}
c := &Client{
config: config,
httpServer: &http2.Server{},
log: log,
logger: logger,
}
return c
@ -85,8 +96,16 @@ func (c *Client) Start() error {
c.connMu.Lock()
defer c.connMu.Unlock()
c.log.Info("Connecting to %q", c.config.ServerAddr)
conn, err := c.dial("tcp", c.config.ServerAddr, c.config.TLSClientConfig)
c.logger.Log(
"level", 1,
"action", "start",
)
if c.conn != nil {
return fmt.Errorf("already connected")
}
conn, err := c.dial()
if err != nil {
return fmt.Errorf("failed to connect to server: %s", err)
}
@ -99,12 +118,41 @@ func (c *Client) Start() error {
return nil
}
func (c *Client) dial(network, addr string, config *tls.Config) (net.Conn, error) {
doDial := func() (net.Conn, error) {
func (c *Client) dial() (net.Conn, error) {
var (
network = "tcp"
addr = c.config.ServerAddr
tlsConfig = c.config.TLSClientConfig
)
doDial := func() (conn net.Conn, err error) {
c.logger.Log(
"level", 1,
"action", "dial",
"network", network,
"addr", addr,
)
if c.config.DialTLS != nil {
return c.config.DialTLS(network, addr, config)
conn, err = c.config.DialTLS(network, addr, tlsConfig)
} else {
conn, err = tls.DialWithDialer(
&net.Dialer{Timeout: DefaultDialTimeout},
network, addr, tlsConfig,
)
}
return tls.DialWithDialer(&net.Dialer{Timeout: DefaultDialTimeout}, network, addr, config)
if err != nil {
c.logger.Log(
"level", 0,
"action", "dial failed",
"network", network,
"addr", addr,
"err", err,
)
}
return
}
b := c.config.Backoff
@ -114,55 +162,92 @@ func (c *Client) dial(network, addr string, config *tls.Config) (net.Conn, error
for {
conn, err := doDial()
// success
if err == nil {
b.Reset()
return conn, err
}
d := b.NextBackOff()
// failure
d := b.NextBackOff()
if d < 0 {
return conn, fmt.Errorf("backoff limit exeded: %s", err)
}
// backoff
c.logger.Log(
"level", 1,
"action", "backoff",
"network", network,
"addr", addr,
"sleep", d,
)
time.Sleep(d)
}
}
func (c *Client) serveHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodConnect {
c.log.Info("Connected to server: %s", r.RemoteAddr)
http.Error(w, "Nice to see you", http.StatusOK)
c.handleHandshake(w, r)
return
}
msg, err := proto.ParseControlMessage(r.Header)
if err != nil {
c.log.Warning("Parsing control message failed: %s", err)
c.logger.Log(
"level", 1,
"err", err,
)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
c.log.Debug("Start %s", msg)
c.logger.Log(
"level", 2,
"action", "handle",
"ctrlMsg", msg,
)
switch msg.Action {
case proto.Proxy:
c.config.Proxy(w, r.Body, msg)
default:
c.log.Warning("Unknown action: %s", msg)
c.logger.Log(
"level", 0,
"msg", "unknown action",
"ctrlMsg", msg,
)
http.Error(w, err.Error(), http.StatusBadRequest)
}
c.log.Debug("Done %s", msg)
c.logger.Log(
"level", 2,
"action", "done",
"ctrlMsg", msg,
)
}
func (c *Client) handleHandshake(w http.ResponseWriter, r *http.Request) {
c.logger.Log(
"level", 1,
"action", "handshake",
"addr", r.RemoteAddr,
)
http.Error(w, "Nice to see you", http.StatusOK)
}
// Stop closes the connection between client and server. After stopping client
// can be started again.
func (c *Client) Stop() error {
func (c *Client) Stop() {
c.connMu.Lock()
defer c.connMu.Unlock()
if c.conn == nil {
return nil
c.logger.Log(
"level", 1,
"action", "stop",
)
if c.conn != nil {
c.conn.Close()
}
c.httpServer = nil
return c.conn.Close()
c.conn = nil
}

View file

@ -16,12 +16,15 @@ func TestClient_Dial(t *testing.T) {
s := httptest.NewTLSServer(nil)
defer s.Close()
c := NewClient(&ClientConfig{})
addr := s.Listener.Addr().String()
conn, err := c.dial("tcp", addr, &tls.Config{
InsecureSkipVerify: true,
c := NewClient(&ClientConfig{
ServerAddr: s.Listener.Addr().String(),
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
Proxy: Proxy(ProxyFuncs{}),
})
conn, err := c.dial()
if err != nil {
t.Fatal("Dial error", err)
}
@ -46,12 +49,15 @@ func TestClient_DialBackoff(t *testing.T) {
}
c := NewClient(&ClientConfig{
DialTLS: d,
Backoff: b,
ServerAddr: "8.8.8.8",
TLSClientConfig: &tls.Config{},
DialTLS: d,
Backoff: b,
Proxy: Proxy(ProxyFuncs{}),
})
start := time.Now()
_, err := c.dial("tcp", "8.8.8.8", nil)
_, err := c.dial()
end := time.Now()
if end.Sub(start) < 100*time.Millisecond {

View file

@ -11,15 +11,13 @@ import (
"net/url"
"strings"
"github.com/koding/logging"
"github.com/mmatczuk/tunnel/log"
"github.com/mmatczuk/tunnel/proto"
)
// HTTPProxy forwards HTTP traffic.
type HTTPProxy struct {
httputil.ReverseProxy
// Log is the proxy logger.
Log logging.Logger
// localURL specifies default base URL of local service.
localURL *url.URL
// localURLMap specifies mapping from ControlMessage ForwardedBy to
@ -29,18 +27,24 @@ type HTTPProxy struct {
// * port
// * host
localURLMap map[string]*url.URL
// logger is the proxy logger.
logger log.Logger
}
// NewHTTPProxy creates a new direct HTTPProxy, everything will be proxied to
// localURL.
func NewHTTPProxy(localURL *url.URL) *HTTPProxy {
func NewHTTPProxy(localURL *url.URL, logger log.Logger) *HTTPProxy {
if localURL == nil {
panic("Missing URL")
panic("Empty localURL")
}
if logger == nil {
logger = log.NewNopLogger()
}
p := &HTTPProxy{
Log: logging.NewLogger("httpproxy"),
localURL: localURL,
logger: logger,
}
p.ReverseProxy.Director = p.Director
@ -50,14 +54,18 @@ func NewHTTPProxy(localURL *url.URL) *HTTPProxy {
// NewMultiHTTPProxy creates a new dispatching HTTPProxy, requests may go to
// different backends based on localURLMap, see HTTPProxy localURLMap docs for
// more details.
func NewMultiHTTPProxy(localURLMap map[string]*url.URL) *HTTPProxy {
func NewMultiHTTPProxy(localURLMap map[string]*url.URL, logger log.Logger) *HTTPProxy {
if localURLMap == nil || len(localURLMap) == 0 {
panic("Missing URL map")
panic("Empty localURLMap")
}
if logger == nil {
logger = log.NewNopLogger()
}
p := &HTTPProxy{
Log: logging.NewLogger("httpproxy"),
localURLMap: localURLMap,
logger: logger,
}
p.ReverseProxy.Director = p.Director
@ -77,7 +85,11 @@ func (p *HTTPProxy) Proxy(w io.Writer, r io.ReadCloser, msg *proto.ControlMessag
req, err := http.ReadRequest(bufio.NewReader(r))
if err != nil {
p.Log.Warning("Failed to read request: %s", err)
p.logger.Log(
"level", 1,
"msg", "failed to read request",
"err", err,
)
return
}
req.URL.Host = msg.ForwardedBy
@ -91,8 +103,15 @@ func (p *HTTPProxy) Proxy(w io.Writer, r io.ReadCloser, msg *proto.ControlMessag
func (p *HTTPProxy) Director(req *http.Request) {
target := p.localURLFor(req.URL)
if target == nil {
p.logger.Log(
"level", 1,
"msg", "no target",
"url", req.URL,
)
_, cancel := context.WithCancel(req.Context())
cancel()
return
}

View file

@ -15,6 +15,7 @@ import (
"github.com/andrew-d/id"
"github.com/mmatczuk/tunnel"
"github.com/mmatczuk/tunnel/log"
"github.com/mmatczuk/tunnel/tunneltest"
)
@ -36,6 +37,8 @@ type testContext struct {
var ctx testContext
func TestMain(m *testing.M) {
logger := log.NewFilterLogger(log.NewStdLogger(), 1)
// prepare server TCP listener
serverTCPListener, err := net.Listen("tcp", ":0")
if err != nil {
@ -46,6 +49,7 @@ func TestMain(m *testing.M) {
// prepare tunnel server
cert, id := selfSignedCert()
s, err := tunnel.NewServer(&tunnel.ServerConfig{
Addr: ":0",
TLSConfig: tunneltest.TLSConfig(cert),
AllowedClients: []*tunnel.AllowedClient{
{
@ -54,12 +58,13 @@ func TestMain(m *testing.M) {
Listeners: []net.Listener{serverTCPListener},
},
},
Logger: log.NewContext(logger).WithPrefix("server", ":"),
})
if err != nil {
panic(err)
}
s.Start()
defer s.Close()
defer s.Stop()
// run server HTTP interface
serverHTTPListener, err := net.Listen("tcp", ":0")
@ -91,10 +96,12 @@ func TestMain(m *testing.M) {
Scheme: "http",
Host: echoHTTPListener.Addr().String(),
},
})
}, log.NewNopLogger())
tcpproxy := tunnel.NewMultiTCPProxy(map[string]string{
port(serverTCPListener.Addr()): echoTCPListener.Addr().String(),
})
}, log.NewNopLogger())
proxy := tunnel.Proxy(tunnel.ProxyFuncs{
HTTP: httpproxy.Proxy,
TCP: tcpproxy.Proxy,
@ -105,6 +112,7 @@ func TestMain(m *testing.M) {
ServerAddr: s.Addr(),
TLSClientConfig: tunneltest.TLSConfig(cert),
Proxy: proxy,
Logger: log.NewContext(logger).WithPrefix("client", ":"),
})
if err := c.Start(); err != nil {
panic(err)

44
log/filterlogger.go Normal file
View file

@ -0,0 +1,44 @@
package log
type filterLogger struct {
level int
logger Logger
}
// NewFilterLogger returns a Logger that accepts only log messages with
// "level" value <= level. Currently there are four levels 0 - error, 1 - info,
// 2 - debug, 3 - trace.
func NewFilterLogger(logger Logger, level int) Logger {
return filterLogger{
level: level,
logger: logger,
}
}
func (p filterLogger) Log(keyvals ...interface{}) error {
for i := 0; i < len(keyvals); i += 2 {
k := keyvals[i]
s, ok := k.(string)
if !ok {
continue
}
if s != "level" {
continue
}
if i+1 >= len(keyvals) {
break
}
v := keyvals[i+1]
level, ok := v.(int)
if !ok {
break
}
if level > p.level {
return nil
}
}
return p.logger.Log(keyvals...)
}

25
log/filterlogger_test.go Normal file
View file

@ -0,0 +1,25 @@
package log
import (
"testing"
"github.com/golang/mock/gomock"
"github.com/mmatczuk/tunnel/mock"
)
func TestFilterLogger_Log(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := mock.NewMockLogger(ctrl)
f := NewFilterLogger(b, 2)
b.EXPECT().Log("level", 0)
f.Log("level", 0)
b.EXPECT().Log("level", 1)
f.Log("level", 1)
b.EXPECT().Log("level", 2)
f.Log("level", 2)
f.Log("level", 3)
f.Log("level", 4)
}

54
log/log.go Normal file
View file

@ -0,0 +1,54 @@
package log
// Logger is the fundamental interface for all log operations. Log creates a
// log event from keyvals, a variadic sequence of alternating keys and values.
// Implementations must be safe for concurrent use by multiple goroutines. In
// particular, any implementation of Logger that appends to keyvals or
// modifies any of its elements must make a copy first.
type Logger interface {
Log(keyvals ...interface{}) error
}
// Context is simplified version of
// [go-kit log Context](https://godoc.org/github.com/go-kit/kit/log#Context).
type Context struct {
prefix []interface{}
suffix []interface{}
logger Logger
}
// NewContext returns a logger that adds prefix before keyvals.
func NewContext(logger Logger) *Context {
return &Context{
prefix: make([]interface{}, 0),
suffix: make([]interface{}, 0),
logger: logger,
}
}
// With returns a new Context with keyvals appended to those of the receiver.
func (c *Context) With(keyvals ...interface{}) *Context {
return &Context{
prefix: c.prefix,
suffix: append(c.suffix, keyvals...),
logger: c.logger,
}
}
// WithPrefix returns a new Context with keyvals prepended to those of the
// receiver.
func (c *Context) WithPrefix(keyvals ...interface{}) *Context {
return &Context{
prefix: append(c.prefix, keyvals...),
suffix: c.suffix,
logger: c.logger,
}
}
// Log adds prefix and suffix to keyvals and calls internal logger.
func (c *Context) Log(keyvals ...interface{}) error {
var s []interface{}
s = append(c.prefix, keyvals...)
s = append(s, c.suffix...)
return c.logger.Log(s...)
}

23
log/log_test.go Normal file
View file

@ -0,0 +1,23 @@
package log
import (
"testing"
"github.com/golang/mock/gomock"
"github.com/mmatczuk/tunnel/mock"
)
func TestContext_Log(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
b := mock.NewMockLogger(ctrl)
b.EXPECT().Log("key", "val", "sufix", "")
NewContext(b).With("sufix", "").Log("key", "val")
b.EXPECT().Log("prefix", "", "key", "val")
NewContext(b).WithPrefix("prefix", "").Log("key", "val")
b.EXPECT().Log("prefix", "", "key", "val", "sufix", "")
NewContext(b).With("sufix", "").WithPrefix("prefix", "").Log("key", "val")
}

8
log/noplogger.go Normal file
View file

@ -0,0 +1,8 @@
package log
type nopLogger struct{}
// NewNopLogger returns a logger that doesn't do anything.
func NewNopLogger() Logger { return nopLogger{} }
func (nopLogger) Log(...interface{}) error { return nil }

15
log/stdlogger.go Normal file
View file

@ -0,0 +1,15 @@
package log
import (
"log"
)
type stdLogger struct{}
// NewStdLogger returns logger based on standard "log" package.
func NewStdLogger() Logger { return stdLogger{} }
func (p stdLogger) Log(keyvals ...interface{}) error {
log.Println(keyvals...)
return nil
}

43
mock/logger.go Normal file
View file

@ -0,0 +1,43 @@
// Automatically generated by MockGen. DO NOT EDIT!
// Source: github.com/mmatczuk/tunnel/log (interfaces: Logger)
package mock
import (
gomock "github.com/golang/mock/gomock"
)
// Mock of Logger interface
type MockLogger struct {
ctrl *gomock.Controller
recorder *_MockLoggerRecorder
}
// Recorder for MockLogger (not exported)
type _MockLoggerRecorder struct {
mock *MockLogger
}
func NewMockLogger(ctrl *gomock.Controller) *MockLogger {
mock := &MockLogger{ctrl: ctrl}
mock.recorder = &_MockLoggerRecorder{mock}
return mock
}
func (_m *MockLogger) EXPECT() *_MockLoggerRecorder {
return _m.recorder
}
func (_m *MockLogger) Log(_param0 ...interface{}) error {
_s := []interface{}{}
for _, _x := range _param0 {
_s = append(_s, _x)
}
ret := _m.ctrl.Call(_m, "Log", _s...)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockLoggerRecorder) Log(arg0 ...interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Log", arg0...)
}

View file

@ -40,10 +40,6 @@ type ControlMessage struct {
ForwardedBy string
}
func (c *ControlMessage) String() string {
return fmt.Sprintf("%s %s: %s<-%s", c.Action, c.Protocol, c.ForwardedBy, c.ForwardedFor)
}
var xffRegexp = regexp.MustCompile("(for|proto|by)=([^;$]+)")
// ParseControlMessage creates new ControlMessage based on "Forwarded" http

View file

@ -3,7 +3,6 @@ package tunnel
import (
"io"
"github.com/koding/logging"
"github.com/mmatczuk/tunnel/proto"
)
@ -31,7 +30,6 @@ func Proxy(p ProxyFuncs) ProxyFunc {
}
if f == nil {
logging.Error("Could not determine proxy function for %v", msg)
return
}

230
server.go
View file

@ -9,10 +9,11 @@ import (
"strings"
"time"
"github.com/andrew-d/id"
"github.com/koding/logging"
"github.com/mmatczuk/tunnel/proto"
"golang.org/x/net/http2"
"github.com/andrew-d/id"
"github.com/mmatczuk/tunnel/log"
"github.com/mmatczuk/tunnel/proto"
)
// AllowedClient specifies client entry points on server.
@ -37,8 +38,8 @@ type ServerConfig struct {
Listener net.Listener
// AllowedClients specifies clients that can connect to the server.
AllowedClients []*AllowedClient
// Log specifies the logger. If nil a default logging.Logger is used.
Log logging.Logger
// Logger is optional logger. If nil no logs will be printed.
Logger log.Logger
}
// Server is responsible for proxying public connections to the client over a
@ -48,31 +49,31 @@ type Server struct {
listener net.Listener
connPool *connPool
httpClient *http.Client
log logging.Logger
logger log.Logger
}
// NewServer creates a new Server.
func NewServer(config *ServerConfig) (*Server, error) {
l, err := listener(config)
listener, err := listener(config)
if err != nil {
return nil, fmt.Errorf("tls listener failed :%s", err)
}
t := &http2.Transport{}
p := newConnPool(t)
t.ConnPool = p
pool := newConnPool(t)
t.ConnPool = pool
log := logging.NewLogger("server")
if config.Log != nil {
log = config.Log
logger := config.Logger
if logger == nil {
logger = log.NewNopLogger()
}
return &Server{
config: config,
listener: l,
connPool: p,
listener: listener,
connPool: pool,
httpClient: &http.Client{Transport: t},
log: log,
logger: logger,
}, nil
}
@ -81,17 +82,24 @@ func listener(config *ServerConfig) (net.Listener, error) {
return config.Listener, nil
}
addr := ":0"
if config.Addr != "" {
addr = config.Addr
if config.Addr == "" {
panic("Missing Addr")
}
if config.TLSConfig == nil {
panic("Missing TLSConfig")
}
return tls.Listen("tcp", addr, config.TLSConfig)
return tls.Listen("tcp", config.Addr, config.TLSConfig)
}
// Start starts accepting connections form clients and allowed clients listeners.
// For accepting http traffic one must run server as a handler to http server.
func (s *Server) Start() {
s.logger.Log(
"level", 1,
"action", "start",
"addr", s.listener.Addr(),
)
go s.listenControl()
s.listenClientListeners()
}
@ -100,21 +108,31 @@ func (s *Server) listenControl() {
for {
conn, err := s.listener.Accept()
if err != nil {
s.log.Warning("Accept %s control connection to %q failed: %s",
s.listener.Addr().Network(), s.listener.Addr().String(), err)
s.logger.Log(
"level", 2,
"msg", "accept control connection failed",
"err", err,
)
if strings.Contains(err.Error(), "use of closed network connection") {
return
}
continue
}
s.log.Info("Accepted %s control connection from %q to %q",
s.listener.Addr().Network(), conn.RemoteAddr(), s.listener.Addr().String())
go s.handleClient(conn)
}
}
func (s *Server) handleClient(conn net.Conn) {
logger := log.NewContext(s.logger).With("addr", conn.RemoteAddr())
logger.Log(
"level", 1,
"action", "try connect",
)
var (
id id.ID
client *AllowedClient
req *http.Request
resp *http.Response
@ -122,49 +140,98 @@ func (s *Server) handleClient(conn net.Conn) {
ok bool
)
id, err := peerID(conn.(*tls.Conn))
if err != nil {
s.log.Warning("Certificate error: %s", err)
tlsConn, ok := conn.(*tls.Conn)
if !ok {
logger.Log(
"level", 0,
"msg", "invalid connection type",
"err", fmt.Errorf("expected tls conn, got %T", conn),
)
goto reject
}
id, err = peerID(tlsConn)
if err != nil {
logger.Log(
"level", 2,
"msg", "certificate error",
"err", err,
)
goto reject
}
logger = logger.With("id", id)
client, ok = s.checkID(id)
if !ok {
s.log.Warning("Unknown certificate: %q", id.String())
logger.Log(
"level", 2,
"msg", "unknown certificate",
)
goto reject
}
req, err = http.NewRequest(http.MethodConnect, clientURL(client.Host), nil)
if err != nil {
s.log.Error("Invalid host %q for client %q", client.Host, client.ID)
logger.Log(
"level", 2,
"msg", "handshake request creation failed",
"err", err,
)
goto reject
}
if err = conn.SetDeadline(time.Time{}); err != nil {
s.log.Warning("Setting no deadline failed: %s", err)
logger.Log(
"level", 2,
"msg", "setting infinite deadline failed",
"err", err,
)
// recoverable
}
if err := s.connPool.addHostConn(client.Host, conn); err != nil {
s.log.Warning("Could not add host: %s", err)
logger.Log(
"level", 2,
"msg", "adding host failed",
"host", client.Host,
"err", err,
)
goto reject
}
resp, err = s.httpClient.Do(req)
if err != nil {
s.log.Warning("Handshake failed %s", err)
logger.Log(
"level", 2,
"msg", "handshake failed",
"err", err,
)
goto reject
}
if resp.StatusCode != http.StatusOK {
s.log.Warning("Handshake failed")
logger.Log(
"level", 2,
"msg", "handshake failed",
"err", fmt.Errorf("Status %s", resp.Status),
)
goto reject
}
s.log.Info("Connected to client %s (%q)", conn.RemoteAddr(), client.ID)
logger.Log(
"level", 1,
"action", "connected",
)
return
reject:
s.logger.Log(
"level", 1,
"action", "rejected",
"addr", conn.RemoteAddr(),
)
conn.Close()
if client != nil {
s.connPool.markHostDead(client.Host)
@ -196,15 +263,16 @@ func (s *Server) listen(l net.Listener, client *AllowedClient) {
for {
conn, err := l.Accept()
if err != nil {
s.log.Warning("Accept %s connection to %q failed: %s",
s.listener.Addr().Network(), s.listener.Addr().String(), err)
s.logger.Log(
"level", 2,
"msg", "accept connection failed",
"err", err,
)
if strings.Contains(err.Error(), "use of closed network connection") {
return
}
continue
}
s.log.Debug("Accepted %s connection from %q to %q",
l.Addr().Network(), conn.RemoteAddr(), l.Addr().String())
msg := &proto.ControlMessage{
Action: proto.Proxy,
@ -212,33 +280,46 @@ func (s *Server) listen(l net.Listener, client *AllowedClient) {
ForwardedFor: conn.RemoteAddr().String(),
ForwardedBy: l.Addr().String(),
}
go func() {
err := s.proxyConn(client.Host, conn, msg)
if err != nil {
s.log.Warning("Error %s: %s", msg, err)
}
}()
go s.proxyConn(client.Host, conn, msg)
}
}
func (s *Server) proxyConn(host string, conn net.Conn, msg *proto.ControlMessage) error {
func (s *Server) proxyConn(host string, conn net.Conn, msg *proto.ControlMessage) {
s.logger.Log(
"level", 2,
"action", "proxy",
"ctrlMsg", msg,
)
pr, pw := io.Pipe()
defer pr.Close()
defer pw.Close()
req := proxyRequest(host, msg, pr)
req := clientRequest(host, msg, pr)
go transfer("local to remote", pw, conn)
go transfer(pw, conn, log.NewContext(s.logger).With(
"dir", "user to client",
"dst", host,
"src", conn.RemoteAddr(),
))
resp, err := s.httpClient.Do(req)
if err != nil {
return fmt.Errorf("proxy request error: %s", err)
s.logger.Log(
"level", 0,
"msg", "proxy error",
"ctrlMsg", msg,
"err", err,
)
conn.Close()
return
}
transfer("remote to local", conn, resp.Body)
return nil
transfer(conn, resp.Body, log.NewContext(s.logger).With(
"dir", "client to user",
"dst", conn.RemoteAddr(),
"src", host,
))
}
// ServeHTTP proxies http connection to the client.
@ -251,7 +332,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
copyHeader(w.Header(), resp.Header)
w.WriteHeader(resp.StatusCode)
if resp.Body != nil {
transfer("remote to local", w, resp.Body)
transfer(w, resp.Body, log.NewContext(s.logger).With(
"dir", "client to user",
"dst", r.RemoteAddr,
"src", r.Host,
))
}
}
@ -267,19 +352,39 @@ func (s *Server) RoundTrip(r *http.Request) (*http.Response, error) {
}
func (s *Server) proxyHTTP(host string, r *http.Request, msg *proto.ControlMessage) (*http.Response, error) {
s.logger.Log(
"level", 2,
"action", "proxy",
"ctrlMsg", msg,
)
pr, pw := io.Pipe()
defer pr.Close()
defer pw.Close()
req := proxyRequest(host, msg, pr)
req := clientRequest(host, msg, pr)
go func() {
cw := &countWriter{pw, 0}
err := r.Write(cw)
if err != nil {
s.log.Error("Write to pipe failed: %s", err)
s.logger.Log(
"level", 0,
"msg", "proxy error",
"ctrlMsg", msg,
"err", err,
)
}
TransferLog.Debug("Coppied %d bytes from %s", cw.count, "local to remote")
s.logger.Log(
"level", 3,
"action", "transfered",
"bytes", cw.count,
"dir", "user to client",
"dst", r.Host,
"src", r.RemoteAddr,
)
if r.Body != nil {
r.Body.Close()
}
@ -301,11 +406,14 @@ func (s *Server) Addr() string {
return s.listener.Addr().String()
}
// Close closes the server.
func (s *Server) Close() error {
if s.listener == nil {
return nil
}
// Stop closes the server.
func (s *Server) Stop() {
s.logger.Log(
"level", 1,
"action", "stop",
)
return s.listener.Close()
if s.listener != nil {
s.listener.Close()
}
}

View file

@ -5,14 +5,12 @@ import (
"io"
"net"
"github.com/koding/logging"
"github.com/mmatczuk/tunnel/log"
"github.com/mmatczuk/tunnel/proto"
)
// TCPProxy forwards TCP streams.
type TCPProxy struct {
// Log is the proxy logger.
Log logging.Logger
// localAddr specifies default TCP address of the local server.
localAddr string
// localAddrMap specifies mapping from ControlMessage ForwardedBy to
@ -22,24 +20,42 @@ type TCPProxy struct {
// * port
// * host
localAddrMap map[string]string
// logger is the proxy logger.
logger log.Logger
}
// NewTCPProxy creates new direct TCPProxy, everything will be proxied to
// localAddr.
func NewTCPProxy(localAddr string) *TCPProxy {
func NewTCPProxy(localAddr string, logger log.Logger) *TCPProxy {
if localAddr == "" {
panic("Empty localAddr")
}
if logger == nil {
logger = log.NewNopLogger()
}
return &TCPProxy{
Log: logging.NewLogger("tcpproxy"),
localAddr: localAddr,
logger: logger,
}
}
// NewMultiTCPProxy creates a new dispatching TCPProxy, connections may go to
// different backends based on localAddrMap, see TCPProxy localAddrMap docs for
// more details.
func NewMultiTCPProxy(localAddrMap map[string]string) *TCPProxy {
func NewMultiTCPProxy(localAddrMap map[string]string, logger log.Logger) *TCPProxy {
if localAddrMap == nil || len(localAddrMap) == 0 {
panic("Empty localAddrMap")
}
if logger == nil {
logger = log.NewNopLogger()
}
return &TCPProxy{
Log: logging.NewLogger("tcpproxy"),
localAddrMap: localAddrMap,
logger: logger,
}
}
@ -53,22 +69,37 @@ func (p *TCPProxy) Proxy(w io.Writer, r io.ReadCloser, msg *proto.ControlMessage
target := p.localAddrFor(msg.ForwardedBy)
if target == "" {
p.Log.Warning("Failed to get local address")
p.logger.Log(
"level", 1,
"msg", "no target",
"host", msg.ForwardedBy,
)
return
}
local, err := net.DialTimeout("tcp", target, DefaultDialTimeout)
if err != nil {
p.Log.Error("Dialing local server %q failed: %s", target, err)
p.logger.Log(
"level", 0,
"msg", "dial failed",
"target", target,
"err", err,
)
return
}
done := make(chan struct{})
go func() {
transfer("local to remote", w, local)
transfer(w, local, log.NewContext(p.logger).With(
"dst", msg.ForwardedBy,
"src", target,
))
close(done)
}()
transfer("remote to local", local, r)
transfer(local, r, log.NewContext(p.logger).With(
"dst", target,
"src", msg.ForwardedBy,
))
}
func (p *TCPProxy) localAddrFor(hostPort string) string {

View file

@ -13,8 +13,6 @@ import (
"net/http"
"os"
"path/filepath"
"github.com/koding/logging"
)
// EchoHTTP starts serving HTTP requests on listener l, it accepts connections,
@ -93,9 +91,3 @@ func TLSConfig(cert tls.Certificate) *tls.Config {
c.BuildNameToCertificate()
return c
}
// DebugLogging makes koding logger print debug messages.
func DebugLogging() {
logging.DefaultLevel = logging.DEBUG
logging.DefaultHandler.SetLevel(logging.DEBUG)
}

View file

@ -6,11 +6,11 @@ import (
"net"
"net/http"
"github.com/koding/logging"
"github.com/mmatczuk/tunnel/log"
"github.com/mmatczuk/tunnel/proto"
)
func proxyRequest(host string, msg *proto.ControlMessage, r io.Reader) *http.Request {
func clientRequest(host string, msg *proto.ControlMessage, r io.Reader) *http.Request {
if msg.Action != proto.Proxy {
panic("Invalid action")
}
@ -43,13 +43,14 @@ type closeReader interface {
CloseRead() error
}
// TransferLog is a dedicated logger for reporting bytes read/written.
var TransferLog = logging.NewLogger("transfer")
func transfer(side string, dst io.Writer, src io.ReadCloser) {
func transfer(dst io.Writer, src io.ReadCloser, logger log.Logger) {
n, err := io.Copy(dst, src)
if err != nil {
TransferLog.Error("%s: copy error: %s", side, err)
logger.Log(
"level", 2,
"msg", "copy error",
"err", err,
)
}
if d, ok := dst.(closeWriter); ok {
@ -62,7 +63,11 @@ func transfer(side string, dst io.Writer, src io.ReadCloser) {
src.Close()
}
TransferLog.Debug("Coppied %d bytes from %s", n, side)
logger.Log(
"level", 3,
"action", "transfered",
"bytes", n,
)
}
func copyHeader(dst, src http.Header) {