Coverage for teiphy/collation.py: 100.00%

1728 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-04-09 05:29 +0000

1#!/usr/bin/env python3 

2 

3from enum import Enum 

4from typing import List, Union 

5import os 

6from pathlib import Path 

7from datetime import datetime # for calculating the current year (for dating and tree height purposes) 

8import math # for special functions 

9import time # to time calculations for users 

10import string # for easy retrieval of character ranges 

11from lxml import etree as et # for reading TEI XML inputs 

12import numpy as np # for random number sampling and collation matrix outputs 

13import pandas as pd # for writing to DataFrames, CSV, Excel, etc. 

14from slugify import slugify # for converting Unicode text from readings to ASCII for NEXUS 

15from jinja2 import Environment, PackageLoader, select_autoescape # for filling output XML templates 

16from tqdm import tqdm # for progress bars 

17 

18from .common import xml_ns, tei_ns 

19from .format import Format 

20from .witness import Witness 

21from .variation_unit import VariationUnit 

22 

23 

class ParsingException(Exception):
    """Raised when the TEI XML collation (or an auxiliary input such as a dates file) cannot be parsed."""

    pass

26 

27 

class WitnessDateException(Exception):
    """Raised when a witness's latest possible date precedes the earliest possible date of origin of the collated work."""

    pass

30 

31 

class IntrinsicRelationsException(Exception):
    """Raised when a variation unit's intrinsic relations graph is not a forest (a reading with multiple in-edges, or a cycle)."""

    pass

34 

35 

class ClockModel(str, Enum):
    """Enumeration of clock model options ("strict", "uncorrelated", "local") accepted for output generation."""

    strict = "strict"
    uncorrelated = "uncorrelated"
    local = "local"

40 

41 

class AncestralLogger(str, Enum):
    """Enumeration of ancestral logging options: log states, log sequences, or log nothing."""

    state = "state"
    sequence = "sequence"
    none = "none"

46 

47 

class TableType(str, Enum):
    """Enumeration of supported tabular output types (e.g., collation matrix, distance, similarity, NEXUS, long format)."""

    matrix = "matrix"
    distance = "distance"
    similarity = "similarity"
    idf = "idf"
    mean_idf = "mean-idf"
    mi = "mi"
    mean_mi = "mean-mi"
    nexus = "nexus"
    long = "long"

58 

59 

class SplitMissingType(str, Enum):
    """Enumeration of options ("uniform" or "proportional") for splitting support over readings where data is missing."""

    uniform = "uniform"
    proportional = "proportional"

63 

64 

65class Collation: 

66 """Base class for storing TEI XML collation data internally. 

67 

68 This corresponds to the entire XML tree, rooted at the TEI element of the collation. 

69 

70 Attributes: 

71 manuscript_suffixes: A list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses. 

72 trivial_reading_types: A set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading. 

73 missing_reading_types: A set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data. 

74 fill_corrector_lacunae: A boolean flag indicating whether or not to fill "lacunae" in witnesses with type "corrector". 

75 fragmentary_threshold: A float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation. 

76 witnesses: A list of Witness instances contained in this Collation. 

77 witness_index_by_id: A dictionary mapping base witness ID strings to their int indices in the witnesses list. 

78 variation_units: A list of VariationUnit instances contained in this Collation. 

79 readings_by_witness: A dictionary mapping base witness ID strings to lists of reading support coefficients for all units (with at least two substantive readings). 

80 substantive_variation_unit_ids: A list of ID strings for variation units with two or more substantive readings. 

81 substantive_variation_unit_reading_tuples: A list of (variation unit ID, reading ID) tuples for substantive readings. 

82 verbose: A boolean flag indicating whether or not to print timing and debugging details for the user. 

83 """ 

84 

85 def __init__( 

86 self, 

87 xml: et.ElementTree, 

88 manuscript_suffixes: List[str] = [], 

89 trivial_reading_types: List[str] = [], 

90 missing_reading_types: List[str] = [], 

91 fill_corrector_lacunae: bool = False, 

92 fragmentary_threshold: float = None, 

93 dates_file: Union[Path, str] = None, 

94 verbose: bool = False, 

95 ): 

96 """Constructs a new Collation instance with the given settings. 

97 

98 Args: 

99 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

100 manuscript_suffixes: An optional list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses. 

101 trivial_reading_types: An optional set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading. 

102 missing_reading_types: An optional set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data. 

103 fill_corrector_lacunae: An optional flag indicating whether or not to fill "lacunae" in witnesses with type "corrector". 

104 fragmentary_threshold: An optional float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation. 

105 dates_file: An optional path to a CSV file containing witness IDs, minimum dates, and maximum dates. If specified, then for all witnesses in the first column, any existing date ranges for them in the TEI XML collation will be ignored. 

106 verbose: An optional flag indicating whether or not to print timing and debugging details for the user. 

107 """ 

108 self.manuscript_suffixes = manuscript_suffixes 

109 self.trivial_reading_types = set(trivial_reading_types) 

110 self.missing_reading_types = set(missing_reading_types) 

111 self.fill_corrector_lacunae = fill_corrector_lacunae 

112 self.fragmentary_threshold = fragmentary_threshold 

113 self.verbose = verbose 

114 self.witnesses = [] 

115 self.witness_index_by_id = {} 

116 self.variation_units = [] 

117 self.readings_by_witness = {} 

118 self.variation_unit_ids = [] 

119 self.substantive_variation_unit_reading_tuples = [] 

120 self.substantive_readings_by_variation_unit_id = {} 

121 self.weight_categories = [] 

122 self.weights_by_id = {} 

123 self.intrinsic_categories = [] 

124 self.intrinsic_odds_by_id = {} 

125 self.transcriptional_categories = [] 

126 self.transcriptional_rates_by_id = {} 

127 self.origin_date_range = [] 

128 # Now parse the XML tree to populate these data structures: 

129 if self.verbose: 

130 print("Initializing collation...") 

131 t0 = time.time() 

132 self.parse_origin_date_range(xml) 

133 self.parse_list_wit(xml) 

134 self.validate_wits(xml) 

135 # If a dates file was specified, then update the witness date ranges manually: 

136 if dates_file is not None: 

137 self.update_witness_date_ranges_from_dates_file(dates_file) 

138 # If the upper bound on a work's date of origin is not defined, then attempt to assign it an upper bound based on the witness dates; 

139 # otherwise, attempt to assign lower bounds to witness dates based on it: 

140 if self.origin_date_range[1] is None: 

141 self.update_origin_date_range_from_witness_date_ranges() 

142 else: 

143 self.update_witness_date_ranges_from_origin_date_range() 

144 self.parse_weights(xml) 

145 self.parse_intrinsic_odds(xml) 

146 self.parse_transcriptional_rates(xml) 

147 self.parse_apps(xml) 

148 self.validate_intrinsic_relations() 

149 self.parse_readings_by_witness() 

150 # If a threshold of readings for fragmentary witnesses is specified, then filter the witness list using the dictionary mapping witness IDs to readings: 

151 if self.fragmentary_threshold is not None: 

152 self.filter_fragmentary_witnesses(xml) 

153 t1 = time.time() 

154 if self.verbose: 

155 print("Total time to initialize collation: %0.4fs." % (t1 - t0)) 

156 

157 def parse_origin_date_range(self, xml: et.ElementTree): 

158 """Given an XML tree for a collation, populates this Collation's list of origin date bounds. 

159 

160 Args: 

161 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

162 """ 

163 if self.verbose: 

164 print("Parsing origin date range...") 

165 t0 = time.time() 

166 self.origin_date_range = [None, None] 

167 for date in xml.xpath( 

168 "//tei:sourceDesc//tei:bibl//tei:date|//tei:sourceDesc//tei:biblStruct//tei:date|//tei:sourceDesc//tei:biblFull//tei:date", 

169 namespaces={"tei": tei_ns}, 

170 ): 

171 # Try the @when attribute first; if it is set, then it accounts for both ends of the date range: 

172 if date.get("when") is not None: 

173 self.origin_date_range[0] = int(date.get("when").split("-")[0]) 

174 self.origin_date_range[1] = self.origin_date_range[0] 

175 # Failing that, if it has @from and @to attributes (indicating the period over which the work was completed), 

176 # then the completion date of the work accounts for both ends of the date range: 

177 elif date.get("to") is not None: 

178 self.origin_date_range[0] = int(date.get("to").split("-")[0]) 

179 self.origin_date_range[1] = self.origin_date_range[0] 

180 # Failing that, set lower and upper bounds on the origin date using the the @notBefore and @notAfter attributes: 

181 elif date.get("notBefore") is not None or date.get("notAfter") is not None: 

182 if date.get("notBefore") is not None: 

183 self.origin_date_range[0] = int(date.get("notBefore").split("-")[0]) 

184 if date.get("notAfter") is not None: 

185 self.origin_date_range[1] = int(date.get("notAfter").split("-")[0]) 

186 return 

187 

188 def get_base_wit(self, wit: str): 

189 """Given a witness siglum, strips of the specified manuscript suffixes until the siglum matches one in the witness list or until no more suffixes can be stripped. 

190 

191 Args: 

192 wit: A string representing a witness siglum, potentially including suffixes to be stripped. 

193 """ 

194 base_wit = wit 

195 # If our starting siglum corresponds to a siglum in the witness list, then just return it: 

196 if base_wit in self.witness_index_by_id: 

197 return base_wit 

198 # Otherwise, strip any suffixes we find until the siglum corresponds to a base witness in the list 

199 # or no more suffixes can be stripped: 

200 suffix_found = True 

201 while suffix_found: 

202 suffix_found = False 

203 for suffix in self.manuscript_suffixes: 

204 if base_wit.endswith(suffix): 

205 suffix_found = True 

206 base_wit = base_wit[: -len(suffix)] 

207 break # stop looking for other suffixes 

208 # If the siglum stripped of this suffix now corresponds to a siglum in the witness list, then return it: 

209 if base_wit in self.witness_index_by_id: 

210 return base_wit 

211 # If we get here, then all possible manuscript suffixes have been stripped, and the resulting siglum does not correspond to a siglum in the witness list: 

212 return base_wit 

213 

214 def parse_list_wit(self, xml: et.ElementTree): 

215 """Given an XML tree for a collation, populates its list of witnesses from its listWit element. 

216 If the XML tree does not contain a listWit element, then a ParsingException is thrown listing all distinct witness sigla encountered in the collation. 

217 

218 Args: 

219 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

220 """ 

221 if self.verbose: 

222 print("Parsing witness list...") 

223 t0 = time.time() 

224 self.witnesses = [] 

225 self.witness_index_by_id = {} 

226 list_wits = xml.xpath("/tei:TEI//tei:listWit", namespaces={"tei": tei_ns}) 

227 if len(list_wits) == 0: 

228 # There is no listWit element: collect all distinct witness sigla in the collation and raise a ParsingException listing them: 

229 distinct_sigla = set() 

230 sigla = [] 

231 # Proceed for each rdg, rdgGrp, or witDetail element: 

232 for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}): 

233 wit_str = rdg.get("wit") if rdg.get("wit") is not None else "" 

234 wits = wit_str.split() 

235 for wit in wits: 

236 siglum = wit.strip("#") # remove the URI prefix, if there is one 

237 if siglum not in distinct_sigla: 

238 distinct_sigla.add(siglum) 

239 sigla.append(siglum) 

240 sigla.sort() 

241 msg = "" 

242 msg += "An explicit listWit element must be included in the TEI XML collation.\n" 

243 msg += "The following sigla occur in the collation and should be included as the @xml:id or @n attributes of witness elements under the listWit element:\n" 

244 msg += ", ".join(sigla) 

245 raise ParsingException(msg) 

246 # Otherwise, take the first listWit element as the list of all witnesses and process it: 

247 list_wit = list_wits[0] 

248 for witness in list_wit.xpath("./tei:witness", namespaces={"tei": tei_ns}): 

249 wit = Witness(witness, self.verbose) 

250 self.witness_index_by_id[wit.id] = len(self.witnesses) 

251 self.witnesses.append(wit) 

252 t1 = time.time() 

253 if self.verbose: 

254 print("Finished processing %d witnesses in %0.4fs." % (len(self.witnesses), t1 - t0)) 

255 return 

256 

257 def validate_wits(self, xml: et.ElementTree): 

258 """Given an XML tree for a collation, checks if any witness sigla listed in a rdg, rdgGrp, or witDetail element, 

259 once stripped of ignored suffixes, is not found in the witness list. 

260 A warning will be issued for each distinct siglum like this. 

261 This method also checks if the upper bound of any witness's date is earlier than the lower bound on the collated work's date of origin 

262 and throws an exception if so. 

263 

264 Args: 

265 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

266 """ 

267 if self.verbose: 

268 print("Validating witness list against collation...") 

269 t0 = time.time() 

270 # There is no listWit element: collect all distinct witness sigla in the collation and raise an exception listing them: 

271 distinct_extra_sigla = set() 

272 extra_sigla = [] 

273 # Proceed for each rdg, rdgGrp, or witDetail element: 

274 for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}): 

275 wit_str = rdg.get("wit") if rdg.get("wit") is not None else "" 

276 wits = wit_str.split() 

277 for wit in wits: 

278 siglum = wit.strip("#") # remove the URI prefix, if there is one 

279 base_siglum = self.get_base_wit(siglum) 

280 if base_siglum not in self.witness_index_by_id: 

281 if base_siglum not in distinct_extra_sigla: 

282 distinct_extra_sigla.add(base_siglum) 

283 extra_sigla.append(base_siglum) 

284 if len(extra_sigla) > 0: 

285 extra_sigla.sort() 

286 msg = "" 

287 msg += "WARNING: The following sigla occur in the collation that do not have corresponding witness entries in the listWit:\n" 

288 msg += ", ".join(extra_sigla) 

289 print(msg) 

290 # If the lower bound on the date of origin is defined, then check each witness against it: 

291 if self.origin_date_range[0] is not None: 

292 bad_date_witness_sigla = [] 

293 bad_date_upper_bounds_by_witness = {} 

294 for i, wit in enumerate(self.witnesses): 

295 if wit.date_range[1] is not None and wit.date_range[1] < self.origin_date_range[0]: 

296 bad_date_witness_sigla.append(wit.id) 

297 bad_date_upper_bounds_by_witness[wit.id] = wit.date_range[1] 

298 if len(bad_date_witness_sigla) > 0: 

299 msg = "" 

300 msg += "The following witnesses have their latest possible dates before the earliest date of origin %d specified for the collated work:\n" 

301 msg += ", ".join( 

302 [ 

303 (siglum + "(" + str(bad_date_upper_bounds_by_witness[siglum]) + ")") 

304 for siglum in bad_date_witness_sigla 

305 ] 

306 ) 

307 raise WitnessDateException(msg) 

308 t1 = time.time() 

309 if self.verbose: 

310 print("Finished witness validation in %0.4fs." % (t1 - t0)) 

311 return 

312 

313 def update_witness_date_ranges_from_dates_file(self, dates_file: Union[Path, str]): 

314 """Given a CSV-formatted dates file, update the date ranges of all witnesses whose IDs are in the first column of the dates file 

315 (overwriting existing date ranges if necessary). 

316 """ 

317 if self.verbose: 

318 print("Updating witness dates from file %s..." % (str(dates_file))) 

319 t0 = time.time() 

320 dates_df = pd.read_csv(dates_file, index_col=0, names=["id", "min", "max"]) 

321 for witness in self.witnesses: 

322 wit_id = witness.id 

323 if wit_id in dates_df.index: 

324 # For every witness in the list whose ID is specified in the dates file, 

325 # update their date ranges (as long as the date ranges in the file are are well-formed): 

326 min_date = int(dates_df.loc[wit_id]["min"]) if not np.isnan(dates_df.loc[wit_id]["min"]) else None 

327 max_date = ( 

328 int(dates_df.loc[wit_id]["max"]) 

329 if not np.isnan(dates_df.loc[wit_id]["max"]) 

330 else datetime.now().year 

331 ) 

332 print(min_date, max_date) 

333 if min_date is not None and max_date is not None and min_date > max_date: 

334 raise ParsingException( 

335 "In dates file %s, for witness ID %s, the minimum date %d is greater than the maximum date %d." 

336 % (str(dates_file), wit_id, min_date, max_date) 

337 ) 

338 witness.date_range = [min_date, max_date] 

339 t1 = time.time() 

340 if self.verbose: 

341 print("Finished witness date range updates in %0.4fs." % (t1 - t0)) 

342 return 

343 

344 def update_origin_date_range_from_witness_date_ranges(self): 

345 """Conditionally updates the upper bound on the date of origin of the work represented by this Collation 

346 based on the bounds on the witnesses' dates. 

347 If none of the witnesses have bounds on their dates, then nothing is done. 

348 This method is only invoked if the work's date of origin does not already have its upper bound defined. 

349 """ 

350 if self.verbose: 

351 print("Updating upper bound on origin date using witness dates...") 

352 t0 = time.time() 

353 # Set the origin date to the earliest witness date: 

354 witness_date_lower_bounds = [wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None] 

355 witness_date_upper_bounds = [wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None] 

356 min_witness_date = ( 

357 min(witness_date_lower_bounds + witness_date_upper_bounds) 

358 if len(witness_date_lower_bounds + witness_date_upper_bounds) > 0 

359 else None 

360 ) 

361 if min_witness_date is not None: 

362 self.origin_date_range[1] = ( 

363 min(self.origin_date_range[1], min_witness_date) 

364 if self.origin_date_range[1] is not None 

365 else min_witness_date 

366 ) 

367 t1 = time.time() 

368 if self.verbose: 

369 print("Finished updating upper bound on origin date in %0.4fs." % (t1 - t0)) 

370 return 

371 

372 def update_witness_date_ranges_from_origin_date_range(self): 

373 """Attempts to update the lower bounds on the witnesses' dates of origin of the work represented by this Collation 

374 using the upper bound on the date of origin of the work represented by this Collation. 

375 This method is only invoked if the upper bound on the work's date of origin was not already defined 

376 (i.e., if update_origin_date_range_from_witness_date_ranges was not invoked). 

377 """ 

