Coverage for teiphy/collation.py: 100.00%

1648 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-04-20 19:45 +0000

1#!/usr/bin/env python3 

2 

3from enum import Enum 

4from typing import List, Union 

5import os 

6from pathlib import Path 

7from datetime import datetime # for calculating the current year (for dating and tree height purposes) 

8import math # for special functions 

9import time # to time calculations for users 

10import string # for easy retrieval of character ranges 

11from lxml import etree as et # for reading TEI XML inputs 

12import numpy as np # for random number sampling and collation matrix outputs 

13import pandas as pd # for writing to DataFrames, CSV, Excel, etc. 

14from slugify import slugify # for converting Unicode text from readings to ASCII for NEXUS 

15from jinja2 import Environment, PackageLoader, select_autoescape # for filling output XML templates 

16from tqdm import tqdm # for progress bars 

17 

18from .common import xml_ns, tei_ns 

19from .format import Format 

20from .witness import Witness 

21from .variation_unit import VariationUnit 

22 

23 

class ParsingException(Exception):
    """Raised when the TEI XML collation input cannot be parsed as expected."""

26 

27 

class WitnessDateException(Exception):
    """Raised when a witness's date range is incompatible with the work's date of origin."""

30 

31 

class IntrinsicRelationsException(Exception):
    """Raised when a variation unit's intrinsic relations graph is not a forest."""

34 

35 

class ClockModel(str, Enum):
    """String-valued enumeration of clock model options ("strict", "uncorrelated", "local").

    Inherits from str so members compare equal to, and serialize as, their plain string values.
    """

    strict = "strict"
    uncorrelated = "uncorrelated"
    local = "local"

40 

41 

class AncestralLogger(str, Enum):
    """String-valued enumeration of ancestral-logging options ("state", "sequence", or "none").

    Inherits from str so members compare equal to, and serialize as, their plain string values.
    """

    state = "state"
    sequence = "sequence"
    none = "none"

46 

47 

class TableType(str, Enum):
    """String-valued enumeration of tabular output kinds.

    Member values are the user-facing option strings; note that the hyphenated
    options "mean-idf" and "mean-mi" are spelled with underscores as member names.
    """

    matrix = "matrix"
    distance = "distance"
    similarity = "similarity"
    idf = "idf"
    mean_idf = "mean-idf"
    mi = "mi"
    mean_mi = "mean-mi"
    nexus = "nexus"
    long = "long"

58 

59 

class SplitMissingType(str, Enum):
    """String-valued enumeration of strategies for splitting missing data ("uniform" or "proportional").

    Exact semantics are applied elsewhere in the package — not visible in this file.
    """

    uniform = "uniform"
    proportional = "proportional"

63 

64 

class TransformMatrixType(str, Enum):
    """String-valued enumeration of matrix transformation options ("stddev" or "mad").

    Exact semantics are applied elsewhere in the package — not visible in this file.
    """

    stddev = "stddev"
    mad = "mad"

68 

69 

70class Collation: 

71 """Base class for storing TEI XML collation data internally. 

72 

73 This corresponds to the entire XML tree, rooted at the TEI element of the collation. 

74 

75 Attributes: 

76 manuscript_suffixes: A list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses. 

77 trivial_reading_types: A set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading. 

78 missing_reading_types: A set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data. 

79 fill_corrector_lacunae: A boolean flag indicating whether or not to fill "lacunae" in witnesses with type "corrector". 

80 fragmentary_threshold: A float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation. 

81 witnesses: A list of Witness instances contained in this Collation. 

82 witness_index_by_id: A dictionary mapping base witness ID strings to their int indices in the witnesses list. 

83 variation_units: A list of VariationUnit instances contained in this Collation. 

84 readings_by_witness: A dictionary mapping base witness ID strings to lists of reading support coefficients for all units (with at least two substantive readings). 

85 substantive_variation_unit_ids: A list of ID strings for variation units with two or more substantive readings. 

86 substantive_variation_unit_reading_tuples: A list of (variation unit ID, reading ID) tuples for substantive readings. 

87 verbose: A boolean flag indicating whether or not to print timing and debugging details for the user. 

88 """ 

89 

90 def __init__( 

91 self, 

92 xml: et.ElementTree, 

93 manuscript_suffixes: List[str] = [], 

94 trivial_reading_types: List[str] = [], 

95 missing_reading_types: List[str] = [], 

96 fill_corrector_lacunae: bool = False, 

97 fragmentary_threshold: float = None, 

98 dates_file: Union[Path, str] = None, 

99 verbose: bool = False, 

100 ): 

101 """Constructs a new Collation instance with the given settings. 

102 

103 Args: 

104 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

105 manuscript_suffixes: An optional list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses. 

106 trivial_reading_types: An optional set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading. 

107 missing_reading_types: An optional set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data. 

108 fill_corrector_lacunae: An optional flag indicating whether or not to fill "lacunae" in witnesses with type "corrector". 

109 fragmentary_threshold: An optional float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation. 

110 dates_file: An optional path to a CSV file containing witness IDs, minimum dates, and maximum dates. If specified, then for all witnesses in the first column, any existing date ranges for them in the TEI XML collation will be ignored. 

111 verbose: An optional flag indicating whether or not to print timing and debugging details for the user. 

112 """ 

113 self.manuscript_suffixes = manuscript_suffixes 

114 self.trivial_reading_types = set(trivial_reading_types) 

115 self.missing_reading_types = set(missing_reading_types) 

116 self.fill_corrector_lacunae = fill_corrector_lacunae 

117 self.fragmentary_threshold = fragmentary_threshold 

118 self.verbose = verbose 

119 self.witnesses = [] 

120 self.witness_index_by_id = {} 

121 self.variation_units = [] 

122 self.readings_by_witness = {} 

123 self.variation_unit_ids = [] 

124 self.substantive_variation_unit_reading_tuples = [] 

125 self.substantive_readings_by_variation_unit_id = {} 

126 self.weight_categories = [] 

127 self.weights_by_id = {} 

128 self.intrinsic_categories = [] 

129 self.intrinsic_odds_by_id = {} 

130 self.transcriptional_categories = [] 

131 self.transcriptional_rates_by_id = {} 

132 self.origin_date_range = [] 

133 # Now parse the XML tree to populate these data structures: 

134 if self.verbose: 

135 print("Initializing collation...") 

136 t0 = time.time() 

137 self.parse_origin_date_range(xml) 

138 self.parse_list_wit(xml) 

139 self.validate_wits(xml) 

140 # If a dates file was specified, then update the witness date ranges manually: 

141 if dates_file is not None: 

142 self.update_witness_date_ranges_from_dates_file(dates_file) 

143 # If the upper bound on a work's date of origin is not defined, then attempt to assign it an upper bound based on the witness dates; 

144 # otherwise, attempt to assign lower bounds to witness dates based on it: 

145 if self.origin_date_range[1] is None: 

146 self.update_origin_date_range_from_witness_date_ranges() 

147 else: 

148 self.update_witness_date_ranges_from_origin_date_range() 

149 self.parse_weights(xml) 

150 self.parse_intrinsic_odds(xml) 

151 self.parse_transcriptional_rates(xml) 

152 self.parse_apps(xml) 

153 self.validate_intrinsic_relations() 

154 self.parse_readings_by_witness() 

155 # If a threshold of readings for fragmentary witnesses is specified, then filter the witness list using the dictionary mapping witness IDs to readings: 

156 if self.fragmentary_threshold is not None: 

157 self.filter_fragmentary_witnesses(xml) 

158 t1 = time.time() 

159 if self.verbose: 

160 print("Total time to initialize collation: %0.4fs." % (t1 - t0)) 

161 

162 def parse_origin_date_range(self, xml: et.ElementTree): 

163 """Given an XML tree for a collation, populates this Collation's list of origin date bounds. 

164 

165 Args: 

166 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

167 """ 

168 if self.verbose: 

169 print("Parsing origin date range...") 

170 t0 = time.time() 

171 self.origin_date_range = [None, None] 

172 for date in xml.xpath( 

173 "//tei:sourceDesc//tei:bibl//tei:date|//tei:sourceDesc//tei:biblStruct//tei:date|//tei:sourceDesc//tei:biblFull//tei:date", 

174 namespaces={"tei": tei_ns}, 

175 ): 

176 # Try the @when attribute first; if it is set, then it accounts for both ends of the date range: 

177 if date.get("when") is not None: 

178 self.origin_date_range[0] = int(date.get("when").split("-")[0]) 

179 self.origin_date_range[1] = self.origin_date_range[0] 

180 # Failing that, if it has @from and @to attributes (indicating the period over which the work was completed), 

181 # then the completion date of the work accounts for both ends of the date range: 

182 elif date.get("to") is not None: 

183 self.origin_date_range[0] = int(date.get("to").split("-")[0]) 

184 self.origin_date_range[1] = self.origin_date_range[0] 

185 # Failing that, set lower and upper bounds on the origin date using the the @notBefore and @notAfter attributes: 

186 elif date.get("notBefore") is not None or date.get("notAfter") is not None: 

187 if date.get("notBefore") is not None: 

188 self.origin_date_range[0] = int(date.get("notBefore").split("-")[0]) 

189 if date.get("notAfter") is not None: 

190 self.origin_date_range[1] = int(date.get("notAfter").split("-")[0]) 

191 return 

192 

193 def get_base_wit(self, wit: str): 

194 """Given a witness siglum, strips of the specified manuscript suffixes until the siglum matches one in the witness list or until no more suffixes can be stripped. 

195 

196 Args: 

197 wit: A string representing a witness siglum, potentially including suffixes to be stripped. 

198 """ 

199 base_wit = wit 

200 # If our starting siglum corresponds to a siglum in the witness list, then just return it: 

201 if base_wit in self.witness_index_by_id: 

202 return base_wit 

203 # Otherwise, strip any suffixes we find until the siglum corresponds to a base witness in the list 

204 # or no more suffixes can be stripped: 

205 suffix_found = True 

206 while suffix_found: 

207 suffix_found = False 

208 for suffix in self.manuscript_suffixes: 

209 if base_wit.endswith(suffix): 

210 suffix_found = True 

211 base_wit = base_wit[: -len(suffix)] 

212 break # stop looking for other suffixes 

213 # If the siglum stripped of this suffix now corresponds to a siglum in the witness list, then return it: 

214 if base_wit in self.witness_index_by_id: 

215 return base_wit 

216 # If we get here, then all possible manuscript suffixes have been stripped, and the resulting siglum does not correspond to a siglum in the witness list: 

217 return base_wit 

218 

219 def parse_list_wit(self, xml: et.ElementTree): 

220 """Given an XML tree for a collation, populates its list of witnesses from its listWit element. 

221 If the XML tree does not contain a listWit element, then a ParsingException is thrown listing all distinct witness sigla encountered in the collation. 

222 

223 Args: 

224 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

225 """ 

226 if self.verbose: 

227 print("Parsing witness list...") 

228 t0 = time.time() 

229 self.witnesses = [] 

230 self.witness_index_by_id = {} 

231 list_wits = xml.xpath("/tei:TEI//tei:listWit", namespaces={"tei": tei_ns}) 

232 if len(list_wits) == 0: 

233 # There is no listWit element: collect all distinct witness sigla in the collation and raise a ParsingException listing them: 

234 distinct_sigla = set() 

235 sigla = [] 

236 # Proceed for each rdg, rdgGrp, or witDetail element: 

237 for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}): 

238 wit_str = rdg.get("wit") if rdg.get("wit") is not None else "" 

239 wits = wit_str.split() 

240 for wit in wits: 

241 siglum = wit.strip("#") # remove the URI prefix, if there is one 

242 if siglum not in distinct_sigla: 

243 distinct_sigla.add(siglum) 

244 sigla.append(siglum) 

245 sigla.sort() 

246 msg = "" 

247 msg += "An explicit listWit element must be included in the TEI XML collation.\n" 

248 msg += "The following sigla occur in the collation and should be included as the @xml:id or @n attributes of witness elements under the listWit element:\n" 

249 msg += ", ".join(sigla) 

250 raise ParsingException(msg) 

251 # Otherwise, take the first listWit element as the list of all witnesses and process it: 

252 list_wit = list_wits[0] 

253 for witness in list_wit.xpath("./tei:witness", namespaces={"tei": tei_ns}): 

254 wit = Witness(witness, self.verbose) 

255 self.witness_index_by_id[wit.id] = len(self.witnesses) 

256 self.witnesses.append(wit) 

257 t1 = time.time() 

258 if self.verbose: 

259 print("Finished processing %d witnesses in %0.4fs." % (len(self.witnesses), t1 - t0)) 

260 return 

261 

262 def validate_wits(self, xml: et.ElementTree): 

263 """Given an XML tree for a collation, checks if any witness sigla listed in a rdg, rdgGrp, or witDetail element, 

264 once stripped of ignored suffixes, is not found in the witness list. 

265 A warning will be issued for each distinct siglum like this. 

266 This method also checks if the upper bound of any witness's date is earlier than the lower bound on the collated work's date of origin 

267 and throws an exception if so. 

268 

269 Args: 

270 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

271 """ 

272 if self.verbose: 

273 print("Validating witness list against collation...") 

274 t0 = time.time() 

275 # There is no listWit element: collect all distinct witness sigla in the collation and raise an exception listing them: 

276 distinct_extra_sigla = set() 

277 extra_sigla = [] 

278 # Proceed for each rdg, rdgGrp, or witDetail element: 

279 for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}): 

280 wit_str = rdg.get("wit") if rdg.get("wit") is not None else "" 

281 wits = wit_str.split() 

282 for wit in wits: 

283 siglum = wit.strip("#") # remove the URI prefix, if there is one 

284 base_siglum = self.get_base_wit(siglum) 

285 if base_siglum not in self.witness_index_by_id: 

286 if base_siglum not in distinct_extra_sigla: 

287 distinct_extra_sigla.add(base_siglum) 

288 extra_sigla.append(base_siglum) 

289 if len(extra_sigla) > 0: 

290 extra_sigla.sort() 

291 msg = "" 

292 msg += "WARNING: The following sigla occur in the collation that do not have corresponding witness entries in the listWit:\n" 

293 msg += ", ".join(extra_sigla) 

294 print(msg) 

295 # If the lower bound on the date of origin is defined, then check each witness against it: 

296 if self.origin_date_range[0] is not None: 

297 bad_date_witness_sigla = [] 

298 bad_date_upper_bounds_by_witness = {} 

299 for i, wit in enumerate(self.witnesses): 

300 if wit.date_range[1] is not None and wit.date_range[1] < self.origin_date_range[0]: 

301 bad_date_witness_sigla.append(wit.id) 

302 bad_date_upper_bounds_by_witness[wit.id] = wit.date_range[1] 

303 if len(bad_date_witness_sigla) > 0: 

304 msg = "" 

305 msg += "The following witnesses have their latest possible dates before the earliest date of origin %d specified for the collated work:\n" 

306 msg += ", ".join( 

307 [ 

308 (siglum + "(" + str(bad_date_upper_bounds_by_witness[siglum]) + ")") 

309 for siglum in bad_date_witness_sigla 

310 ] 

311 ) 

312 raise WitnessDateException(msg) 

313 t1 = time.time() 

314 if self.verbose: 

315 print("Finished witness validation in %0.4fs." % (t1 - t0)) 

316 return 

317 

318 def update_witness_date_ranges_from_dates_file(self, dates_file: Union[Path, str]): 

319 """Given a CSV-formatted dates file, update the date ranges of all witnesses whose IDs are in the first column of the dates file 

320 (overwriting existing date ranges if necessary). 

321 """ 

322 if self.verbose: 

323 print("Updating witness dates from file %s..." % (str(dates_file))) 

324 t0 = time.time() 

325 dates_df = pd.read_csv(dates_file, index_col=0, names=["id", "min", "max"]) 

326 for witness in self.witnesses: 

327 wit_id = witness.id 

328 if wit_id in dates_df.index: 

329 # For every witness in the list whose ID is specified in the dates file, 

330 # update their date ranges (as long as the date ranges in the file are are well-formed): 

331 min_date = int(dates_df.loc[wit_id]["min"]) if not np.isnan(dates_df.loc[wit_id]["min"]) else None 

332 max_date = ( 

333 int(dates_df.loc[wit_id]["max"]) 

334 if not np.isnan(dates_df.loc[wit_id]["max"]) 

335 else datetime.now().year 

336 ) 

337 print(min_date, max_date) 

338 if min_date is not None and max_date is not None and min_date > max_date: 

339 raise ParsingException( 

340 "In dates file %s, for witness ID %s, the minimum date %d is greater than the maximum date %d." 

341 % (str(dates_file), wit_id, min_date, max_date) 

342 ) 

343 witness.date_range = [min_date, max_date] 

344 t1 = time.time() 

345 if self.verbose: 

346 print("Finished witness date range updates in %0.4fs." % (t1 - t0)) 

347 return 

348 

349 def update_origin_date_range_from_witness_date_ranges(self): 

350 """Conditionally updates the upper bound on the date of origin of the work represented by this Collation 

351 based on the bounds on the witnesses' dates. 

352 If none of the witnesses have bounds on their dates, then nothing is done. 

353 This method is only invoked if the work's date of origin does not already have its upper bound defined. 

354 """ 

355 if self.verbose: 

356 print("Updating upper bound on origin date using witness dates...") 

357 t0 = time.time() 

358 # Set the origin date to the earliest witness date: 

359 witness_date_lower_bounds = [wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None] 

360 witness_date_upper_bounds = [wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None] 

361 min_witness_date = ( 

362 min(witness_date_lower_bounds + witness_date_upper_bounds) 

363 if len(witness_date_lower_bounds + witness_date_upper_bounds) > 0 

364 else None 

365 ) 

366 if min_witness_date is not None: 

