From 7a087ce9c594e175d44ac3b7cddd88f8c165206b Mon Sep 17 00:00:00 2001 From: projectdx Date: Tue, 3 Mar 2026 18:36:56 +0900 Subject: [PATCH] fix: enforce max_concurrent and global speed-limit distribution --- downloader/ffmpeg_hls.py | 2 + downloader/http_direct.py | 27 ++++++++++ downloader/ytdlp_aria2.py | 25 ++++++++- info.yaml | 2 +- mod_queue.py | 111 +++++++++++++++++++++++++++++++++++++- 5 files changed, 163 insertions(+), 4 deletions(-) diff --git a/downloader/ffmpeg_hls.py b/downloader/ffmpeg_hls.py index 0d61d55..7799b6b 100644 --- a/downloader/ffmpeg_hls.py +++ b/downloader/ffmpeg_hls.py @@ -47,6 +47,8 @@ class FfmpegHlsDownloader(BaseDownloader): # ffmpeg 명령어 구성 ffmpeg_path = options.get('ffmpeg_path', 'ffmpeg') + if options.get('effective_max_download_rate') or options.get('max_download_rate'): + logger.warning('[GDM] ffmpeg_hls downloader does not support strict bandwidth cap; total limit may be approximate for HLS tasks.') cmd = [ffmpeg_path, '-y'] diff --git a/downloader/http_direct.py b/downloader/http_direct.py index c39bed0..0f00bdb 100644 --- a/downloader/http_direct.py +++ b/downloader/http_direct.py @@ -5,6 +5,8 @@ HTTP 직접 다운로더 """ import os import traceback +import re +import time from typing import Dict, Any, Optional, Callable from .base import BaseDownloader @@ -19,6 +21,21 @@ except: class HttpDirectDownloader(BaseDownloader): """HTTP 직접 다운로더""" + + @staticmethod + def _rate_to_bps(rate_value: Any) -> float: + if rate_value is None: + return 0.0 + value = str(rate_value).strip().upper() + if not value or value in ('0', 'UNLIMITED'): + return 0.0 + m = re.match(r'^(\d+(?:\.\d+)?)\s*([KMG])(?:I?B)?$', value) + if not m: + return 0.0 + num = float(m.group(1)) + unit = m.group(2) + mul = {'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3}[unit] + return num * mul def download( self, @@ -53,6 +70,9 @@ class HttpDirectDownloader(BaseDownloader): total_size = int(response.headers.get('content-length', 0)) downloaded = 0 chunk_size = 1024 * 1024 # 1MB 청크 + max_rate = options.get('effective_max_download_rate') or options.get('max_download_rate') + rate_bps = self._rate_to_bps(max_rate) + start_time = time.monotonic() with open(filepath, 'wb') as f: for chunk in response.iter_content(chunk_size=chunk_size): @@ -62,6 +82,13 @@ class HttpDirectDownloader(BaseDownloader): if chunk: f.write(chunk) downloaded += len(chunk) + + # 평균 다운로드 속도를 제한(총량 제한 분배값 포함) + if rate_bps > 0: + elapsed = max(0.001, time.monotonic() - start_time) + expected_elapsed = downloaded / rate_bps + if expected_elapsed > elapsed: + time.sleep(expected_elapsed - elapsed) if total_size > 0 and progress_callback: progress = int(downloaded / total_size * 100) diff --git a/downloader/ytdlp_aria2.py b/downloader/ytdlp_aria2.py index adce00b..16addc0 100644 --- a/downloader/ytdlp_aria2.py +++ b/downloader/ytdlp_aria2.py @@ -26,6 +26,19 @@ class YtdlpAria2Downloader(BaseDownloader): def __init__(self): super().__init__() self._process: Optional[subprocess.Popen] = None + + @staticmethod + def _normalize_rate(raw_rate: Any) -> str: + """속도 제한 문자열 정규화 (예: 6MB -> 6M, 0/None -> '')""" + if raw_rate is None: + return '' + value = str(raw_rate).strip().upper() + if not value or value in ('0', '0B', 'UNLIMITED'): + return '' + m = re.match(r'^(\d+(?:\.\d+)?)\s*([KMG])(?:I?B)?$', value) + if m: + return f'{m.group(1)}{m.group(2)}' + return value def download( self, @@ -61,8 +74,12 @@ class YtdlpAria2Downloader(BaseDownloader): cmd.extend(['--print', 'before_dl:GDM_FIX:thumb:%(thumbnail)s']) # 속도 제한 설정 - max_rate = P.ModelSetting.get('max_download_rate') - rate_limited = bool(max_rate and max_rate != '0') + max_rate = self._normalize_rate( + options.get('effective_max_download_rate') + or options.get('max_download_rate') + or P.ModelSetting.get('max_download_rate') + ) + rate_limited = bool(max_rate) # aria2c 사용 (설치되어 있으면) aria2c_path = options.get('aria2c_path', 'aria2c') @@ -83,6 +100,10 @@ class YtdlpAria2Downloader(BaseDownloader): # yt-dlp native downloader 제한 (external-downloader 미사용/보조 경로) if rate_limited: cmd.extend(['--limit-rate', max_rate]) + if options.get('is_global_rate_split'): + logger.info(f'[GDM] global split limit enabled: {max_rate}/s per task') + else: + logger.info(f'[GDM] download speed limit enabled: {max_rate}/s') # 포맷 선택 format_spec = options.get('format') diff --git a/info.yaml b/info.yaml index b753494..a76f5c9 100644 --- a/info.yaml +++ b/info.yaml @@ -1,6 +1,6 @@ title: "GDM" package_name: gommi_downloader_manager -version: '0.2.33' +version: '0.2.35' description: FlaskFarm 범용 다운로더 큐 - YouTube, 애니24, 링크애니, Anilife 지원 developer: projectdx home: https://gitea.yommi.duckdns.org/projectdx/gommi_downloader_manager diff --git a/mod_queue.py b/mod_queue.py index 844bbe8..b5fc0a0 100644 --- a/mod_queue.py +++ b/mod_queue.py @@ -5,6 +5,7 @@ import os import time import threading import traceback +import re from datetime import datetime from typing import Optional, Dict, Any, List, Callable from enum import Enum @@ -46,6 +47,8 @@ class ModuleQueue(PluginModuleBase): # 진행 중인 다운로드 인스턴스들 _downloads: Dict[str, 'DownloadTask'] = {} _queue_lock = threading.Lock() + _concurrency_sem: Optional[threading.Semaphore] = None + _concurrency_limit: int = 0 # 업데이트 체크 캐싱 _last_update_check = 0 @@ -55,6 +58,32 @@ class ModuleQueue(PluginModuleBase): from .setup import default_route_socketio_module super(ModuleQueue, self).__init__(P, name='queue', first_menu='list') default_route_socketio_module(self, attach='/queue') + self._ensure_concurrency_limit() + + @classmethod + def _ensure_concurrency_limit(cls): + """max_concurrent 설정 기반 동시 실행 슬롯 보장""" + try: + from .setup import P + configured = int(P.ModelSetting.get('max_concurrent') or 3) + except Exception: + configured = 3 + configured = max(1, configured) + + if cls._concurrency_sem is None: + cls._concurrency_sem = threading.Semaphore(configured) + cls._concurrency_limit = configured + return + + if cls._concurrency_limit != configured: + # 실행 중 태스크가 없을 때만 세마포어 재생성 + active = any( + t.status == DownloadStatus.DOWNLOADING and not t._cancelled + for t in cls._downloads.values() + ) + if not active: + cls._concurrency_sem = threading.Semaphore(configured) + cls._concurrency_limit = configured def process_menu(self, page_name: str, req: Any) -> Any: @@ -459,6 +488,7 @@ class ModuleQueue(PluginModuleBase): def plugin_load(self) -> None: """플러그인 로드 시 초기화""" self.P.logger.info('gommi_downloader 플러그인 로드') + self._ensure_concurrency_limit() try: # DB에서 진행 중인 작업 로드 with F.app.app_context(): @@ -671,9 +701,35 @@ class DownloadTask: """다운로드 시작 (비동기)""" self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() + + @staticmethod + def _rate_to_bps(rate_value: Any) -> float: + """'6M'/'900K' 형태를 bytes/sec로 변환""" + if rate_value is None: + return 0.0 + value = str(rate_value).strip().upper() + if not value or value in ('0', 'UNLIMITED'): + return 0.0 + m = re.match(r'^(\d+(?:\.\d+)?)\s*([KMG])(?:I?B)?$', value) + if not m: + return 0.0 + num = float(m.group(1)) + unit = m.group(2) + mul = {'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3}[unit] + return num * mul + + @staticmethod + def _bps_to_rate(bps: float) -> str: + """bytes/sec를 yt-dlp/aria2 형식 문자열로 변환""" + if bps <= 0: + return '0' + if bps >= 1024 ** 2: + return f'{max(0.1, bps / (1024 ** 2)):.2f}M' + return f'{max(1.0, bps / 1024):.2f}K' def _run(self): """다운로드 실행""" + slot_acquired = False try: self.status = DownloadStatus.EXTRACTING if not self.start_time: @@ -686,9 +742,57 @@ class DownloadTask: if not self._downloader: raise Exception(f"지원하지 않는 소스 타입: {self.source_type}") + + # 동시 다운로드 제한 슬롯 획득 + ModuleQueue._ensure_concurrency_limit() + sem = ModuleQueue._concurrency_sem + if sem is not None: + while not self._cancelled: + if sem.acquire(timeout=0.5): + slot_acquired = True + break + if not slot_acquired: + self.status = DownloadStatus.CANCELLED + self._emit_status() + return self.status = DownloadStatus.DOWNLOADING self._emit_status() + + # 전역 설정값을 태스크 옵션에 주입 (개별 호출 옵션이 있으면 우선) + from .setup import P + runtime_options = dict(self.options or {}) + if not runtime_options.get('aria2c_path'): + runtime_options['aria2c_path'] = P.ModelSetting.get('aria2c_path') + if not runtime_options.get('connections'): + try: + runtime_options['connections'] = int(P.ModelSetting.get('aria2c_connections') or 16) + except Exception: + runtime_options['connections'] = 16 + if not runtime_options.get('ffmpeg_path'): + runtime_options['ffmpeg_path'] = P.ModelSetting.get('ffmpeg_path') + if not runtime_options.get('max_download_rate'): + runtime_options['max_download_rate'] = P.ModelSetting.get('max_download_rate') + + # 전체 속도 제한을 활성 다운로드 수에 따라 분배 (합산 속도 상한) + raw_global_rate = runtime_options.get('max_download_rate') + global_bps = self._rate_to_bps(raw_global_rate) + if global_bps > 0: + with ModuleQueue._queue_lock: + active_count = sum( + 1 + for t in ModuleQueue._downloads.values() + if t.status == DownloadStatus.DOWNLOADING and not t._cancelled + ) + active_count = max(1, active_count) + effective_bps = global_bps / active_count + runtime_options['effective_max_download_rate'] = self._bps_to_rate(effective_bps) + runtime_options['is_global_rate_split'] = active_count > 1 + if active_count > 1: + P.logger.info( + f'[GDM] Global speed split: total={raw_global_rate}/s, ' + f'active={active_count}, per-task={runtime_options["effective_max_download_rate"]}/s' + ) # 다운로드 실행 result = self._downloader.download( @@ -697,7 +801,7 @@ class DownloadTask: filename=self.filename, progress_callback=self._progress_callback, info_callback=self._info_update_callback, - **self.options + **runtime_options ) if self._cancelled: @@ -741,6 +845,11 @@ class DownloadTask: self._cleanup_if_empty() finally: + if slot_acquired and ModuleQueue._concurrency_sem is not None: + try: + ModuleQueue._concurrency_sem.release() + except Exception: + pass self._emit_status() def _progress_callback(self, progress: int, speed: str = '', eta: str = ''):