378 if self.verbose: 

379 print("Updating lower bounds on witness dates using origin date...") 

380 t0 = time.time() 

381 # Proceed for every witness: 

382 for i, wit in enumerate(self.witnesses): 

383 # Ensure that the lower bound on this witness's date is no earlier than the upper bound on the date of the work's origin: 

384 wit.date_range[0] = ( 

385 max(wit.date_range[0], self.origin_date_range[1]) 

386 if wit.date_range[0] is not None 

387 else self.origin_date_range[1] 

388 ) 

389 # Then ensure that the upper bound on this witness's date is no earlier than its lower bound, in case we updated it: 

390 wit.date_range[1] = max(wit.date_range[0], wit.date_range[1]) 

391 t1 = time.time() 

392 if self.verbose: 

393 print("Finished updating lower bounds on witness dates in %0.4fs." % (t1 - t0)) 

394 return 

395 

396 def parse_weights(self, xml: et.ElementTree): 

397 """Given an XML tree for a collation, populates this Collation's list of variation unit weight categories 

398 (associated with types of variation that may have different expected frequencies) 

399 and its dictionary mapping these categories to integer weights. 

400 

401 Args: 

402 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

403 """ 

404 if self.verbose: 

405 print("Parsing variation unit weight categories...") 

406 t0 = time.time() 

407 self.weight_categories = [] 

408 self.weights_by_id = {} 

409 for interp in xml.xpath("//tei:interpGrp[@type=\"weight\"]/tei:interp", namespaces={"tei": tei_ns}): 

410 # These must be indexed by the xml:id attribute, so skip any that do not have one: 

411 if interp.get("{%s}id" % xml_ns) is None: 

412 continue 

413 weight_category = interp.get("{%s}id" % xml_ns) 

414 # Retrieve this element's text value and coerce it to an integer, defaulting to 1 if it has none: 

415 weight = 1 

416 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}): 

417 if certainty.get("degree") is not None: 

418 weight = int(certainty.get("degree")) 

419 break 

420 self.weight_categories.append(weight_category) 

421 self.weights_by_id[weight_category] = weight 

422 t1 = time.time() 

423 if self.verbose: 

424 print( 

425 "Finished processing %d variation unit weight categories in %0.4fs." 

426 % (len(self.weight_categories), t1 - t0) 

427 ) 

428 return 

429 

430 def parse_intrinsic_odds(self, xml: et.ElementTree): 

431 """Given an XML tree for a collation, populates this Collation's list of intrinsic probability categories 

432 (e.g., "absolutely more likely," "highly more likely," "more likely," "slightly more likely," "equally likely") 

433 and its dictionary mapping these categories to numerical odds. 

434 If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated. 

435 

436 Args: 

437 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

438 """ 

439 if self.verbose: 

440 print("Parsing intrinsic odds categories...") 

441 t0 = time.time() 

442 self.intrinsic_categories = [] 

443 self.intrinsic_odds_by_id = {} 

444 for interp in xml.xpath("//tei:interpGrp[@type=\"intrinsic\"]/tei:interp", namespaces={"tei": tei_ns}): 

445 # These must be indexed by the xml:id attribute, so skip any that do not have one: 

446 if interp.get("{%s}id" % xml_ns) is None: 

447 continue 

448 odds_category = interp.get("{%s}id" % xml_ns) 

449 # If this element contains a certainty subelement with a fixed odds value for this category, then set it: 

450 odds = None 

451 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}): 

452 if certainty.get("degree") is not None: 

453 odds = float(certainty.get("degree")) 

454 break 

455 self.intrinsic_categories.append(odds_category) 

456 self.intrinsic_odds_by_id[odds_category] = odds 

457 t1 = time.time() 

458 if self.verbose: 

459 print( 

460 "Finished processing %d intrinsic odds categories in %0.4fs." 

461 % (len(self.intrinsic_categories), t1 - t0) 

462 ) 

463 return 

464 

465 def parse_transcriptional_rates(self, xml: et.ElementTree): 

466 """Given an XML tree for a collation, populates this Collation's dictionary mapping transcriptional change categories 

467 (e.g., "aural confusion," "visual error," "clarification") to numerical rates. 

468 If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated. 

469 

470 Args: 

471 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

472 """ 

473 if self.verbose: 

474 print("Parsing transcriptional change categories...") 

475 t0 = time.time() 

476 self.transcriptional_categories = [] 

477 self.transcriptional_rates_by_id = {} 

478 for interp in xml.xpath("//tei:interpGrp[@type=\"transcriptional\"]/tei:interp", namespaces={"tei": tei_ns}): 

479 # These must be indexed by the xml:id attribute, so skip any that do not have one: 

480 if interp.get("{%s}id" % xml_ns) is None: 

481 continue 

482 transcriptional_category = interp.get("{%s}id" % xml_ns) 

483 # If this element contains a certainty subelement with a fixed rate for this category, then set it: 

484 rate = None 

485 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}): 

486 if certainty.get("degree") is not None: 

487 rate = float(certainty.get("degree")) 

488 break 

489 self.transcriptional_categories.append(transcriptional_category) 

490 self.transcriptional_rates_by_id[transcriptional_category] = rate 

491 t1 = time.time() 

492 if self.verbose: 

493 print( 

494 "Finished processing %d transcriptional change categories in %0.4fs." 

495 % (len(self.transcriptional_rates_by_id), t1 - t0) 

496 ) 

497 return 

498 

499 def validate_intrinsic_relations(self): 

500 """Checks if any VariationUnit's intrinsic_relations map is not a forest. 

501 If any is not, then an IntrinsicRelationsException is thrown describing the VariationUnit at fault. 

502 """ 

503 if self.verbose: 

504 print("Validating intrinsic relation graphs for variation units...") 

505 t0 = time.time() 

506 for vu in self.variation_units: 

507 # Skip any variation units with an empty intrinsic_relations map: 

508 if len(vu.intrinsic_relations) == 0: 

509 continue 

510 # For all others, start by identifying all reading IDs that are not related to by some other reading ID: 

511 in_degree_by_reading = {} 

512 for edge in vu.intrinsic_relations: 

513 s = edge[0] 

514 t = edge[1] 

515 if s not in in_degree_by_reading: 

516 in_degree_by_reading[s] = 0 

517 if t not in in_degree_by_reading: 

518 in_degree_by_reading[t] = 0 

519 in_degree_by_reading[t] += 1 

520 # If any reading has more than one relation pointing to it, then the intrinsic relations graph is not a forest: 

521 excessive_in_degree_readings = [ 

522 rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] > 1 

523 ] 

524 if len(excessive_in_degree_readings) > 0: 

525 msg = "" 

526 msg += ( 

527 "In variation unit %s, the following readings have more than one intrinsic relation pointing to them: %s.\n" 

528 % (vu.id, ", ".join(excessive_in_degree_readings)) 

529 ) 

530 msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it." 

531 raise IntrinsicRelationsException(msg) 

532 # If every reading has another reading pointing to it, then the intrinsic relations graph contains a cycle and is not a forest: 

533 starting_nodes = [rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] == 0] 

534 if len(starting_nodes) == 0: 

535 msg = "" 

536 msg += "In variation unit %s, the intrinsic relations contain a cycle.\n" % vu.id 

537 msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it." 

538 raise IntrinsicRelationsException(msg) 

539 t1 = time.time() 

540 if self.verbose: 

541 print("Finished intrinsic relations validation in %0.4fs." % (t1 - t0)) 

542 return 

543 

544 def parse_apps(self, xml: et.ElementTree): 

545 """Given an XML tree for a collation, populates its list of variation units from its app elements. 

546 

547 Args: 

548 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

549 """ 

550 if self.verbose: 

551 print("Parsing variation units...") 

552 t0 = time.time() 

553 for a in xml.xpath('//tei:app', namespaces={'tei': tei_ns}): 

554 vu = VariationUnit(a, self.verbose) 

555 self.variation_units.append(vu) 

556 t1 = time.time() 

557 if self.verbose: 

558 print("Finished processing %d variation units in %0.4fs." % (len(self.variation_units), t1 - t0)) 

559 return 

560 

561 def get_readings_by_witness_for_unit(self, vu: VariationUnit): 

562 """Returns a dictionary mapping witness IDs to a list of their reading coefficients for a given variation unit. 

563 

564 Args: 

565 vu: A VariationUnit to be processed. 

566 

567 Returns: 

568 A dictionary mapping witness ID strings to a list of their coefficients for all substantive readings in this VariationUnit. 

569 """ 

570 # In a first pass, populate lists of substantive (variation unit ID, reading ID) tuples and reading labels 

571 # and a map from reading IDs to the indices of their parent substantive reading in this unit: 

572 reading_id_to_index = {} 

573 self.substantive_readings_by_variation_unit_id[vu.id] = [] 

574 for rdg in vu.readings: 

575 # If this reading is missing (e.g., lacunose or inapplicable due to an overlapping variant) or targets another reading, then skip it: 

576 if rdg.type in self.missing_reading_types or len(rdg.certainties) > 0: 

577 continue 

578 # If this reading is trivial, then map it to the last substantive index: 

579 if rdg.type in self.trivial_reading_types: 

580 reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1 

581 continue 

582 # Otherwise, the reading is substantive: add it to the map and update the last substantive index: 

583 self.substantive_readings_by_variation_unit_id[vu.id].append(rdg.id) 

584 self.substantive_variation_unit_reading_tuples.append(tuple([vu.id, rdg.id])) 

585 reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1 

586 # If the list of substantive readings only contains one entry, then this variation unit is not informative; 

587 # return an empty dictionary and add nothing to the list of substantive reading labels: 

588 if self.verbose: 

589 print( 

590 "Variation unit %s has %d substantive readings." 

591 % (vu.id, len(self.substantive_readings_by_variation_unit_id[vu.id])) 

592 ) 

593 readings_by_witness_for_unit = {} 

594 # Initialize the output dictionary with empty sets for all base witnesses: 

595 for wit in self.witnesses: 

596 readings_by_witness_for_unit[wit.id] = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id]) 

597 # In a second pass, assign each base witness a set containing the readings it supports in this unit: 

598 for rdg in vu.readings: 

599 # Initialize the dictionary indicating support for this reading (or its disambiguations): 

600 rdg_support = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id]) 

601 # If this is a missing reading (e.g., a lacuna or an overlap), then we can skip it, as its corresponding set will be empty: 

602 if rdg.type in self.missing_reading_types: 

603 continue 

604 # Otherwise, if this reading is trivial, then it will contain an entry for the index of its parent substantive reading: 

605 elif rdg.type in self.trivial_reading_types: 

606 rdg_support[reading_id_to_index[rdg.id]] = 1 

607 # Otherwise, if this reading has one or more nonzero certainty degrees, 

608 # then set the entries for these readings to their degrees: 

609 elif sum(rdg.certainties.values()) > 0: 

610 for t in rdg.certainties: 

611 # Skip any reading whose ID is unrecognized in this unit: 

612 if t in reading_id_to_index: 

613 rdg_support[reading_id_to_index[t]] = rdg.certainties[t] 

614 # Otherwise, if this reading has one or more targets (i.e., if it is an ambiguous reading), 

615 # then set the entries for each of its targets to 1: 

616 elif len(rdg.targets) > 0: 

617 for t in rdg.targets: 

618 # Skip any reading whose ID is unrecognized in this unit: 

619 if t in reading_id_to_index: 

620 rdg_support[reading_id_to_index[t]] = 1 

621 # Otherwise, this reading is itself substantive; set the entry for the index of this reading to 1: 

622 else: 

623 rdg_support[reading_id_to_index[rdg.id]] = 1 

624 # Proceed for each witness siglum in the support for this reading: 

625 for wit in rdg.wits: 

626 # Is this siglum a base siglum? 

627 base_wit = self.get_base_wit(wit) 

628 if base_wit not in self.witness_index_by_id: 

629 # If it is not, then it is probably just because we've encountered a corrector or some other secondary witness not included in the witness list; 

630 # report this if we're in verbose mode and move on: 

631 if self.verbose: 

632 print( 

633 "Skipping unknown witness siglum %s (base siglum %s) in variation unit %s, reading %s..." 

634 % (wit, base_wit, vu.id, rdg.id) 

635 ) 

636 continue 

637 # If we've found a base siglum, then add this reading's contribution to the base witness's reading set for this unit; 

638 # normally the existing set will be empty, but if we reduce two suffixed sigla to the same base witness, 

639 # then that witness may attest to multiple readings in the same unit: 

640 readings_by_witness_for_unit[base_wit] = [ 

641 (min(readings_by_witness_for_unit[base_wit][i] + rdg_support[i], 1)) 

642 for i in range(len(rdg_support)) 

643 ] 

644 return readings_by_witness_for_unit 

645 

def parse_readings_by_witness(self):
    """Populates the internal dictionary mapping witness IDs to a list of their reading support sets for all variation units, and then fills the empty reading support sets for witnesses of type "corrector" with the entries of the previous witness."""
    if self.verbose:
        print("Populating internal dictionary of witness readings...")
    start_time = time.time()
    # Reset the structures that this method is responsible for populating:
    self.readings_by_witness = {wit.id: [] for wit in self.witnesses}
    self.variation_unit_ids = []
    # Collect the reading support vectors for every variation unit in turn:
    for vu in self.variation_units:
        unit_support = self.get_readings_by_witness_for_unit(vu)
        # Units that yield no witness entries are not recorded at all:
        if not unit_support:
            continue
        self.variation_unit_ids.append(vu.id)
        for wit_id, support in unit_support.items():
            self.readings_by_witness[wit_id].append(support)
    # Optionally, fill the lacunae of the correctors:
    if self.fill_corrector_lacunae:
        for i, wit in enumerate(self.witnesses):
            # The first witness has no predecessor to inherit from, and non-correctors are left untouched:
            if i == 0 or wit.type != "corrector":
                continue
            prev_wit = self.witnesses[i - 1]
            corrector_readings = self.readings_by_witness[wit.id]
            for j, support in enumerate(corrector_readings):
                # An all-zero support vector means the corrector is lacunose at this unit;
                # inherit the preceding witness's reading in that case:
                if sum(support) == 0:
                    corrector_readings[j] = self.readings_by_witness[prev_wit.id][j]
    end_time = time.time()
    if self.verbose:
        print(
            "Populated dictionary for %d witnesses over %d substantive variation units in %0.4fs."
            % (len(self.witnesses), len(self.variation_unit_ids), end_time - start_time)
        )
    return

685 

def filter_fragmentary_witnesses(self, xml):
    """Filters the original witness list and readings by witness dictionary to exclude witnesses whose proportions of extant passages fall below the fragmentary readings threshold.

    Args:
        xml: The parsed collation XML; currently unused, but retained for backward compatibility with existing callers.
    """
    if self.verbose:
        print(
            "Filtering fragmentary witnesses (extant in < %f of all variation units) out of internal witness list and dictionary of witness readings..."
            % self.fragmentary_threshold
        )
    t0 = time.time()
    fragmentary_witness_set = set()
    # Proceed for each witness in order:
    for wit in self.witnesses:
        wit_id = wit.id
        # We count the number of variation units at which this witness has an extant (i.e., non-missing) reading:
        extant_reading_count = 0
        total_reading_count = len(self.readings_by_witness[wit_id])
        # If there are no variation units at all, then this witness trivially has no extant readings;
        # treat it as fragmentary (this also guards against a division by zero below):
        if total_reading_count == 0:
            fragmentary_witness_set.add(wit_id)
            continue
        # Proceed through all reading support lists:
        for rdg_support in self.readings_by_witness[wit_id]:
            # If the current reading support list is not all zeroes, then increment this witness's count of extant readings:
            if sum(rdg_support) != 0:
                extant_reading_count += 1
        # If the proportion of extant readings falls below the threshold, then add this witness to the list of fragmentary witnesses:
        if extant_reading_count / total_reading_count < self.fragmentary_threshold:
            fragmentary_witness_set.add(wit_id)
    # Then filter the witness list to exclude the fragmentary witnesses:
    filtered_witnesses = [wit for wit in self.witnesses if wit.id not in fragmentary_witness_set]
    self.witnesses = filtered_witnesses
    # Then remove the entries for the fragmentary witnesses from the witnesses-to-readings dictionary:
    for wit_id in fragmentary_witness_set:
        del self.readings_by_witness[wit_id]
    t1 = time.time()
    if self.verbose:
        print(
            "Filtered out %d fragmentary witness(es) (%s) in %0.4fs."
            % (len(fragmentary_witness_set), str(list(fragmentary_witness_set)), t1 - t0)
        )
    return

722 

def get_nexus_symbols(self):
    """Returns a list of one-character symbols needed to represent the states of all substantive readings in NEXUS.

    The number of symbols equals the maximum number of substantive readings at any variation unit.

    Returns:
        A list of individual characters representing states in readings.
    """
    # NOTE: IQTREE does not appear to support symbols outside of 0-9 and a-z, and its base symbols must be case-insensitive.
    # The official version of MrBayes is likewise limited to 32 symbols.
    # But PAUP* allows up to 64 symbols, and Andrew Edmondson's fork of MrBayes does, as well.
    # So this method will support symbols from 0-9, a-z, and A-Z (for a total of 62 states)
    candidate_symbols = list(string.digits + string.ascii_lowercase + string.ascii_uppercase)
    # Without witnesses there are no reading support vectors, so no symbols are needed:
    if not self.witnesses:
        return []
    # The number of symbols needed equals the length of the longest substantive reading vector,
    # which can be read off any one witness's vectors (here, the first):
    first_wit_id = self.witnesses[0].id
    symbol_count = max((len(vector) for vector in self.readings_by_witness[first_wit_id]), default=0)
    return candidate_symbols[:symbol_count]

746 

