Coverage for barbet/markers.py: 59.09%

154 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-12 04:23 +0000

import gzip
import os
import shutil
import subprocess
import tempfile
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from typing import Dict, List, Tuple

from rich import progress

11 

12# Marker information 

# GTDB bac120 marker set: 120 single-copy bacterial marker genes, identified
# by their Pfam accessions (with model version suffix) or TIGRFAM accessions.
# Used below to decide which HMM hits belong to the bacterial domain.
BAC120_MARKERS = [
    "PF00380.20",
    "PF00410.20",
    "PF00466.21",
    "PF01025.20",
    "PF02576.18",
    "PF03726.15",
    "TIGR00006",
    "TIGR00019",
    "TIGR00020",
    "TIGR00029",
    "TIGR00043",
    "TIGR00054",
    "TIGR00059",
    "TIGR00061",
    "TIGR00064",
    "TIGR00065",
    "TIGR00082",
    "TIGR00083",
    "TIGR00084",
    "TIGR00086",
    "TIGR00088",
    "TIGR00090",
    "TIGR00092",
    "TIGR00095",
    "TIGR00115",
    "TIGR00116",
    "TIGR00138",
    "TIGR00158",
    "TIGR00166",
    "TIGR00168",
    "TIGR00186",
    "TIGR00194",
    "TIGR00250",
    "TIGR00337",
    "TIGR00344",
    "TIGR00362",
    "TIGR00382",
    "TIGR00392",
    "TIGR00396",
    "TIGR00398",
    "TIGR00414",
    "TIGR00416",
    "TIGR00420",
    "TIGR00431",
    "TIGR00435",
    "TIGR00436",
    "TIGR00442",
    "TIGR00445",
    "TIGR00456",
    "TIGR00459",
    "TIGR00460",
    "TIGR00468",
    "TIGR00472",
    "TIGR00487",
    "TIGR00496",
    "TIGR00539",
    "TIGR00580",
    "TIGR00593",
    "TIGR00615",
    "TIGR00631",
    "TIGR00634",
    "TIGR00635",
    "TIGR00643",
    "TIGR00663",
    "TIGR00717",
    "TIGR00755",
    "TIGR00810",
    "TIGR00922",
    "TIGR00928",
    "TIGR00959",
    "TIGR00963",
    "TIGR00964",
    "TIGR00967",
    "TIGR01009",
    "TIGR01011",
    "TIGR01017",
    "TIGR01021",
    "TIGR01029",
    "TIGR01032",
    "TIGR01039",
    "TIGR01044",
    "TIGR01059",
    "TIGR01063",
    "TIGR01066",
    "TIGR01071",
    "TIGR01079",
    "TIGR01082",
    "TIGR01087",
    "TIGR01128",
    "TIGR01146",
    "TIGR01164",
    "TIGR01169",
    "TIGR01171",
    "TIGR01302",
    "TIGR01391",
    "TIGR01393",
    "TIGR01394",
    "TIGR01510",
    "TIGR01632",
    "TIGR01951",
    "TIGR01953",
    "TIGR02012",
    "TIGR02013",
    "TIGR02027",
    "TIGR02075",
    "TIGR02191",
    "TIGR02273",
    "TIGR02350",
    "TIGR02386",
    "TIGR02397",
    "TIGR02432",
    "TIGR02729",
    "TIGR03263",
    "TIGR03594",
    "TIGR03625",
    "TIGR03632",
    "TIGR03654",
    "TIGR03723",
    "TIGR03725",
    "TIGR03953",
]

135 