367 self.origin_date_range[1] = ( 

368 min(self.origin_date_range[1], min_witness_date) 

369 if self.origin_date_range[1] is not None 

370 else min_witness_date 

371 ) 

372 t1 = time.time() 

373 if self.verbose: 

374 print("Finished updating upper bound on origin date in %0.4fs." % (t1 - t0)) 

375 return 

376 

377 def update_witness_date_ranges_from_origin_date_range(self): 

378 """Attempts to update the lower bounds on the witnesses' dates of origin of the work represented by this Collation 

379 using the upper bound on the date of origin of the work represented by this Collation. 

380 This method is only invoked if the upper bound on the work's date of origin was not already defined 

381 (i.e., if update_origin_date_range_from_witness_date_ranges was not invoked). 

382 """ 

383 if self.verbose: 

384 print("Updating lower bounds on witness dates using origin date...") 

385 t0 = time.time() 

386 # Proceed for every witness: 

387 for i, wit in enumerate(self.witnesses): 

388 # Ensure that the lower bound on this witness's date is no earlier than the upper bound on the date of the work's origin: 

389 wit.date_range[0] = ( 

390 max(wit.date_range[0], self.origin_date_range[1]) 

391 if wit.date_range[0] is not None 

392 else self.origin_date_range[1] 

393 ) 

394 # Then ensure that the upper bound on this witness's date is no earlier than its lower bound, in case we updated it: 

395 wit.date_range[1] = max(wit.date_range[0], wit.date_range[1]) 

396 t1 = time.time() 

397 if self.verbose: 

398 print("Finished updating lower bounds on witness dates in %0.4fs." % (t1 - t0)) 

399 return 

400 

401 def parse_weights(self, xml: et.ElementTree): 

402 """Given an XML tree for a collation, populates this Collation's list of variation unit weight categories 

403 (associated with types of variation that may have different expected frequencies) 

404 and its dictionary mapping these categories to integer weights. 

405 

406 Args: 

407 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

408 """ 

409 if self.verbose: 

410 print("Parsing variation unit weight categories...") 

411 t0 = time.time() 

412 self.weight_categories = [] 

413 self.weights_by_id = {} 

414 for interp in xml.xpath("//tei:interpGrp[@type=\"weight\"]/tei:interp", namespaces={"tei": tei_ns}): 

415 # These must be indexed by the xml:id attribute, so skip any that do not have one: 

416 if interp.get("{%s}id" % xml_ns) is None: 

417 continue 

418 weight_category = interp.get("{%s}id" % xml_ns) 

419 # Retrieve this element's text value and coerce it to an integer, defaulting to 1 if it has none: 

420 weight = 1 

421 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}): 

422 if certainty.get("degree") is not None: 

423 weight = int(certainty.get("degree")) 

424 break 

425 self.weight_categories.append(weight_category) 

426 self.weights_by_id[weight_category] = weight 

427 t1 = time.time() 

428 if self.verbose: 

429 print( 

430 "Finished processing %d variation unit weight categories in %0.4fs." 

431 % (len(self.weight_categories), t1 - t0) 

432 ) 

433 return 

434 

435 def parse_intrinsic_odds(self, xml: et.ElementTree): 

436 """Given an XML tree for a collation, populates this Collation's list of intrinsic probability categories 

437 (e.g., "absolutely more likely," "highly more likely," "more likely," "slightly more likely," "equally likely") 

438 and its dictionary mapping these categories to numerical odds. 

439 If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated. 

440 

441 Args: 

442 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

443 """ 

444 if self.verbose: 

445 print("Parsing intrinsic odds categories...") 

446 t0 = time.time() 

447 self.intrinsic_categories = [] 

448 self.intrinsic_odds_by_id = {} 

449 for interp in xml.xpath("//tei:interpGrp[@type=\"intrinsic\"]/tei:interp", namespaces={"tei": tei_ns}): 

450 # These must be indexed by the xml:id attribute, so skip any that do not have one: 

451 if interp.get("{%s}id" % xml_ns) is None: 

452 continue 

453 odds_category = interp.get("{%s}id" % xml_ns) 

454 # If this element contains a certainty subelement with a fixed odds value for this category, then set it: 

455 odds = None 

456 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}): 

457 if certainty.get("degree") is not None: 

458 odds = float(certainty.get("degree")) 

459 break 

460 self.intrinsic_categories.append(odds_category) 

461 self.intrinsic_odds_by_id[odds_category] = odds 

462 t1 = time.time() 

463 if self.verbose: 

464 print( 

465 "Finished processing %d intrinsic odds categories in %0.4fs." 

466 % (len(self.intrinsic_categories), t1 - t0) 

467 ) 

468 return 

469 

470 def parse_transcriptional_rates(self, xml: et.ElementTree): 

471 """Given an XML tree for a collation, populates this Collation's dictionary mapping transcriptional change categories 

472 (e.g., "aural confusion," "visual error," "clarification") to numerical rates. 

473 If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated. 

474 

475 Args: 

476 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

477 """ 

478 if self.verbose: 

479 print("Parsing transcriptional change categories...") 

480 t0 = time.time() 

481 self.transcriptional_categories = [] 

482 self.transcriptional_rates_by_id = {} 

483 for interp in xml.xpath("//tei:interpGrp[@type=\"transcriptional\"]/tei:interp", namespaces={"tei": tei_ns}): 

484 # These must be indexed by the xml:id attribute, so skip any that do not have one: 

485 if interp.get("{%s}id" % xml_ns) is None: 

486 continue 

487 transcriptional_category = interp.get("{%s}id" % xml_ns) 

488 # If this element contains a certainty subelement with a fixed rate for this category, then set it: 

489 rate = None 

490 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}): 

491 if certainty.get("degree") is not None: 

492 rate = float(certainty.get("degree")) 

493 break 

494 self.transcriptional_categories.append(transcriptional_category) 

495 self.transcriptional_rates_by_id[transcriptional_category] = rate 

496 t1 = time.time() 

497 if self.verbose: 

498 print( 

499 "Finished processing %d transcriptional change categories in %0.4fs." 

500 % (len(self.transcriptional_rates_by_id), t1 - t0) 

501 ) 

502 return 

503 

504 def validate_intrinsic_relations(self): 

505 """Checks if any VariationUnit's intrinsic_relations map is not a forest. 

506 If any is not, then an IntrinsicRelationsException is thrown describing the VariationUnit at fault. 

507 """ 

508 if self.verbose: 

509 print("Validating intrinsic relation graphs for variation units...") 

510 t0 = time.time() 

511 for vu in self.variation_units: 

512 # Skip any variation units with an empty intrinsic_relations map: 

513 if len(vu.intrinsic_relations) == 0: 

514 continue 

515 # For all others, start by identifying all reading IDs that are not related to by some other reading ID: 

516 in_degree_by_reading = {} 

517 for edge in vu.intrinsic_relations: 

518 s = edge[0] 

519 t = edge[1] 

520 if s not in in_degree_by_reading: 

521 in_degree_by_reading[s] = 0 

522 if t not in in_degree_by_reading: 

523 in_degree_by_reading[t] = 0 

524 in_degree_by_reading[t] += 1 

525 # If any reading has more than one relation pointing to it, then the intrinsic relations graph is not a forest: 

526 excessive_in_degree_readings = [ 

527 rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] > 1 

528 ] 

529 if len(excessive_in_degree_readings) > 0: 

530 msg = "" 

531 msg += ( 

532 "In variation unit %s, the following readings have more than one intrinsic relation pointing to them: %s.\n" 

533 % (vu.id, ", ".join(excessive_in_degree_readings)) 

534 ) 

535 msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it." 

536 raise IntrinsicRelationsException(msg) 

537 # If every reading has another reading pointing to it, then the intrinsic relations graph contains a cycle and is not a forest: 

538 starting_nodes = [rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] == 0] 

539 if len(starting_nodes) == 0: 

540 msg = "" 

541 msg += "In variation unit %s, the intrinsic relations contain a cycle.\n" % vu.id 

542 msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it." 

543 raise IntrinsicRelationsException(msg) 

544 t1 = time.time() 

545 if self.verbose: 

546 print("Finished intrinsic relations validation in %0.4fs." % (t1 - t0)) 

547 return 

548 

549 def parse_apps(self, xml: et.ElementTree): 

550 """Given an XML tree for a collation, populates its list of variation units from its app elements. 

551 

552 Args: 

553 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

554 """ 

555 if self.verbose: 

556 print("Parsing variation units...") 

557 t0 = time.time() 

558 for a in xml.xpath('//tei:app', namespaces={'tei': tei_ns}): 

559 vu = VariationUnit(a, self.verbose) 

560 self.variation_units.append(vu) 

561 t1 = time.time() 

562 if self.verbose: 

563 print("Finished processing %d variation units in %0.4fs." % (len(self.variation_units), t1 - t0)) 

564 return 

565 

    def get_readings_by_witness_for_unit(self, vu: VariationUnit):
        """Returns a dictionary mapping witness IDs to a list of their reading coefficients for a given variation unit.

        As side effects, this method also appends to self.substantive_variation_unit_reading_tuples
        and populates self.substantive_readings_by_variation_unit_id[vu.id].

        Args:
            vu: A VariationUnit to be processed.

        Returns:
            A dictionary mapping witness ID strings to a list of their coefficients for all substantive readings in this VariationUnit.
        """
        # In a first pass, populate lists of substantive (variation unit ID, reading ID) tuples and reading labels
        # and a map from reading IDs to the indices of their parent substantive reading in this unit:
        reading_id_to_index = {}
        self.substantive_readings_by_variation_unit_id[vu.id] = []
        for rdg in vu.readings:
            # If this reading is missing (e.g., lacunose or inapplicable due to an overlapping variant) or targets another reading, then skip it:
            if rdg.type in self.missing_reading_types or len(rdg.certainties) > 0:
                continue
            # If this reading is trivial, then map it to the last substantive index:
            if rdg.type in self.trivial_reading_types:
                reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
                continue
            # Otherwise, the reading is substantive: add it to the map and update the last substantive index:
            self.substantive_readings_by_variation_unit_id[vu.id].append(rdg.id)
            self.substantive_variation_unit_reading_tuples.append(tuple([vu.id, rdg.id]))
            reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
        # NOTE(review): despite what an earlier comment here claimed, no early return occurs
        # for units with fewer than two substantive readings; the full dictionary is built and
        # returned regardless — presumably uninformative units are filtered by the caller. TODO confirm.
        if self.verbose:
            print(
                "Variation unit %s has %d substantive readings."
                % (vu.id, len(self.substantive_readings_by_variation_unit_id[vu.id]))
            )
        readings_by_witness_for_unit = {}
        # Initialize the output dictionary with zero vectors for all base witnesses:
        for wit in self.witnesses:
            readings_by_witness_for_unit[wit.id] = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
        # In a second pass, assign each base witness a vector of coefficients over the readings it supports in this unit:
        for rdg in vu.readings:
            # Initialize the support vector for this reading (or its disambiguations):
            rdg_support = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
            # If this is a missing reading (e.g., a lacuna or an overlap), then we can skip it, as its support vector stays empty:
            if rdg.type in self.missing_reading_types:
                continue
            # Otherwise, if this reading is trivial, then it contributes to the index of its parent substantive reading:
            elif rdg.type in self.trivial_reading_types:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Otherwise, if this reading has one or more nonzero certainty degrees,
            # then set the entries for these readings to their degrees:
            elif sum(rdg.certainties.values()) > 0:
                for t in rdg.certainties:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = rdg.certainties[t]
            # Otherwise, if this reading has one or more targets (i.e., if it is an ambiguous reading),
            # then set the entries for each of its targets to 1:
            elif len(rdg.targets) > 0:
                for t in rdg.targets:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = 1
            # Otherwise, this reading is itself substantive; set the entry for the index of this reading to 1:
            else:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Proceed for each witness siglum in the support for this reading:
            for wit in rdg.wits:
                # Is this siglum a base siglum?
                base_wit = self.get_base_wit(wit)
                if base_wit not in self.witness_index_by_id:
                    # If it is not, then it is probably just because we've encountered a corrector or some other secondary witness not included in the witness list;
                    # report this if we're in verbose mode and move on:
                    if self.verbose:
                        print(
                            "Skipping unknown witness siglum %s (base siglum %s) in variation unit %s, reading %s..."
                            % (wit, base_wit, vu.id, rdg.id)
                        )
                    continue
                # If we've found a base siglum, then add this reading's contribution to the base witness's support vector for this unit,
                # clamping each entry at 1; normally the existing vector will be all zeros, but if we reduce two suffixed sigla to the
                # same base witness, then that witness may attest to multiple readings in the same unit:
                readings_by_witness_for_unit[base_wit] = [
                    (min(readings_by_witness_for_unit[base_wit][i] + rdg_support[i], 1))
                    for i in range(len(rdg_support))
                ]
        return readings_by_witness_for_unit

650 

651 def parse_readings_by_witness(self): 

652 """Populates the internal dictionary mapping witness IDs to a list of their reading support sets for all variation units, and then fills the empty reading support sets for witnesses of type "corrector" with the entries of the previous witness.""" 

653 if self.verbose: 

654 print("Populating internal dictionary of witness readings...") 

655 t0 = time.time() 

656 # Initialize the data structures to be populated here: 

657 self.readings_by_witness = {} 

658 self.variation_unit_ids = [] 

659 for wit in self.witnesses: 

660 self.readings_by_witness[wit.id] = [] 

661 # Populate them for each variation unit: 

662 for vu in self.variation_units: 

663 readings_by_witness_for_unit = self.get_readings_by_witness_for_unit(vu) 

664 if len(readings_by_witness_for_unit) > 0: 

665 self.variation_unit_ids.append(vu.id) 

666 for wit in readings_by_witness_for_unit: 

667 self.readings_by_witness[wit].append(readings_by_witness_for_unit[wit]) 

668 # Optionally, fill the lacunae of the correctors: 

669 if self.fill_corrector_lacunae: 

670 for i, wit in enumerate(self.witnesses): 

671 # If this is the first witness, then it shouldn't be a corrector (since there is no previous witness against which to compare it): 

672 if i == 0: 

673 continue 

674 # Otherwise, if this witness is not a corrector, then skip it: 

675 if wit.type != "corrector": 

676 continue 

677 # Otherwise, retrieve the previous witness: 

678 prev_wit = self.witnesses[i - 1] 

679 for j in range(len(self.readings_by_witness[wit.id])): 

680 # If the corrector has no reading, then set it to the previous witness's reading: 

681 if sum(self.readings_by_witness[wit.id][j]) == 0: 

682 self.readings_by_witness[wit.id][j] = self.readings_by_witness[prev_wit.id][j] 

683 t1 = time.time() 

684 if self.verbose: 

685 print( 

686 "Populated dictionary for %d witnesses over %d substantive variation units in %0.4fs." 

687 % (len(self.witnesses), len(self.variation_unit_ids), t1 - t0) 

688 ) 

689 return 

690 

691 def filter_fragmentary_witnesses(self, xml): 

692 """Filters the original witness list and readings by witness dictionary to exclude witnesses whose proportions of extant passages fall below the fragmentary readings threshold.""" 

693 if self.verbose: 

694 print( 

695 "Filtering fragmentary witnesses (extant in < %f of all variation units) out of internal witness list and dictionary of witness readings..." 

696 % self.fragmentary_threshold 

697 ) 

698 t0 = time.time() 

699 fragmentary_witness_set = set() 

700 # Proceed for each witness in order: 

701 for wit in self.witnesses: 

702 wit_id = wit.id 

703 # We count the number of variation units at which this witness has an extant (i.e., non-missing) reading: 

704 extant_reading_count = 0 

705 total_reading_count = len(self.readings_by_witness[wit.id]) 

706 # Proceed through all reading support lists: 

707 for rdg_support in self.readings_by_witness[wit_id]: 

708 # If the current reading support list is not all zeroes, then increment this witness's count of extant readings: 

709 if sum(rdg_support) != 0: 

710 extant_reading_count += 1 

711 # If the proportion of extant readings falls below the threshold, then add this witness to the list of fragmentary witnesses: 

712 if extant_reading_count / total_reading_count < self.fragmentary_threshold: 

713 fragmentary_witness_set.add(wit_id) 

714 # Then filter the witness list to exclude the fragmentary witnesses: 

715 filtered_witnesses = [wit for wit in self.witnesses if wit.id not in fragmentary_witness_set] 

716 self.witnesses = filtered_witnesses 

717 # Then remove the entries for the fragmentary witnesses from the witnesses-to-readings dictionary: 

718 for wit_id in fragmentary_witness_set: 

719 del self.readings_by_witness[wit_id] 

720 t1 = time.time() 

721 if self.verbose: 

722 print( 

723 "Filtered out %d fragmentary witness(es) (%s) in %0.4fs." 

724 % (len(fragmentary_witness_set), str(list(fragmentary_witness_set)), t1 - t0) 

725 ) 

726 return 

727 

728 def get_nexus_symbols(self): 

729 """Returns a list of one-character symbols needed to represent the states of all substantive readings in NEXUS. 

730 

731 The number of symbols equals the maximum number of substantive readings at any variation unit. 

732 

733 Returns: 

734 A list of individual characters representing states in readings. 

735 """ 

736 # NOTE: IQTREE does not appear to support symbols outside of 0-9 and a-z, and its base symbols must be case-insensitive. 

737 # The official version of MrBayes is likewise limited to 32 symbols. 

738 # But PAUP* allows up to 64 symbols, and Andrew Edmondson's fork of MrBayes does, as well. 

739 # So this method will support symbols from 0-9, a-z, and A-Z (for a total of 62 states) 

740 possible_symbols = list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

741 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

742 nsymbols = 0 

743 # If there are no witnesses, then no symbols are needed at all: 

744 if len(self.witnesses) == 0: 

745 return [] 

746 wit_id = self.witnesses[0].id 

747 for rdg_support in self.readings_by_witness[wit_id]: 