def to_nexus(
    self,
    file_addr: Union[Path, str],
    drop_constant: bool = False,
    char_state_labels: bool = True,
    frequency: bool = False,
    ambiguous_as_missing: bool = False,
    calibrate_dates: bool = False,
    mrbayes: bool = False,
    clock_model: ClockModel = ClockModel.strict,
):
    """Writes this Collation to a NEXUS file with the given address.

    Args:
        file_addr: A string representing the path to an output NEXUS file; the file type should be .nex, .nexus, or .nxs.
        drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
        char_state_labels: An optional flag indicating whether or not to include the CharStateLabels block.
        frequency: An optional flag indicating whether to use the StatesFormat=Frequency setting
            instead of the StatesFormat=StatesPresent setting
            (and thus represent all states with frequency vectors rather than symbols).
            Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
        ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
            If this flag is set, then only base symbols will be generated for the NEXUS file.
            It is only applied if the frequency option is False.
        calibrate_dates: An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses.
            This option is intended for inputs to BEAST 2.
        mrbayes: An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses.
            This option is intended for inputs to MrBayes.
        clock_model: A ClockModel option indicating which type of clock model to use.
            This option is intended for inputs to MrBayes and BEAST 2.
            MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
    # Start by calculating the values we will be using here:
    ntax = len(self.witnesses)
    nchar = len(substantive_variation_unit_ids)
    taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
    max_taxlabel_length = max(
        [len(taxlabel) for taxlabel in taxlabels]
    )  # keep track of the longest taxon label for tabular alignment purposes
    missing_symbol = '?'
    symbols = self.get_nexus_symbols()
    # Cache the current year so that all witness age calculations below use one consistent reference point:
    current_year = datetime.now().year
    # Generate all parent folders for this file that don't already exist:
    Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
    # Then write the file:
    with open(file_addr, "w", encoding="utf-8") as f:
        # Start with the NEXUS header:
        f.write("#NEXUS\n\n")
        # Then begin the data block:
        f.write("Begin DATA;\n")
        # Write the collation matrix dimensions:
        f.write("\tDimensions ntax=%d nchar=%d;\n" % (ntax, nchar))
        # Write the format subblock:
        f.write("\tFormat\n")
        f.write("\t\tDataType=Standard\n")
        f.write("\t\tMissing=%s\n" % missing_symbol)
        if frequency:
            f.write("\t\tStatesFormat=Frequency\n")
        f.write("\t\tSymbols=\"%s\";\n" % (" ".join(symbols)))
        # If the char_state_labels is set, then write the labels for character-state labels, with each on its own line:
        if char_state_labels:
            f.write("\tCharStateLabels")
            vu_ind = 1
            for vu in self.variation_units:
                if vu.id not in substantive_variation_unit_ids_set:
                    continue
                if vu_ind == 1:
                    f.write("\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                else:
                    f.write(",\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                rdg_ind = 0
                for rdg in vu.readings:
                    key = tuple([vu.id, rdg.id])
                    if key not in substantive_variation_unit_reading_tuples_set:
                        continue
                    ascii_rdg_text = slugify(
                        rdg.text, lowercase=False, separator='_', replacements=[['η', 'h'], ['ω', 'w']]
                    )
                    if ascii_rdg_text == "":
                        ascii_rdg_text = "om."
                    f.write(" %s" % ascii_rdg_text)
                    rdg_ind += 1
                if rdg_ind > 0:
                    vu_ind += 1
            f.write(";\n")
        # Write the matrix subblock:
        f.write("\tMatrix")
        with tqdm(total=len(self.witnesses)) as pbar:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                if frequency:
                    sequence = "\n\t\t" + taxlabel
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        sequence += "\n\t\t\t"
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        # Otherwise, print out its frequencies for different readings in parentheses:
                        sequence += "("
                        for k, w in enumerate(rdg_support):
                            sequence += "%s:%0.4f" % (symbols[k], w)
                            if k < len(rdg_support) - 1:
                                sequence += " "
                        sequence += ")"
                else:
                    sequence = "\n\t\t" + taxlabel
                    # Add enough space after this label ensure that all sequences are nicely aligned:
                    sequence += " " * (max_taxlabel_length - len(taxlabel) + 1)
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        rdg_inds = [
                            k for k, w in enumerate(rdg_support) if w > 0
                        ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                        # For singleton readings, just print the symbol:
                        if len(rdg_inds) == 1:
                            sequence += symbols[rdg_inds[0]]
                            continue
                        # For multiple readings, print the corresponding readings in braces or the missing symbol depending on input settings:
                        if ambiguous_as_missing:
                            sequence += missing_symbol
                        else:
                            # BUGFIX: write the NEXUS symbol for each supported state rather than its raw index,
                            # which would be wrong for state indices of 10 or greater (e.g., index 10 must be "a", not "10"):
                            sequence += "{%s}" % "".join([symbols[rdg_ind] for rdg_ind in rdg_inds])
                f.write("%s" % (sequence))
                pbar.update(1)
        f.write(";\n")
        # End the data block:
        f.write("End;")
        # If calibrate_dates is set, then add the assumptions block:
        if calibrate_dates:
            f.write("\n\n")
            f.write("Begin ASSUMPTIONS;\n")
            # Set the scale to years:
            f.write("\tOPTIONS SCALE = years;\n\n")
            # Then calibrate the witness ages:
            calibrate_strings = []
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                date_range = wit.date_range
                if date_range[0] is not None:
                    # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                    # depending on whether the upper and lower bounds match:
                    min_age = current_year - date_range[1]
                    max_age = current_year - date_range[0]
                    if min_age == max_age:
                        calibrate_string = "\tCALIBRATE %s = fixed(%d)" % (taxlabel, min_age)
                        calibrate_strings.append(calibrate_string)
                    else:
                        calibrate_string = "\tCALIBRATE %s = uniform(%d,%d)" % (taxlabel, min_age, max_age)
                        calibrate_strings.append(calibrate_string)
                else:
                    # If there is no lower bound on the witness's date, then use an offset log-normal distribution:
                    min_age = current_year - date_range[1]
                    calibrate_string = "\tCALIBRATE %s = offsetlognormal(%d,0.0,1.0)" % (taxlabel, min_age)
                    calibrate_strings.append(calibrate_string)
            # Then print the calibrate strings, separated by commas and line breaks and terminated by a semicolon:
            f.write("%s;\n\n" % ",\n".join(calibrate_strings))
            # End the assumptions block:
            f.write("End;")
        # If mrbayes is set, then add the mrbayes block:
        if mrbayes:
            f.write("\n\n")
            f.write("Begin MRBAYES;\n")
            # Turn on the autoclose feature by default:
            f.write("\tset autoclose=yes;\n")
            # Set the branch lengths to be governed by a birth-death clock model, and set up the parameters for this model:
            f.write("\n")
            f.write("\tprset brlenspr = clock:birthdeath;\n")
            f.write("\tprset speciationpr = uniform(0.0,10.0);\n")
            f.write("\tprset extinctionpr = beta(2.0,4.0);\n")
            f.write("\tprset sampleprob = 0.01;\n")
            # Use the specified clock model (MrBayes does not support a local clock, so anything other than
            # an uncorrelated clock falls back to a strict clock):
            f.write("\n")
            if clock_model == ClockModel.uncorrelated:
                f.write("\tprset clockvarpr=igr;\n")
                f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
                f.write("\tprset igrvarpr=exponential(1.0);\n")
            else:
                f.write("\tprset clockvarpr=strict;\n")
                f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
            # Set the priors on the tree age depending on the date range for the origin of the collated work:
            f.write("\n")
            if self.origin_date_range[0] is not None:
                min_tree_age = (
                    current_year - self.origin_date_range[1]
                    if self.origin_date_range[1] is not None
                    else 0.0
                )
                max_tree_age = current_year - self.origin_date_range[0]
                f.write("\tprset treeagepr = uniform(%d,%d);\n" % (min_tree_age, max_tree_age))
            else:
                min_tree_age = (
                    current_year - self.origin_date_range[1]
                    if self.origin_date_range[1] is not None
                    else 0.0
                )
                f.write("\tprset treeagepr = offsetgamma(%d,1.0,1.0);\n" % (min_tree_age))
            # Then calibrate the witness ages:
            f.write("\n")
            f.write("\tprset nodeagepr = calibrated;\n")
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                date_range = wit.date_range
                if date_range[0] is not None:
                    # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                    # depending on whether the upper and lower bounds match:
                    min_age = current_year - date_range[1]
                    max_age = current_year - date_range[0]
                    if min_age == max_age:
                        f.write("\tcalibrate %s = fixed(%d);\n" % (taxlabel, min_age))
                    else:
                        f.write("\tcalibrate %s = uniform(%d,%d);\n" % (taxlabel, min_age, max_age))
                else:
                    # If there is no lower bound on the witness's date, then use an offset gamma distribution:
                    min_age = current_year - date_range[1]
                    f.write("\tcalibrate %s = offsetgamma(%d,1.0,1.0);\n" % (taxlabel, min_age))
            f.write("\n")
            # Add default settings for MCMC estimation of posterior distribution:
            f.write("\tmcmcp ngen=100000;\n")
            # Write the command to run MrBayes:
            f.write("\tmcmc;\n")
            # End the assumptions block:
            f.write("End;")
    return

991 

def get_hennig86_symbols(self):
    """Returns a list of one-character symbols needed to represent the states of all substantive readings in Hennig86 format.

    The number of symbols equals the maximum number of substantive readings at any variation unit.

    Returns:
        A list of individual characters representing states in readings.
    """
    # NOTE: the maximum number of symbols allowed in Hennig86 format is 32,
    # so the candidates are the ten digits plus the first 22 uppercase letters:
    candidate_symbols = list(string.digits) + list(string.ascii_uppercase[:22])
    # Without witnesses there are no reading support vectors, so no symbols are needed:
    if not self.witnesses:
        return []
    # The number of symbols needed equals the length of the longest substantive reading vector,
    # which can be read off the first witness's vectors:
    reference_vectors = self.readings_by_witness[self.witnesses[0].id]
    symbol_count = max((len(vector) for vector in reference_vectors), default=0)
    return candidate_symbols[:symbol_count]

1013 

def to_hennig86(self, file_addr: Union[Path, str], drop_constant: bool = False):
    """Writes this Collation to a file in Hennig86 format with the given address.
    Note that because Hennig86 format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

    Args:
        file_addr: A string representing the path to an output file.
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    # Start by calculating the values we will be using here:
    ntax = len(self.witnesses)
    nchar = len(substantive_variation_unit_ids)
    taxlabels = []
    for wit in self.witnesses:
        taxlabel = wit.id
        # Hennig86 format requires taxon names to start with a letter, so if this is not the case, then append "WIT_" to the start of the name:
        if taxlabel[0] not in string.ascii_letters:
            taxlabel = "WIT_" + taxlabel
        # Then replace any disallowed characters in the string with an underscore:
        taxlabel = slugify(taxlabel, lowercase=False, separator='_')
        taxlabels.append(taxlabel)
    max_taxlabel_length = max(
        [len(taxlabel) for taxlabel in taxlabels]
    )  # keep track of the longest taxon label for tabular alignment purposes
    missing_symbol = '?'
    symbols = self.get_hennig86_symbols()
    # Generate all parent folders for this file that don't already exist:
    Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
    with open(file_addr, "w", encoding="ascii") as f:
        # Start with the nstates header:
        f.write("nstates %d;\n" % len(symbols))
        # Then begin the xread block:
        f.write("xread\n")
        # Write the dimensions:
        f.write("%d %d\n" % (nchar, ntax))
        # Now write the matrix:
        with tqdm(total=len(self.witnesses)) as pbar:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                # Add enough space after this label to ensure that all sequences are nicely aligned:
                sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel) + 1))
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    rdg_support = self.readings_by_witness[wit.id][j]
                    # If this reading is lacunose in this witness, then use the missing character:
                    if sum(rdg_support) == 0:
                        sequence += missing_symbol
                        continue
                    rdg_inds = [
                        k for k, w in enumerate(rdg_support) if w > 0
                    ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                    # For singleton readings, just print the symbol:
                    if len(rdg_inds) == 1:
                        sequence += symbols[rdg_inds[0]]
                        continue
                    # For multiple readings, print the missing symbol:
                    sequence += missing_symbol
                f.write("%s\n" % (sequence))
                pbar.update(1)
        f.write(";")
    return

1085 

def get_phylip_symbols(self):
    """Returns a list of one-character symbols needed to represent the states of all substantive readings in PHYLIP format.

    The number of symbols equals the maximum number of substantive readings at any variation unit.

    Returns:
        A list of individual characters representing states in readings.
    """
    # NOTE: for RAxML, multistate characters with an alphabet sizes up to 32 are supported,
    # so the candidates are the ten digits plus the first 22 lowercase letters:
    candidate_symbols = list(string.digits) + list(string.ascii_lowercase[:22])
    # Without witnesses there are no reading support vectors, so no symbols are needed:
    if not self.witnesses:
        return []
    # The number of symbols needed equals the length of the longest substantive reading vector,
    # which can be read off the first witness's vectors:
    reference_vectors = self.readings_by_witness[self.witnesses[0].id]
    symbol_count = max((len(vector) for vector in reference_vectors), default=0)
    return candidate_symbols[:symbol_count]

1107 

def to_phylip(self, file_addr: Union[Path, str], drop_constant: bool = False):
    """Writes this Collation to a file in PHYLIP format with the given address.
    Note that because PHYLIP format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

    Args:
        file_addr: A string representing the path to an output file.
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    # Start by calculating the values we will be using here:
    ntax = len(self.witnesses)
    nchar = len(substantive_variation_unit_ids)
    taxlabels = []
    for wit in self.witnesses:
        # Replace any disallowed characters in the taxon name with an underscore:
        taxlabels.append(slugify(wit.id, lowercase=False, separator='_'))
    max_taxlabel_length = max(
        [len(taxlabel) for taxlabel in taxlabels]
    )  # keep track of the longest taxon label for tabular alignment purposes
    missing_symbol = '?'
    symbols = self.get_phylip_symbols()
    # Generate all parent folders for this file that don't already exist:
    Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
    with open(file_addr, "w", encoding="ascii") as f:
        # Write the dimensions:
        f.write("%d %d\n" % (ntax, nchar))
        # Now write the matrix:
        for i, wit in enumerate(self.witnesses):
            taxlabel = taxlabels[i]
            # Add enough space after this label to ensure that all sequences are nicely aligned:
            sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel))) + "\t"
            for j, vu_id in enumerate(self.variation_unit_ids):
                if vu_id not in substantive_variation_unit_ids_set:
                    continue
                rdg_support = self.readings_by_witness[wit.id][j]
                # If this reading is lacunose in this witness, then use the missing character:
                if sum(rdg_support) == 0:
                    sequence += missing_symbol
                    continue
                rdg_inds = [
                    k for k, w in enumerate(rdg_support) if w > 0
                ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                # For singleton readings, just print the symbol:
                if len(rdg_inds) == 1:
                    sequence += symbols[rdg_inds[0]]
                    continue
                # For multiple readings, print the missing symbol:
                sequence += missing_symbol
            f.write("%s\n" % (sequence))
    return

1169 

def get_fasta_symbols(self):
    """Returns a list of one-character symbols needed to represent the states of all substantive readings in FASTA format.

    The number of symbols equals the maximum number of substantive readings at any variation unit.

    Returns:
        A list of individual characters representing states in readings.
    """
    # NOTE: for RAxML, multistate characters with an alphabet sizes up to 32 are supported,
    # so the candidates are the ten digits plus the first 22 lowercase letters:
    candidate_symbols = list(string.digits) + list(string.ascii_lowercase[:22])
    # Without witnesses there are no reading support vectors, so no symbols are needed:
    if not self.witnesses:
        return []
    # The number of symbols needed equals the length of the longest substantive reading vector,
    # which can be read off the first witness's vectors:
    reference_vectors = self.readings_by_witness[self.witnesses[0].id]
    symbol_count = max((len(vector) for vector in reference_vectors), default=0)
    return candidate_symbols[:symbol_count]

1191 

def to_fasta(self, file_addr: Union[Path, str], drop_constant: bool = False):
    """Writes this Collation to a file in FASTA format with the given address.
    Note that because FASTA format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

    Args:
        file_addr: A string representing the path to an output file.
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    # Start by calculating the values we will be using here:
    taxlabels = []
    for wit in self.witnesses:
        # Replace any disallowed characters in the taxon name with an underscore:
        taxlabels.append(slugify(wit.id, lowercase=False, separator='_'))
    missing_symbol = '?'
    symbols = self.get_fasta_symbols()
    # Generate all parent folders for this file that don't already exist:
    Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
    with open(file_addr, "w", encoding="ascii") as f:
        # Now write the matrix:
        with tqdm(total=len(self.witnesses)) as pbar:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                # Write the taxon label as a FASTA header line, then accumulate that witness's sequence after it:
                sequence = ">%s\n" % taxlabel
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    rdg_support = self.readings_by_witness[wit.id][j]
                    # If this reading is lacunose in this witness, then use the missing character:
                    if sum(rdg_support) == 0:
                        sequence += missing_symbol
                        continue
                    rdg_inds = [
                        k for k, w in enumerate(rdg_support) if w > 0
                    ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                    # For singleton readings, just print the symbol:
                    if len(rdg_inds) == 1:
                        sequence += symbols[rdg_inds[0]]
                        continue
                    # For multiple readings, print the missing symbol:
                    sequence += missing_symbol
                f.write("%s\n" % (sequence))
                pbar.update(1)
    return

1253 

def get_beast_symbols(self):
    """Returns a list of one-character symbols needed to represent the states of all substantive readings in BEAST format.

    The number of symbols equals the maximum number of substantive readings at any variation unit.

    Returns:
        A list of individual characters representing states in readings.
    """
    # NOTE: for BEAST, any number of states should theoretically be permissible, but since code maps are required
    # for some reason, we will limit the number of symbols to 62 (digits plus lowercase and uppercase letters) for now:
    candidate_symbols = list(string.digits + string.ascii_lowercase + string.ascii_uppercase)
    # Without witnesses there are no reading support vectors, so no symbols are needed:
    if not self.witnesses:
        return []
    # The number of symbols needed equals the length of the longest substantive reading vector,
    # which can be read off the first witness's vectors:
    reference_vectors = self.readings_by_witness[self.witnesses[0].id]
    symbol_count = max((len(vector) for vector in reference_vectors), default=0)
    return candidate_symbols[:symbol_count]

1275 

1276 def get_tip_date_range(self): 

1277 """Gets the minimum and maximum dates attested among the witnesses. 

1278 Also checks if the witness with the latest possible date has a fixed date 

1279 (i.e, if the lower and upper bounds for its date are the same) 

1280 and issues a warning if not, as this will cause unusual behavior in BEAST 2. 

1281 

1282 Returns: 

1283 A tuple containing the earliest and latest possible tip dates. 

1284 """ 

1285 earliest_date = None 

1286 earliest_wit = None 

1287 latest_date = None 

1288 latest_wit = None 

1289 for wit in self.witnesses: 

1290 wit_id = wit.id 

1291 date_range = wit.date_range 

1292 if date_range[0] is not None: 

1293 if earliest_date is not None: 

1294 earliest_wit = wit if date_range[0] < earliest_date else earliest_wit 

1295 earliest_date = min(date_range[0], earliest_date) 

1296 else: 

1297 earliest_wit = wit 

1298 earliest_date = date_range[0] 

1299 if date_range[1] is not None: 

1300 if latest_date is not None: 

1301 latest_wit = ( 

1302 wit 

1303 if (date_range[1] > latest_date or (date_range[0] == date_range[1] == latest_date)) 

1304 else latest_wit 

1305 ) # the second check ensures that a witness with a fixed date is preferred to a witness with a date range that ends with the same date 

1306 latest_date = max(date_range[1], latest_date) 

1307 else: 

1308 latest_wit = wit 

1309 latest_date = date_range[1] 

1310 if latest_wit.date_range[0] is None or latest_wit.date_range[0] != latest_wit.date_range[1]: 

1311 print( 

1312 "WARNING: the latest witness, %s, has a variable date range; this will result in problems with time-dependent substitution models and misalignment of trees in BEAST DensiTree outputs! Please ensure that witness %s has a fixed date." 

1313 % (latest_wit.id, latest_wit.id) 

1314 ) 

1315 return (earliest_date, latest_date) 

1316 

1317 def get_beast_origin_span(self, tip_date_range): 

1318 """Returns a tuple containing the lower and upper bounds for the height of the origin of the Birth-Death Skyline model. 

1319 The upper bound on the height of the tree is the difference between the latest tip date 

1320 and the lower bound on the date of the original work, if both are defined; 

1321 otherwise, it is left undefined. 

1322 The lower bound on the height of the tree is the difference between the latest tip date 

1323 and the upper bound on the date of the original work, if both are defined; 

1324 otherwise, it is the difference between the earliest tip date and the latest, if both are defined. 

1325 

1326 Args: 

1327 tip_date_range: A tuple containing the earliest and latest possible tip dates. 

1328 

1329 Returns: 

1330 A tuple containing lower and upper bounds on the origin height for the Birth-Death Skyline model. 

1331 """ 

1332 origin_span = [0, None] 

1333 # If the upper bound on the date of the work's composition is defined, then set the lower bound on the height of the origin using it and the latest tip date 

1334 # (note that if it had to be defined in terms of witness date lower bounds, then this would have happened already): 

1335 if self.origin_date_range[1] is not None: 

1336 origin_span[0] = tip_date_range[1] - self.origin_date_range[1] 

1337 # If the lower bound on the date of the work's composition is defined, then set the upper bound on the height of the origin using it and the latest tip date: 

1338 if self.origin_date_range[0] is not None: 

1339 origin_span[1] = tip_date_range[1] - self.origin_date_range[0] 

1340 return tuple(origin_span) 

1341 

1342 def get_beast_date_map(self, taxlabels): 

1343 """Returns a string representing witness-to-date mappings in BEAST format. 

1344 

1345 Since this format requires single dates as opposed to date ranges, 

1346 witnesses with closed date ranges will be mapped to the average of their lower and upper bounds, 

1347 and witnesses with open date ranges will not be mapped. 

1348 

1349 Args: 

1350 taxlabels: A list of slugified taxon labels. 

1351 

1352 Returns: 

1353 A string containing comma-separated date calibrations of the form witness_id=date. 

1354 """ 

1355 calibrate_strings = [] 

1356 for i, wit in enumerate(self.witnesses): 

1357 taxlabel = taxlabels[i] 

1358 date_range = wit.date_range 

1359 # If either end of this witness's date range is empty, then do not include it: 

1360 if date_range[0] is None or date_range[1] is None: 

1361 continue 

1362 # Otherwise, take the midpoint of its date range as its date: 

1363 date = int((date_range[0] + date_range[1]) / 2) 

1364 calibrate_string = "%s=%d" % (taxlabel, date) 

1365 calibrate_strings.append(calibrate_string) 

1366 # Then output the full date map string: 

1367 date_map = ",".join(calibrate_strings) 

1368 return date_map 

1369 

1370 def get_beast_code_map_for_unit(self, symbols, missing_symbol, vu_ind): 

1371 """Returns a string containing state/reading code mappings in BEAST format using the given single-state and missing state symbols for the character/variation unit at the given index. 

1372 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a code for a dummy state will be included. 

1373 

1374 Args: 

1375 vu_ind: An integer index for the desired unit. 

1376 

1377 Returns: 

1378 A string containing comma-separated code mappings. 

1379 """ 

1380 vu = self.variation_units[vu_ind] 

1381 vu_id = vu.id 

1382 code_map = {} 

1383 for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])): 

