Coverage for teiphy/collation.py: 99.94%

1649 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 16:57 +0000

1#!/usr/bin/env python3 

2 

3from enum import Enum 

4from typing import List, Union 

5import os 

6from pathlib import Path 

7from datetime import datetime # for calculating the current year (for dating and tree height purposes) 

8import math # for special functions 

9import time # to time calculations for users 

10import string # for easy retrieval of character ranges 

11from lxml import etree as et # for reading TEI XML inputs 

12import numpy as np # for random number sampling and collation matrix outputs 

13import pandas as pd # for writing to DataFrames, CSV, Excel, etc. 

14from slugify import slugify # for converting Unicode text from readings to ASCII for NEXUS 

15from jinja2 import Environment, PackageLoader, select_autoescape # for filling output XML templates 

16 

17from .common import xml_ns, tei_ns 

18from .format import Format 

19from .witness import Witness 

20from .variation_unit import VariationUnit 

21 

22 

23class ParsingException(Exception): 

24 pass 

25 

26 

27class WitnessDateException(Exception): 

28 pass 

29 

30 

31class IntrinsicRelationsException(Exception): 

32 pass 

33 

34 

35class ClockModel(str, Enum): 

36 strict = "strict" 

37 uncorrelated = "uncorrelated" 

38 local = "local" 

39 

40 

41class AncestralLogger(str, Enum): 

42 state = "state" 

43 sequence = "sequence" 

44 none = "none" 

45 

46 

47class TableType(str, Enum): 

48 matrix = "matrix" 

49 distance = "distance" 

50 similarity = "similarity" 

51 idf = "idf" 

52 mean_idf = "mean-idf" 

53 mi = "mi" 

54 mean_mi = "mean-mi" 

55 nexus = "nexus" 

56 long = "long" 

57 

58 

59class SplitMissingType(str, Enum): 

60 uniform = "uniform" 

61 proportional = "proportional" 

62 

63 

64class Collation: 

65 """Base class for storing TEI XML collation data internally. 

66 

67 This corresponds to the entire XML tree, rooted at the TEI element of the collation. 

68 

69 Attributes: 

70 manuscript_suffixes: A list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses. 

71 trivial_reading_types: A set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading. 

72 missing_reading_types: A set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data. 

73 fill_corrector_lacunae: A boolean flag indicating whether or not to fill "lacunae" in witnesses with type "corrector". 

74 fragmentary_threshold: A float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation. 

75 witnesses: A list of Witness instances contained in this Collation. 

76 witness_index_by_id: A dictionary mapping base witness ID strings to their int indices in the witnesses list. 

77 variation_units: A list of VariationUnit instances contained in this Collation. 

78 readings_by_witness: A dictionary mapping base witness ID strings to lists of reading support coefficients for all units (with at least two substantive readings). 

79 substantive_variation_unit_ids: A list of ID strings for variation units with two or more substantive readings. 

80 substantive_variation_unit_reading_tuples: A list of (variation unit ID, reading ID) tuples for substantive readings. 

81 verbose: A boolean flag indicating whether or not to print timing and debugging details for the user. 

82 """ 

83 

84 def __init__( 

85 self, 

86 xml: et.ElementTree, 

87 manuscript_suffixes: List[str] = [], 

88 trivial_reading_types: List[str] = [], 

89 missing_reading_types: List[str] = [], 

90 fill_corrector_lacunae: bool = False, 

91 fragmentary_threshold: float = None, 

92 dates_file: Union[Path, str] = None, 

93 verbose: bool = False, 

94 ): 

95 """Constructs a new Collation instance with the given settings. 

96 

97 Args: 

98 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

99 manuscript_suffixes: An optional list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses. 

100 trivial_reading_types: An optional set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading. 

101 missing_reading_types: An optional set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data. 

102 fill_corrector_lacunae: An optional flag indicating whether or not to fill "lacunae" in witnesses with type "corrector". 

103 fragmentary_threshold: An optional float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation. 

104 dates_file: An optional path to a CSV file containing witness IDs, minimum dates, and maximum dates. If specified, then for all witnesses in the first column, any existing date ranges for them in the TEI XML collation will be ignored. 

105 verbose: An optional flag indicating whether or not to print timing and debugging details for the user. 

106 """ 

107 self.manuscript_suffixes = manuscript_suffixes 

108 self.trivial_reading_types = set(trivial_reading_types) 

109 self.missing_reading_types = set(missing_reading_types) 

110 self.fill_corrector_lacunae = fill_corrector_lacunae 

111 self.fragmentary_threshold = fragmentary_threshold 

112 self.verbose = verbose 

113 self.witnesses = [] 

114 self.witness_index_by_id = {} 

115 self.variation_units = [] 

116 self.readings_by_witness = {} 

117 self.variation_unit_ids = [] 

118 self.substantive_variation_unit_reading_tuples = [] 

119 self.substantive_readings_by_variation_unit_id = {} 

120 self.weight_categories = [] 

121 self.weights_by_id = {} 

122 self.intrinsic_categories = [] 

123 self.intrinsic_odds_by_id = {} 

124 self.transcriptional_categories = [] 

125 self.transcriptional_rates_by_id = {} 

126 self.origin_date_range = [] 

127 # Now parse the XML tree to populate these data structures: 

128 if self.verbose: 

129 print("Initializing collation...") 

130 t0 = time.time() 

131 self.parse_origin_date_range(xml) 

132 self.parse_list_wit(xml) 

133 self.validate_wits(xml) 

134 # If a dates file was specified, then update the witness date ranges manually: 

135 if dates_file is not None: 

136 self.update_witness_date_ranges_from_dates_file(dates_file) 

137 # If the upper bound on a work's date of origin is not defined, then attempt to assign it an upper bound based on the witness dates; 

138 # otherwise, attempt to assign lower bounds to witness dates based on it: 

139 if self.origin_date_range[1] is None: 

140 self.update_origin_date_range_from_witness_date_ranges() 

141 else: 

142 self.update_witness_date_ranges_from_origin_date_range() 

143 self.parse_weights(xml) 

144 self.parse_intrinsic_odds(xml) 

145 self.parse_transcriptional_rates(xml) 

146 self.parse_apps(xml) 

147 self.validate_intrinsic_relations() 

148 self.parse_readings_by_witness() 

149 # If a threshold of readings for fragmentary witnesses is specified, then filter the witness list using the dictionary mapping witness IDs to readings: 

150 if self.fragmentary_threshold is not None: 

151 self.filter_fragmentary_witnesses(xml) 

152 t1 = time.time() 

153 if self.verbose: 

154 print("Total time to initialize collation: %0.4fs." % (t1 - t0)) 

155 

156 def parse_origin_date_range(self, xml: et.ElementTree): 

157 """Given an XML tree for a collation, populates this Collation's list of origin date bounds. 

158 

159 Args: 

160 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

161 """ 

162 if self.verbose: 

163 print("Parsing origin date range...") 

164 t0 = time.time() 

165 self.origin_date_range = [None, None] 

166 for date in xml.xpath( 

167 "//tei:sourceDesc//tei:bibl//tei:date|//tei:sourceDesc//tei:biblStruct//tei:date|//tei:sourceDesc//tei:biblFull//tei:date", 

168 namespaces={"tei": tei_ns}, 

169 ): 

170 # Try the @when attribute first; if it is set, then it accounts for both ends of the date range: 

171 if date.get("when") is not None: 

172 self.origin_date_range[0] = int(date.get("when").split("-")[0]) 

173 self.origin_date_range[1] = self.origin_date_range[0] 

174 # Failing that, if it has @from and @to attributes (indicating the period over which the work was completed), 

175 # then the completion date of the work accounts for both ends of the date range: 

176 elif date.get("to") is not None: 

177 self.origin_date_range[0] = int(date.get("to").split("-")[0]) 

178 self.origin_date_range[1] = self.origin_date_range[0] 

179 # Failing that, set lower and upper bounds on the origin date using the the @notBefore and @notAfter attributes: 

180 elif date.get("notBefore") is not None or date.get("notAfter") is not None: 

181 if date.get("notBefore") is not None: 

182 self.origin_date_range[0] = int(date.get("notBefore").split("-")[0]) 

183 if date.get("notAfter") is not None: 

184 self.origin_date_range[1] = int(date.get("notAfter").split("-")[0]) 

185 return 

186 

187 def get_base_wit(self, wit: str): 

188 """Given a witness siglum, strips of the specified manuscript suffixes until the siglum matches one in the witness list or until no more suffixes can be stripped. 

189 

190 Args: 

191 wit: A string representing a witness siglum, potentially including suffixes to be stripped. 

192 """ 

193 base_wit = wit 

194 # If our starting siglum corresponds to a siglum in the witness list, then just return it: 

195 if base_wit in self.witness_index_by_id: 

196 return base_wit 

197 # Otherwise, strip any suffixes we find until the siglum corresponds to a base witness in the list 

198 # or no more suffixes can be stripped: 

199 suffix_found = True 

200 while suffix_found: 

201 suffix_found = False 

202 for suffix in self.manuscript_suffixes: 

203 if base_wit.endswith(suffix): 

204 suffix_found = True 

205 base_wit = base_wit[: -len(suffix)] 

206 break # stop looking for other suffixes 

207 # If the siglum stripped of this suffix now corresponds to a siglum in the witness list, then return it: 

208 if base_wit in self.witness_index_by_id: 

209 return base_wit 

210 # If we get here, then all possible manuscript suffixes have been stripped, and the resulting siglum does not correspond to a siglum in the witness list: 

211 return base_wit 

212 

213 def parse_list_wit(self, xml: et.ElementTree): 

214 """Given an XML tree for a collation, populates its list of witnesses from its listWit element. 

215 If the XML tree does not contain a listWit element, then a ParsingException is thrown listing all distinct witness sigla encountered in the collation. 

216 

217 Args: 

218 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

219 """ 

220 if self.verbose: 

221 print("Parsing witness list...") 

222 t0 = time.time() 

223 self.witnesses = [] 

224 self.witness_index_by_id = {} 

225 list_wits = xml.xpath("/tei:TEI//tei:listWit", namespaces={"tei": tei_ns}) 

226 if len(list_wits) == 0: 

227 # There is no listWit element: collect all distinct witness sigla in the collation and raise a ParsingException listing them: 

228 distinct_sigla = set() 

229 sigla = [] 

230 # Proceed for each rdg, rdgGrp, or witDetail element: 

231 for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}): 

232 wit_str = rdg.get("wit") if rdg.get("wit") is not None else "" 

233 wits = wit_str.split() 

234 for wit in wits: 

235 siglum = wit.strip("#") # remove the URI prefix, if there is one 

236 if siglum not in distinct_sigla: 

237 distinct_sigla.add(siglum) 

238 sigla.append(siglum) 

239 sigla.sort() 

240 msg = "" 

241 msg += "An explicit listWit element must be included in the TEI XML collation.\n" 

242 msg += "The following sigla occur in the collation and should be included as the @xml:id or @n attributes of witness elements under the listWit element:\n" 

243 msg += ", ".join(sigla) 

244 raise ParsingException(msg) 

245 # Otherwise, take the first listWit element as the list of all witnesses and process it: 

246 list_wit = list_wits[0] 

247 for witness in list_wit.xpath("./tei:witness", namespaces={"tei": tei_ns}): 

248 wit = Witness(witness, self.verbose) 

249 self.witness_index_by_id[wit.id] = len(self.witnesses) 

250 self.witnesses.append(wit) 

251 t1 = time.time() 

252 if self.verbose: 

253 print("Finished processing %d witnesses in %0.4fs." % (len(self.witnesses), t1 - t0)) 

254 return 

255 

256 def validate_wits(self, xml: et.ElementTree): 

257 """Given an XML tree for a collation, checks if any witness sigla listed in a rdg, rdgGrp, or witDetail element, 

258 once stripped of ignored suffixes, is not found in the witness list. 

259 A warning will be issued for each distinct siglum like this. 

260 This method also checks if the upper bound of any witness's date is earlier than the lower bound on the collated work's date of origin 

261 and throws an exception if so. 

262 

263 Args: 

264 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

265 """ 

266 if self.verbose: 

267 print("Validating witness list against collation...") 

268 t0 = time.time() 

269 # There is no listWit element: collect all distinct witness sigla in the collation and raise an exception listing them: 

270 distinct_extra_sigla = set() 

271 extra_sigla = [] 

272 # Proceed for each rdg, rdgGrp, or witDetail element: 

273 for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}): 

274 wit_str = rdg.get("wit") if rdg.get("wit") is not None else "" 

275 wits = wit_str.split() 

276 for wit in wits: 

277 siglum = wit.strip("#") # remove the URI prefix, if there is one 

278 base_siglum = self.get_base_wit(siglum) 

279 if base_siglum not in self.witness_index_by_id: 

280 if base_siglum not in distinct_extra_sigla: 

281 distinct_extra_sigla.add(base_siglum) 

282 extra_sigla.append(base_siglum) 

283 if len(extra_sigla) > 0: 

284 extra_sigla.sort() 

285 msg = "" 

286 msg += "WARNING: The following sigla occur in the collation that do not have corresponding witness entries in the listWit:\n" 

287 msg += ", ".join(extra_sigla) 

288 print(msg) 

289 # If the lower bound on the date of origin is defined, then check each witness against it: 

290 if self.origin_date_range[0] is not None: 

291 bad_date_witness_sigla = [] 

292 bad_date_upper_bounds_by_witness = {} 

293 for i, wit in enumerate(self.witnesses): 

294 if wit.date_range[1] is not None and wit.date_range[1] < self.origin_date_range[0]: 

295 bad_date_witness_sigla.append(wit.id) 

296 bad_date_upper_bounds_by_witness[wit.id] = wit.date_range[1] 

297 if len(bad_date_witness_sigla) > 0: 

298 msg = "" 

299 msg += "The following witnesses have their latest possible dates before the earliest date of origin %d specified for the collated work:\n" 

300 msg += ", ".join( 

301 [ 

302 (siglum + "(" + str(bad_date_upper_bounds_by_witness[siglum]) + ")") 

303 for siglum in bad_date_witness_sigla 

304 ] 

305 ) 

306 raise WitnessDateException(msg) 

307 t1 = time.time() 

308 if self.verbose: 

309 print("Finished witness validation in %0.4fs." % (t1 - t0)) 

310 return 

311 

312 def update_witness_date_ranges_from_dates_file(self, dates_file: Union[Path, str]): 

313 """Given a CSV-formatted dates file, update the date ranges of all witnesses whose IDs are in the first column of the dates file 

314 (overwriting existing date ranges if necessary). 

315 """ 

316 if self.verbose: 

317 print("Updating witness dates from file %s..." % (str(dates_file))) 

318 t0 = time.time() 

319 dates_df = pd.read_csv(dates_file, index_col=0, names=["id", "min", "max"]) 

320 for witness in self.witnesses: 

321 wit_id = witness.id 

322 if wit_id in dates_df.index: 

323 # For every witness in the list whose ID is specified in the dates file, 

324 # update their date ranges (as long as the date ranges in the file are are well-formed): 

325 min_date = int(dates_df.loc[wit_id]["min"]) if not np.isnan(dates_df.loc[wit_id]["min"]) else None 

326 max_date = ( 

327 int(dates_df.loc[wit_id]["max"]) 

328 if not np.isnan(dates_df.loc[wit_id]["max"]) 

329 else datetime.now().year 

330 ) 

331 print(min_date, max_date) 

332 if min_date is not None and max_date is not None and min_date > max_date: 

333 raise ParsingException( 

334 "In dates file %s, for witness ID %s, the minimum date %d is greater than the maximum date %d." 

335 % (str(dates_file), wit_id, min_date, max_date) 

336 ) 

337 witness.date_range = [min_date, max_date] 

338 t1 = time.time() 

339 if self.verbose: 

340 print("Finished witness date range updates in %0.4fs." % (t1 - t0)) 

341 return 

342 

343 def update_origin_date_range_from_witness_date_ranges(self): 

344 """Conditionally updates the upper bound on the date of origin of the work represented by this Collation 

345 based on the bounds on the witnesses' dates. 

346 If none of the witnesses have bounds on their dates, then nothing is done. 

347 This method is only invoked if the work's date of origin does not already have its upper bound defined. 

348 """ 

349 if self.verbose: 

350 print("Updating upper bound on origin date using witness dates...") 

351 t0 = time.time() 

352 # Set the origin date to the earliest witness date: 

353 witness_date_lower_bounds = [wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None] 

354 witness_date_upper_bounds = [wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None] 

355 min_witness_date = ( 

356 min(witness_date_lower_bounds + witness_date_upper_bounds) 

357 if len(witness_date_lower_bounds + witness_date_upper_bounds) > 0 

358 else None 

359 ) 

360 if min_witness_date is not None: 

361 self.origin_date_range[1] = ( 

362 min(self.origin_date_range[1], min_witness_date) 

363 if self.origin_date_range[1] is not None 

364 else min_witness_date 

365 ) 

366 t1 = time.time() 

367 if self.verbose: 

368 print("Finished updating upper bound on origin date in %0.4fs." % (t1 - t0)) 

369 return 

370 

371 def update_witness_date_ranges_from_origin_date_range(self): 

372 """Attempts to update the lower bounds on the witnesses' dates of origin of the work represented by this Collation 

373 using the upper bound on the date of origin of the work represented by this Collation. 

374 This method is only invoked if the upper bound on the work's date of origin was not already defined 

375 (i.e., if update_origin_date_range_from_witness_date_ranges was not invoked). 

376 """ 

377 if self.verbose: 

378 print("Updating lower bounds on witness dates using origin date...") 

379 t0 = time.time() 

380 # Proceed for every witness: 

381 for i, wit in enumerate(self.witnesses): 

382 # Ensure that the lower bound on this witness's date is no earlier than the upper bound on the date of the work's origin: 

383 wit.date_range[0] = ( 

384 max(wit.date_range[0], self.origin_date_range[1]) 

385 if wit.date_range[0] is not None 

386 else self.origin_date_range[1] 

387 ) 

388 # Then ensure that the upper bound on this witness's date is no earlier than its lower bound, in case we updated it: 

389 wit.date_range[1] = max(wit.date_range[0], wit.date_range[1]) 

390 t1 = time.time() 

391 if self.verbose: 

392 print("Finished updating lower bounds on witness dates in %0.4fs." % (t1 - t0)) 

