Files
debweb/main.py
2025-08-15 00:25:49 +03:00

218 lines
7.5 KiB
Python

from urllib.parse import unquote
import mimetypes
import datetime
import aiofiles
import asyncio
import config
import utils
import time
import os
STATUS = {
200: "200 OK",
403: "403 Forbidden",
404: "404 Not Found",
405: "405 Method Not Allowed",
418: "418 I'm a teapot"
}
class WebServer:
async def log(self, text: str, addr: tuple=None, file: str=None) -> None:
text = text.replace("<ADDR>", f"{addr[0]}:{addr[1]}" if addr else "")
text = text.replace("<FILE>", file if file else "")
text = text.replace("<TIME>", datetime.datetime.now().strftime(config.time_format))
if config.log_file:
async with aiofiles.open(config.log_file, mode="a") as file:
await file.write(text + "\n")
else:
print(text)
async def send_headers(self, writer: asyncio.StreamWriter, status: int, file_size: int, mime: str="text/html; charset=utf-8") -> None:
headers = (
f"HTTP/1.1 {STATUS[status]}\r\n"
f"Content-Type: {mime}\r\n"
f"Content-Length: {file_size}\r\n"
f"Server: {config.name}\r\n"
"\r\n"
)
writer.write(headers.encode())
await writer.drain()
async def send_file(self, writer: asyncio.StreamWriter, file_path: str, file_size: int):
if not os.path.isfile(file_path):
writer.write("file error")
await writer.drain()
sent = 0
async with aiofiles.open(file_path, "rb") as f:
while sent < file_size:
chunk = await f.read(config.write_buffer)
if not chunk: break
writer.write(chunk)
await writer.drain()
sent += len(chunk)
async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
addr = writer.get_extra_info("peername")
await self.log(config.conn_msg, addr)
conn_time = time.time()
rdata = await reader.read(config.read_buffer)
data = rdata.decode()
if not data: return
real_addr = None
for line in data.split("\n"):
if line.startswith("X-Real-IP: "):
real_addr = line[len("X-Real-IP: "):].strip()
break
if real_addr and config.proxied:
addr = (real_addr, addr[1])
request = data.split("\n")[0]
parts = request.split()
if len(parts) < 2: return
path = unquote(parts[1])
file_name = path[1:] if path.startswith('/') else path
file_path = os.path.abspath(os.path.join(config.directory, file_name))
if not file_path.startswith(os.path.abspath(config.directory)): # directory traversal
await self.log(config.err_msgs[418], addr, file_path)
file_size = os.path.getsize(config.err_files[418])
await self.send_headers(writer, 418, file_size)
await self.send_file(writer, config.err_files[418], file_size)
writer.close()
await writer.wait_closed()
return
if addr[0] in utils.get_banlist(): # banlist for pidors
await self.log(config.err_msgs[403], addr, file_path)
file_size = os.path.getsize(config.err_files[403])
await self.send_headers(writer, 403, file_size)
await self.send_file(writer, config.err_files[403], file_size)
writer.close()
await writer.wait_closed()
return
if os.path.isfile(file_path):
mime, _ = mimetypes.guess_type(file_path)
file_size = os.path.getsize(file_path)
if not mime: mime = "application/octet-stream"
if mime.startswith("text"): mime += "; charset=utf-8"
await self.send_headers(writer, 200, file_size, mime)
await self.send_file(writer, file_path, file_size)
elif os.path.isdir(file_path):
resp = ""
if os.path.isfile(os.path.join(file_path, "preset.html")):
preset_file = os.path.join(file_path, "preset.html")
else:
preset_file = config.preset_file
async with aiofiles.open(preset_file, "r", encoding="utf-8") as f:
resp = await f.read()
files = ""
base_path = os.path.relpath(file_path, config.directory).replace('\\', '/')
if base_path == '.': base_path = ''
file_count = 0
dir_count = 0
for item in sorted(os.listdir(file_path)):
item_path = os.path.join(file_path, item)
is_dir = os.path.isdir(item_path)
if base_path: rel_path = f"{base_path}/{item}"
else: rel_path = item
if is_dir:
rel_path += "/"
item += "/"
dir_count += 1
else: file_count += 1
rel_path_encoded = rel_path.replace(' ', '%20').replace('#', '%23')
entry = config.dir_entry if is_dir else config.file_entry
entry = entry.replace("<NAME>", item)
entry = entry.replace("<REL_PATH>", rel_path_encoded)
entry = entry.replace("<CDATE>", utils.get_create_time(item_path, config.time_format))
entry = entry.replace("<MDATE>", utils.get_mod_time(item_path, config.time_format))
entry = entry.replace("<SIZE_B>", f"{os.path.getsize(item_path)}B")
entry = entry.replace("<SIZE_KB>", f"{format(os.path.getsize(item_path) / 1024, ".2f")}KB")
entry = entry.replace("<SIZE_MB>", f"{format(os.path.getsize(item_path) / 1024 ** 2, ".2f")}MB")
files += entry
resp = resp.replace("<FILES>", files)
resp = resp.replace("<FILE_COUNT>", str(file_count))
resp = resp.replace("<DIR_COUNT>", str(dir_count))
resp = resp.replace("<TOTAL_COUNT>", str(file_count + dir_count))
resp = resp.replace("<SERVER>", config.name)
resp = resp.replace("<LOAD_TIME>", format(time.time() - conn_time, ".3f"))
resp = resp.replace("<SERVER_TIME>", datetime.datetime.now().strftime(config.time_format))
resp = resp.encode()
await self.send_headers(writer, 200, len(resp))
writer.write(resp)
await writer.drain()
else:
await self.log(config.err_msgs[404], addr, file_path)
file_size = os.path.getsize(config.err_files[404])
await self.send_headers(writer, 404, file_size)
await self.send_file(writer, config.err_files[404], file_size)
writer.close()
await writer.wait_closed()
return
await self.log(config.get_msg, addr, file_path)
writer.close()
await writer.wait_closed()
async def start(self) -> None:
server = await asyncio.start_server(
self.handle,
config.addr,
config.port
)
await self.log(f"{config.start_msg}", (config.addr, config.port))
async with server:
await server.serve_forever()
async def main():
s = WebServer()
if not config.directory:
print("directory not set!")
exit(1)
if not config.preset_file:
print("preset file not set!")
exit(1)
if not os.path.isfile(config.preset_file):
print("invalid preset file")
exit(1)
await s.start()
asyncio.run(main())