智慧水务项目(四)django(drf)+angular 18 配置REST_FRAMEWORK

avatar
作者
筋斗云
阅读量:0

一、说明

建立了几个文件

 

二、一步一步来

1、建立json_response.py 继承了 Response,

一共三个函数,成功、详情,错误

from rest_framework.response import Response   class SuccessResponse(Response):     """     标准响应成功的返回, SuccessResponse(data)或者SuccessResponse(data=data)     (1)默认code返回2000, 不支持指定其他返回码     """      def __init__(self, data=None, msg='success', status=None, template_name=None, headers=None, exception=False,                  content_type=None,page=1,limit=1,total=1):         std_data = {             "code": 2000,             "page": page,             "limit": limit,             "total": total,             "data": data,             "msg": msg         }         super().__init__(std_data, status, template_name, headers, exception, content_type)   class DetailResponse(Response):     """     不包含分页信息的接口返回,主要用于单条数据查询     (1)默认code返回2000, 不支持指定其他返回码     """      def __init__(self, data=None, msg='success', status=None, template_name=None, headers=None, exception=False,                  content_type=None,):         std_data = {             "code": 2000,             "data": data,             "msg": msg         }         super().__init__(std_data, status, template_name, headers, exception, content_type)   class ErrorResponse(Response):     """     标准响应错误的返回,ErrorResponse(msg='xxx')     (1)默认错误码返回400, 也可以指定其他返回码:ErrorResponse(code=xxx)     """      def __init__(self, data=None, msg='error', code=400, status=None, template_name=None, headers=None,                  exception=False, content_type=None):         std_data = {             "code": code,             "data": data,             "msg": msg         }         super().__init__(std_data, status, template_name, headers, exception, content_type) 

2、自定义异常处理函数 文件为exception.py

import logging import traceback  from django.db.models import ProtectedError from django.http import Http404 from rest_framework.exceptions import APIException as DRFAPIException, AuthenticationFailed, NotAuthenticated from rest_framework.status import HTTP_401_UNAUTHORIZED from rest_framework.views import set_rollback, exception_handler  from apps.utils.json_response import ErrorResponse  logger = logging.getLogger(__name__)   class CustomAuthenticationFailed(NotAuthenticated):     # 设置 status_code 属性为 400     status_code = 400   def CustomExceptionHandler(ex, context):     """     统一异常拦截处理     目的:(1)取消所有的500异常响应,统一响应为标准错误返回         (2)准确显示错误信息     :param ex:     :param context:     :return:     """     msg = ''     code = 4000     # 调用默认的异常处理函数     response = exception_handler(ex, context)     if isinstance(ex, AuthenticationFailed):         # 如果是身份验证错误         if response and response.data.get('detail') == "Given token not valid for any token type":             code = 401             msg = ex.detail         elif response and response.data.get('detail') == "Token is blacklisted":             # token在黑名单             return ErrorResponse(status=HTTP_401_UNAUTHORIZED)         else:             code = 401             msg = ex.detail     elif isinstance(ex, Http404):         code = 400         msg = "接口地址不正确"     elif isinstance(ex, DRFAPIException):         set_rollback()         msg = ex.detail         if isinstance(msg, dict):             for k, v in msg.items():                 for i in v:                     msg = "%s:%s" % (k, i)     elif isinstance(ex, ProtectedError):         set_rollback()         msg = "删除失败:该条数据与其他数据有相关绑定"     # elif isinstance(ex, DatabaseError):     #     set_rollback()     #     msg = "接口服务器异常,请联系管理员"     elif isinstance(ex, Exception):         logger.exception(traceback.format_exc())         msg = str(ex)     return ErrorResponse(msg=msg, code=code) 

3、自定义分页处理 pagination.py

