1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
| from sqlalchemy import create_engine, Text, ForeignKey, Column, String, Integer, Table, MetaData
from faker import Factory
# 连接,创建
# sqlite3
from sqlalchemy.orm import relationship, sessionmaker
engine = create_engine('sqlite:///data.sqlite')
print(engine)
# 使用orm
from sqlalchemy.ext.declarative import declarative_base
"""
----1----
"""
# 基类
Base = declarative_base()
# 设计表的结构
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(64), nullable=False)
password = Column(String(64), nullable=False, index=True)
articles = relationship('Article', backref='author')
# 一对一关系,加上userlist=False
userinfo = relationship('UserInfo', backref='user', uselist=False)
def __repr__(self):
return f'{self.username}'
class UserInfo(Base):
__tablename__ = 'userinfos'
id = Column(Integer, primary_key=True)
name = Column(String(64))
qq = Column(String(11))
phone = Column(String(11))
link = Column(String(64))
user_id = Column(Integer, ForeignKey('users.id'))
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer, primary_key=True)
title = Column(String(255), nullable=False, index=True)
content = Column(Text)
user_id = Column(Integer, ForeignKey('users.id'))
tags = relationship('Tag', secondary='article_tag', backref='articles')
# author = relationship('User')
def __repr__(self):
return '%s' % self.__class__.__name__
class Tag(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String(64), nullable=False, index=True)
def __repr__(self):
return self.__class__.__name__
# 多对多关系,需要分解成两个一对多关系。
article_tag = Table(
'article_tag', Base.metadata,
Column('article_id', Integer, ForeignKey('articles.id')),
Column('tag_id', Integer, ForeignKey('tags.id'))
)
"""
----2----
"""
# 绑定引擎
# metadata = MetaData(engine)
# 删除表
# metadata.drop_all()
# 设置表的结构
"""
user_table = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('username', String(64), nullable=False),
Column('password', String(64), nullable=False, index=True),
)
articles = relationship('article_table')
article_table = Table('articles', metadata,
Column('title', String(255), primary_key=True),
Column("content", Text),
Column('user_id', Integer, ForeignKey('users.id')),
)
author = relationship('user_table')
"""
# 创建表
# metadata.create_all()
# 连接表
# autoload_with:指定引擎
# u = Table('user', metadata, autoload=True)
# print(dir(u))
if __name__ == '__main__':
# 创建表
# Base.metadata.create_all(engine)
# 创建一个会话
# faker = Factory.create()
Sessoin = sessionmaker(bind=engine)
sessoin = Sessoin()
# faker_users = [
# User(username=faker.name(),
# password=faker.word(),
# ) for i in range(10)]
# sessoin.add_all(faker_users)
# faker_tags = [Tag(name=faker.word()) for i in range(5)]
# sessoin.add_all(faker_tags)
# for i in range(100):
# article = Article(
# title=faker.sentence(),
# content=faker.sentence(),
# author=random.choice(faker_users)
# )
# for tag in random.sample(faker_tags, random.randint(2, 5)):
# article.tags.append(tag)
# sessoin.add(article)
# sessoin.commit()
# 添加一条操作
# user_1 = User(username='user_1', password='user_1')
# sessoin.add(user_1)
# 添加多条数据
# user_2 = User(username='user_2', password='user_2')
# user_3 = User(username='user_3', password='user_3')
# sessoin.add_all([user_2, user_3])
# sessoin.commit()
# 查找某个表中的全部数据
select_all = sessoin.query(User).all()
# print(select_all)
# 查找表中的一条数据
select_first = sessoin.query(User).first()
# print(select_first)
# 查找的时候筛选数据,配合all 或者 first
select_filterby = sessoin.query(User.password).filter_by(username='user_1')
# print(select_filterby)
# filter和filter_by很像,前者用于简单查询,注意到使用filter查询时,需要添加对象名,要不然会报错,
# 后者用于复杂查询
# equals
select_filter_eq = sessoin.query(User).filter(User.username == 'user_1')
# not equals
select_filter_nq = sessoin.query(User).filter(User.username != 'user_1')
# more than
select_filter_gt = sessoin.query(User).filter(User.id > 5).all()
# less than
select_filter_lt = sessoin.query(User).filter(User.id < 5).all()
# like
select_filter_like = sessoin.query(User).filter(User.username.like('u%'))
# in
select_filter_in = sessoin.query(User).filter(User.username.in_(['user_1', 'user_2']))
# not in
select_filter_not_in = sessoin.query(User).filter(~User.id.in_([2, 3]))
from sqlalchemy.orm.exc import MultipleResultsFound
# print(sessoin.query(User.id).filter(User.username == 'user_1').order_by(User.id).one())
# print(select_filter_eq)
# and
from sqlalchemy import and_
sessoin.query(User).filter(and_(User.id == 1, User.username == 'user_1'))
sessoin.query(User).filter(User.id == 1).filter(User.username == 'user_1')
# or
from sqlalchemy import or_
sessoin.query(User).filter(or_(User.id == 1, User.username == 'user_1'))
# match
sessoin.query(User).filter(User.username.match('user_1'))
# 使用text,配合filter,filter_by
from sqlalchemy import text
sessoin.query(User).filter(text("id>2")).order_by(text('id')).all()
# 使用params,添加参数
sessoin.query(User).filter(text('id >2 and name=:name').params(name='user_1'))
# 使用from_statement执行原生的sql语句
sessoin.query(User).from_statement(text("select * from users"))
# 同样使用params,添加参数
sessoin.query(User).from_statement(text("select * from users where name=:name").params(name='user_1'))
# 统计username='user_1'的数量
b = sessoin.query(User).filter(User.username == 'user_1').count()
print(b)
# 统计某些组的个数,使用func.count()
# [(1, 'Brian Johnson'), (1, 'Charles Barber'), (1, 'Kathryn Walker'), (1, 'Margaret
# Fox'), (1, 'Marie Reyes'), (1, 'Michael Nichols'), (1, 'Mrs. Karen Delacruz'), (1, 'Randy Wallace'), (1,
# 'Teresa Cochran'), (1, 'Tyler Smith'), (2, 'user_1'), (3, 'user_2'), (4, 'user_3')]
from sqlalchemy import func
print(sessoin.query(func.count(User.username), User.username).group_by(User.username).all())
# 如果统计有多少行,需要用select_from 选定一个表
d = sessoin.query(func.count('*')).select_from(User).scalar()
# d ==19
print(d)
# 统计某一列的数量,则不需要
print(sessoin.query(func.count(User.username)).scalar())
# 关联查询
# User 和article是一对多的关系
# 这里是差找每一个user所拥有的article
for u, a in sessoin.query(User, Article).filter(User.id == Article.user_id).all():
print(u, a.title)
# 也可以使用join
sessoin.query(User).join(Article).filter(User.id == Article.user_id).all()
# 搜索有发表article的user。
sessoin.query(User).join(Article, Article.user_id == User.id).all()
# 删除数据前,先找到具体要删除的数据
# delete_data = sessoin.query(User).filter_by(username='user_1').first()
# sessoin.delete(delete_data)
# sessoin.commit()
# 需要数据一般是把数据找出来,在修改字段提交
# select_filterby = sessoin.query(User).filter_by(username='user_1').first()
# select_filterby.password = '123'
# sessoin.commit()
|