阅读量:6
1.安装库
pip install django-apscheduler
2.添加 install_app
django_apscheduler
3.在app下添加一个task.py文件,用来实现具体的定时任务
task.py def my_scheduled_job(): print("这个任务每3秒执行一次", time.time())
4.在app下创建一个management文件夹,里面包含一个空的__init__.py和一个文件夹commands,commands里面包含一个空的__init__.py和一个start_tasks.py,其中start_tasks.py是主要的启动脚本,结构如下:
from datetime import datetime from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger from django.core.management.base import BaseCommand from django_apscheduler.jobstores import DjangoJobStore from ...tasks import my_scheduled_job, my_scheduled_job1 # # Django manage.py命令:存储定时任务信息 # class Command(BaseCommand): help = '启动定时任务.' def handle(self, *args, **options): # 调度器 scheduler = BlockingScheduler() # 研发阶段使用 # scheduler = BackgroundScheduler() # 生产阶段使用 # 任务存储 scheduler.add_jobstore(DjangoJobStore(), 'default') # 配置线程池执行器,限制最大并发数为1,防止并发 executor = ThreadPoolExecutor(max_workers=1) scheduler.executor = executor # 注册定义任务 id_print_task = 'print_task__job' print('开始-增加任务({})'.format(id_print_task)) scheduler.add_job( my_scheduled_job, id=id_print_task, name=id_print_task, max_instances=1, replace_existing=True, trigger=IntervalTrigger(seconds=15, start_date=datetime.now(), ), # 从当前时间开始,每15秒钟调度一次 ) # 指定某个时间运行 run_date = datetime(2024, 7, 18, 18, 33) scheduler.add_job( my_scheduled_job1, id='id_print_task1', name='id_print_task1', trigger=DateTrigger(run_date=run_date), # 使用 DateTrigger 并设置 run_date replace_existing=True # 如果已存在同名任务,则替换它 ) # 指定每天某个时间运行 scheduler.add_job( my_scheduled_job1, id='daily_task_at_18_36', name='Daily Task at 18:36', trigger=CronTrigger(hour=18, minute=38), # 使用 CronTrigger 并设置小时和分钟 replace_existing=True # 如果已存在同名任务,则替换它 ) print('完成-增加任务({})'.format(id_print_task)) # 启动定时任务 try: scheduler.start() except KeyboardInterrupt: scheduler.shutdown()
5.修改settings.py配置,将时区改为中国,最主要是把USE两项注释掉,否则后面的定时任务的时间会晚八小时!!!!
settings.py LANGUAGE_CODE = 'zh-hans' # 中文语言代码 TIME_ZONE = 'Asia/Shanghai' # 中国上海时区 # USE_I18N = True # # USE_TZ = True
6.添加完成后,做数据库迁移,会生成 表django_apscheduler_djangojob 和 表django_apscheduler_djangojobexecution
python manage.py migrate
7.启动项目,然后启动定时任务:
python manage.py start_tasks
启动后,任务开始执行,数据库上述两张表会有数据进去,定时任务完成,其他的见后面拓展。
拓展:
django-apscheduler框架还提供了很多操作定时任务的函数。比如:
- 删除任务
scheduler.remove_job(job_name)
- 暂停任务
scheduler.pause_job(job_name)
- 开启任务
scheduler.resume_job(job_name)
- 获取所有任务
scheduler.get_jobs()
- 修改任务
scheduler.modify_job(job_name)
注:修改任务只能修改参数,如果要修改执行时间的话,有3种方法
第一就把任务删了重新创建,
第二直接操作数据库,
第三用到下面重设任务。
- 重设任务
scheduler.reschedule_job(job_name)
scheduler.reschedule_job(job_id="job1", trigger='interval', minutes=1)