From 60e87919e483fb052a50fb61dd8f314c7036a3fb Mon Sep 17 00:00:00 2001 From: ziirish Date: Wed, 22 Mar 2017 23:19:11 +0100 Subject: [PATCH] improve test coverage --- burpui/api/__init__.py | 6 +-- burpui/app.py | 48 +++++++++++----------- setup.cfg | 7 +++- tests/test7-5.cfg | 92 ++++++++++++++++++++++++++++++++++++++++++ tests/test_burpui.py | 52 ++++++++++++++++-------- 5 files changed, 162 insertions(+), 43 deletions(-) create mode 100644 tests/test7-5.cfg diff --git a/burpui/api/__init__.py b/burpui/api/__init__.py index 155b1b49..0afd4f9f 100644 --- a/burpui/api/__init__.py +++ b/burpui/api/__init__.py @@ -39,7 +39,7 @@ def api_login_required(func): @wraps(func) def decorated_view(*args, **kwargs): """decorator""" - if request.method in EXEMPT_METHODS: + if request.method in EXEMPT_METHODS: # pragma: no cover return func(*args, **kwargs) # 'func' is a Flask.view.MethodView so we have access to some special # params @@ -88,7 +88,7 @@ class Api(ApiPlus): self.logger.debug('Loading API module: {}'.format(mod)) try: import_module(mod, __name__) - except: + except: # pragma: no cover import traceback self.logger.critical('Unable to load {}:\n{}'.format(mod, traceback.format_exc())) else: @@ -108,7 +108,7 @@ class Api(ApiPlus): def decorator(func): @wraps(func) def decorated(resource, *args, **kwargs): - if key not in kwargs: + if key not in kwargs: # pragma: no cover resource.abort(500, "key '{}' not found".format(key)) if kwargs[key] != resource.username and not resource.is_admin: resource.abort(code, message) diff --git a/burpui/app.py b/burpui/app.py index a7476577..01c492ad 100644 --- a/burpui/app.py +++ b/burpui/app.py @@ -30,7 +30,7 @@ def parse_db_setting(string): '(?P[\w_.-]+):?(?P\d+)?(?:/(?P\w+))?', string ) - if not parts: + if not parts: # pragma: no cover raise ValueError('Unable to parse the db: "{}"'.format(string)) back = parts.group('backend') or '' user = parts.group('user') or None @@ -52,7 +52,7 @@ def get_redis_server(myapp): port = int(port) except (ValueError, IndexError): port = 6379 - except ValueError: + except ValueError: # pragma: no cover pass return host, port, pwd @@ -72,7 +72,7 @@ def create_db(myapp, cli=False, unittest=False, create=True): myapp.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False if not database_exists(myapp.config['SQLALCHEMY_DATABASE_URI']) and \ not cli and not unittest: - if create: + if create: # pragma: no cover import subprocess local = os.path.join(os.getcwd(), '..', 'bui-manage') buimanage = local if os.path.exists(local) else 'bui-manage' @@ -98,7 +98,7 @@ def create_db(myapp, cli=False, unittest=False, create=True): myapp.config['WITH_SQL'] = False return None return create_db(myapp, cli, unittest, False) - else: + else: # pragma: no cover myapp.logger.error( 'Database not found, disabling SQL support' ) @@ -107,13 +107,13 @@ def create_db(myapp, cli=False, unittest=False, create=True): back = parse_db_setting(myapp.config['SQLALCHEMY_DATABASE_URI'])[0] - if 'mysql' in back: + if 'mysql' in back: # pragma: no cover # optimize SQL pools for MySQL driver myapp.config['SQLALCHEMY_POOL_SIZE'] = 20 myapp.config['SQLALCHEMY_POOL_RECYCLE'] = 600 db.init_app(myapp) - if not cli and not unittest: + if not cli and not unittest: # pragma: no cover with myapp.app_context(): try: test_database() @@ -127,13 +127,13 @@ def create_db(myapp, cli=False, unittest=False, create=True): myapp.config['WITH_SQL'] = False return None return db - except ImportError: + except ImportError: # pragma: no cover myapp.logger.critical( 'Unable to load requirements, you may want to run \'pip ' 'install burp-ui-sql\'.\nDisabling SQL support for now.' ) myapp.config['WITH_SQL'] = False - except OperationalError as exp: + except OperationalError as exp: # pragma: no cover myapp.logger.critical( 'unable to contact database: {}\nDisabling SQL ' 'support.'.format(exp) @@ -149,7 +149,7 @@ def create_celery(myapp, warn=True): :param myapp: Application context :type myapp: :class:`burpui.server.BUIServer` """ - if myapp.config['WITH_CELERY']: + if myapp.config['WITH_CELERY']: # pragma: no cover from .ext.async import celery from .exceptions import BUIserverException host, oport, pwd = get_redis_server(myapp) @@ -200,7 +200,7 @@ def create_celery(myapp, warn=True): return celery - if warn: + if warn: # pragma: no cover message = 'Something went wrong while initializing celery worker.\n' \ 'Maybe it is not enabled in your conf ' \ '({}).'.format(myapp.config['CFG']) @@ -321,7 +321,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs): 'reverse_proxy: {}'.format(reverse_proxy) ) - if not unittest: + if not unittest: # pragma: no cover from ._compat import patch_json patch_json() @@ -369,8 +369,10 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs): ) # manage application secret key - if app.secret_key and (app.secret_key.lower() == 'none' or - (app.secret_key.lower() == 'random' and gunicorn)): + if app.secret_key and \ + (app.secret_key.lower() == 'none' or + (app.secret_key.lower() == 'random' and \ + gunicorn)): # pragma: no cover logger.critical('Your setup is not secure! Please consider setting a' ' secret key in your configuration file') app.secret_key = 'Burp-UI' @@ -398,7 +400,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs): if app.session_db and \ str(app.session_db).lower() not \ in ['redis', 'default', 'true']: - try: + try: # pragma: no cover (_, _, pwd, host, port, db) = \ parse_db_setting(app.session_db) except ValueError as exp: @@ -420,7 +422,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs): app.config['SESSION_PERMANENT'] = False sess.init_app(app) session_manager.backend = red - except Exception as exp: + except Exception as exp: # pragma: no cover logger.warning('Unable to initialize session: {}'.format(str(exp))) try: # Cache setup @@ -431,7 +433,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs): if app.cache_db and \ str(app.cache_db).lower() not \ in ['redis', 'default', 'true']: - try: + try: # pragma: no cover (_, _, pwd, host, port, db) = \ parse_db_setting(app.cache_db) except ValueError as exp: @@ -458,15 +460,15 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs): # clear cache at startup in case we removed or added servers with app.app_context(): cache.clear() - else: + else: # pragma: no cover cache.init_app(app) - except Exception as exp: + except Exception as exp: # pragma: no cover logger.warning('Unable to initialize cache: {}'.format(str(exp))) cache.init_app(app) try: # Limiter setup if not app.limiter or str(app.limiter).lower() not \ - in ['none', 'false']: + in ['none', 'false']: # pragma: no cover from .ext.limit import limiter app.config['RATELIMIT_HEADERS_ENABLED'] = True if app.limiter and str(app.limiter).lower() not \ @@ -499,10 +501,10 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs): ) limiter.init_app(app) app.config['WITH_LIMIT'] = True - except ImportError: + except ImportError: # pragma: no cover logger.warning('Unable to load limiter. Did you run \'pip install ' 'flask-limiter\'?') - except Exception as exp: + except Exception as exp: # pragma: no cover logger.warning('Unable to initialize limiter: {}'.format(str(exp))) else: cache.init_app(app) @@ -515,7 +517,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs): try: from .api.async import force_scheduling_now force_scheduling_now() - except: + except: # pragma: no cover pass # Initialize i18n @@ -567,7 +569,7 @@ def create_app(conf=None, verbose=0, logfile=None, **kwargs): def _check_session(user, request, api=False): """Check if the session is in the db""" - if user and not session_manager.session_in_db(): + if user and not session_manager.session_in_db(): # pragma: no cover login = getattr(user, 'name', None) if login and not is_uuid(login): remember = session.get('persistent', False) diff --git a/setup.cfg b/setup.cfg index 45a84d8c..0a3566c2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,7 +2,12 @@ universal = 1 [coverage:report] -omit = burpui/_compat.py +omit = + burpui/_compat.py + burpui/datastructures.py + burpui/misc/parser/openssl.py + burpui/api/async.py +show_missing = True [metadata] provides-extra = diff --git a/tests/test7-5.cfg b/tests/test7-5.cfg new file mode 100644 index 00000000..1b350744 --- /dev/null +++ b/tests/test7-5.cfg @@ -0,0 +1,92 @@ +# Burp-UI configuration file +# @version@ - 0.3.0 +# @release@ - stable +[Global] +# On which port is the application listening +port = 5001 +# On which address is the application listening +# '::' is the default for all IPv6 +bind = :: +# enable SSL +ssl = false +# ssl cert +sslcert = /etc/burp/ssl_cert-server.pem +# ssl key +sslkey = /etc/burp/ssl_cert-server.key +# burp server version (currently only burp 1.x is implemented) +version = 1 +# Handle multiple bui-servers or not +# If set to 'false', you will need to declare at least one 'Agent' section (see +# bellow) +standalone = true +# authentication plugin (mandatory) +# list the misc/auth directory to see the available backends +# to disable authentication you can set "auth: none" +auth = basic +# acl plugin +# list misc/auth directory to see the available backends +# default is no ACL +acl = basic + +[UI] +# refresh interval of the pages in seconds +refresh = 15 + +[Production] +# storage backend for session and cache +# may be either 'default' or 'redis' +storage = redis +# session database to use +# may also be a backend url like: redis://localhost:6379/0 +# if set to 'redis', the backend url defaults to: +# redis://:/0 +# where is the host part, and is the port part of +# the below "redis" setting +session = redis +# cache database to use +# may also be a backend url like: redis://localhost:6379/0 +# if set to 'redis', the backend url defaults to: +# redis://:/1 +# where is the host part, and is the port part of +# the below "redis" setting +cache = redis +# redis server to connect to +redis = localhost +# whether to use celery or not +# may also be a broker url like: redis://localhost:6379/0 +# if set to "true", the broker url defaults to: +# redis://:/2 +# where is the host part, and is the port part of +# the above "redis" setting +celery = true +# database url to store some persistent data +# none or a connect string supported by SQLAlchemy: +# http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls +# example: sqlite:////var/lib/burpui/store.db +database = sqlite:////somewhere/you/dont/have/rights + +# burp1 backend specific options +[Burp1] +# burp status address (can only be '127.0.0.1' or '::1' +#bhost = 127.0.0.1 +# burp status port +bport = 9999 +# burp binary +burpbin = /this file-should-not-exist +# vss_strip binary +stripbin = /this file-should-not-exist +# temporary dir for the on the fly restoration +#tmpdir = this-file-should-not-exist +# burp client configuration file used for the restoration (Default: None) +bconfcli = this-file-should-not-exist +# burp server configuration file used for the setting page +bconfsrv = this-file-should-not-exist + +[BASIC] +admin = pbkdf2:sha1:1000$07Q0FeKW$eab0bc54b0d2e779081fe85c91ea84a50203d0bf +user1 = pbkdf2:sha1:1000$hWYnkYoh$ba7521104d262bb8cca3095c33ae1a3f19dbb3c7 + +[BASIC:ACL] +admin = ["fail] +user1 = '["client1", "client2"]' +user2 = {"agent1": ["client3"] diff --git a/tests/test_burpui.py b/tests/test_burpui.py index 09554dc6..171d9b26 100755 --- a/tests/test_burpui.py +++ b/tests/test_burpui.py @@ -34,7 +34,7 @@ class BurpuiLiveTestCase(LiveServerTestCase): def create_app(self): conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../share/burpui/etc/burpui.sample.cfg') - bui = BUIinit(debug=12, gunicorn=False, unittest=True) + bui = BUIinit(debug=12, logfile='/dev/null', gunicorn=False, unittest=True) bui.setup(conf, True) bui.config['DEBUG'] = False bui.config['TESTING'] = True @@ -105,7 +105,7 @@ class BurpuiAPITestCase(TestCase): def create_app(self): conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test2.cfg') - bui = BUIinit(gunicorn=False, unittest=True) + bui = BUIinit(logfile='/dev/null', gunicorn=False, unittest=True) bui.setup(conf, True) bui.config['TESTING'] = True bui.config['LOGIN_DISABLED'] = True @@ -238,7 +238,7 @@ class BurpuiRoutesTestCase(TestCase): def create_app(self): with patch('socket.socket'): conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test4.cfg') - bui = BUIinit(conf, gunicorn=False, unittest=True) + bui = BUIinit(conf, logfile='/dev/null', gunicorn=False, unittest=True) bui.setup(conf, True) bui.config['TESTING'] = True bui.config['LOGIN_DISABLED'] = True @@ -275,7 +275,7 @@ class BurpuiLoginTestCase(TestCase): def create_app(self): conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../share/burpui/etc/burpui.sample.cfg') - bui = BUIinit(conf, False, None, gunicorn=False, unittest=True) + bui = BUIinit(conf, False, '/dev/null', gunicorn=False, unittest=True) bui.config['TESTING'] = True bui.config['LIVESERVER_PORT'] = 5001 bui.config['WTF_CSRF_ENABLED'] = False @@ -308,19 +308,19 @@ class BurpuiACLTestCase(TestCase): def tearDown(self): print ('\nTest 6 Finished!\n') - def login(self, username, password): + def login(self, username, password, headers=None): return self.client.post(url_for('view.login'), data=dict( username=username, password=password, language='en' - ), follow_redirects=True) + ), headers=headers, follow_redirects=True) def logout(self): return self.client.get(url_for('view.logout'), follow_redirects=True) def create_app(self): conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test6.cfg') - bui = BUIinit(conf, False, None, gunicorn=False, unittest=True) + bui = BUIinit(conf, False, '/dev/null', gunicorn=False, unittest=True) bui.config['TESTING'] = True bui.config['LIVESERVER_PORT'] = 5001 bui.config['WTF_CSRF_ENABLED'] = False @@ -348,6 +348,12 @@ class BurpuiACLTestCase(TestCase): self.assertEqual(sorted(response.json, key=lambda k: k['name']), sorted([{u'id': u'admin', u'name': u'admin', u'backend': u'BASIC'}, {u'id': u'user1', u'name': u'user1', u'backend': u'BASIC'}], key=lambda k: k['name'])) self.assertEqual(sorted(response2.json, key=lambda k: k['name']), sorted([{u'add': True, u'del': True, u'name': u'BASIC', u'mod': True}], key=lambda k: k['name'])) + def test_change_password(self): + with self.client: + rv = self.login('user1', 'password') + response = self.client.post(url_for('api.auth_users', name='user1'), data={'backend': 'BASIC', 'old_password': 'plop', 'password': 'toto'}, headers={'X-Language': 'en'}) + self.assert_status(response, 200) + def test_config_render_ko(self): with self.client: rv = self.login('user1', 'password') @@ -362,6 +368,16 @@ class BurpuiACLTestCase(TestCase): self.assert403(response) self.logout() + def test_api_403(self): + with self.client: + response = self.client.get(url_for('api.client_settings', client='toto'), headers={'X-From-UI': True}) + self.assert403(response) + + def test_api_401(self): + with self.client: + response = self.client.get(url_for('api.client_settings', client='toto')) + self.assert401(response) + class BurpuiTestInit(TestCase): @@ -373,13 +389,17 @@ class BurpuiTestInit(TestCase): os.unlink(self.tmpFile) def create_app(self): - conf1 = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test7-1.cfg') - conf2 = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test7-2.cfg') - conf4 = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test7-4.cfg') - BUIinit(conf1, False, None, gunicorn=False, unittest=True) - BUIinit(conf2, False, None, gunicorn=False, unittest=True) - BUIinit(conf4, False, None, gunicorn=False, unittest=True) - bui = BUIinit(None, False, None, gunicorn=False, unittest=True) + kwargs = {'verbose': 0, 'logfile': '/dev/null', 'gunicorn': False, 'unittest': True} + root = os.path.dirname(os.path.realpath(__file__)) + conf1 = os.path.join(root, 'test7-1.cfg') + conf2 = os.path.join(root, 'test7-2.cfg') + conf4 = os.path.join(root, 'test7-4.cfg') + conf5 = os.path.join(root, 'test7-5.cfg') + BUIinit(conf1, **kwargs) + BUIinit(conf2, **kwargs) + BUIinit(conf4, **kwargs) + BUIinit(conf5, **kwargs) + bui = BUIinit(None, **kwargs) bui.config['TESTING'] = True bui.config['LIVESERVER_PORT'] = 5001 bui.config['WTF_CSRF_ENABLED'] = False @@ -391,7 +411,7 @@ class BurpuiTestInit(TestCase): self.assertRaises(IOError, BUIinit, 'thisfileisnotlikelytoexist', True, self.tmpFile, gunicorn=False, unittest=True) self.assertRaises(IOError, BUIinit, 'thisfileisnotlikelytoexist', False, self.tmpFile, gunicorn=False, unittest=True) conf3 = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test7-3.cfg') - self.assertRaises(ImportError, BUIinit, conf3, False, None, gunicorn=False, unittest=True) + self.assertRaises(ImportError, BUIinit, conf3, 12, '/dev/null', gunicorn=False, unittest=True) class BurpuiRedisTestCase(TestCase): @@ -417,7 +437,7 @@ class BurpuiRedisTestCase(TestCase): @patch('redis.Redis', mock_redis_client) def create_app(self): conf = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test8.cfg') - bui = BUIinit(conf, False, None, gunicorn=False, unittest=True) + bui = BUIinit(conf, False, '/dev/null', gunicorn=False, unittest=True) bui.config['TESTING'] = True bui.config['LIVESERVER_PORT'] = 5001 bui.config['WTF_CSRF_ENABLED'] = False