From d97106c07e9e40990bee6263f7a81ea5c0673379 Mon Sep 17 00:00:00 2001 From: ziirish Date: Wed, 16 Dec 2015 22:46:30 +0100 Subject: [PATCH] use gevent for multiprocessing when gunicorn is in use --- burpui/api/__init__.py | 90 +++++++++++++++++-------- burpui/misc/backend/multi.py | 126 ++++++++++++++++++++++++----------- 2 files changed, 151 insertions(+), 65 deletions(-) diff --git a/burpui/api/__init__.py b/burpui/api/__init__.py index 8bdc4da0..b546994a 100644 --- a/burpui/api/__init__.py +++ b/burpui/api/__init__.py @@ -11,7 +11,6 @@ import os import sys import json -import multiprocessing from flask import Blueprint, Response, request from flask.ext.restplus import Api @@ -20,39 +19,78 @@ from flask.ext.cache import Cache from importlib import import_module from functools import wraps +from .._compat import IS_GUNICORN + if sys.version_info >= (3, 0): # pragma: no cover basestring = str -def parallel_loop(func=None, elem=None): - ret = [] +# Implement a "parallel loop" routine either with gipc or multiprocessing +# depending if we are under gunicorn or not +if IS_GUNICORN: + def parallel_loop(func=None, elem=None): + import gevent + from gevent.queue import Queue + ret = [] + api.bui.cli._logger('debug', 'Using gevent') - if not callable(func): - api.abort(500, 'The provided \'func\' is not callable!') - if not elem: - return [] + if not callable(func): + api.abort(500, 'The provided \'func\' is not callable!') + if not elem: + return [] - # create our process pool/queue - output = multiprocessing.Queue() - processes = [ - multiprocessing.Process( - target=func, - args=(e, output) - ) for e in elem - ] - # start the processes - [p.start() for p in processes] - # wait for process termination - [p.join() for p in processes] + output = Queue() - for p in processes: - tmp = output.get() - if isinstance(tmp, basestring): - api.abort(500, tmp) - elif tmp: - ret.append(tmp) + processes = [ + gevent.spawn( + func, + e, + output + ) for e in elem + ] + # wait for process termination + gevent.joinall(processes) - return ret + for p in processes: + tmp = output.get() + if isinstance(tmp, basestring): + api.abort(500, tmp) + elif tmp: + ret.append(tmp) + + return ret + +else: + def parallel_loop(func=None, elem=None): + import multiprocessing + ret = [] + + if not callable(func): + api.abort(500, 'The provided \'func\' is not callable!') + if not elem: + return [] + + # create our process pool/queue + output = multiprocessing.Queue() + processes = [ + multiprocessing.Process( + target=func, + args=(e, output) + ) for e in elem + ] + # start the processes + [p.start() for p in processes] + # wait for process termination + [p.join() for p in processes] + + for p in processes: + tmp = output.get() + if isinstance(tmp, basestring): + api.abort(500, tmp) + elif tmp: + ret.append(tmp) + + return ret def cache_key(): diff --git a/burpui/misc/backend/multi.py b/burpui/misc/backend/multi.py index 8befa86a..76008d8a 100644 --- a/burpui/misc/backend/multi.py +++ b/burpui/misc/backend/multi.py @@ -5,7 +5,6 @@ import errno import time import struct import traceback -import multiprocessing try: import cPickle as pickle except ImportError: @@ -14,19 +13,12 @@ try: import ujson as json except ImportError: import json -try: - import ConfigParser -except ImportError: - import configparser as ConfigParser -try: - from gevent.local import local -except ImportError: - local = object from six import iteritems from .interface import BUIbackend from ...exceptions import BUIserverException +from ..._compat import IS_GUNICORN, ConfigParser, local class Burp(BUIbackend): @@ -95,19 +87,39 @@ class Burp(BUIbackend): """See :func:`burpui.misc.backend.interface.BUIbackend.is_backup_running`""" return self.servers[agent].is_backup_running(name) - def is_one_backup_running(self, agent=None): - """See :func:`burpui.misc.backend.interface.BUIbackend.is_one_backup_running`""" - r = [] - if agent: - r = self.servers[agent].is_one_backup_running(agent) - self.running[agent] = r + def _backup_running_parallel(self): + """Use :mod:`multiprocessing` or :mod:`gevent` to retrieve a list of + running backups + """ + if IS_GUNICORN: + import gevent + from gevent.queue import Queue else: - r = {} - output = multiprocessing.Queue() + import multiprocessing + Queue = multiprocessing.Queue - def get_running(a, i, output): - output.put((i, self.servers[a].is_one_backup_running(a))) + r = {} + output = Queue() + def get_running(a, i, output): + output.put((i, self.servers[a].is_one_backup_running(a))) + + # If we are running under gunicorn, use a gevent-safe method + if IS_GUNICORN: + processes = [ + ( + gevent.spawn( + get_running, + a, + i, + output + ), + a + ) for (i, a) in enumerate(self.servers) + ] + greens = [p for (p, a) in processes] + gevent.joinall(greens) + else: processes = [ ( multiprocessing.Process( @@ -120,12 +132,23 @@ class Burp(BUIbackend): [p.start() for (p, a) in processes] [p.join() for (p, a) in processes] - results = [output.get() for (p, a) in processes] - results.sort() + results = [output.get() for (p, a) in processes] + results.sort() - for (i, (p, a)) in enumerate(processes): - # results contains a tuple (index, response) so we 'split' it - _, r[a] = results[i] + for (i, (p, a)) in enumerate(processes): + # results contains a tuple (index, response) so we 'split' it + _, r[a] = results[i] + + return r + + def is_one_backup_running(self, agent=None): + """See :func:`burpui.misc.backend.interface.BUIbackend.is_one_backup_running`""" + r = [] + if agent: + r = self.servers[agent].is_one_backup_running(agent) + self.running[agent] = r + else: + r = self._backup_running_parallel() self.running = r self.refresh = time.time() @@ -185,11 +208,20 @@ class Burp(BUIbackend): """See :func:`burpui.misc.backend.interface.BUIbackend.schedule_restore`""" return self.servers[agent].schedule_restore(name, backup, files, strip, force, prefix, restoreto) - def _get_multi_version(self, method=None): + def _get_version_parallel(self, method=None): + """Use :mod:`multiprocessing` or :mod:`gevent` to retrieve versions""" + if IS_GUNICORN: + import gevent + from gevent.queue import Queue + else: + import multiprocessing + Queue = multiprocessing.Queue + if not method: raise BUIserverException('Wrong method call') + r = {} - output = multiprocessing.Queue() + output = Queue() def get_client_vers(key, i, output): output.put((i, self.servers[key].get_client_version())) @@ -202,17 +234,33 @@ class Burp(BUIbackend): else: func = get_server_vers - processes = [ - ( - multiprocessing.Process( - target=func, - args=(k, i, output) - ), - k - ) for (i, (k, s)) in enumerate(iteritems(self.servers)) - ] - [p.start() for (p, k) in processes] - [p.join() for (p, k) in processes] + # If we are running under gunicorn, use a gevent-safe method + if IS_GUNICORN: + processes = [ + ( + gevent.spawn( + func, + k, + i, + output + ), + k + ) for (i, (k, s)) in enumerate(iteritems(self.servers)) + ] + greens = [p for (p, a) in processes] + gevent.joinall(greens) + else: + processes = [ + ( + multiprocessing.Process( + target=func, + args=(k, i, output) + ), + k + ) for (i, (k, s)) in enumerate(iteritems(self.servers)) + ] + [p.start() for (p, k) in processes] + [p.join() for (p, k) in processes] results = [output.get() for (p, k) in processes] results.sort() @@ -225,13 +273,13 @@ class Burp(BUIbackend): def get_client_version(self, agent=None): """See :func:`burpui.misc.backend.interface.BUIbackend.get_client_version`""" if not agent: - return self._get_multi_version('get_client_version') + return self._get_version_parallel('get_client_version') return self.servers[agent].get_client_version() def get_server_version(self, agent=None): """See :func:`burpui.misc.backend.interface.BUIbackend.get_server_version`""" if not agent: - return self._get_multi_version('get_server_version') + return self._get_version_parallel('get_server_version') return self.servers[agent].get_server_version()