mirror of
https://github.com/ziirish/burp-ui.git
synced 2026-05-15 06:05:58 -06:00
466 lines
14 KiB
Python
466 lines
14 KiB
Python
# -*- coding: utf8 -*-
|
|
"""
|
|
Burp-UI is a web-ui for burp backup written in python with Flask and
|
|
jQuery/Bootstrap
|
|
|
|
.. module:: burpui
|
|
:platform: Unix
|
|
:synopsis: Burp-UI main module.
|
|
|
|
.. moduleauthor:: Ziirish <hi+burpui@ziirish.me>
|
|
"""
|
|
import os
|
|
import re
|
|
import sys
|
|
import logging
|
|
import warnings
|
|
from logging import Formatter
|
|
|
|
if sys.version_info < (3, 0):
    # Python 2 only: ``reload`` restores ``sys.setdefaultencoding`` so the
    # interpreter-wide default encoding can be switched to UTF-8.
    reload(sys)
    sys.setdefaultencoding('utf-8')
else:
    # Python 3 has no ``basestring``; alias it so isinstance() checks in this
    # module work on both major versions.
    basestring = str

__title__ = 'burp-ui'
__author__ = 'Benjamin SANS (Ziirish)'
__author_email__ = 'hi+burpui@ziirish.me'
__url__ = 'https://git.ziirish.me/ziirish/burp-ui'
# NOTE: this deliberately replaces the module docstring with the URL of the
# online documentation (it is forwarded to the API and the views in ``init``).
__doc__ = 'https://burp-ui.readthedocs.io/en/latest/'
__description__ = ('Burp-UI is a web-ui for burp backup written in python with '
                   'Flask and jQuery/Bootstrap')
__license__ = 'BSD 3-clause'


def _read_package_file(name):
    """Return the right-stripped content of a file located next to this module

    :param name: File name, relative to the directory holding this module
    :type name: str

    :returns: The content of the file without trailing whitespace
    """
    path = os.path.join(os.path.dirname(os.path.realpath(__file__)), name)
    # The context manager guarantees the descriptor is closed, which the
    # former ``open(...).read()`` one-liner left to the garbage collector.
    with open(path) as handle:
        return handle.read().rstrip()


# No fallback here: an unreadable VERSION file aborts the import.
__version__ = _read_package_file('VERSION')
try:  # pragma: no cover
    __release__ = _read_package_file('RELEASE')
except Exception:  # pragma: no cover
    # Best effort: any failure to read RELEASE falls back to a placeholder.
    # ``Exception`` (instead of a bare ``except``) lets KeyboardInterrupt and
    # SystemExit propagate.
    __release__ = 'unknown'

# Always display RuntimeWarning, even when the same warning was already issued.
warnings.simplefilter('always', RuntimeWarning)
|
|
|
|
|
|
def parse_db_setting(string):
    """Split a database/redis connection string into its components

    The accepted shape is
    ``[backend[+driver]://][[user][:password]@]host[:port][/db]``.

    The user part may be empty so that password-only URLs such as
    ``redis://:secret@localhost:6379/0`` (the form built by
    :func:`create_celery`) are parsed correctly instead of being mistaken
    for a ``user:password`` pair without backend.

    :param string: Connection string to parse
    :type string: str

    :returns: A tuple ``(backend, user, password, host, port, db)``. Missing
              *backend*, *host*, *port* and *db* are returned as ``''``,
              missing *user* and *password* as ``None``. *port* and *db* are
              returned as strings.

    :raises ValueError: if no host can be found in the string
    """
    # Raw strings: ``\w``, ``\+`` and ``\d`` are invalid escape sequences in
    # regular string literals and trigger warnings on recent Python versions.
    parts = re.search(
        r'(?:(?P<backend>\w+)(?:\+(?P<driver>\w+))?://)?'
        r'(?:(?P<user>\w*)(?::?(?P<pass>.+))?@)?'
        r'(?P<host>[\w_.-]+):?(?P<port>\d+)?(?:/(?P<db>\w+))?',
        string
    )
    if not parts:
        raise ValueError('Unable to parse the db: "{}"'.format(string))
    back = parts.group('backend') or ''
    # An empty user (password-only URL) is reported as None like a missing one
    user = parts.group('user') or None
    pwd = parts.group('pass') or None
    host = parts.group('host') or ''
    port = parts.group('port') or ''
    db = parts.group('db') or ''
    return (back, user, pwd, host, port, db)
