anilife.live 사이트 구현

다른 버그도 고침
This commit is contained in:
2025-12-28 19:38:18 +09:00
parent e6e8c45f5a
commit 6dbeff13d3
14 changed files with 1576 additions and 347 deletions

177
lib/camoufox_anilife.py Normal file
View File

@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
Camoufox 기반 Anilife 비디오 URL 추출 스크립트
강력한 봇 감지 우회 기능이 있는 스텔스 Firefox
사용법:
python camoufox_anilife.py <detail_url> <episode_num>
"""
import sys
import json
import time
import re
# JavaScript expression that reads the page-global `_aldata`, or null when it is absent.
_ALDATA_JS = "typeof _aldata !== 'undefined' ? _aldata : null"
# Fallback: an `_aldata = "<token>"` assignment embedded in the page source.
_ALDATA_HTML_RE = re.compile(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']')


def _log(message: str) -> None:
    """Write a progress/diagnostic line to stderr (stdout is reserved for the JSON result)."""
    print(message, file=sys.stderr)


def _episode_pattern(episode_num: str) -> "re.Pattern[str]":
    """Build a regex matching ``episode_num`` as a whole number inside link text.

    A plain substring test would let episode "1" match "10" or "11"; the
    look-arounds reject neighbouring digits (and a decimal part such as
    "1.5"), while still tolerating zero padding ("01").
    """
    token = str(episode_num).strip()
    if token.isdigit():
        token = token.lstrip("0") or "0"
    return re.compile(rf"(?<!\d)0*{re.escape(token)}(?!\.?\d)")


def _click_episode(page, episode_num: str) -> bool:
    """Click the link for ``episode_num`` on the detail page; return True on success."""
    pattern = _episode_pattern(episode_num)

    # Method 1: an <a> that contains an `.epl-num` element showing the episode number.
    try:
        episode_link = page.locator(
            "a", has=page.locator(".epl-num", has_text=pattern)
        ).first
        # is_visible() returns immediately and ignores its timeout, so wait explicitly.
        episode_link.wait_for(state="visible", timeout=5000)
        href = episode_link.get_attribute("href")
        _log(f" Found episode link: {href}")
        episode_link.click()
        time.sleep(3)
        return True
    except Exception as e:
        _log(f" Method 1 failed: {e}")

    # Method 2: any provider link whose text mentions the episode number.
    try:
        for link in page.locator('a[href*="/ani/provider/"]').all():
            text = link.inner_text()
            if pattern.search(text):
                _log(f" Found: {text}")
                link.click()
                time.sleep(3)
                return True
    except Exception as e:
        _log(f" Method 2 failed: {e}")
    return False


def _read_aldata(page):
    """Read `_aldata` from the current page.

    Returns:
        ``(aldata, html)``. ``aldata`` is the value found (or None). ``html`` is
        None when the value came from the JavaScript global, otherwise the
        page source that was searched with the fallback regex.
    """
    try:
        value = page.evaluate(_ALDATA_JS)
    except Exception as js_err:
        # Best effort: fall through to the HTML search, but leave a trace.
        _log(f" JS error: {js_err}")
        value = None
    if value:
        return value, None
    html = page.content()
    match = _ALDATA_HTML_RE.search(html)
    return (match.group(1) if match else None), html


def extract_aldata(detail_url: str, episode_num: str, headless: bool = False) -> dict:
    """Extract `_aldata` for one episode using a Camoufox browser.

    Opens ``detail_url``, clicks the link of ``episode_num``, and looks for
    `_aldata` on the resulting provider page; if it is not there, clicks the
    CloudVideo/Jawcloud button and looks again on the page that follows.

    Args:
        detail_url: URL of the detail page that lists the episodes.
        episode_num: Episode number to open, as shown in the episode list.
        headless: Passed to ``Camoufox(headless=...)``; defaults to the
            original visible-window behaviour.

    Returns:
        A dict with the keys ``success``, ``aldata``, ``html``,
        ``current_url``, ``error`` and ``vod_url``. ``html`` carries the page
        source for diagnosis on the failure paths that capture it. The same
        shape is returned on every path, including a missing Camoufox install.
    """
    result = {
        "success": False,
        "aldata": None,
        "html": None,
        "current_url": None,
        "error": None,
        "vod_url": None,
    }
    try:
        from camoufox.sync_api import Camoufox
    except ImportError as e:
        result["error"] = f"Camoufox not installed: {e}"
        return result

    try:
        # Start Camoufox (it generates the browser fingerprint automatically).
        with Camoufox(headless=headless) as browser:
            page = browser.new_page()
            try:
                # 1. Open the detail page.
                _log(f"1. Navigating to detail page: {detail_url}")
                page.goto(detail_url, wait_until="domcontentloaded", timeout=30000)
                time.sleep(2)
                _log(f" Current URL: {page.url}")

                # 2. Scroll down to the episode list.
                page.mouse.wheel(0, 800)
                time.sleep(1)

                # 3. Find and click the requested episode.
                _log(f"2. Looking for episode {episode_num}")
                if not _click_episode(page, episode_num):
                    result["error"] = f"Episode {episode_num} not found"
                    result["html"] = page.content()
                    return result

                # 4. Extract _aldata from the provider page.
                _log(f"3. Provider page URL: {page.url}")
                result["current_url"] = page.url
                if "/ani/provider/" not in page.url:
                    result["error"] = f"Redirected to {page.url}"
                    result["html"] = page.content()
                    return result

                aldata, html = _read_aldata(page)
                if aldata:
                    result["aldata"] = aldata
                    result["success"] = True
                    origin = "found" if html is None else "from HTML"
                    # str() keeps the log line safe if the page exposes a non-string value.
                    _log(f" SUCCESS! _aldata {origin}: {str(aldata)[:60]}...")
                    return result

                # 5. Try the CloudVideo / Jawcloud button.
                _log("4. Trying CloudVideo button click...")
                try:
                    page.mouse.wheel(0, 500)
                    time.sleep(1)
                    cloudvideo_btn = page.locator(
                        'a[onclick*="moveCloudvideo"], a[onclick*="moveJawcloud"]'
                    ).first
                    cloudvideo_btn.wait_for(state="visible", timeout=3000)
                    cloudvideo_btn.click()
                    time.sleep(3)
                    result["current_url"] = page.url
                    _log(f" After click URL: {page.url}")

                    # Landing on Google means the site bounced us as a bot.
                    if "google.com" in page.url:
                        result["error"] = "Redirected to Google - bot detected"
                        return result

                    aldata, html = _read_aldata(page)
                    if aldata:
                        result["aldata"] = aldata
                        result["success"] = True
                        if html is None:
                            _log(f" SUCCESS! _aldata: {str(aldata)[:60]}...")
                        return result
                    result["html"] = html
                except Exception as click_err:
                    _log(f" Click error: {click_err}")
                    result["html"] = page.content()
            finally:
                page.close()
    except Exception as e:
        result["error"] = str(e)
        import traceback
        _log(traceback.format_exc())
    return result
if __name__ == "__main__":
    # CLI entry point: two positional arguments in, one JSON document out on stdout.
    try:
        detail_url, episode_num = sys.argv[1], sys.argv[2]
    except IndexError:
        # Misuse is reported in the same JSON envelope, with a non-zero exit code.
        print(json.dumps({"error": "Usage: python camoufox_anilife.py <detail_url> <episode_num>"}))
        sys.exit(1)
    result = extract_aldata(detail_url, episode_num)
    print(json.dumps(result, ensure_ascii=False))

View File

@@ -43,6 +43,7 @@ class FfmpegQueueEntity(abc.ABCMeta("ABC", (object,), {"__slots__": ()})):
self.filepath = None
self.quality = None
self.headers = None
self.proxy = None
self.current_speed = "" # 다운로드 속도
self.download_time = "" # 경과 시간
# FfmpegQueueEntity.static_index += 1
@@ -79,7 +80,27 @@ class FfmpegQueueEntity(abc.ABCMeta("ABC", (object,), {"__slots__": ()})):
tmp["filename"] = self.filename
tmp["filepath"] = self.filepath
tmp["quality"] = self.quality
# tmp['current_speed'] = self.ffmpeg_arg['current_speed'] if self.ffmpeg_arg is not None else ''
tmp["current_speed"] = self.current_speed
tmp["download_time"] = self.download_time
# 템플릿 호환 필드 추가 (queue.html에서 사용하는 필드명)
tmp["idx"] = self.entity_id
tmp["callback_id"] = getattr(self, 'name', 'anilife') if hasattr(self, 'name') else 'anilife'
tmp["start_time"] = self.created_time
tmp["status_kor"] = self.ffmpeg_status_kor
tmp["status_str"] = str(self.ffmpeg_status) if self.ffmpeg_status != -1 else "WAITING"
tmp["percent"] = self.ffmpeg_percent
tmp["duration_str"] = ""
tmp["duration"] = ""
tmp["current_duration"] = ""
tmp["current_pf_count"] = 0
tmp["max_pf_count"] = 0
tmp["current_bitrate"] = ""
tmp["end_time"] = ""
tmp["exist"] = False
tmp["temp_fullpath"] = self.filepath or ""
tmp["save_fullpath"] = self.filepath or ""
tmp = self.info_dict(tmp)
return tmp
@@ -194,13 +215,19 @@ class FfmpegQueue(object):
P.logger.debug(filename)
# P.logger.debug(filepath)
# SupportFfmpeg 초기화
self.support_init()
# entity.headers가 있으면 우선 사용, 없으면 caller.headers 사용
_headers = entity.headers
if _headers is None and self.caller is not None:
_headers = self.caller.headers
# SupportFfmpeg 초기화
self.support_init()
# proxy 가져오기
_proxy = getattr(entity, 'proxy', None)
if _proxy is None and self.caller is not None:
_proxy = getattr(self.caller, 'proxy', None)
logger.info(f"Starting ffmpeg download - video_url: {video_url}")
logger.info(f"save_path: {dirname}, filename: {filename}")
logger.info(f"headers: {_headers}")
@@ -219,10 +246,17 @@ class FfmpegQueue(object):
logger.info(f"=== END COMMAND ===")
# m3u8 URL인 경우 다운로드 방법 설정에 따라 분기
if video_url.endswith('.m3u8'):
if video_url.endswith('.m3u8') or 'master.txt' in video_url:
# 다운로드 방법 설정 확인
download_method = P.ModelSetting.get(f"{self.name}_download_method")
# cdndania.com 감지 시 YtdlpDownloader 사용 (CDN 세션 쿠키 + Impersonate로 보안 우회)
if 'cdndania.com' in video_url:
logger.info("Detected cdndania.com URL - forcing YtdlpDownloader with cookies (CDN security bypass)")
download_method = "ytdlp"
logger.info(f"Download method: {download_method}")
# 다운로드 시작 전 카운트 증가
self.current_ffmpeg_count += 1
@@ -245,12 +279,17 @@ class FfmpegQueue(object):
# yt-dlp 사용
from .ytdlp_downloader import YtdlpDownloader
logger.info("Using yt-dlp downloader...")
# 엔티티에서 쿠키 파일 가져오기 (있는 경우)
_cookies_file = getattr(entity_ref, 'cookies_file', None)
downloader = YtdlpDownloader(
url=video_url,
output_path=output_file_ref,
headers=headers_ref,
callback=progress_callback
callback=progress_callback,
proxy=_proxy,
cookies_file=_cookies_file
)
else:
# 기본: HLS 다운로더 사용
from .hls_downloader import HlsDownloader
@@ -259,7 +298,8 @@ class FfmpegQueue(object):
m3u8_url=video_url,
output_path=output_file_ref,
headers=headers_ref,
callback=progress_callback
callback=progress_callback,
proxy=_proxy
)
success, message = downloader.download()
@@ -360,6 +400,7 @@ class FfmpegQueue(object):
max_pf_count=0,
save_path=ToolUtil.make_path(dirname),
timeout_minute=60,
proxy=_proxy,
)
#
# todo: 임시로 start() 중지

View File

@@ -8,17 +8,21 @@ import requests
import tempfile
import subprocess
import time
import logging
from urllib.parse import urljoin
logger = logging.getLogger(__name__)
class HlsDownloader:
"""HLS 다운로더 - .jpg 확장자 세그먼트 지원"""
def __init__(self, m3u8_url, output_path, headers=None, callback=None):
def __init__(self, m3u8_url, output_path, headers=None, callback=None, proxy=None):
self.m3u8_url = m3u8_url
self.output_path = output_path
self.headers = headers or {}
self.callback = callback # 진행 상황 콜백
self.proxy = proxy
self.segments = []
self.total_segments = 0
self.downloaded_segments = 0
@@ -31,12 +35,35 @@ class HlsDownloader:
self.last_bytes = 0
self.current_speed = 0 # bytes per second
def parse_m3u8(self):
"""m3u8 파일 파싱"""
response = requests.get(self.m3u8_url, headers=self.headers, timeout=30)
def parse_m3u8(self, url=None):
"""m3u8 파일 파싱 (Master Playlist 대응)"""
if url is None:
url = self.m3u8_url
proxies = None
if self.proxy:
proxies = {"http": self.proxy, "https": self.proxy}
logger.debug(f"Parsing m3u8: {url}")
response = requests.get(url, headers=self.headers, timeout=30, proxies=proxies)
content = response.text
base_url = self.m3u8_url.rsplit('/', 1)[0] + '/'
# Master Playlist 체크
if "#EXT-X-STREAM-INF" in content:
last_media_url = None
for line in content.strip().split('\n'):
line = line.strip()
if line and not line.startswith('#'):
if not line.startswith('http'):
last_media_url = urljoin(url, line)
else:
last_media_url = line
if last_media_url:
logger.info(f"Master playlist detected, following media playlist: {last_media_url}")
return self.parse_m3u8(last_media_url)
base_url = url.rsplit('/', 1)[0] + '/'
self.segments = []
for line in content.strip().split('\n'):
@@ -96,17 +123,22 @@ class HlsDownloader:
return False, "Cancelled"
# 세그먼트 다운로드
segment_path = os.path.join(temp_dir, f"segment_{i:05d}.ts")
segment_filename = f"segment_{i:05d}.ts"
segment_path = os.path.join(temp_dir, segment_filename)
try:
response = requests.get(segment_url, headers=self.headers, timeout=60)
proxies = None
if self.proxy:
proxies = {"http": self.proxy, "https": self.proxy}
response = requests.get(segment_url, headers=self.headers, timeout=60, proxies=proxies)
response.raise_for_status()
segment_data = response.content
with open(segment_path, 'wb') as f:
f.write(segment_data)
segment_files.append(segment_path)
segment_files.append(segment_filename) # 상대 경로 저장
self.downloaded_segments = i + 1
self.total_bytes += len(segment_data)
@@ -139,27 +171,28 @@ class HlsDownloader:
# 세그먼트 합치기 (concat 파일 생성)
concat_file = os.path.join(temp_dir, "concat.txt")
with open(concat_file, 'w') as f:
for seg_file in segment_files:
f.write(f"file '{seg_file}'\n")
for seg_filename in segment_files:
f.write(f"file '{seg_filename}'\n")
# 출력 디렉토리 생성
output_dir = os.path.dirname(self.output_path)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# ffmpeg로 합치기
# ffmpeg로 합치기 (temp_dir에서 실행)
cmd = [
'ffmpeg', '-y',
'-f', 'concat',
'-safe', '0',
'-i', concat_file,
'-i', 'concat.txt',
'-c', 'copy',
self.output_path
os.path.abspath(self.output_path)
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, cwd=temp_dir)
if result.returncode != 0:
logger.error(f"FFmpeg stderr: {result.stderr}")
return False, f"FFmpeg concat failed: {result.stderr}"
return True, "Download completed"

221
lib/playwright_anilife.py Normal file
View File

@@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""
Playwright 기반 Anilife 비디오 URL 추출 스크립트
FlaskFarm의 gevent와 충돌을 피하기 위해 별도의 subprocess로 실행됩니다.
사용법:
python playwright_anilife.py <detail_url> <episode_num>
출력:
JSON 형식으로 _aldata 또는 에러 메시지 출력
"""
import sys
import json
import time
import re
# JavaScript expression that reads the page-global `_aldata`, or null when it is absent.
_ALDATA_JS = "typeof _aldata !== 'undefined' ? _aldata : null"
# Fallback: an `_aldata = "<token>"` assignment embedded in the page source.
_ALDATA_HTML_RE = re.compile(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']')

