Coverage for barbet/markers.py: 59.09%
154 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-12 04:23 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-12 04:23 +0000
import gzip
import os
import shutil
import subprocess
import tempfile
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from typing import Dict, List, Tuple

from rich import progress
# Marker information
# GTDB bac120 marker set: 120 single-copy bacterial marker genes, given as
# Pfam (versioned "PFxxxxx.N") and TIGRFAM accessions. These IDs must match
# the query accessions reported by hmmsearch (column 4 of --tblout output),
# which is how hits are matched back to markers downstream.
BAC120_MARKERS = [
    "PF00380.20",
    "PF00410.20",
    "PF00466.21",
    "PF01025.20",
    "PF02576.18",
    "PF03726.15",
    "TIGR00006",
    "TIGR00019",
    "TIGR00020",
    "TIGR00029",
    "TIGR00043",
    "TIGR00054",
    "TIGR00059",
    "TIGR00061",
    "TIGR00064",
    "TIGR00065",
    "TIGR00082",
    "TIGR00083",
    "TIGR00084",
    "TIGR00086",
    "TIGR00088",
    "TIGR00090",
    "TIGR00092",
    "TIGR00095",
    "TIGR00115",
    "TIGR00116",
    "TIGR00138",
    "TIGR00158",
    "TIGR00166",
    "TIGR00168",
    "TIGR00186",
    "TIGR00194",
    "TIGR00250",
    "TIGR00337",
    "TIGR00344",
    "TIGR00362",
    "TIGR00382",
    "TIGR00392",
    "TIGR00396",
    "TIGR00398",
    "TIGR00414",
    "TIGR00416",
    "TIGR00420",
    "TIGR00431",
    "TIGR00435",
    "TIGR00436",
    "TIGR00442",
    "TIGR00445",
    "TIGR00456",
    "TIGR00459",
    "TIGR00460",
    "TIGR00468",
    "TIGR00472",
    "TIGR00487",
    "TIGR00496",
    "TIGR00539",
    "TIGR00580",
    "TIGR00593",
    "TIGR00615",
    "TIGR00631",
    "TIGR00634",
    "TIGR00635",
    "TIGR00643",
    "TIGR00663",
    "TIGR00717",
    "TIGR00755",
    "TIGR00810",
    "TIGR00922",
    "TIGR00928",
    "TIGR00959",
    "TIGR00963",
    "TIGR00964",
    "TIGR00967",
    "TIGR01009",
    "TIGR01011",
    "TIGR01017",
    "TIGR01021",
    "TIGR01029",
    "TIGR01032",
    "TIGR01039",
    "TIGR01044",
    "TIGR01059",
    "TIGR01063",
    "TIGR01066",
    "TIGR01071",
    "TIGR01079",
    "TIGR01082",
    "TIGR01087",
    "TIGR01128",
    "TIGR01146",
    "TIGR01164",
    "TIGR01169",
    "TIGR01171",
    "TIGR01302",
    "TIGR01391",
    "TIGR01393",
    "TIGR01394",
    "TIGR01510",
    "TIGR01632",
    "TIGR01951",
    "TIGR01953",
    "TIGR02012",
    "TIGR02013",
    "TIGR02027",
    "TIGR02075",
    "TIGR02191",
    "TIGR02273",
    "TIGR02350",
    "TIGR02386",
    "TIGR02397",
    "TIGR02432",
    "TIGR02729",
    "TIGR03263",
    "TIGR03594",
    "TIGR03625",
    "TIGR03632",
    "TIGR03654",
    "TIGR03723",
    "TIGR03725",
    "TIGR03953",
]
# GTDB ar53 marker set: 53 single-copy archaeal marker genes (Pfam + TIGRFAM
# accessions). A few accessions also appear in BAC120_MARKERS (e.g.
# PF00410.20, TIGR00064), so a single hit can be written for both domains.
AR53_MARKERS = [
    "PF04919.13",
    "PF07541.13",
    "PF01000.27",
    "PF00687.22",
    "PF00466.21",
    "PF00827.18",
    "PF01280.21",
    "PF01090.20",
    "PF01200.19",
    "PF01015.19",
    "PF00900.21",
    "PF00410.20",
    "TIGR00037",
    "TIGR00064",
    "TIGR00111",
    "TIGR00134",
    "TIGR00279",
    "TIGR00291",
    "TIGR00323",
    "TIGR00335",
    "TIGR00373",
    "TIGR00405",
    "TIGR00448",
    "TIGR00483",
    "TIGR00491",
    "TIGR00522",
    "TIGR00967",
    "TIGR00982",
    "TIGR01008",
    "TIGR01012",
    "TIGR01018",
    "TIGR01020",
    "TIGR01028",
    "TIGR01046",
    "TIGR01052",
    "TIGR01171",
    "TIGR01213",
    "TIGR01952",
    "TIGR02236",
    "TIGR02338",
    "TIGR02389",
    "TIGR02390",
    "TIGR03626",
    "TIGR03627",
    "TIGR03628",
    "TIGR03629",
    "TIGR03670",
    "TIGR03671",
    "TIGR03672",
    "TIGR03673",
    "TIGR03674",
    "TIGR03676",
    "TIGR03680",
]
def read_fasta(path: str) -> Dict[str, str]:
    """
    Read a FASTA file into a dictionary of sequences.

    Parameters
    ----------
    path : str
        Filesystem path to the input FASTA file.

    Returns
    -------
    Dict[str, str]
        Mapping from sequence ID (the first whitespace-delimited token after
        '>' in the header) to the concatenated sequence, with any leading or
        trailing '*' characters stripped.

    Raises
    ------
    IOError
        If the file cannot be opened for reading.
    """
    sequences: Dict[str, str] = {}
    active_id = None
    chunks: List[str] = []
    with open(path) as handle:
        for raw in handle:
            stripped = raw.rstrip()
            # Blank lines carry no information; skip them entirely.
            if not stripped:
                continue
            if stripped[0] == ">":
                # New record: flush the one we were accumulating, if any.
                if active_id:
                    sequences[active_id] = "".join(chunks).strip("*")
                active_id = stripped[1:].split()[0]
                chunks = []
            else:
                chunks.append(stripped)
    # Flush the final record (no trailing header triggers it in the loop).
    if active_id:
        sequences[active_id] = "".join(chunks).strip("*")
    return sequences