393 return 

394 

395 def parse_weights(self, xml: et.ElementTree): 

396 """Given an XML tree for a collation, populates this Collation's list of variation unit weight categories 

397 (associated with types of variation that may have different expected frequencies) 

398 and its dictionary mapping these categories to integer weights. 

399 

400 Args: 

401 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

402 """ 

403 if self.verbose: 

404 print("Parsing variation unit weight categories...") 

405 t0 = time.time() 

406 self.weight_categories = [] 

407 self.weights_by_id = {} 

408 for interp in xml.xpath("//tei:interpGrp[@type=\"weight\"]/tei:interp", namespaces={"tei": tei_ns}): 

409 # These must be indexed by the xml:id attribute, so skip any that do not have one: 

410 if interp.get("{%s}id" % xml_ns) is None: 

411 continue 

412 weight_category = interp.get("{%s}id" % xml_ns) 

413 # Retrieve this element's text value and coerce it to an integer, defaulting to 1 if it has none: 

414 weight = 1 

415 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}): 

416 if certainty.get("degree") is not None: 

417 weight = int(certainty.get("degree")) 

418 break 

419 self.weight_categories.append(weight_category) 

420 self.weights_by_id[weight_category] = weight 

421 t1 = time.time() 

422 if self.verbose: 

423 print( 

424 "Finished processing %d variation unit weight categories in %0.4fs." 

425 % (len(self.weight_categories), t1 - t0) 

426 ) 

427 return 

428 

429 def parse_intrinsic_odds(self, xml: et.ElementTree): 

430 """Given an XML tree for a collation, populates this Collation's list of intrinsic probability categories 

431 (e.g., "absolutely more likely," "highly more likely," "more likely," "slightly more likely," "equally likely") 

432 and its dictionary mapping these categories to numerical odds. 

433 If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated. 

434 

435 Args: 

436 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

437 """ 

438 if self.verbose: 

439 print("Parsing intrinsic odds categories...") 

440 t0 = time.time() 

441 self.intrinsic_categories = [] 

442 self.intrinsic_odds_by_id = {} 

443 for interp in xml.xpath("//tei:interpGrp[@type=\"intrinsic\"]/tei:interp", namespaces={"tei": tei_ns}): 

444 # These must be indexed by the xml:id attribute, so skip any that do not have one: 

445 if interp.get("{%s}id" % xml_ns) is None: 

446 continue 

447 odds_category = interp.get("{%s}id" % xml_ns) 

448 # If this element contains a certainty subelement with a fixed odds value for this category, then set it: 

449 odds = None 

450 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}): 

451 if certainty.get("degree") is not None: 

452 odds = float(certainty.get("degree")) 

453 break 

454 self.intrinsic_categories.append(odds_category) 

455 self.intrinsic_odds_by_id[odds_category] = odds 

456 t1 = time.time() 

457 if self.verbose: 

458 print( 

459 "Finished processing %d intrinsic odds categories in %0.4fs." 

460 % (len(self.intrinsic_categories), t1 - t0) 

461 ) 

462 return 

463 

464 def parse_transcriptional_rates(self, xml: et.ElementTree): 

465 """Given an XML tree for a collation, populates this Collation's dictionary mapping transcriptional change categories 

466 (e.g., "aural confusion," "visual error," "clarification") to numerical rates. 

467 If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated. 

468 

469 Args: 

470 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

471 """ 

472 if self.verbose: 

473 print("Parsing transcriptional change categories...") 

474 t0 = time.time() 

475 self.transcriptional_categories = [] 

476 self.transcriptional_rates_by_id = {} 

477 for interp in xml.xpath("//tei:interpGrp[@type=\"transcriptional\"]/tei:interp", namespaces={"tei": tei_ns}): 

478 # These must be indexed by the xml:id attribute, so skip any that do not have one: 

479 if interp.get("{%s}id" % xml_ns) is None: 

480 continue 

481 transcriptional_category = interp.get("{%s}id" % xml_ns) 

482 # If this element contains a certainty subelement with a fixed rate for this category, then set it: 

483 rate = None 

484 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}): 

485 if certainty.get("degree") is not None: 

486 rate = float(certainty.get("degree")) 

487 break 

488 self.transcriptional_categories.append(transcriptional_category) 

489 self.transcriptional_rates_by_id[transcriptional_category] = rate 

490 t1 = time.time() 

491 if self.verbose: 

492 print( 

493 "Finished processing %d transcriptional change categories in %0.4fs." 

494 % (len(self.transcriptional_rates_by_id), t1 - t0) 

495 ) 

496 return 

497 

498 def validate_intrinsic_relations(self): 

499 """Checks if any VariationUnit's intrinsic_relations map is not a forest. 

500 If any is not, then an IntrinsicRelationsException is thrown describing the VariationUnit at fault. 

501 """ 

502 if self.verbose: 

503 print("Validating intrinsic relation graphs for variation units...") 

504 t0 = time.time() 

505 for vu in self.variation_units: 

506 # Skip any variation units with an empty intrinsic_relations map: 

507 if len(vu.intrinsic_relations) == 0: 

508 continue 

509 # For all others, start by identifying all reading IDs that are not related to by some other reading ID: 

510 in_degree_by_reading = {} 

511 for edge in vu.intrinsic_relations: 

512 s = edge[0] 

513 t = edge[1] 

514 if s not in in_degree_by_reading: 

515 in_degree_by_reading[s] = 0 

516 if t not in in_degree_by_reading: 

517 in_degree_by_reading[t] = 0 

518 in_degree_by_reading[t] += 1 

519 # If any reading has more than one relation pointing to it, then the intrinsic relations graph is not a forest: 

520 excessive_in_degree_readings = [ 

521 rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] > 1 

522 ] 

523 if len(excessive_in_degree_readings) > 0: 

524 msg = "" 

525 msg += ( 

526 "In variation unit %s, the following readings have more than one intrinsic relation pointing to them: %s.\n" 

527 % (vu.id, ", ".join(excessive_in_degree_readings)) 

528 ) 

529 msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it." 

530 raise IntrinsicRelationsException(msg) 

531 # If every reading has another reading pointing to it, then the intrinsic relations graph contains a cycle and is not a forest: 

532 starting_nodes = [rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] == 0] 

533 if len(starting_nodes) == 0: 

534 msg = "" 

535 msg += "In variation unit %s, the intrinsic relations contain a cycle.\n" % vu.id 

536 msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it." 

537 raise IntrinsicRelationsException(msg) 

538 t1 = time.time() 

539 if self.verbose: 

540 print("Finished intrinsic relations validation in %0.4fs." % (t1 - t0)) 

541 return 

542 

543 def parse_apps(self, xml: et.ElementTree): 

544 """Given an XML tree for a collation, populates its list of variation units from its app elements. 

545 

546 Args: 

547 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element. 

548 """ 

549 if self.verbose: 

550 print("Parsing variation units...") 

551 t0 = time.time() 

552 for a in xml.xpath('//tei:app', namespaces={'tei': tei_ns}): 

553 vu = VariationUnit(a, self.verbose) 

554 self.variation_units.append(vu) 

555 t1 = time.time() 

556 if self.verbose: 

557 print("Finished processing %d variation units in %0.4fs." % (len(self.variation_units), t1 - t0)) 

558 return 

559 

560 def get_readings_by_witness_for_unit(self, vu: VariationUnit): 

561 """Returns a dictionary mapping witness IDs to a list of their reading coefficients for a given variation unit. 

562 

563 Args: 

564 vu: A VariationUnit to be processed. 

565 

566 Returns: 

567 A dictionary mapping witness ID strings to a list of their coefficients for all substantive readings in this VariationUnit. 

568 """ 

569 # In a first pass, populate lists of substantive (variation unit ID, reading ID) tuples and reading labels 

570 # and a map from reading IDs to the indices of their parent substantive reading in this unit: 

571 reading_id_to_index = {} 

572 self.substantive_readings_by_variation_unit_id[vu.id] = [] 

573 for rdg in vu.readings: 

574 # If this reading is missing (e.g., lacunose or inapplicable due to an overlapping variant) or targets another reading, then skip it: 

575 if rdg.type in self.missing_reading_types or len(rdg.certainties) > 0: 

576 continue 

577 # If this reading is trivial, then map it to the last substantive index: 

578 if rdg.type in self.trivial_reading_types: 

579 reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1 

580 continue 

581 # Otherwise, the reading is substantive: add it to the map and update the last substantive index: 

582 self.substantive_readings_by_variation_unit_id[vu.id].append(rdg.id) 

583 self.substantive_variation_unit_reading_tuples.append(tuple([vu.id, rdg.id])) 

584 reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1 

585 # If the list of substantive readings only contains one entry, then this variation unit is not informative; 

586 # return an empty dictionary and add nothing to the list of substantive reading labels: 

587 if self.verbose: 

588 print( 

589 "Variation unit %s has %d substantive readings." 

590 % (vu.id, len(self.substantive_readings_by_variation_unit_id[vu.id])) 

591 ) 

592 readings_by_witness_for_unit = {} 

593 # Initialize the output dictionary with empty sets for all base witnesses: 

594 for wit in self.witnesses: 

595 readings_by_witness_for_unit[wit.id] = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id]) 

596 # In a second pass, assign each base witness a set containing the readings it supports in this unit: 

597 for rdg in vu.readings: 

598 # Initialize the dictionary indicating support for this reading (or its disambiguations): 

599 rdg_support = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id]) 

600 # If this is a missing reading (e.g., a lacuna or an overlap), then we can skip it, as its corresponding set will be empty: 

601 if rdg.type in self.missing_reading_types: 

602 continue 

603 # Otherwise, if this reading is trivial, then it will contain an entry for the index of its parent substantive reading: 

604 elif rdg.type in self.trivial_reading_types: 

605 rdg_support[reading_id_to_index[rdg.id]] = 1 

606 # Otherwise, if this reading has one or more nonzero certainty degrees, 

607 # then set the entries for these readings to their degrees: 

608 elif sum(rdg.certainties.values()) > 0: 

609 for t in rdg.certainties: 

610 # Skip any reading whose ID is unrecognized in this unit: 

611 if t in reading_id_to_index: 

612 rdg_support[reading_id_to_index[t]] = rdg.certainties[t] 

613 # Otherwise, if this reading has one or more targets (i.e., if it is an ambiguous reading), 

614 # then set the entries for each of its targets to 1: 

615 elif len(rdg.targets) > 0: 

616 for t in rdg.targets: 

617 # Skip any reading whose ID is unrecognized in this unit: 

618 if t in reading_id_to_index: 

619 rdg_support[reading_id_to_index[t]] = 1 

620 # Otherwise, this reading is itself substantive; set the entry for the index of this reading to 1: 

621 else: 

622 rdg_support[reading_id_to_index[rdg.id]] = 1 

623 # Proceed for each witness siglum in the support for this reading: 

624 for wit in rdg.wits: 

625 # Is this siglum a base siglum? 

626 base_wit = self.get_base_wit(wit) 

627 if base_wit not in self.witness_index_by_id: 

628 # If it is not, then it is probably just because we've encountered a corrector or some other secondary witness not included in the witness list; 

629 # report this if we're in verbose mode and move on: 

630 if self.verbose: 

631 print( 

632 "Skipping unknown witness siglum %s (base siglum %s) in variation unit %s, reading %s..." 

633 % (wit, base_wit, vu.id, rdg.id) 

634 ) 

635 continue 

636 # If we've found a base siglum, then add this reading's contribution to the base witness's reading set for this unit; 

637 # normally the existing set will be empty, but if we reduce two suffixed sigla to the same base witness, 

638 # then that witness may attest to multiple readings in the same unit: 

639 readings_by_witness_for_unit[base_wit] = [ 

640 (min(readings_by_witness_for_unit[base_wit][i] + rdg_support[i], 1)) 

641 for i in range(len(rdg_support)) 

642 ] 

643 return readings_by_witness_for_unit 

644 

645 def parse_readings_by_witness(self): 

646 """Populates the internal dictionary mapping witness IDs to a list of their reading support sets for all variation units, and then fills the empty reading support sets for witnesses of type "corrector" with the entries of the previous witness.""" 

647 if self.verbose: 

648 print("Populating internal dictionary of witness readings...") 

649 t0 = time.time() 

650 # Initialize the data structures to be populated here: 

651 self.readings_by_witness = {} 

652 self.variation_unit_ids = [] 

653 for wit in self.witnesses: 

654 self.readings_by_witness[wit.id] = [] 

655 # Populate them for each variation unit: 

656 for vu in self.variation_units: 

657 readings_by_witness_for_unit = self.get_readings_by_witness_for_unit(vu) 

658 if len(readings_by_witness_for_unit) > 0: 

659 self.variation_unit_ids.append(vu.id) 

660 for wit in readings_by_witness_for_unit: 

661 self.readings_by_witness[wit].append(readings_by_witness_for_unit[wit]) 

662 # Optionally, fill the lacunae of the correctors: 

663 if self.fill_corrector_lacunae: 

664 for i, wit in enumerate(self.witnesses): 

665 # If this is the first witness, then it shouldn't be a corrector (since there is no previous witness against which to compare it): 

666 if i == 0: 

667 continue 

668 # Otherwise, if this witness is not a corrector, then skip it: 

669 if wit.type != "corrector": 

670 continue 

671 # Otherwise, retrieve the previous witness: 

672 prev_wit = self.witnesses[i - 1] 

673 for j in range(len(self.readings_by_witness[wit.id])): 

674 # If the corrector has no reading, then set it to the previous witness's reading: 

675 if sum(self.readings_by_witness[wit.id][j]) == 0: 

676 self.readings_by_witness[wit.id][j] = self.readings_by_witness[prev_wit.id][j] 

677 t1 = time.time() 

678 if self.verbose: 

679 print( 

680 "Populated dictionary for %d witnesses over %d substantive variation units in %0.4fs." 

681 % (len(self.witnesses), len(self.variation_unit_ids), t1 - t0) 

682 ) 

683 return 

684 

685 def filter_fragmentary_witnesses(self, xml): 

686 """Filters the original witness list and readings by witness dictionary to exclude witnesses whose proportions of extant passages fall below the fragmentary readings threshold.""" 

687 if self.verbose: 

688 print( 

689 "Filtering fragmentary witnesses (extant in < %f of all variation units) out of internal witness list and dictionary of witness readings..." 

690 % self.fragmentary_threshold 

691 ) 

692 t0 = time.time() 

693 fragmentary_witness_set = set() 

694 # Proceed for each witness in order: 

695 for wit in self.witnesses: 

696 wit_id = wit.id 

697 # We count the number of variation units at which this witness has an extant (i.e., non-missing) reading: 

698 extant_reading_count = 0 

699 total_reading_count = len(self.readings_by_witness[wit.id]) 

700 # Proceed through all reading support lists: 

701 for rdg_support in self.readings_by_witness[wit_id]: 

702 # If the current reading support list is not all zeroes, then increment this witness's count of extant readings: 

703 if sum(rdg_support) != 0: 

704 extant_reading_count += 1 

705 # If the proportion of extant readings falls below the threshold, then add this witness to the list of fragmentary witnesses: 

706 if extant_reading_count / total_reading_count < self.fragmentary_threshold: 

707 fragmentary_witness_set.add(wit_id) 

708 # Then filter the witness list to exclude the fragmentary witnesses: 

709 filtered_witnesses = [wit for wit in self.witnesses if wit.id not in fragmentary_witness_set] 

710 self.witnesses = filtered_witnesses 

711 # Then remove the entries for the fragmentary witnesses from the witnesses-to-readings dictionary: 

712 for wit_id in fragmentary_witness_set: 

713 del self.readings_by_witness[wit_id] 

714 t1 = time.time() 

715 if self.verbose: 

716 print( 

717 "Filtered out %d fragmentary witness(es) (%s) in %0.4fs." 

718 % (len(fragmentary_witness_set), str(list(fragmentary_witness_set)), t1 - t0) 

719 ) 

720 return 

721 

722 def get_nexus_symbols(self): 

723 """Returns a list of one-character symbols needed to represent the states of all substantive readings in NEXUS. 

724 

725 The number of symbols equals the maximum number of substantive readings at any variation unit. 

726 

727 Returns: 

728 A list of individual characters representing states in readings. 

729 """ 

730 # NOTE: IQTREE does not appear to support symbols outside of 0-9 and a-z, and its base symbols must be case-insensitive. 

731 # The official version of MrBayes is likewise limited to 32 symbols. 

732 # But PAUP* allows up to 64 symbols, and Andrew Edmondson's fork of MrBayes does, as well. 

733 # So this method will support symbols from 0-9, a-z, and A-Z (for a total of 62 states) 

734 possible_symbols = list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

735 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

736 nsymbols = 0 

737 # If there are no witnesses, then no symbols are needed at all: 

738 if len(self.witnesses) == 0: 

739 return [] 

740 wit_id = self.witnesses[0].id 

741 for rdg_support in self.readings_by_witness[wit_id]: 

742 nsymbols = max(nsymbols, len(rdg_support)) 

743 nexus_symbols = possible_symbols[:nsymbols] 

744 return nexus_symbols 

745 

746 def to_nexus( 

747 self, 

748 file_addr: Union[Path, str], 

749 drop_constant: bool = False, 

750 char_state_labels: bool = True, 

751 frequency: bool = False, 

752 ambiguous_as_missing: bool = False, 

753 calibrate_dates: bool = False, 

754 mrbayes: bool = False, 

755 clock_model: ClockModel = ClockModel.strict, 

756 ): 

757 """Writes this Collation to a NEXUS file with the given address. 

758 

759 Args: 

760 file_addr: A string representing the path to an output NEXUS file; the file type should be .nex, .nexus, or .nxs. 

761 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

762 char_state_labels: An optional flag indicating whether or not to include the CharStateLabels block. 

763 frequency: An optional flag indicating whether to use the StatesFormat=Frequency setting 

764 instead of the StatesFormat=StatesPresent setting 

765 (and thus represent all states with frequency vectors rather than symbols). 

766 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation. 

767 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

768 If this flag is set, then only base symbols will be generated for the NEXUS file. 

769 It is only applied if the frequency option is False. 

770 calibrate_dates: An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses. 

771 This option is intended for inputs to BEAST 2. 

772 mrbayes: An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses. 

773 This option is intended for inputs to MrBayes. 

774 clock_model: A ClockModel option indicating which type of clock model to use. 

775 This option is intended for inputs to MrBayes and BEAST 2. 

776 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified. 

777 """ 

