阅读量:0
一、说明
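本文在 apps/utils 下建立了几个工具文件:json_response.py(统一响应)、exception.py(统一异常处理)、pagination.py(自定义分页)、customfilters.py(自定义过滤),最后在 settings 中配置 REST_FRAMEWORK。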
二、一步一步来
1、建立 json_response.py,其中的类都继承自 Response,
一共三个类:成功(SuccessResponse)、详情(DetailResponse)、错误(ErrorResponse)
from rest_framework.response import Response


class SuccessResponse(Response):
    """Success envelope that carries pagination fields.

    Call as ``SuccessResponse(data)`` or ``SuccessResponse(data=data)``.
    The ``code`` field of the body is always 2000 and cannot be overridden.
    """

    def __init__(self, data=None, msg='success', status=None, template_name=None, headers=None, exception=False,
                 content_type=None, page=1, limit=1, total=1):
        # Key order is kept stable: code, page, limit, total, data, msg.
        payload = dict(code=2000, page=page, limit=limit, total=total, data=data, msg=msg)
        super().__init__(
            data=payload,
            status=status,
            template_name=template_name,
            headers=headers,
            exception=exception,
            content_type=content_type,
        )


class DetailResponse(Response):
    """Success envelope without pagination fields, meant for single-record queries.

    The ``code`` field of the body is always 2000 and cannot be overridden.
    """

    def __init__(self, data=None, msg='success', status=None, template_name=None, headers=None, exception=False,
                 content_type=None, ):
        payload = dict(code=2000, data=data, msg=msg)
        super().__init__(
            data=payload,
            status=status,
            template_name=template_name,
            headers=headers,
            exception=exception,
            content_type=content_type,
        )


class ErrorResponse(Response):
    """Error envelope, e.g. ``ErrorResponse(msg='xxx')``.

    The body ``code`` defaults to 400; pass ``code=...`` to report another
    value. ``code`` only affects the JSON body, the HTTP status is controlled
    separately through ``status``.
    """

    def __init__(self, data=None, msg='error', code=400, status=None, template_name=None, headers=None,
                 exception=False, content_type=None):
        payload = dict(code=code, data=data, msg=msg)
        super().__init__(
            data=payload,
            status=status,
            template_name=template_name,
            headers=headers,
            exception=exception,
            content_type=content_type,
        )
2、自定义异常处理函数 文件为exception.py
import logging
import traceback

from django.db.models import ProtectedError
from django.http import Http404
from rest_framework.exceptions import APIException as DRFAPIException, AuthenticationFailed, NotAuthenticated
from rest_framework.status import HTTP_401_UNAUTHORIZED
from rest_framework.views import set_rollback, exception_handler

from apps.utils.json_response import ErrorResponse

logger = logging.getLogger(__name__)


class CustomAuthenticationFailed(NotAuthenticated):
    # Override the inherited status code so this exception reports 400.
    status_code = 400


def _flatten_error_detail(detail):
    """Collapse a dict-shaped error detail into a single ``"field:message"`` string.

    The last message of the last field wins. A value that is a plain string
    (or not iterable at all) is treated as one message instead of being
    iterated character by character. Non-dict details, and dicts that
    contain no messages, are returned unchanged.
    """
    if not isinstance(detail, dict):
        return detail
    message = detail
    for field, errors in detail.items():
        if isinstance(errors, (str, bytes)) or not hasattr(errors, "__iter__"):
            errors = [errors]
        for error in errors:
            message = "%s:%s" % (field, error)
    return message


def CustomExceptionHandler(ex, context):
    """Unified exception handler.

    Goals: (1) never answer with a raw 500 response, always return the
    standard error envelope; (2) surface an accurate error message.

    :param ex: the exception raised while handling the request
    :param context: handler context, passed through to DRF's default handler
    :return: an ``ErrorResponse``
    """
    msg = ''
    code = 4000

    # Run DRF's default handler first; its response is only inspected for
    # the authentication-failure detail text below.
    response = exception_handler(ex, context)

    if isinstance(ex, AuthenticationFailed):
        detail = None
        if response is not None and isinstance(response.data, dict):
            detail = response.data.get('detail')
        if detail == "Token is blacklisted":
            # Blacklisted token: answer with a real HTTP 401 status.
            return ErrorResponse(status=HTTP_401_UNAUTHORIZED)
        # Every other authentication failure (including
        # "Given token not valid for any token type") reports code 401.
        code = 401
        msg = ex.detail
    elif isinstance(ex, Http404):
        code = 400
        msg = "接口地址不正确"
    elif isinstance(ex, DRFAPIException):
        set_rollback()
        msg = _flatten_error_detail(ex.detail)
    elif isinstance(ex, ProtectedError):
        set_rollback()
        msg = "删除失败:该条数据与其他数据有相关绑定"
    # elif isinstance(ex, DatabaseError):
    #     set_rollback()
    #     msg = "接口服务器异常,请联系管理员"
    elif isinstance(ex, Exception):
        logger.exception(traceback.format_exc())
        msg = str(ex)
    return ErrorResponse(msg=msg, code=code)
3、自定义分页处理 pagination.py
from collections import OrderedDict

from django.core import paginator
from django.core.paginator import Paginator as DjangoPaginator, InvalidPage
from rest_framework.pagination import PageNumberPagination
from rest_framework.response import Response


class CustomPagination(PageNumberPagination):
    """Page-number pagination that returns the project's standard envelope.

    An invalid or out-of-range page yields an empty result list instead of
    an error response.
    """

    page_size = 10
    page_size_query_param = "limit"
    max_page_size = 999
    django_paginator_class = DjangoPaginator

    def paginate_queryset(self, queryset, request, view=None):
        """Paginate ``queryset`` if required.

        Returns the list of objects on the requested page, an empty list
        when the requested page is invalid, or ``None`` when pagination is
        not configured for this view.
        """
        page_size = self.get_page_size(request)
        if not page_size:
            return None

        paginator = self.django_paginator_class(queryset, page_size)
        page_number = request.query_params.get(self.page_query_param, 1)
        if page_number in self.last_page_strings:
            page_number = paginator.num_pages

        try:
            self.page = paginator.page(page_number)
        except InvalidPage:
            # Deliberately not raising NotFound: an invalid page is reported
            # as an empty page.
            self.page = []

        if paginator.num_pages > 1 and self.template is not None:
            # The browsable API should display pagination controls.
            self.display_page_controls = True

        self.request = request
        return list(self.page)

    def _current_page_number(self):
        """Return the page number to report in the response body.

        Uses the resolved page object when there is one (this also covers
        the "last" alias). Otherwise falls back to the raw query parameter,
        defaulting to 1 when it is missing, zero or not an integer.
        """
        number = getattr(self.page, "number", None)
        if number is not None:
            return number
        raw = self.request.query_params.get(self.page_query_param, 1)
        try:
            return int(raw) or 1
        except (TypeError, ValueError):
            return 1

    def get_paginated_response(self, data):
        """Wrap ``data`` in the standard envelope together with paging metadata."""
        code = 2000
        msg = 'success'
        page = self._current_page_number()
        # ``self.page`` is falsy both for the invalid-page placeholder ([])
        # and for a page without items.
        total = self.page.paginator.count if self.page else 0
        limit = int(self.get_page_size(self.request) or 10)
        is_next = self.page.has_next() if self.page else False
        is_previous = self.page.has_previous() if self.page else False

        if not data:
            msg = "暂无数据"
            data = []

        return Response(OrderedDict([
            ('code', code),
            ('msg', msg),
            ('page', page),
            ('limit', limit),
            ('total', total),
            ('is_next', is_next),
            ('is_previous', is_previous),
            ('data', data)
        ]))
4、自定义过滤customfilters.py
import operator
import re
from collections import OrderedDict
from functools import reduce

import six
from django.db import models
from django.db.models import Q, F
from django.db.models.constants import LOOKUP_SEP
from django_filters import utils, FilterSet
from django_filters.constants import ALL_FIELDS
from django_filters.filters import CharFilter, DateTimeFromToRangeFilter
from django_filters.rest_framework import DjangoFilterBackend
from django_filters.utils import get_model_field
from rest_framework.filters import BaseFilterBackend
from django_filters.conf import settings

from apps.system.models import Dept, ApiWhiteList, RoleMenuButtonPermission


class CustomDateTimeRangeFilterBackend(BaseFilterBackend):
    """Filter by create/update time ranges taken from the query string.

    NOTE: this class used to be declared as ``CustomDjangoFilterBackend`` and
    was silently replaced by the class of the same name defined further down
    in this module, which made it unreachable. It now has its own name; the
    name ``CustomDjangoFilterBackend`` still refers to the django-filter
    backend below.
    """

    def filter_queryset(self, request, queryset, view):
        """Apply whichever of the four optional datetime bounds are present."""
        params = request.query_params
        bounds = {
            "create_datetime__gte": params.get('create_datetime_after', None),
            "create_datetime__lte": params.get('create_datetime_before', None),
            "update_datetime__gte": params.get('update_datetime_after', None),
            # Previously read from 'update_datetime_after' by mistake.
            "update_datetime__lte": params.get('update_datetime_before', None),
        }
        lookups = {lookup: value for lookup, value in bounds.items() if value}
        if not lookups:
            return queryset
        # All supplied bounds are combined with AND.
        return queryset.filter(**lookups)


def get_dept(dept_id: int, dept_all_list=None, dept_list=None):
    """Recursively collect a department together with all of its descendants.

    :param dept_id: id of the department to start from
    :param dept_all_list: all departments as ``{"id", "parent"}`` mappings;
        loaded from ``Dept`` when empty or omitted
    :param dept_list: accumulator used by the recursion
    :return: de-duplicated list of department ids (order not guaranteed)
    """
    if not dept_all_list:
        dept_all_list = Dept.objects.all().values("id", "parent")
    if dept_list is None:
        dept_list = [dept_id]
    for ele in dept_all_list:
        child_id = ele.get("id")
        # Skip ids that were already collected so that cyclic or self-parent
        # data cannot cause unbounded recursion.
        if ele.get("parent") == dept_id and child_id not in dept_list:
            dept_list.append(child_id)
            get_dept(child_id, dept_all_list, dept_list)
    return list(set(dept_list))


class DataLevelPermissionsFilter(BaseFilterBackend):
    """Data-level permission filter.

    0. Read the user's department id; without one an empty queryset is returned.
    1. If the model has no ``dept_belong_id`` attribute, return everything.
    2. If the user has no ``role`` attribute, return the user's own department data.
    3. Collect the data ranges of all the user's roles (de-duplicated);
       3.1 if any role grants range 3 (all data), return everything.
    4. If range 0 (own data only) is among them, return only rows created by
       the user within the user's current department (users may change
       department, so only the current one counts).
    5. Otherwise build a department list from ranges 1 / 2 / 4 and filter by it.
    """

    def filter_queryset(self, request, queryset, view):
        """Apply the API whitelist and superuser shortcuts, then data-level filtering."""
        api = request.path  # path of the current request
        method_list = ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
        try:
            method = method_list.index(request.method)
        except ValueError:
            # Verbs outside the list (e.g. PATCH, HEAD) used to raise here.
            # -1 matches no whitelist entry and no role permission, so such
            # requests are denied data for non-superusers instead of crashing.
            method = -1

        # *** API whitelist ***
        api_white_list = ApiWhiteList.objects.filter(enable_datasource=False).values(
            permission__api=F("url"), permission__method=F("method")
        )
        api_white_list = [
            str(item.get("permission__api").replace("{id}", ".*?"))
            + ":"
            + str(item.get("permission__method"))
            for item in api_white_list
            if item.get("permission__api")
        ]
        new_api = f"{api}:{method}"
        for item in api_white_list:
            if re.match(item, new_api, re.M | re.I) is not None:
                return queryset

        # Superusers see everything; everybody else goes through the
        # role-based checks.
        if not request.user.is_superuser:
            return self._extracted_from_filter_queryset_33(request, queryset, api, method)
        return queryset

    # TODO Rename this here and in `filter_queryset`
    def _extracted_from_filter_queryset_33(self, request, queryset, api, method):
        """Restrict ``queryset`` according to the data ranges of the user's roles."""
        # 0. Read the user's department id; no department means no data.
        user_dept_id = getattr(request.user, "dept_id", None)
        if not user_dept_id:
            return queryset.none()

        # 1. Models without a "dept_belong_id" attribute are not filtered.
        if not getattr(queryset.model, "dept_belong_id", None):
            return queryset

        # 2. Users without roles only see their own department's data.
        if not hasattr(request.user, "role"):
            return queryset.filter(dept_belong_id=user_dept_id)

        # 3. Collect the data ranges granted by all roles:
        # (0, "own data only"),
        # (1, "own department and below"),
        # (2, "own department"),
        # (3, "all data"),
        # (4, "custom departments")
        re_api = api
        _pk = request.parser_context["kwargs"].get('pk')
        if _pk:  # detail request: turn the concrete pk back into the "{id}" placeholder
            # Literal replacement; the pk must not be interpreted as a regex.
            re_api = api.replace(str(_pk), '{id}')
        role_id_list = request.user.role.values_list('id', flat=True)
        role_permission_list = RoleMenuButtonPermission.objects.filter(
            role__in=role_id_list,
            role__status=1,
            menu_button__api=re_api,
            menu_button__method=method).values(
            'data_range'
        )
        dataScope_list = []  # granted data ranges
        for ele in role_permission_list:
            # Range 3 ("all data") short-circuits every other check.
            if ele.get("data_range") == 3:
                return queryset
            dataScope_list.append(ele.get("data_range"))
        dataScope_list = list(set(dataScope_list))

        # 4. Range 0: only rows created by the user inside the user's
        #    current department.
        if 0 in dataScope_list:
            return queryset.filter(
                creator=request.user, dept_belong_id=user_dept_id
            )

        # 5. Build the list of visible departments from the remaining ranges.
        dept_list = []
        for ele in dataScope_list:
            if ele == 1:
                dept_list.append(user_dept_id)
                dept_list.extend(
                    get_dept(
                        user_dept_id,
                    )
                )
            elif ele == 2:
                dept_list.append(user_dept_id)
            elif ele == 4:
                dept_ids = RoleMenuButtonPermission.objects.filter(
                    role__in=role_id_list,
                    role__status=1,
                    data_range=4).values_list(
                    'dept__id', flat=True
                )
                dept_list.extend(
                    dept_ids
                )
        if queryset.model._meta.model_name == 'dept':
            return queryset.filter(id__in=list(set(dept_list)))
        return queryset.filter(dept_belong_id__in=list(set(dept_list)))


class CustomDjangoFilterBackend(DjangoFilterBackend):
    """django-filter backend with prefix-based lookups and fuzzy string search.

    A leading character on a field name listed in ``filter_fields`` selects
    the lookup (see ``lookup_prefixes``). When all fields are filterable
    (``"__all__"``), ``CharField`` filters use ``icontains`` instead of
    ``exact``.
    """

    lookup_prefixes = {
        "^": "istartswith",
        "=": "iexact",
        "@": "search",
        "$": "iregex",
        "~": "icontains",
    }
    filter_fields = "__all__"

    def construct_search(self, field_name, lookup_expr=None):
        """Build the ORM lookup string for ``field_name``.

        A recognised prefix character takes precedence over ``lookup_expr``.
        The lookup is not appended when the field name already ends with it.
        """
        lookup = self.lookup_prefixes.get(field_name[0])
        if lookup:
            field_name = field_name[1:]
        else:
            lookup = lookup_expr
        if lookup:
            if field_name.endswith(lookup):
                return field_name
            return LOOKUP_SEP.join([field_name, lookup])
        return field_name

    def find_filter_lookups(self, orm_lookups, search_term_key):
        """Return the first ORM lookup whose field part equals ``search_term_key``, else ``None``."""
        for lookup in orm_lookups:
            # if lookup.find(search_term_key) >= 0:
            new_lookup = LOOKUP_SEP.join(lookup.split(LOOKUP_SEP)[:-1]) if len(lookup.split(LOOKUP_SEP)) > 1 else lookup
            # Exact comparison (instead of substring search) fixes wrong
            # matches between fields that share a common prefix.
            if new_lookup == search_term_key:
                return lookup
        return None

    def get_filterset_class(self, view, queryset=None):
        """
        Return the `FilterSet` class used to filter the queryset.
        """
        filterset_class = getattr(view, "filterset_class", None)
        filterset_fields = getattr(view, "filterset_fields", None)

        # TODO: remove assertion in 2.1
        if filterset_class is None and hasattr(view, "filter_class"):
            utils.deprecate(
                "`%s.filter_class` attribute should be renamed `filterset_class`." % view.__class__.__name__
            )
            filterset_class = getattr(view, "filter_class", None)

        # TODO: remove assertion in 2.1
        if filterset_fields is None and hasattr(view, "filter_fields"):
            utils.deprecate(
                "`%s.filter_fields` attribute should be renamed `filterset_fields`." % view.__class__.__name__
            )
            self.filter_fields = getattr(view, "filter_fields", None)
            if isinstance(self.filter_fields, (list, tuple)):
                # Strip the lookup prefix character to obtain the model field names.
                filterset_fields = [
                    field[1:] if field[0] in self.lookup_prefixes.keys() else field for field in self.filter_fields
                ]
            else:
                filterset_fields = self.filter_fields

        if filterset_class:
            filterset_model = filterset_class._meta.model

            # FilterSets do not need to specify a Meta class
            if filterset_model and queryset is not None:
                assert issubclass(
                    queryset.model, filterset_model
                ), "FilterSet model %s does not match queryset model %s" % (
                    filterset_model,
                    queryset.model,
                )

            return filterset_class

        if filterset_fields and queryset is not None:
            MetaBase = getattr(self.filterset_base, "Meta", object)

            class AutoFilterSet(self.filterset_base):
                @classmethod
                def get_all_model_fields(cls, model):
                    opts = model._meta

                    return [
                        f.name
                        for f in sorted(opts.fields + opts.many_to_many)
                        if (f.name == "id")
                        or not isinstance(f, models.AutoField)
                        and not (getattr(f.remote_field, "parent_link", False))
                    ]

                @classmethod
                def get_fields(cls):
                    """
                    Resolve the 'fields' argument that should be used for generating filters on the
                    filterset. This is 'Meta.fields' sans the fields in 'Meta.exclude'.
                    """
                    model = cls._meta.model
                    fields = cls._meta.fields
                    exclude = cls._meta.exclude

                    assert not (fields is None and exclude is None), (
                        "Setting 'Meta.model' without either 'Meta.fields' or 'Meta.exclude' "
                        "has been deprecated since 0.15.0 and is now disallowed. Add an explicit "
                        "'Meta.fields' or 'Meta.exclude' to the %s class." % cls.__name__
                    )

                    # Setting exclude with no fields implies all other fields.
                    if exclude is not None and fields is None:
                        fields = ALL_FIELDS

                    # Resolve ALL_FIELDS into all fields for the filterset's model.
                    if fields == ALL_FIELDS:
                        fields = cls.get_all_model_fields(model)

                    # Remove excluded fields
                    exclude = exclude or []
                    if not isinstance(fields, dict):
                        fields = [(f, [settings.DEFAULT_LOOKUP_EXPR]) for f in fields if f not in exclude]
                    else:
                        fields = [(f, lookups) for f, lookups in fields.items() if f not in exclude]

                    return OrderedDict(fields)

                @classmethod
                def get_filters(cls):
                    """
                    Get all filters for the filterset. This is the combination of declared and
                    generated filters.
                    """

                    # No model specified - skip filter generation
                    if not cls._meta.model:
                        return cls.declared_filters.copy()

                    # Determine the filters that should be included on the filterset.
                    filters = OrderedDict()
                    fields = cls.get_fields()
                    undefined = []

                    for field_name, lookups in fields.items():
                        field = get_model_field(cls._meta.model, field_name)
                        from django.db import models
                        from timezone_field import TimeZoneField

                        # Field types that are never turned into filters.
                        if isinstance(field, (models.JSONField, TimeZoneField)):
                            continue
                        # warn if the field doesn't exist.
                        if field is None:
                            undefined.append(field_name)
                        # Default string matching becomes fuzzy (icontains)
                        # when all fields are filterable.
                        if (
                            isinstance(field, (models.CharField))
                            and filterset_fields == "__all__"
                            and lookups == ["exact"]
                        ):
                            lookups = ["icontains"]
                        for lookup_expr in lookups:
                            filter_name = cls.get_filter_name(field_name, lookup_expr)

                            # If the filter is explicitly declared on the class, skip generation
                            if filter_name in cls.declared_filters:
                                filters[filter_name] = cls.declared_filters[filter_name]
                                continue

                            if field is not None:
                                filters[filter_name] = cls.filter_for_field(field, field_name, lookup_expr)

                    # Allow Meta.fields to contain declared filters *only* when a list/tuple
                    if isinstance(cls._meta.fields, (list, tuple)):
                        undefined = [f for f in undefined if f not in cls.declared_filters]

                    if undefined:
                        raise TypeError(
                            "'Meta.fields' must not contain non-model field names: %s" % ", ".join(undefined)
                        )

                    # Add in declared filters. This is necessary since we don't enforce adding
                    # declared filters to the 'Meta.fields' option
                    filters.update(cls.declared_filters)
                    return filters

                class Meta(MetaBase):
                    model = queryset.model
                    fields = filterset_fields

            return AutoFilterSet

        return None

    def filter_queryset(self, request, queryset, view):
        """Filter ``queryset`` with the view's filterset.

        Auto-generated filtersets are applied by hand: one value for a key
        becomes a single lookup, exactly two values become a ``__range``
        lookup, and empty values are ignored. Explicit filterset classes go
        through the normal django-filter validation path.
        """
        filterset = self.get_filterset(request, queryset, view)
        if filterset is None:
            return queryset
        if filterset.__class__.__name__ == "AutoFilterSet":
            queryset = filterset.queryset
            filter_fields = filterset.filters if self.filter_fields == "__all__" else self.filter_fields
            orm_lookup_dict = dict(
                zip(
                    [field for field in filter_fields],
                    [filterset.filters[lookup].lookup_expr for lookup in filterset.filters.keys()],
                )
            )
            orm_lookups = [
                self.construct_search(lookup, lookup_expr) for lookup, lookup_expr in orm_lookup_dict.items()
            ]
            # print(orm_lookups)
            conditions = []
            queries = []
            for search_term_key in filterset.data.keys():
                orm_lookup = self.find_filter_lookups(orm_lookups, search_term_key)
                if not orm_lookup or filterset.data.get(search_term_key) == '':
                    continue
                filterset_data_len = len(filterset.data.getlist(search_term_key))
                if filterset_data_len == 1:
                    query = Q(**{orm_lookup: filterset.data[search_term_key]})
                    queries.append(query)
                elif filterset_data_len == 2:
                    orm_lookup += '__range'
                    query = Q(**{orm_lookup: filterset.data.getlist(search_term_key)})
                    queries.append(query)
            if len(queries) > 0:
                conditions.append(reduce(operator.and_, queries))
                queryset = queryset.filter(reduce(operator.and_, conditions))
                return queryset
            else:
                return queryset

        if not filterset.is_valid() and self.raise_exception:
            raise utils.translate_validation(filterset.errors)
        return filterset.qs
5、配置REST_FRAMEWORK
# ================================================= # # *************** REST_FRAMEWORK配置 *************** # # ================================================= # REST_FRAMEWORK = { 'DEFAULT_PARSER_CLASSES': ( 'rest_framework.parsers.JSONParser', 'rest_framework.parsers.MultiPartParser', ), "DATETIME_FORMAT": "%Y-%m-%d %H:%M:%S", # 日期时间格式配置 "DATE_FORMAT": "%Y-%m-%d", "DEFAULT_FILTER_BACKENDS": ( "apps.utils.customfilters.CustomDjangoFilterBackend", "rest_framework.filters.SearchFilter", "rest_framework.filters.OrderingFilter", ), "DEFAULT_PAGINATION_CLASS": "apps.utils.pagination.CustomPagination", # 自定义分页 "DEFAULT_AUTHENTICATION_CLASSES": ( "rest_framework_simplejwt.authentication.JWTAuthentication", "rest_framework.authentication.SessionAuthentication", ), "DEFAULT_PERMISSION_CLASSES": [ "rest_framework.permissions.IsAuthenticated", # 只有经过身份认证确定用户身份才能访问 ], "EXCEPTION_HANDLER": "apps.utils.exception.CustomExceptionHandler", # 自定义的异常处理 }