Update: plugin source code files

This commit is contained in:
2026-01-06 19:25:41 +09:00
parent 786e2b5026
commit 1ad3d0767a
12 changed files with 1497 additions and 692 deletions

547
main.py
View File

@@ -1,543 +1,10 @@
import os from typing import Any
import sys
import platform
import traceback
import subprocess
import sqlite3
from datetime import datetime
from flask import render_template, jsonify from .setup import P
from .my_youtube_dl import MyYoutubeDL, Status logger: Any = P.logger
package_name: str = P.package_name
ModelSetting: Any = P.ModelSetting
logger = P.logger # LogicMain은 ModuleBasic으로 기능이 통합되어 더 이상 사용되지 않음.
package_name = P.package_name # 하위 호환성이나 프레임워크 요구사항이 있을 경우를 위해 파일 구조만 유지.
ModelSetting = P.ModelSetting
class LogicMain(LogicModuleBase):
db_default = {
"db_version": "2",
"youtube_dl_package": "1",
"ffmpeg_path": ""
if platform.system() != "Windows"
else os.path.join(path_app_root, "bin", "Windows", "ffmpeg.exe"),
"temp_path": os.path.join(path_data, "download_tmp"),
"save_path": os.path.join(path_data, "download"),
"default_filename": "",
"proxy": "",
}
def __init__(self, plugin):
super(LogicMain, self).__init__(plugin, None)
self.name = package_name # 모듈명
default_route_socketio(plugin, self)
def plugin_load(self):
try:
# youtube-dl 업데이트
youtube_dl = Plugin.youtube_dl_packages[
int(ModelSetting.get("youtube_dl_package"))
]
logger.debug(f"{youtube_dl} upgrade")
logger.debug(
subprocess.check_output(
[sys.executable, "-m", "pip", "install", "--upgrade", youtube_dl],
universal_newlines=True,
)
)
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
def process_menu(self, sub, req):
try:
arg = {
"package_name": package_name,
"sub": sub,
"template_name": f"{package_name}_{sub}",
"package_version": Plugin.plugin_info["version"],
}
if sub == "setting":
arg.update(ModelSetting.to_dict())
arg["package_list"] = Plugin.youtube_dl_packages
arg["youtube_dl_version"] = LogicMain.get_youtube_dl_version()
arg["DEFAULT_FILENAME"] = LogicMain.get_default_filename()
elif sub == "download":
default_filename = ModelSetting.get("default_filename")
arg["filename"] = (
default_filename
if default_filename
else LogicMain.get_default_filename()
)
arg["preset_list"] = LogicMain.get_preset_list()
arg["postprocessor_list"] = LogicMain.get_postprocessor_list()
elif sub == "thumbnail":
default_filename = ModelSetting.get("default_filename")
arg["filename"] = (
default_filename
if default_filename
else LogicMain.get_default_filename()
)
elif sub == "sub":
default_filename = ModelSetting.get("default_filename")
arg["filename"] = (
default_filename
if default_filename
else LogicMain.get_default_filename()
)
elif sub == "list":
pass
return render_template(f"{package_name}_{sub}.html", arg=arg)
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return render_template("sample.html", title=f"{package_name} - {sub}")
def process_ajax(self, sub, req):
try:
logger.debug("AJAX: %s, %s", sub, req.values)
ret = {"ret": "success"}
if sub == "ffmpeg_version":
path = req.form["path"]
output = subprocess.check_output([path, "-version"])
output = output.decode().replace("\n", "<br>")
ret["data"] = output
elif sub == "download":
postprocessor = req.form["postprocessor"]
video_convertor, extract_audio = LogicMain.get_postprocessor()
preferedformat = None
preferredcodec = None
preferredquality = None
if postprocessor in video_convertor:
preferedformat = postprocessor
elif postprocessor in extract_audio:
preferredcodec = postprocessor
preferredquality = 192
youtube_dl = LogicMain.download(
plugin=package_name,
url=req.form["url"],
filename=req.form["filename"],
temp_path=ModelSetting.get("temp_path"),
save_path=ModelSetting.get("save_path"),
format=req.form["format"],
preferedformat=preferedformat,
preferredcodec=preferredcodec,
preferredquality=preferredquality,
proxy=ModelSetting.get("proxy"),
ffmpeg_path=ModelSetting.get("ffmpeg_path"),
)
youtube_dl.start()
LogicMain.socketio_emit("add", youtube_dl)
ret["ret"] = "info"
ret["msg"] = "분석중..."
elif sub == "thumbnail":
youtube_dl = LogicMain.thumbnail(
plugin=package_name,
url=req.form["url"],
filename=req.form["filename"],
temp_path=ModelSetting.get("temp_path"),
save_path=ModelSetting.get("save_path"),
all_thumbnails=req.form["all_thumbnails"],
proxy=ModelSetting.get("proxy"),
ffmpeg_path=ModelSetting.get("ffmpeg_path"),
)
youtube_dl.start()
LogicMain.socketio_emit("add", youtube_dl)
ret["ret"] = "info"
ret["msg"] = "분석중..."
elif sub == "sub":
youtube_dl = LogicMain.sub(
plugin=package_name,
url=req.form["url"],
filename=req.form["filename"],
temp_path=ModelSetting.get("temp_path"),
save_path=ModelSetting.get("save_path"),
all_subs=req.form["all_subs"],
sub_lang=req.form["sub_lang"],
auto_sub=req.form["auto_sub"],
proxy=ModelSetting.get("proxy"),
ffmpeg_path=ModelSetting.get("ffmpeg_path"),
)
youtube_dl.start()
LogicMain.socketio_emit("add", youtube_dl)
ret["ret"] = "info"
ret["msg"] = "분석중..."
elif sub == "list":
ret["data"] = []
for i in LogicMain.youtube_dl_list:
data = LogicMain.get_data(i)
if data is not None:
ret["data"].append(data)
elif sub == "all_stop":
for i in LogicMain.youtube_dl_list:
i.stop()
elif sub == "stop":
index = int(req.form["index"])
LogicMain.youtube_dl_list[index].stop()
return jsonify(ret)
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return jsonify({"ret": "danger", "msg": str(error)})
def migration(self):
try:
db_version = ModelSetting.get_int("db_version")
connect = sqlite3.connect(
os.path.join(path_data, "db", f"{package_name}.db")
)
if db_version < 2:
logger.debug("youtube-dlc uninstall")
logger.debug(
subprocess.check_output(
[sys.executable, "-m", "pip", "uninstall", "-y", "youtube-dlc"],
universal_newlines=True,
)
)
connect.commit()
connect.close()
ModelSetting.set("db_version", LogicMain.db_default["db_version"])
db.session.flush()
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
youtube_dl_list = []
@staticmethod
def get_youtube_dl_version():
try:
return MyYoutubeDL.get_version()
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return "패키지 임포트 실패"
@staticmethod
def get_default_filename():
return MyYoutubeDL.DEFAULT_FILENAME
@staticmethod
def get_preset_list():
return [
["bestvideo+bestaudio/best", "최고 화질"],
["bestvideo[height<=1080]+bestaudio/best[height<=1080]", "1080p"],
["worstvideo+worstaudio/worst", "최저 화질"],
["bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]", "최고 화질(mp4)"],
[
"bestvideo[ext=mp4][height<=1080]+bestaudio[ext=m4a]/best[ext=mp4][height<=1080]",
"1080p(mp4)",
],
["bestvideo[filesize<50M]+bestaudio/best[filesize<50M]", "50MB 미만"],
["bestaudio/best", "오디오만"],
["_custom", "사용자 정의"],
]
@staticmethod
def get_postprocessor_list():
return [
["", "후처리 안함", None],
["mp4", "MP4", "비디오 변환"],
["flv", "FLV", "비디오 변환"],
["webm", "WebM", "비디오 변환"],
["ogg", "Ogg", "비디오 변환"],
["mkv", "MKV", "비디오 변환"],
["ts", "TS", "비디오 변환"],
["avi", "AVI", "비디오 변환"],
["wmv", "WMV", "비디오 변환"],
["mov", "MOV", "비디오 변환"],
["gif", "GIF", "비디오 변환"],
["mp3", "MP3", "오디오 추출"],
["aac", "AAC", "오디오 추출"],
["flac", "FLAC", "오디오 추출"],
["m4a", "M4A", "오디오 추출"],
["opus", "Opus", "오디오 추출"],
["vorbis", "Vorbis", "오디오 추출"],
["wav", "WAV", "오디오 추출"],
]
@staticmethod
def get_postprocessor():
video_convertor = []
extract_audio = []
for i in LogicMain.get_postprocessor_list():
if i[2] == "비디오 변환":
video_convertor.append(i[0])
elif i[2] == "오디오 추출":
extract_audio.append(i[0])
return video_convertor, extract_audio
@staticmethod
def download(**kwagrs):
try:
logger.debug(kwagrs)
plugin = kwagrs["plugin"]
url = kwagrs["url"]
filename = kwagrs["filename"]
temp_path = kwagrs["temp_path"]
save_path = kwagrs["save_path"]
opts = {}
if "format" in kwagrs and kwagrs["format"]:
opts["format"] = kwagrs["format"]
postprocessor = []
if "preferedformat" in kwagrs and kwagrs["preferedformat"]:
postprocessor.append(
{
"key": "FFmpegVideoConvertor",
"preferedformat": kwagrs["preferedformat"],
}
)
if "preferredcodec" in kwagrs and kwagrs["preferredcodec"]:
postprocessor.append(
{
"key": "FFmpegExtractAudio",
"preferredcodec": kwagrs["preferredcodec"],
"preferredquality": str(kwagrs["preferredquality"]),
}
)
if postprocessor:
opts["postprocessors"] = postprocessor
if "playlist" in kwagrs and kwagrs["playlist"]:
if kwagrs["playlist"] == "reverse":
opts["playlistreverse"] = True
elif kwagrs["playlist"] == "random":
opts["playlistrandom"] = True
else:
opts["playlist_items"] = kwagrs["playlist"]
if "archive" in kwagrs and kwagrs["archive"]:
opts["download_archive"] = kwagrs["archive"]
if "proxy" in kwagrs and kwagrs["proxy"]:
opts["proxy"] = kwagrs["proxy"]
if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
opts["cookiefile"] = kwagrs["cookiefile"]
if "headers" in kwagrs and kwagrs["headers"]:
opts["http_headers"] = kwagrs["headers"]
dateafter = kwagrs.get("dateafter")
youtube_dl = MyYoutubeDL(
plugin, "video", url, filename, temp_path, save_path, opts, dateafter
)
youtube_dl.key = kwagrs.get("key")
LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
return youtube_dl
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return None
@staticmethod
def thumbnail(**kwagrs):
try:
logger.debug(kwagrs)
plugin = kwagrs["plugin"]
url = kwagrs["url"]
filename = kwagrs["filename"]
temp_path = kwagrs["temp_path"]
save_path = kwagrs["save_path"]
opts = {"skip_download": True}
if (
"all_thumbnails" in kwagrs
and str(kwagrs["all_thumbnails"]).lower() != "false"
):
opts["write_all_thumbnails"] = True
else:
opts["writethumbnail"] = True
if "playlist" in kwagrs and kwagrs["playlist"]:
if kwagrs["playlist"] == "reverse":
opts["playlistreverse"] = True
elif kwagrs["playlist"] == "random":
opts["playlistrandom"] = True
else:
opts["playlist_items"] = kwagrs["playlist"]
if "archive" in kwagrs and kwagrs["archive"]:
opts["download_archive"] = kwagrs["archive"]
if "proxy" in kwagrs and kwagrs["proxy"]:
opts["proxy"] = kwagrs["proxy"]
if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
opts["cookiefile"] = kwagrs["cookiefile"]
if "headers" in kwagrs and kwagrs["headers"]:
opts["http_headers"] = kwagrs["headers"]
dateafter = kwagrs.get("dateafter")
youtube_dl = MyYoutubeDL(
plugin,
"thumbnail",
url,
filename,
temp_path,
save_path,
opts,
dateafter,
)
youtube_dl.key = kwagrs.get("key")
LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
return youtube_dl
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return None
@staticmethod
def sub(**kwagrs):
try:
logger.debug(kwagrs)
plugin = kwagrs["plugin"]
url = kwagrs["url"]
filename = kwagrs["filename"]
temp_path = kwagrs["temp_path"]
save_path = kwagrs["save_path"]
opts = {"skip_download": True}
sub_lang = map(
lambda x: x.strip(), kwagrs["sub_lang"].split(",")
) # 문자열을 리스트로 변환
if "all_subs" in kwagrs and str(kwagrs["all_subs"]).lower() != "false":
opts["allsubtitles"] = True
else:
opts["subtitleslangs"] = sub_lang
if "auto_sub" in kwagrs and str(kwagrs["auto_sub"]).lower() != "false":
opts["writeautomaticsub"] = True
else:
opts["writesubtitles"] = True
if "playlist" in kwagrs and kwagrs["playlist"]:
if kwagrs["playlist"] == "reverse":
opts["playlistreverse"] = True
elif kwagrs["playlist"] == "random":
opts["playlistrandom"] = True
else:
opts["playlist_items"] = kwagrs["playlist"]
if "archive" in kwagrs and kwagrs["archive"]:
opts["download_archive"] = kwagrs["archive"]
if "proxy" in kwagrs and kwagrs["proxy"]:
opts["proxy"] = kwagrs["proxy"]
if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
opts["cookiefile"] = kwagrs["cookiefile"]
if "headers" in kwagrs and kwagrs["headers"]:
opts["http_headers"] = kwagrs["headers"]
dateafter = kwagrs.get("dateafter")
youtube_dl = MyYoutubeDL(
plugin, "subtitle", url, filename, temp_path, save_path, opts, dateafter
)
youtube_dl.key = kwagrs.get("key")
LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
return youtube_dl
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return None
@staticmethod
def get_data(youtube_dl):
try:
data = {}
data["plugin"] = youtube_dl.plugin
data["url"] = youtube_dl.url
data["filename"] = youtube_dl.filename
data["temp_path"] = youtube_dl.temp_path
data["save_path"] = youtube_dl.save_path
data["index"] = youtube_dl.index
data["status_str"] = youtube_dl.status.name
data["status_ko"] = str(youtube_dl.status)
data["end_time"] = ""
data["extractor"] = youtube_dl.type + (
" - " + youtube_dl.info_dict["extractor"]
if youtube_dl.info_dict["extractor"] is not None
else ""
)
data["title"] = (
youtube_dl.info_dict["title"]
if youtube_dl.info_dict["title"] is not None
else youtube_dl.url
)
data["uploader"] = (
youtube_dl.info_dict["uploader"]
if youtube_dl.info_dict["uploader"] is not None
else ""
)
data["uploader_url"] = (
youtube_dl.info_dict["uploader_url"]
if youtube_dl.info_dict["uploader_url"] is not None
else ""
)
data["downloaded_bytes_str"] = ""
data["total_bytes_str"] = ""
data["percent"] = "0"
data["eta"] = (
youtube_dl.progress_hooks["eta"]
if youtube_dl.progress_hooks["eta"] is not None
else ""
)
data["speed_str"] = (
LogicMain.human_readable_size(youtube_dl.progress_hooks["speed"], "/s")
if youtube_dl.progress_hooks["speed"] is not None
else ""
)
if youtube_dl.status == Status.READY: # 다운로드 전
data["start_time"] = ""
data["download_time"] = ""
else:
if youtube_dl.end_time is None: # 완료 전
download_time = datetime.now() - youtube_dl.start_time
else:
download_time = youtube_dl.end_time - youtube_dl.start_time
data["end_time"] = youtube_dl.end_time.strftime("%m-%d %H:%M:%S")
if None not in (
youtube_dl.progress_hooks["downloaded_bytes"],
youtube_dl.progress_hooks["total_bytes"],
): # 둘 다 값이 있으면
data["downloaded_bytes_str"] = LogicMain.human_readable_size(
youtube_dl.progress_hooks["downloaded_bytes"]
)
data["total_bytes_str"] = LogicMain.human_readable_size(
youtube_dl.progress_hooks["total_bytes"]
)
data[
"percent"
] = f"{(float(youtube_dl.progress_hooks['downloaded_bytes']) / float(youtube_dl.progress_hooks['total_bytes']) * 100):.2f}"
data["start_time"] = youtube_dl.start_time.strftime("%m-%d %H:%M:%S")
data[
"download_time"
] = f"{int(download_time.seconds / 60):02d}:{int(download_time.seconds % 60):02d}"
return data
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return None
@staticmethod
def get_info_dict(url, proxy):
return MyYoutubeDL.get_info_dict(url, proxy)
@staticmethod
def human_readable_size(size, suffix=""):
for unit in ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"):
if size < 1024.0:
return f"{size:3.1f} {unit}{suffix}"
size /= 1024.0
return f"{size:.1f} YB{suffix}"
@staticmethod
def socketio_emit(cmd, data):
socketio.emit(
cmd, LogicMain.get_data(data), namespace=f"/{package_name}", broadcast=True
)

547
main_old.py Normal file
View File

@@ -0,0 +1,547 @@
import os
import sys
import platform
import traceback
import subprocess
import sqlite3
from datetime import datetime
from flask import render_template, jsonify
from framework import db, path_app_root, path_data, socketio
from framework.common.plugin import LogicModuleBase, default_route_socketio
from .plugin import Plugin
from .my_youtube_dl import MyYoutubeDL, Status
logger = Plugin.logger
package_name = Plugin.package_name
ModelSetting = Plugin.ModelSetting
class LogicMain(LogicModuleBase):
db_default = {
"db_version": "2",
"youtube_dl_package": "1",
"ffmpeg_path": ""
if platform.system() != "Windows"
else os.path.join(path_app_root, "bin", "Windows", "ffmpeg.exe"),
"temp_path": os.path.join(path_data, "download_tmp"),
"save_path": os.path.join(path_data, "download"),
"default_filename": "",
"proxy": "",
}
def __init__(self, plugin):
super(LogicMain, self).__init__(plugin, None)
self.name = package_name # 모듈명
default_route_socketio(plugin, self)
def plugin_load(self):
try:
# youtube-dl 업데이트
youtube_dl = Plugin.youtube_dl_packages[
int(ModelSetting.get("youtube_dl_package"))
]
logger.debug(f"{youtube_dl} upgrade")
logger.debug(
subprocess.check_output(
[sys.executable, "-m", "pip", "install", "--upgrade", youtube_dl],
universal_newlines=True,
)
)
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
def process_menu(self, sub, req):
try:
arg = {
"package_name": package_name,
"sub": sub,
"template_name": f"{package_name}_{sub}",
"package_version": Plugin.plugin_info["version"],
}
if sub == "setting":
arg.update(ModelSetting.to_dict())
arg["package_list"] = Plugin.youtube_dl_packages
arg["youtube_dl_version"] = LogicMain.get_youtube_dl_version()
arg["DEFAULT_FILENAME"] = LogicMain.get_default_filename()
elif sub == "download":
default_filename = ModelSetting.get("default_filename")
arg["filename"] = (
default_filename
if default_filename
else LogicMain.get_default_filename()
)
arg["preset_list"] = LogicMain.get_preset_list()
arg["postprocessor_list"] = LogicMain.get_postprocessor_list()
elif sub == "thumbnail":
default_filename = ModelSetting.get("default_filename")
arg["filename"] = (
default_filename
if default_filename
else LogicMain.get_default_filename()
)
elif sub == "sub":
default_filename = ModelSetting.get("default_filename")
arg["filename"] = (
default_filename
if default_filename
else LogicMain.get_default_filename()
)
elif sub == "list":
pass
return render_template(f"{package_name}_{sub}.html", arg=arg)
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return render_template("sample.html", title=f"{package_name} - {sub}")
def process_ajax(self, sub, req):
try:
logger.debug("AJAX: %s, %s", sub, req.values)
ret = {"ret": "success"}
if sub == "ffmpeg_version":
path = req.form["path"]
output = subprocess.check_output([path, "-version"])
output = output.decode().replace("\n", "<br>")
ret["data"] = output
elif sub == "download":
postprocessor = req.form["postprocessor"]
video_convertor, extract_audio = LogicMain.get_postprocessor()
preferedformat = None
preferredcodec = None
preferredquality = None
if postprocessor in video_convertor:
preferedformat = postprocessor
elif postprocessor in extract_audio:
preferredcodec = postprocessor
preferredquality = 192
youtube_dl = LogicMain.download(
plugin=package_name,
url=req.form["url"],
filename=req.form["filename"],
temp_path=ModelSetting.get("temp_path"),
save_path=ModelSetting.get("save_path"),
format=req.form["format"],
preferedformat=preferedformat,
preferredcodec=preferredcodec,
preferredquality=preferredquality,
proxy=ModelSetting.get("proxy"),
ffmpeg_path=ModelSetting.get("ffmpeg_path"),
)
youtube_dl.start()
LogicMain.socketio_emit("add", youtube_dl)
ret["ret"] = "info"
ret["msg"] = "분석중..."
elif sub == "thumbnail":
youtube_dl = LogicMain.thumbnail(
plugin=package_name,
url=req.form["url"],
filename=req.form["filename"],
temp_path=ModelSetting.get("temp_path"),
save_path=ModelSetting.get("save_path"),
all_thumbnails=req.form["all_thumbnails"],
proxy=ModelSetting.get("proxy"),
ffmpeg_path=ModelSetting.get("ffmpeg_path"),
)
youtube_dl.start()
LogicMain.socketio_emit("add", youtube_dl)
ret["ret"] = "info"
ret["msg"] = "분석중..."
elif sub == "sub":
youtube_dl = LogicMain.sub(
plugin=package_name,
url=req.form["url"],
filename=req.form["filename"],
temp_path=ModelSetting.get("temp_path"),
save_path=ModelSetting.get("save_path"),
all_subs=req.form["all_subs"],
sub_lang=req.form["sub_lang"],
auto_sub=req.form["auto_sub"],
proxy=ModelSetting.get("proxy"),
ffmpeg_path=ModelSetting.get("ffmpeg_path"),
)
youtube_dl.start()
LogicMain.socketio_emit("add", youtube_dl)
ret["ret"] = "info"
ret["msg"] = "분석중..."
elif sub == "list":
ret["data"] = []
for i in LogicMain.youtube_dl_list:
data = LogicMain.get_data(i)
if data is not None:
ret["data"].append(data)
elif sub == "all_stop":
for i in LogicMain.youtube_dl_list:
i.stop()
elif sub == "stop":
index = int(req.form["index"])
LogicMain.youtube_dl_list[index].stop()
return jsonify(ret)
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return jsonify({"ret": "danger", "msg": str(error)})
def migration(self):
try:
db_version = ModelSetting.get_int("db_version")
connect = sqlite3.connect(
os.path.join(path_data, "db", f"{package_name}.db")
)
if db_version < 2:
logger.debug("youtube-dlc uninstall")
logger.debug(
subprocess.check_output(
[sys.executable, "-m", "pip", "uninstall", "-y", "youtube-dlc"],
universal_newlines=True,
)
)
connect.commit()
connect.close()
ModelSetting.set("db_version", LogicMain.db_default["db_version"])
db.session.flush()
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
youtube_dl_list = []
@staticmethod
def get_youtube_dl_version():
try:
return MyYoutubeDL.get_version()
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return "패키지 임포트 실패"
@staticmethod
def get_default_filename():
return MyYoutubeDL.DEFAULT_FILENAME
@staticmethod
def get_preset_list():
return [
["bestvideo+bestaudio/best", "최고 화질"],
["bestvideo[height<=1080]+bestaudio/best[height<=1080]", "1080p"],
["worstvideo+worstaudio/worst", "최저 화질"],
["bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]", "최고 화질(mp4)"],
[
"bestvideo[ext=mp4][height<=1080]+bestaudio[ext=m4a]/best[ext=mp4][height<=1080]",
"1080p(mp4)",
],
["bestvideo[filesize<50M]+bestaudio/best[filesize<50M]", "50MB 미만"],
["bestaudio/best", "오디오만"],
["_custom", "사용자 정의"],
]
@staticmethod
def get_postprocessor_list():
return [
["", "후처리 안함", None],
["mp4", "MP4", "비디오 변환"],
["flv", "FLV", "비디오 변환"],
["webm", "WebM", "비디오 변환"],
["ogg", "Ogg", "비디오 변환"],
["mkv", "MKV", "비디오 변환"],
["ts", "TS", "비디오 변환"],
["avi", "AVI", "비디오 변환"],
["wmv", "WMV", "비디오 변환"],
["mov", "MOV", "비디오 변환"],
["gif", "GIF", "비디오 변환"],
["mp3", "MP3", "오디오 추출"],
["aac", "AAC", "오디오 추출"],
["flac", "FLAC", "오디오 추출"],
["m4a", "M4A", "오디오 추출"],
["opus", "Opus", "오디오 추출"],
["vorbis", "Vorbis", "오디오 추출"],
["wav", "WAV", "오디오 추출"],
]
@staticmethod
def get_postprocessor():
video_convertor = []
extract_audio = []
for i in LogicMain.get_postprocessor_list():
if i[2] == "비디오 변환":
video_convertor.append(i[0])
elif i[2] == "오디오 추출":
extract_audio.append(i[0])
return video_convertor, extract_audio
@staticmethod
def download(**kwagrs):
try:
logger.debug(kwagrs)
plugin = kwagrs["plugin"]
url = kwagrs["url"]
filename = kwagrs["filename"]
temp_path = kwagrs["temp_path"]
save_path = kwagrs["save_path"]
opts = {}
if "format" in kwagrs and kwagrs["format"]:
opts["format"] = kwagrs["format"]
postprocessor = []
if "preferedformat" in kwagrs and kwagrs["preferedformat"]:
postprocessor.append(
{
"key": "FFmpegVideoConvertor",
"preferedformat": kwagrs["preferedformat"],
}
)
if "preferredcodec" in kwagrs and kwagrs["preferredcodec"]:
postprocessor.append(
{
"key": "FFmpegExtractAudio",
"preferredcodec": kwagrs["preferredcodec"],
"preferredquality": str(kwagrs["preferredquality"]),
}
)
if postprocessor:
opts["postprocessors"] = postprocessor
if "playlist" in kwagrs and kwagrs["playlist"]:
if kwagrs["playlist"] == "reverse":
opts["playlistreverse"] = True
elif kwagrs["playlist"] == "random":
opts["playlistrandom"] = True
else:
opts["playlist_items"] = kwagrs["playlist"]
if "archive" in kwagrs and kwagrs["archive"]:
opts["download_archive"] = kwagrs["archive"]
if "proxy" in kwagrs and kwagrs["proxy"]:
opts["proxy"] = kwagrs["proxy"]
if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
opts["cookiefile"] = kwagrs["cookiefile"]
if "headers" in kwagrs and kwagrs["headers"]:
opts["http_headers"] = kwagrs["headers"]
dateafter = kwagrs.get("dateafter")
youtube_dl = MyYoutubeDL(
plugin, "video", url, filename, temp_path, save_path, opts, dateafter
)
youtube_dl.key = kwagrs.get("key")
LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
return youtube_dl
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return None
@staticmethod
def thumbnail(**kwagrs):
try:
logger.debug(kwagrs)
plugin = kwagrs["plugin"]
url = kwagrs["url"]
filename = kwagrs["filename"]
temp_path = kwagrs["temp_path"]
save_path = kwagrs["save_path"]
opts = {"skip_download": True}
if (
"all_thumbnails" in kwagrs
and str(kwagrs["all_thumbnails"]).lower() != "false"
):
opts["write_all_thumbnails"] = True
else:
opts["writethumbnail"] = True
if "playlist" in kwagrs and kwagrs["playlist"]:
if kwagrs["playlist"] == "reverse":
opts["playlistreverse"] = True
elif kwagrs["playlist"] == "random":
opts["playlistrandom"] = True
else:
opts["playlist_items"] = kwagrs["playlist"]
if "archive" in kwagrs and kwagrs["archive"]:
opts["download_archive"] = kwagrs["archive"]
if "proxy" in kwagrs and kwagrs["proxy"]:
opts["proxy"] = kwagrs["proxy"]
if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
opts["cookiefile"] = kwagrs["cookiefile"]
if "headers" in kwagrs and kwagrs["headers"]:
opts["http_headers"] = kwagrs["headers"]
dateafter = kwagrs.get("dateafter")
youtube_dl = MyYoutubeDL(
plugin,
"thumbnail",
url,
filename,
temp_path,
save_path,
opts,
dateafter,
)
youtube_dl.key = kwagrs.get("key")
LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
return youtube_dl
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return None
@staticmethod
def sub(**kwagrs):
try:
logger.debug(kwagrs)
plugin = kwagrs["plugin"]
url = kwagrs["url"]
filename = kwagrs["filename"]
temp_path = kwagrs["temp_path"]
save_path = kwagrs["save_path"]
opts = {"skip_download": True}
sub_lang = map(
lambda x: x.strip(), kwagrs["sub_lang"].split(",")
) # 문자열을 리스트로 변환
if "all_subs" in kwagrs and str(kwagrs["all_subs"]).lower() != "false":
opts["allsubtitles"] = True
else:
opts["subtitleslangs"] = sub_lang
if "auto_sub" in kwagrs and str(kwagrs["auto_sub"]).lower() != "false":
opts["writeautomaticsub"] = True
else:
opts["writesubtitles"] = True
if "playlist" in kwagrs and kwagrs["playlist"]:
if kwagrs["playlist"] == "reverse":
opts["playlistreverse"] = True
elif kwagrs["playlist"] == "random":
opts["playlistrandom"] = True
else:
opts["playlist_items"] = kwagrs["playlist"]
if "archive" in kwagrs and kwagrs["archive"]:
opts["download_archive"] = kwagrs["archive"]
if "proxy" in kwagrs and kwagrs["proxy"]:
opts["proxy"] = kwagrs["proxy"]
if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
opts["cookiefile"] = kwagrs["cookiefile"]
if "headers" in kwagrs and kwagrs["headers"]:
opts["http_headers"] = kwagrs["headers"]
dateafter = kwagrs.get("dateafter")
youtube_dl = MyYoutubeDL(
plugin, "subtitle", url, filename, temp_path, save_path, opts, dateafter
)
youtube_dl.key = kwagrs.get("key")
LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
return youtube_dl
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return None
@staticmethod
def get_data(youtube_dl):
try:
data = {}
data["plugin"] = youtube_dl.plugin
data["url"] = youtube_dl.url
data["filename"] = youtube_dl.filename
data["temp_path"] = youtube_dl.temp_path
data["save_path"] = youtube_dl.save_path
data["index"] = youtube_dl.index
data["status_str"] = youtube_dl.status.name
data["status_ko"] = str(youtube_dl.status)
data["end_time"] = ""
data["extractor"] = youtube_dl.type + (
" - " + youtube_dl.info_dict["extractor"]
if youtube_dl.info_dict["extractor"] is not None
else ""
)
data["title"] = (
youtube_dl.info_dict["title"]
if youtube_dl.info_dict["title"] is not None
else youtube_dl.url
)
data["uploader"] = (
youtube_dl.info_dict["uploader"]
if youtube_dl.info_dict["uploader"] is not None
else ""
)
data["uploader_url"] = (
youtube_dl.info_dict["uploader_url"]
if youtube_dl.info_dict["uploader_url"] is not None
else ""
)
data["downloaded_bytes_str"] = ""
data["total_bytes_str"] = ""
data["percent"] = "0"
data["eta"] = (
youtube_dl.progress_hooks["eta"]
if youtube_dl.progress_hooks["eta"] is not None
else ""
)
data["speed_str"] = (
LogicMain.human_readable_size(youtube_dl.progress_hooks["speed"], "/s")
if youtube_dl.progress_hooks["speed"] is not None
else ""
)
if youtube_dl.status == Status.READY: # 다운로드 전
data["start_time"] = ""
data["download_time"] = ""
else:
if youtube_dl.end_time is None: # 완료 전
download_time = datetime.now() - youtube_dl.start_time
else:
download_time = youtube_dl.end_time - youtube_dl.start_time
data["end_time"] = youtube_dl.end_time.strftime("%m-%d %H:%M:%S")
if None not in (
youtube_dl.progress_hooks["downloaded_bytes"],
youtube_dl.progress_hooks["total_bytes"],
): # 둘 다 값이 있으면
data["downloaded_bytes_str"] = LogicMain.human_readable_size(
youtube_dl.progress_hooks["downloaded_bytes"]
)
data["total_bytes_str"] = LogicMain.human_readable_size(
youtube_dl.progress_hooks["total_bytes"]
)
data[
"percent"
] = f"{(float(youtube_dl.progress_hooks['downloaded_bytes']) / float(youtube_dl.progress_hooks['total_bytes']) * 100):.2f}"
data["start_time"] = youtube_dl.start_time.strftime("%m-%d %H:%M:%S")
data[
"download_time"
] = f"{int(download_time.seconds / 60):02d}:{int(download_time.seconds % 60):02d}"
return data
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return None
@staticmethod
def get_info_dict(url, proxy):
return MyYoutubeDL.get_info_dict(url, proxy)
@staticmethod
def human_readable_size(size, suffix=""):
for unit in ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"):
if size < 1024.0:
return f"{size:3.1f} {unit}{suffix}"
size /= 1024.0
return f"{size:.1f} YB{suffix}"
@staticmethod
def socketio_emit(cmd, data):
socketio.emit(
cmd, LogicMain.get_data(data), namespace=f"/{package_name}", broadcast=True
)

