make the multi backend async: restore-files is not working because it is too hard to make it fully async

This commit is contained in:
ziirish 2018-07-23 20:27:16 +02:00
parent 0a09d51c73
commit 29a7e68ccf
No known key found for this signature in database
GPG key ID: 72DB229A64B54E46
4 changed files with 132 additions and 115 deletions

View file

@ -300,11 +300,9 @@ class BUIAgent(BUIbackend):
async def receive_all(self, stream, length=1024):
buf = b''
bsize = 1024
bsize = min(1024, length)
received = 0
tries = 0
if length < bsize:
bsize = length
while received < length:
newbuf = await stream.receive_some(bsize)
if not newbuf:

View file

@ -9,6 +9,7 @@
"""
import select
import struct
import trio
from . import api
from ..server import BUIServer # noqa
@ -157,7 +158,7 @@ class Restore(Resource):
if not socket:
self.abort(500)
lengthbuf = socket.recv(8)
lengthbuf = trio.run(socket.receive_some, 8)
length, = struct.unpack('!Q', lengthbuf)
bui.client.logger.debug('Need to get {} Bytes : {}'.format(length, socket))
@ -176,15 +177,14 @@ class Restore(Resource):
read, _, _ = select.select([sock], [], [], 5)
if not read:
raise Exception('Socket timed-out')
buf += sock.recv(bsize)
buf += trio.run(sock.receive_some, bsize)
if not buf:
continue
received += len(buf)
self.logger.debug('{}/{}'.format(received, size))
yield buf
sock.sendall(struct.pack('!Q', 2))
sock.sendall(b'RE')
sock.close()
trio.run(sock.send_all, struct.pack('!Q', 2))
trio.run(sock.send_all, b'RE')
headers = Headers()
headers.add('Content-Disposition',

View file

@ -1,4 +1,5 @@
# -*- coding: utf8 -*-
import trio
# This is a submodule we can also use "from ..api import api"
from . import api, cache_key, force_refresh
@ -72,7 +73,7 @@ class ServersStats(Resource):
for serv in bui.client.servers:
try:
alive = bui.client.servers[serv].ping()
alive = trio.run(bui.client.servers[serv].ping)
except BUIserverException:
alive = False

View file

@ -1,9 +1,10 @@
# -*- coding: utf8 -*-
import re
import socket
import errno
import json
import struct
import trio
import ssl
from werkzeug.datastructures import ImmutableMultiDict as _ImmutableMultiDict
@ -61,7 +62,7 @@ class ProxyCall(object):
# Special case for network calls
if self.network:
data = {'func': self.method, 'args': encoded_args}
return json.loads(self.proxy.do_command(data))
return json.loads(trio.run(self.proxy.do_command, data))
# normal case for "standard" interface
if 'agent' not in encoded_args:
raise AttributeError(str(encoded_args))
@ -85,7 +86,7 @@ class ProxyParserCall(object):
def __init__(self, agent, method):
"""
:param agent: Agent to use
:type agent: :class:`burpui.misc.backend.multi.NClient`
:type agent: :class:`burpui.misc.backend.multi.NetClient`
:param method: Name of the method to proxify
:type method: str
@ -93,7 +94,7 @@ class ProxyParserCall(object):
self.agent = agent
self.method = method
def __call__(self, *args, **kwargs):
async def __call__(self, *args, **kwargs):
"""This is where the proxy call (and the magic) occurs"""
# retrieve the original function prototype
proto = getattr(BUIparser, self.method)
@ -113,7 +114,7 @@ class ProxyParserCall(object):
encoded_args.update(kwargs)
data = {'func': 'proxy_parser', 'method': self.method, 'args': encoded_args}
return json.loads(self.agent.do_command(data))
return json.loads(trio.run(self.agent.do_command, data))
class ProxyParser(BUIparser):
@ -128,7 +129,7 @@ class ProxyParser(BUIparser):
def __init__(self, agent):
"""
:param agent: Agent to use
:type agent: :class:`burpui.misc.backend.multi.NClient`
:type agent: :class:`burpui.misc.backend.multi.NetClient`
"""
self.agent = agent
@ -152,7 +153,7 @@ class Burp(BUIbackend):
:class:`burpui.misc.backend.interface.BUIbackend` class.
For each agent found in the configuration, it will load a
:class:`burpui.misc.backend.multi.NClient` class.
:class:`burpui.misc.backend.multi.NetClient` class.
:param server: ``Burp-UI`` server instance in order to access logger
and/or some global settings
@ -190,7 +191,7 @@ class Burp(BUIbackend):
ssl = conf.safe_get('ssl', 'boolean', section=sect) or False
timeout = conf.safe_get('timeout', 'integer', section=sect) or 5
self.servers[r.group(1)] = NClient(self.app, host, port, password, ssl, timeout)
self.servers[r.group(1)] = NetClient(self.app, host, port, password, ssl, timeout)
self.app.config['SERVERS'].append(r.group(1))
if not self.servers:
@ -278,59 +279,56 @@ class Burp(BUIbackend):
return self.servers[agent].get_server_version()
class Gsocket():
def __init__(self, host, port, ssl=False, timeout=5, notimeout=False):
class TCPsocket():
def __init__(self, host, port, ssl=False):
self.host = host
self.port = port
self.ssl = ssl
self.timeout = timeout
self.notimeout = notimeout
def conn(self):
if self.ssl:
import ssl
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if not self.notimeout:
s.settimeout(self.timeout)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
ret = ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE, ssl_version=ssl.PROTOCOL_SSLv23)
ret.connect((self.host, self.port))
else:
if not self.notimeout:
ret = socket.create_connection((self.host, self.port), timeout=self.timeout)
else:
ret = socket.create_connection((self.host, self.port))
ret.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.sock = ret
self.connected = True
def __enter__(self):
self.conn()
return self.sock, self
def __exit__(self, type, value, traceback):
if self.connected:
self.sock.close()
self.connected = False
def recvall(self, length=1024):
async def conn(self):
if self.ssl:
ctx = ssl.SSLContext()
ctx.verify_mode = ssl.CERT_NONE
ctx.check_hostname = False
ctx.load_default_certs()
self.client_stream = await trio.open_ssl_over_tcp_stream(self.host, self.port, ssl_context=ctx)
else:
self.client_stream = await trio.open_tcp_stream(self.host, self.port)
self.connected = True
async def __aenter__(self):
# def __enter__(self):
if not self.connected:
await self.conn()
return self.client_stream, self
async def __aexit__(self, exc_type, exc, tb):
# def __exit__(self, type, value, traceback):
self.connected = False
async def receive_all(self, length=1024):
"""Read the answer of the agent"""
buf = b''
bsize = 1024
bsize = min(1024, length)
received = 0
if length < bsize:
bsize = length
tries = 0
while received < length:
newbuf = self.sock.recv(bsize)
newbuf = await self.client_stream.receive_some(bsize)
if not newbuf:
return None
if tries > 3:
raise Exception('Unable to read full response')
tries += 1
await trio.sleep(0.1)
continue
tries = 0
buf += newbuf
received += len(newbuf)
return buf
class NClient(BUIbackend):
"""The :class:`burpui.misc.backend.multi.NClient` class provides a
class NetClient(BUIbackend):
"""The :class:`burpui.misc.backend.multi.NetClient` class provides a
consistent backend to interact with ``agents``.
It acts as a proxy so it works with any agent running a backend implementing
@ -389,11 +387,11 @@ class NClient(BUIbackend):
return func
return object.__getattribute__(self, name)
def _get_agent_version(self):
if self.ping() and not self._agent_version:
async def _get_agent_version(self):
if await self.ping() and not self._agent_version:
data = {'func': 'agent_version'}
try:
vers = self.do_command(data)
vers = await self.do_command(data)
self._agent_version = json.loads(to_unicode(vers))
except BUIserverException:
# just ignore the error if this custom function is not
@ -401,30 +399,32 @@ class NClient(BUIbackend):
pass
return self._agent_version
def ping(self):
async def ping(self):
"""Check if we are connected to the agent"""
res = False
try:
with Gsocket(self.host, self.port, self.ssl, self.timeout) as (sock, gsock):
sock.sendall(struct.pack('!Q', 2))
sock.sendall(b'RE')
async with TCPsocket(self.host, self.port, self.ssl) as (client_stream, _):
await client_stream.send_all(struct.pack('!Q', 2))
await client_stream.send_all(b'RE')
res = True
except socket.error:
except OSError:
pass
return res
def setup(self, sock, gsock, data):
async def setup(self, client_stream: trio.SocketStream, tcp_sock: TCPsocket, data):
length = struct.pack('!Q', len(data))
sock.sendall(length)
await client_stream.send_all(length)
data = to_unicode(data)
sock.sendall(to_bytes(data))
self.logger.debug("Sending: {}".format(data))
tmp = to_unicode(sock.recv(2))
self.logger.debug("recv: '{}'".format(tmp))
await client_stream.send_all(to_bytes(data))
self.logger.debug(f"Sending: {data!r}")
tmp = await client_stream.receive_some(2)
tmp = to_unicode(tmp)
self.logger.debug(f"recv: '{tmp!r}'")
if 'ER' == tmp:
lengthbuf = sock.recv(8)
lengthbuf = await client_stream.receive_some(8)
length, = struct.unpack('!Q', lengthbuf)
err = to_unicode(gsock.recvall(length))
err = await TCPsocket.receive_all(length)
err = to_unicode(err)
raise BUIserverException(err)
if 'OK' != tmp:
self.logger.debug('Ooops, unsuccessful!')
@ -432,7 +432,7 @@ class NClient(BUIbackend):
self.logger.debug("Data sent successfully")
return True
def do_command(self, data=None, restarted=False):
async def do_command(self, data=None, restarted=False):
"""Send a command to the remote agent"""
res = '[]'
err = None
@ -450,44 +450,62 @@ class NClient(BUIbackend):
try:
# don't need a context manager here
if data['func'] == 'get_file':
gsock = Gsocket(self.host, self.port, self.ssl, notimeout=True)
gsock.conn()
tcp_socket = TCPsocket(self.host, self.port, self.ssl)
await tcp_socket.conn()
raw = json.dumps(data)
if not self.setup(gsock.sock, gsock, raw):
async with tcp_socket.client_stream:
setup = await self.setup(tcp_socket.client_stream, tcp_socket, raw)
if not setup:
return res
return gsock.sock
with Gsocket(self.host, self.port, self.ssl, timeout, notimeout) as (sock, gsock):
try:
raw = json.dumps(data)
if not self.setup(gsock.sock, gsock, raw):
return res
lengthbuf = sock.recv(8)
length, = struct.unpack('!Q', lengthbuf)
res = to_unicode(gsock.recvall(length))
except IOError as exc:
if not restarted and exc.errno == errno.EPIPE:
self.logger.warning('Broken pipe, restarting the request')
return self.do_command(data, True)
elif exc.errno == errno.ECONNRESET:
self.logger.error('!!! {} !!!\nPlease check your SSL configuration on both sides!'.format(str(exc)))
else:
self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=True)
raise exc
except socket.timeout as exc:
if self.app.gunicorn and not restarted:
self.logger.warning('Socket timed-out, restarting the request')
return self.do_command(data, True)
self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=exc)
raise exc
# catch all
except Exception as exc:
self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=exc)
if data['func'] == 'restore_files':
err = str(exc)
elif isinstance(exc, BUIserverException):
raise exc
else:
raise BUIserverException(str(exc))
return tcp_socket.client_stream
async def __inner_job():
nonlocal res
nonlocal err
async with TCPsocket(self.host, self.port, self.ssl) as (client_stream, tcp_socket):
async with client_stream:
try:
raw = json.dumps(data)
setup = await self.setup(client_stream, tcp_socket, raw)
if not setup:
return res
lengthbuf = await client_stream.receive_some(8)
length, = struct.unpack('!Q', lengthbuf)
res = await tcp_socket.receive_all(length)
res = to_unicode(res)
except IOError as exc:
if not restarted and exc.errno == errno.EPIPE:
self.logger.warning('Broken pipe, restarting the request')
return await self.do_command(data, True)
elif exc.errno == errno.ECONNRESET:
self.logger.error('!!! {} !!!\nPlease check your SSL configuration on both sides!'.format(str(exc)))
else:
self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=True)
raise exc
# catch all
except Exception as exc:
self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=exc)
if data['func'] == 'restore_files':
err = str(exc)
elif isinstance(exc, BUIserverException):
raise exc
else:
raise BUIserverException(str(exc))
return res
timedout = True
if notimeout:
timedout = False
res = await __inner_job()
else:
with trio.move_on_after(timeout) as cancel_scope:
res = await __inner_job()
timedout = cancel_scope.cancelled_caught
if timedout:
if self.app.gunicorn and not restarted:
self.logger.warning('Socket timed-out, restarting the request')
return await self.do_command(data, True)
self.logger.error('!!! TimeoutError !!!')
raise TimeoutError()
except Exception as exc:
self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=exc)
raise BUIserverException(str(exc))
@ -514,7 +532,7 @@ class NClient(BUIbackend):
msg = 'Wrong data type'
self.logger.warning(msg)
raise BUIserverException(msg)
vers = self._get_agent_version()
vers = trio.run(self._get_agent_version)
if vers and vers >= AGENT_VERSION_CAST:
# convert the data to our custom ImmutableMultiDict
data = ImmutableMultiDict(data.to_dict(False))
@ -524,7 +542,7 @@ class NClient(BUIbackend):
bytes_pickles = to_bytes(pickles)
digest = to_unicode(hmac.new(key, bytes_pickles, hashlib.sha1).hexdigest())
data = {'func': 'store_conf_cli', 'args': pickles, 'pickled': True, 'digest': digest}
return json.loads(self.do_command(data))
return json.loads(trio.run(self.do_command, data))
@implement
def store_conf_srv(self, data, conf=None, agent=None):
@ -537,7 +555,7 @@ class NClient(BUIbackend):
msg = 'Wrong data type'
self.logger.warning(msg)
raise BUIserverException(msg)
vers = self._get_agent_version()
vers = trio.run(self._get_agent_version)
if vers and vers >= AGENT_VERSION_CAST:
# convert the data to our custom ImmutableMultiDict
data = ImmutableMultiDict(data.to_dict(False))
@ -547,22 +565,22 @@ class NClient(BUIbackend):
bytes_pickles = to_bytes(pickles)
digest = to_unicode(hmac.new(key, bytes_pickles, hashlib.sha1).hexdigest())
data = {'func': 'store_conf_srv', 'args': pickles, 'pickled': True, 'digest': digest}
return json.loads(self.do_command(data))
return json.loads(trio.run(self.do_command, data))
@implement
def restore_files(self, name=None, backup=None, files=None, strip=None, archive='zip', password=None, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.restore_files`"""
data = {'func': 'restore_files', 'args': {'name': name, 'backup': backup, 'files': files, 'strip': strip, 'archive': archive, 'password': password}}
return self.do_command(data)
return trio.run(self.do_command, data)
@implement
def get_file(self, path, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_file`"""
data = {'func': 'get_file', 'path': path}
return self.do_command(data)
return trio.run(self.do_command, data)
@implement
def del_file(self, path, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.del_file`"""
data = {'func': 'del_file', 'path': path}
return json.loads(self.do_command(data))
return json.loads(trio.run(self.do_command, data))