748 nsymbols = max(nsymbols, len(rdg_support)) 

749 nexus_symbols = possible_symbols[:nsymbols] 

750 return nexus_symbols 

751 

752 def to_nexus( 

753 self, 

754 file_addr: Union[Path, str], 

755 drop_constant: bool = False, 

756 char_state_labels: bool = True, 

757 frequency: bool = False, 

758 ambiguous_as_missing: bool = False, 

759 calibrate_dates: bool = False, 

760 mrbayes: bool = False, 

761 clock_model: ClockModel = ClockModel.strict, 

762 ): 

763 """Writes this Collation to a NEXUS file with the given address. 

764 

765 Args: 

766 file_addr: A string representing the path to an output NEXUS file; the file type should be .nex, .nexus, or .nxs. 

767 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

768 char_state_labels: An optional flag indicating whether or not to include the CharStateLabels block. 

769 frequency: An optional flag indicating whether to use the StatesFormat=Frequency setting 

770 instead of the StatesFormat=StatesPresent setting 

771 (and thus represent all states with frequency vectors rather than symbols). 

772 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation. 

773 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

774 If this flag is set, then only base symbols will be generated for the NEXUS file. 

775 It is only applied if the frequency option is False. 

776 calibrate_dates: An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses. 

777 This option is intended for inputs to BEAST 2. 

778 mrbayes: An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses. 

779 This option is intended for inputs to MrBayes. 

780 clock_model: A ClockModel option indicating which type of clock model to use. 

781 This option is intended for inputs to MrBayes and BEAST 2. 

782 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified. 

783 """ 

784 # Populate a list of sites that will correspond to columns of the sequence alignment: 

785 substantive_variation_unit_ids = self.variation_unit_ids 

786 if drop_constant: 

787 substantive_variation_unit_ids = [ 

788 vu_id 

789 for vu_id in self.variation_unit_ids 

790 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

791 ] 

792 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

793 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

794 # Start by calculating the values we will be using here: 

795 ntax = len(self.witnesses) 

796 nchar = len(substantive_variation_unit_ids) 

797 taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses] 

798 max_taxlabel_length = max( 

799 [len(taxlabel) for taxlabel in taxlabels] 

800 ) # keep track of the longest taxon label for tabular alignment purposes 

801 charlabels = [slugify(vu_id, lowercase=False, separator='_') for vu_id in substantive_variation_unit_ids] 

802 missing_symbol = '?' 

803 symbols = self.get_nexus_symbols() 

804 # Generate all parent folders for this file that don't already exist: 

805 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

806 # Then write the file: 

807 pbar = tqdm() 

808 with open(file_addr, "w", encoding="utf-8") as f: 

809 # Start with the NEXUS header: 

810 f.write("#NEXUS\n\n") 

811 # Then begin the data block: 

812 f.write("Begin DATA;\n") 

813 # Write the collation matrix dimensions: 

814 f.write("\tDimensions ntax=%d nchar=%d;\n" % (ntax, nchar)) 

815 # Write the format subblock: 

816 f.write("\tFormat\n") 

817 f.write("\t\tDataType=Standard\n") 

818 f.write("\t\tMissing=%s\n" % missing_symbol) 

819 if frequency: 

820 f.write("\t\tStatesFormat=Frequency\n") 

821 f.write("\t\tSymbols=\"%s\";\n" % (" ".join(symbols))) 

822 # If the char_state_labels is set, then write the labels for character-state labels, with each on its own line: 

823 if char_state_labels: 

824 f.write("\tCharStateLabels") 

825 vu_ind = 1 

826 for vu in self.variation_units: 

827 if vu.id not in substantive_variation_unit_ids_set: 

828 continue 

829 if vu_ind == 1: 

830 f.write("\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_'))) 

831 else: 

832 f.write(",\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_'))) 

833 rdg_ind = 0 

834 for rdg in vu.readings: 

835 key = tuple([vu.id, rdg.id]) 

836 if key not in substantive_variation_unit_reading_tuples_set: 

837 continue 

838 ascii_rdg_text = slugify( 

839 rdg.text, lowercase=False, separator='_', replacements=[['η', 'h'], ['ω', 'w']] 

840 ) 

841 if ascii_rdg_text == "": 

842 ascii_rdg_text = "om." 

843 f.write(" %s" % ascii_rdg_text) 

844 rdg_ind += 1 

845 if rdg_ind > 0: 

846 vu_ind += 1 

847 f.write(";\n") 

848 # Write the matrix subblock: 

849 f.write("\tMatrix") 

850 with tqdm(total=len(self.witnesses)) as pbar: 

851 for i, wit in enumerate(self.witnesses): 

852 taxlabel = taxlabels[i] 

853 if frequency: 

854 sequence = "\n\t\t" + taxlabel 

855 for j, vu_id in enumerate(self.variation_unit_ids): 

856 if vu_id not in substantive_variation_unit_ids_set: 

857 continue 

858 rdg_support = self.readings_by_witness[wit.id][j] 

859 sequence += "\n\t\t\t" 

860 # If this reading is lacunose in this witness, then use the missing character: 

861 if sum(rdg_support) == 0: 

862 sequence += missing_symbol 

863 continue 

864 # Otherwise, print out its frequencies for different readings in parentheses: 

865 sequence += "(" 

866 for k, w in enumerate(rdg_support): 

867 sequence += "%s:%0.4f" % (symbols[k], w) 

868 if k < len(rdg_support) - 1: 

869 sequence += " " 

870 sequence += ")" 

871 else: 

872 sequence = "\n\t\t" + taxlabel 

873 # Add enough space after this label ensure that all sequences are nicely aligned: 

874 sequence += " " * (max_taxlabel_length - len(taxlabel) + 1) 

875 for j, vu_id in enumerate(self.variation_unit_ids): 

876 if vu_id not in substantive_variation_unit_ids_set: 

877 continue 

878 rdg_support = self.readings_by_witness[wit.id][j] 

879 # If this reading is lacunose in this witness, then use the missing character: 

880 if sum(rdg_support) == 0: 

881 sequence += missing_symbol 

882 continue 

883 rdg_inds = [ 

884 k for k, w in enumerate(rdg_support) if w > 0 

885 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

886 # For singleton readings, just print the symbol: 

887 if len(rdg_inds) == 1: 

888 sequence += symbols[rdg_inds[0]] 

889 continue 

890 # For multiple readings, print the corresponding readings in braces or the missing symbol depending on input settings: 

891 if ambiguous_as_missing: 

892 sequence += missing_symbol 

893 else: 

894 sequence += "{%s}" % "".join([str(rdg_ind) for rdg_ind in rdg_inds]) 

895 f.write("%s" % (sequence)) 

896 pbar.update(1) 

897 f.write(";\n") 

898 # End the data block: 

899 f.write("End;") 

900 # If calibrate_dates is set, then add the assumptions block: 

901 if calibrate_dates: 

902 f.write("\n\n") 

903 f.write("Begin ASSUMPTIONS;\n") 

904 # Set the scale to years: 

905 f.write("\tOPTIONS SCALE = years;\n\n") 

906 # Then calibrate the witness ages: 

907 calibrate_strings = [] 

908 for i, wit in enumerate(self.witnesses): 

909 taxlabel = taxlabels[i] 

910 date_range = wit.date_range 

911 if date_range[0] is not None: 

912 # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution, 

913 # depending on whether the upper and lower bounds match: 

914 min_age = datetime.now().year - date_range[1] 

915 max_age = datetime.now().year - date_range[0] 

916 if min_age == max_age: 

917 calibrate_string = "\tCALIBRATE %s = fixed(%d)" % (taxlabel, min_age) 

918 calibrate_strings.append(calibrate_string) 

919 else: 

920 calibrate_string = "\tCALIBRATE %s = uniform(%d,%d)" % (taxlabel, min_age, max_age) 

921 calibrate_strings.append(calibrate_string) 

922 else: 

923 # If there is no lower bound on the witness's date, then use an offset log-normal distribution: 

924 min_age = datetime.now().year - date_range[1] 

925 calibrate_string = "\tCALIBRATE %s = offsetlognormal(%d,0.0,1.0)" % (taxlabel, min_age) 

926 calibrate_strings.append(calibrate_string) 

927 # Then print the calibrate strings, separated by commas and line breaks and terminated by a semicolon: 

928 f.write("%s;\n\n" % ",\n".join(calibrate_strings)) 

929 # End the assumptions block: 

930 f.write("End;") 

931 # If mrbayes is set, then add the mrbayes block: 

932 if mrbayes: 

933 f.write("\n\n") 

934 f.write("Begin MRBAYES;\n") 

935 # Turn on the autoclose feature by default: 

936 f.write("\tset autoclose=yes;\n") 

937 # Set the branch lengths to be governed by a birth-death clock model, and set up the parameters for this model: 

938 f.write("\n") 

939 f.write("\tprset brlenspr = clock:birthdeath;\n") 

940 f.write("\tprset speciationpr = uniform(0.0,10.0);\n") 

941 f.write("\tprset extinctionpr = beta(2.0,4.0);\n") 

942 f.write("\tprset sampleprob = 0.01;\n") 

943 # Use the specified clock model: 

944 f.write("\n") 

945 if clock_model == clock_model.uncorrelated: 

946 f.write("\tprset clockvarpr=igr;\n") 

947 f.write("\tprset clockratepr=lognormal(0.0,1.0);\n") 

948 f.write("\tprset igrvarpr=exponential(1.0);\n") 

949 else: 

950 f.write("\tprset clockvarpr=strict;\n") 

951 f.write("\tprset clockratepr=lognormal(0.0,1.0);\n") 

952 # Set the priors on the tree age depending on the date range for the origin of the collated work: 

953 f.write("\n") 

954 if self.origin_date_range[0] is not None: 

955 min_tree_age = ( 

956 datetime.now().year - self.origin_date_range[1] 

957 if self.origin_date_range[1] is not None 

958 else 0.0 

959 ) 

960 max_tree_age = datetime.now().year - self.origin_date_range[0] 

961 f.write("\tprset treeagepr = uniform(%d,%d);\n" % (min_tree_age, max_tree_age)) 

962 else: 

963 min_tree_age = ( 

964 datetime.now().year - self.origin_date_range[1] 

965 if self.origin_date_range[1] is not None 

966 else 0.0 

967 ) 

968 f.write("\tprset treeagepr = offsetgamma(%d,1.0,1.0);\n" % (min_tree_age)) 

969 # Then calibrate the witness ages: 

970 f.write("\n") 

971 f.write("\tprset nodeagepr = calibrated;\n") 

972 for i, wit in enumerate(self.witnesses): 

973 taxlabel = taxlabels[i] 

974 date_range = wit.date_range 

975 if date_range[0] is not None: 

976 # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution, 

977 # depending on whether the upper and lower bounds match: 

978 min_age = datetime.now().year - date_range[1] 

979 max_age = datetime.now().year - date_range[0] 

980 if min_age == max_age: 

981 f.write("\tcalibrate %s = fixed(%d);\n" % (taxlabel, min_age)) 

982 else: 

983 f.write("\tcalibrate %s = uniform(%d,%d);\n" % (taxlabel, min_age, max_age)) 

984 else: 

985 # If there is no lower bound on the witness's date, then use an offset gamma distribution: 

986 min_age = datetime.now().year - date_range[1] 

987 f.write("\tcalibrate %s = offsetgamma(%d,1.0,1.0);\n" % (taxlabel, min_age)) 

988 f.write("\n") 

989 # Add default settings for MCMC estimation of posterior distribution: 

990 f.write("\tmcmcp ngen=100000;\n") 

991 # Write the command to run MrBayes: 

992 f.write("\tmcmc;\n") 

993 # End the assumptions block: 

994 f.write("End;") 

995 return 

996 

997 def get_hennig86_symbols(self): 

998 """Returns a list of one-character symbols needed to represent the states of all substantive readings in Hennig86 format. 

999 

1000 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1001 

1002 Returns: 

1003 A list of individual characters representing states in readings. 

1004 """ 

1005 possible_symbols = ( 

1006 list(string.digits) + list(string.ascii_uppercase)[:22] 

1007 ) # NOTE: the maximum number of symbols allowed in Hennig86 format is 32 

1008 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1009 nsymbols = 0 

1010 # If there are no witnesses, then no symbols are needed at all: 

1011 if len(self.witnesses) == 0: 

1012 return [] 

1013 wit_id = self.witnesses[0].id 

1014 for rdg_support in self.readings_by_witness[wit_id]: 

1015 nsymbols = max(nsymbols, len(rdg_support)) 

1016 hennig86_symbols = possible_symbols[:nsymbols] 

1017 return hennig86_symbols 

1018 

1019 def to_hennig86(self, file_addr: Union[Path, str], drop_constant: bool = False): 

1020 """Writes this Collation to a file in Hennig86 format with the given address. 

1021 Note that because Hennig86 format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data. 

1022 

1023 Args: 

1024 file_addr: A string representing the path to an output file. 

1025 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1026 """ 

1027 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1028 substantive_variation_unit_ids = self.variation_unit_ids 

1029 if drop_constant: 

1030 substantive_variation_unit_ids = [ 

1031 vu_id 

1032 for vu_id in self.variation_unit_ids 

1033 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1034 ] 

1035 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1036 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1037 # Start by calculating the values we will be using here: 

1038 ntax = len(self.witnesses) 

1039 nchar = len(substantive_variation_unit_ids) 

1040 taxlabels = [] 

1041 for wit in self.witnesses: 

1042 taxlabel = wit.id 

1043 # Hennig86 format requires taxon names to start with a letter, so if this is not the case, then append "WIT_" to the start of the name: 

1044 if taxlabel[0] not in string.ascii_letters: 

1045 taxlabel = "WIT_" + taxlabel 

1046 # Then replace any disallowed characters in the string with an underscore: 

1047 taxlabel = slugify(taxlabel, lowercase=False, separator='_') 

1048 taxlabels.append(taxlabel) 

1049 max_taxlabel_length = max( 

1050 [len(taxlabel) for taxlabel in taxlabels] 

1051 ) # keep track of the longest taxon label for tabular alignment purposes 

1052 missing_symbol = '?' 

1053 symbols = self.get_hennig86_symbols() 

1054 # Generate all parent folders for this file that don't already exist: 

1055 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1056 with open(file_addr, "w", encoding="ascii") as f: 

1057 # Start with the nstates header: 

1058 f.write("nstates %d;\n" % len(symbols)) 

1059 # Then begin the xread block: 

1060 f.write("xread\n") 

1061 # Write the dimensions: 

1062 f.write("%d %d\n" % (nchar, ntax)) 

1063 # Now write the matrix: 

1064 with tqdm(total=len(self.witnesses)) as pbar: 

1065 for i, wit in enumerate(self.witnesses): 

1066 taxlabel = taxlabels[i] 

1067 # Add enough space after this label ensure that all sequences are nicely aligned: 

1068 sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel) + 1)) 

1069 for j, vu_id in enumerate(self.variation_unit_ids): 

1070 if vu_id not in substantive_variation_unit_ids_set: 

1071 continue 

1072 rdg_support = self.readings_by_witness[wit.id][j] 

1073 # If this reading is lacunose in this witness, then use the missing character: 

1074 if sum(rdg_support) == 0: 

1075 sequence += missing_symbol 

1076 continue 

1077 rdg_inds = [ 

1078 k for k, w in enumerate(rdg_support) if w > 0 

1079 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1080 # For singleton readings, just print the symbol: 

1081 if len(rdg_inds) == 1: 

1082 sequence += symbols[rdg_inds[0]] 

1083 continue 

1084 # For multiple readings, print the missing symbol: 

1085 sequence += missing_symbol 

1086 f.write("%s\n" % (sequence)) 

1087 pbar.update(1) 

1088 f.write(";") 

1089 return 

1090 

1091 def get_phylip_symbols(self): 

1092 """Returns a list of one-character symbols needed to represent the states of all substantive readings in PHYLIP format. 

1093 

1094 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1095 

1096 Returns: 

1097 A list of individual characters representing states in readings. 

1098 """ 

1099 possible_symbols = ( 

1100 list(string.digits) + list(string.ascii_lowercase)[:22] 

1101 ) # NOTE: for RAxML, multistate characters with an alphabet sizes up to 32 are supported 

1102 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1103 nsymbols = 0 

1104 # If there are no witnesses, then no symbols are needed at all: 

1105 if len(self.witnesses) == 0: 

1106 return [] 

1107 wit_id = self.witnesses[0].id 

1108 for rdg_support in self.readings_by_witness[wit_id]: 

1109 nsymbols = max(nsymbols, len(rdg_support)) 

1110 phylip_symbols = possible_symbols[:nsymbols] 

1111 return phylip_symbols 

1112 

1113 def to_phylip(self, file_addr: Union[Path, str], drop_constant: bool = False): 

1114 """Writes this Collation to a file in PHYLIP format with the given address. 

1115 Note that because PHYLIP format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data. 

1116 

1117 Args: 

1118 file_addr: A string representing the path to an output file. 

1119 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1120 """ 

1121 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1122 substantive_variation_unit_ids = self.variation_unit_ids 

1123 if drop_constant: 

1124 substantive_variation_unit_ids = [ 

1125 vu_id 

1126 for vu_id in self.variation_unit_ids 

1127 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1128 ] 

1129 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1130 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1131 # Start by calculating the values we will be using here: 

1132 ntax = len(self.witnesses) 

1133 nchar = len(substantive_variation_unit_ids) 

1134 taxlabels = [] 

1135 for wit in self.witnesses: 

1136 taxlabel = wit.id 

1137 # Then replace any disallowed characters in the string with an underscore: 

1138 taxlabel = slugify(taxlabel, lowercase=False, separator='_') 

1139 taxlabels.append(taxlabel) 

1140 max_taxlabel_length = max( 

1141 [len(taxlabel) for taxlabel in taxlabels] 

1142 ) # keep track of the longest taxon label for tabular alignment purposes 

1143 missing_symbol = '?' 

1144 symbols = self.get_phylip_symbols() 

1145 # Generate all parent folders for this file that don't already exist: 

1146 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1147 with open(file_addr, "w", encoding="ascii") as f: 

1148 # Write the dimensions: 

1149 f.write("%d %d\n" % (ntax, nchar)) 

1150 # Now write the matrix: 

1151 for i, wit in enumerate(self.witnesses): 

1152 taxlabel = taxlabels[i] 

1153 # Add enough space after this label ensure that all sequences are nicely aligned: 

1154 sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel))) + "\t" 