View File

@@ -1,29 +1,36 @@
# -*- coding: utf-8 -*- from typing import Any, Dict, List, Optional, Tuple, Union
# @Time : 2023/02/25 7:29 PM
# @Author : yommi from flask import Response, jsonify, render_template
# @Site : from loguru import logger
# @File : mod_basic
# @Software: PyCharm
# @Path : youtube-dl/mod_basic.py
from support import SupportYaml from support import SupportYaml
from tool import ToolUtil from tool import ToolUtil
from .setup import * from .model import ModelYoutubeDlItem
# from .main import LogicMain
from .my_youtube_dl import MyYoutubeDL, Status from .my_youtube_dl import MyYoutubeDL, Status
from .setup import *
import platform import platform
import os import os
from .model import ModelYoutubeDlItem import subprocess
from loguru import logger import traceback
import sys
from datetime import datetime
try:
from gommi_download_manager.mod_queue import ModuleQueue
except ImportError:
ModuleQueue = None
class ModuleBasic(PluginModuleBase): class ModuleBasic(PluginModuleBase):
def __init__(self, P): """유튜브 다운로더의 기본 기능을 담당하는 모듈"""
youtube_dl_list: List[MyYoutubeDL] = []
def __init__(self, P: Any) -> None:
super(ModuleBasic, self).__init__( super(ModuleBasic, self).__init__(
P, name="basic", first_menu="setting", scheduler_desc="유튜브 다운로더" P, name="basic", first_menu="setting", scheduler_desc="유튜브 다운로더"
) )
self.db_default = { self.db_default: Dict[str, str] = {
"db_version": "2", "db_version": "2",
"youtube_dl_package": "1", "youtube_dl_package": "1",
"ffmpeg_path": "" "ffmpeg_path": ""
@@ -34,32 +41,465 @@ class ModuleBasic(PluginModuleBase):
"default_filename": "", "default_filename": "",
"proxy": "", "proxy": "",
} }
self.web_list_model = ModelYoutubeDlItem self.web_list_model: Any = ModelYoutubeDlItem
def process_menu(self, sub, req): def process_menu(self, sub: str, req: Any) -> Union[Response, str]:
"""메뉴별 템플릿 렌더링"""
logger.debug(f"sub: {sub}") logger.debug(f"sub: {sub}")
arg = P.ModelSetting.to_dict() arg: Dict[str, Any] = P.ModelSetting.to_dict()
arg["package_name"] = P.package_name
arg["package_version"] = P.plugin_info["version"]
arg["template_name"] = f"{P.package_name}_{sub}" # JS 파일명 하위 호환성
logger.debug(f"arg:: {arg}") logger.debug(f"arg:: {arg}")
if sub == "setting": if sub == "setting":
arg["is_include"] = F.scheduler.is_include(self.get_scheduler_name()) arg["is_include"] = F.scheduler.is_include(self.get_scheduler_name())
arg["is_running"] = F.scheduler.is_running(self.get_scheduler_name()) arg["is_running"] = F.scheduler.is_running(self.get_scheduler_name())
arg["package_list"] = [[x, x] for x in P.youtube_dl_packages]
arg["youtube_dl_version"] = self.get_youtube_dl_version()
arg["DEFAULT_FILENAME"] = self.get_default_filename()
elif sub == "download": elif sub == "download":
default_filename = P.ModelSetting.get("default_filename") default_filename: Optional[str] = P.ModelSetting.get("default_filename")
arg["filename"] = ( arg["filename"] = (
default_filename default_filename
if default_filename if default_filename
else ModuleBasic.get_default_filename() else self.get_default_filename()
) )
arg["preset_list"] = ModuleBasic.get_preset_list() arg["preset_list"] = self.get_preset_list()
arg["postprocessor_list"] = ModuleBasic.get_postprocessor_list() arg["postprocessor_list"] = self.get_postprocessor_list()
return render_template(f"{P.package_name}_{self.name}_{sub}.html", arg=arg) return render_template(f"{P.package_name}_{self.name}_{sub}.html", arg=arg)
def process_command(self, command, arg1, arg2, arg3, req): def plugin_load(self) -> None:
ret = {"ret": "success"} """플러그인 로드 시 업그레이드 체크"""
try:
package_idx: str = P.ModelSetting.get("youtube_dl_package")
if not package_idx:
package_idx = "1"
youtube_dl: str = P.youtube_dl_packages[int(package_idx)]
logger.debug(f"{youtube_dl} 업그레이드 체크")
# 선택된 패키지(yt-dlp 등)를 최신 버전으로 업데이트
subprocess.check_output(
[sys.executable, "-m", "pip", "install", "--upgrade", youtube_dl],
universal_newlines=True,
)
except Exception as error:
logger.error(f"플러그인 로드 중 예외 발생: {error}")
logger.error(traceback.format_exc())
def get_youtube_dl_version(self) -> str:
"""다운로드 라이브러리 버전 획득"""
try:
return MyYoutubeDL.get_version()
except Exception as error:
logger.error(f"버전 확인 중 예외 발생: {error}")
# logger.error(traceback.format_exc())
return "패키지 임포트 실패"
def process_command(self, command: str, arg1: Any, arg2: Any, arg3: Any, req: Any) -> Response:
"""일반 커맨드 처리 (현재 미사용)"""
ret: Dict[str, str] = {"ret": "success"}
return jsonify(ret) return jsonify(ret)
def process_ajax(self, sub: str, req: Any) -> Response:
"""UI에서의 AJAX 요청 처리"""
try:
logger.debug(f"AJAX 요청: {sub}, {req.values}")
ret: Dict[str, Any] = {"ret": "success"}
if sub == "ffmpeg_version":
path: str = req.form["path"]
output: bytes = subprocess.check_output([path, "-version"])
ret["data"] = output.decode().replace("\n", "<br>")
elif sub == "download":
postprocessor: str = req.form["postprocessor"]
video_convertor: List[str]
extract_audio: List[str]
video_convertor, extract_audio = self.get_postprocessor()
preferedformat: Optional[str] = None
preferredcodec: Optional[str] = None
preferredquality: Optional[int] = None
if postprocessor in video_convertor:
preferedformat = postprocessor
elif postprocessor in extract_audio:
preferredcodec = postprocessor
preferredquality = 192
# GDM 연동 시도
if ModuleQueue:
gdm_options = {
'proxy': P.ModelSetting.get("proxy"),
'ffmpeg_path': P.ModelSetting.get("ffmpeg_path"),
}
if req.form.get("format"):
gdm_options['format'] = req.form["format"]
if preferredcodec:
gdm_options['extract_audio'] = True
gdm_options['audio_format'] = preferredcodec
if preferedformat:
gdm_options['merge_output_format'] = preferedformat
if postprocessor == "60fps":
# recode-video를 mkv로 강제하여 무조건 재인코딩(변환) 단계가 실행되도록 함.
# 그래야만 VideoConvertor 필터가 적용됨.
gdm_options['extra_args'] = gdm_options.get('extra_args', []) + [
'--recode-video', 'mkv',
'--postprocessor-args', 'VideoConvertor:-c:v libx264 -crf 23 -vf minterpolate=fps=60'
]
# 상세 로그 확인을 위해 verbose 추가 가능 (디버깅용)
# gdm_options['extra_args'] = gdm_options.get('extra_args', []) + ['--verbose']
# GDM 큐에 추가
task = ModuleQueue.add_download(
url=req.form["url"],
save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")),
filename=req.form["filename"],
source_type='youtube',
caller_plugin=P.package_name,
**gdm_options
)
if task:
ret["ret"] = "success"
ret["msg"] = "GDM 대기열에 추가되었습니다."
return jsonify(ret)
# GDM 미설치 시 기존 방식 (직접 다운로드)
youtube_dl: Optional[MyYoutubeDL] = self.download(
plugin=P.package_name,
url=req.form["url"],
filename=req.form["filename"],
temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")),
save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")),
format=req.form["format"],
preferedformat=preferedformat,
preferredcodec=preferredcodec,
preferredquality=preferredquality,
proxy=P.ModelSetting.get("proxy"),
ffmpeg_path=P.ModelSetting.get("ffmpeg_path"),
)
if youtube_dl:
youtube_dl.start()
self.socketio_emit("add", youtube_dl)
ret["ret"] = "info"
ret["msg"] = "분석중..."
else:
ret["ret"] = "danger"
ret["msg"] = "다운로드 실패"
elif sub == "list":
ret["data"] = []
for i in self.youtube_dl_list:
data: Optional[Dict[str, Any]] = self.get_data(i)
if data is not None:
ret["data"].append(data)
elif sub == "all_stop":
for i in self.youtube_dl_list:
i.stop()
elif sub == "stop":
index: int = int(req.form["index"])
self.youtube_dl_list[index].stop()
elif sub == "thumbnail":
youtube_dl = self.download(
plugin=P.package_name,
url=req.form["url"],
filename=req.form["filename"],
temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")),
save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")),
format="best",
proxy=P.ModelSetting.get("proxy"),
ffmpeg_path=P.ModelSetting.get("ffmpeg_path"),
type="thumbnail",
)
if youtube_dl:
youtube_dl.start()
self.socketio_emit("add", youtube_dl)
ret["ret"] = "info"
ret["msg"] = "분석중..."
else:
ret["ret"] = "danger"
ret["msg"] = "다운로드 실패"
elif sub == "sub":
opts: Dict[str, Any] = {}
if req.form.get("all_subs") == "true":
opts["allsubtitles"] = True
else:
opts["writesubtitles"] = True
youtube_dl = self.download(
plugin=P.package_name,
url=req.form["url"],
filename=req.form["filename"],
temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")),
save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")),
format="best",
proxy=P.ModelSetting.get("proxy"),
ffmpeg_path=P.ModelSetting.get("ffmpeg_path"),
type="sub",
opts=opts,
)
if youtube_dl:
youtube_dl.start()
self.socketio_emit("add", youtube_dl)
ret["ret"] = "info"
ret["msg"] = "분석중..."
else:
ret["ret"] = "danger"
ret["msg"] = "다운로드 실패"
elif sub == "preview":
url: str = req.form["url"]
data: Optional[str] = MyYoutubeDL.get_preview_url(url)
if data:
ret["data"] = data
else:
ret["ret"] = "warning"
ret["msg"] = "미리보기 URL을 가져올 수 없습니다."
return jsonify(ret)
except Exception as error:
logger.error(f"AJAX 처리 중 예외 발생: {error}")
logger.error(traceback.format_exc())
return jsonify({"ret": "danger", "msg": str(error)})
def process_api(self, sub: str, req: Any) -> Response:
"""외부 모듈(Youtube 플러그인 등)에서의 API 요청 처리"""
try:
if sub == "info_dict":
url: str = req.values.get("url")
proxy: Optional[str] = req.values.get("proxy")
data: Optional[Dict[str, Any]] = MyYoutubeDL.get_info_dict(url, proxy)
return jsonify(data)
elif sub == "download":
# Gommi Download Manager 연동
if ModuleQueue:
url = req.values.get("url")
save_path = ToolUtil.make_path(req.values.get("save_path") or P.ModelSetting.get("save_path"))
filename = req.values.get("filename")
task = ModuleQueue.add_download(
url=url,
save_path=save_path,
filename=filename,
source_type='youtube',
caller_plugin='youtube-dl'
)
if task:
return jsonify({'ret': 'success', 'msg': '큐에 추가됨', 'task_id': task.id})
else:
return jsonify({'ret': 'fail'})
youtube_dl: Optional[MyYoutubeDL] = self.download(
plugin=req.values.get("plugin"),
url=req.values.get("url"),
filename=req.values.get("filename"),
temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")),
save_path=ToolUtil.make_path(req.values.get("save_path") or P.ModelSetting.get("save_path")),
format=req.values.get("format"),
preferedformat=req.values.get("preferedformat"),
preferredcodec=req.values.get("preferredcodec"),
preferredquality=req.values.get("preferredquality"),
proxy=req.values.get("proxy") or P.ModelSetting.get("proxy"),
ffmpeg_path=req.values.get("ffmpeg_path") or P.ModelSetting.get("ffmpeg_path"),
key=req.values.get("key"),
)
if youtube_dl:
if req.values.get("start") == "True" or req.values.get("start") is None:
youtube_dl.start()
return jsonify(self.get_data(youtube_dl))
else:
return jsonify({"ret": "error"})
elif sub == "status":
index: int = int(req.values.get("index"))
return jsonify(self.get_data(self.youtube_dl_list[index]))
elif sub == "stop":
index: int = int(req.values.get("index"))
self.youtube_dl_list[index].stop()
return jsonify({"ret": "success"})
except Exception as error:
logger.error(f"API 처리 중 예외 발생: {error}")
logger.error(traceback.format_exc())
return jsonify({"ret": "error", "msg": str(error)})
def download(self, **kwargs: Any) -> Optional[MyYoutubeDL]:
"""다운로드 객체 생성 및 리스트 관리"""
try:
logger.debug(kwargs)
plugin: str = kwargs["plugin"]
url: str = kwargs["url"]
filename: str = kwargs["filename"]
temp_path: str = kwargs["temp_path"]
save_path: str = kwargs["save_path"]
type_name: str = kwargs.get("type", "video")
opts: Dict[str, Any] = kwargs.get("opts", {})
if "format" in kwargs and kwargs["format"]:
opts["format"] = kwargs["format"]
if type_name == "thumbnail":
opts["writethumbnail"] = True
opts["skip_download"] = True
elif type_name == "sub":
opts["skip_download"] = True
postprocessor: List[Dict[str, Any]] = opts.get("postprocessors", [])
if "preferedformat" in kwargs and kwargs["preferedformat"]:
postprocessor.append(
{
"key": "FFmpegVideoConvertor",
"preferedformat": kwargs["preferedformat"],
}
)
if "preferredcodec" in kwargs and kwargs["preferredcodec"]:
postprocessor.append(
{
"key": "FFmpegExtractAudio",
"preferredcodec": kwargs["preferredcodec"],
"preferredquality": str(kwargs.get("preferredquality", 192)),
}
)
if postprocessor:
opts["postprocessors"] = postprocessor
if "playlist" in kwargs and kwargs["playlist"]:
if kwargs["playlist"] == "reverse":
opts["playlistreverse"] = True
elif kwargs["playlist"] == "random":
opts["playlistrandom"] = True
else:
opts["playlist_items"] = kwargs["playlist"]
if "archive" in kwargs and kwargs["archive"]:
opts["download_archive"] = kwargs["archive"]
if "proxy" in kwargs and kwargs["proxy"]:
opts["proxy"] = kwargs["proxy"]
if "ffmpeg_path" in kwargs and kwargs["ffmpeg_path"]:
opts["ffmpeg_location"] = kwargs["ffmpeg_path"]
if "cookiefile" in kwargs and kwargs["cookiefile"]:
opts["cookiefile"] = kwargs["cookiefile"]
if "headers" in kwargs and kwargs["headers"]:
opts["http_headers"] = kwargs["headers"]
dateafter: Optional[str] = kwargs.get("dateafter")
youtube_dl: MyYoutubeDL = MyYoutubeDL(
plugin, type_name, url, filename, temp_path, save_path, opts, dateafter
)
youtube_dl.key = kwargs.get("key")
self.youtube_dl_list.append(youtube_dl)
return youtube_dl
except Exception as error:
logger.error(f"다운로드 객체 생성 중 예외 발생: {error}")
logger.error(traceback.format_exc())
return None
def get_data(self, youtube_dl: MyYoutubeDL) -> Optional[Dict[str, Any]]:
"""다운로드 객체의 현재 상태 데이터를 딕셔너리로 변환"""
try:
data: Dict[str, Any] = {}
data["plugin"] = youtube_dl.plugin
data["url"] = youtube_dl.url
data["filename"] = youtube_dl.filename
data["temp_path"] = youtube_dl.temp_path
data["save_path"] = youtube_dl.save_path
data["index"] = youtube_dl.index
data["status_str"] = youtube_dl.status.name
data["status_ko"] = str(youtube_dl.status)
data["end_time"] = ""
data["extractor"] = youtube_dl.type + (
" - " + youtube_dl.info_dict["extractor"]
if youtube_dl.info_dict["extractor"] is not None
else ""
)
data["title"] = (
youtube_dl.info_dict["title"]
if youtube_dl.info_dict["title"] is not None
else youtube_dl.url
)
data["uploader"] = (
youtube_dl.info_dict["uploader"]
if youtube_dl.info_dict["uploader"] is not None
else ""
)
data["uploader_url"] = (
youtube_dl.info_dict["uploader_url"]
if youtube_dl.info_dict["uploader_url"] is not None
else ""
)
data["downloaded_bytes_str"] = ""
data["total_bytes_str"] = ""
data["percent"] = "0"
data["eta"] = (
str(youtube_dl.progress_hooks["eta"])
if youtube_dl.progress_hooks["eta"] is not None
else ""
)
data["speed_str"] = (
self.human_readable_size(youtube_dl.progress_hooks["speed"], "/s")
if youtube_dl.progress_hooks["speed"] is not None
else ""
)
if youtube_dl.status == Status.READY:
data["start_time"] = ""
data["download_time"] = ""
else:
if youtube_dl.end_time is None:
download_time: Any = datetime.now() - youtube_dl.start_time
else:
download_time = youtube_dl.end_time - youtube_dl.start_time
data["end_time"] = youtube_dl.end_time.strftime("%m-%d %H:%M:%S")
if None not in (
youtube_dl.progress_hooks["downloaded_bytes"],
youtube_dl.progress_hooks["total_bytes"],
):
data["downloaded_bytes_str"] = self.human_readable_size(
youtube_dl.progress_hooks["downloaded_bytes"]
)
data["total_bytes_str"] = self.human_readable_size(
youtube_dl.progress_hooks["total_bytes"]
)
data[
"percent"
] = f"{(float(youtube_dl.progress_hooks['downloaded_bytes']) / float(youtube_dl.progress_hooks['total_bytes']) * 100):.2f}"
data["start_time"] = youtube_dl.start_time.strftime("%m-%d %H:%M:%S")
data[
"download_time"
] = f"{int(download_time.seconds / 60):02d}:{int(download_time.seconds % 60):02d}"
return data
except Exception as error:
logger.error(f"상태 데이터 변환 중 예외 발생: {error}")
logger.error(traceback.format_exc())
return None
def human_readable_size(self, size: Union[int, float, None], suffix: str = "") -> str:
"""바이트 단위를 사람이 읽기 쉬운 형식으로 변환"""
if size is None:
return ""
for unit in ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"):
if size < 1024.0:
return f"{size:3.1f} {unit}{suffix}"
size /= 1024.0
return f"{size:.1f} YB{suffix}"
def socketio_emit(self, cmd: str, data: MyYoutubeDL) -> None:
"""Socket.IO 메시지 전송"""
F.socketio.emit(
cmd,
self.get_data(data),
namespace=f"/{P.package_name}",
)
def get_postprocessor(self) -> Tuple[List[str], List[str]]:
"""후처리 리스트 분류 (비디오/오디오)"""
video_convertor: List[str] = []
extract_audio: List[str] = []
for i in self.get_postprocessor_list():
if i[2] == "비디오 변환":
video_convertor.append(i[0])
elif i[2] == "오디오 추출":
extract_audio.append(i[0])
return video_convertor, extract_audio
# def plugin_load(self): # def plugin_load(self):
# if ( # if (
# os.path.exists( # os.path.exists(
@@ -74,12 +514,11 @@ class ModuleBasic(PluginModuleBase):
# ToolUtil.make_path(P.ModelSetting.get(f"{self.name}_path_config")), # ToolUtil.make_path(P.ModelSetting.get(f"{self.name}_path_config")),
# ) # )
@staticmethod def get_default_filename(self) -> str:
def get_default_filename():
return MyYoutubeDL.DEFAULT_FILENAME return MyYoutubeDL.DEFAULT_FILENAME
@staticmethod @staticmethod
def get_preset_list(): def get_preset_list() -> List[List[str]]:
return [ return [
["bestvideo+bestaudio/best", "최고 화질"], ["bestvideo+bestaudio/best", "최고 화질"],
["bestvideo[height<=1080]+bestaudio/best[height<=1080]", "1080p"], ["bestvideo[height<=1080]+bestaudio/best[height<=1080]", "1080p"],
@@ -95,7 +534,7 @@ class ModuleBasic(PluginModuleBase):
] ]
@staticmethod @staticmethod
def get_postprocessor_list(): def get_postprocessor_list() -> List[List[Union[str, None]]]:
return [ return [
["", "후처리 안함", None], ["", "후처리 안함", None],
["mp4", "MP4", "비디오 변환"], ["mp4", "MP4", "비디오 변환"],
@@ -115,4 +554,5 @@ class ModuleBasic(PluginModuleBase):
["opus", "Opus", "오디오 추출"], ["opus", "Opus", "오디오 추출"],
["vorbis", "Vorbis", "오디오 추출"], ["vorbis", "Vorbis", "오디오 추출"],
["wav", "WAV", "오디오 추출"], ["wav", "WAV", "오디오 추출"],
# ["60fps", "60fps 보간 (느림)", "고급 변환"],
] ]