778 # Populate a list of sites that will correspond to columns of the sequence alignment: 

779 substantive_variation_unit_ids = self.variation_unit_ids 

780 if drop_constant: 

781 substantive_variation_unit_ids = [ 

782 vu_id 

783 for vu_id in self.variation_unit_ids 

784 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

785 ] 

786 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

787 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

788 # Start by calculating the values we will be using here: 

789 ntax = len(self.witnesses) 

790 nchar = len(substantive_variation_unit_ids) 

791 taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses] 

792 max_taxlabel_length = max( 

793 [len(taxlabel) for taxlabel in taxlabels] 

794 ) # keep track of the longest taxon label for tabular alignment purposes 

795 charlabels = [slugify(vu_id, lowercase=False, separator='_') for vu_id in substantive_variation_unit_ids] 

796 missing_symbol = '?' 

797 symbols = self.get_nexus_symbols() 

798 # Generate all parent folders for this file that don't already exist: 

799 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

800 with open(file_addr, "w", encoding="utf-8") as f: 

801 # Start with the NEXUS header: 

802 f.write("#NEXUS\n\n") 

803 # Then begin the data block: 

804 f.write("Begin DATA;\n") 

805 # Write the collation matrix dimensions: 

806 f.write("\tDimensions ntax=%d nchar=%d;\n" % (ntax, nchar)) 

807 # Write the format subblock: 

808 f.write("\tFormat\n") 

809 f.write("\t\tDataType=Standard\n") 

810 f.write("\t\tMissing=%s\n" % missing_symbol) 

811 if frequency: 

812 f.write("\t\tStatesFormat=Frequency\n") 

813 f.write("\t\tSymbols=\"%s\";\n" % (" ".join(symbols))) 

814 # If the char_state_labels is set, then write the labels for character-state labels, with each on its own line: 

815 if char_state_labels: 

816 f.write("\tCharStateLabels") 

817 vu_ind = 1 

818 for vu in self.variation_units: 

819 if vu.id not in substantive_variation_unit_ids_set: 

820 continue 

821 if vu_ind == 1: 

822 f.write("\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_'))) 

823 else: 

824 f.write(",\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_'))) 

825 rdg_ind = 0 

826 for rdg in vu.readings: 

827 key = tuple([vu.id, rdg.id]) 

828 if key not in substantive_variation_unit_reading_tuples_set: 

829 continue 

830 ascii_rdg_text = slugify( 

831 rdg.text, lowercase=False, separator='_', replacements=[['η', 'h'], ['ω', 'w']] 

832 ) 

833 if ascii_rdg_text == "": 

834 ascii_rdg_text = "om." 

835 f.write(" %s" % ascii_rdg_text) 

836 rdg_ind += 1 

837 if rdg_ind > 0: 

838 vu_ind += 1 

839 f.write(";\n") 

840 # Write the matrix subblock: 

841 f.write("\tMatrix") 

842 for i, wit in enumerate(self.witnesses): 

843 taxlabel = taxlabels[i] 

844 if frequency: 

845 sequence = "\n\t\t" + taxlabel 

846 for j, vu_id in enumerate(self.variation_unit_ids): 

847 if vu_id not in substantive_variation_unit_ids_set: 

848 continue 

849 rdg_support = self.readings_by_witness[wit.id][j] 

850 sequence += "\n\t\t\t" 

851 # If this reading is lacunose in this witness, then use the missing character: 

852 if sum(rdg_support) == 0: 

853 sequence += missing_symbol 

854 continue 

855 # Otherwise, print out its frequencies for different readings in parentheses: 

856 sequence += "(" 

857 for j, w in enumerate(rdg_support): 

858 sequence += "%s:%0.4f" % (symbols[j], w) 

859 if j < len(rdg_support) - 1: 

860 sequence += " " 

861 sequence += ")" 

862 else: 

863 sequence = "\n\t\t" + taxlabel 

864 # Add enough space after this label ensure that all sequences are nicely aligned: 

865 sequence += " " * (max_taxlabel_length - len(taxlabel) + 1) 

866 for j, vu_id in enumerate(self.variation_unit_ids): 

867 if vu_id not in substantive_variation_unit_ids_set: 

868 continue 

869 rdg_support = self.readings_by_witness[wit.id][j] 

870 # If this reading is lacunose in this witness, then use the missing character: 

871 if sum(rdg_support) == 0: 

872 sequence += missing_symbol 

873 continue 

874 rdg_inds = [ 

875 k for k, w in enumerate(rdg_support) if w > 0 

876 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

877 # For singleton readings, just print the symbol: 

878 if len(rdg_inds) == 1: 

879 sequence += symbols[rdg_inds[0]] 

880 continue 

881 # For multiple readings, print the corresponding readings in braces or the missing symbol depending on input settings: 

882 if ambiguous_as_missing: 

883 sequence += missing_symbol 

884 else: 

885 sequence += "{%s}" % "".join([str(rdg_ind) for rdg_ind in rdg_inds]) 

886 f.write("%s" % (sequence)) 

887 f.write(";\n") 

888 # End the data block: 

889 f.write("End;") 

890 # If calibrate_dates is set, then add the assumptions block: 

891 if calibrate_dates: 

892 f.write("\n\n") 

893 f.write("Begin ASSUMPTIONS;\n") 

894 # Set the scale to years: 

895 f.write("\tOPTIONS SCALE = years;\n\n") 

896 # Then calibrate the witness ages: 

897 calibrate_strings = [] 

898 for i, wit in enumerate(self.witnesses): 

899 taxlabel = taxlabels[i] 

900 date_range = wit.date_range 

901 if date_range[0] is not None: 

902 # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution, 

903 # depending on whether the upper and lower bounds match: 

904 min_age = datetime.now().year - date_range[1] 

905 max_age = datetime.now().year - date_range[0] 

906 if min_age == max_age: 

907 calibrate_string = "\tCALIBRATE %s = fixed(%d)" % (taxlabel, min_age) 

908 calibrate_strings.append(calibrate_string) 

909 else: 

910 calibrate_string = "\tCALIBRATE %s = uniform(%d,%d)" % (taxlabel, min_age, max_age) 

911 calibrate_strings.append(calibrate_string) 

912 else: 

913 # If there is no lower bound on the witness's date, then use an offset log-normal distribution: 

914 min_age = datetime.now().year - date_range[1] 

915 calibrate_string = "\tCALIBRATE %s = offsetlognormal(%d,0.0,1.0)" % (taxlabel, min_age) 

916 calibrate_strings.append(calibrate_string) 

917 # Then print the calibrate strings, separated by commas and line breaks and terminated by a semicolon: 

918 f.write("%s;\n\n" % ",\n".join(calibrate_strings)) 

919 # End the assumptions block: 

920 f.write("End;") 

921 # If mrbayes is set, then add the mrbayes block: 

922 if mrbayes: 

923 f.write("\n\n") 

924 f.write("Begin MRBAYES;\n") 

925 # Turn on the autoclose feature by default: 

926 f.write("\tset autoclose=yes;\n") 

927 # Set the branch lengths to be governed by a birth-death clock model, and set up the parameters for this model: 

928 f.write("\n") 

929 f.write("\tprset brlenspr = clock:birthdeath;\n") 

930 f.write("\tprset speciationpr = uniform(0.0,10.0);\n") 

931 f.write("\tprset extinctionpr = beta(2.0,4.0);\n") 

932 f.write("\tprset sampleprob = 0.01;\n") 

933 # Use the specified clock model: 

934 f.write("\n") 

935 if clock_model == clock_model.uncorrelated: 

936 f.write("\tprset clockvarpr=igr;\n") 

937 f.write("\tprset clockratepr=lognormal(0.0,1.0);\n") 

938 f.write("\tprset igrvarpr=exponential(1.0);\n") 

939 else: 

940 f.write("\tprset clockvarpr=strict;\n") 

941 f.write("\tprset clockratepr=lognormal(0.0,1.0);\n") 

942 # Set the priors on the tree age depending on the date range for the origin of the collated work: 

943 f.write("\n") 

944 if self.origin_date_range[0] is not None: 

945 min_tree_age = ( 

946 datetime.now().year - self.origin_date_range[1] 

947 if self.origin_date_range[1] is not None 

948 else 0.0 

949 ) 

950 max_tree_age = datetime.now().year - self.origin_date_range[0] 

951 f.write("\tprset treeagepr = uniform(%d,%d);\n" % (min_tree_age, max_tree_age)) 

952 else: 

953 min_tree_age = ( 

954 datetime.now().year - self.origin_date_range[1] 

955 if self.origin_date_range[1] is not None 

956 else 0.0 

957 ) 

958 f.write("\tprset treeagepr = offsetgamma(%d,1.0,1.0);\n" % (min_tree_age)) 

959 # Then calibrate the witness ages: 

960 f.write("\n") 

961 f.write("\tprset nodeagepr = calibrated;\n") 

962 for i, wit in enumerate(self.witnesses): 

963 taxlabel = taxlabels[i] 

964 date_range = wit.date_range 

965 if date_range[0] is not None: 

966 # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution, 

967 # depending on whether the upper and lower bounds match: 

968 min_age = datetime.now().year - date_range[1] 

969 max_age = datetime.now().year - date_range[0] 

970 if min_age == max_age: 

971 f.write("\tcalibrate %s = fixed(%d);\n" % (taxlabel, min_age)) 

972 else: 

973 f.write("\tcalibrate %s = uniform(%d,%d);\n" % (taxlabel, min_age, max_age)) 

974 else: 

975 # If there is no lower bound on the witness's date, then use an offset gamma distribution: 

976 min_age = datetime.now().year - date_range[1] 

977 f.write("\tcalibrate %s = offsetgamma(%d,1.0,1.0);\n" % (taxlabel, min_age)) 

978 f.write("\n") 

979 # Add default settings for MCMC estimation of posterior distribution: 

980 f.write("\tmcmcp ngen=100000;\n") 

981 # Write the command to run MrBayes: 

982 f.write("\tmcmc;\n") 

983 # End the assumptions block: 

984 f.write("End;") 

985 return 

986 

987 def get_hennig86_symbols(self): 

988 """Returns a list of one-character symbols needed to represent the states of all substantive readings in Hennig86 format. 

989 

990 The number of symbols equals the maximum number of substantive readings at any variation unit. 

991 

992 Returns: 

993 A list of individual characters representing states in readings. 

994 """ 

995 possible_symbols = ( 

996 list(string.digits) + list(string.ascii_uppercase)[:22] 

997 ) # NOTE: the maximum number of symbols allowed in Hennig86 format is 32 

998 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

999 nsymbols = 0 

1000 # If there are no witnesses, then no symbols are needed at all: 

1001 if len(self.witnesses) == 0: 

1002 return [] 

1003 wit_id = self.witnesses[0].id 

1004 for rdg_support in self.readings_by_witness[wit_id]: 

1005 nsymbols = max(nsymbols, len(rdg_support)) 

1006 hennig86_symbols = possible_symbols[:nsymbols] 

1007 return hennig86_symbols 

1008 

1009 def to_hennig86(self, file_addr: Union[Path, str], drop_constant: bool = False): 

1010 """Writes this Collation to a file in Hennig86 format with the given address. 

1011 Note that because Hennig86 format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data. 

1012 

1013 Args: 

1014 file_addr: A string representing the path to an output file. 

1015 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1016 """ 

1017 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1018 substantive_variation_unit_ids = self.variation_unit_ids 

1019 if drop_constant: 

1020 substantive_variation_unit_ids = [ 

1021 vu_id 

1022 for vu_id in self.variation_unit_ids 

1023 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1024 ] 

1025 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1026 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1027 # Start by calculating the values we will be using here: 

1028 ntax = len(self.witnesses) 

1029 nchar = len(substantive_variation_unit_ids) 

1030 taxlabels = [] 

1031 for wit in self.witnesses: 

1032 taxlabel = wit.id 

1033 # Hennig86 format requires taxon names to start with a letter, so if this is not the case, then append "WIT_" to the start of the name: 

1034 if taxlabel[0] not in string.ascii_letters: 

1035 taxlabel = "WIT_" + taxlabel 

1036 # Then replace any disallowed characters in the string with an underscore: 

1037 taxlabel = slugify(taxlabel, lowercase=False, separator='_') 

1038 taxlabels.append(taxlabel) 

1039 max_taxlabel_length = max( 

1040 [len(taxlabel) for taxlabel in taxlabels] 

1041 ) # keep track of the longest taxon label for tabular alignment purposes 

1042 missing_symbol = '?' 

1043 symbols = self.get_hennig86_symbols() 

1044 # Generate all parent folders for this file that don't already exist: 

1045 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1046 with open(file_addr, "w", encoding="ascii") as f: 

1047 # Start with the nstates header: 

1048 f.write("nstates %d;\n" % len(symbols)) 

1049 # Then begin the xread block: 

1050 f.write("xread\n") 

1051 # Write the dimensions: 

1052 f.write("%d %d\n" % (nchar, ntax)) 

1053 # Now write the matrix: 

1054 for i, wit in enumerate(self.witnesses): 

1055 taxlabel = taxlabels[i] 

1056 # Add enough space after this label ensure that all sequences are nicely aligned: 

1057 sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel) + 1)) 

1058 for j, vu_id in enumerate(self.variation_unit_ids): 

1059 if vu_id not in substantive_variation_unit_ids_set: 

1060 continue 

1061 rdg_support = self.readings_by_witness[wit.id][j] 

1062 # If this reading is lacunose in this witness, then use the missing character: 

1063 if sum(rdg_support) == 0: 

1064 sequence += missing_symbol 

1065 continue 

1066 rdg_inds = [ 

1067 k for k, w in enumerate(rdg_support) if w > 0 

1068 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1069 # For singleton readings, just print the symbol: 

1070 if len(rdg_inds) == 1: 

1071 sequence += symbols[rdg_inds[0]] 

1072 continue 

1073 # For multiple readings, print the missing symbol: 

1074 sequence += missing_symbol 

1075 f.write("%s\n" % (sequence)) 

1076 f.write(";") 

1077 return 

1078 

1079 def get_phylip_symbols(self): 

1080 """Returns a list of one-character symbols needed to represent the states of all substantive readings in PHYLIP format. 

1081 

1082 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1083 

1084 Returns: 

1085 A list of individual characters representing states in readings. 

1086 """ 

1087 possible_symbols = ( 

1088 list(string.digits) + list(string.ascii_lowercase)[:22] 

1089 ) # NOTE: for RAxML, multistate characters with an alphabet sizes up to 32 are supported 

1090 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1091 nsymbols = 0 

1092 # If there are no witnesses, then no symbols are needed at all: 

1093 if len(self.witnesses) == 0: 

1094 return [] 

1095 wit_id = self.witnesses[0].id 

1096 for rdg_support in self.readings_by_witness[wit_id]: 

1097 nsymbols = max(nsymbols, len(rdg_support)) 

1098 phylip_symbols = possible_symbols[:nsymbols] 

1099 return phylip_symbols 

1100 

1101 def to_phylip(self, file_addr: Union[Path, str], drop_constant: bool = False): 

1102 """Writes this Collation to a file in PHYLIP format with the given address. 

1103 Note that because PHYLIP format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data. 

1104 

1105 Args: 

1106 file_addr: A string representing the path to an output file. 

1107 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1108 """ 

1109 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1110 substantive_variation_unit_ids = self.variation_unit_ids 

1111 if drop_constant: 

1112 substantive_variation_unit_ids = [ 

1113 vu_id 

1114 for vu_id in self.variation_unit_ids 

1115 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1116 ] 

1117 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1118 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1119 # Start by calculating the values we will be using here: 

1120 ntax = len(self.witnesses) 

1121 nchar = len(substantive_variation_unit_ids) 

1122 taxlabels = [] 

1123 for wit in self.witnesses: 

1124 taxlabel = wit.id 

1125 # Then replace any disallowed characters in the string with an underscore: 

1126 taxlabel = slugify(taxlabel, lowercase=False, separator='_') 

1127 taxlabels.append(taxlabel) 

1128 max_taxlabel_length = max( 

1129 [len(taxlabel) for taxlabel in taxlabels] 

1130 ) # keep track of the longest taxon label for tabular alignment purposes 

1131 missing_symbol = '?' 

1132 symbols = self.get_phylip_symbols() 

1133 # Generate all parent folders for this file that don't already exist: 

1134 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1135 with open(file_addr, "w", encoding="ascii") as f: 

1136 # Write the dimensions: 

1137 f.write("%d %d\n" % (ntax, nchar)) 

1138 # Now write the matrix: 

1139 for i, wit in enumerate(self.witnesses): 

1140 taxlabel = taxlabels[i] 

1141 # Add enough space after this label ensure that all sequences are nicely aligned: 

1142 sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel))) + "\t" 

1143 for j, vu_id in enumerate(self.variation_unit_ids): 

1144 if vu_id not in substantive_variation_unit_ids_set: 

1145 continue 

1146 rdg_support = self.readings_by_witness[wit.id][j] 

1147 # If this reading is lacunose in this witness, then use the missing character: 

1148 if sum(rdg_support) == 0: 

1149 sequence += missing_symbol 

1150 continue 