1155 for j, vu_id in enumerate(self.variation_unit_ids): 

1156 if vu_id not in substantive_variation_unit_ids_set: 

1157 continue 

1158 rdg_support = self.readings_by_witness[wit.id][j] 

1159 # If this reading is lacunose in this witness, then use the missing character: 

1160 if sum(rdg_support) == 0: 

1161 sequence += missing_symbol 

1162 continue 

1163 rdg_inds = [ 

1164 k for k, w in enumerate(rdg_support) if w > 0 

1165 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1166 # For singleton readings, just print the symbol: 

1167 if len(rdg_inds) == 1: 

1168 sequence += symbols[rdg_inds[0]] 

1169 continue 

1170 # For multiple readings, print the missing symbol: 

1171 sequence += missing_symbol 

1172 f.write("%s\n" % (sequence)) 

1173 return 

1174 

1175 def get_fasta_symbols(self): 

1176 """Returns a list of one-character symbols needed to represent the states of all substantive readings in FASTA format. 

1177 

1178 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1179 

1180 Returns: 

1181 A list of individual characters representing states in readings. 

1182 """ 

1183 possible_symbols = ( 

1184 list(string.digits) + list(string.ascii_lowercase)[:22] 

1185 ) # NOTE: for RAxML, multistate characters with an alphabet sizes up to 32 are supported 

1186 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1187 nsymbols = 0 

1188 # If there are no witnesses, then no symbols are needed at all: 

1189 if len(self.witnesses) == 0: 

1190 return [] 

1191 wit_id = self.witnesses[0].id 

1192 for rdg_support in self.readings_by_witness[wit_id]: 

1193 nsymbols = max(nsymbols, len(rdg_support)) 

1194 fasta_symbols = possible_symbols[:nsymbols] 

1195 return fasta_symbols 

1196 

1197 def to_fasta(self, file_addr: Union[Path, str], drop_constant: bool = False): 

1198 """Writes this Collation to a file in FASTA format with the given address. 

1199 Note that because FASTA format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data. 

1200 

1201 Args: 

1202 file_addr: A string representing the path to an output file. 

1203 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1204 """ 

1205 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1206 substantive_variation_unit_ids = self.variation_unit_ids 

1207 if drop_constant: 

1208 substantive_variation_unit_ids = [ 

1209 vu_id 

1210 for vu_id in self.variation_unit_ids 

1211 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1212 ] 

1213 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1214 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1215 # Start by calculating the values we will be using here: 

1216 ntax = len(self.witnesses) 

1217 nchar = len(substantive_variation_unit_ids) 

1218 taxlabels = [] 

1219 for wit in self.witnesses: 

1220 taxlabel = wit.id 

1221 # Then replace any disallowed characters in the string with an underscore: 

1222 taxlabel = slugify(taxlabel, lowercase=False, separator='_') 

1223 taxlabels.append(taxlabel) 

1224 max_taxlabel_length = max( 

1225 [len(taxlabel) for taxlabel in taxlabels] 

1226 ) # keep track of the longest taxon label for tabular alignment purposes 

1227 missing_symbol = '?' 

1228 symbols = self.get_fasta_symbols() 

1229 # Generate all parent folders for this file that don't already exist: 

1230 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1231 with open(file_addr, "w", encoding="ascii") as f: 

1232 # Now write the matrix: 

1233 with tqdm(total=len(self.witnesses)) as pbar: 

1234 for i, wit in enumerate(self.witnesses): 

1235 taxlabel = taxlabels[i] 

1236 # Add enough space after this label ensure that all sequences are nicely aligned: 

1237 sequence = ">%s\n" % taxlabel 

1238 for j, vu_id in enumerate(self.variation_unit_ids): 

1239 if vu_id not in substantive_variation_unit_ids_set: 

1240 continue 

1241 rdg_support = self.readings_by_witness[wit.id][j] 

1242 # If this reading is lacunose in this witness, then use the missing character: 

1243 if sum(rdg_support) == 0: 

1244 sequence += missing_symbol 

1245 continue 

1246 rdg_inds = [ 

1247 k for k, w in enumerate(rdg_support) if w > 0 

1248 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1249 # For singleton readings, just print the symbol: 

1250 if len(rdg_inds) == 1: 

1251 sequence += symbols[rdg_inds[0]] 

1252 continue 

1253 # For multiple readings, print the missing symbol: 

1254 sequence += missing_symbol 

1255 f.write("%s\n" % (sequence)) 

1256 pbar.update(1) 

1257 return 

1258 

def get_beast_symbols(self):
    """Returns a list of one-character symbols needed to represent the states of all substantive readings in BEAST format.

    The number of symbols equals the maximum number of substantive readings at any variation unit.

    Returns:
        A list of individual characters representing states in readings.
    """
    # NOTE: for BEAST, any number of states should theoretically be permissible,
    # but since code maps are required for some reason, we limit the pool to 62 symbols for now:
    symbol_pool = list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
    # With no witnesses, there are no reading vectors and thus no symbols needed:
    if not self.witnesses:
        return []
    # The number of symbols needed equals the length of the longest reading-support vector;
    # vector lengths are the same for every witness, so only the first witness is inspected:
    first_wit_id = self.witnesses[0].id
    symbol_count = max((len(support) for support in self.readings_by_witness[first_wit_id]), default=0)
    return symbol_pool[:symbol_count]

1280 

def get_tip_date_range(self):
    """Gets the minimum and maximum dates attested among the witnesses.
    Also checks if the witness with the latest possible date has a fixed date
    (i.e, if the lower and upper bounds for its date are the same)
    and issues a warning if not, as this will cause unusual behavior in BEAST 2.

    If no witness has any date bounds (or there are no witnesses at all),
    then (None, None) is returned and no warning is printed.

    Returns:
        A tuple containing the earliest and latest possible tip dates.
    """
    earliest_date = None
    earliest_wit = None
    latest_date = None
    latest_wit = None
    for wit in self.witnesses:
        wit_id = wit.id
        date_range = wit.date_range
        if date_range[0] is not None:
            if earliest_date is not None:
                earliest_wit = wit if date_range[0] < earliest_date else earliest_wit
                earliest_date = min(date_range[0], earliest_date)
            else:
                earliest_wit = wit
                earliest_date = date_range[0]
        if date_range[1] is not None:
            if latest_date is not None:
                latest_wit = (
                    wit
                    if (date_range[1] > latest_date or (date_range[0] == date_range[1] == latest_date))
                    else latest_wit
                )  # the second check ensures that a witness with a fixed date is preferred to a witness with a date range that ends with the same date
                latest_date = max(date_range[1], latest_date)
            else:
                latest_wit = wit
                latest_date = date_range[1]
    # Guard against the degenerate case where no witness has an upper date bound
    # (previously, latest_wit would be None here and raise an AttributeError):
    if latest_wit is not None and (
        latest_wit.date_range[0] is None or latest_wit.date_range[0] != latest_wit.date_range[1]
    ):
        print(
            "WARNING: the latest witness, %s, has a variable date range; this will result in problems with time-dependent substitution models and misalignment of trees in BEAST DensiTree outputs! Please ensure that witness %s has a fixed date."
            % (latest_wit.id, latest_wit.id)
        )
    return (earliest_date, latest_date)

1321 

def get_beast_origin_span(self, tip_date_range):
    """Returns a tuple containing the lower and upper bounds for the height of the origin of the Birth-Death Skyline model.
    The upper bound on the height of the tree is the difference between the latest tip date
    and the lower bound on the date of the original work, if both are defined;
    otherwise, it is left undefined.
    The lower bound on the height of the tree is the difference between the latest tip date
    and the upper bound on the date of the original work, if both are defined;
    otherwise, it is the difference between the earliest tip date and the latest, if both are defined.

    Args:
        tip_date_range: A tuple containing the earliest and latest possible tip dates.

    Returns:
        A tuple containing lower and upper bounds on the origin height for the Birth-Death Skyline model.
    """
    lower_bound = 0
    upper_bound = None
    latest_tip_date = tip_date_range[1]
    # A defined upper bound on the composition date yields a lower bound on the origin height
    # (if it had to be defined in terms of witness date lower bounds, then that has already happened):
    if self.origin_date_range[1] is not None:
        lower_bound = latest_tip_date - self.origin_date_range[1]
    # A defined lower bound on the composition date yields an upper bound on the origin height:
    if self.origin_date_range[0] is not None:
        upper_bound = latest_tip_date - self.origin_date_range[0]
    return (lower_bound, upper_bound)

1346 

def get_beast_date_map(self, taxlabels):
    """Returns a string representing witness-to-date mappings in BEAST format.

    Since this format requires single dates as opposed to date ranges,
    witnesses with closed date ranges will be mapped to the average of their lower and upper bounds,
    and witnesses with open date ranges will not be mapped.

    Args:
        taxlabels: A list of slugified taxon labels.

    Returns:
        A string containing comma-separated date calibrations of the form witness_id=date.
    """
    calibrations = []
    for wit_ind, wit in enumerate(self.witnesses):
        lower, upper = wit.date_range
        # Witnesses with an open-ended date range cannot be assigned a single date; skip them:
        if lower is None or upper is None:
            continue
        # Calibrate a closed date range to the midpoint of its bounds:
        midpoint = int((lower + upper) / 2)
        calibrations.append("%s=%d" % (taxlabels[wit_ind], midpoint))
    # Then combine the individual calibrations into the full date map string:
    return ",".join(calibrations)

1374 

def get_beast_code_map_for_unit(self, symbols, missing_symbol, vu_ind):
    """Returns a string containing state/reading code mappings in BEAST format using the given single-state and missing state symbols for the character/variation unit at the given index.
    If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a code for a dummy state will be included.

    Args:
        symbols: A list of one-character state symbols.
        missing_symbol: A one-character symbol for the missing state.
        vu_ind: An integer index for the desired unit.

    Returns:
        A string containing comma-separated code mappings.
    """
    vu = self.variation_units[vu_ind]
    nstates = len(self.substantive_readings_by_variation_unit_id[vu.id])
    is_singleton = nstates == 1
    # Map each state symbol to its zero-based state index:
    entries = [(symbols[k], str(k)) for k in range(nstates)]
    # A singleton site needs an extra code mapping for its dummy state:
    if is_singleton:
        entries.append((symbols[1], "1"))
    # The missing symbol maps to all states (including the dummy state on singleton sites):
    missing_states = [str(k) for k in range(nstates)]
    if is_singleton:
        missing_states.append("1")
    entries.append((missing_symbol, " ".join(missing_states)))
    # Then combine all of the mappings into a single string:
    return ", ".join(code + "=" + value for code, value in entries)

1403 

def get_beast_equilibrium_frequencies_for_unit(self, vu_ind):
    """Returns a string containing state/reading equilibrium frequencies in BEAST format for the character/variation unit at the given index.
    Since the equilibrium frequencies are not used with the substitution models, the equilibrium frequencies simply correspond to a uniform distribution over the states.
    If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then an equilibrium frequency of 0 will be added for a dummy state.

    Args:
        vu_ind: An integer index for the desired unit.

    Returns:
        A string containing space-separated equilibrium frequencies.
    """
    vu_id = self.variation_units[vu_ind].id
    nstates = len(self.substantive_readings_by_variation_unit_id[vu_id])
    # Singleton sites carry a dummy second state, so their frequency string is fixed:
    if nstates == 1:
        return "0.5 0.5"
    # Otherwise, spread the total frequency uniformly over the states:
    uniform_frequency = 1.0 / nstates
    return " ".join(str(uniform_frequency) for _ in range(nstates))

1426 

def get_beast_root_frequencies_for_unit(self, vu_ind) -> str:
    """Returns a string containing state/reading root frequencies in BEAST format for the character/variation unit at the given index.
    The root frequencies are calculated from the intrinsic odds at this unit.
    If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a root frequency of 0 will be added for a dummy state.
    If no intrinsic odds are specified, then a uniform distribution over all states is assumed.

    Args:
        vu_ind: An integer index for the desired unit.

    Returns:
        A string containing space-separated root frequencies.
    """
    vu = self.variation_units[vu_ind]
    vu_id = vu.id
    # intrinsic_relations maps (source reading ID, target reading ID) pairs to intrinsic category labels,
    # which in turn index the odds factors in intrinsic_odds_by_id:
    intrinsic_relations = vu.intrinsic_relations
    intrinsic_odds_by_id = self.intrinsic_odds_by_id
    # If this unit is a singleton, then return the string "1 0":
    if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1:
        return "1 0"
    # If this unit has no intrinsic odds, then assume a uniform distribution over all readings:
    if len(intrinsic_relations) == 0:
        root_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len(
            self.substantive_readings_by_variation_unit_id[vu_id]
        )
        root_frequencies_string = " ".join([str(w) for w in root_frequencies])
        return root_frequencies_string
    # We will populate the root frequencies based on the intrinsic odds of the readings:
    root_frequencies_by_id = {}
    for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]:
        root_frequencies_by_id[rdg_id] = 0
    # First, construct an adjacency list for efficient edge iteration
    # (targets are registered with empty neighbor lists so the recursion below never misses a key):
    neighbors_by_source = {}
    for edge in intrinsic_relations:
        s = edge[0]
        t = edge[1]
        if s not in neighbors_by_source:
            neighbors_by_source[s] = []
        if t not in neighbors_by_source:
            neighbors_by_source[t] = []
        neighbors_by_source[s].append(t)
    # Next, identify all readings that are not targeted by any intrinsic odds relation:
    in_degree_by_reading = {}
    for edge in intrinsic_relations:
        s = edge[0]
        t = edge[1]
        if s not in in_degree_by_reading:
            in_degree_by_reading[s] = 0
        if t not in in_degree_by_reading:
            in_degree_by_reading[t] = 0
        in_degree_by_reading[t] += 1
    starting_nodes = [t for t in in_degree_by_reading if in_degree_by_reading[t] == 0]
    # Set the root frequencies for these readings to 1 (they will be normalized later):
    for starting_node in starting_nodes:
        root_frequencies_by_id[starting_node] = 1.0
    # Next, set the frequencies for the remaining readings recursively using the adjacency list.
    # NOTE(review): this recursion assumes the intrinsic relations form a DAG reachable from the
    # in-degree-zero starting nodes; a cycle would recurse without bound — presumably upstream
    # validation (IntrinsicRelationsException) rules this out, but confirm.
    # NOTE(review): a reading with multiple incoming relations is revisited once per parent,
    # with the last traversal's value overwriting earlier ones — confirm this is intended.
    def update_root_frequencies(s):
        for t in neighbors_by_source[s]:
            intrinsic_category = intrinsic_relations[(s, t)]
            odds = (
                intrinsic_odds_by_id[intrinsic_category]
                if intrinsic_odds_by_id[intrinsic_category] is not None
                else 1.0
            )  # TODO: This needs to be handled using parameters once we have it implemented in BEAST
            # A target reading's frequency is its source's frequency divided by the odds of the relation:
            root_frequencies_by_id[t] = root_frequencies_by_id[s] / odds
            update_root_frequencies(t)
        return

    for starting_node in starting_nodes:
        update_root_frequencies(starting_node)
    # Then produce a normalized vector of root frequencies that corresponds to a probability distribution:
    root_frequencies = [
        root_frequencies_by_id[rdg_id] for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]
    ]
    total_frequencies = sum(root_frequencies)
    for k in range(len(root_frequencies)):
        root_frequencies[k] = root_frequencies[k] / total_frequencies
    root_frequencies_string = " ".join([str(w) for w in root_frequencies])
    return root_frequencies_string

1505 