# Init script that hides navigator.webdriver from the page.
_HIDE_WEBDRIVER_JS = """
Object.defineProperty(navigator, 'webdriver', {
    get: () => undefined
});
"""

# Clicks the first provider link whose text matches the regex source passed as
# the argument, and returns its href (null when no link matches). The episode
# number is handed over as data, never spliced into the script text.
_CLICK_EPISODE_JS = """
(patternSource) => {
    const pattern = new RegExp(patternSource);
    const links = Array.from(document.querySelectorAll('a[href*="/ani/provider/"]'));
    const ep = links.find(a => pattern.test(a.innerText));
    if (ep) {
        ep.click();
        return ep.href;
    }
    return null;
}
"""

# Collects `_aldata` (if the provider page already exposes it) or a player URL
# derived from moveCloudvideo()/moveJawcloud(), `_player_guid`, or a GUID-like
# global whose name contains "player" or "guid".
_PLAYER_INFO_JS = r"""
(() => {
    const urlFromFunction = (fn) => {
        const match = fn.toString().match(/['"]([^'"]+\/h\/live[^'"]+)['"]/);
        return match ? match[1] : null;
    };
    let playerUrl = null;
    if (typeof moveCloudvideo === 'function') {
        playerUrl = urlFromFunction(moveCloudvideo);
    }
    if (!playerUrl && typeof moveJawcloud === 'function') {
        playerUrl = urlFromFunction(moveJawcloud);
    }
    if (!playerUrl && typeof _player_guid !== 'undefined') {
        playerUrl = '/h/live?p=' + _player_guid + '&player=jawcloud';
    }
    if (!playerUrl) {
        for (const key of Object.keys(window)) {
            if (key.includes('player') || key.includes('guid')) {
                const val = window[key];
                if (typeof val === 'string' && val.match(/^[a-f0-9-]{36}$/)) {
                    playerUrl = '/h/live?p=' + val + '&player=jawcloud';
                    break;
                }
            }
        }
    }
    if (typeof _aldata !== 'undefined') {
        return { aldata: _aldata, playerUrl: null };
    }
    return { aldata: null, playerUrl: playerUrl };
})()
"""


