# 定时任务django-apscheduler的使用,以及设定随机触发任务
7 min read
Table of Contents
一、安装
pip install django-apscheduler
然后添加到INSTALLED_APPS
二、使用
1.创建一个新app,如python manage.py startapp SomeScheduler
2.在这个新app下创建文件夹和文件,如app/management /commands/runapscheduler.py
3.在runapscheduler.py下粘贴如下代码:
import loggingimport os
from django.conf import settings
from apscheduler.schedulers.blocking import BlockingSchedulerfrom apscheduler.triggers.cron import CronTriggerfrom django.core.management.base import BaseCommandfrom django_apscheduler.jobstores import DjangoJobStorefrom django_apscheduler.models import DjangoJobExecutionfrom django_apscheduler import util
from changeIP import dnspodimport time
logger = logging.getLogger(__name__)
def sync_ip(): # 此处写你要执行的定时任务 if dnspod.ip_changed(6): print(time.strftime("%Y-%m-%d %H:%M:%S") + " IPv6 is changed:") dnspod.change_dnspod() else: print(time.strftime("%Y-%m-%d %H:%M:%S") + " IPv6 is not changed.")
def check_in(): pass
def check_out(): res = os.system(r"ant -buildfile C:\My_Scripts\build_get_out_test.xml") # res = subprocess.run(["ant", "-buildfile", r"C:\My_Scripts\build_get_out_test.xml"],) # res.returncode if res == 0: print("Check out success.") else: print("Check out failed.")
# 设置保留历史作业执行记录的最长时间,防止数据库被填满@util.close_old_connectionsdef delete_old_job_executions(max_age=604_800): """ :param max_age: 历史执行记录最长保留7天 """ DjangoJobExecution.objects.delete_old_job_executions(max_age)
# 自定义的Django命令,就是runapscheduler.py的文件名class Command(BaseCommand): help = "Runs APScheduler."
def handle(self, *args, **options): scheduler = BlockingScheduler(timezone=settings.TIME_ZONE) scheduler.add_jobstore(DjangoJobStore(), "default") """ id可以不加,如果不加ID是这个应用view下的函数名字 replace_existing=True 这个东西不加的话,他会提示ID冲突了。 """ scheduler.add_job( sync_ip, # jitter就是抖动时间,如第0秒开始,则实际执行时间为0+59秒之内的一个随时间执行 trigger=CronTrigger(minute="*", jitter=59), id="sync_ip", # 你要执行的函数 max_instances=1, replace_existing=True, ) logger.info("Added job 'sync_ip'.")
scheduler.add_job( check_out, trigger=CronTrigger(day_of_week='sat-sun', hour='16', minute='55', second='50'), # 周六周日执行一次 id="check_out", # 你要执行的函数 max_instances=1, replace_existing=True, ) logger.info("Added job 'check_out'.")
scheduler.add_job( delete_old_job_executions, trigger=CronTrigger( day_of_week="mon", hour="00", minute="00" ), # Midnight on Monday, before start of the next work week. id="delete_old_job_executions", max_instances=1, replace_existing=True, ) logger.info( "Added weekly job: 'delete_old_job_executions'." )
try: logger.info("Starting scheduler...") print("开始执行计划任务...") scheduler.start() except KeyboardInterrupt: logger.info("Stopping scheduler...") print("正在停止计划任务...") scheduler.shutdown() logger.info("Scheduler shut down successfully!") print("计划任务已关闭!")4.最后在终端里输入python manage.py runapscheduler就可以执行了,
ps:注意即使django没有运行,计划任务也会执行并写入数据库,除非把中断ctrl+c关闭掉。
参考连接1
参考连接2
三、一些参数说明
1.cron定时调度(某一定时时刻执行)
(int|str) 表示参数既可以是int类型,也可以是str类型(datetime | str) 表示参数既可以是datetime类型,也可以是str类型
year (int|str) – 4-digit year -(表示四位数的年份,如2008年)month (int|str) – month (1-12) -(表示取值范围为1-12月)day (int|str) – day of the (1-31) -(表示取值范围为1-31日)week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)hour (int|str) – hour (0-23) - (表示取值范围为0-23时)minute (int|str) – minute (0-59) - (表示取值范围为0-59分)second (int|str) – second (0-59) - (表示取值范围为0-59秒)start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)jitter=Nonejitter (int|None)# 当省略时间参数时,在显示指定参数前的参数都会被设定为*,之后的参数会被设置为最小值,2.interval 间隔调度(每隔多久执行)
weeks (int) – number of weeks to waitdays (int) – number of days to waithours (int) – number of hours to waitminutes (int) – number of minutes to waitseconds (int) – number of seconds to waitstart_date (datetime|str) – (2020-10-10 09:30:00)end_date (datetime|str) – (2021-10-10 09:30:00)timezone (datetime.tzinfo|str) – time zone to use for the date/time calculationsjitter: 振动参数,给每次触发添加一个随机浮动秒数,一般适用多服务器,避免同时执行3.date 定时调度(作业只会执行一次)
run_date (date|datetime|str) – "2009-11-06 16:30:05" -(任务开始的时间)timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already四、调度器
1. BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。2. BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。(Django框架使用)3. AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。4. GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 - GeventExecutor 配合使用。5. TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。6. TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。scrapy爬虫框架7. QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒BlockingScheduler 和BackgroundScheduler 调度器
五、操作定时任务的函数
删除任务:scheduler.remove_job(job_name)暂停任务:scheduler.pause_job(job_name)开启任务:scheduler.resume_job(job_name)获取所有任务:scheduler.get_jobs()修改任务:scheduler.modify_job(job_name)