1151 rdg_inds = [ 

1152 k for k, w in enumerate(rdg_support) if w > 0 

1153 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1154 # For singleton readings, just print the symbol: 

1155 if len(rdg_inds) == 1: 

1156 sequence += symbols[rdg_inds[0]] 

1157 continue 

1158 # For multiple readings, print the missing symbol: 

1159 sequence += missing_symbol 

1160 f.write("%s\n" % (sequence)) 

1161 return 

1162 

1163 def get_fasta_symbols(self): 

1164 """Returns a list of one-character symbols needed to represent the states of all substantive readings in FASTA format. 

1165 

1166 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1167 

1168 Returns: 

1169 A list of individual characters representing states in readings. 

1170 """ 

1171 possible_symbols = ( 

1172 list(string.digits) + list(string.ascii_lowercase)[:22] 

1173 ) # NOTE: for RAxML, multistate characters with an alphabet sizes up to 32 are supported 

1174 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1175 nsymbols = 0 

1176 # If there are no witnesses, then no symbols are needed at all: 

1177 if len(self.witnesses) == 0: 

1178 return [] 

1179 wit_id = self.witnesses[0].id 

1180 for rdg_support in self.readings_by_witness[wit_id]: 

1181 nsymbols = max(nsymbols, len(rdg_support)) 

1182 fasta_symbols = possible_symbols[:nsymbols] 

1183 return fasta_symbols 

1184 

1185 def to_fasta(self, file_addr: Union[Path, str], drop_constant: bool = False): 

1186 """Writes this Collation to a file in FASTA format with the given address. 

1187 Note that because FASTA format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data. 

1188 

1189 Args: 

1190 file_addr: A string representing the path to an output file. 

1191 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1192 """ 

1193 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1194 substantive_variation_unit_ids = self.variation_unit_ids 

1195 if drop_constant: 

1196 substantive_variation_unit_ids = [ 

1197 vu_id 

1198 for vu_id in self.variation_unit_ids 

1199 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1200 ] 

1201 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1202 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1203 # Start by calculating the values we will be using here: 

1204 ntax = len(self.witnesses) 

1205 nchar = len(substantive_variation_unit_ids) 

1206 taxlabels = [] 

1207 for wit in self.witnesses: 

1208 taxlabel = wit.id 

1209 # Then replace any disallowed characters in the string with an underscore: 

1210 taxlabel = slugify(taxlabel, lowercase=False, separator='_') 

1211 taxlabels.append(taxlabel) 

1212 max_taxlabel_length = max( 

1213 [len(taxlabel) for taxlabel in taxlabels] 

1214 ) # keep track of the longest taxon label for tabular alignment purposes 

1215 missing_symbol = '?' 

1216 symbols = self.get_fasta_symbols() 

1217 # Generate all parent folders for this file that don't already exist: 

1218 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1219 with open(file_addr, "w", encoding="ascii") as f: 

1220 # Now write the matrix: 

1221 for i, wit in enumerate(self.witnesses): 

1222 taxlabel = taxlabels[i] 

1223 # Add enough space after this label ensure that all sequences are nicely aligned: 

1224 sequence = ">%s\n" % taxlabel 

1225 for j, vu_id in enumerate(self.variation_unit_ids): 

1226 if vu_id not in substantive_variation_unit_ids_set: 

1227 continue 

1228 rdg_support = self.readings_by_witness[wit.id][j] 

1229 # If this reading is lacunose in this witness, then use the missing character: 

1230 if sum(rdg_support) == 0: 

1231 sequence += missing_symbol 

1232 continue 

1233 rdg_inds = [ 

1234 k for k, w in enumerate(rdg_support) if w > 0 

1235 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

1236 # For singleton readings, just print the symbol: 

1237 if len(rdg_inds) == 1: 

1238 sequence += symbols[rdg_inds[0]] 

1239 continue 

1240 # For multiple readings, print the missing symbol: 

1241 sequence += missing_symbol 

1242 f.write("%s\n" % (sequence)) 

1243 return 

1244 

1245 def get_beast_symbols(self): 

1246 """Returns a list of one-character symbols needed to represent the states of all substantive readings in BEAST format. 

1247 

1248 The number of symbols equals the maximum number of substantive readings at any variation unit. 

1249 

1250 Returns: 

1251 A list of individual characters representing states in readings. 

1252 """ 

1253 possible_symbols = ( 

1254 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

1255 ) # NOTE: for BEAST, any number of states should theoretically be permissible, but since code maps are required for some reason, we will limit the number of symbols to 62 for now 

1256 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

1257 nsymbols = 0 

1258 # If there are no witnesses, then no symbols are needed at all: 

1259 if len(self.witnesses) == 0: 

1260 return [] 

1261 wit_id = self.witnesses[0].id 

1262 for rdg_support in self.readings_by_witness[wit_id]: 

1263 nsymbols = max(nsymbols, len(rdg_support)) 

1264 beast_symbols = possible_symbols[:nsymbols] 

1265 return beast_symbols 

1266 

1267 def get_tip_date_range(self): 

1268 """Gets the minimum and maximum dates attested among the witnesses. 

1269 Also checks if the witness with the latest possible date has a fixed date 

1270 (i.e, if the lower and upper bounds for its date are the same) 

1271 and issues a warning if not, as this will cause unusual behavior in BEAST 2. 

1272 

1273 Returns: 

1274 A tuple containing the earliest and latest possible tip dates. 

1275 """ 

1276 earliest_date = None 

1277 earliest_wit = None 

1278 latest_date = None 

1279 latest_wit = None 

1280 for wit in self.witnesses: 

1281 wit_id = wit.id 

1282 date_range = wit.date_range 

1283 if date_range[0] is not None: 

1284 if earliest_date is not None: 

1285 earliest_wit = wit if date_range[0] < earliest_date else earliest_wit 

1286 earliest_date = min(date_range[0], earliest_date) 

1287 else: 

1288 earliest_wit = wit 

1289 earliest_date = date_range[0] 

1290 if date_range[1] is not None: 

1291 if latest_date is not None: 

1292 latest_wit = ( 

1293 wit 

1294 if (date_range[1] > latest_date or (date_range[0] == date_range[1] == latest_date)) 

1295 else latest_wit 

1296 ) # the second check ensures that a witness with a fixed date is preferred to a witness with a date range that ends with the same date 

1297 latest_date = max(date_range[1], latest_date) 

1298 else: 

1299 latest_wit = wit 

1300 latest_date = date_range[1] 

1301 if latest_wit.date_range[0] is None or latest_wit.date_range[0] != latest_wit.date_range[1]: 

1302 print( 

1303 "WARNING: the latest witness, %s, has a variable date range; this will result in problems with time-dependent substitution models and misalignment of trees in BEAST DensiTree outputs! Please ensure that witness %s has a fixed date." 

1304 % (latest_wit.id, latest_wit.id) 

1305 ) 

1306 return (earliest_date, latest_date) 

1307 

1308 def get_beast_origin_span(self, tip_date_range): 

1309 """Returns a tuple containing the lower and upper bounds for the height of the origin of the Birth-Death Skyline model. 

1310 The upper bound on the height of the tree is the difference between the latest tip date 

1311 and the lower bound on the date of the original work, if both are defined; 

1312 otherwise, it is left undefined. 

1313 The lower bound on the height of the tree is the difference between the latest tip date 

1314 and the upper bound on the date of the original work, if both are defined; 

1315 otherwise, it is the difference between the earliest tip date and the latest, if both are defined. 

1316 

1317 Args: 

1318 tip_date_range: A tuple containing the earliest and latest possible tip dates. 

1319 

1320 Returns: 

1321 A tuple containing lower and upper bounds on the origin height for the Birth-Death Skyline model. 

1322 """ 

1323 origin_span = [0, None] 

1324 # If the upper bound on the date of the work's composition is defined, then set the lower bound on the height of the origin using it and the latest tip date 

1325 # (note that if it had to be defined in terms of witness date lower bounds, then this would have happened already): 

1326 if self.origin_date_range[1] is not None: 

1327 origin_span[0] = tip_date_range[1] - self.origin_date_range[1] 

1328 # If the lower bound on the date of the work's composition is defined, then set the upper bound on the height of the origin using it and the latest tip date: 

1329 if self.origin_date_range[0] is not None: 

1330 origin_span[1] = tip_date_range[1] - self.origin_date_range[0] 

1331 return tuple(origin_span) 

1332 

1333 def get_beast_date_map(self, taxlabels): 

1334 """Returns a string representing witness-to-date mappings in BEAST format. 

1335 

1336 Since this format requires single dates as opposed to date ranges, 

1337 witnesses with closed date ranges will be mapped to the average of their lower and upper bounds, 

1338 and witnesses with open date ranges will not be mapped. 

1339 

1340 Args: 

1341 taxlabels: A list of slugified taxon labels. 

1342 

1343 Returns: 

1344 A string containing comma-separated date calibrations of the form witness_id=date. 

1345 """ 

1346 calibrate_strings = [] 

1347 for i, wit in enumerate(self.witnesses): 

1348 taxlabel = taxlabels[i] 

1349 date_range = wit.date_range 

1350 # If either end of this witness's date range is empty, then do not include it: 

1351 if date_range[0] is None or date_range[1] is None: 

1352 continue 

1353 # Otherwise, take the midpoint of its date range as its date: 

1354 date = int((date_range[0] + date_range[1]) / 2) 

1355 calibrate_string = "%s=%d" % (taxlabel, date) 

1356 calibrate_strings.append(calibrate_string) 

1357 # Then output the full date map string: 

1358 date_map = ",".join(calibrate_strings) 

1359 return date_map 

1360 

1361 def get_beast_code_map_for_unit(self, symbols, missing_symbol, vu_ind): 

1362 """Returns a string containing state/reading code mappings in BEAST format using the given single-state and missing state symbols for the character/variation unit at the given index. 

1363 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a code for a dummy state will be included. 

1364 

1365 Args: 

1366 vu_ind: An integer index for the desired unit. 

1367 

1368 Returns: 

1369 A string containing comma-separated code mappings. 

1370 """ 

1371 vu = self.variation_units[vu_ind] 

1372 vu_id = vu.id 

1373 code_map = {} 

1374 for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])): 

1375 code_map[symbols[k]] = str(k) 

1376 # If this site is a singleton site, then add a code mapping for the dummy state: 

1377 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1378 code_map[symbols[1]] = str(1) 

1379 # Then add a mapping for the missing state, including a dummy state if this is a singleton site: 

1380 code_map[missing_symbol] = " ".join( 

1381 str(k) for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])) 

1382 ) 

1383 # If this site is a singleton site, then add the dummy state to the missing state mapping: 

1384 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1385 code_map[missing_symbol] = code_map[missing_symbol] + " " + str(1) 

1386 # Then combine all of the mappings into a single string: 

1387 code_map_string = ", ".join([code + "=" + code_map[code] for code in code_map]) 

1388 return code_map_string 

1389 

1390 def get_beast_equilibrium_frequencies_for_unit(self, vu_ind): 

1391 """Returns a string containing state/reading equilibrium frequencies in BEAST format for the character/variation unit at the given index. 

1392 Since the equilibrium frequencies are not used with the substitution models, the equilibrium frequencies simply correspond to a uniform distribution over the states. 

1393 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then an equilibrium frequency of 0 will be added for a dummy state. 

1394 

1395 Args: 

1396 vu_ind: An integer index for the desired unit. 

1397 

1398 Returns: 

1399 A string containing space-separated equilibrium frequencies. 

1400 """ 

1401 vu = self.variation_units[vu_ind] 

1402 vu_id = vu.id 

1403 # If this unit is a singleton, then return the string "0.5 0.5": 

1404 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1: 

1405 return "0.5 0.5" 

1406 # Otherwise, set the equilibrium frequencies according to a uniform distribution: 

1407 equilibrium_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len( 

1408 self.substantive_readings_by_variation_unit_id[vu_id] 

1409 ) 

1410 equilibrium_frequencies_string = " ".join([str(w) for w in equilibrium_frequencies]) 

1411 return equilibrium_frequencies_string 

1412 

1413 def get_beast_root_frequencies_for_unit(self, vu_ind): 

1414 """Returns a string containing state/reading root frequencies in BEAST format for the character/variation unit at the given index. 

1415 The root frequencies are calculated from the intrinsic odds at this unit. 

1416 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a root frequency of 0 will be added for a dummy state. 

1417 If no intrinsic odds are specified, then a uniform distribution over all states is assumed. 

1418 

1419 Args: 

1420 vu_ind: An integer index for the desired unit. 

1421 

1422 Returns: 

1423 A string containing space-separated root frequencies. 

1424 """ 

1425 vu = self.variation_units[vu_ind] 

1426 vu_id = vu.id 

1427 intrinsic_relations = vu.intrinsic_relations 

1428 intrinsic_odds_by_id = self.intrinsic_odds_by_id 

1429 # If this unit is a singleton, then return the string "1 0": 

1430 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1: 

1431 return "1 0" 

1432 # If this unit has no intrinsic odds, then assume a uniform distribution over all readings: 

1433 if len(intrinsic_relations) == 0: 

1434 root_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len( 

1435 self.substantive_readings_by_variation_unit_id[vu_id] 

1436 ) 

1437 root_frequencies_string = " ".join([str(w) for w in root_frequencies]) 

1438 return root_frequencies_string 

1439 # We will populate the root frequencies based on the intrinsic odds of the readings: 

1440 root_frequencies_by_id = {} 

1441 for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]: 

1442 root_frequencies_by_id[rdg_id] = 0 

1443 # First, construct an adjacency list for efficient edge iteration: 

1444 neighbors_by_source = {} 

1445 for edge in intrinsic_relations: 

1446 s = edge[0] 

1447 t = edge[1] 

1448 if s not in neighbors_by_source: 

1449 neighbors_by_source[s] = [] 

1450 if t not in neighbors_by_source: 

1451 neighbors_by_source[t] = [] 

1452 neighbors_by_source[s].append(t) 

1453 # Next, identify all readings that are not targeted by any intrinsic odds relation: 

1454 in_degree_by_reading = {} 

1455 for edge in intrinsic_relations: 

1456 s = edge[0] 

1457 t = edge[1] 

1458 if s not in in_degree_by_reading: 

1459 in_degree_by_reading[s] = 0 

1460 if t not in in_degree_by_reading: 

1461 in_degree_by_reading[t] = 0 

1462 in_degree_by_reading[t] += 1 

1463 starting_nodes = [t for t in in_degree_by_reading if in_degree_by_reading[t] == 0] 

1464 # Set the root frequencies for these readings to 1 (they will be normalized later): 

1465 for starting_node in starting_nodes: 

1466 root_frequencies_by_id[starting_node] = 1.0 

1467 # Next, set the frequencies for the remaining readings recursively using the adjacency list: 

1468 def update_root_frequencies(s): 

1469 for t in neighbors_by_source[s]: 

1470 intrinsic_category = intrinsic_relations[(s, t)] 

1471 odds = ( 

1472 intrinsic_odds_by_id[intrinsic_category] 

1473 if intrinsic_odds_by_id[intrinsic_category] is not None 

1474 else 1.0 

1475 ) # TODO: This needs to be handled using parameters once we have it implemented in BEAST 

1476 root_frequencies_by_id[t] = root_frequencies_by_id[s] / odds 

1477 update_root_frequencies(t) 

1478 return 

1479 

1480 for starting_node in starting_nodes: 

1481 update_root_frequencies(starting_node) 

1482 # Then produce a normalized vector of root frequencies that corresponds to a probability distribution: 

1483 root_frequencies = [ 

1484 root_frequencies_by_id[rdg_id] for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id] 

1485 ] 

1486 total_frequencies = sum(root_frequencies) 

1487 for k in range(len(root_frequencies)): 

1488 root_frequencies[k] = root_frequencies[k] / total_frequencies 

1489 root_frequencies_string = " ".join([str(w) for w in root_frequencies]) 

1490 return root_frequencies_string 

1491 

1492 def to_beast( 

1493 self, 

1494 file_addr: Union[Path, str], 

1495 drop_constant: bool = False, 

1496 clock_model: ClockModel = ClockModel.strict, 

1497 ancestral_logger: AncestralLogger = AncestralLogger.state, 

1498 seed: int = None, 

1499 ): 

1500 """Writes this Collation to a file in BEAST format with the given address. 

1501 

1502 Args: 

1503 file_addr: A string representing the path to an output file. 

1504 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1505 clock_model: A ClockModel option indicating which clock model to use. 

1506 ancestral_logger: An AncestralLogger option indicating which class of logger (if any) to use for ancestral states. 

1507 seed: A seed for random number generation (for setting initial values of unspecified transcriptional rates). 

1508 """ 

1509 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1510 substantive_variation_unit_ids = self.variation_unit_ids 

1511 if drop_constant: 

1512 substantive_variation_unit_ids = [ 

1513 vu_id 

1514 for vu_id in self.variation_unit_ids 

1515 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1516 ] 

1517 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1518 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1519 # First, calculate the values we will be using for the main template: 

1520 taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses] 

1521 missing_symbol = '?' 

1522 symbols = self.get_beast_symbols() 

1523 tip_date_range = self.get_tip_date_range() 

1524 origin_span = self.get_beast_origin_span(tip_date_range) 

1525 date_map = self.get_beast_date_map(taxlabels) 

1526 # Then populate the necessary objects for the BEAST XML Jinja template: 

1527 witness_objects = [] 

1528 variation_unit_objects = [] 

1529 intrinsic_category_objects = [] 

1530 transcriptional_category_objects = [] 

1531 # Start with witnesses: 

1532 for i, wit in enumerate(self.witnesses): 

1533 witness_object = {} 

1534 # Copy the ID for this witness: 

1535 witness_object["id"] = wit.id 

1536 # Copy its date bounds: 

1537 witness_object["min_date"] = wit.date_range[0] 

1538 witness_object["max_date"] = wit.date_range[1] 

1539 # Populate its sequence from its entries in the witness's readings dictionary: 

1540 sequence = "" 

1541 for j, rdg_support in enumerate(self.readings_by_witness[wit.id]): 

1542 vu_id = self.variation_unit_ids[j] 

1543 # Skip any variation units deemed non-substantive: 

1544 if vu_id not in substantive_variation_unit_ids: 

1545 continue 

1546 # If this witness has a certainty of 0 for all readings, then it is a gap; assign a likelihood of 1 to each reading: 

1547 if sum(rdg_support) == 0: 

1548 for k, w in enumerate(rdg_support): 

1549 sequence += "1" 

1550 if k < len(rdg_support) - 1: 

1551 sequence += ", " 

1552 else: 

1553 if len(rdg_support) > 1: 

1554 sequence += "; " 

1555 else: 

1556 # If this site is a singleton site, then add a dummy state: 

1557 sequence += ", 0; " 

1558 # Otherwise, read the probabilities as they are given: 

1559 else: 

1560 for k, w in enumerate(rdg_support): 

1561 sequence += str(w) 

1562 if k < len(rdg_support) - 1: 

1563 sequence += ", " 

1564 else: 

1565 if len(rdg_support) > 1: 

1566 sequence += "; " 

1567 else: 

1568 # If this site is a singleton site, then add a dummy state: 

1569 sequence += ", 0; " 

1570 # Strip the final semicolon and space from the sequence: 

1571 sequence = sequence.strip("; ") 

1572 # Then set the witness object's sequence attribute to this string: 

1573 witness_object["sequence"] = sequence 

1574 witness_objects.append(witness_object) 

1575 # Then proceed to variation units: 