# GTDB ar53 marker set: 53 single-copy archaeal marker genes, identified by
# their Pfam accessions (with model version suffix) or TIGRFAM accessions.
# Used below to decide which HMM hits belong to the archaeal domain.
AR53_MARKERS = [
    "PF04919.13",
    "PF07541.13",
    "PF01000.27",
    "PF00687.22",
    "PF00466.21",
    "PF00827.18",
    "PF01280.21",
    "PF01090.20",
    "PF01200.19",
    "PF01015.19",
    "PF00900.21",
    "PF00410.20",
    "TIGR00037",
    "TIGR00064",
    "TIGR00111",
    "TIGR00134",
    "TIGR00279",
    "TIGR00291",
    "TIGR00323",
    "TIGR00335",
    "TIGR00373",
    "TIGR00405",
    "TIGR00448",
    "TIGR00483",
    "TIGR00491",
    "TIGR00522",
    "TIGR00967",
    "TIGR00982",
    "TIGR01008",
    "TIGR01012",
    "TIGR01018",
    "TIGR01020",
    "TIGR01028",
    "TIGR01046",
    "TIGR01052",
    "TIGR01171",
    "TIGR01213",
    "TIGR01952",
    "TIGR02236",
    "TIGR02338",
    "TIGR02389",
    "TIGR02390",
    "TIGR03626",
    "TIGR03627",
    "TIGR03628",
    "TIGR03629",
    "TIGR03670",
    "TIGR03671",
    "TIGR03672",
    "TIGR03673",
    "TIGR03674",
    "TIGR03676",
    "TIGR03680",
]

191 

192 

def read_fasta(path: str) -> Dict[str, str]:
    """
    Read a FASTA file into a dictionary of sequences.

    Parameters
    ----------
    path : str
        Filesystem path to the input FASTA file.

    Returns
    -------
    Dict[str, str]
        Mapping from sequence ID (the first whitespace-separated token after
        '>' in the header) to the concatenated sequence string, with any
        leading or trailing '*' characters stripped.

    Raises
    ------
    IOError
        If the file cannot be opened for reading.
    """
    sequences: Dict[str, str] = {}
    current_id = None
    chunks: List[str] = []

    def _flush() -> None:
        # Commit the record accumulated so far (no-op before the first header).
        if current_id:
            sequences[current_id] = "".join(chunks).strip("*")

    with open(path) as handle:
        for raw in handle:
            raw = raw.rstrip()
            if not raw:
                continue
            if raw[0] == ">":
                _flush()
                current_id = raw[1:].split()[0]
                chunks = []
            else:
                chunks.append(raw)
        _flush()
    return sequences

230 

231 

