Browse Source

First commit

master
John Smith 3 weeks ago
commit
dd9224edaf
  1. 3
      .gitignore
  2. 18
      README.md
  3. 31
      bootstrap.ps1
  4. 101
      library/commons.py
  5. 75
      library/inputimeout.py
  6. BIN
      resources/speedometer.ico
  7. 10
      resources/speedometer.svg
  8. 2
      setup.bat
  9. 93
      setup.py
  10. 20
      speedtest.ini
  11. 244
      speedtest.py
  12. 1
      urls.txt

3
.gitignore vendored

@ -0,0 +1,3 @@ @@ -0,0 +1,3 @@
__pycache__
*.pyc
speedtest.log

18
README.md

@ -0,0 +1,18 @@ @@ -0,0 +1,18 @@
# Speedtest
A speedtest utiliy that is supposed to be ran on the end user machine.
The utility sends email reports about average and maximum network speed to the pre-configured email address.
# Setting up
In order for the utility to perform any tests, consider filling two configuration files:
* `speedtest.ini` — the file where you would put the logging, chunk size, number of samples and email settings.
* `urls.txt` — the file where you put direct links to the files you want the tests to be performed on.
# Usage
Just run `speedtest.py` or `speedtest.exe` after completing the configuration.
Done! You're amazing!

31
bootstrap.ps1

@ -0,0 +1,31 @@ @@ -0,0 +1,31 @@
$url = "https://www.python.org/ftp/python/3.11.5/python-3.11.5-amd64.exe"
$output = "python311_installer.exe"
$install_dir = "C:\Python311"
if (!(Test-Path $install_dir)) {
Write-Output "Python 3.11 installation not found. Installing..."
if (!(Test-Path $output)) {
$start_time = Get-Date
$wc = New-Object System.Net.WebClient
$wc.DownloadFile($url, $output)
#OR
(New-Object System.Net.WebClient).DownloadFile($url, $output)
Write-Output "Python 3.11 Downloaded. Time taken: $((Get-Date).Subtract($start_time).Seconds) second(s)"
}
Write-Output "Installing Python 3.11 to $install_dir"
Start-Process $output -ArgumentList "/quiet","InstallAllUsers=1","TargetDir=$install_dir" -NoNewWindow -Wait
}
if (!(Test-Path $install_dir)) {
Write-Output "Python 3.11 installation still not found at $install_dir. Please repair your installation"
Read-Host -Prompt "Press Enter to continue"
}
Write-Output "Installing packaging"
Start-Process "$install_dir\python.exe" -ArgumentList "-m pip install packaging" -Wait
Write-Output "Installing setuptools"
Start-Process "$install_dir\python.exe" -ArgumentList "-m pip install setuptools" -Wait
Write-Output "Installing py2exe"
Start-Process "$install_dir\python.exe" -ArgumentList "-m pip install py2exe" -Wait

101
library/commons.py

@ -0,0 +1,101 @@ @@ -0,0 +1,101 @@
# This file is a part of Teflon.
class Format(object):
pass
class Message(object):
def __init__(self, contents: list):
self.contents = contents
self.server = None
class Cell(object):
def __init__(self, content: str, args=None):
self.content = content
self.args = args or []
def add_arg(self, arg):
self.args.append(arg)
class Row(object):
def __init__(self, cells=None):
self._cells = cells or []
def add_cell(self, cell: Cell):
self._cells.append(cell)
def get_cells(self) -> list:
return self._cells
class TableFormat(Format):
def __init__(self, rows: int, cols: int, rows_=None):
self.rows = rows
self.cols = cols
self._rows = rows_ or []
def add_row(self, row: Row):
self._rows.append(row)
def get_rows(self) -> list:
return self._rows
class Text(object):
def __init__(self, content, args=None):
self.content = content
self.args = args or []
def add_arg(self, arg):
self.args.append(arg)
class LineFormat(Format):
def __init__(self, text=None):
self.text = text or []
def add_text(self, text):
self.text.append(text)
class Content(object):
def __init__(self, head: str = "", body: str = "", footer: str ="\n"):
self.head = head
# body is a list of Body joined with EOL
self.body = body
self.footer = footer
class Connection(object):
"""
Common connection interface
"""
def __init__(self, name: str):
self.name = name
self._notify_methods = []
def set_notify_methods(self, methods: list):
self._notify_methods = methods
def get_notify_methods(self) -> list:
return self._notify_methods
def connect(self):
raise NotImplementedError("Method should be overridden")
def get_executor(self):
raise NotImplementedError("Method should be overridden")
def disconnect(self):
raise NotImplementedError("Method should be overridden")

75
library/inputimeout.py

