抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

背景

在使用flask或者django做后端的时候,经常会有一些列表类接口,需要实现快速查询的目的,带一些条件,有分页,或许还有排序,本身model操作很简单,而当这种查询一多,很多内容是重复写的,尤其是每一次的组装返回值,将结果转换为json,需要一个公共方法来做应对。

需要实现

需要具备以下功能:

  • 单表查询,快速返回结果集并自动完成序列化
  • 结果集支持自动分页,并通过开关控制是否需要分页
  • 支持便捷的传递列名,查询条件,排序条件
  • 支持快速返回符合查询条件的总数
  • 支持多表查询

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import datetime
import flask_sqlalchemy

from easydubbo import db


ignore_filed_list = ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__mapper__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__table__', '__tablename__', '__weakref__', '_decl_class_registry', '_sa_class_manager', '_sa_instance_state', 'check_password', 'generate_auth_token', 'get_id', 'hash_password', 'is_active', 'is_anonymous', 'is_authenticated', 'metadata', 'password', 'query', 'query_class', 'verify_auth_token']


class ModelOperate:

def __init__(self, nedd_paginate=True):
self._query_column = [] # 查询列
self._query_filter = [] # 过滤条件
self._query_order = [] # 排序条件
self._query_set = '' # 结果集
self.nedd_paginate = nedd_paginate
self._return_column = []

def excute_query(self):
"""
结果集,此处组装完成后还只是sql
"""
self._query_set = db.session.query(*self._query_column).filter(*self._query_filter).order_by(*self._query_order)

def set_query_column(self, *args):
"""
设置查询条件,若无,则返回所有列
"""
if not isinstance(args[0], flask_sqlalchemy.model.DefaultMeta):
self._return_column = args
self._query_column = args

def set_query_filter(self, *args):
"""
设置过滤条件,可不设置
"""
self._query_filter = args[0]

def set_query_order(self, *args):
"""
设置排序字段,可不设置
-xxx降序
+xxx升序
"""
self._query_order = args[0]

@staticmethod
def is_basequery(obj):
"""
判断是否是flask_sqlalchemy.BaseQuery类型
"""
if isinstance(obj, flask_sqlalchemy.BaseQuery):
return True

@property
def total(self):
"""
使用query set的原生方式快速获取结果集的总数
"""
if self.is_basequery(self._query_set):
return self._query_set.count()
raise Exception('Error')

def get_result(self, page=1, page_size=10):
"""
快速进行分页
"""
self.excute_query()
if self.is_basequery(self._query_set):
if self.nedd_paginate:
_result = self._query_set.paginate(page=page, per_page=page_size).items
else:
_result = self._query_set.all()
try:
if self._return_column:
return self.result_convert_to_list(_result)
return self.model_query_convert_to_list(_result)
except Exception as e:
raise Exception("Error: %s" % str(e))

def result_convert_to_list(self, results):
"""
分页结果处理
"""
_resutn_column_nums = len(self._return_column)
_results = []
for _query in results:
_dict = {}
for num in range(0, _resutn_column_nums):
data = _query[num]
name = self._return_column[num].__dict__.get('key')
if isinstance(data, datetime):
_dict[name] = datetime.strftime(data, "%Y-%m-%d %H:%M:%S")
elif data is None:
_dict[name] = ''
else:
_dict[name] = data
_results.append(_dict)
return _results

@staticmethod
def model_query_convert_to_list(results):
_list = []
for i in results:
_dict = {}
for field in [x for x in dir(i) if not x in ignore_filed_list]:
data = i.__getattribute__(field)
try:
# 判断类型
if isinstance(data, datetime):
data = datetime.strftime(data, "%Y-%m-%d %H:%M:%S")
elif data is None:
data = ''
except Exception as e:
data = str(data)
finally:
_dict[field] = data
_list.append(_dict)
return _list

示例Model

现有Model如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from easydubbo import db #easydubbo为我项目名,此处替换成自己的


class User(db.Model):
"""
账号
"""
__tablename__ = 'user'
pkid = db.Column(db.Integer, primary_key=True, autoincrement=True)
account = db.Column(db.String(15), comment="登陆账号", index=True)
password = db.Column(db.String(15), comment="密码")
employee_name = db.Column(db.String(15), comment="用户姓名")
employee_id = db.Column(db.Integer, comment="工号")
permission_type = db.Column(db.Integer, default=1, comment="权限类型,管理员0,普通用户1")
creatime = db.Column(db.DateTime(20), default=datetime.now(), comment="创建时间")
modifytime = db.Column(db.DateTime(20), default=datetime.now(), comment="修改时间")
disabled = db.Column(db.Integer, default=0, comment="0启用,1禁用")
is_deleted = db.Column(db.Integer, default=0, nullable=True, comment="0未删除,1逻辑删除")

数据如下:

image.png

示例结果

什么都不加

image.png

指定返回列

image.png

指定过滤条件

image.png

指定排序

image.png

组合使用

image.png

多表联合查询

image.png

评论