from collections import OrderedDict  from django.core import paginator from django.core.paginator import Paginator as DjangoPaginator, InvalidPage from rest_framework.pagination import PageNumberPagination from rest_framework.response import Response   class CustomPagination(PageNumberPagination):     page_size = 10     page_size_query_param = "limit"     max_page_size = 999     django_paginator_class = DjangoPaginator      def paginate_queryset(self, queryset, request, view=None):         """         Paginate a queryset if required, either returning a         page object, or `None` if pagination is not configured for this view.         """         empty = True          page_size = self.get_page_size(request)         if not page_size:             return None          paginator = self.django_paginator_class(queryset, page_size)         page_number = request.query_params.get(self.page_query_param, 1)         if page_number in self.last_page_strings:             page_number = paginator.num_pages          try:             self.page = paginator.page(page_number)         except InvalidPage as exc:             # msg = self.invalid_page_message.format(             #     page_number=page_number, message=str(exc)             # )             # raise NotFound(msg)             empty = False          if paginator.num_pages > 1 and self.template is not None:             # The browsable API should display pagination controls.             self.display_page_controls = True          self.request = request          if not empty:             self.page = []          return list(self.page)      def get_paginated_response(self, data):         code = 2000         msg = 'success'         page = int(self.get_page_number(self.request, paginator)) or 1         total = self.page.paginator.count if self.page else 0         limit = int(self.get_page_size(self.request)) or 10         is_next = self.page.has_next() if self.page else False         is_previous = self.page.has_previous() if self.page else False          if not data:             code = 2000             msg = "暂无数据"             data = []          return Response(OrderedDict([             ('code', code),             ('msg', msg),             ('page', page),             ('limit', limit),             ('total', total),             ('is_next', is_next),             ('is_previous', is_previous),             ('data', data)         ])) 

4、自定义过滤customfilters.py