def _episode_regex_source(episode_num: str) -> str:
    """Return a regex source matching ``episode_num`` as a whole number.

    A plain substring test would let episode "1" match "10" or "11"; the
    look-arounds reject neighbouring digits (and a decimal part such as
    "1.5"), while still tolerating zero padding ("01").
    """
    token = str(episode_num).strip()
    if token.isdigit():
        token = token.lstrip("0") or "0"
    return rf"(?<!\d)0*{re.escape(token)}(?!\.?\d)"


def extract_aldata(detail_url: str, episode_num: str, headless: bool = False) -> dict:
    """Click an episode on the detail page and extract `_aldata`.

    Launches the system Chrome through Playwright, opens ``detail_url``, clicks
    the provider link of ``episode_num``, then looks for `_aldata` on the
    provider page, on the player page (when a player URL can be derived), and
    finally in the raw HTML.

    Args:
        detail_url: URL of the detail page that lists the episodes.
        episode_num: Episode number to open, as shown in the link text.
        headless: Passed to ``chromium.launch(headless=...)``; defaults to the
            original visible-window behaviour.

    Returns:
        A dict with the keys ``success``, ``aldata``, ``html``,
        ``current_url``, ``error`` and ``player_url``. ``html`` carries the
        page source when the episode link or `_aldata` could not be found.
        The same shape is returned on every path, including a missing
        Playwright install.
    """
    from urllib.parse import urljoin

    result = {
        "success": False,
        "aldata": None,
        "html": None,
        "current_url": None,
        "error": None,
        "player_url": None,
    }
    try:
        from playwright.sync_api import sync_playwright
    except ImportError as e:
        result["error"] = f"Playwright not installed: {e}"
        return result

    try:
        with sync_playwright() as p:
            # Use the Chrome installed on the system.
            browser = p.chromium.launch(
                headless=headless,
                channel="chrome",
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--disable-automation",
                    "--no-sandbox",
                ],
            )
            context = None
            try:
                # Browser context with stealth-oriented settings.
                context = browser.new_context(
                    viewport={"width": 1920, "height": 1080},
                    user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                    locale="ko-KR",
                )
                context.add_init_script(_HIDE_WEBDRIVER_JS)
                page = context.new_page()

                # 1. Visit the detail page.
                page.goto(detail_url, wait_until="domcontentloaded", timeout=30000)
                time.sleep(2)

                # 2. Scroll so the episode list loads, then click the episode link.
                page.mouse.wheel(0, 800)
                time.sleep(1)
                episode_href = None
                click_error = None
                try:
                    episode_href = page.evaluate(
                        _CLICK_EPISODE_JS, _episode_regex_source(episode_num)
                    )
                except Exception as e:
                    click_error = f"Episode click failed: {e}"
                if not episode_href:
                    # Keep the real failure reason when the click itself blew up.
                    result["error"] = click_error or f"Episode {episode_num} not found"
                    result["html"] = page.content()
                    return result
                time.sleep(2)

                # 3. On the provider page, read _aldata or derive the player URL
                #    without clicking the player button.
                try:
                    player_info = page.evaluate(_PLAYER_INFO_JS)
                    if player_info.get("aldata"):
                        result["aldata"] = player_info["aldata"]
                        result["success"] = True
                        result["current_url"] = page.url
                        return result
                    result["player_url"] = player_info.get("playerUrl")
                except Exception as e:
                    result["error"] = f"Player info extraction failed: {e}"

                # 4. If a player URL is known, open it and read _aldata there.
                if result["player_url"]:
                    # Resolve relative player paths against the page we are on
                    # instead of a hard-coded origin.
                    page.goto(
                        urljoin(page.url, result["player_url"]),
                        wait_until="domcontentloaded",
                        timeout=30000,
                    )
                    time.sleep(2)
                    try:
                        aldata_value = page.evaluate(_ALDATA_JS)
                    except Exception as js_err:
                        # Best effort: the HTML search below is the fallback.
                        print(f"_aldata evaluate failed: {js_err}", file=sys.stderr)
                        aldata_value = None
                    if aldata_value:
                        result["aldata"] = aldata_value
                        result["success"] = True

                result["current_url"] = page.url

                # Last resort: search the page source for an `_aldata = "..."` assignment.
                if not result["aldata"]:
                    html = page.content()
                    aldata_match = _ALDATA_HTML_RE.search(html)
                    if aldata_match:
                        result["aldata"] = aldata_match.group(1)
                        result["success"] = True
                    else:
                        result["html"] = html

                if result["success"]:
                    # Do not report an error from an earlier step next to a success.
                    result["error"] = None
            finally:
                if context is not None:
                    context.close()
                browser.close()
    except Exception as e:
        result["error"] = str(e)
    return result
