阅读量:0
1 Django中集成方式一(通用方案)
1.1 把上面的包-复制到djagno项目中
1.2 在views中编写视图函数
1.3 配置路由
1.4 浏览器访问,提交任务
1.5 启动worker执行任务
1.6 查看任务结果
2 Django中集成方式二(官方方案)
2.0 安装模块
pip installDjango==3.2.22 pip install celery pip install redis pip install eventlet #在windows环境下需要安装eventlet包 ----------- pip install django-celery-beat pip install django-celery-results pip install django-simpleui
2.1 在项目目录下新建celery.py
import os import django from celery import Celery from django.conf import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings') django.setup() # broker = 'redis://127.0.0.1:6379/1' # backend = 'redis://127.0.0.1:6379/2' # app = Celery('celery_demo',broker=broker, backend=backend) app = Celery('celery_demo') # app.conf.update( # BROKER_URL='redis://127.0.0.1:6379/1', # # BACKEND配置,使用redis # CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2', # CELERY_ACCEPT_CONTENT=['json'], # CELERY_TASK_SERIALIZER='json', # # 结果序列化方案 # CELERY_RESULT_SERIALIZER='json', # # 任务结果过期时间,秒 # CELERY_TASK_RESULT_EXPIRES=60 * 60 * 24, # # 时区配置 # CELERY_TIMEZONE='Asia/Shanghai', # ) app.config_from_object('django.conf:settings') app.autodiscover_tasks() # app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
2.2 在django配置文件中加入
# celery 配置 ###----Celery redis 配置-----### # Broker配置,使用Redis作为消息中间件 BROKER_URL = 'redis://127.0.0.1:6379/1' # BACKEND配置,使用redis CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' # 结果序列化方案 CELERY_RESULT_SERIALIZER = 'json' # 任务结果过期时间,秒 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 时区配置 CELERY_TIMEZONE = 'Asia/Shanghai'
2.3 在主目录的__init__.py中添加如下代码
from .celery import app as celery_app __all__ = ('celery_app',)
2.4 在app下新建tasks.py(必须叫tasks.py)
from celery import shared_task import time @shared_task() def add(): time.sleep(1) print('结果是') return 10 @shared_task() def send_email(mail): time.sleep(1) print(f'给{mail}发送邮件了') return '成功'
2.5 实现异步views.py
from django.shortcuts import render,HttpResponse # Create your views here. from .tasks import add def celery_add(request): res=add.delay() return HttpResponse(res)
2.6 配置路由
总路由urls.py
from django.contrib import admin from django.urls import path,include urlpatterns = [ path('admin/', admin.site.urls), path('app01/', include('app01.urls')), ]
app自己的路由urls.py
from django.contrib import admin from django.urls import path from .views import celery_add urlpatterns = [ path('celery_demo/', celery_add), ]
2.7 启动celery
celery -A celery_demo worker -l debug -P eventlet
2.8 浏览器访问-添加任务
http://127.0.0.1:8000/app01/celery_demo/
3 实现定时任务
3.1 settings.py加入
# celery_beat CELERYBEAT_SCHEDULE = { 'every_5_seconds': { # 任务路径 'task': 'app01.tasks.add', # 每5秒执行一次 'schedule': 200, 'args': () }, # 'every_10_seconds': { # # 任务路径 # 'task': 'app01.tasks.send_email', # # 每10秒执行一次,task1的参数是5 # 'schedule': 10, # 'args': (['306334678@qq.com']) # } }
3.2 启动worker和beat
celery -A celery_demo worker -l debug -P eventlet celery -A celery_demo beat -l debug
4 通过Admin配置定时任务
通过settings.py的配置可以实现定时任务的配置,做为实际项目中可能还是不够实用,更加工程化的做法是将定时任务的配置放到数据库里通过界面来配置。 Celery对此也提供了很好的支持,这需要安装django-celery-beat插件
4.1 安装djiango-celery-beat
pip install django-celery-beat
4.2 在APP中注册djiango-celery-beat
INSTALLED_APPS = [ .... 'django_celery_beat', ]
4.3 在settings.py中设置调度器及时区
在settings.py中屏蔽到原来的调度器,加入
CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'
4.4 设置时区
LANGUAGE_CODE = 'zh-hans' TIME_ZONE = 'Asia/Shanghai' USE_I18N = True USE_TZ = False
4.5 数据库迁移
python manage.py migrate django_celery_beat
4.6 启动woker和beat
#在两个控制台分别启动woker和beta celery -A celery_demo worker -l debug -P eventlet celery -A celery_demo beat -l debug
4.7 创建超级用户-访问admin的web管理端
# 1 创建超级用户 python manage.py createsuperuser # 2 访问admin http://127.0.0.1:8000/admin/login/
4.8 美化admin
# 1 开源地址 https://gitee.com/tompeppa/simpleui # 2 文档地址 https://newpanjing.github.io/simpleui_docs/config.html # 3 安装 pip3 install django-simpleui # 4 配置app INSTALLED_APPS = [ 'simpleui', 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', ... ] # 5 重新打开admin
5 admin监控任务执行情况
在控制台监控任务执行情况,还不是很方便,最好是能够通过web界面看到任务的执行情况,如有多少任务在执行,有多少任务执行失败了等。 这个Celery也是可以做到了,就是将任务执行结果写到数据库中,通过web界面显示出来。 这里要用到django-celery-results插件。 通过插件可以使用Django的orm作为结果存储,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作
5.1 安装django-celery-results
pip install django-celery-results
5.2 配置settings.py,注册app
INSTALLED_APPS = ( ..., 'django_celery_results', )
5.3 修改backend配置,将Redis改为django-db
# BACKEND配置,使用redis #CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # 使用使用django orm 作为结果存储 CELERY_RESULT_BACKEND = 'django-db' #使用django orm 作为结果存储
5.4 迁移数据库
python manage.py migrate django_celery_results # 可以看到创建了django_celery_results相关的表
5.5 admin 查看
6 Flower监控任务执行情况
如果不想通django的管理界面监控任务的执行,还可以通过Flower插件来进行任务的监控。Flower的界面更加丰富,可以监控的信息更全
Flower 是一个用于监控和管理 Celery 集群的开源 Web 应用程序。它提供有关 Celery workers 和tasks状态的实时信息
# Flower可以: 1 实时监控celery的Events -查看任务进度和历史记录 -查看任务详细信息(参数、开始时间、运行时间等) 2 远程操作 -查看workers 状态和统计数据 -关闭并重新启动workers 实例 -控制工作池大小和自动缩放设置 -查看和修改工作实例消耗的队列 -查看当前正在运行的任务 -查看计划任务(预计到达时间/倒计时) -查看保留和撤销的任务 -应用时间和速率限制 -撤销或终止任务 3 Broker 监控 -查看所有 Celery 队列的统计信息
6.1 安装和启动
# 安装 pip install flower # 启动 # 方式一: celery -A celery_demo flower --port-5555 #方式二 celery --broker=redis://127.0.0.1:6379/1 flower # 浏览器访问: http://127.0.0.1:5555/
7 任务异常自动告警
虽然可以通过界面来监控了,但是我们想要得更多,人不可能天天盯着界面看吧,如果能实现任务执行失败就自动发邮件告警就好了。这个Celery当然也是没有问题的。
通过钩子程序在异常的时候触发邮件通知
7.1 tasks.py中加入
from celery import shared_task import time from celery import Task from django.core.mail import send_mail from django.conf import settings # 成功失败邮件告警 class SendEmailTask(Task): def on_success(self, retval, task_id, args, kwargs): info = f'任务成功-- 任务id是:{task_id} , 参数是:{args} , 执行成功 !' send_mail('celery任务监控成功告警', info, settings.EMAIL_HOST_USER, ["616564099@qq.com",]) print('------------成功') def on_failure(self, exc, task_id, args, kwargs, einfo): info = f'任务失败-- 任务id为:{task_id} , 参数为:{args} , 失败 ! 失败信息为: {exc}' send_mail('celery任务监控失败告警', info, settings.EMAIL_HOST_USER, ["616564099@qq.com",]) print('------------失败') def on_retry(self, exc, task_id, args, kwargs, einfo): print(f'任务id位::{task_id} , 参数为:{args} , 重试了 ! 错误信息为: {exc}') @shared_task(base=SendEmailTask, bind=True) def add(a,b): time.sleep(1) return a+b @shared_task() def send_email(mail): print(f'给{mail}发送邮件了') return '成功' # celery -A celery_demo worker -l debug -P eventlet # celery -A celery_demo beat -l debug # celery -A celery_demo flower --port-5566
7.2 重启服务
# celery -A celery_demo worker -l debug -P eventlet # celery -A celery_demo beat -l debug
7.3 验证效果
在任务成功或失败的时候发邮件通知
8 爬取技术文章并告警
8.1 task.py
import requests from bs4 import BeautifulSoup from redis import Redis from app01.models import Article @shared_task(base=SendEmailTask, bind=True) def crawl_cnblogs(self): # redis 链接 conn = Redis(host='127.0.0.1', port='6379') res = requests.get('https://www.cnblogs.com/') soup = BeautifulSoup(res.text, 'html.parser') article_list = soup.find_all(name='article', class_='post-item') for article in article_list: title = article.find(name='a', class_='post-item-title').text author = article.find(name='a', class_='post-item-author').span.text url = article.find(name='a', class_='post-item-title').attrs.get('href') desc = article.find(name='p', class_='post-item-summary').text.strip() print(f''' 文章标题:{title} 文章作者:{author} 文章地址:{url} 文章摘要:{desc} ''') res = conn.sadd('urls', url) if res: Article.objects.create(title=title, author=author, url=url, desc=desc)
8.2 models.py
class Article(models.Model): title = models.CharField(max_length=64) author = models.CharField(max_length=64) url = models.CharField(max_length=64) desc = models.TextField()
8.3 在admin中添加任务并查看结果
#### 邮箱配置#### EMAIL_HOST = 'smtp.qq.com' # 如果是 163 改成 smtp.163.com EMAIL_PORT = 465 EMAIL_HOST_USER = '306334678@qq.com' # 帐号 EMAIL_HOST_PASSWORD = 'nbjpdbazeeflbjej' # 密码 DEFAULT_FROM_EMAIL = EMAIL_HOST_USER #这样收到的邮件,收件人处就会这样显示 #DEFAULT_FROM_EMAIL = 'lqz<'306334678@qq.com>' EMAIL_USE_SSL = True #使用ssl #EMAIL_USE_TLS = False # 使用tls #EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True
8.4 爬美女图片
import os # 爬取美女图片 @shared_task(base=SendEmailTask, bind=True) def crawl_photo(self,url): res = requests.get(url) res.encoding = 'gbk' # print(res.text) soup = BeautifulSoup(res.text, 'html.parser') ul = soup.find('ul', class_='clearfix') img_list = ul.find_all(name='img', src=True) for img in img_list: try: url = img.attrs.get('src') if not url.startswith('http'): url = 'https://pic.netbian.com' + url print(url) res1 = requests.get(url) name = url.split('-')[-1] with open(os.path.join(settings.BASE_DIR,'img',name), 'wb') as f: for line in res1.iter_content(): f.write(line) except Exception as e: continue