def to_beast(
    self,
    file_addr: Union[Path, str],
    drop_constant: bool = False,
    clock_model: ClockModel = ClockModel.strict,
    ancestral_logger: AncestralLogger = AncestralLogger.state,
    seed: Union[int, None] = None,
):
    """Writes this Collation to a file in BEAST format with the given address.

    Args:
        file_addr: A string representing the path to an output file.
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
        clock_model: A ClockModel option indicating which clock model to use.
        ancestral_logger: An AncestralLogger option indicating which class of logger (if any) to use for ancestral states.
        seed: A seed for random number generation (for setting initial values of unspecified transcriptional rates).
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    # Populate sets of substantive variation unit IDs and substantive variant reading tuples:
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
    # First, calculate the values we will be using for the main template:
    taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
    missing_symbol = '?'
    symbols = self.get_beast_symbols()
    tip_date_range = self.get_tip_date_range()
    origin_span = self.get_beast_origin_span(tip_date_range)
    date_map = self.get_beast_date_map(taxlabels)
    # Then populate the necessary objects for the BEAST XML Jinja template:
    witness_objects = []
    variation_unit_objects = []
    intrinsic_category_objects = []
    transcriptional_category_objects = []
    with tqdm(total=len(self.witnesses)) as pbar:
        # Start with witnesses:
        for i, wit in enumerate(self.witnesses):
            witness_object = {}
            # Copy the ID for this witness:
            witness_object["id"] = wit.id
            # Copy its date bounds:
            witness_object["min_date"] = wit.date_range[0]
            witness_object["max_date"] = wit.date_range[1]
            # Populate its sequence from its entries in the witness's readings dictionary:
            sequence = ""
            for j, rdg_support in enumerate(self.readings_by_witness[wit.id]):
                vu_id = self.variation_unit_ids[j]
                # Skip any variation units deemed non-substantive
                # (membership is tested against the set, not the list, to avoid an O(units) scan per site):
                if vu_id not in substantive_variation_unit_ids_set:
                    continue
                # If this witness has a certainty of 0 for all readings, then it is a gap; assign a likelihood of 1 to each reading:
                if sum(rdg_support) == 0:
                    for k, w in enumerate(rdg_support):
                        sequence += "1"
                        if k < len(rdg_support) - 1:
                            sequence += ", "
                        else:
                            if len(rdg_support) > 1:
                                sequence += "; "
                            else:
                                # If this site is a singleton site, then add a dummy state:
                                sequence += ", 0; "
                # Otherwise, read the probabilities as they are given:
                else:
                    for k, w in enumerate(rdg_support):
                        sequence += str(w)
                        if k < len(rdg_support) - 1:
                            sequence += ", "
                        else:
                            if len(rdg_support) > 1:
                                sequence += "; "
                            else:
                                # If this site is a singleton site, then add a dummy state:
                                sequence += ", 0; "
            # Strip the final semicolon and space from the sequence:
            sequence = sequence.strip("; ")
            # Then set the witness object's sequence attribute to this string:
            witness_object["sequence"] = sequence
            witness_objects.append(witness_object)
            pbar.update(1)
    # Then proceed to variation units:
    for j, vu in enumerate(self.variation_units):
        if vu.id not in substantive_variation_unit_ids_set:
            continue
        variation_unit_object = {}
        # Copy the one-based index of this variation unit:
        variation_unit_object["index"] = j + 1
        # Copy the ID of this variation unit:
        variation_unit_object["id"] = vu.id
        # Set a flag indicating if this variation unit is constant:
        variation_unit_object["is_constant"] = (
            True if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1 else False
        )
        # Copy this variation unit's number of substantive readings,
        # setting it to 2 if it is a singleton unit:
        variation_unit_object["nstates"] = (
            len(self.substantive_readings_by_variation_unit_id[vu.id])
            if len(self.substantive_readings_by_variation_unit_id[vu.id]) > 1
            else 2
        )
        # Then construct the code map for this unit:
        variation_unit_object["code_map"] = self.get_beast_code_map_for_unit(symbols, missing_symbol, j)
        # Then populate a comma-separated string of reading labels for this unit:
        rdg_texts = []
        for rdg in vu.readings:
            key = tuple([vu.id, rdg.id])
            if key not in substantive_variation_unit_reading_tuples_set:
                continue
            rdg_text = slugify(rdg.text, lowercase=False, allow_unicode=True, separator='_')
            # Replace any empty reading text with an omission marker:
            if rdg_text == "":
                rdg_text = "om."
            rdg_texts.append(rdg_text)
        # If this site is a singleton site, then add a dummy reading for the dummy state:
        if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
            rdg_texts.append("DUMMY")
        rdg_texts_string = ", ".join(rdg_texts)
        variation_unit_object["rdg_texts"] = rdg_texts_string
        # Then populate this unit's equilibrium frequency string and its root frequency string:
        variation_unit_object["equilibrium_frequencies"] = self.get_beast_equilibrium_frequencies_for_unit(j)
        variation_unit_object["root_frequencies"] = self.get_beast_root_frequencies_for_unit(j)
        # Then populate a dictionary mapping epoch height ranges to lists of off-diagonal entries for substitution models:
        rate_objects_by_epoch_height_range = {}
        epoch_height_ranges = []
        # Then proceed based on whether the transcriptional relations for this variation unit have been defined:
        if len(vu.transcriptional_relations_by_date_range) == 0:
            # If there are no transcriptional relations, then map the epoch range of (None, None) to their list of off-diagonal entries:
            epoch_height_ranges.append((None, None))
            rate_objects_by_epoch_height_range[(None, None)] = []
            rate_objects = rate_objects_by_epoch_height_range[(None, None)]
            if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
                # If this is a singleton site, then use an arbitrary 2x2 rate matrix:
                rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
                rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
            else:
                # If this is a site with multiple substantive readings, but no transcriptional relations list,
                # then use a Lewis Mk substitution matrix with the appropriate number of states:
                for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                    for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                        # Skip diagonal elements:
                        if k_1 == k_2:
                            continue
                        rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
        else:
            # Otherwise, proceed for every date range:
            for date_range in vu.transcriptional_relations_by_date_range:
                # Get the map of transcriptional relations for reference later:
                transcriptional_relations = vu.transcriptional_relations_by_date_range[date_range]
                # Now get the epoch height range corresponding to this date range, and initialize its list in the dictionary:
                epoch_height_range = [None, None]
                epoch_height_range[0] = tip_date_range[1] - date_range[1] if date_range[1] is not None else None
                epoch_height_range[1] = tip_date_range[1] - date_range[0] if date_range[0] is not None else None
                epoch_height_range = tuple(epoch_height_range)
                epoch_height_ranges.append(epoch_height_range)
                rate_objects_by_epoch_height_range[epoch_height_range] = []
                rate_objects = rate_objects_by_epoch_height_range[epoch_height_range]
                # Then proceed for every pair of readings in this unit:
                for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                    for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                        # Skip diagonal elements:
                        if k_1 == k_2:
                            continue
                        # If the first reading has no transcriptional relation to the second in this unit, then use the default rate:
                        if (rdg_id_1, rdg_id_2) not in transcriptional_relations:
                            rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
                            continue
                        # Otherwise, if only one category of transcriptional relations holds between the first and second readings,
                        # then use its rate:
                        if len(transcriptional_relations[(rdg_id_1, rdg_id_2)]) == 1:
                            # If there is only one such category, then add its rate as a standalone var element:
                            transcriptional_category = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])[0]
                            rate_objects.append(
                                {"transcriptional_categories": [transcriptional_category], "expression": None}
                            )
                            continue
                        # If there is more than one, then add a var element that is a sum of the individual categories' rates:
                        transcriptional_categories = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])
                        args = []
                        for transcriptional_category in transcriptional_categories:
                            args.append("%s_rate" % transcriptional_category)
                        args_string = " ".join(args)
                        ops = ["+"] * (len(args) - 1)
                        ops_string = " ".join(ops)
                        expression_string = " ".join([args_string, ops_string])
                        rate_objects.append(
                            {
                                "transcriptional_categories": transcriptional_categories,
                                "expression": expression_string,
                            }
                        )
        # Now reorder the list of epoch height ranges, and get a list of non-null epoch dates in ascending order from the dictionary:
        epoch_height_ranges.reverse()
        epoch_heights = [
            epoch_height_range[0] for epoch_height_range in epoch_height_ranges if epoch_height_range[0] is not None
        ]
        # Then add all of these data structures to the variation unit object:
        variation_unit_object["epoch_heights"] = epoch_heights
        variation_unit_object["epoch_heights_string"] = " ".join(
            [str(epoch_height) for epoch_height in epoch_heights]
        )
        variation_unit_object["epoch_height_ranges"] = epoch_height_ranges
        variation_unit_object["epoch_rates"] = [
            rate_objects_by_epoch_height_range[epoch_height_range] for epoch_height_range in epoch_height_ranges
        ]
        variation_unit_objects.append(variation_unit_object)
    # Then proceed to intrinsic odds categories:
    for intrinsic_category in self.intrinsic_categories:
        intrinsic_category_object = {}
        # Copy the ID of this intrinsic category:
        intrinsic_category_object["id"] = intrinsic_category
        # Then copy the odds factors associated with this intrinsic category,
        # setting it to 1.0 if it is not specified and setting the estimate attribute accordingly:
        odds = self.intrinsic_odds_by_id[intrinsic_category]
        intrinsic_category_object["odds"] = odds if odds is not None else 1.0
        intrinsic_category_object["estimate"] = "false" if odds is not None else "true"
        intrinsic_category_objects.append(intrinsic_category_object)
    # Then proceed to transcriptional rate categories:
    rng = np.random.default_rng(seed)
    for transcriptional_category in self.transcriptional_categories:
        transcriptional_category_object = {}
        # Copy the ID of this transcriptional category:
        transcriptional_category_object["id"] = transcriptional_category
        # Then copy the rate of this transcriptional category,
        # setting it to a random number sampled from a Gamma distribution if it is not specified and setting the estimate attribute accordingly:
        rate = self.transcriptional_rates_by_id[transcriptional_category]
        transcriptional_category_object["rate"] = rate if rate is not None else rng.gamma(5.0, 2.0)
        transcriptional_category_object["estimate"] = "false" if rate is not None else "true"
        transcriptional_category_objects.append(transcriptional_category_object)
    # Now render the output XML file using the Jinja template:
    env = Environment(loader=PackageLoader("teiphy", "templates"), autoescape=select_autoescape())
    template = env.get_template("beast_template.xml")
    rendered = template.render(
        nsymbols=len(symbols),
        date_map=date_map,
        origin_span=origin_span,
        clock_model=clock_model.value,
        clock_rate_categories=2 * len(self.witnesses) - 1,
        ancestral_logger=ancestral_logger.value,
        witnesses=witness_objects,
        variation_units=variation_unit_objects,
        non_constant_variation_units=[
            variation_unit_object
            for variation_unit_object in variation_unit_objects
            if not variation_unit_object["is_constant"]
        ],
        constant_variation_unit_filter=",".join(
            [
                str(variation_unit_object["index"])
                for variation_unit_object in variation_unit_objects
                if variation_unit_object["is_constant"]
            ]
        ),
        intrinsic_categories=intrinsic_category_objects,
        transcriptional_categories=transcriptional_category_objects,
    )
    # Generate all parent folders for this file that don't already exist:
    Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
    with open(file_addr, "w", encoding="utf-8") as f:
        f.write(rendered)
    return

1773 

def to_numpy(self, drop_constant: bool = False, split_missing: SplitMissingType = None):
    """Returns this Collation in the form of a NumPy array, along with arrays of its row and column labels.

    Args:
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
        split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
            If not specified, then missing data is ignored (i.e., all states are 0).
            If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
            If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.

    Returns:
        A NumPy array with a row for each substantive reading and a column for each witness.
        A list of substantive reading ID strings.
        A list of witness ID strings.
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
    # Initialize the output array with the appropriate dimensions
    # (one row per substantive reading, in unit order, and one column per witness):
    reading_labels = []
    for vu in self.variation_units:
        if vu.id not in substantive_variation_unit_ids_set:
            continue
        for rdg in vu.readings:
            key = tuple([vu.id, rdg.id])
            if key in substantive_variation_unit_reading_tuples_set:
                reading_labels.append(vu.id + ", " + rdg.text)
    witness_labels = [wit.id for wit in self.witnesses]
    matrix = np.zeros((len(reading_labels), len(witness_labels)), dtype=float)
    # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings
    # (only needed for the "proportional" option, but computed unconditionally):
    support_proportions_by_unit = {}
    for j, vu_id in enumerate(self.variation_unit_ids):
        if vu_id not in substantive_variation_unit_ids_set:
            continue
        support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
        for i, wit in enumerate(self.witnesses):
            rdg_support = self.readings_by_witness[wit.id][j]
            for l, w in enumerate(rdg_support):
                support_proportions[l] += w
        norm = (
            sum(support_proportions) if sum(support_proportions) > 0 else 1.0
        )  # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
        for l in range(len(support_proportions)):
            support_proportions[l] = support_proportions[l] / norm
        support_proportions_by_unit[vu_id] = support_proportions
    # Then populate it with the appropriate values;
    # row_ind tracks the current reading row within the witness's column, advancing by the size of each unit's support vector:
    col_ind = 0
    with tqdm(total=len(self.witnesses)) as pbar:
        for i, wit in enumerate(self.witnesses):
            row_ind = 0
            for j, vu_id in enumerate(self.variation_unit_ids):
                if vu_id not in substantive_variation_unit_ids_set:
                    continue
                rdg_support = self.readings_by_witness[wit.id][j]
                # If this reading support vector sums to 0, then this is missing data; handle it as specified:
                if sum(rdg_support) == 0:
                    if split_missing == SplitMissingType.uniform:
                        # Split the unit contribution of 1 evenly over this unit's readings:
                        for l in range(len(rdg_support)):
                            matrix[row_ind, col_ind] = 1 / len(rdg_support)
                            row_ind += 1
                    elif split_missing == SplitMissingType.proportional:
                        # Split the unit contribution of 1 in proportion to the readings' support among extant witnesses:
                        for l in range(len(rdg_support)):
                            matrix[row_ind, col_ind] = support_proportions_by_unit[vu_id][l]
                            row_ind += 1
                    else:
                        # Leave the rows at 0, but still advance past this unit's rows:
                        row_ind += len(rdg_support)
                # Otherwise, add its coefficients normally:
                else:
                    for l in range(len(rdg_support)):
                        matrix[row_ind, col_ind] = rdg_support[l]
                        row_ind += 1
            col_ind += 1
            pbar.update(1)
    return matrix, reading_labels, witness_labels

1855 

def get_ext_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None):
    """Returns a NumPy matrix containing a row and column for each witness and the number of variation units shared by the row and column witnesses in each cell.
    Note that if the split_missing option is specified, all variation units are counted.

    Args:
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
            Default value is False.
        split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
            If not specified, then missing data is ignored (i.e., all states are 0).
            If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
            If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.

    Returns:
        A NumPy matrix with a row and column for each witness and the number of variation units shared by the row and column witnesses in each cell.
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    witness_labels = [wit.id for wit in self.witnesses]
    # If the split_missing option has been specified, then all entries in the matrix will be the number of substantive variation units,
    # so we can just fill the matrix with this value and return it:
    if split_missing is not None:
        ext_matrix = np.full(
            (len(witness_labels), len(witness_labels)), len(substantive_variation_unit_ids), dtype=int
        )
        return ext_matrix
    # Otherwise, initialize the output matrix and populate it for all pairs of witnesses:
    ext_matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int)
    with tqdm(total=len(self.witnesses) ** 2) as pbar:
        # Calculate the number of shared extant variation units for each pair of witnesses:
        for i, wit_1 in enumerate(witness_labels):
            for j, wit_2 in enumerate(witness_labels):
                shared_ext_units = 0
                # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
                # so we only have to calculate it once:
                if i > j:
                    pbar.update(1)
                    continue
                # Otherwise, calculate the number of substantive variation units at which both of these witnesses are extant:
                for k, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    wit_1_rdg_support = self.readings_by_witness[wit_1][k]
                    wit_2_rdg_support = self.readings_by_witness[wit_2][k]
                    # A witness with an all-zeroes support vector is missing at this unit:
                    if sum(wit_1_rdg_support) == 0.0 or sum(wit_2_rdg_support) == 0.0:
                        continue
                    shared_ext_units += 1
                ext_matrix[i][j] = shared_ext_units
                ext_matrix[j][i] = shared_ext_units
                pbar.update(1)
    return ext_matrix

1914 

1915 def transform_matrix(self, matrix: np.ndarray, transform_matrix: TransformMatrixType = None): 

1916 """Transforms a given matrix's columns based on the specified transform_matrix option. 

1917 

1918 Args: 

1919 matrix (ndarray): The matrix whose columns are to be transformed. 

1920 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed. 

1921 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation. 

1922 

1923 Returns: 

1924 A Pandas DataFrame corresponding to a collation matrix with reading frequencies or a long table with discrete reading states. 

1925 """ 

1926 # If no transform_matrix option was supplied, then return the matrix as-is: 

1927 if transform_matrix is None: 

1928 return matrix 

1929 # Otherwise, apply the specified column transformation: 

1930 if transform_matrix == TransformMatrixType.stddev: 

1931 means = np.mean(matrix, 0) # get the means of the columns (axis 0) 

1932 stddevs = np.std(matrix, 0) # get the standard deviations of the columns (axis 0) 

1933 transformed_matrix = np.full( 

1934 matrix.shape, 0.0, dtype=float 

1935 ) # the standard deviation can only be 0 if all values equal the mean, so a default value of 0 is acceptable 

1936 np.divide(matrix - means, stddevs, out=transformed_matrix, where=(stddevs != 0)) 

1937 return transformed_matrix 

1938 if transform_matrix == TransformMatrixType.mad: 

1939 medians = np.median(matrix, 0) # get the medians of the columns (axis 0) 

1940 mads = np.median(np.abs(matrix - medians), 0) # get the median absolute deviations of the columns (axis 0) 

1941 transformed_matrix = np.full( 

1942 matrix.shape, np.nan, dtype=float 

1943 ) # the MAD can be 0 even if all values are not equal to the median, so NaN should be used if a column's MAD is 0 

1944 np.divide(matrix - medians, mads, out=transformed_matrix, where=(mads != 0)) 

1945 return transformed_matrix 

1946 

def to_distance_matrix(
    self,
    drop_constant: bool = False,
    proportion: bool = False,
    show_ext: bool = False,
    transform_matrix: TransformMatrixType = None,
):
    """Transforms this Collation into a NumPy distance matrix between witnesses, along with an array of its labels for the witnesses.
    Distances can be computed either as counts of disagreements (the default setting), or as proportions of disagreements over all variation units where both witnesses have singleton readings.
    Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of disagreements.

    Args:
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
            Default value is False.
        proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
            Default value is False.
        show_ext: An optional flag indicating whether each cell in the matrix
            should include the number of their extant, unambiguous variation units after the number of their disagreements.
            Default value is False.
        transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of the matrix should be transformed.

    Returns:
        A NumPy distance matrix with a row and column for each witness.
        A list of witness ID strings.
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    # Initialize the output array with the appropriate dimensions:
    witness_labels = [wit.id for wit in self.witnesses]
    matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int)  # ints of the form disagreements
    with tqdm(total=len(self.witnesses) ** 2) as pbar:
        for i, wit_1 in enumerate(witness_labels):
            for j, wit_2 in enumerate(witness_labels):
                disagreements = 0
                # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
                # so we only have to calculate it once:
                if i > j:
                    pbar.update(1)
                    continue
                # Otherwise, calculate the number of units where both witnesses disagree:
                for k, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    wit_1_rdg_support = self.readings_by_witness[wit_1][k]
                    wit_2_rdg_support = self.readings_by_witness[wit_2][k]
                    # If either witness is lacunose, then move on:
                    if sum(wit_1_rdg_support) == 0.0 or sum(wit_2_rdg_support) == 0.0:
                        continue
                    # Otherwise, if the (potential) readings of the two witnesses do not overlap, then count them as disagreeing:
                    if sum(w1 * w2 for w1, w2 in zip(wit_1_rdg_support, wit_2_rdg_support)) == 0.0:
                        disagreements += 1
                matrix[i, j] = disagreements
                matrix[j, i] = disagreements
                pbar.update(1)
    # Initialize a matrix for shared extant variation units for witnesses, and populate it if the proportion or show_ext option is specified:
    ext_matrix = None
    if proportion or show_ext:
        ext_matrix = self.get_ext_matrix(drop_constant=drop_constant)
    # If the proportion option is set, then divide every value in the matrix by the corresponding entry in the matrix of shared extant variation units:
    if proportion:
        proportion_matrix = np.full((len(witness_labels), len(witness_labels)), 0.0, dtype=float)
        np.divide(
            matrix, ext_matrix, out=proportion_matrix, where=(ext_matrix != 0)
        )  # division by 0 can occur if two witnesses have no overlapping units; leave their proportion as 0.0
        matrix = proportion_matrix
    # Then transform the columns of the main matrix as specified:
    matrix = self.transform_matrix(matrix, transform_matrix)
    # If the show_ext option is set, then append the number of shared extant variation units after the matrix's values:
    if show_ext:
        serialized_values = []
        for i, wit_1 in enumerate(witness_labels):
            serialized_values.append([])
            for j, wit_2 in enumerate(witness_labels):
                serialized_values[-1].append("/".join([str(matrix[i][j]), str(ext_matrix[i][j])]))
        matrix = np.array(serialized_values)
    return matrix, witness_labels

