#!/usr/bin/python3 """ Proof of concept configurator for Home Assistant. https://github.com/danielperna84/hass-poc-configurator """ import os import sys import json import ssl import socketserver import urllib.request import base64 import ipaddress import signal from string import Template from http.server import BaseHTTPRequestHandler, HTTPServer from urllib.parse import urlparse, parse_qs ### Some options for you to change LISTENIP = "0.0.0.0" LISTENPORT = 3218 # Set BASEPATH to something like "/home/hass/.homeasssitant" if you're not running the configurator from that path BASEPATH = None # Set the paths to a certificate and the key if you're using SSL, e.g "/etc/ssl/certs/mycert.pem" SSL_CERTIFICATE = None SSL_KEY = None # Set the destination where the HASS API is reachable HASS_API = "http://127.0.0.1:8123/api/" # If a password is required to access the API, set it in the form of "password" HASS_API_PASSWORD = None # To enable authentication, set the credentials in the form of "username:password" CREDENTIALS = None # Limit access to the configurator by adding allowed IP addresses / networks to the list, # e.g ALLOWED_NETWORKS = ["192.168.0.0/24", "172.16.47.23"] ALLOWED_NETWORKS = [] # List of statically banned IP addresses, e.g. ["1.1.1.1", "2.2.2.2"] BANNED_IPS = [] # Ban IPs after n failed login attempts. Restart service to reset banning. The default of `0` disables this feature. BANLIMIT = 0 # File extensions the file browser should include EXTENSIONS = ['yaml', 'conf'] ### End of options RELEASEURL = "https://api.github.com/repos/danielperna84/hass-poc-configurator/releases/latest" VERSION = "0.0.7" BASEDIR = "." DEV = False HTTPD = None FAIL2BAN_IPS = {} INDEX = Template(""" HASS-PoC-Configurator
$current
""") def signal_handler(signal, frame): global HTTPD print("Shutting down server") HTTPD.server_close() sys.exit(0) def load_settings(settingsfile): global LISTENIP, LISTENPORT, BASEPATH, SSL_CERTIFICATE, SSL_KEY, HASS_API, \ HASS_API_PASSWORD, CREDENTIALS, ALLOWED_NETWORKS, BANNED_IPS, BANLIMIT, \ EXTENSIONS try: if os.path.isfile(settingsfile): with open(settingsfile) as fptr: settings = json.loads(fptr.read()) LISTENIP = settings.get("LISTENIP", LISTENIP) LISTENPORT = settings.get("LISTENPORT", LISTENPORT) BASEPATH = settings.get("BASEPATH", BASEPATH) SSL_CERTIFICATE = settings.get("SSL_CERTIFICATE", SSL_CERTIFICATE) SSL_KEY = settings.get("SSL_KEY", SSL_KEY) HASS_API = settings.get("HASS_API", HASS_API) HASS_API_PASSWORD = settings.get("HASS_API_PASSWORD", HASS_API_PASSWORD) CREDENTIALS = settings.get("CREDENTIALS", CREDENTIALS) ALLOWED_NETWORKS = settings.get("ALLOWED_NETWORKS", ALLOWED_NETWORKS) BANNED_IPS = settings.get("BANNED_IPS", BANNED_IPS) BANLIMIT = settings.get("BANLIMIT", BANLIMIT) EXTENSIONS = settings.get("EXTENSIONS", EXTENSIONS) except Exception as err: print(err) print("Not loading static settings") return False def get_html(): if DEV: try: with open("dev.html") as fptr: html = Template(fptr.read()) return html except Exception as err: print(err) print("Delivering embedded HTML") return INDEX def check_access(clientip): global BANNED_IPS if clientip in BANNED_IPS: return False if not ALLOWED_NETWORKS: return True for net in ALLOWED_NETWORKS: ipobject = ipaddress.ip_address(clientip) if ipobject in ipaddress.ip_network(net): return True BANNED_IPS.append(clientip) return False class Node: def __init__(self, id, text, parent): self.id = id self.text = text self.parent = parent self.icon = 'jstree-folder' self.state = {'opened': self.id == '.'} if os.path.isfile(os.path.join(parent, text)): self.icon = "jstree-file" def is_equal(self, node): return self.id == node.id def as_json(self): return dict( id=self.id, parent=self.parent, text=self.text, icon=self.icon, state=self.state ) def get_nodes_from_path(path): nodes = [] path_nodes = path.split(os.sep) for idx, node_name in enumerate(path_nodes): parent = None node_id = os.sep.join(path_nodes[0:idx+1]) if idx != 0: parent = os.sep.join(path_nodes[0:idx]) if os.path.isfile(os.path.join(parent, node_name)) and node_name.split('.')[-1] not in EXTENSIONS: continue else: parent = "#" nodes.append(Node(node_id, node_name, parent)) return nodes def getdirs(searchpath): unique_nodes = [] for root, dirs, files in os.walk(searchpath, topdown=True): if '/deps' not in root and '/.git' not in root and '/www' not in root: for name in files: path = os.path.join(root, name) nodes = get_nodes_from_path(path) for node in nodes: if not any(node.is_equal(unode) for unode in unique_nodes): unique_nodes.append(node) return [node.as_json() for node in unique_nodes] class RequestHandler(BaseHTTPRequestHandler): def do_BLOCK(self): self.send_response(420) self.end_headers() self.wfile.write(bytes("Policy not fulfilled", "utf8")) def do_GET(self): if not check_access(self.client_address[0]): self.do_BLOCK() return req = urlparse(self.path) query = parse_qs(req.query) self.send_response(200) if req.path == '/api/files': self.send_header('Content-type','text/json') self.end_headers() dirs = sorted(getdirs(BASEDIR), key=lambda x: x["text"]) self.wfile.write(bytes(json.dumps(dirs), "utf8")) return elif req.path == '/api/file': content = "" self.send_header('Content-type','text/text') self.end_headers() filename = query.get('filename', None) if filename: if os.path.isfile(os.path.join(BASEDIR, filename[0])): with open(os.path.join(BASEDIR, filename[0])) as fptr: content += fptr.read() self.wfile.write(bytes(content, "utf8")) return elif req.path == '/api/restart': print("/api/restart") self.send_header('Content-type','text/json') self.end_headers() r = {"restart": False} try: headers = { "Content-Type": "application/json" } if HASS_API_PASSWORD: headers["x-ha-access"] = HASS_API_PASSWORD req = urllib.request.Request("%sservices/homeassistant/restart" % HASS_API, headers=headers, method='POST') with urllib.request.urlopen(req) as response: r = json.loads(response.read().decode('utf-8')) print(r) except Exception as err: print(err) r['restart'] = str(err) self.wfile.write(bytes(json.dumps(r), "utf8")) return elif req.path == '/': self.send_header('Content-type','text/html') self.end_headers() boot = "{}" try: headers = { "Content-Type": "application/json" } if HASS_API_PASSWORD: headers["x-ha-access"] = HASS_API_PASSWORD req = urllib.request.Request("%sbootstrap" % HASS_API, headers=headers, method='GET') with urllib.request.urlopen(req) as response: boot = response.read().decode('utf-8') except Exception as err: print("Exception getting bootstrap") print(err) color = "green" try: response = urllib.request.urlopen(RELEASEURL) latest = json.loads(response.read().decode('utf-8'))['tag_name'] if VERSION != latest: color = "red" except Exception as err: print("Exception getting release") print(err) html = get_html().safe_substitute(bootstrap=boot, current=VERSION, versionclass=color) self.wfile.write(bytes(html, "utf8")) return else: self.send_response(404) self.end_headers() self.wfile.write(bytes("File not found", "utf8")) def do_POST(self): if not check_access(self.client_address[0]): self.do_BLOCK() return postvars = {} response = "Failure" try: length = int(self.headers['content-length']) postvars = parse_qs(self.rfile.read(length).decode('utf-8'), keep_blank_values=1) except Exception as err: print(err) response = "%s" % (str(err)) if 'filename' in postvars.keys() and 'text' in postvars.keys(): if postvars['filename'] and postvars['text']: try: filename = postvars['filename'][0] with open(os.path.join(BASEDIR, filename), 'wb') as fptr: fptr.write(bytes(postvars['text'][0], "utf-8")) self.send_response(200) self.end_headers() self.wfile.write(bytes("File saved successfully", "utf8")) return except Exception as err: response = "%s" % (str(err)) print(err) else: response = "Missing filename or text" self.send_response(200) self.end_headers() self.wfile.write(bytes(response, "utf8")) return class AuthHandler(RequestHandler): def do_AUTHHEAD(self): print("Requesting authorization") self.send_response(401) self.send_header('WWW-Authenticate', 'Basic realm=\"HASS-PoC-Configurator\"') self.send_header('Content-type', 'text/html') self.end_headers() def do_GET(self): global CREDENTIALS authorization = self.headers.get('Authorization', None) if authorization == None: self.do_AUTHHEAD() self.wfile.write(bytes('no auth header received', 'utf-8')) pass elif authorization == 'Basic %s' % CREDENTIALS.decode('utf-8'): if BANLIMIT: FAIL2BAN_IPS.pop(self.client_address[0], None) super().do_GET() pass else: if BANLIMIT: bancounter = FAIL2BAN_IPS.get(self.client_address[0], 1) if bancounter >= BANLIMIT: print("Blocking access from %s" % self.client_address[0]) self.do_BLOCK() return else: FAIL2BAN_IPS[self.client_address[0]] = bancounter + 1 self.do_AUTHHEAD() self.wfile.write(bytes('Authentication required', 'utf-8')) pass def do_POST(self): global CREDENTIALS authorization = self.headers.get('Authorization', None) if authorization == None: self.do_AUTHHEAD() self.wfile.write(bytes('no auth header received', 'utf-8')) pass elif authorization == 'Basic %s' % CREDENTIALS.decode('utf-8'): if BANLIMIT: FAIL2BAN_IPS.pop(self.client_address[0], None) super().do_POST() pass else: if BANLIMIT: bancounter = FAIL2BAN_IPS.get(self.client_address[0], 1) if bancounter >= BANLIMIT: print("Blocking access from %s" % self.client_address[0]) self.do_BLOCK() return else: FAIL2BAN_IPS[self.client_address[0]] = bancounter + 1 self.do_AUTHHEAD() self.wfile.write(bytes('Authentication required', 'utf-8')) pass def main(args): global HTTPD, CREDENTIALS if args: load_settings(args[0]) print("Starting server") server_address = (LISTENIP, LISTENPORT) if CREDENTIALS: CREDENTIALS = base64.b64encode(bytes(CREDENTIALS, "utf-8")) Handler = AuthHandler else: Handler = RequestHandler if not SSL_CERTIFICATE: HTTPD = HTTPServer(server_address, Handler) else: HTTPD = socketserver.TCPServer(server_address, Handler) HTTPD.socket = ssl.wrap_socket(HTTPD.socket, certfile=SSL_CERTIFICATE, keyfile=SSL_KEY, server_side=True) print('Listening on: %s://%s:%i' % ('https' if SSL_CERTIFICATE else 'http', LISTENIP, LISTENPORT)) if BASEPATH: os.chdir(BASEPATH) HTTPD.serve_forever() if __name__ == "__main__": signal.signal(signal.SIGINT, signal_handler) main(sys.argv[1:])