import operator import re from collections import OrderedDict from functools import reduce   import six from django.db import models from django.db.models import Q, F from django.db.models.constants import LOOKUP_SEP from django_filters import utils, FilterSet from django_filters.constants import ALL_FIELDS from django_filters.filters import CharFilter, DateTimeFromToRangeFilter from django_filters.rest_framework import DjangoFilterBackend from django_filters.utils import get_model_field from rest_framework.filters import BaseFilterBackend from django_filters.conf import settings  from apps.system.models import Dept, ApiWhiteList, RoleMenuButtonPermission   class CustomDjangoFilterBackend(BaseFilterBackend):     """     自定义时间范围过滤器     """      def filter_queryset(self, request, queryset, view):         create_datetime_after = request.query_params.get('create_datetime_after', None)         create_datetime_before = request.query_params.get('create_datetime_before', None)         update_datetime_after = request.query_params.get('update_datetime_after', None)         update_datetime_before = request.query_params.get('update_datetime_after', None)         if any([create_datetime_after, create_datetime_before, update_datetime_after, update_datetime_before]):             create_filter = Q()             if create_datetime_after and create_datetime_before:                 create_filter &= Q(create_datetime__gte=create_datetime_after) & Q(                     create_datetime__lte=create_datetime_before)             elif create_datetime_after:                 create_filter &= Q(create_datetime__gte=create_datetime_after)             elif create_datetime_before:                 create_filter &= Q(create_datetime__lte=create_datetime_before)              # 更新时间范围过滤条件             update_filter = Q()             if update_datetime_after and update_datetime_before:                 update_filter &= Q(update_datetime__gte=update_datetime_after) & Q(                     update_datetime__lte=update_datetime_before)             elif update_datetime_after:                 update_filter &= Q(update_datetime__gte=update_datetime_after)             elif update_datetime_before:                 update_filter &= Q(update_datetime__lte=update_datetime_before)             # 结合两个时间范围过滤条件             queryset = queryset.filter(create_filter & update_filter)             return queryset         return queryset   def get_dept(dept_id: int, dept_all_list=None, dept_list=None):     """     递归获取部门的所有下级部门     :param dept_id: 需要获取的部门id     :param dept_all_list: 所有部门列表     :param dept_list: 递归部门list     :return:     """     if not dept_all_list:         dept_all_list = Dept.objects.all().values("id", "parent")     if dept_list is None:         dept_list = [dept_id]     for ele in dept_all_list:         if ele.get("parent") == dept_id:             dept_list.append(ele.get("id"))             get_dept(ele.get("id"), dept_all_list, dept_list)     return list(set(dept_list))   class DataLevelPermissionsFilter(BaseFilterBackend):     """     数据 级权限过滤器     0. 获取用户的部门id,没有部门则返回空     1. 判断过滤的数据是否有创建人所在部门 "creator" 字段,没有则返回全部     2. 如果用户没有关联角色则返回本部门数据     3. 根据角色的最大权限进行数据过滤(会有多个角色,进行去重取最大权限)     3.1 判断用户是否为超级管理员角色/如果有1(所有数据) 则返回所有数据      4. 只为仅本人数据权限时只返回过滤本人数据,并且部门为自己本部门(考虑到用户会变部门,只能看当前用户所在的部门数据)     5. 自定数据权限 获取部门,根据部门过滤     """      def filter_queryset(self, request, queryset, view):         """         接口白名单是否认证数据权限         """         api = request.path  # 当前请求接口         method = request.method  # 当前请求方法         methodList = ["GET", "POST", "PUT", "DELETE", "OPTIONS"]         method = methodList.index(method)         # ***接口白名单***         api_white_list = ApiWhiteList.objects.filter(enable_datasource=False).values(             permission__api=F("url"), permission__method=F("method")         )         api_white_list = [             str(item.get("permission__api").replace("{id}", ".*?"))             + ":"             + str(item.get("permission__method"))             for item in api_white_list             if item.get("permission__api")         ]         for item in api_white_list:             new_api = f"{api}:{method}"             matchObj = re.match(item, new_api, re.M | re.I)             if matchObj is None:                 continue             else:                 return queryset         """         判断是否为超级管理员:         如果不是超级管理员,则进入下一步权限判断         """         if request.user.is_superuser == 0:             return self._extracted_from_filter_queryset_33(request, queryset, api, method)         else:             return queryset      # TODO Rename this here and in `filter_queryset`     def _extracted_from_filter_queryset_33(self, request, queryset, api, method):         # 0. 获取用户的部门id,没有部门则返回空         user_dept_id = getattr(request.user, "dept_id", None)         if not user_dept_id:             return queryset.none()          # 1. 判断过滤的数据是否有创建人所在部门 "dept_belong_id" 字段         if not getattr(queryset.model, "dept_belong_id", None):             return queryset          # 2. 如果用户没有关联角色则返回本部门数据         if not hasattr(request.user, "role"):             return queryset.filter(dept_belong_id=user_dept_id)          # 3. 根据所有角色 获取所有权限范围         # (0, "仅本人数据权限"),         # (1, "本部门及以下数据权限"),         # (2, "本部门数据权限"),         # (3, "全部数据权限"),         # (4, "自定数据权限")         re_api = api         _pk = request.parser_context["kwargs"].get('pk')         if _pk:  # 判断是否是单例查询             re_api = re.sub(_pk, '{id}', api)         role_id_list = request.user.role.values_list('id', flat=True)         role_permission_list = RoleMenuButtonPermission.objects.filter(             role__in=role_id_list,             role__status=1,             menu_button__api=re_api,             menu_button__method=method).values(             'data_range'         )         dataScope_list = []  # 权限范围列表         for ele in role_permission_list:             # 判断用户是否为超级管理员角色/如果拥有[全部数据权限]则返回所有数据             if ele.get("data_range") == 3:                 return queryset             dataScope_list.append(ele.get("data_range"))         dataScope_list = list(set(dataScope_list))          # 4. 只为仅本人数据权限时只返回过滤本人数据,并且部门为自己本部门(考虑到用户会变部门,只能看当前用户所在的部门数据)         if 0 in dataScope_list:             return queryset.filter(                 creator=request.user, dept_belong_id=user_dept_id             )          # 5. 自定数据权限 获取部门,根据部门过滤         dept_list = []         for ele in dataScope_list:             if ele == 1:                 dept_list.append(user_dept_id)                 dept_list.extend(                     get_dept(                         user_dept_id,                     )                 )             elif ele == 2:                 dept_list.append(user_dept_id)             elif ele == 4:                 dept_ids = RoleMenuButtonPermission.objects.filter(                     role__in=role_id_list,                     role__status=1,                     data_range=4).values_list(                     'dept__id', flat=True                 )                 dept_list.extend(                     dept_ids                 )         if queryset.model._meta.model_name == 'dept':             return queryset.filter(id__in=list(set(dept_list)))         return queryset.filter(dept_belong_id__in=list(set(dept_list)))    class CustomDjangoFilterBackend(DjangoFilterBackend):     lookup_prefixes = {         "^": "istartswith",         "=": "iexact",         "@": "search",         "$": "iregex",         "~": "icontains",     }     filter_fields = "__all__"      def construct_search(self, field_name, lookup_expr=None):         lookup = self.lookup_prefixes.get(field_name[0])         if lookup:             field_name = field_name[1:]         else:             lookup = lookup_expr         if lookup:             if field_name.endswith(lookup):                 return field_name             return LOOKUP_SEP.join([field_name, lookup])         return field_name      def find_filter_lookups(self, orm_lookups, search_term_key):         for lookup in orm_lookups:             # if lookup.find(search_term_key) >= 0:             new_lookup = LOOKUP_SEP.join(lookup.split(LOOKUP_SEP)[:-1]) if len(lookup.split(LOOKUP_SEP)) > 1 else lookup             # 修复条件搜索错误 bug             if new_lookup == search_term_key:                 return lookup         return None      def get_filterset_class(self, view, queryset=None):         """         Return the `FilterSet` class used to filter the queryset.         """         filterset_class = getattr(view, "filterset_class", None)         filterset_fields = getattr(view, "filterset_fields", None)          # TODO: remove assertion in 2.1         if filterset_class is None and hasattr(view, "filter_class"):             utils.deprecate(                 "`%s.filter_class` attribute should be renamed `filterset_class`." % view.__class__.__name__             )             filterset_class = getattr(view, "filter_class", None)          # TODO: remove assertion in 2.1         if filterset_fields is None and hasattr(view, "filter_fields"):             utils.deprecate(                 "`%s.filter_fields` attribute should be renamed `filterset_fields`." % view.__class__.__name__             )             self.filter_fields = getattr(view, "filter_fields", None)             if isinstance(self.filter_fields, (list, tuple)):                 filterset_fields = [                     field[1:] if field[0] in self.lookup_prefixes.keys() else field for field in self.filter_fields                 ]             else:                 filterset_fields = self.filter_fields          if filterset_class:             filterset_model = filterset_class._meta.model              # FilterSets do not need to specify a Meta class             if filterset_model and queryset is not None:                 assert issubclass(                     queryset.model, filterset_model                 ), "FilterSet model %s does not match queryset model %s" % (                     filterset_model,                     queryset.model,                 )              return filterset_class          if filterset_fields and queryset is not None:             MetaBase = getattr(self.filterset_base, "Meta", object)              class AutoFilterSet(self.filterset_base):                 @classmethod                 def get_all_model_fields(cls, model):                     opts = model._meta                      return [                         f.name                         for f in sorted(opts.fields + opts.many_to_many)                         if (f.name == "id")                         or not isinstance(f, models.AutoField)                         and not (getattr(f.remote_field, "parent_link", False))                     ]                  @classmethod                 def get_fields(cls):                     """                     Resolve the 'fields' argument that should be used for generating filters on the                     filterset. This is 'Meta.fields' sans the fields in 'Meta.exclude'.                     """                     model = cls._meta.model                     fields = cls._meta.fields                     exclude = cls._meta.exclude                      assert not (fields is None and exclude is None), (                         "Setting 'Meta.model' without either 'Meta.fields' or 'Meta.exclude' "                         "has been deprecated since 0.15.0 and is now disallowed. Add an explicit "                         "'Meta.fields' or 'Meta.exclude' to the %s class." % cls.__name__                     )                      # Setting exclude with no fields implies all other fields.                     if exclude is not None and fields is None:                         fields = ALL_FIELDS                      # Resolve ALL_FIELDS into all fields for the filterset's model.                     if fields == ALL_FIELDS:                         fields = cls.get_all_model_fields(model)                      # Remove excluded fields                     exclude = exclude or []                     if not isinstance(fields, dict):                         fields = [(f, [settings.DEFAULT_LOOKUP_EXPR]) for f in fields if f not in exclude]                     else:                         fields = [(f, lookups) for f, lookups in fields.items() if f not in exclude]                      return OrderedDict(fields)                  @classmethod                 def get_filters(cls):                     """                     Get all filters for the filterset. This is the combination of declared and                     generated filters.                     """                      # No model specified - skip filter generation                     if not cls._meta.model:                         return cls.declared_filters.copy()                      # Determine the filters that should be included on the filterset.                     filters = OrderedDict()                     fields = cls.get_fields()                     undefined = []                      for field_name, lookups in fields.items():                         field = get_model_field(cls._meta.model, field_name)                         from django.db import models                         from timezone_field import TimeZoneField                          # 不进行 过滤的model 类                         if isinstance(field, (models.JSONField, TimeZoneField)):                             continue                         # warn if the field doesn't exist.                         if field is None:                             undefined.append(field_name)                         # 更新默认字符串搜索为模糊搜索                         if (                             isinstance(field, (models.CharField))                             and filterset_fields == "__all__"                             and lookups == ["exact"]                         ):                             lookups = ["icontains"]                         for lookup_expr in lookups:                             filter_name = cls.get_filter_name(field_name, lookup_expr)                              # If the filter is explicitly declared on the class, skip generation                             if filter_name in cls.declared_filters:                                 filters[filter_name] = cls.declared_filters[filter_name]                                 continue                              if field is not None:                                 filters[filter_name] = cls.filter_for_field(field, field_name, lookup_expr)                      # Allow Meta.fields to contain declared filters *only* when a list/tuple                     if isinstance(cls._meta.fields, (list, tuple)):                         undefined = [f for f in undefined if f not in cls.declared_filters]                      if undefined:                         raise TypeError(                             "'Meta.fields' must not contain non-model field names: %s" % ", ".join(undefined)                         )                      # Add in declared filters. This is necessary since we don't enforce adding                     # declared filters to the 'Meta.fields' option                     filters.update(cls.declared_filters)                     return filters                  class Meta(MetaBase):                     model = queryset.model                     fields = filterset_fields              return AutoFilterSet          return None      def filter_queryset(self, request, queryset, view):         filterset = self.get_filterset(request, queryset, view)         if filterset is None:             return queryset         if filterset.__class__.__name__ == "AutoFilterSet":             queryset = filterset.queryset             filter_fields = filterset.filters if self.filter_fields == "__all__" else self.filter_fields             orm_lookup_dict = dict(                 zip(                     [field for field in filter_fields],                     [filterset.filters[lookup].lookup_expr for lookup in filterset.filters.keys()],                 )             )             orm_lookups = [                 self.construct_search(lookup, lookup_expr) for lookup, lookup_expr in orm_lookup_dict.items()             ]             # print(orm_lookups)             conditions = []             queries = []             for search_term_key in filterset.data.keys():                 orm_lookup = self.find_filter_lookups(orm_lookups, search_term_key)                 if not orm_lookup or filterset.data.get(search_term_key) == '':                     continue                 filterset_data_len = len(filterset.data.getlist(search_term_key))                 if filterset_data_len == 1:                     query = Q(**{orm_lookup: filterset.data[search_term_key]})                     queries.append(query)                 elif filterset_data_len == 2:                     orm_lookup += '__range'                     query = Q(**{orm_lookup: filterset.data.getlist(search_term_key)})                     queries.append(query)             if len(queries) > 0:                 conditions.append(reduce(operator.and_, queries))                 queryset = queryset.filter(reduce(operator.and_, conditions))                 return queryset             else:                 return queryset          if not filterset.is_valid() and self.raise_exception:             raise utils.translate_validation(filterset.errors)         return filterset.qs 

