#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Configurator for Home Assistant.
https://github.com/danielperna84/hass-configurator
"""
import os
import sys
import json
import ssl
import socketserver
import base64
import hmac
import ipaddress
import signal
import shlex
import subprocess
import logging
import fnmatch
from string import Template
from http.server import BaseHTTPRequestHandler, HTTPServer
import urllib.request
from urllib.parse import urlparse, parse_qs, unquote

try:
    import cgi
except ImportError:
    # The cgi module was removed from the standard library in Python 3.13.
    # Only /api/upload depends on it; everything else keeps working.
    cgi = None

### Some options for you to change
LISTENIP = "0.0.0.0"
LISTENPORT = 3218
# Set BASEPATH to something like "/home/hass/.homeassistant/" if you're not
# running the configurator from that path
BASEPATH = None
# Set the paths to a certificate and the key if you're using SSL,
# e.g. "/etc/ssl/certs/mycert.pem"
SSL_CERTIFICATE = None
SSL_KEY = None
# Set the destination where the HASS API is reachable
HASS_API = "http://127.0.0.1:8123/api/"
# If a password is required to access the API, set it in the form of "password"
# if you have HA ignoring SSL locally this is not needed if on same machine.
HASS_API_PASSWORD = None
# To enable authentication, set the credentials in the form of
# "username:password"
CREDENTIALS = None
# Limit access to the configurator by adding allowed IP addresses / networks
# to the list, e.g. ALLOWED_NETWORKS = ["192.168.0.0/24", "172.16.47.23"]
ALLOWED_NETWORKS = []
# List of statically banned IP addresses, e.g. ["1.1.1.1", "2.2.2.2"]
BANNED_IPS = []
# Ban IPs after n failed login attempts. Restart service to reset banning.
# The default of `0` disables this feature.
BANLIMIT = 0
# Enable git integration. GitPython
# (https://gitpython.readthedocs.io/en/stable/) has to be installed.
GIT = False
# Files to ignore in the UI. A good example list that cleans up the UI is
# [".*", "*.log", "deps", "icloud", "*.conf", "*.json", "certs", "__pycache__"]
IGNORE_PATTERN = []
### End of options

LOGLEVEL = logging.INFO
LOG = logging.getLogger(__name__)
LOG.setLevel(LOGLEVEL)
SO = logging.StreamHandler(sys.stdout)
SO.setLevel(LOGLEVEL)
SO.setFormatter(
    logging.Formatter('%(levelname)s:%(asctime)s:%(name)s:%(message)s'))
LOG.addHandler(SO)
RELEASEURL = "https://api.github.com/repos/danielperna84/hass-configurator/releases/latest"
VERSION = "0.1.9"
BASEDIR = "."
DEV = False
HTTPD = None
FAIL2BAN_IPS = {}
REPO = False
if GIT:
    try:
        from git import Repo as REPO
    except Exception:
        LOG.warning("Unable to import Git module")

INDEX = Template(r""" HASS Configurator

""")

# Names of the module-level options that load_settings() may override.
_SETTING_NAMES = (
    "LISTENIP", "LISTENPORT", "BASEPATH", "SSL_CERTIFICATE", "SSL_KEY",
    "HASS_API", "HASS_API_PASSWORD", "CREDENTIALS", "ALLOWED_NETWORKS",
    "BANNED_IPS", "BANLIMIT", "DEV", "IGNORE_PATTERN",
)


def signal_handler(sig, frame):
    """Close the server socket (if a server exists) and exit the process."""
    LOG.info("Got signal: %s. Shutting down server", str(sig))
    # The handler is installed before main() creates the server, so HTTPD may
    # still be None when the signal arrives.
    if HTTPD is not None:
        HTTPD.server_close()
    sys.exit(0)


def load_settings(settingsfile):
    """Override the module-level options with values from a JSON file.

    Keys missing from the file keep their current value. A path that is not
    an existing file is silently ignored.

    Returns False when reading or parsing the file failed, None otherwise.
    """
    try:
        if os.path.isfile(settingsfile):
            with open(settingsfile) as fptr:
                settings = json.loads(fptr.read())
            module_globals = globals()
            for name in _SETTING_NAMES:
                module_globals[name] = settings.get(name, module_globals[name])
    except Exception as err:
        LOG.warning(err)
        LOG.warning("Not loading static settings")
        return False


def _repo_abspath(repo, relpath):
    """Join a '/'-separated repository path onto the repo's working dir."""
    return "%s%s%s" % (repo.working_dir, os.sep,
                       os.sep.join(relpath.split('/')))


