add: multiprocessing for better scalability

This commit is contained in:
ziirish 2015-12-15 21:59:59 +01:00
parent bf664f69de
commit 5c3f65ed0f
3 changed files with 106 additions and 17 deletions

View file

@ -11,6 +11,7 @@
import os
import sys
import json
import multiprocessing
from flask import Blueprint, Response, request
from flask.ext.restplus import Api
@ -23,6 +24,37 @@ if sys.version_info >= (3, 0): # pragma: no cover
basestring = str
def parallel_loop(func=None, elem=None):
ret = []
if not callable(func):
api.abort(500, 'The provided \'func\' is not callable!')
if not elem:
return []
# create our process pool/queue
output = multiprocessing.Queue()
processes = [
multiprocessing.Process(
target=func,
args=(e, output)
) for e in elem
]
# start the processes
[p.start() for p in processes]
# wait for process termination
[p.join() for p in processes]
for p in processes:
tmp = output.get()
if isinstance(tmp, basestring):
api.abort(500, tmp)
elif tmp:
ret.append(tmp)
return ret
def cache_key():
return '{}-{}-{}'.format(current_user.get_id(), request.path, request.values)

View file

@ -1,7 +1,7 @@
# -*- coding: utf8 -*-
# This is a submodule we can also use "from ..api import api"
from . import api, cache_key
from . import api, cache_key, parallel_loop
from ..exceptions import BUIserverException
from flask.ext.restplus import Resource, fields
@ -63,22 +63,27 @@ class ServersStats(Resource):
check = True
allowed = api.bui.acl.servers(current_user.get_id())
for serv in api.bui.cli.servers:
def get_servers_info(serv, output):
try:
if check:
if serv in allowed:
r.append({
output.put({
'name': serv,
'clients': len(api.bui.acl.clients(current_user.get_id(), serv)),
'alive': api.bui.cli.servers[serv].ping()
})
return
else:
r.append({
output.put({
'name': serv,
'clients': len(api.bui.cli.servers[serv].get_all_clients(serv)),
'alive': api.bui.cli.servers[serv].ping()
})
return
output.put(None)
except BUIserverException as e:
api.abort(500, str(e))
output.put(str(e))
r = parallel_loop(get_servers_info, api.bui.cli.servers)
return r

View file

@ -5,6 +5,7 @@ import errno
import time
import struct
import traceback
import multiprocessing
try:
import cPickle as pickle
except ImportError:
@ -102,8 +103,30 @@ class Burp(BUIbackend):
self.running[agent] = r
else:
r = {}
for a in self.servers:
r[a] = self.servers[a].is_one_backup_running(a)
output = multiprocessing.Queue()
def get_running(a, i, output):
output.put((i, self.servers[a].is_one_backup_running(a)))
processes = [
(
multiprocessing.Process(
target=get_running,
args=(a, i, output)
),
a
) for (i, a) in enumerate(self.servers)
]
[p.start() for (p, a) in processes]
[p.join() for (p, a) in processes]
results = [output.get() for (p, a) in processes]
results.sort()
for (i, (p, a)) in enumerate(processes):
# results contains a tuple (index, response) so we 'split' it
_, r[a] = results[i]
self.running = r
self.refresh = time.time()
return r
@ -162,24 +185,53 @@ class Burp(BUIbackend):
"""See :func:`burpui.misc.backend.interface.BUIbackend.schedule_restore`"""
return self.servers[agent].schedule_restore(name, backup, files, strip, force, prefix, restoreto)
def _get_multi_version(self, method=None):
if not method:
raise BUIserverException('Wrong method call')
r = {}
output = multiprocessing.Queue()
def get_client_vers(key, i, output):
output.put((i, self.servers[key].get_client_version()))
def get_server_vers(key, i, output):
output.put((i, self.servers[key].get_server_version()))
if method == 'get_client_version':
func = get_client_vers
else:
func = get_server_vers
processes = [
(
multiprocessing.Process(
target=func,
args=(k, i, output)
),
k
) for (i, (k, s)) in enumerate(iteritems(self.servers))
]
[p.start() for (p, k) in processes]
[p.join() for (p, k) in processes]
results = [output.get() for (p, k) in processes]
results.sort()
for (i, (p, k)) in enumerate(processes):
_, r[k] = results[i]
return r
def get_client_version(self, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_client_version`"""
if not agent:
r = {}
for (key, serv) in iteritems(self.servers):
v = serv.get_client_version() or None
r[key] = v
return r
return self._get_multi_version('get_client_version')
return self.servers[agent].get_client_version()
def get_server_version(self, agent=None):
"""See :func:`burpui.misc.backend.interface.BUIbackend.get_server_version`"""
if not agent:
r = {}
for (key, serv) in iteritems(self.servers):
v = serv.get_server_version() or None
r[key] = v
return r
return self._get_multi_version('get_server_version')
return self.servers[agent].get_server_version()