150 lines
4.9 KiB
Python
150 lines
4.9 KiB
Python
import asyncio
import datetime
import html
import mimetypes
import os
import re
import time
from urllib.parse import quote, unquote

import aiofiles
|
|
|
|
class WebServer:
    """Small asyncio HTTP server that serves files and directory listings.

    All configuration is read from environment variables in ``__init__``:

    * ``ADDR`` / ``PORT`` -- listening endpoint (default ``localhost:7856``).
    * ``FILE`` -- log file path; when unset, log lines are printed.
    * ``PRESET_FILE`` -- HTML template for directory listings; every
      occurrence of the text ``FILES`` in it is replaced by the listing.
    * ``DIR`` -- root directory that is served.
    * ``READ_BUFFER`` / ``WRITE_BUFFER`` -- maximum request read size and
      file chunk size, in bytes (default 16384 each).
    * ``CONN_MSG``, ``START_MSG``, ``GET_MSG``, ``404_MSG`` -- log message
      templates; ``<ADDR>`` and ``<FILE>`` are substituted by ``log``.

    Each connection serves exactly one request and is then closed.
    """

    def __init__(self):
        """Load the configuration from the process environment."""
        self._addr = os.environ.get("ADDR", "localhost")
        self._port = int(os.environ.get("PORT", 7856))

        self._log_file = os.environ.get("FILE")
        self.preset_file = os.environ.get("PRESET_FILE")
        self.directory = os.environ.get("DIR")

        self._read_buffer = int(os.environ.get("READ_BUFFER", 16384))
        self._write_size = int(os.environ.get("WRITE_BUFFER", 16384))

        self.conn_msg = os.environ.get("CONN_MSG", "conn from <ADDR>")
        self.start_msg = os.environ.get("START_MSG", "started at <ADDR>")
        self.get_msg = os.environ.get("GET_MSG", "<ADDR> got <FILE>")
        self.e404_msg = os.environ.get("404_MSG", "<ADDR> err 404 <FILE>")

    async def log(self, text: str, addr: tuple = None, file: str = None) -> None:
        """Fill in a message template and write it to the log.

        Args:
            text: Template; ``<ADDR>`` and ``<FILE>`` are replaced.
            addr: ``(host, port)`` pair used for ``<ADDR>``; a falsy value
                yields an empty string.
            file: Value used for ``<FILE>``; a falsy value yields an empty
                string.

        The line is appended to the configured log file, or printed when no
        log file is configured.
        """
        text = text.replace("<ADDR>", f"{addr[0]}:{addr[1]}" if addr else "")
        text = text.replace("<FILE>", file if file else "")

        if self._log_file:
            # Distinct name: do not shadow the ``file`` parameter.
            async with aiofiles.open(self._log_file, mode="a") as log_fh:
                await log_fh.write(text + "\n")
        else:
            print(text)

    @staticmethod
    def _head(status: str, content_type: str, length: int) -> bytes:
        """Build the encoded status line and headers of a response."""
        return (
            f"HTTP/1.1 {status}\r\n"
            f"Content-Type: {content_type}\r\n"
            f"Content-Length: {length}\r\n"
            # The handler closes the socket after one response; say so.
            "Connection: close\r\n"
            "\r\n"
        ).encode()

    @staticmethod
    def _parse_target(raw: bytes):
        """Return the percent-decoded request path, or None if malformed."""
        try:
            request_line = raw.decode("utf-8").split("\n", 1)[0]
        except UnicodeDecodeError:
            return None

        parts = request_line.split()
        if len(parts) < 2:
            return None

        # Split first and decode afterwards, so an encoded space (%20) in
        # the path cannot break request-line parsing. Query string and
        # fragment are not part of the file path.
        path = parts[1].split("?", 1)[0].split("#", 1)[0]
        return unquote(path)

    def _resolve(self, target: str):
        """Map a request path to a path under the root, or None if it escapes."""
        if "\x00" in target:
            return None

        # Normalise lexically so the path we check is the path we open.
        candidate = os.path.normpath(
            os.path.join(self.directory, target.lstrip("/"))
        )
        root = os.path.abspath(self.directory)
        try:
            inside = os.path.commonpath([root, os.path.abspath(candidate)]) == root
        except ValueError:
            # Raised e.g. for paths on different drives on Windows.
            inside = False
        return candidate if inside else None

    async def _send_page(self, writer: asyncio.StreamWriter, status: str, markup: str) -> None:
        """Send a complete HTML response with the given status."""
        body = markup.encode()
        writer.write(self._head(status, "text/html; charset=utf-8", len(body)) + body)
        await writer.drain()

    async def _send_file(self, writer: asyncio.StreamWriter, file_path: str) -> None:
        """Stream a regular file to the client in ``_write_size`` chunks."""
        mime, _ = mimetypes.guess_type(file_path)
        if not mime:
            mime = "application/octet-stream"
        if mime.startswith("text"):
            mime += "; charset=utf-8"
        file_size = os.path.getsize(file_path)

        writer.write(self._head("200 OK", mime, file_size))
        await writer.drain()

        sent = 0
        async with aiofiles.open(file_path, "rb") as f:
            while sent < file_size:
                # Never send more than the announced Content-Length, even if
                # the file grew after its size was read.
                chunk = await f.read(min(self._write_size, file_size - sent))
                if not chunk:
                    break
                writer.write(chunk)
                await writer.drain()
                sent += len(chunk)

    async def _send_listing(self, writer: asyncio.StreamWriter, dir_path: str) -> None:
        """Render the preset template with a listing of ``dir_path``."""
        async with aiofiles.open(self.preset_file, "r", encoding="utf-8") as f:
            template = await f.read()

        # URL path of this directory, derived from its position under the
        # served root rather than from the filesystem path itself.
        rel = os.path.relpath(dir_path, self.directory)
        segments = [] if rel == os.curdir else rel.split(os.sep)

        rows = []
        # TODO: add settings for how files are displayed
        for name in sorted(os.listdir(dir_path)):
            full = os.path.join(dir_path, name)
            try:
                modified = os.path.getmtime(full)
            except OSError:
                # Entry vanished or is a broken symlink: leave it out
                # instead of failing the whole listing.
                continue

            # Directories are shown with a leading slash.
            label = "/" + name if os.path.isdir(full) else name
            stamp = datetime.datetime.fromtimestamp(modified).strftime(
                "%d.%m.%Y %H:%M:%S"
            )
            # quote() percent-encodes every HTML-significant character, so
            # the href needs no further escaping; the label does.
            href = quote("/" + "/".join(segments + [name]))
            rows.append(f'<a href="{href}">{html.escape(label)}</a> | {stamp}<br>')

        page = template.replace("FILES", "".join(rows))
        await self._send_page(writer, "200 OK", page)

    async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Serve one HTTP request on a connection, then close it.

        A regular file is streamed, a directory is rendered as a listing,
        anything else (including paths outside the root) gets a 404, and an
        unparsable request gets a 400.
        """
        addr = writer.get_extra_info("peername")
        try:
            await self.log(self.conn_msg, addr)

            raw = await reader.read(self._read_buffer)
            if not raw:
                return

            target = self._parse_target(raw)
            if target is None:
                await self._send_page(writer, "400 Bad Request", "<h1>400 Bad Request</h1>")
                return

            file_path = self._resolve(target)
            if file_path is not None and os.path.isfile(file_path):
                await self._send_file(writer, file_path)
            elif file_path is not None and os.path.isdir(file_path):
                await self._send_listing(writer, file_path)
            else:
                await self.log(self.e404_msg, addr, file_path or target)
                # TODO: add a dedicated 404 page
                await self._send_page(writer, "404 Not Found", "<h1>плоке плоке, 404</h1>")
                return

            await self.log(self.get_msg, addr, file_path)
        finally:
            # Always release the connection; without this the socket leaks
            # and the client keeps waiting for more data.
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                # The peer already dropped the connection; nothing to do.
                pass

    async def start(self) -> None:
        """Bind to the configured address and serve until cancelled."""
        server = await asyncio.start_server(self.handle, self._addr, self._port)
        await self.log(self.start_msg, (self._addr, self._port))

        async with server:
            await server.serve_forever()
|
|
|
|
|
|
async def main():
    """Check the required configuration and run the server.

    Prints a message and exits with status 1 when the served directory or
    the preset file is not configured, or when the preset file does not
    exist. Otherwise it serves until the server is stopped.

    Raises:
        SystemExit: With code 1 on any of the configuration errors above.
    """
    s = WebServer()

    # ``raise SystemExit`` instead of ``exit()``: the latter is a helper
    # injected by the ``site`` module for interactive use and is not
    # guaranteed to exist (e.g. under ``python -S``).
    if not s.directory:
        print("directory not set!")
        raise SystemExit(1)
    if not s.preset_file:
        print("preset file not set!")
        raise SystemExit(1)
    if not os.path.isfile(s.preset_file):
        print("invalid preset file")
        raise SystemExit(1)

    await s.start()
|
|
|
|
|
|
# Start the server only when executed as a script, so that importing this
# module does not bind a socket as a side effect.
if __name__ == "__main__":
    asyncio.run(main())