add /info /video

This commit is contained in:
2026-02-22 22:17:32 +01:00
parent ad759647e0
commit 1bfcafc429

155
main.py
View File

@@ -1,6 +1,7 @@
import os import os
import uuid import uuid
import uvicorn import uvicorn
from typing import Any, Literal, cast
from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
from pydantic import BaseModel from pydantic import BaseModel
@@ -8,10 +9,28 @@ from yt_dlp import YoutubeDL
app = FastAPI() app = FastAPI()
# Input schema because we aren't savages # Input schema because we aren't savages
class VideoRequest(BaseModel): class VideoRequest(BaseModel):
url: str url: str
class VideoDownloadRequest(VideoRequest):
quality: Literal["best", "1080p", "720p", "480p", "360p"] = "best"
class VideoInfoResponse(BaseModel):
id: str | None = None
title: str | None = None
uploader: str | None = None
duration: int | None = None
webpage_url: str | None = None
thumbnail: str | None = None
view_count: int | None = None
upload_date: str | None = None
extractor: str | None = None
def cleanup(path: str): def cleanup(path: str):
"""Deletes the file after serving because I know you won't.""" """Deletes the file after serving because I know you won't."""
try: try:
@@ -20,50 +39,150 @@ def cleanup(path: str):
except Exception as e: except Exception as e:
print(f"Failed to delete {path}: {e}") print(f"Failed to delete {path}: {e}")
def get_video_format_selector(quality: str) -> str:
if quality == "best":
return "bestvideo+bestaudio/best"
height = quality.removesuffix("p")
return f"bestvideo[height<={height}]+bestaudio/best[height<={height}]"
@app.post("/audio") @app.post("/audio")
async def download_audio(req: VideoRequest, background_tasks: BackgroundTasks): async def download_audio(req: VideoRequest, background_tasks: BackgroundTasks):
# Random ID so concurrent requests don't overwrite each other # Random ID so concurrent requests don't overwrite each other
file_id = str(uuid.uuid4()) file_id = str(uuid.uuid4())
output_template = f"/tmp/{file_id}.%(ext)s" output_template = f"/tmp/{file_id}.%(ext)s"
# yt-dlp config: best audio, force mp3 because whisper hates weird codecs # yt-dlp config: best audio, force mp3 because whisper hates weird codecs
ydl_opts = { ydl_opts: dict[str, Any] = {
'format': 'bestaudio/best', "format": "bestaudio/best",
'outtmpl': output_template, "outtmpl": output_template,
'postprocessors': [{ "postprocessors": [
'key': 'FFmpegExtractAudio', {
'preferredcodec': 'mp3', "key": "FFmpegExtractAudio",
'preferredquality': '192', "preferredcodec": "mp3",
}], "preferredquality": "192",
'quiet': True, }
'no_warnings': True, ],
"quiet": True,
"no_warnings": True,
} }
try: try:
with YoutubeDL(ydl_opts) as ydl: with YoutubeDL(cast(Any, ydl_opts)) as ydl:
print(f"Yoinking audio from: {req.url}") print(f"Yoinking audio from: {req.url}")
info = ydl.extract_info(req.url, download=True) info = ydl.extract_info(req.url, download=True)
# yt-dlp might change extension post-processing, so we find the actual file # yt-dlp might change extension post-processing, so we find the actual file
# usually it's .mp3 if we forced it, but let's be safe # usually it's .mp3 if we forced it, but let's be safe
filename = ydl.prepare_filename(info).rsplit('.', 1)[0] + '.mp3' filename = ydl.prepare_filename(info).rsplit(".", 1)[0] + ".mp3"
if not os.path.exists(filename): if not os.path.exists(filename):
raise HTTPException(status_code=500, detail="Download failed, file missing. Blame YouTube.") raise HTTPException(
status_code=500, detail="Download failed, file missing. Blame YouTube."
)
# queue the deletion so it happens AFTER the response is sent # queue the deletion so it happens AFTER the response is sent
background_tasks.add_task(cleanup, filename) background_tasks.add_task(cleanup, filename)
return FileResponse( return FileResponse(
path=filename, path=filename, filename=f"{file_id}.mp3", media_type="audio/mpeg"
filename=f"{file_id}.mp3",
media_type='audio/mpeg'
) )
except Exception as e: except Exception as e:
print(f"L: {str(e)}") print(f"L: {str(e)}")
raise HTTPException(status_code=400, detail=f"Failed to download: {str(e)}") raise HTTPException(status_code=400, detail=f"Failed to download: {str(e)}")
@app.post("/info", response_model=VideoInfoResponse)
async def get_video_info(req: VideoRequest):
ydl_opts: dict[str, Any] = {
"quiet": True,
"no_warnings": True,
}
try:
with YoutubeDL(cast(Any, ydl_opts)) as ydl:
info = ydl.extract_info(req.url, download=False)
if not info:
raise HTTPException(status_code=404, detail="No metadata found for URL")
return VideoInfoResponse(
id=info.get("id"),
title=info.get("title"),
uploader=info.get("uploader"),
duration=info.get("duration"),
webpage_url=info.get("webpage_url"),
thumbnail=info.get("thumbnail"),
view_count=info.get("view_count"),
upload_date=info.get("upload_date"),
extractor=info.get("extractor"),
)
except HTTPException:
raise
except Exception as e:
print(f"Failed metadata fetch: {str(e)}")
raise HTTPException(
status_code=400, detail=f"Failed to fetch metadata: {str(e)}"
)
@app.post("/video")
async def download_video(req: VideoDownloadRequest, background_tasks: BackgroundTasks):
file_id = str(uuid.uuid4())
output_template = f"/tmp/{file_id}.%(ext)s"
ydl_opts: dict[str, Any] = {
"format": get_video_format_selector(req.quality),
"merge_output_format": "mp4",
"outtmpl": output_template,
"noplaylist": True,
"quiet": True,
"no_warnings": True,
}
try:
with YoutubeDL(cast(Any, ydl_opts)) as ydl:
print(f"Yoinking video from: {req.url} [{req.quality}]")
info = ydl.extract_info(req.url, download=True)
filename = None
requested_downloads = info.get("requested_downloads")
if isinstance(requested_downloads, list) and requested_downloads:
first = requested_downloads[0]
if isinstance(first, dict):
filepath = first.get("filepath")
if isinstance(filepath, str):
filename = filepath
if not filename:
maybe_filename = info.get("_filename")
if isinstance(maybe_filename, str):
filename = maybe_filename
if not filename:
filename = ydl.prepare_filename(info)
if not os.path.exists(filename):
raise HTTPException(
status_code=500, detail="Download failed, file missing. Blame YouTube."
)
background_tasks.add_task(cleanup, filename)
return FileResponse(
path=filename, filename=f"{file_id}.mp4", media_type="video/mp4"
)
except HTTPException:
raise
except Exception as e:
print(f"Video download failed: {str(e)}")
raise HTTPException(
status_code=400, detail=f"Failed to download video: {str(e)}"
)
if __name__ == "__main__": if __name__ == "__main__":
# Host on 0.0.0.0 so your docker container isn't useless # Host on 0.0.0.0 so your docker container isn't useless
uvicorn.run(app, host="0.0.0.0", port=8000) uvicorn.run(app, host="0.0.0.0", port=8000)