PyWebBrowser/browser.py
Matthew Welch 015e46134e add support for file scheme
split url parsing into separate function
2024-12-06 11:31:09 -08:00

221 lines
6.3 KiB
Python

import os.path
import socket
import ssl
import sys
# URL schemes that parse_url() accepts; any other scheme makes parsing fail.
SUPPORTED_SCHEMES = ["http", "https", "file"]

# File shown by load() when the requested URL cannot be parsed. It is
# resolved with os.path.abspath(), i.e. against the current working directory.
DEFAULT_FILE = "default.html"
class URL:
def __init__(self, url_string: str | None = None):
self.scheme: str = ""
self.host: str = ""
self.port: int = -1
self.path: str = ""
self.query: str = ""
self.fragment: str = ""
self.default_port = False
if url_string is not None:
parse_url(url_string, self)
def to_string(self):
url_string = self.scheme + ":"
if self.host != "":
url_string += "//" + self.host
if self.port != -1 and not self.default_port:
url_string += f":{self.port}"
url_string += self.path
if self.query != "":
url_string += "?" + self.query
if self.fragment != "":
url_string += "#" + self.fragment
return url_string
def __str__(self):
return self.to_string()
def __repr__(self):
return f"<URL {self.to_string()}>"
class Request:
def __init__(self, url: URL, method: str = "GET"):
self.url = url
self.method = method
self.request_string = ""
self.http_version = "HTTP/1.1"
self.headers = {"Host": self.url.host}
def add_request_line(self, method):
self.request_string += f"{method} {self.url.path} {self.http_version}\r\n"
def add_header(self, key, value):
self.headers[key] = value
def add_headers(self, headers: dict[str,str]):
self.headers.update(headers)
def add_default_headers(self):
self.add_headers({
"Connection": "close",
"User-Agent": "PyWebBrowser"
})
def end_headers(self):
for key, value in self.headers.items():
self.request_string += f"{key}: {value}\r\n"
self.request_string += "\r\n"
def send_request(self, *args, **kwargs):
if self.url.scheme in ["http", "https"]:
return self.http_request(*args, **kwargs)
elif self.url.scheme == "file":
return self.file_request()
def http_request(self, method: str = "GET", headers: dict = None) -> str:
if headers is not None:
self.add_headers(headers)
s = socket.socket(
family=socket.AF_INET,
type=socket.SOCK_STREAM,
proto=socket.IPPROTO_TCP,
)
s.connect((self.url.host, self.url.port))
if self.url.scheme == "https":
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
s = context.wrap_socket(s, server_hostname=self.url.host)
self.add_request_line(method)
self.add_default_headers()
self.end_headers()
s.send(self.request_string.encode("utf8"))
response = s.makefile("r", encoding="utf8", newline="\r\n")
status_line = response.readline()
version, status, explanation = status_line.split(" ", 2)
response_headers = {}
while True:
line = response.readline()
if line == "\r\n":
break
header, value = line.split(":", 1)
response_headers[header.casefold()] = value.strip()
assert "transfer-encoding" not in response_headers
assert "content-encoding" not in response_headers
content = response.read()
s.close()
return content
def file_request(self):
with open(self.url.path) as f:
return f.read()
# Port implied by each network scheme when the URL does not name one.
_DEFAULT_PORTS = {"http": 80, "https": 443}


def _split_before(text: str, delimiters: str) -> tuple[str, str]:
    """Split *text* immediately before the first character found in *delimiters*."""
    for index, char in enumerate(text):
        if char in delimiters:
            return text[:index], text[index:]
    return text, ""


def parse_url(url_string: str, url: URL | None = None) -> tuple[URL, bool]:
    """Split *url_string* into scheme, host, port, path, query and fragment.

    Args:
        url_string: Text of the form
            ``scheme:[//host[:port]][path][?query][#fragment]``.
        url: Object to fill in.  A new ``URL`` is created when omitted.

    Returns:
        ``(url, success)``.  ``success`` is ``False`` when the scheme is
        missing or not in ``SUPPORTED_SCHEMES``, when neither a host nor a
        path is present, when a ``file`` URL has no usable path, or when the
        port is not an integer.  On failure ``url`` holds whatever had been
        parsed up to that point.  Failures are reported by printing a
        message; no exception is raised.
    """
    if url is None:
        url = URL()
    try:
        scheme, colon, rest = url_string.partition(":")
        if not colon:
            raise ValueError(f"no scheme in URL {url_string!r}")
        url.scheme = scheme
        # Explicit check rather than ``assert`` so it survives ``python -O``.
        if url.scheme not in SUPPORTED_SCHEMES:
            raise ValueError(f"unsupported scheme {url.scheme!r}")

        # "//" introduces the authority (host[:port]), which runs up to the
        # start of the path, query or fragment.
        has_authority = rest.startswith("//")
        if has_authority:
            url.host, rest = _split_before(rest[2:], "/?#")

        # The path runs up to the query or fragment.  The remainder must be
        # carried forward, otherwise the query and fragment are never seen.
        url.path, rest = _split_before(rest, "?#")

        if url.path == "" and (url.host == "" or not has_authority):
            return url, False

        if rest.startswith("?"):
            url.query, rest = _split_before(rest[1:], "#")
        if rest.startswith("#"):
            url.fragment = rest[1:]

        implied_port = _DEFAULT_PORTS.get(url.scheme)
        if implied_port is not None:
            url.port = implied_port
            url.default_port = True
            if url.path == "":
                url.path = "/"

        if url.scheme == "file":
            # "file:///C:/dir/page.html" parses to the path "/C:/dir/page.html";
            # drop the leading slash so Windows sees a drive path.
            if sys.platform == "win32" and url.path.startswith("/") and ":" in url.path:
                url.path = url.path[1:]
            if url.path == "" or url.path == "/":
                return url, False

        if ":" in url.host:
            url.host, port = url.host.split(":", 1)
            url.port = int(port)
            # The port was spelled out, so it must be kept when the URL is
            # turned back into text.
            url.default_port = False
        return url, True
    except Exception as e:
        # Best effort by design: callers rely on the flag, not on exceptions.
        print(e)
        return url, False
def show(body: str) -> None:
    """Print *body* with everything between ``<`` and ``>`` left out.

    The angle brackets themselves are never printed, and no trailing newline
    is added.
    """
    inside_markup = False
    for ch in body:
        if ch in "<>":
            # "<" opens a tag, ">" closes it; neither is printed.
            inside_markup = ch == "<"
            continue
        if inside_markup:
            continue
        print(ch, end="")
def load(url_string: str):
    """Fetch the page at *url_string* and print its text.

    When *url_string* cannot be parsed, the file named by ``DEFAULT_FILE``
    (made absolute with ``os.path.abspath``) is shown instead.
    """
    target, parsed_ok = parse_url(url_string)
    if not parsed_ok:
        fallback_path = os.path.abspath(DEFAULT_FILE)
        if sys.platform == "win32":
            # Gives "file:///C:\..." so the drive path ends up in the URL
            # path rather than in the host.
            fallback_path = "/" + fallback_path
        target, _ = parse_url(f"file://{fallback_path}")
    show(Request(target).send_request())
if __name__ == '__main__':
    # Started without a URL: hand load() an empty string, which it cannot
    # parse and therefore answers with the default page, instead of
    # crashing with an IndexError on sys.argv[1].
    load(sys.argv[1] if len(sys.argv) > 1 else "")