Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cytotable/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def _source_pageset_to_parquet(
# add source table columns
casted_source_cols = [
# here we cast the column to the specified type ensure the colname remains the same
f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
f'CAST("{column["column_name"]}" AS {column["column_dtype"]}) AS "{column["column_name"]}"'
for column in source["columns"]
]

Expand Down Expand Up @@ -519,9 +519,9 @@ def _source_pageset_to_parquet(
_write_parquet_table_with_metadata(
table=ddb_reader.execute(f"""
{base_query}
WHERE {source['page_key']} BETWEEN {pageset[0]} AND {pageset[1]}
WHERE {source["page_key"]} BETWEEN {pageset[0]} AND {pageset[1]}
/* optional ordering per pageset */
{"ORDER BY " + source['page_key'] if sort_output else ""};
{"ORDER BY " + source["page_key"] if sort_output else ""};
""").fetch_arrow_table(),
where=result_filepath,
)
Expand Down Expand Up @@ -1734,7 +1734,6 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
)

if dest_backend == "iceberg":

from cytotable.warehouse.iceberg import write_iceberg_warehouse

return write_iceberg_warehouse(
Expand All @@ -1758,6 +1757,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
bbox_column_map=bbox_column_map,
sort_output=sort_output,
preset=preset,
drop_null=drop_null,
parsl_config=parsl_config,
**kwargs,
)
Expand Down
44 changes: 38 additions & 6 deletions cytotable/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,38 @@
from parsl.app.app import AppBase
from parsl.config import Config
from parsl.errors import NoDataFlowKernelError
from parsl.executors import HighThroughputExecutor
from parsl.executors import (
HighThroughputExecutor,
)
from parsl.executors import ThreadPoolExecutor as ParslThreadPoolExecutor

# module-level logger named after this module
logger = logging.getLogger(__name__)

# label given to the Parsl ThreadPoolExecutor that CytoTable registers;
# used when creating that executor and when checking whether a Config already has it
CYTOTABLE_THREAD_EXECUTOR_LABEL = "cytotable_threads"


def _ensure_thread_executor(config: Config) -> Config:
    """
    Add CytoTable's ThreadPoolExecutor to a Parsl Config if not already present.

    The thread executor is used for I/O-bound image processing tasks so they
    run in-process (no Arrow serialization cost) alongside the HighThroughputExecutor
    that handles the data-preparation pipeline.

    Args:
        config: Parsl configuration to check for an executor labelled
            ``CYTOTABLE_THREAD_EXECUTOR_LABEL``.

    Returns:
        ``config`` itself (unchanged) when such an executor is already present;
        otherwise a new Config containing the existing executors followed by
        the thread executor, with the other settings of ``config`` carried over.
    """
    # local import keeps this helper independent of the module's import block
    import inspect

    # guard clause: nothing to do when the labelled executor already exists
    if any(
        executor.label == CYTOTABLE_THREAD_EXECUTOR_LABEL
        for executor in config.executors
    ):
        return config

    # Rebuilding with only ``executors=`` would silently reset every other
    # setting of the supplied config to its default. Copy each constructor
    # argument that the existing config exposes as a same-named attribute.
    carried_settings = {
        name: getattr(config, name)
        for name, parameter in inspect.signature(Config.__init__).parameters.items()
        if name not in ("self", "executors")
        and parameter.kind
        in (parameter.POSITIONAL_OR_KEYWORD, parameter.KEYWORD_ONLY)
        and hasattr(config, name)
    }

    return Config(
        executors=[
            *config.executors,
            ParslThreadPoolExecutor(
                label=CYTOTABLE_THREAD_EXECUTOR_LABEL,
                max_threads=4,
            ),
        ],
        **carried_settings,
    )


# keep a reference to AppBase's original __init__
# NOTE(review): presumably so it can still be called once AppBase.__init__
# is wrapped or replaced further down — confirm against the use site
original_init = AppBase.__init__

Expand Down Expand Up @@ -70,7 +98,11 @@ def _default_parsl_config():
executors=[
HighThroughputExecutor(
label="htex_default_for_cytotable",
)
),
ParslThreadPoolExecutor(
label=CYTOTABLE_THREAD_EXECUTOR_LABEL,
max_threads=4,
),
]
)

Expand Down Expand Up @@ -275,11 +307,11 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str:
query_parts = tablenumber_sql + ", ".join([f"""
CASE
/* when the storage class type doesn't match the column, return nulltype */
WHEN typeof({col['column_name']}) !=
'{_sqlite_affinity_data_type_lookup(col['column_type'].lower())}' THEN NULL
WHEN typeof({col["column_name"]}) !=
'{_sqlite_affinity_data_type_lookup(col["column_type"].lower())}' THEN NULL
/* else, return the normal value */
ELSE {col['column_name']}
END AS {col['column_name']}
ELSE {col["column_name"]}
END AS {col["column_name"]}
""" for col in column_info])

# perform the select using the cases built above and using chunksize + offset
Expand Down
Loading
Loading