Windows network speed tester
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

243 lines
7.4 KiB

3 weeks ago
#!/usr/bin/env python3
# This file is a part of speedtest
# Created at 12/28/2024
import os
import sys
core = getattr(sys.modules["__main__"], "__file__", None)
if core:
core = os.path.abspath(core)
root = os.path.dirname(core)
if root:
os.chdir(root)
sys.path.insert(0, "library")
import email.message
import email.utils
import io
import logging
import requests
import smtplib
import time
import traceback
from configparser import ConfigParser
from inputimeout import inputimeout, TimeoutOccurred
from commons import *
logger = logging.getLogger("speedtest")
formatter = logging.Formatter(
"%(asctime)s %(levelname)s: %(name)s: %(message)s", "%d.%m.%Y %H:%M:%S")
CONFIG_FILENAME = "speedtest.ini"
URLS_FILENAME = "urls.txt"
MAIN_SECTION = "main"
MAIL_SECTION = "mail"
CHUNK_SIZE = "chunk_size"
SAMPLE_EVERY = "sample_every"
CONNECTION_TIMEOUT = "connection_timeout"
SMTP_HOST = "smtp_host"
SMTP_STARTTLS = "smtp_starttls"
SMTP_HOST = "smtp_host"
SMTP_PORT = "smtp_port"
SMTP_PASSWORD = "smtp_password"
SMTP_LOGIN = "smtp_login"
SMTP_SENDTO = "smtp_sendto"
LOG_LEVEL = "log_level"
LOG_FILENAME = "log_filename"
# YOU HAVE 20 SECONDS TO COMPLY
INPUT_TIMEOUT = 20
def get_client(config: ConfigParser):
server = config[SMTP_HOST]
if config[SMTP_STARTTLS] == "True":
logger.debug("connecting to %s using starttls", server)
client = smtplib.SMTP(config[SMTP_HOST], config[SMTP_PORT])
client.starttls()
else:
client = smtplib.SMTP_SSL(config[SMTP_HOST], config[SMTP_PORT])
client.login(config[SMTP_LOGIN], config[SMTP_PASSWORD])
return client
def send_data(config: ConfigParser, email_content: str, messages: list):
3 weeks ago
client = get_client(config)
for message in messages:
if not message.contents:
continue
email_content += "# Server: %s\n" % (message.server.name)
for content in message.contents:
email_content += content.head
email_content += "\n"
body = content.body
if isinstance(body, str):
email_content += body
elif isinstance(body, LineFormat):
texts = body.text
for text in texts:
content_, args = text.content, text.args
email_content += content_.format(*text.args)
email_content += "\n"
email_content += content.footer
email_content += "\n"
email_content += "_" * 25
email_content += "\n"
from_addr = "Speedtest <%s>" % (config.get(SMTP_LOGIN))
msg = email.message.Message()
msg.add_header("Content-Type", "text")
msg.set_payload(email_content)
msg["Subject"] = "Speedtest report"
msg["From"] = from_addr
msg["To"] = config.get(SMTP_SENDTO)
client.sendmail(config.get(SMTP_LOGIN),
config.get(SMTP_SENDTO), msg.as_string())
def measure_speed(url: str, chunk_size: int = 1024, sample_every: int = 5, timeout: int = 10) -> tuple[int, int]:
with io.BytesIO() as f:
start = time.perf_counter()
r = requests.get(url, stream=True, timeout=timeout)
total_length = r.headers.get("content-length")
dl = 0
min_ = sys.maxsize
max_ = 0
samples = []
counter = 0
if total_length is None: # no content length header
# fixme: we can pre-configure the length
raise ValueError("Content length header not found!")
else:
for chunk in r.iter_content(chunk_size):
dl += len(chunk)
f.write(chunk)
speed = dl // (time.perf_counter() - start) / 1000000
if (counter % sample_every) == 0:
samples.append(speed)
max_ = max(max_, speed)
min_ = min(min_, speed)
counter += 1
avg = sum(samples) / len(samples)
return (avg, max_)
def test_url(url: str, chunk_size: int, sample_every: int, timeout: int) -> Message:
avg = 0
max_ = 0
try:
avg, max_ = measure_speed(url, chunk_size, sample_every, timeout)
except requests.exceptions.Timeout:
logger.error("Timeout to reach url: %s" % url, exc_info=True)
except Exception:
logger.error("Unable to test url: %s" % url, exc_info=True)
else:
text = []
text.append(Text("Average speed: {:.2f} MB/s", [avg]))
text.append(Text("Maximum speed: {:.2f} MB/s", [max_]))
content = Content("Speed test results", LineFormat(text))
message = Message([content])
message.server = Connection(url)
message.server.name = url
return message
def get_test_results(urls: list, chunk_size: int, sample_every: int, timeout: int):
messages = []
for url in urls:
logger.debug("Going to perform speedtest on %s" % url)
message = test_url(url, chunk_size, sample_every, timeout)
if message is not None:
messages.append(message)
else:
logger.warning("The url %s was not tested" % url)
return messages
def _get_urls() -> list:
with open(URLS_FILENAME) as file:
return file.read().splitlines()
def _setup_logging(config: ConfigParser):
log_level = config[LOG_LEVEL]
log_filename = config[LOG_FILENAME]
log_level = logging.getLevelName(log_level)
logger.setLevel(log_level)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
file_handler = logging.FileHandler(log_filename)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def excepthook(*exc_info):
text = "".join(traceback.format_exception(*exc_info))
logger.error("unhandled exception caught: %s", text)
def _get_input(prompt: str, timeout: int = INPUT_TIMEOUT, default: str="") -> str:
try:
return inputimeout(prompt, timeout)
except TimeoutOccurred:
print("(fallback to default: %s)" % default)
return default
def get_name_and_isp() -> tuple[str, str]:
print("In order to proceed, we need to ask you a couple questions. Every question has a 20 second timeout, then the default value will be used.")
name = _get_input("Please input your name: ", INPUT_TIMEOUT, "undefined")
isp = _get_input("Please input your ISP name: ", INPUT_TIMEOUT, "undefined")
return name, isp
def main():
sys.excepthook = excepthook
parser = ConfigParser()
parser.read(CONFIG_FILENAME)
main_section = parser[MAIN_SECTION]
mail_section = parser[MAIL_SECTION]
_setup_logging(main_section)
sample_every = int(main_section[SAMPLE_EVERY])
chunk_size = int(main_section[CHUNK_SIZE])
timeout = int(main_section[CONNECTION_TIMEOUT])
urls = _get_urls()
logger.debug("Found %d urls" % len(urls))
name, isp = get_name_and_isp()
messages = get_test_results(urls, chunk_size, sample_every, timeout)
if messages is None or len(messages) == 0:
print("There was an error, no tests were performed!")
raise RuntimeError("No tests performed!")
client = get_client(mail_section)
body = "# User name: %s; ISP: %s\n" % (name, isp)
3 weeks ago
send_data(mail_section, body, messages)
logger.info("Done! You're amazing!")
if __name__ == "__main__":
try:
main()
except Exception:
logger.error("Critical failure: ", exc_info=True)
if os.name == "nt":
os.system("pause")