@ -0,0 +1,75 @@ @@ -0,0 +1,75 @@
# Code from https://github.com/johejo/inputimeout/
import sys
DEFAULT_TIMEOUT = 30.0
INTERVAL = 0.05
SP = ' '
CR = '\r'
LF = '\n'
CRLF = CR + LF
class TimeoutOccurred(Exception):
pass
def echo(string):
sys.stdout.write(string)
sys.stdout.flush()
def posix_inputimeout(prompt='', timeout=DEFAULT_TIMEOUT):
echo(prompt)
sel = selectors.DefaultSelector()
sel.register(sys.stdin, selectors.EVENT_READ)
events = sel.select(timeout)
if events:
key, _ = events[0]
return key.fileobj.readline().rstrip(LF)
else:
echo(LF)
termios.tcflush(sys.stdin, termios.TCIFLUSH)
raise TimeoutOccurred
def win_inputimeout(prompt='', timeout=DEFAULT_TIMEOUT):
echo(prompt)
begin = time.monotonic()
end = begin + timeout
line = ''
while time.monotonic() < end:
if msvcrt.kbhit():
c = msvcrt.getwche()
if c in (CR, LF):
echo(CRLF)
return line
if c == '\003':
raise KeyboardInterrupt
if c == '\b':
line = line[:-1]
cover = SP * len(prompt + line + SP)
echo(''.join([CR, cover, CR, prompt, line]))
else:
line += c
time.sleep(INTERVAL)
echo(CRLF)
raise TimeoutOccurred
try:
import msvcrt
except ImportError:
import selectors
import termios
inputimeout = posix_inputimeout
else:
import time
inputimeout = win_inputimeout

BIN
resources/speedometer.ico

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

10
resources/speedometer.svg

@ -0,0 +1,10 @@ @@ -0,0 +1,10 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 32 32" id="speedometer">
<g>
<path d="M16 3a13 13 0 1 0 9.86 21.47A13 13 0 0 0 16 3Zm7.64 20.9-.37.33-.5.42-.4.3-.53.36-.42.26-.57.3-.44.21-.61.24-.45.16-.66.18-.43.11c-.25.05-.5.09-.75.12l-.37.06a10.6 10.6 0 0 1-2.29 0l-.37-.06c-.25 0-.5-.07-.75-.12l-.43-.11-.66-.18-.45-.16-.61-.24-.44-.21-.57-.3-.42-.25-.53-.36-.4-.3-.5-.42-.36-.33-.15-.16a10 10 0 0 1 15.59 0Zm1.45-1.72a12 12 0 0 0-7.53-4.07l2.33-4.66a1 1 0 1 0-1.79-.89L15.36 18a12 12 0 0 0-8.45 4.14 11 11 0 1 1 18.18 0Z"></path>
<circle cx="8" cy="16" r="1"></circle>
<circle cx="24" cy="16" r="1"></circle>
<circle cx="11" cy="11" r="1" transform="rotate(-40 10.993 11.002)"></circle>
<circle cx="22" cy="11" r="1" transform="rotate(-30 21.994 10.993)"></circle>
<circle cx="16" cy="8" r="1"></circle>
</g>
</svg>

After

Width:  |  Height:  |  Size: 854 B

2
setup.bat

@ -0,0 +1,2 @@ @@ -0,0 +1,2 @@
C:\Python311\python.exe setup.py
pause

93
setup.py

@ -0,0 +1,93 @@ @@ -0,0 +1,93 @@
# Windows py2exe build script
# running: python3 setup.py py2exe
import os
import sys
import traceback
# noinspection PyUnresolvedReferences
import py2exe
import importlib
from distutils.core import setup
from glob import glob
sys.argv.append("py2exe")
PYTHON_PATH = r"C:\Python311"
SETUP_DICT = {
"console": [{
"script": "speedtest.py",
"icon_resources": [(1, "resources/speedometer.ico")],
"dest_base": "Speedtest"
}],
"zipfile": "lib/library.zip",
"data_files": [
("", glob(os.path.join(WINDOWS_PATH, r"SYSTEM32\msvcp100.dll"))),
("", glob(os.path.join(WINDOWS_PATH, r"SYSTEM32\msvcr100.dll"))),
],
"options": {
"py2exe": {
"bundle_files": 3,
"excludes": ["tcl", "tk", "tkinter", "idna", "lib2to3", "xmlrpc", "multiprocessing", "urllib3", "chardet",
"asyncio", "pydoc_data", "requests", "unittest", "pydoc", "lzma", "bz2"],
"dll_excludes": ["tcl86.dll", "tk86.dll"],
"compressed": True,
"optimize": 2
},
},
# fix for conflicting module dirs
"py_modules": [],
}
# Check if all files can be found
for num, file in enumerate(SETUP_DICT["data_files"]):
type_, file = file
if len(file) > 0:
file = file[0]
if os.path.exists(file):
print("Found file:", file)
else:
print("File not found:", file)
sys.exit(-1)
else:
if type_:
print("Data file of type", type_, "not found")
else:
print("Data file with index", num, "and name", file, "not found")
sys.exit(-1)
try:
setup(**SETUP_DICT)
except Exception:
traceback.print_exc()
os.system("pause")
else:
# DDLs / exe files that are safe to be compressed with UPX
OPTIMIZE = [glob(r"dist\python311.dll"),
glob("dist\adb.exe"),
glob(r"dist\lib\Qt*"),
glob(r"dist\lib\*.pyd"),
glob(r"dist\lib\libcrypto-1_1.dll"),
glob(r"dist\lib\libssl-1_1.dll")]
if os.path.exists("dist") and os.path.exists("upx.exe"):
for files in OPTIMIZE:
for file in files:
if os.path.exists(file):
# todo use subprocessing to run UPX on ALL CPU cores
os.system("upx --best %s" % file)
else:
print("Warning: not going to optimize dist size because upx.exe doesn't exist\n")
print("Please put upx.exe next to setup.py in order to have the build reduced in size by ~50%")
os.system("pause")

