django-apschedule定时任务异常停止
背景
在django项目中使用django-apschedule
来实现定时任务,使用的是BackgroundScheduler
调度类,该调度的实现是通过后台线程的方式执行定时任务。其中任务都是持久化到数据库中的。
在项目的运行过程中,因为数据库的异常,导致定时任务线程异常终止,即使数据库后续恢复正常,但也不再继续执行。我多次尝试复现未果,在开启定时任务期间,手动将数据库连接断开,定时任务执行失败,然后再将数据库建立连接,定时任务竟然重新恢复了,这让我一时摸不着头脑。
具体的错误日志如下,通过分析,是update_job连接数据库异常,没有任何捕获机制,然后层层网上抛,最终导致线程停止,可以很肯定的是,绝对是因为数据库连接失败导致的定时任务失败,那为什么无法复现呢?
Traceback (most recent call last):
File "/usr/local/python3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/python3/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/blocking.py", line 32, in _main_loop
wait_seconds = self._process_jobs()
File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 1009, in _process_jobs
jobstore.update_job(job)
File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/util.py", line 105, in func_wrapper
result = func(*args, **kwargs)
File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/jobstores.py", line 249, in update_job
with transaction.atomic():
File "/usr/local/python3/lib/python3.7/site-packages/django/db/transaction.py", line 189, in __enter__
if not connection.get_autocommit():
File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 389, in get_autocommit
self.ensure_connection()
File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
return func(*args, **kwargs)
File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
self.connect()
File "/usr/local/python3/lib/python3.7/site-packages/django/db/utils.py", line 90, in __exit__
raise dj_exc_value.with_traceback(traceback) from exc_value
File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
self.connect()
File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
return func(*args, **kwargs)
File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 200, in connect
self.connection = self.get_new_connection(conn_params)
File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
return func(*args, **kwargs)
File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/postgresql/base.py", line 187, in get_new_connection
connection = Database.connect(**conn_params)
File "/usr/local/python3/lib/python3.7/site-packages/psycopg2/__init__.py", line 122, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
django.db.utils.OperationalError: connection to server at "xxxx.postgresql.svc.cluster.local" (xx.xx.xx.xx), port xxxx failed: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
源码分析原因
可以先看下BackgroundScheduler
的实现方式,在start
方法中创建了个子线程。
class BackgroundScheduler(BlockingScheduler):
_thread = None
def start(self, *args, **kwargs):
if self._event is None or self._event.is_set():
self._event = Event()
BaseScheduler.start(self, *args, **kwargs)
self._thread = Thread(target=self._main_loop, name='APScheduler')
self._thread.daemon = self._daemon
self._thread.start()
def shutdown(self, *args, **kwargs):
super(BackgroundScheduler, self).shutdown(*args, **kwargs)
self._thread.join()
del self._thread
其中_main_loop
在BlockingScheduler
中实现,是一个死循环,执行_process_jobs
方法
class BlockingScheduler(BaseScheduler):
...
def _main_loop(self):
wait_seconds = TIMEOUT_MAX
while self.state != STATE_STOPPED:
self._event.wait(wait_seconds)
self._event.clear()
wait_seconds = self._process_jobs()
...
再看_process_jobs中的内容,在BaseScheduler
实现的,主要流程如下,先找到所有要执行的job,然后进行遍历运行并更新Job的状态。之前的错误日志,也就是这里的update_job抛出异常,而这里并没有捕获异常,最终层层往上抛,update_job -> _process_jobs -> _main_loop
,最终线程异常终止。
def _process_jobs(self):
for jobstore_alias, jobstore in six.iteritems(self._jobstores):
try:
due_jobs = jobstore.get_due_jobs(now)
except Exception as e:
...
continue
...
for job in due_jobs:
...
try:
executor.submit_job(job, run_times)
except BaseException:
...
...
jobstore.update_job(job)
那为什么复现不了呢?这个是因为,关闭数据库连接时,程序不一定可以正好运行在update_job
,可以看到前面的get_due_jobs
进行了异常捕获,如果这里抛出数据库连接异常是可以捕获到的,然后跳过后面的操作,等待下一次定时任务的执行,如果还是失败,则再次等待,所以这里的异常不会抛到最上层导致线程停止。
但如果某个时机,上面连接数据库都成功了,到update_job这里异常抛出,则会导致整个线程停止,定时任务不再执行。
那如何解决该问题呢?
搭建demo
首先我们搭建一个demo出来,模拟复现该问题。
- 创建django项目
django-admin startproject apschedule_demo
python manage.py startapp demo
python manage.py makemigrations
python manage.py migrate
- 在settings.py中配置到好数据库信息
DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
"NAME": "apschedule_demo",
"HOST": "xxxx",
"PORT": 5432,
"USER": "xxx",
"PASSWORD": "xxx"
}
}
- 根据django-apschedule官方提供的文档搭建demo
在settings.py中添加该APP
INSTALLED_APPS = (
# ...
"django_apscheduler",
)
创建目录demo/management/commands
,并在其下面创建runapscheduler.py
文件,代码内容如下:
import logging
from django.conf import settings
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
logger = logging.getLogger(__name__)
def my_job():
# Your job processing logic here...
print("job..")
class Command(BaseCommand):
help = "Runs APScheduler."
def handle(self, *args, **options):
scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
scheduler.add_jobstore(DjangoJobStore(), "default")
scheduler.add_job(
my_job,
trigger=CronTrigger(second="*/3"), # Every 3 seconds
id="my_job", # The `id` assigned to each job MUST be unique
max_instances=1,
replace_existing=True,
)
logger.info("Added job 'my_job'.")
try:
logger.info("Starting scheduler...")
scheduler.start()
# 因为上面是非阻塞开启定时任务,所以这里需要阻塞,不让主线程结束。
while True:
time.sleep(10)
except KeyboardInterrupt:
logger.info("Stopping scheduler...")
scheduler.shutdown()
logger.info("Scheduler shut down successfully!")
可以通过python manage.py runapscheduler
执行上面的命令运行定时任务,该脚本创建了一个每3秒执行一次的任务。
- 复现
我们将断点打在jobstore.update_job(job)
上,然后使用debug模式进行调试,当程序运行到断点上时,将数据库关闭,然后程序继续运行,则会报错,并抛出异常,线程停止了运行。至此,我们复现了该问题。
线程重启
我一开始想,我可以判断该线程是否异常,如果异常则将线程重启就好了
while True:
if not scheduler._thread.is_alive():
scheduler._thread.start()
time.sleep(10)
但事与愿违,抛出了异常,异常信息如下:
RuntimeError: threads can only be started once
通过查看官方文档可以知道,线程的start方法只能调用一次。
listener
apschedule中提供了监听器机制,也就是在定时任务的成功、失败等状态都可以通过提前注册的listener方法来进行回调。但通过分析源码,其并不能捕获到定时任务线程的异常。
下面是简化了代码的listeners的原理流程:
- 外部通过
add_listener
方法注册回调方法 - 在定时任务线程主流程
_process_jobs
中发生的各个事件添加到events中 - 遍历events事件,然后通过与注册的回调方法mask进行匹配,匹配上则调用回调方法
class BaseScheduler:
def __init__(...):
self._listeners = []
def add_listener(self, callback, mask=EVENT_ALL):
self._listeners.append((callback, mask))
def _process_jobs(self):
events = []
...
events.append(event)
...
for event in events:
self._dispatch_event(event)
def _dispatch_event(self, event):
for cb, mask in listeners:
if event.code & mask:
try:
cb(event)
except BaseException:
self._logger.exception('Error notifying listener')
如果线程本身挂了,回调方法是不可执行的。
捕获线程中函数的异常
如果update_job
抛出异常导致线程停止,那我捕获它的异常,然后再continue,等待下次定时任务运行再重试不就好了,但是这就需要改动源码,能不能改源码就尽量不改。所以这边我采用了继承BackgroundScheduler
类,然后再重写_process_jobs
方法来解决。
在重写的_process_jobs
方法中,对父类的_process_jobs()
进行异常的捕获,然后再不断的进行重试,这样即使update_job抛出异常了,也可以不断的进行尝试恢复,直至成功。
class DemoBackgroundScheduler(BackgroundScheduler):
def _process_jobs(self):
while True:
try:
return super()._process_jobs()
except BaseException:
time.sleep(5)
class Command(BaseCommand):
help = "Runs APScheduler."
def handle(self, *args, **options):
scheduler = DemoBackgroundScheduler(timezone=settings.TIME_ZONE)
...
然后再次尝试复现该问题,可以发现在断开数据库后,它能够一直进行重试,线程没有停止,当数据库恢复运行后,job执行成功,不再抛出异常。
相关链接
欢迎关注,互相学习,共同进步~
我的个人博客
公众号:编程黑洞