5、配置REST_FRAMEWORK

 

# ================================================= # # *************** REST_FRAMEWORK配置 *************** # # ================================================= #  REST_FRAMEWORK = {     'DEFAULT_PARSER_CLASSES': (         'rest_framework.parsers.JSONParser',         'rest_framework.parsers.MultiPartParser',     ),     "DATETIME_FORMAT": "%Y-%m-%d %H:%M:%S",  # 日期时间格式配置     "DATE_FORMAT": "%Y-%m-%d",     "DEFAULT_FILTER_BACKENDS": (         "apps.utils.customfilters.CustomDjangoFilterBackend",         "rest_framework.filters.SearchFilter",         "rest_framework.filters.OrderingFilter",     ),      "DEFAULT_PAGINATION_CLASS": "apps.utils.pagination.CustomPagination",  # 自定义分页     "DEFAULT_AUTHENTICATION_CLASSES": (         "rest_framework_simplejwt.authentication.JWTAuthentication",         "rest_framework.authentication.SessionAuthentication",     ),     "DEFAULT_PERMISSION_CLASSES": [         "rest_framework.permissions.IsAuthenticated",  # 只有经过身份认证确定用户身份才能访问     ],     "EXCEPTION_HANDLER": "apps.utils.exception.CustomExceptionHandler",  # 自定义的异常处理  }

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!