CLOVER🍀

That was when it all began.

SQLModelを非同期IO(async、await)とMySQLを組み合わせて使う

これは、なにをしたくて書いたもの?

前にPythonのORMであるSQLModelを試してみました。

PythonのORM、SQLModelをMySQLで試す - CLOVER🍀

SQLModelは非同期操作もできるようなのですが、SQLモデルのドキュメントを見ても使い方がわからなかったので少しメモして
おこうかなと。

とてもとてもハマりましたが…。

SQLModel+非同期操作

SQLModelのドキュメントはこちらですが、非同期IO…いわゆるasyncawaitと組み合わせる方法は書かれていません。

Learn - SQLModel

Advanced User Guideに記載されそうではありますが、現時点では未記載ですね。

The Advanced User Guide is gradually growing, you can already read about some advanced topics.

How to use async and await with the async session.

Advanced User Guide - SQLModel

ではどうしたらいいのかというところですが、SQLAlchemyの非同期IOのサポートと組み合わせます。

Asynchronous I/O (asyncio) — SQLAlchemy 2.0 Documentation

このページがとても重要ですね。

そしてSQLModelのext内にあるAsyncSessionと組み合わせることになります。

https://github.com/fastapi/sqlmodel/blob/0.0.22/sqlmodel/ext/asyncio/session.py

今回はMySQLでSQLModelを非同期IOを使って操作することを試してみましょう。

環境

今回の環境はこちら。

$ python3 --version
Python 3.12.3


$ uv --version
uv 0.5.29

MySQLは172.17.0.2でアクセスできるものとします。

 MySQL  localhost:33060+ ssl  practice  SQL > select version();
+-----------+
| version() |
+-----------+
| 8.4.4     |
+-----------+
1 row in set (0.0006 sec)

プロジェクトを作成する

まずはプロジェクトの作成。

$ uv init --vcs none sqlmodel-asyncio-example
$ cd sqlmodel-asyncio-example
$ rm hello.py

SQLModelとMySQLのドライバーのインストール。

$ uv add sqlmodel
$ uv add aiomysql[rsa]

MySQLに対する非同期IOのドライバーは、aiomysqlにすることにします。前にこちらで試してみました。

PythonでMySQLに非同期にアクセスするaiomysqlを、MySQL 8.4で試す - CLOVER🍀

テストなどに必要なライブラリーをインストール。

$ uv add --dev pytest pytest-asyncio mypy ruff

確認はテストコードで行うことにします。

pyproject.toml

[project]
name = "sqlmodel-asyncio-example"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "aiomysql[rsa]>=0.2.0",
    "sqlmodel>=0.0.22",
]

[dependency-groups]
dev = [
    "mypy>=1.15.0",
    "pytest>=8.3.4",
    "pytest-asyncio>=0.25.3",
    "ruff>=0.9.5",
]

[tool.mypy]
strict = true
disallow_any_unimported = true
disallow_any_expr = true
disallow_any_explicit = true
warn_unreachable = true
pretty = true

インストールされたライブラリーの一覧。

$ uv pip list
Package           Version
----------------- -------
aiomysql          0.2.0
annotated-types   0.7.0
cffi              1.17.1
cryptography      44.0.0
greenlet          3.1.1
iniconfig         2.0.0
mypy              1.15.0
mypy-extensions   1.0.0
packaging         24.2
pluggy            1.5.0
pycparser         2.22
pydantic          2.10.6
pydantic-core     2.27.2
pymysql           1.1.1
pytest            8.3.4
pytest-asyncio    0.25.3
ruff              0.9.5
sqlalchemy        2.0.38
sqlmodel          0.0.22
typing-extensions 4.12.2

SQLModel+非同期IOを試してみる

それでは、SQLModelで非同期IOを使ってみましょう。

モデルとテーブルを定義する。

まずはモデルとテーブルの作成から。

models.py

import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import Field, Relationship, SQLModel


