django-apschedule定时任务异常停止

背景

在django项目中使用django-apschedule来实现定时任务,使用的是BackgroundScheduler调度类,该调度的实现是通过后台线程的方式执行定时任务。其中任务都是持久化到数据库中的。

在项目的运行过程中,因为数据库的异常,导致定时任务线程异常终止,即使数据库后续恢复正常,但也不再继续执行。我多次尝试复现未果,在开启定时任务期间,手动将数据库连接断开,定时任务执行失败,然后再将数据库建立连接,定时任务竟然重新恢复了,这让我一时摸不着头脑。

具体的错误日志如下,通过分析,是update_job连接数据库异常,没有任何捕获机制,然后层层网上抛,最终导致线程停止,可以很肯定的是,绝对是因为数据库连接失败导致的定时任务失败,那为什么无法复现呢?

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/python3/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/blocking.py", line 32, in _main_loop
    wait_seconds = self._process_jobs()
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 1009, in _process_jobs
    jobstore.update_job(job)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/util.py", line 105, in func_wrapper
    result = func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/jobstores.py", line 249, in update_job
    with transaction.atomic():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/transaction.py", line 189, in __enter__
    if not connection.get_autocommit():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 389, in get_autocommit
    self.ensure_connection()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
     return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/utils.py", line 90, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 200, in connect
    self.connection = self.get_new_connection(conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/postgresql/base.py", line 187, in get_new_connection
    connection = Database.connect(**conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
django.db.utils.OperationalError: connection to server at "xxxx.postgresql.svc.cluster.local" (xx.xx.xx.xx), port xxxx failed: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

源码分析原因

可以先看下BackgroundScheduler的实现方式,在start方法中创建了个子线程。

class BackgroundScheduler(BlockingScheduler):

    _thread = None

    def start(self, *args, **kwargs):
        if self._event is None or self._event.is_set():
            self._event = Event()

        BaseScheduler.start(self, *args, **kwargs)
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        self._thread.daemon = self._daemon
        self._thread.start()

    def shutdown(self, *args, **kwargs):
        super(BackgroundScheduler, self).shutdown(*args, **kwargs)
        self._thread.join()
        del self._thread

其中_main_loopBlockingScheduler中实现,是一个死循环,执行_process_jobs方法

class BlockingScheduler(BaseScheduler):
    
    ...

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            self._event.wait(wait_seconds)
            self._event.clear()
            wait_seconds = self._process_jobs()
    
    ...

再看_process_jobs中的内容,在BaseScheduler实现的,主要流程如下,先找到所有要执行的job,然后进行遍历运行并更新Job的状态。之前的错误日志,也就是这里的update_job抛出异常,而这里并没有捕获异常,最终层层往上抛,update_job -> _process_jobs -> _main_loop,最终线程异常终止。

def _process_jobs(self):
    for jobstore_alias, jobstore in six.iteritems(self._jobstores):
        try:
            due_jobs = jobstore.get_due_jobs(now)
        except Exception as e:
            ...
            continue

        ...
                
        for job in due_jobs:
      
            ...
            
            try:
                executor.submit_job(job, run_times)
            except BaseException:
                ...

            ...
            jobstore.update_job(job)

那为什么复现不了呢?这个是因为,关闭数据库连接时,程序不一定可以正好运行在update_job,可以看到前面的get_due_jobs进行了异常捕获,如果这里抛出数据库连接异常是可以捕获到的,然后跳过后面的操作,等待下一次定时任务的执行,如果还是失败,则再次等待,所以这里的异常不会抛到最上层导致线程停止。

但如果某个时机,上面连接数据库都成功了,到update_job这里异常抛出,则会导致整个线程停止,定时任务不再执行。

那如何解决该问题呢?

搭建demo

首先我们搭建一个demo出来,模拟复现该问题。

  1. 创建django项目

django-admin startproject apschedule_demo

python manage.py startapp demo

python manage.py makemigrations

python manage.py migrate
  1. 在settings.py中配置到好数据库信息
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "apschedule_demo",
        "HOST": "xxxx",
        "PORT": 5432,
        "USER": "xxx",
        "PASSWORD": "xxx"
    }
}

  1. 根据django-apschedule官方提供的文档搭建demo

