You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
244 lines
7.4 KiB
244 lines
7.4 KiB
#!/usr/bin/env python3 |
|
# This file is a part of speedtest |
|
# Created at 12/28/2024 |
|
|
|
import os |
|
import sys |
|
|
|
|
|
core = getattr(sys.modules["__main__"], "__file__", None) |
|
if core: |
|
core = os.path.abspath(core) |
|
root = os.path.dirname(core) |
|
if root: |
|
os.chdir(root) |
|
|
|
|
|
sys.path.insert(0, "library") |
|
|
|
|
|
import email.message |
|
import email.utils |
|
import io |
|
import logging |
|
import requests |
|
import smtplib |
|
import time |
|
import traceback |
|
|
|
from configparser import ConfigParser |
|
|
|
from inputimeout import inputimeout, TimeoutOccurred |
|
from commons import * |
|
|
|
|
|
logger = logging.getLogger("speedtest") |
|
formatter = logging.Formatter( |
|
"%(asctime)s %(levelname)s: %(name)s: %(message)s", "%d.%m.%Y %H:%M:%S") |
|
|
|
|
|
CONFIG_FILENAME = "speedtest.ini" |
|
URLS_FILENAME = "urls.txt" |
|
|
|
MAIN_SECTION = "main" |
|
MAIL_SECTION = "mail" |
|
|
|
CHUNK_SIZE = "chunk_size" |
|
SAMPLE_EVERY = "sample_every" |
|
CONNECTION_TIMEOUT = "connection_timeout" |
|
|
|
SMTP_HOST = "smtp_host" |
|
SMTP_STARTTLS = "smtp_starttls" |
|
SMTP_HOST = "smtp_host" |
|
SMTP_PORT = "smtp_port" |
|
SMTP_PASSWORD = "smtp_password" |
|
SMTP_LOGIN = "smtp_login" |
|
SMTP_SENDTO = "smtp_sendto" |
|
|
|
LOG_LEVEL = "log_level" |
|
LOG_FILENAME = "log_filename" |
|
|
|
# YOU HAVE 20 SECONDS TO COMPLY |
|
INPUT_TIMEOUT = 20 |
|
|
|
|
|
def get_client(config: ConfigParser): |
|
server = config[SMTP_HOST] |
|
if config[SMTP_STARTTLS] == "True": |
|
logger.debug("connecting to %s using starttls", server) |
|
client = smtplib.SMTP(config[SMTP_HOST], config[SMTP_PORT]) |
|
client.starttls() |
|
|
|
else: |
|
client = smtplib.SMTP_SSL(config[SMTP_HOST], config[SMTP_PORT]) |
|
client.login(config[SMTP_LOGIN], config[SMTP_PASSWORD]) |
|
return client |
|
|
|
|
|
def send_data(config: ConfigParser, body: str, messages: list): |
|
client = get_client(config) |
|
email_content = "" |
|
for message in messages: |
|
if not message.contents: |
|
continue |
|
email_content += "# Server: %s\n" % (message.server.name) |
|
for content in message.contents: |
|
email_content += content.head |
|
email_content += "\n" |
|
body = content.body |
|
if isinstance(body, str): |
|
email_content += body |
|
|
|
elif isinstance(body, LineFormat): |
|
texts = body.text |
|
for text in texts: |
|
content_, args = text.content, text.args |
|
print("args", args) |
|
email_content += content_.format(*text.args) |
|
email_content += "\n" |
|
|
|
email_content += content.footer |
|
email_content += "\n" |
|
email_content += "_" * 25 |
|
email_content += "\n" |
|
from_addr = "Speedtest <%s>" % (config.get(SMTP_LOGIN)) |
|
msg = email.message.Message() |
|
msg.add_header("Content-Type", "text") |
|
msg.set_payload(email_content) |
|
msg["Subject"] = "Speedtest report" |
|
msg["From"] = from_addr |
|
msg["To"] = config.get(SMTP_SENDTO) |
|
client.sendmail(config.get(SMTP_LOGIN), |
|
config.get(SMTP_SENDTO), msg.as_string()) |
|
|
|
|
|
def measure_speed(url: str, chunk_size: int = 1024, sample_every: int = 5, timeout: int = 10) -> tuple[int, int]: |
|
with io.BytesIO() as f: |
|
start = time.perf_counter() |
|
r = requests.get(url, stream=True, timeout=timeout) |
|
total_length = r.headers.get("content-length") |
|
dl = 0 |
|
min_ = sys.maxsize |
|
max_ = 0 |
|
samples = [] |
|
counter = 0 |
|
if total_length is None: # no content length header |
|
# fixme: we can pre-configure the length |
|
raise ValueError("Content length header not found!") |
|
else: |
|
for chunk in r.iter_content(chunk_size): |
|
dl += len(chunk) |
|
f.write(chunk) |
|
speed = dl // (time.perf_counter() - start) / 1000000 |
|
if (counter % sample_every) == 0: |
|
samples.append(speed) |
|
max_ = max(max_, speed) |
|
min_ = min(min_, speed) |
|
counter += 1 |
|
avg = sum(samples) / len(samples) |
|
return (avg, max_) |
|
|
|
|
|
def test_url(url: str, chunk_size: int, sample_every: int, timeout: int) -> Message: |
|
avg = 0 |
|
max_ = 0 |
|
try: |
|
avg, max_ = measure_speed(url, chunk_size, sample_every, timeout) |
|
except requests.exceptions.Timeout: |
|
logger.error("Timeout to reach url: %s" % url, exc_info=True) |
|
except Exception: |
|
logger.error("Unable to test url: %s" % url, exc_info=True) |
|
else: |
|
text = [] |
|
text.append(Text("Average speed: {:.2f} MB/s", [avg])) |
|
text.append(Text("Maximum speed: {:.2f} MB/s", [max_])) |
|
content = Content("Speed test results", LineFormat(text)) |
|
message = Message([content]) |
|
message.server = Connection(url) |
|
message.server.name = url |
|
return message |
|
|
|
|
|
def get_test_results(urls: list, chunk_size: int, sample_every: int, timeout: int): |
|
messages = [] |
|
for url in urls: |
|
logger.debug("Going to perform speedtest on %s" % url) |
|
message = test_url(url, chunk_size, sample_every, timeout) |
|
if message is not None: |
|
messages.append(message) |
|
else: |
|
logger.warning("The url %s was not tested" % url) |
|
return messages |
|
|
|
|
|
def _get_urls() -> list: |
|
with open(URLS_FILENAME) as file: |
|
return file.read().splitlines() |
|
|
|
|
|
def _setup_logging(config: ConfigParser): |
|
log_level = config[LOG_LEVEL] |
|
log_filename = config[LOG_FILENAME] |
|
log_level = logging.getLevelName(log_level) |
|
logger.setLevel(log_level) |
|
console_handler = logging.StreamHandler(sys.stdout) |
|
console_handler.setFormatter(formatter) |
|
logger.addHandler(console_handler) |
|
file_handler = logging.FileHandler(log_filename) |
|
file_handler.setFormatter(formatter) |
|
logger.addHandler(file_handler) |
|
|
|
|
|
def excepthook(*exc_info): |
|
text = "".join(traceback.format_exception(*exc_info)) |
|
logger.error("unhandled exception caught: %s", text) |
|
|
|
|
|
def _get_input(prompt: str, timeout: int = INPUT_TIMEOUT, default: str="") -> str: |
|
try: |
|
return inputimeout(prompt, timeout) |
|
except TimeoutOccurred: |
|
print("(fallback to default: %s)" % default) |
|
return default |
|
|
|
|
|
def get_name_and_isp() -> tuple[str, str]: |
|
print("In order to proceed, we need to ask you a couple questions. Every question has a 20 second timeout, then the default value will be used.") |
|
name = _get_input("Please input your name: ", INPUT_TIMEOUT, "undefined") |
|
isp = _get_input("Please input your ISP name: ", INPUT_TIMEOUT, "undefined") |
|
return name, isp |
|
|
|
|
|
def main(): |
|
sys.excepthook = excepthook |
|
parser = ConfigParser() |
|
parser.read(CONFIG_FILENAME) |
|
main_section = parser[MAIN_SECTION] |
|
mail_section = parser[MAIL_SECTION] |
|
|
|
_setup_logging(main_section) |
|
sample_every = int(main_section[SAMPLE_EVERY]) |
|
chunk_size = int(main_section[CHUNK_SIZE]) |
|
timeout = int(main_section[CONNECTION_TIMEOUT]) |
|
|
|
urls = _get_urls() |
|
logger.debug("Found %d urls" % len(urls)) |
|
name, isp = get_name_and_isp() |
|
messages = get_test_results(urls, chunk_size, sample_every, timeout) |
|
if messages is None or len(messages) == 0: |
|
print("There was an error, no tests were performed!") |
|
raise RuntimeError("No tests performed!") |
|
|
|
client = get_client(mail_section) |
|
body = "User name: %s; ISP: %s" % (name, isp) |
|
send_data(mail_section, body, messages) |
|
logger.info("Done! You're amazing!") |
|
|
|
|
|
if __name__ == "__main__": |
|
try: |
|
main() |
|
except Exception: |
|
logger.error("Critical failure: ", exc_info=True) |
|
if os.name == "nt": |
|
os.system("pause")
|
|
|