diff --git a/configurator.py b/configurator.py index 91ad644..052f211 100755 --- a/configurator.py +++ b/configurator.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- +# pylint: disable=too-many-lines """ Configurator for Home Assistant. https://github.com/danielperna84/hass-configurator @@ -82,6 +83,7 @@ NOTIFY_SERVICE_DEFAULT = "persistent_notification.create" NOTIFY_SERVICE = NOTIFY_SERVICE_DEFAULT ### End of options + LOGLEVEL = logging.INFO LOG = logging.getLogger(__name__) LOG.setLevel(LOGLEVEL) @@ -94,6 +96,7 @@ RELEASEURL = "https://api.github.com/repos/danielperna84/hass-configurator/relea VERSION = "0.3.0" BASEDIR = "." DEV = False +LISTENPORT = None TOTP = None HTTPD = None FAIL2BAN_IPS = {} @@ -3413,6 +3416,7 @@ editor.on('change', queue_lint); """) +# pylint: disable=unused-argument def signal_handler(sig, frame): global HTTPD LOG.info("Got signal: %s. Shutting down server", str(sig)) @@ -3420,11 +3424,11 @@ def signal_handler(sig, frame): sys.exit(0) def load_settings(settingsfile): - global LISTENIP, PORT, BASEPATH, SSL_CERTIFICATE, SSL_KEY, HASS_API, \ + global LISTENIP, LISTENPORT, BASEPATH, SSL_CERTIFICATE, SSL_KEY, HASS_API, \ HASS_API_PASSWORD, CREDENTIALS, ALLOWED_NETWORKS, BANNED_IPS, BANLIMIT, \ DEV, IGNORE_PATTERN, DIRSFIRST, SESAME, VERIFY_HOSTNAME, ENFORCE_BASEPATH, \ ENV_PREFIX, NOTIFY_SERVICE, USERNAME, PASSWORD, SESAME_TOTP_SECRET, TOTP, \ - GIT, REPO + GIT, REPO, PORT settings = {} if settingsfile: try: @@ -3513,11 +3517,17 @@ def get_dircontent(path, repo=None): unstaged = {} try: for element in repo.index.diff("HEAD"): - staged["%s%s%s" % (repo.working_dir, os.sep, "%s"%os.sep.join(element.b_path.split('/')))] = element.change_type + staged["%s%s%s" % (repo.working_dir, + os.sep, + "%s"%os.sep.join( + element.b_path.split('/')))] = element.change_type except Exception as err: LOG.warning("Exception: %s", str(err)) for element in repo.index.diff(None): - unstaged["%s%s%s" % (repo.working_dir, os.sep, "%s"%os.sep.join(element.b_path.split('/')))] = element.change_type + unstaged["%s%s%s" % (repo.working_dir, + os.sep, + "%s"%os.sep.join( + element.b_path.split('/')))] = element.change_type else: untracked = [] staged = {} @@ -3527,9 +3537,9 @@ def get_dircontent(path, repo=None): dirlist = [x for x in os.listdir(path) if os.path.isdir(os.path.join(path, x))] filelist = [x for x in os.listdir(path) if not os.path.isdir(os.path.join(path, x))] if DIRSFIRST: - return sorted(dirlist, key=lambda x: x.lower()) + sorted(filelist, key=lambda x: x.lower()) - else: - return sorted(dirlist + filelist, key=lambda x: x.lower()) + return sorted(dirlist, key=lambda x: x.lower()) + \ + sorted(filelist, key=lambda x: x.lower()) + return sorted(dirlist + filelist, key=lambda x: x.lower()) for elem in sorted_file_list(): edata = {} @@ -3596,7 +3606,8 @@ def password_problems(password, name="UNKNOWN"): exp = len(password) ** len(set(password)) score = exp / quota / 8 if score < 65536: - LOG.warning("Password %s does not contain enough unique characters (%i)" % (name, len(set(password)))) + LOG.warning("Password %s does not contain enough unique characters (%i)" % ( + name, len(set(password)))) problems += 8 return problems @@ -3622,15 +3633,18 @@ def verify_hostname(request_hostname): return True class RequestHandler(BaseHTTPRequestHandler): + # pylint: disable=redefined-builtin def log_message(self, format, *args): LOG.info("%s - %s" % (self.client_address[0], format % args)) return + # pylint: disable=invalid-name def do_BLOCK(self, status=420, reason="Policy not fulfilled"): self.send_response(status) self.end_headers() self.wfile.write(bytes(reason, "utf8")) + # pylint: disable=invalid-name def do_GET(self): if not verify_hostname(self.headers.get('Host', '')): self.do_BLOCK(403, "Forbidden") @@ -3691,7 +3705,9 @@ class RequestHandler(BaseHTTPRequestHandler): if os.path.isfile(os.path.join(BASEDIR.encode('utf-8'), filename)): with open(os.path.join(BASEDIR.encode('utf-8'), filename), 'rb') as fptr: filecontent = fptr.read() - self.send_header('Content-Disposition', 'attachment; filename=%s' % filename.decode('utf-8').split(os.sep)[-1]) + self.send_header( + 'Content-Disposition', + 'attachment; filename=%s' % filename.decode('utf-8').split(os.sep)[-1]) self.end_headers() self.wfile.write(filecontent) return @@ -3731,14 +3747,15 @@ class RequestHandler(BaseHTTPRequestHandler): except Exception as err: LOG.debug("Exception (no repo): %s" % str(err)) dircontent = get_dircontent(dirpath.decode('utf-8'), repo) - filedata = {'content': dircontent, - 'abspath': os.path.abspath(dirpath).decode('utf-8'), - 'parent': os.path.dirname(os.path.abspath(dirpath)).decode('utf-8'), - 'branches': branches, - 'activebranch': activebranch, - 'dirty': dirty, - 'error': None - } + filedata = { + 'content': dircontent, + 'abspath': os.path.abspath(dirpath).decode('utf-8'), + 'parent': os.path.dirname(os.path.abspath(dirpath)).decode('utf-8'), + 'branches': branches, + 'activebranch': activebranch, + 'dirty': dirty, + 'error': None + } self.wfile.write(bytes(json.dumps(filedata), "utf8")) except Exception as err: LOG.warning(err) @@ -3967,19 +3984,21 @@ class RequestHandler(BaseHTTPRequestHandler): ws_api = "%s://%swebsocket" % ( "wss" if protocol == 'https' else 'ws', uri ) - html = get_html().safe_substitute(services=services, - events=events, - states=states, - loadfile=loadfile, - current=VERSION, - versionclass=color, - githidden="" if GIT else "hiddendiv", - separator="\%s" % os.sep if os.sep == "\\" else os.sep, - listening_address="%s://%s:%i" % ( - 'https' if SSL_CERTIFICATE else 'http', LISTENIP, PORT), - hass_api_address="%s" % (HASS_API, ), - hass_ws_address=ws_api, - api_password=HASS_API_PASSWORD if HASS_API_PASSWORD else "") + html = get_html().safe_substitute( + services=services, + events=events, + states=states, + loadfile=loadfile, + current=VERSION, + versionclass=color, + githidden="" if GIT else "hiddendiv", + # pylint: disable=anomalous-backslash-in-string + separator="\%s" % os.sep if os.sep == "\\" else os.sep, + listening_address="%s://%s:%i" % ( + 'https' if SSL_CERTIFICATE else 'http', LISTENIP, PORT), + hass_api_address="%s" % (HASS_API, ), + hass_ws_address=ws_api, + api_password=HASS_API_PASSWORD if HASS_API_PASSWORD else "") self.wfile.write(bytes(html, "utf8")) return else: @@ -3987,6 +4006,7 @@ class RequestHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(bytes("File not found", "utf8")) + # pylint: disable=invalid-name def do_POST(self): global ALLOWED_NETWORKS, BANNED_IPS if not verify_hostname(self.headers.get('Host', '')): @@ -4042,24 +4062,24 @@ class RequestHandler(BaseHTTPRequestHandler): response['message'] = "File too big: %i" % read self.wfile.write(bytes(json.dumps(response), "utf8")) return - else: - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={'REQUEST_METHOD': 'POST', - 'CONTENT_TYPE': self.headers['Content-Type'], - }) - filename = form['file'].filename - filepath = form['path'].file.read() - data = form['file'].file.read() - open("%s%s%s" % (filepath, os.sep, filename), "wb").write(data) - self.send_response(200) - self.send_header('Content-type', 'text/json') - self.end_headers() - response['error'] = False - response['message'] = "Upload successful" - self.wfile.write(bytes(json.dumps(response), "utf8")) - return + form = cgi.FieldStorage( + fp=self.rfile, + headers=self.headers, + environ={ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': self.headers['Content-Type'], + }) + filename = form['file'].filename + filepath = form['path'].file.read() + data = form['file'].file.read() + open("%s%s%s" % (filepath, os.sep, filename), "wb").write(data) + self.send_response(200) + self.send_header('Content-type', 'text/json') + self.end_headers() + response['error'] = False + response['message'] = "Upload successful" + self.wfile.write(bytes(json.dumps(response), "utf8")) + return elif req.path.endswith('/api/delete'): try: postvars = parse_qs(self.rfile.read(length).decode('utf-8'), @@ -4160,7 +4180,8 @@ class RequestHandler(BaseHTTPRequestHandler): # pylint: disable=not-callable repo = REPO(addpath, search_parent_directories=True) - filepath = "/".join(addpath.split(os.sep)[len(repo.working_dir.split(os.sep)):]) + filepath = "/".join( + addpath.split(os.sep)[len(repo.working_dir.split(os.sep)):]) response['path'] = filepath try: repo.index.add([filepath]) @@ -4196,10 +4217,13 @@ class RequestHandler(BaseHTTPRequestHandler): # pylint: disable=not-callable repo = REPO(diffpath, search_parent_directories=True) - filepath = "/".join(diffpath.split(os.sep)[len(repo.working_dir.split(os.sep)):]) + filepath = "/".join( + diffpath.split(os.sep)[len(repo.working_dir.split(os.sep)):]) response['path'] = filepath try: - diff = repo.index.diff(None, create_patch=True, paths=filepath)[0].diff.decode("utf-8") + diff = repo.index.diff(None, + create_patch=True, + paths=filepath)[0].diff.decode("utf-8") response['error'] = False response['message'] = diff self.send_response(200) @@ -4546,21 +4570,21 @@ class RequestHandler(BaseHTTPRequestHandler): if 'ip' in postvars.keys() and 'method' in postvars.keys(): if postvars['ip'] and postvars['method']: try: - ip = unquote(postvars['ip'][0]) + ip_address = unquote(postvars['ip'][0]) method = unquote(postvars['method'][0]) if method == 'unban': - if ip in BANNED_IPS: - BANNED_IPS.remove(ip) + if ip_address in BANNED_IPS: + BANNED_IPS.remove(ip_address) response['error'] = False elif method == 'ban': - ipaddress.ip_network(ip) - BANNED_IPS.append(ip) + ipaddress.ip_network(ip_address) + BANNED_IPS.append(ip_address) else: response['error'] = True self.send_response(200) self.send_header('Content-type', 'text/json') self.end_headers() - response['message'] = "BANNED_IPS (%s): %s" % (method, ip) + response['message'] = "BANNED_IPS (%s): %s" % (method, ip_address) self.wfile.write(bytes(json.dumps(response), "utf8")) return except Exception as err: @@ -4583,6 +4607,7 @@ class AuthHandler(RequestHandler): self.end_headers() self.wfile.write(bytes(reason, "utf8")) + # pylint: disable=invalid-name def do_AUTHHEAD(self): LOG.info("Requesting authorization") self.send_response(401) @@ -4738,15 +4763,15 @@ def main(args): except Exception as err: LOG.warning("Exception while checking passwords: %s" % err) - CustomServer = SimpleServer + custom_server = SimpleServer if ':' in LISTENIP: - CustomServer.address_family = socket.AF_INET6 + custom_server.address_family = socket.AF_INET6 server_address = (LISTENIP, PORT) if USERNAME and PASSWORD: - Handler = AuthHandler + handler = AuthHandler else: - Handler = RequestHandler - HTTPD = CustomServer(server_address, Handler) + handler = RequestHandler + HTTPD = custom_server(server_address, handler) if SSL_CERTIFICATE: HTTPD.socket = ssl.wrap_socket(HTTPD.socket, certfile=SSL_CERTIFICATE, diff --git a/pylintrc b/pylintrc index 5c85bc9..c4f64f2 100644 --- a/pylintrc +++ b/pylintrc @@ -2,22 +2,12 @@ reports=no disable= - too-many-lines, missing-docstring, global-statement, - global-variable-not-assigned, + logging-not-lazy, + broad-except, too-many-return-statements, too-many-nested-blocks, too-many-statements, too-many-branches, - too-many-locals, - unused-argument, - logging-not-lazy, - invalid-name, - broad-except, - line-too-long, - unnecessary-pass, - anomalous-backslash-in-string, - redefined-variable-type, - redefined-builtin, - no-else-return, \ No newline at end of file + too-many-locals, \ No newline at end of file