20
speedtest.ini

@ -0,0 +1,20 @@ @@ -0,0 +1,20 @@
[main]
# logging (DEBUG/INFO/WARNING/ERROR/CRITICAL)
log_level=DEBUG
log_filename=speedtest.log
# download chunk size in bytes
chunk_size=1024
# sample every N chunks for averaging speed
sample_every=5
# connection timeout. This is the timeout for the server to send the first response, NOT read timeout
connection_timeout=60
[mail]
smtp_host=smtp.mzjtechnology.com
smtp_login=test_account@mzjtechnology.com
smtp_password=test123456789
smtp_port=587
smtp_starttls=True
# optional
smtp_from=test@wiphone.io
smtp_sendto=me@helldev.net

244
speedtest.py

@ -0,0 +1,244 @@ @@ -0,0 +1,244 @@
#!/usr/bin/env python3
# This file is a part of speedtest
# Created at 12/28/2024
import os
import sys
core = getattr(sys.modules["__main__"], "__file__", None)
if core:
core = os.path.abspath(core)
root = os.path.dirname(core)
if root:
os.chdir(root)
sys.path.insert(0, "library")
import email.message
import email.utils
import io
import logging
import requests
import smtplib
import time
import traceback
from configparser import ConfigParser
from inputimeout import inputimeout, TimeoutOccurred
from commons import *
logger = logging.getLogger("speedtest")
formatter = logging.Formatter(
"%(asctime)s %(levelname)s: %(name)s: %(message)s", "%d.%m.%Y %H:%M:%S")
CONFIG_FILENAME = "speedtest.ini"
URLS_FILENAME = "urls.txt"
MAIN_SECTION = "main"
MAIL_SECTION = "mail"
CHUNK_SIZE = "chunk_size"
SAMPLE_EVERY = "sample_every"
CONNECTION_TIMEOUT = "connection_timeout"
SMTP_HOST = "smtp_host"
SMTP_STARTTLS = "smtp_starttls"
SMTP_HOST = "smtp_host"
SMTP_PORT = "smtp_port"
SMTP_PASSWORD = "smtp_password"
SMTP_LOGIN = "smtp_login"
SMTP_SENDTO = "smtp_sendto"
LOG_LEVEL = "log_level"
LOG_FILENAME = "log_filename"
# YOU HAVE 20 SECONDS TO COMPLY
INPUT_TIMEOUT = 20
def get_client(config: ConfigParser):
server = config[SMTP_HOST]
if config[SMTP_STARTTLS] == "True":
logger.debug("connecting to %s using starttls", server)
client = smtplib.SMTP(config[SMTP_HOST], config[SMTP_PORT])
client.starttls()
else:
client = smtplib.SMTP_SSL(config[SMTP_HOST], config[SMTP_PORT])
client.login(config[SMTP_LOGIN], config[SMTP_PASSWORD])
return client
def send_data(config: ConfigParser, body: str, messages: list):
client = get_client(config)
email_content = ""
for message in messages:
if not message.contents:
continue
email_content += "# Server: %s\n" % (message.server.name)
for content in message.contents:
email_content += content.head
email_content += "\n"
body = content.body
if isinstance(body, str):
email_content += body
elif isinstance(body, LineFormat):
texts = body.text
for text in texts:
content_, args = text.content, text.args
print("args", args)
email_content += content_.format(*text.args)
email_content += "\n"
email_content += content.footer
email_content += "\n"
email_content += "_" * 25
email_content += "\n"
from_addr = "Speedtest <%s>" % (config.get(SMTP_LOGIN))
msg = email.message.Message()
msg.add_header("Content-Type", "text")
msg.set_payload(email_content)
msg["Subject"] = "Speedtest report"
msg["From"] = from_addr
msg["To"] = config.get(SMTP_SENDTO)
client.sendmail(config.get(SMTP_LOGIN),
config.get(SMTP_SENDTO), msg.as_string())
def measure_speed(url: str, chunk_size: int = 1024, sample_every: int = 5, timeout: int = 10) -> tuple[int, int]:
with io.BytesIO() as f:
start = time.perf_counter()
r = requests.get(url, stream=True, timeout=timeout)
total_length = r.headers.get("content-length")
dl = 0
min_ = sys.maxsize
max_ = 0
samples = []
counter = 0
if total_length is None: # no content length header
# fixme: we can pre-configure the length
raise ValueError("Content length header not found!")
else:
for chunk in r.iter_content(chunk_size):
dl += len(chunk)
f.write(chunk)
speed = dl // (time.perf_counter() - start) / 1000000
if (counter % sample_every) == 0:
samples.append(speed)
max_ = max(max_, speed)
min_ = min(min_, speed)
counter += 1
avg = sum(samples) / len(samples)
return (avg, max_)
def test_url(url: str, chunk_size: int, sample_every: int, timeout: int) -> Message:
avg = 0
max_ = 0
try:
avg, max_ = measure_speed(url, chunk_size, sample_every, timeout)
except requests.exceptions.Timeout:
logger.error("Timeout to reach url: %s" % url, exc_info=True)
except Exception:
logger.error("Unable to test url: %s" % url, exc_info=True)
else:
text = []
text.append(Text("Average speed: {:.2f} MB/s", [avg]))
text.append(Text("Maximum speed: {:.2f} MB/s", [max_]))
content = Content("Speed test results", LineFormat(text))
message = Message([content])
message.server = Connection(url)
message.server.name = url
return message
def get_test_results(urls: list, chunk_size: int, sample_every: int, timeout: int):
messages = []
for url in urls:
logger.debug("Going to perform speedtest on %s" % url)
message = test_url(url, chunk_size, sample_every, timeout)
if message is not None:
messages.append(message)
else:
logger.warning("The url %s was not tested" % url)
return messages
def _get_urls() -> list:
with open(URLS_FILENAME) as file:
return file.read().splitlines()
def _setup_logging(config: ConfigParser):
log_level = config[LOG_LEVEL]
log_filename = config[LOG_FILENAME]
log_level = logging.getLevelName(log_level)
logger.setLevel(log_level)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
file_handler = logging.FileHandler(log_filename)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def excepthook(*exc_info):
text = "".join(traceback.format_exception(*exc_info))
logger.error("unhandled exception caught: %s", text)
def _get_input(prompt: str, timeout: int = INPUT_TIMEOUT, default: str="") -> str:
try:
return inputimeout(prompt, timeout)
except TimeoutOccurred:
print("(fallback to default: %s)" % default)
return default
def get_name_and_isp() -> tuple[str, str]:
print("In order to proceed, we need to ask you a couple questions. Every question has a 20 second timeout, then the default value will be used.")
name = _get_input("Please input your name: ", INPUT_TIMEOUT, "undefined")
isp = _get_input("Please input your ISP name: ", INPUT_TIMEOUT, "undefined")
return name, isp
def main():
sys.excepthook = excepthook
parser = ConfigParser()
parser.read(CONFIG_FILENAME)
main_section = parser[MAIN_SECTION]
mail_section = parser[MAIL_SECTION]
_setup_logging(main_section)
sample_every = int(main_section[SAMPLE_EVERY])
chunk_size = int(main_section[CHUNK_SIZE])
timeout = int(main_section[CONNECTION_TIMEOUT])
urls = _get_urls()
logger.debug("Found %d urls" % len(urls))
name, isp = get_name_and_isp()
messages = get_test_results(urls, chunk_size, sample_every, timeout)
if messages is None or len(messages) == 0:
print("There was an error, no tests were performed!")
raise RuntimeError("No tests performed!")
client = get_client(mail_section)
body = "User name: %s; ISP: %s" % (name, isp)
send_data(mail_section, body, messages)
logger.info("Done! You're amazing!")
if __name__ == "__main__":
try:
main()
except Exception:
logger.error("Critical failure: ", exc_info=True)
if os.name == "nt":
os.system("pause")

1
urls.txt

@ -0,0 +1 @@ @@ -0,0 +1 @@
http://ipv4.download.thinkbroadband.com/100MB.zip
Loading…
Cancel
Save