330 lines
12 KiB
Python
330 lines
12 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import os
|
|
import traceback
|
|
import tempfile
|
|
from glob import glob
|
|
from datetime import datetime
|
|
from threading import Thread
|
|
from enum import Enum
|
|
from typing import Optional, Dict, List, Any, Tuple, Union
|
|
|
|
import shutil as celery_shutil
|
|
from .setup import P
|
|
|
|
logger = P.logger
|
|
ModelSetting = P.ModelSetting
|
|
|
|
# yt-dlp 패키지 설정 (기본값 index 1)
|
|
package_idx: str = ModelSetting.get("youtube_dl_package")
|
|
if not package_idx:
|
|
package_idx = "1"
|
|
youtube_dl_package: str = P.youtube_dl_packages[int(package_idx)].replace("-", "_")
|
|
|
|
|
|
class Status(Enum):
|
|
READY = 0
|
|
START = 1
|
|
DOWNLOADING = 2
|
|
ERROR = 3
|
|
FINISHED = 4
|
|
STOP = 5
|
|
COMPLETED = 6
|
|
|
|
def __str__(self) -> str:
|
|
str_list: List[str] = ["준비", "분석중", "다운로드중", "실패", "변환중", "중지", "완료"]
|
|
return str_list[self.value]
|
|
|
|
|
|
class MyYoutubeDL:
|
|
DEFAULT_FILENAME: str = "%(title)s-%(id)s.%(ext)s"
|
|
|
|
_index: int = 0
|
|
|
|
def __init__(
|
|
self,
|
|
plugin: str,
|
|
type_name: str,
|
|
url: str,
|
|
filename: str,
|
|
temp_path: str,
|
|
save_path: Optional[str] = None,
|
|
opts: Optional[Dict[str, Any]] = None,
|
|
dateafter: Optional[str] = None,
|
|
datebefore: Optional[str] = None,
|
|
):
|
|
# yt-dlp/youtube-dl의 utils 모듈에서 DateRange 임포트
|
|
DateRange = __import__(
|
|
f"{youtube_dl_package}.utils", fromlist=["DateRange"]
|
|
).DateRange
|
|
|
|
if save_path is None:
|
|
save_path = temp_path
|
|
if opts is None:
|
|
opts = {}
|
|
|
|
self.plugin: str = plugin
|
|
self.type: str = type_name
|
|
self.url: str = url
|
|
self.filename: str = filename
|
|
|
|
# 임시 폴더 생성
|
|
if not os.path.isdir(temp_path):
|
|
os.makedirs(temp_path)
|
|
self.temp_path: str = tempfile.mkdtemp(prefix="youtube-dl_", dir=temp_path)
|
|
|
|
# 저장 폴더 생성
|
|
if not os.path.isdir(save_path):
|
|
os.makedirs(save_path)
|
|
self.save_path: str = save_path
|
|
|
|
self.opts: Dict[str, Any] = opts
|
|
if dateafter or datebefore:
|
|
self.opts["daterange"] = DateRange(start=dateafter, end=datebefore)
|
|
|
|
self.index: int = MyYoutubeDL._index
|
|
MyYoutubeDL._index += 1
|
|
|
|
self._status: Status = Status.READY
|
|
self._thread: Optional[Thread] = None
|
|
self.key: Optional[str] = None
|
|
self.start_time: Optional[datetime] = None
|
|
self.end_time: Optional[datetime] = None
|
|
|
|
# 비디오 정보
|
|
self.info_dict: Dict[str, Optional[str]] = {
|
|
"extractor": None,
|
|
"title": None,
|
|
"uploader": None,
|
|
"uploader_url": None,
|
|
}
|
|
|
|
# 진행률 정보
|
|
self.progress_hooks: Dict[str, Optional[Union[int, float, str]]] = {
|
|
"downloaded_bytes": None,
|
|
"total_bytes": None,
|
|
"eta": None,
|
|
"speed": None,
|
|
}
|
|
|
|
def start(self) -> bool:
|
|
"""다운로드 스레드 시작"""
|
|
if self.status != Status.READY:
|
|
return False
|
|
self._thread = Thread(target=self.run)
|
|
self._thread.start()
|
|
return True
|
|
|
|
def run(self) -> None:
|
|
"""다운로드 실행 본체"""
|
|
youtube_dl = __import__(youtube_dl_package)
|
|
|
|
try:
|
|
self.start_time = datetime.now()
|
|
self.status = Status.START
|
|
|
|
# 정보 추출
|
|
info_dict = MyYoutubeDL.get_info_dict(
|
|
self.url,
|
|
self.opts.get("proxy"),
|
|
self.opts.get("cookiefile"),
|
|
self.opts.get("http_headers"),
|
|
self.opts.get("cookiesfrombrowser"),
|
|
)
|
|
|
|
if info_dict is None:
|
|
self.status = Status.ERROR
|
|
return
|
|
|
|
self.info_dict["extractor"] = info_dict.get("extractor", "unknown")
|
|
self.info_dict["title"] = info_dict.get("title", info_dict.get("id", "unknown"))
|
|
self.info_dict["uploader"] = info_dict.get("uploader", "")
|
|
self.info_dict["uploader_url"] = info_dict.get("uploader_url", "")
|
|
|
|
ydl_opts: Dict[str, Any] = {
|
|
"logger": MyLogger(),
|
|
"progress_hooks": [self.my_hook],
|
|
"outtmpl": os.path.join(self.temp_path, self.filename),
|
|
"ignoreerrors": True,
|
|
"cachedir": False,
|
|
"nocheckcertificate": True,
|
|
}
|
|
|
|
# yt-dlp 전용 성능 향상 옵션
|
|
if youtube_dl_package == "yt_dlp":
|
|
ydl_opts.update({
|
|
"concurrent_fragment_downloads": 5,
|
|
"retries": 10,
|
|
})
|
|
|
|
ydl_opts.update(self.opts)
|
|
|
|
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
|
|
logger.debug(f"다운로드 시작: {self.url}")
|
|
error_code: int = ydl.download([self.url])
|
|
logger.debug(f"다운로드 종료 (코드: {error_code})")
|
|
|
|
if self.status in (Status.START, Status.FINISHED, Status.DOWNLOADING):
|
|
# 임시 폴더의 파일을 실제 저장 경로로 이동
|
|
for i in glob(self.temp_path + "/**/*", recursive=True):
|
|
path: str = i.replace(self.temp_path, self.save_path, 1)
|
|
if os.path.isdir(i):
|
|
if not os.path.isdir(path):
|
|
os.mkdir(path)
|
|
continue
|
|
celery_shutil.move(i, path)
|
|
self.status = Status.COMPLETED
|
|
except Exception as error:
|
|
self.status = Status.ERROR
|
|
logger.error(f"실행 중 예외 발생: {error}")
|
|
logger.error(traceback.format_exc())
|
|
finally:
|
|
if os.path.exists(self.temp_path):
|
|
celery_shutil.rmtree(self.temp_path)
|
|
if self.status != Status.STOP:
|
|
self.end_time = datetime.now()
|
|
|
|
def stop(self) -> bool:
|
|
"""다운로드 중지"""
|
|
if self.status in (Status.ERROR, Status.STOP, Status.COMPLETED):
|
|
return False
|
|
self.status = Status.STOP
|
|
self.end_time = datetime.now()
|
|
return True
|
|
|
|
@staticmethod
|
|
def get_preview_url(url: str) -> Optional[str]:
|
|
"""미리보기용 직접 재생 가능한 URL 추출"""
|
|
youtube_dl = __import__(youtube_dl_package)
|
|
try:
|
|
# 미리보기를 위해 다양한 포맷 시도 (mp4, hls 등)
|
|
ydl_opts: Dict[str, Any] = {
|
|
"format": "best[ext=mp4]/bestvideo[ext=mp4]+bestaudio[ext=m4a]/best",
|
|
"logger": MyLogger(),
|
|
"nocheckcertificate": True,
|
|
"quiet": True,
|
|
"js_runtimes": {"node": {"path": "/Users/yommi/.local/state/fnm_multishells/53824_1769161399333/bin/node"}},
|
|
}
|
|
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
|
|
info: Dict[str, Any] = ydl.extract_info(url, download=False)
|
|
|
|
# 1. HLS 매니페스트 우선 (가장 안정적으로 오디오+비디오 제공)
|
|
if info.get("manifest_url"):
|
|
return info["manifest_url"]
|
|
|
|
# 2. 직접 URL (ydl_opts에서 지정한 best[ext=mp4] 결과)
|
|
if info.get("url"):
|
|
return info["url"]
|
|
|
|
# 3. 포맷 목록에서 적절한 것 찾기
|
|
formats = info.get("formats", [])
|
|
# 오디오와 비디오가 모두 있는 포맷 찾기
|
|
combined_formats = [f for f in formats if f.get("vcodec") != "none" and f.get("acodec") != "none"]
|
|
if combined_formats:
|
|
# 가장 좋은 화질의 결합 포맷 선택
|
|
return combined_formats[-1].get("url")
|
|
|
|
return None
|
|
except Exception as error:
|
|
logger.error(f"미리보기 URL 추출 중 예외 발생: {error}")
|
|
return None
|
|
|
|
@staticmethod
|
|
def get_version() -> str:
|
|
"""라이브러리 버전 확인"""
|
|
__version__: str = __import__(
|
|
f"{youtube_dl_package}.version", fromlist=["__version__"]
|
|
).__version__
|
|
return __version__
|
|
|
|
@staticmethod
|
|
def get_info_dict(
|
|
url: str,
|
|
proxy: Optional[str] = None,
|
|
cookiefile: Optional[str] = None,
|
|
http_headers: Optional[Dict[str, str]] = None,
|
|
cookiesfrombrowser: Optional[str] = None,
|
|
**extra_opts
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""비디오 메타데이터 정보 추출"""
|
|
youtube_dl = __import__(youtube_dl_package)
|
|
|
|
try:
|
|
ydl_opts: Dict[str, Any] = {
|
|
"logger": MyLogger(),
|
|
"nocheckcertificate": True,
|
|
"quiet": True,
|
|
# JS 런타임 수동 지정 (유저 시스템 환경 반영)
|
|
"js_runtimes": {"node": {"path": "/Users/yommi/.local/state/fnm_multishells/53824_1769161399333/bin/node"}},
|
|
}
|
|
# 기본값으로 extract_flat 적용 (명시적으로 override 가능)
|
|
if "extract_flat" not in extra_opts:
|
|
ydl_opts["extract_flat"] = True # True = 모든 추출기에 적용
|
|
|
|
if proxy:
|
|
ydl_opts["proxy"] = proxy
|
|
if cookiefile:
|
|
ydl_opts["cookiefile"] = cookiefile
|
|
if http_headers:
|
|
ydl_opts["http_headers"] = http_headers
|
|
if cookiesfrombrowser:
|
|
ydl_opts["cookiesfrombrowser"] = (cookiesfrombrowser, None, None, None)
|
|
|
|
# 추가 옵션 반영 (playliststart, playlistend 등)
|
|
ydl_opts.update(extra_opts)
|
|
|
|
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
|
|
info: Dict[str, Any] = ydl.extract_info(url, download=False)
|
|
except Exception as error:
|
|
logger.error(f"정보 추출 중 예외 발생: {error}")
|
|
logger.error(traceback.format_exc())
|
|
return None
|
|
return ydl.sanitize_info(info)
|
|
|
|
def my_hook(self, data: Dict[str, Any]) -> None:
|
|
"""진행률 업데이트 훅"""
|
|
if self.status != Status.STOP:
|
|
if data["status"] == "downloading":
|
|
self.status = Status.DOWNLOADING
|
|
elif data["status"] == "error":
|
|
self.status = Status.ERROR
|
|
elif data["status"] == "finished":
|
|
self.status = Status.FINISHED
|
|
|
|
if data["status"] != "error":
|
|
self.filename = os.path.basename(data.get("filename", self.filename))
|
|
self.progress_hooks["downloaded_bytes"] = data.get("downloaded_bytes")
|
|
self.progress_hooks["total_bytes"] = data.get("total_bytes") or data.get("total_bytes_estimate")
|
|
self.progress_hooks["eta"] = data.get("eta")
|
|
self.progress_hooks["speed"] = data.get("speed")
|
|
|
|
@property
|
|
def status(self) -> Status:
|
|
return self._status
|
|
|
|
@status.setter
|
|
def status(self, value: Status) -> None:
|
|
self._status = value
|
|
# Socket.IO를 통한 상태 업데이트 전송
|
|
try:
|
|
basic_module = P.get_module('basic')
|
|
if basic_module:
|
|
basic_module.socketio_emit("status", self)
|
|
except Exception as e:
|
|
logger.error(f"SocketIO 전송 에러: {e}")
|
|
|
|
|
|
class MyLogger:
|
|
"""yt-dlp의 로그를 가로채서 처리하는 클래성"""
|
|
def debug(self, msg: str) -> None:
|
|
# 진행 상황 관련 로그는 걸러냄
|
|
if " ETA " in msg or "at" in msg and "B/s" in msg:
|
|
return
|
|
logger.debug(msg)
|
|
|
|
def warning(self, msg: str) -> None:
|
|
logger.warning(msg)
|
|
|
|
def error(self, msg: str) -> None:
|
|
logger.error(msg)
|