import threading
import traceback
from datetime import datetime, timedelta
from random import randint
from apscheduler.jobstores.base import JobLookupError
from apscheduler.triggers.cron import CronTrigger
from pytz import timezone
class Scheduler(object):
job_list = []
first_run_check_thread = None
def __init__(self, frame):
self.frame = frame
self.logger = frame.logger
try:
if frame.config['use_gevent']:
from apscheduler.schedulers.gevent import GeventScheduler
self.sched = GeventScheduler(timezone='Asia/Seoul')
else:
raise Exception('')
except:
from apscheduler.schedulers.background import BackgroundScheduler
self.sched = BackgroundScheduler(timezone='Asia/Seoul')
self.sched.start()
self.logger.info('SCHEDULER start..')
def first_run_check_thread_function(self):
try:
#time.sleep(60)
#for i in range(5):
flag_exit = True
for job_instance in self.job_list:
if not job_instance.run:
continue
if job_instance.count == 0 and not job_instance.is_running and job_instance.is_interval:
#if job_instance.count == 0 and not job_instance.is_running:
job = self.sched.get_job(job_instance.job_id)
if job is not None:
self.logger.warning('job_instance : %s', job_instance.plugin)
self.logger.warning('XX job re-sched:%s', job)
flag_exit = False
tmp = randint(1, 20)
job.modify(next_run_time=datetime.now(timezone('Asia/Seoul')) + timedelta(seconds=tmp))
#break
else:
pass
if flag_exit:
self.remove_job("scheduler_check")
#time.sleep(30)
except Exception as exception:
self.logger.error('Exception:%s', exception)
self.logger.error(traceback.format_exc())
def shutdown(self):
self.sched.shutdown()
def kill_scheduler(self, job_id):
try:
self.sched.remove_job(job_id)
except JobLookupError as err:
self.logger.debug("fail to stop Scheduler: {err}".format(err=err))
self.logger.debug(traceback.format_exc())
def add_job_instance(self, job_instance, run=True):
if self.frame.config['run_flask']:
if not self.is_include(job_instance.job_id):
job_instance.run = run
Scheduler.job_list.append(job_instance)
if job_instance.is_interval:
self.sched.add_job(job_instance.job_function, 'interval', minutes=job_instance.interval, seconds=job_instance.interval_seconds, id=job_instance.job_id, args=(None))
elif job_instance.is_cron:
self.sched.add_job(job_instance.job_function, CronTrigger.from_crontab(job_instance.interval), id=job_instance.job_id, args=(None))
job = self.sched.get_job(job_instance.job_id)
if run and job_instance.is_interval:
tmp = randint(5, 20)
job.modify(next_run_time=datetime.now(timezone('Asia/Seoul')) + timedelta(seconds=tmp))
def execute_job(self, job_id):
self.logger.debug('execute_job:%s', job_id)
job = self.sched.get_job(job_id)
tmp = randint(5, 20)
job.modify(next_run_time=datetime.now(timezone('Asia/Seoul')) + timedelta(seconds=tmp))
def is_include(self, job_id):
job = self.sched.get_job(job_id)
return (job is not None)
def remove_job(self, job_id):
try:
if self.is_include(job_id):
self.sched.remove_job(job_id)
job = self.get_job_instance(job_id)
if not job.is_running:
self.remove_job_instance(job_id)
self.logger.debug('remove job_id:%s', job_id)
return True
except JobLookupError as err:
self.logger.debug("fail to remove Scheduler: {err}".format(err=err))
self.logger.debug(traceback.format_exc())
return False
def get_job_instance(self, job_id):
for job in Scheduler.job_list:
if job.job_id == job_id:
return job
def is_running(self, job_id):
job = self.get_job_instance(job_id)
if job is None:
return False
else:
return job.is_running
# job에서만 호출한다..
def remove_job_instance(self, job_id):
# function이 실행중일때 제거하면..
# 실행중이나 목록에서 빠져버린다..
for job in Scheduler.job_list:
if job.job_id == job_id:
Scheduler.job_list.remove(job)
self.logger.debug('remove_job_instance : %s', job_id)
break
def get_job_list_info(self):
ret = []
idx = 0
job_list = self.sched.get_jobs()
#logger.debug('len jobs %s %s', len(jobs), len(Scheduler.job_list))
for j in job_list:
idx += 1
entity = {}
entity['no'] = idx
entity['id'] = j.id
entity['next_run_time'] = j.next_run_time.strftime('%m-%d %H:%M:%S')
remain = (j.next_run_time - datetime.now(timezone('Asia/Seoul')))
tmp = ''
if remain.days > 0:
tmp += '%s일 ' % (remain.days)
remain = remain.seconds
if remain//3600 > 0:
tmp += '%s시간 ' % (remain//3600)
remain = remain % 3600
if remain // 60 > 0:
tmp += '%s분 ' % (remain//60)
tmp += '%s초' % (remain%60)
#entity['remain_time'] = (j.next_run_time - datetime.now(timezone('Asia/Seoul'))).seconds
entity['remain_time'] = tmp
job = self.get_job_instance(j.id)
if job is not None:
entity['count'] = job.count
entity['plugin'] = job.plugin
if job.is_cron:
entity['interval'] = job.interval
elif job.interval == 9999:
entity['interval'] = '항상 실행'
entity['remain_time'] = ''
else:
entity['interval'] = '%s분 %s초' % (job.interval, job.interval_seconds)
entity['is_running'] = job.is_running
entity['description'] = job.description
entity['running_timedelta'] = job.running_timedelta.seconds if job.running_timedelta is not None else '-'
entity['make_time'] = job.make_time.strftime('%m-%d %H:%M:%S')
entity['run'] = job.run
else:
entity['count'] = ''
entity['plugin'] = ''
entity['interval'] = ''
entity['is_running'] = ''
entity['description'] = ''
entity['running_timedelta'] = ''
entity['make_time'] = ''
entity['run'] = True
ret.append(entity)
return ret
class Job(object):
def __init__(self, plugin, job_id, interval, target_function, description, args=None):
self.plugin = plugin
self.job_id = job_id
self.interval = '%s' % interval
self.interval_seconds = randint(1, 59)
self.target_function = target_function
self.description = description
self.is_running = False
self.thread = None
self.start_time = None
self.end_time = None
self.running_timedelta = None
self.status = None
self.count = 0
self.make_time = datetime.now(timezone('Asia/Seoul'))
if len(self.interval.strip().split(' ')) == 5:
self.is_cron = True
self.is_interval = False
else:
self.is_cron = False
self.is_interval = True
if self.is_interval:
if isinstance(self.interval, str):
self.interval = int(self.interval)
self.args = args
# true이고 interval이면 바로 실행
# false이면 스케쥴링시간이 되면 실행
# add_job_instance에서 true이면 20초 이내에 실행하려고 함.
# false이면 넣을 때는 실행하지 않고 다음 주기때 실행
self.run = True
def job_function(self):
try:
from framework import F
self.is_running = True
self.start_time = datetime.now(timezone('Asia/Seoul'))
if self.args is None:
self.thread = threading.Thread(target=self.target_function, args=())
else:
self.thread = threading.Thread(target=self.target_function, args=(self.args,))
self.thread.daemon = True
self.thread.start()
F.socketio.emit('notify', {'type':'success', 'msg':f"{self.description}
작업을 시작합니다." }, namespace='/framework', broadcast=True)
self.thread.join()
F.socketio.emit('notify', {'type':'success', 'msg':f"{self.description}
작업이 종료되었습니다." }, namespace='/framework', broadcast=True)
self.end_time = datetime.now(timezone('Asia/Seoul'))
self.running_timedelta = self.end_time - self.start_time
self.status = 'success'
if not F.scheduler.is_include(self.job_id):
F.scheduler.remove_job_instance(self.job_id)
self.count += 1
except Exception as exception:
self.status = 'exception'
F.logger.error('Exception:%s', exception)
F.logger.error(traceback.format_exc())
finally:
self.is_running = False