ORM 江湖
曾几何时,程序员因为惧怕SQL而在开发的时候小心翼翼的写着sql,心中总是少不了恐慌,万一不小心sql语句出错,搞坏了数据库怎么办?又或者为了获取一些数据,什么内外左右连接,函数存储过程等等。毫无疑问,不搞懂这些,怎么都觉得变扭,说不定某天就跳进了坑里,叫天天不应,喊地地不答。
ORM 的出现,让畏惧SQL的开发者,在坑里看见了爬出去的绳索,仿佛天空并不是那么黑暗,至少再暗,我们也有了眼睛。顾名思义,ORM 对象关系映射,简而言之,就是把数据库的一个个table(表),映射为编程语言的class(类)。
python中比较著名的ORM框架有很多,大名顶顶的 SQLAlchemy 是python世界里当仁不让的ORM框架。江湖中peewee,strom, pyorm,SQLObject 各领风骚,可是最终还是SQLAlchemy 傲视群雄。
SQLAlchemy 简介
SQLAlchemy 分为两个部分,一个用于 ORM 的对象映射,另外一个是核心的 SQL expression 。第一个很好理解,纯粹的ORM,后面这个不是 ORM,而是DBAPI的封装,当然也提供了很多方法,避免了直接写sql,而是通过一些sql表达式。使用 SQLAlchemy 则可以分为三种方式。
- 使用 sql expression ,通过 SQLAlchemy 的方法写sql表达式,简介的写sql
- 使用 raw sql, 直接书写 sql
- 使用 ORM 避开直接书写 sql
本文先探讨 SQLAlchemy的 sql expresstion 部分的用法。主要还是跟着官方的 SQL Expression Language Tutorial.介绍
为什么要学习 sql expresstion ,而不直接上 ORM?因为后面这个两个是 orm 的基础。并且,即是不使用orm,后面这两个也能很好的完成工作,并且代码的可读性更好。纯粹把SQLAlchemy当成dbapi使用。首先SQLAlchemy 内建数据库连接池,解决了连接操作相关繁琐的处理。其次,提供方便的强大的log功能,最后,复杂的查询语句,依靠单纯的ORM比较难实现。
实战
连接数据库
首先需要导入 sqlalchemy 库,然后建立数据库连接,这里使用 mysql。通过create_engine方法进行
from sqlalchemy import create_engine engine = create_engine("mysql://root:@localhost:3306/webpy",encoding="utf-8", echo=True)
create_engine 方法进行数据库连接,返回一个 db 对象。里面的参数表示
数据库类型://用户名:密码(没有密码则为空,不填)@数据库主机地址/数据库名"SELECT * FROM user") 也可以通过 engine 获取连接在查询,例如 conn = engine.connect() 通过 conn.execute()方法进行查询。两者有什么差别呢?
直接使用engine的execute执行sql的方式, 叫做connnectionless执行,
借助 engine.connect()获取conn, 然后通过conn执行sql, 叫做connection执行
主要差别在于是否使用transaction模式, 如果不涉及transaction, 两种方法效果是一样的. 官网推荐使用后者。
定义表
定义数据表,才能进行sql表达式的操作,毕竟sql表达式的表的确定,是sqlalchemy制定的,如果数据库已经存在了数据表还需要定义么?当然,这里其实是一个映射关系,如果不指定,查询表达式就不知道是附加在那个表的操作,当然定义的时候,注意表名和字段名,代码和数据的必须保持一致。定义好之后,就能创建数据表,一旦创建了,再次运行创建的代码,数据库是不会创建的。
# -*- coding: utf-8 -*- __author__ = 'ghost' from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey # 连接数据库 engine = create_engine("mysql://root:@localhost:3306/webpy",encoding="utf-8", echo=True) # 获取元数据 metadata = MetaData() # 定义表 user = Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), Column('fullname', String(40)), ) address = Table('address', metadata, Column('id', Integer, primary_key=True), Column('user_id', None, ForeignKey('user.id')), Column('email', String(60), nullable=False) ) # 创建数据表,如果数据表存在,则忽视 metadata.create_all(engine) # 获取数据库连接 conn = engine.connect()
插入 insert
有了数据表和连接对象,对应数据库操作就简单了。
> i = user.insert() # 使用查询 > i <sqlalchemy.sql.dml.Insert object at 0x0000000002637748> > print i # 内部构件的sql语句 INSERT INTO "user" (id, name, fullname) VALUES (:id, :name, :fullname) > u = dict(name='jack', fullname='jack Jone') > r = conn.execute(i, **u) # 执行查询,第一个为查询对象,第二个参数为一个插入数据字典,如果插入的是多个对象,就把对象字典放在列表里面 > r <sqlalchemy.engine.result.ResultProxy object at 0x0000000002EF9390> > r.inserted_primary_key # 返回插入行 主键 id [4L] > addresses [{'user_id': 1, 'email': 'jack@yahoo.com'}, {'user_id': 1, 'email': 'jack@msn.com'}, {'user_id': 2, 'email': 'www@www.org'}, {'user_id': 2, 'email': 'wendy@aol.com'}] > i = address.insert() > r = conn.execute(i, addresses) # 插入多条记录 > r <sqlalchemy.engine.result.ResultProxy object at 0x0000000002EB5080> > r.rowcount #返回影响的行数 4L > i = user.insert().values(name='tom', fullname='tom Jim') > i.compile() <sqlalchemy.sql.compiler.SQLCompiler object at 0x0000000002F6F390> > print i.compile() INSERT INTO "user" (name, fullname) VALUES (:name, :fullname) > print i.compile().params {'fullname': 'tom Jim', 'name': 'tom'} > r = conn.execute(i) > r.rowcount 1L
查询 select
查询方式很灵活,多数时候使用 sqlalchemy.sql 下面的 select方法
> s = select([user]) # 查询 user表 > s <sqlalchemy.sql.selectable.Select at 0x25a7748; Select object> > print s SELECT "user".id, "user".name, "user".fullname FROM "user"
如果需要查询自定义的字段,可是使用 user 的cloumn 对象,例如
> user.c # 表 user 的字段column对象 <sqlalchemy.sql.base.ImmutableColumnCollection object at 0x0000000002E804A8> > print user.c ['user.id', 'user.name', 'user.fullname'] > s = select([user.c.name,user.c.fullname]) > r = conn.execute(s) > r <sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748> > r.rowcount # 影响的行数 5L > ru = r.fetchall() > ru [(u'hello', u'hello world'), (u'Jack', u'Jack Jone'), (u'Jack', u'Jack Jone'), (u'jack', u'jack Jone'), (u'tom', u'tom Jim')] > r <sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748> > r.closed # 只要 r.fetchall() 之后,就会自动关闭 ResultProxy 对象 True
同时查询两个表
> s = select([user.c.name, address.c.user_id]).where(user.c.id==address.c.user_id) # 使用了字段和字段比较的条件 > s <sqlalchemy.sql.selectable.Select at 0x2f03390; Select object> > print s SELECT "user".name, address.user_id FROM "user", address WHERE "user".id = address.user_id
操作符
> print user.c.id == address.c.user_id # 返回一个编译的字符串 "user".id = address.user_id > print user.c.id == 7 "user".id = :id_1 # 编译成为带参数的sql 语句片段字符串 > print user.c.id != 7 "user".id != :id_1 > print user.c.id > 7 "user".id > :id_1 > print user.c.id == None "user".id IS NULL > print user.c.id + address.c.id # 使用两个整形的变成 + "user".id + address.id > print user.c.name + address.c.email # 使用两个字符串 变成 || "user".name || address.email
操作连接
这里的连接指条件查询的时候,逻辑运算符的连接,即 and or 和 not
> print and_( user.c.name.like('j%'), user.c.id == address.c.user_id, or_( address.c.email == 'wendy@aol.com', address.c.email == 'jack@yahoo.com' ), not_(user.c.id>5)) "user".name LIKE :name_1 AND "user".id = address.user_id AND (address.email = :email_1 OR address.email = :email_2) AND "user".id <= :id_1 >
得到的结果为 编译的sql语句片段,下面看一个完整的例子
> se_sql = [(user.c.fullname +", " + address.c.email).label('title')] > wh_sql = and_( user.c.id == address.c.user_id, user.c.name.between('m', 'z'), or_( address.c.email.like('%@aol.com'), address.c.email.like('%@msn.com') ) ) > print wh_sql "user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2) > s = select(se_sql).where(wh_sql) > print s SELECT "user".fullname || :fullname_1 || address.email AS title FROM "user", address WHERE "user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2) > r = conn.execute(s) > r.fetchall()
使用 raw sql 方式
遇到负责的sql语句的时候,可以使用 sqlalchemy.sql 下面的 text 函数。将字符串的sql语句包装编译成为 execute执行需要的sql对象。例如:、
> text_sql = "SELECT id, name, fullname FROM user WHERE id=:id" # 原始sql语句,参数用( :value)表示 > s = text(text_sql) > print s SELECT id, name, fullname FROM user WHERE id=:id > s <sqlalchemy.sql.elements.TextClause object at 0x0000000002587668> > conn.execute(s, id=3).fetchall() # id=3 传递:id参数 [(3L, u'Jack', u'Jack Jone')]
连接 join
连接有join 和 outejoin 两个方法,join 有两个参数,第一个是join 的表,第二个是on 的条件,joing之后必须要配合select_from 方法:
> print user.join(address) "user" JOIN address ON "user".id = address.user_id # 因为开启了外键 ,所以join 能只能识别 on 条件 > print user.join(address, address.c.user_id==user.c.id) # 手动指定 on 条件 "user" JOIN address ON address.user_id = "user".id > s = select([user.c.name, address.c.email]).select_from(user.join(address, user.c.id==address.c.user_id)) # 被jion的sql语句需要用 select_from方法配合 > s <sqlalchemy.sql.selectable.Select at 0x2eb63c8; Select object> > print s SELECT "user".name, address.email FROM "user" JOIN address ON "user".id = address.user_id > conn.execute(s).fetchall() [(u'hello', u'jack@yahoo.com'), (u'hello', u'jack@msn.com'), (u'hello', u'jack@yahoo.com'), (u'hello', u'jack@msn.com'), (u'Jack', u'www@www.org'), (u'Jack', u'wendy@aol.com'), (u'Jack', u'www@www.org'), (u'Jack', u'wendy@aol.com')]
排序 分组 分页
排序使用 order_by 方法,分组是 group_by ,分页自然就是limit 和 offset两个方法配合
> s = select([user.c.name]).order_by(user.c.name) # order_by > print s SELECT "user".name FROM "user" ORDER BY "user".name > s = select([user]).order_by(user.c.name.desc()) > print s SELECT "user".id, "user".name, "user".fullname FROM "user" ORDER BY "user".name DESC > s = select([user]).group_by(user.c.name) # group_by > print s SELECT "user".id, "user".name, "user".fullname FROM "user" GROUP BY "user".name > s = select([user]).order_by(user.c.name.desc()).limit(1).offset(3) # limit(1).offset(3) > print s SELECT "user".id, "user".name, "user".fullname FROM "user" ORDER BY "user".name DESC LIMIT :param_1 OFFSET :param_2 [(4L, u'jack', u'jack Jone')]
更新 update
前面都是一些查询,更新和插入的方法很像,都是 表下面的方法,不同的是,update 多了一个 where 方法 用来选择过滤
> s = user.update() > print s UPDATE "user" SET id=:id, name=:name, fullname=:fullname > s = user.update().values(fullname=user.c.name) # values 指定了更新的字段 > print s UPDATE "user" SET fullname="user".name > s = user.update().where(user.c.name == 'jack').values(name='ed') # where 进行选择过滤 > print s UPDATE "user" SET name=:name WHERE "user".name = :name_1 > r = conn.execute(s) > print r.rowcount # 影响行数 3
还有一个高级用法,就是一次命令执行多个记录的更新,需要用到 bindparam 方法
> s = user.update().where(user.c.name==bindparam('oldname')).values(name=bindparam('newname')) # oldname 与下面的传入的从拿书进行绑定,newname也一样 > print s UPDATE "user" SET name=:newname WHERE "user".name = :oldname > u = [{'oldname':'hello', 'newname':'edd'}, {'oldname':'ed', 'newname':'mary'}, {'oldname':'tom', 'newname':'jake'}] > r = conn.execute(s, u) > r.rowcount 5L
删除 delete
删除比较容易,调用 delete方法即可,不加 where 过滤,则删除所有数据,但是不会drop掉表,等于清空了数据表
> r = conn.execute(address.delete()) # 清空表 > print r <sqlalchemy.engine.result.ResultProxy object at 0x0000000002EAF550> > r.rowcount 8L > r = conn.execute(users.delete().where(users.c.name > 'm')) # 删除记录 > r.rowcount 3L
flask-sqlalchemy
SQLAlchemy已经成为了python世界里面orm的标准,flask是一个轻巧的web框架,可以自由的使用orm,其中flask-sqlalchemy是专门为flask指定的插件。
安装flask-sqlalchemy
pip install flask-sqlalchemy
初始化sqlalchemy
from flask import Flask from flask.ext.sqlalchemy import SQLAlchemy app = Flask(__name__) # dialect+driver://username:password@host:port/database"htmlcode">class User(db.Model): """ 定义了三个字段, 数据库表名为model名小写 """ id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True) email = db.Column(db.String(120), unique=True) def __init__(self, username, email): self.username = username self.email = email def __repr__(self): return '<User %r>' % self.username def save(self): db.session.add(self) db.session.commit()创建数据表
数据包的创建使用sqlalchemy app,如果表已经存在,则忽略,如果不存在,则新建> from yourapp import db, User > u = User(username='admin', email='admin@example.com') # 创建实例 > db.session.add(u) # 添加session > db.session.commit() # 提交查询 > users = User.query.all() # 查询需要注意的是,如果要插入中文,必须插入 unicode字符串
> u = User(username=u'人世间', email='rsj@example.com') > u.save()定义关系
关系型数据库,最重要的就是关系。通常关系分为 一对一(例如无限级栏目),一对多(文章和栏目),多对多(文章和标签)one to many:
我们定义一个Category(栏目)和Post(文章),两者是一对多的关系,一个栏目有许多文章,一个文章属于一个栏目。class Category(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) def __init__(self, name): self.name = name def __repr__(self): return '<Category %r>' % self.name class Post(db.Model): """ 定义了五个字段,分别是 id,title,body,pub_date,category_id """ id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(80)) body = db.Column(db.Text) pub_date = db.Column(db.String(20)) # 用于外键的字段 category_id = db.Column(db.Integer, db.ForeignKey('category.id')) # 外键对象,不会生成数据库实际字段 # backref指反向引用,也就是外键Category通过backref(post_set)查询Post category = db.relationship('Category', backref=db.backref('post_set', lazy='dynamic')) def __init__(self, title, body, category, pub_date=None): self.title = title self.body = body if pub_date is None: pub_date = time.time() self.pub_date = pub_date self.category = category def __repr__(self): return '<Post %r>' % self.title def save(self): db.session.add(self) db.session.commit()如何使用查询呢?
> c = Category(name='Python') > c <Category 'Python'> > c.post_set <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B58F60> > c.post_set.all() [] > p = Post(title='hello python', body='python is cool', category=c) > p.save() > c.post_set <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B73710> > c.post_set.all() # 反向查询 [<Post u'hello python'>] > p <Post u'hello python'> > p.category <Category u'Python'> # 也可以使用category_id 字段来添加 > p = Post(title='hello flask', body='flask is cool', category_id=1) > p.save()many to many (评论已经指出,这样的做法无法关联删除,简书没有删除线格式,多多对例子作废,在此提示,以免被误导)
对于多对多的关系,往往是定义一个两个model的id的另外一张表,例如 Post 和 Tag之间是多对多,需要定义一个 Post_Tag的表post_tag = db.Table('post_tag', db.Column('post_id', db.Integer, db.ForeignKey('post.id')), db.Column('tag_id', db.Integer, db.ForeignKey('tag.id')) ) class Post(db.Model): id = db.Column(db.Integer, primary_key=True) # ... 省略 # 定义一个反向引用,tag可以通过 post_set查询到 post的集合 tags = db.relationship('Tag', secondary=post_tag, backref=db.backref('post_set', lazy='dynamic')) class Tag(db.Model): id = db.Column(db.Integer, primary_key=True) content = db.Column(db.String(10), unique=True) # 定义反向查询 posts = db.relationship('Post', secondary=post_tag, backref=db.backref('tag_set', lazy='dynamic')) def __init__(self, content): self.content = content def save(self): db.session.add(self) db.session.commit()查询:
> tag_list = [] > tags = ['python', 'flask', 'ruby', 'rails'] > for tag in tags: t = Tag(tag) tag_list.append(t) > tag_list [<f_sqlalchemy.Tag object at 0x0000000003B7CF28>, <f_sqlalchemy.Tag object at 0x0000000003B7CF98>, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8>, <f_sqlalchemy.Tag object at 0x0000000003B7CE80>] > p <Post u'hello python'> > p.tags [] > p.tags = tag_list # 添加多对多的数据 > p.save() > p.tags [<f_sqlalchemy.Tag object at 0x0000000003B7CF28>, <f_sqlalchemy.Tag object at 0x0000000003B7CF98>, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8>, <f_sqlalchemy.Tag object at 0x0000000003B7CE80>] > p.tag_set # 反向查询 <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C080> > p.tag_set.all() [<f_sqlalchemy.Tag object at 0x0000000003B7CF28>, <f_sqlalchemy.Tag object at 0x0000000003B7CF98>, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8>, <f_sqlalchemy.Tag object at 0x0000000003B7CE80>] > t = Tag.query.all()[1] > t <f_sqlalchemy.Tag object at 0x0000000003B7CF28> > t.content u'python' > t.posts [<Post u'hello python'>] > t.post_set <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C358> > t.post_set.all() [<Post u'hello python'>] self one to one自身一对一也是常用的需求,比如无限分级栏目
class Category(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) # 父级 id pid = db.Column(db.Integer, db.ForeignKey('category.id')) # 父栏目对象 pcategory = db.relationship('Category', uselist=False, remote_side=[id], backref=db.backref('scategory', uselist=False)) def __init__(self, name, pcategory=None): self.name = name self.pcategory = pcategory def __repr__(self): return '<Category %r>' % self.name def save(self): db.session.add(self) db.session.commit()查询:
> p = Category('Python') > p <Category 'Python'> > p.pid > p.pcategory # 查询父栏目 > p.scategory # 查询子栏目 > f = Category('Flask', p) > f.save() > f <Category u'Flask'> > f.pid 1L > f.pcategory # 查询父栏目 <Category u'Python'> > f.scategory # 查询父栏目 > p.scategory # 查询子栏目 <Category u'Flask'>关于 flask-sqlalchemy 定义models的简单应用就这么多,更多的技巧在于如何查询。
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]