from urllib.parse import unquote import mimetypes import datetime import aiofiles import asyncio import config import os STATUS = { 200: "200 OK", 403: "403 Forbidden", 404: "404 Not Found", 405: "405 Method Not Allowed", 418: "418 I'm a teapot" } class WebServer: async def log(self, text: str, addr: tuple=None, file: str=None) -> None: text = text.replace("", f"{addr[0]}:{addr[1]}" if addr else "") text = text.replace("", file if file else "") if config.log_file: async with aiofiles.open(config.log_file, mode="a") as file: await file.write(text + "\n") else: print(text) async def send_headers(self, writer: asyncio.StreamWriter, status: int, file_size: int, mime: str="text/html; charset=utf-8") -> None: headers = ( f"HTTP/1.1 {STATUS[status]}\r\n" f"Content-Type: {mime}\r\n" f"Content-Length: {file_size}\r\n" f"Server: {config.name}\r\n" "\r\n" ) writer.write(headers.encode()) await writer.drain() async def send_file(self, writer: asyncio.StreamWriter, file_path: str, file_size: int): if not os.path.isfile(file_path): writer.write("file error") await writer.drain() sent = 0 async with aiofiles.open(file_path, "rb") as f: while sent < file_size: chunk = await f.read(config.write_buffer) if not chunk: break writer.write(chunk) await writer.drain() sent += len(chunk) async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: addr = writer.get_extra_info("peername") await self.log(config.conn_msg, addr) rdata = await reader.read(config.read_buffer) data = rdata.decode() if not data: return real_addr = None for line in data.split("\n"): if line.startswith("X-Real-IP: "): real_addr = line[len("X-Real-IP: "):].strip() break if real_addr and config.proxied: addr = (real_addr, addr[1]) request = data.split("\n")[0] parts = request.split() if len(parts) < 2: return path = unquote(parts[1]) file_name = path[1:] if path.startswith('/') else path file_path = os.path.abspath(os.path.join(config.directory, file_name)) if not file_path.startswith(os.path.abspath(config.directory)): await self.log(config.err_msgs[418], addr, file_path) file_size = os.path.getsize(config.err_files[418]) await self.send_headers(writer, 418, file_size) await self.send_file(writer, config.err_files[418], file_size) writer.close() await writer.wait_closed() return if os.path.isfile(file_path): mime, _ = mimetypes.guess_type(file_path) file_size = os.path.getsize(file_path) if not mime: mime = "application/octet-stream" if mime.startswith("text"): mime += "; charset=utf-8" await self.send_headers(writer, 200, file_size, mime) await self.send_file(writer, file_path, file_size) elif os.path.isdir(file_path): resp = "" async with aiofiles.open(config.preset_file, "r", encoding="utf-8") as f: resp = await f.read() files = "" base_path = os.path.relpath(file_path, config.directory).replace('\\', '/') if base_path == '.': base_path = '' for item in sorted(os.listdir(file_path)): item_path = os.path.join(file_path, item) is_dir = os.path.isdir(item_path) if base_path: rel_path = f"{base_path}/{item}" else: rel_path = item if is_dir: rel_path += "/" item += "/" modify_time = os.path.getmtime(item_path) modify_datetime = datetime.datetime.fromtimestamp(modify_time) formatted_time = modify_datetime.strftime("%d.%m.%Y %H:%M:%S") rel_path_encoded = rel_path.replace(' ', '%20').replace('#', '%23') files += f'{item} | {formatted_time}
\n' resp = resp.replace("", files) resp = resp.encode() await self.send_headers(writer, 200, len(resp)) writer.write(resp) await writer.drain() else: await self.log(config.err_msgs[404], addr, config.err_files[404]) file_size = os.path.getsize(config.err_files[404]) await self.send_headers(writer, 404, file_size) await self.send_file(writer, config.err_files[404], file_size) writer.close() await writer.wait_closed() return await self.log(config.get_msg, addr, file_path) writer.close() await writer.wait_closed() async def start(self) -> None: server = await asyncio.start_server( self.handle, config.addr, config.port ) await self.log(f"{config.start_msg}", (config.addr, config.port)) async with server: await server.serve_forever() async def main(): s = WebServer() if not config.directory: print("directory not set!") exit(1) if not config.preset_file: print("preset file not set!") exit(1) if not os.path.isfile(config.preset_file): print("invalid preset file") exit(1) await s.start() asyncio.run(main())