def get_dircontent(path, repo=None):
    """Return a list of dicts describing the entries of directory `path`.

    Entries are sorted case-insensitively by name; names matching one of the
    IGNORE_PATTERN globs are left out. When `repo` is given (an object with
    `working_dir`, `untracked_files` and `index.diff()`), each entry is also
    annotated with its git state:

    - 'gittracked': 'untracked' or 'tracked'
    - 'gitstatus':  'unstaged', 'staged', or bool(repo) when unchanged
    - 'changetype': the diff's change type, or None when unchanged
    """
    untracked = set()
    staged = {}
    unstaged = {}
    if repo:
        untracked = {_repo_abspath(repo, f) for f in repo.untracked_files}
        try:
            for element in repo.index.diff("HEAD"):
                staged[_repo_abspath(repo, element.b_path)] = \
                    element.change_type
        except Exception as err:
            # Diffing against HEAD is attempted best-effort; on failure only
            # the staged information is missing.
            LOG.warning("Exception: %s", str(err))
        for element in repo.index.diff(None):
            unstaged[_repo_abspath(repo, element.b_path)] = \
                element.change_type

    patterns = IGNORE_PATTERN or []
    dircontent = []
    for elem in sorted(os.listdir(path), key=lambda x: x.lower()):
        if any(fnmatch.fnmatch(elem, pattern) for pattern in patterns):
            continue
        fullpath = os.path.abspath(os.path.join(path, elem))
        edata = {
            'name': elem,
            'dir': path,
            'fullpath': fullpath,
            'type': 'dir' if os.path.isdir(fullpath) else 'file',
        }
        try:
            stats = os.stat(os.path.join(path, elem))
            edata['size'] = stats.st_size
            edata['modified'] = stats.st_mtime
            edata['created'] = stats.st_ctime
        except OSError:
            # e.g. a dangling symlink: still list it, with zeroed metadata.
            edata['size'] = 0
            edata['modified'] = 0
            edata['created'] = 0
        edata['changetype'] = None
        edata['gitstatus'] = bool(repo)
        edata['gittracked'] = \
            'untracked' if fullpath in untracked else 'tracked'
        # The diff dicts are keyed by full path, so the change type has to be
        # looked up by full path as well (not by the bare file name).
        if fullpath in unstaged:
            edata['gitstatus'] = 'unstaged'
            edata['changetype'] = unstaged[fullpath]
        elif fullpath in staged:
            edata['gitstatus'] = 'staged'
            edata['changetype'] = staged[fullpath]
        dircontent.append(edata)
    return dircontent


def get_html():
    """Return the page Template: dev.html when DEV is set, else INDEX."""
    if DEV:
        try:
            with open("dev.html") as fptr:
                return Template(fptr.read())
        except Exception as err:
            LOG.warning(err)
            LOG.warning("Delivering embedded HTML")
    return INDEX


def check_access(clientip):
    """Return True when `clientip` may use the configurator.

    Banned addresses are always rejected. When ALLOWED_NETWORKS is non-empty,
    an address outside all of those networks is rejected and appended to
    BANNED_IPS.
    """
    if clientip in BANNED_IPS:
        return False
    if not ALLOWED_NETWORKS:
        return True
    ipobject = ipaddress.ip_address(clientip)
    for net in ALLOWED_NETWORKS:
        if ipobject in ipaddress.ip_network(net):
            return True
    BANNED_IPS.append(clientip)
    return False


def _hass_request(endpoint, method='POST'):
    """Build a urllib Request for `endpoint` below HASS_API."""
    headers = {"Content-Type": "application/json"}
    if HASS_API_PASSWORD:
        headers["x-ha-access"] = HASS_API_PASSWORD
    return urllib.request.Request(
        "%s%s" % (HASS_API, endpoint), headers=headers, method=method)


def _decode_output(raw):
    """Decode subprocess output, replacing undecodable bytes if needed."""
    try:
        return raw.decode(sys.getdefaultencoding())
    except Exception as err:
        LOG.warning(err)
        return raw.decode("utf-8", errors="replace")