247
my_youtube_dl.old.py Normal file
View File

@@ -0,0 +1,247 @@
from __future__ import unicode_literals
import os
import traceback
import tempfile
from glob import glob
from datetime import datetime
from threading import Thread
from enum import Enum
import framework.common.celery as celery_shutil
from .plugin import Plugin
logger = Plugin.logger
ModelSetting = Plugin.ModelSetting
youtube_dl_package = Plugin.youtube_dl_packages[
int(ModelSetting.get("youtube_dl_package"))
if ModelSetting.get("youtube_dl_package")
else 1 # LogicMain.db_default["youtube_dl_package"]
].replace("-", "_")
class Status(Enum):
READY = 0
START = 1
DOWNLOADING = 2
ERROR = 3
FINISHED = 4
STOP = 5
COMPLETED = 6
def __str__(self):
str_list = ["준비", "분석중", "다운로드중", "실패", "변환중", "중지", "완료"]
return str_list[self.value]
class MyYoutubeDL:
DEFAULT_FILENAME = "%(title)s-%(id)s.%(ext)s"
_index = 0
def __init__(
self,
plugin,
type_name,
url,
filename,
temp_path,
save_path=None,
opts=None,
dateafter=None,
datebefore=None,
):
# from youtube_dl.utils import DateRange
DateRange = __import__(
f"{youtube_dl_package}.utils", fromlist=["DateRange"]
).DateRange
if save_path is None:
save_path = temp_path
if opts is None:
opts = {}
self.plugin = plugin
self.type = type_name
self.url = url
self.filename = filename
if not os.path.isdir(temp_path):
os.makedirs(temp_path)
self.temp_path = tempfile.mkdtemp(prefix="youtube-dl_", dir=temp_path)
if not os.path.isdir(save_path):
os.makedirs(save_path)
self.save_path = save_path
self.opts = opts
if dateafter or datebefore:
self.opts["daterange"] = DateRange(start=dateafter, end=datebefore)
self.index = MyYoutubeDL._index
MyYoutubeDL._index += 1
self._status = Status.READY
self._thread = None
self.key = None
self.start_time = None # 시작 시간
self.end_time = None # 종료 시간
# info_dict에서 얻는 정보
self.info_dict = {
"extractor": None, # 타입
"title": None, # 제목
"uploader": None, # 업로더
"uploader_url": None, # 업로더 주소
}
# info_dict에서 얻는 정보(entries)
# self.info_dict['playlist_index'] = None
# self.info_dict['duration'] = None # 길이
# self.info_dict['format'] = None # 포맷
# self.info_dict['thumbnail'] = None # 썸네일
# progress_hooks에서 얻는 정보
self.progress_hooks = {
"downloaded_bytes": None, # 다운로드한 크기
"total_bytes": None, # 전체 크기
"eta": None, # 예상 시간(s)
"speed": None, # 다운로드 속도(bytes/s)
}
def start(self):
if self.status != Status.READY:
return False
self._thread = Thread(target=self.run)
self._thread.start()
return True
def run(self):
# import youtube_dl
youtube_dl = __import__(youtube_dl_package)
try:
self.start_time = datetime.now()
self.status = Status.START
# 동영상 정보 가져오기
info_dict = MyYoutubeDL.get_info_dict(
self.url,
self.opts.get("proxy"),
self.opts.get("cookiefile"),
self.opts.get("http_headers"),
)
if info_dict is None:
self.status = Status.ERROR
return
self.info_dict["extractor"] = info_dict["extractor"]
self.info_dict["title"] = info_dict.get("title", info_dict["id"])
self.info_dict["uploader"] = info_dict.get("uploader", "")
self.info_dict["uploader_url"] = info_dict.get("uploader_url", "")
ydl_opts = {
"logger": MyLogger(),
"progress_hooks": [self.my_hook],
# 'match_filter': self.match_filter_func,
"outtmpl": os.path.join(self.temp_path, self.filename),
"ignoreerrors": True,
"cachedir": False,
}
ydl_opts.update(self.opts)
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
logger.debug(self.url)
error_code = ydl.download([self.url])
logger.debug(error_code)
if self.status in (Status.START, Status.FINISHED): # 다운로드 성공
for i in glob(self.temp_path + "/**/*", recursive=True):
path = i.replace(self.temp_path, self.save_path, 1)
if os.path.isdir(i):
if not os.path.isdir(path):
os.mkdir(path)
continue
celery_shutil.move(i, path)
self.status = Status.COMPLETED
except Exception as error:
self.status = Status.ERROR
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
finally:
# 임시폴더 삭제
celery_shutil.rmtree(self.temp_path)
if self.status != Status.STOP:
self.end_time = datetime.now()
def stop(self):
if self.status in (Status.ERROR, Status.STOP, Status.COMPLETED):
return False
self.status = Status.STOP
self.end_time = datetime.now()
return True
@staticmethod
def get_version():
# from youtube_dl.version import __version__
__version__ = __import__(
f"{youtube_dl_package}.version", fromlist=["__version__"]
).__version__
return __version__
@staticmethod
def get_info_dict(url, proxy=None, cookiefile=None, http_headers=None):
# import youtube_dl
youtube_dl = __import__(youtube_dl_package)
try:
ydl_opts = {"extract_flat": "in_playlist", "logger": MyLogger()}
if proxy:
ydl_opts["proxy"] = proxy
if cookiefile:
ydl_opts["cookiefile"] = cookiefile
if http_headers:
ydl_opts["http_headers"] = http_headers
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
# info = ydl.extract_info(url, download=False)
info = ydl.extract_info(url, download=False)
except Exception as error:
logger.error("Exception:%s", error)
logger.error(traceback.format_exc())
return None
return ydl.sanitize_info(info)
def my_hook(self, data):
if self.status != Status.STOP:
self.status = {
"downloading": Status.DOWNLOADING,
"error": Status.ERROR,
"finished": Status.FINISHED, # 다운로드 완료. 변환 시작
}[data["status"]]
if data["status"] != "error":
self.filename = os.path.basename(data.get("filename"))
self.progress_hooks["downloaded_bytes"] = data.get("downloaded_bytes")
self.progress_hooks["total_bytes"] = data.get("total_bytes")
self.progress_hooks["eta"] = data.get("eta")
self.progress_hooks["speed"] = data.get("speed")
def match_filter_func(self, info_dict):
self.info_dict["playlist_index"] = info_dict["playlist_index"]
self.info_dict["duration"] = info_dict["duration"]
self.info_dict["format"] = info_dict["format"]
self.info_dict["thumbnail"] = info_dict["thumbnail"]
return None
@property
def status(self):
return self._status
@status.setter
def status(self, value):
from .main import LogicMain
self._status = value
LogicMain.socketio_emit("status", self)
class MyLogger:
def debug(self, msg):
if msg.find(" ETA ") != -1:
# 과도한 로그 방지
return
logger.debug(msg)
def warning(self, msg):
logger.warning(msg)
def error(self, msg):
logger.error(msg)

