Browse Source

Add azure server updater & add actual speed display

master
John Smith 1 day ago
parent
commit
5f595d2955
  1. 94
      library/azure_updater.py
  2. 2
      speedtest.ini
  3. 89
      speedtest.py

94
library/azure_updater.py

@ -0,0 +1,94 @@
#!/usr/bin/env python3
# This file is a part of speedtest
# Created at 01/17/2025
import os
import sys
core = getattr(sys.modules["__main__"], "__file__", None)
if core:
core = os.path.abspath(core)
root = os.path.dirname(core)
if root:
os.chdir(root)
import requests
URLS_FILE = "urls.txt"
SEARCH_STRINGS = ["azurespeed.com", "windows.net", "azure"]
BASE_URL = "https://www.azurespeed.com/api/sas?regionName={location}&blobName=100MB.bin&operation=download"
LOCATIONS = ("australiacentral",
"australiaeast",
"australiasoutheast",
"centralindia",
"eastasia",
"japaneast",
"japanwest",
"koreacentral",
"koreasouth",
"newzealandnorth",
"southindia",
"southeastasia",
"westindia")
DIRECTIONS = ("southeast", "west", "south", "central", "east", "north")
def convert_location(location: str) -> str:
for direction in DIRECTIONS:
if direction in location:
location = location.replace(direction, "")
location = direction.capitalize() + " " + location.capitalize()
break
return location
def get_urls() -> list[tuple[str]]:
new_urls = []
for location in LOCATIONS:
url = BASE_URL.format(location=location)
try:
json = requests.get(url).json()
new_url = json["url"]
except Exception:
continue
new_urls.append((location, new_url))
return new_urls
def rewrite_urls(new_urls: list[tuple[str]]):
contents = []
with open(URLS_FILE, "r") as file:
for line in file.read().splitlines():
line = line.strip()
for str_ in SEARCH_STRINGS:
if str_.lower() in line.lower():
break
else:
contents.append(line)
with open(URLS_FILE, "w") as file:
for (location, new_url) in new_urls:
location = convert_location(location)
file.write("{location}|Azure|{new_url}\n".format(location=location, new_url=new_url))
for item in contents:
file.write(item + "\n")
def update_azure_lists():
urls = get_urls()
rewrite_urls(urls)
def main():
update_azure_lists()
if __name__ == "__main__":
main()

2
speedtest.ini

@ -8,6 +8,8 @@ chunk_size=1024
sample_every=5 sample_every=5
# connection timeout. This is the timeout for the server to send the first response, NOT read timeout # connection timeout. This is the timeout for the server to send the first response, NOT read timeout
connection_timeout=60 connection_timeout=60
# whether to update azure lists
azure_update=True
[mail] [mail]
smtp_host=smtp.mzjtechnology.com smtp_host=smtp.mzjtechnology.com

89
speedtest.py