1384 code_map[symbols[k]] = str(k) 

1385 # If this site is a singleton site, then add a code mapping for the dummy state: 

1386 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1387 code_map[symbols[1]] = str(1) 

1388 # Then add a mapping for the missing state, including a dummy state if this is a singleton site: 

1389 code_map[missing_symbol] = " ".join( 

1390 str(k) for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])) 

1391 ) 

1392 # If this site is a singleton site, then add the dummy state to the missing state mapping: 

1393 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1394 code_map[missing_symbol] = code_map[missing_symbol] + " " + str(1) 

1395 # Then combine all of the mappings into a single string: 

1396 code_map_string = ", ".join([code + "=" + code_map[code] for code in code_map]) 

1397 return code_map_string 

1398 

1399 def get_beast_equilibrium_frequencies_for_unit(self, vu_ind): 

1400 """Returns a string containing state/reading equilibrium frequencies in BEAST format for the character/variation unit at the given index. 

1401 Since the equilibrium frequencies are not used with the substitution models, the equilibrium frequencies simply correspond to a uniform distribution over the states. 

1402 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then an equilibrium frequency of 0 will be added for a dummy state. 

1403 

1404 Args: 

1405 vu_ind: An integer index for the desired unit. 

1406 

1407 Returns: 

1408 A string containing space-separated equilibrium frequencies. 

1409 """ 

1410 vu = self.variation_units[vu_ind] 

1411 vu_id = vu.id 

1412 # If this unit is a singleton, then return the string "0.5 0.5": 

1413 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1: 

1414 return "0.5 0.5" 

1415 # Otherwise, set the equilibrium frequencies according to a uniform distribution: 

1416 equilibrium_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len( 

1417 self.substantive_readings_by_variation_unit_id[vu_id] 

1418 ) 

1419 equilibrium_frequencies_string = " ".join([str(w) for w in equilibrium_frequencies]) 

1420 return equilibrium_frequencies_string 

1421 

def get_beast_root_frequencies_for_unit(self, vu_ind):
    """Returns a string containing state/reading root frequencies in BEAST format for the character/variation unit at the given index.
    The root frequencies are calculated from the intrinsic odds at this unit.
    If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a root frequency of 0 will be added for a dummy state.
    If no intrinsic odds are specified, then a uniform distribution over all states is assumed.

    Args:
        vu_ind: An integer index for the desired unit.

    Returns:
        A string containing space-separated root frequencies.
    """
    vu = self.variation_units[vu_ind]
    vu_id = vu.id
    # Intrinsic relations map (source reading ID, target reading ID) edges to intrinsic category labels:
    intrinsic_relations = vu.intrinsic_relations
    intrinsic_odds_by_id = self.intrinsic_odds_by_id
    # If this unit is a singleton, then return the string "1 0":
    if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1:
        return "1 0"
    # If this unit has no intrinsic odds, then assume a uniform distribution over all readings:
    if len(intrinsic_relations) == 0:
        root_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len(
            self.substantive_readings_by_variation_unit_id[vu_id]
        )
        root_frequencies_string = " ".join([str(w) for w in root_frequencies])
        return root_frequencies_string
    # We will populate the root frequencies based on the intrinsic odds of the readings:
    root_frequencies_by_id = {}
    for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]:
        root_frequencies_by_id[rdg_id] = 0
    # First, construct an adjacency list for efficient edge iteration:
    neighbors_by_source = {}
    for edge in intrinsic_relations:
        s = edge[0]
        t = edge[1]
        if s not in neighbors_by_source:
            neighbors_by_source[s] = []
        if t not in neighbors_by_source:
            neighbors_by_source[t] = []
        neighbors_by_source[s].append(t)
    # Next, identify all readings that are not targeted by any intrinsic odds relation
    # (i.e., the readings with in-degree 0 in the relation graph):
    in_degree_by_reading = {}
    for edge in intrinsic_relations:
        s = edge[0]
        t = edge[1]
        if s not in in_degree_by_reading:
            in_degree_by_reading[s] = 0
        if t not in in_degree_by_reading:
            in_degree_by_reading[t] = 0
        in_degree_by_reading[t] += 1
    starting_nodes = [t for t in in_degree_by_reading if in_degree_by_reading[t] == 0]
    # Set the root frequencies for these readings to 1 (they will be normalized later):
    for starting_node in starting_nodes:
        root_frequencies_by_id[starting_node] = 1.0

    # Next, set the frequencies for the remaining readings recursively using the adjacency list:
    # NOTE(review): this traversal assumes the intrinsic relations graph is acyclic
    # (a cycle would recurse indefinitely), and if a reading is reachable by more than
    # one path, the frequency assigned by the last-visited path wins — confirm this is intended.
    def update_root_frequencies(s):
        for t in neighbors_by_source[s]:
            intrinsic_category = intrinsic_relations[(s, t)]
            # An unspecified odds value currently defaults to 1.0:
            odds = (
                intrinsic_odds_by_id[intrinsic_category]
                if intrinsic_odds_by_id[intrinsic_category] is not None
                else 1.0
            )  # TODO: This needs to be handled using parameters once we have it implemented in BEAST
            # The target reading's frequency is the source reading's frequency discounted by the odds:
            root_frequencies_by_id[t] = root_frequencies_by_id[s] / odds
            update_root_frequencies(t)
        return

    for starting_node in starting_nodes:
        update_root_frequencies(starting_node)
    # Then produce a normalized vector of root frequencies that corresponds to a probability distribution:
    root_frequencies = [
        root_frequencies_by_id[rdg_id] for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]
    ]
    total_frequencies = sum(root_frequencies)
    for k in range(len(root_frequencies)):
        root_frequencies[k] = root_frequencies[k] / total_frequencies
    root_frequencies_string = " ".join([str(w) for w in root_frequencies])
    return root_frequencies_string

1500 