if __name__ == "__main__":
    # CLI entry point: two positional arguments in, one JSON document out on stdout.
    try:
        detail_url, episode_num = sys.argv[1], sys.argv[2]
    except IndexError:
        # Misuse is reported in the same JSON envelope, with a non-zero exit code.
        print(json.dumps({"error": "Usage: python playwright_anilife.py <detail_url> <episode_num>"}))
        sys.exit(1)
    result = extract_aldata(detail_url, episode_num)
    print(json.dumps(result, ensure_ascii=False))

181
lib/playwright_cdp.py Normal file
View File

@@ -0,0 +1,181 @@
#!/usr/bin/env python3
r"""
Extract the Anilife video URL by attaching to a Chrome started in debug mode.
Flow: detail page -> click the episode -> extract _aldata.
Usage:
1. Start Chrome in debug mode:
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --remote-debugging-port=9222 --user-data-dir=/tmp/chrome_debug
2. Run the script:
python playwright_cdp.py <detail_url> <episode_num>
"""
import sys
import json
import time
import re
# JavaScript expression that reads the page-global `_aldata`, or null when it is absent.
_ALDATA_JS = "typeof _aldata !== 'undefined' ? _aldata : null"
# Fallback: an `_aldata = "<token>"` assignment embedded in the page source.
_ALDATA_HTML_RE = re.compile(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']')


