# SQLAlchemy 2.0
This article explains the major changes in SQLAlchemy 1.4 and 2.0, as well as how GINO adapts to them.
SQLAlchemy 2.0 will deliver many breaking API changes, and SQLAlchemy 1.4 will be the "interim" version for people to eventually upgrade their software to use SQLAlchemy 2.0.
| GINO  | SQLAlchemy | Dialect  | Comments                   |
|-------|------------|----------|----------------------------|
| 1.0.x | 1.3.x      | Custom   | Current (old-)stable.      |
| 1.1.x | 1.3.x      | Custom   | Next old-stable.           |
| 1.2.x | 1.3.x      | Custom   | Future old-stable (maybe). |
| 1.4.x | 1.4.x      | Upstream | 2.0 interim.               |
| 2.0.x | 2.0.x      | Upstream | Future stable.             |
| 2.1.x | 2.0.x      | Upstream | Future stable iterations.  |
To make things easier, GINO will (luckily) also follow the same versions for the transition. That is, GINO 1.4 will require SQLAlchemy 1.4, providing similar pre-1.4-compatible APIs with deprecations and options to switch to the 2.0 API; GINO 2.0 will need SQLAlchemy 2.0 and provide the new APIs only.

At the same time, GINO 1.1, 1.2 and 1.3 will be reserved as old-stable versions, with new features added on top of SQLAlchemy 1.3. And GINO versions after 2.0 won't track SQLAlchemy version numbers.
## The Async Solution
Among all the exciting updates in SQLAlchemy 1.4 / 2.0, native async support is the most significant change for GINO. Simply speaking, SQLAlchemy 1.4 decided to use greenlet to bridge asynchronous I/O into the existing sync code base, avoiding making everything async.
Let's say we have an asynchronous method to create an asyncpg connection:
```python
import asyncpg

async def connect():
    return await asyncpg.connect("postgresql:///")
```
And an end-user method to use it:
```python
async def main():
    conn = await connect()
    now = await conn.fetchval("SELECT now()")
```
Now, instead of directly calling `connect()` from `main()`, I would like to add some additional logic - let's say, a sanity check:
```python
async def safe_connect():
    conn = await connect()
    try:
        await conn.execute("SELECT 1")
    except Exception:
        return None
    else:
        return conn
```
Then the end-user should modify `main()` to:
```python
async def main():
    conn = await safe_connect()
    if conn:
        now = await conn.fetchval("SELECT now()")
```
OK, everything works so far, as it is all regular async code. Here's the interesting part: suppose `safe_connect()` must not be an `async def` method. With SQLAlchemy 1.4+, we could write:
```python
from sqlalchemy.util import await_only, greenlet_spawn

def sync_safe_connect():
    conn = await_only(connect())
    try:
        await_only(conn.execute("SELECT 1"))
    except Exception:
        return None
    else:
        return conn

async def safe_connect():
    return await greenlet_spawn(sync_safe_connect)
```
Behind the scenes, `greenlet_spawn()` runs the given "sync" method in a greenlet, which uses `await_only()` to switch to the event loop and bridge the underlying async methods. As `sync_safe_connect()` is just a normal Python method, you can imagine how it works together with lots of other "sync" code asynchronously.
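SQLAlchemy's real bridge relies on the greenlet package, but the control flow can be approximated with plain generators and a small async trampoline. The sketch below is only an analogue of the idea, not SQLAlchemy's implementation; `sync_code` and `spawn` are made-up names playing the roles of `sync_safe_connect()` and `greenlet_spawn()`:

```python
import asyncio

async def fetch_value():
    # Stands in for a real async driver call.
    await asyncio.sleep(0)
    return 42

def sync_code():
    # A "sync" function written as a generator: instead of awaiting, it
    # yields the awaitable and receives the result back - the role that
    # await_only() plays inside the greenlet.
    result = yield fetch_value()
    return result * 2

async def spawn(gen_func):
    # The trampoline: drives the generator and awaits whatever it yields -
    # the role that greenlet_spawn() plays (greenlets just avoid the
    # explicit yield syntax).
    gen = gen_func()
    try:
        awaitable = next(gen)
        while True:
            awaitable = gen.send(await awaitable)
    except StopIteration as stop:
        return stop.value

print(asyncio.run(spawn(sync_code)))  # prints 84
```

The greenlet version looks nicer because `sync_code` stays an ordinary function call stack instead of a generator, but the scheduling idea is the same.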
We're not going deeper into the implementation, but this is basically how SQLAlchemy 1.4 mixes the asyncpg driver into its "sync" code base while still being able to provide async APIs on top of it.
## Async SQLAlchemy
Although greenlet might be the only way to practically port SQLAlchemy to the async world natively without having to maintain 2 copies of the same code, introducing implicit asynchronicity into a large sync code base is still a risky move.
The sync library has existed for years, with many assumptions baked in, like using `threading.Lock` to control concurrency. When switching to asynchronous programming, such primitives and assumptions usually need to be reviewed and probably replaced by e.g. `asyncio.Lock`.

However, the implicit approach is so convenient that most of the blocking code just works without any changes at all, lowering the odds of reviewing it and finding possible issues.
As a matter of fact, some issues can only be revealed under (heavy) concurrency. For example, `threading.Lock.acquire()` actually works fine in a single coroutine, but 2 concurrent coroutines acquiring the same `threading.Lock` may easily block the main thread forever. The same applies to data structures like queues, etc. In short, nothing must ever block the main thread.
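As a minimal sketch of what the replacement looks like, the coroutine below guards a shared counter with `asyncio.Lock`, the cooperative counterpart of `threading.Lock`; a `threading.Lock` held across the `await` in the same spot could park the only thread forever. All names here are illustrative:

```python
import asyncio

counter = 0

async def bump(lock: asyncio.Lock) -> None:
    global counter
    async with lock:            # cooperative: waiters yield to the event loop
        current = counter
        await asyncio.sleep(0)  # a context switch while "holding" the lock
        counter = current + 1

async def main() -> int:
    global counter
    counter = 0
    lock = asyncio.Lock()
    # 100 concurrent coroutines; asyncio.Lock serializes the read-modify-write.
    await asyncio.gather(*(bump(lock) for _ in range(100)))
    return counter

print(asyncio.run(main()))  # prints 100
```

Without the lock, the deliberate context switch between read and write would lose increments; with `threading.Lock` instead, the first waiter would block the event loop itself.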
Lastly, implicit asynchronicity is a potential maintenance risk. Because the context switches are usually hidden behind regular sync methods, it is easy to forget that such methods may lead to concurrency issues. Treating coroutines as OS threads is a good idea and it usually works, but they are fundamentally different. Extra care is always needed when trying to support both sync and async with the same code base.
However, such risks are mostly contained within SQLAlchemy itself. End-users still use regular, explicit asynchronous APIs to leverage the async DB drivers. With proper reviewing, testing and sufficient community exposure (that's GINO's part), it is still possible and reliable to have a single SQLAlchemy with 2 sets of APIs (sync + async). For sure, the APIs are not 100% identical - for example, ORM lazy-loading won't work because there is no place for `await` when accessing attributes (GINO doesn't like such implicitness anyway, so yeah).
To quickly get a picture of async SQLAlchemy (Core), here's a sample from SQLAlchemy:
```python
import asyncio

import sqlalchemy as sa
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine

# Definitions assumed by the sample:
meta = sa.MetaData()
t1 = sa.Table("t1", meta, sa.Column("name", sa.String(50), primary_key=True))

async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(meta.drop_all)
        await conn.run_sync(meta.create_all)
        await conn.execute(
            t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
        )
    async with engine.connect() as conn:
        # select a Result, which will be delivered with buffered results
        result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
        print(result.fetchall())

asyncio.run(async_main())
```
## Auto-Commit Complication
After using PostgreSQL for a long time, I took many features for granted. The auto-commit feature is one of them. We all know that `BEGIN` starts a transaction and `COMMIT` / `ROLLBACK` ends it. But what happens to SQL statements that are not wrapped in `BEGIN ... COMMIT` blocks?
> If you do not issue a `BEGIN` command, then each individual statement has an implicit `BEGIN` and (if successful) `COMMIT` wrapped around it.
>
> —PostgreSQL Documentation, 3.4. Transactions
And yes, an implicit `ROLLBACK` if not successful. This is not directly named an "auto-commit" feature, but PostgreSQL does enforce it. Now imagine a database whose "auto-commit" feature can be turned off - such individual statements are either executed without ACID transactions at all (no surprise, there are databases without ACID :doge:), or the session is left with an open-ended transaction to be committed later.
PEP 249 (DB-API 2.0) - the standard for Python database drivers - was created against such a background. The assumption it made appears to be: the database may well not support ACID; if it does, auto-commit is usually not supported. That is because it defined only `commit()` and `rollback()` on a connection, but no `begin()`. So I think DB-API assumes that, when executing statements, the connection is automatically put in a transaction (if ACID is supported), and you have to call `commit()` to persist your changes. Closing a connection causes pending transactions to be rolled back automatically.
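Python's built-in `sqlite3` module is a handy way to observe this DB-API behavior without a server. A small sketch (the file name and table are arbitrary):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

conn = sqlite3.connect(path)
conn.execute("CREATE TABLE t (v TEXT)")
conn.commit()                                 # persist the schema

conn.execute("INSERT INTO t VALUES ('lost')")
conn.close()                                  # no commit(): the pending
                                              # transaction is rolled back

conn = sqlite3.connect(path)
print(conn.execute("SELECT count(*) FROM t").fetchone()[0])  # prints 0
conn.close()
```

The `INSERT` silently opened an implicit transaction, and closing the connection without `commit()` discarded it - exactly the DB-API contract described above.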
> Note that if the database supports an auto-commit feature, this (the auto-commit feature -- GINO comments) must be initially off.
>
> —PEP 249
As the API behavior is defined this way, database drivers even for databases like PostgreSQL with enforced "auto-commit" have to mimic it. For example, psycopg2 by default automatically emits a `BEGIN` to the database upon the first execution, so that the execution is not auto-committed. The implicit transaction boundary is a very evil thing - people constantly leave transactions open (ever seen `IDLE IN TRANSACTION`?), sometimes even holding database locks and eventually causing a deadlock storm.
To work around this workaround, PEP 249 does say:

> An interface method may be provided to turn it (the auto-commit feature) back on.
So for psycopg2, one could do this:
```python
import psycopg2

conn = psycopg2.connect("postgresql:///")
conn.autocommit = True
conn.cursor().execute("SELECT now()")
```
Now the database correctly receives only this `SELECT` statement, without any implicit `BEGIN` surprises. But when we want explicit transactions, DB-API gives us only 2 options: 1) do it manually:
```python
conn.cursor().execute("BEGIN")
conn.cursor().execute("UPDATE ...")
conn.cursor().execute("COMMIT")
```
Or 2) turn auto-commit off again:
```python
conn.autocommit = False
conn.cursor().execute("UPDATE ...")
conn.commit()
```
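The standard library offers the same escape hatch: `sqlite3` enters autocommit mode when `isolation_level` is set to `None`, so each statement is committed immediately (again, the file and table names are arbitrary):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

conn = sqlite3.connect(path, isolation_level=None)  # autocommit mode
conn.execute("CREATE TABLE t (v TEXT)")
conn.execute("INSERT INTO t VALUES ('kept')")       # committed immediately
conn.close()                                        # no commit() needed

conn = sqlite3.connect(path)
print(conn.execute("SELECT v FROM t").fetchone()[0])  # prints kept
conn.close()
```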
I know this is frustrating (or maybe people have just accepted it), but newer database drivers like asyncpg do provide a cleaner API, by not complying with PEP 249:
```python
import asyncpg

async def main():
    conn = await asyncpg.connect("postgresql://")
    print(await conn.fetchval("SELECT now()"))  # SELECT now();
    async with conn.transaction():              # BEGIN;
        await conn.execute("UPDATE ...")        # UPDATE ...;
                                                # COMMIT;
```
It's much cleaner to see what's actually happening on the wire to the database. This is also how GINO works.
## SQLAlchemy for DB-API
Because SQLAlchemy is built on PEP 249 (DB-API 2.0), its API is also greatly affected by the PEP standard. For example, imagine what SQL is actually executed by this code:
```python
import sqlalchemy as sa

e = sa.create_engine("postgresql:///", future=True)
with e.connect() as conn:
    conn.scalar(sa.text("SELECT now()"))
```
Only `SELECT now()`? No. Here's the answer:
```python
with e.connect() as conn:                 # BEGIN; SELECT version(); ...; ROLLBACK;
    conn.scalar(sa.text("SELECT now()"))  # BEGIN; SELECT now();
                                          # ROLLBACK;
```
> **Note**
>
> We are using the SQLAlchemy 2.0 API for simplicity, by setting `future=True` on SQLAlchemy 1.4. It's way more complicated in SQLAlchemy 1.3 and I don't want to get into that.
The reason behind this is that connections have "auto-commit" turned off by default. If there is no transaction, an implicit `BEGIN` is automatically executed upon the first statement. This applies to async SQLAlchemy too - even though the underlying asyncpg is not DB-API-compliant, the asyncpg dialect in SQLAlchemy still wraps asyncpg and simulates a compatible DB-API. Like this:
```python
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine

async def main():
    e = create_async_engine("postgresql+asyncpg:///")
    async with e.connect() as conn:                  # BEGIN; SELECT version(); ...; ROLLBACK;
        await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
                                                     # ROLLBACK;
```
If you want to modify the database permanently, you have to `commit()` the implicit transaction explicitly:
```python
async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("UPDATE ..."))  # BEGIN; UPDATE ...;
        await conn.commit()                        # COMMIT;
```
Or use the explicit transaction API:
```python
async def main():
    e = ...
    async with e.connect() as conn:
        async with conn.begin():                       # BEGIN;
            await conn.execute(sa.text("UPDATE ..."))  # UPDATE ...;
                                                       # COMMIT;
```
Please note that SQLAlchemy 2.0 doesn't allow soft-nested transactions. In other words, you cannot nest 2 `async with conn.begin():` blocks like this:
```python
async def main():
    e = ...
    async with e.connect() as conn:
        async with conn.begin():      # BEGIN;
            async with conn.begin():  # Error: a transaction is already begun
                ...
```
This limitation applies to implicit transactions too, even though it's weird:
```python
async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
        async with conn.begin():  # Error: a transaction is already begun
            ...
```
You have to explicitly close this implicit transaction in order for an explicit transaction to start successfully:
```python
async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))    # BEGIN; SELECT now();
        await conn.rollback()                          # ROLLBACK;
        async with conn.begin():                       # BEGIN;
            await conn.execute(sa.text("UPDATE ..."))  # UPDATE ...;
                                                       # COMMIT;
```
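This bookkeeping can be modeled as a tiny state machine. The class below is a made-up toy, not SQLAlchemy's actual code; it only illustrates the "autobegin" rule that produces the error above:

```python
class FakeConnection:
    """Toy model of SQLAlchemy 2.0's autobegin rule (not real SQLAlchemy)."""

    def __init__(self):
        self.in_transaction = False
        self.log = []

    def execute(self, sql):
        if not self.in_transaction:   # autobegin on the first statement
            self.log.append("BEGIN")
            self.in_transaction = True
        self.log.append(sql)

    def begin(self):
        if self.in_transaction:       # implicit or explicit - doesn't matter
            raise RuntimeError("a transaction is already begun")
        self.log.append("BEGIN")
        self.in_transaction = True

    def rollback(self):
        self.log.append("ROLLBACK")
        self.in_transaction = False

conn = FakeConnection()
conn.execute("SELECT now()")   # autobegins an implicit transaction
try:
    conn.begin()               # refused: implicit transaction still open
except RuntimeError as exc:
    print(exc)                 # prints: a transaction is already begun
conn.rollback()                # close the implicit transaction first
conn.begin()                   # now fine
```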
Similar to Core, the SQLAlchemy ORM follows the same principle. Grab a session, use it without `begin()`, and when you want to commit, `commit()`. Or, use an explicit transaction in a `with session.begin():` block. Personally I don't like it that much, but it is how SQLAlchemy manages transactions. Please follow the link above to read more.
## SQLAlchemy AUTOCOMMIT
I know you already miss the WYSIWYG asyncpg and GINO API. Hang in there, let's build GINO 1.4 together with the SQLAlchemy AUTOCOMMIT feature.
To turn AUTOCOMMIT back on, we need to set the `isolation_level` to `AUTOCOMMIT` in `execution_options`:
```python
async def main():
    e = create_async_engine(
        "postgresql+asyncpg:///",
        execution_options={"isolation_level": "AUTOCOMMIT"},
    )
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))  # SELECT now();
```
Hooray! No more implicit `BEGIN` magic. We're one step closer.
> **Note**
>
> There is also a keyword argument:
>
> ```python
> e = create_async_engine(
>     "postgresql+asyncpg:///",
>     isolation_level="AUTOCOMMIT",
> )
> ```
>
> But this is implemented very differently than `execution_options`, and I don't think it works for GINO's use case.
The next question is, how do we explicitly start a transaction? Let's try `begin()`:
```python
async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))  # SELECT now();
        async with conn.begin():  # Error: a transaction is already begun
            ...
```
Wait a second ... we've seen this error before! It is the implicit transaction conflicting with the explicit transaction. But aren't we in AUTOCOMMIT mode? Even though the `isolation_level` tells the driver not to send `BEGIN` to the database, SQLAlchemy still manages library-level transaction objects. We have to close the virtual implicit transaction before starting an explicit one:
```python
async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))    # SELECT now();
        await conn.rollback()                          # no-op
        async with conn.begin():                       # no-op
            await conn.execute(sa.text("UPDATE ..."))  # UPDATE ...;
```
Well, not quite what we expected. With AUTOCOMMIT set, `begin()`, `commit()` and `rollback()` all become no-ops.
Similar to the answers in psycopg2, we have 2 options here too: 1) manually execute transaction-control SQLs, or 2) turn off AUTOCOMMIT temporarily. As we want to be more compatible with SQLAlchemy, let's try 2):
```python
async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))    # SELECT now();
        await conn.rollback()                          # no-op
        async with conn.execution_options(
            isolation_level="READ COMMITTED"
        ).begin():                                     # BEGIN;
            await conn.execute(sa.text("UPDATE ..."))  # UPDATE ...;
                                                       # COMMIT;
```
It's working! According to the SQLAlchemy docs, `execution_options()` creates a shallow copy of the connection and applies new values only to the copy. So the original connection should still be in AUTOCOMMIT, right? Well...
```python
async def main():
    e = ...
    async with e.connect() as conn:
        ...
        async with conn.execution_options(
            isolation_level="READ COMMITTED"
        ).begin():                                     # BEGIN;
            await conn.execute(sa.text("UPDATE ..."))  # UPDATE ...;
                                                       # COMMIT;
        await conn.execute(sa.text("SELECT now()"))    # BEGIN; SELECT now();
                                                       # ROLLBACK;
```
Unfortunately, the implicit transaction is haunting us again. This is because both the original connection and its shallow copy point to the same "DB-API" connection (in this case, a SQLAlchemy wrapper of the asyncpg connection), and setting `isolation_level` modifies the value on the "DB-API" connection.
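The pitfall is ordinary shallow-copy semantics, which the hypothetical classes below illustrate: `copy.copy()` duplicates the wrapper, but both wrappers keep a reference to the one shared "DB-API" object, so mutating it through either wrapper is visible through the other:

```python
import copy

class FakeDBAPIConnection:
    """Stand-in for the shared low-level connection (illustrative only)."""
    def __init__(self):
        self.isolation_level = "AUTOCOMMIT"

class FakeSAConnection:
    """Stand-in for the SQLAlchemy connection wrapper (illustrative only)."""
    def __init__(self):
        self.dbapi = FakeDBAPIConnection()

    def execution_options(self, isolation_level):
        clone = copy.copy(self)       # shallow: copies the reference,
                                      # not the dbapi object itself
        clone.dbapi.isolation_level = isolation_level
        return clone

conn = FakeSAConnection()
clone = conn.execution_options(isolation_level="READ COMMITTED")
print(clone.dbapi is conn.dbapi)   # prints True - one shared object
print(conn.dbapi.isolation_level)  # prints READ COMMITTED - original changed too
```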
Returning a SQLAlchemy connection back to the pool resets the `isolation_level` to its default value, and acquiring the same connection again initializes the `isolation_level` with values from the engine's `execution_options`. But if we want to keep using the same connection without returning it, we have to manually overwrite its `isolation_level` again:
```python
async def main():
    e = ...
    async with e.connect() as conn:
        ...
        async with conn.execution_options(
            isolation_level="READ COMMITTED"
        ).begin():                                     # BEGIN;
            await conn.execute(sa.text("UPDATE ..."))  # UPDATE ...;
                                                       # COMMIT;
        conn.execution_options(isolation_level="AUTOCOMMIT")
        await conn.execute(sa.text("SELECT now()"))    # SELECT now();
```
Eventually we made it! 🎉
Encapsulating all such logic, GINO 1.4 could then provide decent WYSIWYG APIs again:
```python
import gino

async def main():
    engine = await gino.create_engine("postgresql:///")
    async with engine.acquire() as conn:
        await conn.scalar("SELECT now()")    # SELECT now();
        async with conn.transaction():       # BEGIN;
            await conn.status("UPDATE ...")  # UPDATE ...;
                                             # COMMIT;
```
> **Tip**
>
> Now I feel that "implementing" the auto-commit feature is more like restoring the original database behavior, and that having auto-commit turned off by default should be considered a feature of its own, called "auto-begin" or "implicit transaction". It's a bad design introduced early in PEP 249, still affecting SQLAlchemy and the ecosystem.
## Isolation Levels
So far, we have only used 2 `isolation_level` values:

- `AUTOCOMMIT`
- `READ COMMITTED`
`AUTOCOMMIT` is not a valid PostgreSQL isolation level. It's only recognized and consumed by the SQLAlchemy asyncpg dialect to bypass the "auto-begin" simulation. `READ COMMITTED` is the default PostgreSQL isolation level. You can verify this by executing the SQL directly in a transaction:
```
# BEGIN;
BEGIN
# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 read committed
(1 row)
# ROLLBACK;
ROLLBACK
```
To start a transaction in a different isolation level, you may:
```
# BEGIN ISOLATION LEVEL SERIALIZABLE;
BEGIN
# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 serializable
(1 row)
# ROLLBACK;
ROLLBACK
```
As mentioned earlier, all PostgreSQL statements are executed in a transaction. If no explicit `BEGIN` is in place, an implicit transaction is used. So this SQL also works on its own:
```
# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 read committed
(1 row)
```
But how could we modify the isolation level of such implicit transactions? The answer is to set the isolation level session-wide. This affects all subsequent transactions, including both implicit and explicit ones, except for explicit transactions with explicit isolation levels:
```
# SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET
# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 serializable
(1 row)
# BEGIN;
BEGIN
# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 serializable
(1 row)
# ROLLBACK;
ROLLBACK
# BEGIN ISOLATION LEVEL READ COMMITTED;
BEGIN
# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 read committed
(1 row)
# ROLLBACK;
ROLLBACK
```
Then let's see how SQLAlchemy with asyncpg solves this problem:
```python
async def main():
    e = create_async_engine(
        "postgresql+asyncpg:///",
        execution_options={"isolation_level": "SERIALIZABLE"},
    )
    async with e.connect() as conn:
        async with conn.begin():  # BEGIN ISOLATION LEVEL SERIALIZABLE;
            await conn.execute(sa.text("UPDATE ..."))  # UPDATE ...;
                                                       # COMMIT;
```
Under the hood, SQLAlchemy leverages asyncpg's `Connection.transaction(isolation="...")` to set the isolation level per transaction. In GINO, we just need to store the user-defined isolation level and set it before transactions.
But there are 2 issues:

1. The user-defined isolation level is not applied in PostgreSQL implicit transactions (a.k.a. auto-committed statements), because nothing issues a `SET SESSION`.
2. asyncpg has a bug that `Connection.transaction(isolation="read_committed")` always emits `BEGIN` without an explicit isolation level, regardless of the actual default isolation level.
The asyncpg bug should be fixed upstream, but we could leverage the session-wide isolation level setter from SQLAlchemy's base PostgreSQL dialect:
```python
import sqlalchemy as sa
from sqlalchemy import event
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.ext.asyncio import create_async_engine

async def main():
    e = create_async_engine(
        "postgresql+asyncpg:///",
        execution_options={"isolation_level": "AUTOCOMMIT"},
    )

    def set_isolation_level(dbapi_conn, record):
        PGDialect.set_isolation_level(
            e.sync_engine.dialect,
            dbapi_conn,
            "SERIALIZABLE",
        )

    event.listen(e.sync_engine, "connect", set_isolation_level)

    async with e.connect() as conn:
        print(await conn.scalar(sa.text("SHOW TRANSACTION ISOLATION LEVEL")))
        # Outputs: serializable
```