def to_beast(
    self,
    file_addr: Union[Path, str],
    drop_constant: bool = False,
    clock_model: ClockModel = ClockModel.strict,
    ancestral_logger: AncestralLogger = AncestralLogger.state,
    seed: int = None,
):
    """Writes this Collation to a file in BEAST format with the given address.

    Args:
        file_addr: A string representing the path to an output file.
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
        clock_model: A ClockModel option indicating which clock model to use.
        ancestral_logger: An AncestralLogger option indicating which class of logger (if any) to use for ancestral states.
        seed: A seed for random number generation (for setting initial values of unspecified transcriptional rates).
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    # Populate sets of substantive variation unit IDs and substantive variant reading tuples:
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
    # First, calculate the values we will be using for the main template:
    taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
    missing_symbol = '?'
    symbols = self.get_beast_symbols()
    tip_date_range = self.get_tip_date_range()
    origin_span = self.get_beast_origin_span(tip_date_range)
    date_map = self.get_beast_date_map(taxlabels)
    # Then populate the necessary objects for the BEAST XML Jinja template:
    witness_objects = []
    variation_unit_objects = []
    intrinsic_category_objects = []
    transcriptional_category_objects = []
    with tqdm(total=len(self.witnesses)) as pbar:
        # Start with witnesses:
        for i, wit in enumerate(self.witnesses):
            witness_object = {}
            # Copy the ID for this witness:
            witness_object["id"] = wit.id
            # Copy its date bounds:
            witness_object["min_date"] = wit.date_range[0]
            witness_object["max_date"] = wit.date_range[1]
            # Populate its sequence from its entries in the witness's readings dictionary.
            # Each unit contributes a comma-separated run of per-reading likelihoods ended by "; ":
            sequence = ""
            for j, rdg_support in enumerate(self.readings_by_witness[wit.id]):
                vu_id = self.variation_unit_ids[j]
                # Skip any variation units deemed non-substantive:
                if vu_id not in substantive_variation_unit_ids:
                    continue
                # If this witness has a certainty of 0 for all readings, then it is a gap; assign a likelihood of 1 to each reading:
                if sum(rdg_support) == 0:
                    for k, w in enumerate(rdg_support):
                        sequence += "1"
                        if k < len(rdg_support) - 1:
                            sequence += ", "
                        else:
                            if len(rdg_support) > 1:
                                sequence += "; "
                            else:
                                # If this site is a singleton site, then add a dummy state:
                                sequence += ", 0; "
                # Otherwise, read the probabilities as they are given:
                else:
                    for k, w in enumerate(rdg_support):
                        sequence += str(w)
                        if k < len(rdg_support) - 1:
                            sequence += ", "
                        else:
                            if len(rdg_support) > 1:
                                sequence += "; "
                            else:
                                # If this site is a singleton site, then add a dummy state:
                                sequence += ", 0; "
            # Strip the final semicolon and space from the sequence
            # (str.strip removes any run of ';' and ' ' characters from both ends):
            sequence = sequence.strip("; ")
            # Then set the witness object's sequence attribute to this string:
            witness_object["sequence"] = sequence
            witness_objects.append(witness_object)
            pbar.update(1)
    # Then proceed to variation units:
    for j, vu in enumerate(self.variation_units):
        if vu.id not in substantive_variation_unit_ids_set:
            continue
        variation_unit_object = {}
        # Copy the one-based index of this variation unit:
        variation_unit_object["index"] = j + 1
        # Copy the ID of this variation unit:
        variation_unit_object["id"] = vu.id
        # Set a flag indicating if this variation unit is constant:
        variation_unit_object["is_constant"] = (
            True if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1 else False
        )
        # Copy this variation unit's number of substantive readings,
        # setting it to 2 if it is a singleton unit:
        variation_unit_object["nstates"] = (
            len(self.substantive_readings_by_variation_unit_id[vu.id])
            if len(self.substantive_readings_by_variation_unit_id[vu.id]) > 1
            else 2
        )
        # Then construct the code map for this unit:
        variation_unit_object["code_map"] = self.get_beast_code_map_for_unit(symbols, missing_symbol, j)
        # Then populate a comma-separated string of reading labels for this unit:
        rdg_texts = []
        vu_label = vu.id
        for rdg in vu.readings:
            key = tuple([vu.id, rdg.id])
            if key not in substantive_variation_unit_reading_tuples_set:
                continue
            rdg_text = slugify(rdg.text, lowercase=False, allow_unicode=True, separator='_')
            # Replace any empty reading text with an omission marker:
            if rdg_text == "":
                rdg_text = "om."
            rdg_texts.append(rdg_text)
        # If this site is a singleton site, then add a dummy reading for the dummy state:
        if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
            rdg_texts.append("DUMMY")
        rdg_texts_string = ", ".join(rdg_texts)
        variation_unit_object["rdg_texts"] = rdg_texts_string
        # Then populate this unit's equilibrium frequency string and its root frequency string:
        variation_unit_object["equilibrium_frequencies"] = self.get_beast_equilibrium_frequencies_for_unit(j)
        variation_unit_object["root_frequencies"] = self.get_beast_root_frequencies_for_unit(j)
        # Then populate a dictionary mapping epoch height ranges to lists of off-diagonal entries for substitution models:
        rate_objects_by_epoch_height_range = {}
        epoch_height_ranges = []
        # Then proceed based on whether the transcriptional relations for this variation unit have been defined:
        if len(vu.transcriptional_relations_by_date_range) == 0:
            # If there are no transcriptional relations, then map the epoch range of (None, None) to their list of off-diagonal entries:
            epoch_height_ranges.append((None, None))
            rate_objects_by_epoch_height_range[(None, None)] = []
            rate_objects = rate_objects_by_epoch_height_range[(None, None)]
            if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
                # If this is a singleton site, then use an arbitrary 2x2 rate matrix:
                rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
                rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
            else:
                # If this is a site with multiple substantive readings, but no transcriptional relations list,
                # then use a Lewis Mk substitution matrix with the appropriate number of states:
                for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                    for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                        # Skip diagonal elements:
                        if k_1 == k_2:
                            continue
                        rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
        else:
            # Otherwise, proceed for every date range:
            for date_range in vu.transcriptional_relations_by_date_range:
                # Get the map of transcriptional relations for reference later:
                transcriptional_relations = vu.transcriptional_relations_by_date_range[date_range]
                # Now get the epoch height range corresponding to this date range, and initialize its list in the dictionary.
                # Heights are measured backward from the latest tip date, so the date upper bound
                # becomes the height lower bound and vice versa:
                epoch_height_range = [None, None]
                epoch_height_range[0] = tip_date_range[1] - date_range[1] if date_range[1] is not None else None
                epoch_height_range[1] = tip_date_range[1] - date_range[0] if date_range[0] is not None else None
                epoch_height_range = tuple(epoch_height_range)
                epoch_height_ranges.append(epoch_height_range)
                rate_objects_by_epoch_height_range[epoch_height_range] = []
                rate_objects = rate_objects_by_epoch_height_range[epoch_height_range]
                # Then proceed for every pair of readings in this unit:
                for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                    for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                        # Skip diagonal elements:
                        if k_1 == k_2:
                            continue
                        # If the first reading has no transcriptional relation to the second in this unit, then use the default rate:
                        if (rdg_id_1, rdg_id_2) not in transcriptional_relations:
                            rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
                            continue
                        # Otherwise, if only one category of transcriptional relations holds between the first and second readings,
                        # then use its rate:
                        if len(transcriptional_relations[(rdg_id_1, rdg_id_2)]) == 1:
                            # If there is only one such category, then add its rate as a standalone var element:
                            transcriptional_category = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])[0]
                            rate_objects.append(
                                {"transcriptional_categories": [transcriptional_category], "expression": None}
                            )
                            continue
                        # If there is more than one, then add a var element that is a sum of the individual categories' rates
                        # (rendered in postfix/RPN form: all operands first, then the "+" operators):
                        transcriptional_categories = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])
                        args = []
                        for transcriptional_category in transcriptional_categories:
                            args.append("%s_rate" % transcriptional_category)
                        args_string = " ".join(args)
                        ops = ["+"] * (len(args) - 1)
                        ops_string = " ".join(ops)
                        expression_string = " ".join([args_string, ops_string])
                        rate_objects.append(
                            {
                                "transcriptional_categories": transcriptional_categories,
                                "expression": expression_string,
                            }
                        )
        # Now reorder the list of epoch height ranges, and get a list of non-null epoch dates in ascending order from the dictionary:
        epoch_height_ranges.reverse()
        epoch_heights = [
            epoch_height_range[0] for epoch_height_range in epoch_height_ranges if epoch_height_range[0] is not None
        ]
        # Then add all of these data structures to the variation unit object:
        variation_unit_object["epoch_heights"] = epoch_heights
        variation_unit_object["epoch_heights_string"] = " ".join(
            [str(epoch_height) for epoch_height in epoch_heights]
        )
        variation_unit_object["epoch_height_ranges"] = epoch_height_ranges
        variation_unit_object["epoch_rates"] = [
            rate_objects_by_epoch_height_range[epoch_height_range] for epoch_height_range in epoch_height_ranges
        ]
        variation_unit_objects.append(variation_unit_object)
    # Then proceed to intrinsic odds categories:
    for intrinsic_category in self.intrinsic_categories:
        intrinsic_category_object = {}
        # Copy the ID of this intrinsic category:
        intrinsic_category_object["id"] = intrinsic_category
        # Then copy the odds factors associated with this intrinsic category,
        # setting it to 1.0 if it is not specified and setting the estimate attribute accordingly:
        odds = self.intrinsic_odds_by_id[intrinsic_category]
        intrinsic_category_object["odds"] = odds if odds is not None else 1.0
        intrinsic_category_object["estimate"] = "false" if odds is not None else "true"
        intrinsic_category_objects.append(intrinsic_category_object)
    # Then proceed to transcriptional rate categories:
    rng = np.random.default_rng(seed)
    for transcriptional_category in self.transcriptional_categories:
        transcriptional_category_object = {}
        # Copy the ID of this transcriptional category:
        transcriptional_category_object["id"] = transcriptional_category
        # Then copy the rate of this transcriptional category,
        # setting it to a random number sampled from a Gamma distribution if it is not specified and setting the estimate attribute accordingly:
        rate = self.transcriptional_rates_by_id[transcriptional_category]
        transcriptional_category_object["rate"] = rate if rate is not None else rng.gamma(5.0, 2.0)
        transcriptional_category_object["estimate"] = "false" if rate is not None else "true"
        transcriptional_category_objects.append(transcriptional_category_object)
    # Now render the output XML file using the Jinja template:
    env = Environment(loader=PackageLoader("teiphy", "templates"), autoescape=select_autoescape())
    template = env.get_template("beast_template.xml")
    rendered = template.render(
        nsymbols=len(symbols),
        date_map=date_map,
        origin_span=origin_span,
        clock_model=clock_model.value,
        clock_rate_categories=2 * len(self.witnesses) - 1,
        ancestral_logger=ancestral_logger.value,
        witnesses=witness_objects,
        variation_units=variation_unit_objects,
        non_constant_variation_units=[
            variation_unit_object
            for variation_unit_object in variation_unit_objects
            if not variation_unit_object["is_constant"]
        ],
        constant_variation_unit_filter=",".join(
            [
                str(variation_unit_object["index"])
                for variation_unit_object in variation_unit_objects
                if variation_unit_object["is_constant"]
            ]
        ),
        intrinsic_categories=intrinsic_category_objects,
        transcriptional_categories=transcriptional_category_objects,
    )
    # Generate all parent folders for this file that don't already exist:
    Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
    with open(file_addr, "w", encoding="utf-8") as f:
        f.write(rendered)
    return

1768 

def to_numpy(self, drop_constant: bool = False, split_missing: SplitMissingType = None):
    """Returns this Collation in the form of a NumPy array, along with arrays of its row and column labels.

    Args:
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
        split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
            If not specified, then missing data is ignored (i.e., all states are 0).
            If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
            If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.

    Returns:
        A NumPy array with a row for each substantive reading and a column for each witness.
        A list of substantive reading ID strings.
        A list of witness ID strings.
    """
    # Determine which variation units are substantive, optionally dropping constant units:
    if drop_constant:
        substantive_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    else:
        substantive_ids = self.variation_unit_ids
    substantive_ids_set = set(substantive_ids)
    substantive_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
    # Build the row labels from the substantive readings of the substantive units:
    reading_labels = [
        vu.id + ", " + rdg.text
        for vu in self.variation_units
        if vu.id in substantive_ids_set
        for rdg in vu.readings
        if (vu.id, rdg.id) in substantive_reading_tuples_set
    ]
    witness_labels = [wit.id for wit in self.witnesses]
    matrix = np.zeros((len(reading_labels), len(witness_labels)), dtype=float)
    # For each substantive unit, record the proportion of total support that each
    # substantive reading receives among the witnesses that attest it:
    support_proportions_by_unit = {}
    for j, vu_id in enumerate(self.variation_unit_ids):
        if vu_id not in substantive_ids_set:
            continue
        totals = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
        for wit in self.witnesses:
            for l, w in enumerate(self.readings_by_witness[wit.id][j]):
                totals[l] += w
        # If this unit has no extant witnesses (e.g., if its only witnesses are fragmentary
        # and we have excluded them), then assume a norm of 1 to avoid division by zero:
        norm = sum(totals) if sum(totals) > 0 else 1.0
        support_proportions_by_unit[vu_id] = [total / norm for total in totals]
    # Then fill in the matrix one column (witness) at a time:
    with tqdm(total=len(self.witnesses)) as pbar:
        for col_ind, wit in enumerate(self.witnesses):
            row_ind = 0
            for j, vu_id in enumerate(self.variation_unit_ids):
                if vu_id not in substantive_ids_set:
                    continue
                rdg_support = self.readings_by_witness[wit.id][j]
                if sum(rdg_support) == 0:
                    # A zero support vector means this unit is missing in this witness;
                    # handle it as specified by split_missing:
                    if split_missing == SplitMissingType.uniform:
                        for _ in rdg_support:
                            matrix[row_ind, col_ind] = 1 / len(rdg_support)
                            row_ind += 1
                    elif split_missing == SplitMissingType.proportional:
                        for l in range(len(rdg_support)):
                            matrix[row_ind, col_ind] = support_proportions_by_unit[vu_id][l]
                            row_ind += 1
                    else:
                        row_ind += len(rdg_support)
                else:
                    # Otherwise, copy the support coefficients directly:
                    for w in rdg_support:
                        matrix[row_ind, col_ind] = w
                        row_ind += 1
            pbar.update(1)
    return matrix, reading_labels, witness_labels

1850 

1851 def to_distance_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False): 

1852 """Transforms this Collation into a NumPy distance matrix between witnesses, along with an array of its labels for the witnesses. 

1853 Distances can be computed either as counts of disagreements (the default setting), or as proportions of disagreements over all variation units where both witnesses have singleton readings. 

1854 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of disagreements. 

1855 

1856 Args: 

1857 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1858 Default value is False. 

1859 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

1860 Default value is False. 

1861 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

1862 should include the number of their extant, unambiguous variation units after the number of their disagreements. 

1863 Default value is False. 

1864 

1865 Returns: 

1866 A NumPy distance matrix with a row and column for each witness. 

1867 A list of witness ID strings. 

1868 """ 

1869 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1870 substantive_variation_unit_ids = self.variation_unit_ids 

1871 if drop_constant: 

1872 substantive_variation_unit_ids = [ 

1873 vu_id 

1874 for vu_id in self.variation_unit_ids 

1875 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1876 ] 

1877 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1878 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1879 # Initialize the output array with the appropriate dimensions: 

1880 witness_labels = [wit.id for wit in self.witnesses] 

1881 # The type of the matrix will depend on the input options: 

1882 matrix = None 

1883 if show_ext: 

1884 matrix = np.full( 

1885 (len(witness_labels), len(witness_labels)), "NA", dtype=object 

1886 ) # strings of the form "disagreements/extant" 

1887 elif proportion: 

1888 matrix = np.full( 

1889 (len(witness_labels), len(witness_labels)), 0.0, dtype=float 

1890 ) # floats of the form disagreements/extant 

1891 else: 

1892 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form disagreements 

1893 with tqdm(total=len(self.witnesses) ** 2) as pbar: 

1894 for i, wit_1 in enumerate(witness_labels): 

1895 for j, wit_2 in enumerate(witness_labels): 

1896 extant_units = 0 

1897 disagreements = 0 

1898 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified, 

1899 # so we only have to calculate it once: 

1900 if i > j: 

1901 pbar.update(1) 

1902 continue 

1903 # Otherwise, calculate the number of units where both witnesses have unambiguous readings 

1904 # and the number of units where they disagree: 

1905 for k, vu_id in enumerate(self.variation_unit_ids): 

1906 if vu_id not in substantive_variation_unit_ids_set: 

1907 continue 

1908 wit_1_rdg_support = self.readings_by_witness[wit_1][k] 

1909 wit_2_rdg_support = self.readings_by_witness[wit_2][k] 

1910 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0] 

1911 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0] 

1912 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1: 

1913 continue 

1914 extant_units += 1 

1915 if wit_1_rdg_inds[0] != wit_2_rdg_inds[0]: 

1916 disagreements += 1 

1917 cell_entry = None 

1918 if proportion: 

1919 cell_entry = disagreements / max( 

1920 extant_units, 1 

1921 ) # the max in the denominator is to prevent division by 0; the distance entry will be 0 if the two witnesses have no overlap 

1922 else: 

1923 cell_entry = disagreements 

1924 if show_ext: 

1925 cell_entry = str(cell_entry) + "/" + str(extant_units) 

1926 matrix[i, j] = cell_entry 

1927 matrix[j, i] = cell_entry 

1928 pbar.update(1) 

1929 return matrix, witness_labels 

1930 

1931 def to_similarity_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False): 

1932 """Transforms this Collation into a NumPy similarity matrix between witnesses, along with an array of its labels for the witnesses. 

1933 Similarities can be computed either as counts of agreements (the default setting), or as proportions of agreements over all variation units where both witnesses have singleton readings. 

1934 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of agreements. 

1935 

1936 Args: 

1937 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1938 Default value is False. 

1939 proportion (bool, optional): An optional flag indicating whether or not to calculate similarities as proportions over extant, unambiguous variation units. 

1940 Default value is False. 

1941 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

1942 should include the number of their extant, unambiguous variation units after the number of agreements. 

1943 Default value is False. 

1944 

1945 Returns: 

1946 A NumPy agreement matrix with a row and column for each witness. 

1947 A list of witness ID strings. 

1948 """ 

1949 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1950 substantive_variation_unit_ids = self.variation_unit_ids 

1951 if drop_constant: 

1952 substantive_variation_unit_ids = [ 

1953 vu_id 

1954 for vu_id in self.variation_unit_ids 

1955 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1956 ] 

1957 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1958 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1959 # Initialize the output array with the appropriate dimensions: 

1960 witness_labels = [wit.id for wit in self.witnesses] 

1961 # The type of the matrix will depend on the input options: 

1962 matrix = None 

1963 if show_ext: 

1964 matrix = np.full( 

1965 (len(witness_labels), len(witness_labels)), "NA", dtype=object 

1966 ) # strings of the form "agreements/extant" 

1967 elif proportion: 

1968 matrix = np.full( 

1969 (len(witness_labels), len(witness_labels)), 0.0, dtype=float 

1970 ) # floats of the form agreements/extant 

1971 else: 

1972 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form agreements 

1973 with tqdm(total=len(self.witnesses) ** 2) as pbar: 

1974 for i, wit_1 in enumerate(witness_labels): 

1975 for j, wit_2 in enumerate(witness_labels): 

1976 extant_units = 0 

1977 agreements = 0 

1978 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified, 

1979 # so we only have to calculate it once: 

1980 if i > j: 

1981 pbar.update(1) 

1982 continue 

1983 # Otherwise, calculate the number of units where both witnesses have unambiguous readings 

1984 # and the number of units where they agree: 

1985 for k, vu_id in enumerate(self.variation_unit_ids): 

1986 if vu_id not in substantive_variation_unit_ids_set: 

1987 continue 

1988 wit_1_rdg_support = self.readings_by_witness[wit_1][k] 

1989 wit_2_rdg_support = self.readings_by_witness[wit_2][k] 

1990 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0] 

1991 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0] 

1992 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1: 

1993 continue 

1994 extant_units += 1 

1995 if wit_1_rdg_inds[0] == wit_2_rdg_inds[0]: 

1996 agreements += 1 

1997 cell_entry = None 

1998 if proportion: 

1999 cell_entry = agreements / max( 

2000 extant_units, 1 

2001 ) # the max in the denominator is to prevent division by 0; the distance entry will be 0 if the two witnesses have no overlap 

2002 else: 

2003 cell_entry = agreements 

2004 if show_ext: 

2005 cell_entry = str(cell_entry) + "/" + str(extant_units) 

2006 matrix[i, j] = cell_entry 

2007 matrix[j, i] = cell_entry 

2008 pbar.update(1) 

2009 return matrix, witness_labels 

2010 

2011 def to_idf_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None): 

2012 """Transforms this Collation into a NumPy matrix of agreements between witnesses weighted by inverse document frequency (IDF), along with an array of its labels for the witnesses. 

2013 The IDF weight of an agreement on a given reading is the information content -log(Pr(R)) of the event R of randomly sampling a witness with that reading. 

2014 The IDF-weighted agreement score for two witnesses is the sum of the IDF weights for the readings at which they agree. 

2015 Where any witness is ambiguous, it contributes to its potential readings' sampling probabilities in proportion to its degrees of support for those readings. 

2016 Similarly, where one or both target witnesses are ambiguous, the expected information content of their agreement is calculated based on the probabilities of their having the same reading. 

2017 If a split_missing argument is supplied, then lacunae are handled in the same way. 

2018 

2019 Args: 

2020 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2021 Default value is False. 

2022 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2023 If not specified, then missing data is ignored (i.e., all states are 0). 

2024 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2025 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2026 

2027 Returns: 

2028 A NumPy IDF-weighted agreement matrix with a row and column for each witness. 

2029 A list of witness ID strings. 

