use gevent for multiprocessing when gunicorn is in use

This commit is contained in:
ziirish 2015-12-16 22:46:30 +01:00
parent 5c3f65ed0f
commit d97106c07e
2 changed files with 151 additions and 65 deletions

View file

@ -11,7 +11,6 @@
import os
import sys
import json
import multiprocessing
from flask import Blueprint, Response, request
from flask.ext.restplus import Api
@ -20,39 +19,78 @@ from flask.ext.cache import Cache
from importlib import import_module
from functools import wraps
from .._compat import IS_GUNICORN
if sys.version_info >= (3, 0): # pragma: no cover
basestring = str
def parallel_loop(func=None, elem=None):
ret = []
# Implement a "parallel loop" routine either with gipc or multiprocessing
# depending if we are under gunicorn or not
if IS_GUNICORN:
def parallel_loop(func=None, elem=None):
import gevent
from gevent.queue import Queue
ret = []
api.bui.cli._logger('debug', 'Using gevent')
if not callable(func):
api.abort(500, 'The provided \'func\' is not callable!')
if not elem:
return []
if not callable(func):
api.abort(500, 'The provided \'func\' is not callable!')
if not elem:
return []
# create our process pool/queue
output = multiprocessing.Queue()
processes = [
multiprocessing.Process(
target=func,
args=(e, output)
) for e in elem
]
# start the processes
[p.start() for p in processes]
# wait for process termination
[p.join() for p in processes]
output = Queue()
for p in processes:
tmp = output.get()
if isinstance(tmp, basestring):
api.abort(500, tmp)
elif tmp:
ret.append(tmp)
processes = [
gevent.spawn(
func,
e,
output
) for e in elem
]
# wait for process termination
gevent.joinall(processes)
return ret
for p in processes:
tmp = output.get()
if isinstance(tmp, basestring):
api.abort(500, tmp)
elif tmp:
ret.append(tmp)
return ret
else:
def parallel_loop(func=None, elem=None):
import multiprocessing
ret = []
if not callable(func):
api.abort(500, 'The provided \'func\' is not callable!')
if not elem:
return []
# create our process pool/queue
output = multiprocessing.Queue()
processes = [
multiprocessing.Process(
target=func,
args=(e, output)
) for e in elem
]
# start the processes
[p.start() for p in processes]
# wait for process termination
[p.join() for p in processes]
for p in processes:
tmp = output.get()
if isinstance(tmp, basestring):
api.abort(500, tmp)
elif tmp:
ret.append(tmp)
return ret
def cache_key():

View file

@ -5,7 +5,6 @@ import errno
import time
import struct
import traceback
import multiprocessing
try:
import cPickle as pickle
except ImportError:
@ -14,19 +13,12 @@ try:
import ujson as json
except ImportError:
import json
try:
import ConfigParser
except ImportError:
import configparser as ConfigParser
try:
from gevent.local import local
except ImportError:
local = object
from six import iteritems
from .interface import BUIbackend
from ...exceptions import BUIserverException
from ..._compat import IS_GUNICORN, ConfigParser, local
class Burp(BUIbackend):
@ -95,19 +87,39 @@ class Burp(BUIbackend):
"""See :func:`burpui.misc.backend.interface.BUIbackend.is_backup_running`"""
return self.servers[agent].is_backup_running(name)
def is_one_backup_running(self, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.is_one_backup_running`"""
r = []
if agent:
r = self.servers[agent].is_one_backup_running(agent)
self.running[agent] = r
def _backup_running_parallel(self):
"""Use :mod:`multiprocessing` or :mod:`gevent` to retrieve a list of
running backups
"""
if IS_GUNICORN:
import gevent
from gevent.queue import Queue
else:
r = {}
output = multiprocessing.Queue()
import multiprocessing
Queue = multiprocessing.Queue
def get_running(a, i, output):
output.put((i, self.servers[a].is_one_backup_running(a)))
r = {}
output = Queue()
def get_running(a, i, output):
output.put((i, self.servers[a].is_one_backup_running(a)))
# If we are running under gunicorn, use a gevent-safe method
if IS_GUNICORN:
processes = [
(
gevent.spawn(
get_running,
a,
i,
output
),
a
) for (i, a) in enumerate(self.servers)
]
greens = [p for (p, a) in processes]
gevent.joinall(greens)
else:
processes = [
(
multiprocessing.Process(
@ -120,12 +132,23 @@ class Burp(BUIbackend):
[p.start() for (p, a) in processes]
[p.join() for (p, a) in processes]
results = [output.get() for (p, a) in processes]
results.sort()
results = [output.get() for (p, a) in processes]
results.sort()
for (i, (p, a)) in enumerate(processes):
# results contains a tuple (index, response) so we 'split' it
_, r[a] = results[i]
for (i, (p, a)) in enumerate(processes):
# results contains a tuple (index, response) so we 'split' it
_, r[a] = results[i]
return r
def is_one_backup_running(self, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.is_one_backup_running`"""
r = []
if agent:
r = self.servers[agent].is_one_backup_running(agent)
self.running[agent] = r
else:
r = self._backup_running_parallel()
self.running = r
self.refresh = time.time()
@ -185,11 +208,20 @@ class Burp(BUIbackend):
"""See :func:`burpui.misc.backend.interface.BUIbackend.schedule_restore`"""
return self.servers[agent].schedule_restore(name, backup, files, strip, force, prefix, restoreto)
def _get_multi_version(self, method=None):
def _get_version_parallel(self, method=None):
"""Use :mod:`multiprocessing` or :mod:`gevent` to retrieve versions"""
if IS_GUNICORN:
import gevent
from gevent.queue import Queue
else:
import multiprocessing
Queue = multiprocessing.Queue
if not method:
raise BUIserverException('Wrong method call')
r = {}
output = multiprocessing.Queue()
output = Queue()
def get_client_vers(key, i, output):
output.put((i, self.servers[key].get_client_version()))
@ -202,17 +234,33 @@ class Burp(BUIbackend):
else:
func = get_server_vers
processes = [
(
multiprocessing.Process(
target=func,
args=(k, i, output)
),
k
) for (i, (k, s)) in enumerate(iteritems(self.servers))
]
[p.start() for (p, k) in processes]
[p.join() for (p, k) in processes]
# If we are running under gunicorn, use a gevent-safe method
if IS_GUNICORN:
processes = [
(
gevent.spawn(
func,
k,
i,
output
),
k
) for (i, (k, s)) in enumerate(iteritems(self.servers))
]
greens = [p for (p, a) in processes]
gevent.joinall(greens)
else:
processes = [
(
multiprocessing.Process(
target=func,
args=(k, i, output)
),
k
) for (i, (k, s)) in enumerate(iteritems(self.servers))
]
[p.start() for (p, k) in processes]
[p.join() for (p, k) in processes]
results = [output.get() for (p, k) in processes]
results.sort()
@ -225,13 +273,13 @@ class Burp(BUIbackend):
def get_client_version(self, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_client_version`"""
if not agent:
return self._get_multi_version('get_client_version')
return self._get_version_parallel('get_client_version')
return self.servers[agent].get_client_version()
def get_server_version(self, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_server_version`"""
if not agent:
return self._get_multi_version('get_server_version')
return self._get_version_parallel('get_server_version')
return self.servers[agent].get_server_version()