Added docstrings

This commit is contained in:
Daniel Perna 2018-07-12 23:37:38 +02:00
parent 0435f399f1
commit f0afbad7a8
2 changed files with 18 additions and 2 deletions

View file

@ -83,7 +83,6 @@ NOTIFY_SERVICE_DEFAULT = "persistent_notification.create"
NOTIFY_SERVICE = NOTIFY_SERVICE_DEFAULT NOTIFY_SERVICE = NOTIFY_SERVICE_DEFAULT
### End of options ### End of options
LOGLEVEL = logging.INFO LOGLEVEL = logging.INFO
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
LOG.setLevel(LOGLEVEL) LOG.setLevel(LOGLEVEL)
@ -3418,12 +3417,14 @@ editor.on('change', queue_lint);
# pylint: disable=unused-argument # pylint: disable=unused-argument
def signal_handler(sig, frame): def signal_handler(sig, frame):
"""Handle signal to shut down server."""
global HTTPD global HTTPD
LOG.info("Got signal: %s. Shutting down server", str(sig)) LOG.info("Got signal: %s. Shutting down server", str(sig))
HTTPD.server_close() HTTPD.server_close()
sys.exit(0) sys.exit(0)
def load_settings(settingsfile): def load_settings(settingsfile):
"""Load settings from file and environment."""
global LISTENIP, LISTENPORT, BASEPATH, SSL_CERTIFICATE, SSL_KEY, HASS_API, \ global LISTENIP, LISTENPORT, BASEPATH, SSL_CERTIFICATE, SSL_KEY, HASS_API, \
HASS_API_PASSWORD, CREDENTIALS, ALLOWED_NETWORKS, BANNED_IPS, BANLIMIT, \ HASS_API_PASSWORD, CREDENTIALS, ALLOWED_NETWORKS, BANNED_IPS, BANLIMIT, \
DEV, IGNORE_PATTERN, DIRSFIRST, SESAME, VERIFY_HOSTNAME, ENFORCE_BASEPATH, \ DEV, IGNORE_PATTERN, DIRSFIRST, SESAME, VERIFY_HOSTNAME, ENFORCE_BASEPATH, \
@ -3500,6 +3501,7 @@ def load_settings(settingsfile):
LOG.warning("Unable to create TOTP object: %s" % err) LOG.warning("Unable to create TOTP object: %s" % err)
def is_safe_path(basedir, path, follow_symlinks=True): def is_safe_path(basedir, path, follow_symlinks=True):
"""Check path for malicious traversal."""
if basedir is None: if basedir is None:
return True return True
if follow_symlinks: if follow_symlinks:
@ -3507,6 +3509,7 @@ def is_safe_path(basedir, path, follow_symlinks=True):
return os.path.abspath(path).startswith(basedir.encode('utf-8')) return os.path.abspath(path).startswith(basedir.encode('utf-8'))
def get_dircontent(path, repo=None): def get_dircontent(path, repo=None):
"""Get content of directory."""
dircontent = [] dircontent = []
if repo: if repo:
untracked = [ untracked = [
@ -3534,6 +3537,7 @@ def get_dircontent(path, repo=None):
unstaged = {} unstaged = {}
def sorted_file_list(): def sorted_file_list():
"""Sort list of files / directories."""
dirlist = [x for x in os.listdir(path) if os.path.isdir(os.path.join(path, x))] dirlist = [x for x in os.listdir(path) if os.path.isdir(os.path.join(path, x))]
filelist = [x for x in os.listdir(path) if not os.path.isdir(os.path.join(path, x))] filelist = [x for x in os.listdir(path) if not os.path.isdir(os.path.join(path, x))]
if DIRSFIRST: if DIRSFIRST:
@ -3578,6 +3582,7 @@ def get_dircontent(path, repo=None):
return dircontent return dircontent
def get_html(): def get_html():
"""Load the HTML from file in dev-mode, otherwise embedded."""
if DEV: if DEV:
try: try:
with open("dev.html") as fptr: with open("dev.html") as fptr:
@ -3589,6 +3594,7 @@ def get_html():
return INDEX return INDEX
def password_problems(password, name="UNKNOWN"): def password_problems(password, name="UNKNOWN"):
"""Rudimentary checks for password strength."""
problems = 0 problems = 0
password = str(password) password = str(password)
if password is None: if password is None:
@ -3612,6 +3618,7 @@ def password_problems(password, name="UNKNOWN"):
return problems return problems
def check_access(clientip): def check_access(clientip):
"""Check if IP is allowed to access the configurator / API."""
global BANNED_IPS global BANNED_IPS
if clientip in BANNED_IPS: if clientip in BANNED_IPS:
LOG.warning("Client IP banned.") LOG.warning("Client IP banned.")
@ -3627,12 +3634,14 @@ def check_access(clientip):
return False return False
def verify_hostname(request_hostname): def verify_hostname(request_hostname):
"""Verify the provided host header is correct."""
if VERIFY_HOSTNAME: if VERIFY_HOSTNAME:
if VERIFY_HOSTNAME not in request_hostname: if VERIFY_HOSTNAME not in request_hostname:
return False return False
return True return True
class RequestHandler(BaseHTTPRequestHandler): class RequestHandler(BaseHTTPRequestHandler):
"""Request handler."""
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
def log_message(self, format, *args): def log_message(self, format, *args):
LOG.info("%s - %s" % (self.client_address[0], format % args)) LOG.info("%s - %s" % (self.client_address[0], format % args))
@ -3640,12 +3649,14 @@ class RequestHandler(BaseHTTPRequestHandler):
# pylint: disable=invalid-name # pylint: disable=invalid-name
def do_BLOCK(self, status=420, reason="Policy not fulfilled"): def do_BLOCK(self, status=420, reason="Policy not fulfilled"):
"""Customized do_BLOCK method."""
self.send_response(status) self.send_response(status)
self.end_headers() self.end_headers()
self.wfile.write(bytes(reason, "utf8")) self.wfile.write(bytes(reason, "utf8"))
# pylint: disable=invalid-name # pylint: disable=invalid-name
def do_GET(self): def do_GET(self):
"""Customized do_GET method."""
if not verify_hostname(self.headers.get('Host', '')): if not verify_hostname(self.headers.get('Host', '')):
self.do_BLOCK(403, "Forbidden") self.do_BLOCK(403, "Forbidden")
return return
@ -4008,6 +4019,7 @@ class RequestHandler(BaseHTTPRequestHandler):
# pylint: disable=invalid-name # pylint: disable=invalid-name
def do_POST(self): def do_POST(self):
"""Customized do_POST method."""
global ALLOWED_NETWORKS, BANNED_IPS global ALLOWED_NETWORKS, BANNED_IPS
if not verify_hostname(self.headers.get('Host', '')): if not verify_hostname(self.headers.get('Host', '')):
self.do_BLOCK(403, "Forbidden") self.do_BLOCK(403, "Forbidden")
@ -4602,6 +4614,7 @@ class RequestHandler(BaseHTTPRequestHandler):
return return
class AuthHandler(RequestHandler): class AuthHandler(RequestHandler):
"""Handler to verify auth header."""
def do_BLOCK(self, status=420, reason="Policy not fulfilled"): def do_BLOCK(self, status=420, reason="Policy not fulfilled"):
self.send_response(status) self.send_response(status)
self.end_headers() self.end_headers()
@ -4609,6 +4622,7 @@ class AuthHandler(RequestHandler):
# pylint: disable=invalid-name # pylint: disable=invalid-name
def do_AUTHHEAD(self): def do_AUTHHEAD(self):
"""Request authorization."""
LOG.info("Requesting authorization") LOG.info("Requesting authorization")
self.send_response(401) self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic realm=\"HASS-Configurator\"') self.send_header('WWW-Authenticate', 'Basic realm=\"HASS-Configurator\"')
@ -4682,6 +4696,7 @@ class AuthHandler(RequestHandler):
self.wfile.write(bytes('Authentication required', 'utf-8')) self.wfile.write(bytes('Authentication required', 'utf-8'))
class SimpleServer(socketserver.ThreadingMixIn, socketserver.TCPServer): class SimpleServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
"""Server class."""
daemon_threads = True daemon_threads = True
allow_reuse_address = True allow_reuse_address = True
@ -4691,6 +4706,7 @@ class SimpleServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
def notify(title="HASS Configurator", def notify(title="HASS Configurator",
message="Notification by HASS Configurator", message="Notification by HASS Configurator",
notification_id=None): notification_id=None):
"""Helper function to send notifications via HASS."""
if not HASS_API or not NOTIFY_SERVICE: if not HASS_API or not NOTIFY_SERVICE:
return return
headers = { headers = {
@ -4717,6 +4733,7 @@ def notify(title="HASS Configurator",
LOG.warning("Exception while creating notification: %s" % err) LOG.warning("Exception while creating notification: %s" % err)
def main(args): def main(args):
"""Main function, duh!"""
global HTTPD global HTTPD
if args: if args:
load_settings(args[0]) load_settings(args[0])

View file

@ -2,7 +2,6 @@
reports=no reports=no
disable= disable=
missing-docstring,
global-statement, global-statement,
logging-not-lazy, logging-not-lazy,
broad-except, broad-except,