поделал сайт, сделал orm-модели, requirements.txt
This commit is contained in:
@@ -1,71 +1,37 @@
|
||||
from typing import Dict, Any, List, Tuple, Optional
|
||||
import os
|
||||
from fastapi import FastAPI, Request, Form, Depends, Cookie, responses, templating, HTTPException
|
||||
from starlette.staticfiles import StaticFiles
|
||||
from fastapi import FastAPI, Request, Form, Depends, Cookie, responses, templating, staticfiles
|
||||
from functions.admin import is_logged_in
|
||||
from functions.admin.models import database
|
||||
from functions.admin.templates import logins, refresh, logs, log
|
||||
|
||||
app = FastAPI()
|
||||
templates = templating.Jinja2Templates(directory="static/templates")
|
||||
app.mount("/static", StaticFiles(directory="static"), name="static")
|
||||
|
||||
|
||||
def is_logged_in(logged_in: Optional[str] = Cookie(None)):
|
||||
return bool(logged_in)
|
||||
|
||||
app.mount("/static", staticfiles.StaticFiles(directory="static"), name="static")
|
||||
|
||||
@app.get("/", response_class=responses.HTMLResponse)
|
||||
def index(request: Request):
|
||||
def login(request: Request):
|
||||
return templates.TemplateResponse("index.html", {"request": request})
|
||||
|
||||
|
||||
@app.post("/login")
|
||||
async def login(password: str = Form(...)):
|
||||
if password == "password":
|
||||
response = responses.RedirectResponse(url="/logs", status_code=303)
|
||||
response.set_cookie(key="logged_in", value="true")
|
||||
return response
|
||||
return {"message": "Неправильный пароль"}
|
||||
@app.get("/login", response_class=responses.HTMLResponse)
|
||||
def login(request: Request):
|
||||
return templates.TemplateResponse("login.html", {"request": request})
|
||||
|
||||
|
||||
@app.post("/logins")
|
||||
async def logins_response(username: str = Form(...), password: str = Form(...), db=Depends(database.get_db)):
|
||||
return await logins.logins(username, password, db)
|
||||
|
||||
|
||||
@app.get("/refresh")
|
||||
async def refresh_access_token(req: Request, refresh_token: str = Cookie(None), db=Depends(database.get_db)):
|
||||
return await refresh.refresh_access_token(req, refresh_token, db)
|
||||
|
||||
|
||||
@app.get("/logs", response_class=responses.HTMLResponse)
|
||||
async def logs(request: Request, logged_in: bool = Depends(is_logged_in)):
|
||||
if logged_in:
|
||||
return templates.TemplateResponse("logs.html", {"request": request, "logs": get_logs()})
|
||||
else:
|
||||
return responses.RedirectResponse(url="/", status_code=303)
|
||||
async def logs_response(request: Request, logged_in: bool = Depends(is_logged_in.is_logged_in)):
|
||||
return await logs.logs(templates, request, logged_in)
|
||||
|
||||
|
||||
@app.get("/logs/{log_id}", response_class=responses.HTMLResponse)
|
||||
async def log(request: Request, log_id: int, logged_in: bool = Depends(is_logged_in)):
|
||||
if logged_in:
|
||||
if not get_log(log_id):
|
||||
raise HTTPException(status_code=400)
|
||||
return templates.TemplateResponse("log.html", {"request": request,
|
||||
"log": get_log(log_id)})
|
||||
else:
|
||||
responses.RedirectResponse(url="/login", status_code=303)
|
||||
|
||||
|
||||
|
||||
def get_logs() -> List[Tuple[int, str]]:
|
||||
return [(int(os.path.basename(dir_path)), dir_path.split("/")[2].strip())
|
||||
for dir_path, _, filenames in os.walk("static/logs")
|
||||
if dir_path != "static/logs" and len(dir_path.split("/")) == 3]
|
||||
|
||||
|
||||
def get_log(log_id: int) -> List[Dict[str, Any]]:
|
||||
log_dir = os.path.join("static/logs", str(log_id))
|
||||
return_dir = []
|
||||
for dir_path, _, filenames in os.walk(log_dir):
|
||||
if dir_path != log_dir:
|
||||
audio_file = os.path.join(dir_path, "audio.ogg")
|
||||
if not os.path.exists(audio_file):
|
||||
audio_file = os.path.join(dir_path, "audio.wav")
|
||||
text_file = os.path.join(dir_path, "yandex-text.txt")
|
||||
if not os.path.exists(text_file):
|
||||
text_file = os.path.join(dir_path, "google-text.txt")
|
||||
try:
|
||||
return_dir.append({"id": log_id, "audio_file": f"/{audio_file}",
|
||||
"text": open(text_file).read().split("\n")})
|
||||
except UnicodeDecodeError:
|
||||
pass
|
||||
return return_dir
|
||||
async def log_response(request: Request, log_id: int, logged_in: bool = Depends(is_logged_in.is_logged_in)):
|
||||
return await log.log(templates, request, log_id, logged_in)
|
||||
|
||||
Reference in New Issue
Block a user