class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler serving the UI page and the /api/* endpoints."""

    # Request path -> name of the method that handles it.
    _GET_ROUTES = {
        '/api/file': '_get_file',
        '/api/download': '_get_download',
        '/api/listdir': '_get_listdir',
        '/api/abspath': '_get_abspath',
        '/api/parent': '_get_parent',
        '/api/restart': '_get_restart',
        '/api/check_config': '_get_check_config',
        '/api/reload_automations': '_get_reload_automations',
        '/api/reload_groups': '_get_reload_groups',
        '/api/reload_core': '_get_reload_core',
        '/': '_get_index',
    }
    _POST_ROUTES = {
        '/api/save': '_post_save',
        '/api/upload': '_post_upload',
        '/api/delete': '_post_delete',
        '/api/exec_command': '_post_exec_command',
        '/api/gitadd': '_post_gitadd',
        '/api/commit': '_post_commit',
        '/api/checkout': '_post_checkout',
        '/api/newbranch': '_post_newbranch',
        '/api/init': '_post_init',
        '/api/newfolder': '_post_newfolder',
        '/api/newfile': '_post_newfile',
    }

    def log_message(self, format, *args):
        """Send http.server's access log lines to the module logger."""
        LOG.info("%s - %s", self.client_address[0], format % args)

    def do_BLOCK(self):
        """Reject the request with status 420."""
        self.send_response(420)
        self.end_headers()
        self.wfile.write(bytes("Policy not fulfilled", "utf8"))

    # ------------------------------------------------------------------ GET

    def do_GET(self):
        """Dispatch a GET request to the handler registered for its path."""
        if not check_access(self.client_address[0]):
            self.do_BLOCK()
            return
        req = urlparse(self.path)
        handler_name = self._GET_ROUTES.get(req.path)
        if handler_name is None:
            # The status line must only be emitted once, so the 200 below is
            # sent strictly after routing has succeeded.
            self.send_response(404)
            self.end_headers()
            self.wfile.write(bytes("File not found", "utf8"))
            return
        self.send_response(200)
        getattr(self, handler_name)(parse_qs(req.query))

    def _get_file(self, query):
        """Write the text content of ?filename=... (relative to BASEDIR)."""
        content = ""
        self.send_header('Content-type', 'text/text')
        self.end_headers()
        filename = query.get('filename', None)
        try:
            if filename:
                filename = unquote(filename[0]).encode('utf-8')
                fullpath = os.path.join(BASEDIR.encode('utf-8'), filename)
                if os.path.isfile(fullpath):
                    with open(fullpath) as fptr:
                        content += fptr.read()
                else:
                    content = "File not found"
        except Exception as err:
            LOG.warning(err)
            content = str(err)
        self.wfile.write(bytes(content, "utf8"))

    def _get_download(self, query):
        """Send ?filename=... as an attachment, or a text error message."""
        content = ""
        filecontent = None
        filename = query.get('filename', None)
        try:
            if filename:
                filename = unquote(filename[0]).encode('utf-8')
                LOG.info(filename)
                fullpath = os.path.join(BASEDIR.encode('utf-8'), filename)
                if os.path.isfile(fullpath):
                    with open(fullpath, 'rb') as fptr:
                        filecontent = fptr.read()
                else:
                    content = "File not found"
        except Exception as err:
            LOG.warning(err)
            content = str(err)
        if filecontent is not None:
            self.send_header(
                'Content-Disposition',
                'attachment; filename=%s'
                % filename.decode('utf-8').split(os.sep)[-1])
            self.end_headers()
            self.wfile.write(filecontent)
            return
        self.send_header('Content-type', 'text/text')
        # Terminate the header section before the body is written.
        self.end_headers()
        self.wfile.write(bytes(content, "utf8"))

    def _get_listdir(self, query):
        """Write a JSON listing (plus git info) for the directory ?path=..."""
        self.send_header('Content-type', 'text/json')
        self.end_headers()
        dirpath = query.get('path', None)
        try:
            if dirpath:
                dirpath = unquote(dirpath[0]).encode('utf-8')
                if os.path.isdir(dirpath):
                    repo = None
                    activebranch = None
                    dirty = False
                    branches = []
                    if REPO:
                        try:
                            repo = REPO(dirpath.decode('utf-8'),
                                        search_parent_directories=True)
                            activebranch = repo.active_branch.name
                            dirty = repo.is_dirty()
                            branches.extend(
                                branch.name for branch in repo.branches)
                        except Exception as err:
                            LOG.debug("Exception (no repo): %s" % str(err))
                    dircontent = get_dircontent(dirpath.decode('utf-8'), repo)
                    abspath = os.path.abspath(dirpath)
                    filedata = {
                        'content': dircontent,
                        'abspath': abspath.decode('utf-8'),
                        'parent': os.path.dirname(abspath).decode('utf-8'),
                        'branches': branches,
                        'activebranch': activebranch,
                        'dirty': dirty,
                    }
                    self.wfile.write(bytes(json.dumps(filedata), "utf8"))
        except Exception as err:
            LOG.warning(err)
            self.wfile.write(bytes(str(err), "utf8"))

    def _get_abspath(self, query):
        """Write the absolute path of ?path=... when it is a directory."""
        self.send_header('Content-type', 'text/text')
        self.end_headers()
        dirpath = query.get('path', None)
        if dirpath:
            dirpath = unquote(dirpath[0]).encode('utf-8')
            LOG.debug(dirpath)
            absp = os.path.abspath(dirpath)
            LOG.debug(absp)
            if os.path.isdir(dirpath):
                self.wfile.write(absp)

    def _get_parent(self, query):
        """Write the absolute parent of ?path=... when it is a directory."""
        self.send_header('Content-type', 'text/text')
        self.end_headers()
        dirpath = query.get('path', None)
        if dirpath:
            dirpath = unquote(dirpath[0]).encode('utf-8')
            LOG.debug(dirpath)
            LOG.debug(os.path.abspath(dirpath))
            if os.path.isdir(dirpath):
                self.wfile.write(os.path.abspath(os.path.dirname(dirpath)))

    def _get_restart(self, query):
        """Call the HASS restart service and write its JSON answer."""
        LOG.info("/api/restart")
        self.send_header('Content-type', 'text/json')
        self.end_headers()
        res = {"restart": False}
        try:
            req = _hass_request("services/homeassistant/restart")
            with urllib.request.urlopen(req) as response:
                res = json.loads(response.read().decode('utf-8'))
                LOG.debug(res)
        except Exception as err:
            LOG.warning(err)
            res['restart'] = str(err)
        self.wfile.write(bytes(json.dumps(res), "utf8"))

    def _get_check_config(self, query):
        """Answer /api/check_config; the service call itself is disabled."""
        LOG.info("/api/check_config")
        self.send_header('Content-type', 'text/json')
        self.end_headers()
        res = {"check_config": False}
        try:
            # NOTE(review): the request is only constructed, never sent --
            # the urlopen() call was commented out in the original code.
            # Confirm whether this endpoint is meant to stay a no-op.
            _hass_request("services/homeassistant/check_config")
        except Exception as err:
            LOG.warning(err)
            res['restart'] = str(err)
        self.wfile.write(bytes(json.dumps(res), "utf8"))

    def _call_hass_service(self, label, service):
        """POST to HASS service `service` and write a JSON status for it.

        `label` is the endpoint name used for logging and as the key of the
        initial `{label: False}` result.
        """
        LOG.info("/api/%s", label)
        self.send_header('Content-type', 'text/json')
        self.end_headers()
        res = {label: False}
        try:
            req = _hass_request("services/%s" % service)
            with urllib.request.urlopen(req) as response:
                LOG.debug(json.loads(response.read().decode('utf-8')))
                res['service'] = "called successfully"
        except Exception as err:
            LOG.warning(err)
            # All HASS endpoints report failures under the 'restart' key.
            res['restart'] = str(err)
        self.wfile.write(bytes(json.dumps(res), "utf8"))

    def _get_reload_automations(self, query):
        """Ask HASS to reload its automations."""
        self._call_hass_service("reload_automations", "automation/reload")

    def _get_reload_groups(self, query):
        """Ask HASS to reload its groups."""
        self._call_hass_service("reload_groups", "group/reload")

    def _get_reload_core(self, query):
        """Ask HASS to reload its core configuration."""
        self._call_hass_service(
            "reload_core", "homeassistant/reload_core_config")

    def _get_index(self, query):
        """Render the UI page with HASS bootstrap data and version state."""
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        boot = "{}"
        try:
            req = _hass_request("bootstrap", method='GET')
            with urllib.request.urlopen(req) as response:
                boot = response.read().decode('utf-8')
        except Exception as err:
            LOG.warning("Exception getting bootstrap")
            LOG.warning(err)
        color = "green"
        try:
            with urllib.request.urlopen(RELEASEURL) as release:
                latest = json.loads(release.read().decode('utf-8'))['tag_name']
            if VERSION != latest:
                color = "red"
        except Exception as err:
            LOG.warning("Exception getting release")
            LOG.warning(err)
        # A backslash separator is passed to the template with one extra
        # leading backslash.
        separator = "\\%s" % os.sep if os.sep == "\\" else os.sep
        html = get_html().safe_substitute(
            bootstrap=boot, current=VERSION, versionclass=color,
            separator=separator)
        self.wfile.write(bytes(html, "utf8"))

    # ----------------------------------------------------------------- POST

    def do_POST(self):
        """Dispatch a POST request and answer with a JSON status object.

        The status always carries 'error' and 'message'; endpoint handlers
        update it in place. The HTTP status is 200 in every case.
        """
        if not check_access(self.client_address[0]):
            self.do_BLOCK()
            return
        req = urlparse(self.path)
        response = {"error": True, "message": "Generic failure"}
        try:
            length = max(0, int(self.headers.get('content-length', 0)))
        except (TypeError, ValueError):
            # Missing or malformed Content-Length: treat as an empty body.
            length = 0
        handler_name = self._POST_ROUTES.get(req.path)
        if handler_name is None:
            response['message'] = "Invalid method"
        else:
            getattr(self, handler_name)(response, length)
        self.send_response(200)
        self.send_header('Content-type', 'text/json')
        self.end_headers()
        self.wfile.write(bytes(json.dumps(response), "utf8"))

    def _read_postvars(self, response, length):
        """Parse the urlencoded request body; return {} on failure."""
        try:
            return parse_qs(self.rfile.read(length).decode('utf-8'),
                            keep_blank_values=1)
        except Exception as err:
            LOG.warning(err)
            response['message'] = "%s" % (str(err))
            return {}

    def _open_repo(self, response, path):
        """Open the git repo containing `path`; None (and message) on error."""
        try:
            return REPO(path, search_parent_directories=True)
        except Exception as err:
            response['message'] = "Not a git repository: %s" % (str(err))
            LOG.warning("Exception (no repo): %s" % str(err))
            return None

    def _post_save(self, response, length):
        """Write the posted 'text' to the file named by 'filename'."""
        postvars = self._read_postvars(response, length)
        if 'filename' not in postvars or 'text' not in postvars:
            response['message'] = "Missing filename or text"
            return
        try:
            filename = unquote(postvars['filename'][0])
            response['file'] = filename
            with open(filename, 'wb') as fptr:
                fptr.write(bytes(postvars['text'][0], "utf-8"))
            response['error'] = False
            response['message'] = "File saved successfully"
        except Exception as err:
            response['message'] = "%s" % (str(err))
            LOG.warning(err)

    def _post_upload(self, response, length):
        """Store an uploaded multipart file below the posted 'path'."""
        if length > 104857600:  # 100 MB for now
            # Drain the body so the client gets to read the answer.
            read = 0
            while read < length:
                chunk = self.rfile.read(min(66556, length - read))
                if not chunk:
                    # Client went away; without this the loop never ends.
                    break
                read += len(chunk)
            response['error'] = True
            response['message'] = "File too big: %i" % read
            return
        if cgi is None:
            response['message'] = \
                "Upload not supported: the cgi module is not available"
            return
        try:
            form = cgi.FieldStorage(
                fp=self.rfile,
                headers=self.headers,
                environ={'REQUEST_METHOD': 'POST',
                         'CONTENT_TYPE': self.headers['Content-Type']})
            filename = form['file'].filename
            filepath = form['path'].file.read()
            data = form['file'].file.read()
            with open("%s%s%s" % (filepath, os.sep, filename), "wb") as fptr:
                fptr.write(data)
            response['error'] = False
            response['message'] = "Upload successful"
        except Exception as err:
            LOG.warning(err)
            response['message'] = str(err)

    def _post_delete(self, response, length):
        """Remove the file or (empty) directory named by 'path'."""
        postvars = self._read_postvars(response, length)
        if 'path' not in postvars:
            response['message'] = "Missing filename or text"
            return
        try:
            delpath = unquote(postvars['path'][0])
            response['path'] = delpath
            if os.path.isdir(delpath):
                os.rmdir(delpath)
            else:
                os.unlink(delpath)
            response['error'] = False
            response['message'] = "Deletetion successful"
        except Exception as err:
            LOG.warning(err)
            response['message'] = str(err)

    def _post_exec_command(self, response, length):
        """Run the posted 'command' and report return code and output.

        The optional 'timeout' field (seconds, default 15) bounds the run
        time; a command exceeding it is killed and reported as an error.
        """
        postvars = self._read_postvars(response, length)
        if 'command' not in postvars:
            response['message'] = "Missing command"
            return
        try:
            command = shlex.split(postvars['command'][0])
            timeout = 15
            if 'timeout' in postvars:
                timeout = int(postvars['timeout'][0])
            proc = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            try:
                stdout, stderr = proc.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                # communicate() does not stop the child on timeout; kill and
                # reap it so it does not linger.
                proc.kill()
                proc.communicate()
                raise
            response['message'] = \
                "Command executed: %s" % postvars['command'][0]
            response['returncode'] = proc.returncode
            response['stdout'] = _decode_output(stdout)
            response['stderr'] = _decode_output(stderr)
            response['error'] = False
        except Exception as err:
            LOG.warning(err)
            response['message'] = str(err)

    def _post_gitadd(self, response, length):
        """Add the file named by 'path' to the index of its repository."""
        postvars = self._read_postvars(response, length)
        if 'path' not in postvars:
            response['message'] = "Missing filename"
            return
        try:
            addpath = unquote(postvars['path'][0])
            repo = REPO(addpath, search_parent_directories=True)
            # Path relative to the working dir, with '/' separators.
            depth = len(repo.working_dir.split(os.sep))
            filepath = "/".join(addpath.split(os.sep)[depth:])
            response['path'] = filepath
            repo.index.add([filepath])
            response['error'] = False
            response['message'] = "Added file to index"
        except Exception as err:
            LOG.warning(err)
            response['message'] = str(err)

    def _post_commit(self, response, length):
        """Commit the index of the repo at 'path' with the given 'message'."""
        postvars = self._read_postvars(response, length)
        if 'path' not in postvars or 'message' not in postvars:
            response['message'] = "Missing path"
            return
        commitpath = unquote(postvars['path'][0])
        response['path'] = commitpath
        message = unquote(postvars['message'][0])
        repo = self._open_repo(response, commitpath)
        if repo is None:
            return
        try:
            repo.index.commit(message)
            response['error'] = False
            response['message'] = "Changes commited"
        except Exception as err:
            response['error'] = True
            response['message'] = str(err)
            LOG.debug(response)

    def _post_checkout(self, response, length):
        """Check out the existing 'branch' in the repo at 'path'."""
        postvars = self._read_postvars(response, length)
        if 'path' not in postvars or 'branch' not in postvars:
            response['message'] = "Missing path or branch"
            return
        branchpath = unquote(postvars['path'][0])
        response['path'] = branchpath
        branch = unquote(postvars['branch'][0])
        repo = self._open_repo(response, branchpath)
        if repo is None:
            return
        try:
            head = [h for h in repo.heads if h.name == branch][0]
            head.checkout()
            response['error'] = False
            response['message'] = "Checked out %s" % branch
        except Exception as err:
            response['error'] = True
            response['message'] = str(err)
            LOG.warning(response)

    def _post_newbranch(self, response, length):
        """Create 'branch' from HEAD in the repo at 'path' and check it out."""
        postvars = self._read_postvars(response, length)
        if 'path' not in postvars or 'branch' not in postvars:
            response['message'] = "Missing path or branch"
            return
        branchpath = unquote(postvars['path'][0])
        response['path'] = branchpath
        branch = unquote(postvars['branch'][0])
        repo = self._open_repo(response, branchpath)
        if repo is None:
            return
        try:
            repo.git.checkout("HEAD", b=branch)
            response['error'] = False
            response['message'] = "Created and checked out %s" % branch
        except Exception as err:
            response['error'] = True
            response['message'] = str(err)
            LOG.warning(response)

    def _post_init(self, response, length):
        """Initialize a git repository in 'path'."""
        postvars = self._read_postvars(response, length)
        if 'path' not in postvars:
            response['message'] = "Missing path or branch"
            return
        repopath = unquote(postvars['path'][0])
        response['path'] = repopath
        try:
            REPO.init(repopath)
            response['error'] = False
            response['message'] = "Initialized repository in %s" % repopath
        except Exception as err:
            response['error'] = True
            response['message'] = str(err)
            LOG.warning(response)

    def _post_newfolder(self, response, length):
        """Create the directory 'name' (and parents) below 'path'."""
        postvars = self._read_postvars(response, length)
        if 'path' not in postvars or 'name' not in postvars:
            # No specific message for this endpoint; the generic one stays.
            return
        try:
            basepath = unquote(postvars['path'][0])
            name = unquote(postvars['name'][0])
            response['path'] = os.path.join(basepath, name)
            os.makedirs(response['path'])
            response['error'] = False
            response['message'] = "Folder created"
        except Exception as err:
            LOG.warning(err)
            response['message'] = str(err)

    def _post_newfile(self, response, length):
        """Create (or truncate) the empty file 'name' below 'path'."""
        postvars = self._read_postvars(response, length)
        if 'path' not in postvars or 'name' not in postvars:
            response['message'] = "Missing filename or text"
            return
        try:
            basepath = unquote(postvars['path'][0])
            name = unquote(postvars['name'][0])
            response['path'] = os.path.join(basepath, name)
            with open(response['path'], 'w') as fptr:
                fptr.write("")
            response['error'] = False
            response['message'] = "File created"
        except Exception as err:
            LOG.warning(err)
            response['message'] = str(err)


class AuthHandler(RequestHandler):
    """RequestHandler that requires HTTP Basic authentication.

    Expects the module-level CREDENTIALS to hold the base64-encoded
    "username:password" bytes, as prepared by main().
    """

    def do_AUTHHEAD(self):
        """Send the 401 challenge headers."""
        LOG.info("Requesting authorization")
        self.send_response(401)
        self.send_header('WWW-Authenticate',
                         'Basic realm=\"HASS-Configurator\"')
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def _authenticate(self):
        """Return True when the request carries the right credentials.

        On failure the rejection (401 challenge, or 420 once the client has
        reached BANLIMIT failed attempts) has already been written.
        """
        clientip = self.client_address[0]
        authorization = self.headers.get('Authorization', None)
        if authorization is None:
            self.do_AUTHHEAD()
            self.wfile.write(bytes('no auth header received', 'utf-8'))
            return False
        expected = 'Basic %s' % CREDENTIALS.decode('utf-8')
        # Constant-time comparison; bytes are used because compare_digest()
        # rejects non-ASCII str arguments.
        if hmac.compare_digest(authorization.encode('utf-8'),
                               expected.encode('utf-8')):
            if BANLIMIT:
                FAIL2BAN_IPS.pop(clientip, None)
            return True
        if BANLIMIT:
            bancounter = FAIL2BAN_IPS.get(clientip, 1)
            if bancounter >= BANLIMIT:
                LOG.warning("Blocking access from %s" % clientip)
                self.do_BLOCK()
                return False
            FAIL2BAN_IPS[clientip] = bancounter + 1
        self.do_AUTHHEAD()
        self.wfile.write(bytes('Authentication required', 'utf-8'))
        return False

    def do_GET(self):
        """Serve a GET request once the client is authenticated."""
        if self._authenticate():
            super().do_GET()

    def do_POST(self):
        """Serve a POST request once the client is authenticated."""
        if self._authenticate():
            super().do_POST()


def main(args):
    """Load optional settings from args[0], then serve forever."""
    global HTTPD, CREDENTIALS
    if args:
        load_settings(args[0])
    LOG.info("Starting server")
    server_address = (LISTENIP, LISTENPORT)
    if CREDENTIALS:
        CREDENTIALS = base64.b64encode(bytes(CREDENTIALS, "utf-8"))
        Handler = AuthHandler
    else:
        Handler = RequestHandler
    if not SSL_CERTIFICATE:
        HTTPD = HTTPServer(server_address, Handler)
    else:
        HTTPD = socketserver.TCPServer(server_address, Handler)
        # ssl.wrap_socket() was removed in Python 3.12; an SSLContext is the
        # supported way to wrap the listening socket.
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(certfile=SSL_CERTIFICATE, keyfile=SSL_KEY)
        HTTPD.socket = context.wrap_socket(HTTPD.socket, server_side=True)
    LOG.info('Listening on: %s://%s:%i' % (
        'https' if SSL_CERTIFICATE else 'http', LISTENIP, LISTENPORT))
    if BASEPATH:
        os.chdir(BASEPATH)
    HTTPD.serve_forever()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    main(sys.argv[1:])