2034 

def to_similarity_matrix(
    self,
    drop_constant: bool = False,
    proportion: bool = False,
    show_ext: bool = False,
    transform_matrix: TransformMatrixType = None,
):
    """Transforms this Collation into a NumPy similarity matrix between witnesses, along with an array of its labels for the witnesses.
    Similarities can be computed either as counts of agreements (the default setting), or as proportions of agreements over all variation units where both witnesses have singleton readings.
    Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of agreements.

    Args:
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
            Default value is False.
        proportion (bool, optional): An optional flag indicating whether or not to calculate similarities as proportions over extant, unambiguous variation units.
            Default value is False.
        show_ext: An optional flag indicating whether each cell in the matrix
            should include the number of their extant, unambiguous variation units after the number of agreements.
            Default value is False.
        transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of the matrix should be transformed.

    Returns:
        A NumPy agreement matrix with a row and column for each witness.
        A list of witness ID strings.
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    # Initialize the output array with the appropriate dimensions:
    witness_labels = [wit.id for wit in self.witnesses]
    matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int)  # ints of the form agreements
    with tqdm(total=len(self.witnesses) ** 2) as pbar:
        for i, wit_1 in enumerate(witness_labels):
            for j, wit_2 in enumerate(witness_labels):
                agreements = 0
                # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
                # so we only have to calculate it once:
                if i > j:
                    pbar.update(1)
                    continue
                # Otherwise, calculate the number of units where both witnesses unambiguously agree:
                for k, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    wit_1_rdg_support = self.readings_by_witness[wit_1][k]
                    wit_2_rdg_support = self.readings_by_witness[wit_2][k]
                    # Only count a unit if each witness supports exactly one reading (i.e., is extant and unambiguous):
                    wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0]
                    wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0]
                    if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1:
                        continue
                    if wit_1_rdg_inds[0] == wit_2_rdg_inds[0]:
                        agreements += 1
                matrix[i, j] = agreements
                matrix[j, i] = agreements
                pbar.update(1)
    # Initialize a matrix for shared extant variation units for witnesses, and populate it if the proportion or show_ext option is specified:
    ext_matrix = None
    if proportion or show_ext:
        ext_matrix = self.get_ext_matrix(drop_constant=drop_constant)
    # If the proportion option is set, then divide every value in the matrix by the corresponding entry in the matrix of shared extant variation units:
    if proportion:
        proportion_matrix = np.full((len(witness_labels), len(witness_labels)), 0.0, dtype=float)
        np.divide(
            matrix, ext_matrix, out=proportion_matrix, where=(ext_matrix != 0)
        )  # division by 0 can occur if two witnesses have no overlapping units; leave their proportion as 0.0
        matrix = proportion_matrix
    # Then transform the columns of the main matrix as specified:
    matrix = self.transform_matrix(matrix, transform_matrix)
    # If the show_ext option is set, then append the number of shared extant variation units after the matrix's values:
    if show_ext:
        serialized_values = []
        for i, wit_1 in enumerate(witness_labels):
            serialized_values.append([])
            for j, wit_2 in enumerate(witness_labels):
                serialized_values[-1].append("/".join([str(matrix[i][j]), str(ext_matrix[i][j])]))
        matrix = np.array(serialized_values)
    return matrix, witness_labels

2119 

def to_idf_matrix(
    self,
    drop_constant: bool = False,
    split_missing: SplitMissingType = None,
    proportion: bool = False,
    show_ext: bool = False,
    transform_matrix: TransformMatrixType = None,
):
    """Transforms this Collation into a NumPy matrix of agreements between witnesses weighted by inverse document frequency (IDF), along with an array of its labels for the witnesses.
    The IDF weight of an agreement on a given reading is the information content -log(Pr(R)) of the event R of randomly sampling a witness with that reading.
    The IDF-weighted agreement score for two witnesses is the sum of the IDF weights for the readings at which they agree.
    Where any witness is ambiguous, it contributes to its potential readings' sampling probabilities in proportion to its degrees of support for those readings.
    Similarly, where one or both target witnesses are ambiguous, the expected information content of their agreement is calculated based on the probabilities of their having the same reading.
    If a split_missing argument is supplied, then lacunae are handled in the same way.
    If the proportion option is set to True, then each cell will contain the mean IDF weight for its corresponding pair of witnesses over all variation units at which both are extant.
    (If the split_missing argument is also specified, then the mean is taken over all substantive variation units.)

    Args:
        drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
            Default value is False.
        split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
            If not specified, then missing data is ignored (i.e., all states are 0).
            If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
            If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
        proportion (bool, optional): An optional flag indicating whether or not to calculate the mean IDF weight of each pair of witnesses' agreements as opposed to the total IDF weight of their agreements.
            Default value is False.
        show_ext: An optional flag indicating whether each cell in the matrix
            should include the number of their extant, unambiguous variation units after the number of agreements.
            Default value is False.
        transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of the matrix should be transformed.

    Returns:
        A NumPy IDF-weighted agreement matrix with a row and column for each witness.
        A list of witness ID strings.
    """
    # Populate a list of sites that will correspond to columns of the sequence alignment:
    substantive_variation_unit_ids = self.variation_unit_ids
    if drop_constant:
        substantive_variation_unit_ids = [
            vu_id
            for vu_id in self.variation_unit_ids
            if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
    substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
    witness_labels = [wit.id for wit in self.witnesses]
    # If the split_missing option is "proportional", then for each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings:
    support_proportions_by_unit = {}
    if split_missing == SplitMissingType.proportional:
        for k, vu_id in enumerate(self.variation_unit_ids):
            # Skip this variation unit if it is a dropped constant site:
            if vu_id not in substantive_variation_unit_ids_set:
                continue
            support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
            for i, wit in enumerate(witness_labels):
                rdg_support = self.readings_by_witness[wit][k]
                for l, w in enumerate(rdg_support):
                    support_proportions[l] += w
            norm = (
                sum(support_proportions) if sum(support_proportions) > 0 else 1.0
            )  # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
            for l in range(len(support_proportions)):
                support_proportions[l] = support_proportions[l] / norm
            support_proportions_by_unit[vu_id] = support_proportions
    # Then populate data structures mapping each variation unit's ID to normalized reading support dictionaries
    # and vectors of sampling probabilities for its substantive readings:
    normalized_reading_support_dicts_by_vu_id = {}
    sampling_probabilities_by_vu_id = {}
    for k, vu_id in enumerate(self.variation_unit_ids):
        # Skip this variation unit if it is a dropped constant site:
        if vu_id not in substantive_variation_unit_ids_set:
            continue
        # Otherwise, populate normalized reading support vector dictionaries and sampling probability vectors in this unit:
        normalized_reading_support_by_wit = {}
        sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
        for i, wit in enumerate(witness_labels):
            rdg_support = self.readings_by_witness[wit][k]
            # Check if this reading support vector represents missing data:
            norm = sum(rdg_support)
            if norm == 0:
                # If this reading support vector sums to 0, then this is missing data; handle it as specified:
                if split_missing == SplitMissingType.uniform:
                    rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))]
                elif split_missing == SplitMissingType.proportional:
                    rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))]
            else:
                # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1:
                rdg_support = [w / norm for w in rdg_support]
            normalized_reading_support_by_wit[wit] = rdg_support
            # Then add this witness's contributions to the readings' sampling probabilities:
            for l, w in enumerate(normalized_reading_support_by_wit[wit]):
                sampling_probabilities[l] += w
        norm = (
            sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0
        )  # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
        # Normalize the sampling probabilities so they sum to 1:
        sampling_probabilities = [w / norm for w in sampling_probabilities]
        normalized_reading_support_dicts_by_vu_id[vu_id] = normalized_reading_support_by_wit
        sampling_probabilities_by_vu_id[vu_id] = sampling_probabilities
    # Then populate the matrix with the total expected information content for agreements between each pair of witnesses:
    matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float)
    with tqdm(total=len(self.witnesses) ** 2) as pbar:
        for i, wit_1 in enumerate(witness_labels):
            for j, wit_2 in enumerate(witness_labels):
                total_information_content = 0.0
                # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
                # so we only have to calculate it once:
                if i > j:
                    pbar.update(1)
                    continue
                # Otherwise, calculate the expected information content of agreements between these witnesses in each substantive variation unit
                # based on the sampling probabilities of the substantive readings in the unit:
                for k, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    wit_1_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_1]
                    wit_2_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_2]
                    sampling_probabilities = sampling_probabilities_by_vu_id[vu_id]
                    # First, calculate the probability that these two witnesses agree:
                    probability_of_agreement = sum(
                        [wit_1_rdg_support[l] * wit_2_rdg_support[l] for l in range(len(sampling_probabilities))]
                    )
                    # If these witnesses do not agree at this variation unit, then this unit contributes nothing to their total score:
                    if probability_of_agreement == 0.0:
                        continue
                    # Otherwise, calculate the expected information content (in bits) of their agreement given their agreement on that reading
                    # (skipping readings with a sampling probability of 0):
                    expected_information_content = sum(
                        [
                            -math.log2(sampling_probabilities[l])
                            * (wit_1_rdg_support[l] * wit_2_rdg_support[l] / probability_of_agreement)
                            for l in range(len(sampling_probabilities))
                            if sampling_probabilities[l] > 0.0
                        ]
                    )
                    # Then add this contribution to the total score for these two witnesses:
                    total_information_content += expected_information_content
                matrix[i, j] = total_information_content
                matrix[j, i] = total_information_content
                pbar.update(1)
    # Initialize a matrix for shared extant variation units for witnesses, and populate it if the proportion or show_ext option is specified:
    ext_matrix = None
    if proportion or show_ext:
        ext_matrix = self.get_ext_matrix(drop_constant=drop_constant, split_missing=split_missing)
    # If the proportion option is set, then divide every value in the matrix by the corresponding entry in the matrix of shared extant variation units:
    if proportion:
        proportion_matrix = np.full((len(witness_labels), len(witness_labels)), 0.0, dtype=float)
        np.divide(
            matrix, ext_matrix, out=proportion_matrix, where=(ext_matrix != 0)
        )  # division by 0 can occur if two witnesses have no overlapping units; leave their proportion as 0.0
        matrix = proportion_matrix
    # Then transform the columns of the main matrix as specified:
    matrix = self.transform_matrix(matrix, transform_matrix)
    # If the show_ext option is set, then append the number of shared extant variation units after the matrix's values:
    if show_ext:
        serialized_values = []
        for i, wit_1 in enumerate(witness_labels):
            serialized_values.append([])
            for j, wit_2 in enumerate(witness_labels):
                serialized_values[-1].append("/".join([str(matrix[i][j]), str(ext_matrix[i][j])]))
        matrix = np.array(serialized_values)
    return matrix, witness_labels

2283 

2284 def to_mi_matrix( 

2285 self, 

2286 drop_constant: bool = False, 

2287 split_missing: SplitMissingType = None, 

2288 proportion: bool = False, 

2289 show_ext: bool = False, 

2290 transform_matrix: TransformMatrixType = None, 

2291 ): 

2292 """Transforms this Collation into a NumPy matrix of the total mutual information (MI), in bits, between witnesses over all variation units, along with an array of its labels for the witnesses. 

2293 This is equivalent to the total Kullback-Leibler divergence of the joint distribution of the witnesses' observed readings 

2294 from the joint distribution of their expected readings under the assumption that the witnesses are independent, taken over all variation units. 

2295 The value of 0 if and only if the witnesses are completely independent. 

2296 

2297 Args: 

2298 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2299 Default value is False. 

2300 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2301 If not specified, then missing data is ignored (i.e., all states are 0). 

2302 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2303 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2304 proportion (bool, optional): An optional flag indicating whether or not to calculate the mean IDF weight of each pair of witnesses' agreements as opposed to the total IDF weight of their agreements. 

2305 Default value is False. 

2306 show_ext: An optional flag indicating whether each cell in the matrix 

2307 should include the number of their extant, unambiguous variation units after the number of agreements. 

2308 Default value is False. 

2309 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of the matrix should be transformed. 

2310 

2311 Returns: 

2312 A NumPy MI matrix with a row and column for each witness. 

2313 A list of witness ID strings. 