def run_prodigal(
    genome_id: str,
    fasta_path: str,
    out_dir: str,
    force: bool
) -> str:
    """
    Run Prodigal to predict protein-coding genes from a FASTA file.

    Parameters
    ----------
    genome_id : str
        Unique identifier for the genome (used for output directory and file names).
    fasta_path : str
        Path to the input FASTA file; a ".gz" suffix triggers transparent
        decompression to a temporary file before calling Prodigal.
    out_dir : str
        Directory under which "<genome_id>/<genome_id>.faa" is written.
    force : bool
        If True, remove an existing output file before running.

    Returns
    -------
    str
        Path to the output protein FASTA file generated by Prodigal.

    Raises
    ------
    subprocess.CalledProcessError
        If the prodigal command exits with a non-zero status (check=True).
    """
    prot_dir = os.path.join(out_dir, genome_id)
    os.makedirs(prot_dir, exist_ok=True)
    prot_fa = os.path.join(prot_dir, f"{genome_id}.faa")

    if force and os.path.exists(prot_fa):
        os.remove(prot_fa)

    # If input is gzipped, decompress to a temporary file for Prodigal.
    tmp_path = None
    if fasta_path.endswith(".gz"):
        with gzip.open(fasta_path, "rt") as gz_in, \
                tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".fasta") as tmp_fa:
            shutil.copyfileobj(gz_in, tmp_fa)
            tmp_path = tmp_fa.name
        input_path = tmp_path
    else:
        input_path = fasta_path

    try:
        # Run Prodigal in metagenomic mode; stdout/stderr are noisy, discard them.
        subprocess.run(
            ["prodigal", "-a", prot_fa, "-p", "meta", "-i", input_path],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    finally:
        # Fix: the decompressed temp file (delete=False) was previously never
        # removed, leaking one file per gzipped genome.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)
    return prot_fa
def parse_domtblout_top_hits(domtbl_path: str) -> Dict[str, List[str]]:
    """
    Parse a HMMER --domtblout file and select the top hit per sequence.

    Implements the GTDB-Tk comparator logic: for each query sequence,
    keeps the hit with highest bitscore; ties broken by lower e-value,
    then by lexicographically smaller HMM ID.

    Parameters
    ----------
    domtbl_path : str
        Path to the HMMER --domtblout output file.

    Returns
    -------
    Dict[str, List[str]]
        Mapping from HMM ID to a list of sequence IDs that were chosen
        as top hit(s) for that HMM.

    Raises
    ------
    IOError
        If the domtblout file cannot be opened.
    ValueError
        If a non-numeric e-value or bitscore is encountered.
    """
    # Best hit per sequence, encoded as a sort key: (-bitscore, evalue, hmm_id).
    # Tuple "<" then reproduces the comparator exactly: higher bitscore wins,
    # then lower e-value, then lexicographically smaller HMM ID; on a full
    # tie the earlier hit is kept.
    best: Dict[str, Tuple[float, float, str]] = {}
    with open(domtbl_path) as handle:
        for row in handle:
            if row.startswith("#"):
                continue
            fields = row.split()
            seq_id = fields[0]
            candidate = (-float(fields[5]), float(fields[4]), fields[3])
            current = best.get(seq_id)
            if current is None or candidate < current:
                best[seq_id] = candidate

    # Invert the mapping: HMM ID -> chosen sequence IDs (first-seen order).
    hits: Dict[str, List[str]] = {}
    for seq_id, (_, _, hmm_id) in best.items():
        hits.setdefault(hmm_id, []).append(seq_id)
    return hits
def _process_single_genome(
    args: Tuple[
        str,  # genome_id
        str,  # fasta_path
        str,  # out_dir
        int,  # cpus_per_proc
        str,  # pfam_db
        str,  # tigr_db
        bool,  # force
        bool,  # skip_multiple_hits
        int,  # number_of_hits_to_keep
        Dict,  # shared progress dict (Manager().dict())
        int,  # task_id in Rich.Progress
    ],
) -> Tuple[str, Dict[str, List[str]]]:
    """
    Worker function that extracts marker genes for one genome.

    Runs inside a ProcessPoolExecutor worker, so all arguments arrive packed
    in a single picklable tuple. Progress is reported through the shared
    ``progress_dict`` after each of four subtasks:

        1) Prodigal gene prediction
        2) Pfam HMM search
        3) TIGRFAM HMM search
        4) Parse top hits + write per-marker FASTA files

    Returns
    -------
    Tuple[str, Dict[str, List[str]]]
        ``(fasta_path, {"bac120": [paths...], "ar53": [paths...]})`` where the
        lists hold the per-marker FASTA files written for each domain.
    """
    (
        gid,
        path,
        out_dir,
        cpus_per_proc,
        pfam_db,
        tigr_db,
        force,
        skip_multiple_hits,
        number_of_hits_to_keep,
        progress_dict,
        task_id,
    ) = args

    genome_fastas: Dict[str, List[str]] = {"bac120": [], "ar53": []}

    # Signal that we are starting the job; the presence of a "progress" key
    # is what the monitor loop uses to mark a task as started.
    progress_dict[task_id] = {"progress": 0}

    # Subtask 1: Prodigal gene prediction, then load the predicted proteins.
    prot_fa = run_prodigal(gid, path, out_dir, force)
    prot_seqs = read_fasta(prot_fa)
    # Signal to the shared dict that we've completed step 1 (Prodigal)
    progress_dict[task_id] = {"progress": 1}

    # Subtask 2: Pfam HMM search with fixed E-value cutoffs.
    pf_out = os.path.join(out_dir, gid, "pfam.tblout")
    if force and os.path.exists(pf_out):
        os.remove(pf_out)

    subprocess.run(
        [
            "hmmsearch",
            "--cpu",
            str(cpus_per_proc),
            "--notextw",
            "-E",
            "0.001",
            "--domE",
            "0.001",
            "--tblout",
            pf_out,
            pfam_db,
            prot_fa,
        ],
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    progress_dict[task_id] = {"progress": 2}

    # Subtask 3: TIGRFAM HMM search using the models' own noise cutoffs (--cut_nc).
    tg_out = os.path.join(out_dir, gid, "tigrfam.tblout")
    if force and os.path.exists(tg_out):
        os.remove(tg_out)

    subprocess.run(
        [
            "hmmsearch",
            "--cpu",
            str(cpus_per_proc),
            "--noali",
            "--notextw",
            "--cut_nc",
            "--tblout",
            tg_out,
            tigr_db,
            prot_fa,
        ],
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    progress_dict[task_id] = {"progress": 3}

    # Subtask 4: pick top hits per sequence and write per-marker FASTAs.
    pf_hits = parse_domtblout_top_hits(pf_out)
    tg_hits = parse_domtblout_top_hits(tg_out)
    # NOTE(review): assumes Pfam accessions and TIGRFAM IDs never collide, so
    # this dict merge cannot drop hits — confirm against the marker lists.
    combined_hits = {**pf_hits, **tg_hits}

    for marker, seq_ids in combined_hits.items():
        if not seq_ids:
            continue
        elif len(seq_ids) == 1:
            seqs = [prot_seqs[seq_ids[0]]]
        else:
            unique_seqs = set(prot_seqs[s] for s in seq_ids)
            if len(unique_seqs) != 1 and skip_multiple_hits:
                # faster but can miss some markers
                continue
            # NOTE(review): set iteration order is nondeterministic, so which
            # duplicates survive this slice can vary between runs.
            seqs = list(unique_seqs)[:number_of_hits_to_keep]

        # A marker present in both sets (e.g. PF00410.20) is written for both domains.
        for dom in ("bac120", "ar53"):
            if (dom == "bac120" and marker in BAC120_MARKERS) or (
                dom == "ar53" and marker in AR53_MARKERS
            ):
                genome_dir = os.path.join(out_dir, gid, dom)
                os.makedirs(genome_dir, exist_ok=True)
                for i, seq in enumerate(seqs, start=1):
                    fa_path = os.path.join(genome_dir, f"{marker}.{i}.fa")
                    with open(fa_path, "w") as fh:
                        fh.write(f">{gid}\n{seq}\n")
                    genome_fastas[dom].append(fa_path)

    # Signal completion of subtask 4 (and thus the entire genome job)
    progress_dict[task_id] = {"progress": 4}
    return (path, genome_fastas)
def extract_markers_genes(
    genomes: Dict[str, str],
    out_dir: str,
    cpus: int = 1,
    pfam_db: str = os.environ.get("PFAM_HMMDB"),
    tigr_db: str = os.environ.get("TIGR_HMMDB"),
    force: bool = False,
    skip_multiple_hits: bool = False,
    number_of_hits_to_keep: int = 1,
) -> Dict[str, Dict[str, List[str]]]:
    """
    Extract marker genes from multiple genomes in parallel using Prodigal and HMMER.

    Parameters
    ----------
    genomes : Dict[str, str]
        Mapping from genome ID to the path of its (optionally gzipped) FASTA file.
    out_dir : str
        Directory under which per-genome output subdirectories are created.
    cpus : int
        Total CPU budget, split evenly across the worker processes.
    pfam_db : str
        Path to the Pfam HMM database. Defaults to $PFAM_HMMDB, resolved once
        at module import time.
    tigr_db : str
        Path to the TIGRFAM HMM database. Defaults to $TIGR_HMMDB, resolved
        once at module import time.
    force : bool
        If True, remove pre-existing intermediate outputs before recomputing.
    skip_multiple_hits : bool
        If True, skip markers whose hits resolve to multiple distinct sequences.
    number_of_hits_to_keep : int
        Maximum number of unique sequences written per marker.

    Returns
    -------
    Dict[str, Dict[str, List[str]]]
        Mapping from genome FASTA path to {"bac120": [...], "ar53": [...]}
        lists of per-marker FASTA files.
    """
    # Early exit if no genomes
    if not genomes:
        return {}

    n_genomes = len(genomes)
    n_workers = min(n_genomes, cpus)
    cpus_per_proc = max(1, cpus // n_workers)

    # This shared dict allows us to track progress across multiple processes
    manager = Manager()
    progress_dict = manager.dict()

    genome_to_task: Dict[str, int] = {}
    futures_to_task: Dict = {}

    results: Dict[str, Dict[str, List[str]]] = {}

    with progress.Progress(
        progress.TextColumn("[progress.description]{task.description}", justify="right"),
        progress.BarColumn(),
        progress.TaskProgressColumn(),
        progress.TimeElapsedColumn(),
    ) as rich_progress:

        # Overall progress bar that counts how many genomes finished all 4 steps
        overall_task = rich_progress.add_task("[cyan]Extracting Markers", total=n_genomes)

        # Create one sub-task per genome: 4 steps each
        # (Prodigal, Pfam, TIGRFAM, and writing FASTAs)
        for gid in genomes:
            task_id = rich_progress.add_task(gid, total=4, visible=False, start=False)
            genome_to_task[gid] = task_id
            progress_dict[task_id] = {}

        # Submit all genome jobs to a ProcessPoolExecutor
        with ProcessPoolExecutor(max_workers=n_workers) as executor:
            for gid, fasta_path in genomes.items():
                task_id = genome_to_task[gid]
                args = (
                    gid,
                    fasta_path,
                    out_dir,
                    cpus_per_proc,
                    pfam_db,
                    tigr_db,
                    force,
                    skip_multiple_hits,
                    number_of_hits_to_keep,
                    progress_dict,
                    task_id,
                )
                future = executor.submit(_process_single_genome, args)
                futures_to_task[future] = (task_id, fasta_path)

            started_tasks = set()  # Track which tasks have started
            # Monitor progress until all futures complete
            while futures_to_task:
                for task_id, status in progress_dict.items():
                    started = "progress" in status  # only started tasks have a "progress" key
                    if started and task_id not in started_tasks:
                        # Reveal the per-genome bar once its worker has begun
                        rich_progress.update(task_id, visible=True)
                        rich_progress.start_task(task_id)
                        started_tasks.add(task_id)
                    elif started:
                        # Update the individual genome progress bar
                        rich_progress.update(task_id, completed=status["progress"])

                # Check for any finished futures
                done_now = []
                for fut, (task_id, fasta_path) in futures_to_task.items():
                    if fut.done():
                        done_now.append(fut)
                        rich_progress.update(task_id, visible=False)  # Hide the task bar

                for fut in done_now:
                    task_id, fasta_path = futures_to_task.pop(fut)
                    try:
                        path_key, genome_dict = fut.result()
                    except Exception as e:
                        # Surface worker failures with the offending input attached
                        raise RuntimeError(f"Error processing {fasta_path}: {e}") from e

                    # Store the results
                    results[path_key] = genome_dict

                    status = progress_dict.get(task_id, {})
                    if status.get("progress", 0) >= 4:
                        # advance overall by one
                        rich_progress.update(overall_task, advance=1)

                # Small sleep to avoid busy-waiting too tightly.
                # Fix: `import time` previously executed inside this loop on
                # every iteration; it now lives with the module imports.
                time.sleep(0.2)

        # Ensure we update the overall task to the final count
        rich_progress.update(overall_task, completed=n_genomes)

    return results