class Book(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    isbn: str = Field(unique=True)
    title: str
    price: int


class User(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    first_name: str
    last_name: str
    age: int
    posts: list["Post"] = Relationship(back_populates="user")


class Post(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    url: str
    user_id: int | None = Field(default=None, foreign_key="user.id")
    user: User | None = Relationship(back_populates="posts")


async def create_db_and_tables() -> None:
    url = "mysql+aiomysql://kazuhira:password@172.17.0.2:3306/practice"
    engine = create_async_engine(url, echo=True)

    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

    await engine.dispose()


if __name__ == "__main__":
    asyncio.run(create_db_and_tables())

モデルの定義は同期版と変わりません。リレーションもつけています。

AsyncEngineの作成とマイグレーション

async def create_db_and_tables() -> None:
    url = "mysql+aiomysql://kazuhira:password@172.17.0.2:3306/practice"
    engine = create_async_engine(url, echo=True)

    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

    await engine.dispose()

SQLAlchemyをaiomysqlで使う場合の接続URLの書き方は、このあたりを見ればわかります。

Engine Configuration — SQLAlchemy 2.0 Documentation

MySQL and MariaDB — SQLAlchemy 2.0 Documentation

Engineの作成をcreate_async_engineを使うようにして、あとはasyncawaitを使っておけばいいんだろうと思っていたら
ここでもしっかりハマりまして…。

最初、AsyncEngine#disposeを書いていなかったら

    await engine.dispose()

マイグレーション実行時にこんなエラーがずっと出ていました。

Exception ignored in: <function Connection.__del__ at 0x74a9f9bbcc20>
Traceback (most recent call last):
  File "/path/to/sqlmodel-asyncio-example/.venv/lib/python3.12/site-packages/aiomysql/connection.py", line 1131, in __del__
    self.close()
  File "/path/to/sqlmodel-asyncio-example/.venv/lib/python3.12/site-packages/aiomysql/connection.py", line 339, in close
    self._writer.transport.close()
  File "/usr/lib/python3.12/asyncio/selector_events.py", line 1211, in close
    super().close()
  File "/usr/lib/python3.12/asyncio/selector_events.py", line 875, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.12/asyncio/base_events.py", line 795, in call_soon
    self._check_closed()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

AsyncEngineはコンテキストから外れた後はdisposeした方がよさそうです。

Unlike when using blocking IO, SQLAlchemy cannot properly dispose of these connections within methods like del or weakref finalizers as there is no opportunity to invoke await. Failing to explicitly dispose of the engine when it falls out of scope may result in warnings emitted to standard out resembling the form RuntimeError: Event loop is closed within garbage collection.

Asynchronous I/O (asyncio) — SQLAlchemy 2.0 Documentation

サンプルコード内にも書いてありました。

...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()

これを見落としていた結果、テストコードでもすごくハマりました…。

またマイグレーションを実行するコードの書き方もわからなくて、そこそこ調べることになりました…。

        await conn.run_sync(SQLModel.metadata.create_all)

マイグレーションの実行。

$ uv run models.py

結果。

2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2025-02-11 00:17:57,313 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,314 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-02-11 00:17:57,314 INFO sqlalchemy.engine.Engine DESCRIBE `practice`.`book`
2025-02-11 00:17:57,314 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,315 INFO sqlalchemy.engine.Engine DESCRIBE `practice`.`user`
2025-02-11 00:17:57,315 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,315 INFO sqlalchemy.engine.Engine DESCRIBE `practice`.`post`
2025-02-11 00:17:57,315 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-02-11 00:17:57,316 INFO sqlalchemy.engine.Engine
CREATE TABLE book (
        id INTEGER NOT NULL AUTO_INCREMENT,
        isbn VARCHAR(255) NOT NULL,
        title VARCHAR(255) NOT NULL,
        price INTEGER NOT NULL,
        PRIMARY KEY (id),
        UNIQUE (isbn)
)


2025-02-11 00:17:57,316 INFO sqlalchemy.engine.Engine [no key 0.00008s] ()
2025-02-11 00:17:57,366 INFO sqlalchemy.engine.Engine
CREATE TABLE user (
        id INTEGER NOT NULL AUTO_INCREMENT,
        first_name VARCHAR(255) NOT NULL,
        last_name VARCHAR(255) NOT NULL,
        age INTEGER NOT NULL,
        PRIMARY KEY (id)
)


2025-02-11 00:17:57,367 INFO sqlalchemy.engine.Engine [no key 0.00037s] ()
2025-02-11 00:17:57,411 INFO sqlalchemy.engine.Engine
CREATE TABLE post (
        id INTEGER NOT NULL AUTO_INCREMENT,
        title VARCHAR(255) NOT NULL,
        url VARCHAR(255) NOT NULL,
        user_id INTEGER,
        PRIMARY KEY (id),
        FOREIGN KEY(user_id) REFERENCES user (id)
)


2025-02-11 00:17:57,411 INFO sqlalchemy.engine.Engine [no key 0.00044s] ()
2025-02-11 00:17:57,461 INFO sqlalchemy.engine.Engine COMMIT

これで準備ができました。

 MySQL  localhost:33060+ ssl  practice  SQL > show tables;
+--------------------+
| Tables_in_practice |
+--------------------+
| book               |
| post               |
| user               |
+--------------------+
3 rows in set (0.0022 sec)
テストコードで動作確認する(単一テーブル)

次にテストコードでAsyncEngineおよびAsyncSessionを使って操作してみます。

作成したテストコードはこちら。単一テーブルへのアクセスです。

test_sqlmodel.py

import pytest
import pytest_asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import col, delete, func, select, or_
from sqlmodel.ext.asyncio.session import AsyncSession

from models import Book

url = "mysql+aiomysql://kazuhira:password@172.17.0.2:3306/practice"
engine = create_async_engine(url)


@pytest_asyncio.fixture
async def delete_all() -> None:
    async with AsyncSession(engine) as session:
        delete_all_statement = delete(Book)
        await session.exec(delete_all_statement)

        await session.commit()

    await engine.dispose()


@pytest.mark.asyncio
async def test_insert(delete_all: None) -> None:
    book1 = Book(
        isbn="978-4297141844",
        title="MySQL運用・管理[実践]入門 〜安全かつ高速にデータを扱う内部構造・動作原理を学ぶ",
        price=3080,
    )
    book2 = Book(
        isbn="978-4798161488", title="MySQL徹底入門 第4版 MySQL 8.0対応", price=4180
    )
    book3 = Book(
        isbn="978-4814400812", title="Pythonクイックリファレンス 第4版", price=5280
    )

    async with AsyncSession(engine) as session:
        session.add(book1)
        session.add(book2)
        session.add(book3)

        assert book1.id is None
        assert book2.id is None
        assert book3.id is None

        await session.commit()

        await session.refresh(
            book1
        )  ## これをやらないとAUTO_INREMENTなプライマリーキーの値を取得できない
        await session.refresh(book2)
        await session.refresh(book3)

        assert book1.id is not None
        assert book2.id is not None
        assert book3.id is not None

    await engine.dispose()


@pytest.mark.asyncio
async def test_select(delete_all: None) -> None:
    books = [
        Book(
            isbn="978-4297141844",
            title="MySQL運用・管理[実践]入門 〜安全かつ高速にデータを扱う内部構造・動作原理を学ぶ",
            price=3080,
        ),
        Book(
            isbn="978-4798161488", title="MySQL徹底入門 第4版 MySQL 8.0対応", price=4180
        ),
        Book(
            isbn="978-4814400812", title="Pythonクイックリファレンス 第4版", price=5280
        ),
    ]

    async with AsyncSession(engine) as session:
        session.add_all(books)
        await session.commit()

        selected_book = (
            await session.exec(select(Book).where(Book.isbn == "978-4297141844"))
        ).one()

        assert (
            selected_book.title
            == "MySQL運用・管理[実践]入門 〜安全かつ高速にデータを扱う内部構造・動作原理を学ぶ"
        )

        selected_books = (
            await session.exec(
                select(Book).where(Book.price > 4000).order_by(col(Book.price).desc())
            )
        ).all()

        assert len(selected_books) == 2

        assert selected_books[0].title == "Pythonクイックリファレンス 第4版"
        assert selected_books[0].price == 5280
        assert selected_books[1].title == "MySQL徹底入門 第4版 MySQL 8.0対応"
        assert selected_books[1].price == 4180

        selected_books2 = (
            await session.exec(
                select(Book)
                .where(Book.price > 4000)
                .where(Book.isbn == "978-4798161488")
                .order_by(col(Book.price).desc())
            )
        ).all()

        assert len(selected_books2) == 1

        assert selected_books2[0].title == "MySQL徹底入門 第4版 MySQL 8.0対応"

        selected_books = (
            await session.exec(
                select(Book)
                .where(or_(Book.price > 4000, Book.isbn == "978-4798161488"))
                .order_by(col(Book.price).desc())
            )
        ).all()

        assert len(selected_books) == 2

        assert selected_books[0].title == "Pythonクイックリファレンス 第4版"
        assert selected_books[0].price == 5280
        assert selected_books[1].title == "MySQL徹底入門 第4版 MySQL 8.0対応"
        assert selected_books[1].price == 4180

    await engine.dispose()


@pytest.mark.asyncio
async def test_rollback(delete_all: None) -> None:
    books = [
        Book(
            isbn="978-4297141844",
            title="MySQL運用・管理[実践]入門 〜安全かつ高速にデータを扱う内部構造・動作原理を学ぶ",
            price=3080,
        ),
        Book(
            isbn="978-4798161488", title="MySQL徹底入門 第4版 MySQL 8.0対応", price=4180
        ),
        Book(
            isbn="978-4814400812", title="Pythonクイックリファレンス 第4版", price=5280
        ),
    ]

    async with AsyncSession(engine) as session:
        session.add_all(books)

        await session.rollback()

        assert (await session.scalar(select(func.count(Book.id)))) == 0

    await engine.dispose()

こちらも、テストごとに接続プールを破棄していないと

    await engine.dispose()

こういうエラーが出ることになります。

RuntimeError: Event loop is closed
The garbage collector is trying to clean up non-checked-in connection <AdaptedConnection <aiomysql.connection.Connection object at 0x7e38f1453290>>, which will be terminated.  Please ensure that SQLAlchemy pooled connections are returned to the pool explicitly, either by calling ``close()`` or by using appropriate context managers to manage their lifecycle.
sys:1: SAWarning: The garbage collector is trying to clean up non-checked-in connection <AdaptedConnection <aiomysql.connection.Connection object at 0x7e38f1453290>>, which will be terminated.  Please ensure that SQLAlchemy pooled connections are returned to the pool explicitly, either by calling ``close()`` or by using appropriate context managers to manage their lifecycle.

他の注意点としては、モデルの更新時にAUTO_INREMENTなどSQL実行後の値が入るものについては明示的に
AsyncSession#refreshが必要です。

    async with AsyncSession(engine) as session:
        session.add(book1)
        session.add(book2)
        session.add(book3)

        assert book1.id is None
        assert book2.id is None
        assert book3.id is None

        await session.commit()

        await session.refresh(
            book1
        )  ## これをやらないとAUTO_INREMENTなプライマリーキーの値を取得できない
        await session.refresh(book2)
        await session.refresh(book3)

        assert book1.id is not None
        assert book2.id is not None
        assert book3.id is not None

あとはクエリーを実行するメソッドには、随所にawaitが要ることに注意すればよいでしょう。

テストコードで動作確認する(複数テーブル・リレーション)

最後は、リレーションを使ったものです。

test_sqlmodel_relation.py

import pytest
import pytest_asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import selectinload
from sqlmodel import col, delete, func, select, or_
from sqlmodel.ext.asyncio.session import AsyncSession

from models import Post, User

url = "mysql+aiomysql://kazuhira:password@172.17.0.2:3306/practice"
engine = create_async_engine(url)


@pytest_asyncio.fixture
async def delete_all() -> None:
    async with AsyncSession(engine) as session:
        await session.exec(delete(Post))
        await session.exec(delete(User))

        await session.commit()

    await engine.dispose()


@pytest.mark.asyncio
async def test_insert(delete_all: None) -> None:
    async with AsyncSession(engine) as session:
        katsuo = User(first_name="カツオ", last_name="磯野", age=11)
        wakame = User(first_name="ワカメ", last_name="磯野", age=9)

        session.add(katsuo)
        session.add(wakame)

        await session.commit()

        katsuo_posts = [
            Post(
                title="Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする",
                url="https://kazuhira-r.hatenablog.com/entry/2024/12/27/225046",
                user=katsuo,
            ),
            Post(
                title="PythonでUUID バージョン7、ULIDを扱う",
                url="https://kazuhira-r.hatenablog.com/entry/2024/12/01/163724",
                user=katsuo,
            ),
        ]

        wakame_posts = [
            Post(
                title="PyMySQLを使って、PythonからMySQLに接続してみる",
                url="https://kazuhira-r.hatenablog.com/entry/2024/11/06/235906",
                user=wakame,
            )
        ]

        session.add_all(katsuo_posts)
        session.add_all(wakame_posts)

        await session.commit()

        await session.refresh(katsuo)
        await session.refresh(wakame)

        result_katsuo = (await session.exec(select(User).where(User.id == katsuo.id).options(selectinload(User.posts)))).one()
        result_wakame = (await session.exec(select(User).where(User.id == wakame.id).options(selectinload(User.posts)))).one()

        assert len(result_katsuo.posts) == 2
        assert len(result_wakame.posts) == 1

    await engine.dispose()


@pytest.mark.asyncio
async def test_select(delete_all: None) -> None:
    async with AsyncSession(engine) as session:
        katsuo = User(first_name="カツオ", last_name="磯野", age=11)
        wakame = User(first_name="ワカメ", last_name="磯野", age=9)

        katsuo_posts = [
            Post(
                title="Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする",
                url="https://kazuhira-r.hatenablog.com/entry/2024/12/27/225046",
                user=katsuo,
            ),
            Post(
                title="PythonでUUID バージョン7、ULIDを扱う",
                url="https://kazuhira-r.hatenablog.com/entry/2024/12/01/163724",
                user=katsuo,
            ),
        ]

        wakame_posts = [
            Post(
                title="PyMySQLを使って、PythonからMySQLに接続してみる",
                url="https://kazuhira-r.hatenablog.com/entry/2024/11/06/235906",
                user=wakame,
            )
        ]

        session.add(katsuo)
        session.add(wakame)
        session.add_all(katsuo_posts)
        session.add_all(wakame_posts)

        await session.commit()

        katsuo_post_results = (
            await session.exec(
                select(Post)
                .where(Post.title.contains("Linux"))
                .options(selectinload(Post.user))
            )
        ).all()

        assert len(katsuo_post_results) == 1
        assert (
            katsuo_post_results[0].title
            == "Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする"
        )
        assert katsuo_post_results[0].user.first_name == "カツオ"

データの登録と取得。

@pytest.mark.asyncio
async def test_insert(delete_all: None) -> None:
    async with AsyncSession(engine) as session:
        katsuo = User(first_name="カツオ", last_name="磯野", age=11)
        wakame = User(first_name="ワカメ", last_name="磯野", age=9)

        session.add(katsuo)
        session.add(wakame)

        await session.commit()

        katsuo_posts = [
            Post(
                title="Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする",
                url="https://kazuhira-r.hatenablog.com/entry/2024/12/27/225046",
                user=katsuo,
            ),
            Post(
                title="PythonでUUID バージョン7、ULIDを扱う",
                url="https://kazuhira-r.hatenablog.com/entry/2024/12/01/163724",
                user=katsuo,
            ),
        ]

        wakame_posts = [
            Post(
                title="PyMySQLを使って、PythonからMySQLに接続してみる",
                url="https://kazuhira-r.hatenablog.com/entry/2024/11/06/235906",
                user=wakame,
            )
        ]

        session.add_all(katsuo_posts)
        session.add_all(wakame_posts)

        await session.commit()

        await session.refresh(katsuo)
        await session.refresh(wakame)

        result_katsuo = (await session.exec(select(User).where(User.id == katsuo.id).options(selectinload(User.posts)))).one()
        result_wakame = (await session.exec(select(User).where(User.id == wakame.id).options(selectinload(User.posts)))).one()

        assert len(result_katsuo.posts) == 2
        assert len(result_wakame.posts) == 1

    await engine.dispose()

ポイントはここで、コミット後にモデルのインスタンスrefreshする必要があります。

        await session.commit()

        await session.refresh(katsuo)
        await session.refresh(wakame)

        result_katsuo = (await session.exec(select(User).where(User.id == katsuo.id).options(selectinload(User.posts)))).one()
        result_wakame = (await session.exec(select(User).where(User.id == wakame.id).options(selectinload(User.posts)))).one()

そしてさらにデータの取得し直しが必要で、関連するデータについてはselectinloadでロードする必要があります。

refreshがない場合はこんなエラーを見ることになりますし、

sqlalchemy.exc.StatementError: (sqlalchemy.exc.MissingGreenlet) greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place?

selectinloadがない場合はリレーションを定義している部分にはデータが入りません。

もうひとつ、selectの例。

@pytest.mark.asyncio
async def test_select(delete_all: None) -> None:
    async with AsyncSession(engine) as session:
        katsuo = User(first_name="カツオ", last_name="磯野", age=11)
        wakame = User(first_name="ワカメ", last_name="磯野", age=9)

        katsuo_posts = [
            Post(
                title="Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする",
                url="https://kazuhira-r.hatenablog.com/entry/2024/12/27/225046",
                user=katsuo,
            ),
            Post(
                title="PythonでUUID バージョン7、ULIDを扱う",
                url="https://kazuhira-r.hatenablog.com/entry/2024/12/01/163724",
                user=katsuo,
            ),
        ]

        wakame_posts = [
            Post(
                title="PyMySQLを使って、PythonからMySQLに接続してみる",
                url="https://kazuhira-r.hatenablog.com/entry/2024/11/06/235906",
                user=wakame,
            )
        ]

        session.add(katsuo)
        session.add(wakame)
        session.add_all(katsuo_posts)
        session.add_all(wakame_posts)

        await session.commit()

        katsuo_post_results = (
            await session.exec(
                select(Post)
                .where(Post.title.contains("Linux"))
                .options(selectinload(Post.user))
            )
        ).all()

        assert len(katsuo_post_results) == 1
        assert (
            katsuo_post_results[0].title
            == "Pythonのパッケージ&プロジェクト管理ツールであるuvをUbuntu Linux 24.04 LTSにインストールする"
        )
        assert katsuo_post_results[0].user.first_name == "カツオ"

こちらもやっぱりポイントはselectinloadになります。

今回はこんなところでしょうか。

おわりに

SQLModelを非同期IOで使ってみました。

同期版からけっこう単純に置き換えられるのかな?と思っていたのですが、意外と癖があってやたらハマりました…。

SQLModel自身には非同期IOに関する情報がなにも載っていないので、SQLAlchemyのドキュメントを見て事前知識をしっかり
付けておいた方が良さそうだなと思いました。

Asynchronous I/O (asyncio) — SQLAlchemy 2.0 Documentation