def run_prodigal(
    genome_id: str,
    fasta_path: str,
    out_dir: str,
    force: bool
) -> str:
    """
    Run Prodigal to predict protein-coding genes from a FASTA file.

    Parameters
    ----------
    genome_id : str
        Unique identifier for the genome (used for output directory and file names).
    fasta_path : str
        Path to the input FASTA file containing genomic sequences; a ".gz"
        suffix triggers on-the-fly decompression to a temporary file.
    out_dir : str
        Directory under which "<genome_id>/<genome_id>.faa" will be written.
    force : bool
        If True, remove any pre-existing output file before running.

    Returns
    -------
    str
        Path to the output protein FASTA file generated by Prodigal.

    Raises
    ------
    subprocess.CalledProcessError
        If the prodigal executable exits with a non-zero status.
    """
    prot_dir = os.path.join(out_dir, genome_id)
    os.makedirs(prot_dir, exist_ok=True)
    prot_fa = os.path.join(prot_dir, f"{genome_id}.faa")

    if force and os.path.exists(prot_fa):
        os.remove(prot_fa)

    # If input is gzipped, decompress to a temporary file, since Prodigal
    # cannot read compressed input directly.
    tmp_path = None
    if fasta_path.endswith(".gz"):
        with gzip.open(fasta_path, "rt") as gz_in, \
                tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".fasta") as tmp_fa:
            shutil.copyfileobj(gz_in, tmp_fa)
            tmp_path = tmp_fa.name
        input_path = tmp_path
    else:
        input_path = fasta_path

    try:
        # Run Prodigal in metagenome mode, writing proteins to prot_fa.
        subprocess.run(
            ["prodigal", "-a", prot_fa, "-p", "meta", "-i", input_path],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    finally:
        # BUG FIX: the decompressed temp file (created with delete=False)
        # was previously never removed, leaking one file per gzipped genome.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)
    return prot_fa

284 

285 

def parse_domtblout_top_hits(domtbl_path: str) -> Dict[str, List[str]]:
    """
    Parse a HMMER tabular output file and keep the single best hit per sequence.

    Implements the GTDB-Tk comparator logic: for each query sequence the hit
    with the highest bitscore wins; ties are broken by the lower e-value, then
    by the lexicographically smaller HMM accession.

    NOTE(review): despite the name, the columns read here (fields 4/5/6 =
    query accession, e-value, score) match HMMER's ``--tblout`` layout, which
    is what the hmmsearch calls in this module produce — confirm before
    feeding true ``--domtblout`` files.

    Parameters
    ----------
    domtbl_path : str
        Path to the HMMER tabular output file.

    Returns
    -------
    Dict[str, List[str]]
        Mapping from HMM ID to the list of sequence IDs for which that HMM
        was the chosen top hit.

    Raises
    ------
    IOError
        If the file cannot be opened.
    ValueError
        If a non-numeric e-value or bitscore is encountered.
    """
    best: Dict[str, Tuple[str, float, float]] = {}
    with open(domtbl_path) as handle:
        for row in handle:
            if row.startswith("#"):
                continue
            fields = row.split()
            seq_id, hmm_id = fields[0], fields[3]
            evalue, bitscore = float(fields[4]), float(fields[5])
            incumbent = best.get(seq_id)
            # A candidate wins when it ranks strictly better under
            # (higher score, lower e-value, smaller accession); encoded as a
            # single lexicographic tuple comparison with negated score.
            if incumbent is None or (
                (-bitscore, evalue, hmm_id)
                < (-incumbent[1], incumbent[2], incumbent[0])
            ):
                best[seq_id] = (hmm_id, bitscore, evalue)

    # Invert the per-sequence winners into HMM -> [sequence IDs].
    hits: Dict[str, List[str]] = {}
    for seq_id, (hmm_id, _, _) in best.items():
        hits.setdefault(hmm_id, []).append(seq_id)
    return hits

343 

344 

def _process_single_genome(
    args: Tuple[
        str,   # genome_id
        str,   # fasta_path
        str,   # out_dir
        int,   # cpus_per_proc
        str,   # pfam_db
        str,   # tigr_db
        bool,  # force
        bool,  # skip_multiple_hits
        int,   # number_of_hits_to_keep
        Dict,  # shared progress dict (Manager().dict())
        int,   # task_id in Rich.Progress
    ],
) -> Tuple[str, Dict[str, List[str]]]:
    """
    Worker function for one genome; reports progress after each subtask.

    Subtasks (each bumps the shared progress counter by one):
      1) Prodigal gene calling
      2) Pfam HMM search
      3) TIGRFAM HMM search
      4) Parse hits + write per-marker FASTAs

    Returns: ( "path/to/genome.fasta", { "bac120": [...], "ar53": [...] } )
    """
    (
        gid,
        path,
        out_dir,
        cpus_per_proc,
        pfam_db,
        tigr_db,
        force,
        skip_multiple_hits,
        number_of_hits_to_keep,
        progress_dict,
        task_id,
    ) = args

    genome_fastas: Dict[str, List[str]] = {"bac120": [], "ar53": []}

    # Signal that we are starting the job
    progress_dict[task_id] = {"progress": 0}

    # Step 1: Prodigal gene prediction
    prot_fa = run_prodigal(gid, path, out_dir, force)
    prot_seqs = read_fasta(prot_fa)
    progress_dict[task_id] = {"progress": 1}

    # Step 2: Pfam HMM search
    pf_out = os.path.join(out_dir, gid, "pfam.tblout")
    if force and os.path.exists(pf_out):
        os.remove(pf_out)

    subprocess.run(
        [
            "hmmsearch",
            "--cpu",
            str(cpus_per_proc),
            # CONSISTENCY FIX: suppress alignment output like the TIGRFAM
            # search below; stdout is discarded anyway, so this only saves work.
            "--noali",
            "--notextw",
            "-E",
            "0.001",
            "--domE",
            "0.001",
            "--tblout",
            pf_out,
            pfam_db,
            prot_fa,
        ],
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    progress_dict[task_id] = {"progress": 2}

    # Step 3: TIGRFAM HMM search (uses each model's noise cutoff via --cut_nc)
    tg_out = os.path.join(out_dir, gid, "tigrfam.tblout")
    if force and os.path.exists(tg_out):
        os.remove(tg_out)

    subprocess.run(
        [
            "hmmsearch",
            "--cpu",
            str(cpus_per_proc),
            "--noali",
            "--notextw",
            "--cut_nc",
            "--tblout",
            tg_out,
            tigr_db,
            prot_fa,
        ],
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    progress_dict[task_id] = {"progress": 3}

    # Step 4: pick top hits and write one FASTA per marker
    pf_hits = parse_domtblout_top_hits(pf_out)
    tg_hits = parse_domtblout_top_hits(tg_out)
    # Pfam and TIGRFAM accessions do not collide, so a plain merge is safe.
    combined_hits = {**pf_hits, **tg_hits}

    for marker, seq_ids in combined_hits.items():
        if not seq_ids:
            continue
        elif len(seq_ids) == 1:
            seqs = [prot_seqs[seq_ids[0]]]
        else:
            unique_seqs = set(prot_seqs[s] for s in seq_ids)
            if len(unique_seqs) != 1 and skip_multiple_hits:
                # faster but can miss some markers
                continue
            # BUG FIX: iterate the set in sorted order so the sequences kept
            # (and the file numbering below) are deterministic; plain
            # list(set) ordering is hash-dependent and varied between runs.
            seqs = sorted(unique_seqs)[:number_of_hits_to_keep]

        for dom in ("bac120", "ar53"):
            if (dom == "bac120" and marker in BAC120_MARKERS) or (
                dom == "ar53" and marker in AR53_MARKERS
            ):
                genome_dir = os.path.join(out_dir, gid, dom)
                os.makedirs(genome_dir, exist_ok=True)
                for i, seq in enumerate(seqs, start=1):
                    fa_path = os.path.join(genome_dir, f"{marker}.{i}.fa")
                    with open(fa_path, "w") as fh:
                        fh.write(f">{gid}\n{seq}\n")
                    genome_fastas[dom].append(fa_path)

    # Signal completion of subtask 4 (and thus the entire genome job)
    progress_dict[task_id] = {"progress": 4}
    return (path, genome_fastas)

477 

478 

def extract_markers_genes(
    genomes: Dict[str, str],
    out_dir: str,
    cpus: int = 1,
    pfam_db: str = None,
    tigr_db: str = None,
    force: bool = False,
    skip_multiple_hits: bool = False,
    number_of_hits_to_keep: int = 1,
) -> Dict[str, Dict[str, List[str]]]:
    """
    Extract marker genes from multiple genomes in parallel using Prodigal and HMMER.

    Parameters
    ----------
    genomes : Dict[str, str]
        Mapping from genome ID to its (optionally gzipped) FASTA path.
    out_dir : str
        Directory where per-genome intermediate and marker files are written.
    cpus : int
        Total CPU budget, split across worker processes.
    pfam_db, tigr_db : str
        Paths to the Pfam / TIGRFAM HMM databases. When None they are read
        from $PFAM_HMMDB / $TIGR_HMMDB at call time. (BUG FIX: the defaults
        previously called os.environ.get at import time, so a variable set
        after import was silently ignored and None reached hmmsearch.)
    force : bool
        Overwrite pre-existing intermediate files.
    skip_multiple_hits : bool
        Skip markers whose top hits map to several distinct sequences.
    number_of_hits_to_keep : int
        Maximum number of distinct sequences to keep per marker.

    Returns
    -------
    Dict[str, Dict[str, List[str]]]
        Mapping fasta_path -> {"bac120": [marker FASTA paths], "ar53": [...]}.

    Raises
    ------
    RuntimeError
        If any worker process fails; the original exception is chained.
    """
    # Early exit if no genomes
    if not genomes:
        return {}

    # Resolve HMM databases lazily so the current environment is honored.
    if pfam_db is None:
        pfam_db = os.environ.get("PFAM_HMMDB")
    if tigr_db is None:
        tigr_db = os.environ.get("TIGR_HMMDB")

    n_genomes = len(genomes)
    n_workers = min(n_genomes, cpus)
    cpus_per_proc = max(1, cpus // n_workers)

    # This shared dict allows us to track progress across multiple processes
    manager = Manager()
    progress_dict = manager.dict()

    genome_to_task: Dict[str, int] = {}
    futures_to_task: Dict = {}
    results: Dict[str, Dict[str, List[str]]] = {}

    with progress.Progress(
        progress.TextColumn("[progress.description]{task.description}", justify="right"),
        progress.BarColumn(),
        progress.TaskProgressColumn(),
        progress.TimeElapsedColumn(),
    ) as rich_progress:

        # Overall bar counting genomes that finished all 4 steps
        overall_task = rich_progress.add_task("[cyan]Extracting Markers", total=n_genomes)

        # One sub-task per genome; 4 steps each: Prodigal, Pfam, TIGRFAM, FASTAs
        for gid in genomes:
            task_id = rich_progress.add_task(gid, total=4, visible=False, start=False)
            genome_to_task[gid] = task_id
            progress_dict[task_id] = {}

        # Submit all genome jobs to a ProcessPoolExecutor
        with ProcessPoolExecutor(max_workers=n_workers) as executor:
            for gid, fasta_path in genomes.items():
                task_id = genome_to_task[gid]
                args = (
                    gid,
                    fasta_path,
                    out_dir,
                    cpus_per_proc,
                    pfam_db,
                    tigr_db,
                    force,
                    skip_multiple_hits,
                    number_of_hits_to_keep,
                    progress_dict,
                    task_id,
                )
                future = executor.submit(_process_single_genome, args)
                futures_to_task[future] = (task_id, fasta_path)

            started_tasks = set()  # Track which tasks have started
            # Monitor progress until all futures complete
            while futures_to_task:
                for task_id, status in progress_dict.items():
                    # Only started tasks have a "progress" key
                    started = "progress" in status
                    if started and task_id not in started_tasks:
                        # First sighting: reveal and start the genome's bar
                        rich_progress.update(task_id, visible=True)
                        rich_progress.start_task(task_id)
                        started_tasks.add(task_id)
                    elif started:
                        # Update the individual genome progress bar
                        rich_progress.update(task_id, completed=status["progress"])

                # Collect and drain any finished futures
                done_now = [fut for fut in futures_to_task if fut.done()]
                for fut in done_now:
                    task_id, fasta_path = futures_to_task.pop(fut)
                    rich_progress.update(task_id, visible=False)  # Hide the task bar
                    try:
                        path_key, genome_dict = fut.result()
                    except Exception as e:
                        # If a worker failed, re-raise with context
                        # TODO: consider logging the error instead
                        raise RuntimeError(f"Error processing {fasta_path}: {e}") from e

                    # Store the results
                    results[path_key] = genome_dict

                    status = progress_dict.get(task_id, {})
                    if status.get("progress", 0) >= 4:
                        # advance overall by one
                        rich_progress.update(overall_task, advance=1)

                # Small sleep to avoid busy-waiting too tightly
                # (FIX: `import time` previously ran inside this loop body)
                time.sleep(0.2)

        # Ensure we update the overall task to the final count
        rich_progress.update(overall_task, completed=n_genomes)

    return results

593