これは、なにをしたくて書いたもの?
前にPythonのORMであるSQLModelを試してみました。
PythonのORM、SQLModelをMySQLで試す - CLOVER🍀
SQLModelは非同期操作もできるようなのですが、SQLモデルのドキュメントを見ても使い方がわからなかったので少しメモして
おこうかなと。
とてもとてもハマりましたが…。
SQLModel+非同期操作
SQLModelのドキュメントはこちらですが、非同期IO…いわゆるasync/awaitと組み合わせる方法は書かれていません。
Advanced User Guideに記載されそうではありますが、現時点では未記載ですね。
The Advanced User Guide is gradually growing, you can already read about some advanced topics.
How to use async and await with the async session.
Advanced User Guide - SQLModel
ではどうしたらいいのかというところですが、SQLAlchemyの非同期IOのサポートと組み合わせます。
Asynchronous I/O (asyncio) — SQLAlchemy 2.0 Documentation
このページがとても重要ですね。
そしてSQLModelのext内にあるAsyncSessionと組み合わせることになります。
https://github.com/fastapi/sqlmodel/blob/0.0.22/sqlmodel/ext/asyncio/session.py
今回はMySQLでSQLModelを非同期IOを使って操作することを試してみましょう。
環境
今回の環境はこちら。
$ python3 --version Python 3.12.3 $ uv --version uv 0.5.29
MySQLは172.17.0.2でアクセスできるものとします。
MySQL localhost:33060+ ssl practice SQL > select version(); +-----------+ | version() | +-----------+ | 8.4.4 | +-----------+ 1 row in set (0.0006 sec)
プロジェクトを作成する
まずはプロジェクトの作成。
$ uv init --vcs none sqlmodel-asyncio-example $ cd sqlmodel-asyncio-example $ rm hello.py
SQLModelとMySQLのドライバーのインストール。
$ uv add sqlmodel $ uv add aiomysql[rsa]
MySQLに対する非同期IOのドライバーは、aiomysqlにすることにします。前にこちらで試してみました。
PythonでMySQLに非同期にアクセスするaiomysqlを、MySQL 8.4で試す - CLOVER🍀
テストなどに必要なライブラリーをインストール。
$ uv add --dev pytest pytest-asyncio mypy ruff
確認はテストコードで行うことにします。
pyproject.toml
[project]
name = "sqlmodel-asyncio-example"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"aiomysql[rsa]>=0.2.0",
"sqlmodel>=0.0.22",
]
[dependency-groups]
dev = [
"mypy>=1.15.0",
"pytest>=8.3.4",
"pytest-asyncio>=0.25.3",
"ruff>=0.9.5",
]
[tool.mypy]
strict = true
disallow_any_unimported = true
disallow_any_expr = true
disallow_any_explicit = true
warn_unreachable = true
pretty = true
インストールされたライブラリーの一覧。
$ uv pip list Package Version ----------------- ------- aiomysql 0.2.0 annotated-types 0.7.0 cffi 1.17.1 cryptography 44.0.0 greenlet 3.1.1 iniconfig 2.0.0 mypy 1.15.0 mypy-extensions 1.0.0 packaging 24.2 pluggy 1.5.0 pycparser 2.22 pydantic 2.10.6 pydantic-core 2.27.2 pymysql 1.1.1 pytest 8.3.4 pytest-asyncio 0.25.3 ruff 0.9.5 sqlalchemy 2.0.38 sqlmodel 0.0.22 typing-extensions 4.12.2
SQLModel+非同期IOを試してみる
それでは、SQLModelで非同期IOを使ってみましょう。
モデルとテーブルを定義する。
まずはモデルとテーブルの作成から。
models.py
import asyncio from sqlalchemy.ext.asyncio import create_async_engine from sqlmodel import Field, Relationship, SQLModel class Book(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) isbn: str = Field(unique=True) title: str price: int class User(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) first_name: str last_name: str age: int posts: list["Post"] = Relationship(back_populates="user") class Post(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) title: str url: str user_id: int | None = Field(default=None, foreign_key="user.id") user: User | None = Relationship(back_populates="posts") async def create_db_and_tables() -> None: url = "mysql+aiomysql://kazuhira:password@172.17.0.2:3306/practice" engine = create_async_engine(url, echo=True) async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) await engine.dispose() if __name__ == "__main__": asyncio.run(create_db_and_tables())
モデルの定義は同期版と変わりません。リレーションもつけています。
AsyncEngineの作成とマイグレーション。
async def create_db_and_tables() -> None: url = "mysql+aiomysql://kazuhira:password@172.17.0.2:3306/practice" engine = create_async_engine(url, echo=True) async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) await engine.dispose()
SQLAlchemyをaiomysqlで使う場合の接続URLの書き方は、このあたりを見ればわかります。
Engine Configuration — SQLAlchemy 2.0 Documentation
MySQL and MariaDB — SQLAlchemy 2.0 Documentation
Engineの作成をcreate_async_engineを使うようにして、あとはasync/awaitを使っておけばいいんだろうと思っていたら
ここでもしっかりハマりまして…。
最初、AsyncEngine#disposeを書いていなかったら
await engine.dispose()
マイグレーション実行時にこんなエラーがずっと出ていました。
Exception ignored in: <function Connection.__del__ at 0x74a9f9bbcc20>
Traceback (most recent call last):
File "/path/to/sqlmodel-asyncio-example/.venv/lib/python3.12/site-packages/aiomysql/connection.py", line 1131, in __del__
self.close()
File "/path/to/sqlmodel-asyncio-example/.venv/lib/python3.12/site-packages/aiomysql/connection.py", line 339, in close
self._writer.transport.close()
File "/usr/lib/python3.12/asyncio/selector_events.py", line 1211, in close
super().close()
File "/usr/lib/python3.12/asyncio/selector_events.py", line 875, in close
self._loop.call_soon(self._call_connection_lost, None)
File "/usr/lib/python3.12/asyncio/base_events.py", line 795, in call_soon
self._check_closed()
File "/usr/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
AsyncEngineはコンテキストから外れた後はdisposeした方がよさそうです。
Unlike when using blocking IO, SQLAlchemy cannot properly dispose of these connections within methods like del or weakref finalizers as there is no opportunity to invoke await. Failing to explicitly dispose of the engine when it falls out of scope may result in warnings emitted to standard out resembling the form RuntimeError: Event loop is closed within garbage collection.
Asynchronous I/O (asyncio) — SQLAlchemy 2.0 Documentation
サンプルコード内にも書いてありました。
... # for AsyncEngine created in function scope, close and ... # clean-up pooled connections ... await engine.dispose()
これを見落としていた結果、テストコードでもすごくハマりました…。
またマイグレーションを実行するコードの書き方もわからなくて、そこそこ調べることになりました…。
await conn.run_sync(SQLModel.metadata.create_all)
マイグレーションの実行。
$ uv run models.py
結果。
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,314 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-02-11 00:17:57,314 INFO sqlalchemy.engine.Engine DESCRIBE `practice`.`book`
2025-02-11 00:17:57,314 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,315 INFO sqlalchemy.engine.Engine DESCRIBE `practice`.`user`
2025-02-11 00:17:57,315 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,315 INFO sqlalchemy.engine.Engine DESCRIBE `practice`.`post`
2025-02-11 00:17:57,315 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,316 INFO sqlalchemy.engine.Engine
CREATE TABLE book (
id INTEGER NOT NULL AUTO_INCREMENT,
isbn VARCHAR(255) NOT NULL,
title VARCHAR(255) NOT NULL,
price INTEGER NOT NULL,
PRIMARY KEY (id),
UNIQUE (isbn)
)
2025-02-11 00:17:57,316 INFO sqlalchemy.engine.Engine [no key 0.00008s] ()
2025-02-11 00:17:57,366 INFO sqlalchemy.engine.Engine
CREATE TABLE user (
id INTEGER NOT NULL AUTO_INCREMENT,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
age INTEGER NOT NULL,
PRIMARY KEY (id)
)
2025-02-11 00:17:57,367 INFO sqlalchemy.engine.Engine [no key 0.00037s] ()
2025-02-11 00:17:57,411 INFO sqlalchemy.engine.Engine
CREATE TABLE post (
id INTEGER NOT NULL AUTO_INCREMENT,
title VARCHAR(255) NOT NULL,
url VARCHAR(255) NOT NULL,
user_id INTEGER,
PRIMARY KEY (id),
FOREIGN KEY(user_id) REFERENCES user (id)
)
2025-02-11 00:17:57,411 INFO sqlalchemy.engine.Engine [no key 0.00044s] ()
2025-02-11 00:17:57,461 INFO sqlalchemy.engine.Engine COMMIT
これで準備ができました。
MySQL localhost:33060+ ssl practice SQL > show tables; +--------------------+ | Tables_in_practice | +--------------------+ | book | | post | | user | +--------------------+ 3 rows in set (0.0022 sec)
テストコードで動作確認する(単一テーブル)
次にテストコードでAsyncEngineおよびAsyncSessionを使って操作してみます。
作成したテストコードはこちら。単一テーブルへのアクセスです。
test_sqlmodel.py
import pytest import pytest_asyncio from sqlalchemy.ext.asyncio import create_async_engine from sqlmodel import col, delete, func, select, or_ from sqlmodel.ext.asyncio.session import AsyncSession from models import Book url = "mysql+aiomysql://kazuhira:password@172.17.0.2:3306/practice" engine = create_async_engine(url) @pytest_asyncio.fixture async def delete_all() -> None: async with AsyncSession(engine) as session: delete_all_statement = delete(Book) await session.exec(delete_all_statement) await session.commit() await engine.dispose() @pytest.mark.asyncio async def test_insert(delete_all: None) -> None: book1 = Book( isbn="978-4297141844", title="MySQL運用・管理[実践]入門 〜安全かつ高速にデータを扱う内部構造・動作原理を学ぶ", price=3080, ) book2 = Book( isbn="978-4798161488", title="MySQL徹底入門 第4版 MySQL 8.0対応", price=4180 ) book3 = Book( isbn="978-4814400812", title="Pythonクイックリファレンス 第4版", price=5280 ) async with AsyncSession(engine) as session: session.add(book1) session.add(book2) session.add(book3) assert book1.id is None assert book2.id is None assert book3.id is None await session.commit() await session.refresh( book1 ) ## これをやらないとAUTO_INREMENTなプライマリーキーの値を取得できない await session.refresh(book2) await session.refresh(book3) assert book1.id is not None assert book2.id is not None assert book3.id is not None await engine.dispose() @pytest.mark.asyncio async def test_select(delete_all: None) -> None: books = [ Book( isbn="978-4297141844", title="MySQL運用・管理[実践]入門 〜安全かつ高速にデータを扱う内部構造・動作原理を学ぶ", price=3080, ), Book( isbn="978-4798161488", title="MySQL徹底入門 第4版 MySQL 8.0対応", price=4180 ), Book( isbn="978-4814400812", title="Pythonクイックリファレンス 第4版", price=5280 ), ] async with AsyncSession(engine) as session: session.add_all(books) await session.commit() selected_book = ( await session.exec(select(Book).where(Book.isbn == "978-4297141844")) ).one() assert ( selected_book.title == "MySQL運用・管理[実践]入門 〜安全かつ高速にデータを扱う内部構造・動作原理を学ぶ" ) selected_books = ( await session.exec( select(Book).where(Book.price > 4000).order_by(col(Book.price).desc()) ) ).all() assert len(selected_books) == 2 assert selected_books[0].title == "Pythonクイックリファレンス 第4版" assert selected_books[0].price == 5280 assert selected_books[1].title == "MySQL徹底入門 第4版 MySQL 8.0対応" assert selected_books[1].price == 4180 selected_books2 = ( await session.exec( select(Book) .where(Book.price > 4000) .where(Book.isbn == "978-4798161488") .order_by(col(Book.price).desc()) ) ).all() assert len(selected_books2) == 1 assert selected_books2[0].title == "MySQL徹底入門 第4版 MySQL 8.0対応" selected_books = ( await session.exec( select(Book) .where(or_(Book.price > 4000, Book.isbn == "978-4798161488")) .order_by(col(Book.price).desc()) ) ).all() assert len(selected_books) == 2 assert selected_books[0].title == "Pythonクイックリファレンス 第4版" assert selected_books[0].price == 5280 assert selected_books[1].title == "MySQL徹底入門 第4版 MySQL 8.0対応" assert selected_books[1].price == 4180 await engine.dispose() @pytest.mark.asyncio async def test_rollback(delete_all: None) -> None: books = [ Book( isbn="978-4297141844", title="MySQL運用・管理[実践]入門 〜安全かつ高速にデータを扱う内部構造・動作原理を学ぶ", price=3080, ), Book( isbn="978-4798161488", title="MySQL徹底入門 第4版 MySQL 8.0対応", price=4180 ), Book( isbn="978-4814400812", title="Pythonクイックリファレンス 第4版", price=5280 ), ] async with AsyncSession(engine) as session: session.add_all(books) await session.rollback() assert (await session.scalar(select(func.count(Book.id)))) == 0 await engine.dispose()
こちらも、テストごとに接続プールを破棄していないと
await engine.dispose()
こういうエラーが出ることになります。
RuntimeError: Event loop is closed The garbage collector is trying to clean up non-checked-in connection <AdaptedConnection <aiomysql.connection.Connection object at 0x7e38f1453290>>, which will be terminated. Please ensure that SQLAlchemy pooled connections are returned to the pool explicitly, either by calling ``close()`` or by using appropriate context managers to manage their lifecycle. sys:1: SAWarning: The garbage collector is trying to clean up non-checked-in connection <AdaptedConnection <aiomysql.connection.Connection object at 0x7e38f1453290>>, which will be terminated. Please ensure that SQLAlchemy pooled connections are returned to the pool explicitly, either by calling ``close()`` or by using appropriate context managers to manage their lifecycle.
他の注意点としては、モデルの更新時にAUTO_INREMENTなどSQL実行後の値が入るものについては明示的に
AsyncSession#refreshが必要です。
async with AsyncSession(engine) as session: session.add(book1) session.add(book2) session.add(book3) assert book1.id is None assert book2.id is None assert book3.id is None await session.commit() await session.refresh( book1 ) ## これをやらないとAUTO_INREMENTなプライマリーキーの値を取得できない await session.refresh(book2) await session.refresh(book3) assert book1.id is not None assert book2.id is not None assert book3.id is not None
あとはクエリーを実行するメソッドには、随所にawaitが要ることに注意すればよいでしょう。
テストコードで動作確認する(複数テーブル・リレーション)
最後は、リレーションを使ったものです。
test_sqlmodel_relation.py
import pytest import pytest_asyncio from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.orm import selectinload from sqlmodel import col, delete, func, select, or_ from sqlmodel.ext.asyncio.session import AsyncSession from models import Post, User url = "mysql+aiomysql://kazuhira:password@172.17.0.2:3306/practice" engine = create_async_engine(url) @pytest_asyncio.fixture async def delete_all() -> None: async with AsyncSession(engine) as session: await session.exec(delete(Post)) await session.exec(delete(User)) await session.commit() await engine.dispose() @pytest.mark.asyncio async def test_insert(delete_all: None) -> None: async with AsyncSession(engine) as session: katsuo = User(first_name="カツオ", last_name="磯野", age=11) wakame = User(first_name="ワカメ", last_name="磯野", age=9) session.add(katsuo) session.add(wakame) await session.commit() katsuo_posts = [ Post( title="Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする", url="https://kazuhira-r.hatenablog.com/entry/2024/12/27/225046", user=katsuo, ), Post( title="PythonでUUID バージョン7、ULIDを扱う", url="https://kazuhira-r.hatenablog.com/entry/2024/12/01/163724", user=katsuo, ), ] wakame_posts = [ Post( title="PyMySQLを使って、PythonからMySQLに接続してみる", url="https://kazuhira-r.hatenablog.com/entry/2024/11/06/235906", user=wakame, ) ] session.add_all(katsuo_posts) session.add_all(wakame_posts) await session.commit() await session.refresh(katsuo) await session.refresh(wakame) result_katsuo = (await session.exec(select(User).where(User.id == katsuo.id).options(selectinload(User.posts)))).one() result_wakame = (await session.exec(select(User).where(User.id == wakame.id).options(selectinload(User.posts)))).one() assert len(result_katsuo.posts) == 2 assert len(result_wakame.posts) == 1 await engine.dispose() @pytest.mark.asyncio async def test_select(delete_all: None) -> None: async with AsyncSession(engine) as session: katsuo = User(first_name="カツオ", last_name="磯野", age=11) wakame = User(first_name="ワカメ", last_name="磯野", age=9) katsuo_posts = [ Post( title="Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする", url="https://kazuhira-r.hatenablog.com/entry/2024/12/27/225046", user=katsuo, ), Post( title="PythonでUUID バージョン7、ULIDを扱う", url="https://kazuhira-r.hatenablog.com/entry/2024/12/01/163724", user=katsuo, ), ] wakame_posts = [ Post( title="PyMySQLを使って、PythonからMySQLに接続してみる", url="https://kazuhira-r.hatenablog.com/entry/2024/11/06/235906", user=wakame, ) ] session.add(katsuo) session.add(wakame) session.add_all(katsuo_posts) session.add_all(wakame_posts) await session.commit() katsuo_post_results = ( await session.exec( select(Post) .where(Post.title.contains("Linux")) .options(selectinload(Post.user)) ) ).all() assert len(katsuo_post_results) == 1 assert ( katsuo_post_results[0].title == "Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする" ) assert katsuo_post_results[0].user.first_name == "カツオ"
データの登録と取得。
@pytest.mark.asyncio async def test_insert(delete_all: None) -> None: async with AsyncSession(engine) as session: katsuo = User(first_name="カツオ", last_name="磯野", age=11) wakame = User(first_name="ワカメ", last_name="磯野", age=9) session.add(katsuo) session.add(wakame) await session.commit() katsuo_posts = [ Post( title="Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする", url="https://kazuhira-r.hatenablog.com/entry/2024/12/27/225046", user=katsuo, ), Post( title="PythonでUUID バージョン7、ULIDを扱う", url="https://kazuhira-r.hatenablog.com/entry/2024/12/01/163724", user=katsuo, ), ] wakame_posts = [ Post( title="PyMySQLを使って、PythonからMySQLに接続してみる", url="https://kazuhira-r.hatenablog.com/entry/2024/11/06/235906", user=wakame, ) ] session.add_all(katsuo_posts) session.add_all(wakame_posts) await session.commit() await session.refresh(katsuo) await session.refresh(wakame) result_katsuo = (await session.exec(select(User).where(User.id == katsuo.id).options(selectinload(User.posts)))).one() result_wakame = (await session.exec(select(User).where(User.id == wakame.id).options(selectinload(User.posts)))).one() assert len(result_katsuo.posts) == 2 assert len(result_wakame.posts) == 1 await engine.dispose()
ポイントはここで、コミット後にモデルのインスタンスをrefreshする必要があります。
await session.commit() await session.refresh(katsuo) await session.refresh(wakame) result_katsuo = (await session.exec(select(User).where(User.id == katsuo.id).options(selectinload(User.posts)))).one() result_wakame = (await session.exec(select(User).where(User.id == wakame.id).options(selectinload(User.posts)))).one()
そしてさらにデータの取得し直しが必要で、関連するデータについてはselectinloadでロードする必要があります。
refreshがない場合はこんなエラーを見ることになりますし、
sqlalchemy.exc.StatementError: (sqlalchemy.exc.MissingGreenlet) greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place?
selectinloadがない場合はリレーションを定義している部分にはデータが入りません。
もうひとつ、selectの例。
@pytest.mark.asyncio async def test_select(delete_all: None) -> None: async with AsyncSession(engine) as session: katsuo = User(first_name="カツオ", last_name="磯野", age=11) wakame = User(first_name="ワカメ", last_name="磯野", age=9) katsuo_posts = [ Post( title="Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする", url="https://kazuhira-r.hatenablog.com/entry/2024/12/27/225046", user=katsuo, ), Post( title="PythonでUUID バージョン7、ULIDを扱う", url="https://kazuhira-r.hatenablog.com/entry/2024/12/01/163724", user=katsuo, ), ] wakame_posts = [ Post( title="PyMySQLを使って、PythonからMySQLに接続してみる", url="https://kazuhira-r.hatenablog.com/entry/2024/11/06/235906", user=wakame, ) ] session.add(katsuo) session.add(wakame) session.add_all(katsuo_posts) session.add_all(wakame_posts) await session.commit() katsuo_post_results = ( await session.exec( select(Post) .where(Post.title.contains("Linux")) .options(selectinload(Post.user)) ) ).all() assert len(katsuo_post_results) == 1 assert ( katsuo_post_results[0].title == "Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする" ) assert katsuo_post_results[0].user.first_name == "カツオ"
こちらもやっぱりポイントはselectinloadになります。
今回はこんなところでしょうか。
おわりに
SQLModelを非同期IOで使ってみました。
同期版からけっこう単純に置き換えられるのかな?と思っていたのですが、意外と癖があってやたらハマりました…。
SQLModel自身には非同期IOに関する情報がなにも載っていないので、SQLAlchemyのドキュメントを見て事前知識をしっかり
付けておいた方が良さそうだなと思いました。