1576 for j, vu in enumerate(self.variation_units): 

1577 if vu.id not in substantive_variation_unit_ids_set: 

1578 continue 

1579 variation_unit_object = {} 

1580 # Copy the ID of this variation unit: 

1581 variation_unit_object["id"] = vu.id 

1582 # Copy this variation unit's number of substantive readings, 

1583 # setting it to 2 if it is a singleton unit: 

1584 variation_unit_object["nstates"] = ( 

1585 len(self.substantive_readings_by_variation_unit_id[vu.id]) 

1586 if len(self.substantive_readings_by_variation_unit_id[vu.id]) > 1 

1587 else 2 

1588 ) 

1589 # Then construct the code map for this unit: 

1590 variation_unit_object["code_map"] = self.get_beast_code_map_for_unit(symbols, missing_symbol, j) 

1591 # Then populate a comma-separated string of reading labels for this unit: 

1592 rdg_texts = [] 

1593 vu_label = vu.id 

1594 for rdg in vu.readings: 

1595 key = tuple([vu.id, rdg.id]) 

1596 if key not in substantive_variation_unit_reading_tuples_set: 

1597 continue 

1598 rdg_text = slugify(rdg.text, lowercase=False, allow_unicode=True, separator='_') 

1599 # Replace any empty reading text with an omission marker: 

1600 if rdg_text == "": 

1601 rdg_text = "om." 

1602 rdg_texts.append(rdg_text) 

1603 # If this site is a singleton site, then add a dummy reading for the dummy state: 

1604 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1605 rdg_texts.append("DUMMY") 

1606 rdg_texts_string = ", ".join(rdg_texts) 

1607 variation_unit_object["rdg_texts"] = rdg_texts_string 

1608 # Then populate this unit's equilibrium frequency string and its root frequency string: 

1609 variation_unit_object["equilibrium_frequencies"] = self.get_beast_equilibrium_frequencies_for_unit(j) 

1610 variation_unit_object["root_frequencies"] = self.get_beast_root_frequencies_for_unit(j) 

1611 # Then populate a dictionary mapping epoch height ranges to lists of off-diagonal entries for substitution models: 

1612 rate_objects_by_epoch_height_range = {} 

1613 epoch_height_ranges = [] 

1614 # Then proceed based on whether the transcriptional relations for this variation unit have been defined: 

1615 if len(vu.transcriptional_relations_by_date_range) == 0: 

1616 # If there are no transcriptional relations, then map the epoch range of (None, None) to their list of off-diagonal entries: 

1617 epoch_height_ranges.append((None, None)) 

1618 rate_objects_by_epoch_height_range[(None, None)] = [] 

1619 rate_objects = rate_objects_by_epoch_height_range[(None, None)] 

1620 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1: 

1621 # If this is a singleton site, then use an arbitrary 2x2 rate matrix: 

1622 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1623 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1624 else: 

1625 # If this is a site with multiple substantive readings, but no transcriptional relations list, 

1626 # then use a Lewis Mk substitution matrix with the appropriate number of states: 

1627 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1628 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1629 # Skip diagonal elements: 

1630 if k_1 == k_2: 

1631 continue 

1632 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1633 else: 

1634 # Otherwise, proceed for every date range: 

1635 for date_range in vu.transcriptional_relations_by_date_range: 

1636 # Get the map of transcriptional relations for reference later: 

1637 transcriptional_relations = vu.transcriptional_relations_by_date_range[date_range] 

1638 # Now get the epoch height range corresponding to this date range, and initialize its list in the dictionary: 

1639 epoch_height_range = [None, None] 

1640 epoch_height_range[0] = tip_date_range[1] - date_range[1] if date_range[1] is not None else None 

1641 epoch_height_range[1] = tip_date_range[1] - date_range[0] if date_range[0] is not None else None 

1642 epoch_height_range = tuple(epoch_height_range) 

1643 epoch_height_ranges.append(epoch_height_range) 

1644 rate_objects_by_epoch_height_range[epoch_height_range] = [] 

1645 rate_objects = rate_objects_by_epoch_height_range[epoch_height_range] 

1646 # Then proceed for every pair of readings in this unit: 

1647 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1648 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]): 

1649 # Skip diagonal elements: 

1650 if k_1 == k_2: 

1651 continue 

1652 # If the first reading has no transcriptional relation to the second in this unit, then use the default rate: 

1653 if (rdg_id_1, rdg_id_2) not in transcriptional_relations: 

1654 rate_objects.append({"transcriptional_categories": ["default"], "expression": None}) 

1655 continue 

1656 # Otherwise, if only one category of transcriptional relations holds between the first and second readings, 

1657 # then use its rate: 

1658 if len(transcriptional_relations[(rdg_id_1, rdg_id_2)]) == 1: 

1659 # If there is only one such category, then add its rate as a standalone var element: 

1660 transcriptional_category = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])[0] 

1661 rate_objects.append( 

1662 {"transcriptional_categories": [transcriptional_category], "expression": None} 

1663 ) 

1664 continue 

1665 # If there is more than one, then add a var element that is a sum of the individual categories' rates: 

1666 transcriptional_categories = list(transcriptional_relations[(rdg_id_1, rdg_id_2)]) 

1667 args = [] 

1668 for transcriptional_category in transcriptional_categories: 

1669 args.append("%s_rate" % transcriptional_category) 

1670 args_string = " ".join(args) 

1671 ops = ["+"] * (len(args) - 1) 

1672 ops_string = " ".join(ops) 

1673 expression_string = " ".join([args_string, ops_string]) 

1674 rate_objects.append( 

1675 { 

1676 "transcriptional_categories": transcriptional_categories, 

1677 "expression": expression_string, 

1678 } 

1679 ) 

1680 # Now reorder the list of epoch height ranges, and get a list of non-null epoch dates in ascending order from the dictionary: 

1681 epoch_height_ranges.reverse() 

1682 epoch_heights = [ 

1683 epoch_height_range[0] for epoch_height_range in epoch_height_ranges if epoch_height_range[0] is not None 

1684 ] 

1685 # Then add all of these data structures to the variation unit object: 

1686 variation_unit_object["epoch_heights"] = epoch_heights 

1687 variation_unit_object["epoch_heights_string"] = " ".join( 

1688 [str(epoch_height) for epoch_height in epoch_heights] 

1689 ) 

1690 variation_unit_object["epoch_height_ranges"] = epoch_height_ranges 

1691 variation_unit_object["epoch_rates"] = [ 

1692 rate_objects_by_epoch_height_range[epoch_height_range] for epoch_height_range in epoch_height_ranges 

1693 ] 

1694 variation_unit_objects.append(variation_unit_object) 

1695 # Then proceed to intrinsic odds categories: 

1696 for intrinsic_category in self.intrinsic_categories: 

1697 intrinsic_category_object = {} 

1698 # Copy the ID of this intrinsic category: 

1699 intrinsic_category_object["id"] = intrinsic_category 

1700 # Then copy the odds factors associated with this intrinsic category, 

1701 # setting it to 1.0 if it is not specified and setting the estimate attribute accordingly: 

1702 odds = self.intrinsic_odds_by_id[intrinsic_category] 

1703 intrinsic_category_object["odds"] = odds if odds is not None else 1.0 

1704 intrinsic_category_object["estimate"] = "false" if odds is not None else "true" 

1705 intrinsic_category_objects.append(intrinsic_category_object) 

1706 # Then proceed to transcriptional rate categories: 

1707 rng = np.random.default_rng(seed) 

1708 for transcriptional_category in self.transcriptional_categories: 

1709 transcriptional_category_object = {} 

1710 # Copy the ID of this transcriptional category: 

1711 transcriptional_category_object["id"] = transcriptional_category 

1712 # Then copy the rate of this transcriptional category, 

1713 # setting it to a random number sampled from a Gamma distribution if it is not specified and setting the estimate attribute accordingly: 

1714 rate = self.transcriptional_rates_by_id[transcriptional_category] 

1715 transcriptional_category_object["rate"] = rate if rate is not None else rng.gamma(5.0, 2.0) 

1716 transcriptional_category_object["estimate"] = "false" if rate is not None else "true" 

1717 transcriptional_category_objects.append(transcriptional_category_object) 

1718 # Now render the output XML file using the Jinja template: 

1719 env = Environment(loader=PackageLoader("teiphy", "templates"), autoescape=select_autoescape()) 

1720 template = env.get_template("beast_template.xml") 

1721 rendered = template.render( 

1722 nsymbols=len(symbols), 

1723 date_map=date_map, 

1724 origin_span=origin_span, 

1725 clock_model=clock_model.value, 

1726 clock_rate_categories=2 * len(self.witnesses) - 1, 

1727 ancestral_logger=ancestral_logger.value, 

1728 witnesses=witness_objects, 

1729 variation_units=variation_unit_objects, 

1730 intrinsic_categories=intrinsic_category_objects, 

1731 transcriptional_categories=transcriptional_category_objects, 

1732 ) 

1733 # Generate all parent folders for this file that don't already exist: 

1734 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

1735 with open(file_addr, "w", encoding="utf-8") as f: 

1736 f.write(rendered) 

1737 return 

1738 

1739 def to_numpy(self, drop_constant: bool = False, split_missing: SplitMissingType = None): 

1740 """Returns this Collation in the form of a NumPy array, along with arrays of its row and column labels. 

1741 

1742 Args: 

1743 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1744 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

1745 If not specified, then missing data is ignored (i.e., all states are 0). 

1746 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

1747 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

1748 

1749 Returns: 

1750 A NumPy array with a row for each substantive reading and a column for each witness. 

1751 A list of substantive reading ID strings. 

1752 A list of witness ID strings. 

1753 """ 

1754 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1755 substantive_variation_unit_ids = self.variation_unit_ids 

1756 if drop_constant: 

1757 substantive_variation_unit_ids = [ 

1758 vu_id 

1759 for vu_id in self.variation_unit_ids 

1760 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1761 ] 

1762 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1763 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1764 # Initialize the output array with the appropriate dimensions: 

1765 reading_labels = [] 

1766 for vu in self.variation_units: 

1767 if vu.id not in substantive_variation_unit_ids_set: 

1768 continue 

1769 for rdg in vu.readings: 

1770 key = tuple([vu.id, rdg.id]) 

1771 if key in substantive_variation_unit_reading_tuples_set: 

1772 reading_labels.append(vu.id + ", " + rdg.text) 

1773 witness_labels = [wit.id for wit in self.witnesses] 

1774 matrix = np.zeros((len(reading_labels), len(witness_labels)), dtype=float) 

1775 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings: 

1776 support_proportions_by_unit = {} 

1777 for j, vu_id in enumerate(self.variation_unit_ids): 

1778 if vu_id not in substantive_variation_unit_ids_set: 

1779 continue 

1780 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

1781 for i, wit in enumerate(self.witnesses): 

1782 rdg_support = self.readings_by_witness[wit.id][j] 

1783 for l, w in enumerate(rdg_support): 

1784 support_proportions[l] += w 

1785 norm = ( 

1786 sum(support_proportions) if sum(support_proportions) > 0 else 1.0 

1787 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

1788 for l in range(len(support_proportions)): 

1789 support_proportions[l] = support_proportions[l] / norm 

1790 support_proportions_by_unit[vu_id] = support_proportions 

1791 # Then populate it with the appropriate values: 

1792 col_ind = 0 

1793 for i, wit in enumerate(self.witnesses): 

1794 row_ind = 0 

1795 for j, vu_id in enumerate(self.variation_unit_ids): 

1796 if vu_id not in substantive_variation_unit_ids_set: 

1797 continue 

1798 rdg_support = self.readings_by_witness[wit.id][j] 

1799 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

1800 if sum(rdg_support) == 0: 

1801 if split_missing == SplitMissingType.uniform: 

1802 for l in range(len(rdg_support)): 

1803 matrix[row_ind, col_ind] = 1 / len(rdg_support) 

1804 row_ind += 1 

1805 elif split_missing == SplitMissingType.proportional: 

1806 for l in range(len(rdg_support)): 

1807 matrix[row_ind, col_ind] = support_proportions_by_unit[vu_id][l] 

1808 row_ind += 1 

1809 else: 

1810 row_ind += len(rdg_support) 

1811 # Otherwise, add its coefficients normally: 

1812 else: 

1813 for l in range(len(rdg_support)): 

1814 matrix[row_ind, col_ind] = rdg_support[l] 

1815 row_ind += 1 

1816 col_ind += 1 

1817 return matrix, reading_labels, witness_labels 

1818 

1819 def to_distance_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False): 

1820 """Transforms this Collation into a NumPy distance matrix between witnesses, along with an array of its labels for the witnesses. 

1821 Distances can be computed either as counts of disagreements (the default setting), or as proportions of disagreements over all variation units where both witnesses have singleton readings. 

1822 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of disagreements. 

1823 

1824 Args: 

1825 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1826 Default value is False. 

1827 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

1828 Default value is False. 

1829 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

1830 should include the number of their extant, unambiguous variation units after the number of their disagreements. 

1831 Default value is False. 

1832 

1833 Returns: 

1834 A NumPy distance matrix with a row and column for each witness. 

1835 A list of witness ID strings. 

1836 """ 

1837 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1838 substantive_variation_unit_ids = self.variation_unit_ids 

1839 if drop_constant: 

1840 substantive_variation_unit_ids = [ 

1841 vu_id 

1842 for vu_id in self.variation_unit_ids 

1843 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1844 ] 

1845 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1846 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1847 # Initialize the output array with the appropriate dimensions: 

1848 witness_labels = [wit.id for wit in self.witnesses] 

1849 # The type of the matrix will depend on the input options: 

1850 matrix = None 

1851 if show_ext: 

1852 matrix = np.full( 

1853 (len(witness_labels), len(witness_labels)), "NA", dtype=object 

1854 ) # strings of the form "disagreements/extant" 

1855 elif proportion: 

1856 matrix = np.full( 

1857 (len(witness_labels), len(witness_labels)), 0.0, dtype=float 

1858 ) # floats of the form disagreements/extant 

1859 else: 

1860 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form disagreements 

1861 for i, wit_1 in enumerate(witness_labels): 

1862 for j, wit_2 in enumerate(witness_labels): 

1863 extant_units = 0 

1864 disagreements = 0 

1865 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified, 

1866 # so we only have to calculate it once: 

1867 if i > j: 

1868 continue 

1869 # Otherwise, calculate the number of units where both witnesses have unambiguous readings 

1870 # and the number of units where they disagree: 

1871 for k, vu_id in enumerate(self.variation_unit_ids): 

1872 if vu_id not in substantive_variation_unit_ids_set: 

1873 continue 

1874 wit_1_rdg_support = self.readings_by_witness[wit_1][k] 

1875 wit_2_rdg_support = self.readings_by_witness[wit_2][k] 

1876 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0] 

1877 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0] 

1878 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1: 

1879 continue 

1880 extant_units += 1 

1881 if wit_1_rdg_inds[0] != wit_2_rdg_inds[0]: 

1882 disagreements += 1 

1883 cell_entry = None 

1884 if proportion: 

1885 cell_entry = disagreements / max( 

1886 extant_units, 1 

1887 ) # the max in the denominator is to prevent division by 0; the distance entry will be 0 if the two witnesses have no overlap 

1888 else: 

1889 cell_entry = disagreements 

1890 if show_ext: 

1891 cell_entry = str(cell_entry) + "/" + str(extant_units) 

1892 matrix[i, j] = cell_entry 

1893 matrix[j, i] = cell_entry 

1894 return matrix, witness_labels 

1895 

1896 def to_similarity_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False): 

1897 """Transforms this Collation into a NumPy similarity matrix between witnesses, along with an array of its labels for the witnesses. 

1898 Similarities can be computed either as counts of agreements (the default setting), or as proportions of agreements over all variation units where both witnesses have singleton readings. 

1899 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of agreements. 

1900 

1901 Args: 

1902 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1903 Default value is False. 

1904 proportion (bool, optional): An optional flag indicating whether or not to calculate similarities as proportions over extant, unambiguous variation units. 

1905 Default value is False. 

1906 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

1907 should include the number of their extant, unambiguous variation units after the number of agreements. 

1908 Default value is False. 

1909 

1910 Returns: 

1911 A NumPy agreement matrix with a row and column for each witness. 

1912 A list of witness ID strings. 

1913 """ 

1914 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1915 substantive_variation_unit_ids = self.variation_unit_ids 

1916 if drop_constant: 

1917 substantive_variation_unit_ids = [ 

1918 vu_id 

1919 for vu_id in self.variation_unit_ids 

1920 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

1921 ] 

1922 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

1923 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

1924 # Initialize the output array with the appropriate dimensions: 

1925 witness_labels = [wit.id for wit in self.witnesses] 

1926 # The type of the matrix will depend on the input options: 

1927 matrix = None 

1928 if show_ext: 

1929 matrix = np.full( 

1930 (len(witness_labels), len(witness_labels)), "NA", dtype=object 

1931 ) # strings of the form "agreements/extant" 

1932 elif proportion: 

1933 matrix = np.full( 

1934 (len(witness_labels), len(witness_labels)), 0.0, dtype=float 

1935 ) # floats of the form agreements/extant 

1936 else: 

1937 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form agreements 

1938 for i, wit_1 in enumerate(witness_labels): 

1939 for j, wit_2 in enumerate(witness_labels): 

1940 extant_units = 0 

1941 agreements = 0 

1942 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified, 

1943 # so we only have to calculate it once: 

1944 if i > j: 

1945 continue 

1946 # Otherwise, calculate the number of units where both witnesses have unambiguous readings 

1947 # and the number of units where they agree: 

1948 for k, vu_id in enumerate(self.variation_unit_ids): 

1949 if vu_id not in substantive_variation_unit_ids_set: 

1950 continue 

1951 wit_1_rdg_support = self.readings_by_witness[wit_1][k] 

1952 wit_2_rdg_support = self.readings_by_witness[wit_2][k] 

1953 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0] 

1954 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0] 

1955 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1: 

1956 continue 

1957 extant_units += 1 

1958 if wit_1_rdg_inds[0] == wit_2_rdg_inds[0]: 

1959 agreements += 1 

1960 cell_entry = None 

1961 if proportion: 

1962 cell_entry = agreements / max( 

1963 extant_units, 1 

1964 ) # the max in the denominator is to prevent division by 0; the distance entry will be 0 if the two witnesses have no overlap 