|
|
|
|
|
|
def get_redis_server(myapp):
    """Return the redis server location configured for the application

    :param myapp: Application context; only its ``redis`` attribute is read
    :type myapp: :class:`burpui.server.BUIServer`

    :returns: A tuple ``(host, port, password)``. Defaults to
              ``('localhost', 6379, None)`` when ``myapp.redis`` is empty,
              equal to ``'none'`` (case-insensitive) or cannot be parsed.
    """
    host = 'localhost'
    port = 6379
    # Must be initialised up-front: it used to be assigned only when the
    # setting was parsed successfully, making the final ``return`` raise
    # UnboundLocalError for every other configuration.
    pwd = None
    if myapp.redis and myapp.redis.lower() != 'none':
        try:
            _, _, pwd, host, raw_port, _ = parse_db_setting(myapp.redis)
        except ValueError:
            # Unparsable setting: keep the defaults
            pass
        else:
            host = host or 'localhost'
            try:
                port = int(raw_port)
            except (TypeError, ValueError):
                # No port in the setting (parsed as an empty string)
                port = 6379
    return host, port, pwd
|
|
|
|
|
|
def create_db(myapp):
    """Bind the SQLAlchemy extension to the application when SQL is enabled

    :param myapp: Application context
    :type myapp: :class:`burpui.server.BUIServer`

    :returns: The initialised SQLAlchemy instance, or ``None`` when
              ``myapp.config['WITH_SQL']`` is falsy
    """
    if not myapp.config['WITH_SQL']:
        return None

    # Imported lazily so the extension is only loaded when SQL is enabled
    from .ext.sql import db as database

    myapp.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    database.init_app(myapp)
    return database
|
|
|
|
|
|
def create_celery(myapp, warn=True):
    """Create the Celery app if possible

    The broker and the result backend both point to the same redis database.
    The location comes from :func:`get_redis_server` and can be overridden
    by ``myapp.use_celery`` when that setting is a connection string.

    :param myapp: Application context
    :type myapp: :class:`burpui.server.BUIServer`

    :param warn: Emit a :class:`RuntimeWarning` when celery is not enabled
    :type warn: bool

    :returns: The configured celery application, or ``None`` when
              ``myapp.config['WITH_CELERY']`` is falsy
    """
    if myapp.config['WITH_CELERY']:
        # ``async`` is a reserved keyword since Python 3.7, so the statement
        # ``from .ext.async import celery`` no longer compiles. Importing the
        # module by name works on every supported version.
        # ``__name__`` is only a fallback for interpreters leaving
        # ``__package__`` unset; it assumes this module is the package itself.
        from importlib import import_module
        celery = import_module('.ext.async', __package__ or __name__).celery

        host, oport, pwd = get_redis_server(myapp)
        odb = 2
        # Defaults, also used when the dedicated setting cannot be parsed
        # (these names used to be unbound in that case).
        port = oport
        db = odb
        if isinstance(myapp.use_celery, basestring):
            try:
                (_, _, pwd, host, port, db) = parse_db_setting(myapp.use_celery)
            except ValueError:
                pass
            else:
                if not port:
                    port = oport
                if not db:
                    db = odb
                else:
                    try:
                        db = int(db)
                    except ValueError:
                        db = odb

        if pwd:
            redis_url = 'redis://:{}@{}:{}/{}'.format(pwd, host, port, db)
        else:
            redis_url = 'redis://{}:{}/{}'.format(host, port, db)
        myapp.config['CELERY_BROKER_URL'] = myapp.config['BROKER_URL'] = \
            redis_url
        myapp.config['CELERY_RESULT_BACKEND'] = redis_url
        celery.conf.update(myapp.config)

        # Only the first application registered is kept as ``flask_app``
        if not hasattr(celery, 'flask_app'):
            celery.flask_app = myapp

        TaskBase = celery.Task

        class ContextTask(TaskBase):
            """Task whose body is executed within the application context"""
            abstract = True

            def __call__(self, *args, **kwargs):
                with myapp.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)

        celery.Task = ContextTask

        return celery

    if warn:
        message = 'Something went wrong while initializing celery worker.\n' \
                  'Maybe it is not enabled in your conf ' \
                  '({}).'.format(myapp.config['CFG'])
        warnings.warn(
            message,
            RuntimeWarning
        )

    return None
