import asyncio
import datetime
import mimetypes
import os
from urllib.parse import unquote

import aiofiles

import config


class WebServer:
    """Small asyncio HTTP server that serves files and directory listings.

    All settings are copied from the project ``config`` module when the
    instance is created. Each connection is handled as exactly one request:
    the response is sent and the connection is then closed.
    """

    def __init__(self):
        """Copy every setting used by the server from ``config``."""
        self.name = config.name
        self.proxied = config.proxied
        self._addr = config.addr
        self._port = config.port
        self._log_file = config.log_file
        self.preset_file = config.preset_file
        self.directory = config.directory
        self._read_buffer = config.read_buffer
        self._write_size = config.write_buffer
        self.conn_msg = config.conn_msg
        self.start_msg = config.start_msg
        self.get_msg = config.get_msg
        self._e404_file = config.e404_file
        self.e404_msg = config.e404_msg

    async def log(self, text: str, addr: tuple = None, file: str = None) -> None:
        """Emit one log line to the log file, or to stdout when none is set.

        Args:
            text: Message template.
            addr: Optional ``(host, port)`` pair substituted into the message.
            file: Optional file path substituted into the message.
        """
        # NOTE(review): both placeholder tokens below are empty strings in this
        # copy of the file. ``str.replace("", x)`` inserts ``x`` between every
        # character, so the real tokens were presumably lost before review --
        # confirm against the message templates in ``config`` and restore them.
        text = text.replace("", f"{addr[0]}:{addr[1]}" if addr else "")
        text = text.replace("", file if file else "")
        if self._log_file:
            # The handle gets its own name so it no longer shadows ``file``.
            async with aiofiles.open(self._log_file, mode="a") as log_handle:
                await log_handle.write(text + "\n")
        else:
            print(text)

    def _is_inside_root(self, path: str) -> bool:
        """Return True when *path* lexically stays inside ``self.directory``.

        The check normalises ``..`` segments but does not resolve symlinks,
        so links placed inside the served directory keep working.
        """
        root = os.path.normcase(os.path.abspath(self.directory))
        candidate = os.path.normcase(os.path.abspath(path))
        try:
            return os.path.commonpath([root, candidate]) == root
        except ValueError:
            # Raised e.g. for paths on different drives on Windows.
            return False

    async def _send_file(self, writer: asyncio.StreamWriter, file_path: str) -> None:
        """Send a 200 response whose body is the content of *file_path*."""
        mime, _ = mimetypes.guess_type(file_path)
        if not mime:
            mime = "application/octet-stream"
        if mime.startswith("text"):
            mime += "; charset=utf-8"
        file_size = os.path.getsize(file_path)

        headers = (
            "HTTP/1.1 200 OK\r\n"
            f"Content-Type: {mime}\r\n"
            f"Content-Length: {file_size}\r\n"
            f"Server: {self.name}\r\n"
            "Connection: close\r\n"
            "\r\n"
        )
        writer.write(headers.encode())
        await writer.drain()

        remaining = file_size
        async with aiofiles.open(file_path, "rb") as f:
            while remaining > 0:
                # Never send more than the advertised Content-Length, even if
                # the file grew after its size was measured.
                chunk = await f.read(min(self._write_size, remaining))
                if not chunk:
                    break
                writer.write(chunk)
                await writer.drain()
                remaining -= len(chunk)

    async def _send_listing(self, writer: asyncio.StreamWriter, file_path: str) -> None:
        """Queue a 200 response listing the entries of directory *file_path*.

        The caller is responsible for draining the writer afterwards.
        """
        # Read as UTF-8 so the bytes match the charset declared below.
        async with aiofiles.open(self.preset_file, "r", encoding="utf-8") as f:
            template = await f.read()

        norm_path = file_path.replace('\\', '/')
        base_path = norm_path.replace(self.directory.replace('\\', '/'), '').strip('/')

        rows = []
        for item in sorted(os.listdir(file_path)):
            item_path = os.path.join(file_path, item)
            rel_path = f"{base_path}/{item}" if base_path else item
            rel_path = rel_path.replace('\\', '/')
            if os.path.isdir(item_path):
                rel_path += "/"
                item += "/"
            try:
                modify_time = os.path.getmtime(item_path)
            except OSError:
                # E.g. a dangling symlink: skip the entry instead of failing
                # the whole listing.
                continue
            formatted_time = datetime.datetime.fromtimestamp(modify_time).strftime(
                "%d.%m.%Y %H:%M:%S"
            )
            # NOTE(review): ``rel_path`` is computed but not used by the row
            # text visible here; presumably link markup around ``item`` was
            # lost before review -- confirm and restore. The row terminator
            # appears as a raw line break in the reviewed copy and is written
            # as ``\n`` here.
            rows.append(f'{item} | {formatted_time}\n')
        files = "".join(rows)

        # NOTE(review): the placeholder token is an empty string in this copy
        # (see the note in ``log``); confirm the real token used by the preset
        # template.
        resp = template.replace("", files).encode()
        headers = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/html; charset=utf-8\r\n"
            f"Server: {self.name}\r\n"
            f"Content-Length: {len(resp)}\r\n"
            "Connection: close\r\n"
            "\r\n"
        )
        writer.write(headers.encode() + resp)

    async def _send_not_found(
        self, writer: asyncio.StreamWriter, addr: tuple, file_path: str
    ) -> None:
        """Log the miss and send the configured 404 page."""
        await self.log(self.e404_msg, addr, file_path)
        async with aiofiles.open(self._e404_file, "r", encoding="utf-8") as f:
            content = await f.read()
        body = content.encode()
        headers = (
            "HTTP/1.1 404 Not Found\r\n"
            "Content-Type: text/html; charset=utf-8\r\n"
            f"Server: {self.name}\r\n"
            # Without a length (and with the socket left open) clients cannot
            # tell where the body ends.
            f"Content-Length: {len(body)}\r\n"
            "Connection: close\r\n"
            "\r\n"
        )
        writer.write(headers.encode() + body)
        await writer.drain()

    async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Serve a single HTTP request on one client connection.

        Reads up to ``self._read_buffer`` bytes, extracts the request target
        and answers with a file, a directory listing, or the 404 page. A
        request line without a target gets a 400 response. The connection is
        always closed when the handler finishes.

        Args:
            reader: Stream the request is read from.
            writer: Stream the response is written to.
        """
        addr = writer.get_extra_info("peername")
        try:
            await self.log(self.conn_msg, addr)

            raw = await reader.read(self._read_buffer)
            if not raw:
                return
            # Undecodable bytes must not crash the handler.
            lines = raw.decode("utf-8", errors="replace").split("\n")

            # Headers are inspected *before* any percent-decoding so that an
            # encoded newline in the URL cannot forge an X-Real-IP line.
            if self.proxied:
                for line in lines:
                    if line.startswith("X-Real-IP: "):
                        real_addr = line[len("X-Real-IP: "):].strip()
                        if real_addr:
                            addr = (real_addr, addr[1])
                        break

            parts = lines[0].split()
            if len(parts) < 2:
                bad_request = (
                    "HTTP/1.1 400 Bad Request\r\n"
                    f"Server: {self.name}\r\n"
                    "Content-Length: 0\r\n"
                    "Connection: close\r\n"
                    "\r\n"
                )
                writer.write(bad_request.encode())
                await writer.drain()
                return

            # Drop the query string, then decode only the path so names that
            # contain spaces or other escaped characters resolve correctly.
            target = parts[1].partition("?")[0]
            file_name = unquote(target).lstrip("/")
            file_path = os.path.join(self.directory, file_name)

            # Refuse anything that escapes the served directory ("..").
            if not self._is_inside_root(file_path):
                await self._send_not_found(writer, addr, file_path)
                return

            if os.path.isfile(file_path):
                await self._send_file(writer, file_path)
            elif os.path.isdir(file_path):
                await self._send_listing(writer, file_path)
            else:
                await self._send_not_found(writer, addr, file_path)
                return

            await self.log(self.get_msg, addr, file_path)
            await writer.drain()
        finally:
            # One request per connection: always release the socket.
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                # The peer may already have reset the connection.
                pass

    async def start(self) -> None:
        """Bind to the configured address and port and serve until cancelled."""
        server = await asyncio.start_server(
            self.handle, self._addr, self._port
        )
        await self.log(f"{self.start_msg}", (self._addr, self._port))
        async with server:
            await server.serve_forever()


async def main():
    """Validate the required settings, then run the server.

    Exits with status 1 when the served directory or the preset file is not
    configured, or when the preset file does not exist.
    """
    s = WebServer()
    if not s.directory:
        print("directory not set!")
        raise SystemExit(1)
    if not s.preset_file:
        print("preset file not set!")
        raise SystemExit(1)
    if not os.path.isfile(s.preset_file):
        print("invalid preset file")
        raise SystemExit(1)
    await s.start()


# The server starts as soon as this module is executed (or imported).
asyncio.run(main())