2314 """ 

2315 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2316 substantive_variation_unit_ids = self.variation_unit_ids 

2317 if drop_constant: 

2318 substantive_variation_unit_ids = [ 

2319 vu_id 

2320 for vu_id in self.variation_unit_ids 

2321 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2322 ] 

2323 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2324 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2325 # Initialize the output array with the appropriate dimensions: 

2326 witness_labels = [wit.id for wit in self.witnesses] 

2327 # If the split_missing option is "proportional", then for each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings: 

2328 support_proportions_by_unit = {} 

2329 if split_missing == SplitMissingType.proportional: 

2330 for k, vu_id in enumerate(self.variation_unit_ids): 

2331 # Skip this variation unit if it is a dropped constant site: 

2332 if vu_id not in substantive_variation_unit_ids_set: 

2333 continue 

2334 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2335 for i, wit in enumerate(witness_labels): 

2336 rdg_support = self.readings_by_witness[wit][k] 

2337 for l, w in enumerate(rdg_support): 

2338 support_proportions[l] += w 

2339 norm = ( 

2340 sum(support_proportions) if sum(support_proportions) > 0 else 1.0 

2341 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2342 for l in range(len(support_proportions)): 

2343 support_proportions[l] = support_proportions[l] / norm 

2344 support_proportions_by_unit[vu_id] = support_proportions 

2345 # Then populate data structures mapping each variation unit's ID to normalized reading support dictionaries, 

2346 # vectors of sampling probabilities for its substantive readings, and expected joint probability matrices: 

2347 normalized_reading_support_dicts_by_vu_id = {} 

2348 sampling_probabilities_by_vu_id = {} 

2349 expected_joint_probabilities_by_vu_id = {} 

2350 for k, vu_id in enumerate(self.variation_unit_ids): 

2351 # Skip this variation unit if it is a dropped constant site: 

2352 if vu_id not in substantive_variation_unit_ids_set: 

2353 continue 

2354 # Otherwise, populate normalized reading support vector dictionaries and sampling probability vectors in this unit: 

2355 normalized_reading_support_by_wit = {} 

2356 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2357 for i, wit in enumerate(witness_labels): 

2358 rdg_support = self.readings_by_witness[wit][k] 

2359 # Check if this reading support vector represents missing data: 

2360 norm = sum(rdg_support) 

2361 if norm == 0: 

2362 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2363 if split_missing == SplitMissingType.uniform: 

2364 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))] 

2365 elif split_missing == SplitMissingType.proportional: 

2366 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))] 

2367 else: 

2368 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1: 

2369 rdg_support = [w / norm for l, w in enumerate(rdg_support)] 

2370 normalized_reading_support_by_wit[wit] = rdg_support 

2371 # Then add this witness's contributions to the readings' sampling probabilities: 

2372 for l, w in enumerate(normalized_reading_support_by_wit[wit]): 

2373 sampling_probabilities[l] += w 

2374 norm = ( 

2375 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0 

2376 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2377 # Otherwise, normalize the sampling probabilities so they sum to 1: 

2378 sampling_probabilities = [w / norm for w in sampling_probabilities] 

2379 normalized_reading_support_dicts_by_vu_id[vu_id] = normalized_reading_support_by_wit 

2380 sampling_probabilities_by_vu_id[vu_id] = sampling_probabilities 

2381 # Then populate a contingency table for the expected probabilities of joint support in this unit: 

2382 expected_joint_probabilities = np.full( 

2383 (len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float 

2384 ) 

2385 for l1 in range(len(sampling_probabilities)): 

2386 for l2 in range(len(sampling_probabilities)): 

2387 expected_joint_probabilities[l1, l2] = sampling_probabilities[l1] * sampling_probabilities[l2] 

2388 expected_joint_probabilities_by_vu_id[vu_id] = expected_joint_probabilities 

2389 # Then populate the matrix one variation unit at a time: 

2390 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2391 with tqdm(total=len(self.witnesses) ** 2) as pbar: 

2392 # Then calculate the mutual information contribution for each pair of witnesses: 

2393 for i, wit_1 in enumerate(witness_labels): 

2394 for j, wit_2 in enumerate(witness_labels): 

2395 total_mutual_information = 0.0 

2396 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified, 

2397 # so we only have to calculate it once: 

2398 if i > j: 

2399 pbar.update(1) 

2400 continue 

2401 # Otherwise, calculate the mutual information between these witnesses in each substantive variation unit: 

2402 for k, vu_id in enumerate(self.variation_unit_ids): 

2403 if vu_id not in substantive_variation_unit_ids_set: 

2404 continue 

2405 wit_1_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_1] 

2406 wit_2_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_2] 

2407 sampling_probabilities = sampling_probabilities_by_vu_id[vu_id] 

2408 expected_joint_probabilities = expected_joint_probabilities_by_vu_id[vu_id] 

2409 # If either witness has an all-zeroes vector (because it is lacunose in this unit), then we can skip these witnesses here: 

2410 if sum(wit_1_rdg_support) == 0 or sum(wit_2_rdg_support) == 0: 

2411 continue 

2412 # Otherwise, populate a contingency table for the observed probabilities of joint support in this unit: 

2413 observed_joint_probabilities = np.full( 

2414 (len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float 

2415 ) 

2416 for l1, w1 in enumerate(wit_1_rdg_support): 

2417 for l2, w2 in enumerate(wit_2_rdg_support): 

2418 observed_joint_probabilities[l1, l2] = w1 * w2 

2419 # Then calculate the mutual information using the expected and observed distribution matrices: 

2420 mutual_information = 0.0 

2421 for l1 in range(len(sampling_probabilities)): 

2422 for l2 in range(len(sampling_probabilities)): 

2423 observed = observed_joint_probabilities[l1, l2] 

2424 expected = expected_joint_probabilities[l1, l2] 

2425 if observed == 0: 

2426 continue 

2427 mutual_information += observed * math.log2(observed / expected) 

2428 # Then add this mutual information to the total for these two witnesses: 

2429 total_mutual_information += mutual_information 

2430 matrix[i, j] += total_mutual_information 

2431 matrix[j, i] += total_mutual_information 

2432 pbar.update(1) 

2433 # Initialize a matrix for shared extant variation units for witnesses, and populate it if the proportion or show_ext option is specified: 

2434 ext_matrix = None 

2435 if proportion or show_ext: 

2436 ext_matrix = self.get_ext_matrix(drop_constant=drop_constant, split_missing=split_missing) 

2437 # If the proportion option is set, then divide every value in the matrix by the corresponding entry in the matrix of shared extant variation units: 

2438 if proportion: 

2439 proportion_matrix = np.full((len(witness_labels), len(witness_labels)), 0.0, dtype=float) 

2440 np.divide( 

2441 matrix, ext_matrix, out=proportion_matrix, where=(ext_matrix != 0) 

2442 ) # division by 0 can occur if two witnesses have no overlapping units; leave their proportion as 0.0 

2443 matrix = proportion_matrix 

2444 # Then transform the columns of the main matrix as specified: 

2445 matrix = self.transform_matrix(matrix, transform_matrix) 

2446 # If the show_ext option is set, then append the number of shared extant variation units after the matrix's values: 

2447 if show_ext: 

2448 serialized_values = [] 

2449 for i, wit_1 in enumerate(witness_labels): 

2450 serialized_values.append([]) 

2451 for j, wit_2 in enumerate(witness_labels): 

2452 serialized_values[-1].append("/".join([str(matrix[i][j]), str(ext_matrix[i][j])])) 

2453 matrix = np.array(serialized_values) 

2454 return matrix, witness_labels 

2455 

2456 def to_nexus_table(self, drop_constant: bool = False, ambiguous_as_missing: bool = False): 

2457 """Returns this Collation in the form of a table with rows for taxa, columns for characters, and reading IDs in cells. 

2458 

2459 Args: 

2460 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2461 Default value is False. 

2462 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2463 Default value is False. 

2464 

2465 Returns: 

2466 A NumPy array with rows for taxa, columns for characters, and reading IDs in cells. 

2467 A list of substantive reading ID strings. 

2468 A list of witness ID strings. 

2469 """ 

2470 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2471 substantive_variation_unit_ids = self.variation_unit_ids 

2472 if drop_constant: 

2473 substantive_variation_unit_ids = [ 

2474 vu_id 

2475 for vu_id in self.variation_unit_ids 

2476 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2477 ] 

2478 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2479 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2480 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2481 # to the readings' IDs: 

2482 reading_ids_by_indices = {} 

2483 for j, vu in enumerate(self.variation_units): 

2484 if vu.id not in substantive_variation_unit_ids_set: 

2485 continue 

2486 k = 0 

2487 for rdg in vu.readings: 

2488 key = tuple([vu.id, rdg.id]) 

2489 if key not in substantive_variation_unit_reading_tuples_set: 

2490 continue 

2491 indices = tuple([j, k]) 

2492 reading_ids_by_indices[indices] = rdg.id 

2493 k += 1 

2494 # Initialize the output array with the appropriate dimensions: 

2495 missing_symbol = '?' 

2496 witness_labels = [wit.id for wit in self.witnesses] 

2497 matrix = np.full( 

2498 (len(witness_labels), len(substantive_variation_unit_ids)), missing_symbol, dtype=object 

2499 ) # use dtype=object because the maximum string length is not known up front 

2500 # Then populate it with the appropriate values: 

2501 with tqdm(total=len(self.witnesses)) as pbar: 

2502 row_ind = 0 

2503 for i, wit in enumerate(self.witnesses): 

2504 col_ind = 0 

2505 for j, vu in enumerate(self.variation_units): 

2506 if vu.id not in substantive_variation_unit_ids_set: 

2507 continue 

2508 rdg_support = self.readings_by_witness[wit.id][j] 

2509 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2510 if sum(rdg_support) == 0: 

2511 matrix[row_ind, col_ind] = missing_symbol 

2512 # Otherwise, add its coefficients normally: 

2513 else: 

2514 rdg_inds = [ 

2515 k for k, w in enumerate(rdg_support) if w > 0 

2516 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

2517 # For singleton readings, just print the reading ID: 

2518 if len(rdg_inds) == 1: 

2519 k = rdg_inds[0] 

2520 matrix[row_ind, col_ind] = reading_ids_by_indices[(j, k)] 

2521 # For multiple readings, print the corresponding reading IDs in braces or the missing symbol depending on input settings: 

2522 else: 

2523 if ambiguous_as_missing: 

2524 matrix[row_ind, col_ind] = missing_symbol 

2525 else: 

2526 matrix[row_ind, col_ind] = "{%s}" % " ".join( 

2527 [reading_ids_by_indices[(j, k)] for k in rdg_inds] 

2528 ) 

2529 col_ind += 1 

2530 row_ind += 1 

2531 pbar.update(1) 

2532 return matrix, witness_labels, substantive_variation_unit_ids 

2533 

2534 def to_long_table(self, drop_constant: bool = False): 

2535 """Returns this Collation in the form of a long table with columns for taxa, characters, reading indices, and reading values. 

2536 Note that this method treats ambiguous readings as missing data. 

2537 

2538 Args: 

2539 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2540 Default value is False. 

2541 

2542 Returns: 

2543 A NumPy array with columns for taxa, characters, reading indices, and reading values, and rows for each combination of these values in the matrix. 

2544 A list of column label strings. 

2545 """ 

2546 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2547 substantive_variation_unit_ids = self.variation_unit_ids 

2548 if drop_constant: 

2549 substantive_variation_unit_ids = [ 

2550 vu_id 

2551 for vu_id in self.variation_unit_ids 

2552 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2553 ] 

2554 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2555 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2556 # Initialize the outputs: 

2557 column_labels = ["taxon", "character", "state", "value"] 

2558 long_table_list = [] 

2559 # Populate a dictionary mapping (variation unit index, reading index) tuples to reading texts: 

2560 reading_texts_by_indices = {} 

2561 for j, vu in enumerate(self.variation_units): 

2562 if vu.id not in substantive_variation_unit_ids_set: 

2563 continue 

2564 k = 0 

2565 for rdg in vu.readings: 

2566 key = tuple([vu.id, rdg.id]) 

2567 if key not in substantive_variation_unit_reading_tuples_set: 

2568 continue 

2569 indices = tuple([j, k]) 

2570 reading_texts_by_indices[indices] = rdg.text 

2571 k += 1 

2572 # Then populate the output list with the appropriate values: 

2573 witness_labels = [wit.id for wit in self.witnesses] 

2574 missing_symbol = '?' 

2575 with tqdm(total=len(self.witnesses)) as pbar: 

2576 for i, wit in enumerate(self.witnesses): 

2577 row_ind = 0 

2578 for j, vu_id in enumerate(self.variation_unit_ids): 

2579 if vu_id not in substantive_variation_unit_ids_set: 

2580 continue 

2581 rdg_support = self.readings_by_witness[wit.id][j] 

2582 # Populate a list of nonzero coefficients for this reading support vector: 

2583 rdg_inds = [k for k, w in enumerate(rdg_support) if w > 0] 

2584 # If this list does not consist of exactly one reading, then treat it as missing data: 

2585 if len(rdg_inds) != 1: 

2586 long_table_list.append([wit.id, vu_id, missing_symbol, missing_symbol]) 

2587 continue 

2588 k = rdg_inds[0] 

2589 rdg_text = reading_texts_by_indices[(j, k)] 

2590 # Replace empty reading texts with the omission placeholder: 

2591 if rdg_text == "": 

2592 rdg_text = "om." 

2593 long_table_list.append([wit.id, vu_id, k, rdg_text]) 

2594 pbar.update(1) 

2595 # Then convert the long table entries list to a NumPy array: 

2596 long_table = np.array(long_table_list) 

2597 return long_table, column_labels 

2598 

2599 def to_dataframe( 

2600 self, 

2601 drop_constant: bool = False, 

2602 ambiguous_as_missing: bool = False, 

2603 proportion: bool = False, 

2604 table_type: TableType = TableType.matrix, 

2605 split_missing: SplitMissingType = None, 

2606 transform_matrix: TransformMatrixType = None, 

2607 show_ext: bool = False, 

2608 ): 

2609 """Returns this Collation in the form of a Pandas DataFrame array, including the appropriate row and column labels. 

2610 

2611 Args: 

2612 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2613 Default value is False. 

2614 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2615 Default value is False. 

2616 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2617 Default value is False. 

2618 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate. 

2619 Only applicable for tabular outputs. 

2620 Default value is "matrix". 

2621 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2622 If not specified, then missing data is ignored (i.e., all states are 0). 

2623 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2624 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2625 Only applicable for table types "matrix" and "idf". 

2626 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed. 

2627 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation. 

2628 show_ext: An optional flag indicating whether each cell in the matrix 

2629 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2630 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2631 Default value is False. 

2632 

2633 Returns: 

2634 A Pandas DataFrame corresponding to a collation matrix with reading frequencies or a long table with discrete reading states. 

2635 """ 

2636 df = None 

2637 # Proceed based on the table type: 

2638 if table_type == TableType.matrix: 

2639 # Convert the collation to a NumPy array and get its row and column labels first: 

2640 matrix, reading_labels, witness_labels = self.to_numpy( 

2641 drop_constant=drop_constant, split_missing=split_missing 

2642 ) 

2643 df = pd.DataFrame(matrix, index=reading_labels, columns=witness_labels) 

2644 elif table_type == TableType.distance: 

2645 # Convert the collation to a NumPy array and get its row and column labels first: 

2646 matrix, witness_labels = self.to_distance_matrix( 

2647 drop_constant=drop_constant, proportion=proportion, transform_matrix=transform_matrix, show_ext=show_ext 

2648 ) 

2649 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2650 elif table_type == TableType.similarity: 

2651 # Convert the collation to a NumPy array and get its row and column labels first: 

2652 matrix, witness_labels = self.to_similarity_matrix( 

2653 drop_constant=drop_constant, proportion=proportion, transform_matrix=transform_matrix, show_ext=show_ext 

2654 ) 

2655 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2656 elif table_type == TableType.idf: 

2657 # Convert the collation to a NumPy array and get its row and column labels first: 

2658 matrix, witness_labels = self.to_idf_matrix( 

2659 drop_constant=drop_constant, 

2660 split_missing=split_missing, 

2661 proportion=proportion, 

2662 transform_matrix=transform_matrix, 

2663 show_ext=show_ext, 

2664 ) 

2665 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2666 elif table_type == TableType.mi: 

2667 # Convert the collation to a NumPy array and get its row and column labels first: 

2668 matrix, witness_labels = self.to_mi_matrix( 

2669 drop_constant=drop_constant, 

2670 split_missing=split_missing, 

2671 proportion=proportion, 

2672 transform_matrix=transform_matrix, 

2673 show_ext=show_ext, 

2674 ) 

2675 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2676 elif table_type == TableType.nexus: 

2677 # Convert the collation to a NumPy array and get its row and column labels first: 

2678 matrix, witness_labels, vu_labels = self.to_nexus_table( 

2679 drop_constant=drop_constant, ambiguous_as_missing=ambiguous_as_missing 

2680 ) 

2681 df = pd.DataFrame(matrix, index=witness_labels, columns=vu_labels) 

2682 elif table_type == TableType.long: 

2683 # Convert the collation to a long table and get its column labels first: 

2684 long_table, column_labels = self.to_long_table(drop_constant=drop_constant) 

2685 df = pd.DataFrame(long_table, columns=column_labels) 

2686 return df 

2687 

2688 def to_csv( 

2689 self, 

2690 file_addr: Union[Path, str], 

2691 drop_constant: bool = False, 

2692 ambiguous_as_missing: bool = False, 

2693 proportion: bool = False, 

2694 table_type: TableType = TableType.matrix, 

2695 split_missing: SplitMissingType = None, 

2696 transform_matrix: TransformMatrixType = None, 

2697 show_ext: bool = False, 

2698 **kwargs 

2699 ): 

2700 """Writes this Collation to a comma-separated value (CSV) file with the given address. 

2701 

2702 If your witness IDs are numeric (e.g., Gregory-Aland numbers), then they will be written in full to the CSV file, but Excel will likely interpret them as numbers and truncate any leading zeroes! 

2703 

2704 Args: 

2705 file_addr: A string representing the path to an output CSV file; the file type should be .csv. 

2706 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2707 Default value is False. 

2708 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2709 Default value is False. 

2710 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2711 Default value is False. 

2712 table_type: A TableType option indicating which type of tabular output to generate. 

2713 Only applicable for tabular outputs. 

2714 Default value is "matrix". 

2715 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2716 If not specified, then missing data is ignored (i.e., all states are 0). 

2717 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2718 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2719 Only applicable for table types "matrix" and "idf". 

2720 transform_matrix: A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed. 

2721 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation. 

2722 show_ext: An optional flag indicating whether each cell in the matrix 

2723 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2724 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2725 Default value is False. 

2726 **kwargs: Keyword arguments for pandas.DataFrame.to_csv. 

2727 """ 

2728 # Convert the collation to a Pandas DataFrame first: 

2729 df = self.to_dataframe( 

2730 drop_constant=drop_constant, 

2731 ambiguous_as_missing=ambiguous_as_missing, 

2732 proportion=proportion, 

2733 table_type=table_type, 

2734 split_missing=split_missing, 

2735 show_ext=show_ext, 

2736 transform_matrix=transform_matrix, 

2737 ) 

2738 # Generate all parent folders for this file that don't already exist: 

2739 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2740 # Proceed based on the table type: 

2741 if table_type == TableType.long: 

2742 return df.to_csv( 

2743 file_addr, encoding="utf-8-sig", index=False, **kwargs 

2744 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2745 return df.to_csv( 

2746 file_addr, encoding="utf-8-sig", **kwargs 

2747 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2748 

2749 def to_excel( 

2750 self, 

2751 file_addr: Union[Path, str], 

2752 drop_constant: bool = False, 

2753 ambiguous_as_missing: bool = False, 

2754 proportion: bool = False, 

2755 table_type: TableType = TableType.matrix, 

2756 split_missing: SplitMissingType = None, 

2757 transform_matrix: TransformMatrixType = None, 

2758 show_ext: bool = False, 

2759 ): 

2760 """Writes this Collation to an Excel (.xlsx) file with the given address. 

2761 

2762 Since Pandas is deprecating its support for xlwt, specifying an output in old Excel (.xls) output is not recommended. 

2763 

2764 Args: 

2765 file_addr: A string representing the path to an output Excel file; the file type should be .xlsx. 

2766 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2767 Default value is False. 

2768 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2769 Default value is False. 

2770 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2771 Default value is False. 

2772 table_type: A TableType option indicating which type of tabular output to generate. 

2773 Only applicable for tabular outputs. 

2774 Default value is "matrix". 

2775 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2776 If not specified, then missing data is ignored (i.e., all states are 0). 

2777 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2778 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2779 Only applicable for table types "matrix" and "idf". 

2780 transform_matrix: A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed. 

2781 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation. 

2782 show_ext: An optional flag indicating whether each cell in the matrix 

2783 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2784 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2785 Default value is False. 