|
|
|
|
|
|
def _redis_endpoint(myapp, setting, default_db, logger):
    """Resolve the redis location to use for one consumer (session or cache)

    :param myapp: Application context
    :type myapp: :class:`burpui.server.BUIServer`

    :param setting: Dedicated connection string, ``'default'`` or empty
    :type setting: str

    :param default_db: Database index used when the setting gives none
    :type default_db: int

    :param logger: Logger receiving a warning when the setting is invalid
    :type logger: :class:`logging.Logger`

    :returns: A tuple ``(host, port, password, db)``
    """
    host, port, pwd = get_redis_server(myapp)
    db = default_db
    if setting and setting.lower() != 'default':
        try:
            (_, _, new_pwd, new_host, new_port, new_db) = \
                parse_db_setting(setting)
        except ValueError as exp:
            logger.warning(str(exp))
        else:
            pwd, host = new_pwd, new_host
            try:
                port = int(new_port)
            except ValueError:
                # No port in the dedicated setting: keep the port of the
                # main redis server instead of handing '' to the client.
                pass
            try:
                db = int(new_db)
            except ValueError:
                db = default_db
    return host, port, pwd, db


def init(conf=None, verbose=0, logfile=None, gunicorn=True, unittest=False, debug=False):
    """Initialize the whole application.

    :param conf: Configuration file to use
    :type conf: str

    :param verbose: Set the verbosity level (``0`` to ``4``, from CRITICAL to
                    DEBUG; higher values are treated as ``4``). A boolean is
                    still accepted: ``True`` means DEBUG, ``False`` CRITICAL.
    :type verbose: int

    :param logfile: Store the logs in the given file
    :type logfile: str

    :param gunicorn: Enable gunicorn engine instead of flask's default
    :type gunicorn: bool

    :param unittest: Are we running tests (used for test only)
    :type unittest: bool

    :param debug: Enable debug mode
    :type debug: bool

    :returns: A :class:`burpui.server.BUIServer` object
    """
    from flask import g
    from flask_login import LoginManager
    from flask_bower import Bower
    from .utils import basic_login_from_request, ReverseProxied, lookup_file
    from .server import BUIServer as BurpUI
    from .routes import view
    from .api import api, apibp
    from .ext.cache import cache
    from .ext.i18n import babel, get_locale

    logger = logging.getLogger('burp-ui')

    # The verbose argument used to be a boolean so we keep supporting this
    # format
    if isinstance(verbose, bool):
        verbose = logging.DEBUG if verbose else logging.CRITICAL
    else:
        levels = [
            logging.CRITICAL,
            logging.ERROR,
            logging.WARNING,
            logging.INFO,
            logging.DEBUG
        ]
        # Normalise falsy values (None, 0) first: comparing None with an int
        # raises TypeError on Python 3.
        if not verbose:
            verbose = 0
        if verbose >= len(levels):
            verbose = len(levels) - 1
        verbose = levels[verbose]

    if logfile:
        from logging.handlers import RotatingFileHandler
        handler = RotatingFileHandler(
            logfile,
            maxBytes=1024 * 1024 * 100,
            backupCount=5
        )
    else:
        from logging import StreamHandler
        handler = StreamHandler()

    # The detailed multi-line format is only used at DEBUG level
    if verbose > logging.DEBUG:
        LOG_FORMAT = (
            '[%(asctime)s] %(levelname)s in '
            '%(module)s.%(funcName)s: %(message)s'
        )
    else:
        LOG_FORMAT = (
            '-' * 80 + '\n' +
            '%(levelname)s in %(module)s.%(funcName)s ' +
            '[%(pathname)s:%(lineno)d]:\n' +
            '%(message)s\n' +
            '-' * 80
        )

    handler.setLevel(verbose)
    handler.setFormatter(Formatter(LOG_FORMAT))

    logger.setLevel(verbose)

    logger.addHandler(handler)

    logger.debug(
        'conf: {}\n'.format(conf) +
        'verbose: {}\n'.format(logging.getLevelName(verbose)) +
        'logfile: {}\n'.format(logfile) +
        'gunicorn: {}\n'.format(gunicorn) +
        'debug: {}\n'.format(debug) +
        'unittest: {}'.format(unittest)
    )

    if not unittest:
        from ._compat import patch_json
        patch_json()

    # We initialize the core
    app = BurpUI()
    # NOTE(review): ``verbose`` now holds a logging level, none of which is
    # zero, so this condition is always true - confirm this is intended.
    if verbose:
        app.enable_logger()
    app.gunicorn = gunicorn

    app.config['CFG'] = None

    # Some config

    # FIXME: strange behavior when bundling errors
    # app.config['BUNDLE_ERRORS'] = True

    app.config['REMEMBER_COOKIE_HTTPONLY'] = True

    if debug and not gunicorn:  # pragma: no cover
        app.config['DEBUG'] = not unittest
        app.config['TESTING'] = not unittest

    # Still need to test conf file here because the init function can be called
    # by gunicorn directly
    if conf:
        app.config['CFG'] = lookup_file(conf, guess=False)
    else:
        app.config['CFG'] = lookup_file()

    logger.info('Using configuration: {}'.format(app.config['CFG']))

    app.setup(app.config['CFG'])

    if debug:
        app.config['TEMPLATES_AUTO_RELOAD'] = True

    app.jinja_env.globals.update(
        isinstance=isinstance,
        list=list,
        version_id='{}-{}'.format(__version__, __release__),
        config=app.config
    )

    # manage application secret key
    if app.secret_key and (app.secret_key.lower() == 'none' or
                           (app.secret_key.lower() == 'random' and gunicorn)):
        logger.warning('Your setup is not secure! Please consider setting a'
                       ' secret key in your configuration file')
        app.secret_key = 'Burp-UI'
    if not app.secret_key or app.secret_key.lower() == 'random':
        from base64 import b64encode
        app.secret_key = b64encode(os.urandom(256))

    app.wsgi_app = ReverseProxied(app.wsgi_app, app)

    # Manage gunicorn special tricks & improvements
    if gunicorn:  # pragma: no cover
        logger.info('Using gunicorn')
        from werkzeug.contrib.fixers import ProxyFix

        app.wsgi_app = ProxyFix(app.wsgi_app)

    if app.storage and app.storage.lower() != 'default':
        try:
            # Session setup
            if not app.session_db or app.session_db.lower() != 'none':
                from redis import Redis
                from flask_session import Session
                host, port, pwd, db = _redis_endpoint(
                    app,
                    app.session_db,
                    0,
                    logger
                )
                logger.debug('Using redis://guest:****@{}:{}/{}'.format(
                    host,
                    port,
                    db)
                )
                red = Redis(host=host, port=port, db=db, password=pwd)
                app.config['SESSION_TYPE'] = 'redis'
                app.config['SESSION_REDIS'] = red
                app.config['SESSION_USE_SIGNER'] = app.secret_key is not None
                app.config['SESSION_PERMANENT'] = False
                ses = Session()
                ses.init_app(app)
            # Cache setup
            if not app.cache_db or app.cache_db.lower() != 'none':
                host, port, pwd, db = _redis_endpoint(
                    app,
                    app.cache_db,
                    1,
                    logger
                )
                logger.debug('Using redis://guest:****@{}:{}/{}'.format(
                    host,
                    port,
                    db)
                )
                cache.init_app(
                    app,
                    config={
                        'CACHE_TYPE': 'redis',
                        'CACHE_REDIS_HOST': host,
                        'CACHE_REDIS_PORT': port,
                        'CACHE_REDIS_PASSWORD': pwd,
                        'CACHE_REDIS_DB': db
                    }
                )
                # clear cache at startup in case we removed or added servers
                with app.app_context():
                    cache.clear()
            else:
                cache.init_app(app)
        except Exception as e:
            # Any failure falls back to the default cache configuration
            logger.warning('Unable to initialize redis: {}'.format(str(e)))
            cache.init_app(app)
    else:
        cache.init_app(app)

    # Create celery app if enabled
    create_celery(app, warn=False)
    if app.config['WITH_CELERY']:
        # may fail in case redis is not running (this can happen while running
        # the bui-manage script)
        try:
            # ``async`` is a reserved keyword since Python 3.7, hence the
            # import by name instead of ``from .api.async import ...``.
            # ``__name__`` is only a fallback for interpreters leaving
            # ``__package__`` unset.
            from importlib import import_module
            tasks = import_module('.api.async', __package__ or __name__)
            tasks.force_scheduling_now()
        except Exception as exp:
            # Still best effort, but no longer silent and no longer
            # swallowing KeyboardInterrupt/SystemExit.
            logger.debug('Unable to force scheduling: {}'.format(str(exp)))

    # Initialize i18n
    babel.init_app(app)

    # Create SQLAlchemy if enabled
    create_db(app)

    # We initialize the API
    api.version = __version__
    api.release = __release__
    api.__url__ = __url__
    api.__doc__ = __doc__
    api.load_all()
    app.register_blueprint(apibp)

    # Then we load our routes
    view.__url__ = __url__
    view.__doc__ = __doc__
    app.register_blueprint(view)

    # And the login_manager
    app.login_manager = LoginManager()
    app.login_manager.login_view = 'view.login'
    app.login_manager.login_message_category = 'info'
    app.login_manager.session_protection = 'strong'
    app.login_manager.init_app(app)

    app.config.setdefault(
        'BOWER_COMPONENTS_ROOT',
        os.path.join('static', 'vendor')
    )
    app.config.setdefault('BOWER_REPLACE_URL_FOR', True)
    bower = Bower()
    bower.init_app(app)

    @app.before_request
    def setup_request():
        # make sure to store secure cookie if required
        if app.scookie:
            from flask import request
            criteria = [
                request.is_secure,
                request.headers.get('X-Forwarded-Proto', 'http') == 'https'
            ]
            app.config['SESSION_COOKIE_SECURE'] = \
                app.config['REMEMBER_COOKIE_SECURE'] = any(criteria)

    @app.login_manager.user_loader
    def load_user(userid):
        """User loader callback"""
        if app.auth != 'none':
            return app.uhandler.user(userid)
        return None

    @app.login_manager.request_loader
    def load_user_from_request(request):
        """User loader from request callback"""
        if app.auth != 'none':
            return basic_login_from_request(request, app)

    @app.before_request
    def before_request():
        g.locale = get_locale()

    return app
|
|
|
|
|
|
# Expose ``init`` under the ``create_app`` name as well.
create_app = init
|