Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ dist/
tests/
notebooks/
data/
!tools/osw_to_parquet_rust/data/
!tools/osw_to_parquet_rust/data/unimod.xml
examples/
.tmp/
.DS_Store
sandbox/
.pytest_cache/
.ruff_cache/
.ruff_cache/
1 change: 1 addition & 0 deletions pyprophet/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ class ExportIOConfig(BaseIOConfig):
split_transition_data: bool = True
split_runs: bool = False
include_transition_data: bool = True # Whether to include transition data in parquet export
exclude_feature_var: bool = False # Whether to exclude FEATURE_MS1/MS2 variance (VAR_*) columns

# SqMass: Export to parquet
pqp_file: Optional[str] = None # Path to PQP file for precursor/transition mapping
Expand Down
9 changes: 9 additions & 0 deletions pyprophet/cli/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,13 @@ def export_library(
show_default=True,
help="Include transition data in the exported parquet file(s). When disabled, only precursor-level data is exported.",
)
@click.option(
"--exclude_feature_var/--no-exclude_feature_var",
"exclude_feature_var",
default=False,
show_default=True,
help="Exclude feature variance columns (VAR_*) from FEATURE_MS1 and FEATURE_MS2 tables. Significantly speeds up export and reduces file size.",
)
@measure_memory_usage_and_time
def export_parquet(
infile,
Expand All @@ -630,6 +637,7 @@ def export_parquet(
compression,
compression_level,
include_transition_data,
exclude_feature_var,
):
"""
Export OSW or sqMass to parquet format
Expand Down Expand Up @@ -666,6 +674,7 @@ def export_parquet(
compression_method=compression,
compression_level=compression_level,
include_transition_data=include_transition_data,
exclude_feature_var=exclude_feature_var,
)

writer = WriterDispatcher.get_writer(config)
Expand Down
9 changes: 7 additions & 2 deletions pyprophet/cli/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ def merge():
is_flag=True,
help="Merge OSW output files that have already been scored.",
)
@click.option(
"--fresh",
is_flag=True,
help="Start from scratch, ignoring any existing merged.osw file. If enabled, removes existing output file before starting.",
)
@measure_memory_usage_and_time
def merge_osw(infiles, outfile, same_run, templatefile, merged_post_scored_runs):
def merge_osw(infiles, outfile, same_run, templatefile, merged_post_scored_runs, fresh):
"""
Merge multiple OSW files and (for large experiments, it is recommended to subsample first).
"""
Expand All @@ -70,7 +75,7 @@ def merge_osw(infiles, outfile, same_run, templatefile, merged_post_scored_runs)
"At least one PyProphet input file needs to be provided."
)

_merge_osw(infiles, outfile, templatefile, same_run, merged_post_scored_runs)
_merge_osw(infiles, outfile, templatefile, same_run, merged_post_scored_runs, fresh)


@click.command(name="parquet", cls=AdvancedHelpCommand)
Expand Down
233 changes: 169 additions & 64 deletions pyprophet/io/export/osw.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ def _read_unscored_data(self, con):

def _build_score_sql(self, con):
"""Build SQL fragment for score columns in unscored files."""
# Skip if exclude_feature_var is enabled
if self.config.exclude_feature_var:
return ""

Comment on lines +267 to +270
Comment on lines 265 to +270
score_sql = ""
if check_sqlite_table(con, "FEATURE_MS1"):
score_sql = write_scores_sql_command(
Expand Down Expand Up @@ -1516,7 +1520,23 @@ def _write_parquet(self) -> None:
def _convert_to_split_parquet(self) -> None:
"""Convert OSW to split parquet format"""
conn = duckdb.connect(":memory:")
load_sqlite_scanner(conn)

try:
load_sqlite_scanner(conn)
except Exception as scanner_error:
# If sqlite_scanner fails to load (e.g., in containers without internet),
# provide helpful guidance but continue with fallback
if "Failed to download extension" in str(scanner_error) or "Connection timed out" in str(scanner_error):
click.echo(
"Warning: sqlite_scanner extension could not be loaded (likely in container without internet access).\n"
"To fix: Set DUCKDB_EXTENSION_DIRECTORY environment variable to a directory with pre-downloaded extensions.\n"
"Or pre-download extensions on your host with: "
"python3 -c 'import duckdb; duckdb.connect(\":memory:\").execute(\"LOAD sqlite_scanner\")'\n"
"Continuing with alternative method...",
err=True
)
else:
raise

try:
# Prepare column information
Expand All @@ -1533,7 +1553,23 @@ def _convert_to_split_parquet(self) -> None:
def _convert_to_single_parquet(self) -> None:
"""Convert OSW to single parquet file"""
conn = duckdb.connect(":memory:")
load_sqlite_scanner(conn)

try:
load_sqlite_scanner(conn)
except Exception as scanner_error:
# If sqlite_scanner fails to load (e.g., in containers without internet),
# provide helpful guidance but continue with fallback
if "Failed to download extension" in str(scanner_error) or "Connection timed out" in str(scanner_error):
click.echo(
"Warning: sqlite_scanner extension could not be loaded (likely in container without internet access).\n"
"To fix: Set DUCKDB_EXTENSION_DIRECTORY environment variable to a directory with pre-downloaded extensions.\n"
"Or pre-download extensions on your host with: "
"python3 -c 'import duckdb; duckdb.connect(\":memory:\").execute(\"LOAD sqlite_scanner\")'\n"
"Continuing with alternative method...",
err=True
)
else:
raise

try:
# Prepare column information
Expand Down Expand Up @@ -1614,9 +1650,45 @@ def _prepare_column_info(self, conn) -> dict:
column_info["score_peptide_contexts"] = self._check_contexts(
sql_conn, "SCORE_PEPTIDE"
)

# Create necessary indices to speed up joins
logger.info("Creating indices for faster export")
self._create_export_indices(sql_conn)

return column_info

def _create_export_indices(self, sql_conn: sqlite3.Connection) -> None:
    """
    Add lookup indices on the ID columns of the mapping and feature tables.

    Every index is issued as ``CREATE INDEX IF NOT EXISTS``. A statement
    that raises ``sqlite3.OperationalError`` is reported at debug level and
    skipped; the remaining statements still run. The connection is
    committed once after all statements have been attempted.

    Args:
        sql_conn: Open SQLite connection on which the indices are created.
    """
    # table -> ((indexed column, index name), ...). Dict and tuple order is
    # the order in which the statements are executed.
    index_plan = {
        "PRECURSOR_PEPTIDE_MAPPING": (
            ("PRECURSOR_ID", "idx_ppm_precursor_id"),
            ("PEPTIDE_ID", "idx_ppm_peptide_id"),
        ),
        "PEPTIDE_PROTEIN_MAPPING": (
            ("PEPTIDE_ID", "idx_pprotm_peptide_id"),
            ("PROTEIN_ID", "idx_pprotm_protein_id"),
        ),
        "PEPTIDE_GENE_MAPPING": (
            ("PEPTIDE_ID", "idx_pgm_peptide_id"),
            ("GENE_ID", "idx_pgm_gene_id"),
        ),
        "FEATURE": (
            ("PRECURSOR_ID", "idx_feat_precursor_id"),
            ("RUN_ID", "idx_feat_run_id"),
        ),
        "FEATURE_MS1": (("FEATURE_ID", "idx_feat_ms1_feature_id"),),
        "FEATURE_MS2": (("FEATURE_ID", "idx_feat_ms2_feature_id"),),
        "FEATURE_TRANSITION": (
            ("FEATURE_ID", "idx_feat_trans_feature_id"),
            ("TRANSITION_ID", "idx_feat_trans_trans_id"),
        ),
        "TRANSITION_PRECURSOR_MAPPING": (
            ("TRANSITION_ID", "idx_tpm_transition_id"),
            ("PRECURSOR_ID", "idx_tpm_precursor_id"),
        ),
        "TRANSITION_PEPTIDE_MAPPING": (
            ("TRANSITION_ID", "idx_tpeptm_transition_id"),
            ("PEPTIDE_ID", "idx_tpeptm_peptide_id"),
        ),
    }

    for table, column_specs in index_plan.items():
        for column, index_name in column_specs:
            statement = (
                f"CREATE INDEX IF NOT EXISTS {index_name} ON {table}({column})"
            )
            try:
                sql_conn.execute(statement)
            except sqlite3.OperationalError as e:
                # Best effort: one failed index must not stop the others.
                logger.debug(f"Could not create index {index_name}: {e}")

    sql_conn.commit()
    logger.debug("Indices created for export optimization")

def _export_split_by_run(self, conn, column_info: dict) -> None:
"""Export data split by run into separate directories"""
os.makedirs(self.config.outfile, exist_ok=True)
Expand Down Expand Up @@ -1707,30 +1779,24 @@ def _export_combined(self, conn, column_info: dict) -> None:
self._export_alignment_data(conn)

def _export_single_file(self, conn, column_info: dict) -> None:
"""Export all data to a single parquet file"""
# Create temp table with combined schema
logger.debug("Creating temporary table for combined export")
self._create_temp_table(conn, column_info)
"""Export all data to a single parquet file using streaming (UNION ALL)"""
logger.info(f"Exporting combined data to {self.config.outfile}")

# Insert precursor data
logger.debug("Inserting precursor data into temp table")
# Build precursor query
precursor_query = self._build_combined_precursor_query(conn, column_info)
# print(precursor_query)
conn.execute(f"INSERT INTO temp_table {precursor_query}")

# Insert transition data if requested
# Build combined query
if self.config.include_transition_data:
logger.debug("Inserting transition data into temp table")
logger.debug("Including transition data in export")
transition_query = self._build_combined_transition_query(column_info)
conn.execute(f"INSERT INTO temp_table {transition_query}")
# Combine queries with UNION ALL - this streams directly to parquet
combined_query = f"{precursor_query}\nUNION ALL\n{transition_query}"
else:
logger.info(
"Skipping transition data export (include_transition_data=False)"
)
logger.info("Skipping transition data export (include_transition_data=False)")
combined_query = precursor_query

# Export to parquet
logger.info(f"Exporting combined data to {self.config.outfile}")
self._execute_copy_query(conn, "SELECT * FROM temp_table", self.config.outfile)
# Stream directly to parquet file without intermediate temp table
self._execute_copy_query(conn, combined_query, self.config.outfile)

# Export alignment data if exists
if column_info["feature_ms2_alignment_exists"]:
Expand Down Expand Up @@ -1775,44 +1841,16 @@ def _register_peptide_ipf_map(self, conn: duckdb.DuckDBPyConnection) -> None:
)

def _create_unimod_to_codename_peptide_id_mapping_table(self) -> None:
"""Create peptide unimod to codename mapping table in SQLite database."""
"""Create peptide unimod to codename mapping table in SQLite database.

Processes peptides in chunks to reduce memory footprint for large datasets.
"""
logger.info(
"Generating peptide unimod to codename mapping and storing in SQLite"
)

with sqlite3.connect(self.config.infile) as sql_conn:
# First get the peptide table and process it with pyopenms
peptide_df = pd.read_sql_query(
"SELECT ID, MODIFIED_SEQUENCE FROM PEPTIDE", sql_conn
)

peptide_df["codename"] = peptide_df["MODIFIED_SEQUENCE"].apply(
unimod_to_codename
)

# Create the merged mapping
unimod_mask = peptide_df["MODIFIED_SEQUENCE"].str.contains("UniMod")
merged_df = pd.merge(
peptide_df[unimod_mask][["codename", "ID"]],
peptide_df[~unimod_mask][["codename", "ID"]],
on="codename",
suffixes=("_unimod", "_codename"),
how="outer",
)

# Fill NaN values in the 'ID_codename' column with the 'ID_unimod' values
merged_df["ID_codename"] = merged_df["ID_codename"].fillna(
merged_df["ID_unimod"]
)
# Fill NaN values in the 'ID_unimod' column with the 'ID_codename' values
merged_df["ID_unimod"] = merged_df["ID_unimod"].fillna(
merged_df["ID_codename"]
)

merged_df["ID_unimod"] = merged_df["ID_unimod"].astype(int)
merged_df["ID_codename"] = merged_df["ID_codename"].astype(int)

# Create the UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING table in SQLite
# Create the mapping table first
sql_conn.execute(
"""
CREATE TABLE IF NOT EXISTS UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING (
Expand All @@ -1824,14 +1862,73 @@ def _create_unimod_to_codename_peptide_id_mapping_table(self) -> None:
"""
)
sql_conn.execute("DELETE FROM UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING")
sql_conn.commit()

# Insert the data into SQLite table
merged_df[["ID_unimod", "ID_codename", "codename"]].to_sql(
"UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING",
sql_conn,
if_exists="append",
index=False,
)
# Get total count for progress tracking
total_count = sql_conn.execute(
"SELECT COUNT(*) FROM PEPTIDE"
).fetchone()[0]
logger.info(f"Processing {total_count} peptides in chunks")

# Process peptides in chunks to reduce memory footprint
chunk_size = 50000 # Process 50k peptides at a time
processed = 0

while processed < total_count:
# Fetch chunk of peptides
peptide_chunk = pd.read_sql_query(
f"""SELECT ID, MODIFIED_SEQUENCE FROM PEPTIDE
LIMIT {chunk_size} OFFSET {processed}""",
sql_conn,
)

if peptide_chunk.empty:
break

# Process chunk
peptide_chunk["codename"] = peptide_chunk["MODIFIED_SEQUENCE"].apply(
unimod_to_codename
)

# Create mapping for this chunk
unimod_mask = peptide_chunk["MODIFIED_SEQUENCE"].str.contains("UniMod", na=False)
unimod_chunk = peptide_chunk[unimod_mask][["codename", "ID"]].copy()
unimod_chunk.columns = ["codename", "ID_unimod"]

codename_chunk = peptide_chunk[~unimod_mask][["codename", "ID"]].copy()
codename_chunk.columns = ["codename", "ID_codename"]

# Merge on codename
merged_chunk = pd.merge(
unimod_chunk,
codename_chunk,
on="codename",
how="outer",
)
Comment on lines +1901 to +1907

# Fill NaN values
merged_chunk["ID_codename"] = merged_chunk["ID_codename"].fillna(
merged_chunk["ID_unimod"]
)
merged_chunk["ID_unimod"] = merged_chunk["ID_unimod"].fillna(
merged_chunk["ID_codename"]
)

merged_chunk["ID_unimod"] = merged_chunk["ID_unimod"].astype(int)
merged_chunk["ID_codename"] = merged_chunk["ID_codename"].astype(int)

# Insert chunk into SQLite
merged_chunk[["ID_unimod", "ID_codename", "codename"]].to_sql(
"UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING",
sql_conn,
if_exists="append",
index=False,
)

processed += len(peptide_chunk)
logger.debug(f"Processed {processed}/{total_count} peptides")

sql_conn.commit()

# Create indices for better performance
sql_conn.execute(
Expand All @@ -1845,8 +1942,12 @@ def _create_unimod_to_codename_peptide_id_mapping_table(self) -> None:
)

sql_conn.commit()

final_count = sql_conn.execute(
"SELECT COUNT(*) FROM UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING"
).fetchone()[0]
logger.info(
f"Successfully created UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING table with {len(merged_df)} mappings"
f"Successfully created UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING table with {final_count} mappings"
)

def _insert_precursor_peptide_ipf_map(self) -> None:
Expand Down Expand Up @@ -2472,7 +2573,7 @@ def _export_alignment_data(self, conn, path: str = None) -> None:
has_score_alignment = check_sqlite_table(sql_conn, "SCORE_ALIGNMENT")

if has_score_alignment:
# Export with alignment scores
# Export with alignment scores - use ROW_NUMBER to get best score per feature
query = f"""
SELECT
FEATURE_MS2_ALIGNMENT.ALIGNMENT_ID,
Expand All @@ -2496,9 +2597,13 @@ def _export_alignment_data(self, conn, path: str = None) -> None:
SCORE_ALIGNMENT.QVALUE AS QVALUE
FROM sqlite_scan('{self.config.infile}', 'FEATURE_MS2_ALIGNMENT') AS FEATURE_MS2_ALIGNMENT
LEFT JOIN (
SELECT FEATURE_ID, SCORE, PEP, QVALUE, MIN(QVALUE) as MIN_QVALUE
FROM sqlite_scan('{self.config.infile}', 'SCORE_ALIGNMENT')
GROUP BY FEATURE_ID
SELECT FEATURE_ID, SCORE, PEP, QVALUE
FROM (
SELECT FEATURE_ID, SCORE, PEP, QVALUE,
ROW_NUMBER() OVER (PARTITION BY FEATURE_ID ORDER BY QVALUE ASC) as rn
FROM sqlite_scan('{self.config.infile}', 'SCORE_ALIGNMENT')
) t
WHERE rn = 1
) AS SCORE_ALIGNMENT
ON FEATURE_MS2_ALIGNMENT.ALIGNED_FEATURE_ID = SCORE_ALIGNMENT.FEATURE_ID
"""
Expand Down
Loading
Loading