From 29a7e68ccff6c195c5e73567b24387aebcddc6d4 Mon Sep 17 00:00:00 2001 From: ziirish Date: Mon, 23 Jul 2018 20:27:16 +0200 Subject: [PATCH] make the multi backend async: restore-files is not working because it is too hard to make it fully async --- burpui/agent.py | 4 +- burpui/api/restore.py | 10 +- burpui/api/servers.py | 3 +- burpui/misc/backend/multi.py | 230 +++++++++++++++++++---------------- 4 files changed, 132 insertions(+), 115 deletions(-) diff --git a/burpui/agent.py b/burpui/agent.py index 1ea250a7..1e4a76d8 100644 --- a/burpui/agent.py +++ b/burpui/agent.py @@ -300,11 +300,9 @@ class BUIAgent(BUIbackend): async def receive_all(self, stream, length=1024): buf = b'' - bsize = 1024 + bsize = min(1024, length) received = 0 tries = 0 - if length < bsize: - bsize = length while received < length: newbuf = await stream.receive_some(bsize) if not newbuf: diff --git a/burpui/api/restore.py b/burpui/api/restore.py index c60b2445..f9794994 100644 --- a/burpui/api/restore.py +++ b/burpui/api/restore.py @@ -9,6 +9,7 @@ """ import select import struct +import trio from . import api from ..server import BUIServer # noqa @@ -157,7 +158,7 @@ class Restore(Resource): if not socket: self.abort(500) - lengthbuf = socket.recv(8) + lengthbuf = trio.run(socket.receive_some, 8) length, = struct.unpack('!Q', lengthbuf) bui.client.logger.debug('Need to get {} Bytes : {}'.format(length, socket)) @@ -176,15 +177,14 @@ class Restore(Resource): read, _, _ = select.select([sock], [], [], 5) if not read: raise Exception('Socket timed-out') - buf += sock.recv(bsize) + buf += trio.run(sock.receive_some, bsize) if not buf: continue received += len(buf) self.logger.debug('{}/{}'.format(received, size)) yield buf - sock.sendall(struct.pack('!Q', 2)) - sock.sendall(b'RE') - sock.close() + trio.run(sock.send_all, struct.pack('!Q', 2)) + trio.run(sock.send_all, b'RE') headers = Headers() headers.add('Content-Disposition', diff --git a/burpui/api/servers.py b/burpui/api/servers.py index 63cd8e1b..8053c501 100644 --- a/burpui/api/servers.py +++ b/burpui/api/servers.py @@ -1,4 +1,5 @@ # -*- coding: utf8 -*- +import trio # This is a submodule we can also use "from ..api import api" from . import api, cache_key, force_refresh @@ -72,7 +73,7 @@ class ServersStats(Resource): for serv in bui.client.servers: try: - alive = bui.client.servers[serv].ping() + alive = trio.run(bui.client.servers[serv].ping) except BUIserverException: alive = False diff --git a/burpui/misc/backend/multi.py b/burpui/misc/backend/multi.py index 12d8d713..58943249 100644 --- a/burpui/misc/backend/multi.py +++ b/burpui/misc/backend/multi.py @@ -1,9 +1,10 @@ # -*- coding: utf8 -*- import re -import socket import errno import json import struct +import trio +import ssl from werkzeug.datastructures import ImmutableMultiDict as _ImmutableMultiDict @@ -61,7 +62,7 @@ class ProxyCall(object): # Special case for network calls if self.network: data = {'func': self.method, 'args': encoded_args} - return json.loads(self.proxy.do_command(data)) + return json.loads(trio.run(self.proxy.do_command, data)) # normal case for "standard" interface if 'agent' not in encoded_args: raise AttributeError(str(encoded_args)) @@ -85,7 +86,7 @@ class ProxyParserCall(object): def __init__(self, agent, method): """ :param agent: Agent to use - :type agent: :class:`burpui.misc.backend.multi.NClient` + :type agent: :class:`burpui.misc.backend.multi.NetClient` :param method: Name of the method to proxify :type method: str @@ -93,7 +94,7 @@ class ProxyParserCall(object): self.agent = agent self.method = method - def __call__(self, *args, **kwargs): + async def __call__(self, *args, **kwargs): """This is where the proxy call (and the magic) occurs""" # retrieve the original function prototype proto = getattr(BUIparser, self.method) @@ -113,7 +114,7 @@ class ProxyParserCall(object): encoded_args.update(kwargs) data = {'func': 'proxy_parser', 'method': self.method, 'args': encoded_args} - return json.loads(self.agent.do_command(data)) + return json.loads(trio.run(self.agent.do_command, data)) class ProxyParser(BUIparser): @@ -128,7 +129,7 @@ class ProxyParser(BUIparser): def __init__(self, agent): """ :param agent: Agent to use - :type agent: :class:`burpui.misc.backend.multi.NClient` + :type agent: :class:`burpui.misc.backend.multi.NetClient` """ self.agent = agent @@ -152,7 +153,7 @@ class Burp(BUIbackend): :class:`burpui.misc.backend.interface.BUIbackend` class. For each agent found in the configuration, it will load a - :class:`burpui.misc.backend.multi.NClient` class. + :class:`burpui.misc.backend.multi.NetClient` class. :param server: ``Burp-UI`` server instance in order to access logger and/or some global settings @@ -190,7 +191,7 @@ class Burp(BUIbackend): ssl = conf.safe_get('ssl', 'boolean', section=sect) or False timeout = conf.safe_get('timeout', 'integer', section=sect) or 5 - self.servers[r.group(1)] = NClient(self.app, host, port, password, ssl, timeout) + self.servers[r.group(1)] = NetClient(self.app, host, port, password, ssl, timeout) self.app.config['SERVERS'].append(r.group(1)) if not self.servers: @@ -278,59 +279,56 @@ class Burp(BUIbackend): return self.servers[agent].get_server_version() -class Gsocket(): - def __init__(self, host, port, ssl=False, timeout=5, notimeout=False): +class TCPsocket(): + def __init__(self, host, port, ssl=False): self.host = host self.port = port self.ssl = ssl - self.timeout = timeout - self.notimeout = notimeout - - def conn(self): - if self.ssl: - import ssl - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if not self.notimeout: - s.settimeout(self.timeout) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - ret = ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE, ssl_version=ssl.PROTOCOL_SSLv23) - ret.connect((self.host, self.port)) - else: - if not self.notimeout: - ret = socket.create_connection((self.host, self.port), timeout=self.timeout) - else: - ret = socket.create_connection((self.host, self.port)) - ret.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.sock = ret - self.connected = True - - def __enter__(self): - self.conn() - return self.sock, self - - def __exit__(self, type, value, traceback): - if self.connected: - self.sock.close() self.connected = False - def recvall(self, length=1024): + async def conn(self): + if self.ssl: + ctx = ssl.SSLContext() + ctx.verify_mode = ssl.CERT_NONE + ctx.check_hostname = False + ctx.load_default_certs() + self.client_stream = await trio.open_ssl_over_tcp_stream(self.host, self.port, ssl_context=ctx) + else: + self.client_stream = await trio.open_tcp_stream(self.host, self.port) + self.connected = True + + async def __aenter__(self): + # def __enter__(self): + if not self.connected: + await self.conn() + return self.client_stream, self + + async def __aexit__(self, exc_type, exc, tb): + # def __exit__(self, type, value, traceback): + self.connected = False + + async def receive_all(self, length=1024): """Read the answer of the agent""" buf = b'' - bsize = 1024 + bsize = min(1024, length) received = 0 - if length < bsize: - bsize = length + tries = 0 while received < length: - newbuf = self.sock.recv(bsize) + newbuf = await self.client_stream.receive_some(bsize) if not newbuf: - return None + if tries > 3: + raise Exception('Unable to read full response') + tries += 1 + await trio.sleep(0.1) + continue + tries = 0 buf += newbuf received += len(newbuf) return buf -class NClient(BUIbackend): - """The :class:`burpui.misc.backend.multi.NClient` class provides a +class NetClient(BUIbackend): + """The :class:`burpui.misc.backend.multi.NetClient` class provides a consistent backend to interact with ``agents``. It acts as a proxy so it works with any agent running a backend implementing @@ -389,11 +387,11 @@ class NClient(BUIbackend): return func return object.__getattribute__(self, name) - def _get_agent_version(self): - if self.ping() and not self._agent_version: + async def _get_agent_version(self): + if await self.ping() and not self._agent_version: data = {'func': 'agent_version'} try: - vers = self.do_command(data) + vers = await self.do_command(data) self._agent_version = json.loads(to_unicode(vers)) except BUIserverException: # just ignore the error if this custom function is not @@ -401,30 +399,32 @@ class NClient(BUIbackend): pass return self._agent_version - def ping(self): + async def ping(self): """Check if we are connected to the agent""" res = False try: - with Gsocket(self.host, self.port, self.ssl, self.timeout) as (sock, gsock): - sock.sendall(struct.pack('!Q', 2)) - sock.sendall(b'RE') + async with TCPsocket(self.host, self.port, self.ssl) as (client_stream, _): + await client_stream.send_all(struct.pack('!Q', 2)) + await client_stream.send_all(b'RE') res = True - except socket.error: + except OSError: pass return res - def setup(self, sock, gsock, data): + async def setup(self, client_stream: trio.SocketStream, tcp_sock: TCPsocket, data): length = struct.pack('!Q', len(data)) - sock.sendall(length) + await client_stream.send_all(length) data = to_unicode(data) - sock.sendall(to_bytes(data)) - self.logger.debug("Sending: {}".format(data)) - tmp = to_unicode(sock.recv(2)) - self.logger.debug("recv: '{}'".format(tmp)) + await client_stream.send_all(to_bytes(data)) + self.logger.debug(f"Sending: {data!r}") + tmp = await client_stream.receive_some(2) + tmp = to_unicode(tmp) + self.logger.debug(f"recv: '{tmp!r}'") if 'ER' == tmp: - lengthbuf = sock.recv(8) + lengthbuf = await client_stream.receive_some(8) length, = struct.unpack('!Q', lengthbuf) - err = to_unicode(gsock.recvall(length)) + err = await TCPsocket.receive_all(length) + err = to_unicode(err) raise BUIserverException(err) if 'OK' != tmp: self.logger.debug('Ooops, unsuccessful!') @@ -432,7 +432,7 @@ class NClient(BUIbackend): self.logger.debug("Data sent successfully") return True - def do_command(self, data=None, restarted=False): + async def do_command(self, data=None, restarted=False): """Send a command to the remote agent""" res = '[]' err = None @@ -450,44 +450,62 @@ class NClient(BUIbackend): try: # don't need a context manager here if data['func'] == 'get_file': - gsock = Gsocket(self.host, self.port, self.ssl, notimeout=True) - gsock.conn() + tcp_socket = TCPsocket(self.host, self.port, self.ssl) + await tcp_socket.conn() raw = json.dumps(data) - if not self.setup(gsock.sock, gsock, raw): + async with tcp_socket.client_stream: + setup = await self.setup(tcp_socket.client_stream, tcp_socket, raw) + if not setup: return res - return gsock.sock - with Gsocket(self.host, self.port, self.ssl, timeout, notimeout) as (sock, gsock): - try: - raw = json.dumps(data) - if not self.setup(gsock.sock, gsock, raw): - return res - lengthbuf = sock.recv(8) - length, = struct.unpack('!Q', lengthbuf) - res = to_unicode(gsock.recvall(length)) - except IOError as exc: - if not restarted and exc.errno == errno.EPIPE: - self.logger.warning('Broken pipe, restarting the request') - return self.do_command(data, True) - elif exc.errno == errno.ECONNRESET: - self.logger.error('!!! {} !!!\nPlease check your SSL configuration on both sides!'.format(str(exc))) - else: - self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=True) - raise exc - except socket.timeout as exc: - if self.app.gunicorn and not restarted: - self.logger.warning('Socket timed-out, restarting the request') - return self.do_command(data, True) - self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=exc) - raise exc - # catch all - except Exception as exc: - self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=exc) - if data['func'] == 'restore_files': - err = str(exc) - elif isinstance(exc, BUIserverException): - raise exc - else: - raise BUIserverException(str(exc)) + return tcp_socket.client_stream + + async def __inner_job(): + nonlocal res + nonlocal err + async with TCPsocket(self.host, self.port, self.ssl) as (client_stream, tcp_socket): + async with client_stream: + try: + raw = json.dumps(data) + setup = await self.setup(client_stream, tcp_socket, raw) + if not setup: + return res + lengthbuf = await client_stream.receive_some(8) + length, = struct.unpack('!Q', lengthbuf) + res = await tcp_socket.receive_all(length) + res = to_unicode(res) + except IOError as exc: + if not restarted and exc.errno == errno.EPIPE: + self.logger.warning('Broken pipe, restarting the request') + return await self.do_command(data, True) + elif exc.errno == errno.ECONNRESET: + self.logger.error('!!! {} !!!\nPlease check your SSL configuration on both sides!'.format(str(exc))) + else: + self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=True) + raise exc + # catch all + except Exception as exc: + self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=exc) + if data['func'] == 'restore_files': + err = str(exc) + elif isinstance(exc, BUIserverException): + raise exc + else: + raise BUIserverException(str(exc)) + return res + timedout = True + if notimeout: + timedout = False + res = await __inner_job() + else: + with trio.move_on_after(timeout) as cancel_scope: + res = await __inner_job() + timedout = cancel_scope.cancelled_caught + if timedout: + if self.app.gunicorn and not restarted: + self.logger.warning('Socket timed-out, restarting the request') + return await self.do_command(data, True) + self.logger.error('!!! TimeoutError !!!') + raise TimeoutError() except Exception as exc: self.logger.error('!!! {} !!!'.format(str(exc)), exc_info=exc) raise BUIserverException(str(exc)) @@ -514,7 +532,7 @@ class NClient(BUIbackend): msg = 'Wrong data type' self.logger.warning(msg) raise BUIserverException(msg) - vers = self._get_agent_version() + vers = trio.run(self._get_agent_version) if vers and vers >= AGENT_VERSION_CAST: # convert the data to our custom ImmutableMultiDict data = ImmutableMultiDict(data.to_dict(False)) @@ -524,7 +542,7 @@ class NClient(BUIbackend): bytes_pickles = to_bytes(pickles) digest = to_unicode(hmac.new(key, bytes_pickles, hashlib.sha1).hexdigest()) data = {'func': 'store_conf_cli', 'args': pickles, 'pickled': True, 'digest': digest} - return json.loads(self.do_command(data)) + return json.loads(trio.run(self.do_command, data)) @implement def store_conf_srv(self, data, conf=None, agent=None): @@ -537,7 +555,7 @@ class NClient(BUIbackend): msg = 'Wrong data type' self.logger.warning(msg) raise BUIserverException(msg) - vers = self._get_agent_version() + vers = trio.run(self._get_agent_version) if vers and vers >= AGENT_VERSION_CAST: # convert the data to our custom ImmutableMultiDict data = ImmutableMultiDict(data.to_dict(False)) @@ -547,22 +565,22 @@ class NClient(BUIbackend): bytes_pickles = to_bytes(pickles) digest = to_unicode(hmac.new(key, bytes_pickles, hashlib.sha1).hexdigest()) data = {'func': 'store_conf_srv', 'args': pickles, 'pickled': True, 'digest': digest} - return json.loads(self.do_command(data)) + return json.loads(trio.run(self.do_command, data)) @implement def restore_files(self, name=None, backup=None, files=None, strip=None, archive='zip', password=None, agent=None): """See :func:`burpui.misc.backend.interface.BUIbackend.restore_files`""" data = {'func': 'restore_files', 'args': {'name': name, 'backup': backup, 'files': files, 'strip': strip, 'archive': archive, 'password': password}} - return self.do_command(data) + return trio.run(self.do_command, data) @implement def get_file(self, path, agent=None): """See :func:`burpui.misc.backend.interface.BUIbackend.get_file`""" data = {'func': 'get_file', 'path': path} - return self.do_command(data) + return trio.run(self.do_command, data) @implement def del_file(self, path, agent=None): """See :func:`burpui.misc.backend.interface.BUIbackend.del_file`""" data = {'func': 'del_file', 'path': path} - return json.loads(self.do_command(data)) + return json.loads(trio.run(self.do_command, data))