1965 else: 

1966 cell_entry = agreements 

1967 if show_ext: 

1968 cell_entry = str(cell_entry) + "/" + str(extant_units) 

1969 matrix[i, j] = cell_entry 

1970 matrix[j, i] = cell_entry 

1971 return matrix, witness_labels 

1972 

1973 def to_idf_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None): 

1974 """Transforms this Collation into a NumPy matrix of agreements between witnesses weighted by inverse document frequency (IDF), along with an array of its labels for the witnesses. 

1975 The IDF weight of an agreement on a given reading is the information content -log(Pr(R)) of the event R of randomly sampling a witness with that reading. 

1976 The IDF-weighted agreement score for two witnesses is the sum of the IDF weights for the readings at which they agree. 

1977 Where any witness is ambiguous, it contributes to its potential readings' sampling probabilities in proportion to its degrees of support for those readings. 

1978 Similarly, where one or both target witnesses are ambiguous, the expected information content of their agreement is calculated based on the probabilities of their having the same reading. 

1979 If a split_missing argument is supplied, then lacunae are handled in the same way. 

1980 

1981 Args: 

1982 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

1983 Default value is False. 

1984 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

1985 If not specified, then missing data is ignored (i.e., all states are 0). 

1986 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

1987 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

1988 

1989 Returns: 

1990 A NumPy IDF-weighted agreement matrix with a row and column for each witness. 

1991 A list of witness ID strings. 

1992 """ 

1993 # Populate a list of sites that will correspond to columns of the sequence alignment: 

1994 substantive_variation_unit_ids = self.variation_unit_ids 

1995 if drop_constant: 

1996 substantive_variation_unit_ids = [ 

1997 vu_id 

1998 for vu_id in self.variation_unit_ids 

1999 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2000 ] 

2001 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2002 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2003 # Initialize the output array with the appropriate dimensions: 

2004 witness_labels = [wit.id for wit in self.witnesses] 

2005 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings: 

2006 support_proportions_by_unit = {} 

2007 for k, vu_id in enumerate(self.variation_unit_ids): 

2008 # Skip this variation unit if it is a dropped constant site: 

2009 if vu_id not in substantive_variation_unit_ids_set: 

2010 continue 

2011 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2012 for i, wit in enumerate(witness_labels): 

2013 rdg_support = self.readings_by_witness[wit][k] 

2014 for l, w in enumerate(rdg_support): 

2015 support_proportions[l] += w 

2016 norm = ( 

2017 sum(support_proportions) if sum(support_proportions) > 0 else 1.0 

2018 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2019 for l in range(len(support_proportions)): 

2020 support_proportions[l] = support_proportions[l] / norm 

2021 support_proportions_by_unit[vu_id] = support_proportions 

2022 # Then populate the matrix one variation unit at a time: 

2023 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2024 for k, vu_id in enumerate(self.variation_unit_ids): 

2025 # Skip this variation unit if it is a dropped constant site: 

2026 if vu_id not in substantive_variation_unit_ids_set: 

2027 continue 

2028 # Otherwise, calculate the sampling probabilities for each reading in this unit: 

2029 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2030 rdg_support_by_witness = {} 

2031 for i, wit in enumerate(witness_labels): 

2032 rdg_support = self.readings_by_witness[wit][k] 

2033 # Check if this reading support vector represents missing data: 

2034 norm = sum(rdg_support) 

2035 if norm == 0: 

2036 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2037 if split_missing == SplitMissingType.uniform: 

2038 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))] 

2039 elif split_missing == SplitMissingType.proportional: 

2040 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))] 

2041 else: 

2042 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1: 

2043 rdg_support = [w / norm for l, w in enumerate(rdg_support)] 

2044 rdg_support_by_witness[wit] = rdg_support 

2045 # Then add this witness's contributions to the readings' sampling probabilities: 

2046 for l, w in enumerate(rdg_support_by_witness[wit]): 

2047 sampling_probabilities[l] += w 

2048 norm = ( 

2049 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0 

2050 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2051 # Otherwise, normalize the sampling probabilities so they sum to 1: 

2052 sampling_probabilities = [w / norm for w in sampling_probabilities] 

2053 # Then calculate the IDF weights for agreements between witnesses in this unit: 

2054 for i, wit_1 in enumerate(witness_labels): 

2055 for j, wit_2 in enumerate(witness_labels): 

2056 # The contribution to the weighted sum for these witnesses will be identical regardless of the order in which they are specified, 

2057 # so we only have to calculate it once: 

2058 if i > j: 

2059 continue 

2060 # First, calculate the probability that these two witnesses agree: 

2061 wit_1_rdg_support = rdg_support_by_witness[wit_1] 

2062 wit_2_rdg_support = rdg_support_by_witness[wit_2] 

2063 probability_of_agreement = sum( 

2064 [wit_1_rdg_support[l] * wit_2_rdg_support[l] for l in range(len(sampling_probabilities))] 

2065 ) 

2066 # If these witnesses do not agree at this variation unit, then this unit contributes nothing to their total score: 

2067 if probability_of_agreement == 0: 

2068 continue 

2069 # Otherwise, calculate the expected information content (in bits) of their agreement given their agreement on that reading 

2070 # (skipping readings with a sampling probability of 0): 

2071 expected_information_content = sum( 

2072 [ 

2073 -math.log2(sampling_probabilities[l]) 

2074 * (wit_1_rdg_support[l] * wit_2_rdg_support[l] / probability_of_agreement) 

2075 for l in range(len(sampling_probabilities)) 

2076 if sampling_probabilities[l] > 0.0 

2077 ] 

2078 ) 

2079 # Then add this contribution to the total score for these two witnesses: 

2080 matrix[i, j] += expected_information_content 

2081 matrix[j, i] += expected_information_content 

2082 return matrix, witness_labels 

2083 

2084 def to_mean_idf_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None): 

2085 """Transforms this Collation into a NumPy matrix of average inverse document frequency (IDF) weights on agreements between witnesses, along with an array of its labels for the witnesses. 

2086 The IDF weight of an agreement on a given reading is the information content -log(Pr(R)) of the event R of randomly sampling a witness with that reading. 

2087 The IDF-weighted agreement score for two witnesses is the sum of the IDF weights for the readings at which they agree. 

2088 The average is taken over all variation units at which both witnesses have a non-zero reading support vector. 

2089 Where any witness is ambiguous, it contributes to its potential readings' sampling probabilities in proportion to its degrees of support for those readings. 

2090 Similarly, where one or both target witnesses are ambiguous, the expected information content of their agreement is calculated based on the probabilities of their having the same reading. 

2091 If a split_missing argument is supplied, then lacunae are handled in the same way. 

2092 This will improve the value of relationships involving more fragmentary witnesses,  

2093 but be warned that extremely fragmentary witnesses may have their values inflated and should probably be excluded from consideration. 

2094 

2095 Args: 

2096 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2097 Default value is False. 

2098 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2099 If not specified, then missing data is ignored (i.e., all states are 0). 

2100 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2101 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2102 

2103 Returns: 

2104 A NumPy IDF-weighted agreement matrix with a row and column for each witness. 

2105 A list of witness ID strings. 

2106 """ 

2107 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2108 substantive_variation_unit_ids = self.variation_unit_ids 

2109 if drop_constant: 

2110 substantive_variation_unit_ids = [ 

2111 vu_id 

2112 for vu_id in self.variation_unit_ids 

2113 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2114 ] 

2115 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2116 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2117 # Initialize the output array with the appropriate dimensions: 

2118 witness_labels = [wit.id for wit in self.witnesses] 

2119 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings: 

2120 support_proportions_by_unit = {} 

2121 for k, vu_id in enumerate(self.variation_unit_ids): 

2122 # Skip this variation unit if it is a dropped constant site: 

2123 if vu_id not in substantive_variation_unit_ids_set: 

2124 continue 

2125 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2126 for i, wit in enumerate(witness_labels): 

2127 rdg_support = self.readings_by_witness[wit][k] 

2128 for l, w in enumerate(rdg_support): 

2129 support_proportions[l] += w 

2130 norm = ( 

2131 sum(support_proportions) if sum(support_proportions) > 0 else 1.0 

2132 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2133 for l in range(len(support_proportions)): 

2134 support_proportions[l] = support_proportions[l] / norm 

2135 support_proportions_by_unit[vu_id] = support_proportions 

2136 # Then populate the matrix one variation unit at a time: 

2137 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2138 nonzero_vu_count_matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2139 for k, vu_id in enumerate(self.variation_unit_ids): 

2140 # Skip this variation unit if it is a dropped constant site: 

2141 if vu_id not in substantive_variation_unit_ids_set: 

2142 continue 

2143 # Otherwise, calculate the sampling probabilities for each reading in this unit: 

2144 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2145 rdg_support_by_witness = {} 

2146 for i, wit in enumerate(witness_labels): 

2147 rdg_support = self.readings_by_witness[wit][k] 

2148 # Check if this reading support vector represents missing data: 

2149 norm = sum(rdg_support) 

2150 if norm == 0: 

2151 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2152 if split_missing == SplitMissingType.uniform: 

2153 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))] 

2154 elif split_missing == SplitMissingType.proportional: 

2155 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))] 

2156 else: 

2157 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1: 

2158 rdg_support = [w / norm for l, w in enumerate(rdg_support)] 

2159 rdg_support_by_witness[wit] = rdg_support 

2160 # Then add this witness's contributions to the readings' sampling probabilities: 

2161 for l, w in enumerate(rdg_support_by_witness[wit]): 

2162 sampling_probabilities[l] += w 

2163 norm = ( 

2164 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0 

2165 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2166 # Otherwise, normalize the sampling probabilities so they sum to 1: 

2167 sampling_probabilities = [w / norm for w in sampling_probabilities] 

2168 # Then calculate the IDF weights for agreements between witnesses in this unit: 

2169 for i, wit_1 in enumerate(witness_labels): 

2170 for j, wit_2 in enumerate(witness_labels): 

2171 # The contribution to the weighted sum for these witnesses will be identical regardless of the order in which they are specified, 

2172 # so we only have to calculate it once: 

2173 if i > j: 

2174 continue 

2175 # First, get the witnesses' reading support vectors at this unit: 

2176 wit_1_rdg_support = rdg_support_by_witness[wit_1] 

2177 wit_2_rdg_support = rdg_support_by_witness[wit_2] 

2178 # If either witness has an all-zero reading support vector, then skip this variation unit: 

2179 if sum(wit_1_rdg_support) == 0 or sum(wit_2_rdg_support) == 0: 

2180 continue 

2181 # Otherwise, the witnesses both have non-zero vectors here; increment the count of the units where this happens: 

2182 nonzero_vu_count_matrix[i, j] += 1 

2183 nonzero_vu_count_matrix[j, i] += 1 

2184 # Then calculate the probability that these two witnesses agree: 

2185 probability_of_agreement = sum( 

2186 [wit_1_rdg_support[l] * wit_2_rdg_support[l] for l in range(len(sampling_probabilities))] 

2187 ) 

2188 # If these witnesses do not agree at this variation unit, then this unit contributes nothing to their total score: 

2189 if probability_of_agreement == 0: 

2190 continue 

2191 # Otherwise, calculate the expected information content (in bits) of their agreement given their agreement on that reading 

2192 # (skipping readings with a sampling probability of 0): 

2193 expected_information_content = sum( 

2194 [ 

2195 -math.log2(sampling_probabilities[l]) 

2196 * (wit_1_rdg_support[l] * wit_2_rdg_support[l] / probability_of_agreement) 

2197 for l in range(len(sampling_probabilities)) 

2198 if sampling_probabilities[l] > 0.0 

2199 ] 

2200 ) 

2201 # Then add this contribution to the total score for these two witnesses: 

2202 matrix[i, j] += expected_information_content 

2203 matrix[j, i] += expected_information_content 

2204 # Finally, obtain an average information content for each pair of witnesses: 

2205 for i, wit_1 in enumerate(witness_labels): 

2206 for j, wit_2 in enumerate(witness_labels): 

2207 matrix[i, j] = matrix[i, j] / nonzero_vu_count_matrix[i, j] if nonzero_vu_count_matrix[i, j] > 0 else 0 

2208 return matrix, witness_labels 

2209 

2210 def to_mi_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None): 

2211 """Transforms this Collation into a NumPy matrix of the total mutual information (MI), in bits, between witnesses over all variation units, along with an array of its labels for the witnesses. 

2212 This is equivalent to the total Kullback-Leibler divergence of the joint distribution of the witnesses' observed readings  

2213 from the joint distribution of their expected readings under the assumption that the witnesses are independent, taken over all variation units. 

2214 The value of 0 if and only if the witnesses are completely independent. 

2215 

2216 Args: 

2217 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2218 Default value is False. 

2219 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2220 If not specified, then missing data is ignored (i.e., all states are 0). 

2221 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2222 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2223 

2224 Returns: 

2225 A NumPy MI matrix with a row and column for each witness. 

2226 A list of witness ID strings. 

2227 """ 

2228 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2229 substantive_variation_unit_ids = self.variation_unit_ids 

2230 if drop_constant: 

2231 substantive_variation_unit_ids = [ 

2232 vu_id 

2233 for vu_id in self.variation_unit_ids 

2234 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2235 ] 

2236 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2237 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2238 # Initialize the output array with the appropriate dimensions: 

2239 witness_labels = [wit.id for wit in self.witnesses] 

2240 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings: 

2241 support_proportions_by_unit = {} 

2242 for k, vu_id in enumerate(self.variation_unit_ids): 

2243 # Skip this variation unit if it is a dropped constant site: 

2244 if vu_id not in substantive_variation_unit_ids_set: 

2245 continue 

2246 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2247 for i, wit in enumerate(witness_labels): 

2248 rdg_support = self.readings_by_witness[wit][k] 

2249 for l, w in enumerate(rdg_support): 

2250 support_proportions[l] += w 

2251 norm = ( 

2252 sum(support_proportions) if sum(support_proportions) > 0 else 1.0 

2253 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2254 for l in range(len(support_proportions)): 

2255 support_proportions[l] = support_proportions[l] / norm 

2256 support_proportions_by_unit[vu_id] = support_proportions 

2257 # Then populate the matrix one variation unit at a time: 

2258 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2259 for k, vu_id in enumerate(self.variation_unit_ids): 

2260 # Skip this variation unit if it is a dropped constant site: 

2261 if vu_id not in substantive_variation_unit_ids_set: 

2262 continue 

2263 # Otherwise, calculate the sampling probabilities for each reading in this unit: 

2264 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2265 rdg_support_by_witness = {} 

2266 for i, wit in enumerate(witness_labels): 

2267 rdg_support = self.readings_by_witness[wit][k] 

2268 # Check if this reading support vector represents missing data: 

2269 norm = sum(rdg_support) 

2270 if norm == 0: 

2271 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2272 if split_missing == SplitMissingType.uniform: 

2273 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))] 

2274 elif split_missing == SplitMissingType.proportional: 

2275 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))] 

2276 else: 

2277 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1: 

2278 rdg_support = [w / norm for l, w in enumerate(rdg_support)] 

2279 rdg_support_by_witness[wit] = rdg_support 

2280 # Then add this witness's contributions to the readings' sampling probabilities: 

2281 for l, w in enumerate(rdg_support_by_witness[wit]): 

2282 sampling_probabilities[l] += w 

2283 norm = ( 

2284 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0 

2285 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2286 # Otherwise, normalize the sampling probabilities so they sum to 1: 

2287 sampling_probabilities = [w / norm for w in sampling_probabilities] 

2288 # Then populate a contingency table for the expected probabilities of joint support in this unit: 

2289 expected_joint_probabilities = np.full((len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float) 

2290 for l1 in range(len(sampling_probabilities)): 

2291 for l2 in range(len(sampling_probabilities)): 

2292 expected_joint_probabilities[l1, l2] = sampling_probabilities[l1]*sampling_probabilities[l2] 

2293 # Then calculate the mutual information contribution for each pair of witnesses: 

2294 for i, wit_1 in enumerate(witness_labels): 

2295 for j, wit_2 in enumerate(witness_labels): 

2296 # The mutual information for these witnesses at this unit will be identical regardless of the order in which they are specified, 

2297 # so we only have to calculate it once: 

2298 if i > j: 

2299 continue 

2300 wit_1_rdg_support = rdg_support_by_witness[wit_1] 

2301 wit_2_rdg_support = rdg_support_by_witness[wit_2] 

2302 # If either witness has an all-zeroes vector (because it is lacunose in this unit), then we can skip these witnesses here: 

2303 if sum(wit_1_rdg_support) == 0 or sum(wit_2_rdg_support) == 0: 

2304 continue 

2305 # Otherwise, populate a contingency table for the observed probabilities of joint support in this unit: 

2306 observed_joint_probabilities = np.full((len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float) 

2307 for l1, w1 in enumerate(rdg_support_by_witness[wit_1]): 

2308 for l2, w2 in enumerate(rdg_support_by_witness[wit_2]): 

2309 observed_joint_probabilities[l1, l2] = w1 * w2 

2310 # Then calculate the mutual information using the expected and observed distribution matrices: 

2311 mutual_information = 0.0 

2312 for l1 in range(len(sampling_probabilities)): 

2313 for l2 in range(len(sampling_probabilities)): 

2314 observed = observed_joint_probabilities[l1, l2] 

2315 expected = expected_joint_probabilities[l1, l2] 

2316 if observed == 0: 

2317 continue 

2318 mutual_information += observed * math.log2(observed / expected) 

2319 # Then add this mutual information to the total for these two witnesses: 

2320 matrix[i, j] += mutual_information 

2321 matrix[j, i] += mutual_information 

2322 return matrix, witness_labels 

2323 

2324 def to_mean_mi_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None): 

2325 """Transforms this Collation into a NumPy matrix of the average mutual information (MI), in bits, between witnesses over all variation units, along with an array of its labels for the witnesses. 

2326 This is equivalent to the average Kullback-Leibler divergence of the joint distribution of the witnesses' observed readings  

2327 from the joint distribution of their expected readings under the assumption that the witnesses are independent, taken over all variation units. 

2328 The value of 0 if and only if the witnesses are completely independent. 

2329 This will improve the value of relationships involving more fragmentary witnesses,  

2330 but be warned that extremely fragmentary witnesses may have their values inflated and should probably be excluded from consideration. 

2331 

2332 Args: 

2333 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2334 Default value is False. 

2335 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2336 If not specified, then missing data is ignored (i.e., all states are 0). 

2337 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2338 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2339 

2340 Returns: 

2341 A NumPy MI matrix with a row and column for each witness. 

2342 A list of witness ID strings. 

2343 """ 

2344 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2345 substantive_variation_unit_ids = self.variation_unit_ids 

2346 if drop_constant: 

2347 substantive_variation_unit_ids = [ 

2348 vu_id 

2349 for vu_id in self.variation_unit_ids 

2350 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2351 ] 

2352 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2353 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2354 # Initialize the output array with the appropriate dimensions: 

2355 witness_labels = [wit.id for wit in self.witnesses] 

2356 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings: 

2357 support_proportions_by_unit = {} 

2358 for k, vu_id in enumerate(self.variation_unit_ids): 

2359 # Skip this variation unit if it is a dropped constant site: 

2360 if vu_id not in substantive_variation_unit_ids_set: 

2361 continue 

2362 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2363 for i, wit in enumerate(witness_labels): 

2364 rdg_support = self.readings_by_witness[wit][k] 

2365 for l, w in enumerate(rdg_support): 

2366 support_proportions[l] += w 

2367 norm = ( 

2368 sum(support_proportions) if sum(support_proportions) > 0 else 1.0 

2369 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2370 for l in range(len(support_proportions)): 

2371 support_proportions[l] = support_proportions[l] / norm 

2372 support_proportions_by_unit[vu_id] = support_proportions 

2373 # Then populate the matrix one variation unit at a time: 

2374 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2375 nonzero_vu_count_matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float) 

2376 for k, vu_id in enumerate(self.variation_unit_ids): 

2377 # Skip this variation unit if it is a dropped constant site: 

2378 if vu_id not in substantive_variation_unit_ids_set: 

2379 continue 

2380 # Otherwise, calculate the sampling probabilities for each reading in this unit: 

2381 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id]) 