def _log(message: str) -> None:
    """Write a progress/diagnostic line to stderr (stdout is reserved for the JSON result)."""
    print(message, file=sys.stderr)


def _episode_pattern(episode_num: str) -> "re.Pattern[str]":
    """Build a regex matching ``episode_num`` as a whole number inside link text.

    A plain substring test would let episode "1" match "10" or "11"; the
    look-arounds reject neighbouring digits (and a decimal part such as
    "1.5"), while still tolerating zero padding ("01").
    """
    token = str(episode_num).strip()
    if token.isdigit():
        token = token.lstrip("0") or "0"
    return re.compile(rf"(?<!\d)0*{re.escape(token)}(?!\.?\d)")


def _click_episode(page, episode_num: str) -> bool:
    """Click the link for ``episode_num`` on the detail page; return True on success."""
    pattern = _episode_pattern(episode_num)

    # Method 1: an <a> that contains an `.epl-num` element showing the episode number.
    try:
        episode_link = page.locator(
            "a", has=page.locator(".epl-num", has_text=pattern)
        ).first
        # is_visible() returns immediately and ignores its timeout, so wait explicitly.
        episode_link.wait_for(state="visible", timeout=5000)
        href = episode_link.get_attribute("href")
        _log(f" Found episode link: {href}")
        episode_link.click()
        time.sleep(3)
        return True
    except Exception as e:
        _log(f" Method 1 failed: {e}")

    # Method 2: any provider link whose text mentions the episode number.
    try:
        for link in page.locator('a[href*="/ani/provider/"]').all():
            text = link.inner_text()
            if pattern.search(text):
                _log(f" Found: {text}")
                link.click()
                time.sleep(3)
                return True
    except Exception as e:
        _log(f" Method 2 failed: {e}")
    return False


