Coverage for teiphy/collation.py: 99.92% (1329 statements)

coverage.py v7.7.0, created at 2025-03-19 15:22 +0000

#!/usr/bin/env python3

from enum import Enum
from typing import List, Union
import os
from pathlib import Path
from datetime import datetime  # for calculating the current year (for dating and tree height purposes)
import time  # to time calculations for users
import string  # for easy retrieval of character ranges
from lxml import etree as et  # for reading TEI XML inputs
import numpy as np  # for random number sampling and collation matrix outputs
import pandas as pd  # for writing to DataFrames, CSV, Excel, etc.
from slugify import slugify  # for converting Unicode text from readings to ASCII for NEXUS
from jinja2 import Environment, PackageLoader, select_autoescape  # for filling output XML templates

from .common import xml_ns, tei_ns
from .format import Format
from .witness import Witness
from .variation_unit import VariationUnit


class ParsingException(Exception):
    pass


class WitnessDateException(Exception):
    pass


class IntrinsicRelationsException(Exception):
    pass


class ClockModel(str, Enum):
    strict = "strict"
    uncorrelated = "uncorrelated"
    local = "local"


class AncestralLogger(str, Enum):
    state = "state"
    sequence = "sequence"
    none = "none"


class TableType(str, Enum):
    matrix = "matrix"
    distance = "distance"
    similarity = "similarity"
    nexus = "nexus"
    long = "long"


class Collation:
    """Base class for storing TEI XML collation data internally.

    This corresponds to the entire XML tree, rooted at the TEI element of the collation.

    Attributes:
        manuscript_suffixes: A list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
        trivial_reading_types: A set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
        missing_reading_types: A set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
        fill_corrector_lacunae: A boolean flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
        fragmentary_threshold: A float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
        witnesses: A list of Witness instances contained in this Collation.
        witness_index_by_id: A dictionary mapping base witness ID strings to their int indices in the witnesses list.
        variation_units: A list of VariationUnit instances contained in this Collation.
        readings_by_witness: A dictionary mapping base witness ID strings to lists of reading support coefficients for all units (with at least two substantive readings).
        substantive_variation_unit_ids: A list of ID strings for variation units with two or more substantive readings.
        substantive_variation_unit_reading_tuples: A list of (variation unit ID, reading ID) tuples for substantive readings.
        verbose: A boolean flag indicating whether or not to print timing and debugging details for the user.
    """

    def __init__(
        self,
        xml: et.ElementTree,
        manuscript_suffixes: List[str] = [],
        trivial_reading_types: List[str] = [],
        missing_reading_types: List[str] = [],
        fill_corrector_lacunae: bool = False,
        fragmentary_threshold: float = None,
        dates_file: Union[Path, str] = None,
        verbose: bool = False,
    ):
        """Constructs a new Collation instance with the given settings.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
            manuscript_suffixes: An optional list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
            trivial_reading_types: An optional set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
            missing_reading_types: An optional set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
            fill_corrector_lacunae: An optional flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
            fragmentary_threshold: An optional float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
            dates_file: An optional path to a CSV file containing witness IDs, minimum dates, and maximum dates. If specified, then for all witnesses in the first column, any existing date ranges for them in the TEI XML collation will be ignored.
            verbose: An optional flag indicating whether or not to print timing and debugging details for the user.
        """
        self.manuscript_suffixes = manuscript_suffixes
        self.trivial_reading_types = set(trivial_reading_types)
        self.missing_reading_types = set(missing_reading_types)
        self.fill_corrector_lacunae = fill_corrector_lacunae
        self.fragmentary_threshold = fragmentary_threshold
        self.verbose = verbose
        self.witnesses = []
        self.witness_index_by_id = {}
        self.variation_units = []
        self.readings_by_witness = {}
        self.variation_unit_ids = []
        self.substantive_variation_unit_reading_tuples = []
        self.substantive_readings_by_variation_unit_id = {}
        self.intrinsic_categories = []
        self.intrinsic_odds_by_id = {}
        self.transcriptional_categories = []
        self.transcriptional_rates_by_id = {}
        self.origin_date_range = []
        # Now parse the XML tree to populate these data structures:
        if self.verbose:
            print("Initializing collation...")
        t0 = time.time()
        self.parse_origin_date_range(xml)
        self.parse_list_wit(xml)
        self.validate_wits(xml)
        # If a dates file was specified, then update the witness date ranges manually:
        if dates_file is not None:
            self.update_witness_date_ranges_from_dates_file(dates_file)
        # If the upper bound on a work's date of origin is not defined, then attempt to assign it an upper bound based on the witness dates;
        # otherwise, attempt to assign lower bounds to witness dates based on it:
        if self.origin_date_range[1] is None:
            self.update_origin_date_range_from_witness_date_ranges()
        else:
            self.update_witness_date_ranges_from_origin_date_range()
        self.parse_intrinsic_odds(xml)
        self.parse_transcriptional_rates(xml)
        self.parse_apps(xml)
        self.validate_intrinsic_relations()
        self.parse_readings_by_witness()
        # If a threshold of readings for fragmentary witnesses is specified, then filter the witness list using the dictionary mapping witness IDs to readings:
        if self.fragmentary_threshold is not None:
            self.filter_fragmentary_witnesses(xml)
        t1 = time.time()
        if self.verbose:
            print("Total time to initialize collation: %0.4fs." % (t1 - t0))
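
    # A minimal usage sketch (illustrative, not part of the library): assuming a
    # TEI XML collation at the hypothetical path "collation.xml", a Collation
    # might be constructed like so:
    #
    #     xml = et.parse("collation.xml")
    #     coll = Collation(
    #         xml,
    #         manuscript_suffixes=["*", "C", "/1", "/2"],
    #         trivial_reading_types=["reconstructed", "defective", "orthographic"],
    #         missing_reading_types=["lac", "overlap"],
    #         verbose=True,
    #     )
    #
    # The suffix and reading-type values above are assumptions for the example;
    # the right settings depend on the conventions of the collation being processed.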

    def parse_origin_date_range(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of origin date bounds.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing origin date range...")
        t0 = time.time()
        self.origin_date_range = [None, None]
        for date in xml.xpath(
            "//tei:sourceDesc//tei:bibl//tei:date|//tei:sourceDesc//tei:biblStruct//tei:date|//tei:sourceDesc//tei:biblFull//tei:date",
            namespaces={"tei": tei_ns},
        ):
            # Try the @when attribute first; if it is set, then it accounts for both ends of the date range:
            if date.get("when") is not None:
                self.origin_date_range[0] = int(date.get("when").split("-")[0])
                self.origin_date_range[1] = self.origin_date_range[0]
            # Failing that, if it has @from and @to attributes (indicating the period over which the work was completed),
            # then the completion date of the work accounts for both ends of the date range:
            elif date.get("to") is not None:
                self.origin_date_range[0] = int(date.get("to").split("-")[0])
                self.origin_date_range[1] = self.origin_date_range[0]
            # Failing that, set lower and upper bounds on the origin date using the @notBefore and @notAfter attributes:
            elif date.get("notBefore") is not None or date.get("notAfter") is not None:
                if date.get("notBefore") is not None:
                    self.origin_date_range[0] = int(date.get("notBefore").split("-")[0])
                if date.get("notAfter") is not None:
                    self.origin_date_range[1] = int(date.get("notAfter").split("-")[0])
        return
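
    # For reference, the kinds of date elements this method recognizes under
    # sourceDesc (example values only, chosen for illustration):
    #
    #     <date when="0375"/>                      -> origin_date_range = [375, 375]
    #     <date from="0300" to="0350"/>            -> origin_date_range = [350, 350]
    #     <date notBefore="0100" notAfter="0200"/> -> origin_date_range = [100, 200]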

    def get_base_wit(self, wit: str):
        """Given a witness siglum, strips off the specified manuscript suffixes until the siglum matches one in the witness list or until no more suffixes can be stripped.

        Args:
            wit: A string representing a witness siglum, potentially including suffixes to be stripped.
        """
        base_wit = wit
        # If our starting siglum corresponds to a siglum in the witness list, then just return it:
        if base_wit in self.witness_index_by_id:
            return base_wit
        # Otherwise, strip any suffixes we find until the siglum corresponds to a base witness in the list
        # or no more suffixes can be stripped:
        suffix_found = True
        while suffix_found:
            suffix_found = False
            for suffix in self.manuscript_suffixes:
                if base_wit.endswith(suffix):
                    suffix_found = True
                    base_wit = base_wit[: -len(suffix)]
                    break  # stop looking for other suffixes
            # If the siglum stripped of this suffix now corresponds to a siglum in the witness list, then return it:
            if base_wit in self.witness_index_by_id:
                return base_wit
        # If we get here, then all possible manuscript suffixes have been stripped, and the resulting siglum does not correspond to a siglum in the witness list:
        return base_wit
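
    # An illustrative trace (hypothetical sigla): with manuscript_suffixes
    # ["*", "C", "/1"] and a witness list containing "01", the call
    # get_base_wit("01C/1") first strips "/1" (yielding "01C"), then strips
    # "C" (yielding "01"), which matches the witness list and is returned.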

    def parse_list_wit(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates its list of witnesses from its listWit element.
        If the XML tree does not contain a listWit element, then a ParsingException is thrown listing all distinct witness sigla encountered in the collation.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing witness list...")
        t0 = time.time()
        self.witnesses = []
        self.witness_index_by_id = {}
        list_wits = xml.xpath("/tei:TEI//tei:listWit", namespaces={"tei": tei_ns})
        if len(list_wits) == 0:
            # There is no listWit element: collect all distinct witness sigla in the collation and raise a ParsingException listing them:
            distinct_sigla = set()
            sigla = []
            # Proceed for each rdg, rdgGrp, or witDetail element:
            for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
                wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
                wits = wit_str.split()
                for wit in wits:
                    siglum = wit.strip("#")  # remove the URI prefix, if there is one
                    if siglum not in distinct_sigla:
                        distinct_sigla.add(siglum)
                        sigla.append(siglum)
            sigla.sort()
            msg = ""
            msg += "An explicit listWit element must be included in the TEI XML collation.\n"
            msg += "The following sigla occur in the collation and should be included as the @xml:id or @n attributes of witness elements under the listWit element:\n"
            msg += ", ".join(sigla)
            raise ParsingException(msg)
        # Otherwise, take the first listWit element as the list of all witnesses and process it:
        list_wit = list_wits[0]
        for witness in list_wit.xpath("./tei:witness", namespaces={"tei": tei_ns}):
            wit = Witness(witness, self.verbose)
            self.witness_index_by_id[wit.id] = len(self.witnesses)
            self.witnesses.append(wit)
        t1 = time.time()
        if self.verbose:
            print("Finished processing %d witnesses in %0.4fs." % (len(self.witnesses), t1 - t0))
        return

    def validate_wits(self, xml: et.ElementTree):
        """Given an XML tree for a collation, checks if any witness sigla listed in a rdg, rdgGrp, or witDetail element,
        once stripped of ignored suffixes, are not found in the witness list.
        A warning will be issued for each distinct siglum like this.
        This method also checks if the upper bound of any witness's date is earlier than the lower bound on the collated work's date of origin
        and throws an exception if so.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Validating witness list against collation...")
        t0 = time.time()
        # Collect all distinct witness sigla in the collation that lack corresponding entries in the witness list:
        distinct_extra_sigla = set()
        extra_sigla = []
        # Proceed for each rdg, rdgGrp, or witDetail element:
        for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
            wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
            wits = wit_str.split()
            for wit in wits:
                siglum = wit.strip("#")  # remove the URI prefix, if there is one
                base_siglum = self.get_base_wit(siglum)
                if base_siglum not in self.witness_index_by_id:
                    if base_siglum not in distinct_extra_sigla:
                        distinct_extra_sigla.add(base_siglum)
                        extra_sigla.append(base_siglum)
        if len(extra_sigla) > 0:
            extra_sigla.sort()
            msg = ""
            msg += "WARNING: The following sigla occur in the collation that do not have corresponding witness entries in the listWit:\n"
            msg += ", ".join(extra_sigla)
            print(msg)
        # If the lower bound on the date of origin is defined, then check each witness against it:
        if self.origin_date_range[0] is not None:
            bad_date_witness_sigla = []
            bad_date_upper_bounds_by_witness = {}
            for i, wit in enumerate(self.witnesses):
                if wit.date_range[1] is not None and wit.date_range[1] < self.origin_date_range[0]:
                    bad_date_witness_sigla.append(wit.id)
                    bad_date_upper_bounds_by_witness[wit.id] = wit.date_range[1]
            if len(bad_date_witness_sigla) > 0:
                msg = ""
                msg += (
                    "The following witnesses have their latest possible dates before the earliest date of origin %d specified for the collated work:\n"
                    % self.origin_date_range[0]
                )
                msg += ", ".join(
                    [
                        (siglum + "(" + str(bad_date_upper_bounds_by_witness[siglum]) + ")")
                        for siglum in bad_date_witness_sigla
                    ]
                )
                raise WitnessDateException(msg)
        t1 = time.time()
        if self.verbose:
            print("Finished witness validation in %0.4fs." % (t1 - t0))
        return

    def update_witness_date_ranges_from_dates_file(self, dates_file: Union[Path, str]):
        """Given a CSV-formatted dates file, update the date ranges of all witnesses whose IDs are in the first column of the dates file
        (overwriting existing date ranges if necessary).
        """
        if self.verbose:
            print("Updating witness dates from file %s..." % (str(dates_file)))
        t0 = time.time()
        dates_df = pd.read_csv(dates_file, index_col=0, names=["id", "min", "max"])
        for witness in self.witnesses:
            wit_id = witness.id
            if wit_id in dates_df.index:
                # For every witness in the list whose ID is specified in the dates file,
                # update their date ranges (as long as the date ranges in the file are well-formed):
                min_date = int(dates_df.loc[wit_id]["min"]) if not np.isnan(dates_df.loc[wit_id]["min"]) else None
                max_date = (
                    int(dates_df.loc[wit_id]["max"])
                    if not np.isnan(dates_df.loc[wit_id]["max"])
                    else datetime.now().year
                )
                if min_date is not None and max_date is not None and min_date > max_date:
                    raise ParsingException(
                        "In dates file %s, for witness ID %s, the minimum date %d is greater than the maximum date %d."
                        % (str(dates_file), wit_id, min_date, max_date)
                    )
                witness.date_range = [min_date, max_date]
        t1 = time.time()
        if self.verbose:
            print("Finished witness date range updates in %0.4fs." % (t1 - t0))
        return
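
    # A sketch of the expected dates file layout (hypothetical witness IDs):
    # the first column is the witness ID, followed by minimum and maximum dates,
    # either of which may be left empty:
    #
    #     01,325,360
    #     02,,500
    #     03,400,
    #
    # For witness "02" above, the minimum date would be None and the maximum 500;
    # for "03", the maximum date would default to the current year.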

    def update_origin_date_range_from_witness_date_ranges(self):
        """Conditionally updates the upper bound on the date of origin of the work represented by this Collation
        based on the bounds on the witnesses' dates.
        If none of the witnesses have bounds on their dates, then nothing is done.
        This method is only invoked if the work's date of origin does not already have its upper bound defined.
        """
        if self.verbose:
            print("Updating upper bound on origin date using witness dates...")
        t0 = time.time()
        # Set the origin date to the earliest witness date:
        witness_date_lower_bounds = [wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]
        witness_date_upper_bounds = [wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]
        min_witness_date = (
            min(witness_date_lower_bounds + witness_date_upper_bounds)
            if len(witness_date_lower_bounds + witness_date_upper_bounds) > 0
            else None
        )
        if min_witness_date is not None:
            self.origin_date_range[1] = (
                min(self.origin_date_range[1], min_witness_date)
                if self.origin_date_range[1] is not None
                else min_witness_date
            )
        t1 = time.time()
        if self.verbose:
            print("Finished updating upper bound on origin date in %0.4fs." % (t1 - t0))
        return

    def update_witness_date_ranges_from_origin_date_range(self):
        """Attempts to update the lower bounds on the witnesses' dates
        using the upper bound on the date of origin of the work represented by this Collation.
        This method is only invoked if the upper bound on the work's date of origin was already defined
        (i.e., if update_origin_date_range_from_witness_date_ranges was not invoked).
        """
        if self.verbose:
            print("Updating lower bounds on witness dates using origin date...")
        t0 = time.time()
        # Proceed for every witness:
        for i, wit in enumerate(self.witnesses):
            # Ensure that the lower bound on this witness's date is no earlier than the upper bound on the date of the work's origin:
            wit.date_range[0] = (
                max(wit.date_range[0], self.origin_date_range[1])
                if wit.date_range[0] is not None
                else self.origin_date_range[1]
            )
            # Then ensure that the upper bound on this witness's date is no earlier than its lower bound, in case we updated it:
            wit.date_range[1] = max(wit.date_range[0], wit.date_range[1])
        t1 = time.time()
        if self.verbose:
            print("Finished updating lower bounds on witness dates in %0.4fs." % (t1 - t0))
        return

    def parse_intrinsic_odds(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of intrinsic probability categories
        (e.g., "absolutely more likely," "highly more likely," "more likely," "slightly more likely," "equally likely")
        and its dictionary mapping these categories to numerical odds.
        If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing intrinsic odds categories...")
        t0 = time.time()
        self.intrinsic_categories = []
        self.intrinsic_odds_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"intrinsic\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            odds_category = interp.get("{%s}id" % xml_ns)
            # If this element contains a certainty subelement with a fixed odds value for this category, then set it:
            odds = None
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    odds = float(certainty.get("degree"))
                    break
            self.intrinsic_categories.append(odds_category)
            self.intrinsic_odds_by_id[odds_category] = odds
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d intrinsic odds categories in %0.4fs."
                % (len(self.intrinsic_categories), t1 - t0)
            )
        return

    def parse_transcriptional_rates(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's dictionary mapping transcriptional change categories
        (e.g., "aural confusion," "visual error," "clarification") to numerical rates.
        If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing transcriptional change categories...")
        t0 = time.time()
        self.transcriptional_categories = []
        self.transcriptional_rates_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"transcriptional\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            transcriptional_category = interp.get("{%s}id" % xml_ns)
            # If this element contains a certainty subelement with a fixed rate for this category, then set it:
            rate = None
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    rate = float(certainty.get("degree"))
                    break
            self.transcriptional_categories.append(transcriptional_category)
            self.transcriptional_rates_by_id[transcriptional_category] = rate
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d transcriptional change categories in %0.4fs."
                % (len(self.transcriptional_rates_by_id), t1 - t0)
            )
        return

    def validate_intrinsic_relations(self):
        """Checks if any VariationUnit's intrinsic_relations map is not a forest.
        If any is not, then an IntrinsicRelationsException is thrown describing the VariationUnit at fault.
        """
        if self.verbose:
            print("Validating intrinsic relation graphs for variation units...")
        t0 = time.time()
        for vu in self.variation_units:
            # Skip any variation units with an empty intrinsic_relations map:
            if len(vu.intrinsic_relations) == 0:
                continue
            # For all others, start by identifying all reading IDs that no other reading ID relates to:
            in_degree_by_reading = {}
            for edge in vu.intrinsic_relations:
                s = edge[0]
                t = edge[1]
                if s not in in_degree_by_reading:
                    in_degree_by_reading[s] = 0
                if t not in in_degree_by_reading:
                    in_degree_by_reading[t] = 0
                in_degree_by_reading[t] += 1
            # If any reading has more than one relation pointing to it, then the intrinsic relations graph is not a forest:
            excessive_in_degree_readings = [
                rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] > 1
            ]
            if len(excessive_in_degree_readings) > 0:
                msg = ""
                msg += (
                    "In variation unit %s, the following readings have more than one intrinsic relation pointing to them: %s.\n"
                    % (vu.id, ", ".join(excessive_in_degree_readings))
                )
                msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
                raise IntrinsicRelationsException(msg)
            # If every reading has another reading pointing to it, then the intrinsic relations graph contains a cycle and is not a forest:
            starting_nodes = [rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] == 0]
            if len(starting_nodes) == 0:
                msg = ""
                msg += "In variation unit %s, the intrinsic relations contain a cycle.\n" % vu.id
                msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
                raise IntrinsicRelationsException(msg)
        t1 = time.time()
        if self.verbose:
            print("Finished intrinsic relations validation in %0.4fs." % (t1 - t0))
        return
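
    # To illustrate the forest check (hypothetical reading IDs): the edge set
    # {("1", "2"), ("1", "3")} is a valid forest rooted at reading "1", since
    # every reading has in-degree at most 1 and "1" has in-degree 0.
    # By contrast, {("1", "2"), ("3", "2")} gives reading "2" in-degree 2, and
    # {("1", "2"), ("2", "1")} leaves no reading with in-degree 0 (a cycle);
    # both would raise an IntrinsicRelationsException.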

    def parse_apps(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates its list of variation units from its app elements.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing variation units...")
        t0 = time.time()
        for a in xml.xpath('//tei:app', namespaces={'tei': tei_ns}):
            vu = VariationUnit(a, self.verbose)
            self.variation_units.append(vu)
        t1 = time.time()
        if self.verbose:
            print("Finished processing %d variation units in %0.4fs." % (len(self.variation_units), t1 - t0))
        return

    def get_readings_by_witness_for_unit(self, vu: VariationUnit):
        """Returns a dictionary mapping witness IDs to a list of their reading coefficients for a given variation unit.

        Args:
            vu: A VariationUnit to be processed.

        Returns:
            A dictionary mapping witness ID strings to a list of their coefficients for all substantive readings in this VariationUnit.
        """
        # In a first pass, populate lists of substantive (variation unit ID, reading ID) tuples and reading labels
        # and a map from reading IDs to the indices of their parent substantive reading in this unit:
        reading_id_to_index = {}
        self.substantive_readings_by_variation_unit_id[vu.id] = []
        for rdg in vu.readings:
            # If this reading is missing (e.g., lacunose or inapplicable due to an overlapping variant) or targets another reading, then skip it:
            if rdg.type in self.missing_reading_types or len(rdg.certainties) > 0:
                continue
            # If this reading is trivial, then map it to the last substantive index:
            if rdg.type in self.trivial_reading_types:
                reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
                continue
            # Otherwise, the reading is substantive: add it to the map and update the last substantive index:
            self.substantive_readings_by_variation_unit_id[vu.id].append(rdg.id)
            self.substantive_variation_unit_reading_tuples.append(tuple([vu.id, rdg.id]))
            reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
        if self.verbose:
            print(
                "Variation unit %s has %d substantive readings."
                % (vu.id, len(self.substantive_readings_by_variation_unit_id[vu.id]))
            )
        readings_by_witness_for_unit = {}
        # Initialize the output dictionary with zero vectors for all base witnesses:
        for wit in self.witnesses:
            readings_by_witness_for_unit[wit.id] = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
        # In a second pass, assign each base witness a vector of support coefficients for the readings it attests in this unit:
        for rdg in vu.readings:
            # Initialize the vector indicating support for this reading (or its disambiguations):
            rdg_support = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
            # If this is a missing reading (e.g., a lacuna or an overlap), then we can skip it, as its corresponding vector will remain all zeroes:
            if rdg.type in self.missing_reading_types:
                continue
            # Otherwise, if this reading is trivial, then it will contain an entry for the index of its parent substantive reading:
            elif rdg.type in self.trivial_reading_types:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Otherwise, if this reading has one or more nonzero certainty degrees,
            # then set the entries for these readings to their degrees:
            elif sum(rdg.certainties.values()) > 0:
                for t in rdg.certainties:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = rdg.certainties[t]
            # Otherwise, if this reading has one or more targets (i.e., if it is an ambiguous reading),
            # then set the entries for each of its targets to 1:
            elif len(rdg.targets) > 0:
                for t in rdg.targets:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = 1
            # Otherwise, this reading is itself substantive; set the entry for the index of this reading to 1:
            else:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Proceed for each witness siglum in the support for this reading:
            for wit in rdg.wits:
                # Is this siglum a base siglum?
                base_wit = self.get_base_wit(wit)
                if base_wit not in self.witness_index_by_id:
                    # If it is not, then it is probably just because we've encountered a corrector or some other secondary witness not included in the witness list;
                    # report this if we're in verbose mode and move on:
                    if self.verbose:
                        print(
                            "Skipping unknown witness siglum %s (base siglum %s) in variation unit %s, reading %s..."
                            % (wit, base_wit, vu.id, rdg.id)
                        )
                    continue
                # If we've found a base siglum, then add this reading's contribution to the base witness's support vector for this unit;
                # normally the existing vector will be empty, but if we reduce two suffixed sigla to the same base witness,
                # then that witness may attest to multiple readings in the same unit:
                readings_by_witness_for_unit[base_wit] = [
                    (min(readings_by_witness_for_unit[base_wit][i] + rdg_support[i], 1))
                    for i in range(len(rdg_support))
                ]
        return readings_by_witness_for_unit
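
    # As an illustration (hypothetical unit): if a unit has substantive readings
    # ["1", "2", "3"], a witness attesting reading "2" gets the vector [0, 1, 0],
    # a witness with an ambiguous reading targeting "1" and "3" gets [1, 0, 1]
    # (or its certainty degrees, if specified), and a lacunose witness keeps the
    # all-zero vector [0, 0, 0], which downstream exporters treat as missing data.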

    def parse_readings_by_witness(self):
        """Populates the internal dictionary mapping witness IDs to a list of their reading support sets for all variation units, and then fills the empty reading support sets for witnesses of type "corrector" with the entries of the previous witness."""
        if self.verbose:
            print("Populating internal dictionary of witness readings...")
        t0 = time.time()
        # Initialize the data structures to be populated here:
        self.readings_by_witness = {}
        self.variation_unit_ids = []
        for wit in self.witnesses:
            self.readings_by_witness[wit.id] = []
        # Populate them for each variation unit:
        for vu in self.variation_units:
            readings_by_witness_for_unit = self.get_readings_by_witness_for_unit(vu)
            if len(readings_by_witness_for_unit) > 0:
                self.variation_unit_ids.append(vu.id)
            for wit in readings_by_witness_for_unit:
                self.readings_by_witness[wit].append(readings_by_witness_for_unit[wit])
        # Optionally, fill the lacunae of the correctors:
        if self.fill_corrector_lacunae:
            for i, wit in enumerate(self.witnesses):
                # If this is the first witness, then it shouldn't be a corrector (since there is no previous witness against which to compare it):
                if i == 0:
                    continue
                # Otherwise, if this witness is not a corrector, then skip it:
                if wit.type != "corrector":
                    continue
                # Otherwise, retrieve the previous witness:
                prev_wit = self.witnesses[i - 1]
                for j in range(len(self.readings_by_witness[wit.id])):
                    # If the corrector has no reading, then set it to the previous witness's reading:
                    if sum(self.readings_by_witness[wit.id][j]) == 0:
                        self.readings_by_witness[wit.id][j] = self.readings_by_witness[prev_wit.id][j]
        t1 = time.time()
        if self.verbose:
            print(
                "Populated dictionary for %d witnesses over %d substantive variation units in %0.4fs."
                % (len(self.witnesses), len(self.variation_unit_ids), t1 - t0)
            )
        return

    def filter_fragmentary_witnesses(self, xml):
        """Filters the original witness list and readings by witness dictionary to exclude witnesses whose proportions of extant passages fall below the fragmentary readings threshold."""
        if self.verbose:
            print(
                "Filtering fragmentary witnesses (extant in < %f of all variation units) out of internal witness list and dictionary of witness readings..."
                % self.fragmentary_threshold
            )
        t0 = time.time()
        fragmentary_witness_set = set()
        # Proceed for each witness in order:
        for wit in self.witnesses:
            wit_id = wit.id
            # We count the number of variation units at which this witness has an extant (i.e., non-missing) reading:
            extant_reading_count = 0
            total_reading_count = len(self.readings_by_witness[wit.id])
            # Proceed through all reading support lists:
            for rdg_support in self.readings_by_witness[wit_id]:
                # If the current reading support list is not all zeroes, then increment this witness's count of extant readings:
                if sum(rdg_support) != 0:
                    extant_reading_count += 1
            # If the proportion of extant readings falls below the threshold, then add this witness to the list of fragmentary witnesses:
            if extant_reading_count / total_reading_count < self.fragmentary_threshold:
                fragmentary_witness_set.add(wit_id)
        # Then filter the witness list to exclude the fragmentary witnesses:
        filtered_witnesses = [wit for wit in self.witnesses if wit.id not in fragmentary_witness_set]
        self.witnesses = filtered_witnesses
        # Then remove the entries for the fragmentary witnesses from the witnesses-to-readings dictionary:
        for wit_id in fragmentary_witness_set:
            del self.readings_by_witness[wit_id]
        t1 = time.time()
        if self.verbose:
            print(
                "Filtered out %d fragmentary witness(es) (%s) in %0.4fs."
                % (len(fragmentary_witness_set), str(list(fragmentary_witness_set)), t1 - t0)
            )
        return

    def get_nexus_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in NEXUS.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        # NOTE: IQTREE does not appear to support symbols outside of 0-9 and a-z, and its base symbols must be case-insensitive.
        # The official version of MrBayes is likewise limited to 32 symbols.
        # But PAUP* allows up to 64 symbols, and Andrew Edmondson's fork of MrBayes does, as well.
        # So this method will support symbols from 0-9, a-z, and A-Z (for a total of 62 states).
        possible_symbols = list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        nexus_symbols = possible_symbols[:nsymbols]
        return nexus_symbols
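
    # For example, if the largest variation unit in the collation has twelve
    # substantive readings, this method returns the twelve symbols
    # ['0', '1', ..., '9', 'a', 'b'] (list abbreviated here for brevity).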

    def to_nexus(
        self,
        file_addr: Union[Path, str],
        drop_constant: bool = False,
        char_state_labels: bool = True,
        frequency: bool = False,
        ambiguous_as_missing: bool = False,
        calibrate_dates: bool = False,
        mrbayes: bool = False,
        clock_model: ClockModel = ClockModel.strict,
    ):
        """Writes this Collation to a NEXUS file with the given address.

        Args:
            file_addr: A string representing the path to an output NEXUS file; the file type should be .nex, .nexus, or .nxs.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
            char_state_labels: An optional flag indicating whether or not to include the CharStateLabels block.
            frequency: An optional flag indicating whether to use the StatesFormat=Frequency setting
                instead of the StatesFormat=StatesPresent setting
                (and thus represent all states with frequency vectors rather than symbols).
                Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
            ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
                If this flag is set, then only base symbols will be generated for the NEXUS file.
                It is only applied if the frequency option is False.
            calibrate_dates: An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses.
                This option is intended for inputs to BEAST 2.
            mrbayes: An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses.
                This option is intended for inputs to MrBayes.
            clock_model: A ClockModel option indicating which type of clock model to use.
                This option is intended for inputs to MrBayes and BEAST 2.
                MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        charlabels = [slugify(vu_id, lowercase=False, separator='_') for vu_id in substantive_variation_unit_ids]
        missing_symbol = '?'
        symbols = self.get_nexus_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="utf-8") as f:
            # Start with the NEXUS header:
            f.write("#NEXUS\n\n")
            # Then begin the data block:
            f.write("Begin DATA;\n")
            # Write the collation matrix dimensions:
            f.write("\tDimensions ntax=%d nchar=%d;\n" % (ntax, nchar))
            # Write the format subblock:
            f.write("\tFormat\n")
            f.write("\t\tDataType=Standard\n")
            f.write("\t\tMissing=%s\n" % missing_symbol)
            if frequency:
                f.write("\t\tStatesFormat=Frequency\n")
            f.write("\t\tSymbols=\"%s\";\n" % (" ".join(symbols)))
            # If the char_state_labels flag is set, then write the character-state labels, with each on its own line:
            if char_state_labels:
                f.write("\tCharStateLabels")
                vu_ind = 1
                for vu in self.variation_units:
                    if vu.id not in substantive_variation_unit_ids_set:
                        continue
                    if vu_ind == 1:
                        f.write("\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                    else:
                        f.write(",\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                    rdg_ind = 0
                    for rdg in vu.readings:
                        key = tuple([vu.id, rdg.id])
                        if key not in substantive_variation_unit_reading_tuples_set:
                            continue
                        ascii_rdg_text = slugify(
                            rdg.text, lowercase=False, separator='_', replacements=[['η', 'h'], ['ω', 'w']]
                        )
                        if ascii_rdg_text == "":
                            ascii_rdg_text = "om."
                        f.write(" %s" % ascii_rdg_text)
                        rdg_ind += 1
                    if rdg_ind > 0:
                        vu_ind += 1
                f.write(";\n")
            # Write the matrix subblock:
            f.write("\tMatrix")
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                if frequency:
                    sequence = "\n\t\t" + taxlabel
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        sequence += "\n\t\t\t"
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        # Otherwise, print out its frequencies for different readings in parentheses:
                        sequence += "("
                        for k, w in enumerate(rdg_support):
                            sequence += "%s:%0.4f" % (symbols[k], w)
                            if k < len(rdg_support) - 1:
                                sequence += " "
                        sequence += ")"
                else:
                    sequence = "\n\t\t" + taxlabel
                    # Add enough space after this label to ensure that all sequences are nicely aligned:
                    sequence += " " * (max_taxlabel_length - len(taxlabel) + 1)
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        rdg_inds = [
                            k for k, w in enumerate(rdg_support) if w > 0
                        ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                        # For singleton readings, just print the symbol:
                        if len(rdg_inds) == 1:
                            sequence += symbols[rdg_inds[0]]
                            continue
                        # For multiple readings, print the corresponding symbols in braces or the missing symbol, depending on input settings:
                        if ambiguous_as_missing:
                            sequence += missing_symbol
                        else:
                            sequence += "{%s}" % "".join([symbols[rdg_ind] for rdg_ind in rdg_inds])
                f.write("%s" % (sequence))
            f.write(";\n")
            # End the data block:
            f.write("End;")
            # If calibrate_dates is set, then add the Assumptions block:
            if calibrate_dates:
                f.write("\n\n")
                f.write("Begin ASSUMPTIONS;\n")
                # Set the scale to years:
                f.write("\tOPTIONS SCALE = years;\n\n")
                # Then calibrate the witness ages:
                calibrate_strings = []
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    date_range = wit.date_range
                    if date_range[0] is not None:
                        # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                        # depending on whether the upper and lower bounds match:
                        min_age = datetime.now().year - date_range[1]
                        max_age = datetime.now().year - date_range[0]
                        if min_age == max_age:
                            calibrate_string = "\tCALIBRATE %s = fixed(%d)" % (taxlabel, min_age)
                            calibrate_strings.append(calibrate_string)
                        else:
                            calibrate_string = "\tCALIBRATE %s = uniform(%d,%d)" % (taxlabel, min_age, max_age)
                            calibrate_strings.append(calibrate_string)
                    else:
                        # If there is no lower bound on the witness's date, then use an offset log-normal distribution:
                        min_age = datetime.now().year - date_range[1]
                        calibrate_string = "\tCALIBRATE %s = offsetlognormal(%d,0.0,1.0)" % (taxlabel, min_age)
                        calibrate_strings.append(calibrate_string)
                # Then print the calibrate strings, separated by commas and line breaks and terminated by a semicolon:
                f.write("%s;\n\n" % ",\n".join(calibrate_strings))
                # End the Assumptions block:
                f.write("End;")
            # If mrbayes is set, then add the MrBayes block:
            if mrbayes:
                f.write("\n\n")
                f.write("Begin MRBAYES;\n")
                # Turn on the autoclose feature by default:
                f.write("\tset autoclose=yes;\n")
                # Set the branch lengths to be governed by a birth-death clock model, and set up the parameters for this model:
                f.write("\n")
                f.write("\tprset brlenspr = clock:birthdeath;\n")
                f.write("\tprset speciationpr = uniform(0.0,10.0);\n")
                f.write("\tprset extinctionpr = beta(2.0,4.0);\n")
                f.write("\tprset sampleprob = 0.01;\n")
                # Use the specified clock model:
                f.write("\n")
                if clock_model == ClockModel.uncorrelated:
                    f.write("\tprset clockvarpr=igr;\n")
                    f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
                    f.write("\tprset igrvarpr=exponential(1.0);\n")
                else:
                    f.write("\tprset clockvarpr=strict;\n")
                    f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
                # Set the priors on the tree age depending on the date range for the origin of the collated work:
                f.write("\n")
                if self.origin_date_range[0] is not None:
                    min_tree_age = (
                        datetime.now().year - self.origin_date_range[1]
                        if self.origin_date_range[1] is not None
                        else 0.0
                    )
                    max_tree_age = datetime.now().year - self.origin_date_range[0]
                    f.write("\tprset treeagepr = uniform(%d,%d);\n" % (min_tree_age, max_tree_age))
                else:
                    min_tree_age = (
                        datetime.now().year - self.origin_date_range[1]
                        if self.origin_date_range[1] is not None
                        else 0.0
                    )
                    f.write("\tprset treeagepr = offsetgamma(%d,1.0,1.0);\n" % (min_tree_age))
                # Then calibrate the witness ages:
                f.write("\n")
                f.write("\tprset nodeagepr = calibrated;\n")
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    date_range = wit.date_range
                    if date_range[0] is not None:
                        # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                        # depending on whether the upper and lower bounds match:
                        min_age = datetime.now().year - date_range[1]
                        max_age = datetime.now().year - date_range[0]
                        if min_age == max_age:
                            f.write("\tcalibrate %s = fixed(%d);\n" % (taxlabel, min_age))
                        else:
                            f.write("\tcalibrate %s = uniform(%d,%d);\n" % (taxlabel, min_age, max_age))
                    else:
                        # If there is no lower bound on the witness's date, then use an offset gamma distribution:
                        min_age = datetime.now().year - date_range[1]
                        f.write("\tcalibrate %s = offsetgamma(%d,1.0,1.0);\n" % (taxlabel, min_age))
                f.write("\n")
                # Add default settings for MCMC estimation of posterior distribution:
                f.write("\tmcmcp ngen=100000;\n")
                # Write the command to run MrBayes:
                f.write("\tmcmc;\n")
                # End the MrBayes block:
                f.write("End;")
        return
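
    # A sketch of the kind of data block this method produces (toy values, not
    # from a real collation; whitespace simplified): for two witnesses and
    # three variation units, the output might resemble
    #
    #     #NEXUS
    #
    #     Begin DATA;
    #         Dimensions ntax=2 nchar=3;
    #         Format
    #             DataType=Standard
    #             Missing=?
    #             Symbols="0 1 2";
    #         Matrix
    #             A 01?
    #             B {01}20;
    #     End;
    #
    # where "?" marks a lacuna and "{01}" an ambiguous state over readings 0 and 1.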

    def get_hennig86_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in Hennig86 format.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        possible_symbols = (
            list(string.digits) + list(string.ascii_uppercase)[:22]
        )  # NOTE: the maximum number of symbols allowed in Hennig86 format is 32
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        hennig86_symbols = possible_symbols[:nsymbols]
        return hennig86_symbols

    def to_hennig86(self, file_addr: Union[Path, str], drop_constant: bool = False):
        """Writes this Collation to a file in Hennig86 format with the given address.
        Note that because Hennig86 format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = []
        for wit in self.witnesses:
            taxlabel = wit.id
            # Hennig86 format requires taxon names to start with a letter, so if this is not the case, then append "WIT_" to the start of the name:
            if taxlabel[0] not in string.ascii_letters:
                taxlabel = "WIT_" + taxlabel
            # Then replace any disallowed characters in the string with an underscore:
            taxlabel = slugify(taxlabel, lowercase=False, separator='_')
            taxlabels.append(taxlabel)
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        missing_symbol = '?'
        symbols = self.get_hennig86_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="ascii") as f:
            # Start with the nstates header:
            f.write("nstates %d;\n" % len(symbols))
            # Then begin the xread block:
            f.write("xread\n")
            # Write the dimensions:
            f.write("%d %d\n" % (nchar, ntax))
            # Now write the matrix:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                # Add enough space after this label to ensure that all sequences are nicely aligned:
                sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel) + 1))
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    rdg_support = self.readings_by_witness[wit.id][j]
                    # If this reading is lacunose in this witness, then use the missing character:
                    if sum(rdg_support) == 0:
                        sequence += missing_symbol
                        continue
                    rdg_inds = [
                        k for k, w in enumerate(rdg_support) if w > 0
                    ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                    # For singleton readings, just print the symbol:
                    if len(rdg_inds) == 1:
                        sequence += symbols[rdg_inds[0]]
                        continue
                    # For multiple readings, print the missing symbol:
                    sequence += missing_symbol
                f.write("%s\n" % (sequence))
            f.write(";")
        return

    def get_phylip_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in PHYLIP format.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        possible_symbols = (
            list(string.digits) + list(string.ascii_lowercase)[:22]
        )  # NOTE: RAxML supports multistate characters with alphabet sizes of up to 32
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        phylip_symbols = possible_symbols[:nsymbols]
        return phylip_symbols

    def to_phylip(self, file_addr: Union[Path, str], drop_constant: bool = False):
        """Writes this Collation to a file in PHYLIP format with the given address.
        Note that because PHYLIP format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = []
        for wit in self.witnesses:
            taxlabel = wit.id
            # Replace any disallowed characters in the string with an underscore:
            taxlabel = slugify(taxlabel, lowercase=False, separator='_')
            taxlabels.append(taxlabel)
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        missing_symbol = '?'
        symbols = self.get_phylip_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="ascii") as f:
            # Write the dimensions:
            f.write("%d %d\n" % (ntax, nchar))
            # Now write the matrix:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                # Add enough space after this label to ensure that all sequences are nicely aligned:
                sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel))) + "\t"
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    rdg_support = self.readings_by_witness[wit.id][j]
                    # If this reading is lacunose in this witness, then use the missing character:
                    if sum(rdg_support) == 0:
                        sequence += missing_symbol
                        continue
                    rdg_inds = [
                        k for k, w in enumerate(rdg_support) if w > 0
                    ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                    # For singleton readings, just print the symbol:
                    if len(rdg_inds) == 1:
                        sequence += symbols[rdg_inds[0]]
                        continue
                    # For multiple readings, print the missing symbol:
                    sequence += missing_symbol
                f.write("%s\n" % (sequence))
        return

1115 

1116 def get_fasta_symbols(self): 

1117 """Returns a list of one-character symbols needed to represent the states of all substantive readings in FASTA format. 

1118 

1119 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1120 

1121 Returns: 

1122 A list of individual characters representing states in readings. 

1123 """ 

1124 possible_symbols = ( 

1125 list(string.digits) + list(string.ascii_lowercase)[:22] 

1126 ) # NOTE: for RAxML, multistate characters with alphabet sizes of up to 32 are supported 

1127 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1128 nsymbols = 0 

1129 # If there are no witnesses, then no symbols are needed at all: 

1130 if len(self.witnesses) == 0: 

1131 return [] 

1132 wit_id = self.witnesses[0].id 

1133 for rdg_support in self.readings_by_witness[wit_id]: 

1134 nsymbols = max(nsymbols, len(rdg_support)) 

1135 fasta_symbols = possible_symbols[:nsymbols] 

1136 return fasta_symbols 

1137 

1138 def to_fasta(self, file_addr: Union[Path, str], drop_constant: bool = False): 

1139 """Writes this Collation to a file in FASTA format with the given address. 

1140 Note that because FASTA format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data. 

1141 

1142 Args: 

1143 file_addr: A string representing the path to an output file. 

1144 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1145 """ 

1146 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1147 substantive_variation_unit_ids = self.variation_unit_ids 

1148 if drop_constant: 

1149 substantive_variation_unit_ids = [ 

1150 vu_id 

1151 for vu_id in self.variation_unit_ids 

1152 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1153 ] 

1154 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1155 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1156 # Start by calculating the values we will be using here: 

1157 ntax = len(self.witnesses) 

1158 nchar = len(substantive_variation_unit_ids) 

1159 taxlabels = [] 

1160 for wit in self.witnesses: 

1161 taxlabel = wit.id 

1162 # Then replace any disallowed characters in the string with an underscore: 

1163 taxlabel = slugify(taxlabel, lowercase=False, separator='_') 

1164 taxlabels.append(taxlabel) 

1165 max_taxlabel_length = max( 

1166 [len(taxlabel) for taxlabel in taxlabels] 

1167 ) # keep track of the longest taxon label for tabular alignment purposes 

1168 missing_symbol = '?' 

1169 symbols = self.get_fasta_symbols() 

1170 # Generate all parent folders for this file that don't already exist: 

1171 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1172 with open(file_addr, "w", encoding="ascii") as f: 

1173 # Now write the matrix: 

1174 for i, wit in enumerate(self.witnesses): 

1175 taxlabel = taxlabels[i] 

1176 # Write this witness's taxon label as a FASTA header line: 

1177 sequence = ">%s\n" % taxlabel 

1178 for j, vu_id in enumerate(self.variation_unit_ids): 

1179 if vu_id not in substantive_variation_unit_ids_set: 

1180 continue 

1181 rdg_support = self.readings_by_witness[wit.id][j] 

1182 # If this reading is lacunose in this witness, then use the missing character: 

1183 if sum(rdg_support) == 0: 

1184 sequence += missing_symbol 

1185 continue 

1186 rdg_inds = [ 

1187 k for k, w in enumerate(rdg_support) if w > 0 

1188 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1189 # For singleton readings, just print the symbol: 

1190 if len(rdg_inds) == 1: 

1191 sequence += symbols[rdg_inds[0]] 

1192 continue 

1193 # For multiple readings, print the missing symbol: 

1194 sequence += missing_symbol 

1195 f.write("%s\n" % (sequence)) 

1196 return 

1197 

1198 def get_beast_symbols(self): 

1199 """Returns a list of one-character symbols needed to represent the states of all substantive readings in BEAST format. 

1200 

1201 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1202 

1203 Returns: 

1204 A list of individual characters representing states in readings. 

1205 """ 

1206 possible_symbols = ( 

1207 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

1208 ) # NOTE: while BEAST should theoretically permit any number of states, code maps over explicit symbols are required, so we will limit the number of symbols to 62 for now 

1209 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1210 nsymbols = 0 

1211 # If there are no witnesses, then no symbols are needed at all: 

1212 if len(self.witnesses) == 0: 

1213 return [] 

1214 wit_id = self.witnesses[0].id 

1215 for rdg_support in self.readings_by_witness[wit_id]: 

1216 nsymbols = max(nsymbols, len(rdg_support)) 

1217 beast_symbols = possible_symbols[:nsymbols] 

1218 return beast_symbols 

1219 

1220 def get_tip_date_range(self): 

1221 """Gets the minimum and maximum dates attested among the witnesses. 

1222 Also checks if the witness with the latest possible date has a fixed date 

1223 (i.e., if the lower and upper bounds for its date are the same) 

1224 and issues a warning if not, as this will cause unusual behavior in BEAST 2. 

1225 

1226 Returns: 

1227 A tuple containing the earliest and latest possible tip dates. 

1228 """ 

1229 earliest_date = None 

1230 earliest_wit = None 

1231 latest_date = None 

1232 latest_wit = None 

1233 for wit in self.witnesses: 

1234 wit_id = wit.id 

1235 date_range = wit.date_range 

1236 if date_range[0] is not None: 

1237 if earliest_date is not None: 

1238 earliest_wit = wit if date_range[0] < earliest_date else earliest_wit 

1239 earliest_date = min(date_range[0], earliest_date) 

1240 else: 

1241 earliest_wit = wit 

1242 earliest_date = date_range[0] 

1243 if date_range[1] is not None: 

1244 if latest_date is not None: 

1245 latest_wit = wit if date_range[1] > latest_date else latest_wit 

1246 latest_date = max(date_range[1], latest_date) 

1247 else: 

1248 latest_wit = wit 

1249 latest_date = date_range[1] 

1250 if latest_wit is not None and (latest_wit.date_range[0] is None or latest_wit.date_range[0] != latest_wit.date_range[1]): 

1251 print( 

1252 "WARNING: the latest witness, %s, has a variable date range; this will result in problems with time-dependent substitution models and misalignment of trees in BEAST DensiTree outputs! Please ensure that witness %s has a fixed date." 

1253 % (latest_wit.id, latest_wit.id) 

1254 ) 

1255 return (earliest_date, latest_date) 

1256 
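
Worked sketch (annotation; the dates are hypothetical): if the witnesses' date ranges are (125, 175), (300, 300), and (250, 400), the method returns (125, 400); because the witness with the latest possible date has the variable range (250, 400) rather than a fixed date, the warning above is also printed.

>>> earliest, latest = collation.get_tip_date_range()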

1257 def get_beast_origin_span(self, tip_date_range): 

1258 """Returns a tuple containing the lower and upper bounds for the height of the origin of the Birth-Death Skyline model. 

1259 The upper bound on the height of the origin is the difference between the latest tip date 

1260 and the lower bound on the date of the original work, if both are defined; 

1261 otherwise, it is left undefined. 

1262 The lower bound on the height of the origin is the difference between the latest tip date 

1263 and the upper bound on the date of the original work, if both are defined; 

1264 otherwise, it is the difference between the earliest tip date and the latest, if both are defined. 

1265 

1266 Args: 

1267 tip_date_range: A tuple containing the earliest and latest possible tip dates. 

1268 

1269 Returns: 

1270 A tuple containing lower and upper bounds on the origin height for the Birth-Death Skyline model. 

1271 """ 

1272 origin_span = [0, None] 

1273 # If the upper bound on the date of the work's composition is defined, then set the lower bound on the height of the origin using it and the latest tip date 

1274 # (note that if it had to be defined in terms of witness date lower bounds, then this would have happened already): 

1275 if self.origin_date_range[1] is not None: 

1276 origin_span[0] = tip_date_range[1] - self.origin_date_range[1] 

1277 # If the lower bound on the date of the work's composition is defined, then set the upper bound on the height of the origin using it and the latest tip date: 

1278 if self.origin_date_range[0] is not None: 

1279 origin_span[1] = tip_date_range[1] - self.origin_date_range[0] 

1280 return tuple(origin_span) 

1281 
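
Worked arithmetic sketch (annotation; the values are hypothetical): with tip_date_range = (150, 400) and self.origin_date_range = (50, 80), the lower bound on the origin height is 400 - 80 = 320 and the upper bound is 400 - 50 = 350, so the method returns (320, 350); if the lower bound on the origin date were undefined, the result would instead be (320, None).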

1282 def get_beast_date_map(self, taxlabels): 

1283 """Returns a string representing witness-to-date mappings in BEAST format. 

1284 

1285 Since this format requires single dates as opposed to date ranges, 

1286 witnesses with closed date ranges will be mapped to the average of their lower and upper bounds, 

1287 and witnesses with open date ranges will not be mapped. 

1288 

1289 Args: 

1290 taxlabels: A list of slugified taxon labels. 

1291 

1292 Returns: 

1293 A string containing comma-separated date calibrations of the form witness_id=date. 

1294 """ 

1295 calibrate_strings = [] 

1296 for i, wit in enumerate(self.witnesses): 

1297 taxlabel = taxlabels[i] 

1298 date_range = wit.date_range 

1299 # If either end of this witness's date range is empty, then do not include it: 

1300 if date_range[0] is None or date_range[1] is None: 

1301 continue 

1302 # Otherwise, take the midpoint of its date range as its date: 

1303 date = int((date_range[0] + date_range[1]) / 2) 

1304 calibrate_string = "%s=%d" % (taxlabel, date) 

1305 calibrate_strings.append(calibrate_string) 

1306 # Then output the full date map string: 

1307 date_map = ",".join(calibrate_strings) 

1308 return date_map 

1309 
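
Worked sketch (annotation; the witness IDs and dates are hypothetical): a witness A dated (100, 200) is calibrated to the midpoint 150, a witness B with the fixed date (300, 300) to 300, and a witness C dated (250, None) is skipped, so the returned string is "A=150,B=300".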

1310 def get_beast_code_map_for_unit(self, symbols, missing_symbol, vu_ind): 

1311 """Returns a string containing state/reading code mappings in BEAST format using the given single-state and missing state symbols for the character/variation unit at the given index. 

1312 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a code for a dummy state will be included. 

1313 

1314 Args: 

1315 symbols: A list of one-character symbols for single states. missing_symbol: A one-character symbol for the missing state. vu_ind: An integer index for the desired unit. 

1316 

1317 Returns: 

1318 A string containing comma-separated code mappings. 

1319 """ 

1320 vu = self.variation_units[vu_ind] 

1321 vu_id = vu.id 

1322 code_map = {} 

1323 for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])): 

1324 code_map[symbols[k]] = str(k) 

1325 # If this site is a singleton site, then add a code mapping for the dummy state: 

1326 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1327 code_map[symbols[1]] = str(1) 

1328 # Then add a mapping for the missing state, including a dummy state if this is a singleton site: 

1329 code_map[missing_symbol] = " ".join( 

1330 str(k) for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])) 

1331 ) 

1332 # If this site is a singleton site, then add the dummy state to the missing state mapping: 

1333 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1334 code_map[missing_symbol] = code_map[missing_symbol] + " " + str(1) 

1335 # Then combine all of the mappings into a single string: 

1336 code_map_string = ", ".join([code + "=" + code_map[code] for code in code_map]) 

1337 return code_map_string 

1338 
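
Worked sketch (annotation): for a unit with three substantive readings, symbols beginning "0", "1", "2", and missing symbol "?", the returned string is "0=0, 1=1, 2=2, ?=0 1 2"; for a singleton unit, the dummy state yields "0=0, 1=1, ?=0 1".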

1339 def get_beast_equilibrium_frequencies_for_unit(self, vu_ind): 

1340 """Returns a string containing state/reading equilibrium frequencies in BEAST format for the character/variation unit at the given index. 

1341 Since the equilibrium frequencies are not used with the substitution models, they simply correspond to a uniform distribution over the states. 

1342 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then the frequencies will be split evenly between its reading and a dummy state (i.e., "0.5 0.5"). 

1343 

1344 Args: 

1345 vu_ind: An integer index for the desired unit. 

1346 

1347 Returns: 

1348 A string containing space-separated equilibrium frequencies. 

1349 """ 

1350 vu = self.variation_units[vu_ind] 

1351 vu_id = vu.id 

1352 # If this unit is a singleton, then return the string "0.5 0.5": 

1353 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1: 

1354 return "0.5 0.5" 

1355 # Otherwise, set the equilibrium frequencies according to a uniform distribution: 

1356 equilibrium_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len( 

1357 self.substantive_readings_by_variation_unit_id[vu_id] 

1358 ) 

1359 equilibrium_frequencies_string = " ".join([str(w) for w in equilibrium_frequencies]) 

1360 return equilibrium_frequencies_string 

1361 
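
Worked sketch (annotation): a unit with four substantive readings yields "0.25 0.25 0.25 0.25", while a singleton unit yields "0.5 0.5", with the second value belonging to the dummy state.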

1362 def get_beast_root_frequencies_for_unit(self, vu_ind): 

1363 """Returns a string containing state/reading root frequencies in BEAST format for the character/variation unit at the given index. 

1364 The root frequencies are calculated from the intrinsic odds at this unit. 

1365 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a root frequency of 0 will be added for a dummy state. 

1366 If no intrinsic odds are specified, then a uniform distribution over all states is assumed. 

1367 

1368 Args: 

1369 vu_ind: An integer index for the desired unit. 

1370 

1371 Returns: 

1372 A string containing space-separated root frequencies. 

1373 """ 

1374 vu = self.variation_units[vu_ind] 

1375 vu_id = vu.id 

1376 intrinsic_relations = vu.intrinsic_relations 

1377 intrinsic_odds_by_id = self.intrinsic_odds_by_id 

1378 # If this unit is a singleton, then return the string "1 0": 

1379 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1: 

1380 return "1 0" 

1381 # If this unit has no intrinsic odds, then assume a uniform distribution over all readings: 

1382 if len(intrinsic_relations) == 0: 

1383 root_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len( 

1384 self.substantive_readings_by_variation_unit_id[vu_id] 

1385 ) 

1386 root_frequencies_string = " ".join([str(w) for w in root_frequencies]) 

1387 return root_frequencies_string 

1388 # We will populate the root frequencies based on the intrinsic odds of the readings: 

1389 root_frequencies_by_id = {} 

1390 for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]: 

1391 root_frequencies_by_id[rdg_id] = 0 

1392 # First, construct an adjacency list for efficient edge iteration: 

1393 neighbors_by_source = {} 

1394 for edge in intrinsic_relations: 

1395 s = edge[0] 

1396 t = edge[1] 

1397 if s not in neighbors_by_source: 

1398 neighbors_by_source[s] = [] 

1399 if t not in neighbors_by_source: 

1400 neighbors_by_source[t] = [] 

1401 neighbors_by_source[s].append(t) 

1402 # Next, identify all readings that are not targeted by any intrinsic odds relation: 

1403 in_degree_by_reading = {} 

1404 for edge in intrinsic_relations: 

1405 s = edge[0] 

1406 t = edge[1] 

1407 if s not in in_degree_by_reading: 

1408 in_degree_by_reading[s] = 0 

1409 if t not in in_degree_by_reading: 

1410 in_degree_by_reading[t] = 0 

1411 in_degree_by_reading[t] += 1 

1412 starting_nodes = [t for t in in_degree_by_reading if in_degree_by_reading[t] == 0] 

1413 # Set the root frequencies for these readings to 1 (they will be normalized later): 

1414 for starting_node in starting_nodes: 

1415 root_frequencies_by_id[starting_node] = 1.0 

1416 # Next, set the frequencies for the remaining readings recursively using the adjacency list: 

1417 def update_root_frequencies(s): 

1418 for t in neighbors_by_source[s]: 

1419 intrinsic_category = intrinsic_relations[(s, t)] 

1420 odds = ( 

1421 intrinsic_odds_by_id[intrinsic_category] 

1422 if intrinsic_odds_by_id[intrinsic_category] is not None 

1423 else 1.0 

1424 ) # TODO: This needs to be handled using parameters once we have it implemented in BEAST 

1425 root_frequencies_by_id[t] = root_frequencies_by_id[s] / odds 

1426 update_root_frequencies(t) 

1427 return 

1428 

1429 for starting_node in starting_nodes: 

1430 update_root_frequencies(starting_node) 

1431 # Then produce a normalized vector of root frequencies that corresponds to a probability distribution: 

1432 root_frequencies = [ 

1433 root_frequencies_by_id[rdg_id] for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id] 

1434 ] 

1435 total_frequencies = sum(root_frequencies) 

1436 for k in range(len(root_frequencies)): 

1437 root_frequencies[k] = root_frequencies[k] / total_frequencies 

1438 root_frequencies_string = " ".join([str(w) for w in root_frequencies]) 

1439 return root_frequencies_string 

1440 
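
Worked sketch (annotation; the readings, categories, and odds are hypothetical): suppose a unit has readings "1", "2", and "3" with intrinsic_relations {("1", "2"): "equal", ("1", "3"): "unlikely"} and odds of 2.0 for "equal" and 4.0 for "unlikely". Reading "1" has in-degree 0 and starts at 1.0; reading "2" receives 1.0/2.0 = 0.5 and reading "3" receives 1.0/4.0 = 0.25. Normalizing by the total of 1.75 gives frequencies of 4/7, 2/7, and 1/7, so the returned string is approximately "0.571 0.286 0.143".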

1441 def to_beast( 

1442 self, 

1443 file_addr: Union[Path, str], 

1444 drop_constant: bool = False, 

1445 clock_model: ClockModel = ClockModel.strict, 

1446 ancestral_logger: AncestralLogger = AncestralLogger.state, 

1447 seed: int = None, 

1448 ): 

1449 """Writes this Collation to a file in BEAST format with the given address. 

1450 

1451 Args: 

1452 file_addr: A string representing the path to an output file. 

1453 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1454 clock_model: A ClockModel option indicating which clock model to use. 

1455 ancestral_logger: An AncestralLogger option indicating which class of logger (if any) to use for ancestral states. 

1456 seed: A seed for random number generation (for setting initial values of unspecified transcriptional rates). 

1457 """ 

1458 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1459 substantive_variation_unit_ids = self.variation_unit_ids 

1460 if drop_constant: 

1461 substantive_variation_unit_ids = [ 

1462 vu_id 

1463 for vu_id in self.variation_unit_ids 

1464 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1465 ] 

1466 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1467 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1468 # First, calculate the values we will be using for the main template: 

1469 taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses] 

1470 missing_symbol = '?' 

1471 symbols = self.get_beast_symbols() 

1472 tip_date_range = self.get_tip_date_range() 

1473 origin_span = self.get_beast_origin_span(tip_date_range) 

1474 date_map = self.get_beast_date_map(taxlabels) 

1475 # Then populate the necessary objects for the BEAST XML Jinja template: 

1476 witness_objects = [] 

1477 variation_unit_objects = [] 

1478 intrinsic_category_objects = [] 

1479 transcriptional_category_objects = [] 

1480 # Start with witnesses: 

1481 for i, wit in enumerate(self.witnesses): 

1482 witness_object = {} 

1483 # Copy the ID for this witness: 

1484 witness_object["id"] = wit.id 

1485 # Copy its date bounds: 

1486 witness_object["min_date"] = wit.date_range[0] 

1487 witness_object["max_date"] = wit.date_range[1] 

1488 # Populate its sequence from its entries in the witness's readings dictionary: 

1489 sequence = "" 

1490 for j, rdg_support in enumerate(self.readings_by_witness[wit.id]): 

1491 vu_id = self.variation_unit_ids[j] 

1492 # Skip any variation units deemed non-substantive: 

1493 if vu_id not in substantive_variation_unit_ids_set: 

1494 continue 

1495 # If this witness has a certainty of 0 for all readings, then it is a gap; assign a likelihood of 1 to each reading: 

1496 if sum(rdg_support) == 0: 

1497 for k, w in enumerate(rdg_support): 

1498 sequence += "1" 

1499 if k < len(rdg_support) - 1: 

1500 sequence += ", " 

1501 else: 

1502 if len(rdg_support) > 1: 

1503 sequence += "; " 

1504 else: 

1505 # If this site is a singleton site, then add a dummy state: 

1506 sequence += ", 0; " 

1507 # Otherwise, read the probabilities as they are given: 

1508 else: 

1509 for k, w in enumerate(rdg_support): 

1510 sequence += str(w) 

1511 if k < len(rdg_support) - 1: 

1512 sequence += ", " 

1513 else: 

1514 if len(rdg_support) > 1: 

1515 sequence += "; " 

1516 else: 

1517 # If this site is a singleton site, then add a dummy state: 

1518 sequence += ", 0; " 

1519 # Strip the final semicolon and space from the sequence: 

1520 sequence = sequence.strip("; ") 

1521 # Then set the witness object's sequence attribute to this string: 

1522 witness_object["sequence"] = sequence 

1523 witness_objects.append(witness_object) 

1524 # Then proceed to variation units: 

1525 for j, vu in enumerate(self.variation_units): 

1526 if vu.id not in substantive_variation_unit_ids_set: 

1527 continue 

1528 variation_unit_object = {} 

1529 # Copy the ID of this variation unit: 

1530 variation_unit_object["id"] = vu.id 

1531 # Copy this variation unit's number of substantive readings, 

1532 # setting it to 2 if it is a singleton unit: 

1533 variation_unit_object["nstates"] = ( 

1534 len(self.substantive_readings_by_variation_unit_id[vu.id]) 

1535 if len(self.substantive_readings_by_variation_unit_id[vu.id]) > 1 

1536 else 2 

1537 ) 

1538 # Then construct the code map for this unit: 

1539 variation_unit_object["code_map"] = self.get_beast_code_map_for_unit(symbols, missing_symbol, j) 

1540 # Then populate a comma-separated string of reading labels for this unit: 

1541 rdg_texts = [] 

1542 vu_label = vu.id 

1543 for rdg in vu.readings: 

1544 key = tuple([vu.id, rdg.id]) 

1545 if key not in substantive_variation_unit_reading_tuples_set: 

1546 continue 

1547 rdg_text = slugify(rdg.text, lowercase=False, allow_unicode=True, separator='_') 

1548 # Replace any empty reading text with an omission marker: 

1549 if rdg_text == "": 

1550 rdg_text = "om." 

1551 rdg_texts.append(rdg_text) 

1552 # If this site is a singleton site, then add a dummy reading for the dummy state: 

1553 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1554 rdg_texts.append("DUMMY") 

1555 rdg_texts_string = ", ".join(rdg_texts) 

1556 variation_unit_object["rdg_texts"] = rdg_texts_string 

1557 # Then populate this unit's equilibrium frequency string and its root frequency string: 

1558 variation_unit_object["equilibrium_frequencies"] = self.get_beast_equilibrium_frequencies_for_unit(j) 

1559 variation_unit_object["root_frequencies"] = self.get_beast_root_frequencies_for_unit(j) 

1560 # Then populate a dictionary mapping epoch height ranges to lists of off-diagonal entries for substitution models: 

1561 rate_objects_by_epoch_height_range = {} 

1562 epoch_height_ranges = [] 

1563 # Then proceed based on whether the transcriptional relations for this variation unit have been defined: 

1564 if len(vu.transcriptional_relations_by_date_range) == 0: 

1565 # If there are no transcriptional relations, then map the epoch range of (None, None) to their list of off-diagonal entries: 

1566 epoch_height_ranges.append((None, None)) 

1567 rate_objects_by_epoch_height_range[(None, None)] = [] 

1568 rate_objects = rate_objects_by_epoch_height_range[(None, None)] 

1569 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1570 # If this is a singleton site, then use an arbitrary 2x2 rate matrix: 

1571 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1572 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1573 else: 

1574 # If this is a site with multiple substantive readings, but no transcriptional relations list, 

1575 # then use a Lewis Mk substitution matrix with the appropriate number of states: 

1576 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1577 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1578 # Skip diagonal elements: 

1579 if k_1 == k_2: 

1580 continue 

1581 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1582 else: 

1583 # Otherwise, proceed for every date range: 

1584 for date_range in vu.transcriptional_relations_by_date_range: 

1585 # Get the map of transcriptional relations for reference later: 

1586 transcriptional_relations = vu.transcriptional_relations_by_date_range[date_range] 

1587 # Now get the epoch height range corresponding to this date range, and initialize its list in the dictionary: 

1588 epoch_height_range = [None, None] 

1589 epoch_height_range[0] = tip_date_range[1] - date_range[1] if date_range[1] is not None else None 

1590 epoch_height_range[1] = tip_date_range[1] - date_range[0] if date_range[0] is not None else None 

1591 epoch_height_range = tuple(epoch_height_range) 

1592 epoch_height_ranges.append(epoch_height_range) 

1593 rate_objects_by_epoch_height_range[epoch_height_range] = [] 

1594 rate_objects = rate_objects_by_epoch_height_range[epoch_height_range] 

1595 # Then proceed for every pair of readings in this unit: 

1596 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1597 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1598 # Skip diagonal elements: 

1599 if k_1 == k_2: 

1600 continue 

1601 # If the first reading has no transcriptional relation to the second in this unit, then use the default rate: 

1602 if (rdg_id_1, rdg_id_2) not in transcriptional_relations: 

1603 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1604 continue 

1605 # Otherwise, if only one category of transcriptional relations holds between the first and second readings, 

1606 # then use its rate: 

1607 if len(transcriptional_relations[(rdg_id_1, rdg_id_2)]) == 1: 

1608 # If there is only one such category, then add its rate as a standalone var element: 

1609 transcriptional_category = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])[0] 

1610 rate_objects.append( 

1611 {"transcriptional_categories": [transcriptional_category], "expression": None} 

1612 ) 

1613 continue 

1614 # If there is more than one, then add a var element that is a sum of the individual categories' rates: 

1615 transcriptional_categories = list(transcriptional_relations[(rdg_id_1, rdg_id_2)]) 

1616 args = [] 

1617 for transcriptional_category in transcriptional_categories: 

1618 args.append("%s_rate" % transcriptional_category) 

1619 args_string = " ".join(args) 

1620 ops = ["+"] * (len(args) - 1) 

1621 ops_string = " ".join(ops) 

1622 expression_string = " ".join([args_string, ops_string]) 

1623 rate_objects.append( 

1624 { 

1625 "transcriptional_categories": transcriptional_categories, 

1626 "expression": expression_string, 

1627 } 

1628 ) 

1629 # Now reorder the list of epoch height ranges, and get a list of non-null epoch heights in ascending order from the dictionary: 

1630 epoch_height_ranges.reverse() 

1631 epoch_heights = [ 

1632 epoch_height_range[0] for epoch_height_range in epoch_height_ranges if epoch_height_range[0] is not None 

1633 ] 

1634 # Then add all of these data structures to the variation unit object: 

1635 variation_unit_object["epoch_heights"] = epoch_heights 

1636 variation_unit_object["epoch_heights_string"] = " ".join( 

1637 [str(epoch_height) for epoch_height in epoch_heights] 

1638 ) 

1639 variation_unit_object["epoch_height_ranges"] = epoch_height_ranges 

1640 variation_unit_object["epoch_rates"] = [ 

1641 rate_objects_by_epoch_height_range[epoch_height_range] for epoch_height_range in epoch_height_ranges 

1642 ] 

1643 variation_unit_objects.append(variation_unit_object) 

1644 # Then proceed to intrinsic odds categories: 

1645 for intrinsic_category in self.intrinsic_categories: 

1646 intrinsic_category_object = {} 

1647 # Copy the ID of this intrinsic category: 

1648 intrinsic_category_object["id"] = intrinsic_category 

1649 # Then copy the odds factor associated with this intrinsic category, 

1650 # setting it to 1.0 if it is not specified and setting the estimate attribute accordingly: 

1651 odds = self.intrinsic_odds_by_id[intrinsic_category] 

1652 intrinsic_category_object["odds"] = odds if odds is not None else 1.0 

1653 intrinsic_category_object["estimate"] = "false" if odds is not None else "true" 

1654 intrinsic_category_objects.append(intrinsic_category_object) 

1655 # Then proceed to transcriptional rate categories: 

1656 rng = np.random.default_rng(seed) 

1657 for transcriptional_category in self.transcriptional_categories: 

1658 transcriptional_category_object = {} 

1659 # Copy the ID of this transcriptional category: 

1660 transcriptional_category_object["id"] = transcriptional_category 

1661 # Then copy the rate of this transcriptional category, 

1662 # setting it to a random number sampled from a Gamma distribution if it is not specified and setting the estimate attribute accordingly: 

1663 rate = self.transcriptional_rates_by_id[transcriptional_category] 

1664 transcriptional_category_object["rate"] = rate if rate is not None else rng.gamma(5.0, 2.0) 

1665 transcriptional_category_object["estimate"] = "false" if rate is not None else "true" 

1666 transcriptional_category_objects.append(transcriptional_category_object) 

1667 # Now render the output XML file using the Jinja template: 

1668 env = Environment(loader=PackageLoader("teiphy", "templates"), autoescape=select_autoescape()) 

1669 template = env.get_template("beast_template.xml") 

1670 rendered = template.render( 

1671 nsymbols=len(symbols), 

1672 date_map=date_map, 

1673 origin_span=origin_span, 

1674 clock_model=clock_model.value, 

1675 clock_rate_categories=2 * len(self.witnesses) - 1, 

1676 ancestral_logger=ancestral_logger.value, 

1677 witnesses=witness_objects, 

1678 variation_units=variation_unit_objects, 

1679 intrinsic_categories=intrinsic_category_objects, 

1680 transcriptional_categories=transcriptional_category_objects, 

1681 ) 

1682 # Generate all parent folders for this file that don't already exist: 

1683 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1684 with open(file_addr, "w", encoding="utf-8") as f: 

1685 f.write(rendered) 

1686 return 

1687 
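
Usage sketch (annotation, not part of the measured source): assuming `collation` is an already-parsed Collation instance, the BEAST export can be configured through the ClockModel and AncestralLogger enumerations defined at the top of this module; the output path and seed are illustrative.

>>> collation.to_beast(
...     "out/collation_beast.xml",
...     drop_constant=True,
...     clock_model=ClockModel.uncorrelated,
...     ancestral_logger=AncestralLogger.state,
...     seed=42,
... )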

1688 def to_numpy(self, drop_constant: bool = False, split_missing: bool = True): 

1689 """Returns this Collation in the form of a NumPy array, along with arrays of its row and column labels. 

1690 

1691 Args: 

1692 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1693 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings; if False, then missing data is ignored (i.e., all states are 0). Default value is True. 

1694 

1695 Returns: 

1696 A NumPy array with a row for each substantive reading and a column for each witness. 

1697 A list of substantive reading ID strings. 

1698 A list of witness ID strings. 

1699 """ 

1700 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1701 substantive_variation_unit_ids = self.variation_unit_ids 

1702 if drop_constant: 

1703 substantive_variation_unit_ids = [ 

1704 vu_id 

1705 for vu_id in self.variation_unit_ids 

1706 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1707 ] 

1708 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1709 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1710 # Initialize the output array with the appropriate dimensions: 

1711 reading_labels = [] 

1712 for vu in self.variation_units: 

1713 if vu.id not in substantive_variation_unit_ids_set: 

1714 continue 

1715 for rdg in vu.readings: 

1716 key = tuple([vu.id, rdg.id]) 

1717 if key in substantive_variation_unit_reading_tuples_set: 

1718 reading_labels.append(vu.id + ", " + rdg.text) 

1719 witness_labels = [wit.id for wit in self.witnesses] 

1720 matrix = np.zeros((len(reading_labels), len(witness_labels)), dtype=float) 

1721 # Then populate it with the appropriate values: 

1722 col_ind = 0 

1723 for i, wit in enumerate(self.witnesses): 

1724 row_ind = 0 

1725 for j, vu_id in enumerate(self.variation_unit_ids): 

1726 if vu_id not in substantive_variation_unit_ids_set: 

1727 continue 

1728 rdg_support = self.readings_by_witness[wit.id][j] 

1729 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

1730 if sum(rdg_support) == 0: 

1731 if split_missing: 

1732 for _ in range(len(rdg_support)): 

1733 matrix[row_ind, col_ind] = 1 / len(rdg_support) 

1734 row_ind += 1 

1735 else: 

1736 row_ind += len(rdg_support) 

1737 # Otherwise, add its coefficients normally: 

1738 else: 

1739 for k in range(len(rdg_support)): 

1740 matrix[row_ind, col_ind] = rdg_support[k] 

1741 row_ind += 1 

1742 col_ind += 1 

1743 return matrix, reading_labels, witness_labels 

1744 
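
Usage sketch (annotation): the returned matrix has one row per substantive reading and one column per witness, so the label lists can index it directly; with split_missing=False, a witness lacunose at a unit contributes zeros for all of that unit's readings.

>>> matrix, reading_labels, witness_labels = collation.to_numpy(drop_constant=True, split_missing=False)
>>> matrix.shape == (len(reading_labels), len(witness_labels))
True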

1745 def to_distance_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False): 

1746 """Transforms this Collation into a NumPy distance matrix between witnesses, along with an array of its labels for the witnesses. 

1747 Distances can be computed either as counts of disagreements (the default setting), or as proportions of disagreements over all variation units where both witnesses have singleton readings. 

1748 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of disagreements. 

1749 

1750 Args: 

1751 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1752 Default value is False. 

1753 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

1754 Default value is False. 

1755 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

1756 should include the number of their extant, unambiguous variation units after the number of their disagreements. 

1757 Default value is False. 

1758 

1759 Returns: 

1760 A NumPy distance matrix with a row and column for each witness. 

1761 A list of witness ID strings. 

1762 """ 

1763 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1764 substantive_variation_unit_ids = self.variation_unit_ids 

1765 if drop_constant: 

1766 substantive_variation_unit_ids = [ 

1767 vu_id 

1768 for vu_id in self.variation_unit_ids 

1769 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1770 ] 

1771 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1772 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1773 # Initialize the output array with the appropriate dimensions: 

1774 witness_labels = [wit.id for wit in self.witnesses] 

1775 # The type of the matrix will depend on the input options: 

1776 matrix = None 

1777 if show_ext: 

1778 matrix = np.full( 

1779 (len(witness_labels), len(witness_labels)), "NA", dtype=object 

1780 ) # strings of the form "disagreements/extant" 

1781 elif proportion: 

1782 matrix = np.full( 

1783 (len(witness_labels), len(witness_labels)), 0.0, dtype=float 

1784 ) # floats of the form disagreements/extant 

1785 else: 

1786 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form disagreements 

1787 for i, wit_1 in enumerate(witness_labels): 

1788 for j, wit_2 in enumerate(witness_labels): 

1789 extant_units = 0 

1790 disagreements = 0 

1791 # If either of the cells for this pair of witnesses has been populated already, 

1792 # then just copy the entry from the other side of the diagonal without recalculating: 

1793 if i > j: 

1794 matrix[i, j] = matrix[j, i] 

1795 continue 

1796 # Otherwise, calculate the number of units where both witnesses have unambiguous readings 

1797 # and the number of units where they disagree: 

1798 for k, vu_id in enumerate(self.variation_unit_ids): 

1799 if vu_id not in substantive_variation_unit_ids_set: 

1800 continue 

1801 wit_1_rdg_support = self.readings_by_witness[wit_1][k] 

1802 wit_2_rdg_support = self.readings_by_witness[wit_2][k] 

1803 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0] 

1804 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0] 

1805 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1: 

1806 continue 

1807 extant_units += 1 

1808 if wit_1_rdg_inds[0] != wit_2_rdg_inds[0]: 

1809 disagreements += 1 

1810 cell_entry = None 

1811 if proportion: 

1812 cell_entry = disagreements / max( 

1813 extant_units, 1 

1814 ) # the max in the denominator is to prevent division by 0; the distance entry will be 0 if the two witnesses have no overlap 

1815 else: 

1816 cell_entry = disagreements 

1817 if show_ext: 

1818 cell_entry = str(cell_entry) + "/" + str(extant_units) 

1819 matrix[i, j] = cell_entry 

1820 return matrix, witness_labels 

1821 
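
Usage sketch (annotation; the cell value is hypothetical): with both options enabled, each cell is a string such as "0.125/40", i.e., the proportion of disagreements over the 40 units where both witnesses have extant, unambiguous readings; the same pattern applies to to_similarity_matrix below.

>>> matrix, witness_labels = collation.to_distance_matrix(proportion=True, show_ext=True)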

1822 def to_similarity_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False): 

1823 """Transforms this Collation into a NumPy similarity matrix between witnesses, along with an array of its labels for the witnesses. 

1824 Similarities can be computed either as counts of agreements (the default setting), or as proportions of agreements over all variation units where both witnesses have singleton readings. 

1825 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of agreements. 

1826 

1827 Args: 

1828 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1829 Default value is False. 

1830 proportion (bool, optional): An optional flag indicating whether or not to calculate similarities as proportions over extant, unambiguous variation units. 

1831 Default value is False. 

1832 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

1833 should include the number of their extant, unambiguous variation units after the number of agreements. 

1834 Default value is False. 

1835 

1836 Returns: 

1837 A NumPy similarity matrix with a row and column for each witness. 

1838 A list of witness ID strings. 

1839 """ 

1840 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1841 substantive_variation_unit_ids = self.variation_unit_ids 

1842 if drop_constant: 

1843 substantive_variation_unit_ids = [ 

1844 vu_id 

1845 for vu_id in self.variation_unit_ids 

1846 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1847 ] 

1848 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1849 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1850 # Initialize the output array with the appropriate dimensions: 

1851 witness_labels = [wit.id for wit in self.witnesses] 

1852 # The type of the matrix will depend on the input options: 

1853 matrix = None 

1854 if show_ext: 

1855 matrix = np.full( 

1856 (len(witness_labels), len(witness_labels)), "NA", dtype=object 

1857 ) # strings of the form "agreements/extant" 

1858 elif proportion: 

1859 matrix = np.full( 

1860 (len(witness_labels), len(witness_labels)), 0.0, dtype=float 

1861 ) # floats of the form agreements/extant 

1862 else: 

1863 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form agreements 

1864 for i, wit_1 in enumerate(witness_labels): 

1865 for j, wit_2 in enumerate(witness_labels): 

1866 extant_units = 0 

1867 agreements = 0 

1868 # If either of the cells for this pair of witnesses has been populated already, 

1869 # then just copy the entry from the other side of the diagonal without recalculating: 

1870 if i > j: 

1871 matrix[i, j] = matrix[j, i] 

1872 continue 

1873 # Otherwise, calculate the number of units where both witnesses have unambiguous readings 

1874 # and the number of units where they agree: 

1875 for k, vu_id in enumerate(self.variation_unit_ids): 

1876 if vu_id not in substantive_variation_unit_ids_set: 

1877 continue 

1878 wit_1_rdg_support = self.readings_by_witness[wit_1][k] 

1879 wit_2_rdg_support = self.readings_by_witness[wit_2][k] 

1880 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0] 

1881 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0] 

1882 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1: 

1883 continue 

1884 extant_units += 1 

1885 if wit_1_rdg_inds[0] == wit_2_rdg_inds[0]: 

1886 agreements += 1 

1887 cell_entry = None 

1888 if proportion: 

1889 cell_entry = agreements / max( 

1890 extant_units, 1 

1891 ) # the max in the denominator is to prevent division by 0; the similarity entry will be 0 if the two witnesses have no overlap 

1892 else: 

1893 cell_entry = agreements 

1894 if show_ext: 

1895 cell_entry = str(cell_entry) + "/" + str(extant_units) 

1896 matrix[i, j] = cell_entry 

1897 return matrix, witness_labels 

1898 

1899 def to_nexus_table(self, drop_constant: bool = False, ambiguous_as_missing: bool = False): 

1900 """Returns this Collation in the form of a table with rows for taxa, columns for characters, and reading IDs in cells. 

1901 

1902 Args: 

1903 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1904 Default value is False. 

1905 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

1906 Default value is False. 

1907 

1908 Returns: 

1909 A NumPy array with rows for taxa, columns for characters, and reading IDs in cells. 

1910 A list of witness ID strings. 

1911 A list of substantive variation unit ID strings. 

1912 """ 

1913 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1914 substantive_variation_unit_ids = self.variation_unit_ids 

1915 if drop_constant: 

1916 substantive_variation_unit_ids = [ 

1917 vu_id 

1918 for vu_id in self.variation_unit_ids 

1919 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1920 ] 

1921 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1922 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1923 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

1924 # to the readings' IDs: 

1925 reading_ids_by_indices = {} 

1926 for j, vu in enumerate(self.variation_units): 

1927 if vu.id not in substantive_variation_unit_ids_set: 

1928 continue 

1929 k = 0 

1930 for rdg in vu.readings: 

1931 key = tuple([vu.id, rdg.id]) 

1932 if key not in substantive_variation_unit_reading_tuples_set: 

1933 continue 

1934 indices = tuple([j, k]) 

1935 reading_ids_by_indices[indices] = rdg.id 

1936 k += 1 

1937 # Initialize the output array with the appropriate dimensions: 

1938 missing_symbol = '?' 

1939 witness_labels = [wit.id for wit in self.witnesses] 

1940 matrix = np.full( 

1941 (len(witness_labels), len(substantive_variation_unit_ids)), missing_symbol, dtype=object 

1942 ) # use dtype=object because the maximum string length is not known up front 

1943 # Then populate it with the appropriate values: 

1944 row_ind = 0 

1945 for i, wit in enumerate(self.witnesses): 

1946 col_ind = 0 

1947 for j, vu in enumerate(self.variation_units): 

1948 if vu.id not in substantive_variation_unit_ids_set: 

1949 continue 

1950 rdg_support = self.readings_by_witness[wit.id][j] 

1951 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

1952 if sum(rdg_support) == 0: 

1953 matrix[row_ind, col_ind] = missing_symbol 

1954 # Otherwise, add its coefficients normally: 

1955 else: 

1956 rdg_inds = [ 

1957 k for k, w in enumerate(rdg_support) if w > 0 

1958 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1959 # For singleton readings, just print the reading ID: 

1960 if len(rdg_inds) == 1: 

1961 k = rdg_inds[0] 

1962 matrix[row_ind, col_ind] = reading_ids_by_indices[(j, k)] 

1963 # For multiple readings, print the corresponding reading IDs in braces or the missing symbol depending on input settings: 

1964 else: 

1965 if ambiguous_as_missing: 

1966 matrix[row_ind, col_ind] = missing_symbol 

1967 else: 

1968 matrix[row_ind, col_ind] = "{%s}" % " ".join( 

1969 [reading_ids_by_indices[(j, k)] for k in rdg_inds] 

1970 ) 

1971 col_ind += 1 

1972 row_ind += 1 

1973 return matrix, witness_labels, substantive_variation_unit_ids 

1974 
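
Usage sketch (annotation; the reading IDs are hypothetical): each cell of the returned table holds a reading ID such as "1", an ambiguity in braces such as "{1 2}" (unless ambiguous_as_missing is set), or the missing symbol "?".

>>> matrix, witness_labels, vu_ids = collation.to_nexus_table(drop_constant=True)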

1975 def to_long_table(self, drop_constant: bool = False): 

1976 """Returns this Collation in the form of a long table with columns for taxa, characters, reading indices, and reading values. 

1977 Note that this method treats ambiguous readings as missing data. 

1978 

1979 Args: 

1980 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1981 Default value is False. 

1982 

1983 Returns: 

1984 A NumPy array with columns for taxa, characters, reading indices, and reading values, and rows for each combination of these values in the matrix. 

1985 A list of column label strings. 

1986 """ 

1987 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1988 substantive_variation_unit_ids = self.variation_unit_ids 

1989 if drop_constant: 

1990 substantive_variation_unit_ids = [ 

1991 vu_id 

1992 for vu_id in self.variation_unit_ids 

1993 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1994 ] 

1995 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1996 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1997 # Initialize the outputs: 

1998 column_labels = ["taxon", "character", "state", "value"] 

1999 long_table_list = [] 

2000 # Populate a dictionary mapping (variation unit index, reading index) tuples to reading texts: 

2001 reading_texts_by_indices = {} 

2002 for j, vu in enumerate(self.variation_units): 

2003 if vu.id not in substantive_variation_unit_ids_set: 

2004 continue 

2005 k = 0 

2006 for rdg in vu.readings: 

2007 key = tuple([vu.id, rdg.id]) 

2008 if key not in substantive_variation_unit_reading_tuples_set: 

2009 continue 

2010 indices = tuple([j, k]) 

2011 reading_texts_by_indices[indices] = rdg.text 

2012 k += 1 

2013 # Then populate the output list with the appropriate values: 

2014 witness_labels = [wit.id for wit in self.witnesses] 

2015 missing_symbol = '?' 

2016 for i, wit in enumerate(self.witnesses): 

2017 row_ind = 0 

2018 for j, vu_id in enumerate(self.variation_unit_ids): 

2019 if vu_id not in substantive_variation_unit_ids_set: 

2020 continue 

2021 rdg_support = self.readings_by_witness[wit.id][j] 

2022 # Populate a list of nonzero coefficients for this reading support vector: 

2023 rdg_inds = [k for k, w in enumerate(rdg_support) if w > 0] 

2024 # If this list does not consist of exactly one reading, then treat it as missing data: 

2025 if len(rdg_inds) != 1: 

2026 long_table_list.append([wit.id, vu_id, missing_symbol, missing_symbol]) 

2027 continue 

2028 k = rdg_inds[0] 

2029 rdg_text = reading_texts_by_indices[(j, k)] 

2030 # Replace empty reading texts with the omission placeholder: 

2031 if rdg_text == "": 

2032 rdg_text = "om." 

2033 long_table_list.append([wit.id, vu_id, k, rdg_text]) 

2034 # Then convert the long table entries list to a NumPy array: 

2035 long_table = np.array(long_table_list) 

2036 return long_table, column_labels 

2037 
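
Usage sketch (annotation; the IDs in the example row are hypothetical): each row of the long table pairs a witness and a variation unit with a reading index and reading text, e.g. a row of the form [witness ID, unit ID, 1, "om."] for an omission; ambiguous or lacunose attestations appear with "?" in both the state and value columns.

>>> long_table, column_labels = collation.to_long_table(drop_constant=True)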

2038 def to_dataframe( 

2039 self, 

2040 drop_constant: bool = False, 

2041 ambiguous_as_missing: bool = False, 

2042 proportion: bool = False, 

2043 table_type: TableType = TableType.matrix, 

2044 split_missing: bool = True, 

2045 show_ext: bool = False, 

2046 ): 

2047 """Returns this Collation in the form of a Pandas DataFrame, including the appropriate row and column labels. 

2048 

2049 Args: 

2050 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2051 Default value is False. 

2052 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2053 Default value is False. 

2054 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2055 Default value is False. 

2056 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate. 

2057 Only applicable for tabular outputs. 

2058 Default value is "matrix". 

2059 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings; 

2060 if False, then missing data is ignored (i.e., all states are 0). 

2061 Default value is True. 

2062 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2063 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2064 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2065 Default value is False. 

2066 

2067 Returns: 

2068 A Pandas DataFrame corresponding to the requested table type: a collation matrix with reading frequencies, a distance or similarity matrix, a NEXUS-style table, or a long table with discrete reading states. 

2069 """ 

2070 df = None 

2071 # Proceed based on the table type: 

2072 if table_type == TableType.matrix: 

2073 # Convert the collation to a NumPy array and get its row and column labels first: 

2074 matrix, reading_labels, witness_labels = self.to_numpy( 

2075 drop_constant=drop_constant, split_missing=split_missing 

2076 ) 

2077 df = pd.DataFrame(matrix, index=reading_labels, columns=witness_labels) 

2078 elif table_type == TableType.distance: 

2079 # Convert the collation to a NumPy array and get its row and column labels first: 

2080 matrix, witness_labels = self.to_distance_matrix( 

2081 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2082 ) 

2083 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2084 elif table_type == TableType.similarity: 

2085 # Convert the collation to a NumPy array and get its row and column labels first: 

2086 matrix, witness_labels = self.to_similarity_matrix( 

2087 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2088 ) 

2089 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2090 elif table_type == TableType.nexus: 

2091 # Convert the collation to a NumPy array and get its row and column labels first: 

2092 matrix, witness_labels, vu_labels = self.to_nexus_table( 

2093 drop_constant=drop_constant, ambiguous_as_missing=ambiguous_as_missing 

2094 ) 

2095 df = pd.DataFrame(matrix, index=witness_labels, columns=vu_labels) 

2096 elif table_type == TableType.long: 

2097 # Convert the collation to a long table and get its column labels first: 

2098 long_table, column_labels = self.to_long_table(drop_constant=drop_constant) 

2099 df = pd.DataFrame(long_table, columns=column_labels) 

2100 return df 

2101 
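
Usage sketch (annotation): the table_type argument selects between the tabular outputs implemented above; for instance, a proportional distance table, assuming `collation` is an existing Collation instance.

>>> df = collation.to_dataframe(table_type=TableType.distance, proportion=True)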

2102 def to_csv( 

2103 self, 

2104 file_addr: Union[Path, str], 

2105 drop_constant: bool = False, 

2106 ambiguous_as_missing: bool = False, 

2107 proportion: bool = False, 

2108 table_type: TableType = TableType.matrix, 

2109 split_missing: bool = True, 

2110 show_ext: bool = False, 

2111 **kwargs 

2112 ): 

2113 """Writes this Collation to a comma-separated value (CSV) file with the given address. 

2114 

2115 If your witness IDs are numeric (e.g., Gregory-Aland numbers), then they will be written in full to the CSV file, but Excel will likely interpret them as numbers and truncate any leading zeroes! 

2116 

2117 Args: 

2118 file_addr: A string representing the path to an output CSV file; the file type should be .csv. 

2119 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2120 Default value is False. 

2121 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2122 Default value is False. 

2123 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2124 Default value is False. 

2125 table_type: A TableType option indicating which type of tabular output to generate. 

2126 Only applicable for tabular outputs. 

2127 Default value is "matrix". 

2128 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings; 

2129 if False, then missing data is ignored (i.e., all states are 0). 

2130 Default value is True. 

2131 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2132 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2133 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2134 Default value is False. 

2135 **kwargs: Keyword arguments for pandas.DataFrame.to_csv. 

2136 """ 

2137 # Convert the collation to a Pandas DataFrame first: 

2138 df = self.to_dataframe( 

2139 drop_constant=drop_constant, 

2140 ambiguous_as_missing=ambiguous_as_missing, 

2141 proportion=proportion, 

2142 table_type=table_type, 

2143 split_missing=split_missing, 

2144 show_ext=show_ext, 

2145 ) 

2146 # Generate all parent folders for this file that don't already exist: 

2147 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2148 # Proceed based on the table type: 

2149 if table_type == TableType.long: 

2150 return df.to_csv( 

2151 file_addr, encoding="utf-8-sig", index=False, **kwargs 

2152 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2153 return df.to_csv( 

2154 file_addr, encoding="utf-8-sig", **kwargs 

2155 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2156 
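
Usage sketch (annotation; the output path is illustrative): any extra keyword arguments are forwarded to pandas.DataFrame.to_csv, so, for example, a different delimiter can be requested directly.

>>> collation.to_csv("out/collation.csv", table_type=TableType.nexus, ambiguous_as_missing=True, sep=";")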

2157 def to_excel( 

2158 self, 

2159 file_addr: Union[Path, str], 

2160 drop_constant: bool = False, 

2161 ambiguous_as_missing: bool = False, 

2162 proportion: bool = False, 

2163 table_type: TableType = TableType.matrix, 

2164 split_missing: bool = True, 

2165 show_ext: bool = False, 

2166 ): 

2167 """Writes this Collation to an Excel (.xlsx) file with the given address. 

2168 

2169 Since Pandas is deprecating its support for xlwt, writing output in the old Excel (.xls) format is not recommended. 

2170 

2171 Args: 

2172 file_addr: A string representing the path to an output Excel file; the file type should be .xlsx. 

2173 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2174 Default value is False. 

2175 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2176 Default value is False. 

2177 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2178 Default value is False. 

2179 table_type: A TableType option indicating which type of tabular output to generate. 

2180 Only applicable for tabular outputs. 

2181 Default value is "matrix". 

2182 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings; 

2183 if False, then missing data is ignored (i.e., all states are 0). 

2184 Default value is True. 

2185 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2186 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2187 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2188 Default value is False. 

2189 """ 

2190 # Convert the collation to a Pandas DataFrame first: 

2191 df = self.to_dataframe( 

2192 drop_constant=drop_constant, 

2193 ambiguous_as_missing=ambiguous_as_missing, 

2194 proportion=proportion, 

2195 table_type=table_type, 

2196 split_missing=split_missing, 

2197 show_ext=show_ext, 

2198 ) 

2199 # Generate all parent folders for this file that don't already exist: 

2200 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2201 # Proceed based on the table type: 

2202 if table_type == TableType.long: 

2203 return df.to_excel(file_addr, index=False) 

2204 return df.to_excel(file_addr) 

2205 
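# Usage sketch (same hypothetical `coll` as above; pandas needs an
# xlsx-capable engine such as openpyxl installed to write the file):
#
#     coll.to_excel("out/collation.xlsx", table_type=TableType.long)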

2206 def to_phylip_matrix( 

2207 self, 

2208 file_addr: Union[Path, str], 

2209 drop_constant: bool = False, 

2210 proportion: bool = False, 

2211 table_type: TableType = TableType.distance, 

2212 show_ext: bool = False, 

2213 ): 

2214 """Writes this Collation as a PHYLIP-formatted distance/similarity matrix to the file with the given address. 

2215 

2216 Args: 

2217 file_addr: A string representing the path to an output PHYLIP file; the file type should be .ph or .phy. 

2218 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2219 Default value is False. 

2220 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2221 Default value is False. 

2222 table_type: A TableType option indicating which type of tabular output to generate. 

2223 For PHYLIP-formatted outputs, distance and similarity matrices are the only supported table types. 

2224 Default value is "distance". 

2225 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2226 should include the number of variation units at which both witnesses are extant and unambiguous, after the number of their disagreements/agreements.

2227 Only applicable for tabular output formats of type "distance" or "similarity".

2228 Default value is False. 

2229 """ 

2230 # Convert the collation to a Pandas DataFrame first: 

2231 matrix = None 

2232 witness_labels = [] 

2233 # Proceed based on the table type: 

2234 if table_type == TableType.distance: 

2235 # Convert the collation to a NumPy array and get its row and column labels first: 

2236 matrix, witness_labels = self.to_distance_matrix( 

2237 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2238 ) 

2239 elif table_type == TableType.similarity: 

2240 # Convert the collation to a NumPy array and get its row and column labels first: 

2241 matrix, witness_labels = self.to_similarity_matrix( 

2242 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2243 ) 

2244 # Generate all parent folders for this file that don't already exist: 

2245 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2246 with open(file_addr, "w", encoding="utf-8") as f: 

2247 # The first line contains the number of taxa: 

2248 f.write("%d\n" % len(witness_labels)) 

2249 # Every subsequent line contains a witness label, followed by the values in its row of the matrix: 

2250 for i, wit_id in enumerate(witness_labels): 

2251 wit_label = slugify(wit_id, lowercase=False, allow_unicode=True, separator='_') 

2252 f.write("%s %s\n" % (wit_label, " ".join([str(v) for v in matrix[i]]))) 

2253 return 

2254 
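# Shape of the file written above (illustrative values for three hypothetical
# witnesses): the first line gives the taxon count, and each following line
# pairs a slugified witness label with its row of the matrix:
#
#     3
#     P46 0 42 37
#     01 42 0 29
#     02 37 29 0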

2255 def get_stemma_symbols(self): 

2256 """Returns a list of one-character symbols needed to represent the states of all substantive readings in STEMMA format. 

2257 

2258 The number of symbols equals the maximum number of substantive readings at any variation unit. 

2259 

2260 Returns: 

2261 A list of individual characters representing states in readings. 

2262 """ 

2263 possible_symbols = ( 

2264 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

2265 ) # NOTE: the maximum number of symbols allowed in STEMMA format (other than "?" and "-") is 62 

2266 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

2267 nsymbols = 0 

2268 # If there are no witnesses, then no symbols are needed at all: 

2269 if len(self.witnesses) == 0: 

2270 return [] 

2271 wit_id = self.witnesses[0].id 

2272 for rdg_support in self.readings_by_witness[wit_id]: 

2273 nsymbols = max(nsymbols, len(rdg_support)) 

2274 stemma_symbols = possible_symbols[:nsymbols] 

2275 return stemma_symbols 

2276 
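# Worked example (hypothetical): if the longest reading vector for the first
# witness has length 5, then possible_symbols[:5] yields
#
#     ['0', '1', '2', '3', '4']
#
# and since len(digits + ascii_lowercase + ascii_uppercase) == 62, a unit with
# more than 62 substantive readings could not be encoded in STEMMA format.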

2277 def to_stemma(self, file_addr: Union[Path, str]): 

2278 """Writes this Collation to a STEMMA prep file and an accompanying chron file (containing low, middle, and high dates for all witnesses), neither of which has a file extension.

2279 

2280 Since this format does not support ambiguous states, any reading vector with anything other than exactly one nonzero entry will be interpreted as lacunose.

2281 

2282 Args: 

2283 file_addr: A string representing the path to an output STEMMA prep file; the file should have no extension. 

2284 The accompanying chron file will match this file name, except that it will have "_chron" appended to the end. 

2286 """ 

2287 # Populate a list of sites that will correspond to columns of the sequence alignment 

2288 # (by default, constant sites are dropped): 

2289 substantive_variation_unit_ids = [ 

2290 vu_id for vu_id in self.variation_unit_ids if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2291 ] 

2292 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2293 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2294 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2295 # to the readings' texts: 

2296 reading_texts_by_indices = {} 

2297 for j, vu in enumerate(self.variation_units): 

2298 if vu.id not in substantive_variation_unit_ids_set: 

2299 continue 

2300 k = 0 

2301 for rdg in vu.readings: 

2302 key = tuple([vu.id, rdg.id]) 

2303 if key not in substantive_variation_unit_reading_tuples_set: 

2304 continue 

2305 indices = tuple([j, k]) 

2306 reading_texts_by_indices[indices] = rdg.text 

2307 k += 1 

2308 # In a second pass, populate another dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2309 # to the witnesses exclusively supporting those readings: 

2310 reading_wits_by_indices = {} 

2311 for indices in reading_texts_by_indices: 

2312 reading_wits_by_indices[indices] = [] 

2313 for i, wit in enumerate(self.witnesses): 

2314 for j, vu_id in enumerate(self.variation_unit_ids): 

2315 if vu_id not in substantive_variation_unit_ids_set: 

2316 continue 

2317 rdg_support = self.readings_by_witness[wit.id][j] 

2318 # If this witness does not exclusively support exactly one reading at this unit, then treat it as lacunose: 

2319 if len([k for k, w in enumerate(rdg_support) if w > 0]) != 1: 

2320 continue 

2321 k = rdg_support.index(1) 

2322 indices = tuple([j, k]) 

2323 reading_wits_by_indices[indices].append(wit.id) 

2324 # In a third pass, write to the STEMMA file: 

2325 symbols = self.get_stemma_symbols() 

2326 Path(file_addr).parent.mkdir( 

2327 parents=True, exist_ok=True 

2328 ) # generate all parent folders for this file that don't already exist 

2329 chron_file_addr = str(file_addr) + "_chron" 

2330 with open(file_addr, "w", encoding="utf-8") as f: 

2331 # Start with the witness list: 

2332 f.write( 

2333 "* %s ;\n\n" 

2334 % " ".join( 

2335 [slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') for wit in self.witnesses] 

2336 ) 

2337 ) 

2338 # f.write("^ %s\n\n" % chron_file_addr) #write the relative path to the chron file 

2339 f.write( 

2340 "^ %s\n\n" % ("." + os.sep + Path(chron_file_addr).name) 

2341 ) # write the relative path to the chron file 

2342 # Then add a line indicating that all witnesses are lacunose unless they are specified explicitly: 

2343 f.write("= $? $* ;\n\n") 

2344 # Then proceed for each variation unit: 

2345 for j, vu_id in enumerate(self.variation_unit_ids): 

2346 if vu_id not in substantive_variation_unit_ids_set: 

2347 continue 

2348 # Print the variation unit ID first: 

2349 f.write("@ %s\n" % vu_id) 

2350 # In a first pass, print the texts of all readings enclosed in brackets: 

2351 f.write("[ ") 

2352 k = 0 

2353 while True: 

2354 indices = tuple([j, k]) 

2355 if indices not in reading_texts_by_indices: 

2356 break 

2357 text = slugify( 

2358 reading_texts_by_indices[indices], lowercase=False, allow_unicode=True, separator='.' 

2359 ) 

2360 # Denote omissions by en-dashes: 

2361 if text == "": 

2362 text = "\u2013" 

2363 # The first reading should not be preceded by anything: 

2364 if k == 0: 

2365 f.write(text) 

2366 f.write(" |") 

2367 # Every subsequent reading should be preceded by a space: 

2368 elif k > 0: 

2369 f.write(" %s" % text) 

2370 k += 1 

2371 f.write(" ]\n") 

2372 # In a second pass, print the indices and witnesses for all readings enclosed in angle brackets: 

2373 k = 0 

2374 f.write("\t< ") 

2375 while True: 

2376 indices = tuple([j, k]) 

2377 if indices not in reading_wits_by_indices: 

2378 break 

2379 rdg_symbol = symbols[k] # get the one-character alphanumeric code for this state 

2380 wits = " ".join(reading_wits_by_indices[indices]) 

2381 # Open the variant reading support block with an angle bracket: 

2382 if k == 0: 

2383 f.write("%s %s" % (rdg_symbol, wits)) 

2384 # Open all subsequent variant reading support blocks with pipes on the next line: 

2385 else: 

2386 f.write("\n\t| %s %s" % (rdg_symbol, wits)) 

2387 k += 1 

2388 f.write(" >\n") 

2389 # In a fourth pass, write to the chron file: 

2390 max_id_length = max( 

2391 [len(slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')) for wit in self.witnesses] 

2392 ) 

2393 max_date_length = 0 

2394 for wit in self.witnesses: 

2395 if wit.date_range[0] is not None: 

2396 max_date_length = max(max_date_length, len(str(wit.date_range[0]))) 

2397 if wit.date_range[1] is not None: 

2398 max_date_length = max(max_date_length, len(str(wit.date_range[1]))) 

2399 # Attempt to get the minimum and maximum dates for witnesses; if we can't do this, then don't write a chron file: 

2400 min_date = None 

2401 max_date = None 

2402 try: 

2403 min_date = min([wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]) 

2404 max_date = max([wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]) 

2405 except Exception:

2406 print("WARNING: no witnesses have date ranges; no chron file will be written!") 

2407 return 

2408 with open(chron_file_addr, "w", encoding="utf-8") as f: 

2409 for wit in self.witnesses: 

2410 wit_label = slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') 

2411 f.write(wit_label) 

2412 f.write(" " * (max_id_length - len(wit_label) + 1))

2413 # If the lower bound of this witness's date range is empty, then use the minimum date over all witnesses as a default:

2414 date_range = wit.date_range 

2415 if date_range[0] is None: 

2416 date_range = tuple([min_date, date_range[1]]) 

2417 # Then write the date range minimum, average, and maximum to the chron file: 

2418 low_date = str(date_range[0]) 

2419 f.write(" " * (max_date_length - len(low_date) + 2)) 

2420 f.write(low_date) 

2421 avg_date = str(int((date_range[0] + date_range[1]) / 2))

2422 f.write(" " * (max_date_length - len(avg_date) + 2))

2423 f.write(avg_date) 

2424 high_date = str(date_range[1]) 

2425 f.write(" " * (max_date_length - len(high_date) + 2)) 

2426 f.write(high_date) 

2427 f.write("\n") 

2428 return 

2429 
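# Shape of the two files written above (illustrative content for hypothetical
# witnesses A, B, and C and a single two-reading unit with ID u1, where the
# prep file is named "example"; an en-dash would mark an omitted reading):
#
#     * A B C ;
#
#     ^ ./example_chron
#
#     = $? $* ;
#
#     @ u1
#     [ word | other ]
#     	< 0 A B
#     	| 1 C >
#
# The chron file pads each witness label to the longest label width and then
# lists its low, average, and high dates in fixed-width columns:
#
#     A    300  350  400
#     B    400  450  500
#     C    450  500  550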

2430 def to_file( 

2431 self, 

2432 file_addr: Union[Path, str], 

2433 format: Format = None, 

2434 drop_constant: bool = False, 

2435 split_missing: bool = True, 

2436 char_state_labels: bool = True, 

2437 frequency: bool = False, 

2438 ambiguous_as_missing: bool = False, 

2439 proportion: bool = False, 

2440 calibrate_dates: bool = False, 

2441 mrbayes: bool = False, 

2442 clock_model: ClockModel = ClockModel.strict, 

2443 ancestral_logger: AncestralLogger = AncestralLogger.state, 

2444 table_type: TableType = TableType.matrix, 

2445 show_ext: bool = False, 

2446 seed: int = None, 

2447 ): 

2448 """Writes this Collation to the file with the given address. 

2449 

2450 Args: 

2451 file_addr (Union[Path, str]): The path to the output file. 

2452 format (Format, optional): The desired output format. 

2453 If None, then it is inferred from the file suffix.

2454 Defaults to None. 

2455 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2456 Default value is False. 

2457 split_missing (bool, optional): An optional flag indicating whether to treat 

2458 missing characters/variation units as having a contribution of 1 split over all states/readings; 

2459 if False, then missing data is ignored (i.e., all states are 0). 

2460 Not applicable for NEXUS, HENNIG86, PHYLIP, FASTA, or STEMMA format. 

2461 Default value is True. 

2462 char_state_labels (bool, optional): An optional flag indicating whether to print 

2463 the CharStateLabels block in NEXUS output. 

2464 Default value is True. 

2465 frequency (bool, optional): An optional flag indicating whether to use the StatesFormat=Frequency setting 

2466 instead of the StatesFormat=StatesPresent setting 

2467 (and thus represent all states with frequency vectors rather than symbols) 

2468 in NEXUS output. 

2469 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation. 

2470 Default value is False. 

2471 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2472 If this flag is set, then only base symbols will be generated for the NEXUS file. 

2473 It is only applied if the frequency option is False. 

2474 Default value is False. 

2475 proportion (bool, optional): An optional flag indicating whether to populate a distance matrix's cells 

2476 with a proportion of disagreements to variation units where both witnesses are extant. 

2477 It is only applied if the table_type option is "distance". 

2478 Default value is False. 

2479 calibrate_dates (bool, optional): An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses 

2480 in NEXUS output. 

2481 This option is intended for inputs to BEAST 2. 

2482 Default value is False. 

2483 mrbayes (bool, optional): An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses 

2484 in NEXUS output. 

2485 This option is intended for inputs to MrBayes. 

2486 Default value is False. 

2487 clock_model (ClockModel, optional): A ClockModel option indicating which type of clock model to use. 

2488 This option is intended for inputs to MrBayes and BEAST 2. 

2489 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified. 

2490 Default value is "strict". 

2491 ancestral_logger (AncestralLogger, optional): An AncestralLogger option indicating which class of logger (if any) to use for ancestral states. 

2492 This option is intended for inputs to BEAST 2. 

2493 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate. 

2494 Only applicable for tabular outputs and PHYLIP outputs. 

2495 If the output is a PHYLIP file, then the type of tabular output must be "distance" or "similarity"; otherwise, it will be ignored. 

2496 Default value is "matrix". 

2497 show_ext (bool, optional): An optional flag indicating whether each cell in a distance or similarity matrix 

2498 should include the number of variation units where both witnesses are extant after the number of their disagreements/agreements. 

2499 Only applicable for tabular output formats of type "distance" or "similarity". 

2500 Default value is False. 

2501 seed (int, optional): A seed for random number generation (for setting initial values of unspecified transcriptional rates in BEAST 2 XML output).

2502 """ 

2503 file_addr = Path(file_addr) 

2504 format = format or Format.infer( 

2505 file_addr.suffix 

2506 ) # an exception will be raised here if the format or suffix is invalid 

2507 

2508 if format == Format.NEXUS: 

2509 return self.to_nexus( 

2510 file_addr, 

2511 drop_constant=drop_constant, 

2512 char_state_labels=char_state_labels, 

2513 frequency=frequency, 

2514 ambiguous_as_missing=ambiguous_as_missing, 

2515 calibrate_dates=calibrate_dates, 

2516 mrbayes=mrbayes, 

2517 clock_model=clock_model, 

2518 ) 

2519 

2520 if format == Format.HENNIG86:

2521 return self.to_hennig86(file_addr, drop_constant=drop_constant) 

2522 

2523 if format == Format.PHYLIP:

2524 if table_type in [TableType.distance, TableType.similarity]: 

2525 return self.to_phylip_matrix( 

2526 file_addr, 

2527 drop_constant=drop_constant, 

2528 proportion=proportion, 

2529 table_type=table_type, 

2530 show_ext=show_ext, 

2531 ) 

2532 return self.to_phylip(file_addr, drop_constant=drop_constant) 

2533 

2534 if format == Format.FASTA:

2535 return self.to_fasta(file_addr, drop_constant=drop_constant) 

2536 

2537 if format == Format.BEAST:

2538 return self.to_beast( 

2539 file_addr, 

2540 drop_constant=drop_constant, 

2541 clock_model=clock_model, 

2542 ancestral_logger=ancestral_logger, 

2543 seed=seed, 

2544 ) 

2545 

2546 if format == Format.CSV: 

2547 return self.to_csv( 

2548 file_addr, 

2549 drop_constant=drop_constant, 

2550 ambiguous_as_missing=ambiguous_as_missing, 

2551 proportion=proportion, 

2552 table_type=table_type, 

2553 split_missing=split_missing, 

2554 show_ext=show_ext, 

2555 ) 

2556 

2557 if format == Format.TSV: 

2558 return self.to_csv( 

2559 file_addr, 

2560 drop_constant=drop_constant, 

2561 ambiguous_as_missing=ambiguous_as_missing, 

2562 proportion=proportion, 

2563 table_type=table_type, 

2564 split_missing=split_missing, 

2565 show_ext=show_ext, 

2566 sep="\t", 

2567 ) 

2568 

2569 if format == Format.EXCEL: 

2570 return self.to_excel( 

2571 file_addr, 

2572 drop_constant=drop_constant, 

2573 ambiguous_as_missing=ambiguous_as_missing, 

2574 proportion=proportion, 

2575 table_type=table_type, 

2576 split_missing=split_missing, 

2577 show_ext=show_ext, 

2578 ) 

2579 

2580 if format == Format.STEMMA: 

2581 return self.to_stemma(file_addr)
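# Usage sketch (same hypothetical `coll`): when format is None, the output
# format is inferred from the file suffix by Format.infer, so the following
# calls (with assumed suffix-to-format mappings) would dispatch to the
# writers above:
#
#     coll.to_file("out/collation.nex")                                 # NEXUS
#     coll.to_file("out/distances.phy", table_type=TableType.distance)  # PHYLIP distance matrix
#     coll.to_file("out/collation.tsv", table_type=TableType.long)      # to_csv with sep="\t"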