2382 rdg_support_by_witness = {} 

2383 for i, wit in enumerate(witness_labels): 

2384 rdg_support = self.readings_by_witness[wit][k] 

2385 # Check if this reading support vector represents missing data: 

2386 norm = sum(rdg_support) 

2387 if norm == 0: 

2388 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2389 if split_missing == SplitMissingType.uniform: 

2390 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))] 

2391 elif split_missing == SplitMissingType.proportional: 

2392 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))] 

2393 else: 

2394 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1: 

2395 rdg_support = [w / norm for l, w in enumerate(rdg_support)] 

2396 rdg_support_by_witness[wit] = rdg_support 

2397 # Then add this witness's contributions to the readings' sampling probabilities: 

2398 for l, w in enumerate(rdg_support_by_witness[wit]): 

2399 sampling_probabilities[l] += w 

2400 norm = ( 

2401 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0 

2402 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero 

2403 # Otherwise, normalize the sampling probabilities so they sum to 1: 

2404 sampling_probabilities = [w / norm for w in sampling_probabilities] 

2405 # Then populate a contingency table for the expected probabilities of joint support in this unit: 

2406 expected_joint_probabilities = np.full((len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float) 

2407 for l1 in range(len(sampling_probabilities)): 

2408 for l2 in range(len(sampling_probabilities)): 

2409 expected_joint_probabilities[l1, l2] = sampling_probabilities[l1]*sampling_probabilities[l2] 

2410 # Then calculate the mutual information contribution for each pair of witnesses: 

2411 for i, wit_1 in enumerate(witness_labels): 

2412 for j, wit_2 in enumerate(witness_labels): 

2413 # The mutual information for these witnesses at this unit will be identical regardless of the order in which they are specified, 

2414 # so we only have to calculate it once: 

2415 if i > j: 

2416 continue 

2417 wit_1_rdg_support = rdg_support_by_witness[wit_1] 

2418 wit_2_rdg_support = rdg_support_by_witness[wit_2] 

2419 # If either witness has an all-zeroes vector (because it is lacunose in this unit), then we can skip these witnesses here: 

2420 if sum(wit_1_rdg_support) == 0 or sum(wit_2_rdg_support) == 0: 

2421 continue 

2422 # Otherwise, populate a contingency table for the observed probabilities of joint support in this unit: 

2423 observed_joint_probabilities = np.full((len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float) 

2424 for l1, w1 in enumerate(rdg_support_by_witness[wit_1]): 

2425 for l2, w2 in enumerate(rdg_support_by_witness[wit_2]): 

2426 observed_joint_probabilities[l1, l2] = w1 * w2 

2427 # Then calculate the mutual information using the expected and observed distribution matrices: 

2428 mutual_information = 0.0 

2429 for l1 in range(len(sampling_probabilities)): 

2430 for l2 in range(len(sampling_probabilities)): 

2431 observed = observed_joint_probabilities[l1, l2] 

2432 expected = expected_joint_probabilities[l1, l2] 

2433 if observed == 0: 

2434 continue 

2435 mutual_information += observed * math.log2(observed / expected) 

2436 # Then add this mutual information to the total for these two witnesses, and increment the count of variation units where both have non-zero reading support vectors: 

2437 matrix[i, j] += mutual_information 

2438 matrix[j, i] += mutual_information 

2439 nonzero_vu_count_matrix[i, j] += 1 

2440 nonzero_vu_count_matrix[j, i] += 1 

2441 # Finally, obtain an average mutual information for each pair of witnesses: 

2442 for i, wit_1 in enumerate(witness_labels): 

2443 for j, wit_2 in enumerate(witness_labels): 

2444 matrix[i, j] = matrix[i, j] / nonzero_vu_count_matrix[i, j] if nonzero_vu_count_matrix[i, j] > 0 else 0 

2445 return matrix, witness_labels 

2446 

2447 def to_nexus_table(self, drop_constant: bool = False, ambiguous_as_missing: bool = False): 

2448 """Returns this Collation in the form of a table with rows for taxa, columns for characters, and reading IDs in cells. 

2449 

2450 Args: 

2451 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2452 Default value is False. 

2453 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2454 Default value is False. 

2455 

2456 Returns: 

2457 A NumPy array with rows for taxa, columns for characters, and reading IDs in cells. 

2458 A list of substantive reading ID strings. 

2459 A list of witness ID strings. 

2460 """ 

2461 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2462 substantive_variation_unit_ids = self.variation_unit_ids 

2463 if drop_constant: 

2464 substantive_variation_unit_ids = [ 

2465 vu_id 

2466 for vu_id in self.variation_unit_ids 

2467 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2468 ] 

2469 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2470 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2471 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2472 # to the readings' IDs: 

2473 reading_ids_by_indices = {} 

2474 for j, vu in enumerate(self.variation_units): 

2475 if vu.id not in substantive_variation_unit_ids_set: 

2476 continue 

2477 k = 0 

2478 for rdg in vu.readings: 

2479 key = tuple([vu.id, rdg.id]) 

2480 if key not in substantive_variation_unit_reading_tuples_set: 

2481 continue 

2482 indices = tuple([j, k]) 

2483 reading_ids_by_indices[indices] = rdg.id 

2484 k += 1 

2485 # Initialize the output array with the appropriate dimensions: 

2486 missing_symbol = '?' 

2487 witness_labels = [wit.id for wit in self.witnesses] 

2488 matrix = np.full( 

2489 (len(witness_labels), len(substantive_variation_unit_ids)), missing_symbol, dtype=object 

2490 ) # use dtype=object because the maximum string length is not known up front 

2491 # Then populate it with the appropriate values: 

2492 row_ind = 0 

2493 for i, wit in enumerate(self.witnesses): 

2494 col_ind = 0 

2495 for j, vu in enumerate(self.variation_units): 

2496 if vu.id not in substantive_variation_unit_ids_set: 

2497 continue 

2498 rdg_support = self.readings_by_witness[wit.id][j] 

2499 # If this reading support vector sums to 0, then this is missing data; handle it as specified: 

2500 if sum(rdg_support) == 0: 

2501 matrix[row_ind, col_ind] = missing_symbol 

2502 # Otherwise, add its coefficients normally: 

2503 else: 

2504 rdg_inds = [ 

2505 k for k, w in enumerate(rdg_support) if w > 0 

2506 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them 

2507 # For singleton readings, just print the reading ID: 

2508 if len(rdg_inds) == 1: 

2509 k = rdg_inds[0] 

2510 matrix[row_ind, col_ind] = reading_ids_by_indices[(j, k)] 

2511 # For multiple readings, print the corresponding reading IDs in braces or the missing symbol depending on input settings: 

2512 else: 

2513 if ambiguous_as_missing: 

2514 matrix[row_ind, col_ind] = missing_symbol 

2515 else: 

2516 matrix[row_ind, col_ind] = "{%s}" % " ".join( 

2517 [reading_ids_by_indices[(j, k)] for k in rdg_inds] 

2518 ) 

2519 col_ind += 1 

2520 row_ind += 1 

2521 return matrix, witness_labels, substantive_variation_unit_ids 

2522 

2523 def to_long_table(self, drop_constant: bool = False): 

2524 """Returns this Collation in the form of a long table with columns for taxa, characters, reading indices, and reading values. 

2525 Note that this method treats ambiguous readings as missing data. 

2526 

2527 Args: 

2528 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2529 Default value is False. 

2530 

2531 Returns: 

2532 A NumPy array with columns for taxa, characters, reading indices, and reading values, and rows for each combination of these values in the matrix. 

2533 A list of column label strings. 

2534 """ 

2535 # Populate a list of sites that will correspond to columns of the sequence alignment: 

2536 substantive_variation_unit_ids = self.variation_unit_ids 

2537 if drop_constant: 

2538 substantive_variation_unit_ids = [ 

2539 vu_id 

2540 for vu_id in self.variation_unit_ids 

2541 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2542 ] 

2543 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2544 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2545 # Initialize the outputs: 

2546 column_labels = ["taxon", "character", "state", "value"] 

2547 long_table_list = [] 

2548 # Populate a dictionary mapping (variation unit index, reading index) tuples to reading texts: 

2549 reading_texts_by_indices = {} 

2550 for j, vu in enumerate(self.variation_units): 

2551 if vu.id not in substantive_variation_unit_ids_set: 

2552 continue 

2553 k = 0 

2554 for rdg in vu.readings: 

2555 key = tuple([vu.id, rdg.id]) 

2556 if key not in substantive_variation_unit_reading_tuples_set: 

2557 continue 

2558 indices = tuple([j, k]) 

2559 reading_texts_by_indices[indices] = rdg.text 

2560 k += 1 

2561 # Then populate the output list with the appropriate values: 

2562 witness_labels = [wit.id for wit in self.witnesses] 

2563 missing_symbol = '?' 

2564 for i, wit in enumerate(self.witnesses): 

2565 row_ind = 0 

2566 for j, vu_id in enumerate(self.variation_unit_ids): 

2567 if vu_id not in substantive_variation_unit_ids_set: 

2568 continue 

2569 rdg_support = self.readings_by_witness[wit.id][j] 

2570 # Populate a list of nonzero coefficients for this reading support vector: 

2571 rdg_inds = [k for k, w in enumerate(rdg_support) if w > 0] 

2572 # If this list does not consist of exactly one reading, then treat it as missing data: 

2573 if len(rdg_inds) != 1: 

2574 long_table_list.append([wit.id, vu_id, missing_symbol, missing_symbol]) 

2575 continue 

2576 k = rdg_inds[0] 

2577 rdg_text = reading_texts_by_indices[(j, k)] 

2578 # Replace empty reading texts with the omission placeholder: 

2579 if rdg_text == "": 

2580 rdg_text = "om." 

2581 long_table_list.append([wit.id, vu_id, k, rdg_text]) 

2582 # Then convert the long table entries list to a NumPy array: 

2583 long_table = np.array(long_table_list) 

2584 return long_table, column_labels 

2585 

2586 def to_dataframe( 

2587 self, 

2588 drop_constant: bool = False, 

2589 ambiguous_as_missing: bool = False, 

2590 proportion: bool = False, 

2591 table_type: TableType = TableType.matrix, 

2592 split_missing: SplitMissingType = None, 

2593 show_ext: bool = False, 

2594 ): 

2595 """Returns this Collation in the form of a Pandas DataFrame array, including the appropriate row and column labels. 

2596 

2597 Args: 

2598 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

2599 Default value is False. 

2600 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

2601 Default value is False. 

2602 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2603 Default value is False. 

2604 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate. 

2605 Only applicable for tabular outputs. 

2606 Default value is "matrix". 

2607 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2608 If not specified, then missing data is ignored (i.e., all states are 0). 

2609 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2610 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2611 Only applicable for table types "matrix" and "idf". 

2612 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2613 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2614 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2615 Default value is False. 

2616 

2617 Returns: 

2618 A Pandas DataFrame corresponding to a collation matrix with reading frequencies or a long table with discrete reading states. 

2619 """ 

2620 df = None 

2621 # Proceed based on the table type: 

2622 if table_type == TableType.matrix: 

2623 # Convert the collation to a NumPy array and get its row and column labels first: 

2624 matrix, reading_labels, witness_labels = self.to_numpy( 

2625 drop_constant=drop_constant, split_missing=split_missing 

2626 ) 

2627 df = pd.DataFrame(matrix, index=reading_labels, columns=witness_labels) 

2628 elif table_type == TableType.distance: 

2629 # Convert the collation to a NumPy array and get its row and column labels first: 

2630 matrix, witness_labels = self.to_distance_matrix( 

2631 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2632 ) 

2633 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2634 elif table_type == TableType.similarity: 

2635 # Convert the collation to a NumPy array and get its row and column labels first: 

2636 matrix, witness_labels = self.to_similarity_matrix( 

2637 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2638 ) 

2639 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2640 elif table_type == TableType.idf: 

2641 # Convert the collation to a NumPy array and get its row and column labels first: 

2642 matrix, witness_labels = self.to_idf_matrix(drop_constant=drop_constant, split_missing=split_missing) 

2643 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2644 elif table_type == TableType.mean_idf: 

2645 # Convert the collation to a NumPy array and get its row and column labels first: 

2646 matrix, witness_labels = self.to_mean_idf_matrix(drop_constant=drop_constant, split_missing=split_missing) 

2647 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2648 elif table_type == TableType.mi: 

2649 # Convert the collation to a NumPy array and get its row and column labels first: 

2650 matrix, witness_labels = self.to_mi_matrix(drop_constant=drop_constant, split_missing=split_missing) 

2651 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2652 elif table_type == TableType.mean_mi: 

2653 # Convert the collation to a NumPy array and get its row and column labels first: 

2654 matrix, witness_labels = self.to_mean_mi_matrix(drop_constant=drop_constant, split_missing=split_missing) 

2655 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels) 

2656 elif table_type == TableType.nexus: 

2657 # Convert the collation to a NumPy array and get its row and column labels first: 

2658 matrix, witness_labels, vu_labels = self.to_nexus_table( 

2659 drop_constant=drop_constant, ambiguous_as_missing=ambiguous_as_missing 

2660 ) 

2661 df = pd.DataFrame(matrix, index=witness_labels, columns=vu_labels) 

2662 elif table_type == TableType.long: 

2663 # Convert the collation to a long table and get its column labels first: 

2664 long_table, column_labels = self.to_long_table(drop_constant=drop_constant) 

2665 df = pd.DataFrame(long_table, columns=column_labels) 

2666 return df 

2667 

2668 def to_csv( 

2669 self, 

2670 file_addr: Union[Path, str], 

2671 drop_constant: bool = False, 

2672 ambiguous_as_missing: bool = False, 

2673 proportion: bool = False, 

2674 table_type: TableType = TableType.matrix, 

2675 split_missing: SplitMissingType = None, 

2676 show_ext: bool = False, 

2677 **kwargs 

2678 ): 

2679 """Writes this Collation to a comma-separated value (CSV) file with the given address. 

2680 

2681 If your witness IDs are numeric (e.g., Gregory-Aland numbers), then they will be written in full to the CSV file, but Excel will likely interpret them as numbers and truncate any leading zeroes! 

2682 

2683 Args: 

2684 file_addr: A string representing the path to an output CSV file; the file type should be .csv. 

2685 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2686 Default value is False. 

2687 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2688 Default value is False. 

2689 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2690 Default value is False. 

2691 table_type: A TableType option indicating which type of tabular output to generate. 

2692 Only applicable for tabular outputs. 

2693 Default value is "matrix". 

2694 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2695 If not specified, then missing data is ignored (i.e., all states are 0). 

2696 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2697 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2698 Only applicable for table types "matrix" and "idf". 

2699 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2700 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2701 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2702 Default value is False. 

2703 **kwargs: Keyword arguments for pandas.DataFrame.to_csv. 

2704 """ 

2705 # Convert the collation to a Pandas DataFrame first: 

2706 df = self.to_dataframe( 

2707 drop_constant=drop_constant, 

2708 ambiguous_as_missing=ambiguous_as_missing, 

2709 proportion=proportion, 

2710 table_type=table_type, 

2711 split_missing=split_missing, 

2712 show_ext=show_ext, 

2713 ) 

2714 # Generate all parent folders for this file that don't already exist: 

2715 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2716 # Proceed based on the table type: 

2717 if table_type == TableType.long: 

2718 return df.to_csv( 

2719 file_addr, encoding="utf-8-sig", index=False, **kwargs 

2720 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2721 return df.to_csv( 

2722 file_addr, encoding="utf-8-sig", **kwargs 

2723 ) # add BOM to start of file so that Excel will know to read it as Unicode 

2724 

2725 def to_excel( 

2726 self, 

2727 file_addr: Union[Path, str], 

2728 drop_constant: bool = False, 

2729 ambiguous_as_missing: bool = False, 

2730 proportion: bool = False, 

2731 table_type: TableType = TableType.matrix, 

2732 split_missing: SplitMissingType = None, 

2733 show_ext: bool = False, 

2734 ): 

2735 """Writes this Collation to an Excel (.xlsx) file with the given address. 

2736 

2737 Since Pandas is deprecating its support for xlwt, specifying an output in old Excel (.xls) output is not recommended. 

2738 

2739 Args: 

2740 file_addr: A string representing the path to an output Excel file; the file type should be .xlsx. 

2741 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2742 Default value is False. 

2743 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data. 

2744 Default value is False. 

2745 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2746 Default value is False. 

2747 table_type: A TableType option indicating which type of tabular output to generate. 

2748 Only applicable for tabular outputs. 

2749 Default value is "matrix". 

2750 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

2751 If not specified, then missing data is ignored (i.e., all states are 0). 

2752 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

2753 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

2754 Only applicable for table types "matrix" and "idf". 

2755 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2756 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2757 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2758 Default value is False. 

2759 """ 

2760 # Convert the collation to a Pandas DataFrame first: 

2761 df = self.to_dataframe( 

2762 drop_constant=drop_constant, 

2763 ambiguous_as_missing=ambiguous_as_missing, 

2764 proportion=proportion, 

2765 table_type=table_type, 

2766 split_missing=split_missing, 

2767 show_ext=show_ext, 

2768 ) 

2769 # Generate all parent folders for this file that don't already exist: 

2770 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2771 # Proceed based on the table type: 

2772 if table_type == TableType.long: 

2773 return df.to_excel(file_addr, index=False) 

2774 return df.to_excel(file_addr) 

2775 

2776 def to_phylip_matrix( 

2777 self, 

2778 file_addr: Union[Path, str], 

2779 drop_constant: bool = False, 

2780 proportion: bool = False, 

2781 table_type: TableType = TableType.distance, 

2782 show_ext: bool = False, 

2783 ): 

2784 """Writes this Collation as a PHYLIP-formatted distance/similarity matrix to the file with the given address. 

2785 

2786 Args: 

2787 file_addr: A string representing the path to an output PHYLIP file; the file type should be .ph or .phy. 

2788 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2789 Default value is False. 

2790 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units. 

2791 Default value is False. 

2792 table_type: A TableType option indicating which type of tabular output to generate. 

2793 For PHYLIP-formatted outputs, distance and similarity matrices are the only supported table types. 

2794 Default value is "distance". 

2795 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix 

2796 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements. 

2797 Only applicable for tabular output formats of type \"distance\" or \"similarity\". 

2798 Default value is False. 

2799 """ 

2800 # Convert the collation to a Pandas DataFrame first: 

2801 matrix = None 

2802 witness_labels = [] 

2803 # Proceed based on the table type: 

2804 if table_type == TableType.distance: 

2805 # Convert the collation to a NumPy array and get its row and column labels first: 

2806 matrix, witness_labels = self.to_distance_matrix( 

2807 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2808 ) 

2809 elif table_type == TableType.similarity: 

2810 # Convert the collation to a NumPy array and get its row and column labels first: 

2811 matrix, witness_labels = self.to_similarity_matrix( 

2812 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext 

2813 ) 

2814 # Generate all parent folders for this file that don't already exist: 

2815 Path(file_addr).parent.mkdir(parents=True, exist_ok=True) 

2816 with open(file_addr, "w", encoding="utf-8") as f: 

2817 # The first line contains the number of taxa: 

2818 f.write("%d\n" % len(witness_labels)) 

2819 # Every subsequent line contains a witness label, followed by the values in its row of the matrix: 

2820 for i, wit_id in enumerate(witness_labels): 

2821 wit_label = slugify(wit_id, lowercase=False, allow_unicode=True, separator='_') 

2822 f.write("%s %s\n" % (wit_label, " ".join([str(v) for v in matrix[i]]))) 

2823 return 

2824 

2825 def get_stemma_symbols(self): 

2826 """Returns a list of one-character symbols needed to represent the states of all substantive readings in stemma format. 

2827 

2828 The number of symbols equals the maximum number of substantive readings at any variation unit. 

2829 

2830 Returns: 

2831 A list of individual characters representing states in readings. 

2832 """ 

2833 possible_symbols = ( 

2834 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase) 

2835 ) # NOTE: the maximum number of symbols allowed in stemma format (other than "?" and "-") is 62 

2836 # The number of symbols needed is equal to the length of the longest substantive reading vector: 

2837 nsymbols = 0 

2838 # If there are no witnesses, then no symbols are needed at all: 

2839 if len(self.witnesses) == 0: 

2840 return [] 

2841 wit_id = self.witnesses[0].id 

2842 for rdg_support in self.readings_by_witness[wit_id]: 

2843 nsymbols = max(nsymbols, len(rdg_support)) 

2844 stemma_symbols = possible_symbols[:nsymbols] 

2845 return stemma_symbols 

2846 

2847 def to_stemma(self, file_addr: Union[Path, str]): 

2848 """Writes this Collation to a stemma file without an extension and a Chron file (containing low, middle, and high dates for all witnesses) without an extension. 

2849 

2850 Since this format does not support ambiguous states, all reading vectors with anything other than one nonzero entry will be interpreted as lacunose. 

2851 If an interpGrp for weights is specified in the TEI XML collation, then the weights for the interp elements will be used as weights 

2852 for the variation units that specify them in their ana attribute. 

2853 

2854 Args: 

2855 file_addr: A string representing the path to an output stemma prep file; the file should have no extension. 

2856 The accompanying chron file will match this file name, except that it will have "_chron" appended to the end. 

2857 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading. 

2858 """ 

2859 # Populate a list of sites that will correspond to columns of the sequence alignment 

2860 # (by default, constant sites are dropped): 

2861 substantive_variation_unit_ids = [ 

2862 vu_id for vu_id in self.variation_unit_ids if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1 

2863 ] 

2864 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids) 

2865 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples) 

2866 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2867 # to the readings' texts: 

2868 reading_texts_by_indices = {} 

2869 for j, vu in enumerate(self.variation_units): 

2870 if vu.id not in substantive_variation_unit_ids_set: 

2871 continue 

2872 k = 0 

2873 for rdg in vu.readings: 

2874 key = tuple([vu.id, rdg.id]) 

2875 if key not in substantive_variation_unit_reading_tuples_set: 

2876 continue 

2877 indices = tuple([j, k]) 

2878 reading_texts_by_indices[indices] = rdg.text 

2879 k += 1 

2880 # In a second pass, populate another dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary 

2881 # to the witnesses exclusively supporting those readings: 

2882 reading_wits_by_indices = {} 

2883 for indices in reading_texts_by_indices: 

2884 reading_wits_by_indices[indices] = [] 

2885 for i, wit in enumerate(self.witnesses): 

2886 for j, vu_id in enumerate(self.variation_unit_ids): 

2887 if vu_id not in substantive_variation_unit_ids_set: 

2888 continue 

2889 rdg_support = self.readings_by_witness[wit.id][j] 

2890 # If this witness does not exclusively support exactly one reading at this unit, then treat it as lacunose: 

2891 if len([k for k, w in enumerate(rdg_support) if w > 0]) != 1: 

2892 continue 

2893 k = rdg_support.index(1) 

2894 indices = tuple([j, k]) 

2895 reading_wits_by_indices[indices].append(wit.id) 

2896 # In a third pass, write to the stemma file: 

2897 symbols = self.get_stemma_symbols() 

2898 Path(file_addr).parent.mkdir( 

2899 parents=True, exist_ok=True 

2900 ) # generate all parent folders for this file that don't already exist 

2901 chron_file_addr = str(file_addr) + "_chron" 

2902 with open(file_addr, "w", encoding="utf-8") as f: 

2903 # Start with the witness list: 

2904 f.write( 

2905 "* %s ;\n\n" 

2906 % " ".join( 

2907 [slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') for wit in self.witnesses] 

2908 ) 

2909 ) 

2910 # f.write("^ %s\n\n" % chron_file_addr) #write the relative path to the chron file 

2911 f.write( 

2912 "^ %s\n\n" % ("." + os.sep + Path(chron_file_addr).name) 

2913 ) # write the relative path to the chron file 

2914 # Then add a line indicating that all witnesses are lacunose unless they are specified explicitly: 

2915 f.write("= $? $* ;\n\n") 

2916 # Then proceed for each variation unit: 

2917 for j, vu_id in enumerate(self.variation_unit_ids): 

2918 if vu_id not in substantive_variation_unit_ids_set: 

2919 continue 

2920 # Print the variation unit ID first: 

2921 f.write("@ %s\n" % vu_id) 

2922 # In a first pass, print the texts of all readings enclosed in brackets: 

2923 f.write("[ ") 

2924 k = 0 

2925 while True: 

2926 indices = tuple([j, k]) 

2927 if indices not in reading_texts_by_indices: 

2928 break 

2929 text = slugify( 

2930 reading_texts_by_indices[indices], lowercase=False, allow_unicode=True, separator='.' 

2931 ) 

2932 # Denote omissions by en-dashes: 

2933 if text == "": 

2934 text = "\u2013" 

2935 # The first reading should not be preceded by anything: 

2936 if k == 0: 

2937 f.write(text) 

2938 f.write(" |") 

2939 # Add the weight of this variation unit after the pipe by comparing its analysis categories to their weights: 

2940 weight = 1 

2941 vu = self.variation_units[j] 

2942 if len(vu.analysis_categories) > 0: 

2943 weight = int( 

2944 sum( 

2945 [ 

2946 self.weights_by_id[ana] if ana in self.weights_by_id else 1 

2947 for ana in vu.analysis_categories 

2948 ] 

2949 ) 

2950 / len(vu.analysis_categories) 

2951 ) 

2952 f.write("*%d" % weight) 

2953 # Every subsequent reading should be preceded by a space: 

2954 elif k > 0: 

2955 f.write(" %s" % text) 

2956 k += 1 

2957 f.write(" ]\n") 

2958 # In a second pass, print the indices and witnesses for all readings enclosed in angle brackets: 

2959 k = 0 

2960 f.write("\t< ") 

2961 while True: 

2962 indices = tuple([j, k]) 

2963 if indices not in reading_wits_by_indices: 

2964 break 

2965 rdg_symbol = symbols[k] # get the one-character alphanumeric code for this state 

2966 wits = " ".join(reading_wits_by_indices[indices]) 

2967 # Open the variant reading support block with an angle bracket: 

2968 if k == 0: 

2969 f.write("%s %s" % (rdg_symbol, wits)) 

2970 # Open all subsequent variant reading support blocks with pipes on the next line: 

2971 else: 

2972 f.write("\n\t| %s %s" % (rdg_symbol, wits)) 

2973 k += 1 

2974 f.write(" >\n") 

2975 # In a fourth pass, write to the chron file: 

2976 max_id_length = max( 

2977 [len(slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')) for wit in self.witnesses] 

2978 ) 

2979 max_date_length = 0 

2980 for wit in self.witnesses: 

2981 if wit.date_range[0] is not None: 

2982 max_date_length = max(max_date_length, len(str(wit.date_range[0]))) 

2983 if wit.date_range[1] is not None: 

2984 max_date_length = max(max_date_length, len(str(wit.date_range[1]))) 

2985 # Attempt to get the minimum and maximum dates for witnesses; if we can't do this, then don't write a chron file: 

2986 min_date = None 

2987 max_date = None 

2988 try: 

2989 min_date = min([wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]) 

2990 max_date = max([wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]) 

2991 except Exception as e: 

2992 print("WARNING: no witnesses have date ranges; no chron file will be written!") 

2993 return 

2994 with open(chron_file_addr, "w", encoding="utf-8") as f: 

2995 for wit in self.witnesses: 

2996 wit_label = slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') 

2997 f.write(wit_label) 

2998 f.write(" " * (max_id_length - len(wit.id) + 1)) 

2999 # If either the lower bound on this witness's date is empty, then use the min and max dates over all witnesses as defaults: 

3000 date_range = wit.date_range 

3001 if date_range[0] is None: 

3002 date_range = tuple([min_date, date_range[1]]) 

3003 # Then write the date range minimum, average, and maximum to the chron file: 

3004 low_date = str(date_range[0]) 

3005 f.write(" " * (max_date_length - len(low_date) + 2)) 

3006 f.write(low_date) 

3007 avg_date = str(int(((date_range[0] + date_range[1]) / 2))) 

3008 f.write(" " * (max_date_length - len(str(avg_date)) + 2)) 

3009 f.write(avg_date) 

3010 high_date = str(date_range[1]) 

3011 f.write(" " * (max_date_length - len(high_date) + 2)) 

3012 f.write(high_date) 

3013 f.write("\n") 

3014 return 

3015 

3016 def to_file( 

3017 self, 

3018 file_addr: Union[Path, str], 

3019 format: Format = None, 

3020 drop_constant: bool = False, 

3021 split_missing: SplitMissingType = None, 

3022 char_state_labels: bool = True, 

3023 frequency: bool = False, 

3024 ambiguous_as_missing: bool = False, 

3025 proportion: bool = False, 

3026 calibrate_dates: bool = False, 

3027 mrbayes: bool = False, 

3028 clock_model: ClockModel = ClockModel.strict, 

3029 ancestral_logger: AncestralLogger = AncestralLogger.state, 

3030 table_type: TableType = TableType.matrix, 

3031 show_ext: bool = False, 

3032 seed: int = None, 

3033 ): 

3034 """Writes this Collation to the file with the given address. 

3035 

3036 Args: 

3037 file_addr (Union[Path, str]): The path to the output file. 

3038 format (Format, optional): The desired output format. 

3039 If None then it is infered from the file suffix. 

3040 Defaults to None. 

3041 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading. 

3042 Default value is False. 

3043 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings. 

3044 If not specified, then missing data is ignored (i.e., all states are 0). 

3045 If "uniform", then the contribution of 1 is divided evenly over all substantive readings. 

3046 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing. 

3047 Only applicable for tabular outputs of type "matrix" or "idf". 

3048 char_state_labels (bool, optional): An optional flag indicating whether to print 

3049 the CharStateLabels block in NEXUS output. 

3050 Default value is True. 

3051 frequency (bool, optional): An optional flag indicating whether to use the StatesFormat=Frequency setting 

3052 instead of the StatesFormat=StatesPresent setting 

3053 (and thus represent all states with frequency vectors rather than symbols) 

3054 in NEXUS output. 

3055 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation. 

3056 Default value is False. 

3057 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data. 

3058 If this flag is set, then only base symbols will be generated for the NEXUS file. 

3059 It is only applied if the frequency option is False. 

3060 Default value is False. 

3061 proportion (bool, optional): An optional flag indicating whether to populate a distance matrix's cells 

3062 with a proportion of disagreements to variation units where both witnesses are extant. 

3063 It is only applied if the table_type option is "distance". 

3064 Default value is False. 

3065 calibrate_dates (bool, optional): An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses 

3066 in NEXUS output. 

3067 This option is intended for inputs to BEAST 2. 

3068 Default value is False. 

3069 mrbayes (bool, optional): An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses 

3070 in NEXUS output. 

3071 This option is intended for inputs to MrBayes. 

3072 Default value is False. 

3073 clock_model (ClockModel, optional): A ClockModel option indicating which type of clock model to use. 

3074 This option is intended for inputs to MrBayes and BEAST 2. 

3075 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified. 

3076 Default value is "strict". 

3077 ancestral_logger (AncestralLogger, optional): An AncestralLogger option indicating which class of logger (if any) to use for ancestral states. 

3078 This option is intended for inputs to BEAST 2. 

3079 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate. 

3080 Only applicable for tabular outputs and PHYLIP outputs. 

3081 If the output is a PHYLIP file, then the type of tabular output must be "distance" or "similarity"; otherwise, it will be ignored. 

3082 Default value is "matrix". 

3083 show_ext (bool, optional): An optional flag indicating whether each cell in a distance or similarity matrix 

3084 should include the number of variation units where both witnesses are extant after the number of their disagreements/agreements. 

3085 Only applicable for tabular output formats of type "distance" or "similarity". 

3086 Default value is False. 

3087 seed (optional, int): A seed for random number generation (for setting initial values of unspecified transcriptional rates in BEAST 2 XML output). 

3088 """ 

3089 file_addr = Path(file_addr) 

3090 format = format or Format.infer( 

3091 file_addr.suffix 

3092 ) # an exception will be raised here if the format or suffix is invalid 

3093 

3094 if format == Format.NEXUS: 

3095 return self.to_nexus( 

3096 file_addr, 

3097 drop_constant=drop_constant, 

3098 char_state_labels=char_state_labels, 

3099 frequency=frequency, 

3100 ambiguous_as_missing=ambiguous_as_missing, 

3101 calibrate_dates=calibrate_dates, 

3102 mrbayes=mrbayes, 

3103 clock_model=clock_model, 

3104 ) 

3105 

3106 if format == format.HENNIG86: 

3107 return self.to_hennig86(file_addr, drop_constant=drop_constant) 

3108 

3109 if format == format.PHYLIP: 

3110 if table_type in [TableType.distance, TableType.similarity]: 

3111 return self.to_phylip_matrix( 

3112 file_addr, 

3113 drop_constant=drop_constant, 

3114 proportion=proportion, 

3115 table_type=table_type, 

3116 show_ext=show_ext, 

3117 ) 

3118 return self.to_phylip(file_addr, drop_constant=drop_constant) 

3119 

3120 if format == format.FASTA: 

3121 return self.to_fasta(file_addr, drop_constant=drop_constant) 

3122 

3123 if format == format.BEAST: 

3124 return self.to_beast( 

3125 file_addr, 

3126 drop_constant=drop_constant, 

3127 clock_model=clock_model, 

3128 ancestral_logger=ancestral_logger, 

3129 seed=seed, 

3130 ) 

3131 

3132 if format == Format.CSV: 

3133 return self.to_csv( 

3134 file_addr, 

3135 drop_constant=drop_constant, 

3136 ambiguous_as_missing=ambiguous_as_missing, 

3137 proportion=proportion, 

3138 table_type=table_type, 

3139 split_missing=split_missing, 

3140 show_ext=show_ext, 

3141 ) 

3142 

3143 if format == Format.TSV: 

3144 return self.to_csv( 

3145 file_addr, 

3146 drop_constant=drop_constant, 

3147 ambiguous_as_missing=ambiguous_as_missing, 

3148 proportion=proportion, 

3149 table_type=table_type, 

3150 split_missing=split_missing, 

3151 show_ext=show_ext, 

3152 sep="\t", 

3153 ) 

3154 

3155 if format == Format.EXCEL: 

3156 return self.to_excel( 

3157 file_addr, 

3158 drop_constant=drop_constant, 

3159 ambiguous_as_missing=ambiguous_as_missing, 

3160 proportion=proportion, 

3161 table_type=table_type, 

3162 split_missing=split_missing, 

3163 show_ext=show_ext, 

3164 ) 

3165 

3166 if format == Format.STEMMA: 

3167 return self.to_stemma(file_addr)