Lint
This commit is contained in:
parent
63d8c591d0
commit
0435f399f1
2 changed files with 90 additions and 75 deletions
149
configurator.py
149
configurator.py
|
@ -1,5 +1,6 @@
|
|||
#!/usr/bin/python3
|
||||
# -*- coding: utf-8 -*-
|
||||
# pylint: disable=too-many-lines
|
||||
"""
|
||||
Configurator for Home Assistant.
|
||||
https://github.com/danielperna84/hass-configurator
|
||||
|
@ -82,6 +83,7 @@ NOTIFY_SERVICE_DEFAULT = "persistent_notification.create"
|
|||
NOTIFY_SERVICE = NOTIFY_SERVICE_DEFAULT
|
||||
### End of options
|
||||
|
||||
|
||||
LOGLEVEL = logging.INFO
|
||||
LOG = logging.getLogger(__name__)
|
||||
LOG.setLevel(LOGLEVEL)
|
||||
|
@ -94,6 +96,7 @@ RELEASEURL = "https://api.github.com/repos/danielperna84/hass-configurator/relea
|
|||
VERSION = "0.3.0"
|
||||
BASEDIR = "."
|
||||
DEV = False
|
||||
LISTENPORT = None
|
||||
TOTP = None
|
||||
HTTPD = None
|
||||
FAIL2BAN_IPS = {}
|
||||
|
@ -3413,6 +3416,7 @@ editor.on('change', queue_lint);
|
|||
</body>
|
||||
</html>""")
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def signal_handler(sig, frame):
|
||||
global HTTPD
|
||||
LOG.info("Got signal: %s. Shutting down server", str(sig))
|
||||
|
@ -3420,11 +3424,11 @@ def signal_handler(sig, frame):
|
|||
sys.exit(0)
|
||||
|
||||
def load_settings(settingsfile):
|
||||
global LISTENIP, PORT, BASEPATH, SSL_CERTIFICATE, SSL_KEY, HASS_API, \
|
||||
global LISTENIP, LISTENPORT, BASEPATH, SSL_CERTIFICATE, SSL_KEY, HASS_API, \
|
||||
HASS_API_PASSWORD, CREDENTIALS, ALLOWED_NETWORKS, BANNED_IPS, BANLIMIT, \
|
||||
DEV, IGNORE_PATTERN, DIRSFIRST, SESAME, VERIFY_HOSTNAME, ENFORCE_BASEPATH, \
|
||||
ENV_PREFIX, NOTIFY_SERVICE, USERNAME, PASSWORD, SESAME_TOTP_SECRET, TOTP, \
|
||||
GIT, REPO
|
||||
GIT, REPO, PORT
|
||||
settings = {}
|
||||
if settingsfile:
|
||||
try:
|
||||
|
@ -3513,11 +3517,17 @@ def get_dircontent(path, repo=None):
|
|||
unstaged = {}
|
||||
try:
|
||||
for element in repo.index.diff("HEAD"):
|
||||
staged["%s%s%s" % (repo.working_dir, os.sep, "%s"%os.sep.join(element.b_path.split('/')))] = element.change_type
|
||||
staged["%s%s%s" % (repo.working_dir,
|
||||
os.sep,
|
||||
"%s"%os.sep.join(
|
||||
element.b_path.split('/')))] = element.change_type
|
||||
except Exception as err:
|
||||
LOG.warning("Exception: %s", str(err))
|
||||
for element in repo.index.diff(None):
|
||||
unstaged["%s%s%s" % (repo.working_dir, os.sep, "%s"%os.sep.join(element.b_path.split('/')))] = element.change_type
|
||||
unstaged["%s%s%s" % (repo.working_dir,
|
||||
os.sep,
|
||||
"%s"%os.sep.join(
|
||||
element.b_path.split('/')))] = element.change_type
|
||||
else:
|
||||
untracked = []
|
||||
staged = {}
|
||||
|
@ -3527,9 +3537,9 @@ def get_dircontent(path, repo=None):
|
|||
dirlist = [x for x in os.listdir(path) if os.path.isdir(os.path.join(path, x))]
|
||||
filelist = [x for x in os.listdir(path) if not os.path.isdir(os.path.join(path, x))]
|
||||
if DIRSFIRST:
|
||||
return sorted(dirlist, key=lambda x: x.lower()) + sorted(filelist, key=lambda x: x.lower())
|
||||
else:
|
||||
return sorted(dirlist + filelist, key=lambda x: x.lower())
|
||||
return sorted(dirlist, key=lambda x: x.lower()) + \
|
||||
sorted(filelist, key=lambda x: x.lower())
|
||||
return sorted(dirlist + filelist, key=lambda x: x.lower())
|
||||
|
||||
for elem in sorted_file_list():
|
||||
edata = {}
|
||||
|
@ -3596,7 +3606,8 @@ def password_problems(password, name="UNKNOWN"):
|
|||
exp = len(password) ** len(set(password))
|
||||
score = exp / quota / 8
|
||||
if score < 65536:
|
||||
LOG.warning("Password %s does not contain enough unique characters (%i)" % (name, len(set(password))))
|
||||
LOG.warning("Password %s does not contain enough unique characters (%i)" % (
|
||||
name, len(set(password))))
|
||||
problems += 8
|
||||
return problems
|
||||
|
||||
|
@ -3622,15 +3633,18 @@ def verify_hostname(request_hostname):
|
|||
return True
|
||||
|
||||
class RequestHandler(BaseHTTPRequestHandler):
|
||||
# pylint: disable=redefined-builtin
|
||||
def log_message(self, format, *args):
|
||||
LOG.info("%s - %s" % (self.client_address[0], format % args))
|
||||
return
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def do_BLOCK(self, status=420, reason="Policy not fulfilled"):
|
||||
self.send_response(status)
|
||||
self.end_headers()
|
||||
self.wfile.write(bytes(reason, "utf8"))
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def do_GET(self):
|
||||
if not verify_hostname(self.headers.get('Host', '')):
|
||||
self.do_BLOCK(403, "Forbidden")
|
||||
|
@ -3691,7 +3705,9 @@ class RequestHandler(BaseHTTPRequestHandler):
|
|||
if os.path.isfile(os.path.join(BASEDIR.encode('utf-8'), filename)):
|
||||
with open(os.path.join(BASEDIR.encode('utf-8'), filename), 'rb') as fptr:
|
||||
filecontent = fptr.read()
|
||||
self.send_header('Content-Disposition', 'attachment; filename=%s' % filename.decode('utf-8').split(os.sep)[-1])
|
||||
self.send_header(
|
||||
'Content-Disposition',
|
||||
'attachment; filename=%s' % filename.decode('utf-8').split(os.sep)[-1])
|
||||
self.end_headers()
|
||||
self.wfile.write(filecontent)
|
||||
return
|
||||
|
@ -3731,14 +3747,15 @@ class RequestHandler(BaseHTTPRequestHandler):
|
|||
except Exception as err:
|
||||
LOG.debug("Exception (no repo): %s" % str(err))
|
||||
dircontent = get_dircontent(dirpath.decode('utf-8'), repo)
|
||||
filedata = {'content': dircontent,
|
||||
'abspath': os.path.abspath(dirpath).decode('utf-8'),
|
||||
'parent': os.path.dirname(os.path.abspath(dirpath)).decode('utf-8'),
|
||||
'branches': branches,
|
||||
'activebranch': activebranch,
|
||||
'dirty': dirty,
|
||||
'error': None
|
||||
}
|
||||
filedata = {
|
||||
'content': dircontent,
|
||||
'abspath': os.path.abspath(dirpath).decode('utf-8'),
|
||||
'parent': os.path.dirname(os.path.abspath(dirpath)).decode('utf-8'),
|
||||
'branches': branches,
|
||||
'activebranch': activebranch,
|
||||
'dirty': dirty,
|
||||
'error': None
|
||||
}
|
||||
self.wfile.write(bytes(json.dumps(filedata), "utf8"))
|
||||
except Exception as err:
|
||||
LOG.warning(err)
|
||||
|
@ -3967,19 +3984,21 @@ class RequestHandler(BaseHTTPRequestHandler):
|
|||
ws_api = "%s://%swebsocket" % (
|
||||
"wss" if protocol == 'https' else 'ws', uri
|
||||
)
|
||||
html = get_html().safe_substitute(services=services,
|
||||
events=events,
|
||||
states=states,
|
||||
loadfile=loadfile,
|
||||
current=VERSION,
|
||||
versionclass=color,
|
||||
githidden="" if GIT else "hiddendiv",
|
||||
separator="\%s" % os.sep if os.sep == "\\" else os.sep,
|
||||
listening_address="%s://%s:%i" % (
|
||||
'https' if SSL_CERTIFICATE else 'http', LISTENIP, PORT),
|
||||
hass_api_address="%s" % (HASS_API, ),
|
||||
hass_ws_address=ws_api,
|
||||
api_password=HASS_API_PASSWORD if HASS_API_PASSWORD else "")
|
||||
html = get_html().safe_substitute(
|
||||
services=services,
|
||||
events=events,
|
||||
states=states,
|
||||
loadfile=loadfile,
|
||||
current=VERSION,
|
||||
versionclass=color,
|
||||
githidden="" if GIT else "hiddendiv",
|
||||
# pylint: disable=anomalous-backslash-in-string
|
||||
separator="\%s" % os.sep if os.sep == "\\" else os.sep,
|
||||
listening_address="%s://%s:%i" % (
|
||||
'https' if SSL_CERTIFICATE else 'http', LISTENIP, PORT),
|
||||
hass_api_address="%s" % (HASS_API, ),
|
||||
hass_ws_address=ws_api,
|
||||
api_password=HASS_API_PASSWORD if HASS_API_PASSWORD else "")
|
||||
self.wfile.write(bytes(html, "utf8"))
|
||||
return
|
||||
else:
|
||||
|
@ -3987,6 +4006,7 @@ class RequestHandler(BaseHTTPRequestHandler):
|
|||
self.end_headers()
|
||||
self.wfile.write(bytes("File not found", "utf8"))
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def do_POST(self):
|
||||
global ALLOWED_NETWORKS, BANNED_IPS
|
||||
if not verify_hostname(self.headers.get('Host', '')):
|
||||
|
@ -4042,24 +4062,24 @@ class RequestHandler(BaseHTTPRequestHandler):
|
|||
response['message'] = "File too big: %i" % read
|
||||
self.wfile.write(bytes(json.dumps(response), "utf8"))
|
||||
return
|
||||
else:
|
||||
form = cgi.FieldStorage(
|
||||
fp=self.rfile,
|
||||
headers=self.headers,
|
||||
environ={'REQUEST_METHOD': 'POST',
|
||||
'CONTENT_TYPE': self.headers['Content-Type'],
|
||||
})
|
||||
filename = form['file'].filename
|
||||
filepath = form['path'].file.read()
|
||||
data = form['file'].file.read()
|
||||
open("%s%s%s" % (filepath, os.sep, filename), "wb").write(data)
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'text/json')
|
||||
self.end_headers()
|
||||
response['error'] = False
|
||||
response['message'] = "Upload successful"
|
||||
self.wfile.write(bytes(json.dumps(response), "utf8"))
|
||||
return
|
||||
form = cgi.FieldStorage(
|
||||
fp=self.rfile,
|
||||
headers=self.headers,
|
||||
environ={
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'CONTENT_TYPE': self.headers['Content-Type'],
|
||||
})
|
||||
filename = form['file'].filename
|
||||
filepath = form['path'].file.read()
|
||||
data = form['file'].file.read()
|
||||
open("%s%s%s" % (filepath, os.sep, filename), "wb").write(data)
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'text/json')
|
||||
self.end_headers()
|
||||
response['error'] = False
|
||||
response['message'] = "Upload successful"
|
||||
self.wfile.write(bytes(json.dumps(response), "utf8"))
|
||||
return
|
||||
elif req.path.endswith('/api/delete'):
|
||||
try:
|
||||
postvars = parse_qs(self.rfile.read(length).decode('utf-8'),
|
||||
|
@ -4160,7 +4180,8 @@ class RequestHandler(BaseHTTPRequestHandler):
|
|||
# pylint: disable=not-callable
|
||||
repo = REPO(addpath,
|
||||
search_parent_directories=True)
|
||||
filepath = "/".join(addpath.split(os.sep)[len(repo.working_dir.split(os.sep)):])
|
||||
filepath = "/".join(
|
||||
addpath.split(os.sep)[len(repo.working_dir.split(os.sep)):])
|
||||
response['path'] = filepath
|
||||
try:
|
||||
repo.index.add([filepath])
|
||||
|
@ -4196,10 +4217,13 @@ class RequestHandler(BaseHTTPRequestHandler):
|
|||
# pylint: disable=not-callable
|
||||
repo = REPO(diffpath,
|
||||
search_parent_directories=True)
|
||||
filepath = "/".join(diffpath.split(os.sep)[len(repo.working_dir.split(os.sep)):])
|
||||
filepath = "/".join(
|
||||
diffpath.split(os.sep)[len(repo.working_dir.split(os.sep)):])
|
||||
response['path'] = filepath
|
||||
try:
|
||||
diff = repo.index.diff(None, create_patch=True, paths=filepath)[0].diff.decode("utf-8")
|
||||
diff = repo.index.diff(None,
|
||||
create_patch=True,
|
||||
paths=filepath)[0].diff.decode("utf-8")
|
||||
response['error'] = False
|
||||
response['message'] = diff
|
||||
self.send_response(200)
|
||||
|
@ -4546,21 +4570,21 @@ class RequestHandler(BaseHTTPRequestHandler):
|
|||
if 'ip' in postvars.keys() and 'method' in postvars.keys():
|
||||
if postvars['ip'] and postvars['method']:
|
||||
try:
|
||||
ip = unquote(postvars['ip'][0])
|
||||
ip_address = unquote(postvars['ip'][0])
|
||||
method = unquote(postvars['method'][0])
|
||||
if method == 'unban':
|
||||
if ip in BANNED_IPS:
|
||||
BANNED_IPS.remove(ip)
|
||||
if ip_address in BANNED_IPS:
|
||||
BANNED_IPS.remove(ip_address)
|
||||
response['error'] = False
|
||||
elif method == 'ban':
|
||||
ipaddress.ip_network(ip)
|
||||
BANNED_IPS.append(ip)
|
||||
ipaddress.ip_network(ip_address)
|
||||
BANNED_IPS.append(ip_address)
|
||||
else:
|
||||
response['error'] = True
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'text/json')
|
||||
self.end_headers()
|
||||
response['message'] = "BANNED_IPS (%s): %s" % (method, ip)
|
||||
response['message'] = "BANNED_IPS (%s): %s" % (method, ip_address)
|
||||
self.wfile.write(bytes(json.dumps(response), "utf8"))
|
||||
return
|
||||
except Exception as err:
|
||||
|
@ -4583,6 +4607,7 @@ class AuthHandler(RequestHandler):
|
|||
self.end_headers()
|
||||
self.wfile.write(bytes(reason, "utf8"))
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def do_AUTHHEAD(self):
|
||||
LOG.info("Requesting authorization")
|
||||
self.send_response(401)
|
||||
|
@ -4738,15 +4763,15 @@ def main(args):
|
|||
except Exception as err:
|
||||
LOG.warning("Exception while checking passwords: %s" % err)
|
||||
|
||||
CustomServer = SimpleServer
|
||||
custom_server = SimpleServer
|
||||
if ':' in LISTENIP:
|
||||
CustomServer.address_family = socket.AF_INET6
|
||||
custom_server.address_family = socket.AF_INET6
|
||||
server_address = (LISTENIP, PORT)
|
||||
if USERNAME and PASSWORD:
|
||||
Handler = AuthHandler
|
||||
handler = AuthHandler
|
||||
else:
|
||||
Handler = RequestHandler
|
||||
HTTPD = CustomServer(server_address, Handler)
|
||||
handler = RequestHandler
|
||||
HTTPD = custom_server(server_address, handler)
|
||||
if SSL_CERTIFICATE:
|
||||
HTTPD.socket = ssl.wrap_socket(HTTPD.socket,
|
||||
certfile=SSL_CERTIFICATE,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue