fix: enforce max_concurrent and global speed-limit distribution

This commit is contained in:
2026-03-03 18:36:56 +09:00
parent 1cdf68cc59
commit 7a087ce9c5
5 changed files with 163 additions and 4 deletions

View File

@@ -47,6 +47,8 @@ class FfmpegHlsDownloader(BaseDownloader):
# ffmpeg 명령어 구성
ffmpeg_path = options.get('ffmpeg_path', 'ffmpeg')
if options.get('effective_max_download_rate') or options.get('max_download_rate'):
logger.warning('[GDM] ffmpeg_hls downloader does not support strict bandwidth cap; total limit may be approximate for HLS tasks.')
cmd = [ffmpeg_path, '-y']

View File

@@ -5,6 +5,8 @@ HTTP 직접 다운로더
"""
import os
import traceback
import re
import time
from typing import Dict, Any, Optional, Callable
from .base import BaseDownloader
@@ -19,6 +21,21 @@ except:
class HttpDirectDownloader(BaseDownloader):
"""HTTP 직접 다운로더"""
@staticmethod
def _rate_to_bps(rate_value: Any) -> float:
if rate_value is None:
return 0.0
value = str(rate_value).strip().upper()
if not value or value in ('0', 'UNLIMITED'):
return 0.0
m = re.match(r'^(\d+(?:\.\d+)?)\s*([KMG])(?:I?B)?$', value)
if not m:
return 0.0
num = float(m.group(1))
unit = m.group(2)
mul = {'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3}[unit]
return num * mul
def download(
self,
@@ -53,6 +70,9 @@ class HttpDirectDownloader(BaseDownloader):
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
chunk_size = 1024 * 1024 # 1MB 청크
max_rate = options.get('effective_max_download_rate') or options.get('max_download_rate')
rate_bps = self._rate_to_bps(max_rate)
start_time = time.monotonic()
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=chunk_size):
@@ -62,6 +82,13 @@ class HttpDirectDownloader(BaseDownloader):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 평균 다운로드 속도를 제한(총량 제한 분배값 포함)
if rate_bps > 0:
elapsed = max(0.001, time.monotonic() - start_time)
expected_elapsed = downloaded / rate_bps
if expected_elapsed > elapsed:
time.sleep(expected_elapsed - elapsed)
if total_size > 0 and progress_callback:
progress = int(downloaded / total_size * 100)

View File

@@ -26,6 +26,19 @@ class YtdlpAria2Downloader(BaseDownloader):
def __init__(self):
super().__init__()
self._process: Optional[subprocess.Popen] = None
@staticmethod
def _normalize_rate(raw_rate: Any) -> str:
"""속도 제한 문자열 정규화 (예: 6MB -> 6M, 0/None -> '')"""
if raw_rate is None:
return ''
value = str(raw_rate).strip().upper()
if not value or value in ('0', '0B', 'UNLIMITED'):
return ''
m = re.match(r'^(\d+(?:\.\d+)?)\s*([KMG])(?:I?B)?$', value)
if m:
return f'{m.group(1)}{m.group(2)}'
return value
def download(
self,
@@ -61,8 +74,12 @@ class YtdlpAria2Downloader(BaseDownloader):
cmd.extend(['--print', 'before_dl:GDM_FIX:thumb:%(thumbnail)s'])
# 속도 제한 설정
max_rate = P.ModelSetting.get('max_download_rate')
rate_limited = bool(max_rate and max_rate != '0')
max_rate = self._normalize_rate(
options.get('effective_max_download_rate')
or options.get('max_download_rate')
or P.ModelSetting.get('max_download_rate')
)
rate_limited = bool(max_rate)
# aria2c 사용 (설치되어 있으면)
aria2c_path = options.get('aria2c_path', 'aria2c')
@@ -83,6 +100,10 @@ class YtdlpAria2Downloader(BaseDownloader):
# yt-dlp native downloader 제한 (external-downloader 미사용/보조 경로)
if rate_limited:
cmd.extend(['--limit-rate', max_rate])
if options.get('is_global_rate_split'):
logger.info(f'[GDM] global split limit enabled: {max_rate}/s per task')
else:
logger.info(f'[GDM] download speed limit enabled: {max_rate}/s')
# 포맷 선택
format_spec = options.get('format')

View File

@@ -1,6 +1,6 @@
title: "GDM"
package_name: gommi_downloader_manager
version: '0.2.33'
version: '0.2.35'
description: FlaskFarm 범용 다운로더 큐 - YouTube, 애니24, 링크애니, Anilife 지원
developer: projectdx
home: https://gitea.yommi.duckdns.org/projectdx/gommi_downloader_manager

View File

@@ -5,6 +5,7 @@ import os
import time
import threading
import traceback
import re
from datetime import datetime
from typing import Optional, Dict, Any, List, Callable
from enum import Enum
@@ -46,6 +47,8 @@ class ModuleQueue(PluginModuleBase):
# 진행 중인 다운로드 인스턴스들
_downloads: Dict[str, 'DownloadTask'] = {}
_queue_lock = threading.Lock()
_concurrency_sem: Optional[threading.Semaphore] = None
_concurrency_limit: int = 0
# 업데이트 체크 캐싱
_last_update_check = 0
@@ -55,6 +58,32 @@ class ModuleQueue(PluginModuleBase):
from .setup import default_route_socketio_module
super(ModuleQueue, self).__init__(P, name='queue', first_menu='list')
default_route_socketio_module(self, attach='/queue')
self._ensure_concurrency_limit()
@classmethod
def _ensure_concurrency_limit(cls):
"""max_concurrent 설정 기반 동시 실행 슬롯 보장"""
try:
from .setup import P
configured = int(P.ModelSetting.get('max_concurrent') or 3)
except Exception:
configured = 3
configured = max(1, configured)
if cls._concurrency_sem is None:
cls._concurrency_sem = threading.Semaphore(configured)
cls._concurrency_limit = configured
return
if cls._concurrency_limit != configured:
# 실행 중 태스크가 없을 때만 세마포어 재생성
active = any(
t.status == DownloadStatus.DOWNLOADING and not t._cancelled
for t in cls._downloads.values()
)
if not active:
cls._concurrency_sem = threading.Semaphore(configured)
cls._concurrency_limit = configured
def process_menu(self, page_name: str, req: Any) -> Any:
@@ -459,6 +488,7 @@ class ModuleQueue(PluginModuleBase):
def plugin_load(self) -> None:
"""플러그인 로드 시 초기화"""
self.P.logger.info('gommi_downloader 플러그인 로드')
self._ensure_concurrency_limit()
try:
# DB에서 진행 중인 작업 로드
with F.app.app_context():
@@ -671,9 +701,35 @@ class DownloadTask:
"""다운로드 시작 (비동기)"""
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
@staticmethod
def _rate_to_bps(rate_value: Any) -> float:
"""'6M'/'900K' 형태를 bytes/sec로 변환"""
if rate_value is None:
return 0.0
value = str(rate_value).strip().upper()
if not value or value in ('0', 'UNLIMITED'):
return 0.0
m = re.match(r'^(\d+(?:\.\d+)?)\s*([KMG])(?:I?B)?$', value)
if not m:
return 0.0
num = float(m.group(1))
unit = m.group(2)
mul = {'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3}[unit]
return num * mul
@staticmethod
def _bps_to_rate(bps: float) -> str:
"""bytes/sec를 yt-dlp/aria2 형식 문자열로 변환"""
if bps <= 0:
return '0'
if bps >= 1024 ** 2:
return f'{max(0.1, bps / (1024 ** 2)):.2f}M'
return f'{max(1.0, bps / 1024):.2f}K'
def _run(self):
"""다운로드 실행"""
slot_acquired = False
try:
self.status = DownloadStatus.EXTRACTING
if not self.start_time:
@@ -686,9 +742,57 @@ class DownloadTask:
if not self._downloader:
raise Exception(f"지원하지 않는 소스 타입: {self.source_type}")
# 동시 다운로드 제한 슬롯 획득
ModuleQueue._ensure_concurrency_limit()
sem = ModuleQueue._concurrency_sem
if sem is not None:
while not self._cancelled:
if sem.acquire(timeout=0.5):
slot_acquired = True
break
if not slot_acquired:
self.status = DownloadStatus.CANCELLED
self._emit_status()
return
self.status = DownloadStatus.DOWNLOADING
self._emit_status()
# 전역 설정값을 태스크 옵션에 주입 (개별 호출 옵션이 있으면 우선)
from .setup import P
runtime_options = dict(self.options or {})
if not runtime_options.get('aria2c_path'):
runtime_options['aria2c_path'] = P.ModelSetting.get('aria2c_path')
if not runtime_options.get('connections'):
try:
runtime_options['connections'] = int(P.ModelSetting.get('aria2c_connections') or 16)
except Exception:
runtime_options['connections'] = 16
if not runtime_options.get('ffmpeg_path'):
runtime_options['ffmpeg_path'] = P.ModelSetting.get('ffmpeg_path')
if not runtime_options.get('max_download_rate'):
runtime_options['max_download_rate'] = P.ModelSetting.get('max_download_rate')
# 전체 속도 제한을 활성 다운로드 수에 따라 분배 (합산 속도 상한)
raw_global_rate = runtime_options.get('max_download_rate')
global_bps = self._rate_to_bps(raw_global_rate)
if global_bps > 0:
with ModuleQueue._queue_lock:
active_count = sum(
1
for t in ModuleQueue._downloads.values()
if t.status == DownloadStatus.DOWNLOADING and not t._cancelled
)
active_count = max(1, active_count)
effective_bps = global_bps / active_count
runtime_options['effective_max_download_rate'] = self._bps_to_rate(effective_bps)
runtime_options['is_global_rate_split'] = active_count > 1
if active_count > 1:
P.logger.info(
f'[GDM] Global speed split: total={raw_global_rate}/s, '
f'active={active_count}, per-task={runtime_options["effective_max_download_rate"]}/s'
)
# 다운로드 실행
result = self._downloader.download(
@@ -697,7 +801,7 @@ class DownloadTask:
filename=self.filename,
progress_callback=self._progress_callback,
info_callback=self._info_update_callback,
**self.options
**runtime_options
)
if self._cancelled:
@@ -741,6 +845,11 @@ class DownloadTask:
self._cleanup_if_empty()
finally:
if slot_acquired and ModuleQueue._concurrency_sem is not None:
try:
ModuleQueue._concurrency_sem.release()
except Exception:
pass
self._emit_status()
def _progress_callback(self, progress: int, speed: str = '', eta: str = ''):