2786 """ 

2787 # Convert the collation to a Pandas DataFrame first: 

2788 df = self.to_dataframe( 

2789 drop_constant=drop_constant, 

2790 ambiguous_as_missing=ambiguous_as_missing, 

2791 proportion=proportion, 

2792 table_type=table_type, 

2793 split_missing=split_missing, 

2794 show_ext=show_ext, 

2795 transform_matrix=transform_matrix, 

2796 ) 

2797 # Generate all parent folders for this file that don't already exist: 

2798 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2799 # Proceed based on the table type: 

2800 if table_type == TableType.long: 

2801 return df.to_excel(file_addr, index=False) 

2802 return df.to_excel(file_addr) 

2803 

2804 def to_phylip_matrix( 

2805 self, 

2806 file_addr: Union[Path, str], 

2807 drop_constant: bool = False, 

2808 proportion: bool = False, 

2809 table_type: TableType = TableType.distance, 

2810 show_ext: bool = False, 

2811 ): 

2812 """Writes this Collation as a PHYLIP-formatted distance/similarity matrix to the file with the given address. 

2813 

2814 Args: 

2815 file_addr: A string representing the path to an output PHYLIP file; the file type should be .ph or .phy. 

2816 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2817 Default value is False. 

2818 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2819 Default value is False. 

2820 table_type: A TableType option indicating which type of tabular output to generate. 

2821 For PHYLIP-formatted outputs, distance and similarity matrices are the only supported table types. 

2822 Default value is "distance". 

2823 show_ext: An optional flag indicating whether each cell in the matrix 

2824 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2825 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2826 Default value is False. 

2827 """ 

2828 # Convert the collation to a Pandas DataFrame first: 

2829 matrix = None 

2830 witness_labels = [] 

2831 # Proceed based on the table type: 

2832 if table_type == TableType.distance: 

2833 # Convert the collation to a NumPy array and get its row and column labels first: 

2834 matrix, witness_labels = self.to_distance_matrix( 

2835 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2836 ) 

2837 elif table_type == TableType.similarity: 

2838 # Convert the collation to a NumPy array and get its row and column labels first: 

2839 matrix, witness_labels = self.to_similarity_matrix( 

2840 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2841 ) 

2842 # Generate all parent folders for this file that don't already exist: 

2843 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2844 with open(file_addr, "w", encoding="utf-8") as f: 

2845 # The first line contains the number of taxa: 

2846 f.write("%d\n" % len(witness_labels)) 

2847 # Every subsequent line contains a witness label, followed by the values in its row of the matrix: 

2848 for i, wit_id in enumerate(witness_labels): 

2849 wit_label = slugify(wit_id, lowercase=False, allow_unicode=True, separator='_') 

2850 f.write("%s %s\n" % (wit_label, " ".join([str(v) for v in matrix[i]]))) 

2851 return 

2852 

2853 def get_stemma_symbols(self): 

2854 """Returns a list of one-character symbols needed to represent the states of all substantive readings in stemma format. 

2855 

2856 The number of symbols equals the maximum number of substantive readings at any variation unit. 

2857 

2858 Returns: 

2859 A list of individual characters representing states in readings. 

2860 """ 

2861 possible_symbols = ( 

2862 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

2863 ) # NOTE: the maximum number of symbols allowed in stemma format (other than "?" and "-") is 62 

2864 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

2865 nsymbols = 0 

2866 # If there are no witnesses, then no symbols are needed at all: 

2867 if len(self.witnesses) == 0: 

2868 return [] 

2869 wit_id = self.witnesses[0].id 

2870 for rdg_support in self.readings_by_witness[wit_id]: 

2871 nsymbols = max(nsymbols, len(rdg_support)) 

2872 stemma_symbols = possible_symbols[:nsymbols] 

2873 return stemma_symbols 

2874 

2875 def to_stemma(self, file_addr: Union[Path, str]): 

2876 """Writes this Collation to a stemma file without an extension and a Chron file (containing low, middle, and high dates for all witnesses) without an extension. 

2877 

2878 Since this format does not support ambiguous states, all reading vectors with anything other than one nonzero entry will be interpreted as lacunose. 

2879 If an interpGrp for weights is specified in the TEI XML collation, then the weights for the interp elements will be used as weights 

2880 for the variation units that specify them in their ana attribute. 

2881 

2882 Args: 

2883 file_addr: A string representing the path to an output stemma prep file; the file should have no extension. 

2884 The accompanying chron file will match this file name, except that it will have "_chron" appended to the end. 

2885 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2886 """ 

2887 # Populate a list of sites that will correspond to columns of the sequence alignment 

2888 # (by default, constant sites are dropped): 

2889 substantive_variation_unit_ids = [ 

2890 vu_id for vu_id in self.variation_unit_ids if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2891 ] 

2892 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2893 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2894 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2895 # to the readings' texts: 

2896 reading_texts_by_indices = {} 

2897 for j, vu in enumerate(self.variation_units): 

2898 if vu.id not in substantive_variation_unit_ids_set: 

2899 continue 

2900 k = 0 

2901 for rdg in vu.readings: 

2902 key = tuple([vu.id, rdg.id]) 

2903 if key not in substantive_variation_unit_reading_tuples_set: 

2904 continue 

2905 indices = tuple([j, k]) 

2906 reading_texts_by_indices[indices] = rdg.text 

2907 k += 1 

2908 # In a second pass, populate another dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2909 # to the witnesses exclusively supporting those readings: 

2910 reading_wits_by_indices = {} 

2911 for indices in reading_texts_by_indices: 

2912 reading_wits_by_indices[indices] = [] 

2913 for i, wit in enumerate(self.witnesses): 

2914 for j, vu_id in enumerate(self.variation_unit_ids): 

2915 if vu_id not in substantive_variation_unit_ids_set: 

2916 continue 

2917 rdg_support = self.readings_by_witness[wit.id][j] 

2918 # If this witness does not exclusively support exactly one reading at this unit, then treat it as lacunose: 

2919 if len([k for k, w in enumerate(rdg_support) if w > 0]) != 1: 

2920 continue 

2921 k = rdg_support.index(1) 

2922 indices = tuple([j, k]) 

2923 reading_wits_by_indices[indices].append(wit.id) 

2924 # In a third pass, write to the stemma file: 

2925 symbols = self.get_stemma_symbols() 

2926 Path(file_addr).parent.mkdir( 

2927 parents=True, exist_ok=True 

2928 ) # generate all parent folders for this file that don't already exist 

2929 chron_file_addr = str(file_addr) + "_chron" 

2930 with open(file_addr, "w", encoding="utf-8") as f: 

2931 # Start with the witness list: 

2932 f.write( 

2933 "* %s ;\n\n" 

2934 % " ".join( 

2935 [slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') for wit in self.witnesses] 

2936 ) 

2937 ) 

2938 # f.write("^ %s\n\n" % chron_file_addr) #write the relative path to the chron file 

2939 f.write( 

2940 "^ %s\n\n" % ("." + os.sep + Path(chron_file_addr).name) 

2941 ) # write the relative path to the chron file 

2942 # Then add a line indicating that all witnesses are lacunose unless they are specified explicitly: 

2943 f.write("= $? $* ;\n\n") 

2944 with tqdm(total=len(self.variation_unit_ids)) as pbar: 

2945 # Then proceed for each variation unit: 

2946 for j, vu_id in enumerate(self.variation_unit_ids): 

2947 if vu_id not in substantive_variation_unit_ids_set: 

2948 pbar.update(1) 

2949 continue 

2950 # Print the variation unit ID first: 

2951 f.write("@ %s\n" % vu_id) 

2952 # In a first pass, print the texts of all readings enclosed in brackets: 

2953 f.write("[ ") 

2954 k = 0 

2955 while True: 

2956 indices = tuple([j, k]) 

2957 if indices not in reading_texts_by_indices: 

2958 break 

2959 text = slugify( 

2960 reading_texts_by_indices[indices], lowercase=False, allow_unicode=True, separator='.' 

2961 ) 

2962 # Denote omissions by en-dashes: 

2963 if text == "": 

2964 text = "\u2013" 

2965 # The first reading should not be preceded by anything: 

2966 if k == 0: 

2967 f.write(text) 

2968 f.write(" |") 

2969 # Add the weight of this variation unit after the pipe by comparing its analysis categories to their weights: 

2970 weight = 1 

2971 vu = self.variation_units[j] 

2972 if len(vu.analysis_categories) > 0: 

2973 weight = int( 

2974 sum( 

2975 [ 

2976 self.weights_by_id[ana] if ana in self.weights_by_id else 1 

2977 for ana in vu.analysis_categories 

2978 ] 

2979 ) 

2980 / len(vu.analysis_categories) 

2981 ) 

2982 f.write("*%d" % weight) 

2983 # Every subsequent reading should be preceded by a space: 

2984 elif k > 0: 

2985 f.write(" %s" % text) 

2986 k += 1 

2987 f.write(" ]\n") 

2988 # In a second pass, print the indices and witnesses for all readings enclosed in angle brackets: 

2989 k = 0 

2990 f.write("\t< ") 

2991 while True: 

2992 indices = tuple([j, k]) 

2993 if indices not in reading_wits_by_indices: 

2994 break 

2995 rdg_symbol = symbols[k] # get the one-character alphanumeric code for this state 

2996 wits = " ".join(reading_wits_by_indices[indices]) 

2997 # Open the variant reading support block with an angle bracket: 

2998 if k == 0: 

2999 f.write("%s %s" % (rdg_symbol, wits)) 

3000 # Open all subsequent variant reading support blocks with pipes on the next line: 

3001 else: 

3002 f.write("\n\t| %s %s" % (rdg_symbol, wits)) 

3003 k += 1 

3004 f.write(" >\n") 

3005 pbar.update(1) 

3006 # In a fourth pass, write to the chron file: 

3007 max_id_length = max( 

3008 [len(slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')) for wit in self.witnesses] 

3009 ) 

3010 max_date_length = 0 

3011 for wit in self.witnesses: 

3012 if wit.date_range[0] is not None: 

3013 max_date_length = max(max_date_length, len(str(wit.date_range[0]))) 

3014 if wit.date_range[1] is not None: 

3015 max_date_length = max(max_date_length, len(str(wit.date_range[1]))) 

3016 # Attempt to get the minimum and maximum dates for witnesses; if we can't do this, then don't write a chron file: 

3017 min_date = None 

3018 max_date = None 

3019 try: 

3020 min_date = min([wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]) 

3021 max_date = max([wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]) 

3022 except Exception as e: 

3023 print("WARNING: no witnesses have date ranges; no chron file will be written!") 

3024 return 

3025 with open(chron_file_addr, "w", encoding="utf-8") as f: 

3026 for wit in self.witnesses: 

3027 wit_label = slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') 

3028 f.write(wit_label) 

3029 f.write(" " * (max_id_length - len(wit.id) + 1)) 

3030 # If either the lower bound on this witness's date is empty, then use the min and max dates over all witnesses as defaults: 

3031 date_range = wit.date_range 

3032 if date_range[0] is None: 

3033 date_range = tuple([min_date, date_range[1]]) 

3034 # Then write the date range minimum, average, and maximum to the chron file: 

3035 low_date = str(date_range[0]) 

3036 f.write(" " * (max_date_length - len(low_date) + 2)) 

3037 f.write(low_date) 

3038 avg_date = str(int(((date_range[0] + date_range[1]) / 2))) 

3039 f.write(" " * (max_date_length - len(str(avg_date)) + 2)) 

3040 f.write(avg_date) 

3041 high_date = str(date_range[1]) 

3042 f.write(" " * (max_date_length - len(high_date) + 2)) 

3043 f.write(high_date) 

3044 f.write("\n") 

3045 return 

3046 

3047 def to_file( 

3048 self, 

3049 file_addr: Union[Path, str], 

3050 format: Format = None, 

3051 drop_constant: bool = False, 

3052 split_missing: SplitMissingType = None, 

3053 char_state_labels: bool = True, 

3054 frequency: bool = False, 

3055 ambiguous_as_missing: bool = False, 

3056 proportion: bool = False, 

3057 calibrate_dates: bool = False, 

3058 mrbayes: bool = False, 

3059 clock_model: ClockModel = ClockModel.strict, 

3060 ancestral_logger: AncestralLogger = AncestralLogger.state, 

3061 table_type: TableType = TableType.matrix, 

3062 transform_matrix: TransformMatrixType = None, 

3063 show_ext: bool = False, 

3064 seed: int = None, 

3065 ): 

3066 """Writes this Collation to the file with the given address. 

3067 

3068 Args: 

3069 file_addr (Union[Path, str]): The path to the output file. 

3070 format (Format, optional): The desired output format. 

3071 If None then it is infered from the file suffix. 

3072 Defaults to None. 

3073 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

3074 Default value is False. 

3075 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

3076 If not specified, then missing data is ignored (i.e., all states are 0). 

3077 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

3078 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

3079 Only applicable for tabular outputs of type "matrix", "idf", "mean-idf", "mi", and "mean-mi". 

3080 char_state_labels (bool, optional): An optional flag indicating whether to print 

3081 the CharStateLabels block in NEXUS output. 

3082 Default value is True. 

3083 frequency (bool, optional): An optional flag indicating whether to use the StatesFormat=Frequency setting 

3084 instead of the StatesFormat=StatesPresent setting 

3085 (and thus represent all states with frequency vectors rather than symbols) 

3086 in NEXUS output. 

3087 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation. 

3088 Default value is False. 

3089 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

3090 If this flag is set, then only base symbols will be generated for the NEXUS file. 

3091 It is only applied if the frequency option is False. 

3092 Default value is False. 

3093 proportion (bool, optional): An optional flag indicating whether to populate a distance matrix's cells 

3094 with a proportion of disagreements to variation units where both witnesses are extant. 

3095 It is only applied if the table_type option is "distance". 

3096 Default value is False. 

3097 calibrate_dates (bool, optional): An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses 

3098 in NEXUS output. 

3099 This option is intended for inputs to BEAST 2. 

3100 Default value is False. 

3101 mrbayes (bool, optional): An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses 

3102 in NEXUS output. 

3103 This option is intended for inputs to MrBayes. 

3104 Default value is False. 

3105 clock_model (ClockModel, optional): A ClockModel option indicating which type of clock model to use. 

3106 This option is intended for inputs to MrBayes and BEAST 2. 

3107 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified. 

3108 Default value is "strict". 

3109 ancestral_logger (AncestralLogger, optional): An AncestralLogger option indicating which class of logger (if any) to use for ancestral states. 

3110 This option is intended for inputs to BEAST 2. 

3111 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate. 

3112 Only applicable for tabular outputs and PHYLIP outputs. 

3113 If the output is a PHYLIP file, then the type of tabular output must be "distance" or "similarity"; otherwise, it will be ignored. 

3114 Default value is "matrix". 

3115 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed. 

3116 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation. 

3117 show_ext (bool, optional): An optional flag indicating whether each cell in the matrix 

3118 should include the number of variation units where both witnesses are extant after the number of their disagreements/agreements. 

3119 Only applicable for tabular output formats of type "distance" or "similarity". 

3120 Default value is False. 

3121 seed (optional, int): A seed for random number generation (for setting initial values of unspecified transcriptional rates in BEAST 2 XML output). 

3122 """ 

3123 file_addr = Path(file_addr) 

3124 format = format or Format.infer( 

3125 file_addr.suffix 

3126 ) # an exception will be raised here if the format or suffix is invalid 

3127 

3128 if format == Format.NEXUS: 

3129 return self.to_nexus( 

3130 file_addr, 

3131 drop_constant=drop_constant, 

3132 char_state_labels=char_state_labels, 

3133 frequency=frequency, 

3134 ambiguous_as_missing=ambiguous_as_missing, 

3135 calibrate_dates=calibrate_dates, 

3136 mrbayes=mrbayes, 

3137 clock_model=clock_model, 

3138 ) 

3139 

3140 if format == format.HENNIG86: 

3141 return self.to_hennig86(file_addr, drop_constant=drop_constant) 

3142 

3143 if format == format.PHYLIP: 

3144 if table_type in [TableType.distance, TableType.similarity]: 

3145 return self.to_phylip_matrix( 

3146 file_addr, 

3147 drop_constant=drop_constant, 

3148 proportion=proportion, 

3149 table_type=table_type, 

3150 show_ext=show_ext, 

3151 ) 

3152 return self.to_phylip(file_addr, drop_constant=drop_constant) 

3153 

3154 if format == format.FASTA: 

3155 return self.to_fasta(file_addr, drop_constant=drop_constant) 

3156 

3157 if format == format.BEAST: 

3158 return self.to_beast( 

3159 file_addr, 

3160 drop_constant=drop_constant, 

3161 clock_model=clock_model, 

3162 ancestral_logger=ancestral_logger, 

3163 seed=seed, 

3164 ) 

3165 

3166 if format == Format.CSV: 

3167 return self.to_csv( 

3168 file_addr, 

3169 drop_constant=drop_constant, 

3170 ambiguous_as_missing=ambiguous_as_missing, 

3171 proportion=proportion, 

3172 table_type=table_type, 

3173 split_missing=split_missing, 

3174 transform_matrix=transform_matrix, 

3175 show_ext=show_ext, 

3176 ) 

3177 

3178 if format == Format.TSV: 

3179 return self.to_csv( 

3180 file_addr, 

3181 drop_constant=drop_constant, 

3182 ambiguous_as_missing=ambiguous_as_missing, 

3183 proportion=proportion, 

3184 table_type=table_type, 

3185 split_missing=split_missing, 

3186 transform_matrix=transform_matrix, 

3187 show_ext=show_ext, 

3188 sep="\t", 

3189 ) 

3190 

3191 if format == Format.EXCEL: 

3192 return self.to_excel( 

3193 file_addr, 

3194 drop_constant=drop_constant, 

3195 ambiguous_as_missing=ambiguous_as_missing, 

3196 proportion=proportion, 

3197 table_type=table_type, 

3198 split_missing=split_missing, 

3199 transform_matrix=transform_matrix, 

3200 show_ext=show_ext, 

3201 ) 

3202 

3203 if format == Format.STEMMA: 

3204 return self.to_stemma(file_addr)