v0.5.0: Zendriver Daemon optimization, Python 3.14 support, and UI/UX improvements
This commit is contained in:
379
mod_ohli24.py
379
mod_ohli24.py
@@ -108,6 +108,108 @@ class LogicOhli24(AnimeModuleBase):
|
||||
download_queue = None
|
||||
download_thread = None
|
||||
current_download_count = 0
|
||||
zendriver_setup_done = False # Zendriver 자동 설치 완료 플래그
|
||||
zendriver_daemon_process = None # Zendriver 데몬 프로세스
|
||||
zendriver_daemon_port = 19876
|
||||
|
||||
@classmethod
|
||||
def ensure_zendriver_installed(cls) -> bool:
|
||||
"""Zendriver 패키지 확인 및 자동 설치"""
|
||||
if cls.zendriver_setup_done:
|
||||
return True
|
||||
|
||||
import importlib.util
|
||||
import subprocess as sp
|
||||
|
||||
# 라이브러리 존재 확인
|
||||
lib_exists = importlib.util.find_spec("zendriver") is not None
|
||||
if lib_exists:
|
||||
cls.zendriver_setup_done = True
|
||||
return True
|
||||
|
||||
# 자동 설치 시도
|
||||
try:
|
||||
logger.info("[Zendriver] Not found, installing via pip...")
|
||||
cmd = [sys.executable, "-m", "pip", "install", "zendriver", "-q"]
|
||||
result = sp.run(cmd, capture_output=True, text=True, timeout=120)
|
||||
|
||||
if result.returncode == 0:
|
||||
cls.zendriver_setup_done = True
|
||||
logger.info("[Zendriver] Successfully installed")
|
||||
return True
|
||||
else:
|
||||
logger.warning(f"[Zendriver] Installation failed: {result.stderr[:200]}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"[Zendriver] Installation error: {e}")
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def start_zendriver_daemon(cls) -> bool:
|
||||
"""Zendriver 데몬 시작"""
|
||||
if cls.is_zendriver_daemon_running():
|
||||
logger.debug("[ZendriverDaemon] Already running")
|
||||
return True
|
||||
|
||||
if not cls.ensure_zendriver_installed():
|
||||
return False
|
||||
|
||||
try:
|
||||
import subprocess
|
||||
daemon_script = os.path.join(os.path.dirname(__file__), "lib", "zendriver_daemon.py")
|
||||
|
||||
if not os.path.exists(daemon_script):
|
||||
logger.warning("[ZendriverDaemon] Daemon script not found")
|
||||
return False
|
||||
|
||||
# 데몬 프로세스 시작 (백그라운드)
|
||||
cls.zendriver_daemon_process = subprocess.Popen(
|
||||
[sys.executable, daemon_script],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
start_new_session=True
|
||||
)
|
||||
|
||||
# 시작 대기
|
||||
import time
|
||||
for _ in range(10):
|
||||
time.sleep(0.5)
|
||||
if cls.is_zendriver_daemon_running():
|
||||
logger.info(f"[ZendriverDaemon] Started on port {cls.zendriver_daemon_port}")
|
||||
return True
|
||||
|
||||
logger.warning("[ZendriverDaemon] Failed to start (timeout)")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[ZendriverDaemon] Start error: {e}")
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def is_zendriver_daemon_running(cls) -> bool:
|
||||
"""데몬 실행 상태 확인"""
|
||||
try:
|
||||
import requests
|
||||
resp = requests.get(f"http://127.0.0.1:{cls.zendriver_daemon_port}/health", timeout=1)
|
||||
return resp.status_code == 200
|
||||
except:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def fetch_via_daemon(cls, url: str, timeout: int = 30) -> dict:
|
||||
"""데몬을 통한 HTML 페칭 (빠름)"""
|
||||
try:
|
||||
import requests
|
||||
resp = requests.post(
|
||||
f"http://127.0.0.1:{cls.zendriver_daemon_port}/fetch",
|
||||
json={"url": url, "timeout": timeout},
|
||||
timeout=timeout + 5
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
return resp.json()
|
||||
return {"success": False, "error": f"HTTP {resp.status_code}"}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
def __init__(self, P: Any) -> None:
|
||||
self.name: str = name
|
||||
@@ -499,13 +601,18 @@ class LogicOhli24(AnimeModuleBase):
|
||||
# Fallback to base class for common subs (setting_save, queue_command, entity_list, etc.)
|
||||
return super().process_ajax(sub, req)
|
||||
|
||||
def get_episode(self, clip_id):
|
||||
for _ in self.current_data["episode"]:
|
||||
if _["title"] == clip_id:
|
||||
return _
|
||||
def get_episode(self, clip_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""클립 ID로 에피소드 정보 조회."""
|
||||
for ep in self.current_data["episode"]:
|
||||
if ep["title"] == clip_id:
|
||||
return ep
|
||||
return None
|
||||
|
||||
def process_command(self, command, arg1, arg2, arg3, req):
|
||||
ret = {"ret": "success"}
|
||||
def process_command(
|
||||
self, command: str, arg1: str, arg2: str, arg3: str, req: Any
|
||||
) -> Any:
|
||||
"""커맨드 처리."""
|
||||
ret: Dict[str, Any] = {"ret": "success"}
|
||||
|
||||
if command == "download_program":
|
||||
_pass = arg2
|
||||
@@ -532,8 +639,9 @@ class LogicOhli24(AnimeModuleBase):
|
||||
return super().process_command(command, arg1, arg2, arg3, req)
|
||||
|
||||
@staticmethod
|
||||
def add_whitelist(*args):
|
||||
ret = {}
|
||||
def add_whitelist(*args: str) -> Dict[str, Any]:
|
||||
"""화이트리스트에 코드 추가."""
|
||||
ret: Dict[str, Any] = {}
|
||||
|
||||
logger.debug(f"args: {args}")
|
||||
try:
|
||||
@@ -577,13 +685,14 @@ class LogicOhli24(AnimeModuleBase):
|
||||
ret["log"] = str(e)
|
||||
return ret
|
||||
|
||||
def setting_save_after(self, change_list):
|
||||
def setting_save_after(self, change_list: List[str]) -> None:
|
||||
"""설정 저장 후 처리."""
|
||||
if self.queue.get_max_ffmpeg_count() != P.ModelSetting.get_int("ohli24_max_ffmpeg_process_count"):
|
||||
self.queue.set_max_ffmpeg_count(P.ModelSetting.get_int("ohli24_max_ffmpeg_process_count"))
|
||||
|
||||
def scheduler_function(self):
|
||||
# Todo: 스케쥴링 함수 미구현
|
||||
logger.debug(f"ohli24 scheduler_function::=========================")
|
||||
def scheduler_function(self) -> None:
|
||||
"""스케줄러 함수 - 자동 다운로드 처리."""
|
||||
logger.debug("ohli24 scheduler_function::=========================")
|
||||
|
||||
content_code_list = P.ModelSetting.get_list("ohli24_auto_code_list", "|")
|
||||
logger.debug(f"content_code_list::: {content_code_list}")
|
||||
@@ -964,8 +1073,9 @@ class LogicOhli24(AnimeModuleBase):
|
||||
P.logger.error(traceback.format_exc())
|
||||
return {"ret": "error", "log": str(e)}
|
||||
|
||||
def get_anime_info(self, cate, page):
|
||||
print(cate, page)
|
||||
def get_anime_info(self, cate: str, page: str) -> Dict[str, Any]:
|
||||
"""카테고리별 애니메이션 목록 조회."""
|
||||
logger.debug(f"get_anime_info: cate={cate}, page={page}")
|
||||
try:
|
||||
if cate == "ing":
|
||||
url = P.ModelSetting.get("ohli24_url") + "/bbs/board.php?bo_table=" + cate + "&page=" + page
|
||||
@@ -1038,7 +1148,8 @@ class LogicOhli24(AnimeModuleBase):
|
||||
return {"ret": "error", "log": str(e)}
|
||||
|
||||
# @staticmethod
|
||||
def get_search_result(self, query, page, cate):
|
||||
def get_search_result(self, query: str, page: str, cate: str) -> Dict[str, Any]:
|
||||
"""검색 결과 조회."""
|
||||
try:
|
||||
_query = urllib.parse.quote(query)
|
||||
url = (
|
||||
@@ -1144,6 +1255,13 @@ class LogicOhli24(AnimeModuleBase):
|
||||
|
||||
# 잔여 Temp 폴더 정리
|
||||
self.cleanup_stale_temps()
|
||||
|
||||
# Zendriver 데몬 시작 (백그라운드)
|
||||
try:
|
||||
from threading import Thread
|
||||
Thread(target=LogicOhli24.start_zendriver_daemon, daemon=True).start()
|
||||
except Exception as daemon_err:
|
||||
logger.debug(f"[ZendriverDaemon] Auto-start skipped: {daemon_err}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Exception:%s", e)
|
||||
@@ -1192,17 +1310,33 @@ class LogicOhli24(AnimeModuleBase):
|
||||
pass
|
||||
|
||||
def fetch_url_with_cffi(url, headers, timeout, data, method):
|
||||
"""별도 스레드에서 curl_cffi로 실행"""
|
||||
"""별도 스레드에서 curl_cffi로 실행 (Chrome 124 impersonation + Enhanced Headers)"""
|
||||
from curl_cffi import requests
|
||||
|
||||
# 프록시 설정
|
||||
proxies = LogicOhli24.get_proxies()
|
||||
|
||||
with requests.Session(impersonate="chrome120") as session:
|
||||
# Chrome 124 impersonation (최신 안정 버전)
|
||||
with requests.Session(impersonate="chrome124") as session:
|
||||
# 헤더 설정
|
||||
if headers:
|
||||
session.headers.update(headers)
|
||||
|
||||
# 추가 보안 헤더 (Cloudflare 우회용)
|
||||
enhanced_headers = {
|
||||
"sec-ch-ua": '"Not_A Brand";v="8", "Chromium";v="124", "Google Chrome";v="124"',
|
||||
"sec-ch-ua-mobile": "?0",
|
||||
"sec-ch-ua-platform": '"macOS"',
|
||||
"sec-fetch-dest": "document",
|
||||
"sec-fetch-mode": "navigate",
|
||||
"sec-fetch-site": "none",
|
||||
"sec-fetch-user": "?1",
|
||||
"upgrade-insecure-requests": "1",
|
||||
"dnt": "1",
|
||||
"cache-control": "max-age=0",
|
||||
}
|
||||
session.headers.update(enhanced_headers)
|
||||
|
||||
if method.upper() == 'POST':
|
||||
response = session.post(url, data=data, timeout=timeout, proxies=proxies)
|
||||
else:
|
||||
@@ -1213,9 +1347,10 @@ class LogicOhli24(AnimeModuleBase):
|
||||
|
||||
if headers is None:
|
||||
headers = {
|
||||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
|
||||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
|
||||
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
|
||||
"Accept-Encoding": "gzip, deflate, br, zstd",
|
||||
}
|
||||
|
||||
if referer:
|
||||
@@ -1230,33 +1365,155 @@ class LogicOhli24(AnimeModuleBase):
|
||||
headers["Referer"] = referer
|
||||
elif "Referer" not in headers and "referer" not in headers:
|
||||
headers["Referer"] = "https://ani.ohli24.com"
|
||||
|
||||
|
||||
max_retries = 3
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
logger.debug(f"get_html (curl_cffi in thread) {method} attempt {attempt + 1}: {url}")
|
||||
|
||||
# ThreadPoolExecutor로 별도 스레드에서 실행
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(fetch_url_with_cffi, url, headers, timeout, data, method)
|
||||
response_data = future.result(timeout=timeout + 10)
|
||||
|
||||
if response_data and (len(response_data) > 10 or method.upper() == 'POST'):
|
||||
logger.debug(f"get_html success, length: {len(response_data)}")
|
||||
return response_data
|
||||
# === [TEST MODE] Layer 1, 2 일시 비활성화 - Layer 3, 4만 테스트 ===
|
||||
response_data = "" # 바로 Layer 3로 이동
|
||||
|
||||
# max_retries = 3
|
||||
# for attempt in range(max_retries):
|
||||
# try:
|
||||
# logger.debug(f"get_html (curl_cffi in thread) {method} attempt {attempt + 1}: {url}")
|
||||
#
|
||||
# # ThreadPoolExecutor로 별도 스레드에서 실행
|
||||
# with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
# future = executor.submit(fetch_url_with_cffi, url, headers, timeout, data, method)
|
||||
# response_data = future.result(timeout=timeout + 10)
|
||||
#
|
||||
# if response_data and (len(response_data) > 10 or method.upper() == 'POST'):
|
||||
# logger.debug(f"get_html success, length: {len(response_data)}")
|
||||
# return response_data
|
||||
# else:
|
||||
# logger.warning(f"Short response (len={len(response_data) if response_data else 0})")
|
||||
#
|
||||
# except FuturesTimeoutError:
|
||||
# logger.warning(f"get_html attempt {attempt + 1} timed out")
|
||||
# except Exception as e:
|
||||
# logger.warning(f"get_html attempt {attempt + 1} failed: {e}")
|
||||
#
|
||||
# if attempt < max_retries - 1:
|
||||
# time.sleep(3)
|
||||
|
||||
# # --- Layer 2: Cloudscraper Fallback (가벼운 JS 챌린지 해결) ---
|
||||
# if not response_data or len(response_data) < 10:
|
||||
# logger.info(f"[Layer2] curl_cffi failed, trying cloudscraper: {url}")
|
||||
# try:
|
||||
# import cloudscraper
|
||||
# scraper = cloudscraper.create_scraper(
|
||||
# browser={"browser": "chrome", "platform": "darwin", "mobile": False}
|
||||
# )
|
||||
#
|
||||
# if method.upper() == 'POST':
|
||||
# cs_response = scraper.post(url, data=data, headers=headers, timeout=timeout)
|
||||
# else:
|
||||
# cs_response = scraper.get(url, headers=headers, timeout=timeout)
|
||||
#
|
||||
# if cs_response and cs_response.text and len(cs_response.text) > 10:
|
||||
# logger.info(f"[Layer2] Cloudscraper success, HTML len: {len(cs_response.text)}")
|
||||
# return cs_response.text
|
||||
# else:
|
||||
# logger.warning(f"[Layer2] Cloudscraper short response: {len(cs_response.text) if cs_response else 0}")
|
||||
#
|
||||
# except Exception as e:
|
||||
# logger.warning(f"[Layer2] Cloudscraper failed: {e}")
|
||||
|
||||
|
||||
# --- Layer 3A: Zendriver Daemon (빠름 - 브라우저 상시 대기) ---
|
||||
if not response_data or len(response_data) < 10:
|
||||
if LogicOhli24.is_zendriver_daemon_running():
|
||||
logger.info(f"[Layer3A] Trying Zendriver Daemon: {url}")
|
||||
daemon_result = LogicOhli24.fetch_via_daemon(url, timeout)
|
||||
if daemon_result.get("success") and daemon_result.get("html"):
|
||||
logger.info(f"[Layer3A] Daemon success in {daemon_result.get('elapsed', '?')}s, HTML len: {len(daemon_result['html'])}")
|
||||
return daemon_result["html"]
|
||||
else:
|
||||
logger.warning(f"Short response (len={len(response_data) if response_data else 0})")
|
||||
|
||||
except FuturesTimeoutError:
|
||||
logger.warning(f"get_html attempt {attempt + 1} timed out")
|
||||
except Exception as e:
|
||||
logger.warning(f"get_html attempt {attempt + 1} failed: {e}")
|
||||
logger.warning(f"[Layer3A] Daemon failed: {daemon_result.get('error', 'Unknown')}")
|
||||
|
||||
# --- Layer 3B: Zendriver Subprocess Fallback (데몬 실패 시) ---
|
||||
if not response_data or len(response_data) < 10:
|
||||
logger.info(f"[Layer3B] Trying Zendriver subprocess: {url}")
|
||||
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(3)
|
||||
# Zendriver 자동 설치 확인
|
||||
if not LogicOhli24.ensure_zendriver_installed():
|
||||
logger.warning("[Layer3B] Zendriver installation failed, skipping to Layer 4")
|
||||
else:
|
||||
try:
|
||||
import subprocess
|
||||
import contextlib
|
||||
script_path = os.path.join(os.path.dirname(__file__), "lib", "zendriver_ohli24.py")
|
||||
|
||||
# gevent fork 경고 억제 (부모 프로세스 stderr 임시 리다이렉트)
|
||||
with open(os.devnull, 'w') as devnull:
|
||||
old_stderr = sys.stderr
|
||||
sys.stderr = devnull
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[sys.executable, script_path, url, str(timeout)],
|
||||
capture_output=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
timeout=timeout + 30
|
||||
)
|
||||
finally:
|
||||
sys.stderr = old_stderr
|
||||
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
zd_result = json.loads(result.stdout.strip())
|
||||
if zd_result.get("success") and zd_result.get("html"):
|
||||
logger.info(f"[Layer3B] Zendriver success in {zd_result.get('elapsed', '?')}s, HTML len: {len(zd_result['html'])}")
|
||||
return zd_result["html"]
|
||||
else:
|
||||
logger.warning(f"[Layer3B] Zendriver failed: {zd_result.get('error', 'Unknown error')}")
|
||||
else:
|
||||
logger.warning(f"[Layer3B] Zendriver subprocess failed")
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(f"[Layer3B] Zendriver timed out after {timeout + 30}s")
|
||||
except Exception as e:
|
||||
logger.warning(f"[Layer3B] Zendriver exception: {e}")
|
||||
|
||||
# --- Layer 4: Camoufox Fallback (최후의 수단 - 풀 Firefox 브라우저) ---
|
||||
if not response_data or len(response_data) < 10:
|
||||
logger.info(f"[Layer4] Zendriver failed, trying Camoufox: {url}")
|
||||
try:
|
||||
import subprocess
|
||||
script_path = os.path.join(os.path.dirname(__file__), "lib", "camoufox_ohli24.py")
|
||||
|
||||
# gevent fork 경고 억제 (부모 프로세스 stderr 임시 리다이렉트)
|
||||
with open(os.devnull, 'w') as devnull:
|
||||
old_stderr = sys.stderr
|
||||
sys.stderr = devnull
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[sys.executable, script_path, url, str(timeout)],
|
||||
capture_output=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
timeout=timeout + 30
|
||||
)
|
||||
finally:
|
||||
sys.stderr = old_stderr
|
||||
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
cf_result = json.loads(result.stdout.strip())
|
||||
if cf_result.get("success") and cf_result.get("html"):
|
||||
logger.info(f"[Layer4] Camoufox success in {cf_result.get('elapsed', '?')}s, HTML len: {len(cf_result['html'])}")
|
||||
return cf_result["html"]
|
||||
else:
|
||||
logger.warning(f"[Layer4] Camoufox failed: {cf_result.get('error', 'Unknown error')}")
|
||||
else:
|
||||
logger.warning(f"[Layer4] Camoufox subprocess failed")
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(f"[Layer4] Camoufox timed out after {timeout + 30}s")
|
||||
except Exception as e:
|
||||
logger.warning(f"[Layer4] Camoufox exception: {e}")
|
||||
|
||||
return response_data
|
||||
|
||||
|
||||
#########################################################
|
||||
def add(self, episode_info: Dict[str, Any]) -> str:
|
||||
"""Add episode to download queue with early skip checks."""
|
||||
@@ -1357,14 +1614,17 @@ class LogicOhli24(AnimeModuleBase):
|
||||
season_val = int(match.group("season")) if match and match.group("season") else 1
|
||||
savepath = os.path.join(savepath, "Season %s" % season_val)
|
||||
|
||||
# Use glob to find any matching file
|
||||
full_pattern = os.path.join(savepath, filename_pattern)
|
||||
matching_files = glob.glob(full_pattern)
|
||||
|
||||
if matching_files:
|
||||
# Return first matching file
|
||||
logger.debug(f"Found existing file: {matching_files[0]}")
|
||||
return matching_files[0]
|
||||
# Use case-insensitive matching to find any existing file
|
||||
# (prevents duplicate downloads for 1080P vs 1080p)
|
||||
if os.path.isdir(savepath):
|
||||
import fnmatch
|
||||
pattern_basename = os.path.basename(filename_pattern)
|
||||
for fname in os.listdir(savepath):
|
||||
# Case-insensitive fnmatch
|
||||
if fnmatch.fnmatch(fname.lower(), pattern_basename.lower()):
|
||||
matched_path = os.path.join(savepath, fname)
|
||||
logger.debug(f"Found existing file (case-insensitive): {matched_path}")
|
||||
return matched_path
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.debug(f"_predict_filepath error: {e}")
|
||||
@@ -1909,7 +2169,9 @@ class Ohli24QueueEntity(AnimeQueueEntity):
|
||||
P.logger.error("Exception:%s", e)
|
||||
P.logger.error(traceback.format_exc())
|
||||
|
||||
def extract_video_from_cdndania(self, iframe_src, referer_url):
|
||||
def extract_video_from_cdndania(
|
||||
self, iframe_src: str, referer_url: str
|
||||
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[int]]:
|
||||
"""cdndania.com 플레이어에서 API 호출을 통해 비디오(m3u8) 및 자막(vtt) URL 추출
|
||||
|
||||
Returns:
|
||||
@@ -2215,7 +2477,8 @@ class ModelOhli24Item(ModelBase):
|
||||
self.P.logger.error(traceback.format_exc())
|
||||
|
||||
@classmethod
|
||||
def get_by_id(cls, id):
|
||||
def get_by_id(cls, id: int) -> Optional["ModelOhli24Item"]:
|
||||
"""아이디로 아이템 조회."""
|
||||
try:
|
||||
with F.app.app_context():
|
||||
return F.db.session.query(cls).filter_by(id=int(id)).first()
|
||||
@@ -2224,7 +2487,8 @@ class ModelOhli24Item(ModelBase):
|
||||
cls.P.logger.error(traceback.format_exc())
|
||||
|
||||
@classmethod
|
||||
def get_by_ohli24_id(cls, ohli24_id):
|
||||
def get_by_ohli24_id(cls, ohli24_id: str) -> Optional["ModelOhli24Item"]:
|
||||
"""오리24 ID로 아이템 조회."""
|
||||
try:
|
||||
with F.app.app_context():
|
||||
return F.db.session.query(cls).filter_by(ohli24_id=ohli24_id).first()
|
||||
@@ -2233,14 +2497,16 @@ class ModelOhli24Item(ModelBase):
|
||||
cls.P.logger.error(traceback.format_exc())
|
||||
|
||||
@classmethod
|
||||
def delete_by_id(cls, idx):
|
||||
def delete_by_id(cls, idx: int) -> bool:
|
||||
"""ID로 아이템 삭제."""
|
||||
db.session.query(cls).filter_by(id=idx).delete()
|
||||
db.session.commit()
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def web_list(cls, req):
|
||||
ret = {}
|
||||
def web_list(cls, req: Any) -> Dict[str, Any]:
|
||||
"""웹 목록 조회."""
|
||||
ret: Dict[str, Any] = {}
|
||||
page = int(req.form["page"]) if "page" in req.form else 1
|
||||
page_size = 30
|
||||
job_id = ""
|
||||
@@ -2256,7 +2522,10 @@ class ModelOhli24Item(ModelBase):
|
||||
return ret
|
||||
|
||||
@classmethod
|
||||
def make_query(cls, search="", order="desc", option="all"):
|
||||
def make_query(
|
||||
cls, search: str = "", order: str = "desc", option: str = "all"
|
||||
) -> Any:
|
||||
"""쿼리 생성."""
|
||||
query = db.session.query(cls)
|
||||
if search is not None and search != "":
|
||||
if search.find("|") != -1:
|
||||
|
||||
Reference in New Issue
Block a user