Coverage for teiphy/collation.py: 99.93%

1358 statements  

coverage.py v7.8.0, created at 2025-05-11 10:54 +0000

#!/usr/bin/env python3

from enum import Enum
from typing import List, Union
import os
from pathlib import Path
from datetime import datetime  # for calculating the current year (for dating and tree height purposes)
import time  # to time calculations for users
import string  # for easy retrieval of character ranges
from lxml import etree as et  # for reading TEI XML inputs
import numpy as np  # for random number sampling and collation matrix outputs
import pandas as pd  # for writing to DataFrames, CSV, Excel, etc.
from slugify import slugify  # for converting Unicode text from readings to ASCII for NEXUS
from jinja2 import Environment, PackageLoader, select_autoescape  # for filling output XML templates

from .common import xml_ns, tei_ns
from .format import Format
from .witness import Witness
from .variation_unit import VariationUnit


class ParsingException(Exception):
    pass


class WitnessDateException(Exception):
    pass


class IntrinsicRelationsException(Exception):
    pass


class ClockModel(str, Enum):
    strict = "strict"
    uncorrelated = "uncorrelated"
    local = "local"


class AncestralLogger(str, Enum):
    state = "state"
    sequence = "sequence"
    none = "none"


class TableType(str, Enum):
    matrix = "matrix"
    distance = "distance"
    similarity = "similarity"
    nexus = "nexus"
    long = "long"


class Collation:
    """Base class for storing TEI XML collation data internally.

    This corresponds to the entire XML tree, rooted at the TEI element of the collation.

    Attributes:
        manuscript_suffixes: A list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
        trivial_reading_types: A set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
        missing_reading_types: A set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
        fill_corrector_lacunae: A boolean flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
        fragmentary_threshold: A float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
        witnesses: A list of Witness instances contained in this Collation.
        witness_index_by_id: A dictionary mapping base witness ID strings to their int indices in the witnesses list.
        variation_units: A list of VariationUnit instances contained in this Collation.
        readings_by_witness: A dictionary mapping base witness ID strings to lists of reading support coefficients for all units (with at least two substantive readings).
        substantive_variation_unit_ids: A list of ID strings for variation units with two or more substantive readings.
        substantive_variation_unit_reading_tuples: A list of (variation unit ID, reading ID) tuples for substantive readings.
        verbose: A boolean flag indicating whether or not to print timing and debugging details for the user.
    """

    def __init__(
        self,
        xml: et.ElementTree,
        manuscript_suffixes: List[str] = [],
        trivial_reading_types: List[str] = [],
        missing_reading_types: List[str] = [],
        fill_corrector_lacunae: bool = False,

        fragmentary_threshold: Union[float, None] = None,
        dates_file: Union[Path, str, None] = None,

        verbose: bool = False,
    ):
        """Constructs a new Collation instance with the given settings.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
            manuscript_suffixes: An optional list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
            trivial_reading_types: An optional set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
            missing_reading_types: An optional set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
            fill_corrector_lacunae: An optional flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
            fragmentary_threshold: An optional float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
            dates_file: An optional path to a CSV file containing witness IDs, minimum dates, and maximum dates. If specified, then for all witnesses in the first column, any existing date ranges for them in the TEI XML collation will be ignored.
            verbose: An optional flag indicating whether or not to print timing and debugging details for the user.
        """
        self.manuscript_suffixes = manuscript_suffixes
        self.trivial_reading_types = set(trivial_reading_types)
        self.missing_reading_types = set(missing_reading_types)
        self.fill_corrector_lacunae = fill_corrector_lacunae
        self.fragmentary_threshold = fragmentary_threshold
        self.verbose = verbose
        self.witnesses = []
        self.witness_index_by_id = {}
        self.variation_units = []
        self.readings_by_witness = {}
        self.variation_unit_ids = []
        self.substantive_variation_unit_reading_tuples = []
        self.substantive_readings_by_variation_unit_id = {}
        self.weight_categories = []
        self.weights_by_id = {}
        self.intrinsic_categories = []
        self.intrinsic_odds_by_id = {}
        self.transcriptional_categories = []
        self.transcriptional_rates_by_id = {}
        self.origin_date_range = []
        # Now parse the XML tree to populate these data structures:
        if self.verbose:
            print("Initializing collation...")
        t0 = time.time()
        self.parse_origin_date_range(xml)
        self.parse_list_wit(xml)
        self.validate_wits(xml)
        # If a dates file was specified, then update the witness date ranges manually:
        if dates_file is not None:
            self.update_witness_date_ranges_from_dates_file(dates_file)
        # If the upper bound on a work's date of origin is not defined, then attempt to assign it an upper bound based on the witness dates;
        # otherwise, attempt to assign lower bounds to witness dates based on it:
        if self.origin_date_range[1] is None:
            self.update_origin_date_range_from_witness_date_ranges()
        else:
            self.update_witness_date_ranges_from_origin_date_range()
        self.parse_weights(xml)
        self.parse_intrinsic_odds(xml)
        self.parse_transcriptional_rates(xml)
        self.parse_apps(xml)
        self.validate_intrinsic_relations()
        self.parse_readings_by_witness()
        # If a threshold of readings for fragmentary witnesses is specified, then filter the witness list using the dictionary mapping witness IDs to readings:
        if self.fragmentary_threshold is not None:
            self.filter_fragmentary_witnesses(xml)
        t1 = time.time()
        if self.verbose:
            print("Total time to initialize collation: %0.4fs." % (t1 - t0))
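
    # Example (illustrative sketch, not part of this module): a typical way to construct
    # a Collation from a TEI XML file. The file name "collation.xml", the suffix list, and
    # the import path are hypothetical.
    #
    #     from lxml import etree as et
    #     from teiphy import Collation
    #
    #     xml = et.parse("collation.xml")
    #     coll = Collation(
    #         xml,
    #         manuscript_suffixes=["*", "C", "/1", "/2"],
    #         trivial_reading_types=["reconstructed", "defective", "orthographic"],
    #         missing_reading_types=["lac", "overlap"],
    #         verbose=True,
    #     )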


    def parse_origin_date_range(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of origin date bounds.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing origin date range...")
        t0 = time.time()
        self.origin_date_range = [None, None]
        for date in xml.xpath(
            "//tei:sourceDesc//tei:bibl//tei:date|//tei:sourceDesc//tei:biblStruct//tei:date|//tei:sourceDesc//tei:biblFull//tei:date",
            namespaces={"tei": tei_ns},
        ):
            # Try the @when attribute first; if it is set, then it accounts for both ends of the date range:
            if date.get("when") is not None:
                self.origin_date_range[0] = int(date.get("when").split("-")[0])
                self.origin_date_range[1] = self.origin_date_range[0]
            # Failing that, if it has a @to attribute (indicating the end of the period over which the work was completed),
            # then the completion date of the work accounts for both ends of the date range:
            elif date.get("to") is not None:
                self.origin_date_range[0] = int(date.get("to").split("-")[0])
                self.origin_date_range[1] = self.origin_date_range[0]
            # Failing that, set lower and upper bounds on the origin date using the @notBefore and @notAfter attributes:
            elif date.get("notBefore") is not None or date.get("notAfter") is not None:
                if date.get("notBefore") is not None:
                    self.origin_date_range[0] = int(date.get("notBefore").split("-")[0])
                if date.get("notAfter") is not None:
                    self.origin_date_range[1] = int(date.get("notAfter").split("-")[0])
        return
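
    # Example (illustrative sketch): given a source description containing
    #
    #     <date notBefore="50" notAfter="80"/>
    #
    # under a bibl element, this method would set origin_date_range to [50, 80];
    # a <date when="65"/> would instead yield [65, 65].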


    def get_base_wit(self, wit: str):
        """Given a witness siglum, strips off the specified manuscript suffixes until the siglum matches one in the witness list or until no more suffixes can be stripped.

        Args:
            wit: A string representing a witness siglum, potentially including suffixes to be stripped.
        """
        base_wit = wit
        # If our starting siglum corresponds to a siglum in the witness list, then just return it:
        if base_wit in self.witness_index_by_id:
            return base_wit
        # Otherwise, strip any suffixes we find until the siglum corresponds to a base witness in the list
        # or no more suffixes can be stripped:
        suffix_found = True
        while suffix_found:
            suffix_found = False
            for suffix in self.manuscript_suffixes:
                if base_wit.endswith(suffix):
                    suffix_found = True
                    base_wit = base_wit[: -len(suffix)]
                    break  # stop looking for other suffixes
            # If the siglum stripped of this suffix now corresponds to a siglum in the witness list, then return it:
            if base_wit in self.witness_index_by_id:
                return base_wit
        # If we get here, then all possible manuscript suffixes have been stripped, and the resulting siglum does not correspond to a siglum in the witness list:
        return base_wit
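
    # Example (illustrative sketch): with manuscript_suffixes=["*", "C", "/1"] and a witness
    # list containing "P46", get_base_wit("P46*") and get_base_wit("P46C/1") would both
    # return "P46", while an unlisted siglum like "X99*" would be returned as "X99" after
    # its suffix is stripped.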


    def parse_list_wit(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates its list of witnesses from its listWit element.
        If the XML tree does not contain a listWit element, then a ParsingException is thrown listing all distinct witness sigla encountered in the collation.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing witness list...")
        t0 = time.time()
        self.witnesses = []
        self.witness_index_by_id = {}
        list_wits = xml.xpath("/tei:TEI//tei:listWit", namespaces={"tei": tei_ns})
        if len(list_wits) == 0:
            # There is no listWit element: collect all distinct witness sigla in the collation and raise a ParsingException listing them:
            distinct_sigla = set()
            sigla = []
            # Proceed for each rdg, rdgGrp, or witDetail element:
            for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
                wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
                wits = wit_str.split()
                for wit in wits:
                    siglum = wit.strip("#")  # remove the URI prefix, if there is one
                    if siglum not in distinct_sigla:
                        distinct_sigla.add(siglum)
                        sigla.append(siglum)
            sigla.sort()
            msg = ""
            msg += "An explicit listWit element must be included in the TEI XML collation.\n"
            msg += "The following sigla occur in the collation and should be included as the @xml:id or @n attributes of witness elements under the listWit element:\n"
            msg += ", ".join(sigla)
            raise ParsingException(msg)
        # Otherwise, take the first listWit element as the list of all witnesses and process it:
        list_wit = list_wits[0]
        for witness in list_wit.xpath("./tei:witness", namespaces={"tei": tei_ns}):
            wit = Witness(witness, self.verbose)
            self.witness_index_by_id[wit.id] = len(self.witnesses)
            self.witnesses.append(wit)
        t1 = time.time()
        if self.verbose:
            print("Finished processing %d witnesses in %0.4fs." % (len(self.witnesses), t1 - t0))
        return


    def validate_wits(self, xml: et.ElementTree):
        """Given an XML tree for a collation, checks if any witness sigla listed in a rdg, rdgGrp, or witDetail element,
        once stripped of ignored suffixes, is not found in the witness list.
        A warning will be issued for each distinct siglum like this.
        This method also checks if the upper bound of any witness's date is earlier than the lower bound on the collated work's date of origin
        and throws an exception if so.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Validating witness list against collation...")
        t0 = time.time()
        # Collect all distinct witness sigla in the collation whose base sigla are not in the witness list:
        distinct_extra_sigla = set()
        extra_sigla = []
        # Proceed for each rdg, rdgGrp, or witDetail element:
        for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
            wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
            wits = wit_str.split()
            for wit in wits:
                siglum = wit.strip("#")  # remove the URI prefix, if there is one
                base_siglum = self.get_base_wit(siglum)
                if base_siglum not in self.witness_index_by_id:
                    if base_siglum not in distinct_extra_sigla:
                        distinct_extra_sigla.add(base_siglum)
                        extra_sigla.append(base_siglum)
        if len(extra_sigla) > 0:
            extra_sigla.sort()
            msg = ""
            msg += "WARNING: The following sigla occur in the collation that do not have corresponding witness entries in the listWit:\n"
            msg += ", ".join(extra_sigla)
            print(msg)
        # If the lower bound on the date of origin is defined, then check each witness against it:
        if self.origin_date_range[0] is not None:
            bad_date_witness_sigla = []
            bad_date_upper_bounds_by_witness = {}
            for i, wit in enumerate(self.witnesses):
                if wit.date_range[1] is not None and wit.date_range[1] < self.origin_date_range[0]:
                    bad_date_witness_sigla.append(wit.id)
                    bad_date_upper_bounds_by_witness[wit.id] = wit.date_range[1]
            if len(bad_date_witness_sigla) > 0:
                msg = ""
                msg += (
                    "The following witnesses have their latest possible dates before the earliest date of origin %d specified for the collated work:\n"
                    % self.origin_date_range[0]
                )
                msg += ", ".join(
                    [
                        (siglum + "(" + str(bad_date_upper_bounds_by_witness[siglum]) + ")")
                        for siglum in bad_date_witness_sigla
                    ]
                )
                raise WitnessDateException(msg)
        t1 = time.time()
        if self.verbose:
            print("Finished witness validation in %0.4fs." % (t1 - t0))
        return


    def update_witness_date_ranges_from_dates_file(self, dates_file: Union[Path, str]):
        """Given a CSV-formatted dates file, update the date ranges of all witnesses whose IDs are in the first column of the dates file
        (overwriting existing date ranges if necessary).
        """
        if self.verbose:
            print("Updating witness dates from file %s..." % (str(dates_file)))
        t0 = time.time()
        dates_df = pd.read_csv(dates_file, index_col=0, names=["id", "min", "max"])
        for witness in self.witnesses:
            wit_id = witness.id
            if wit_id in dates_df.index:
                # For every witness in the list whose ID is specified in the dates file,
                # update their date ranges (as long as the date ranges in the file are well-formed):
                min_date = int(dates_df.loc[wit_id]["min"]) if not np.isnan(dates_df.loc[wit_id]["min"]) else None
                max_date = (
                    int(dates_df.loc[wit_id]["max"])
                    if not np.isnan(dates_df.loc[wit_id]["max"])
                    else datetime.now().year
                )
                if min_date is not None and max_date is not None and min_date > max_date:
                    raise ParsingException(
                        "In dates file %s, for witness ID %s, the minimum date %d is greater than the maximum date %d."
                        % (str(dates_file), wit_id, min_date, max_date)
                    )
                witness.date_range = [min_date, max_date]
        t1 = time.time()
        if self.verbose:
            print("Finished witness date range updates in %0.4fs." % (t1 - t0))
        return
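
    # Example (illustrative sketch): a dates file is expected to be a headerless CSV
    # whose rows have the form
    #
    #     P46,175,225
    #     01,300,
    #     03,,350
    #
    # An empty minimum leaves the lower bound as None, and an empty maximum defaults
    # to the current year.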


    def update_origin_date_range_from_witness_date_ranges(self):
        """Conditionally updates the upper bound on the date of origin of the work represented by this Collation
        based on the bounds on the witnesses' dates.
        If none of the witnesses have bounds on their dates, then nothing is done.
        This method is only invoked if the work's date of origin does not already have its upper bound defined.
        """
        if self.verbose:
            print("Updating upper bound on origin date using witness dates...")
        t0 = time.time()
        # Set the upper bound on the origin date to the earliest date attested among the witnesses:
        witness_date_lower_bounds = [wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]
        witness_date_upper_bounds = [wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]
        min_witness_date = (
            min(witness_date_lower_bounds + witness_date_upper_bounds)
            if len(witness_date_lower_bounds + witness_date_upper_bounds) > 0
            else None
        )
        if min_witness_date is not None:
            self.origin_date_range[1] = (
                min(self.origin_date_range[1], min_witness_date)
                if self.origin_date_range[1] is not None
                else min_witness_date
            )
        t1 = time.time()
        if self.verbose:
            print("Finished updating upper bound on origin date in %0.4fs." % (t1 - t0))
        return
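
    # Example (illustrative sketch): if the witnesses have date ranges [150, 250] and
    # [None, 300], then the earliest attested witness date is 150, so the upper bound
    # on the origin date range would be set to 150.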


    def update_witness_date_ranges_from_origin_date_range(self):
        """Attempts to update the lower bounds on the witnesses' dates
        using the upper bound on the date of origin of the work represented by this Collation.
        This method is only invoked if the upper bound on the work's date of origin was already defined
        (i.e., if update_origin_date_range_from_witness_date_ranges was not invoked).
        """
        if self.verbose:
            print("Updating lower bounds on witness dates using origin date...")
        t0 = time.time()
        # Proceed for every witness:
        for i, wit in enumerate(self.witnesses):
            # Ensure that the lower bound on this witness's date is no earlier than the upper bound on the date of the work's origin:
            wit.date_range[0] = (
                max(wit.date_range[0], self.origin_date_range[1])
                if wit.date_range[0] is not None
                else self.origin_date_range[1]
            )
            # Then ensure that the upper bound on this witness's date is no earlier than its lower bound, in case we updated it:
            wit.date_range[1] = max(wit.date_range[0], wit.date_range[1])
        t1 = time.time()
        if self.verbose:
            print("Finished updating lower bounds on witness dates in %0.4fs." % (t1 - t0))
        return


    def parse_weights(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of variation unit weight categories
        (associated with types of variation that may have different expected frequencies)
        and its dictionary mapping these categories to integer weights.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing variation unit weight categories...")
        t0 = time.time()
        self.weight_categories = []
        self.weights_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"weight\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            weight_category = interp.get("{%s}id" % xml_ns)
            # Retrieve the degree of this element's first certainty subelement and coerce it to an integer, defaulting to 1 if there is none:
            weight = 1
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    weight = int(certainty.get("degree"))
                    break
            self.weight_categories.append(weight_category)
            self.weights_by_id[weight_category] = weight
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d variation unit weight categories in %0.4fs."
                % (len(self.weight_categories), t1 - t0)
            )
        return
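
    # Example (illustrative sketch): a weight category in the TEI header might look like
    #
    #     <interpGrp type="weight">
    #         <interp xml:id="major">
    #             <certainty degree="10"/>
    #         </interp>
    #     </interpGrp>
    #
    # which would add "major" to weight_categories and map it to the integer weight 10.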


    def parse_intrinsic_odds(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of intrinsic probability categories
        (e.g., "absolutely more likely," "highly more likely," "more likely," "slightly more likely," "equally likely")
        and its dictionary mapping these categories to numerical odds.
        If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing intrinsic odds categories...")
        t0 = time.time()
        self.intrinsic_categories = []
        self.intrinsic_odds_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"intrinsic\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            odds_category = interp.get("{%s}id" % xml_ns)
            # If this element contains a certainty subelement with a fixed odds value for this category, then set it:
            odds = None
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    odds = float(certainty.get("degree"))
                    break
            self.intrinsic_categories.append(odds_category)
            self.intrinsic_odds_by_id[odds_category] = odds
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d intrinsic odds categories in %0.4fs."
                % (len(self.intrinsic_categories), t1 - t0)
            )
        return


    def parse_transcriptional_rates(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's dictionary mapping transcriptional change categories
        (e.g., "aural confusion," "visual error," "clarification") to numerical rates.
        If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing transcriptional change categories...")
        t0 = time.time()
        self.transcriptional_categories = []
        self.transcriptional_rates_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"transcriptional\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            transcriptional_category = interp.get("{%s}id" % xml_ns)
            # If this element contains a certainty subelement with a fixed rate for this category, then set it:
            rate = None
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    rate = float(certainty.get("degree"))
                    break
            self.transcriptional_categories.append(transcriptional_category)
            self.transcriptional_rates_by_id[transcriptional_category] = rate
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d transcriptional change categories in %0.4fs."
                % (len(self.transcriptional_rates_by_id), t1 - t0)
            )
        return


    def validate_intrinsic_relations(self):
        """Checks if any VariationUnit's intrinsic_relations map is not a forest.
        If any is not, then an IntrinsicRelationsException is thrown describing the VariationUnit at fault.
        """
        if self.verbose:
            print("Validating intrinsic relation graphs for variation units...")
        t0 = time.time()
        for vu in self.variation_units:
            # Skip any variation units with an empty intrinsic_relations map:
            if len(vu.intrinsic_relations) == 0:
                continue
            # For all others, start by computing the in-degree of every reading ID (i.e., the number of relations pointing to it):
            in_degree_by_reading = {}
            for edge in vu.intrinsic_relations:
                s = edge[0]
                t = edge[1]
                if s not in in_degree_by_reading:
                    in_degree_by_reading[s] = 0
                if t not in in_degree_by_reading:
                    in_degree_by_reading[t] = 0
                in_degree_by_reading[t] += 1
            # If any reading has more than one relation pointing to it, then the intrinsic relations graph is not a forest:
            excessive_in_degree_readings = [
                rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] > 1
            ]
            if len(excessive_in_degree_readings) > 0:
                msg = ""
                msg += (
                    "In variation unit %s, the following readings have more than one intrinsic relation pointing to them: %s.\n"
                    % (vu.id, ", ".join(excessive_in_degree_readings))
                )
                msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
                raise IntrinsicRelationsException(msg)
            # If every reading has another reading pointing to it, then the intrinsic relations graph contains a cycle and is not a forest:
            starting_nodes = [rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] == 0]
            if len(starting_nodes) == 0:
                msg = ""
                msg += "In variation unit %s, the intrinsic relations contain a cycle.\n" % vu.id
                msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
                raise IntrinsicRelationsException(msg)
        t1 = time.time()
        if self.verbose:
            print("Finished intrinsic relations validation in %0.4fs." % (t1 - t0))
        return
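
    # Example (illustrative sketch): intrinsic relations {("a", "b"), ("a", "c")} form a
    # forest (reading "a" has in-degree 0), but adding ("b", "c") would give "c" an
    # in-degree of 2, and relations {("a", "b"), ("b", "a")} would form a cycle with no
    # starting node; both of the latter cases raise an IntrinsicRelationsException.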


    def parse_apps(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates its list of variation units from its app elements.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing variation units...")
        t0 = time.time()
        for a in xml.xpath('//tei:app', namespaces={'tei': tei_ns}):
            vu = VariationUnit(a, self.verbose)
            self.variation_units.append(vu)
        t1 = time.time()
        if self.verbose:
            print("Finished processing %d variation units in %0.4fs." % (len(self.variation_units), t1 - t0))
        return


    def get_readings_by_witness_for_unit(self, vu: VariationUnit):
        """Returns a dictionary mapping witness IDs to a list of their reading coefficients for a given variation unit.

        Args:
            vu: A VariationUnit to be processed.

        Returns:
            A dictionary mapping witness ID strings to a list of their coefficients for all substantive readings in this VariationUnit.
        """
        # In a first pass, populate lists of substantive (variation unit ID, reading ID) tuples and reading labels
        # and a map from reading IDs to the indices of their parent substantive reading in this unit:
        reading_id_to_index = {}
        self.substantive_readings_by_variation_unit_id[vu.id] = []
        for rdg in vu.readings:
            # If this reading is missing (e.g., lacunose or inapplicable due to an overlapping variant) or targets another reading, then skip it:
            if rdg.type in self.missing_reading_types or len(rdg.certainties) > 0:
                continue
            # If this reading is trivial, then map it to the last substantive index:
            if rdg.type in self.trivial_reading_types:
                reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
                continue
            # Otherwise, the reading is substantive: add it to the map and update the last substantive index:
            self.substantive_readings_by_variation_unit_id[vu.id].append(rdg.id)
            self.substantive_variation_unit_reading_tuples.append(tuple([vu.id, rdg.id]))
            reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
        if self.verbose:
            print(
                "Variation unit %s has %d substantive readings."
                % (vu.id, len(self.substantive_readings_by_variation_unit_id[vu.id]))
            )
        readings_by_witness_for_unit = {}
        # Initialize the output dictionary with empty sets for all base witnesses:
        for wit in self.witnesses:
            readings_by_witness_for_unit[wit.id] = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
        # In a second pass, assign each base witness a set containing the readings it supports in this unit:
        for rdg in vu.readings:
            # Initialize the dictionary indicating support for this reading (or its disambiguations):
            rdg_support = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
            # If this is a missing reading (e.g., a lacuna or an overlap), then we can skip it, as its corresponding set will be empty:
            if rdg.type in self.missing_reading_types:
                continue
            # Otherwise, if this reading is trivial, then it will contain an entry for the index of its parent substantive reading:
            elif rdg.type in self.trivial_reading_types:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Otherwise, if this reading has one or more nonzero certainty degrees,
            # then set the entries for these readings to their degrees:
            elif sum(rdg.certainties.values()) > 0:
                for t in rdg.certainties:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = rdg.certainties[t]
            # Otherwise, if this reading has one or more targets (i.e., if it is an ambiguous reading),
            # then set the entries for each of its targets to 1:
            elif len(rdg.targets) > 0:
                for t in rdg.targets:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = 1
            # Otherwise, this reading is itself substantive; set the entry for the index of this reading to 1:
            else:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Proceed for each witness siglum in the support for this reading:
            for wit in rdg.wits:
                # Is this siglum a base siglum?
                base_wit = self.get_base_wit(wit)
                if base_wit not in self.witness_index_by_id:
                    # If it is not, then it is probably just because we've encountered a corrector or some other secondary witness not included in the witness list;
                    # report this if we're in verbose mode and move on:
                    if self.verbose:
                        print(
                            "Skipping unknown witness siglum %s (base siglum %s) in variation unit %s, reading %s..."
                            % (wit, base_wit, vu.id, rdg.id)
                        )
                    continue
                # If we've found a base siglum, then add this reading's contribution to the base witness's reading set for this unit;
                # normally the existing set will be empty, but if we reduce two suffixed sigla to the same base witness,
                # then that witness may attest to multiple readings in the same unit:
                readings_by_witness_for_unit[base_wit] = [
                    (min(readings_by_witness_for_unit[base_wit][i] + rdg_support[i], 1))
                    for i in range(len(rdg_support))
                ]
        return readings_by_witness_for_unit
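
    # Example (illustrative sketch): in a unit with substantive readings ["a", "b", "c"],
    # a witness attesting reading "b" would receive the support vector [0, 1, 0], a
    # witness ambiguous between "a" and "c" would receive [1, 0, 1] (or fractional
    # certainty degrees, if specified), and a lacunose witness would receive [0, 0, 0].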


    def parse_readings_by_witness(self):
        """Populates the internal dictionary mapping witness IDs to a list of their reading support sets for all variation units, and then fills the empty reading support sets for witnesses of type "corrector" with the entries of the previous witness."""
        if self.verbose:
            print("Populating internal dictionary of witness readings...")
        t0 = time.time()
        # Initialize the data structures to be populated here:
        self.readings_by_witness = {}
        self.variation_unit_ids = []
        for wit in self.witnesses:
            self.readings_by_witness[wit.id] = []
        # Populate them for each variation unit:
        for vu in self.variation_units:
            readings_by_witness_for_unit = self.get_readings_by_witness_for_unit(vu)
            if len(readings_by_witness_for_unit) > 0:
                self.variation_unit_ids.append(vu.id)
            for wit in readings_by_witness_for_unit:
                self.readings_by_witness[wit].append(readings_by_witness_for_unit[wit])
        # Optionally, fill the lacunae of the correctors:
        if self.fill_corrector_lacunae:
            for i, wit in enumerate(self.witnesses):
                # If this is the first witness, then it shouldn't be a corrector (since there is no previous witness against which to compare it):
                if i == 0:
                    continue
                # Otherwise, if this witness is not a corrector, then skip it:
                if wit.type != "corrector":
                    continue
                # Otherwise, retrieve the previous witness:
                prev_wit = self.witnesses[i - 1]
                for j in range(len(self.readings_by_witness[wit.id])):
                    # If the corrector has no reading, then set it to the previous witness's reading:
                    if sum(self.readings_by_witness[wit.id][j]) == 0:
                        self.readings_by_witness[wit.id][j] = self.readings_by_witness[prev_wit.id][j]
        t1 = time.time()
        if self.verbose:
            print(
                "Populated dictionary for %d witnesses over %d substantive variation units in %0.4fs."
                % (len(self.witnesses), len(self.variation_unit_ids), t1 - t0)
            )
        return


    def filter_fragmentary_witnesses(self, xml):
        """Filters the original witness list and readings by witness dictionary to exclude witnesses whose proportions of extant passages fall below the fragmentary readings threshold."""
        if self.verbose:
            print(
                "Filtering fragmentary witnesses (extant in < %f of all variation units) out of internal witness list and dictionary of witness readings..."
                % self.fragmentary_threshold
            )
        t0 = time.time()
        fragmentary_witness_set = set()
        # Proceed for each witness in order:
        for wit in self.witnesses:
            wit_id = wit.id
            # We count the number of variation units at which this witness has an extant (i.e., non-missing) reading:
            extant_reading_count = 0
            total_reading_count = len(self.readings_by_witness[wit.id])
            # Proceed through all reading support lists:
            for rdg_support in self.readings_by_witness[wit_id]:
                # If the current reading support list is not all zeroes, then increment this witness's count of extant readings:
                if sum(rdg_support) != 0:
                    extant_reading_count += 1
            # If the proportion of extant readings falls below the threshold, then add this witness to the list of fragmentary witnesses:
            if extant_reading_count / total_reading_count < self.fragmentary_threshold:
                fragmentary_witness_set.add(wit_id)
        # Then filter the witness list to exclude the fragmentary witnesses:
        filtered_witnesses = [wit for wit in self.witnesses if wit.id not in fragmentary_witness_set]
        self.witnesses = filtered_witnesses
        # Then remove the entries for the fragmentary witnesses from the witnesses-to-readings dictionary:
        for wit_id in fragmentary_witness_set:
            del self.readings_by_witness[wit_id]
        t1 = time.time()
        if self.verbose:
            print(
                "Filtered out %d fragmentary witness(es) (%s) in %0.4fs."
                % (len(fragmentary_witness_set), str(list(fragmentary_witness_set)), t1 - t0)
            )
        return
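
    # Example (illustrative sketch): with fragmentary_threshold=0.5 and 40 variation
    # units, a witness extant at only 15 units (0.375 of the total) would be removed
    # from the witnesses list and from readings_by_witness.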


    def get_nexus_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in NEXUS.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        # NOTE: IQTREE does not appear to support symbols outside of 0-9 and a-z, and its base symbols must be case-insensitive.
        # The official version of MrBayes is likewise limited to 32 symbols.
        # But PAUP* allows up to 64 symbols, and Andrew Edmondson's fork of MrBayes does, as well.
        # So this method will support symbols from 0-9, a-z, and A-Z (for a total of 62 states).
        possible_symbols = list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        nexus_symbols = possible_symbols[:nsymbols]
        return nexus_symbols
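
    # Example (illustrative sketch): if the largest variation unit has four substantive
    # readings, this method returns ["0", "1", "2", "3"]; a unit with twelve readings
    # would extend the list into the lowercase letters (..., "9", "a", "b").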


    def to_nexus(
        self,
        file_addr: Union[Path, str],
        drop_constant: bool = False,
        char_state_labels: bool = True,
        frequency: bool = False,
        ambiguous_as_missing: bool = False,
        calibrate_dates: bool = False,
        mrbayes: bool = False,
        clock_model: ClockModel = ClockModel.strict,
    ):
        """Writes this Collation to a NEXUS file with the given address.

        Args:
            file_addr: A string representing the path to an output NEXUS file; the file type should be .nex, .nexus, or .nxs.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
            char_state_labels: An optional flag indicating whether or not to include the CharStateLabels block.
            frequency: An optional flag indicating whether to use the StatesFormat=Frequency setting
                instead of the StatesFormat=StatesPresent setting
                (and thus represent all states with frequency vectors rather than symbols).
                Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
            ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
                If this flag is set, then only base symbols will be generated for the NEXUS file.
                It is only applied if the frequency option is False.
            calibrate_dates: An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses.
                This option is intended for inputs to BEAST 2.
            mrbayes: An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses.
                This option is intended for inputs to MrBayes.
            clock_model: A ClockModel option indicating which type of clock model to use.
                This option is intended for inputs to MrBayes and BEAST 2.
                MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        charlabels = [slugify(vu_id, lowercase=False, separator='_') for vu_id in substantive_variation_unit_ids]
        missing_symbol = '?'
        symbols = self.get_nexus_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="utf-8") as f:
            # Start with the NEXUS header:
            f.write("#NEXUS\n\n")
            # Then begin the data block:
            f.write("Begin DATA;\n")
            # Write the collation matrix dimensions:
            f.write("\tDimensions ntax=%d nchar=%d;\n" % (ntax, nchar))
            # Write the format subblock:
            f.write("\tFormat\n")
            f.write("\t\tDataType=Standard\n")
            f.write("\t\tMissing=%s\n" % missing_symbol)
            if frequency:
                f.write("\t\tStatesFormat=Frequency\n")
            f.write("\t\tSymbols=\"%s\";\n" % (" ".join(symbols)))
            # If char_state_labels is set, then write the CharStateLabels block, with each character-state label on its own line:
            if char_state_labels:
                f.write("\tCharStateLabels")
                vu_ind = 1
                for vu in self.variation_units:
                    if vu.id not in substantive_variation_unit_ids_set:
                        continue
                    if vu_ind == 1:
                        f.write("\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                    else:
                        f.write(",\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                    rdg_ind = 0
                    for rdg in vu.readings:
                        key = tuple([vu.id, rdg.id])
                        if key not in substantive_variation_unit_reading_tuples_set:
                            continue
                        ascii_rdg_text = slugify(
                            rdg.text, lowercase=False, separator='_', replacements=[['η', 'h'], ['ω', 'w']]
                        )
                        if ascii_rdg_text == "":
                            ascii_rdg_text = "om."
                        f.write(" %s" % ascii_rdg_text)
                        rdg_ind += 1
                    if rdg_ind > 0:
                        vu_ind += 1
                f.write(";\n")
            # Write the matrix subblock:
            f.write("\tMatrix")
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                if frequency:
                    sequence = "\n\t\t" + taxlabel
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        sequence += "\n\t\t\t"
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        # Otherwise, print out its frequencies for different readings in parentheses:
                        sequence += "("
                        for k, w in enumerate(rdg_support):
                            sequence += "%s:%0.4f" % (symbols[k], w)
                            if k < len(rdg_support) - 1:
                                sequence += " "
                        sequence += ")"
                else:
                    sequence = "\n\t\t" + taxlabel
                    # Add enough space after this label to ensure that all sequences are nicely aligned:
                    sequence += " " * (max_taxlabel_length - len(taxlabel) + 1)
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        rdg_inds = [
                            k for k, w in enumerate(rdg_support) if w > 0
                        ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                        # For singleton readings, just print the symbol:
                        if len(rdg_inds) == 1:
                            sequence += symbols[rdg_inds[0]]
                            continue
                        # For multiple readings, print the corresponding readings in braces or the missing symbol depending on input settings:
                        if ambiguous_as_missing:
                            sequence += missing_symbol
                        else:
                            sequence += "{%s}" % "".join([symbols[rdg_ind] for rdg_ind in rdg_inds])
                f.write("%s" % (sequence))
            f.write(";\n")
            # End the data block:
            f.write("End;")
            # If calibrate_dates is set, then add the assumptions block:
            if calibrate_dates:
                f.write("\n\n")
                f.write("Begin ASSUMPTIONS;\n")
                # Set the scale to years:
                f.write("\tOPTIONS SCALE = years;\n\n")
                # Then calibrate the witness ages:
                calibrate_strings = []
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    date_range = wit.date_range
                    if date_range[0] is not None:
                        # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                        # depending on whether the upper and lower bounds match:
                        min_age = datetime.now().year - date_range[1]
                        max_age = datetime.now().year - date_range[0]
                        if min_age == max_age:
                            calibrate_string = "\tCALIBRATE %s = fixed(%d)" % (taxlabel, min_age)
                            calibrate_strings.append(calibrate_string)
                        else:
                            calibrate_string = "\tCALIBRATE %s = uniform(%d,%d)" % (taxlabel, min_age, max_age)
                            calibrate_strings.append(calibrate_string)
                    else:
                        # If there is no lower bound on the witness's date, then use an offset log-normal distribution:
                        min_age = datetime.now().year - date_range[1]
                        calibrate_string = "\tCALIBRATE %s = offsetlognormal(%d,0.0,1.0)" % (taxlabel, min_age)
                        calibrate_strings.append(calibrate_string)
                # Then print the calibrate strings, separated by commas and line breaks and terminated by a semicolon:
                f.write("%s;\n\n" % ",\n".join(calibrate_strings))
                # End the assumptions block:
                f.write("End;")
            # If mrbayes is set, then add the mrbayes block:
            if mrbayes:
                f.write("\n\n")
                f.write("Begin MRBAYES;\n")
                # Turn on the autoclose feature by default:
                f.write("\tset autoclose=yes;\n")
                # Set the branch lengths to be governed by a birth-death clock model, and set up the parameters for this model:
                f.write("\n")
                f.write("\tprset brlenspr = clock:birthdeath;\n")
                f.write("\tprset speciationpr = uniform(0.0,10.0);\n")
                f.write("\tprset extinctionpr = beta(2.0,4.0);\n")
                f.write("\tprset sampleprob = 0.01;\n")
                # Use the specified clock model:
                f.write("\n")
                if clock_model == ClockModel.uncorrelated:
                    f.write("\tprset clockvarpr=igr;\n")
                    f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
                    f.write("\tprset igrvarpr=exponential(1.0);\n")
                else:
                    f.write("\tprset clockvarpr=strict;\n")
                    f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
                # Set the priors on the tree age depending on the date range for the origin of the collated work:
                f.write("\n")
                if self.origin_date_range[0] is not None:
                    min_tree_age = (
                        datetime.now().year - self.origin_date_range[1]
                        if self.origin_date_range[1] is not None
                        else 0.0
                    )
                    max_tree_age = datetime.now().year - self.origin_date_range[0]
                    f.write("\tprset treeagepr = uniform(%d,%d);\n" % (min_tree_age, max_tree_age))
                else:
                    min_tree_age = (
                        datetime.now().year - self.origin_date_range[1]
                        if self.origin_date_range[1] is not None
                        else 0.0
                    )
                    f.write("\tprset treeagepr = offsetgamma(%d,1.0,1.0);\n" % (min_tree_age))
                # Then calibrate the witness ages:
                f.write("\n")
                f.write("\tprset nodeagepr = calibrated;\n")
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    date_range = wit.date_range
                    if date_range[0] is not None:
                        # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                        # depending on whether the upper and lower bounds match:
                        min_age = datetime.now().year - date_range[1]
                        max_age = datetime.now().year - date_range[0]
                        if min_age == max_age:
                            f.write("\tcalibrate %s = fixed(%d);\n" % (taxlabel, min_age))
                        else:
                            f.write("\tcalibrate %s = uniform(%d,%d);\n" % (taxlabel, min_age, max_age))
                    else:
                        # If there is no lower bound on the witness's date, then use an offset gamma distribution:
                        min_age = datetime.now().year - date_range[1]
                        f.write("\tcalibrate %s = offsetgamma(%d,1.0,1.0);\n" % (taxlabel, min_age))
                f.write("\n")
                # Add default settings for MCMC estimation of posterior distribution:
                f.write("\tmcmcp ngen=100000;\n")
                # Write the command to run MrBayes:
                f.write("\tmcmc;\n")
                # End the MrBayes block:
                f.write("End;")
        return
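
    # Example (illustrative sketch, with a hypothetical output path): writing a NEXUS
    # file with age calibrations for BEAST 2:
    #
    #     coll.to_nexus("out/collation.nexus", calibrate_dates=True)
    #
    # or a MrBayes input with an uncorrelated clock model:
    #
    #     coll.to_nexus("out/collation.nexus", mrbayes=True, clock_model=ClockModel.uncorrelated)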


    def get_hennig86_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in Hennig86 format.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        possible_symbols = (
            list(string.digits) + list(string.ascii_uppercase)[:22]
        )  # NOTE: the maximum number of symbols allowed in Hennig86 format is 32
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        hennig86_symbols = possible_symbols[:nsymbols]
        return hennig86_symbols

    def to_hennig86(self, file_addr: Union[Path, str], drop_constant: bool = False):
        """Writes this Collation to a file in Hennig86 format with the given address.
        Note that because Hennig86 format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = []
        for wit in self.witnesses:
            taxlabel = wit.id
            # Hennig86 format requires taxon names to start with a letter, so if this is not the case, then append "WIT_" to the start of the name:
            if taxlabel[0] not in string.ascii_letters:
                taxlabel = "WIT_" + taxlabel
            # Then replace any disallowed characters in the string with an underscore:
            taxlabel = slugify(taxlabel, lowercase=False, separator='_')
            taxlabels.append(taxlabel)
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        missing_symbol = '?'
        symbols = self.get_hennig86_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="ascii") as f:
            # Start with the nstates header:
            f.write("nstates %d;\n" % len(symbols))
            # Then begin the xread block:
            f.write("xread\n")
            # Write the dimensions:
            f.write("%d %d\n" % (nchar, ntax))
            # Now write the matrix:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                # Add enough space after this label to ensure that all sequences are nicely aligned:
                sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel) + 1))
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    rdg_support = self.readings_by_witness[wit.id][j]
                    # If this reading is lacunose in this witness, then use the missing character:
                    if sum(rdg_support) == 0:
                        sequence += missing_symbol
                        continue
                    rdg_inds = [
                        k for k, w in enumerate(rdg_support) if w > 0
                    ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                    # For singleton readings, just print the symbol:
                    if len(rdg_inds) == 1:
                        sequence += symbols[rdg_inds[0]]
                        continue
                    # For multiple readings, print the missing symbol:
                    sequence += missing_symbol
                f.write("%s\n" % (sequence))
            f.write(";")
        return

1068 

1069 def get_phylip_symbols(self): 

1070 """Returns a list of one-character symbols needed to represent the states of all substantive readings in PHYLIP format. 

1071 

1072 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1073 

1074 Returns: 

1075 A list of individual characters representing states in readings. 

1076 """ 

1077 possible_symbols = ( 

1078 list(string.digits) + list(string.ascii_lowercase)[:22] 

1079 ) # NOTE: for RAxML, multistate characters with an alphabet sizes up to 32 are supported 

1080 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1081 nsymbols = 0 

1082 # If there are no witnesses, then no symbols are needed at all: 

1083 if len(self.witnesses) == 0: 

1084 return [] 

1085 wit_id = self.witnesses[0].id 

1086 for rdg_support in self.readings_by_witness[wit_id]: 

1087 nsymbols = max(nsymbols, len(rdg_support)) 

1088 phylip_symbols = possible_symbols[:nsymbols] 

1089 return phylip_symbols 

1090 

1091 def to_phylip(self, file_addr: Union[Path, str], drop_constant: bool = False): 

1092 """Writes this Collation to a file in PHYLIP format with the given address. 

1093 Note that because PHYLIP format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data. 

1094 

1095 Args: 

1096 file_addr: A string representing the path to an output file. 

1097 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1098 """ 

1099 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1100 substantive_variation_unit_ids = self.variation_unit_ids 

1101 if drop_constant: 

1102 substantive_variation_unit_ids = [ 

1103 vu_id 

1104 for vu_id in self.variation_unit_ids 

1105 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1106 ] 

1107 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1108 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1109 # Start by calculating the values we will be using here: 

1110 ntax = len(self.witnesses) 

1111 nchar = len(substantive_variation_unit_ids) 

1112 taxlabels = [] 

1113 for wit in self.witnesses: 

1114 taxlabel = wit.id 

1115 # Then replace any disallowed characters in the string with an underscore: 

1116 taxlabel = slugify(taxlabel, lowercase=False, separator='_') 

1117 taxlabels.append(taxlabel) 

1118 max_taxlabel_length = max( 

1119 [len(taxlabel) for taxlabel in taxlabels] 

1120 ) # keep track of the longest taxon label for tabular alignment purposes 

1121 missing_symbol = '?' 

1122 symbols = self.get_phylip_symbols() 

1123 # Generate all parent folders for this file that don't already exist: 

1124 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1125 with open(file_addr, "w", encoding="ascii") as f: 

1126 # Write the dimensions: 

1127 f.write("%d %d\n" % (ntax, nchar)) 

1128 # Now write the matrix: 

1129 for i, wit in enumerate(self.witnesses): 

1130 taxlabel = taxlabels[i] 

1131 # Add enough space after this label to ensure that all sequences are nicely aligned: 

1132 sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel))) + "\t" 

1133 for j, vu_id in enumerate(self.variation_unit_ids): 

1134 if vu_id not in substantive_variation_unit_ids_set: 

1135 continue 

1136 rdg_support = self.readings_by_witness[wit.id][j] 

1137 # If the witness is lacunose at this unit, then use the missing character: 

1138 if sum(rdg_support) == 0: 

1139 sequence += missing_symbol 

1140 continue 

1141 rdg_inds = [ 

1142 k for k, w in enumerate(rdg_support) if w > 0 

1143 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1144 # For singleton readings, just print the symbol: 

1145 if len(rdg_inds) == 1: 

1146 sequence += symbols[rdg_inds[0]] 

1147 continue 

1148 # For multiple readings, print the missing symbol: 

1149 sequence += missing_symbol 

1150 f.write("%s\n" % (sequence)) 

1151 return 

1152 
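# --- Illustrative usage (editor's sketch; the input and output paths are hypothetical). ---
#
# >>> from lxml import etree as et
# >>> coll = Collation(et.parse("collation.xml"))
# >>> coll.to_phylip("out/collation.phy", drop_constant=True)
#
# The output starts with an "<ntax> <nchar>" header line, followed by one row per witness:
# a space-padded taxon label, a tab, and a sequence in which lacunose and ambiguous units
# both appear as the missing symbol '?'.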

1153 def get_fasta_symbols(self): 

1154 """Returns a list of one-character symbols needed to represent the states of all substantive readings in FASTA format. 

1155 

1156 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1157 

1158 Returns: 

1159 A list of individual characters representing states in readings. 

1160 """ 

1161 possible_symbols = ( 

1162 list(string.digits) + list(string.ascii_lowercase)[:22] 

1163 ) # NOTE: for RAxML, multistate characters with alphabet sizes of up to 32 are supported 

1164 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1165 nsymbols = 0 

1166 # If there are no witnesses, then no symbols are needed at all: 

1167 if len(self.witnesses) == 0: 

1168 return [] 

1169 wit_id = self.witnesses[0].id 

1170 for rdg_support in self.readings_by_witness[wit_id]: 

1171 nsymbols = max(nsymbols, len(rdg_support)) 

1172 fasta_symbols = possible_symbols[:nsymbols] 

1173 return fasta_symbols 

1174 

1175 def to_fasta(self, file_addr: Union[Path, str], drop_constant: bool = False): 

1176 """Writes this Collation to a file in FASTA format with the given address. 

1177 Note that because FASTA format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data. 

1178 

1179 Args: 

1180 file_addr: A string representing the path to an output file. 

1181 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1182 """ 

1183 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1184 substantive_variation_unit_ids = self.variation_unit_ids 

1185 if drop_constant: 

1186 substantive_variation_unit_ids = [ 

1187 vu_id 

1188 for vu_id in self.variation_unit_ids 

1189 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1190 ] 

1191 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1192 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1193 # Start by calculating the values we will be using here: 

1194 ntax = len(self.witnesses) 

1195 nchar = len(substantive_variation_unit_ids) 

1196 taxlabels = [] 

1197 for wit in self.witnesses: 

1198 taxlabel = wit.id 

1199 # Then replace any disallowed characters in the string with an underscore: 

1200 taxlabel = slugify(taxlabel, lowercase=False, separator='_') 

1201 taxlabels.append(taxlabel) 

1202 max_taxlabel_length = max( 

1203 [len(taxlabel) for taxlabel in taxlabels] 

1204 ) # keep track of the longest taxon label for tabular alignment purposes 

1205 missing_symbol = '?' 

1206 symbols = self.get_fasta_symbols() 

1207 # Generate all parent folders for this file that don't already exist: 

1208 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1209 with open(file_addr, "w", encoding="ascii") as f: 

1210 # Now write the matrix: 

1211 for i, wit in enumerate(self.witnesses): 

1212 taxlabel = taxlabels[i] 

1213 # Begin this witness's entry with a FASTA header line containing its taxon label: 

1214 sequence = ">%s\n" % taxlabel 

1215 for j, vu_id in enumerate(self.variation_unit_ids): 

1216 if vu_id not in substantive_variation_unit_ids_set: 

1217 continue 

1218 rdg_support = self.readings_by_witness[wit.id][j] 

1219 # If the witness is lacunose at this unit, then use the missing character: 

1220 if sum(rdg_support) == 0: 

1221 sequence += missing_symbol 

1222 continue 

1223 rdg_inds = [ 

1224 k for k, w in enumerate(rdg_support) if w > 0 

1225 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1226 # For singleton readings, just print the symbol: 

1227 if len(rdg_inds) == 1: 

1228 sequence += symbols[rdg_inds[0]] 

1229 continue 

1230 # For multiple readings, print the missing symbol: 

1231 sequence += missing_symbol 

1232 f.write("%s\n" % (sequence)) 

1233 return 

1234 
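# --- Illustrative usage (editor's sketch; ``coll`` and the output path are hypothetical). ---
#
# >>> coll.to_fasta("out/collation.fa", drop_constant=True)
#
# Each witness is written as a ">taxlabel" header line followed by its sequence on the
# next line; as the docstring notes, any unit supported by more than one reading is
# collapsed to the missing symbol '?'.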

1235 def get_beast_symbols(self): 

1236 """Returns a list of one-character symbols needed to represent the states of all substantive readings in BEAST format. 

1237 

1238 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1239 

1240 Returns: 

1241 A list of individual characters representing states in readings. 

1242 """ 

1243 possible_symbols = ( 

1244 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

1245 ) # NOTE: BEAST should theoretically permit any number of states, but since code maps are required, we limit the number of symbols to 62 for now 

1246 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1247 nsymbols = 0 

1248 # If there are no witnesses, then no symbols are needed at all: 

1249 if len(self.witnesses) == 0: 

1250 return [] 

1251 wit_id = self.witnesses[0].id 

1252 for rdg_support in self.readings_by_witness[wit_id]: 

1253 nsymbols = max(nsymbols, len(rdg_support)) 

1254 beast_symbols = possible_symbols[:nsymbols] 

1255 return beast_symbols 

1256 

1257 def get_tip_date_range(self): 

1258 """Gets the minimum and maximum dates attested among the witnesses. 

1259 Also checks if the witness with the latest possible date has a fixed date 

1260 (i.e., if the lower and upper bounds for its date are the same) 

1261 and issues a warning if not, as this will cause unusual behavior in BEAST 2. 

1262 

1263 Returns: 

1264 A tuple containing the earliest and latest possible tip dates. 

1265 """ 

1266 earliest_date = None 

1267 earliest_wit = None 

1268 latest_date = None 

1269 latest_wit = None 

1270 for wit in self.witnesses: 

1271 wit_id = wit.id 

1272 date_range = wit.date_range 

1273 if date_range[0] is not None: 

1274 if earliest_date is not None: 

1275 earliest_wit = wit if date_range[0] < earliest_date else earliest_wit 

1276 earliest_date = min(date_range[0], earliest_date) 

1277 else: 

1278 earliest_wit = wit 

1279 earliest_date = date_range[0] 

1280 if date_range[1] is not None: 

1281 if latest_date is not None: 

1282 latest_wit = ( 

1283 wit 

1284 if (date_range[1] > latest_date or (date_range[0] == date_range[1] == latest_date)) 

1285 else latest_wit 

1286 ) # the second check ensures that a witness with a fixed date is preferred to a witness with a date range that ends with the same date 

1287 latest_date = max(date_range[1], latest_date) 

1288 else: 

1289 latest_wit = wit 

1290 latest_date = date_range[1] 

1291 if latest_wit.date_range[0] is None or latest_wit.date_range[0] != latest_wit.date_range[1]: 

1292 print( 

1293 "WARNING: the latest witness, %s, has a variable date range; this will result in problems with time-dependent substitution models and misalignment of trees in BEAST DensiTree outputs! Please ensure that witness %s has a fixed date." 

1294 % (latest_wit.id, latest_wit.id) 

1295 ) 

1296 return (earliest_date, latest_date) 

1297 
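# --- Illustrative example (editor's sketch with hypothetical witness dates). ---
# Given witnesses dated (125, 175), (250, 300), and (300, 300), this method returns
# (125, 300). The witness fixed at (300, 300) is preferred as the latest witness over
# the one whose range merely ends at 300 (the second check above), so no warning is
# printed; if only the (250, 300) witness reached 300, the warning would fire.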

1298 def get_beast_origin_span(self, tip_date_range): 

1299 """Returns a tuple containing the lower and upper bounds for the height of the origin of the Birth-Death Skyline model. 

1300 The upper bound on the height of the tree is the difference between the latest tip date 

1301 and the lower bound on the date of the original work, if both are defined; 

1302 otherwise, it is left undefined. 

1303 The lower bound on the height of the tree is the difference between the latest tip date 

1304 and the upper bound on the date of the original work, if both are defined; 

1305 otherwise, it is the difference between the earliest tip date and the latest, if both are defined. 

1306 

1307 Args: 

1308 tip_date_range: A tuple containing the earliest and latest possible tip dates. 

1309 

1310 Returns: 

1311 A tuple containing lower and upper bounds on the origin height for the Birth-Death Skyline model. 

1312 """ 

1313 origin_span = [0, None] 

1314 # If the upper bound on the date of the work's composition is defined, then set the lower bound on the height of the origin using it and the latest tip date 

1315 # (note that if it had to be defined in terms of witness date lower bounds, then this would have happened already): 

1316 if self.origin_date_range[1] is not None: 

1317 origin_span[0] = tip_date_range[1] - self.origin_date_range[1] 

1318 # If the lower bound on the date of the work's composition is defined, then set the upper bound on the height of the origin using it and the latest tip date: 

1319 if self.origin_date_range[0] is not None: 

1320 origin_span[1] = tip_date_range[1] - self.origin_date_range[0] 

1321 return tuple(origin_span) 

1322 
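# --- Illustrative example (editor's sketch with hypothetical dates). ---
# With tip_date_range = (200, 800) and self.origin_date_range = (50, 100):
#   lower bound on origin height = 800 - 100 = 700
#   upper bound on origin height = 800 - 50 = 750
# so the method returns (700, 750). If origin_date_range were (None, None), the
# result would be (0, None).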

1323 def get_beast_date_map(self, taxlabels): 

1324 """Returns a string representing witness-to-date mappings in BEAST format. 

1325 

1326 Since this format requires single dates as opposed to date ranges, 

1327 witnesses with closed date ranges will be mapped to the average of their lower and upper bounds, 

1328 and witnesses with open date ranges will not be mapped. 

1329 

1330 Args: 

1331 taxlabels: A list of slugified taxon labels. 

1332 

1333 Returns: 

1334 A string containing comma-separated date calibrations of the form witness_id=date. 

1335 """ 

1336 calibrate_strings = [] 

1337 for i, wit in enumerate(self.witnesses): 

1338 taxlabel = taxlabels[i] 

1339 date_range = wit.date_range 

1340 # If either end of this witness's date range is empty, then do not include it: 

1341 if date_range[0] is None or date_range[1] is None: 

1342 continue 

1343 # Otherwise, take the midpoint of its date range as its date: 

1344 date = int((date_range[0] + date_range[1]) / 2) 

1345 calibrate_string = "%s=%d" % (taxlabel, date) 

1346 calibrate_strings.append(calibrate_string) 

1347 # Then output the full date map string: 

1348 date_map = ",".join(calibrate_strings) 

1349 return date_map 

1350 
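# --- Illustrative example (editor's sketch; witness IDs are hypothetical). ---
# A witness "P46" dated (150, 250) is calibrated to the midpoint int((150 + 250) / 2) = 200,
# while a witness "01" with the open range (150, None) is skipped:
#
# >>> coll.get_beast_date_map(["P46", "01"])
# 'P46=200'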

1351 def get_beast_code_map_for_unit(self, symbols, missing_symbol, vu_ind): 

1352 """Returns a string containing state/reading code mappings in BEAST format using the given single-state and missing state symbols for the character/variation unit at the given index. 

1353 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a code for a dummy state will be included. 

1354 

1355 Args: 

1356 symbols: A list of one-character symbols representing single states. 
 missing_symbol: A one-character symbol representing the missing state. 
 vu_ind: An integer index for the desired unit. 

1357 

1358 Returns: 

1359 A string containing comma-separated code mappings. 

1360 """ 

1361 vu = self.variation_units[vu_ind] 

1362 vu_id = vu.id 

1363 code_map = {} 

1364 for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])): 

1365 code_map[symbols[k]] = str(k) 

1366 # If this site is a singleton site, then add a code mapping for the dummy state: 

1367 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1368 code_map[symbols[1]] = str(1) 

1369 # Then add a mapping for the missing state, including a dummy state if this is a singleton site: 

1370 code_map[missing_symbol] = " ".join( 

1371 str(k) for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])) 

1372 ) 

1373 # If this site is a singleton site, then add the dummy state to the missing state mapping: 

1374 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1375 code_map[missing_symbol] = code_map[missing_symbol] + " " + str(1) 

1376 # Then combine all of the mappings into a single string: 

1377 code_map_string = ", ".join([code + "=" + code_map[code] for code in code_map]) 

1378 return code_map_string 

1379 
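# --- Illustrative example (editor's sketch). ---
# For a unit with three substantive readings, symbols "012...", and missing symbol '?',
# the returned string is "0=0, 1=1, 2=2, ?=0 1 2": each single-state symbol maps to its
# own state, and '?' maps to all of them. For a singleton unit, the dummy state is added,
# yielding "0=0, 1=1, ?=0 1".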

1380 def get_beast_equilibrium_frequencies_for_unit(self, vu_ind): 

1381 """Returns a string containing state/reading equilibrium frequencies in BEAST format for the character/variation unit at the given index. 

1382 Since the equilibrium frequencies are not used by the substitution models, they simply correspond to a uniform distribution over the states. 

1383 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a uniform distribution over the real state and a dummy state (i.e., "0.5 0.5") will be returned. 

1384 

1385 Args: 

1386 vu_ind: An integer index for the desired unit. 

1387 

1388 Returns: 

1389 A string containing space-separated equilibrium frequencies. 

1390 """ 

1391 vu = self.variation_units[vu_ind] 

1392 vu_id = vu.id 

1393 # If this unit is a singleton, then return the string "0.5 0.5": 

1394 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1: 

1395 return "0.5 0.5" 

1396 # Otherwise, set the equilibrium frequencies according to a uniform distribution: 

1397 equilibrium_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len( 

1398 self.substantive_readings_by_variation_unit_id[vu_id] 

1399 ) 

1400 equilibrium_frequencies_string = " ".join([str(w) for w in equilibrium_frequencies]) 

1401 return equilibrium_frequencies_string 

1402 
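# --- Illustrative example (editor's sketch). ---
# A unit with four substantive readings yields "0.25 0.25 0.25 0.25", while a singleton
# unit yields "0.5 0.5", splitting the mass between the real state and its dummy state.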

1403 def get_beast_root_frequencies_for_unit(self, vu_ind): 

1404 """Returns a string containing state/reading root frequencies in BEAST format for the character/variation unit at the given index. 

1405 The root frequencies are calculated from the intrinsic odds at this unit. 

1406 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a root frequency of 0 will be added for a dummy state. 

1407 If no intrinsic odds are specified, then a uniform distribution over all states is assumed. 

1408 

1409 Args: 

1410 vu_ind: An integer index for the desired unit. 

1411 

1412 Returns: 

1413 A string containing space-separated root frequencies. 

1414 """ 

1415 vu = self.variation_units[vu_ind] 

1416 vu_id = vu.id 

1417 intrinsic_relations = vu.intrinsic_relations 

1418 intrinsic_odds_by_id = self.intrinsic_odds_by_id 

1419 # If this unit is a singleton, then return the string "1 0": 

1420 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1: 

1421 return "1 0" 

1422 # If this unit has no intrinsic odds, then assume a uniform distribution over all readings: 

1423 if len(intrinsic_relations) == 0: 

1424 root_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len( 

1425 self.substantive_readings_by_variation_unit_id[vu_id] 

1426 ) 

1427 root_frequencies_string = " ".join([str(w) for w in root_frequencies]) 

1428 return root_frequencies_string 

1429 # We will populate the root frequencies based on the intrinsic odds of the readings: 

1430 root_frequencies_by_id = {} 

1431 for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]: 

1432 root_frequencies_by_id[rdg_id] = 0 

1433 # First, construct an adjacency list for efficient edge iteration: 

1434 neighbors_by_source = {} 

1435 for edge in intrinsic_relations: 

1436 s = edge[0] 

1437 t = edge[1] 

1438 if s not in neighbors_by_source: 

1439 neighbors_by_source[s] = [] 

1440 if t not in neighbors_by_source: 

1441 neighbors_by_source[t] = [] 

1442 neighbors_by_source[s].append(t) 

1443 # Next, identify all readings that are not targeted by any intrinsic odds relation: 

1444 in_degree_by_reading = {} 

1445 for edge in intrinsic_relations: 

1446 s = edge[0] 

1447 t = edge[1] 

1448 if s not in in_degree_by_reading: 

1449 in_degree_by_reading[s] = 0 

1450 if t not in in_degree_by_reading: 

1451 in_degree_by_reading[t] = 0 

1452 in_degree_by_reading[t] += 1 

1453 starting_nodes = [t for t in in_degree_by_reading if in_degree_by_reading[t] == 0] 

1454 # Set the root frequencies for these readings to 1 (they will be normalized later): 

1455 for starting_node in starting_nodes: 

1456 root_frequencies_by_id[starting_node] = 1.0 

1457 # Next, set the frequencies for the remaining readings recursively using the adjacency list: 

1458 def update_root_frequencies(s): 

1459 for t in neighbors_by_source[s]: 

1460 intrinsic_category = intrinsic_relations[(s, t)] 

1461 odds = ( 

1462 intrinsic_odds_by_id[intrinsic_category] 

1463 if intrinsic_odds_by_id[intrinsic_category] is not None 

1464 else 1.0 

1465 ) # TODO: This needs to be handled using parameters once we have it implemented in BEAST 

1466 root_frequencies_by_id[t] = root_frequencies_by_id[s] / odds 

1467 update_root_frequencies(t) 

1468 return 

1469 

1470 for starting_node in starting_nodes: 

1471 update_root_frequencies(starting_node) 

1472 # Then produce a normalized vector of root frequencies that corresponds to a probability distribution: 

1473 root_frequencies = [ 

1474 root_frequencies_by_id[rdg_id] for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id] 

1475 ] 

1476 total_frequencies = sum(root_frequencies) 

1477 for k in range(len(root_frequencies)): 

1478 root_frequencies[k] = root_frequencies[k] / total_frequencies 

1479 root_frequencies_string = " ".join([str(w) for w in root_frequencies]) 

1480 return root_frequencies_string 

1481 
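# --- Illustrative example (editor's sketch with hypothetical intrinsic relations). ---
# Suppose a unit has readings "a" and "b" with intrinsic_relations = {("a", "b"): "plausible"}
# and intrinsic_odds_by_id["plausible"] = 2.0. Reading "a" has in-degree 0, so it starts at
# 1.0, and "b" receives 1.0 / 2.0 = 0.5; after normalization over the total of 1.5, the
# returned string is "0.667 0.333" (to three decimal places).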

1482 def to_beast( 

1483 self, 

1484 file_addr: Union[Path, str], 

1485 drop_constant: bool = False, 

1486 clock_model: ClockModel = ClockModel.strict, 

1487 ancestral_logger: AncestralLogger = AncestralLogger.state, 

1488 seed: int = None, 

1489 ): 

1490 """Writes this Collation to a file in BEAST format with the given address. 

1491 

1492 Args: 

1493 file_addr: A string representing the path to an output file. 

1494 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1495 clock_model: A ClockModel option indicating which clock model to use. 

1496 ancestral_logger: An AncestralLogger option indicating which class of logger (if any) to use for ancestral states. 

1497 seed: A seed for random number generation (for setting initial values of unspecified transcriptional rates). 

1498 """ 

1499 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1500 substantive_variation_unit_ids = self.variation_unit_ids 

1501 if drop_constant: 

1502 substantive_variation_unit_ids = [ 

1503 vu_id 

1504 for vu_id in self.variation_unit_ids 

1505 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1506 ] 

1507 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1508 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1509 # First, calculate the values we will be using for the main template: 

1510 taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses] 

1511 missing_symbol = '?' 

1512 symbols = self.get_beast_symbols() 

1513 tip_date_range = self.get_tip_date_range() 

1514 origin_span = self.get_beast_origin_span(tip_date_range) 

1515 date_map = self.get_beast_date_map(taxlabels) 

1516 # Then populate the necessary objects for the BEAST XML Jinja template: 

1517 witness_objects = [] 

1518 variation_unit_objects = [] 

1519 intrinsic_category_objects = [] 

1520 transcriptional_category_objects = [] 

1521 # Start with witnesses: 

1522 for i, wit in enumerate(self.witnesses): 

1523 witness_object = {} 

1524 # Copy the ID for this witness: 

1525 witness_object["id"] = wit.id 

1526 # Copy its date bounds: 

1527 witness_object["min_date"] = wit.date_range[0] 

1528 witness_object["max_date"] = wit.date_range[1] 

1529 # Populate its sequence from its entries in the witness's readings dictionary: 

1530 sequence = "" 

1531 for j, rdg_support in enumerate(self.readings_by_witness[wit.id]): 

1532 vu_id = self.variation_unit_ids[j] 

1533 # Skip any variation units deemed non-substantive: 

1534 if vu_id not in substantive_variation_unit_ids: 

1535 continue 

1536 # If this witness has a certainty of 0 for all readings, then it is a gap; assign a likelihood of 1 to each reading: 

1537 if sum(rdg_support) == 0: 

1538 for k, w in enumerate(rdg_support): 

1539 sequence += "1" 

1540 if k < len(rdg_support) - 1: 

1541 sequence += ", " 

1542 else: 

1543 if len(rdg_support) > 1: 

1544 sequence += "; " 

1545 else: 

1546 # If this site is a singleton site, then add a dummy state: 

1547 sequence += ", 0; " 

1548 # Otherwise, read the probabilities as they are given: 

1549 else: 

1550 for k, w in enumerate(rdg_support): 

1551 sequence += str(w) 

1552 if k < len(rdg_support) - 1: 

1553 sequence += ", " 

1554 else: 

1555 if len(rdg_support) > 1: 

1556 sequence += "; " 

1557 else: 

1558 # If this site is a singleton site, then add a dummy state: 

1559 sequence += ", 0; " 

1560 # Strip the final semicolon and space from the sequence: 

1561 sequence = sequence.strip("; ") 

1562 # Then set the witness object's sequence attribute to this string: 

1563 witness_object["sequence"] = sequence 

1564 witness_objects.append(witness_object) 

1565 # Then proceed to variation units: 

1566 for j, vu in enumerate(self.variation_units): 

1567 if vu.id not in substantive_variation_unit_ids_set: 

1568 continue 

1569 variation_unit_object = {} 

1570 # Copy the ID of this variation unit: 

1571 variation_unit_object["id"] = vu.id 

1572 # Copy this variation unit's number of substantive readings, 

1573 # setting it to 2 if it is a singleton unit: 

1574 variation_unit_object["nstates"] = ( 

1575 len(self.substantive_readings_by_variation_unit_id[vu.id]) 

1576 if len(self.substantive_readings_by_variation_unit_id[vu.id]) > 1 

1577 else 2 

1578 ) 

1579 # Then construct the code map for this unit: 

1580 variation_unit_object["code_map"] = self.get_beast_code_map_for_unit(symbols, missing_symbol, j) 

1581 # Then populate a comma-separated string of reading labels for this unit: 

1582 rdg_texts = [] 

1583 vu_label = vu.id 

1584 for rdg in vu.readings: 

1585 key = tuple([vu.id, rdg.id]) 

1586 if key not in substantive_variation_unit_reading_tuples_set: 

1587 continue 

1588 rdg_text = slugify(rdg.text, lowercase=False, allow_unicode=True, separator='_') 

1589 # Replace any empty reading text with an omission marker: 

1590 if rdg_text == "": 

1591 rdg_text = "om." 

1592 rdg_texts.append(rdg_text) 

1593 # If this site is a singleton site, then add a dummy reading for the dummy state: 

1594 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1595 rdg_texts.append("DUMMY") 

1596 rdg_texts_string = ", ".join(rdg_texts) 

1597 variation_unit_object["rdg_texts"] = rdg_texts_string 

1598 # Then populate this unit's equilibrium frequency string and its root frequency string: 

1599 variation_unit_object["equilibrium_frequencies"] = self.get_beast_equilibrium_frequencies_for_unit(j) 

1600 variation_unit_object["root_frequencies"] = self.get_beast_root_frequencies_for_unit(j) 

1601 # Then populate a dictionary mapping epoch height ranges to lists of off-diagonal entries for substitution models: 

1602 rate_objects_by_epoch_height_range = {} 

1603 epoch_height_ranges = [] 

1604 # Then proceed based on whether the transcriptional relations for this variation unit have been defined: 

1605 if len(vu.transcriptional_relations_by_date_range) == 0: 

1606 # If there are no transcriptional relations, then map the epoch range of (None, None) to their list of off-diagonal entries: 

1607 epoch_height_ranges.append((None, None)) 

1608 rate_objects_by_epoch_height_range[(None, None)] = [] 

1609 rate_objects = rate_objects_by_epoch_height_range[(None, None)] 

1610 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1611 # If this is a singleton site, then use an arbitrary 2x2 rate matrix: 

1612 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1613 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1614 else: 

1615 # If this is a site with multiple substantive readings, but no transcriptional relations list, 

1616 # then use a Lewis Mk substitution matrix with the appropriate number of states: 

1617 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1618 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1619 # Skip diagonal elements: 

1620 if k_1 == k_2: 

1621 continue 

1622 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1623 else: 

1624 # Otherwise, proceed for every date range: 

1625 for date_range in vu.transcriptional_relations_by_date_range: 

1626 # Get the map of transcriptional relations for reference later: 

1627 transcriptional_relations = vu.transcriptional_relations_by_date_range[date_range] 

1628 # Now get the epoch height range corresponding to this date range, and initialize its list in the dictionary: 

1629 epoch_height_range = [None, None] 

1630 epoch_height_range[0] = tip_date_range[1] - date_range[1] if date_range[1] is not None else None 

1631 epoch_height_range[1] = tip_date_range[1] - date_range[0] if date_range[0] is not None else None 

1632 epoch_height_range = tuple(epoch_height_range) 

1633 epoch_height_ranges.append(epoch_height_range) 

1634 rate_objects_by_epoch_height_range[epoch_height_range] = [] 

1635 rate_objects = rate_objects_by_epoch_height_range[epoch_height_range] 

1636 # Then proceed for every pair of readings in this unit: 

1637 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1638 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1639 # Skip diagonal elements: 

1640 if k_1 == k_2: 

1641 continue 

1642 # If the first reading has no transcriptional relation to the second in this unit, then use the default rate: 

1643 if (rdg_id_1, rdg_id_2) not in transcriptional_relations: 

1644 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1645 continue 

1646 # Otherwise, if only one category of transcriptional relations holds between the first and second readings, 

1647 # then use its rate: 

1648 if len(transcriptional_relations[(rdg_id_1, rdg_id_2)]) == 1: 

1649 # If there is only one such category, then add its rate as a standalone var element: 

1650 transcriptional_category = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])[0] 

1651 rate_objects.append( 

1652 {"transcriptional_categories": [transcriptional_category], "expression": None} 

1653 ) 

1654 continue 

1655 # If there is more than one, then add a var element that is a sum of the individual categories' rates: 

1656 transcriptional_categories = list(transcriptional_relations[(rdg_id_1, rdg_id_2)]) 

1657 args = [] 

1658 for transcriptional_category in transcriptional_categories: 

1659 args.append("%s_rate" % transcriptional_category) 

1660 args_string = " ".join(args) 

1661 ops = ["+"] * (len(args) - 1) 

1662 ops_string = " ".join(ops) 

1663 expression_string = " ".join([args_string, ops_string]) 

1664 rate_objects.append( 

1665 { 

1666 "transcriptional_categories": transcriptional_categories, 

1667 "expression": expression_string, 

1668 } 

1669 ) 

1670 # Now reorder the list of epoch height ranges, and get a list of non-null epoch dates in ascending order from the dictionary: 

1671 epoch_height_ranges.reverse() 

1672 epoch_heights = [ 

1673 epoch_height_range[0] for epoch_height_range in epoch_height_ranges if epoch_height_range[0] is not None 

1674 ] 

1675 # Then add all of these data structures to the variation unit object: 

1676 variation_unit_object["epoch_heights"] = epoch_heights 

1677 variation_unit_object["epoch_heights_string"] = " ".join( 

1678 [str(epoch_height) for epoch_height in epoch_heights] 

1679 ) 

1680 variation_unit_object["epoch_height_ranges"] = epoch_height_ranges 

1681 variation_unit_object["epoch_rates"] = [ 

1682 rate_objects_by_epoch_height_range[epoch_height_range] for epoch_height_range in epoch_height_ranges 

1683 ] 

1684 variation_unit_objects.append(variation_unit_object) 

1685 # Then proceed to intrinsic odds categories: 

1686 for intrinsic_category in self.intrinsic_categories: 

1687 intrinsic_category_object = {} 

1688 # Copy the ID of this intrinsic category: 

1689 intrinsic_category_object["id"] = intrinsic_category 

1690 # Then copy the odds factors associated with this intrinsic category, 

1691 # setting it to 1.0 if it is not specified and setting the estimate attribute accordingly: 

1692 odds = self.intrinsic_odds_by_id[intrinsic_category] 

1693 intrinsic_category_object["odds"] = odds if odds is not None else 1.0 

1694 intrinsic_category_object["estimate"] = "false" if odds is not None else "true" 

1695 intrinsic_category_objects.append(intrinsic_category_object) 

1696 # Then proceed to transcriptional rate categories: 

1697 rng = np.random.default_rng(seed) 

1698 for transcriptional_category in self.transcriptional_categories: 

1699 transcriptional_category_object = {} 

1700 # Copy the ID of this transcriptional category: 

1701 transcriptional_category_object["id"] = transcriptional_category 

1702 # Then copy the rate of this transcriptional category, 

1703 # setting it to a random number sampled from a Gamma distribution if it is not specified and setting the estimate attribute accordingly: 

1704 rate = self.transcriptional_rates_by_id[transcriptional_category] 

1705 transcriptional_category_object["rate"] = rate if rate is not None else rng.gamma(5.0, 2.0) 

1706 transcriptional_category_object["estimate"] = "false" if rate is not None else "true" 

1707 transcriptional_category_objects.append(transcriptional_category_object) 

1708 # Now render the output XML file using the Jinja template: 

1709 env = Environment(loader=PackageLoader("teiphy", "templates"), autoescape=select_autoescape()) 

1710 template = env.get_template("beast_template.xml") 

1711 rendered = template.render( 

1712 nsymbols=len(symbols), 

1713 date_map=date_map, 

1714 origin_span=origin_span, 

1715 clock_model=clock_model.value, 

1716 clock_rate_categories=2 * len(self.witnesses) - 1, 

1717 ancestral_logger=ancestral_logger.value, 

1718 witnesses=witness_objects, 

1719 variation_units=variation_unit_objects, 

1720 intrinsic_categories=intrinsic_category_objects, 

1721 transcriptional_categories=transcriptional_category_objects, 

1722 ) 

1723 # Generate all parent folders for this file that don't already exist: 

1724 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1725 with open(file_addr, "w", encoding="utf-8") as f: 

1726 f.write(rendered) 

1727 return 

1728 
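# --- Illustrative usage (editor's sketch; ``coll`` and the output path are hypothetical). ---
#
# >>> coll.to_beast(
# ...     "out/beast.xml",
# ...     drop_constant=True,
# ...     clock_model=ClockModel.uncorrelated,
# ...     ancestral_logger=AncestralLogger.state,
# ...     seed=42,
# ... )
#
# The seed only fixes the Gamma-sampled starting values of transcriptional rates left
# unspecified in the input; explicitly specified rates are copied through with
# estimate="false".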

1729 def to_numpy(self, drop_constant: bool = False, split_missing: bool = True): 

1730 """Returns this Collation in the form of a NumPy array, along with arrays of its row and column labels. 

1731 

1732 Args: 

1733 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1734 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings; if False, then missing data is ignored (i.e., all states are 0). Default value is True. 

1735 

1736 Returns: 

1737 A NumPy array with a row for each substantive reading and a column for each witness. 

1738 A list of substantive reading labels (each combining a variation unit ID and reading text). 

1739 A list of witness ID strings. 

1740 """ 

1741 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1742 substantive_variation_unit_ids = self.variation_unit_ids 

1743 if drop_constant: 

1744 substantive_variation_unit_ids = [ 

1745 vu_id 

1746 for vu_id in self.variation_unit_ids 

1747 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1748 ] 

1749 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1750 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1751 # Initialize the output array with the appropriate dimensions: 

1752 reading_labels = [] 

1753 for vu in self.variation_units: 

1754 if vu.id not in substantive_variation_unit_ids_set: 

1755 continue 

1756 for rdg in vu.readings: 

1757 key = tuple([vu.id, rdg.id]) 

1758 if key in substantive_variation_unit_reading_tuples_set: 

1759 reading_labels.append(vu.id + ", " + rdg.text) 

1760 witness_labels = [wit.id for wit in self.witnesses] 

1761 matrix = np.zeros((len(reading_labels), len(witness_labels)), dtype=float) 

1762 # Then populate it with the appropriate values: 

1763 col_ind = 0 

1764 for i, wit in enumerate(self.witnesses): 

1765 row_ind = 0 

1766 for j, vu_id in enumerate(self.variation_unit_ids): 

1767 if vu_id not in substantive_variation_unit_ids_set: 

1768 continue 

1769 rdg_support = self.readings_by_witness[wit.id][j] 

1770 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

1771 if sum(rdg_support) == 0: 

1772 if split_missing: 

1773 for k in range(len(rdg_support)): 

1774 matrix[row_ind, col_ind] = 1 / len(rdg_support) 

1775 row_ind += 1 

1776 else: 

1777 row_ind += len(rdg_support) 

1778 # Otherwise, add its coefficients normally: 

1779 else: 

1780 for k in range(len(rdg_support)): 

1781 matrix[row_ind, col_ind] = rdg_support[k] 

1782 row_ind += 1 

1783 col_ind += 1 

1784 return matrix, reading_labels, witness_labels 

1785 
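# --- Illustrative example (editor's sketch; ``coll`` is hypothetical). ---
# The matrix has one row per substantive reading (across all retained units) and one
# column per witness. With split_missing=True, a witness lacunose at a unit with 4
# readings contributes 0.25 to each of that unit's rows; with split_missing=False,
# those cells remain 0.
#
# >>> matrix, reading_labels, witness_labels = coll.to_numpy(drop_constant=True)
# >>> matrix.shape == (len(reading_labels), len(witness_labels))
# True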

1786 def to_distance_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False): 

1787 """Transforms this Collation into a NumPy distance matrix between witnesses, along with an array of its labels for the witnesses. 

1788 Distances can be computed either as counts of disagreements (the default setting), or as proportions of disagreements over all variation units where both witnesses have singleton readings. 

1789 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of disagreements. 

1790 

1791 Args: 

1792 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1793 Default value is False. 

1794 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

1795 Default value is False. 

1796 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

1797 should include the number of their extant, unambiguous variation units after the number of their disagreements. 

1798 Default value is False. 

1799 

1800 Returns: 

1801 A NumPy distance matrix with a row and column for each witness. 

1802 A list of witness ID strings. 

1803 """ 

1804 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1805 substantive_variation_unit_ids = self.variation_unit_ids 

1806 if drop_constant: 

1807 substantive_variation_unit_ids = [ 

1808 vu_id 

1809 for vu_id in self.variation_unit_ids 

1810 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1811 ] 

1812 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1813 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1814 # Initialize the output array with the appropriate dimensions: 

1815 witness_labels = [wit.id for wit in self.witnesses] 

1816 # The type of the matrix will depend on the input options: 

1817 matrix = None 

1818 if show_ext: 

1819 matrix = np.full( 

1820 (len(witness_labels), len(witness_labels)), "NA", dtype=object 

1821 ) # strings of the form "disagreements/extant" 

1822 elif proportion: 

1823 matrix = np.full( 

1824 (len(witness_labels), len(witness_labels)), 0.0, dtype=float 

1825 ) # floats of the form disagreements/extant 

1826 else: 

1827 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form disagreements 

1828 for i, wit_1 in enumerate(witness_labels): 

1829 for j, wit_2 in enumerate(witness_labels): 

1830 extant_units = 0 

1831 disagreements = 0 

1832 # If either of the cells for this pair of witnesses has been populated already, 

1833 # then just copy the entry from the other side of the diagonal without recalculating: 

1834 if i > j: 

1835 matrix[i, j] = matrix[j, i] 

1836 continue 

1837 # Otherwise, calculate the number of units where both witnesses have unambiguous readings 

1838 # and the number of units where they disagree: 

1839 for k, vu_id in enumerate(self.variation_unit_ids): 

1840 if vu_id not in substantive_variation_unit_ids_set: 

1841 continue 

1842 wit_1_rdg_support = self.readings_by_witness[wit_1][k] 

1843 wit_2_rdg_support = self.readings_by_witness[wit_2][k] 

1844 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0] 

1845 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0] 

1846 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1: 

1847 continue 

1848 extant_units += 1 

1849 if wit_1_rdg_inds[0] != wit_2_rdg_inds[0]: 

1850 disagreements += 1 

1851 cell_entry = None 

1852 if proportion: 

1853 cell_entry = disagreements / max( 

1854 extant_units, 1 

1855 ) # the max in the denominator is to prevent division by 0; the distance entry will be 0 if the two witnesses have no overlap 

1856 else: 

1857 cell_entry = disagreements 

1858 if show_ext: 

1859 cell_entry = str(cell_entry) + "/" + str(extant_units) 

1860 matrix[i, j] = cell_entry 

1861 return matrix, witness_labels 

1862 
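# --- Illustrative usage (editor's sketch; ``coll`` is hypothetical). ---
#
# >>> matrix, witness_labels = coll.to_distance_matrix(proportion=True, show_ext=True)
#
# With both flags set, each cell is a string such as "0.25/40": the proportion of
# disagreements followed by the count of units at which both witnesses have singleton
# readings. The diagonal reads "0.0/<extant>", and the matrix is symmetric because
# cells below the diagonal are copied from above it.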

1863 def to_similarity_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False): 

1864 """Transforms this Collation into a NumPy similarity matrix between witnesses, along with an array of its labels for the witnesses. 

1865 Similarities can be computed either as counts of agreements (the default setting), or as proportions of agreements over all variation units where both witnesses have singleton readings. 

1866 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of agreements. 

1867 

1868 Args: 

1869 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1870 Default value is False. 

1871 proportion (bool, optional): An optional flag indicating whether or not to calculate similarities as proportions over extant, unambiguous variation units. 

1872 Default value is False. 

1873 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

1874 should include the number of their extant, unambiguous variation units after the number of agreements. 

1875 Default value is False. 

1876 

1877 Returns: 

1878 A NumPy similarity matrix with a row and column for each witness. 

1879 A list of witness ID strings. 

1880 """ 

1881 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1882 substantive_variation_unit_ids = self.variation_unit_ids 

1883 if drop_constant: 

1884 substantive_variation_unit_ids = [ 

1885 vu_id 

1886 for vu_id in self.variation_unit_ids 

1887 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1888 ] 

1889 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1890 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1891 # Initialize the output array with the appropriate dimensions: 

1892 witness_labels = [wit.id for wit in self.witnesses] 

1893 # The type of the matrix will depend on the input options: 

1894 matrix = None 

1895 if show_ext: 

1896 matrix = np.full( 

1897 (len(witness_labels), len(witness_labels)), "NA", dtype=object 

1898 ) # strings of the form "agreements/extant" 

1899 elif proportion: 

1900 matrix = np.full( 

1901 (len(witness_labels), len(witness_labels)), 0.0, dtype=float 

1902 ) # floats of the form agreements/extant 

1903 else: 

1904 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form agreements 

1905 for i, wit_1 in enumerate(witness_labels): 

1906 for j, wit_2 in enumerate(witness_labels): 

1907 extant_units = 0 

1908 agreements = 0 

1909 # If either of the cells for this pair of witnesses has been populated already, 

1910 # then just copy the entry from the other side of the diagonal without recalculating: 

1911 if i > j: 

1912 matrix[i, j] = matrix[j, i] 

1913 continue 

1914 # Otherwise, calculate the number of units where both witnesses have unambiguous readings 

1915 # and the number of units where they agree: 

1916 for k, vu_id in enumerate(self.variation_unit_ids): 

1917 if vu_id not in substantive_variation_unit_ids_set: 

1918 continue 

1919 wit_1_rdg_support = self.readings_by_witness[wit_1][k] 

1920 wit_2_rdg_support = self.readings_by_witness[wit_2][k] 

1921 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0] 

1922 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0] 

1923 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1: 

1924 continue 

1925 extant_units += 1 

1926 if wit_1_rdg_inds[0] == wit_2_rdg_inds[0]: 

1927 agreements += 1 

1928 cell_entry = None 

1929 if proportion: 

1930 cell_entry = agreements / max( 

1931 extant_units, 1 

1932 ) # the max in the denominator is to prevent division by 0; the similarity entry will be 0 if the two witnesses have no overlap 

1933 else: 

1934 cell_entry = agreements 

1935 if show_ext: 

1936 cell_entry = str(cell_entry) + "/" + str(extant_units) 

1937 matrix[i, j] = cell_entry 

1938 return matrix, witness_labels 

1939 

1940 def to_nexus_table(self, drop_constant: bool = False, ambiguous_as_missing: bool = False): 

1941 """Returns this Collation in the form of a table with rows for taxa, columns for characters, and reading IDs in cells. 

1942 

1943 Args: 

1944 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1945 Default value is False. 

1946 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

1947 Default value is False. 

1948 

1949 Returns: 

1950 A NumPy array with rows for taxa, columns for characters, and reading IDs in cells. 

1951 A list of witness ID strings. 

1952 A list of variation unit ID strings. 

1953 """ 

1954 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1955 substantive_variation_unit_ids = self.variation_unit_ids 

1956 if drop_constant: 

1957 substantive_variation_unit_ids = [ 

1958 vu_id 

1959 for vu_id in self.variation_unit_ids 

1960 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1961 ] 

1962 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1963 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1964 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

1965 # to the readings' IDs: 

1966 reading_ids_by_indices = {} 

1967 for j, vu in enumerate(self.variation_units): 

1968 if vu.id not in substantive_variation_unit_ids_set: 

1969 continue 

1970 k = 0 

1971 for rdg in vu.readings: 

1972 key = tuple([vu.id, rdg.id]) 

1973 if key not in substantive_variation_unit_reading_tuples_set: 

1974 continue 

1975 indices = tuple([j, k]) 

1976 reading_ids_by_indices[indices] = rdg.id 

1977 k += 1 

1978 # Initialize the output array with the appropriate dimensions: 

1979 missing_symbol = '?' 

1980 witness_labels = [wit.id for wit in self.witnesses] 

1981 matrix = np.full( 

1982 (len(witness_labels), len(substantive_variation_unit_ids)), missing_symbol, dtype=object 

1983 ) # use dtype=object because the maximum string length is not known up front 

1984 # Then populate it with the appropriate values: 

1985 row_ind = 0 

1986 for i, wit in enumerate(self.witnesses): 

1987 col_ind = 0 

1988 for j, vu in enumerate(self.variation_units): 

1989 if vu.id not in substantive_variation_unit_ids_set: 

1990 continue 

1991 rdg_support = self.readings_by_witness[wit.id][j] 

1992 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

1993 if sum(rdg_support) == 0: 

1994 matrix[row_ind, col_ind] = missing_symbol 

1995 # Otherwise, add its coefficients normally: 

1996 else: 

1997 rdg_inds = [ 

1998 k for k, w in enumerate(rdg_support) if w > 0 

1999 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

2000 # For singleton readings, just print the reading ID: 

2001 if len(rdg_inds) == 1: 

2002 k = rdg_inds[0] 

2003 matrix[row_ind, col_ind] = reading_ids_by_indices[(j, k)] 

2004 # For multiple readings, print the corresponding reading IDs in braces or the missing symbol depending on input settings: 

2005 else: 

2006 if ambiguous_as_missing: 

2007 matrix[row_ind, col_ind] = missing_symbol 

2008 else: 

2009 matrix[row_ind, col_ind] = "{%s}" % " ".join( 

2010 [reading_ids_by_indices[(j, k)] for k in rdg_inds] 

2011 ) 

2012 col_ind += 1 

2013 row_ind += 1 

2014 return matrix, witness_labels, substantive_variation_unit_ids 

2015 
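# --- Illustrative example (editor's sketch with hypothetical reading IDs). ---
# Each cell holds a reading ID: an unambiguous attestation appears as, e.g., "1";
# an ambiguous one as "{1 2}" (or as '?' if ambiguous_as_missing=True); and a
# lacuna as '?'.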

2016 def to_long_table(self, drop_constant: bool = False): 

2017 """Returns this Collation in the form of a long table with columns for taxa, characters, reading indices, and reading values. 

2018 Note that this method treats ambiguous readings as missing data. 

2019 

2020 Args: 

2021 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2022 Default value is False. 

2023 

2024 Returns: 

2025 A NumPy array with columns for taxa, characters, reading indices, and reading values, and rows for each combination of these values in the matrix. 

2026 A list of column label strings. 

2027 """ 

2028 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2029 substantive_variation_unit_ids = self.variation_unit_ids 

2030 if drop_constant: 

2031 substantive_variation_unit_ids = [ 

2032 vu_id 

2033 for vu_id in self.variation_unit_ids 

2034 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2035 ] 

2036 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2037 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2038 # Initialize the outputs: 

2039 column_labels = ["taxon", "character", "state", "value"] 

2040 long_table_list = [] 

2041 # Populate a dictionary mapping (variation unit index, reading index) tuples to reading texts: 

2042 reading_texts_by_indices = {} 

2043 for j, vu in enumerate(self.variation_units): 

2044 if vu.id not in substantive_variation_unit_ids_set: 

2045 continue 

2046 k = 0 

2047 for rdg in vu.readings: 

2048 key = tuple([vu.id, rdg.id]) 

2049 if key not in substantive_variation_unit_reading_tuples_set: 

2050 continue 

2051 indices = tuple([j, k]) 

2052 reading_texts_by_indices[indices] = rdg.text 

2053 k += 1 

2054 # Then populate the output list with the appropriate values: 

2055 witness_labels = [wit.id for wit in self.witnesses] 

2056 missing_symbol = '?' 

2057 for i, wit in enumerate(self.witnesses): 

2058 row_ind = 0 

2059 for j, vu_id in enumerate(self.variation_unit_ids): 

2060 if vu_id not in substantive_variation_unit_ids_set: 

2061 continue 

2062 rdg_support = self.readings_by_witness[wit.id][j] 

2063 # Populate a list of nonzero coefficients for this reading support vector: 

2064 rdg_inds = [k for k, w in enumerate(rdg_support) if w > 0] 

2065 # If this list does not consist of exactly one reading, then treat it as missing data: 

2066 if len(rdg_inds) != 1: 

2067 long_table_list.append([wit.id, vu_id, missing_symbol, missing_symbol]) 

2068 continue 

2069 k = rdg_inds[0] 

2070 rdg_text = reading_texts_by_indices[(j, k)] 

2071 # Replace empty reading texts with the omission placeholder: 

2072 if rdg_text == "": 

2073 rdg_text = "om." 

2074 long_table_list.append([wit.id, vu_id, k, rdg_text]) 

2075 # Then convert the long table entries list to a NumPy array: 

2076 long_table = np.array(long_table_list) 

2077 return long_table, column_labels 

2078 
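# --- Illustrative example (editor's sketch with hypothetical IDs). ---
# Each row has the form [taxon, character, state, value], e.g.:
#   ["P46", "B10K1V1U2", 1, "om."]    # a singleton reading whose empty text is shown as "om."
#   ["01", "B10K1V1U2", "?", "?"]     # an ambiguous or lacunose attestation, treated as missing
# Note that np.array coerces the whole table to strings, since the rows mix ints and text.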

2079 def to_dataframe( 

2080 self, 

2081 drop_constant: bool = False, 

2082 ambiguous_as_missing: bool = False, 

2083 proportion: bool = False, 

2084 table_type: TableType = TableType.matrix, 

2085 split_missing: bool = True, 

2086 show_ext: bool = False, 

2087 ): 

2088 """Returns this Collation in the form of a Pandas DataFrame array, including the appropriate row and column labels. 

2089 

2090 Args: 

2091 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2092 Default value is False. 

2093 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2094 Default value is False. 

2095 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2096 Default value is False. 

2097 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate. 

2098 Only applicable for tabular outputs. 

2099 Default value is "matrix". 

2100 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings; 

2101 if False, then missing data is ignored (i.e., all states are 0). 

2102 Default value is True. 

2103 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2104 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2105 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2106 Default value is False. 

2107 

2108 Returns: 

2109 A Pandas DataFrame corresponding to a collation matrix with reading frequencies or a long table with discrete reading states. 

2110 """ 

2111 df = None 

2112 # Proceed based on the table type: 

2113 if table_type == TableType.matrix: 

2114 # Convert the collation to a NumPy array and get its row and column labels first: 

2115 matrix, reading_labels, witness_labels = self.to_numpy( 

2116 drop_constant=drop_constant, split_missing=split_missing 

2117 ) 

2118 df = pd.DataFrame(matrix, index=reading_labels, columns=witness_labels) 

2119 elif table_type == TableType.distance: 

2120 # Convert the collation to a NumPy array and get its row and column labels first: 

2121 matrix, witness_labels = self.to_distance_matrix( 

2122 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2123 ) 

2124 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2125 elif table_type == TableType.similarity: 

2126 # Convert the collation to a NumPy array and get its row and column labels first: 

2127 matrix, witness_labels = self.to_similarity_matrix( 

2128 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2129 ) 

2130 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2131 elif table_type == TableType.nexus: 

2132 # Convert the collation to a NumPy array and get its row and column labels first: 

2133 matrix, witness_labels, vu_labels = self.to_nexus_table( 

2134 drop_constant=drop_constant, ambiguous_as_missing=ambiguous_as_missing 

2135 ) 

2136 df = pd.DataFrame(matrix, index=witness_labels, columns=vu_labels) 

2137 elif table_type == TableType.long: 

2138 # Convert the collation to a long table and get its column labels first: 

2139 long_table, column_labels = self.to_long_table(drop_constant=drop_constant) 

2140 df = pd.DataFrame(long_table, columns=column_labels) 

2141 return df 

2142 

2143 def to_csv( 

2144 self, 

2145 file_addr: Union[Path, str], 

2146 drop_constant: bool = False, 

2147 ambiguous_as_missing: bool = False, 

2148 proportion: bool = False, 

2149 table_type: TableType = TableType.matrix, 

2150 split_missing: bool = True, 

2151 show_ext: bool = False, 

2152 **kwargs 

2153 ): 

2154 """Writes this Collation to a comma-separated value (CSV) file with the given address. 

2155 

2156 If your witness IDs are numeric (e.g., Gregory-Aland numbers), then they will be written in full to the CSV file, but Excel will likely interpret them as numbers and truncate any leading zeroes! 

2157 

2158 Args: 

2159 file_addr: A string representing the path to an output CSV file; the file type should be .csv. 

2160 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2161 Default value is False. 

2162 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2163 Default value is False. 

2164 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2165 Default value is False. 

2166 table_type: A TableType option indicating which type of tabular output to generate. 

2167 Only applicable for tabular outputs. 

2168 Default value is "matrix". 

2169 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings; 

2170 if False, then missing data is ignored (i.e., all states are 0). 

2171 Default value is True. 

2172 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2173 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2174 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2175 Default value is False. 

2176 **kwargs: Keyword arguments for pandas.DataFrame.to_csv. 

2177 """ 

2178 # Convert the collation to a Pandas DataFrame first: 

2179 df = self.to_dataframe( 

2180 drop_constant=drop_constant, 

2181 ambiguous_as_missing=ambiguous_as_missing, 

2182 proportion=proportion, 

2183 table_type=table_type, 

2184 split_missing=split_missing, 

2185 show_ext=show_ext, 

2186 ) 

2187 # Generate all parent folders for this file that don't already exist: 

2188 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2189 # Proceed based on the table type: 

2190 if table_type == TableType.long: 

2191 return df.to_csv( 

2192 file_addr, encoding="utf-8-sig", index=False, **kwargs 

2193 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2194 return df.to_csv( 

2195 file_addr, encoding="utf-8-sig", **kwargs 

2196 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2197 

2198 def to_excel( 

2199 self, 

2200 file_addr: Union[Path, str], 

2201 drop_constant: bool = False, 

2202 ambiguous_as_missing: bool = False, 

2203 proportion: bool = False, 

2204 table_type: TableType = TableType.matrix, 

2205 split_missing: bool = True, 

2206 show_ext: bool = False, 

2207 ): 

2208 """Writes this Collation to an Excel (.xlsx) file with the given address. 

2209 

2210 Since pandas is deprecating its support for xlwt, writing output in the old Excel (.xls) format is not recommended. 

2211 

2212 Args: 

2213 file_addr: A string representing the path to an output Excel file; the file type should be .xlsx. 

2214 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2215 Default value is False. 

2216 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2217 Default value is False. 

2218 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2219 Default value is False. 

2220 table_type: A TableType option indicating which type of tabular output to generate. 

2221 Only applicable for tabular outputs. 

2222 Default value is "matrix". 

2223 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings; 

2224 if False, then missing data is ignored (i.e., all states are 0). 

2225 Default value is True. 

2226 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2227 should include the number of variation units at which both witnesses are extant and unambiguous, after the number of their disagreements/agreements. 

2228 Only applicable for tabular output formats of type "distance" or "similarity". 

2229 Default value is False. 

2230 """ 

2231 # Convert the collation to a Pandas DataFrame first: 

2232 df = self.to_dataframe( 

2233 drop_constant=drop_constant, 

2234 ambiguous_as_missing=ambiguous_as_missing, 

2235 proportion=proportion, 

2236 table_type=table_type, 

2237 split_missing=split_missing, 

2238 show_ext=show_ext, 

2239 ) 

2240 # Generate all parent folders for this file that don't already exist: 

2241 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2242 # Proceed based on the table type: 

2243 if table_type == TableType.long: 

2244 return df.to_excel(file_addr, index=False) 

2245 return df.to_excel(file_addr) 

2246 
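A corresponding sketch for the Excel output (same hypothetical `coll` instance as above): writing a proportional distance table to .xlsx, as the docstring recommends over the deprecated .xls format.

    coll.to_excel("distances.xlsx", table_type=TableType.distance, proportion=True)  # proportional distance matrix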

2247 def to_phylip_matrix( 

2248 self, 

2249 file_addr: Union[Path, str], 

2250 drop_constant: bool = False, 

2251 proportion: bool = False, 

2252 table_type: TableType = TableType.distance, 

2253 show_ext: bool = False, 

2254 ): 

2255 """Writes this Collation as a PHYLIP-formatted distance/similarity matrix to the file with the given address. 

2256 

2257 Args: 

2258 file_addr: A string representing the path to an output PHYLIP file; the file type should be .ph or .phy. 

2259 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2260 Default value is False. 

2261 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2262 Default value is False. 

2263 table_type: A TableType option indicating which type of tabular output to generate. 

2264 For PHYLIP-formatted outputs, distance and similarity matrices are the only supported table types. 

2265 Default value is "distance". 

2266 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2267 should include the number of variation units at which both witnesses are extant and unambiguous, after the number of their disagreements/agreements. 

2268 Only applicable for tabular output formats of type "distance" or "similarity". 

2269 Default value is False. 

2270 """ 

2271 # Convert the collation to a Pandas DataFrame first: 

2272 matrix = None 

2273 witness_labels = [] 

2274 # Proceed based on the table type: 

2275 if table_type == TableType.distance: 

2276 # Convert the collation to a NumPy array and get its row and column labels first: 

2277 matrix, witness_labels = self.to_distance_matrix( 

2278 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2279 ) 

2280 elif table_type == TableType.similarity: 

2281 # Convert the collation to a NumPy array and get its row and column labels first: 

2282 matrix, witness_labels = self.to_similarity_matrix( 

2283 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2284 ) 

2285 # Generate all parent folders for this file that don't already exist: 

2286 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2287 with open(file_addr, "w", encoding="utf-8") as f: 

2288 # The first line contains the number of taxa: 

2289 f.write("%d\n" % len(witness_labels)) 

2290 # Every subsequent line contains a witness label, followed by the values in its row of the matrix: 

2291 for i, wit_id in enumerate(witness_labels): 

2292 wit_label = slugify(wit_id, lowercase=False, allow_unicode=True, separator='_') 

2293 f.write("%s %s\n" % (wit_label, " ".join([str(v) for v in matrix[i]]))) 

2294 return 

2295 
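Schematically, the loop above yields a file like the following (three invented witnesses with placeholder distances; the first line is the taxon count, and each subsequent line pairs a slugified witness label with its row of the matrix):

    3
    WitA 0 3 5
    WitB 3 0 4
    WitC 5 4 0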

2296 def get_stemma_symbols(self): 

2297 """Returns a list of one-character symbols needed to represent the states of all substantive readings in stemma format. 

2298 

2299 The number of symbols equals the maximum number of substantive readings at any variation unit. 

2300 

2301 Returns: 

2302 A list of individual characters representing states in readings. 

2303 """ 

2304 possible_symbols = ( 

2305 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

2306 ) # NOTE: the maximum number of symbols allowed in stemma format (other than "?" and "-") is 62 

2307 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

2308 nsymbols = 0 

2309 # If there are no witnesses, then no symbols are needed at all: 

2310 if len(self.witnesses) == 0: 

2311 return [] 

2312 wit_id = self.witnesses[0].id 

2313 for rdg_support in self.readings_by_witness[wit_id]: 

2314 nsymbols = max(nsymbols, len(rdg_support)) 

2315 stemma_symbols = possible_symbols[:nsymbols] 

2316 return stemma_symbols 

2317 
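A standalone sketch confirming the symbol pool defined above (no Collation instance needed; the arithmetic is just 10 digits + 26 lowercase + 26 uppercase letters):

    import string

    possible_symbols = list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
    assert len(possible_symbols) == 62  # the stemma format's ceiling, excluding "?" and "-"
    # A collation whose largest unit has four substantive readings needs only the first four symbols:
    assert possible_symbols[:4] == ["0", "1", "2", "3"]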

2318 def to_stemma(self, file_addr: Union[Path, str]): 

2319 """Writes this Collation to a stemma file without an extension and a Chron file (containing low, middle, and high dates for all witnesses) without an extension. 

2320 

2321 Since this format does not support ambiguous states, all reading vectors with anything other than one nonzero entry will be interpreted as lacunose. 

2322 If an interpGrp for weights is specified in the TEI XML collation, then the weights for the interp elements will be used as weights 

2323 for the variation units that specify them in their ana attribute. 

2324 

2325 Args: 

2326 file_addr: A string representing the path to an output stemma prep file; the file should have no extension. 

2327 The accompanying chron file will match this file name, except that it will have "_chron" appended to the end. 


2329 """ 

2330 # Populate a list of sites that will correspond to columns of the sequence alignment 

2331 # (by default, constant sites are dropped): 

2332 substantive_variation_unit_ids = [ 

2333 vu_id for vu_id in self.variation_unit_ids if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2334 ] 

2335 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2336 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2337 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2338 # to the readings' texts: 

2339 reading_texts_by_indices = {} 

2340 for j, vu in enumerate(self.variation_units): 

2341 if vu.id not in substantive_variation_unit_ids_set: 

2342 continue 

2343 k = 0 

2344 for rdg in vu.readings: 

2345 key = tuple([vu.id, rdg.id]) 

2346 if key not in substantive_variation_unit_reading_tuples_set: 

2347 continue 

2348 indices = tuple([j, k]) 

2349 reading_texts_by_indices[indices] = rdg.text 

2350 k += 1 

2351 # In a second pass, populate another dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2352 # to the witnesses exclusively supporting those readings: 

2353 reading_wits_by_indices = {} 

2354 for indices in reading_texts_by_indices: 

2355 reading_wits_by_indices[indices] = [] 

2356 for i, wit in enumerate(self.witnesses): 

2357 for j, vu_id in enumerate(self.variation_unit_ids): 

2358 if vu_id not in substantive_variation_unit_ids_set: 

2359 continue 

2360 rdg_support = self.readings_by_witness[wit.id][j] 

2361 # If this witness does not exclusively support exactly one reading at this unit, then treat it as lacunose: 

2362 if len([k for k, w in enumerate(rdg_support) if w > 0]) != 1: 

2363 continue 

2364 k = rdg_support.index(1) 

2365 indices = tuple([j, k]) 

2366 reading_wits_by_indices[indices].append(wit.id) 

2367 # In a third pass, write to the stemma file: 

2368 symbols = self.get_stemma_symbols() 

2369 Path(file_addr).parent.mkdir( 

2370 parents=True, exist_ok=True 

2371 ) # generate all parent folders for this file that don't already exist 

2372 chron_file_addr = str(file_addr) + "_chron" 

2373 with open(file_addr, "w", encoding="utf-8") as f: 

2374 # Start with the witness list: 

2375 f.write( 

2376 "* %s ;\n\n" 

2377 % " ".join( 

2378 [slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') for wit in self.witnesses] 

2379 ) 

2380 ) 

2381 # f.write("^ %s\n\n" % chron_file_addr) #write the relative path to the chron file 

2382 f.write( 

2383 "^ %s\n\n" % ("." + os.sep + Path(chron_file_addr).name) 

2384 ) # write the relative path to the chron file 

2385 # Then add a line indicating that all witnesses are lacunose unless they are specified explicitly: 

2386 f.write("= $? $* ;\n\n") 

2387 # Then proceed for each variation unit: 

2388 for j, vu_id in enumerate(self.variation_unit_ids): 

2389 if vu_id not in substantive_variation_unit_ids_set: 

2390 continue 

2391 # Print the variation unit ID first: 

2392 f.write("@ %s\n" % vu_id) 

2393 # In a first pass, print the texts of all readings enclosed in brackets: 

2394 f.write("[ ") 

2395 k = 0 

2396 while True: 

2397 indices = tuple([j, k]) 

2398 if indices not in reading_texts_by_indices: 

2399 break 

2400 text = slugify( 

2401 reading_texts_by_indices[indices], lowercase=False, allow_unicode=True, separator='.' 

2402 ) 

2403 # Denote omissions by en-dashes: 

2404 if text == "": 

2405 text = "\u2013" 

2406 # The first reading should not be preceded by anything: 

2407 if k == 0: 

2408 f.write(text) 

2409 f.write(" |") 

2410 # Add the weight of this variation unit after the pipe by comparing its analysis categories to their weights: 

2411 weight = 1 

2412 vu = self.variation_units[j] 

2413 if len(vu.analysis_categories) > 0: 

2414 weight = int( 

2415 sum( 

2416 [ 

2417 self.weights_by_id[ana] if ana in self.weights_by_id else 1 

2418 for ana in vu.analysis_categories 

2419 ] 

2420 ) 

2421 / len(vu.analysis_categories) 

2422 ) 
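# e.g., analysis categories weighted 2 and 1 average to int((2 + 1) / 2) = 1 (the mean is truncated to an int)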

2423 f.write("*%d" % weight) 

2424 # Every subsequent reading should be preceded by a space: 

2425 elif k > 0: 

2426 f.write(" %s" % text) 

2427 k += 1 

2428 f.write(" ]\n") 

2429 # In a second pass, print the indices and witnesses for all readings enclosed in angle brackets: 

2430 k = 0 

2431 f.write("\t< ") 

2432 while True: 

2433 indices = tuple([j, k]) 

2434 if indices not in reading_wits_by_indices: 

2435 break 

2436 rdg_symbol = symbols[k] # get the one-character alphanumeric code for this state 

2437 wits = " ".join(reading_wits_by_indices[indices]) 

2438 # Open the variant reading support block with an angle bracket: 

2439 if k == 0: 

2440 f.write("%s %s" % (rdg_symbol, wits)) 

2441 # Open all subsequent variant reading support blocks with pipes on the next line: 

2442 else: 

2443 f.write("\n\t| %s %s" % (rdg_symbol, wits)) 

2444 k += 1 

2445 f.write(" >\n") 

2446 # In a fourth pass, write to the chron file: 

2447 max_id_length = max( 

2448 [len(slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')) for wit in self.witnesses] 

2449 ) 

2450 max_date_length = 0 

2451 for wit in self.witnesses: 

2452 if wit.date_range[0] is not None: 

2453 max_date_length = max(max_date_length, len(str(wit.date_range[0]))) 

2454 if wit.date_range[1] is not None: 

2455 max_date_length = max(max_date_length, len(str(wit.date_range[1]))) 

2456 # Attempt to get the minimum and maximum dates for witnesses; if we can't do this, then don't write a chron file: 

2457 min_date = None 

2458 max_date = None 

2459 try: 

2460 min_date = min([wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]) 

2461 max_date = max([wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]) 

2462 except Exception: 

2463 print("WARNING: no witnesses have date ranges; no chron file will be written!") 

2464 return 

2465 with open(chron_file_addr, "w", encoding="utf-8") as f: 

2466 for wit in self.witnesses: 

2467 wit_label = slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') 

2468 f.write(wit_label) 

2469 f.write(" " * (max_id_length - len(wit.id) + 1)) 

2470 # If the lower bound on this witness's date is missing, then use the minimum date over all witnesses as a default: 

2471 date_range = wit.date_range 

2472 if date_range[0] is None: 

2473 date_range = tuple([min_date, date_range[1]]) 

2474 # Then write the date range minimum, average, and maximum to the chron file: 

2475 low_date = str(date_range[0]) 

2476 f.write(" " * (max_date_length - len(low_date) + 2)) 

2477 f.write(low_date) 

2478 avg_date = str(int(((date_range[0] + date_range[1]) / 2))) 

2479 f.write(" " * (max_date_length - len(str(avg_date)) + 2)) 

2480 f.write(avg_date) 

2481 high_date = str(date_range[1]) 

2482 f.write(" " * (max_date_length - len(high_date) + 2)) 

2483 f.write(high_date) 

2484 f.write("\n") 

2485 return 

2486 
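A hedged sketch of invoking the method above, together with the schematic shape of the stemma file it emits (the `coll` instance, witness IDs, unit ID, and readings are invented placeholders; the syntax mirrors the writes in the code: "* ... ;" for the witness list, "^" for the chron file path, "= $? $* ;" for the default-lacunose line, "@" for a unit ID, "[ ... ]" for reading texts with the unit weight after the pipe, and "< ... | ... >" for reading support):

    coll.to_stemma("collation_stemma")  # also writes collation_stemma_chron alongside

    # Schematic stemma file contents (placeholder data, POSIX path separator assumed):
    # * WitA WitB WitC ;
    #
    # ^ ./collation_stemma_chron
    #
    # = $? $* ;
    #
    # @ B1K1V1U2
    # [ word |*1 word.alt ]
    #     < 0 WitA WitB
    #     | 1 WitC >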

2487 def to_file( 

2488 self, 

2489 file_addr: Union[Path, str], 

2490 format: Format = None, 

2491 drop_constant: bool = False, 

2492 split_missing: bool = True, 

2493 char_state_labels: bool = True, 

2494 frequency: bool = False, 

2495 ambiguous_as_missing: bool = False, 

2496 proportion: bool = False, 

2497 calibrate_dates: bool = False, 

2498 mrbayes: bool = False, 

2499 clock_model: ClockModel = ClockModel.strict, 

2500 ancestral_logger: AncestralLogger = AncestralLogger.state, 

2501 table_type: TableType = TableType.matrix, 

2502 show_ext: bool = False, 

2503 seed: int = None, 

2504 ): 

2505 """Writes this Collation to the file with the given address. 

2506 

2507 Args: 

2508 file_addr (Union[Path, str]): The path to the output file. 

2509 format (Format, optional): The desired output format. 

2510 If None, then it is inferred from the file suffix. 

2511 Defaults to None. 

2512 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2513 Default value is False. 

2514 split_missing (bool, optional): An optional flag indicating whether to treat 

2515 missing characters/variation units as having a contribution of 1 split over all states/readings; 

2516 if False, then missing data is ignored (i.e., all states are 0). 

2517 Not applicable for NEXUS, HENNIG86, PHYLIP, FASTA, or STEMMA format. 

2518 Default value is True. 

2519 char_state_labels (bool, optional): An optional flag indicating whether to print 

2520 the CharStateLabels block in NEXUS output. 

2521 Default value is True. 

2522 frequency (bool, optional): An optional flag indicating whether to use the StatesFormat=Frequency setting 

2523 instead of the StatesFormat=StatesPresent setting 

2524 (and thus represent all states with frequency vectors rather than symbols) 

2525 in NEXUS output. 

2526 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation. 

2527 Default value is False. 

2528 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2529 If this flag is set, then only base symbols will be generated for the NEXUS file. 

2530 It is only applied if the frequency option is False. 

2531 Default value is False. 

2532 proportion (bool, optional): An optional flag indicating whether to populate a distance matrix's cells 

2533 with a proportion of disagreements to variation units where both witnesses are extant. 

2534 It is only applied if the table_type option is "distance". 

2535 Default value is False. 

2536 calibrate_dates (bool, optional): An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses 

2537 in NEXUS output. 

2538 This option is intended for inputs to BEAST 2. 

2539 Default value is False. 

2540 mrbayes (bool, optional): An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses 

2541 in NEXUS output. 

2542 This option is intended for inputs to MrBayes. 

2543 Default value is False. 

2544 clock_model (ClockModel, optional): A ClockModel option indicating which type of clock model to use. 

2545 This option is intended for inputs to MrBayes and BEAST 2. 

2546 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified. 

2547 Default value is "strict". 

2548 ancestral_logger (AncestralLogger, optional): An AncestralLogger option indicating which class of logger (if any) to use for ancestral states. 

2549 This option is intended for inputs to BEAST 2. 

2550 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate. 

2551 Only applicable for tabular outputs and PHYLIP outputs. 

2552 If the output is a PHYLIP file, then the type of tabular output must be "distance" or "similarity"; otherwise, it will be ignored. 

2553 Default value is "matrix". 

2554 show_ext (bool, optional): An optional flag indicating whether each cell in a distance or similarity matrix 

2555 should include the number of variation units where both witnesses are extant after the number of their disagreements/agreements. 

2556 Only applicable for tabular output formats of type "distance" or "similarity". 

2557 Default value is False. 

2558 seed (int, optional): A seed for random number generation (for setting initial values of unspecified transcriptional rates in BEAST 2 XML output). 

2559 """ 

2560 file_addr = Path(file_addr) 

2561 format = format or Format.infer( 

2562 file_addr.suffix 

2563 ) # an exception will be raised here if the format or suffix is invalid 

2564 

2565 if format == Format.NEXUS: 

2566 return self.to_nexus( 

2567 file_addr, 

2568 drop_constant=drop_constant, 

2569 char_state_labels=char_state_labels, 

2570 frequency=frequency, 

2571 ambiguous_as_missing=ambiguous_as_missing, 

2572 calibrate_dates=calibrate_dates, 

2573 mrbayes=mrbayes, 

2574 clock_model=clock_model, 

2575 ) 

2576 

2577 if format == Format.HENNIG86: 

2578 return self.to_hennig86(file_addr, drop_constant=drop_constant) 

2579 

2580 if format == Format.PHYLIP: 

2581 if table_type in [TableType.distance, TableType.similarity]: 

2582 return self.to_phylip_matrix( 

2583 file_addr, 

2584 drop_constant=drop_constant, 

2585 proportion=proportion, 

2586 table_type=table_type, 

2587 show_ext=show_ext, 

2588 ) 

2589 return self.to_phylip(file_addr, drop_constant=drop_constant) 

2590 

2591 if format == Format.FASTA: 

2592 return self.to_fasta(file_addr, drop_constant=drop_constant) 

2593 

2594 if format == Format.BEAST: 

2595 return self.to_beast( 

2596 file_addr, 

2597 drop_constant=drop_constant, 

2598 clock_model=clock_model, 

2599 ancestral_logger=ancestral_logger, 

2600 seed=seed, 

2601 ) 

2602 

2603 if format == Format.CSV: 

2604 return self.to_csv( 

2605 file_addr, 

2606 drop_constant=drop_constant, 

2607 ambiguous_as_missing=ambiguous_as_missing, 

2608 proportion=proportion, 

2609 table_type=table_type, 

2610 split_missing=split_missing, 

2611 show_ext=show_ext, 

2612 ) 

2613 

2614 if format == Format.TSV: 

2615 return self.to_csv( 

2616 file_addr, 

2617 drop_constant=drop_constant, 

2618 ambiguous_as_missing=ambiguous_as_missing, 

2619 proportion=proportion, 

2620 table_type=table_type, 

2621 split_missing=split_missing, 

2622 show_ext=show_ext, 

2623 sep="\t", 

2624 ) 

2625 

2626 if format == Format.EXCEL: 

2627 return self.to_excel( 

2628 file_addr, 

2629 drop_constant=drop_constant, 

2630 ambiguous_as_missing=ambiguous_as_missing, 

2631 proportion=proportion, 

2632 table_type=table_type, 

2633 split_missing=split_missing, 

2634 show_ext=show_ext, 

2635 ) 

2636 

2637 if format == Format.STEMMA: 

2638 return self.to_stemma(file_addr)
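
Finally, a hedged sketch of the dispatcher above (hypothetical `coll` and paths; the exact suffix-to-format mapping lives in Format.infer, so the inferences shown are assumptions):

    coll.to_file("collation.nexus")  # format inferred from the suffix (assumed to map to Format.NEXUS)
    coll.to_file("collation.tsv", table_type=TableType.long)  # assumed to map to Format.TSV, which calls to_csv with sep="\t"
    coll.to_file("collation_stemma", format=Format.STEMMA)  # extensionless output, so the format must be given explicitly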