View File

@@ -7,21 +7,19 @@ from glob import glob
from datetime import datetime from datetime import datetime
from threading import Thread from threading import Thread
from enum import Enum from enum import Enum
from typing import Optional, Dict, List, Any, Tuple, Union
from framework import celery as celery_shutil import shutil as celery_shutil
# from .plugin import Plugin
from .setup import P from .setup import P
logger = P.logger logger = P.logger
ModelSetting = P.ModelSetting ModelSetting = P.ModelSetting
youtube_dl_package = P.youtube_dl_packages[ # yt-dlp 패키지 설정 (기본값 index 1)
int(ModelSetting.get("youtube_dl_package")) package_idx: str = ModelSetting.get("youtube_dl_package")
if ModelSetting.get("youtube_dl_package") if not package_idx:
else 1 # LogicMain.db_default["youtube_dl_package"] package_idx = "1"
].replace("-", "_") youtube_dl_package: str = P.youtube_dl_packages[int(package_idx)].replace("-", "_")
class Status(Enum): class Status(Enum):
@@ -33,29 +31,29 @@ class Status(Enum):
STOP = 5 STOP = 5
COMPLETED = 6 COMPLETED = 6
def __str__(self): def __str__(self) -> str:
str_list = ["준비", "분석중", "다운로드중", "실패", "변환중", "중지", "완료"] str_list: List[str] = ["준비", "분석중", "다운로드중", "실패", "변환중", "중지", "완료"]
return str_list[self.value] return str_list[self.value]
class MyYoutubeDL: class MyYoutubeDL:
DEFAULT_FILENAME = "%(title)s-%(id)s.%(ext)s" DEFAULT_FILENAME: str = "%(title)s-%(id)s.%(ext)s"
_index = 0 _index: int = 0
def __init__( def __init__(
self, self,
plugin, plugin: str,
type_name, type_name: str,
url, url: str,
filename, filename: str,
temp_path, temp_path: str,
save_path=None, save_path: Optional[str] = None,
opts=None, opts: Optional[Dict[str, Any]] = None,
dateafter=None, dateafter: Optional[str] = None,
datebefore=None, datebefore: Optional[str] = None,
): ):
# from youtube_dl.utils import DateRange # yt-dlp/youtube-dlutils 모듈에서 DateRange 임포트
DateRange = __import__( DateRange = __import__(
f"{youtube_dl_package}.utils", fromlist=["DateRange"] f"{youtube_dl_package}.utils", fromlist=["DateRange"]
).DateRange ).DateRange
@@ -64,90 +62,112 @@ class MyYoutubeDL:
save_path = temp_path save_path = temp_path
if opts is None: if opts is None:
opts = {} opts = {}
self.plugin = plugin
self.type = type_name self.plugin: str = plugin
self.url = url self.type: str = type_name
self.filename = filename self.url: str = url
self.filename: str = filename
# 임시 폴더 생성
if not os.path.isdir(temp_path): if not os.path.isdir(temp_path):
os.makedirs(temp_path) os.makedirs(temp_path)
self.temp_path = tempfile.mkdtemp(prefix="youtube-dl_", dir=temp_path) self.temp_path: str = tempfile.mkdtemp(prefix="youtube-dl_", dir=temp_path)
# 저장 폴더 생성
if not os.path.isdir(save_path): if not os.path.isdir(save_path):
os.makedirs(save_path) os.makedirs(save_path)
self.save_path = save_path self.save_path: str = save_path
self.opts = opts
self.opts: Dict[str, Any] = opts
if dateafter or datebefore: if dateafter or datebefore:
self.opts["daterange"] = DateRange(start=dateafter, end=datebefore) self.opts["daterange"] = DateRange(start=dateafter, end=datebefore)
self.index = MyYoutubeDL._index
self.index: int = MyYoutubeDL._index
MyYoutubeDL._index += 1 MyYoutubeDL._index += 1
self._status = Status.READY
self._thread = None self._status: Status = Status.READY
self.key = None self._thread: Optional[Thread] = None
self.start_time = None # 시작 시간 self.key: Optional[str] = None
self.end_time = None # 종료 시간 self.start_time: Optional[datetime] = None
# info_dict에서 얻는 정보 self.end_time: Optional[datetime] = None
self.info_dict = {
"extractor": None, # 타입 # 비디오 정보
"title": None, # 제목 self.info_dict: Dict[str, Optional[str]] = {
"uploader": None, # 업로더 "extractor": None,
"uploader_url": None, # 업로더 주소 "title": None,
} "uploader": None,
# info_dict에서 얻는 정보(entries) "uploader_url": None,
# self.info_dict['playlist_index'] = None
# self.info_dict['duration'] = None # 길이
# self.info_dict['format'] = None # 포맷
# self.info_dict['thumbnail'] = None # 썸네일
# progress_hooks에서 얻는 정보
self.progress_hooks = {
"downloaded_bytes": None, # 다운로드한 크기
"total_bytes": None, # 전체 크기
"eta": None, # 예상 시간(s)
"speed": None, # 다운로드 속도(bytes/s)
} }
def start(self): # 진행률 정보
self.progress_hooks: Dict[str, Optional[Union[int, float, str]]] = {
"downloaded_bytes": None,
"total_bytes": None,
"eta": None,
"speed": None,
}
def start(self) -> bool:
"""다운로드 스레드 시작"""
if self.status != Status.READY: if self.status != Status.READY:
return False return False
self._thread = Thread(target=self.run) self._thread = Thread(target=self.run)
self._thread.start() self._thread.start()
return True return True
def run(self): def run(self) -> None:
# import youtube_dl """다운로드 실행 본체"""
youtube_dl = __import__(youtube_dl_package) youtube_dl = __import__(youtube_dl_package)
try: try:
self.start_time = datetime.now() self.start_time = datetime.now()
self.status = Status.START self.status = Status.START
# 동영상 정보 가져오기
# 정보 추출
info_dict = MyYoutubeDL.get_info_dict( info_dict = MyYoutubeDL.get_info_dict(
self.url, self.url,
self.opts.get("proxy"), self.opts.get("proxy"),
self.opts.get("cookiefile"), self.opts.get("cookiefile"),
self.opts.get("http_headers"), self.opts.get("http_headers"),
self.opts.get("cookiesfrombrowser"),
) )
if info_dict is None: if info_dict is None:
self.status = Status.ERROR self.status = Status.ERROR
return return
self.info_dict["extractor"] = info_dict["extractor"]
self.info_dict["title"] = info_dict.get("title", info_dict["id"]) self.info_dict["extractor"] = info_dict.get("extractor", "unknown")
self.info_dict["title"] = info_dict.get("title", info_dict.get("id", "unknown"))
self.info_dict["uploader"] = info_dict.get("uploader", "") self.info_dict["uploader"] = info_dict.get("uploader", "")
self.info_dict["uploader_url"] = info_dict.get("uploader_url", "") self.info_dict["uploader_url"] = info_dict.get("uploader_url", "")
ydl_opts = {
ydl_opts: Dict[str, Any] = {
"logger": MyLogger(), "logger": MyLogger(),
"progress_hooks": [self.my_hook], "progress_hooks": [self.my_hook],
# 'match_filter': self.match_filter_func,
"outtmpl": os.path.join(self.temp_path, self.filename), "outtmpl": os.path.join(self.temp_path, self.filename),
"ignoreerrors": True, "ignoreerrors": True,
"cachedir": False, "cachedir": False,
"nocheckcertificate": True,
} }
# yt-dlp 전용 성능 향상 옵션
if youtube_dl_package == "yt_dlp":
ydl_opts.update({
"concurrent_fragment_downloads": 5,
"retries": 10,
})
ydl_opts.update(self.opts) ydl_opts.update(self.opts)
with youtube_dl.YoutubeDL(ydl_opts) as ydl: with youtube_dl.YoutubeDL(ydl_opts) as ydl:
logger.debug(self.url) logger.debug(f"다운로드 시작: {self.url}")
error_code = ydl.download([self.url]) error_code: int = ydl.download([self.url])
logger.debug(error_code) logger.debug(f"다운로드 종료 (코드: {error_code})")
if self.status in (Status.START, Status.FINISHED): # 다운로드 성공
if self.status in (Status.START, Status.FINISHED, Status.DOWNLOADING):
# 임시 폴더의 파일을 실제 저장 경로로 이동
for i in glob(self.temp_path + "/**/*", recursive=True): for i in glob(self.temp_path + "/**/*", recursive=True):
path = i.replace(self.temp_path, self.save_path, 1) path: str = i.replace(self.temp_path, self.save_path, 1)
if os.path.isdir(i): if os.path.isdir(i):
if not os.path.isdir(path): if not os.path.isdir(path):
os.mkdir(path) os.mkdir(path)
@@ -156,15 +176,16 @@ class MyYoutubeDL:
self.status = Status.COMPLETED self.status = Status.COMPLETED
except Exception as error: except Exception as error:
self.status = Status.ERROR self.status = Status.ERROR
logger.error("Exception:%s", error) logger.error(f"실행 중 예외 발생: {error}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
finally: finally:
# 임시폴더 삭제 if os.path.exists(self.temp_path):
celery_shutil.rmtree(self.temp_path) celery_shutil.rmtree(self.temp_path)
if self.status != Status.STOP: if self.status != Status.STOP:
self.end_time = datetime.now() self.end_time = datetime.now()
def stop(self): def stop(self) -> bool:
"""다운로드 중지"""
if self.status in (Status.ERROR, Status.STOP, Status.COMPLETED): if self.status in (Status.ERROR, Status.STOP, Status.COMPLETED):
return False return False
self.status = Status.STOP self.status = Status.STOP
@@ -172,78 +193,109 @@ class MyYoutubeDL:
return True return True
@staticmethod @staticmethod
def get_version(): def get_preview_url(url: str) -> Optional[str]:
# from youtube_dl.version import __version__ """미리보기용 직접 재생 가능한 URL 추출"""
__version__ = __import__( youtube_dl = __import__(youtube_dl_package)
try:
# 미리보기를 위해 포맷 필터링 (mp4, 비디오+오디오 권장)
ydl_opts: Dict[str, Any] = {
"format": "best[ext=mp4]/best",
"logger": MyLogger(),
"nocheckcertificate": True,
"quiet": True,
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
info: Dict[str, Any] = ydl.extract_info(url, download=False)
return info.get("url")
except Exception as error:
logger.error(f"미리보기 URL 추출 중 예외 발생: {error}")
return None
@staticmethod
def get_version() -> str:
"""라이브러리 버전 확인"""
__version__: str = __import__(
f"{youtube_dl_package}.version", fromlist=["__version__"] f"{youtube_dl_package}.version", fromlist=["__version__"]
).__version__ ).__version__
return __version__ return __version__
@staticmethod @staticmethod
def get_info_dict(url, proxy=None, cookiefile=None, http_headers=None): def get_info_dict(
# import youtube_dl url: str,
proxy: Optional[str] = None,
cookiefile: Optional[str] = None,
http_headers: Optional[Dict[str, str]] = None,
cookiesfrombrowser: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""비디오 메타데이터 정보 추출"""
youtube_dl = __import__(youtube_dl_package) youtube_dl = __import__(youtube_dl_package)
try: try:
ydl_opts = {"extract_flat": "in_playlist", "logger": MyLogger()} ydl_opts: Dict[str, Any] = {
"extract_flat": "in_playlist",
"logger": MyLogger(),
"nocheckcertificate": True,
}
if proxy: if proxy:
ydl_opts["proxy"] = proxy ydl_opts["proxy"] = proxy
if cookiefile: if cookiefile:
ydl_opts["cookiefile"] = cookiefile ydl_opts["cookiefile"] = cookiefile
if http_headers: if http_headers:
ydl_opts["http_headers"] = http_headers ydl_opts["http_headers"] = http_headers
if cookiesfrombrowser:
ydl_opts["cookiesfrombrowser"] = (cookiesfrombrowser, None, None, None)
with youtube_dl.YoutubeDL(ydl_opts) as ydl: with youtube_dl.YoutubeDL(ydl_opts) as ydl:
# info = ydl.extract_info(url, download=False) info: Dict[str, Any] = ydl.extract_info(url, download=False)
info = ydl.extract_info(url, download=False)
except Exception as error: except Exception as error:
logger.error("Exception:%s", error) logger.error(f"정보 추출 중 예외 발생: {error}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
return None return None
return ydl.sanitize_info(info) return ydl.sanitize_info(info)
def my_hook(self, data): def my_hook(self, data: Dict[str, Any]) -> None:
"""진행률 업데이트 훅"""
if self.status != Status.STOP: if self.status != Status.STOP:
self.status = { if data["status"] == "downloading":
"downloading": Status.DOWNLOADING, self.status = Status.DOWNLOADING
"error": Status.ERROR, elif data["status"] == "error":
"finished": Status.FINISHED, # 다운로드 완료. 변환 시작 self.status = Status.ERROR
}[data["status"]] elif data["status"] == "finished":
self.status = Status.FINISHED
if data["status"] != "error": if data["status"] != "error":
self.filename = os.path.basename(data.get("filename")) self.filename = os.path.basename(data.get("filename", self.filename))
self.progress_hooks["downloaded_bytes"] = data.get("downloaded_bytes") self.progress_hooks["downloaded_bytes"] = data.get("downloaded_bytes")
self.progress_hooks["total_bytes"] = data.get("total_bytes") self.progress_hooks["total_bytes"] = data.get("total_bytes") or data.get("total_bytes_estimate")
self.progress_hooks["eta"] = data.get("eta") self.progress_hooks["eta"] = data.get("eta")
self.progress_hooks["speed"] = data.get("speed") self.progress_hooks["speed"] = data.get("speed")
def match_filter_func(self, info_dict):
self.info_dict["playlist_index"] = info_dict["playlist_index"]
self.info_dict["duration"] = info_dict["duration"]
self.info_dict["format"] = info_dict["format"]
self.info_dict["thumbnail"] = info_dict["thumbnail"]
return None
@property @property
def status(self): def status(self) -> Status:
return self._status return self._status
@status.setter @status.setter
def status(self, value): def status(self, value: Status) -> None:
from .main import LogicMain
self._status = value self._status = value
LogicMain.socketio_emit("status", self) # Socket.IO를 통한 상태 업데이트 전송
try:
basic_module = P.get_module('basic')
if basic_module:
basic_module.socketio_emit("status", self)
except Exception as e:
logger.error(f"SocketIO 전송 에러: {e}")
class MyLogger: class MyLogger:
def debug(self, msg): """yt-dlp의 로그를 가로채서 처리하는 클래성"""
if msg.find(" ETA ") != -1: def debug(self, msg: str) -> None:
# 과도한 로그 방지 # 진행 상황 관련 로그는 걸러냄
if " ETA " in msg or "at" in msg and "B/s" in msg:
return return
logger.debug(msg) logger.debug(msg)
def warning(self, msg): def warning(self, msg: str) -> None:
logger.warning(msg) logger.warning(msg)
def error(self, msg): def error(self, msg: str) -> None:
logger.error(msg) logger.error(msg)

View File

@@ -12,25 +12,15 @@ __menu = {
"list": [ "list": [
{ {
"uri": "basic", "uri": "basic",
"name": "기본 처리", "name": "유튜브",
"list": [ "list": [
{ {
"uri": "setting", "uri": "setting",
"name": "설정", "name": "설정",
}, },
],
},
{
"uri": "basic/download",
"name": "다운로드",
},
{ {
"uri": "download", "uri": "download",
"name": "다운로드", "name": "직접 다운로드",
"list": [
{
"uri": "basic",
"name": "다운로드",
}, },
], ],
}, },

View File

@@ -64,6 +64,52 @@
return; return;
} }
post_ajax('/download', get_formdata('#download')); post_ajax('/basic/download', getFormdata('#download'));
});
// Artplayer 미리보기 로직
let art = null;
let last_preview_url = '';
const init_artplayer = (video_url) => {
const wrapper = document.getElementById('player-wrapper');
wrapper.style.display = 'block';
if (art) {
art.switchUrl(video_url);
return;
}
art = new Artplayer({
container: '#player-wrapper',
url: video_url,
autoplay: false,
pip: true,
setting: true,
flip: true,
playbackRate: true,
aspectRatio: true,
fullscreen: true,
fullscreenWeb: true,
miniProgressBar: true,
mutex: true,
backdrop: true,
playsInline: true,
autoPlayback: false,
airplay: true,
theme: '#23ade5',
});
};
url.addEventListener('change', () => {
const target_url = url.value.trim();
if (target_url && target_url.startsWith('http') && target_url !== last_preview_url) {
last_preview_url = target_url;
post_ajax('/basic/preview', { url: target_url }).then((ret) => {
if (ret.ret === 'success' && ret.data) {
init_artplayer(ret.data);
}
});
}
}); });
})(); })();

View File

@@ -128,14 +128,14 @@
}); });
const reload_list = async () => { const reload_list = async () => {
const { data } = await post_ajax('/list'); const { data } = await post_ajax('/basic/list');
list_tbody.innerHTML = data.map((item) => make_item(item)).join(''); list_tbody.innerHTML = data.map((item) => make_item(item)).join('');
}; };
// 전체 중지 // 전체 중지
all_stop_btn.addEventListener('click', (event) => { all_stop_btn.addEventListener('click', (event) => {
event.preventDefault(); event.preventDefault();
post_ajax('/all_stop').then(reload_list); post_ajax('/basic/all_stop').then(reload_list);
}); });
// 중지 // 중지
@@ -145,7 +145,7 @@
if (!target.classList.contains('youtubeDl-stop')) { if (!target.classList.contains('youtubeDl-stop')) {
return; return;
} }
post_ajax('/stop', { post_ajax('/basic/stop', {
index: target.dataset.index, index: target.dataset.index,
}).then(reload_list); }).then(reload_list);
}); });

View File

@@ -49,7 +49,7 @@
ffmpeg = 'ffmpeg'; ffmpeg = 'ffmpeg';
} }
post_ajax('/ffmpeg_version', { post_ajax('/basic/ffmpeg_version', {
path: ffmpeg, path: ffmpeg,
}).then(({ data }) => { }).then(({ data }) => {
modal_title.innerHTML = `${ffmpeg} -version`; modal_title.innerHTML = `${ffmpeg} -version`;

View File

@@ -47,6 +47,6 @@
return; return;
} }
post_ajax('/sub', get_formdata('#download')); post_ajax('/basic/sub', get_formdata('#download'));
}); });
})(); })();

View File

@@ -42,6 +42,6 @@
return; return;
} }
post_ajax('/thumbnail', get_formdata('#download')); post_ajax('/basic/thumbnail', get_formdata('#download'));
}); });
})(); })();

View File

@@ -30,6 +30,21 @@
{% endmacro %} {% endmacro %}
{% block content %} {% block content %}
<style>
#player-wrapper {
width: 100%;
max-width: 800px;
height: 450px;
margin: 0 auto 20px auto;
display: none; /* 초기에는 숨김 */
background: #000;
border-radius: 8px;
overflow: hidden;
box-shadow: 0 4px 15px rgba(0,0,0,0.3);
}
</style>
<div id="player-wrapper"></div>
<form id="download"> <form id="download">
{{ macros.setting_input_text('url', 'URL', placeholder='http:// 주소', desc='유튜브, 네이버TV 등 동영상 주소') }} {{ macros.setting_input_text('url', 'URL', placeholder='http:// 주소', desc='유튜브, 네이버TV 등 동영상 주소') }}
@@ -44,6 +59,7 @@
"use strict"; "use strict";
const package_name = '{{ arg["package_name"] }}'; const package_name = '{{ arg["package_name"] }}';
</script> </script>
<script src="https://cdn.jsdelivr.net/npm/artplayer/dist/artplayer.js"></script>
<script src="{{ url_for('.static', filename='%s.js' % arg['template_name']) }}?ver={{ arg['package_version'] }}"></script> <script src="{{ url_for('.static', filename='%s.js' % arg['template_name']) }}?ver={{ arg['package_version'] }}"></script>
{% endblock %} {% endblock %}