improve test coverage

This commit is contained in:
ziirish 2017-03-22 23:19:11 +01:00
parent bb33ab985e
commit 60e87919e4
5 changed files with 162 additions and 43 deletions

View file

@ -39,7 +39,7 @@ def api_login_required(func):
@wraps(func)
def decorated_view(*args, **kwargs):
"""decorator"""
if request.method in EXEMPT_METHODS:
if request.method in EXEMPT_METHODS: # pragma: no cover
return func(*args, **kwargs)
# 'func' is a Flask.view.MethodView so we have access to some special
# params
@ -88,7 +88,7 @@ class Api(ApiPlus):
self.logger.debug('Loading API module: {}'.format(mod))
try:
import_module(mod, __name__)
except:
except: # pragma: no cover
import traceback
self.logger.critical('Unable to load {}:\n{}'.format(mod, traceback.format_exc()))
else:
@ -108,7 +108,7 @@ class Api(ApiPlus):
def decorator(func):
@wraps(func)
def decorated(resource, *args, **kwargs):
if key not in kwargs:
if key not in kwargs: # pragma: no cover
resource.abort(500, "key '{}' not found".format(key))
if kwargs[key] != resource.username and not resource.is_admin:
resource.abort(code, message)

View file

@ -30,7 +30,7 @@ def parse_db_setting(string):
'(?P<host>[\w_.-]+):?(?P<port>\d+)?(?:/(?P<db>\w+))?',
string
)
if not parts:
if not parts: # pragma: no cover
raise ValueError('Unable to parse the db: "{}"'.format(string))
back = parts.group('backend') or ''
user = parts.group('user') or None
@ -52,7 +52,7 @@ def get_redis_server(myapp):
port = int(port)
except (ValueError, IndexError):
port = 6379
except ValueError:
except ValueError: # pragma: no cover
pass
return host, port, pwd
@ -72,7 +72,7 @@ def create_db(myapp, cli=False, unittest=False, create=True):
myapp.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
if not database_exists(myapp.config['SQLALCHEMY_DATABASE_URI']) and \
not cli and not unittest:
if create:
if create: # pragma: no cover
import subprocess
local = os.path.join(os.getcwd(), '..', 'bui-manage')
buimanage = local if os.path.exists(local) else 'bui-manage'
@ -98,7 +98,7 @@ def create_db(myapp, cli=False, unittest=False, create=True):
myapp.config['WITH_SQL'] = False
return None
return create_db(myapp, cli, unittest, False)
else:
else: # pragma: no cover
myapp.logger.error(
'Database not found, disabling SQL support'
)
@ -107,13 +107,13 @@ def create_db(myapp, cli=False, unittest=False, create=True):
back = parse_db_setting(myapp.config['SQLALCHEMY_DATABASE_URI'])[0]
if 'mysql' in back:
if 'mysql' in back: # pragma: no cover
# optimize SQL pools for MySQL driver
myapp.config['SQLALCHEMY_POOL_SIZE'] = 20
myapp.config['SQLALCHEMY_POOL_RECYCLE'] = 600
db.init_app(myapp)
if not cli and not unittest:
if not cli and not unittest: # pragma: no cover
with myapp.app_context():
try:
test_database()
@ -127,13 +127,13 @@ def create_db(myapp, cli=False, unittest=False, create=True):
myapp.config['WITH_SQL'] = False
return None
return db
except ImportError:
except ImportError: # pragma: no cover
myapp.logger.critical(
'Unable to load requirements, you may want to run \'pip '
'install burp-ui-sql\'.\nDisabling SQL support for now.'
)
myapp.config['WITH_SQL'] = False
except OperationalError as exp:
except OperationalError as exp: # pragma: no cover
myapp.logger.critical(
'unable to contact database: {}\nDisabling SQL '
'support.'.format(exp)
@ -149,7 +149,7 @@ def create_celery(myapp, warn=True):
:param myapp: Application context
:type myapp: :class:`burpui.server.BUIServer`
"""
if myapp.config['WITH_CELERY']:
if myapp.config['WITH_CELERY']: # pragma: no cover
from .ext.async import celery
from .exceptions import BUIserverException
host, oport, pwd = get_redis_server(myapp)
@ -200,7 +200,7 @@ def create_celery(myapp, warn=True):
return celery
if warn:
if warn: # pragma: no cover
message = 'Something went wrong while initializing celery worker.\n' \
'Maybe it is not enabled in your conf ' \
'({}).'.format(myapp.config['CFG'])
@ -321,7 +321,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs):
'reverse_proxy: {}'.format(reverse_proxy)
)
if not unittest:
if not unittest: # pragma: no cover
from ._compat import patch_json
patch_json()
@ -369,8 +369,10 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs):
)
# manage application secret key
if app.secret_key and (app.secret_key.lower() == 'none' or
(app.secret_key.lower() == 'random' and gunicorn)):
if app.secret_key and \
(app.secret_key.lower() == 'none' or
(app.secret_key.lower() == 'random' and \
gunicorn)): # pragma: no cover
logger.critical('Your setup is not secure! Please consider setting a'
' secret key in your configuration file')
app.secret_key = 'Burp-UI'
@ -398,7 +400,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs):
if app.session_db and \
str(app.session_db).lower() not \
in ['redis', 'default', 'true']:
try:
try: # pragma: no cover
(_, _, pwd, host, port, db) = \
parse_db_setting(app.session_db)
except ValueError as exp:
@ -420,7 +422,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs):
app.config['SESSION_PERMANENT'] = False
sess.init_app(app)
session_manager.backend = red
except Exception as exp:
except Exception as exp: # pragma: no cover
logger.warning('Unable to initialize session: {}'.format(str(exp)))
try:
# Cache setup
@ -431,7 +433,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs):
if app.cache_db and \
str(app.cache_db).lower() not \
in ['redis', 'default', 'true']:
try:
try: # pragma: no cover
(_, _, pwd, host, port, db) = \
parse_db_setting(app.cache_db)
except ValueError as exp:
@ -458,15 +460,15 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs):
# clear cache at startup in case we removed or added servers
with app.app_context():
cache.clear()
else:
else: # pragma: no cover
cache.init_app(app)
except Exception as exp:
except Exception as exp: # pragma: no cover
logger.warning('Unable to initialize cache: {}'.format(str(exp)))
cache.init_app(app)
try:
# Limiter setup
if not app.limiter or str(app.limiter).lower() not \
in ['none', 'false']:
in ['none', 'false']: # pragma: no cover
from .ext.limit import limiter
app.config['RATELIMIT_HEADERS_ENABLED'] = True
if app.limiter and str(app.limiter).lower() not \
@ -499,10 +501,10 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs):
)
limiter.init_app(app)
app.config['WITH_LIMIT'] = True
except ImportError:
except ImportError: # pragma: no cover
logger.warning('Unable to load limiter. Did you run \'pip install '
'flask-limiter\'?')
except Exception as exp:
except Exception as exp: # pragma: no cover
logger.warning('Unable to initialize limiter: {}'.format(str(exp)))
else:
cache.init_app(app)
@ -515,7 +517,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs):
try:
from .api.async import force_scheduling_now
force_scheduling_now()
except:
except: # pragma: no cover
pass
# Initialize i18n
@ -567,7 +569,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs):
def _check_session(user, request, api=False):
"""Check if the session is in the db"""
if user and not session_manager.session_in_db():
if user and not session_manager.session_in_db(): # pragma: no cover
login = getattr(user, 'name', None)
if login and not is_uuid(login):
remember = session.get('persistent', False)

View file

@ -2,7 +2,12 @@
universal = 1
[coverage:report]
omit = burpui/_compat.py
omit =
burpui/_compat.py
burpui/datastructures.py
burpui/misc/parser/openssl.py
burpui/api/async.py
show_missing = True
[metadata]
provides-extra =

92
tests/test7-5.cfg Normal file
View file

@ -0,0 +1,92 @@
# Burp-UI configuration file
# @version@ - 0.3.0
# @release@ - stable
[Global]
# On which port is the application listening
port = 5001
# On which address is the application listening
# '::' is the default for all IPv6
bind = ::
# enable SSL
ssl = false
# ssl cert
sslcert = /etc/burp/ssl_cert-server.pem
# ssl key
sslkey = /etc/burp/ssl_cert-server.key
# burp server version (currently only burp 1.x is implemented)
version = 1
# Handle multiple bui-servers or not
# If set to 'false', you will need to declare at least one 'Agent' section (see
# bellow)
standalone = true
# authentication plugin (mandatory)
# list the misc/auth directory to see the available backends
# to disable authentication you can set "auth: none"
auth = basic
# acl plugin
# list misc/auth directory to see the available backends
# default is no ACL
acl = basic
[UI]
# refresh interval of the pages in seconds
refresh = 15
[Production]
# storage backend for session and cache
# may be either 'default' or 'redis'
storage = redis
# session database to use
# may also be a backend url like: redis://localhost:6379/0
# if set to 'redis', the backend url defaults to:
# redis://<redis_host>:<redis_port>/0
# where <redis_host> is the host part, and <redis_port> is the port part of
# the below "redis" setting
session = redis
# cache database to use
# may also be a backend url like: redis://localhost:6379/0
# if set to 'redis', the backend url defaults to:
# redis://<redis_host>:<redis_port>/1
# where <redis_host> is the host part, and <redis_port> is the port part of
# the below "redis" setting
cache = redis
# redis server to connect to
redis = localhost
# whether to use celery or not
# may also be a broker url like: redis://localhost:6379/0
# if set to "true", the broker url defaults to:
# redis://<redis_host>:<redis_port>/2
# where <redis_host> is the host part, and <redis_port> is the port part of
# the above "redis" setting
celery = true
# database url to store some persistent data
# none or a connect string supported by SQLAlchemy:
# http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls
# example: sqlite:////var/lib/burpui/store.db
database = sqlite:////somewhere/you/dont/have/rights
# burp1 backend specific options
[Burp1]
# burp status address (can only be '127.0.0.1' or '::1'
#bhost = 127.0.0.1
# burp status port
bport = 9999
# burp binary
burpbin = /this file-should-not-exist
# vss_strip binary
stripbin = /this file-should-not-exist
# temporary dir for the on the fly restoration
#tmpdir = this-file-should-not-exist
# burp client configuration file used for the restoration (Default: None)
bconfcli = this-file-should-not-exist
# burp server configuration file used for the setting page
bconfsrv = this-file-should-not-exist
[BASIC]
admin = pbkdf2:sha1:1000$07Q0FeKW$eab0bc54b0d2e779081fe85c91ea84a50203d0bf
user1 = pbkdf2:sha1:1000$hWYnkYoh$ba7521104d262bb8cca3095c33ae1a3f19dbb3c7
[BASIC:ACL]
admin = ["fail]
user1 = '["client1", "client2"]'
user2 = {"agent1": ["client3"]

View file

@ -34,7 +34,7 @@ class BurpuiLiveTestCase(LiveServerTestCase):
def create_app(self):
conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../share/burpui/etc/burpui.sample.cfg')
bui = BUIinit(debug=12, gunicorn=False, unittest=True)
bui = BUIinit(debug=12, logfile='/dev/null', gunicorn=False, unittest=True)
bui.setup(conf, True)
bui.config['DEBUG'] = False
bui.config['TESTING'] = True
@ -105,7 +105,7 @@ class BurpuiAPITestCase(TestCase):
def create_app(self):
conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test2.cfg')
bui = BUIinit(gunicorn=False, unittest=True)
bui = BUIinit(logfile='/dev/null', gunicorn=False, unittest=True)
bui.setup(conf, True)
bui.config['TESTING'] = True
bui.config['LOGIN_DISABLED'] = True
@ -238,7 +238,7 @@ class BurpuiRoutesTestCase(TestCase):
def create_app(self):
with patch('socket.socket'):
conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test4.cfg')
bui = BUIinit(conf, gunicorn=False, unittest=True)
bui = BUIinit(conf, logfile='/dev/null', gunicorn=False, unittest=True)
bui.setup(conf, True)
bui.config['TESTING'] = True
bui.config['LOGIN_DISABLED'] = True
@ -275,7 +275,7 @@ class BurpuiLoginTestCase(TestCase):
def create_app(self):
conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../share/burpui/etc/burpui.sample.cfg')
bui = BUIinit(conf, False, None, gunicorn=False, unittest=True)
bui = BUIinit(conf, False, '/dev/null', gunicorn=False, unittest=True)
bui.config['TESTING'] = True
bui.config['LIVESERVER_PORT'] = 5001
bui.config['WTF_CSRF_ENABLED'] = False
@ -308,19 +308,19 @@ class BurpuiACLTestCase(TestCase):
def tearDown(self):
print ('\nTest 6 Finished!\n')
def login(self, username, password):
def login(self, username, password, headers=None):
return self.client.post(url_for('view.login'), data=dict(
username=username,
password=password,
language='en'
), follow_redirects=True)
), headers=headers, follow_redirects=True)
def logout(self):
return self.client.get(url_for('view.logout'), follow_redirects=True)
def create_app(self):
conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test6.cfg')
bui = BUIinit(conf, False, None, gunicorn=False, unittest=True)
bui = BUIinit(conf, False, '/dev/null', gunicorn=False, unittest=True)
bui.config['TESTING'] = True
bui.config['LIVESERVER_PORT'] = 5001
bui.config['WTF_CSRF_ENABLED'] = False
@ -348,6 +348,12 @@ class BurpuiACLTestCase(TestCase):
self.assertEqual(sorted(response.json, key=lambda k: k['name']), sorted([{u'id': u'admin', u'name': u'admin', u'backend': u'BASIC'}, {u'id': u'user1', u'name': u'user1', u'backend': u'BASIC'}], key=lambda k: k['name']))
self.assertEqual(sorted(response2.json, key=lambda k: k['name']), sorted([{u'add': True, u'del': True, u'name': u'BASIC', u'mod': True}], key=lambda k: k['name']))
def test_change_password(self):
with self.client:
rv = self.login('user1', 'password')
response = self.client.post(url_for('api.auth_users', name='user1'), data={'backend': 'BASIC', 'old_password': 'plop', 'password': 'toto'}, headers={'X-Language': 'en'})
self.assert_status(response, 200)
def test_config_render_ko(self):
with self.client:
rv = self.login('user1', 'password')
@ -362,6 +368,16 @@ class BurpuiACLTestCase(TestCase):
self.assert403(response)
self.logout()
def test_api_403(self):
with self.client:
response = self.client.get(url_for('api.client_settings', client='toto'), headers={'X-From-UI': True})
self.assert403(response)
def test_api_401(self):
with self.client:
response = self.client.get(url_for('api.client_settings', client='toto'))
self.assert401(response)
class BurpuiTestInit(TestCase):
@ -373,13 +389,17 @@ class BurpuiTestInit(TestCase):
os.unlink(self.tmpFile)
def create_app(self):
conf1 = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test7-1.cfg')
conf2 = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test7-2.cfg')
conf4 = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test7-4.cfg')
BUIinit(conf1, False, None, gunicorn=False, unittest=True)
BUIinit(conf2, False, None, gunicorn=False, unittest=True)
BUIinit(conf4, False, None, gunicorn=False, unittest=True)
bui = BUIinit(None, False, None, gunicorn=False, unittest=True)
kwargs = {'verbose': 0, 'logfile': '/dev/null', 'gunicorn': False, 'unittest': True}
root = os.path.dirname(os.path.realpath(__file__))
conf1 = os.path.join(root, 'test7-1.cfg')
conf2 = os.path.join(root, 'test7-2.cfg')
conf4 = os.path.join(root, 'test7-4.cfg')
conf5 = os.path.join(root, 'test7-5.cfg')
BUIinit(conf1, **kwargs)
BUIinit(conf2, **kwargs)
BUIinit(conf4, **kwargs)
BUIinit(conf5, **kwargs)
bui = BUIinit(None, **kwargs)
bui.config['TESTING'] = True
bui.config['LIVESERVER_PORT'] = 5001
bui.config['WTF_CSRF_ENABLED'] = False
@ -391,7 +411,7 @@ class BurpuiTestInit(TestCase):
self.assertRaises(IOError, BUIinit, 'thisfileisnotlikelytoexist', True, self.tmpFile, gunicorn=False, unittest=True)
self.assertRaises(IOError, BUIinit, 'thisfileisnotlikelytoexist', False, self.tmpFile, gunicorn=False, unittest=True)
conf3 = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test7-3.cfg')
self.assertRaises(ImportError, BUIinit, conf3, False, None, gunicorn=False, unittest=True)
self.assertRaises(ImportError, BUIinit, conf3, 12, '/dev/null', gunicorn=False, unittest=True)
class BurpuiRedisTestCase(TestCase):
@ -417,7 +437,7 @@ class BurpuiRedisTestCase(TestCase):
@patch('redis.Redis', mock_redis_client)
def create_app(self):
conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test8.cfg')
bui = BUIinit(conf, False, None, gunicorn=False, unittest=True)
bui = BUIinit(conf, False, '/dev/null', gunicorn=False, unittest=True)
bui.config['TESTING'] = True
bui.config['LIVESERVER_PORT'] = 5001
bui.config['WTF_CSRF_ENABLED'] = False