2030 """ 

2031 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2032 substantive_variation_unit_ids = self.variation_unit_ids 

2033 if drop_constant: 

2034 substantive_variation_unit_ids = [ 

2035 vu_id 

2036 for vu_id in self.variation_unit_ids 

2037 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2038 ] 

2039 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2040 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2041 # Initialize the output array with the appropriate dimensions: 

2042 witness_labels = [wit.id for wit in self.witnesses] 

2043 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings: 

2044 support_proportions_by_unit = {} 

2045 for k, vu_id in enumerate(self.variation_unit_ids): 

2046 # Skip this variation unit if it is a dropped constant site: 

2047 if vu_id not in substantive_variation_unit_ids_set: 

2048 continue 

2049 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2050 for i, wit in enumerate(witness_labels): 

2051 rdg_support = self.readings_by_witness[wit][k] 

2052 for l, w in enumerate(rdg_support): 

2053 support_proportions[l] += w 

2054 norm = ( 

2055 sum(support_proportions) if sum(support_proportions) > 0 else 1.0 

2056 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2057 for l in range(len(support_proportions)): 

2058 support_proportions[l] = support_proportions[l] / norm 

2059 support_proportions_by_unit[vu_id] = support_proportions 

2060 # Then populate data structures mapping each variation unit's ID to normalized reading support dictionaries 

2061 # and vectors of sampling probabilities for its substantive readings: 

2062 normalized_reading_support_dicts_by_vu_id = {} 

2063 sampling_probabilities_by_vu_id = {} 

2064 for k, vu_id in enumerate(self.variation_unit_ids): 

2065 # Skip this variation unit if it is a dropped constant site: 

2066 if vu_id not in substantive_variation_unit_ids_set: 

2067 continue 

2068 # Otherwise, populate normalized reading support vector dictionaries and sampling probability vectors in this unit: 

2069 normalized_reading_support_by_wit = {} 

2070 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2071 for i, wit in enumerate(witness_labels): 

2072 rdg_support = self.readings_by_witness[wit][k] 

2073 # Check if this reading support vector represents missing data: 

2074 norm = sum(rdg_support) 

2075 if norm == 0: 

2076 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2077 if split_missing == SplitMissingType.uniform: 

2078 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))] 

2079 elif split_missing == SplitMissingType.proportional: 

2080 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))] 

2081 else: 

2082 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1: 

2083 rdg_support = [w / norm for l, w in enumerate(rdg_support)] 

2084 normalized_reading_support_by_wit[wit] = rdg_support 

2085 # Then add this witness's contributions to the readings' sampling probabilities: 

2086 for l, w in enumerate(normalized_reading_support_by_wit[wit]): 

2087 sampling_probabilities[l] += w 

2088 norm = ( 

2089 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0 

2090 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2091 # Otherwise, normalize the sampling probabilities so they sum to 1: 

2092 sampling_probabilities = [w / norm for w in sampling_probabilities] 

2093 normalized_reading_support_dicts_by_vu_id[vu_id] = normalized_reading_support_by_wit 

2094 sampling_probabilities_by_vu_id[vu_id] = sampling_probabilities 

2095 # Then populate the matrix with the total expected information content for agreements between each pair of witnesses: 

2096 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2097 with tqdm(total=len(self.witnesses) ** 2) as pbar: 

2098 for i, wit_1 in enumerate(witness_labels): 

2099 for j, wit_2 in enumerate(witness_labels): 

2100 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified, 

2101 # so we only have to calculate it once: 

2102 if i > j: 

2103 pbar.update(1) 

2104 continue 

2105 # Otherwise, calculate the expected information content of agreements between these witnesses in each substantive variation unit 

2106 # based on the sampling probabilities of the substantive readings in the unit: 

2107 for k, vu_id in enumerate(self.variation_unit_ids): 

2108 if vu_id not in substantive_variation_unit_ids_set: 

2109 continue 

2110 wit_1_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_1] 

2111 wit_2_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_2] 

2112 sampling_probabilities = sampling_probabilities_by_vu_id[vu_id] 

2113 # First, calculate the probability that these two witnesses agree: 

2114 probability_of_agreement = sum( 

2115 [wit_1_rdg_support[l] * wit_2_rdg_support[l] for l in range(len(sampling_probabilities))] 

2116 ) 

2117 # If these witnesses do not agree at this variation unit, then this unit contributes nothing to their total score: 

2118 if probability_of_agreement == 0.0: 

2119 continue 

2120 # Otherwise, calculate the expected information content (in bits) of their agreement given their agreement on that reading 

2121 # (skipping readings with a sampling probability of 0): 

2122 expected_information_content = sum( 

2123 [ 

2124 -math.log2(sampling_probabilities[l]) 

2125 * (wit_1_rdg_support[l] * wit_2_rdg_support[l] / probability_of_agreement) 

2126 for l in range(len(sampling_probabilities)) 

2127 if sampling_probabilities[l] > 0.0 

2128 ] 

2129 ) 

2130 # Then add this contribution to the total score for these two witnesses: 

2131 matrix[i, j] += expected_information_content 

2132 matrix[j, i] += expected_information_content 

2133 pbar.update(1) 

2134 return matrix, witness_labels 

2135 

2136 def to_mean_idf_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None): 

2137 """Transforms this Collation into a NumPy matrix of average inverse document frequency (IDF) weights on agreements between witnesses, along with an array of its labels for the witnesses. 

2138 The IDF weight of an agreement on a given reading is the information content -log(Pr(R)) of the event R of randomly sampling a witness with that reading. 

2139 The IDF-weighted agreement score for two witnesses is the sum of the IDF weights for the readings at which they agree. 

2140 The average is taken over all variation units at which both witnesses have a non-zero reading support vector. 

2141 Where any witness is ambiguous, it contributes to its potential readings' sampling probabilities in proportion to its degrees of support for those readings. 

2142 Similarly, where one or both target witnesses are ambiguous, the expected information content of their agreement is calculated based on the probabilities of their having the same reading. 

2143 If a split_missing argument is supplied, then lacunae are handled in the same way. 

2144 This will improve the value of relationships involving more fragmentary witnesses, 

2145 but be warned that extremely fragmentary witnesses may have their values inflated and should probably be excluded from consideration. 

2146 

2147 Args: 

2148 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2149 Default value is False. 

2150 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2151 If not specified, then missing data is ignored (i.e., all states are 0). 

2152 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2153 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2154 

2155 Returns: 

2156 A NumPy IDF-weighted agreement matrix with a row and column for each witness. 

2157 A list of witness ID strings. 

2158 """ 

2159 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2160 substantive_variation_unit_ids = self.variation_unit_ids 

2161 if drop_constant: 

2162 substantive_variation_unit_ids = [ 

2163 vu_id 

2164 for vu_id in self.variation_unit_ids 

2165 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2166 ] 

2167 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2168 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2169 # Initialize the output array with the appropriate dimensions: 

2170 witness_labels = [wit.id for wit in self.witnesses] 

2171 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings: 

2172 support_proportions_by_unit = {} 

2173 for k, vu_id in enumerate(self.variation_unit_ids): 

2174 # Skip this variation unit if it is a dropped constant site: 

2175 if vu_id not in substantive_variation_unit_ids_set: 

2176 continue 

2177 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2178 for i, wit in enumerate(witness_labels): 

2179 rdg_support = self.readings_by_witness[wit][k] 

2180 for l, w in enumerate(rdg_support): 

2181 support_proportions[l] += w 

2182 norm = ( 

2183 sum(support_proportions) if sum(support_proportions) > 0 else 1.0 

2184 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2185 for l in range(len(support_proportions)): 

2186 support_proportions[l] = support_proportions[l] / norm 

2187 support_proportions_by_unit[vu_id] = support_proportions 

2188 # Then populate data structures mapping each variation unit's ID to normalized reading support dictionaries 

2189 # and vectors of sampling probabilities for its substantive readings: 

2190 normalized_reading_support_dicts_by_vu_id = {} 

2191 sampling_probabilities_by_vu_id = {} 

2192 for k, vu_id in enumerate(self.variation_unit_ids): 

2193 # Skip this variation unit if it is a dropped constant site: 

2194 if vu_id not in substantive_variation_unit_ids_set: 

2195 continue 

2196 # Otherwise, populate normalized reading support vector dictionaries and sampling probability vectors in this unit: 

2197 normalized_reading_support_by_wit = {} 

2198 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2199 for i, wit in enumerate(witness_labels): 

2200 rdg_support = self.readings_by_witness[wit][k] 

2201 # Check if this reading support vector represents missing data: 

2202 norm = sum(rdg_support) 

2203 if norm == 0: 

2204 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2205 if split_missing == SplitMissingType.uniform: 

2206 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))] 

2207 elif split_missing == SplitMissingType.proportional: 

2208 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))] 

2209 else: 

2210 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1: 

2211 rdg_support = [w / norm for l, w in enumerate(rdg_support)] 

2212 normalized_reading_support_by_wit[wit] = rdg_support 

2213 # Then add this witness's contributions to the readings' sampling probabilities: 

2214 for l, w in enumerate(normalized_reading_support_by_wit[wit]): 

2215 sampling_probabilities[l] += w 

2216 norm = ( 

2217 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0 

2218 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2219 # Otherwise, normalize the sampling probabilities so they sum to 1: 

2220 sampling_probabilities = [w / norm for w in sampling_probabilities] 

2221 normalized_reading_support_dicts_by_vu_id[vu_id] = normalized_reading_support_by_wit 

2222 sampling_probabilities_by_vu_id[vu_id] = sampling_probabilities 

2223 # Then populate the matrix one variation unit at a time: 

2224 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2225 nonzero_vu_count_matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2226 with tqdm(total=len(self.witnesses) ** 2) as pbar: 

2227 # Then calculate the IDF weights for agreements between witnesses in this unit: 

2228 for i, wit_1 in enumerate(witness_labels): 

2229 for j, wit_2 in enumerate(witness_labels): 

2230 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified, 

2231 # so we only have to calculate it once: 

2232 if i > j: 

2233 pbar.update(1) 

2234 continue 

2235 # Otherwise, calculate the expected information content of agreements between these witnesses in each substantive variation unit 

2236 # based on the sampling probabilities of the substantive readings in the unit: 

2237 for k, vu_id in enumerate(self.variation_unit_ids): 

2238 if vu_id not in substantive_variation_unit_ids_set: 

2239 continue 

2240 wit_1_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_1] 

2241 wit_2_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_2] 

2242 sampling_probabilities = sampling_probabilities_by_vu_id[vu_id] 

2243 # If either witness has an all-zero reading support vector, then skip this variation unit: 

2244 if sum(wit_1_rdg_support) == 0 or sum(wit_2_rdg_support) == 0: 

2245 continue 

2246 # Otherwise, the witnesses both have non-zero vectors here; increment the count of the units where this happens: 

2247 nonzero_vu_count_matrix[i, j] += 1 

2248 nonzero_vu_count_matrix[j, i] += 1 

2249 # Then calculate the probability that these two witnesses agree: 

2250 probability_of_agreement = sum( 

2251 [wit_1_rdg_support[l] * wit_2_rdg_support[l] for l in range(len(sampling_probabilities))] 

2252 ) 

2253 # If these witnesses do not agree at this variation unit, then this unit contributes nothing to their total score: 

2254 if probability_of_agreement == 0: 

2255 continue 

2256 # Otherwise, calculate the expected information content (in bits) of their agreement given their agreement on that reading 

2257 # (skipping readings with a sampling probability of 0): 

2258 expected_information_content = sum( 

2259 [ 

2260 -math.log2(sampling_probabilities[l]) 

2261 * (wit_1_rdg_support[l] * wit_2_rdg_support[l] / probability_of_agreement) 

2262 for l in range(len(sampling_probabilities)) 

2263 if sampling_probabilities[l] > 0.0 

2264 ] 

2265 ) 

2266 # Then add this contribution to the total score for these two witnesses: 

2267 matrix[i, j] += expected_information_content 

2268 matrix[j, i] += expected_information_content 

2269 pbar.update(1) 

2270 # Finally, obtain an average information content for each pair of witnesses: 

2271 for i, wit_1 in enumerate(witness_labels): 

2272 for j, wit_2 in enumerate(witness_labels): 

2273 matrix[i, j] = matrix[i, j] / nonzero_vu_count_matrix[i, j] if nonzero_vu_count_matrix[i, j] > 0 else 0 

2274 return matrix, witness_labels 

2275 

2276 def to_mi_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None): 

2277 """Transforms this Collation into a NumPy matrix of the total mutual information (MI), in bits, between witnesses over all variation units, along with an array of its labels for the witnesses. 

2278 This is equivalent to the total Kullback-Leibler divergence of the joint distribution of the witnesses' observed readings 

2279 from the joint distribution of their expected readings under the assumption that the witnesses are independent, taken over all variation units. 

2280 The value of 0 if and only if the witnesses are completely independent. 

2281 

2282 Args: 

2283 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2284 Default value is False. 

2285 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2286 If not specified, then missing data is ignored (i.e., all states are 0). 

2287 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2288 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2289 

2290 Returns: 

2291 A NumPy MI matrix with a row and column for each witness. 

2292 A list of witness ID strings. 