def _read_aldata(page):
    """Read `_aldata` from the current page.

    Returns:
        ``(aldata, html)``. ``aldata`` is the value found (or None). ``html`` is
        None when the value came from the JavaScript global, otherwise the
        page source that was searched with the fallback regex.
    """
    try:
        value = page.evaluate(_ALDATA_JS)
    except Exception as js_err:
        # Best effort: fall through to the HTML search, but leave a trace.
        _log(f" JS error: {js_err}")
        value = None
    if value:
        return value, None
    html = page.content()
    match = _ALDATA_HTML_RE.search(html)
    return (match.group(1) if match else None), html


def extract_aldata_via_cdp(
    detail_url: str,
    episode_num: str,
    cdp_url: str = "http://localhost:9222",
) -> dict:
    """Extract `_aldata` by attaching to a running Chrome over the DevTools Protocol.

    Opens ``detail_url`` in a new page of the attached browser, clicks the link
    of ``episode_num``, and looks for `_aldata` on the resulting page; if it is
    not there, clicks the CloudVideo/Jawcloud button and looks again.

    Args:
        detail_url: URL of the detail page that lists the episodes.
        episode_num: Episode number to open, as shown in the episode list.
        cdp_url: DevTools endpoint of the Chrome instance to attach to;
            defaults to the port used in the module's usage notes.

    Returns:
        A dict with the keys ``success``, ``aldata``, ``html``,
        ``current_url``, ``error`` and ``vod_url``. ``html`` carries the page
        source for diagnosis on the failure paths that capture it. The same
        shape is returned on every path, including a missing Playwright install.
    """
    result = {
        "success": False,
        "aldata": None,
        "html": None,
        "current_url": None,
        "error": None,
        "vod_url": None,
    }
    try:
        from playwright.sync_api import sync_playwright
    except ImportError as e:
        result["error"] = f"Playwright not installed: {e}"
        return result

    try:
        with sync_playwright() as p:
            # Attach to the Chrome debug port. Only a failure of this call means
            # "debug Chrome is not running"; later errors that merely mention
            # "connect" (e.g. network resets) are reported verbatim instead.
            try:
                browser = p.chromium.connect_over_cdp(cdp_url)
            except Exception as connect_err:
                _log(f" CDP connect error: {connect_err}")
                result["error"] = "Chrome 디버그 모드가 실행 중이 아닙니다."
                return result

            # Reuse the browser's existing context when there is one.
            contexts = browser.contexts
            context = contexts[0] if contexts else browser.new_context()
            page = context.new_page()
            try:
                # 1. Open the detail page.
                _log(f"1. Navigating to detail page: {detail_url}")
                page.goto(detail_url, wait_until="domcontentloaded", timeout=30000)
                time.sleep(2)
                _log(f" Current URL: {page.url}")

                # 2. Scroll down to the episode list.
                page.mouse.wheel(0, 800)
                time.sleep(1)

                # 3. Find and click the requested episode.
                _log(f"2. Looking for episode {episode_num}")
                if not _click_episode(page, episode_num):
                    result["error"] = f"Episode {episode_num} not found"
                    result["html"] = page.content()
                    return result

                # 4. Extract _aldata from the provider page.
                _log(f"3. Provider page URL: {page.url}")
                result["current_url"] = page.url
                aldata, html = _read_aldata(page)
                if aldata:
                    result["aldata"] = aldata
                    result["success"] = True
                    origin = "found" if html is None else "from HTML"
                    # str() keeps the log line safe if the page exposes a non-string value.
                    _log(f" SUCCESS! _aldata {origin}: {str(aldata)[:60]}...")
                    return result

                # 5. Try the CloudVideo / Jawcloud button.
                _log("4. Trying CloudVideo button click...")
                try:
                    page.mouse.wheel(0, 500)
                    time.sleep(1)
                    cloudvideo_btn = page.locator(
                        'a[onclick*="moveCloudvideo"], a[onclick*="moveJawcloud"]'
                    ).first
                    cloudvideo_btn.wait_for(state="visible", timeout=3000)
                    cloudvideo_btn.click()
                    time.sleep(3)
                    result["current_url"] = page.url
                    _log(f" After click URL: {page.url}")

                    aldata, html = _read_aldata(page)
                    if aldata:
                        result["aldata"] = aldata
                        result["success"] = True
                        if html is None:
                            _log(f" SUCCESS! _aldata: {str(aldata)[:60]}...")
                        return result
                    result["html"] = html
                except Exception as click_err:
                    _log(f" Click error: {click_err}")
                    result["html"] = page.content()
            finally:
                page.close()
    except Exception as e:
        result["error"] = str(e)
    return result
