Skip to content

SQLAlchemy bulk insert is executed per row #602

@varvarvarvar

Description

@varvarvarvar

Expected behavior

I'm using sqlalchemy@2.0.49 to execute a bulk insert statement through Trino. I expect a bulk insert of 1000 rows to be executed in one query; instead, 1000 separate queries are executed, each inserting 1 row.

with engine.connect() as conn:
    # Build 1000 parameter dicts, one per row to insert.
    rows = [
        {"probe_id": probe_id, "probe_name": random.choice(string.ascii_letters)}
        for probe_id in range(1000)
    ]
    # Column names come from the first row's keys; each gets a named bind
    # parameter (":probe_id", ":probe_name") in the VALUES clause.
    columns = list(rows[0])
    col_list = ", ".join(columns)
    placeholders = ", ".join(f":{column}" for column in columns)
    target = f'"{_SCHEMA}"."{table_name}"'
    insert_sql = f"""
        INSERT INTO {target} ({col_list})
        VALUES ({placeholders})
    """
    # The whole list of parameter dicts is handed to a single execute() call.
    conn.execute(text(insert_sql), rows)
    conn.commit()

Actual behavior

A bulk insert of 1000 rows results in 1000 queries, each performing 1 insert.

Steps To Reproduce

import random
import string
import uuid

from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine

from mysettings import trino_settings

# Trino catalog and schema the probe runs against. Both are combined into the
# connection URL's database part and used as quoted identifiers in the SQL
# statements issued by this script.
_CATALOG = "test_catalog"
_SCHEMA = "test_schema"


def _build_engine() -> Engine:
    """Create the SQLAlchemy engine used by the probe.

    Connection details (user, host, port, HTTP scheme) are read from
    ``trino_settings``; the database part of the URL is
    ``"<_CATALOG>/<_SCHEMA>"``.

    Returns:
        An ``Engine`` created with ``pool_pre_ping=True`` and the configured
        ``http_scheme`` passed through ``connect_args``.
    """
    settings = trino_settings
    connection_url = URL.create(
        "trino",
        username=settings.trino_user,
        host=settings.trino_host,
        port=settings.trino_port,
        database="/".join((_CATALOG, _SCHEMA)),
    )
    driver_options = {"http_scheme": settings.trino_http_scheme}
    return create_engine(
        connection_url,
        pool_pre_ping=True,
        connect_args=driver_options,
    )


def main(engine: Engine, table_name: str) -> None:
    """Run the bulk-insert probe against ``table_name``.

    Steps, all on one connection:
      1. Create the schema and the two-column probe table if they are missing.
      2. Insert 1000 generated rows by passing the full list of parameter
         dicts to a single ``conn.execute`` call.
      3. Count the rows in the table, print the count, and assert that it
         equals the number of rows passed to the insert.

    Args:
        engine: Engine to open the connection from.
        table_name: Unquoted name of the probe table inside ``_SCHEMA``.

    Raises:
        AssertionError: If the row count after the insert differs from the
            number of generated rows.
    """
    # Pure string building; identical SQL text to issuing the f-strings inline.
    target = f'"{_SCHEMA}"."{table_name}"'
    schema_ddl = f'CREATE SCHEMA IF NOT EXISTS "{_CATALOG}"."{_SCHEMA}"'
    table_ddl = (
        f"CREATE TABLE IF NOT EXISTS {target} ("
        '"probe_id" INTEGER, "probe_name" VARCHAR)'
    )
    count_sql = f"SELECT COUNT(*) FROM {target}"

    with engine.connect() as conn:
        # Each DDL statement is executed and committed before the next one.
        for ddl in (schema_ddl, table_ddl):
            conn.execute(text(ddl))
            conn.commit()

        rows = [
            {"probe_id": probe_id, "probe_name": random.choice(string.ascii_letters)}
            for probe_id in range(1000)
        ]
        # Column names come from the first row's keys; each becomes a named
        # bind parameter in the VALUES clause.
        columns = list(rows[0])
        col_list = ", ".join(columns)
        placeholders = ", ".join(f":{column}" for column in columns)
        insert_sql = f"""
            INSERT INTO {target} ({col_list})
            VALUES ({placeholders})
        """
        # One execute() call with the whole parameter list: this is the call
        # whose query count the probe is meant to observe.
        conn.execute(text(insert_sql), rows)
        conn.commit()

        inserted = int(conn.execute(text(count_sql)).scalar_one())
        conn.commit()
        print(f"Rows after insert: {inserted}")
        assert inserted == len(rows)


if __name__ == "__main__":
    # Script entry point: build the engine, derive a per-run table name from
    # a random 10-character hex suffix, then run the probe against it.
    engine = _build_engine()
    probe_suffix = uuid.uuid4().hex[:10]
    print(f"Running queries for table ID {probe_suffix}")
    table_name = "tmp_insert_probe_" + probe_suffix
    main(table_name=table_name, engine=engine)

Log output

No response

Operating System

MacOS Tahoe 26.3.1(a)

Trino Python client version

0.337.0

Trino Server version

479

Python version

3.14.3

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions