"""
yt-dlp + aria2c downloader.

- Hands the transfer to aria2c (multi-connection) when aria2c is installed,
  otherwise falls back to yt-dlp's native downloader
- Intended for YouTube and other sites supported by yt-dlp
"""
import os
import re
import shutil
import subprocess
import traceback
from typing import Dict, Any, Optional, Callable, List, Tuple

from .base import BaseDownloader

# Borrow the logger from the parent module. When that import is not available,
# fall back to stdlib logging and leave P as None so that
# later settings look-ups degrade gracefully instead of raising NameError.
try:
    from ..setup import P
    logger = P.logger
except Exception:
    import logging
    P = None
    logger = logging.getLogger(__name__)


# Rate strings such as "6MB", "1.5 MiB", "500K".
_RATE_RE = re.compile(r'^(\d+(?:\.\d+)?)\s*([KMG])(?:I?B)?$')

# yt-dlp default progress line: "[download]  45.3% of ... at 1.2MiB/s ETA 00:05"
_YTDLP_PERCENT_RE = re.compile(r'\[download\]\s+(\d+\.?\d*)%')
_YTDLP_SPEED_RE = re.compile(r'at\s+([\d.]+\s*[KMG]?i?B/s)')
_YTDLP_ETA_RE = re.compile(r'ETA\s+([\d:]+)')

# aria2c summary line: "[#abc 1.2MiB/10MiB(12%) CN:16 DL:3.4MiB ETA:2s]"
_ARIA2_PERCENT_RE = re.compile(r'\(\s*([\d.]+)%\)')
_ARIA2_SPEED_RE = re.compile(r'DL:(\S+)')
_ARIA2_ETA_RE = re.compile(r'ETA:(\S+)')

# "[Merger] Merging formats into "/path/file.mp4""
_MERGE_TARGET_RE = re.compile(r'\binto\s+(.+)$')

# WebVTT cue timing line; the hour field is optional in VTT.
_VTT_TIMESTAMP = r'(?:(\d+):)?(\d{2}):(\d{2})[.,](\d{3})'
_VTT_TIMING_RE = re.compile(r'^' + _VTT_TIMESTAMP + r'\s+-->\s+' + _VTT_TIMESTAMP)

# Locations probed when ffmpeg is not on PATH.
_FFMPEG_FALLBACK_PATHS = (
    '/opt/homebrew/bin/ffmpeg',
    '/usr/local/bin/ffmpeg',
    '/usr/bin/ffmpeg',
)


def _get_setting(key: str) -> Any:
    """Read a value from P.ModelSetting, or return None when P is unavailable."""
    model_setting = getattr(P, 'ModelSetting', None)
    if model_setting is None:
        return None
    return model_setting.get(key)


class YtdlpAria2Downloader(BaseDownloader):
    """Downloader that drives the yt-dlp CLI, optionally with aria2c."""

    def __init__(self):
        super().__init__()
        # Handle of the running yt-dlp process (None until download() starts).
        self._process: Optional[subprocess.Popen] = None

    # ------------------------------------------------------------------
    # Small parsing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_rate(raw_rate: Any) -> str:
        """Normalize a rate-limit string (e.g. 6MB -> 6M, 0/None -> '')."""
        if raw_rate is None:
            return ''
        value = str(raw_rate).strip().upper()
        if not value or value in ('0', '0B', 'UNLIMITED'):
            return ''
        m = _RATE_RE.match(value)
        if m:
            return f'{m.group(1)}{m.group(2)}'
        # Unknown shape: pass it through unchanged.
        return value

    @staticmethod
    def _parse_template_progress(line: str) -> Optional[Tuple[int, str, str]]:
        """Parse a 'GDM_PROGRESS:<pct>:<speed>:<eta>' line.

        Returns (percent, speed, eta) or None when the percentage is not a
        number. 'N/A' fields become 0 / ''.
        """
        # Split at most twice: the ETA itself contains ':' (e.g. "00:05").
        fields = line.split('GDM_PROGRESS:', 1)[1].split(':', 2)
        pct_text = fields[0].strip().replace('%', '').strip()
        try:
            percent = int(float(pct_text)) if pct_text and pct_text != 'N/A' else 0
        except (ValueError, OverflowError):
            return None
        speed = fields[1].strip() if len(fields) > 1 else ''
        eta = fields[2].strip() if len(fields) > 2 else ''
        if speed == 'N/A':
            speed = ''
        if eta == 'N/A':
            eta = ''
        return percent, speed, eta

    @staticmethod
    def _parse_aria2_progress(line: str) -> Optional[Tuple[int, str, str]]:
        """Parse an aria2c summary line; return (percent, speed, eta) or None."""
        pct_match = _ARIA2_PERCENT_RE.search(line)
        if not pct_match or not (('DL:' in line) or ('CN:' in line)):
            return None
        try:
            percent = int(float(pct_match.group(1)))
        except (ValueError, OverflowError) as e:
            logger.error(f'Parsing Error: {e}')
            return None
        speed_match = _ARIA2_SPEED_RE.search(line)
        eta_match = _ARIA2_ETA_RE.search(line)
        # The last field of the summary carries the closing ']' of the line.
        speed = speed_match.group(1).rstrip(']') if speed_match else ''
        eta = eta_match.group(1).rstrip(']') if eta_match else ''
        return percent, speed, eta

    @staticmethod
    def _extract_output_path(line: str) -> str:
        """Return the file path announced by a yt-dlp status line, or ''.

        Handles '... Destination: <path>' and '[Merger] ... into "<path>"'.
        Candidates whose basename has no '.' are rejected.
        """
        if 'Destination:' in line:
            candidate = line.split('Destination:', 1)[1]
        elif '[Merger]' in line or '[VideoConvertor]' in line:
            target = _MERGE_TARGET_RE.search(line)
            if not target:
                return ''
            candidate = target.group(1)
        else:
            return ''
        candidate = candidate.strip().strip('"\'')
        return candidate if '.' in os.path.basename(candidate) else ''

    @staticmethod
    def _dispatch_metadata(line: str, info_callback: Optional[Callable]) -> None:
        """Forward a GDM_FIX title/thumbnail line to info_callback (best effort)."""
        if 'GDM_FIX:title:' in line:
            payload = {'title': line.split('GDM_FIX:title:', 1)[1].strip()}
        elif 'GDM_FIX:thumb:' in line:
            payload = {'thumbnail': line.split('GDM_FIX:thumb:', 1)[1].strip()}
        else:
            return
        if not info_callback:
            return
        try:
            info_callback(payload)
        except Exception as e:
            # Metadata updates are cosmetic; never let them abort a download.
            logger.debug(f'[GDM] info_callback failed: {e}')

    @staticmethod
    def _safe_progress(progress_callback: Optional[Callable],
                       percent: int, speed: str, eta: str) -> None:
        """Invoke progress_callback, logging (not raising) any failure."""
        if not progress_callback:
            return
        try:
            progress_callback(percent, speed, eta)
        except Exception as e:
            logger.debug(f'[GDM] progress_callback failed: {e}')

    # ------------------------------------------------------------------
    # Command construction
    # ------------------------------------------------------------------
    @staticmethod
    def _resolve_ffmpeg(options: Dict[str, Any]) -> Optional[str]:
        """Pick the ffmpeg location: option, setting, PATH, then known paths."""
        ffmpeg_path = options.get('ffmpeg_path') or _get_setting('ffmpeg_path')
        if not ffmpeg_path or ffmpeg_path == 'ffmpeg':
            detected_path = shutil.which('ffmpeg')
            if detected_path:
                ffmpeg_path = detected_path
            else:
                for candidate in _FFMPEG_FALLBACK_PATHS:
                    if os.path.exists(candidate):
                        ffmpeg_path = candidate
                        break
        return ffmpeg_path

    def _build_command(self, url: str, save_path: str,
                       filename: Optional[str], options: Dict[str, Any]) -> List[str]:
        """Assemble the yt-dlp argument list for one download."""
        # Output template ('outtmpl' option takes precedence over filename).
        raw_outtmpl = options.get('outtmpl') or filename or '%(title)s.%(ext)s'
        # Join with the save directory, make absolute and collapse duplicate
        # separators regardless of platform.
        output_template = os.path.normpath(
            os.path.abspath(os.path.join(save_path, raw_outtmpl))
        )

        cmd = [
            'yt-dlp',
            '--newline',  # one progress update per line, for parsing
            '--no-check-certificate',
            '-o', output_template,
        ]

        # Extra output used to update title/thumbnail (GDM_FIX markers).
        cmd.extend(['--print', 'before_dl:GDM_FIX:title:%(title)s'])
        cmd.extend(['--print', 'before_dl:GDM_FIX:thumb:%(thumbnail)s'])

        # Speed limit: per-call options win over the stored setting.
        max_rate = self._normalize_rate(
            options.get('effective_max_download_rate')
            or options.get('max_download_rate')
            or _get_setting('max_download_rate')
        )
        rate_limited = bool(max_rate)

        # Use aria2c when it is installed.
        aria2c_path = options.get('aria2c_path', 'aria2c')
        connections = options.get('connections', 4)
        if self._check_aria2c(aria2c_path):
            cmd.extend(['--external-downloader', aria2c_path])
            # -x connections, -s splits, -j parallel, -k piece size;
            # --console-log-level=notice keeps the progress summary visible.
            aria2_args = f'aria2c:-x{connections} -s{connections} -j{connections} -k1M --summary-interval=1 --console-log-level=notice'
            if rate_limited:
                aria2_args = f'{aria2_args} --max-download-limit={max_rate}'
            cmd.extend(['--external-downloader-args', aria2_args])
            logger.info(f'[GDM] Using aria2c for multi-threaded download (connections: {connections})')

        # Progress template (yt-dlp native downloader).
        cmd.extend(['--progress-template', 'download:GDM_PROGRESS:%(progress._percent_str)s:%(progress._speed_str)s:%(progress._eta_str)s'])

        # Limit for the native downloader (no external downloader / auxiliary path).
        if rate_limited:
            cmd.extend(['--limit-rate', max_rate])
            if options.get('is_global_rate_split'):
                logger.info(f'[GDM] global split limit enabled: {max_rate}/s per task')
            else:
                logger.info(f'[GDM] download speed limit enabled: {max_rate}/s')

        # Format selection.
        format_spec = options.get('format')
        if not format_spec:
            if options.get('extract_audio'):
                format_spec = 'bestaudio/best'
            else:
                format_spec = 'bestvideo+bestaudio/best'
        cmd.extend(['-f', format_spec])

        # Merge container (video downloads only).
        if not options.get('extract_audio'):
            merge_format = options.get('merge_output_format', 'mp4')
            cmd.extend(['--merge-output-format', merge_format])

        # Cookie file.
        if options.get('cookiefile'):
            cmd.extend(['--cookies', options['cookiefile']])

        # Proxy.
        if options.get('proxy'):
            cmd.extend(['--proxy', options['proxy']])

        # Extra HTTP headers (Referer etc., to avoid redirects on some sites).
        if options.get('headers'):
            for key, value in options['headers'].items():
                cmd.extend(['--add-header', f'{key}:{value}'])

        # Auto-detect and pass the FFmpeg location.
        ffmpeg_path = self._resolve_ffmpeg(options)
        if ffmpeg_path:
            cmd.extend(['--ffmpeg-location', ffmpeg_path])
            logger.debug(f'[GDM] 감지된 FFmpeg 경로: {ffmpeg_path}')

        # Additional raw arguments (must be a list to be honoured).
        extra_args = options.get('extra_args', [])
        if isinstance(extra_args, list):
            cmd.extend(extra_args)

        if options.get('extract_audio'):
            cmd.append('--extract-audio')
            if options.get('audio_format'):
                cmd.extend(['--audio-format', options['audio_format']])

        if options.get('embed_thumbnail'):
            cmd.append('--embed-thumbnail')

        if options.get('add_metadata'):
            cmd.append('--add-metadata')

        # The URL goes last.
        cmd.append(url)
        return cmd

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def download(
        self,
        url: str,
        save_path: str,
        filename: Optional[str] = None,
        progress_callback: Optional[Callable] = None,
        info_callback: Optional[Callable] = None,
        **options
    ) -> Dict[str, Any]:
        """Download *url* with yt-dlp (using aria2c when available).

        Args:
            url: Media URL handed to yt-dlp.
            save_path: Target directory; created if missing.
            filename: Output template/filename, used when 'outtmpl' is not given.
            progress_callback: Called as (percent:int, speed:str, eta:str).
            info_callback: Called with {'title': ...} or {'thumbnail': ...}.
            **options: Recognised keys: outtmpl, effective_max_download_rate,
                max_download_rate, is_global_rate_split, aria2c_path,
                connections, format, extract_audio, audio_format,
                merge_output_format, cookiefile, proxy, headers, ffmpeg_path,
                extra_args, embed_thumbnail, add_metadata, subtitles.

        Returns:
            {'success': True, 'filepath': <str>} on exit code 0, otherwise
            {'success': False, 'error': <str>} ('Cancelled' when cancelled).
        """
        try:
            os.makedirs(save_path, exist_ok=True)

            cmd = self._build_command(url, save_path, filename, options)
            logger.info(f'[GDM] yt-dlp command: {" ".join(cmd)}')

            # errors='replace' keeps an undecodable byte in the tool output
            # from aborting the whole download.
            self._process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                errors='replace',
                bufsize=1
            )

            final_filepath = ''
            last_logged_pct = -1

            for line in self._process.stdout:
                if self._cancelled:
                    # The finally-clause terminates and reaps the process.
                    return {'success': False, 'error': 'Cancelled'}

                line = line.strip()
                if not line:
                    continue

                # Metadata lines (GDM_FIX).
                if 'GDM_FIX:' in line:
                    self._dispatch_metadata(line, info_callback)

                # Progress via the GDM_PROGRESS template (preferred).
                if 'GDM_PROGRESS:' in line:
                    parsed = self._parse_template_progress(line)
                    if parsed is not None:
                        if parsed[0] > 0:
                            self._safe_progress(progress_callback, *parsed)
                        continue

                # Progress via yt-dlp's default output; log only every 5%.
                progress_match = _YTDLP_PERCENT_RE.search(line)
                should_log = True
                if progress_match:
                    pct = float(progress_match.group(1))
                    if int(pct) >= last_logged_pct + 5 or pct >= 99.9:
                        last_logged_pct = int(pct)
                    else:
                        should_log = False

                if should_log:
                    logger.info(f'[GDM][yt-dlp] {line}')

                # Progress via aria2c's summary output.
                if not progress_match:
                    aria2_progress = self._parse_aria2_progress(line)
                    if aria2_progress is not None:
                        self._safe_progress(progress_callback, *aria2_progress)
                        continue

                if progress_match and progress_callback:
                    progress = int(float(progress_match.group(1)))
                    speed_match = _YTDLP_SPEED_RE.search(line)
                    speed = speed_match.group(1) if speed_match else ''
                    eta_match = _YTDLP_ETA_RE.search(line)
                    eta = eta_match.group(1) if eta_match else ''
                    progress_callback(progress, speed, eta)

                # Remember the most recently announced output file.
                announced_path = self._extract_output_path(line)
                if announced_path:
                    final_filepath = announced_path

            returncode = self._process.wait()

            if returncode == 0:
                if progress_callback:
                    progress_callback(100, '', '')

                # Optional subtitle download.
                vtt_url = options.get('subtitles')
                if vtt_url and final_filepath:
                    try:
                        self._download_subtitle(vtt_url, final_filepath, headers=options.get('headers'))
                    except Exception as e:
                        logger.error(f'[GDM] Subtitle download error: {e}')

                return {'success': True, 'filepath': final_filepath}

            # cancel() from another thread ends the stream with a non-zero exit.
            if self._cancelled:
                return {'success': False, 'error': 'Cancelled'}
            return {'success': False, 'error': f'Exit code: {returncode}'}

        except Exception as e:
            # cancel() closes the pipe under the reader, which surfaces here.
            if self._cancelled:
                return {'success': False, 'error': 'Cancelled'}
            logger.error(f'YtdlpAria2 download error: {e}')
            logger.error(traceback.format_exc())
            return {'success': False, 'error': str(e)}
        finally:
            # Never leave a running child or an open pipe behind.
            if self._process is not None:
                self._terminate_process(self._process)

    def get_info(self, url: str) -> Dict[str, Any]:
        """Extract metadata for *url* via the yt_dlp Python API.

        Returns a dict with title, thumbnail, duration, formats, uploader and
        view_count, or {} when extraction fails.
        """
        try:
            import yt_dlp

            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extract_flat': False,
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)

            if not info:
                logger.error('get_info error: no information extracted')
                return {}

            return {
                'title': info.get('title', ''),
                'thumbnail': info.get('thumbnail', ''),
                'duration': info.get('duration', 0),
                'formats': info.get('formats', []),
                'uploader': info.get('uploader', ''),
                'view_count': info.get('view_count', 0),
            }

        except Exception as e:
            logger.error(f'get_info error: {e}')
            return {}

    def cancel(self):
        """Cancel the download and stop the yt-dlp process if one is running."""
        super().cancel()
        if self._process:
            self._terminate_process(self._process)

    # ------------------------------------------------------------------
    # Process helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _terminate_process(proc: subprocess.Popen) -> None:
        """Stop *proc* if still alive (terminate, then kill) and close its pipes.

        Safe to call repeatedly and on an already finished process.
        """
        try:
            if proc.poll() is None:
                # Terminate before closing the pipes so that a reader blocked
                # on stdout is released by EOF rather than by a racing close().
                proc.terminate()
                try:
                    proc.wait(timeout=1)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    try:
                        proc.wait(timeout=1)
                    except subprocess.TimeoutExpired:
                        logger.debug('[GDM] yt-dlp process did not exit after kill')
        except OSError as e:
            logger.debug(f'[GDM] Failed to stop yt-dlp process: {e}')

        for stream in (proc.stdout, proc.stderr):
            if stream is None:
                continue
            try:
                stream.close()
            except (OSError, ValueError) as e:
                logger.debug(f'[GDM] Failed to close process pipe: {e}')

    def _check_aria2c(self, aria2c_path: str) -> bool:
        """Return True when `<aria2c_path> --version` runs and exits with 0."""
        try:
            result = subprocess.run(
                [aria2c_path, '--version'],
                capture_output=True,
                timeout=5
            )
        except (OSError, subprocess.SubprocessError, TypeError, ValueError):
            # Not installed, not executable, timed out or an unusable path.
            return False
        return result.returncode == 0

    # ------------------------------------------------------------------
    # Subtitles
    # ------------------------------------------------------------------
    def _download_subtitle(self, vtt_url: str, output_path: str, headers: Optional[dict] = None):
        """Download a subtitle and save it as '<video name>.srt' (VTT is converted).

        Returns True when the file was written, False otherwise.
        """
        try:
            import requests

            # Subtitle path: the video path with its extension replaced by .srt
            video_basename = os.path.splitext(output_path)[0]
            srt_path = video_basename + ".srt"

            logger.info(f"[GDM] Downloading subtitle from: {vtt_url}")
            response = requests.get(vtt_url, headers=headers, timeout=30)

            if response.status_code != 200:
                logger.error(f"[GDM] Failed to download subtitle: HTTP {response.status_code}")
                return False

            srt_content = self._vtt_to_srt(response.text)
            with open(srt_path, "w", encoding="utf-8") as f:
                f.write(srt_content)
            logger.info(f"[GDM] Subtitle saved to: {srt_path}")
            return True
        except Exception as e:
            logger.error(f"[GDM] Failed to download subtitle: {e}")
        return False

    @staticmethod
    def _format_srt_timing(line: str) -> str:
        """Convert a VTT cue timing line to SRT ('HH:MM:SS,mmm --> HH:MM:SS,mmm').

        Cue settings after the end timestamp are dropped and a missing hour
        field is filled with 00. Unrecognised lines fall back to a plain
        '.' -> ',' replacement.
        """
        timing = _VTT_TIMING_RE.match(line)
        if not timing:
            return line.replace(".", ",")
        sh, sm, ss, sms, eh, em, es, ems = timing.groups()
        start = f'{int(sh or 0):02d}:{sm}:{ss},{sms}'
        end = f'{int(eh or 0):02d}:{em}:{es},{ems}'
        return f'{start} --> {end}'

    def _vtt_to_srt(self, vtt_content: str) -> str:
        """Convert WebVTT text to SRT (simple conversion).

        Content that does not start with 'WEBVTT' (after an optional BOM) is
        returned unchanged. Header, NOTE and STYLE lines and cue identifiers
        are dropped; cues are renumbered from 1.
        """
        body = vtt_content.lstrip('\ufeff')
        if not body.startswith("WEBVTT"):
            return vtt_content

        lines = body.split("\n")
        srt_lines = []
        cue_index = 1
        i = 0

        while i < len(lines):
            line = lines[i].strip()

            if not line or line.startswith(("WEBVTT", "NOTE", "STYLE")):
                i += 1
                continue

            if "-->" in line:
                srt_lines.append(str(cue_index))
                srt_lines.append(self._format_srt_timing(line))
                cue_index += 1
                i += 1

                # Cue payload runs until the next blank line.
                while i < len(lines) and lines[i].strip():
                    srt_lines.append(lines[i].rstrip())
                    i += 1

                srt_lines.append("")
            else:
                # Cue identifier or other non-cue text: skip.
                i += 1

        return "\n".join(srt_lines)