if __name__ == "__main__":
    # CLI entry point: two positional arguments in, one JSON document out on stdout.
    try:
        detail_url, episode_num = sys.argv[1], sys.argv[2]
    except IndexError:
        # Misuse is reported in the same JSON envelope, with a non-zero exit code.
        print(json.dumps({"error": "Usage: python playwright_cdp.py <detail_url> <episode_num>"}))
        sys.exit(1)
    result = extract_aldata_via_cdp(detail_url, episode_num)
    print(json.dumps(result, ensure_ascii=False))

View File

@@ -16,11 +16,13 @@ logger = logging.getLogger(__name__)
class YtdlpDownloader:
"""yt-dlp 기반 다운로더"""
def __init__(self, url, output_path, headers=None, callback=None):
def __init__(self, url, output_path, headers=None, callback=None, proxy=None, cookies_file=None):
self.url = url
self.output_path = output_path
self.headers = headers or {}
self.callback = callback # 진행 상황 콜백
self.proxy = proxy
self.cookies_file = cookies_file # CDN 세션 쿠키 파일 경로
self.cancelled = False
self.process = None
self.error_output = [] # 에러 메시지 저장
@@ -30,6 +32,7 @@ class YtdlpDownloader:
self.current_speed = ""
self.elapsed_time = ""
self.percent = 0
def format_time(self, seconds):
"""시간을 읽기 좋은 형식으로 변환"""
@@ -57,12 +60,7 @@ class YtdlpDownloader:
return f"{bytes_per_sec / (1024 * 1024):.2f} MB/s"
def download(self):
"""yt-dlp Python 모듈로 다운로드 수행"""
try:
import yt_dlp
except ImportError:
return False, "yt-dlp를 찾을 수 없습니다. pip install yt-dlp 로 설치해주세요."
"""yt-dlp CLI를 통한 브라우저 흉내(Impersonate) 방식 다운로드 수행"""
try:
self.start_time = time.time()
@@ -71,86 +69,118 @@ class YtdlpDownloader:
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# 진행률 콜백
def progress_hook(d):
# URL 전처리: 확장자 힌트(?dummy=.m3u8) 사용
# (m3u8: 접두사나 #.m3u8보다 호환성이 높음. HLS 인식 강제용)
current_url = self.url
if 'master.txt' in current_url:
concat_char = '&' if '?' in current_url else '?'
current_url = f"{current_url}{concat_char}dummy=.m3u8"
# 1. 기본 명령어 구성 (Impersonate & HLS 강제)
cmd = [
'yt-dlp',
'--newline',
'--no-playlist',
'--no-part',
'--hls-prefer-ffmpeg',
'--hls-use-mpegts',
'--no-check-certificate',
'--progress',
'--verbose', # 디버깅용 상세 로그
'--impersonate', 'chrome-120', # 정밀한 크롬-120 지문 사용
'--extractor-args', 'generic:force_hls', # HLS 강제 추출
'-o', self.output_path,
]
# 2. 프록시 설정
if self.proxy:
cmd += ['--proxy', self.proxy]
# 2.5 쿠키 파일 설정 (CDN 세션 인증용)
if self.cookies_file and os.path.exists(self.cookies_file):
cmd += ['--cookies', self.cookies_file]
logger.info(f"Using cookies file: {self.cookies_file}")
# 3. 필수 헤더 구성
# --impersonate가 기본적인 Sec-Fetch를 처리하지만,
# X-Requested-With와 정확한 Referer/Origin은 명시적으로 주는 것이 안전합니다.
has_referer = False
for k, v in self.headers.items():
if k.lower() == 'referer':
cmd += ['--referer', v]
has_referer = True
elif k.lower() == 'user-agent':
# impersonate가 설정한 UA를 명시적 UA로 덮어씀 (필요시)
cmd += ['--user-agent', v]
else:
cmd += ['--add-header', f"{k}:{v}"]
# cdndania 전용 헤더 보강
if 'cdndania.com' in current_url:
if not has_referer:
cmd += ['--referer', 'https://cdndania.com/']
cmd += ['--add-header', 'Origin:https://cdndania.com']
cmd += ['--add-header', 'X-Requested-With:XMLHttpRequest']
cmd.append(current_url)
logger.info(f"Executing refined browser-impersonated yt-dlp CLI (v16): {' '.join(cmd)}")
# 4. subprocess 실행 및 파싱
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1
)
# [download] 10.5% of ~100.00MiB at 2.45MiB/s
prog_re = re.compile(r'\[download\]\s+(?P<percent>[\d\.]+)%\s+of\s+.*?\s+at\s+(?P<speed>.*?)(\s+ETA|$)')
for line in self.process.stdout:
if self.cancelled:
raise Exception("Cancelled")
self.process.terminate()
return False, "Cancelled"
if d['status'] == 'downloading':
# 진행률 추출
total = d.get('total_bytes') or d.get('total_bytes_estimate') or 0
downloaded = d.get('downloaded_bytes', 0)
speed = d.get('speed', 0)
if total > 0:
self.percent = (downloaded / total) * 100
self.current_speed = self.format_speed(speed) if speed else ""
if self.start_time:
elapsed = time.time() - self.start_time
self.elapsed_time = self.format_time(elapsed)
# 콜백 호출
if self.callback:
self.callback(
percent=int(self.percent),
current=int(self.percent),
total=100,
speed=self.current_speed,
elapsed=self.elapsed_time
)
line = line.strip()
if not line: continue
elif d['status'] == 'finished':
logger.info(f"yt-dlp download finished: {d.get('filename', '')}")
match = prog_re.search(line)
if match:
try:
self.percent = float(match.group('percent'))
self.current_speed = match.group('speed').strip()
if self.start_time:
elapsed = time.time() - self.start_time
self.elapsed_time = self.format_time(elapsed)
if self.callback:
self.callback(percent=int(self.percent), current=int(self.percent), total=100, speed=self.current_speed, elapsed=self.elapsed_time)
except: pass
elif 'error' in line.lower() or 'security' in line.lower() or 'unable' in line.lower():
logger.warning(f"yt-dlp output notice: {line}")
self.error_output.append(line)
self.process.wait()
# yt-dlp 옵션 설정
ydl_opts = {
'outtmpl': self.output_path,
'progress_hooks': [progress_hook],
'quiet': False,
'no_warnings': False,
'noprogress': False,
}
# 헤더 추가
http_headers = {}
if self.headers:
if self.headers.get('Referer'):
http_headers['Referer'] = self.headers['Referer']
if self.headers.get('User-Agent'):
http_headers['User-Agent'] = self.headers['User-Agent']
if http_headers:
ydl_opts['http_headers'] = http_headers
logger.info(f"yt-dlp downloading: {self.url}")
logger.info(f"Output path: {self.output_path}")
logger.info(f"Headers: {http_headers}")
# 다운로드 실행
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([self.url])
# 파일 존재 확인
if os.path.exists(self.output_path):
if self.process.returncode == 0 and os.path.exists(self.output_path):
# 가짜 파일(보안 에러 텍스트) 체크
file_size = os.path.getsize(self.output_path)
if file_size < 2000:
try:
with open(self.output_path, 'r') as f:
text = f.read().lower()
if "security error" in text or not text:
os.remove(self.output_path)
return False, f"CDN 보안 차단(가짜 파일 다운로드됨: {file_size}B)"
except: pass
return True, "Download completed"
else:
# yt-dlp가 확장자를 변경했을 수 있음
base_name = os.path.splitext(self.output_path)[0]
for ext in ['.mp4', '.mkv', '.webm', '.ts']:
possible_path = base_name + ext
if os.path.exists(possible_path):
if possible_path != self.output_path:
os.rename(possible_path, self.output_path)
return True, "Download completed"
return False, "Output file not found"
except Exception as e:
error_msg = str(e)
logger.error(f"yt-dlp download error: {error_msg}")
error_msg = "\n".join(self.error_output[-3:]) if self.error_output else f"Exit code {self.process.returncode}"
return False, f"yt-dlp 실패: {error_msg}"
except Exception as e:
logger.error(f"yt-dlp download exception: {e}")
return False, f"yt-dlp download exception: {str(e)}"
def cancel(self):
"""다운로드 취소"""