diff --git a/main.py b/main.py
index ee2c906..1c52575 100644
--- a/main.py
+++ b/main.py
@@ -1,543 +1,10 @@
-import os
-import sys
-import platform
-import traceback
-import subprocess
-import sqlite3
-from datetime import datetime
+from typing import Any
-from flask import render_template, jsonify
+from .setup import P
-from .my_youtube_dl import MyYoutubeDL, Status
+logger: Any = P.logger
+package_name: str = P.package_name
+ModelSetting: Any = P.ModelSetting
-logger = P.logger
-package_name = P.package_name
-ModelSetting = P.ModelSetting
-
-
-class LogicMain(LogicModuleBase):
- db_default = {
- "db_version": "2",
- "youtube_dl_package": "1",
- "ffmpeg_path": ""
- if platform.system() != "Windows"
- else os.path.join(path_app_root, "bin", "Windows", "ffmpeg.exe"),
- "temp_path": os.path.join(path_data, "download_tmp"),
- "save_path": os.path.join(path_data, "download"),
- "default_filename": "",
- "proxy": "",
- }
-
- def __init__(self, plugin):
- super(LogicMain, self).__init__(plugin, None)
- self.name = package_name # 모듈명
- default_route_socketio(plugin, self)
-
- def plugin_load(self):
- try:
- # youtube-dl 업데이트
- youtube_dl = Plugin.youtube_dl_packages[
- int(ModelSetting.get("youtube_dl_package"))
- ]
- logger.debug(f"{youtube_dl} upgrade")
- logger.debug(
- subprocess.check_output(
- [sys.executable, "-m", "pip", "install", "--upgrade", youtube_dl],
- universal_newlines=True,
- )
- )
- except Exception as error:
- logger.error("Exception:%s", error)
- logger.error(traceback.format_exc())
-
- def process_menu(self, sub, req):
- try:
- arg = {
- "package_name": package_name,
- "sub": sub,
- "template_name": f"{package_name}_{sub}",
- "package_version": Plugin.plugin_info["version"],
- }
-
- if sub == "setting":
- arg.update(ModelSetting.to_dict())
- arg["package_list"] = Plugin.youtube_dl_packages
- arg["youtube_dl_version"] = LogicMain.get_youtube_dl_version()
- arg["DEFAULT_FILENAME"] = LogicMain.get_default_filename()
-
- elif sub == "download":
- default_filename = ModelSetting.get("default_filename")
- arg["filename"] = (
- default_filename
- if default_filename
- else LogicMain.get_default_filename()
- )
- arg["preset_list"] = LogicMain.get_preset_list()
- arg["postprocessor_list"] = LogicMain.get_postprocessor_list()
-
- elif sub == "thumbnail":
- default_filename = ModelSetting.get("default_filename")
- arg["filename"] = (
- default_filename
- if default_filename
- else LogicMain.get_default_filename()
- )
-
- elif sub == "sub":
- default_filename = ModelSetting.get("default_filename")
- arg["filename"] = (
- default_filename
- if default_filename
- else LogicMain.get_default_filename()
- )
-
- elif sub == "list":
- pass
-
- return render_template(f"{package_name}_{sub}.html", arg=arg)
- except Exception as error:
- logger.error("Exception:%s", error)
- logger.error(traceback.format_exc())
- return render_template("sample.html", title=f"{package_name} - {sub}")
-
- def process_ajax(self, sub, req):
- try:
- logger.debug("AJAX: %s, %s", sub, req.values)
- ret = {"ret": "success"}
-
- if sub == "ffmpeg_version":
- path = req.form["path"]
- output = subprocess.check_output([path, "-version"])
- output = output.decode().replace("\n", "
")
- ret["data"] = output
-
- elif sub == "download":
- postprocessor = req.form["postprocessor"]
- video_convertor, extract_audio = LogicMain.get_postprocessor()
- preferedformat = None
- preferredcodec = None
- preferredquality = None
- if postprocessor in video_convertor:
- preferedformat = postprocessor
- elif postprocessor in extract_audio:
- preferredcodec = postprocessor
- preferredquality = 192
- youtube_dl = LogicMain.download(
- plugin=package_name,
- url=req.form["url"],
- filename=req.form["filename"],
- temp_path=ModelSetting.get("temp_path"),
- save_path=ModelSetting.get("save_path"),
- format=req.form["format"],
- preferedformat=preferedformat,
- preferredcodec=preferredcodec,
- preferredquality=preferredquality,
- proxy=ModelSetting.get("proxy"),
- ffmpeg_path=ModelSetting.get("ffmpeg_path"),
- )
- youtube_dl.start()
- LogicMain.socketio_emit("add", youtube_dl)
- ret["ret"] = "info"
- ret["msg"] = "분석중..."
-
- elif sub == "thumbnail":
- youtube_dl = LogicMain.thumbnail(
- plugin=package_name,
- url=req.form["url"],
- filename=req.form["filename"],
- temp_path=ModelSetting.get("temp_path"),
- save_path=ModelSetting.get("save_path"),
- all_thumbnails=req.form["all_thumbnails"],
- proxy=ModelSetting.get("proxy"),
- ffmpeg_path=ModelSetting.get("ffmpeg_path"),
- )
- youtube_dl.start()
- LogicMain.socketio_emit("add", youtube_dl)
- ret["ret"] = "info"
- ret["msg"] = "분석중..."
-
- elif sub == "sub":
- youtube_dl = LogicMain.sub(
- plugin=package_name,
- url=req.form["url"],
- filename=req.form["filename"],
- temp_path=ModelSetting.get("temp_path"),
- save_path=ModelSetting.get("save_path"),
- all_subs=req.form["all_subs"],
- sub_lang=req.form["sub_lang"],
- auto_sub=req.form["auto_sub"],
- proxy=ModelSetting.get("proxy"),
- ffmpeg_path=ModelSetting.get("ffmpeg_path"),
- )
- youtube_dl.start()
- LogicMain.socketio_emit("add", youtube_dl)
- ret["ret"] = "info"
- ret["msg"] = "분석중..."
-
- elif sub == "list":
- ret["data"] = []
- for i in LogicMain.youtube_dl_list:
- data = LogicMain.get_data(i)
- if data is not None:
- ret["data"].append(data)
-
- elif sub == "all_stop":
- for i in LogicMain.youtube_dl_list:
- i.stop()
-
- elif sub == "stop":
- index = int(req.form["index"])
- LogicMain.youtube_dl_list[index].stop()
-
- return jsonify(ret)
- except Exception as error:
- logger.error("Exception:%s", error)
- logger.error(traceback.format_exc())
- return jsonify({"ret": "danger", "msg": str(error)})
-
- def migration(self):
- try:
- db_version = ModelSetting.get_int("db_version")
- connect = sqlite3.connect(
- os.path.join(path_data, "db", f"{package_name}.db")
- )
-
- if db_version < 2:
- logger.debug("youtube-dlc uninstall")
- logger.debug(
- subprocess.check_output(
- [sys.executable, "-m", "pip", "uninstall", "-y", "youtube-dlc"],
- universal_newlines=True,
- )
- )
-
- connect.commit()
- connect.close()
- ModelSetting.set("db_version", LogicMain.db_default["db_version"])
- db.session.flush()
- except Exception as error:
- logger.error("Exception:%s", error)
- logger.error(traceback.format_exc())
-
- youtube_dl_list = []
-
- @staticmethod
- def get_youtube_dl_version():
- try:
- return MyYoutubeDL.get_version()
- except Exception as error:
- logger.error("Exception:%s", error)
- logger.error(traceback.format_exc())
- return "패키지 임포트 실패"
-
- @staticmethod
- def get_default_filename():
- return MyYoutubeDL.DEFAULT_FILENAME
-
- @staticmethod
- def get_preset_list():
- return [
- ["bestvideo+bestaudio/best", "최고 화질"],
- ["bestvideo[height<=1080]+bestaudio/best[height<=1080]", "1080p"],
- ["worstvideo+worstaudio/worst", "최저 화질"],
- ["bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]", "최고 화질(mp4)"],
- [
- "bestvideo[ext=mp4][height<=1080]+bestaudio[ext=m4a]/best[ext=mp4][height<=1080]",
- "1080p(mp4)",
- ],
- ["bestvideo[filesize<50M]+bestaudio/best[filesize<50M]", "50MB 미만"],
- ["bestaudio/best", "오디오만"],
- ["_custom", "사용자 정의"],
- ]
-
- @staticmethod
- def get_postprocessor_list():
- return [
- ["", "후처리 안함", None],
- ["mp4", "MP4", "비디오 변환"],
- ["flv", "FLV", "비디오 변환"],
- ["webm", "WebM", "비디오 변환"],
- ["ogg", "Ogg", "비디오 변환"],
- ["mkv", "MKV", "비디오 변환"],
- ["ts", "TS", "비디오 변환"],
- ["avi", "AVI", "비디오 변환"],
- ["wmv", "WMV", "비디오 변환"],
- ["mov", "MOV", "비디오 변환"],
- ["gif", "GIF", "비디오 변환"],
- ["mp3", "MP3", "오디오 추출"],
- ["aac", "AAC", "오디오 추출"],
- ["flac", "FLAC", "오디오 추출"],
- ["m4a", "M4A", "오디오 추출"],
- ["opus", "Opus", "오디오 추출"],
- ["vorbis", "Vorbis", "오디오 추출"],
- ["wav", "WAV", "오디오 추출"],
- ]
-
- @staticmethod
- def get_postprocessor():
- video_convertor = []
- extract_audio = []
- for i in LogicMain.get_postprocessor_list():
- if i[2] == "비디오 변환":
- video_convertor.append(i[0])
- elif i[2] == "오디오 추출":
- extract_audio.append(i[0])
- return video_convertor, extract_audio
-
- @staticmethod
- def download(**kwagrs):
- try:
- logger.debug(kwagrs)
- plugin = kwagrs["plugin"]
- url = kwagrs["url"]
- filename = kwagrs["filename"]
- temp_path = kwagrs["temp_path"]
- save_path = kwagrs["save_path"]
- opts = {}
- if "format" in kwagrs and kwagrs["format"]:
- opts["format"] = kwagrs["format"]
- postprocessor = []
- if "preferedformat" in kwagrs and kwagrs["preferedformat"]:
- postprocessor.append(
- {
- "key": "FFmpegVideoConvertor",
- "preferedformat": kwagrs["preferedformat"],
- }
- )
- if "preferredcodec" in kwagrs and kwagrs["preferredcodec"]:
- postprocessor.append(
- {
- "key": "FFmpegExtractAudio",
- "preferredcodec": kwagrs["preferredcodec"],
- "preferredquality": str(kwagrs["preferredquality"]),
- }
- )
- if postprocessor:
- opts["postprocessors"] = postprocessor
- if "playlist" in kwagrs and kwagrs["playlist"]:
- if kwagrs["playlist"] == "reverse":
- opts["playlistreverse"] = True
- elif kwagrs["playlist"] == "random":
- opts["playlistrandom"] = True
- else:
- opts["playlist_items"] = kwagrs["playlist"]
- if "archive" in kwagrs and kwagrs["archive"]:
- opts["download_archive"] = kwagrs["archive"]
- if "proxy" in kwagrs and kwagrs["proxy"]:
- opts["proxy"] = kwagrs["proxy"]
- if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
- opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
- if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
- opts["cookiefile"] = kwagrs["cookiefile"]
- if "headers" in kwagrs and kwagrs["headers"]:
- opts["http_headers"] = kwagrs["headers"]
- dateafter = kwagrs.get("dateafter")
- youtube_dl = MyYoutubeDL(
- plugin, "video", url, filename, temp_path, save_path, opts, dateafter
- )
- youtube_dl.key = kwagrs.get("key")
- LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
- return youtube_dl
- except Exception as error:
- logger.error("Exception:%s", error)
- logger.error(traceback.format_exc())
- return None
-
- @staticmethod
- def thumbnail(**kwagrs):
- try:
- logger.debug(kwagrs)
- plugin = kwagrs["plugin"]
- url = kwagrs["url"]
- filename = kwagrs["filename"]
- temp_path = kwagrs["temp_path"]
- save_path = kwagrs["save_path"]
- opts = {"skip_download": True}
- if (
- "all_thumbnails" in kwagrs
- and str(kwagrs["all_thumbnails"]).lower() != "false"
- ):
- opts["write_all_thumbnails"] = True
- else:
- opts["writethumbnail"] = True
- if "playlist" in kwagrs and kwagrs["playlist"]:
- if kwagrs["playlist"] == "reverse":
- opts["playlistreverse"] = True
- elif kwagrs["playlist"] == "random":
- opts["playlistrandom"] = True
- else:
- opts["playlist_items"] = kwagrs["playlist"]
- if "archive" in kwagrs and kwagrs["archive"]:
- opts["download_archive"] = kwagrs["archive"]
- if "proxy" in kwagrs and kwagrs["proxy"]:
- opts["proxy"] = kwagrs["proxy"]
- if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
- opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
- if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
- opts["cookiefile"] = kwagrs["cookiefile"]
- if "headers" in kwagrs and kwagrs["headers"]:
- opts["http_headers"] = kwagrs["headers"]
- dateafter = kwagrs.get("dateafter")
- youtube_dl = MyYoutubeDL(
- plugin,
- "thumbnail",
- url,
- filename,
- temp_path,
- save_path,
- opts,
- dateafter,
- )
- youtube_dl.key = kwagrs.get("key")
- LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
- return youtube_dl
- except Exception as error:
- logger.error("Exception:%s", error)
- logger.error(traceback.format_exc())
- return None
-
- @staticmethod
- def sub(**kwagrs):
- try:
- logger.debug(kwagrs)
- plugin = kwagrs["plugin"]
- url = kwagrs["url"]
- filename = kwagrs["filename"]
- temp_path = kwagrs["temp_path"]
- save_path = kwagrs["save_path"]
- opts = {"skip_download": True}
- sub_lang = map(
- lambda x: x.strip(), kwagrs["sub_lang"].split(",")
- ) # 문자열을 리스트로 변환
- if "all_subs" in kwagrs and str(kwagrs["all_subs"]).lower() != "false":
- opts["allsubtitles"] = True
- else:
- opts["subtitleslangs"] = sub_lang
- if "auto_sub" in kwagrs and str(kwagrs["auto_sub"]).lower() != "false":
- opts["writeautomaticsub"] = True
- else:
- opts["writesubtitles"] = True
- if "playlist" in kwagrs and kwagrs["playlist"]:
- if kwagrs["playlist"] == "reverse":
- opts["playlistreverse"] = True
- elif kwagrs["playlist"] == "random":
- opts["playlistrandom"] = True
- else:
- opts["playlist_items"] = kwagrs["playlist"]
- if "archive" in kwagrs and kwagrs["archive"]:
- opts["download_archive"] = kwagrs["archive"]
- if "proxy" in kwagrs and kwagrs["proxy"]:
- opts["proxy"] = kwagrs["proxy"]
- if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
- opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
- if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
- opts["cookiefile"] = kwagrs["cookiefile"]
- if "headers" in kwagrs and kwagrs["headers"]:
- opts["http_headers"] = kwagrs["headers"]
- dateafter = kwagrs.get("dateafter")
- youtube_dl = MyYoutubeDL(
- plugin, "subtitle", url, filename, temp_path, save_path, opts, dateafter
- )
- youtube_dl.key = kwagrs.get("key")
- LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
- return youtube_dl
- except Exception as error:
- logger.error("Exception:%s", error)
- logger.error(traceback.format_exc())
- return None
-
- @staticmethod
- def get_data(youtube_dl):
- try:
- data = {}
- data["plugin"] = youtube_dl.plugin
- data["url"] = youtube_dl.url
- data["filename"] = youtube_dl.filename
- data["temp_path"] = youtube_dl.temp_path
- data["save_path"] = youtube_dl.save_path
- data["index"] = youtube_dl.index
- data["status_str"] = youtube_dl.status.name
- data["status_ko"] = str(youtube_dl.status)
- data["end_time"] = ""
- data["extractor"] = youtube_dl.type + (
- " - " + youtube_dl.info_dict["extractor"]
- if youtube_dl.info_dict["extractor"] is not None
- else ""
- )
- data["title"] = (
- youtube_dl.info_dict["title"]
- if youtube_dl.info_dict["title"] is not None
- else youtube_dl.url
- )
- data["uploader"] = (
- youtube_dl.info_dict["uploader"]
- if youtube_dl.info_dict["uploader"] is not None
- else ""
- )
- data["uploader_url"] = (
- youtube_dl.info_dict["uploader_url"]
- if youtube_dl.info_dict["uploader_url"] is not None
- else ""
- )
- data["downloaded_bytes_str"] = ""
- data["total_bytes_str"] = ""
- data["percent"] = "0"
- data["eta"] = (
- youtube_dl.progress_hooks["eta"]
- if youtube_dl.progress_hooks["eta"] is not None
- else ""
- )
- data["speed_str"] = (
- LogicMain.human_readable_size(youtube_dl.progress_hooks["speed"], "/s")
- if youtube_dl.progress_hooks["speed"] is not None
- else ""
- )
- if youtube_dl.status == Status.READY: # 다운로드 전
- data["start_time"] = ""
- data["download_time"] = ""
- else:
- if youtube_dl.end_time is None: # 완료 전
- download_time = datetime.now() - youtube_dl.start_time
- else:
- download_time = youtube_dl.end_time - youtube_dl.start_time
- data["end_time"] = youtube_dl.end_time.strftime("%m-%d %H:%M:%S")
- if None not in (
- youtube_dl.progress_hooks["downloaded_bytes"],
- youtube_dl.progress_hooks["total_bytes"],
- ): # 둘 다 값이 있으면
- data["downloaded_bytes_str"] = LogicMain.human_readable_size(
- youtube_dl.progress_hooks["downloaded_bytes"]
- )
- data["total_bytes_str"] = LogicMain.human_readable_size(
- youtube_dl.progress_hooks["total_bytes"]
- )
- data[
- "percent"
- ] = f"{(float(youtube_dl.progress_hooks['downloaded_bytes']) / float(youtube_dl.progress_hooks['total_bytes']) * 100):.2f}"
- data["start_time"] = youtube_dl.start_time.strftime("%m-%d %H:%M:%S")
- data[
- "download_time"
- ] = f"{int(download_time.seconds / 60):02d}:{int(download_time.seconds % 60):02d}"
- return data
- except Exception as error:
- logger.error("Exception:%s", error)
- logger.error(traceback.format_exc())
- return None
-
- @staticmethod
- def get_info_dict(url, proxy):
- return MyYoutubeDL.get_info_dict(url, proxy)
-
- @staticmethod
- def human_readable_size(size, suffix=""):
- for unit in ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"):
- if size < 1024.0:
- return f"{size:3.1f} {unit}{suffix}"
- size /= 1024.0
- return f"{size:.1f} YB{suffix}"
-
- @staticmethod
- def socketio_emit(cmd, data):
- socketio.emit(
- cmd, LogicMain.get_data(data), namespace=f"/{package_name}", broadcast=True
- )
+# LogicMain은 ModuleBasic으로 기능이 통합되어 더 이상 사용되지 않음.
+# 하위 호환성이나 프레임워크 요구사항이 있을 경우를 위해 파일 구조만 유지.
diff --git a/main_old.py b/main_old.py
new file mode 100644
index 0000000..61aa1bd
--- /dev/null
+++ b/main_old.py
@@ -0,0 +1,547 @@
+import os
+import sys
+import platform
+import traceback
+import subprocess
+import sqlite3
+from datetime import datetime
+
+from flask import render_template, jsonify
+
+from framework import db, path_app_root, path_data, socketio
+from framework.common.plugin import LogicModuleBase, default_route_socketio
+
+from .plugin import Plugin
+from .my_youtube_dl import MyYoutubeDL, Status
+
+logger = Plugin.logger
+package_name = Plugin.package_name
+ModelSetting = Plugin.ModelSetting
+
+
+class LogicMain(LogicModuleBase):
+ db_default = {
+ "db_version": "2",
+ "youtube_dl_package": "1",
+ "ffmpeg_path": ""
+ if platform.system() != "Windows"
+ else os.path.join(path_app_root, "bin", "Windows", "ffmpeg.exe"),
+ "temp_path": os.path.join(path_data, "download_tmp"),
+ "save_path": os.path.join(path_data, "download"),
+ "default_filename": "",
+ "proxy": "",
+ }
+
+ def __init__(self, plugin):
+ super(LogicMain, self).__init__(plugin, None)
+ self.name = package_name # 모듈명
+ default_route_socketio(plugin, self)
+
+ def plugin_load(self):
+ try:
+ # youtube-dl 업데이트
+ youtube_dl = Plugin.youtube_dl_packages[
+ int(ModelSetting.get("youtube_dl_package"))
+ ]
+ logger.debug(f"{youtube_dl} upgrade")
+ logger.debug(
+ subprocess.check_output(
+ [sys.executable, "-m", "pip", "install", "--upgrade", youtube_dl],
+ universal_newlines=True,
+ )
+ )
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+
+ def process_menu(self, sub, req):
+ try:
+ arg = {
+ "package_name": package_name,
+ "sub": sub,
+ "template_name": f"{package_name}_{sub}",
+ "package_version": Plugin.plugin_info["version"],
+ }
+
+ if sub == "setting":
+ arg.update(ModelSetting.to_dict())
+ arg["package_list"] = Plugin.youtube_dl_packages
+ arg["youtube_dl_version"] = LogicMain.get_youtube_dl_version()
+ arg["DEFAULT_FILENAME"] = LogicMain.get_default_filename()
+
+ elif sub == "download":
+ default_filename = ModelSetting.get("default_filename")
+ arg["filename"] = (
+ default_filename
+ if default_filename
+ else LogicMain.get_default_filename()
+ )
+ arg["preset_list"] = LogicMain.get_preset_list()
+ arg["postprocessor_list"] = LogicMain.get_postprocessor_list()
+
+ elif sub == "thumbnail":
+ default_filename = ModelSetting.get("default_filename")
+ arg["filename"] = (
+ default_filename
+ if default_filename
+ else LogicMain.get_default_filename()
+ )
+
+ elif sub == "sub":
+ default_filename = ModelSetting.get("default_filename")
+ arg["filename"] = (
+ default_filename
+ if default_filename
+ else LogicMain.get_default_filename()
+ )
+
+ elif sub == "list":
+ pass
+
+ return render_template(f"{package_name}_{sub}.html", arg=arg)
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+ return render_template("sample.html", title=f"{package_name} - {sub}")
+
+ def process_ajax(self, sub, req):
+ try:
+ logger.debug("AJAX: %s, %s", sub, req.values)
+ ret = {"ret": "success"}
+
+ if sub == "ffmpeg_version":
+ path = req.form["path"]
+ output = subprocess.check_output([path, "-version"])
+ output = output.decode().replace("\n", "
")
+ ret["data"] = output
+
+ elif sub == "download":
+ postprocessor = req.form["postprocessor"]
+ video_convertor, extract_audio = LogicMain.get_postprocessor()
+ preferedformat = None
+ preferredcodec = None
+ preferredquality = None
+ if postprocessor in video_convertor:
+ preferedformat = postprocessor
+ elif postprocessor in extract_audio:
+ preferredcodec = postprocessor
+ preferredquality = 192
+ youtube_dl = LogicMain.download(
+ plugin=package_name,
+ url=req.form["url"],
+ filename=req.form["filename"],
+ temp_path=ModelSetting.get("temp_path"),
+ save_path=ModelSetting.get("save_path"),
+ format=req.form["format"],
+ preferedformat=preferedformat,
+ preferredcodec=preferredcodec,
+ preferredquality=preferredquality,
+ proxy=ModelSetting.get("proxy"),
+ ffmpeg_path=ModelSetting.get("ffmpeg_path"),
+ )
+ youtube_dl.start()
+ LogicMain.socketio_emit("add", youtube_dl)
+ ret["ret"] = "info"
+ ret["msg"] = "분석중..."
+
+ elif sub == "thumbnail":
+ youtube_dl = LogicMain.thumbnail(
+ plugin=package_name,
+ url=req.form["url"],
+ filename=req.form["filename"],
+ temp_path=ModelSetting.get("temp_path"),
+ save_path=ModelSetting.get("save_path"),
+ all_thumbnails=req.form["all_thumbnails"],
+ proxy=ModelSetting.get("proxy"),
+ ffmpeg_path=ModelSetting.get("ffmpeg_path"),
+ )
+ youtube_dl.start()
+ LogicMain.socketio_emit("add", youtube_dl)
+ ret["ret"] = "info"
+ ret["msg"] = "분석중..."
+
+ elif sub == "sub":
+ youtube_dl = LogicMain.sub(
+ plugin=package_name,
+ url=req.form["url"],
+ filename=req.form["filename"],
+ temp_path=ModelSetting.get("temp_path"),
+ save_path=ModelSetting.get("save_path"),
+ all_subs=req.form["all_subs"],
+ sub_lang=req.form["sub_lang"],
+ auto_sub=req.form["auto_sub"],
+ proxy=ModelSetting.get("proxy"),
+ ffmpeg_path=ModelSetting.get("ffmpeg_path"),
+ )
+ youtube_dl.start()
+ LogicMain.socketio_emit("add", youtube_dl)
+ ret["ret"] = "info"
+ ret["msg"] = "분석중..."
+
+ elif sub == "list":
+ ret["data"] = []
+ for i in LogicMain.youtube_dl_list:
+ data = LogicMain.get_data(i)
+ if data is not None:
+ ret["data"].append(data)
+
+ elif sub == "all_stop":
+ for i in LogicMain.youtube_dl_list:
+ i.stop()
+
+ elif sub == "stop":
+ index = int(req.form["index"])
+ LogicMain.youtube_dl_list[index].stop()
+
+ return jsonify(ret)
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+ return jsonify({"ret": "danger", "msg": str(error)})
+
+ def migration(self):
+ try:
+ db_version = ModelSetting.get_int("db_version")
+ connect = sqlite3.connect(
+ os.path.join(path_data, "db", f"{package_name}.db")
+ )
+
+ if db_version < 2:
+ logger.debug("youtube-dlc uninstall")
+ logger.debug(
+ subprocess.check_output(
+ [sys.executable, "-m", "pip", "uninstall", "-y", "youtube-dlc"],
+ universal_newlines=True,
+ )
+ )
+
+ connect.commit()
+ connect.close()
+ ModelSetting.set("db_version", LogicMain.db_default["db_version"])
+ db.session.flush()
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+
+ youtube_dl_list = []
+
+ @staticmethod
+ def get_youtube_dl_version():
+ try:
+ return MyYoutubeDL.get_version()
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+ return "패키지 임포트 실패"
+
+ @staticmethod
+ def get_default_filename():
+ return MyYoutubeDL.DEFAULT_FILENAME
+
+ @staticmethod
+ def get_preset_list():
+ return [
+ ["bestvideo+bestaudio/best", "최고 화질"],
+ ["bestvideo[height<=1080]+bestaudio/best[height<=1080]", "1080p"],
+ ["worstvideo+worstaudio/worst", "최저 화질"],
+ ["bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]", "최고 화질(mp4)"],
+ [
+ "bestvideo[ext=mp4][height<=1080]+bestaudio[ext=m4a]/best[ext=mp4][height<=1080]",
+ "1080p(mp4)",
+ ],
+ ["bestvideo[filesize<50M]+bestaudio/best[filesize<50M]", "50MB 미만"],
+ ["bestaudio/best", "오디오만"],
+ ["_custom", "사용자 정의"],
+ ]
+
+ @staticmethod
+ def get_postprocessor_list():
+ return [
+ ["", "후처리 안함", None],
+ ["mp4", "MP4", "비디오 변환"],
+ ["flv", "FLV", "비디오 변환"],
+ ["webm", "WebM", "비디오 변환"],
+ ["ogg", "Ogg", "비디오 변환"],
+ ["mkv", "MKV", "비디오 변환"],
+ ["ts", "TS", "비디오 변환"],
+ ["avi", "AVI", "비디오 변환"],
+ ["wmv", "WMV", "비디오 변환"],
+ ["mov", "MOV", "비디오 변환"],
+ ["gif", "GIF", "비디오 변환"],
+ ["mp3", "MP3", "오디오 추출"],
+ ["aac", "AAC", "오디오 추출"],
+ ["flac", "FLAC", "오디오 추출"],
+ ["m4a", "M4A", "오디오 추출"],
+ ["opus", "Opus", "오디오 추출"],
+ ["vorbis", "Vorbis", "오디오 추출"],
+ ["wav", "WAV", "오디오 추출"],
+ ]
+
+ @staticmethod
+ def get_postprocessor():
+ video_convertor = []
+ extract_audio = []
+ for i in LogicMain.get_postprocessor_list():
+ if i[2] == "비디오 변환":
+ video_convertor.append(i[0])
+ elif i[2] == "오디오 추출":
+ extract_audio.append(i[0])
+ return video_convertor, extract_audio
+
+ @staticmethod
+ def download(**kwagrs):
+ try:
+ logger.debug(kwagrs)
+ plugin = kwagrs["plugin"]
+ url = kwagrs["url"]
+ filename = kwagrs["filename"]
+ temp_path = kwagrs["temp_path"]
+ save_path = kwagrs["save_path"]
+ opts = {}
+ if "format" in kwagrs and kwagrs["format"]:
+ opts["format"] = kwagrs["format"]
+ postprocessor = []
+ if "preferedformat" in kwagrs and kwagrs["preferedformat"]:
+ postprocessor.append(
+ {
+ "key": "FFmpegVideoConvertor",
+ "preferedformat": kwagrs["preferedformat"],
+ }
+ )
+ if "preferredcodec" in kwagrs and kwagrs["preferredcodec"]:
+ postprocessor.append(
+ {
+ "key": "FFmpegExtractAudio",
+ "preferredcodec": kwagrs["preferredcodec"],
+ "preferredquality": str(kwagrs["preferredquality"]),
+ }
+ )
+ if postprocessor:
+ opts["postprocessors"] = postprocessor
+ if "playlist" in kwagrs and kwagrs["playlist"]:
+ if kwagrs["playlist"] == "reverse":
+ opts["playlistreverse"] = True
+ elif kwagrs["playlist"] == "random":
+ opts["playlistrandom"] = True
+ else:
+ opts["playlist_items"] = kwagrs["playlist"]
+ if "archive" in kwagrs and kwagrs["archive"]:
+ opts["download_archive"] = kwagrs["archive"]
+ if "proxy" in kwagrs and kwagrs["proxy"]:
+ opts["proxy"] = kwagrs["proxy"]
+ if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
+ opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
+ if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
+ opts["cookiefile"] = kwagrs["cookiefile"]
+ if "headers" in kwagrs and kwagrs["headers"]:
+ opts["http_headers"] = kwagrs["headers"]
+ dateafter = kwagrs.get("dateafter")
+ youtube_dl = MyYoutubeDL(
+ plugin, "video", url, filename, temp_path, save_path, opts, dateafter
+ )
+ youtube_dl.key = kwagrs.get("key")
+ LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
+ return youtube_dl
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+ return None
+
+ @staticmethod
+ def thumbnail(**kwagrs):
+ try:
+ logger.debug(kwagrs)
+ plugin = kwagrs["plugin"]
+ url = kwagrs["url"]
+ filename = kwagrs["filename"]
+ temp_path = kwagrs["temp_path"]
+ save_path = kwagrs["save_path"]
+ opts = {"skip_download": True}
+ if (
+ "all_thumbnails" in kwagrs
+ and str(kwagrs["all_thumbnails"]).lower() != "false"
+ ):
+ opts["write_all_thumbnails"] = True
+ else:
+ opts["writethumbnail"] = True
+ if "playlist" in kwagrs and kwagrs["playlist"]:
+ if kwagrs["playlist"] == "reverse":
+ opts["playlistreverse"] = True
+ elif kwagrs["playlist"] == "random":
+ opts["playlistrandom"] = True
+ else:
+ opts["playlist_items"] = kwagrs["playlist"]
+ if "archive" in kwagrs and kwagrs["archive"]:
+ opts["download_archive"] = kwagrs["archive"]
+ if "proxy" in kwagrs and kwagrs["proxy"]:
+ opts["proxy"] = kwagrs["proxy"]
+ if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
+ opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
+ if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
+ opts["cookiefile"] = kwagrs["cookiefile"]
+ if "headers" in kwagrs and kwagrs["headers"]:
+ opts["http_headers"] = kwagrs["headers"]
+ dateafter = kwagrs.get("dateafter")
+ youtube_dl = MyYoutubeDL(
+ plugin,
+ "thumbnail",
+ url,
+ filename,
+ temp_path,
+ save_path,
+ opts,
+ dateafter,
+ )
+ youtube_dl.key = kwagrs.get("key")
+ LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
+ return youtube_dl
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+ return None
+
+ @staticmethod
+ def sub(**kwagrs):
+ try:
+ logger.debug(kwagrs)
+ plugin = kwagrs["plugin"]
+ url = kwagrs["url"]
+ filename = kwagrs["filename"]
+ temp_path = kwagrs["temp_path"]
+ save_path = kwagrs["save_path"]
+ opts = {"skip_download": True}
+ sub_lang = map(
+ lambda x: x.strip(), kwagrs["sub_lang"].split(",")
+ ) # 문자열을 리스트로 변환
+ if "all_subs" in kwagrs and str(kwagrs["all_subs"]).lower() != "false":
+ opts["allsubtitles"] = True
+ else:
+ opts["subtitleslangs"] = sub_lang
+ if "auto_sub" in kwagrs and str(kwagrs["auto_sub"]).lower() != "false":
+ opts["writeautomaticsub"] = True
+ else:
+ opts["writesubtitles"] = True
+ if "playlist" in kwagrs and kwagrs["playlist"]:
+ if kwagrs["playlist"] == "reverse":
+ opts["playlistreverse"] = True
+ elif kwagrs["playlist"] == "random":
+ opts["playlistrandom"] = True
+ else:
+ opts["playlist_items"] = kwagrs["playlist"]
+ if "archive" in kwagrs and kwagrs["archive"]:
+ opts["download_archive"] = kwagrs["archive"]
+ if "proxy" in kwagrs and kwagrs["proxy"]:
+ opts["proxy"] = kwagrs["proxy"]
+ if "ffmpeg_path" in kwagrs and kwagrs["ffmpeg_path"]:
+ opts["ffmpeg_location"] = kwagrs["ffmpeg_path"]
+ if "cookiefile" in kwagrs and kwagrs["cookiefile"]:
+ opts["cookiefile"] = kwagrs["cookiefile"]
+ if "headers" in kwagrs and kwagrs["headers"]:
+ opts["http_headers"] = kwagrs["headers"]
+ dateafter = kwagrs.get("dateafter")
+ youtube_dl = MyYoutubeDL(
+ plugin, "subtitle", url, filename, temp_path, save_path, opts, dateafter
+ )
+ youtube_dl.key = kwagrs.get("key")
+ LogicMain.youtube_dl_list.append(youtube_dl) # 리스트 추가
+ return youtube_dl
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+ return None
+
+ @staticmethod
+ def get_data(youtube_dl):
+ try:
+ data = {}
+ data["plugin"] = youtube_dl.plugin
+ data["url"] = youtube_dl.url
+ data["filename"] = youtube_dl.filename
+ data["temp_path"] = youtube_dl.temp_path
+ data["save_path"] = youtube_dl.save_path
+ data["index"] = youtube_dl.index
+ data["status_str"] = youtube_dl.status.name
+ data["status_ko"] = str(youtube_dl.status)
+ data["end_time"] = ""
+ data["extractor"] = youtube_dl.type + (
+ " - " + youtube_dl.info_dict["extractor"]
+ if youtube_dl.info_dict["extractor"] is not None
+ else ""
+ )
+ data["title"] = (
+ youtube_dl.info_dict["title"]
+ if youtube_dl.info_dict["title"] is not None
+ else youtube_dl.url
+ )
+ data["uploader"] = (
+ youtube_dl.info_dict["uploader"]
+ if youtube_dl.info_dict["uploader"] is not None
+ else ""
+ )
+ data["uploader_url"] = (
+ youtube_dl.info_dict["uploader_url"]
+ if youtube_dl.info_dict["uploader_url"] is not None
+ else ""
+ )
+ data["downloaded_bytes_str"] = ""
+ data["total_bytes_str"] = ""
+ data["percent"] = "0"
+ data["eta"] = (
+ youtube_dl.progress_hooks["eta"]
+ if youtube_dl.progress_hooks["eta"] is not None
+ else ""
+ )
+ data["speed_str"] = (
+ LogicMain.human_readable_size(youtube_dl.progress_hooks["speed"], "/s")
+ if youtube_dl.progress_hooks["speed"] is not None
+ else ""
+ )
+ if youtube_dl.status == Status.READY: # 다운로드 전
+ data["start_time"] = ""
+ data["download_time"] = ""
+ else:
+ if youtube_dl.end_time is None: # 완료 전
+ download_time = datetime.now() - youtube_dl.start_time
+ else:
+ download_time = youtube_dl.end_time - youtube_dl.start_time
+ data["end_time"] = youtube_dl.end_time.strftime("%m-%d %H:%M:%S")
+ if None not in (
+ youtube_dl.progress_hooks["downloaded_bytes"],
+ youtube_dl.progress_hooks["total_bytes"],
+ ): # 둘 다 값이 있으면
+ data["downloaded_bytes_str"] = LogicMain.human_readable_size(
+ youtube_dl.progress_hooks["downloaded_bytes"]
+ )
+ data["total_bytes_str"] = LogicMain.human_readable_size(
+ youtube_dl.progress_hooks["total_bytes"]
+ )
+ data[
+ "percent"
+ ] = f"{(float(youtube_dl.progress_hooks['downloaded_bytes']) / float(youtube_dl.progress_hooks['total_bytes']) * 100):.2f}"
+ data["start_time"] = youtube_dl.start_time.strftime("%m-%d %H:%M:%S")
+ data[
+ "download_time"
+ ] = f"{int(download_time.seconds / 60):02d}:{int(download_time.seconds % 60):02d}"
+ return data
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+ return None
+
+ @staticmethod
+ def get_info_dict(url, proxy):
+ return MyYoutubeDL.get_info_dict(url, proxy)
+
+ @staticmethod
+ def human_readable_size(size, suffix=""):
+ for unit in ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"):
+ if size < 1024.0:
+ return f"{size:3.1f} {unit}{suffix}"
+ size /= 1024.0
+ return f"{size:.1f} YB{suffix}"
+
+ @staticmethod
+ def socketio_emit(cmd, data):
+ socketio.emit(
+ cmd, LogicMain.get_data(data), namespace=f"/{package_name}", broadcast=True
+ )
diff --git a/mod_basic.py b/mod_basic.py
index 8569398..0fbacb2 100644
--- a/mod_basic.py
+++ b/mod_basic.py
@@ -1,29 +1,36 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/02/25 7:29 PM
-# @Author : yommi
-# @Site :
-# @File : mod_basic
-# @Software: PyCharm
-# @Path : youtube-dl/mod_basic.py
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from flask import Response, jsonify, render_template
+from loguru import logger
from support import SupportYaml
from tool import ToolUtil
-from .setup import *
-
-# from .main import LogicMain
+from .model import ModelYoutubeDlItem
from .my_youtube_dl import MyYoutubeDL, Status
+from .setup import *
import platform
import os
-from .model import ModelYoutubeDlItem
-from loguru import logger
+import subprocess
+import traceback
+import sys
+from datetime import datetime
+
+try:
+ from gommi_download_manager.mod_queue import ModuleQueue
+except ImportError:
+ ModuleQueue = None
class ModuleBasic(PluginModuleBase):
- def __init__(self, P):
+ """유튜브 다운로더의 기본 기능을 담당하는 모듈"""
+
+ youtube_dl_list: List[MyYoutubeDL] = []
+
+ def __init__(self, P: Any) -> None:
super(ModuleBasic, self).__init__(
P, name="basic", first_menu="setting", scheduler_desc="유튜브 다운로더"
)
- self.db_default = {
+ self.db_default: Dict[str, str] = {
"db_version": "2",
"youtube_dl_package": "1",
"ffmpeg_path": ""
@@ -34,32 +41,465 @@ class ModuleBasic(PluginModuleBase):
"default_filename": "",
"proxy": "",
}
- self.web_list_model = ModelYoutubeDlItem
+ self.web_list_model: Any = ModelYoutubeDlItem
- def process_menu(self, sub, req):
+ def process_menu(self, sub: str, req: Any) -> Union[Response, str]:
+ """메뉴별 템플릿 렌더링"""
logger.debug(f"sub: {sub}")
- arg = P.ModelSetting.to_dict()
+ arg: Dict[str, Any] = P.ModelSetting.to_dict()
+ arg["package_name"] = P.package_name
+ arg["package_version"] = P.plugin_info["version"]
+ arg["template_name"] = f"{P.package_name}_{sub}" # JS 파일명 하위 호환성
logger.debug(f"arg:: {arg}")
if sub == "setting":
arg["is_include"] = F.scheduler.is_include(self.get_scheduler_name())
arg["is_running"] = F.scheduler.is_running(self.get_scheduler_name())
+ arg["package_list"] = [[x, x] for x in P.youtube_dl_packages]
+ arg["youtube_dl_version"] = self.get_youtube_dl_version()
+ arg["DEFAULT_FILENAME"] = self.get_default_filename()
elif sub == "download":
- default_filename = P.ModelSetting.get("default_filename")
+ default_filename: Optional[str] = P.ModelSetting.get("default_filename")
arg["filename"] = (
default_filename
if default_filename
- else ModuleBasic.get_default_filename()
+ else self.get_default_filename()
)
- arg["preset_list"] = ModuleBasic.get_preset_list()
- arg["postprocessor_list"] = ModuleBasic.get_postprocessor_list()
+ arg["preset_list"] = self.get_preset_list()
+ arg["postprocessor_list"] = self.get_postprocessor_list()
return render_template(f"{P.package_name}_{self.name}_{sub}.html", arg=arg)
- def process_command(self, command, arg1, arg2, arg3, req):
- ret = {"ret": "success"}
+ def plugin_load(self) -> None:
+ """플러그인 로드 시 업그레이드 체크"""
+ try:
+ package_idx: str = P.ModelSetting.get("youtube_dl_package")
+ if not package_idx:
+ package_idx = "1"
+ youtube_dl: str = P.youtube_dl_packages[int(package_idx)]
+ logger.debug(f"{youtube_dl} 업그레이드 체크")
+ # 선택된 패키지(yt-dlp 등)를 최신 버전으로 업데이트
+ subprocess.check_output(
+ [sys.executable, "-m", "pip", "install", "--upgrade", youtube_dl],
+ universal_newlines=True,
+ )
+ except Exception as error:
+ logger.error(f"플러그인 로드 중 예외 발생: {error}")
+ logger.error(traceback.format_exc())
+
+ def get_youtube_dl_version(self) -> str:
+ """다운로드 라이브러리 버전 획득"""
+ try:
+ return MyYoutubeDL.get_version()
+ except Exception as error:
+ logger.error(f"버전 확인 중 예외 발생: {error}")
+ # logger.error(traceback.format_exc())
+ return "패키지 임포트 실패"
+
+ def process_command(self, command: str, arg1: Any, arg2: Any, arg3: Any, req: Any) -> Response:
+ """일반 커맨드 처리 (현재 미사용)"""
+ ret: Dict[str, str] = {"ret": "success"}
return jsonify(ret)
+ def process_ajax(self, sub: str, req: Any) -> Response:
+ """UI에서의 AJAX 요청 처리"""
+ try:
+ logger.debug(f"AJAX 요청: {sub}, {req.values}")
+ ret: Dict[str, Any] = {"ret": "success"}
+
+ if sub == "ffmpeg_version":
+ path: str = req.form["path"]
+ output: bytes = subprocess.check_output([path, "-version"])
+ ret["data"] = output.decode().replace("\n", "
")
+
+ elif sub == "download":
+ postprocessor: str = req.form["postprocessor"]
+ video_convertor: List[str]
+ extract_audio: List[str]
+ video_convertor, extract_audio = self.get_postprocessor()
+ preferedformat: Optional[str] = None
+ preferredcodec: Optional[str] = None
+ preferredquality: Optional[int] = None
+ if postprocessor in video_convertor:
+ preferedformat = postprocessor
+ elif postprocessor in extract_audio:
+ preferredcodec = postprocessor
+ preferredquality = 192
+ # GDM 연동 시도
+ if ModuleQueue:
+ gdm_options = {
+ 'proxy': P.ModelSetting.get("proxy"),
+ 'ffmpeg_path': P.ModelSetting.get("ffmpeg_path"),
+ }
+ if req.form.get("format"):
+ gdm_options['format'] = req.form["format"]
+ if preferredcodec:
+ gdm_options['extract_audio'] = True
+ gdm_options['audio_format'] = preferredcodec
+
+ if preferedformat:
+ gdm_options['merge_output_format'] = preferedformat
+
+ if postprocessor == "60fps":
+ # recode-video를 mkv로 강제하여 무조건 재인코딩(변환) 단계가 실행되도록 함.
+ # 그래야만 VideoConvertor 필터가 적용됨.
+ gdm_options['extra_args'] = gdm_options.get('extra_args', []) + [
+ '--recode-video', 'mkv',
+ '--postprocessor-args', 'VideoConvertor:-c:v libx264 -crf 23 -vf minterpolate=fps=60'
+ ]
+
+ # 상세 로그 확인을 위해 verbose 추가 가능 (디버깅용)
+ # gdm_options['extra_args'] = gdm_options.get('extra_args', []) + ['--verbose']
+
+ # GDM 큐에 추가
+ task = ModuleQueue.add_download(
+ url=req.form["url"],
+ save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")),
+ filename=req.form["filename"],
+ source_type='youtube',
+ caller_plugin=P.package_name,
+ **gdm_options
+ )
+ if task:
+ ret["ret"] = "success"
+ ret["msg"] = "GDM 대기열에 추가되었습니다."
+ return jsonify(ret)
+
+ # GDM 미설치 시 기존 방식 (직접 다운로드)
+ youtube_dl: Optional[MyYoutubeDL] = self.download(
+ plugin=P.package_name,
+ url=req.form["url"],
+ filename=req.form["filename"],
+ temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")),
+ save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")),
+ format=req.form["format"],
+ preferedformat=preferedformat,
+ preferredcodec=preferredcodec,
+ preferredquality=preferredquality,
+ proxy=P.ModelSetting.get("proxy"),
+ ffmpeg_path=P.ModelSetting.get("ffmpeg_path"),
+ )
+ if youtube_dl:
+ youtube_dl.start()
+ self.socketio_emit("add", youtube_dl)
+ ret["ret"] = "info"
+ ret["msg"] = "분석중..."
+ else:
+ ret["ret"] = "danger"
+ ret["msg"] = "다운로드 실패"
+
+ elif sub == "list":
+ ret["data"] = []
+ for i in self.youtube_dl_list:
+ data: Optional[Dict[str, Any]] = self.get_data(i)
+ if data is not None:
+ ret["data"].append(data)
+
+ elif sub == "all_stop":
+ for i in self.youtube_dl_list:
+ i.stop()
+
+ elif sub == "stop":
+ index: int = int(req.form["index"])
+ self.youtube_dl_list[index].stop()
+
+ elif sub == "thumbnail":
+ youtube_dl = self.download(
+ plugin=P.package_name,
+ url=req.form["url"],
+ filename=req.form["filename"],
+ temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")),
+ save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")),
+ format="best",
+ proxy=P.ModelSetting.get("proxy"),
+ ffmpeg_path=P.ModelSetting.get("ffmpeg_path"),
+ type="thumbnail",
+ )
+ if youtube_dl:
+ youtube_dl.start()
+ self.socketio_emit("add", youtube_dl)
+ ret["ret"] = "info"
+ ret["msg"] = "분석중..."
+ else:
+ ret["ret"] = "danger"
+ ret["msg"] = "다운로드 실패"
+
+ elif sub == "sub":
+ opts: Dict[str, Any] = {}
+ if req.form.get("all_subs") == "true":
+ opts["allsubtitles"] = True
+ else:
+ opts["writesubtitles"] = True
+ youtube_dl = self.download(
+ plugin=P.package_name,
+ url=req.form["url"],
+ filename=req.form["filename"],
+ temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")),
+ save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")),
+ format="best",
+ proxy=P.ModelSetting.get("proxy"),
+ ffmpeg_path=P.ModelSetting.get("ffmpeg_path"),
+ type="sub",
+ opts=opts,
+ )
+ if youtube_dl:
+ youtube_dl.start()
+ self.socketio_emit("add", youtube_dl)
+ ret["ret"] = "info"
+ ret["msg"] = "분석중..."
+ else:
+ ret["ret"] = "danger"
+ ret["msg"] = "다운로드 실패"
+
+ elif sub == "preview":
+ url: str = req.form["url"]
+ data: Optional[str] = MyYoutubeDL.get_preview_url(url)
+ if data:
+ ret["data"] = data
+ else:
+ ret["ret"] = "warning"
+ ret["msg"] = "미리보기 URL을 가져올 수 없습니다."
+
+ return jsonify(ret)
+ except Exception as error:
+ logger.error(f"AJAX 처리 중 예외 발생: {error}")
+ logger.error(traceback.format_exc())
+ return jsonify({"ret": "danger", "msg": str(error)})
+
+ def process_api(self, sub: str, req: Any) -> Response:
+ """외부 모듈(Youtube 플러그인 등)에서의 API 요청 처리"""
+ try:
+ if sub == "info_dict":
+ url: str = req.values.get("url")
+ proxy: Optional[str] = req.values.get("proxy")
+ data: Optional[Dict[str, Any]] = MyYoutubeDL.get_info_dict(url, proxy)
+ return jsonify(data)
+ elif sub == "download":
+ # Gommi Download Manager 연동
+ if ModuleQueue:
+ url = req.values.get("url")
+ save_path = ToolUtil.make_path(req.values.get("save_path") or P.ModelSetting.get("save_path"))
+ filename = req.values.get("filename")
+
+ task = ModuleQueue.add_download(
+ url=url,
+ save_path=save_path,
+ filename=filename,
+ source_type='youtube',
+ caller_plugin='youtube-dl'
+ )
+
+ if task:
+ return jsonify({'ret': 'success', 'msg': '큐에 추가됨', 'task_id': task.id})
+ else:
+ return jsonify({'ret': 'fail'})
+
+ youtube_dl: Optional[MyYoutubeDL] = self.download(
+ plugin=req.values.get("plugin"),
+ url=req.values.get("url"),
+ filename=req.values.get("filename"),
+ temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")),
+ save_path=ToolUtil.make_path(req.values.get("save_path") or P.ModelSetting.get("save_path")),
+ format=req.values.get("format"),
+ preferedformat=req.values.get("preferedformat"),
+ preferredcodec=req.values.get("preferredcodec"),
+ preferredquality=req.values.get("preferredquality"),
+ proxy=req.values.get("proxy") or P.ModelSetting.get("proxy"),
+ ffmpeg_path=req.values.get("ffmpeg_path") or P.ModelSetting.get("ffmpeg_path"),
+ key=req.values.get("key"),
+ )
+ if youtube_dl:
+ if req.values.get("start") == "True" or req.values.get("start") is None:
+ youtube_dl.start()
+ return jsonify(self.get_data(youtube_dl))
+ else:
+ return jsonify({"ret": "error"})
+ elif sub == "status":
+ index: int = int(req.values.get("index"))
+ return jsonify(self.get_data(self.youtube_dl_list[index]))
+ elif sub == "stop":
+ index: int = int(req.values.get("index"))
+ self.youtube_dl_list[index].stop()
+ return jsonify({"ret": "success"})
+ except Exception as error:
+ logger.error(f"API 처리 중 예외 발생: {error}")
+ logger.error(traceback.format_exc())
+ return jsonify({"ret": "error", "msg": str(error)})
+
+ def download(self, **kwargs: Any) -> Optional[MyYoutubeDL]:
+ """다운로드 객체 생성 및 리스트 관리"""
+ try:
+ logger.debug(kwargs)
+ plugin: str = kwargs["plugin"]
+ url: str = kwargs["url"]
+ filename: str = kwargs["filename"]
+ temp_path: str = kwargs["temp_path"]
+ save_path: str = kwargs["save_path"]
+ type_name: str = kwargs.get("type", "video")
+ opts: Dict[str, Any] = kwargs.get("opts", {})
+ if "format" in kwargs and kwargs["format"]:
+ opts["format"] = kwargs["format"]
+
+ if type_name == "thumbnail":
+ opts["writethumbnail"] = True
+ opts["skip_download"] = True
+ elif type_name == "sub":
+ opts["skip_download"] = True
+
+ postprocessor: List[Dict[str, Any]] = opts.get("postprocessors", [])
+ if "preferedformat" in kwargs and kwargs["preferedformat"]:
+ postprocessor.append(
+ {
+ "key": "FFmpegVideoConvertor",
+ "preferedformat": kwargs["preferedformat"],
+ }
+ )
+ if "preferredcodec" in kwargs and kwargs["preferredcodec"]:
+ postprocessor.append(
+ {
+ "key": "FFmpegExtractAudio",
+ "preferredcodec": kwargs["preferredcodec"],
+ "preferredquality": str(kwargs.get("preferredquality", 192)),
+ }
+ )
+ if postprocessor:
+ opts["postprocessors"] = postprocessor
+ if "playlist" in kwargs and kwargs["playlist"]:
+ if kwargs["playlist"] == "reverse":
+ opts["playlistreverse"] = True
+ elif kwargs["playlist"] == "random":
+ opts["playlistrandom"] = True
+ else:
+ opts["playlist_items"] = kwargs["playlist"]
+ if "archive" in kwargs and kwargs["archive"]:
+ opts["download_archive"] = kwargs["archive"]
+ if "proxy" in kwargs and kwargs["proxy"]:
+ opts["proxy"] = kwargs["proxy"]
+ if "ffmpeg_path" in kwargs and kwargs["ffmpeg_path"]:
+ opts["ffmpeg_location"] = kwargs["ffmpeg_path"]
+ if "cookiefile" in kwargs and kwargs["cookiefile"]:
+ opts["cookiefile"] = kwargs["cookiefile"]
+ if "headers" in kwargs and kwargs["headers"]:
+ opts["http_headers"] = kwargs["headers"]
+ dateafter: Optional[str] = kwargs.get("dateafter")
+ youtube_dl: MyYoutubeDL = MyYoutubeDL(
+ plugin, type_name, url, filename, temp_path, save_path, opts, dateafter
+ )
+ youtube_dl.key = kwargs.get("key")
+ self.youtube_dl_list.append(youtube_dl)
+ return youtube_dl
+ except Exception as error:
+ logger.error(f"다운로드 객체 생성 중 예외 발생: {error}")
+ logger.error(traceback.format_exc())
+ return None
+
+ def get_data(self, youtube_dl: MyYoutubeDL) -> Optional[Dict[str, Any]]:
+ """다운로드 객체의 현재 상태 데이터를 딕셔너리로 변환"""
+ try:
+ data: Dict[str, Any] = {}
+ data["plugin"] = youtube_dl.plugin
+ data["url"] = youtube_dl.url
+ data["filename"] = youtube_dl.filename
+ data["temp_path"] = youtube_dl.temp_path
+ data["save_path"] = youtube_dl.save_path
+ data["index"] = youtube_dl.index
+ data["status_str"] = youtube_dl.status.name
+ data["status_ko"] = str(youtube_dl.status)
+ data["end_time"] = ""
+ data["extractor"] = youtube_dl.type + (
+ " - " + youtube_dl.info_dict["extractor"]
+ if youtube_dl.info_dict["extractor"] is not None
+ else ""
+ )
+ data["title"] = (
+ youtube_dl.info_dict["title"]
+ if youtube_dl.info_dict["title"] is not None
+ else youtube_dl.url
+ )
+ data["uploader"] = (
+ youtube_dl.info_dict["uploader"]
+ if youtube_dl.info_dict["uploader"] is not None
+ else ""
+ )
+ data["uploader_url"] = (
+ youtube_dl.info_dict["uploader_url"]
+ if youtube_dl.info_dict["uploader_url"] is not None
+ else ""
+ )
+ data["downloaded_bytes_str"] = ""
+ data["total_bytes_str"] = ""
+ data["percent"] = "0"
+ data["eta"] = (
+ str(youtube_dl.progress_hooks["eta"])
+ if youtube_dl.progress_hooks["eta"] is not None
+ else ""
+ )
+ data["speed_str"] = (
+ self.human_readable_size(youtube_dl.progress_hooks["speed"], "/s")
+ if youtube_dl.progress_hooks["speed"] is not None
+ else ""
+ )
+ if youtube_dl.status == Status.READY:
+ data["start_time"] = ""
+ data["download_time"] = ""
+ else:
+ if youtube_dl.end_time is None:
+ download_time: Any = datetime.now() - youtube_dl.start_time
+ else:
+ download_time = youtube_dl.end_time - youtube_dl.start_time
+ data["end_time"] = youtube_dl.end_time.strftime("%m-%d %H:%M:%S")
+ if None not in (
+ youtube_dl.progress_hooks["downloaded_bytes"],
+ youtube_dl.progress_hooks["total_bytes"],
+ ):
+ data["downloaded_bytes_str"] = self.human_readable_size(
+ youtube_dl.progress_hooks["downloaded_bytes"]
+ )
+ data["total_bytes_str"] = self.human_readable_size(
+ youtube_dl.progress_hooks["total_bytes"]
+ )
+ data[
+ "percent"
+ ] = f"{(float(youtube_dl.progress_hooks['downloaded_bytes']) / float(youtube_dl.progress_hooks['total_bytes']) * 100):.2f}"
+ data["start_time"] = youtube_dl.start_time.strftime("%m-%d %H:%M:%S")
+ data[
+ "download_time"
+ ] = f"{int(download_time.seconds / 60):02d}:{int(download_time.seconds % 60):02d}"
+ return data
+ except Exception as error:
+ logger.error(f"상태 데이터 변환 중 예외 발생: {error}")
+ logger.error(traceback.format_exc())
+ return None
+
+ def human_readable_size(self, size: Union[int, float, None], suffix: str = "") -> str:
+ """바이트 단위를 사람이 읽기 쉬운 형식으로 변환"""
+ if size is None:
+ return ""
+ for unit in ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"):
+ if size < 1024.0:
+ return f"{size:3.1f} {unit}{suffix}"
+ size /= 1024.0
+ return f"{size:.1f} YB{suffix}"
+
+ def socketio_emit(self, cmd: str, data: MyYoutubeDL) -> None:
+ """Socket.IO 메시지 전송"""
+ F.socketio.emit(
+ cmd,
+ self.get_data(data),
+ namespace=f"/{P.package_name}",
+ )
+
+ def get_postprocessor(self) -> Tuple[List[str], List[str]]:
+ """후처리 리스트 분류 (비디오/오디오)"""
+ video_convertor: List[str] = []
+ extract_audio: List[str] = []
+ for i in self.get_postprocessor_list():
+ if i[2] == "비디오 변환":
+ video_convertor.append(i[0])
+ elif i[2] == "오디오 추출":
+ extract_audio.append(i[0])
+ return video_convertor, extract_audio
+
# def plugin_load(self):
# if (
# os.path.exists(
@@ -74,12 +514,11 @@ class ModuleBasic(PluginModuleBase):
# ToolUtil.make_path(P.ModelSetting.get(f"{self.name}_path_config")),
# )
- @staticmethod
- def get_default_filename():
+ def get_default_filename(self) -> str:
return MyYoutubeDL.DEFAULT_FILENAME
@staticmethod
- def get_preset_list():
+ def get_preset_list() -> List[List[str]]:
return [
["bestvideo+bestaudio/best", "최고 화질"],
["bestvideo[height<=1080]+bestaudio/best[height<=1080]", "1080p"],
@@ -95,7 +534,7 @@ class ModuleBasic(PluginModuleBase):
]
@staticmethod
- def get_postprocessor_list():
+ def get_postprocessor_list() -> List[List[Union[str, None]]]:
return [
["", "후처리 안함", None],
["mp4", "MP4", "비디오 변환"],
@@ -115,4 +554,5 @@ class ModuleBasic(PluginModuleBase):
["opus", "Opus", "오디오 추출"],
["vorbis", "Vorbis", "오디오 추출"],
["wav", "WAV", "오디오 추출"],
+ # ["60fps", "60fps 보간 (느림)", "고급 변환"],
]
diff --git a/my_youtube_dl.old.py b/my_youtube_dl.old.py
new file mode 100644
index 0000000..01e8807
--- /dev/null
+++ b/my_youtube_dl.old.py
@@ -0,0 +1,247 @@
+from __future__ import unicode_literals
+
+import os
+import traceback
+import tempfile
+from glob import glob
+from datetime import datetime
+from threading import Thread
+from enum import Enum
+
+import framework.common.celery as celery_shutil
+
+from .plugin import Plugin
+
+logger = Plugin.logger
+ModelSetting = Plugin.ModelSetting
+
+youtube_dl_package = Plugin.youtube_dl_packages[
+ int(ModelSetting.get("youtube_dl_package"))
+ if ModelSetting.get("youtube_dl_package")
+ else 1 # LogicMain.db_default["youtube_dl_package"]
+].replace("-", "_")
+
+
+class Status(Enum):
+ READY = 0
+ START = 1
+ DOWNLOADING = 2
+ ERROR = 3
+ FINISHED = 4
+ STOP = 5
+ COMPLETED = 6
+
+ def __str__(self):
+ str_list = ["준비", "분석중", "다운로드중", "실패", "변환중", "중지", "완료"]
+ return str_list[self.value]
+
+
+class MyYoutubeDL:
+ DEFAULT_FILENAME = "%(title)s-%(id)s.%(ext)s"
+
+ _index = 0
+
+ def __init__(
+ self,
+ plugin,
+ type_name,
+ url,
+ filename,
+ temp_path,
+ save_path=None,
+ opts=None,
+ dateafter=None,
+ datebefore=None,
+ ):
+ # from youtube_dl.utils import DateRange
+ DateRange = __import__(
+ f"{youtube_dl_package}.utils", fromlist=["DateRange"]
+ ).DateRange
+
+ if save_path is None:
+ save_path = temp_path
+ if opts is None:
+ opts = {}
+ self.plugin = plugin
+ self.type = type_name
+ self.url = url
+ self.filename = filename
+ if not os.path.isdir(temp_path):
+ os.makedirs(temp_path)
+ self.temp_path = tempfile.mkdtemp(prefix="youtube-dl_", dir=temp_path)
+ if not os.path.isdir(save_path):
+ os.makedirs(save_path)
+ self.save_path = save_path
+ self.opts = opts
+ if dateafter or datebefore:
+ self.opts["daterange"] = DateRange(start=dateafter, end=datebefore)
+ self.index = MyYoutubeDL._index
+ MyYoutubeDL._index += 1
+ self._status = Status.READY
+ self._thread = None
+ self.key = None
+ self.start_time = None # 시작 시간
+ self.end_time = None # 종료 시간
+ # info_dict에서 얻는 정보
+ self.info_dict = {
+ "extractor": None, # 타입
+ "title": None, # 제목
+ "uploader": None, # 업로더
+ "uploader_url": None, # 업로더 주소
+ }
+ # info_dict에서 얻는 정보(entries)
+ # self.info_dict['playlist_index'] = None
+ # self.info_dict['duration'] = None # 길이
+ # self.info_dict['format'] = None # 포맷
+ # self.info_dict['thumbnail'] = None # 썸네일
+ # progress_hooks에서 얻는 정보
+ self.progress_hooks = {
+ "downloaded_bytes": None, # 다운로드한 크기
+ "total_bytes": None, # 전체 크기
+ "eta": None, # 예상 시간(s)
+ "speed": None, # 다운로드 속도(bytes/s)
+ }
+
+ def start(self):
+ if self.status != Status.READY:
+ return False
+ self._thread = Thread(target=self.run)
+ self._thread.start()
+ return True
+
+ def run(self):
+ # import youtube_dl
+ youtube_dl = __import__(youtube_dl_package)
+
+ try:
+ self.start_time = datetime.now()
+ self.status = Status.START
+ # 동영상 정보 가져오기
+ info_dict = MyYoutubeDL.get_info_dict(
+ self.url,
+ self.opts.get("proxy"),
+ self.opts.get("cookiefile"),
+ self.opts.get("http_headers"),
+ )
+ if info_dict is None:
+ self.status = Status.ERROR
+ return
+ self.info_dict["extractor"] = info_dict["extractor"]
+ self.info_dict["title"] = info_dict.get("title", info_dict["id"])
+ self.info_dict["uploader"] = info_dict.get("uploader", "")
+ self.info_dict["uploader_url"] = info_dict.get("uploader_url", "")
+ ydl_opts = {
+ "logger": MyLogger(),
+ "progress_hooks": [self.my_hook],
+ # 'match_filter': self.match_filter_func,
+ "outtmpl": os.path.join(self.temp_path, self.filename),
+ "ignoreerrors": True,
+ "cachedir": False,
+ }
+ ydl_opts.update(self.opts)
+ with youtube_dl.YoutubeDL(ydl_opts) as ydl:
+ logger.debug(self.url)
+ error_code = ydl.download([self.url])
+ logger.debug(error_code)
+ if self.status in (Status.START, Status.FINISHED): # 다운로드 성공
+ for i in glob(self.temp_path + "/**/*", recursive=True):
+ path = i.replace(self.temp_path, self.save_path, 1)
+ if os.path.isdir(i):
+ if not os.path.isdir(path):
+ os.mkdir(path)
+ continue
+ celery_shutil.move(i, path)
+ self.status = Status.COMPLETED
+ except Exception as error:
+ self.status = Status.ERROR
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+ finally:
+ # 임시폴더 삭제
+ celery_shutil.rmtree(self.temp_path)
+ if self.status != Status.STOP:
+ self.end_time = datetime.now()
+
+ def stop(self):
+ if self.status in (Status.ERROR, Status.STOP, Status.COMPLETED):
+ return False
+ self.status = Status.STOP
+ self.end_time = datetime.now()
+ return True
+
+ @staticmethod
+ def get_version():
+ # from youtube_dl.version import __version__
+ __version__ = __import__(
+ f"{youtube_dl_package}.version", fromlist=["__version__"]
+ ).__version__
+
+ return __version__
+
+ @staticmethod
+ def get_info_dict(url, proxy=None, cookiefile=None, http_headers=None):
+ # import youtube_dl
+ youtube_dl = __import__(youtube_dl_package)
+
+ try:
+ ydl_opts = {"extract_flat": "in_playlist", "logger": MyLogger()}
+ if proxy:
+ ydl_opts["proxy"] = proxy
+ if cookiefile:
+ ydl_opts["cookiefile"] = cookiefile
+ if http_headers:
+ ydl_opts["http_headers"] = http_headers
+ with youtube_dl.YoutubeDL(ydl_opts) as ydl:
+ # info = ydl.extract_info(url, download=False)
+ info = ydl.extract_info(url, download=False)
+ except Exception as error:
+ logger.error("Exception:%s", error)
+ logger.error(traceback.format_exc())
+ return None
+ return ydl.sanitize_info(info)
+
+ def my_hook(self, data):
+ if self.status != Status.STOP:
+ self.status = {
+ "downloading": Status.DOWNLOADING,
+ "error": Status.ERROR,
+ "finished": Status.FINISHED, # 다운로드 완료. 변환 시작
+ }[data["status"]]
+ if data["status"] != "error":
+ self.filename = os.path.basename(data.get("filename"))
+ self.progress_hooks["downloaded_bytes"] = data.get("downloaded_bytes")
+ self.progress_hooks["total_bytes"] = data.get("total_bytes")
+ self.progress_hooks["eta"] = data.get("eta")
+ self.progress_hooks["speed"] = data.get("speed")
+
+ def match_filter_func(self, info_dict):
+ self.info_dict["playlist_index"] = info_dict["playlist_index"]
+ self.info_dict["duration"] = info_dict["duration"]
+ self.info_dict["format"] = info_dict["format"]
+ self.info_dict["thumbnail"] = info_dict["thumbnail"]
+ return None
+
+ @property
+ def status(self):
+ return self._status
+
+ @status.setter
+ def status(self, value):
+ from .main import LogicMain
+
+ self._status = value
+ LogicMain.socketio_emit("status", self)
+
+
+class MyLogger:
+ def debug(self, msg):
+ if msg.find(" ETA ") != -1:
+ # 과도한 로그 방지
+ return
+ logger.debug(msg)
+
+ def warning(self, msg):
+ logger.warning(msg)
+
+ def error(self, msg):
+ logger.error(msg)
diff --git a/my_youtube_dl.py b/my_youtube_dl.py
index 2544422..17d221f 100644
--- a/my_youtube_dl.py
+++ b/my_youtube_dl.py
@@ -7,21 +7,19 @@ from glob import glob
from datetime import datetime
from threading import Thread
from enum import Enum
+from typing import Optional, Dict, List, Any, Tuple, Union
-from framework import celery as celery_shutil
-
-# from .plugin import Plugin
-
+import shutil as celery_shutil
from .setup import P
logger = P.logger
ModelSetting = P.ModelSetting
-youtube_dl_package = P.youtube_dl_packages[
- int(ModelSetting.get("youtube_dl_package"))
- if ModelSetting.get("youtube_dl_package")
- else 1 # LogicMain.db_default["youtube_dl_package"]
-].replace("-", "_")
+# yt-dlp 패키지 설정 (기본값 index 1)
+package_idx: str = ModelSetting.get("youtube_dl_package")
+if not package_idx:
+ package_idx = "1"
+youtube_dl_package: str = P.youtube_dl_packages[int(package_idx)].replace("-", "_")
class Status(Enum):
@@ -33,29 +31,29 @@ class Status(Enum):
STOP = 5
COMPLETED = 6
- def __str__(self):
- str_list = ["준비", "분석중", "다운로드중", "실패", "변환중", "중지", "완료"]
+ def __str__(self) -> str:
+ str_list: List[str] = ["준비", "분석중", "다운로드중", "실패", "변환중", "중지", "완료"]
return str_list[self.value]
class MyYoutubeDL:
- DEFAULT_FILENAME = "%(title)s-%(id)s.%(ext)s"
+ DEFAULT_FILENAME: str = "%(title)s-%(id)s.%(ext)s"
- _index = 0
+ _index: int = 0
def __init__(
self,
- plugin,
- type_name,
- url,
- filename,
- temp_path,
- save_path=None,
- opts=None,
- dateafter=None,
- datebefore=None,
+ plugin: str,
+ type_name: str,
+ url: str,
+ filename: str,
+ temp_path: str,
+ save_path: Optional[str] = None,
+ opts: Optional[Dict[str, Any]] = None,
+ dateafter: Optional[str] = None,
+ datebefore: Optional[str] = None,
):
- # from youtube_dl.utils import DateRange
+ # yt-dlp/youtube-dl의 utils 모듈에서 DateRange 임포트
DateRange = __import__(
f"{youtube_dl_package}.utils", fromlist=["DateRange"]
).DateRange
@@ -64,90 +62,112 @@ class MyYoutubeDL:
save_path = temp_path
if opts is None:
opts = {}
- self.plugin = plugin
- self.type = type_name
- self.url = url
- self.filename = filename
+
+ self.plugin: str = plugin
+ self.type: str = type_name
+ self.url: str = url
+ self.filename: str = filename
+
+ # 임시 폴더 생성
if not os.path.isdir(temp_path):
os.makedirs(temp_path)
- self.temp_path = tempfile.mkdtemp(prefix="youtube-dl_", dir=temp_path)
+ self.temp_path: str = tempfile.mkdtemp(prefix="youtube-dl_", dir=temp_path)
+
+ # 저장 폴더 생성
if not os.path.isdir(save_path):
os.makedirs(save_path)
- self.save_path = save_path
- self.opts = opts
+ self.save_path: str = save_path
+
+ self.opts: Dict[str, Any] = opts
if dateafter or datebefore:
self.opts["daterange"] = DateRange(start=dateafter, end=datebefore)
- self.index = MyYoutubeDL._index
+
+ self.index: int = MyYoutubeDL._index
MyYoutubeDL._index += 1
- self._status = Status.READY
- self._thread = None
- self.key = None
- self.start_time = None # 시작 시간
- self.end_time = None # 종료 시간
- # info_dict에서 얻는 정보
- self.info_dict = {
- "extractor": None, # 타입
- "title": None, # 제목
- "uploader": None, # 업로더
- "uploader_url": None, # 업로더 주소
+
+ self._status: Status = Status.READY
+ self._thread: Optional[Thread] = None
+ self.key: Optional[str] = None
+ self.start_time: Optional[datetime] = None
+ self.end_time: Optional[datetime] = None
+
+ # 비디오 정보
+ self.info_dict: Dict[str, Optional[str]] = {
+ "extractor": None,
+ "title": None,
+ "uploader": None,
+ "uploader_url": None,
}
- # info_dict에서 얻는 정보(entries)
- # self.info_dict['playlist_index'] = None
- # self.info_dict['duration'] = None # 길이
- # self.info_dict['format'] = None # 포맷
- # self.info_dict['thumbnail'] = None # 썸네일
- # progress_hooks에서 얻는 정보
- self.progress_hooks = {
- "downloaded_bytes": None, # 다운로드한 크기
- "total_bytes": None, # 전체 크기
- "eta": None, # 예상 시간(s)
- "speed": None, # 다운로드 속도(bytes/s)
+
+ # 진행률 정보
+ self.progress_hooks: Dict[str, Optional[Union[int, float, str]]] = {
+ "downloaded_bytes": None,
+ "total_bytes": None,
+ "eta": None,
+ "speed": None,
}
- def start(self):
+ def start(self) -> bool:
+ """다운로드 스레드 시작"""
if self.status != Status.READY:
return False
self._thread = Thread(target=self.run)
self._thread.start()
return True
- def run(self):
- # import youtube_dl
+ def run(self) -> None:
+ """다운로드 실행 본체"""
youtube_dl = __import__(youtube_dl_package)
try:
self.start_time = datetime.now()
self.status = Status.START
- # 동영상 정보 가져오기
+
+ # 정보 추출
info_dict = MyYoutubeDL.get_info_dict(
self.url,
self.opts.get("proxy"),
self.opts.get("cookiefile"),
self.opts.get("http_headers"),
+ self.opts.get("cookiesfrombrowser"),
)
+
if info_dict is None:
self.status = Status.ERROR
return
- self.info_dict["extractor"] = info_dict["extractor"]
- self.info_dict["title"] = info_dict.get("title", info_dict["id"])
+
+ self.info_dict["extractor"] = info_dict.get("extractor", "unknown")
+ self.info_dict["title"] = info_dict.get("title", info_dict.get("id", "unknown"))
self.info_dict["uploader"] = info_dict.get("uploader", "")
self.info_dict["uploader_url"] = info_dict.get("uploader_url", "")
- ydl_opts = {
+
+ ydl_opts: Dict[str, Any] = {
"logger": MyLogger(),
"progress_hooks": [self.my_hook],
- # 'match_filter': self.match_filter_func,
"outtmpl": os.path.join(self.temp_path, self.filename),
"ignoreerrors": True,
"cachedir": False,
+ "nocheckcertificate": True,
}
+
+ # yt-dlp 전용 성능 향상 옵션
+ if youtube_dl_package == "yt_dlp":
+ ydl_opts.update({
+ "concurrent_fragment_downloads": 5,
+ "retries": 10,
+ })
+
ydl_opts.update(self.opts)
+
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
- logger.debug(self.url)
- error_code = ydl.download([self.url])
- logger.debug(error_code)
- if self.status in (Status.START, Status.FINISHED): # 다운로드 성공
+ logger.debug(f"다운로드 시작: {self.url}")
+ error_code: int = ydl.download([self.url])
+ logger.debug(f"다운로드 종료 (코드: {error_code})")
+
+ if self.status in (Status.START, Status.FINISHED, Status.DOWNLOADING):
+ # 임시 폴더의 파일을 실제 저장 경로로 이동
for i in glob(self.temp_path + "/**/*", recursive=True):
- path = i.replace(self.temp_path, self.save_path, 1)
+ path: str = i.replace(self.temp_path, self.save_path, 1)
if os.path.isdir(i):
if not os.path.isdir(path):
os.mkdir(path)
@@ -156,15 +176,16 @@ class MyYoutubeDL:
self.status = Status.COMPLETED
except Exception as error:
self.status = Status.ERROR
- logger.error("Exception:%s", error)
+ logger.error(f"실행 중 예외 발생: {error}")
logger.error(traceback.format_exc())
finally:
- # 임시폴더 삭제
- celery_shutil.rmtree(self.temp_path)
+ if os.path.exists(self.temp_path):
+ celery_shutil.rmtree(self.temp_path)
if self.status != Status.STOP:
self.end_time = datetime.now()
- def stop(self):
+ def stop(self) -> bool:
+ """다운로드 중지"""
if self.status in (Status.ERROR, Status.STOP, Status.COMPLETED):
return False
self.status = Status.STOP
@@ -172,78 +193,109 @@ class MyYoutubeDL:
return True
@staticmethod
- def get_version():
- # from youtube_dl.version import __version__
- __version__ = __import__(
+ def get_preview_url(url: str) -> Optional[str]:
+ """미리보기용 직접 재생 가능한 URL 추출"""
+ youtube_dl = __import__(youtube_dl_package)
+ try:
+ # 미리보기를 위해 포맷 필터링 (mp4, 비디오+오디오 권장)
+ ydl_opts: Dict[str, Any] = {
+ "format": "best[ext=mp4]/best",
+ "logger": MyLogger(),
+ "nocheckcertificate": True,
+ "quiet": True,
+ }
+ with youtube_dl.YoutubeDL(ydl_opts) as ydl:
+ info: Dict[str, Any] = ydl.extract_info(url, download=False)
+ return info.get("url")
+ except Exception as error:
+ logger.error(f"미리보기 URL 추출 중 예외 발생: {error}")
+ return None
+
+ @staticmethod
+ def get_version() -> str:
+ """라이브러리 버전 확인"""
+ __version__: str = __import__(
f"{youtube_dl_package}.version", fromlist=["__version__"]
).__version__
-
return __version__
@staticmethod
- def get_info_dict(url, proxy=None, cookiefile=None, http_headers=None):
- # import youtube_dl
+ def get_info_dict(
+ url: str,
+ proxy: Optional[str] = None,
+ cookiefile: Optional[str] = None,
+ http_headers: Optional[Dict[str, str]] = None,
+ cookiesfrombrowser: Optional[str] = None
+ ) -> Optional[Dict[str, Any]]:
+ """비디오 메타데이터 정보 추출"""
youtube_dl = __import__(youtube_dl_package)
try:
- ydl_opts = {"extract_flat": "in_playlist", "logger": MyLogger()}
+ ydl_opts: Dict[str, Any] = {
+ "extract_flat": "in_playlist",
+ "logger": MyLogger(),
+ "nocheckcertificate": True,
+ }
if proxy:
ydl_opts["proxy"] = proxy
if cookiefile:
ydl_opts["cookiefile"] = cookiefile
if http_headers:
ydl_opts["http_headers"] = http_headers
+ if cookiesfrombrowser:
+ ydl_opts["cookiesfrombrowser"] = (cookiesfrombrowser, None, None, None)
+
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
- # info = ydl.extract_info(url, download=False)
- info = ydl.extract_info(url, download=False)
+ info: Dict[str, Any] = ydl.extract_info(url, download=False)
except Exception as error:
- logger.error("Exception:%s", error)
+ logger.error(f"정보 추출 중 예외 발생: {error}")
logger.error(traceback.format_exc())
return None
return ydl.sanitize_info(info)
- def my_hook(self, data):
+ def my_hook(self, data: Dict[str, Any]) -> None:
+ """진행률 업데이트 훅"""
if self.status != Status.STOP:
- self.status = {
- "downloading": Status.DOWNLOADING,
- "error": Status.ERROR,
- "finished": Status.FINISHED, # 다운로드 완료. 변환 시작
- }[data["status"]]
+ if data["status"] == "downloading":
+ self.status = Status.DOWNLOADING
+ elif data["status"] == "error":
+ self.status = Status.ERROR
+ elif data["status"] == "finished":
+ self.status = Status.FINISHED
+
if data["status"] != "error":
- self.filename = os.path.basename(data.get("filename"))
+ self.filename = os.path.basename(data.get("filename", self.filename))
self.progress_hooks["downloaded_bytes"] = data.get("downloaded_bytes")
- self.progress_hooks["total_bytes"] = data.get("total_bytes")
+ self.progress_hooks["total_bytes"] = data.get("total_bytes") or data.get("total_bytes_estimate")
self.progress_hooks["eta"] = data.get("eta")
self.progress_hooks["speed"] = data.get("speed")
- def match_filter_func(self, info_dict):
- self.info_dict["playlist_index"] = info_dict["playlist_index"]
- self.info_dict["duration"] = info_dict["duration"]
- self.info_dict["format"] = info_dict["format"]
- self.info_dict["thumbnail"] = info_dict["thumbnail"]
- return None
-
@property
- def status(self):
+ def status(self) -> Status:
return self._status
@status.setter
- def status(self, value):
- from .main import LogicMain
-
+ def status(self, value: Status) -> None:
self._status = value
- LogicMain.socketio_emit("status", self)
+ # Socket.IO를 통한 상태 업데이트 전송
+ try:
+ basic_module = P.get_module('basic')
+ if basic_module:
+ basic_module.socketio_emit("status", self)
+ except Exception as e:
+ logger.error(f"SocketIO 전송 에러: {e}")
class MyLogger:
- def debug(self, msg):
- if msg.find(" ETA ") != -1:
- # 과도한 로그 방지
+ """yt-dlp의 로그를 가로채서 처리하는 클래성"""
+ def debug(self, msg: str) -> None:
+ # 진행 상황 관련 로그는 걸러냄
+ if " ETA " in msg or "at" in msg and "B/s" in msg:
return
logger.debug(msg)
- def warning(self, msg):
+ def warning(self, msg: str) -> None:
logger.warning(msg)
- def error(self, msg):
+ def error(self, msg: str) -> None:
logger.error(msg)
diff --git a/setup.py b/setup.py
index 0a99a2a..c3e5690 100644
--- a/setup.py
+++ b/setup.py
@@ -12,25 +12,15 @@ __menu = {
"list": [
{
"uri": "basic",
- "name": "기본 처리",
+ "name": "유튜브",
"list": [
{
"uri": "setting",
"name": "설정",
},
- ],
- },
- {
- "uri": "basic/download",
- "name": "다운로드",
- },
- {
- "uri": "download",
- "name": "다운로드",
- "list": [
{
- "uri": "basic",
- "name": "다운로드",
+ "uri": "download",
+ "name": "직접 다운로드",
},
],
},
diff --git a/static/youtube-dl_download.js b/static/youtube-dl_download.js
index f7ec53d..97eca86 100644
--- a/static/youtube-dl_download.js
+++ b/static/youtube-dl_download.js
@@ -64,6 +64,52 @@
return;
}
- post_ajax('/download', get_formdata('#download'));
+ post_ajax('/basic/download', getFormdata('#download'));
+ });
+
+ // Artplayer 미리보기 로직
+ let art = null;
+ let last_preview_url = '';
+
+ const init_artplayer = (video_url) => {
+ const wrapper = document.getElementById('player-wrapper');
+ wrapper.style.display = 'block';
+
+ if (art) {
+ art.switchUrl(video_url);
+ return;
+ }
+
+ art = new Artplayer({
+ container: '#player-wrapper',
+ url: video_url,
+ autoplay: false,
+ pip: true,
+ setting: true,
+ flip: true,
+ playbackRate: true,
+ aspectRatio: true,
+ fullscreen: true,
+ fullscreenWeb: true,
+ miniProgressBar: true,
+ mutex: true,
+ backdrop: true,
+ playsInline: true,
+ autoPlayback: false,
+ airplay: true,
+ theme: '#23ade5',
+ });
+ };
+
+ url.addEventListener('change', () => {
+ const target_url = url.value.trim();
+ if (target_url && target_url.startsWith('http') && target_url !== last_preview_url) {
+ last_preview_url = target_url;
+ post_ajax('/basic/preview', { url: target_url }).then((ret) => {
+ if (ret.ret === 'success' && ret.data) {
+ init_artplayer(ret.data);
+ }
+ });
+ }
});
})();
diff --git a/static/youtube-dl_list.js b/static/youtube-dl_list.js
index 61b7583..fdcb539 100644
--- a/static/youtube-dl_list.js
+++ b/static/youtube-dl_list.js
@@ -128,14 +128,14 @@
});
const reload_list = async () => {
- const { data } = await post_ajax('/list');
+ const { data } = await post_ajax('/basic/list');
list_tbody.innerHTML = data.map((item) => make_item(item)).join('');
};
// 전체 중지
all_stop_btn.addEventListener('click', (event) => {
event.preventDefault();
- post_ajax('/all_stop').then(reload_list);
+ post_ajax('/basic/all_stop').then(reload_list);
});
// 중지
@@ -145,7 +145,7 @@
if (!target.classList.contains('youtubeDl-stop')) {
return;
}
- post_ajax('/stop', {
+ post_ajax('/basic/stop', {
index: target.dataset.index,
}).then(reload_list);
});
diff --git a/static/youtube-dl_setting.js b/static/youtube-dl_setting.js
index bc26bfb..017cb09 100644
--- a/static/youtube-dl_setting.js
+++ b/static/youtube-dl_setting.js
@@ -49,7 +49,7 @@
ffmpeg = 'ffmpeg';
}
- post_ajax('/ffmpeg_version', {
+ post_ajax('/basic/ffmpeg_version', {
path: ffmpeg,
}).then(({ data }) => {
modal_title.innerHTML = `${ffmpeg} -version`;
diff --git a/static/youtube-dl_sub.js b/static/youtube-dl_sub.js
index c81be07..c091fef 100644
--- a/static/youtube-dl_sub.js
+++ b/static/youtube-dl_sub.js
@@ -47,6 +47,6 @@
return;
}
- post_ajax('/sub', get_formdata('#download'));
+ post_ajax('/basic/sub', get_formdata('#download'));
});
})();
diff --git a/static/youtube-dl_thumbnail.js b/static/youtube-dl_thumbnail.js
index c81f420..cd17f16 100644
--- a/static/youtube-dl_thumbnail.js
+++ b/static/youtube-dl_thumbnail.js
@@ -42,6 +42,6 @@
return;
}
- post_ajax('/thumbnail', get_formdata('#download'));
+ post_ajax('/basic/thumbnail', get_formdata('#download'));
});
})();
diff --git a/templates/youtube-dl_basic_download.html b/templates/youtube-dl_basic_download.html
index 465fa6b..785bbc0 100644
--- a/templates/youtube-dl_basic_download.html
+++ b/templates/youtube-dl_basic_download.html
@@ -30,6 +30,21 @@
{% endmacro %}
{% block content %}
+
+
+