在settings.py中添加该APP

INSTALLED_APPS = (
    # ...
    "django_apscheduler",
)

创建目录demo/management/commands,并在其下面创建runapscheduler.py文件,代码内容如下:

import logging

from django.conf import settings

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore

logger = logging.getLogger(__name__)


def my_job():
  # Your job processing logic here...
  print("job..")


class Command(BaseCommand):
  help = "Runs APScheduler."

  def handle(self, *args, **options):
    scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
    scheduler.add_jobstore(DjangoJobStore(), "default")

    scheduler.add_job(
      my_job,
      trigger=CronTrigger(second="*/3"),  # Every 3 seconds
      id="my_job",  # The `id` assigned to each job MUST be unique
      max_instances=1,
      replace_existing=True,
    )
    logger.info("Added job 'my_job'.")

    try:
      logger.info("Starting scheduler...")
      scheduler.start()

    # 因为上面是非阻塞开启定时任务,所以这里需要阻塞,不让主线程结束。
    while True:
            time.sleep(10)
    except KeyboardInterrupt:
      logger.info("Stopping scheduler...")
      scheduler.shutdown()
      logger.info("Scheduler shut down successfully!")

可以通过python manage.py runapscheduler执行上面的命令运行定时任务,该脚本创建了一个每3秒执行一次的任务。

  1. 复现

我们将断点打在jobstore.update_job(job)上,然后使用debug模式进行调试,当程序运行到断点上时,将数据库关闭,然后程序继续运行,则会报错,并抛出异常,线程停止了运行。至此,我们复现了该问题。

线程重启

我一开始想,我可以判断该线程是否异常,如果异常则将线程重启就好了

    while True:
        if not scheduler._thread.is_alive():
            scheduler._thread.start()

        time.sleep(10)

但事与愿违,抛出了异常,异常信息如下:

RuntimeError: threads can only be started once

通过查看官方文档可以知道,线程的start方法只能调用一次。

listener

apschedule中提供了监听器机制,也就是在定时任务的成功、失败等状态都可以通过提前注册的listener方法来进行回调。但通过分析源码,其并不能捕获到定时任务线程的异常。

下面是简化了代码的listeners的原理流程:

  1. 外部通过add_listener方法注册回调方法
  2. 在定时任务线程主流程_process_jobs中发生的各个事件添加到events中
  3. 遍历events事件,然后通过与注册的回调方法mask进行匹配,匹配上则调用回调方法
class BaseScheduler:
    def __init__(...):
        self._listeners = []

    def add_listener(self, callback, mask=EVENT_ALL):
        self._listeners.append((callback, mask))

    def _process_jobs(self):

        events = []
        
        ...

        events.append(event)
 
        ...


        for event in events:
            self._dispatch_event(event)


    def _dispatch_event(self, event):
        for cb, mask in listeners:
            if event.code & mask:
                try:
                    cb(event)
                except BaseException:
                    self._logger.exception('Error notifying listener')

如果线程本身挂了,回调方法是不可执行的。

捕获线程中函数的异常

如果update_job抛出异常导致线程停止,那我捕获它的异常,然后再continue,等待下次定时任务运行再重试不就好了,但是这就需要改动源码,能不能改源码就尽量不改。所以这边我采用了继承BackgroundScheduler类,然后再重写_process_jobs方法来解决。

在重写的_process_jobs方法中,对父类的_process_jobs()进行异常的捕获,然后再不断的进行重试,这样即使update_job抛出异常了,也可以不断的进行尝试恢复,直至成功。

class DemoBackgroundScheduler(BackgroundScheduler):
    def _process_jobs(self):
        while True:
            try:
                return super()._process_jobs()
            except BaseException:
                time.sleep(5)

class Command(BaseCommand):
    help = "Runs APScheduler."

    def handle(self, *args, **options):
        scheduler = DemoBackgroundScheduler(timezone=settings.TIME_ZONE)
        ...

然后再次尝试复现该问题,可以发现在断开数据库后,它能够一直进行重试,线程没有停止,当数据库恢复运行后,job执行成功,不再抛出异常。

相关链接

欢迎关注,互相学习,共同进步~

我的个人博客
公众号:编程黑洞