SQLAlchemy是Python SQL工具箱和ORM框架,它为应用程序开发人员提供了全面而灵活的SQL功能。它提供了一整套企业级持久化方案,旨在高效,高性能地访问数据库,并符合简单的Pythonic哲学。项目代码量比较大,接近200个文件,7万行代码, 我们一起来挑战一下。由于篇幅原因,分成上下两篇,上篇包括如下内容:
源码使用的版本是 1.3.0
, 对应的commitID是 740bb50c2
,和参考链接中官方文档1.3版本一致。项目目录大概包括:
目录 | 描述 |
---|---|
connectors | 连接 |
dialects | 方言 |
engine | 引擎 |
event | 事件 |
ext | 扩展功能 |
orm | orm |
pool | 连接池 |
sql | sql处理 |
util | 工具类 |
SQLAlchemy的架构图如下:
architecture
整体分成3层,从上到下分别是ORM,core和DBAPI,其中core,又分成左右两个区域。我们先学习其中的引擎,连接池,dialects(仅sqlite)和DBAPI部分,也就是架构图的右半侧。其中DBAPI(sqlite相关)是在python-core-library中提供。
先从使用DBAPI操作sqlite的API开始:
import sqlite3
con = sqlite3.connect('example.db')
cur = con.cursor()
# Create table
cur.execute('''CREATE TABLE stocks
(date text, trans text, symbol text, qty real, price real)''')
# Insert a row of data
cur.execute("INSERT INTO stocks VALUES ('2006-01-05','BUY','RHAT',100,35.14)")
# Save (commit) the changes
con.commit()
# Do this instead
t = ('RHAT',)
cur.execute('SELECT * FROM stocks WHERE symbol=?', t)
print(cur.fetchone())
# We can also close the connection if we are done with it.
# Just be sure any chang
con.close()
操作sqlite数据库主要包括了下面几个步骤:
对比一下使用sqlalchemy进行sqlite操作:
from sqlalchemy import create_engine
eng = create_engine("sqlite:///:memory:", echo=True)
conn = eng.connect()
conn.execute("create table x (a integer, b integer)")
conn.execute("insert into x (a, b) values (1, 1)")
conn.execute("insert into x (a, b) values (2, 2)")
result = conn.execute("select x.a, x.b from x")
assert result.keys() == ["a", "b"]
result = conn.execute('''
select x.a, x.b from x where a=1
union
select x.a, x.b from x where a=2
''')
assert result.keys() == ["a", "b"]
可以看到使用sqlalchemy后操作变的简单,把cursor,commit,fetch和close等操作隐藏到engine内部,简化成3步:
跟随create_engine的API,可以看到这里使用策略模式去创建不同的engine实现:
# engine/__init__.py
from . import strategies
default_strategy = "plain" # 默认
def create_engine(*args, **kwargs):
strategy = kwargs.pop("strategy", default_strategy)
strategy = strategies.strategies[strategy]
return strategy.create(*args, **kwargs)
默认的engine策略:
# engine/strategies.py
strategies = {}
class EngineStrategy(object):
def __init__(self):
strategies[self.name] = self
class DefaultEngineStrategy(EngineStrategy):
def create(self, name_or_url, **kwargs):
...
class PlainEngineStrategy(DefaultEngineStrategy):
name = "plain"
engine_cls = base.Engine # 引擎类
PlainEngineStrategy()
重点就在策略的create方法了, 去掉数据准备和异常处理后核心代码如下:
def create(self, name_or_url, **kwargs):
...
# get dialect class
u = url.make_url(name_or_url)
entrypoint = u._get_entrypoint()
dialect_cls = entrypoint.get_dialect_cls(u)
# create dialect
dialect = dialect_cls(**dialect_args)
# pool
poolclass = dialect_cls.get_pool_class(u)
pool = poolclass(creator, **pool_args)
# engine
engineclass = self.engine_cls
engine = engineclass(pool, dialect, u, **engine_args)
...
return engine
create函数可以理解为engine的创建模版,主要是下面3个步骤:
Engine的构造函数和connect方法如下:
class Engine(Connectable, log.Identified):
_connection_cls = Connection
def __init__(
self,
pool,
dialect,
url,
logging_name=None,
echo=None,
proxy=None,
execution_options=None,
):
self.pool = pool
self.url = url
self.dialect = dialect
self.engine = self
...
def connect(self, **kwargs):
return self._connection_cls(self, **kwargs)
engine主要功能就是管理和持有connection,pool和dialect,对外提供API。
dialect是根据url自动识别,使用PluginLoader进行动态加载:
class PluginLoader(object):
def __init__(self, group, auto_fn=None):
self.group = group
self.impls = {}
self.auto_fn = auto_fn
def load(self, name):
# import一次
if name in self.impls:
return self.impls[name]()
if self.auto_fn:
loader = self.auto_fn(name)
if loader:
self.impls[name] = loader
return loader()
...
sqlite-dialect使用下面的 __import__
动态加载模块:
def _auto_fn(name):
if "." in name:
dialect, driver = name.split(".")
else:
dialect = name
driver = "base"
if dialect in _translates:
translated = _translates[dialect]
dialect = translated
try:
# 动态加载模块
module = __import__("sqlalchemy.dialects.%s" % (dialect,)).dialects
except ImportError:
return None
module = getattr(module, dialect)
if hasattr(module, driver):
module = getattr(module, driver)
return lambda: module.dialect
else:
return None
registry = util.PluginLoader("sqlalchemy.dialects", auto_fn=_auto_fn)
不同方言实现需要提供一个dialect对象,在sqlite中是这样的:
## sqlalchemy/dialects/sqlite/__init__.py
base.dialect = dialect = pysqlite.dialect
## sqlalchemy/dialects/sqlite/pysqlite.py
class SQLiteDialect_pysqlite(SQLiteDialect):
pass
dialect = SQLiteDialect_pysqlite
SQLiteDialect功能相简单,一是决定POOL_CLASS的类型: memory实现使用的是SingletonThreadPool;db文件使用NullPool,下面分析Pool时候会用到。
class SQLiteDialect_pysqlite(SQLiteDialect):
@classmethod
def get_pool_class(cls, url):
if url.database and url.database != ":memory:":
return pool.NullPool
else:
return pool.SingletonThreadPool
二是提供包装DBAPI得到的connect:
class DefaultDialect(interfaces.Dialect):
...
def connect(self, *cargs, **cparams):
return self.dbapi.connect(*cargs, **cparams)
class SQLiteDialect_pysqlite(SQLiteDialect):
...
@classmethod
def dbapi(cls):
try:
from pysqlite2 import dbapi2 as sqlite
except ImportError:
try:
from sqlite3 import dbapi2 as sqlite # try 2.5+ stdlib name.
except ImportError as e:
raise e
return sqlite
def connect(self, *cargs, **cparams):
passphrase = cparams.pop("passphrase", "")
pragmas = dict((key, cparams.pop(key, None)) for key in self.pragmas)
conn = super(SQLiteDialect_pysqlcipher, self).connect(
*cargs, **cparams
)
conn.execute('pragma key="%s"' % passphrase)
for prag, value in pragmas.items():
if value is not None:
conn.execute('pragma %s="%s"' % (prag, value))
return conn
connect在SQLiteDialect_pysqlite类和父类DefaultDialect之间反复横跳,核心功能就是下面2句代码:
from sqlite3 import dbapi2 as sqlite
sqlite.connect(*cargs, **cparams)
Connection构造函数如下:
class Connection(Connectable):
def __init__(
self,
engine,
connection=None,
close_with_result=False,
_branch_from=None,
_execution_options=None,
_dispatch=None,
_has_events=None,
):
self.engine = engine
self.dialect = engine.dialect
self.__connection = engine.raw_connection()
...
connection主要使用engine.raw_connection创建了一个DBAPI连接
class Engine(Connectable, log.Identified):
def raw_connection(self, _connection=None):
return self._wrap_pool_connect(
self.pool.unique_connection, _connection
)
def _wrap_pool_connect(self, fn, connection):
dialect = self.dialect
try:
return fn()
except dialect.dbapi.Error as e:
...
pool.unique_connection负责创建数据库连接,这里的实现过程比较复杂,个人觉得也挺绕的,涉及Pool,ConnectionFairy和ConnectionRecord三个类。我们一点一点的跟踪:
class SingletonThreadPool(Pool):
def __init__(self, creator, pool_size=5, **kw):
Pool.__init__(self, creator, **kw)
self._conn = threading.local()
self._all_conns = set()
self.size = pool_size
def unique_connection(self):
return _ConnectionFairy._checkout(self)
def _do_get(self):
c = _ConnectionRecord(self)
self._conn.current = weakref.ref(c)
if len(self._all_conns) >= self.size:
self._cleanup()
self._all_conns.add(c)
return c
SingletonThreadPool主要在_do_get的实现,创建一个ConnectionRecor对象,然后将其加入到自己管理的集合中后再返回,标准的池操作了。如何通过unique_connection方法去触发_do_get方法并得到实际的db-connect
class _ConnectionFairy(object):
def __init__(self, dbapi_connection, connection_record, echo):
self.connection = dbapi_connection
self._connection_record = connection_record
@classmethod
def _checkout(cls, pool, threadconns=None, fairy=None):
if not fairy:
fairy = _ConnectionRecord.checkout(pool)
fairy._pool = pool
fairy._counter = 0
return fairy
...
class _ConnectionRecord(object):
def __init__(self, pool, connect=True):
self.__pool = pool
@classmethod
def checkout(cls, pool):
rec = pool._do_get()
try:
dbapi_connection = rec.get_connection()
except Exception as err:
...
fairy = _ConnectionFairy(dbapi_connection, rec, echo)
rec.fairy_ref = weakref.ref(
fairy,
lambda ref: _finalize_fairy
and _finalize_fairy(None, rec, pool, ref, echo),
)
...
return fairy
def get_connection(self):
pool = self.__pool
connection = pool.creator(self)
self.connection = connection
return connection
...
class DefaultEngineStrategy(EngineStrategy):
def create(self, name_or_url, **kwargs):
def connect(connection_record=None):
# dbapai-connection
return dialect.connect(*cargs, **cparams)
creator = pop_kwarg("creator", connect)
pool = poolclass(creator, **pool_args)
...
整个过程大概是这样的:
除了执行过程在来回穿插外,还因为ConnectionFairy和ConnectionRecord是循环依赖的:
class _ConnectionRecord(object):
fairy_ref = None
...
class _ConnectionFairy(object):
def __init__(self, dbapi_connection, connection_record, echo):
self._connection_record = connection_record
知道connection如何创建后,继续看connection使用execute方法执行sql语句:
def execute(self, object_, *multiparams, **params):
if isinstance(object_, util.string_types[0]):
return self._execute_text(object_, multiparams, params)
...
def _execute_text(self, statement, multiparams, params):
"""Execute a string SQL statement."""
dialect = self.dialect
parameters = _distill_params(multiparams, params)
ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_statement,
statement,
parameters,
statement,
parameters,
)
return ret
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
conn = self.__connection
...
context = constructor(dialect, self, conn, *args)
...
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
...
self.dialect.do_execute(
cursor, statement, parameters, context
)
...
result = context._setup_crud_result_proxy()
return result
execute还有一些其它分支,可以适用ORM等场景,本篇只介绍纯文本的sql
函数层层穿透后,主要包括下面三段代码:
dialect涉及的上下文context创建和sql执行:
class DefaultDialect(interfaces.Dialect):
def do_execute(self, cursor, statement, parameters, context=None):
cursor.execute(statement, parameters)
DefaultDialect.execution_ctx_cls = DefaultExecutionContext
可以看到执行语句就是使用cursor对象,和前面直接操作sqlite一致。每条sql执行的上下文context是下面方式构建的:
class DefaultExecutionContext(interfaces.ExecutionContext):
@classmethod
def _init_statement(
cls, dialect, connection, dbapi_connection, statement, parameters
):
self = cls.__new__(cls)
self.root_connection = connection
self._dbapi_connection = dbapi_connection
self.dialect = connection.dialect
...
self.parameters = [{}]
...
self.statement = self.unicode_statement = statement
self.cursor = self.create_cursor()
return self
def create_cursor(self):
return self._dbapi_connection.cursor()
sql执行的结果,在context._setup_crud_result_proxy
中返回ResultProxy对象。ResultProxy是一个可以迭代的对象,可以使用fetchone获取单条记录:
class ResultProxy(object):
def __iter__(self):
while True:
row = self.fetchone()
if row is None:
return
else:
yield row
def __next__(self):
row = self.fetchone()
if row is None:
raise StopIteration()
else:
return row
def fetchone(self):
try:
row = self._fetchone_impl()
if row is not None:
return self.process_rows([row])[0]
def _fetchone_impl(self):
try:
return self.cursor.fetchone()
except AttributeError:
return self._non_result(None)
对获取的记录还可以使用process_rows进行数据封装,这个以后再介绍。
我们完整的追逐了使用sqlalchemy执行sql语句的过程,可以简单小结如下:
下面的类图介绍的更详细, 完整展示了engine/pool/connection/dialect的关系:
deprecated是一个废弃API装饰器, 主要给一些不再支持/推荐的API加上使用警告和更替的方法:
def deprecated(version, message=None, add_deprecation_to_docstring=True):
if add_deprecation_to_docstring:
header = ".. deprecated:: %s %s" % (version, (message or ""))
else:
header = None
if message is None:
message = "Call to deprecated function %(func)s"
def decorate(fn):
return _decorate_with_warning(
fn,
exc.SADeprecationWarning,
message % dict(func=fn.__name__),
header,
)
return decorate
比如Connectable.contextual_connect的API这样使用:
class Connectable(object):
@util.deprecated(
"1.3",
"The :meth:`.Engine.contextual_connect` and "
":meth:`.Connection.contextual_connect` methods are deprecated. This "
"method is an artifact of the threadlocal engine strategy which is "
"also to be deprecated. For explicit connections from an "
":class:`.Engine`, use the :meth:`.Engine.connect` method.",
)
def contextual_connect(self, *arg, **kw):
...
这对库/框架的开发者非常有用,API的变动可以这种方式通知使用者,进行平滑的升级替换。
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/xpAo_XmbFlH4oQf94S9hbQ
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。