2293 """ 

2294 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2295 substantive_variation_unit_ids = self.variation_unit_ids 

2296 if drop_constant: 

2297 substantive_variation_unit_ids = [ 

2298 vu_id 

2299 for vu_id in self.variation_unit_ids 

2300 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2301 ] 

2302 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2303 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2304 # Initialize the output array with the appropriate dimensions: 

2305 witness_labels = [wit.id for wit in self.witnesses] 

2306 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings: 

2307 support_proportions_by_unit = {} 

2308 for k, vu_id in enumerate(self.variation_unit_ids): 

2309 # Skip this variation unit if it is a dropped constant site: 

2310 if vu_id not in substantive_variation_unit_ids_set: 

2311 continue 

2312 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2313 for i, wit in enumerate(witness_labels): 

2314 rdg_support = self.readings_by_witness[wit][k] 

2315 for l, w in enumerate(rdg_support): 

2316 support_proportions[l] += w 

2317 norm = ( 

2318 sum(support_proportions) if sum(support_proportions) > 0 else 1.0 

2319 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2320 for l in range(len(support_proportions)): 

2321 support_proportions[l] = support_proportions[l] / norm 

2322 support_proportions_by_unit[vu_id] = support_proportions 

2323 # Then populate data structures mapping each variation unit's ID to normalized reading support dictionaries, 

2324 # vectors of sampling probabilities for its substantive readings, and expected joint probability matrices: 

2325 normalized_reading_support_dicts_by_vu_id = {} 

2326 sampling_probabilities_by_vu_id = {} 

2327 expected_joint_probabilities_by_vu_id = {} 

2328 for k, vu_id in enumerate(self.variation_unit_ids): 

2329 # Skip this variation unit if it is a dropped constant site: 

2330 if vu_id not in substantive_variation_unit_ids_set: 

2331 continue 

2332 # Otherwise, populate normalized reading support vector dictionaries and sampling probability vectors in this unit: 

2333 normalized_reading_support_by_wit = {} 

2334 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2335 for i, wit in enumerate(witness_labels): 

2336 rdg_support = self.readings_by_witness[wit][k] 

2337 # Check if this reading support vector represents missing data: 

2338 norm = sum(rdg_support) 

2339 if norm == 0: 

2340 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2341 if split_missing == SplitMissingType.uniform: 

2342 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))] 

2343 elif split_missing == SplitMissingType.proportional: 

2344 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))] 

2345 else: 

2346 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1: 

2347 rdg_support = [w / norm for l, w in enumerate(rdg_support)] 

2348 normalized_reading_support_by_wit[wit] = rdg_support 

2349 # Then add this witness's contributions to the readings' sampling probabilities: 

2350 for l, w in enumerate(normalized_reading_support_by_wit[wit]): 

2351 sampling_probabilities[l] += w 

2352 norm = ( 

2353 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0 

2354 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2355 # Otherwise, normalize the sampling probabilities so they sum to 1: 

2356 sampling_probabilities = [w / norm for w in sampling_probabilities] 

2357 normalized_reading_support_dicts_by_vu_id[vu_id] = normalized_reading_support_by_wit 

2358 sampling_probabilities_by_vu_id[vu_id] = sampling_probabilities 

2359 # Then populate a contingency table for the expected probabilities of joint support in this unit: 

2360 expected_joint_probabilities = np.full( 

2361 (len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float 

2362 ) 

2363 for l1 in range(len(sampling_probabilities)): 

2364 for l2 in range(len(sampling_probabilities)): 

2365 expected_joint_probabilities[l1, l2] = sampling_probabilities[l1] * sampling_probabilities[l2] 

2366 expected_joint_probabilities_by_vu_id[vu_id] = expected_joint_probabilities 

2367 # Then populate the matrix one variation unit at a time: 

2368 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2369 with tqdm(total=len(self.witnesses) ** 2) as pbar: 

2370 # Then calculate the mutual information contribution for each pair of witnesses: 

2371 for i, wit_1 in enumerate(witness_labels): 

2372 for j, wit_2 in enumerate(witness_labels): 

2373 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified, 

2374 # so we only have to calculate it once: 

2375 if i > j: 

2376 pbar.update(1) 

2377 continue 

2378 # Otherwise, calculate the mutual information between these witnesses in each substantive variation unit: 

2379 for k, vu_id in enumerate(self.variation_unit_ids): 

2380 if vu_id not in substantive_variation_unit_ids_set: 

2381 continue 

2382 wit_1_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_1] 

2383 wit_2_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_2] 

2384 sampling_probabilities = sampling_probabilities_by_vu_id[vu_id] 

2385 expected_joint_probabilities = expected_joint_probabilities_by_vu_id[vu_id] 

2386 # If either witness has an all-zeroes vector (because it is lacunose in this unit), then we can skip these witnesses here: 

2387 if sum(wit_1_rdg_support) == 0 or sum(wit_2_rdg_support) == 0: 

2388 continue 

2389 # Otherwise, populate a contingency table for the observed probabilities of joint support in this unit: 

2390 observed_joint_probabilities = np.full( 

2391 (len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float 

2392 ) 

2393 for l1, w1 in enumerate(wit_1_rdg_support): 

2394 for l2, w2 in enumerate(wit_2_rdg_support): 

2395 observed_joint_probabilities[l1, l2] = w1 * w2 

2396 # Then calculate the mutual information using the expected and observed distribution matrices: 

2397 mutual_information = 0.0 

2398 for l1 in range(len(sampling_probabilities)): 

2399 for l2 in range(len(sampling_probabilities)): 

2400 observed = observed_joint_probabilities[l1, l2] 

2401 expected = expected_joint_probabilities[l1, l2] 

2402 if observed == 0: 

2403 continue 

2404 mutual_information += observed * math.log2(observed / expected) 

2405 # Then add this mutual information to the total for these two witnesses: 

2406 matrix[i, j] += mutual_information 

2407 matrix[j, i] += mutual_information 

2408 pbar.update(1) 

2409 return matrix, witness_labels 

2410 

    def to_mean_mi_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None):
        """Transforms this Collation into a NumPy matrix of the average mutual information (MI), in bits, between witnesses over all variation units, along with an array of its labels for the witnesses.
        This is equivalent to the average Kullback-Leibler divergence of the joint distribution of the witnesses' observed readings
        from the joint distribution of their expected readings under the assumption that the witnesses are independent, taken over all variation units.
        The value is 0 if and only if the witnesses are completely independent.
        This will improve the value of relationships involving more fragmentary witnesses,
        but be warned that extremely fragmentary witnesses may have their values inflated and should probably be excluded from consideration.

        Args:
            drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
                Default value is False.
            split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
                If not specified, then missing data is ignored (i.e., all states are 0).
                If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
                If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.

        Returns:
            A NumPy MI matrix with a row and column for each witness.
            A list of witness ID strings.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Initialize the output array with the appropriate dimensions:
        witness_labels = [wit.id for wit in self.witnesses]
        # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings
        # (needed for the "proportional" split_missing option below):
        support_proportions_by_unit = {}
        for k, vu_id in enumerate(self.variation_unit_ids):
            # Skip this variation unit if it is a dropped constant site:
            if vu_id not in substantive_variation_unit_ids_set:
                continue
            support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
            for i, wit in enumerate(witness_labels):
                rdg_support = self.readings_by_witness[wit][k]
                for l, w in enumerate(rdg_support):
                    support_proportions[l] += w
            norm = (
                sum(support_proportions) if sum(support_proportions) > 0 else 1.0
            )  # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
            for l in range(len(support_proportions)):
                support_proportions[l] = support_proportions[l] / norm
            support_proportions_by_unit[vu_id] = support_proportions
        # Then populate data structures mapping each variation unit's ID to normalized reading support dictionaries,
        # vectors of sampling probabilities for its substantive readings, and expected joint probability matrices:
        normalized_reading_support_dicts_by_vu_id = {}
        sampling_probabilities_by_vu_id = {}
        expected_joint_probabilities_by_vu_id = {}
        for k, vu_id in enumerate(self.variation_unit_ids):
            # Skip this variation unit if it is a dropped constant site:
            if vu_id not in substantive_variation_unit_ids_set:
                continue
            # Otherwise, populate normalized reading support vector dictionaries and sampling probability vectors in this unit:
            normalized_reading_support_by_wit = {}
            sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
            for i, wit in enumerate(witness_labels):
                rdg_support = self.readings_by_witness[wit][k]
                # Check if this reading support vector represents missing data:
                norm = sum(rdg_support)
                if norm == 0:
                    # If this reading support vector sums to 0, then this is missing data; handle it as specified:
                    if split_missing == SplitMissingType.uniform:
                        rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))]
                    elif split_missing == SplitMissingType.proportional:
                        rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))]
                    # NOTE: if split_missing is None, the all-zeroes vector is kept, and the witness is skipped in the MI loop below.
                else:
                    # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1:
                    rdg_support = [w / norm for l, w in enumerate(rdg_support)]
                normalized_reading_support_by_wit[wit] = rdg_support
                # Then add this witness's contributions to the readings' sampling probabilities:
                for l, w in enumerate(normalized_reading_support_by_wit[wit]):
                    sampling_probabilities[l] += w
            norm = (
                sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0
            )  # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
            # Otherwise, normalize the sampling probabilities so they sum to 1:
            sampling_probabilities = [w / norm for w in sampling_probabilities]
            normalized_reading_support_dicts_by_vu_id[vu_id] = normalized_reading_support_by_wit
            sampling_probabilities_by_vu_id[vu_id] = sampling_probabilities
            # Then populate a contingency table for the expected probabilities of joint support in this unit
            # (the outer product of the sampling probability vector with itself, i.e., the independence assumption):
            expected_joint_probabilities = np.full(
                (len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float
            )
            for l1 in range(len(sampling_probabilities)):
                for l2 in range(len(sampling_probabilities)):
                    expected_joint_probabilities[l1, l2] = sampling_probabilities[l1] * sampling_probabilities[l2]
            expected_joint_probabilities_by_vu_id[vu_id] = expected_joint_probabilities
        # Then populate the matrix one variation unit at a time:
        matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float)
        nonzero_vu_count_matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float)
        with tqdm(total=len(self.witnesses) ** 2) as pbar:
            # Then calculate the mutual information contribution for each pair of witnesses:
            for i, wit_1 in enumerate(witness_labels):
                for j, wit_2 in enumerate(witness_labels):
                    # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
                    # so we only have to calculate it once:
                    if i > j:
                        pbar.update(1)
                        continue
                    # Otherwise, calculate the mutual information between these witnesses in each substantive variation unit:
                    for k, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        wit_1_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_1]
                        wit_2_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_2]
                        sampling_probabilities = sampling_probabilities_by_vu_id[vu_id]
                        expected_joint_probabilities = expected_joint_probabilities_by_vu_id[vu_id]
                        # If either witness has an all-zeroes vector (because it is lacunose in this unit), then we can skip these witnesses here:
                        if sum(wit_1_rdg_support) == 0 or sum(wit_2_rdg_support) == 0:
                            continue
                        # Otherwise, populate a contingency table for the observed probabilities of joint support in this unit:
                        observed_joint_probabilities = np.full(
                            (len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float
                        )
                        for l1, w1 in enumerate(wit_1_rdg_support):
                            for l2, w2 in enumerate(wit_2_rdg_support):
                                observed_joint_probabilities[l1, l2] = w1 * w2
                        # Then calculate the mutual information using the expected and observed distribution matrices:
                        mutual_information = 0.0
                        for l1 in range(len(sampling_probabilities)):
                            for l2 in range(len(sampling_probabilities)):
                                observed = observed_joint_probabilities[l1, l2]
                                expected = expected_joint_probabilities[l1, l2]
                                # Terms with zero observed probability contribute 0 to the MI sum (lim x->0 of x*log(x/c) = 0):
                                if observed == 0:
                                    continue
                                mutual_information += observed * math.log2(observed / expected)
                        # Then add this mutual information to the total for these two witnesses, and increment the count of variation units where both have non-zero reading support vectors:
                        # NOTE: on the diagonal (i == j) both the totals and the counts are doubled, so the averages below are unaffected.
                        matrix[i, j] += mutual_information
                        matrix[j, i] += mutual_information
                        nonzero_vu_count_matrix[i, j] += 1
                        nonzero_vu_count_matrix[j, i] += 1
                    pbar.update(1)
        # Finally, obtain an average mutual information for each pair of witnesses
        # (pairs with no variation units in common are assigned 0):
        for i, wit_1 in enumerate(witness_labels):
            for j, wit_2 in enumerate(witness_labels):
                matrix[i, j] = matrix[i, j] / nonzero_vu_count_matrix[i, j] if nonzero_vu_count_matrix[i, j] > 0 else 0
        return matrix, witness_labels

2554 

2555 def to_nexus_table(self, drop_constant: bool = False, ambiguous_as_missing: bool = False): 

2556 """Returns this Collation in the form of a table with rows for taxa, columns for characters, and reading IDs in cells. 

2557 

2558 Args: 

2559 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2560 Default value is False. 

2561 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2562 Default value is False. 

2563 

2564 Returns: 

2565 A NumPy array with rows for taxa, columns for characters, and reading IDs in cells. 

2566 A list of substantive reading ID strings. 

2567 A list of witness ID strings. 

2568 """ 

2569 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2570 substantive_variation_unit_ids = self.variation_unit_ids 

2571 if drop_constant: 

2572 substantive_variation_unit_ids = [ 

2573 vu_id 

2574 for vu_id in self.variation_unit_ids 

2575 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2576 ] 

2577 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2578 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2579 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2580 # to the readings' IDs: 

2581 reading_ids_by_indices = {} 

2582 for j, vu in enumerate(self.variation_units): 

2583 if vu.id not in substantive_variation_unit_ids_set: 

2584 continue 

2585 k = 0 

2586 for rdg in vu.readings: 

2587 key = tuple([vu.id, rdg.id]) 

2588 if key not in substantive_variation_unit_reading_tuples_set: 

2589 continue 

2590 indices = tuple([j, k]) 

2591 reading_ids_by_indices[indices] = rdg.id 

2592 k += 1 

2593 # Initialize the output array with the appropriate dimensions: 

2594 missing_symbol = '?' 

2595 witness_labels = [wit.id for wit in self.witnesses] 

2596 matrix = np.full( 

2597 (len(witness_labels), len(substantive_variation_unit_ids)), missing_symbol, dtype=object 

2598 ) # use dtype=object because the maximum string length is not known up front 

2599 # Then populate it with the appropriate values: 

2600 with tqdm(total=len(self.witnesses)) as pbar: 

2601 row_ind = 0 

2602 for i, wit in enumerate(self.witnesses): 

2603 col_ind = 0 

2604 for j, vu in enumerate(self.variation_units): 

2605 if vu.id not in substantive_variation_unit_ids_set: 

2606 continue 

2607 rdg_support = self.readings_by_witness[wit.id][j] 

2608 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2609 if sum(rdg_support) == 0: 

2610 matrix[row_ind, col_ind] = missing_symbol 

2611 # Otherwise, add its coefficients normally: 

2612 else: 

2613 rdg_inds = [ 

2614 k for k, w in enumerate(rdg_support) if w > 0 

2615 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

2616 # For singleton readings, just print the reading ID: 

2617 if len(rdg_inds) == 1: 

2618 k = rdg_inds[0] 

2619 matrix[row_ind, col_ind] = reading_ids_by_indices[(j, k)] 

2620 # For multiple readings, print the corresponding reading IDs in braces or the missing symbol depending on input settings: 

2621 else: 

2622 if ambiguous_as_missing: 

2623 matrix[row_ind, col_ind] = missing_symbol 

2624 else: 

2625 matrix[row_ind, col_ind] = "{%s}" % " ".join( 

2626 [reading_ids_by_indices[(j, k)] for k in rdg_inds] 

2627 ) 

2628 col_ind += 1 

2629 row_ind += 1 

2630 pbar.update(1) 

2631 return matrix, witness_labels, substantive_variation_unit_ids 

2632 

2633 def to_long_table(self, drop_constant: bool = False): 

2634 """Returns this Collation in the form of a long table with columns for taxa, characters, reading indices, and reading values. 

2635 Note that this method treats ambiguous readings as missing data. 

2636 

2637 Args: 

2638 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2639 Default value is False. 

2640 

2641 Returns: 

2642 A NumPy array with columns for taxa, characters, reading indices, and reading values, and rows for each combination of these values in the matrix. 

2643 A list of column label strings. 

2644 """ 

2645 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2646 substantive_variation_unit_ids = self.variation_unit_ids 

2647 if drop_constant: 

2648 substantive_variation_unit_ids = [ 

2649 vu_id 

2650 for vu_id in self.variation_unit_ids 

2651 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2652 ] 

2653 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2654 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2655 # Initialize the outputs: 

2656 column_labels = ["taxon", "character", "state", "value"] 

2657 long_table_list = [] 

2658 # Populate a dictionary mapping (variation unit index, reading index) tuples to reading texts: 

2659 reading_texts_by_indices = {} 

2660 for j, vu in enumerate(self.variation_units): 

2661 if vu.id not in substantive_variation_unit_ids_set: 

2662 continue 

2663 k = 0 

2664 for rdg in vu.readings: 

2665 key = tuple([vu.id, rdg.id]) 

2666 if key not in substantive_variation_unit_reading_tuples_set: 

2667 continue 

2668 indices = tuple([j, k]) 

2669 reading_texts_by_indices[indices] = rdg.text 

2670 k += 1 

2671 # Then populate the output list with the appropriate values: 

2672 witness_labels = [wit.id for wit in self.witnesses] 

2673 missing_symbol = '?' 

2674 with tqdm(total=len(self.witnesses)) as pbar: 

2675 for i, wit in enumerate(self.witnesses): 

2676 row_ind = 0 

2677 for j, vu_id in enumerate(self.variation_unit_ids): 

2678 if vu_id not in substantive_variation_unit_ids_set: 

2679 continue 

2680 rdg_support = self.readings_by_witness[wit.id][j] 

2681 # Populate a list of nonzero coefficients for this reading support vector: 

2682 rdg_inds = [k for k, w in enumerate(rdg_support) if w > 0] 

2683 # If this list does not consist of exactly one reading, then treat it as missing data: 

2684 if len(rdg_inds) != 1: 

2685 long_table_list.append([wit.id, vu_id, missing_symbol, missing_symbol]) 

2686 continue 

2687 k = rdg_inds[0] 

2688 rdg_text = reading_texts_by_indices[(j, k)] 

2689 # Replace empty reading texts with the omission placeholder: 

2690 if rdg_text == "": 

2691 rdg_text = "om." 

2692 long_table_list.append([wit.id, vu_id, k, rdg_text]) 

2693 pbar.update(1) 

2694 # Then convert the long table entries list to a NumPy array: 

2695 long_table = np.array(long_table_list) 

2696 return long_table, column_labels 

2697 

2698 def to_dataframe( 

2699 self, 

2700 drop_constant: bool = False, 

2701 ambiguous_as_missing: bool = False, 

2702 proportion: bool = False, 

2703 table_type: TableType = TableType.matrix, 

2704 split_missing: SplitMissingType = None, 

2705 show_ext: bool = False, 

2706 ): 

2707 """Returns this Collation in the form of a Pandas DataFrame array, including the appropriate row and column labels. 

2708 

2709 Args: 

2710 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2711 Default value is False. 

2712 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2713 Default value is False. 

2714 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2715 Default value is False. 

2716 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate. 

2717 Only applicable for tabular outputs. 

2718 Default value is "matrix". 

2719 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2720 If not specified, then missing data is ignored (i.e., all states are 0). 

2721 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2722 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2723 Only applicable for table types "matrix" and "idf". 

2724 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2725 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2726 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2727 Default value is False. 

2728 

2729 Returns: 

2730 A Pandas DataFrame corresponding to a collation matrix with reading frequencies or a long table with discrete reading states. 

2731 """ 

2732 df = None 

2733 # Proceed based on the table type: 

2734 if table_type == TableType.matrix: 

2735 # Convert the collation to a NumPy array and get its row and column labels first: 

2736 matrix, reading_labels, witness_labels = self.to_numpy( 

2737 drop_constant=drop_constant, split_missing=split_missing 

2738 ) 

2739 df = pd.DataFrame(matrix, index=reading_labels, columns=witness_labels) 

2740 elif table_type == TableType.distance: 

2741 # Convert the collation to a NumPy array and get its row and column labels first: 

2742 matrix, witness_labels = self.to_distance_matrix( 

2743 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2744 ) 

2745 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2746 elif table_type == TableType.similarity: 

2747 # Convert the collation to a NumPy array and get its row and column labels first: 

2748 matrix, witness_labels = self.to_similarity_matrix( 

2749 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2750 ) 

2751 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2752 elif table_type == TableType.idf: 

2753 # Convert the collation to a NumPy array and get its row and column labels first: 

2754 matrix, witness_labels = self.to_idf_matrix(drop_constant=drop_constant, split_missing=split_missing) 

2755 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2756 elif table_type == TableType.mean_idf: 

2757 # Convert the collation to a NumPy array and get its row and column labels first: 

2758 matrix, witness_labels = self.to_mean_idf_matrix(drop_constant=drop_constant, split_missing=split_missing) 

2759 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2760 elif table_type == TableType.mi: 

2761 # Convert the collation to a NumPy array and get its row and column labels first: 

2762 matrix, witness_labels = self.to_mi_matrix(drop_constant=drop_constant, split_missing=split_missing) 

2763 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2764 elif table_type == TableType.mean_mi: 

2765 # Convert the collation to a NumPy array and get its row and column labels first: 

2766 matrix, witness_labels = self.to_mean_mi_matrix(drop_constant=drop_constant, split_missing=split_missing) 

2767 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2768 elif table_type == TableType.nexus: 

2769 # Convert the collation to a NumPy array and get its row and column labels first: 

2770 matrix, witness_labels, vu_labels = self.to_nexus_table( 

2771 drop_constant=drop_constant, ambiguous_as_missing=ambiguous_as_missing 

2772 ) 

2773 df = pd.DataFrame(matrix, index=witness_labels, columns=vu_labels) 

2774 elif table_type == TableType.long: 

2775 # Convert the collation to a long table and get its column labels first: 

2776 long_table, column_labels = self.to_long_table(drop_constant=drop_constant) 

2777 df = pd.DataFrame(long_table, columns=column_labels) 

2778 return df 

2779 

2780 def to_csv( 

2781 self, 

2782 file_addr: Union[Path, str], 

2783 drop_constant: bool = False, 

2784 ambiguous_as_missing: bool = False, 

2785 proportion: bool = False, 

2786 table_type: TableType = TableType.matrix, 

2787 split_missing: SplitMissingType = None, 

2788 show_ext: bool = False, 

2789 **kwargs 

2790 ): 

2791 """Writes this Collation to a comma-separated value (CSV) file with the given address. 

2792 

2793 If your witness IDs are numeric (e.g., Gregory-Aland numbers), then they will be written in full to the CSV file, but Excel will likely interpret them as numbers and truncate any leading zeroes! 

2794 

2795 Args: 

2796 file_addr: A string representing the path to an output CSV file; the file type should be .csv. 

2797 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2798 Default value is False. 

2799 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2800 Default value is False. 

2801 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2802 Default value is False. 

2803 table_type: A TableType option indicating which type of tabular output to generate. 

2804 Only applicable for tabular outputs. 

2805 Default value is "matrix". 

2806 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2807 If not specified, then missing data is ignored (i.e., all states are 0). 

2808 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2809 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2810 Only applicable for table types "matrix" and "idf". 

2811 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2812 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2813 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2814 Default value is False. 

2815 **kwargs: Keyword arguments for pandas.DataFrame.to_csv. 

2816 """ 

2817 # Convert the collation to a Pandas DataFrame first: 

2818 df = self.to_dataframe( 

2819 drop_constant=drop_constant, 

2820 ambiguous_as_missing=ambiguous_as_missing, 

2821 proportion=proportion, 

2822 table_type=table_type, 

2823 split_missing=split_missing, 

2824 show_ext=show_ext, 

2825 ) 

2826 # Generate all parent folders for this file that don't already exist: 

2827 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2828 # Proceed based on the table type: 

2829 if table_type == TableType.long: 

2830 return df.to_csv( 

2831 file_addr, encoding="utf-8-sig", index=False, **kwargs 

2832 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2833 return df.to_csv( 

2834 file_addr, encoding="utf-8-sig", **kwargs 

2835 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2836 

2837 def to_excel( 

2838 self, 

2839 file_addr: Union[Path, str], 

2840 drop_constant: bool = False, 

2841 ambiguous_as_missing: bool = False, 

2842 proportion: bool = False, 

2843 table_type: TableType = TableType.matrix, 

2844 split_missing: SplitMissingType = None, 

2845 show_ext: bool = False, 

2846 ): 

2847 """Writes this Collation to an Excel (.xlsx) file with the given address. 

2848 

2849 Since Pandas is deprecating its support for xlwt, specifying an output in old Excel (.xls) output is not recommended. 

2850 

2851 Args: 

2852 file_addr: A string representing the path to an output Excel file; the file type should be .xlsx. 

2853 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2854 Default value is False. 

2855 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2856 Default value is False. 

2857 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2858 Default value is False. 

2859 table_type: A TableType option indicating which type of tabular output to generate. 

2860 Only applicable for tabular outputs. 

2861 Default value is "matrix". 

2862 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2863 If not specified, then missing data is ignored (i.e., all states are 0). 

2864 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2865 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2866 Only applicable for table types "matrix" and "idf". 

2867 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2868 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2869 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2870 Default value is False. 

2871 """ 

2872 # Convert the collation to a Pandas DataFrame first: 

2873 df = self.to_dataframe( 

2874 drop_constant=drop_constant, 

2875 ambiguous_as_missing=ambiguous_as_missing, 

2876 proportion=proportion, 

2877 table_type=table_type, 

2878 split_missing=split_missing, 

2879 show_ext=show_ext, 

2880 ) 

2881 # Generate all parent folders for this file that don't already exist: 

2882 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2883 # Proceed based on the table type: 

2884 if table_type == TableType.long: 

2885 return df.to_excel(file_addr, index=False) 

2886 return df.to_excel(file_addr) 

2887 

2888 def to_phylip_matrix( 

2889 self, 

2890 file_addr: Union[Path, str], 

2891 drop_constant: bool = False, 

2892 proportion: bool = False, 

2893 table_type: TableType = TableType.distance, 

2894 show_ext: bool = False, 

2895 ): 

2896 """Writes this Collation as a PHYLIP-formatted distance/similarity matrix to the file with the given address. 

2897 

2898 Args: 

2899 file_addr: A string representing the path to an output PHYLIP file; the file type should be .ph or .phy. 

2900 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2901 Default value is False. 

2902 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2903 Default value is False. 

2904 table_type: A TableType option indicating which type of tabular output to generate. 

2905 For PHYLIP-formatted outputs, distance and similarity matrices are the only supported table types. 

2906 Default value is "distance". 

2907 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2908 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2909 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2910 Default value is False. 

2911 """ 

2912 # Convert the collation to a Pandas DataFrame first: 

2913 matrix = None 

2914 witness_labels = [] 

2915 # Proceed based on the table type: 

2916 if table_type == TableType.distance: 

2917 # Convert the collation to a NumPy array and get its row and column labels first: 

2918 matrix, witness_labels = self.to_distance_matrix( 

2919 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2920 ) 

2921 elif table_type == TableType.similarity: 

2922 # Convert the collation to a NumPy array and get its row and column labels first: 

2923 matrix, witness_labels = self.to_similarity_matrix( 

2924 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2925 ) 

2926 # Generate all parent folders for this file that don't already exist: 

2927 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2928 with open(file_addr, "w", encoding="utf-8") as f: 

2929 # The first line contains the number of taxa: 

2930 f.write("%d\n" % len(witness_labels)) 

2931 # Every subsequent line contains a witness label, followed by the values in its row of the matrix: 

2932 for i, wit_id in enumerate(witness_labels): 

2933 wit_label = slugify(wit_id, lowercase=False, allow_unicode=True, separator='_') 

2934 f.write("%s %s\n" % (wit_label, " ".join([str(v) for v in matrix[i]]))) 

2935 return 

2936 

2937 def get_stemma_symbols(self): 

2938 """Returns a list of one-character symbols needed to represent the states of all substantive readings in stemma format. 

2939 

2940 The number of symbols equals the maximum number of substantive readings at any variation unit. 

2941 

2942 Returns: 

2943 A list of individual characters representing states in readings. 

2944 """ 

2945 possible_symbols = ( 

2946 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

2947 ) # NOTE: the maximum number of symbols allowed in stemma format (other than "?" and "-") is 62 

2948 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

2949 nsymbols = 0 

2950 # If there are no witnesses, then no symbols are needed at all: 

2951 if len(self.witnesses) == 0: 

2952 return [] 

2953 wit_id = self.witnesses[0].id 

2954 for rdg_support in self.readings_by_witness[wit_id]: 

2955 nsymbols = max(nsymbols, len(rdg_support)) 

2956 stemma_symbols = possible_symbols[:nsymbols] 

2957 return stemma_symbols 

2958 

2959 def to_stemma(self, file_addr: Union[Path, str]): 

2960 """Writes this Collation to a stemma file without an extension and a Chron file (containing low, middle, and high dates for all witnesses) without an extension. 

2961 

2962 Since this format does not support ambiguous states, all reading vectors with anything other than one nonzero entry will be interpreted as lacunose. 

2963 If an interpGrp for weights is specified in the TEI XML collation, then the weights for the interp elements will be used as weights 

2964 for the variation units that specify them in their ana attribute. 

2965 

2966 Args: 

2967 file_addr: A string representing the path to an output stemma prep file; the file should have no extension. 

2968 The accompanying chron file will match this file name, except that it will have "_chron" appended to the end. 

2969 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2970 """ 

2971 # Populate a list of sites that will correspond to columns of the sequence alignment 

2972 # (by default, constant sites are dropped): 

2973 substantive_variation_unit_ids = [ 

2974 vu_id for vu_id in self.variation_unit_ids if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2975 ] 

2976 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2977 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2978 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2979 # to the readings' texts: 

2980 reading_texts_by_indices = {} 

2981 for j, vu in enumerate(self.variation_units): 

2982 if vu.id not in substantive_variation_unit_ids_set: 

2983 continue 

2984 k = 0 

2985 for rdg in vu.readings: 

2986 key = tuple([vu.id, rdg.id]) 

2987 if key not in substantive_variation_unit_reading_tuples_set: 

2988 continue 

2989 indices = tuple([j, k]) 

2990 reading_texts_by_indices[indices] = rdg.text 

2991 k += 1 

2992 # In a second pass, populate another dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2993 # to the witnesses exclusively supporting those readings: 

2994 reading_wits_by_indices = {} 

2995 for indices in reading_texts_by_indices: 

2996 reading_wits_by_indices[indices] = [] 

2997 for i, wit in enumerate(self.witnesses): 

2998 for j, vu_id in enumerate(self.variation_unit_ids): 

2999 if vu_id not in substantive_variation_unit_ids_set: 

3000 continue 

3001 rdg_support = self.readings_by_witness[wit.id][j] 

3002 # If this witness does not exclusively support exactly one reading at this unit, then treat it as lacunose: 

3003 if len([k for k, w in enumerate(rdg_support) if w > 0]) != 1: 

3004 continue 

3005 k = rdg_support.index(1) 

3006 indices = tuple([j, k]) 

3007 reading_wits_by_indices[indices].append(wit.id) 

3008 # In a third pass, write to the stemma file: 

3009 symbols = self.get_stemma_symbols() 

3010 Path(file_addr).parent.mkdir( 

3011 parents=True, exist_ok=True 

3012 ) # generate all parent folders for this file that don't already exist 

3013 chron_file_addr = str(file_addr) + "_chron" 

3014 with open(file_addr, "w", encoding="utf-8") as f: 

3015 # Start with the witness list: 

3016 f.write( 

3017 "* %s ;\n\n" 

3018 % " ".join( 

3019 [slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') for wit in self.witnesses] 

3020 ) 

3021 ) 

3022 # f.write("^ %s\n\n" % chron_file_addr) #write the relative path to the chron file 

3023 f.write( 

3024 "^ %s\n\n" % ("." + os.sep + Path(chron_file_addr).name) 

3025 ) # write the relative path to the chron file 

3026 # Then add a line indicating that all witnesses are lacunose unless they are specified explicitly: 

3027 f.write("= $? $* ;\n\n") 

3028 with tqdm(total=len(self.variation_unit_ids)) as pbar: 

3029 # Then proceed for each variation unit: 

3030 for j, vu_id in enumerate(self.variation_unit_ids): 

3031 if vu_id not in substantive_variation_unit_ids_set: 

3032 pbar.update(1) 

3033 continue 

3034 # Print the variation unit ID first: 

3035 f.write("@ %s\n" % vu_id) 

3036 # In a first pass, print the texts of all readings enclosed in brackets: 

3037 f.write("[ ") 

3038 k = 0 

3039 while True: 

3040 indices = tuple([j, k]) 

3041 if indices not in reading_texts_by_indices: 

3042 break 

3043 text = slugify( 

3044 reading_texts_by_indices[indices], lowercase=False, allow_unicode=True, separator='.' 

3045 ) 

3046 # Denote omissions by en-dashes: 

3047 if text == "": 

3048 text = "\u2013" 

3049 # The first reading should not be preceded by anything: 

3050 if k == 0: 

3051 f.write(text) 

3052 f.write(" |") 

3053 # Add the weight of this variation unit after the pipe by comparing its analysis categories to their weights: 

3054 weight = 1 

3055 vu = self.variation_units[j] 

3056 if len(vu.analysis_categories) > 0: 

3057 weight = int( 

3058 sum( 

3059 [ 

3060 self.weights_by_id[ana] if ana in self.weights_by_id else 1 

3061 for ana in vu.analysis_categories 

3062 ] 

3063 ) 

3064 / len(vu.analysis_categories) 

3065 ) 

3066 f.write("*%d" % weight) 

3067 # Every subsequent reading should be preceded by a space: 

3068 elif k > 0: 

3069 f.write(" %s" % text) 

3070 k += 1 

3071 f.write(" ]\n") 

3072 # In a second pass, print the indices and witnesses for all readings enclosed in angle brackets: 

3073 k = 0 

3074 f.write("\t< ") 

3075 while True: 

3076 indices = tuple([j, k]) 

3077 if indices not in reading_wits_by_indices: 

3078 break 

3079 rdg_symbol = symbols[k] # get the one-character alphanumeric code for this state 

3080 wits = " ".join(reading_wits_by_indices[indices]) 

3081 # Open the variant reading support block with an angle bracket: 

3082 if k == 0: 

3083 f.write("%s %s" % (rdg_symbol, wits)) 

3084 # Open all subsequent variant reading support blocks with pipes on the next line: 

3085 else: 

3086 f.write("\n\t| %s %s" % (rdg_symbol, wits)) 

3087 k += 1 

3088 f.write(" >\n") 

3089 pbar.update(1) 

3090 # In a fourth pass, write to the chron file: 

3091 max_id_length = max( 

3092 [len(slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')) for wit in self.witnesses] 

3093 ) 

3094 max_date_length = 0 

3095 for wit in self.witnesses: 

3096 if wit.date_range[0] is not None: 

3097 max_date_length = max(max_date_length, len(str(wit.date_range[0]))) 

3098 if wit.date_range[1] is not None: 

3099 max_date_length = max(max_date_length, len(str(wit.date_range[1]))) 

3100 # Attempt to get the minimum and maximum dates for witnesses; if we can't do this, then don't write a chron file: 

3101 min_date = None 

3102 max_date = None 

3103 try: 

3104 min_date = min([wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]) 

3105 max_date = max([wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]) 

3106 except Exception as e: 

3107 print("WARNING: no witnesses have date ranges; no chron file will be written!") 

3108 return 

3109 with open(chron_file_addr, "w", encoding="utf-8") as f: 

3110 for wit in self.witnesses: 

3111 wit_label = slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') 

3112 f.write(wit_label) 

3113 f.write(" " * (max_id_length - len(wit.id) + 1)) 

3114 # If either the lower bound on this witness's date is empty, then use the min and max dates over all witnesses as defaults: 

3115 date_range = wit.date_range 

3116 if date_range[0] is None: 

3117 date_range = tuple([min_date, date_range[1]]) 

3118 # Then write the date range minimum, average, and maximum to the chron file: 

3119 low_date = str(date_range[0]) 

3120 f.write(" " * (max_date_length - len(low_date) + 2)) 

3121 f.write(low_date) 

3122 avg_date = str(int(((date_range[0] + date_range[1]) / 2))) 

3123 f.write(" " * (max_date_length - len(str(avg_date)) + 2)) 

3124 f.write(avg_date) 

3125 high_date = str(date_range[1]) 

3126 f.write(" " * (max_date_length - len(high_date) + 2)) 

3127 f.write(high_date) 

3128 f.write("\n") 

3129 return 

3130 

    def to_file(
        self,
        file_addr: Union[Path, str],
        format: Format = None,
        drop_constant: bool = False,
        split_missing: SplitMissingType = None,
        char_state_labels: bool = True,
        frequency: bool = False,
        ambiguous_as_missing: bool = False,
        proportion: bool = False,
        calibrate_dates: bool = False,
        mrbayes: bool = False,
        clock_model: ClockModel = ClockModel.strict,
        ancestral_logger: AncestralLogger = AncestralLogger.state,
        table_type: TableType = TableType.matrix,
        show_ext: bool = False,
        seed: int = None,
    ):
        """Writes this Collation to the file with the given address.

        This is the top-level output dispatcher: it infers the output format from the
        file suffix when none is given, then delegates to the format-specific writer
        (to_nexus, to_hennig86, to_phylip/to_phylip_matrix, to_fasta, to_beast,
        to_csv, to_excel, or to_stemma), forwarding only the options each one accepts.

        Args:
            file_addr (Union[Path, str]): The path to the output file.
            format (Format, optional): The desired output format.
                If None then it is inferred from the file suffix.
                Defaults to None.
            drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
                Default value is False.
            split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
                If not specified, then missing data is ignored (i.e., all states are 0).
                If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
                If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
                Only applicable for tabular outputs of type "matrix" or "idf".
            char_state_labels (bool, optional): An optional flag indicating whether to print
                the CharStateLabels block in NEXUS output.
                Default value is True.
            frequency (bool, optional): An optional flag indicating whether to use the StatesFormat=Frequency setting
                instead of the StatesFormat=StatesPresent setting
                (and thus represent all states with frequency vectors rather than symbols)
                in NEXUS output.
                Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
                Default value is False.
            ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
                If this flag is set, then only base symbols will be generated for the NEXUS file.
                It is only applied if the frequency option is False.
                Default value is False.
            proportion (bool, optional): An optional flag indicating whether to populate a distance matrix's cells
                with a proportion of disagreements to variation units where both witnesses are extant.
                It is only applied if the table_type option is "distance".
                Default value is False.
            calibrate_dates (bool, optional): An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses
                in NEXUS output.
                This option is intended for inputs to BEAST 2.
                Default value is False.
            mrbayes (bool, optional): An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses
                in NEXUS output.
                This option is intended for inputs to MrBayes.
                Default value is False.
            clock_model (ClockModel, optional): A ClockModel option indicating which type of clock model to use.
                This option is intended for inputs to MrBayes and BEAST 2.
                MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
                Default value is "strict".
            ancestral_logger (AncestralLogger, optional): An AncestralLogger option indicating which class of logger (if any) to use for ancestral states.
                This option is intended for inputs to BEAST 2.
            table_type (TableType, optional): A TableType option indicating which type of tabular output to generate.
                Only applicable for tabular outputs and PHYLIP outputs.
                If the output is a PHYLIP file, then the type of tabular output must be "distance" or "similarity"; otherwise, it will be ignored.
                Default value is "matrix".
            show_ext (bool, optional): An optional flag indicating whether each cell in a distance or similarity matrix
                should include the number of variation units where both witnesses are extant after the number of their disagreements/agreements.
                Only applicable for tabular output formats of type "distance" or "similarity".
                Default value is False.
            seed (int, optional): A seed for random number generation (for setting initial values of unspecified transcriptional rates in BEAST 2 XML output).
        """
        file_addr = Path(file_addr)
        format = format or Format.infer(
            file_addr.suffix
        )  # an exception will be raised here if the format or suffix is invalid

        # Dispatch to the appropriate writer; each branch forwards only the keyword
        # arguments that its target method accepts, so unrelated options are ignored.
        if format == Format.NEXUS:
            return self.to_nexus(
                file_addr,
                drop_constant=drop_constant,
                char_state_labels=char_state_labels,
                frequency=frequency,
                ambiguous_as_missing=ambiguous_as_missing,
                calibrate_dates=calibrate_dates,
                mrbayes=mrbayes,
                clock_model=clock_model,
            )

        # NOTE(review): the comparisons below access enum members through the local
        # `format` instance (format.HENNIG86, etc.) rather than the Format class; this
        # works because Python enum members are reachable from member instances, but
        # `Format.HENNIG86` would match the style of the NEXUS/CSV/TSV branches.
        if format == format.HENNIG86:
            return self.to_hennig86(file_addr, drop_constant=drop_constant)

        if format == format.PHYLIP:
            # PHYLIP output supports both alignment and distance/similarity matrix forms:
            if table_type in [TableType.distance, TableType.similarity]:
                return self.to_phylip_matrix(
                    file_addr,
                    drop_constant=drop_constant,
                    proportion=proportion,
                    table_type=table_type,
                    show_ext=show_ext,
                )
            return self.to_phylip(file_addr, drop_constant=drop_constant)

        if format == format.FASTA:
            return self.to_fasta(file_addr, drop_constant=drop_constant)

        if format == format.BEAST:
            return self.to_beast(
                file_addr,
                drop_constant=drop_constant,
                clock_model=clock_model,
                ancestral_logger=ancestral_logger,
                seed=seed,
            )

        if format == Format.CSV:
            return self.to_csv(
                file_addr,
                drop_constant=drop_constant,
                ambiguous_as_missing=ambiguous_as_missing,
                proportion=proportion,
                table_type=table_type,
                split_missing=split_missing,
                show_ext=show_ext,
            )

        if format == Format.TSV:
            # TSV reuses the CSV writer with a tab separator:
            return self.to_csv(
                file_addr,
                drop_constant=drop_constant,
                ambiguous_as_missing=ambiguous_as_missing,
                proportion=proportion,
                table_type=table_type,
                split_missing=split_missing,
                show_ext=show_ext,
                sep="\t",
            )

        if format == Format.EXCEL:
            return self.to_excel(
                file_addr,
                drop_constant=drop_constant,
                ambiguous_as_missing=ambiguous_as_missing,
                proportion=proportion,
                table_type=table_type,
                split_missing=split_missing,
                show_ext=show_ext,
            )

        if format == Format.STEMMA:
            return self.to_stemma(file_addr)