@ -29,6 +29,7 @@ import traceback
from configparser import ConfigParser from configparser import ConfigParser
from inputimeout import inputimeout, TimeoutOccurred from inputimeout import inputimeout, TimeoutOccurred
from azure_updater import update_azure_lists
from commons import * from commons import *
@ -37,6 +38,8 @@ formatter = logging.Formatter(
"%(asctime)s %(levelname)s: %(name)s: %(message)s", "%d.%m.%Y %H:%M:%S") "%(asctime)s %(levelname)s: %(name)s: %(message)s", "%d.%m.%Y %H:%M:%S")
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
CONFIG_FILENAME = "speedtest.ini" CONFIG_FILENAME = "speedtest.ini"
URLS_FILENAME = "urls.txt" URLS_FILENAME = "urls.txt"
@ -46,6 +49,8 @@ MAIL_SECTION = "mail"
CHUNK_SIZE = "chunk_size" CHUNK_SIZE = "chunk_size"
SAMPLE_EVERY = "sample_every" SAMPLE_EVERY = "sample_every"
CONNECTION_TIMEOUT = "connection_timeout" CONNECTION_TIMEOUT = "connection_timeout"
AZURE_UPDATE = "azure_update"
SMTP_HOST = "smtp_host" SMTP_HOST = "smtp_host"
SMTP_STARTTLS = "smtp_starttls" SMTP_STARTTLS = "smtp_starttls"
@ -94,11 +99,9 @@ def send_data(config: ConfigParser, email_content: str, messages: list):
content_, args = text.content, text.args content_, args = text.content, text.args
email_content += content_.format(*text.args) email_content += content_.format(*text.args)
email_content += "\n" email_content += "\n"
email_content += content.footer email_content += content.footer
email_content += "\n" email_content += "\n"
email_content += "_" * 25
email_content += "\n"
from_addr = "Speedtest <%s>" % (config.get(SMTP_LOGIN)) from_addr = "Speedtest <%s>" % (config.get(SMTP_LOGIN))
msg = email.message.Message() msg = email.message.Message()
msg.add_header("Content-Type", "text") msg.add_header("Content-Type", "text")
@ -113,7 +116,7 @@ def send_data(config: ConfigParser, email_content: str, messages: list):
def measure_speed(url: str, chunk_size: int = 1024, sample_every: int = 5, timeout: int = 10) -> tuple[int, int]: def measure_speed(url: str, chunk_size: int = 1024, sample_every: int = 5, timeout: int = 10) -> tuple[int, int]:
with io.BytesIO() as f: with io.BytesIO() as f:
start = time.perf_counter() start = time.perf_counter()
r = requests.get(url, stream=True, timeout=timeout) r = requests.get(url, stream=True, timeout=timeout, headers={"User-Agent": USER_AGENT})
total_length = r.headers.get("content-length") total_length = r.headers.get("content-length")
dl = 0 dl = 0
min_ = sys.maxsize min_ = sys.maxsize
@ -130,46 +133,51 @@ def measure_speed(url: str, chunk_size: int = 1024, sample_every: int = 5, timeo
speed = dl // (time.perf_counter() - start) / 1000000 speed = dl // (time.perf_counter() - start) / 1000000
if (counter % sample_every) == 0: if (counter % sample_every) == 0:
samples.append(speed) samples.append(speed)
done = int(30 * dl / int(total_length))
sys.stdout.write("\r[%s%s] %s MB/s" % ("=" * done, " " * (30-done), speed))
max_ = max(max_, speed) max_ = max(max_, speed)
min_ = min(min_, speed) min_ = min(min_, speed)
counter += 1 counter += 1
avg = sum(samples) / len(samples) avg = sum(samples) / len(samples)
print("\n")
return (avg, max_) return (avg, max_)
def test_url(url: str, chunk_size: int, sample_every: int, timeout: int) -> Message:
def test_url(server: str, chunk_size: int, sample_every: int, timeout: int) -> tuple[int, int]:
avg = 0 avg = 0
max_ = 0 max_ = 0
location, hosting, url = server.split("|")
try: try:
avg, max_ = measure_speed(url, chunk_size, sample_every, timeout) avg, max_ = measure_speed(url, chunk_size, sample_every, timeout)
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
logger.error("Timeout to reach url: %s" % url, exc_info=True) logger.error("Timeout to reach url: %s" % url, exc_info=True)
except requests.exceptions.ConnectionError as e:
logger.error("Unable to test url: %s (requests error: %s)" % (url, str(e)))
except Exception: except Exception:
logger.error("Unable to test url: %s" % url, exc_info=True) logger.error("Unable to test url: %s" % url, exc_info=True)
else: else:
text = [] return avg, max_
text.append(Text("Average speed: {:.2f} MB/s", [avg]))
text.append(Text("Maximum speed: {:.2f} MB/s", [max_]))
content = Content("Speed test results", LineFormat(text)) def get_test_results(servers: list[str], chunk_size: int, sample_every: int, timeout: int) -> list[dict]:
message = Message([content]) result = []
message.server = Connection(url) for server in servers:
message.server.name = url location, hosting, url = server.split("|")
return message logger.debug("Going to perform speedtest on %s" % server)
test_result = test_url(server, chunk_size, sample_every, timeout)
if test_result is not None:
def get_test_results(urls: list, chunk_size: int, sample_every: int, timeout: int): avg, max_ = test_result
messages = [] data = {"avg": avg, "max": max_, "server": server}
for url in urls: result.append(data)
logger.debug("Going to perform speedtest on %s" % url)
message = test_url(url, chunk_size, sample_every, timeout)
if message is not None:
messages.append(message)
else: else:
logger.warning("The url %s was not tested" % url) logger.warning("The server %s was not tested" % server)
return messages continue
return result
def _get_urls() -> list: def _get_test_servers() -> list:
with open(URLS_FILENAME) as file: with open(URLS_FILENAME) as file:
return file.read().splitlines() return file.read().splitlines()
@ -207,8 +215,26 @@ def get_name_and_isp() -> tuple[str, str]:
return name, isp return name, isp
def combine_results(results: dict) -> list[Message]:
messages = []
for item in sorted(results, key=lambda cur: -cur["avg"]):
text = []
avg = item["avg"]
max_ = item["max"]
location, hosting, url = item["server"].split("|")
text.append(Text("Average speed: {:.2f} MB/s", [avg]))
text.append(Text("Maximum speed: {:.2f} MB/s", [max_]))
content = Content("Speed test results", LineFormat(text))
content.footer = "_" * 25
message = Message([content])
message.server = Connection(url)
message.server.name = location + " " + hosting + " " + url
messages.append(message)
return messages
def main(): def main():
sys.excepthook = excepthook # sys.excepthook = excepthook
parser = ConfigParser() parser = ConfigParser()
parser.read(CONFIG_FILENAME) parser.read(CONFIG_FILENAME)
main_section = parser[MAIN_SECTION] main_section = parser[MAIN_SECTION]
@ -218,11 +244,16 @@ def main():
sample_every = int(main_section[SAMPLE_EVERY]) sample_every = int(main_section[SAMPLE_EVERY])
chunk_size = int(main_section[CHUNK_SIZE]) chunk_size = int(main_section[CHUNK_SIZE])
timeout = int(main_section[CONNECTION_TIMEOUT]) timeout = int(main_section[CONNECTION_TIMEOUT])
azure_update = main_section[AZURE_UPDATE]
if azure_update == "True":
print("Updating azure lists")
update_azure_lists()
urls = _get_urls() servers = _get_test_servers()
logger.debug("Found %d urls" % len(urls)) logger.debug("Found %d servers" % len(servers))
name, isp = get_name_and_isp() name, isp = get_name_and_isp()
messages = get_test_results(urls, chunk_size, sample_every, timeout) results = get_test_results(servers, chunk_size, sample_every, timeout)
messages = combine_results(results)
if messages is None or len(messages) == 0: if messages is None or len(messages) == 0:
print("There was an error, no tests were performed!") print("There was an error, no tests were performed!")
raise RuntimeError("No tests performed!") raise RuntimeError("No tests performed!")

Loading…
Cancel
Save