#!/usr/bin/env python3 # This file is a part of speedtest # Created at 12/28/2024 import os import sys core = getattr(sys.modules["__main__"], "__file__", None) if core: core = os.path.abspath(core) root = os.path.dirname(core) if root: os.chdir(root) sys.path.insert(0, "library") import email.message import email.utils import io import logging import requests import smtplib import time import traceback from configparser import ConfigParser from inputimeout import inputimeout, TimeoutOccurred from azure_updater import update_azure_lists from commons import * logger = logging.getLogger("speedtest") formatter = logging.Formatter( "%(asctime)s %(levelname)s: %(name)s: %(message)s", "%d.%m.%Y %H:%M:%S") USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36" CONFIG_FILENAME = "speedtest.ini" URLS_FILENAME = "urls.txt" MAIN_SECTION = "main" MAIL_SECTION = "mail" CHUNK_SIZE = "chunk_size" SAMPLE_EVERY = "sample_every" CONNECTION_TIMEOUT = "connection_timeout" AZURE_UPDATE = "azure_update" SMTP_HOST = "smtp_host" SMTP_STARTTLS = "smtp_starttls" SMTP_HOST = "smtp_host" SMTP_PORT = "smtp_port" SMTP_PASSWORD = "smtp_password" SMTP_LOGIN = "smtp_login" SMTP_SENDTO = "smtp_sendto" LOG_LEVEL = "log_level" LOG_FILENAME = "log_filename" # YOU HAVE 20 SECONDS TO COMPLY INPUT_TIMEOUT = 20 def get_client(config: ConfigParser): server = config[SMTP_HOST] if config[SMTP_STARTTLS] == "True": logger.debug("connecting to %s using starttls", server) client = smtplib.SMTP(config[SMTP_HOST], config[SMTP_PORT]) client.starttls() else: client = smtplib.SMTP_SSL(config[SMTP_HOST], config[SMTP_PORT]) client.login(config[SMTP_LOGIN], config[SMTP_PASSWORD]) return client def send_data(config: ConfigParser, email_content: str, messages: list): client = get_client(config) for message in messages: if not message.contents: continue email_content += "# Server: %s\n" % (message.server.name) for content in message.contents: email_content += content.head email_content += "\n" body = content.body if isinstance(body, str): email_content += body elif isinstance(body, LineFormat): texts = body.text for text in texts: content_, args = text.content, text.args email_content += content_.format(*text.args) email_content += "\n" email_content += content.footer email_content += "\n" from_addr = "Speedtest <%s>" % (config.get(SMTP_LOGIN)) msg = email.message.Message() msg.add_header("Content-Type", "text") msg.set_payload(email_content) msg["Subject"] = "Speedtest report" msg["From"] = from_addr msg["To"] = config.get(SMTP_SENDTO) client.sendmail(config.get(SMTP_LOGIN), config.get(SMTP_SENDTO), msg.as_string()) def measure_speed(url: str, chunk_size: int = 1024, sample_every: int = 5, timeout: int = 10) -> tuple[int, int]: with io.BytesIO() as f: start = time.perf_counter() r = requests.get(url, stream=True, timeout=timeout, headers={"User-Agent": USER_AGENT}) total_length = r.headers.get("content-length") dl = 0 min_ = sys.maxsize max_ = 0 samples = [] counter = 0 if total_length is None: # no content length header # fixme: we can pre-configure the length raise ValueError("Content length header not found!") else: for chunk in r.iter_content(chunk_size): dl += len(chunk) f.write(chunk) speed = dl // (time.perf_counter() - start) / 1000000 if (counter % sample_every) == 0: samples.append(speed) done = int(30 * dl / int(total_length)) sys.stdout.write("\r[%s%s] %s MB/s" % ("=" * done, " " * (30-done), speed)) max_ = max(max_, speed) min_ = min(min_, speed) counter += 1 avg = sum(samples) / len(samples) print("\n") return (avg, max_) def test_url(server: str, chunk_size: int, sample_every: int, timeout: int) -> tuple[int, int]: avg = 0 max_ = 0 location, hosting, url = server.split("|") try: avg, max_ = measure_speed(url, chunk_size, sample_every, timeout) except requests.exceptions.Timeout: logger.error("Timeout to reach url: %s" % url, exc_info=True) except requests.exceptions.ConnectionError as e: logger.error("Unable to test url: %s (requests error: %s)" % (url, str(e))) except Exception: logger.error("Unable to test url: %s" % url, exc_info=True) else: return avg, max_ def get_test_results(servers: list[str], chunk_size: int, sample_every: int, timeout: int) -> list[dict]: result = [] for server in servers: location, hosting, url = server.split("|") logger.debug("Going to perform speedtest on %s" % server) test_result = test_url(server, chunk_size, sample_every, timeout) if test_result is not None: avg, max_ = test_result data = {"avg": avg, "max": max_, "server": server} result.append(data) else: logger.warning("The server %s was not tested" % server) continue return result def _get_test_servers() -> list: with open(URLS_FILENAME) as file: return file.read().splitlines() def _setup_logging(config: ConfigParser): log_level = config[LOG_LEVEL] log_filename = config[LOG_FILENAME] log_level = logging.getLevelName(log_level) logger.setLevel(log_level) console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(formatter) logger.addHandler(console_handler) file_handler = logging.FileHandler(log_filename) file_handler.setFormatter(formatter) logger.addHandler(file_handler) def excepthook(*exc_info): text = "".join(traceback.format_exception(*exc_info)) logger.error("unhandled exception caught: %s", text) def _get_input(prompt: str, timeout: int = INPUT_TIMEOUT, default: str="") -> str: try: return inputimeout(prompt, timeout) except TimeoutOccurred: print("(fallback to default: %s)" % default) return default def get_name_and_isp() -> tuple[str, str]: print("In order to proceed, we need to ask you a couple questions. Every question has a 20 second timeout, then the default value will be used.") name = _get_input("Please input your name: ", INPUT_TIMEOUT, "undefined") isp = _get_input("Please input your ISP name: ", INPUT_TIMEOUT, "undefined") return name, isp def combine_results(results: dict) -> list[Message]: messages = [] for item in sorted(results, key=lambda cur: -cur["avg"]): text = [] avg = item["avg"] max_ = item["max"] location, hosting, url = item["server"].split("|") text.append(Text("Average speed: {:.2f} MB/s", [avg])) text.append(Text("Maximum speed: {:.2f} MB/s", [max_])) content = Content("Speed test results", LineFormat(text)) content.footer = "_" * 25 message = Message([content]) message.server = Connection(url) message.server.name = location + " " + hosting + " " + url messages.append(message) return messages def main(): # sys.excepthook = excepthook parser = ConfigParser() parser.read(CONFIG_FILENAME) main_section = parser[MAIN_SECTION] mail_section = parser[MAIL_SECTION] _setup_logging(main_section) sample_every = int(main_section[SAMPLE_EVERY]) chunk_size = int(main_section[CHUNK_SIZE]) timeout = int(main_section[CONNECTION_TIMEOUT]) azure_update = main_section[AZURE_UPDATE] if azure_update == "True": print("Updating azure lists") update_azure_lists() servers = _get_test_servers() logger.debug("Found %d servers" % len(servers)) name, isp = get_name_and_isp() results = get_test_results(servers, chunk_size, sample_every, timeout) messages = combine_results(results) if messages is None or len(messages) == 0: print("There was an error, no tests were performed!") raise RuntimeError("No tests performed!") client = get_client(mail_section) body = "# User name: %s; ISP: %s\n" % (name, isp) send_data(mail_section, body, messages) logger.info("Done! You're amazing!") if __name__ == "__main__": try: main() except Exception: logger.error("Critical failure: ", exc_info=True) if os.name == "nt": os.system("pause")