mirror of
https://github.com/ziirish/burp-ui.git
synced 2026-05-21 06:45:24 -06:00
437 lines
16 KiB
Python
437 lines
16 KiB
Python
# -*- coding: utf8 -*-
|
|
"""
|
|
Burp-UI is a web-ui for burp backup written in python with Flask and
|
|
jQuery/Bootstrap
|
|
|
|
.. module:: burpui.app
|
|
:platform: Unix
|
|
:synopsis: Burp-UI app module.
|
|
|
|
.. moduleauthor:: Ziirish <hi+burpui@ziirish.me>
|
|
"""
|
|
import os
|
|
import json
|
|
import time
|
|
import logging
|
|
|
|
from .desc import __version__, __release__
|
|
from .extensions import create_celery, create_db, create_websocket, \
|
|
parse_db_setting, get_redis_server
|
|
|
|
|
|
def create_app(conf=None, verbose=0, logfile=None, **kwargs):
    """Initialize the whole application.

    The function builds the :class:`burpui.engines.server.BUIServer` core,
    loads its configuration, then wires the optional components (sentry,
    reverse-proxy middlewares, redis backed session/cache/limiter, i18n,
    database, API and views, login manager, websocket and celery) and
    finally registers the request hooks.

    :param conf: Configuration file to use
    :type conf: str

    :param verbose: Set the verbosity level
    :type verbose: int

    :param logfile: Store the logs in the given file
    :type logfile: str

    :param kwargs: Extra options:

                   - gunicorn (bool): Enable gunicorn engine instead of
                     flask's default. Default is True.
                   - unittest (bool): Are we running tests (used for test
                     only). Default is False.
                   - debug (bool): Enable debug mode. Default is False.
                   - cli (bool): Are we running the CLI. Default is False.
                   - reverse_proxy (bool): Are we behind a reverse-proxy.
                     Default is True if gunicorn is True
                   - celery_worker (bool): Are we running as a celery
                     worker (the API and the views are not loaded in that
                     case). Default is False
                   - websocket_server (bool): Are we running the websocket
                     server. Default is False
    :type kwargs: dict

    :returns: A :class:`burpui.engines.server.BUIServer` object
    """
    from flask import g, request, session, render_template
    from flask_login import LoginManager
    from flask_bower import Bower
    from flask_babel import gettext
    from .utils import ReverseProxied, lookup_file, is_uuid
    from .tools.logging import logger
    from .security import basic_login_from_request
    from .engines.server import BUIServer as BurpUI
    from .sessions import session_manager
    from .filter import mask
    from .ext.cache import cache
    from .ext.i18n import babel, get_locale
    from .misc.auth.handler import BUIanon

    gunicorn = kwargs.get('gunicorn', True)
    unittest = kwargs.get('unittest', False)
    debug = kwargs.get('debug', False)
    cli = kwargs.get('cli', False)
    reverse_proxy = kwargs.get('reverse_proxy', gunicorn)
    celery_worker = kwargs.get('celery_worker', False)
    websocket_server = kwargs.get('websocket_server', False)

    # We initialize the core
    app = BurpUI()

    app.config['CFG'] = None
    app.config['LOG_FILE'] = logfile
    app.config['LOG_LEVEL'] = verbose

    logger.init_app(app)
    if verbose:
        app.enable_logger()

    app.gunicorn = gunicorn

    logger.debug(
        'conf: {}\n'.format(conf) +
        'verbose: {}\n'.format(logging.getLevelName(verbose)) +
        'logfile: {}\n'.format(logfile) +
        'gunicorn: {}\n'.format(gunicorn) +
        'debug: {}\n'.format(debug) +
        'unittest: {}\n'.format(unittest) +
        'cli: {}\n'.format(cli) +
        'reverse_proxy: {}\n'.format(reverse_proxy) +
        'celery_worker: {}\n'.format(celery_worker) +
        'websocket_server: {}'.format(websocket_server)
    )

    # Some config
    app.config['BUI_CLI'] = cli

    # FIXME: strange behavior when bundling errors
    # app.config['BUNDLE_ERRORS'] = True

    app.config['REMEMBER_COOKIE_HTTPONLY'] = True

    if debug and not gunicorn:  # pragma: no cover
        # Both flags are forced off while running the unit tests
        app.config['DEBUG'] = not unittest
        app.config['TESTING'] = not unittest

    # Still need to test conf file here because the init function can be called
    # by gunicorn directly
    if conf:
        app.config['CFG'] = lookup_file(conf, guess=False)
    else:
        app.config['CFG'] = lookup_file()

    logger.info('Using configuration: {}'.format(app.config['CFG']))

    app.setup(app.config['CFG'], unittest, cli)
    # In CLI mode (unless we run the websocket server) nothing below is
    # initialized: the configured core is returned as is.
    if cli and not websocket_server:
        return app

    if debug:
        app.config['TEMPLATES_AUTO_RELOAD'] = True
        app.config['DEBUG'] = True

    # Sentry is only wired in demo mode and the extension is optional, so a
    # missing import is not an error: we just remember it is unavailable.
    sentry_available = False
    if app.demo:
        try:
            from .ext.sentry import sentry
            sentry.init_app(app, dsn=app.config['BUI_DSN'])
            sentry_available = True
        except ImportError:
            pass

    # manage application secret key
    if app.secret_key and \
       (app.secret_key.lower() == 'none' or
        (app.secret_key.lower() == 'random' and
         gunicorn)):  # pragma: no cover
        logger.critical('Your setup is not secure! Please consider setting a'
                        ' secret key in your configuration file')
        app.secret_key = 'Burp-UI'
    if not app.secret_key or app.secret_key.lower() == 'random':
        from base64 import b64encode
        # A new random key is generated every time the application starts
        app.secret_key = b64encode(os.urandom(256))

    app.wsgi_app = ReverseProxied(app.wsgi_app, app)

    # Manage reverse_proxy special tricks & improvements
    if reverse_proxy:  # pragma: no cover
        from werkzeug.middleware.proxy_fix import ProxyFix

        proxy_fix_args = {}
        if app.config['NUM_PROXIES'] > 0:
            # PROXY_FIX_ARGS is a JSON template that receives the number of
            # proxies through the "num_proxies" placeholder
            proxy_fix_args = json.loads(
                app.config['PROXY_FIX_ARGS'].format(
                    num_proxies=app.config['NUM_PROXIES']
                )
            )
        logger.debug(f"Using {proxy_fix_args} as ProxyFix parameters")
        # Wrap the WSGI callable only: 'app' must remain the application
        # object because everything below relies on its attributes
        # (app.storage, app.config, app.login_manager, ...).
        app.wsgi_app = ProxyFix(app.wsgi_app, **proxy_fix_args)

    if app.storage and app.storage.lower() == 'redis':
        try:
            # Session setup
            if not app.session_db or \
               str(app.session_db).lower() not in ['none', 'false']:
                from redis import Redis
                from .ext.session import sess
                host, port, pwd = get_redis_server(app)
                db = 0
                # A custom value overrides the default redis server
                if app.session_db and \
                   str(app.session_db).lower() not \
                   in ['redis', 'default', 'true']:
                    try:  # pragma: no cover
                        (_, _, pwd, host, port, db) = \
                            parse_db_setting(app.session_db)
                    except ValueError as exp:
                        logger.warning(str(exp))
                try:
                    db = int(db)
                except ValueError:
                    db = 0
                logger.debug(
                    'SESSION: Using redis://guest:****@{}:{}/{}'.format(
                        host,
                        port,
                        db)
                )
                red = Redis(host=host, port=port, db=db, password=pwd)
                app.config['WITH_SRV_SESSION'] = True
                app.config['SESSION_TYPE'] = 'redis'
                app.config['SESSION_REDIS'] = red
                app.config['SESSION_USE_SIGNER'] = app.secret_key is not None
                app.config['SESSION_PERMANENT'] = False
                sess.init_app(app)
                session_manager.backend = red
        except Exception as exp:  # pragma: no cover
            # Best effort: the application starts without server-side session
            logger.warning('Unable to initialize session: {}'.format(str(exp)))
            app.config['WITH_SRV_SESSION'] = False
        try:
            # Cache setup
            if not app.cache_db or \
               str(app.cache_db).lower() not in ['none', 'false']:
                host, port, pwd = get_redis_server(app)
                db = 1
                # A custom value overrides the default redis server
                if app.cache_db and \
                   str(app.cache_db).lower() not \
                   in ['redis', 'default', 'true']:
                    try:  # pragma: no cover
                        (_, _, pwd, host, port, db) = \
                            parse_db_setting(app.cache_db)
                    except ValueError as exp:
                        logger.warning(str(exp))
                try:
                    db = int(db)
                except ValueError:
                    db = 1
                logger.debug('CACHE: Using redis://guest:****@{}:{}/{}'.format(
                    host,
                    port,
                    db)
                )
                cache.init_app(
                    app,
                    config={
                        'CACHE_TYPE': 'redis',
                        'CACHE_REDIS_HOST': host,
                        'CACHE_REDIS_PORT': port,
                        'CACHE_REDIS_PASSWORD': pwd,
                        'CACHE_REDIS_DB': db
                    }
                )
                # clear cache at startup in case we removed or added servers
                with app.app_context():
                    cache.clear()
            else:  # pragma: no cover
                cache.init_app(app)
        except Exception as exp:  # pragma: no cover
            # Best effort: fall back to the cache default settings
            logger.warning('Unable to initialize cache: {}'.format(str(exp)))
            cache.init_app(app)
        try:
            # Limiter setup
            if app.limiter and str(app.limiter).lower() not \
               in ['none', 'false']:  # pragma: no cover
                from .ext.limit import limiter
                app.config['RATELIMIT_HEADERS_ENABLED'] = True
                if app.limiter and str(app.limiter).lower() not \
                   in ['default', 'redis', 'true']:
                    # The setting is used as the storage URL itself
                    app.config['RATELIMIT_STORAGE_URL'] = app.limiter
                else:
                    db = 3
                    host, port, pwd = get_redis_server(app)
                    if pwd:
                        conn = 'redis://guest:{}@{}:{}/{}'.format(
                            pwd,
                            host,
                            port,
                            db
                        )
                    else:
                        conn = 'redis://{}:{}/{}'.format(host, port, db)
                    app.config['RATELIMIT_STORAGE_URL'] = conn

                # Parse the URL back so the log below never shows the password
                (_, _, pwd, host, port, db) = parse_db_setting(
                    app.config['RATELIMIT_STORAGE_URL']
                )

                logger.debug(
                    'LIMITER: Using redis://guest:****@{}:{}/{}'.format(
                        host,
                        port,
                        db
                    )
                )
                limiter.init_app(app)
                app.config['WITH_LIMIT'] = True
        except ImportError:  # pragma: no cover
            logger.warning('Unable to load limiter. Did you run \'pip install '
                           'flask-limiter\'?')
        except Exception as exp:  # pragma: no cover
            logger.warning('Unable to initialize limiter: {}'.format(str(exp)))
    else:
        cache.init_app(app)

    # Initialize i18n
    babel.init_app(app)

    # Create SQLAlchemy if enabled
    create_db(app, cli, unittest, celery_worker=celery_worker)

    if not celery_worker:
        from .api import api, apibp
        from .routes import view, mypad

        app.jinja_env.globals.update(
            isinstance=isinstance,
            list=list,
            mypad=mypad,
            version_id='{}-{}'.format(__version__, __release__),
        )

        # We initialize the API
        api.load_all()
        app.register_blueprint(apibp)

        # Then we load our routes
        app.register_blueprint(view)

        # Initialize Bower ext
        app.config.setdefault(
            'BOWER_COMPONENTS_ROOT',
            os.path.join('static', 'vendor')
        )
        app.config.setdefault('BOWER_REPLACE_URL_FOR', True)
        bower = Bower()
        bower.init_app(app)

    # Order of the initialization matters!
    # The websocket must be configured prior to the celery worker for instance

    # Initialize Session Manager
    session_manager.init_app(app)
    # Initialize filter
    mask.init_app(app)

    # And the login_manager
    app.login_manager = LoginManager()
    app.login_manager.anonymous_user = BUIanon
    app.login_manager.login_view = 'view.login'
    app.login_manager.login_message_category = 'info'
    app.login_manager.session_protection = 'strong'
    # This is just to have the strings in the .po files
    app.login_manager.login_message = gettext(
        'Please log in to access this page.'
    )
    app.login_manager.needs_refresh_message = gettext(
        'Please reauthenticate to access this page.'
    )
    # This will be called at runtime and will then translate the strings
    app.login_manager.localize_callback = gettext
    app.login_manager.init_app(app)

    # Create WebSocket server
    if create_websocket(app, websocket_server, celery_worker, gunicorn, cli):
        return app

    # Create celery app if enabled
    create_celery(app, warn=False)

    def _check_session(user, request, api=False):
        """Check if the session is in the db

        When the session of ``user`` is not known by the session manager it
        is stored, unless the user name is a UUID in which case the user is
        removed from the user handler instead.

        :param user: The user bound to the current request (may be None)
        :param request: The current request
        :param api: Value forwarded as last argument of ``store_session``
        :type api: bool
        """
        if user and not session_manager.session_in_db():  # pragma: no cover
            login = getattr(user, 'name', None)
            if login and not is_uuid(login):
                remember = session.get('persistent', False)
                if not remember:
                    from flask_login import decode_cookie
                    remember_cookie = request.cookies.get(
                        app.config.get('REMEMBER_COOKIE_NAME'),
                        False
                    )
                    # check if the remember_cookie is legit
                    if remember_cookie and decode_cookie(remember_cookie):
                        remember = True
                session_manager.store_session(
                    login,
                    request.remote_addr,
                    request.headers.get('User-Agent'),
                    remember,
                    api
                )
            elif login:
                app.uhandler.remove(login)

    @app.before_request
    def setup_request():
        """Populate ``g`` and the cookie settings before each request"""
        g.version = '{}-{}'.format(__version__, __release__)
        g.locale = get_locale()
        g.now = round(time.time())
        g.date_format = session.get('dateFormat', 'llll')
        # make sure to store secure cookie if required
        if app.config['BUI_SCOOKIE']:
            criteria = (
                request.is_secure,
                request.headers.get('X-Forwarded-Proto', 'http') == 'https'
            )
            app.config['SESSION_COOKIE_SECURE'] = \
                app.config['REMEMBER_COOKIE_SECURE'] = any(criteria)
        if '_extra' in request.args:
            session['_extra'] = request.args.get('_extra')
        g._extra = session.get('_extra', '')

    @app.login_manager.user_loader
    def load_user(userid):
        """User loader callback

        :param userid: Identifier of the user to load

        :returns: The user object or None when authentication is disabled
                  or the user is unknown
        """
        if app.auth != 'none':
            user = app.uhandler.user(userid)
            if not user:
                return None
            if 'X-Language' in request.headers:
                language = request.headers.get('X-Language')
                user.language = language
                session['language'] = language
            if '_id' not in session:
                from flask_login import login_user
                # if _id is not in session, it means we loaded the user from
                # cache/db using the remember cookie so we need to login it
                login_user(user, remember=user.is_authenticated, fresh=False)
            _check_session(user, request)
            return user
        return None

    @app.login_manager.request_loader
    def load_user_from_request(request):
        """User loader from request callback

        :param request: The current request

        :returns: The result of ``basic_login_from_request`` or None when
                  authentication is disabled
        """
        if app.auth != 'none':
            user = basic_login_from_request(request, app)
            _check_session(user, request, True)
            return user
        return None

    @app.after_request
    def after_request(response):
        """Drop the session when ``g.basic_session`` is set

        :param response: The response about to be sent

        :returns: The untouched response
        """
        if getattr(g, 'basic_session', False):
            if session_manager.invalidate_current_session():
                session_manager.delete_session()
        return response

    if app.demo and sentry_available:
        @app.errorhandler(500)
        def internal_server_error(error):
            """Render the sentry aware error page on internal errors"""
            from .ext.sentry import sentry
            return render_template(
                '500_sentry.html',
                event_id=g.sentry_event_id,
                public_dsn=sentry.client.get_public_dsn('https')
            )

    return app
|