#!/usr/bin/env python3

from enum import Enum
from typing import List, Union
import os
from pathlib import Path
from datetime import datetime  # for calculating the current year (for dating and tree height purposes)
import math  # for special functions
import time  # to time calculations for users
import string  # for easy retrieval of character ranges
from lxml import etree as et  # for reading TEI XML inputs
import numpy as np  # for random number sampling and collation matrix outputs
import pandas as pd  # for writing to DataFrames, CSV, Excel, etc.
from slugify import slugify  # for converting Unicode text from readings to ASCII for NEXUS
from jinja2 import Environment, PackageLoader, select_autoescape  # for filling output XML templates

from .common import xml_ns, tei_ns
from .format import Format
from .witness import Witness
from .variation_unit import VariationUnit


class ParsingException(Exception):
    pass


class WitnessDateException(Exception):
    pass


class IntrinsicRelationsException(Exception):
    pass


class ClockModel(str, Enum):
    strict = "strict"
    uncorrelated = "uncorrelated"
    local = "local"


class AncestralLogger(str, Enum):
    state = "state"
    sequence = "sequence"
    none = "none"


class TableType(str, Enum):
    matrix = "matrix"
    distance = "distance"
    similarity = "similarity"
    idf = "idf"
    nexus = "nexus"
    long = "long"


class SplitMissingType(str, Enum):
    uniform = "uniform"
    proportional = "proportional"


class Collation:
    """Base class for storing TEI XML collation data internally.

    This corresponds to the entire XML tree, rooted at the TEI element of the collation.

    Attributes:
        manuscript_suffixes: A list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
        trivial_reading_types: A set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
        missing_reading_types: A set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
        fill_corrector_lacunae: A boolean flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
        fragmentary_threshold: A float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
        witnesses: A list of Witness instances contained in this Collation.
        witness_index_by_id: A dictionary mapping base witness ID strings to their int indices in the witnesses list.
        variation_units: A list of VariationUnit instances contained in this Collation.
        readings_by_witness: A dictionary mapping base witness ID strings to lists of reading support coefficients for all units (with at least two substantive readings).
        variation_unit_ids: A list of ID strings for variation units with two or more substantive readings.
        substantive_variation_unit_reading_tuples: A list of (variation unit ID, reading ID) tuples for substantive readings.
        verbose: A boolean flag indicating whether or not to print timing and debugging details for the user.
    """

    def __init__(
        self,
        xml: et.ElementTree,
        manuscript_suffixes: List[str] = [],
        trivial_reading_types: List[str] = [],
        missing_reading_types: List[str] = [],
        fill_corrector_lacunae: bool = False,
        fragmentary_threshold: float = None,
        dates_file: Union[Path, str] = None,
        verbose: bool = False,
    ):
92 """Constructs a new Collation instance with the given settings.
94 Args:
95 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
96 manuscript_suffixes: An optional list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
97 trivial_reading_types: An optional set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
98 missing_reading_types: An optional set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
99 fill_corrector_lacunae: An optional flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
100 fragmentary_threshold: An optional float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
101 dates_file: An optional path to a CSV file containing witness IDs, minimum dates, and maximum dates. If specified, then for all witnesses in the first column, any existing date ranges for them in the TEI XML collation will be ignored.
102 verbose: An optional flag indicating whether or not to print timing and debugging details for the user.
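
        Example (a minimal usage sketch; the input file path is hypothetical, and the package-level export of Collation is assumed):
            from lxml import etree as et
            from teiphy import Collation
            xml = et.parse("collation.xml")
            collation = Collation(xml, manuscript_suffixes=["*", "C"], verbose=True)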
103 """
        self.manuscript_suffixes = manuscript_suffixes
        self.trivial_reading_types = set(trivial_reading_types)
        self.missing_reading_types = set(missing_reading_types)
        self.fill_corrector_lacunae = fill_corrector_lacunae
        self.fragmentary_threshold = fragmentary_threshold
        self.verbose = verbose
        self.witnesses = []
        self.witness_index_by_id = {}
        self.variation_units = []
        self.readings_by_witness = {}
        self.variation_unit_ids = []
        self.substantive_variation_unit_reading_tuples = []
        self.substantive_readings_by_variation_unit_id = {}
        self.weight_categories = []
        self.weights_by_id = {}
        self.intrinsic_categories = []
        self.intrinsic_odds_by_id = {}
        self.transcriptional_categories = []
        self.transcriptional_rates_by_id = {}
        self.origin_date_range = []
        # Now parse the XML tree to populate these data structures:
        if self.verbose:
            print("Initializing collation...")
        t0 = time.time()
        self.parse_origin_date_range(xml)
        self.parse_list_wit(xml)
        self.validate_wits(xml)
        # If a dates file was specified, then update the witness date ranges manually:
        if dates_file is not None:
            self.update_witness_date_ranges_from_dates_file(dates_file)
        # If the upper bound on a work's date of origin is not defined, then attempt to assign it an upper bound based on the witness dates;
        # otherwise, attempt to assign lower bounds to witness dates based on it:
        if self.origin_date_range[1] is None:
            self.update_origin_date_range_from_witness_date_ranges()
        else:
            self.update_witness_date_ranges_from_origin_date_range()
        self.parse_weights(xml)
        self.parse_intrinsic_odds(xml)
        self.parse_transcriptional_rates(xml)
        self.parse_apps(xml)
        self.validate_intrinsic_relations()
        self.parse_readings_by_witness()
        # If a threshold of readings for fragmentary witnesses is specified, then filter the witness list using the dictionary mapping witness IDs to readings:
        if self.fragmentary_threshold is not None:
            self.filter_fragmentary_witnesses(xml)
        t1 = time.time()
        if self.verbose:
            print("Total time to initialize collation: %0.4fs." % (t1 - t0))

    def parse_origin_date_range(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of origin date bounds.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
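
        Example (a sketch of the TEI date elements this method recognizes, with the resulting bounds; the dates are hypothetical):
            <date when="0080"/>                      -> origin_date_range == [80, 80]
            <date notBefore="0050" notAfter="0100"/> -> origin_date_range == [50, 100]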
158 """
        if self.verbose:
            print("Parsing origin date range...")
        t0 = time.time()
        self.origin_date_range = [None, None]
        for date in xml.xpath(
            "//tei:sourceDesc//tei:bibl//tei:date|//tei:sourceDesc//tei:biblStruct//tei:date|//tei:sourceDesc//tei:biblFull//tei:date",
            namespaces={"tei": tei_ns},
        ):
            # Try the @when attribute first; if it is set, then it accounts for both ends of the date range:
            if date.get("when") is not None:
                self.origin_date_range[0] = int(date.get("when").split("-")[0])
                self.origin_date_range[1] = self.origin_date_range[0]
            # Failing that, if it has @from and @to attributes (indicating the period over which the work was completed),
            # then the completion date of the work accounts for both ends of the date range:
            elif date.get("to") is not None:
                self.origin_date_range[0] = int(date.get("to").split("-")[0])
                self.origin_date_range[1] = self.origin_date_range[0]
            # Failing that, set lower and upper bounds on the origin date using the @notBefore and @notAfter attributes:
            elif date.get("notBefore") is not None or date.get("notAfter") is not None:
                if date.get("notBefore") is not None:
                    self.origin_date_range[0] = int(date.get("notBefore").split("-")[0])
                if date.get("notAfter") is not None:
                    self.origin_date_range[1] = int(date.get("notAfter").split("-")[0])
        return

    def get_base_wit(self, wit: str):
        """Given a witness siglum, strips off the specified manuscript suffixes until the siglum matches one in the witness list or until no more suffixes can be stripped.

        Args:
            wit: A string representing a witness siglum, potentially including suffixes to be stripped.
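
        Example (assuming manuscript_suffixes=["*", "C"] and a witness list containing "P46"):
            get_base_wit("P46C") -> "P46"
            get_base_wit("P46X") -> "P46X" (no recognized suffix can be stripped)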
189 """
        base_wit = wit
        # If our starting siglum corresponds to a siglum in the witness list, then just return it:
        if base_wit in self.witness_index_by_id:
            return base_wit
        # Otherwise, strip any suffixes we find until the siglum corresponds to a base witness in the list
        # or no more suffixes can be stripped:
        suffix_found = True
        while suffix_found:
            suffix_found = False
            for suffix in self.manuscript_suffixes:
                if base_wit.endswith(suffix):
                    suffix_found = True
                    base_wit = base_wit[: -len(suffix)]
                    break  # stop looking for other suffixes
            # If the siglum stripped of this suffix now corresponds to a siglum in the witness list, then return it:
            if base_wit in self.witness_index_by_id:
                return base_wit
        # If we get here, then all possible manuscript suffixes have been stripped, and the resulting siglum does not correspond to a siglum in the witness list:
        return base_wit

    def parse_list_wit(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates its list of witnesses from its listWit element.
        If the XML tree does not contain a listWit element, then a ParsingException is thrown listing all distinct witness sigla encountered in the collation.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
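
        Example (a sketch of the expected TEI markup; the witness IDs are hypothetical):
            <listWit>
                <witness xml:id="A"/>
                <witness xml:id="B"/>
            </listWit>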
216 """
        if self.verbose:
            print("Parsing witness list...")
        t0 = time.time()
        self.witnesses = []
        self.witness_index_by_id = {}
        list_wits = xml.xpath("/tei:TEI//tei:listWit", namespaces={"tei": tei_ns})
        if len(list_wits) == 0:
            # There is no listWit element: collect all distinct witness sigla in the collation and raise a ParsingException listing them:
            distinct_sigla = set()
            sigla = []
            # Proceed for each rdg, rdgGrp, or witDetail element:
            for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
                wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
                wits = wit_str.split()
                for wit in wits:
                    siglum = wit.strip("#")  # remove the URI prefix, if there is one
                    if siglum not in distinct_sigla:
                        distinct_sigla.add(siglum)
                        sigla.append(siglum)
            sigla.sort()
            msg = ""
            msg += "An explicit listWit element must be included in the TEI XML collation.\n"
            msg += "The following sigla occur in the collation and should be included as the @xml:id or @n attributes of witness elements under the listWit element:\n"
            msg += ", ".join(sigla)
            raise ParsingException(msg)
        # Otherwise, take the first listWit element as the list of all witnesses and process it:
        list_wit = list_wits[0]
        for witness in list_wit.xpath("./tei:witness", namespaces={"tei": tei_ns}):
            wit = Witness(witness, self.verbose)
            self.witness_index_by_id[wit.id] = len(self.witnesses)
            self.witnesses.append(wit)
        t1 = time.time()
        if self.verbose:
            print("Finished processing %d witnesses in %0.4fs." % (len(self.witnesses), t1 - t0))
        return

    def validate_wits(self, xml: et.ElementTree):
        """Given an XML tree for a collation, checks whether any witness siglum listed in a rdg, rdgGrp, or witDetail element,
        once stripped of ignored suffixes, is not found in the witness list.
        A warning will be issued for each distinct siglum like this.
        This method also checks if the upper bound of any witness's date is earlier than the lower bound on the collated work's date of origin
        and throws an exception if so.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Validating witness list against collation...")
        t0 = time.time()
        # Collect all distinct sigla in the collation that do not correspond to entries in the witness list, so that they can be reported in a warning:
        distinct_extra_sigla = set()
        extra_sigla = []
        # Proceed for each rdg, rdgGrp, or witDetail element:
        for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
            wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
            wits = wit_str.split()
            for wit in wits:
                siglum = wit.strip("#")  # remove the URI prefix, if there is one
                base_siglum = self.get_base_wit(siglum)
                if base_siglum not in self.witness_index_by_id:
                    if base_siglum not in distinct_extra_sigla:
                        distinct_extra_sigla.add(base_siglum)
                        extra_sigla.append(base_siglum)
        if len(extra_sigla) > 0:
            extra_sigla.sort()
            msg = ""
            msg += "WARNING: The following sigla occur in the collation that do not have corresponding witness entries in the listWit:\n"
            msg += ", ".join(extra_sigla)
            print(msg)
        # If the lower bound on the date of origin is defined, then check each witness against it:
        if self.origin_date_range[0] is not None:
            bad_date_witness_sigla = []
            bad_date_upper_bounds_by_witness = {}
            for i, wit in enumerate(self.witnesses):
                if wit.date_range[1] is not None and wit.date_range[1] < self.origin_date_range[0]:
                    bad_date_witness_sigla.append(wit.id)
                    bad_date_upper_bounds_by_witness[wit.id] = wit.date_range[1]
            if len(bad_date_witness_sigla) > 0:
                msg = ""
                msg += (
                    "The following witnesses have their latest possible dates before the earliest date of origin %d specified for the collated work:\n"
                    % self.origin_date_range[0]
                )
                msg += ", ".join(
                    [
                        (siglum + "(" + str(bad_date_upper_bounds_by_witness[siglum]) + ")")
                        for siglum in bad_date_witness_sigla
                    ]
                )
                raise WitnessDateException(msg)
        t1 = time.time()
        if self.verbose:
            print("Finished witness validation in %0.4fs." % (t1 - t0))
        return

    def update_witness_date_ranges_from_dates_file(self, dates_file: Union[Path, str]):
        """Given a CSV-formatted dates file, updates the date ranges of all witnesses whose IDs are in the first column of the dates file
        (overwriting existing date ranges if necessary).

        Args:
            dates_file: A path to a CSV file containing witness IDs, minimum dates, and maximum dates.
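
        Example (hypothetical dates file contents; the file has no header row, and an empty minimum field leaves that bound undefined):
            P46,175,225
            01,300,400
            18,,1364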
312 """
        if self.verbose:
            print("Updating witness dates from file %s..." % (str(dates_file)))
        t0 = time.time()
        dates_df = pd.read_csv(dates_file, index_col=0, names=["id", "min", "max"])
        for witness in self.witnesses:
            wit_id = witness.id
            if wit_id in dates_df.index:
                # For every witness in the list whose ID is specified in the dates file,
                # update their date ranges (as long as the date ranges in the file are well-formed):
                min_date = int(dates_df.loc[wit_id]["min"]) if not np.isnan(dates_df.loc[wit_id]["min"]) else None
                max_date = (
                    int(dates_df.loc[wit_id]["max"])
                    if not np.isnan(dates_df.loc[wit_id]["max"])
                    else datetime.now().year
                )
                if min_date is not None and max_date is not None and min_date > max_date:
                    raise ParsingException(
                        "In dates file %s, for witness ID %s, the minimum date %d is greater than the maximum date %d."
                        % (str(dates_file), wit_id, min_date, max_date)
                    )
                witness.date_range = [min_date, max_date]
        t1 = time.time()
        if self.verbose:
            print("Finished witness date range updates in %0.4fs." % (t1 - t0))
        return

    def update_origin_date_range_from_witness_date_ranges(self):
        """Conditionally updates the upper bound on the date of origin of the work represented by this Collation
        based on the bounds on the witnesses' dates.
        If none of the witnesses have bounds on their dates, then nothing is done.
        This method is only invoked if the work's date of origin does not already have its upper bound defined.
        """
        if self.verbose:
            print("Updating upper bound on origin date using witness dates...")
        t0 = time.time()
        # Set the upper bound on the origin date to the earliest possible witness date:
        witness_date_lower_bounds = [wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]
        witness_date_upper_bounds = [wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]
        min_witness_date = (
            min(witness_date_lower_bounds + witness_date_upper_bounds)
            if len(witness_date_lower_bounds + witness_date_upper_bounds) > 0
            else None
        )
        if min_witness_date is not None:
            self.origin_date_range[1] = (
                min(self.origin_date_range[1], min_witness_date)
                if self.origin_date_range[1] is not None
                else min_witness_date
            )
        t1 = time.time()
        if self.verbose:
            print("Finished updating upper bound on origin date in %0.4fs." % (t1 - t0))
        return

    def update_witness_date_ranges_from_origin_date_range(self):
        """Attempts to update the lower bounds on the witnesses' dates
        using the upper bound on the date of origin of the work represented by this Collation.
        This method is only invoked if the upper bound on the work's date of origin was already defined
        (i.e., if update_origin_date_range_from_witness_date_ranges was not invoked).
        """
        if self.verbose:
            print("Updating lower bounds on witness dates using origin date...")
        t0 = time.time()
        # Proceed for every witness:
        for i, wit in enumerate(self.witnesses):
            # Ensure that the lower bound on this witness's date is no earlier than the upper bound on the date of the work's origin:
            wit.date_range[0] = (
                max(wit.date_range[0], self.origin_date_range[1])
                if wit.date_range[0] is not None
                else self.origin_date_range[1]
            )
            # Then ensure that the upper bound on this witness's date is no earlier than its lower bound, in case we updated it:
            wit.date_range[1] = max(wit.date_range[0], wit.date_range[1])
        t1 = time.time()
        if self.verbose:
            print("Finished updating lower bounds on witness dates in %0.4fs." % (t1 - t0))
        return

    def parse_weights(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of variation unit weight categories
        (associated with types of variation that may have different expected frequencies)
        and its dictionary mapping these categories to integer weights.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
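
        Example (a sketch of the TEI markup this method reads; the IDs and degrees are hypothetical):
            <interpGrp type="weight">
                <interp xml:id="weight_default"><certainty degree="1"/></interp>
                <interp xml:id="weight_major"><certainty degree="2"/></interp>
            </interpGrp>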
399 """
        if self.verbose:
            print("Parsing variation unit weight categories...")
        t0 = time.time()
        self.weight_categories = []
        self.weights_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"weight\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            weight_category = interp.get("{%s}id" % xml_ns)
            # Retrieve the weight from this element's certainty child and coerce it to an integer, defaulting to 1 if there is none:
            weight = 1
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    weight = int(certainty.get("degree"))
                    break
            self.weight_categories.append(weight_category)
            self.weights_by_id[weight_category] = weight
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d variation unit weight categories in %0.4fs."
                % (len(self.weight_categories), t1 - t0)
            )
        return

    def parse_intrinsic_odds(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of intrinsic probability categories
        (e.g., "absolutely more likely," "highly more likely," "more likely," "slightly more likely," "equally likely")
        and its dictionary mapping these categories to numerical odds.
        If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing intrinsic odds categories...")
        t0 = time.time()
        self.intrinsic_categories = []
        self.intrinsic_odds_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"intrinsic\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            odds_category = interp.get("{%s}id" % xml_ns)
            # If this element contains a certainty subelement with a fixed odds value for this category, then set it:
            odds = None
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    odds = float(certainty.get("degree"))
                    break
            self.intrinsic_categories.append(odds_category)
            self.intrinsic_odds_by_id[odds_category] = odds
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d intrinsic odds categories in %0.4fs."
                % (len(self.intrinsic_categories), t1 - t0)
            )
        return

    def parse_transcriptional_rates(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's dictionary mapping transcriptional change categories
        (e.g., "aural confusion," "visual error," "clarification") to numerical rates.
        If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing transcriptional change categories...")
        t0 = time.time()
        self.transcriptional_categories = []
        self.transcriptional_rates_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"transcriptional\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            transcriptional_category = interp.get("{%s}id" % xml_ns)
            # If this element contains a certainty subelement with a fixed rate for this category, then set it:
            rate = None
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    rate = float(certainty.get("degree"))
                    break
            self.transcriptional_categories.append(transcriptional_category)
            self.transcriptional_rates_by_id[transcriptional_category] = rate
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d transcriptional change categories in %0.4fs."
                % (len(self.transcriptional_rates_by_id), t1 - t0)
            )
        return

    def validate_intrinsic_relations(self):
        """Checks if any VariationUnit's intrinsic_relations map is not a forest.
        If any is not, then an IntrinsicRelationsException is thrown describing the VariationUnit at fault.
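
        Example (a hypothetical intrinsic_relations map that passes validation, since each reading has at most one relation pointing to it):
            {("a", "b"): "more_likely", ("a", "c"): "equally_likely"}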
498 """
        if self.verbose:
            print("Validating intrinsic relation graphs for variation units...")
        t0 = time.time()
        for vu in self.variation_units:
            # Skip any variation units with an empty intrinsic_relations map:
            if len(vu.intrinsic_relations) == 0:
                continue
            # For all others, start by identifying all reading IDs that are not related to by some other reading ID:
            in_degree_by_reading = {}
            for edge in vu.intrinsic_relations:
                s = edge[0]
                t = edge[1]
                if s not in in_degree_by_reading:
                    in_degree_by_reading[s] = 0
                if t not in in_degree_by_reading:
                    in_degree_by_reading[t] = 0
                in_degree_by_reading[t] += 1
            # If any reading has more than one relation pointing to it, then the intrinsic relations graph is not a forest:
            excessive_in_degree_readings = [
                rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] > 1
            ]
            if len(excessive_in_degree_readings) > 0:
                msg = ""
                msg += (
                    "In variation unit %s, the following readings have more than one intrinsic relation pointing to them: %s.\n"
                    % (vu.id, ", ".join(excessive_in_degree_readings))
                )
                msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
                raise IntrinsicRelationsException(msg)
            # If every reading has another reading pointing to it, then the intrinsic relations graph contains a cycle and is not a forest:
            starting_nodes = [rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] == 0]
            if len(starting_nodes) == 0:
                msg = ""
                msg += "In variation unit %s, the intrinsic relations contain a cycle.\n" % vu.id
                msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
                raise IntrinsicRelationsException(msg)
        t1 = time.time()
        if self.verbose:
            print("Finished intrinsic relations validation in %0.4fs." % (t1 - t0))
        return

    def parse_apps(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates its list of variation units from its app elements.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing variation units...")
        t0 = time.time()
        for a in xml.xpath('//tei:app', namespaces={'tei': tei_ns}):
            vu = VariationUnit(a, self.verbose)
            self.variation_units.append(vu)
        t1 = time.time()
        if self.verbose:
            print("Finished processing %d variation units in %0.4fs." % (len(self.variation_units), t1 - t0))
        return

    def get_readings_by_witness_for_unit(self, vu: VariationUnit):
        """Returns a dictionary mapping witness IDs to a list of their reading coefficients for a given variation unit.

        Args:
            vu: A VariationUnit to be processed.

        Returns:
            A dictionary mapping witness ID strings to a list of their coefficients for all substantive readings in this VariationUnit.
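
        Example (hypothetical output for a unit with three substantive readings; witness C is lacunose, and witness D is ambiguous between the first two readings):
            {"A": [1, 0, 0], "B": [0, 1, 0], "C": [0, 0, 0], "D": [1, 1, 0]}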
565 """
        # In a first pass, populate lists of substantive (variation unit ID, reading ID) tuples and reading labels
        # and a map from reading IDs to the indices of their parent substantive reading in this unit:
        reading_id_to_index = {}
        self.substantive_readings_by_variation_unit_id[vu.id] = []
        for rdg in vu.readings:
            # If this reading is missing (e.g., lacunose or inapplicable due to an overlapping variant) or targets another reading, then skip it:
            if rdg.type in self.missing_reading_types or len(rdg.certainties) > 0:
                continue
            # If this reading is trivial, then map it to the last substantive index:
            if rdg.type in self.trivial_reading_types:
                reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
                continue
            # Otherwise, the reading is substantive: add it to the map and update the last substantive index:
            self.substantive_readings_by_variation_unit_id[vu.id].append(rdg.id)
            self.substantive_variation_unit_reading_tuples.append(tuple([vu.id, rdg.id]))
            reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
        # Report the number of substantive readings in this unit if we're in verbose mode:
        if self.verbose:
            print(
                "Variation unit %s has %d substantive readings."
                % (vu.id, len(self.substantive_readings_by_variation_unit_id[vu.id]))
            )
        readings_by_witness_for_unit = {}
        # Initialize the output dictionary with empty sets for all base witnesses:
        for wit in self.witnesses:
            readings_by_witness_for_unit[wit.id] = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
        # In a second pass, assign each base witness a set containing the readings it supports in this unit:
        for rdg in vu.readings:
            # Initialize the dictionary indicating support for this reading (or its disambiguations):
            rdg_support = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
            # If this is a missing reading (e.g., a lacuna or an overlap), then we can skip it, as its corresponding set will be empty:
            if rdg.type in self.missing_reading_types:
                continue
            # Otherwise, if this reading is trivial, then it will contain an entry for the index of its parent substantive reading:
            elif rdg.type in self.trivial_reading_types:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Otherwise, if this reading has one or more nonzero certainty degrees,
            # then set the entries for these readings to their degrees:
            elif sum(rdg.certainties.values()) > 0:
                for t in rdg.certainties:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = rdg.certainties[t]
            # Otherwise, if this reading has one or more targets (i.e., if it is an ambiguous reading),
            # then set the entries for each of its targets to 1:
            elif len(rdg.targets) > 0:
                for t in rdg.targets:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = 1
            # Otherwise, this reading is itself substantive; set the entry for the index of this reading to 1:
            else:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Proceed for each witness siglum in the support for this reading:
            for wit in rdg.wits:
                # Is this siglum a base siglum?
                base_wit = self.get_base_wit(wit)
                if base_wit not in self.witness_index_by_id:
                    # If it is not, then it is probably just because we've encountered a corrector or some other secondary witness not included in the witness list;
                    # report this if we're in verbose mode and move on:
                    if self.verbose:
                        print(
                            "Skipping unknown witness siglum %s (base siglum %s) in variation unit %s, reading %s..."
                            % (wit, base_wit, vu.id, rdg.id)
                        )
                    continue
                # If we've found a base siglum, then add this reading's contribution to the base witness's reading set for this unit;
                # normally the existing set will be empty, but if we reduce two suffixed sigla to the same base witness,
                # then that witness may attest to multiple readings in the same unit:
                readings_by_witness_for_unit[base_wit] = [
                    (min(readings_by_witness_for_unit[base_wit][i] + rdg_support[i], 1))
                    for i in range(len(rdg_support))
                ]
        return readings_by_witness_for_unit

    def parse_readings_by_witness(self):
        """Populates the internal dictionary mapping witness IDs to a list of their reading support sets for all variation units, and then fills the empty reading support sets for witnesses of type "corrector" with the entries of the previous witness."""
        if self.verbose:
            print("Populating internal dictionary of witness readings...")
        t0 = time.time()
        # Initialize the data structures to be populated here:
        self.readings_by_witness = {}
        self.variation_unit_ids = []
        for wit in self.witnesses:
            self.readings_by_witness[wit.id] = []
        # Populate them for each variation unit:
        for vu in self.variation_units:
            readings_by_witness_for_unit = self.get_readings_by_witness_for_unit(vu)
            if len(readings_by_witness_for_unit) > 0:
                self.variation_unit_ids.append(vu.id)
            for wit in readings_by_witness_for_unit:
                self.readings_by_witness[wit].append(readings_by_witness_for_unit[wit])
        # Optionally, fill the lacunae of the correctors:
        if self.fill_corrector_lacunae:
            for i, wit in enumerate(self.witnesses):
                # If this is the first witness, then it shouldn't be a corrector (since there is no previous witness against which to compare it):
                if i == 0:
                    continue
                # Otherwise, if this witness is not a corrector, then skip it:
                if wit.type != "corrector":
                    continue
                # Otherwise, retrieve the previous witness:
                prev_wit = self.witnesses[i - 1]
                for j in range(len(self.readings_by_witness[wit.id])):
                    # If the corrector has no reading, then set it to the previous witness's reading:
                    if sum(self.readings_by_witness[wit.id][j]) == 0:
                        self.readings_by_witness[wit.id][j] = self.readings_by_witness[prev_wit.id][j]
        t1 = time.time()
        if self.verbose:
            print(
                "Populated dictionary for %d witnesses over %d substantive variation units in %0.4fs."
                % (len(self.witnesses), len(self.variation_unit_ids), t1 - t0)
            )
        return

    def filter_fragmentary_witnesses(self, xml):
        """Filters the original witness list and readings by witness dictionary to exclude witnesses whose proportions of extant passages fall below the fragmentary readings threshold."""
        if self.verbose:
            print(
                "Filtering fragmentary witnesses (extant in < %f of all variation units) out of internal witness list and dictionary of witness readings..."
                % self.fragmentary_threshold
            )
        t0 = time.time()
        fragmentary_witness_set = set()
        # Proceed for each witness in order:
        for wit in self.witnesses:
            wit_id = wit.id
            # Count the number of variation units at which this witness has an extant (i.e., non-missing) reading:
            extant_reading_count = 0
            total_reading_count = len(self.readings_by_witness[wit_id])
            # Proceed through all reading support lists:
            for rdg_support in self.readings_by_witness[wit_id]:
                # If the current reading support list is not all zeroes, then increment this witness's count of extant readings:
                if sum(rdg_support) != 0:
                    extant_reading_count += 1
            # If the proportion of extant readings falls below the threshold, then add this witness to the set of fragmentary witnesses:
            if extant_reading_count / total_reading_count < self.fragmentary_threshold:
                fragmentary_witness_set.add(wit_id)
        # Then filter the witness list to exclude the fragmentary witnesses:
        filtered_witnesses = [wit for wit in self.witnesses if wit.id not in fragmentary_witness_set]
        self.witnesses = filtered_witnesses
        # Then remove the entries for the fragmentary witnesses from the witnesses-to-readings dictionary:
        for wit_id in fragmentary_witness_set:
            del self.readings_by_witness[wit_id]
        t1 = time.time()
        if self.verbose:
            print(
                "Filtered out %d fragmentary witness(es) (%s) in %0.4fs."
                % (len(fragmentary_witness_set), str(list(fragmentary_witness_set)), t1 - t0)
            )
        return

    def get_nexus_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in NEXUS.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
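
        Example (for a collation whose largest variation unit has three substantive readings):
            get_nexus_symbols() -> ['0', '1', '2']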
726 """
        # NOTE: IQTREE does not appear to support symbols outside of 0-9 and a-z, and its base symbols must be case-insensitive.
        # The official version of MrBayes is likewise limited to 32 symbols.
        # But PAUP* allows up to 64 symbols, and Andrew Edmondson's fork of MrBayes does, as well.
        # So this method will support symbols from 0-9, a-z, and A-Z (for a total of 62 states).
        possible_symbols = list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        nexus_symbols = possible_symbols[:nsymbols]
        return nexus_symbols

    def to_nexus(
        self,
        file_addr: Union[Path, str],
        drop_constant: bool = False,
        char_state_labels: bool = True,
        frequency: bool = False,
        ambiguous_as_missing: bool = False,
        calibrate_dates: bool = False,
        mrbayes: bool = False,
        clock_model: ClockModel = ClockModel.strict,
    ):
754 """Writes this Collation to a NEXUS file with the given address.
756 Args:
757 file_addr: A string representing the path to an output NEXUS file; the file type should be .nex, .nexus, or .nxs.
758 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
759 char_state_labels: An optional flag indicating whether or not to include the CharStateLabels block.
760 frequency: An optional flag indicating whether to use the StatesFormat=Frequency setting
761 instead of the StatesFormat=StatesPresent setting
762 (and thus represent all states with frequency vectors rather than symbols).
763 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
764 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
765 If this flag is set, then only base symbols will be generated for the NEXUS file.
766 It is only applied if the frequency option is False.
767 calibrate_dates: An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses.
768 This option is intended for inputs to BEAST 2.
769 mrbayes: An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses.
770 This option is intended for inputs to MrBayes.
771 clock_model: A ClockModel option indicating which type of clock model to use.
772 This option is intended for inputs to MrBayes and BEAST 2.
773 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
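
        Example (an abbreviated sketch of the default output; the witness names and dimensions are hypothetical):
            #NEXUS

            Begin DATA;
                Dimensions ntax=3 nchar=2;
                Format
                    DataType=Standard
                    Missing=?
                    Symbols="0 1";
                Matrix
                    A  01
                    B  0?
                    C  1{01};
            End;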
774 """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        charlabels = [slugify(vu_id, lowercase=False, separator='_') for vu_id in substantive_variation_unit_ids]
        missing_symbol = '?'
        symbols = self.get_nexus_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="utf-8") as f:
            # Start with the NEXUS header:
            f.write("#NEXUS\n\n")
            # Then begin the data block:
            f.write("Begin DATA;\n")
            # Write the collation matrix dimensions:
            f.write("\tDimensions ntax=%d nchar=%d;\n" % (ntax, nchar))
            # Write the format subblock:
            f.write("\tFormat\n")
            f.write("\t\tDataType=Standard\n")
            f.write("\t\tMissing=%s\n" % missing_symbol)
            if frequency:
                f.write("\t\tStatesFormat=Frequency\n")
            f.write("\t\tSymbols=\"%s\";\n" % (" ".join(symbols)))
            # If char_state_labels is set, then write the CharStateLabels block, with each character's labels on its own line:
            if char_state_labels:
                f.write("\tCharStateLabels")
                vu_ind = 1
                for vu in self.variation_units:
                    if vu.id not in substantive_variation_unit_ids_set:
                        continue
                    if vu_ind == 1:
                        f.write("\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                    else:
                        f.write(",\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                    rdg_ind = 0
                    for rdg in vu.readings:
                        key = tuple([vu.id, rdg.id])
                        if key not in substantive_variation_unit_reading_tuples_set:
                            continue
                        ascii_rdg_text = slugify(
                            rdg.text, lowercase=False, separator='_', replacements=[['η', 'h'], ['ω', 'w']]
                        )
                        if ascii_rdg_text == "":
                            ascii_rdg_text = "om."
                        f.write(" %s" % ascii_rdg_text)
                        rdg_ind += 1
                    if rdg_ind > 0:
                        vu_ind += 1
                f.write(";\n")
            # Write the matrix subblock:
            f.write("\tMatrix")
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                if frequency:
                    sequence = "\n\t\t" + taxlabel
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        sequence += "\n\t\t\t"
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        # Otherwise, print out its frequencies for different readings in parentheses:
                        sequence += "("
                        for k, w in enumerate(rdg_support):
                            sequence += "%s:%0.4f" % (symbols[k], w)
                            if k < len(rdg_support) - 1:
                                sequence += " "
                        sequence += ")"
                else:
                    sequence = "\n\t\t" + taxlabel
                    # Add enough space after this label to ensure that all sequences are nicely aligned:
                    sequence += " " * (max_taxlabel_length - len(taxlabel) + 1)
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        rdg_inds = [
                            k for k, w in enumerate(rdg_support) if w > 0
                        ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                        # For singleton readings, just print the symbol:
                        if len(rdg_inds) == 1:
                            sequence += symbols[rdg_inds[0]]
                            continue
                        # For multiple readings, print the corresponding symbols in braces or the missing symbol, depending on input settings:
                        if ambiguous_as_missing:
                            sequence += missing_symbol
                        else:
                            sequence += "{%s}" % "".join([symbols[rdg_ind] for rdg_ind in rdg_inds])
                f.write("%s" % (sequence))
            f.write(";\n")
            # End the data block:
            f.write("End;")
            # If calibrate_dates is set, then add the assumptions block:
            if calibrate_dates:
                f.write("\n\n")
                f.write("Begin ASSUMPTIONS;\n")
                # Set the scale to years:
                f.write("\tOPTIONS SCALE = years;\n\n")
                # Then calibrate the witness ages:
                calibrate_strings = []
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    date_range = wit.date_range
                    if date_range[0] is not None:
                        # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                        # depending on whether the upper and lower bounds match:
                        min_age = datetime.now().year - date_range[1]
                        max_age = datetime.now().year - date_range[0]
                        if min_age == max_age:
                            calibrate_string = "\tCALIBRATE %s = fixed(%d)" % (taxlabel, min_age)
                            calibrate_strings.append(calibrate_string)
                        else:
                            calibrate_string = "\tCALIBRATE %s = uniform(%d,%d)" % (taxlabel, min_age, max_age)
                            calibrate_strings.append(calibrate_string)
                    else:
                        # If there is no lower bound on the witness's date, then use an offset log-normal distribution:
                        min_age = datetime.now().year - date_range[1]
                        calibrate_string = "\tCALIBRATE %s = offsetlognormal(%d,0.0,1.0)" % (taxlabel, min_age)
                        calibrate_strings.append(calibrate_string)
                # Then print the calibrate strings, separated by commas and line breaks and terminated by a semicolon:
                f.write("%s;\n\n" % ",\n".join(calibrate_strings))
                # End the assumptions block:
                f.write("End;")
            # If mrbayes is set, then add the MrBayes block:
            if mrbayes:
                f.write("\n\n")
                f.write("Begin MRBAYES;\n")
                # Turn on the autoclose feature by default:
                f.write("\tset autoclose=yes;\n")
                # Set the branch lengths to be governed by a birth-death clock model, and set up the parameters for this model:
                f.write("\n")
                f.write("\tprset brlenspr = clock:birthdeath;\n")
                f.write("\tprset speciationpr = uniform(0.0,10.0);\n")
                f.write("\tprset extinctionpr = beta(2.0,4.0);\n")
                f.write("\tprset sampleprob = 0.01;\n")
                # Use the specified clock model:
                f.write("\n")
                if clock_model == ClockModel.uncorrelated:
                    f.write("\tprset clockvarpr=igr;\n")
                    f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
                    f.write("\tprset igrvarpr=exponential(1.0);\n")
                else:
                    f.write("\tprset clockvarpr=strict;\n")
                    f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
                # Set the priors on the tree age depending on the date range for the origin of the collated work:
                f.write("\n")
                if self.origin_date_range[0] is not None:
                    min_tree_age = (
                        datetime.now().year - self.origin_date_range[1]
                        if self.origin_date_range[1] is not None
                        else 0.0
                    )
                    max_tree_age = datetime.now().year - self.origin_date_range[0]
                    f.write("\tprset treeagepr = uniform(%d,%d);\n" % (min_tree_age, max_tree_age))
                else:
                    min_tree_age = (
                        datetime.now().year - self.origin_date_range[1]
                        if self.origin_date_range[1] is not None
                        else 0.0
                    )
                    f.write("\tprset treeagepr = offsetgamma(%d,1.0,1.0);\n" % (min_tree_age))
                # Then calibrate the witness ages:
                f.write("\n")
                f.write("\tprset nodeagepr = calibrated;\n")
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    date_range = wit.date_range
                    if date_range[0] is not None:
                        # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                        # depending on whether the upper and lower bounds match:
                        min_age = datetime.now().year - date_range[1]
                        max_age = datetime.now().year - date_range[0]
                        if min_age == max_age:
                            f.write("\tcalibrate %s = fixed(%d);\n" % (taxlabel, min_age))
                        else:
                            f.write("\tcalibrate %s = uniform(%d,%d);\n" % (taxlabel, min_age, max_age))
                    else:
                        # If there is no lower bound on the witness's date, then use an offset gamma distribution:
                        min_age = datetime.now().year - date_range[1]
                        f.write("\tcalibrate %s = offsetgamma(%d,1.0,1.0);\n" % (taxlabel, min_age))
                f.write("\n")
                # Add default settings for MCMC estimation of posterior distribution:
                f.write("\tmcmcp ngen=100000;\n")
                # Write the command to run MrBayes:
                f.write("\tmcmc;\n")
                # End the MrBayes block:
                f.write("End;")
        return

    def get_hennig86_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in Hennig86 format.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        possible_symbols = (
            list(string.digits) + list(string.ascii_uppercase)[:22]
        )  # NOTE: the maximum number of symbols allowed in Hennig86 format is 32
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        hennig86_symbols = possible_symbols[:nsymbols]
        return hennig86_symbols

    def to_hennig86(self, file_addr: Union[Path, str], drop_constant: bool = False):
        """Writes this Collation to a file in Hennig86 format with the given address.
        Note that because Hennig86 format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
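
        Example (an abbreviated sketch of the output; the dimensions and witness names are hypothetical):
            nstates 2;
            xread
            2 3
            A  01
            B  0?
            C  11
            ;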
1013 """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = []
        for wit in self.witnesses:
            taxlabel = wit.id
            # Hennig86 format requires taxon names to start with a letter, so if this is not the case, then append "WIT_" to the start of the name:
            if taxlabel[0] not in string.ascii_letters:
                taxlabel = "WIT_" + taxlabel
            # Then replace any disallowed characters in the string with an underscore:
            taxlabel = slugify(taxlabel, lowercase=False, separator='_')
            taxlabels.append(taxlabel)
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        missing_symbol = '?'
        symbols = self.get_hennig86_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="ascii") as f:
            # Start with the nstates header:
            f.write("nstates %d;\n" % len(symbols))
            # Then begin the xread block:
            f.write("xread\n")
            # Write the dimensions:
            f.write("%d %d\n" % (nchar, ntax))
            # Now write the matrix:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                # Add enough space after this label to ensure that all sequences are nicely aligned:
                sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel) + 1))
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    rdg_support = self.readings_by_witness[wit.id][j]
                    # If this reading is lacunose in this witness, then use the missing character:
                    if sum(rdg_support) == 0:
                        sequence += missing_symbol
                        continue
                    rdg_inds = [
                        k for k, w in enumerate(rdg_support) if w > 0
                    ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                    # For singleton readings, just print the symbol:
                    if len(rdg_inds) == 1:
                        sequence += symbols[rdg_inds[0]]
                        continue
                    # For multiple readings, print the missing symbol:
                    sequence += missing_symbol
                f.write("%s\n" % (sequence))
            f.write(";")
        return

    def get_phylip_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in PHYLIP format.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        possible_symbols = (
            list(string.digits) + list(string.ascii_lowercase)[:22]
        )  # NOTE: for RAxML, multistate characters with alphabet sizes up to 32 are supported
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        phylip_symbols = possible_symbols[:nsymbols]
        return phylip_symbols

    def to_phylip(self, file_addr: Union[Path, str], drop_constant: bool = False):
        """Writes this Collation to a file in PHYLIP format with the given address.
        Note that because PHYLIP format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
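
        Example (an abbreviated sketch of the output; the dimensions and witness names are hypothetical):
            3 2
            A 	01
            B 	0?
            C 	11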
1105 """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = []
        for wit in self.witnesses:
            taxlabel = wit.id
            # Replace any disallowed characters in the string with an underscore:
            taxlabel = slugify(taxlabel, lowercase=False, separator='_')
            taxlabels.append(taxlabel)
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        missing_symbol = '?'
        symbols = self.get_phylip_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="ascii") as f:
            # Write the dimensions:
            f.write("%d %d\n" % (ntax, nchar))
            # Now write the matrix:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                # Add enough space after this label to ensure that all sequences are nicely aligned:
                sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel))) + "\t"
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    rdg_support = self.readings_by_witness[wit.id][j]
                    # If this reading is lacunose in this witness, then use the missing character:
                    if sum(rdg_support) == 0:
                        sequence += missing_symbol
                        continue
                    rdg_inds = [
                        k for k, w in enumerate(rdg_support) if w > 0
                    ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                    # For singleton readings, just print the symbol:
                    if len(rdg_inds) == 1:
                        sequence += symbols[rdg_inds[0]]
                        continue
                    # For multiple readings, print the missing symbol:
                    sequence += missing_symbol
                f.write("%s\n" % (sequence))
        return

    def get_fasta_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in FASTA format.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        possible_symbols = (
            list(string.digits) + list(string.ascii_lowercase)[:22]
        )  # NOTE: for RAxML, multistate characters with alphabet sizes up to 32 are supported
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        fasta_symbols = possible_symbols[:nsymbols]
        return fasta_symbols
1182 def to_fasta(self, file_addr: Union[Path, str], drop_constant: bool = False):
1183 """Writes this Collation to a file in FASTA format with the given address.
1184 Note that because FASTA format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.
1186 Args:
1187 file_addr: A string representing the path to an output file.
1188 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1189 """
1190 # Populate a list of sites that will correspond to columns of the sequence alignment:
1191 substantive_variation_unit_ids = self.variation_unit_ids
1192 if drop_constant:
1193 substantive_variation_unit_ids = [
1194 vu_id
1195 for vu_id in self.variation_unit_ids
1196 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1197 ]
1198 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1199 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1200 # Start by calculating the values we will be using here:
1201 ntax = len(self.witnesses)
1202 nchar = len(substantive_variation_unit_ids)
1203 taxlabels = []
1204 for wit in self.witnesses:
1205 taxlabel = wit.id
1206 # Then replace any disallowed characters in the string with an underscore:
1207 taxlabel = slugify(taxlabel, lowercase=False, separator='_')
1208 taxlabels.append(taxlabel)
1212 missing_symbol = '?'
1213 symbols = self.get_fasta_symbols()
1214 # Generate all parent folders for this file that don't already exist:
1215 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
1216 with open(file_addr, "w", encoding="ascii") as f:
1217 # Now write the matrix:
1218 for i, wit in enumerate(self.witnesses):
1219 taxlabel = taxlabels[i]
1220 # Start this sequence with a FASTA header line containing the taxon label:
1221 sequence = ">%s\n" % taxlabel
1222 for j, vu_id in enumerate(self.variation_unit_ids):
1223 if vu_id not in substantive_variation_unit_ids_set:
1224 continue
1225 rdg_support = self.readings_by_witness[wit.id][j]
1226 # If this reading is lacunose in this witness, then use the missing character:
1227 if sum(rdg_support) == 0:
1228 sequence += missing_symbol
1229 continue
1230 rdg_inds = [
1231 k for k, w in enumerate(rdg_support) if w > 0
1232 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them
1233 # For singleton readings, just print the symbol:
1234 if len(rdg_inds) == 1:
1235 sequence += symbols[rdg_inds[0]]
1236 continue
1237 # For multiple readings, print the missing symbol:
1238 sequence += missing_symbol
1239 f.write("%s\n" % (sequence))
1240 return
1242 def get_beast_symbols(self):
1243 """Returns a list of one-character symbols needed to represent the states of all substantive readings in BEAST format.
1245 The number of symbols equals the maximum number of substantive readings at any variation unit.
1247 Returns:
1248 A list of individual characters representing states in readings.
1249 """
1250 possible_symbols = (
1251 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
1252 ) # NOTE: for BEAST, any number of states should theoretically be permissible, but since code maps are required for some reason, we will limit the number of symbols to 62 for now
1253 # The number of symbols needed is equal to the length of the longest substantive reading vector:
1254 nsymbols = 0
1255 # If there are no witnesses, then no symbols are needed at all:
1256 if len(self.witnesses) == 0:
1257 return []
1258 wit_id = self.witnesses[0].id
1259 for rdg_support in self.readings_by_witness[wit_id]:
1260 nsymbols = max(nsymbols, len(rdg_support))
1261 beast_symbols = possible_symbols[:nsymbols]
1262 return beast_symbols
1264 def get_tip_date_range(self):
1265 """Gets the minimum and maximum dates attested among the witnesses.
1266 Also checks if the witness with the latest possible date has a fixed date
1267 (i.e., if the lower and upper bounds for its date are the same)
1268 and issues a warning if not, as this will cause unusual behavior in BEAST 2.
1270 Returns:
1271 A tuple containing the earliest and latest possible tip dates.
1272 """
1273 earliest_date = None
1274 earliest_wit = None
1275 latest_date = None
1276 latest_wit = None
1277 for wit in self.witnesses:
1278 wit_id = wit.id
1279 date_range = wit.date_range
1280 if date_range[0] is not None:
1281 if earliest_date is not None:
1282 earliest_wit = wit if date_range[0] < earliest_date else earliest_wit
1283 earliest_date = min(date_range[0], earliest_date)
1284 else:
1285 earliest_wit = wit
1286 earliest_date = date_range[0]
1287 if date_range[1] is not None:
1288 if latest_date is not None:
1289 latest_wit = (
1290 wit
1291 if (date_range[1] > latest_date or (date_range[0] == date_range[1] == latest_date))
1292 else latest_wit
1293 ) # the second check ensures that a witness with a fixed date is preferred to a witness with a date range that ends with the same date
1294 latest_date = max(date_range[1], latest_date)
1295 else:
1296 latest_wit = wit
1297 latest_date = date_range[1]
1298 if latest_wit is not None and (latest_wit.date_range[0] is None or latest_wit.date_range[0] != latest_wit.date_range[1]):
1299 print(
1300 "WARNING: the latest witness, %s, has a variable date range; this will result in problems with time-dependent substitution models and misalignment of trees in BEAST DensiTree outputs! Please ensure that witness %s has a fixed date."
1301 % (latest_wit.id, latest_wit.id)
1302 )
1303 return (earliest_date, latest_date)
1305 def get_beast_origin_span(self, tip_date_range):
1306 """Returns a tuple containing the lower and upper bounds for the height of the origin of the Birth-Death Skyline model.
1307 The upper bound on the height of the tree is the difference between the latest tip date
1308 and the lower bound on the date of the original work, if both are defined;
1309 otherwise, it is left undefined.
1310 The lower bound on the height of the tree is the difference between the latest tip date
1311 and the upper bound on the date of the original work, if both are defined;
1312 otherwise, it is the difference between the earliest tip date and the latest, if both are defined.
1314 Args:
1315 tip_date_range: A tuple containing the earliest and latest possible tip dates.
1317 Returns:
1318 A tuple containing lower and upper bounds on the origin height for the Birth-Death Skyline model.
1319 """
1320 origin_span = [0, None]
1321 # If the upper bound on the date of the work's composition is defined, then set the lower bound on the height of the origin using it and the latest tip date
1322 # (note that if it had to be defined in terms of witness date lower bounds, then this would have happened already):
1323 if self.origin_date_range[1] is not None:
1324 origin_span[0] = tip_date_range[1] - self.origin_date_range[1]
1325 # If the lower bound on the date of the work's composition is defined, then set the upper bound on the height of the origin using it and the latest tip date:
1326 if self.origin_date_range[0] is not None:
1327 origin_span[1] = tip_date_range[1] - self.origin_date_range[0]
1328 return tuple(origin_span)
1330 def get_beast_date_map(self, taxlabels):
1331 """Returns a string representing witness-to-date mappings in BEAST format.
1333 Since this format requires single dates as opposed to date ranges,
1334 witnesses with closed date ranges will be mapped to the average of their lower and upper bounds,
1335 and witnesses with open date ranges will not be mapped.
1337 Args:
1338 taxlabels: A list of slugified taxon labels.
1340 Returns:
1341 A string containing comma-separated date calibrations of the form witness_id=date.
1342 """
1343 calibrate_strings = []
1344 for i, wit in enumerate(self.witnesses):
1345 taxlabel = taxlabels[i]
1346 date_range = wit.date_range
1347 # If either end of this witness's date range is empty, then do not include it:
1348 if date_range[0] is None or date_range[1] is None:
1349 continue
1350 # Otherwise, take the midpoint of its date range as its date:
1351 date = int((date_range[0] + date_range[1]) / 2)
1352 calibrate_string = "%s=%d" % (taxlabel, date)
1353 calibrate_strings.append(calibrate_string)
1354 # Then output the full date map string:
1355 date_map = ",".join(calibrate_strings)
1356 return date_map
1358 def get_beast_code_map_for_unit(self, symbols, missing_symbol, vu_ind):
1359 """Returns a string containing state/reading code mappings in BEAST format using the given single-state and missing state symbols for the character/variation unit at the given index.
1360 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a code for a dummy state will be included.
1362 Args:
1363 symbols: A list of one-character symbols used to represent the states of substantive readings.
missing_symbol: A one-character symbol used to represent missing data.
vu_ind: An integer index for the desired unit.
1365 Returns:
1366 A string containing comma-separated code mappings.
1367 """
1368 vu = self.variation_units[vu_ind]
1369 vu_id = vu.id
1370 code_map = {}
1371 for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])):
1372 code_map[symbols[k]] = str(k)
1373 # If this site is a singleton site, then add a code mapping for the dummy state:
1374 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1375 code_map[symbols[1]] = str(1)
1376 # Then add a mapping for the missing state, including a dummy state if this is a singleton site:
1377 code_map[missing_symbol] = " ".join(
1378 str(k) for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id]))
1379 )
1380 # If this site is a singleton site, then add the dummy state to the missing state mapping:
1381 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1382 code_map[missing_symbol] = code_map[missing_symbol] + " " + str(1)
1383 # Then combine all of the mappings into a single string:
1384 code_map_string = ", ".join([code + "=" + code_map[code] for code in code_map])
1385 return code_map_string
1387 def get_beast_equilibrium_frequencies_for_unit(self, vu_ind):
1388 """Returns a string containing state/reading equilibrium frequencies in BEAST format for the character/variation unit at the given index.
1389 Since the equilibrium frequencies are not used with the substitution models, they simply correspond to a uniform distribution over the states.
1390 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then equilibrium frequencies of 0.5 will be assigned to its substantive reading and a dummy state.
1392 Args:
1393 vu_ind: An integer index for the desired unit.
1395 Returns:
1396 A string containing space-separated equilibrium frequencies.
1397 """
1398 vu = self.variation_units[vu_ind]
1399 vu_id = vu.id
1400 # If this unit is a singleton, then return the string "0.5 0.5":
1401 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1:
1402 return "0.5 0.5"
1403 # Otherwise, set the equilibrium frequencies according to a uniform distribution:
1404 equilibrium_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len(
1405 self.substantive_readings_by_variation_unit_id[vu_id]
1406 )
1407 equilibrium_frequencies_string = " ".join([str(w) for w in equilibrium_frequencies])
1408 return equilibrium_frequencies_string
1410 def get_beast_root_frequencies_for_unit(self, vu_ind):
1411 """Returns a string containing state/reading root frequencies in BEAST format for the character/variation unit at the given index.
1412 The root frequencies are calculated from the intrinsic odds at this unit.
1413 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a root frequency of 0 will be added for a dummy state.
1414 If no intrinsic odds are specified, then a uniform distribution over all states is assumed.
1416 Args:
1417 vu_ind: An integer index for the desired unit.
1419 Returns:
1420 A string containing space-separated root frequencies.
1421 """
1422 vu = self.variation_units[vu_ind]
1423 vu_id = vu.id
1424 intrinsic_relations = vu.intrinsic_relations
1425 intrinsic_odds_by_id = self.intrinsic_odds_by_id
1426 # If this unit is a singleton, then return the string "1 0":
1427 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1:
1428 return "1 0"
1429 # If this unit has no intrinsic odds, then assume a uniform distribution over all readings:
1430 if len(intrinsic_relations) == 0:
1431 root_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len(
1432 self.substantive_readings_by_variation_unit_id[vu_id]
1433 )
1434 root_frequencies_string = " ".join([str(w) for w in root_frequencies])
1435 return root_frequencies_string
1436 # We will populate the root frequencies based on the intrinsic odds of the readings:
1437 root_frequencies_by_id = {}
1438 for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]:
1439 root_frequencies_by_id[rdg_id] = 0
1440 # First, construct an adjacency list for efficient edge iteration:
1441 neighbors_by_source = {}
1442 for edge in intrinsic_relations:
1443 s = edge[0]
1444 t = edge[1]
1445 if s not in neighbors_by_source:
1446 neighbors_by_source[s] = []
1447 if t not in neighbors_by_source:
1448 neighbors_by_source[t] = []
1449 neighbors_by_source[s].append(t)
1450 # Next, identify all readings that are not targeted by any intrinsic odds relation:
1451 in_degree_by_reading = {}
1452 for edge in intrinsic_relations:
1453 s = edge[0]
1454 t = edge[1]
1455 if s not in in_degree_by_reading:
1456 in_degree_by_reading[s] = 0
1457 if t not in in_degree_by_reading:
1458 in_degree_by_reading[t] = 0
1459 in_degree_by_reading[t] += 1
1460 starting_nodes = [t for t in in_degree_by_reading if in_degree_by_reading[t] == 0]
1461 # Set the root frequencies for these readings to 1 (they will be normalized later):
1462 for starting_node in starting_nodes:
1463 root_frequencies_by_id[starting_node] = 1.0
1464 # Next, set the frequencies for the remaining readings recursively using the adjacency list:
1465 def update_root_frequencies(s):
1466 for t in neighbors_by_source[s]:
1467 intrinsic_category = intrinsic_relations[(s, t)]
1468 odds = (
1469 intrinsic_odds_by_id[intrinsic_category]
1470 if intrinsic_odds_by_id[intrinsic_category] is not None
1471 else 1.0
1472 ) # TODO: This needs to be handled using parameters once we have it implemented in BEAST
1473 root_frequencies_by_id[t] = root_frequencies_by_id[s] / odds
1474 update_root_frequencies(t)
1475 return
1477 for starting_node in starting_nodes:
1478 update_root_frequencies(starting_node)
1479 # Then produce a normalized vector of root frequencies that corresponds to a probability distribution:
1480 root_frequencies = [
1481 root_frequencies_by_id[rdg_id] for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]
1482 ]
1483 total_frequencies = sum(root_frequencies)
1484 for k in range(len(root_frequencies)):
1485 root_frequencies[k] = root_frequencies[k] / total_frequencies
1486 root_frequencies_string = " ".join([str(w) for w in root_frequencies])
1487 return root_frequencies_string
1489 def to_beast(
1490 self,
1491 file_addr: Union[Path, str],
1492 drop_constant: bool = False,
1493 clock_model: ClockModel = ClockModel.strict,
1494 ancestral_logger: AncestralLogger = AncestralLogger.state,
1495 seed: int = None,
1496 ):
1497 """Writes this Collation to a file in BEAST format with the given address.
1499 Args:
1500 file_addr: A string representing the path to an output file.
1501 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1502 clock_model: A ClockModel option indicating which clock model to use.
1503 ancestral_logger: An AncestralLogger option indicating which class of logger (if any) to use for ancestral states.
1504 seed: A seed for random number generation (for setting initial values of unspecified transcriptional rates).
1505 """
1506 # Populate a list of sites that will correspond to columns of the sequence alignment:
1507 substantive_variation_unit_ids = self.variation_unit_ids
1508 if drop_constant:
1509 substantive_variation_unit_ids = [
1510 vu_id
1511 for vu_id in self.variation_unit_ids
1512 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1513 ]
1514 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1515 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1516 # First, calculate the values we will be using for the main template:
1517 taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
1518 missing_symbol = '?'
1519 symbols = self.get_beast_symbols()
1520 tip_date_range = self.get_tip_date_range()
1521 origin_span = self.get_beast_origin_span(tip_date_range)
1522 date_map = self.get_beast_date_map(taxlabels)
1523 # Then populate the necessary objects for the BEAST XML Jinja template:
1524 witness_objects = []
1525 variation_unit_objects = []
1526 intrinsic_category_objects = []
1527 transcriptional_category_objects = []
1528 # Start with witnesses:
1529 for i, wit in enumerate(self.witnesses):
1530 witness_object = {}
1531 # Copy the ID for this witness:
1532 witness_object["id"] = wit.id
1533 # Copy its date bounds:
1534 witness_object["min_date"] = wit.date_range[0]
1535 witness_object["max_date"] = wit.date_range[1]
1536 # Populate this witness's sequence from its entries in the readings dictionary:
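# (For example, a two-reading unit where this witness is lacunose becomes "1, 1; ",
# an unambiguous attestation of the first reading becomes "1.0, 0.0; ", and a
# singleton unit gains a dummy state, as in "1.0, 0; ".)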
1537 sequence = ""
1538 for j, rdg_support in enumerate(self.readings_by_witness[wit.id]):
1539 vu_id = self.variation_unit_ids[j]
1540 # Skip any variation units deemed non-substantive:
1541 if vu_id not in substantive_variation_unit_ids_set:
1542 continue
1543 # If this witness has a certainty of 0 for all readings, then it is a gap; assign a likelihood of 1 to each reading:
1544 if sum(rdg_support) == 0:
1545 for k, w in enumerate(rdg_support):
1546 sequence += "1"
1547 if k < len(rdg_support) - 1:
1548 sequence += ", "
1549 else:
1550 if len(rdg_support) > 1:
1551 sequence += "; "
1552 else:
1553 # If this site is a singleton site, then add a dummy state:
1554 sequence += ", 0; "
1555 # Otherwise, read the probabilities as they are given:
1556 else:
1557 for k, w in enumerate(rdg_support):
1558 sequence += str(w)
1559 if k < len(rdg_support) - 1:
1560 sequence += ", "
1561 else:
1562 if len(rdg_support) > 1:
1563 sequence += "; "
1564 else:
1565 # If this site is a singleton site, then add a dummy state:
1566 sequence += ", 0; "
1567 # Strip the final semicolon and space from the sequence:
1568 sequence = sequence.strip("; ")
1569 # Then set the witness object's sequence attribute to this string:
1570 witness_object["sequence"] = sequence
1571 witness_objects.append(witness_object)
1572 # Then proceed to variation units:
1573 for j, vu in enumerate(self.variation_units):
1574 if vu.id not in substantive_variation_unit_ids_set:
1575 continue
1576 variation_unit_object = {}
1577 # Copy the ID of this variation unit:
1578 variation_unit_object["id"] = vu.id
1579 # Copy this variation unit's number of substantive readings,
1580 # setting it to 2 if it is a singleton unit:
1581 variation_unit_object["nstates"] = (
1582 len(self.substantive_readings_by_variation_unit_id[vu.id])
1583 if len(self.substantive_readings_by_variation_unit_id[vu.id]) > 1
1584 else 2
1585 )
1586 # Then construct the code map for this unit:
1587 variation_unit_object["code_map"] = self.get_beast_code_map_for_unit(symbols, missing_symbol, j)
1588 # Then populate a comma-separated string of reading labels for this unit:
1589 rdg_texts = []
1590 vu_label = vu.id
1591 for rdg in vu.readings:
1592 key = tuple([vu.id, rdg.id])
1593 if key not in substantive_variation_unit_reading_tuples_set:
1594 continue
1595 rdg_text = slugify(rdg.text, lowercase=False, allow_unicode=True, separator='_')
1596 # Replace any empty reading text with an omission marker:
1597 if rdg_text == "":
1598 rdg_text = "om."
1599 rdg_texts.append(rdg_text)
1600 # If this site is a singleton site, then add a dummy reading for the dummy state:
1601 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1602 rdg_texts.append("DUMMY")
1603 rdg_texts_string = ", ".join(rdg_texts)
1604 variation_unit_object["rdg_texts"] = rdg_texts_string
1605 # Then populate this unit's equilibrium frequency string and its root frequency string:
1606 variation_unit_object["equilibrium_frequencies"] = self.get_beast_equilibrium_frequencies_for_unit(j)
1607 variation_unit_object["root_frequencies"] = self.get_beast_root_frequencies_for_unit(j)
1608 # Then populate a dictionary mapping epoch height ranges to lists of off-diagonal entries for substitution models:
1609 rate_objects_by_epoch_height_range = {}
1610 epoch_height_ranges = []
1611 # Then proceed based on whether the transcriptional relations for this variation unit have been defined:
1612 if len(vu.transcriptional_relations_by_date_range) == 0:
1613 # If there are no transcriptional relations, then map the epoch range of (None, None) to their list of off-diagonal entries:
1614 epoch_height_ranges.append((None, None))
1615 rate_objects_by_epoch_height_range[(None, None)] = []
1616 rate_objects = rate_objects_by_epoch_height_range[(None, None)]
1617 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1618 # If this is a singleton site, then use an arbitrary 2x2 rate matrix:
1619 rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
1620 rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
1621 else:
1622 # If this is a site with multiple substantive readings, but no transcriptional relations list,
1623 # then use a Lewis Mk substitution matrix with the appropriate number of states:
1624 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
1625 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
1626 # Skip diagonal elements:
1627 if k_1 == k_2:
1628 continue
1629 rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
1630 else:
1631 # Otherwise, proceed for every date range:
1632 for date_range in vu.transcriptional_relations_by_date_range:
1633 # Get the map of transcriptional relations for reference later:
1634 transcriptional_relations = vu.transcriptional_relations_by_date_range[date_range]
1635 # Now get the epoch height range corresponding to this date range, and initialize its list in the dictionary:
1636 epoch_height_range = [None, None]
1637 epoch_height_range[0] = tip_date_range[1] - date_range[1] if date_range[1] is not None else None
1638 epoch_height_range[1] = tip_date_range[1] - date_range[0] if date_range[0] is not None else None
1639 epoch_height_range = tuple(epoch_height_range)
1640 epoch_height_ranges.append(epoch_height_range)
1641 rate_objects_by_epoch_height_range[epoch_height_range] = []
1642 rate_objects = rate_objects_by_epoch_height_range[epoch_height_range]
1643 # Then proceed for every pair of readings in this unit:
1644 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
1645 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
1646 # Skip diagonal elements:
1647 if k_1 == k_2:
1648 continue
1649 # If the first reading has no transcriptional relation to the second in this unit, then use the default rate:
1650 if (rdg_id_1, rdg_id_2) not in transcriptional_relations:
1651 rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
1652 continue
1653 # Otherwise, if only one category of transcriptional relations holds between the first and second readings,
1654 # then use its rate:
1655 if len(transcriptional_relations[(rdg_id_1, rdg_id_2)]) == 1:
1656 # If there is only one such category, then add its rate as a standalone var element:
1657 transcriptional_category = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])[0]
1658 rate_objects.append(
1659 {"transcriptional_categories": [transcriptional_category], "expression": None}
1660 )
1661 continue
1662 # If there is more than one, then add a var element that is a sum of the individual categories' rates:
1663 transcriptional_categories = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])
1664 args = []
1665 for transcriptional_category in transcriptional_categories:
1666 args.append("%s_rate" % transcriptional_category)
1667 args_string = " ".join(args)
1668 ops = ["+"] * (len(args) - 1)
1669 ops_string = " ".join(ops)
1670 expression_string = " ".join([args_string, ops_string])
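# NOTE: the arguments precede the operators, so the expression is in postfix
# (reverse Polish) order, e.g. "cat1_rate cat2_rate +" for two hypothetical
# categories, presumably for the expression parser used in the BEAST template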
1671 rate_objects.append(
1672 {
1673 "transcriptional_categories": transcriptional_categories,
1674 "expression": expression_string,
1675 }
1676 )
1677 # Now reorder the list of epoch height ranges, and get a list of non-null epoch dates in ascending order from the dictionary:
1678 epoch_height_ranges.reverse()
1679 epoch_heights = [
1680 epoch_height_range[0] for epoch_height_range in epoch_height_ranges if epoch_height_range[0] is not None
1681 ]
1682 # Then add all of these data structures to the variation unit object:
1683 variation_unit_object["epoch_heights"] = epoch_heights
1684 variation_unit_object["epoch_heights_string"] = " ".join(
1685 [str(epoch_height) for epoch_height in epoch_heights]
1686 )
1687 variation_unit_object["epoch_height_ranges"] = epoch_height_ranges
1688 variation_unit_object["epoch_rates"] = [
1689 rate_objects_by_epoch_height_range[epoch_height_range] for epoch_height_range in epoch_height_ranges
1690 ]
1691 variation_unit_objects.append(variation_unit_object)
1692 # Then proceed to intrinsic odds categories:
1693 for intrinsic_category in self.intrinsic_categories:
1694 intrinsic_category_object = {}
1695 # Copy the ID of this intrinsic category:
1696 intrinsic_category_object["id"] = intrinsic_category
1697 # Then copy the odds factors associated with this intrinsic category,
1698 # setting it to 1.0 if it is not specified and setting the estimate attribute accordingly:
1699 odds = self.intrinsic_odds_by_id[intrinsic_category]
1700 intrinsic_category_object["odds"] = odds if odds is not None else 1.0
1701 intrinsic_category_object["estimate"] = "false" if odds is not None else "true"
1702 intrinsic_category_objects.append(intrinsic_category_object)
1703 # Then proceed to transcriptional rate categories:
1704 rng = np.random.default_rng(seed)
1705 for transcriptional_category in self.transcriptional_categories:
1706 transcriptional_category_object = {}
1707 # Copy the ID of this transcriptional category:
1708 transcriptional_category_object["id"] = transcriptional_category
1709 # Then copy the rate of this transcriptional category,
1710 # setting it to a random number sampled from a Gamma distribution if it is not specified and setting the estimate attribute accordingly:
1711 rate = self.transcriptional_rates_by_id[transcriptional_category]
1712 transcriptional_category_object["rate"] = rate if rate is not None else rng.gamma(5.0, 2.0)
1713 transcriptional_category_object["estimate"] = "false" if rate is not None else "true"
1714 transcriptional_category_objects.append(transcriptional_category_object)
1715 # Now render the output XML file using the Jinja template:
1716 env = Environment(loader=PackageLoader("teiphy", "templates"), autoescape=select_autoescape())
1717 template = env.get_template("beast_template.xml")
1718 rendered = template.render(
1719 nsymbols=len(symbols),
1720 date_map=date_map,
1721 origin_span=origin_span,
1722 clock_model=clock_model.value,
1723 clock_rate_categories=2 * len(self.witnesses) - 1,
1724 ancestral_logger=ancestral_logger.value,
1725 witnesses=witness_objects,
1726 variation_units=variation_unit_objects,
1727 intrinsic_categories=intrinsic_category_objects,
1728 transcriptional_categories=transcriptional_category_objects,
1729 )
1730 # Generate all parent folders for this file that don't already exist:
1731 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
1732 with open(file_addr, "w", encoding="utf-8") as f:
1733 f.write(rendered)
1734 return
1736 def to_numpy(self, drop_constant: bool = False, split_missing: SplitMissingType = None):
1737 """Returns this Collation in the form of a NumPy array, along with arrays of its row and column labels.
1739 Args:
1740 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1741 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
1742 If not specified, then missing data is ignored (i.e., all states are 0).
1743 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
1744 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
1746 Returns:
1747 A NumPy array with a row for each substantive reading and a column for each witness.
1748 A list of substantive reading label strings (each combining a variation unit ID and a reading text).
1749 A list of witness ID strings.
1750 """
1751 # Populate a list of sites that will correspond to columns of the sequence alignment:
1752 substantive_variation_unit_ids = self.variation_unit_ids
1753 if drop_constant:
1754 substantive_variation_unit_ids = [
1755 vu_id
1756 for vu_id in self.variation_unit_ids
1757 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1758 ]
1759 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1760 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1761 # Initialize the output array with the appropriate dimensions:
1762 reading_labels = []
1763 for vu in self.variation_units:
1764 if vu.id not in substantive_variation_unit_ids_set:
1765 continue
1766 for rdg in vu.readings:
1767 key = tuple([vu.id, rdg.id])
1768 if key in substantive_variation_unit_reading_tuples_set:
1769 reading_labels.append(vu.id + ", " + rdg.text)
1770 witness_labels = [wit.id for wit in self.witnesses]
1771 matrix = np.zeros((len(reading_labels), len(witness_labels)), dtype=float)
1772 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings:
1773 support_proportions_by_unit = {}
1774 for j, vu_id in enumerate(self.variation_unit_ids):
1775 if vu_id not in substantive_variation_unit_ids_set:
1776 continue
1777 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
1778 for i, wit in enumerate(self.witnesses):
1779 rdg_support = self.readings_by_witness[wit.id][j]
1780 for l, w in enumerate(rdg_support):
1781 support_proportions[l] += w
1782 norm = (
1783 sum(support_proportions) if sum(support_proportions) > 0 else 1.0
1784 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
1785 for l in range(len(support_proportions)):
1786 support_proportions[l] = support_proportions[l] / norm
1787 support_proportions_by_unit[vu_id] = support_proportions
1788 # Then populate it with the appropriate values:
1789 col_ind = 0
1790 for i, wit in enumerate(self.witnesses):
1791 row_ind = 0
1792 for j, vu_id in enumerate(self.variation_unit_ids):
1793 if vu_id not in substantive_variation_unit_ids_set:
1794 continue
1795 rdg_support = self.readings_by_witness[wit.id][j]
1796 # If this reading support vector sums to 0, then this is missing data; handle it as specified:
1797 if sum(rdg_support) == 0:
1798 if split_missing == SplitMissingType.uniform:
1799 for l in range(len(rdg_support)):
1800 matrix[row_ind, col_ind] = 1 / len(rdg_support)
1801 row_ind += 1
1802 elif split_missing == SplitMissingType.proportional:
1803 for l in range(len(rdg_support)):
1804 matrix[row_ind, col_ind] = support_proportions_by_unit[vu_id][l]
1805 row_ind += 1
1806 else:
1807 row_ind += len(rdg_support)
1808 # Otherwise, add its coefficients normally:
1809 else:
1810 for l in range(len(rdg_support)):
1811 matrix[row_ind, col_ind] = rdg_support[l]
1812 row_ind += 1
1813 col_ind += 1
1814 return matrix, reading_labels, witness_labels
1816 def to_distance_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False):
1817 """Transforms this Collation into a NumPy distance matrix between witnesses, along with an array of its labels for the witnesses.
1818 Distances can be computed either as counts of disagreements (the default setting), or as proportions of disagreements over all variation units where both witnesses have singleton readings.
1819 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of disagreements.
1821 Args:
1822 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1823 Default value is False.
1824 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
1825 Default value is False.
1826 show_ext (bool, optional): An optional flag indicating whether each cell in the distance matrix
1827 should include the number of variation units where both witnesses are extant and unambiguous after the number of their disagreements.
1828 Default value is False.
1830 Returns:
1831 A NumPy distance matrix with a row and column for each witness.
1832 A list of witness ID strings.
1833 """
1834 # Populate a list of sites that will correspond to columns of the sequence alignment:
1835 substantive_variation_unit_ids = self.variation_unit_ids
1836 if drop_constant:
1837 substantive_variation_unit_ids = [
1838 vu_id
1839 for vu_id in self.variation_unit_ids
1840 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1841 ]
1842 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1843 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1844 # Initialize the output array with the appropriate dimensions:
1845 witness_labels = [wit.id for wit in self.witnesses]
1846 # The type of the matrix will depend on the input options:
1847 matrix = None
1848 if show_ext:
1849 matrix = np.full(
1850 (len(witness_labels), len(witness_labels)), "NA", dtype=object
1851 ) # strings of the form "disagreements/extant"
1852 elif proportion:
1853 matrix = np.full(
1854 (len(witness_labels), len(witness_labels)), 0.0, dtype=float
1855 ) # floats of the form disagreements/extant
1856 else:
1857 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form disagreements
1858 for i, wit_1 in enumerate(witness_labels):
1859 for j, wit_2 in enumerate(witness_labels):
1860 extant_units = 0
1861 disagreements = 0
1862 # If either of the cells for this pair of witnesses has been populated already,
1863 # then just copy the entry from the other side of the diagonal without recalculating:
1864 if i > j:
1865 matrix[i, j] = matrix[j, i]
1866 continue
1867 # Otherwise, calculate the number of units where both witnesses have unambiguous readings
1868 # and the number of units where they disagree:
1869 for k, vu_id in enumerate(self.variation_unit_ids):
1870 if vu_id not in substantive_variation_unit_ids_set:
1871 continue
1872 wit_1_rdg_support = self.readings_by_witness[wit_1][k]
1873 wit_2_rdg_support = self.readings_by_witness[wit_2][k]
1874 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0]
1875 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0]
1876 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1:
1877 continue
1878 extant_units += 1
1879 if wit_1_rdg_inds[0] != wit_2_rdg_inds[0]:
1880 disagreements += 1
1881 cell_entry = None
1882 if proportion:
1883 cell_entry = disagreements / max(
1884 extant_units, 1
1885 ) # the max in the denominator is to prevent division by 0; the distance entry will be 0 if the two witnesses have no overlap
1886 else:
1887 cell_entry = disagreements
1888 if show_ext:
1889 cell_entry = str(cell_entry) + "/" + str(extant_units)
1890 matrix[i, j] = cell_entry
1891 return matrix, witness_labels
1893 def to_similarity_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False):
1894 """Transforms this Collation into a NumPy similarity matrix between witnesses, along with an array of its labels for the witnesses.
1895 Similarities can be computed either as counts of agreements (the default setting), or as proportions of agreements over all variation units where both witnesses have singleton readings.
1896 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of agreements.
1898 Args:
1899 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1900 Default value is False.
1901 proportion (bool, optional): An optional flag indicating whether or not to calculate similarities as proportions over extant, unambiguous variation units.
1902 Default value is False.
1903 show_ext (bool, optional): An optional flag indicating whether each cell in the similarity matrix
1904 should include the number of variation units where both witnesses are extant and unambiguous after the number of their agreements.
1905 Default value is False.
1907 Returns:
1908 A NumPy agreement matrix with a row and column for each witness.
1909 A list of witness ID strings.
1910 """
1911 # Populate a list of sites that will correspond to columns of the sequence alignment:
1912 substantive_variation_unit_ids = self.variation_unit_ids
1913 if drop_constant:
1914 substantive_variation_unit_ids = [
1915 vu_id
1916 for vu_id in self.variation_unit_ids
1917 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1918 ]
1919 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1920 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1921 # Initialize the output array with the appropriate dimensions:
1922 witness_labels = [wit.id for wit in self.witnesses]
1923 # The type of the matrix will depend on the input options:
1924 matrix = None
1925 if show_ext:
1926 matrix = np.full(
1927 (len(witness_labels), len(witness_labels)), "NA", dtype=object
1928 ) # strings of the form "agreements/extant"
1929 elif proportion:
1930 matrix = np.full(
1931 (len(witness_labels), len(witness_labels)), 0.0, dtype=float
1932 ) # floats of the form agreements/extant
1933 else:
1934 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form agreements
1935 for i, wit_1 in enumerate(witness_labels):
1936 for j, wit_2 in enumerate(witness_labels):
1937 extant_units = 0
1938 agreements = 0
1939 # If either of the cells for this pair of witnesses has been populated already,
1940 # then just copy the entry from the other side of the diagonal without recalculating:
1941 if i > j:
1942 matrix[i, j] = matrix[j, i]
1943 continue
1944 # Otherwise, calculate the number of units where both witnesses have unambiguous readings
1945 # and the number of units where they agree:
1946 for k, vu_id in enumerate(self.variation_unit_ids):
1947 if vu_id not in substantive_variation_unit_ids_set:
1948 continue
1949 wit_1_rdg_support = self.readings_by_witness[wit_1][k]
1950 wit_2_rdg_support = self.readings_by_witness[wit_2][k]
1951 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0]
1952 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0]
1953 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1:
1954 continue
1955 extant_units += 1
1956 if wit_1_rdg_inds[0] == wit_2_rdg_inds[0]:
1957 agreements += 1
1958 cell_entry = None
1959 if proportion:
1960 cell_entry = agreements / max(
1961 extant_units, 1
1962 ) # the max in the denominator is to prevent division by 0; the similarity entry will be 0 if the two witnesses have no overlap
1963 else:
1964 cell_entry = agreements
1965 if show_ext:
1966 cell_entry = str(cell_entry) + "/" + str(extant_units)
1967 matrix[i, j] = cell_entry
1968 return matrix, witness_labels
1970 def to_idf_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None):
1971 """Transforms this Collation into a NumPy inverse document frequency-weighted agreements matrix between witnesses, along with an array of its labels for the witnesses.
1972 The IDF weight of an agreement on a given reading is the information content -log(Pr(R)) of the event R of randomly sampling a witness with that reading.
1973 The IDF-weighted agreement score for two witnesses is the sum of the IDF weights for the readings at which they agree.
1974 Where any witness is lacunose or ambiguous, it contributes to its potential readings' sampling probabilities in proportion to its degrees of support for those readings.
1975 Similarly, where one or both target witnesses are lacunose or ambiguous, the expected information content of their agreement is calculated based on the probabilities of their having the same reading.
1977 Args:
1978 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1979 Default value is False.
1980 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
1981 If not specified, then missing data is ignored (i.e., all states are 0).
1982 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
1983 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
1985 Returns:
1986 A NumPy IDF-weighted agreement matrix with a row and column for each witness.
1987 A list of witness ID strings.
1988 """
1989 # Populate a list of sites that will correspond to columns of the sequence alignment:
1990 substantive_variation_unit_ids = self.variation_unit_ids
1991 if drop_constant:
1992 substantive_variation_unit_ids = [
1993 vu_id
1994 for vu_id in self.variation_unit_ids
1995 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1996 ]
1997 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1998 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1999 # Initialize the output array with the appropriate dimensions:
2000 witness_labels = [wit.id for wit in self.witnesses]
2001 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings:
2002 support_proportions_by_unit = {}
2003 for j, vu_id in enumerate(self.variation_unit_ids):
2004 # Skip this variation unit if it is a dropped constant site:
2005 if vu_id not in substantive_variation_unit_ids_set:
2006 continue
2007 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
2008 for i, wit in enumerate(witness_labels):
2009 rdg_support = self.readings_by_witness[wit][j]
2010 for l, w in enumerate(rdg_support):
2011 support_proportions[l] += w
2012 norm = (
2013 sum(support_proportions) if sum(support_proportions) > 0 else 1.0
2014 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
2015 for l in range(len(support_proportions)):
2016 support_proportions[l] = support_proportions[l] / norm
2017 support_proportions_by_unit[vu_id] = support_proportions
2018 # Then populate the matrix one variation unit at a time:
2019 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float)
2020 for k, vu_id in enumerate(self.variation_unit_ids):
2021 # Skip this variation unit if it is a dropped constant site:
2022 if vu_id not in substantive_variation_unit_ids_set:
2023 continue
2024 # Otherwise, calculate the sampling probabilities for each reading in this unit:
2025 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
2026 rdg_support_by_witness = {}
2027 for i, wit in enumerate(witness_labels):
2028 rdg_support = self.readings_by_witness[wit][k]
2029 # Check if this reading support vector represents missing data:
2030 norm = sum(rdg_support)
2031 if norm == 0:
2032 # If this reading support vector sums to 0, then this is missing data; handle it as specified:
2033 if split_missing == SplitMissingType.uniform:
2034 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))]
2035 elif split_missing == SplitMissingType.proportional:
2036 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))]
2037 else:
2038 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1:
2039 rdg_support = [w / norm for w in rdg_support]
2040 rdg_support_by_witness[wit] = rdg_support
2041 # Then add this witness's contributions to the readings' sampling probabilities:
2042 for l, w in enumerate(rdg_support_by_witness[wit]):
2043 sampling_probabilities[l] += w
2044 norm = (
2045 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0
2046 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
2047 # Then normalize the sampling probabilities so that they sum to 1:
2048 sampling_probabilities = [w / norm for w in sampling_probabilities]
2049 # Then calculate the IDF weights for agreements between witnesses in this unit:
2050 for i, wit_1 in enumerate(witness_labels):
2051 for j, wit_2 in enumerate(witness_labels):
2052 # The contribution to the weighted sum for these witnesses will be identical regardless of the order in which they are specified,
2053 # so we only have to calculate it once:
2054 if i > j:
2055 matrix[i, j] = matrix[j, i]
2056 continue
2057 # First, calculate the probability that these two witnesses agree:
2058 wit_1_rdg_support = rdg_support_by_witness[wit_1]
2059 wit_2_rdg_support = rdg_support_by_witness[wit_2]
2060 probability_of_agreement = sum(
2061 [wit_1_rdg_support[l] * wit_2_rdg_support[l] for l in range(len(sampling_probabilities))]
2062 )
2063 # If these witnesses do not agree at this variation unit, then this unit contributes nothing to their total score:
2064 if probability_of_agreement == 0:
2065 continue
2066 # Otherwise, calculate the expected information content (in bits) of their agreement given their agreement on that reading
2067 # (skipping readings with a sampling probability of 0):
2068 expected_information_content = sum(
2069 [
2070 -math.log2(sampling_probabilities[l])
2071 * (wit_1_rdg_support[l] * wit_2_rdg_support[l] / probability_of_agreement)
2072 for l in range(len(sampling_probabilities))
2073 if sampling_probabilities[l] > 0.0
2074 ]
2075 )
2076 # Then add this contribution to the total score for these two witnesses:
2077 matrix[i, j] += expected_information_content
2078 matrix[j, i] += expected_information_content
2079 return matrix, witness_labels
2081 def to_nexus_table(self, drop_constant: bool = False, ambiguous_as_missing: bool = False):
2082 """Returns this Collation in the form of a table with rows for taxa, columns for characters, and reading IDs in cells.
2084 Args:
2085 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2086 Default value is False.
2087 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
2088 Default value is False.
2090 Returns:
2091 A NumPy array with rows for taxa, columns for characters, and reading IDs in cells.
2092 A list of witness ID strings.
2093 A list of variation unit ID strings.
2094 """
2095 # Populate a list of sites that will correspond to columns of the sequence alignment:
2096 substantive_variation_unit_ids = self.variation_unit_ids
2097 if drop_constant:
2098 substantive_variation_unit_ids = [
2099 vu_id
2100 for vu_id in self.variation_unit_ids
2101 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2102 ]
2103 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2104 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2105 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary
2106 # to the readings' IDs:
2107 reading_ids_by_indices = {}
2108 for j, vu in enumerate(self.variation_units):
2109 if vu.id not in substantive_variation_unit_ids_set:
2110 continue
2111 k = 0
2112 for rdg in vu.readings:
2113 key = tuple([vu.id, rdg.id])
2114 if key not in substantive_variation_unit_reading_tuples_set:
2115 continue
2116 indices = tuple([j, k])
2117 reading_ids_by_indices[indices] = rdg.id
2118 k += 1
2119 # Initialize the output array with the appropriate dimensions:
2120 missing_symbol = '?'
2121 witness_labels = [wit.id for wit in self.witnesses]
2122 matrix = np.full(
2123 (len(witness_labels), len(substantive_variation_unit_ids)), missing_symbol, dtype=object
2124 ) # use dtype=object because the maximum string length is not known up front
2125 # Then populate it with the appropriate values:
2126 row_ind = 0
2127 for i, wit in enumerate(self.witnesses):
2128 col_ind = 0
2129 for j, vu in enumerate(self.variation_units):
2130 if vu.id not in substantive_variation_unit_ids_set:
2131 continue
2132 rdg_support = self.readings_by_witness[wit.id][j]
2133 # If this reading support vector sums to 0, then this is missing data; handle it as specified:
2134 if sum(rdg_support) == 0:
2135 matrix[row_ind, col_ind] = missing_symbol
2136 # Otherwise, add its coefficients normally:
2137 else:
2138 rdg_inds = [
2139 k for k, w in enumerate(rdg_support) if w > 0
2140 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them
2141 # For singleton readings, just print the reading ID:
2142 if len(rdg_inds) == 1:
2143 k = rdg_inds[0]
2144 matrix[row_ind, col_ind] = reading_ids_by_indices[(j, k)]
2145 # For multiple readings, print the corresponding reading IDs in braces or the missing symbol depending on input settings:
2146 else:
2147 if ambiguous_as_missing:
2148 matrix[row_ind, col_ind] = missing_symbol
2149 else:
2150 matrix[row_ind, col_ind] = "{%s}" % " ".join(
2151 [reading_ids_by_indices[(j, k)] for k in rdg_inds]
2152 )
2153 col_ind += 1
2154 row_ind += 1
2155 return matrix, witness_labels, substantive_variation_unit_ids
2157 def to_long_table(self, drop_constant: bool = False):
2158 """Returns this Collation in the form of a long table with columns for taxa, characters, reading indices, and reading values.
2159 Note that this method treats ambiguous readings as missing data.
2161 Args:
2162 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2163 Default value is False.
2165 Returns:
2166 A NumPy array with columns for taxa, characters, reading indices, and reading values, and rows for each combination of these values in the matrix.
2167 A list of column label strings.
2168 """
2169 # Populate a list of sites that will correspond to columns of the sequence alignment:
2170 substantive_variation_unit_ids = self.variation_unit_ids
2171 if drop_constant:
2172 substantive_variation_unit_ids = [
2173 vu_id
2174 for vu_id in self.variation_unit_ids
2175 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2176 ]
2177 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2178 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2179 # Initialize the outputs:
2180 column_labels = ["taxon", "character", "state", "value"]
2181 long_table_list = []
2182 # Populate a dictionary mapping (variation unit index, reading index) tuples to reading texts:
2183 reading_texts_by_indices = {}
2184 for j, vu in enumerate(self.variation_units):
2185 if vu.id not in substantive_variation_unit_ids_set:
2186 continue
2187 k = 0
2188 for rdg in vu.readings:
2189 key = tuple([vu.id, rdg.id])
2190 if key not in substantive_variation_unit_reading_tuples_set:
2191 continue
2192 indices = tuple([j, k])
2193 reading_texts_by_indices[indices] = rdg.text
2194 k += 1
2195 # Then populate the output list with the appropriate values:
2197 missing_symbol = '?'
2198 for i, wit in enumerate(self.witnesses):
2200 for j, vu_id in enumerate(self.variation_unit_ids):
2201 if vu_id not in substantive_variation_unit_ids_set:
2202 continue
2203 rdg_support = self.readings_by_witness[wit.id][j]
2204 # Populate a list of nonzero coefficients for this reading support vector:
2205 rdg_inds = [k for k, w in enumerate(rdg_support) if w > 0]
2206 # If this list does not consist of exactly one reading, then treat it as missing data:
2207 if len(rdg_inds) != 1:
2208 long_table_list.append([wit.id, vu_id, missing_symbol, missing_symbol])
2209 continue
2210 k = rdg_inds[0]
2211 rdg_text = reading_texts_by_indices[(j, k)]
2212 # Replace empty reading texts with the omission placeholder:
2213 if rdg_text == "":
2214 rdg_text = "om."
2215 long_table_list.append([wit.id, vu_id, k, rdg_text])
2216 # Then convert the long table entries list to a NumPy array:
2217 long_table = np.array(long_table_list)
2218 return long_table, column_labels
2220 def to_dataframe(
2221 self,
2222 drop_constant: bool = False,
2223 ambiguous_as_missing: bool = False,
2224 proportion: bool = False,
2225 table_type: TableType = TableType.matrix,
2226 split_missing: SplitMissingType = None,
2227 show_ext: bool = False,
2228 ):
2229 """Returns this Collation in the form of a Pandas DataFrame array, including the appropriate row and column labels.
2231 Args:
2232 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2233 Default value is False.
2234 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
2235 Default value is False.
2236 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2237 Default value is False.
2238 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate.
2239 Only applicable for tabular outputs.
2240 Default value is "matrix".
2241 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
2242 If not specified, then missing data is ignored (i.e., all states are 0).
2243 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
2244 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
2245 Only applicable for table types "matrix" and "idf".
2246 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix
2247 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements.
2248 Only applicable for tabular output formats of type "distance" or "similarity".
2249 Default value is False.
2251 Returns:
2252 A Pandas DataFrame corresponding to a collation matrix with reading frequencies or a long table with discrete reading states.
2253 """
2254 df = None
2255 # Proceed based on the table type:
2256 if table_type == TableType.matrix:
2257 # Convert the collation to a NumPy array and get its row and column labels first:
2258 matrix, reading_labels, witness_labels = self.to_numpy(
2259 drop_constant=drop_constant, split_missing=split_missing
2260 )
2261 df = pd.DataFrame(matrix, index=reading_labels, columns=witness_labels)
2262 elif table_type == TableType.distance:
2263 # Convert the collation to a NumPy array and get its row and column labels first:
2264 matrix, witness_labels = self.to_distance_matrix(
2265 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext
2266 )
2267 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels)
2268 elif table_type == TableType.similarity:
2269 # Convert the collation to a NumPy array and get its row and column labels first:
2270 matrix, witness_labels = self.to_similarity_matrix(
2271 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext
2272 )
2273 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels)
2274 elif table_type == TableType.idf:
2275 # Convert the collation to a NumPy array and get its row and column labels first:
2276 matrix, witness_labels = self.to_idf_matrix(drop_constant=drop_constant, split_missing=split_missing)
2277 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels)
2278 elif table_type == TableType.nexus:
2279 # Convert the collation to a NumPy array and get its row and column labels first:
2280 matrix, witness_labels, vu_labels = self.to_nexus_table(
2281 drop_constant=drop_constant, ambiguous_as_missing=ambiguous_as_missing
2282 )
2283 df = pd.DataFrame(matrix, index=witness_labels, columns=vu_labels)
2284 elif table_type == TableType.long:
2285 # Convert the collation to a long table and get its column labels first:
2286 long_table, column_labels = self.to_long_table(drop_constant=drop_constant)
2287 df = pd.DataFrame(long_table, columns=column_labels)
2288 return df
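# A minimal usage sketch (hypothetical; assumes a Collation instance `coll`
# already parsed from a TEI XML collation):
#
#     df = coll.to_dataframe(drop_constant=True, table_type=TableType.distance, proportion=True)
#     print(df)  # witness-by-witness proportional distances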
2290 def to_csv(
2291 self,
2292 file_addr: Union[Path, str],
2293 drop_constant: bool = False,
2294 ambiguous_as_missing: bool = False,
2295 proportion: bool = False,
2296 table_type: TableType = TableType.matrix,
2297 split_missing: SplitMissingType = None,
2298 show_ext: bool = False,
2299 **kwargs
2300 ):
2301 """Writes this Collation to a comma-separated value (CSV) file with the given address.
2303 If your witness IDs are numeric (e.g., Gregory-Aland numbers), then they will be written in full to the CSV file, but Excel will likely interpret them as numbers and strip any leading zeroes!
2305 Args:
2306 file_addr: A string representing the path to an output CSV file; the file type should be .csv.
2307 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
2308 Default value is False.
2309 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
2310 Default value is False.
2311 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2312 Default value is False.
2313 table_type: A TableType option indicating which type of tabular output to generate.
2314 Only applicable for tabular outputs.
2315 Default value is "matrix".
2316 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
2317 If not specified, then missing data is ignored (i.e., all states are 0).
2318 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
2319 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
2320 Only applicable for table types "matrix" and "idf".
2321 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix
2322 should include the number of variation units at which both witnesses are extant and unambiguous after the number of their disagreements/agreements.
2323 Only applicable for tabular output formats of type "distance" or "similarity".
2324 Default value is False.
2325 **kwargs: Keyword arguments for pandas.DataFrame.to_csv.
2326 """
2327 # Convert the collation to a Pandas DataFrame first:
2328 df = self.to_dataframe(
2329 drop_constant=drop_constant,
2330 ambiguous_as_missing=ambiguous_as_missing,
2331 proportion=proportion,
2332 table_type=table_type,
2333 split_missing=split_missing,
2334 show_ext=show_ext,
2335 )
2336 # Generate all parent folders for this file that don't already exist:
2337 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
2338 # Proceed based on the table type:
2339 if table_type == TableType.long:
2340 return df.to_csv(
2341 file_addr, encoding="utf-8-sig", index=False, **kwargs
2342 ) # add BOM to start of file so that Excel will know to read it as Unicode
2343 return df.to_csv(
2344 file_addr, encoding="utf-8-sig", **kwargs
2345 ) # add BOM to start of file so that Excel will know to read it as Unicode
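# E.g., a hedged sketch of tab-separated long-table output (file name hypothetical):
#
#     coll.to_csv("collation_long.tsv", table_type=TableType.long, sep="\t")
#
# where the trailing keyword argument is forwarded to pandas.DataFrame.to_csv via **kwargs.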
2347 def to_excel(
2348 self,
2349 file_addr: Union[Path, str],
2350 drop_constant: bool = False,
2351 ambiguous_as_missing: bool = False,
2352 proportion: bool = False,
2353 table_type: TableType = TableType.matrix,
2354 split_missing: SplitMissingType = None,
2355 show_ext: bool = False,
2356 ):
2357 """Writes this Collation to an Excel (.xlsx) file with the given address.
2359 Since Pandas is deprecating its support for xlwt, output in the old Excel (.xls) format is not recommended.
2361 Args:
2362 file_addr: A string representing the path to an output Excel file; the file type should be .xlsx.
2363 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
2364 Default value is False.
2365 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
2366 Default value is False.
2367 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2368 Default value is False.
2369 table_type: A TableType option indicating which type of tabular output to generate.
2370 Only applicable for tabular outputs.
2371 Default value is "matrix".
2372 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
2373 If not specified, then missing data is ignored (i.e., all states are 0).
2374 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
2375 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
2376 Only applicable for table types "matrix" and "idf".
2377 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix
2378 should include the number of variation units at which both witnesses are extant and unambiguous after the number of their disagreements/agreements.
2379 Only applicable for tabular output formats of type "distance" or "similarity".
2380 Default value is False.
2381 """
2382 # Convert the collation to a Pandas DataFrame first:
2383 df = self.to_dataframe(
2384 drop_constant=drop_constant,
2385 ambiguous_as_missing=ambiguous_as_missing,
2386 proportion=proportion,
2387 table_type=table_type,
2388 split_missing=split_missing,
2389 show_ext=show_ext,
2390 )
2391 # Generate all parent folders for this file that don't already exist:
2392 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
2393 # Proceed based on the table type:
2394 if table_type == TableType.long:
2395 return df.to_excel(file_addr, index=False)
2396 return df.to_excel(file_addr)
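# E.g. (hypothetical file name): coll.to_excel("collation.xlsx", table_type=TableType.nexus)
# would write the NEXUS-style table of state symbols to an .xlsx workbook.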
2398 def to_phylip_matrix(
2399 self,
2400 file_addr: Union[Path, str],
2401 drop_constant: bool = False,
2402 proportion: bool = False,
2403 table_type: TableType = TableType.distance,
2404 show_ext: bool = False,
2405 ):
2406 """Writes this Collation as a PHYLIP-formatted distance/similarity matrix to the file with the given address.
2408 Args:
2409 file_addr: A string representing the path to an output PHYLIP file; the file type should be .ph or .phy.
2410 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
2411 Default value is False.
2412 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2413 Default value is False.
2414 table_type: A TableType option indicating which type of tabular output to generate.
2415 For PHYLIP-formatted outputs, distance and similarity matrices are the only supported table types.
2416 Default value is "distance".
2417 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix
2418 should include the number of variation units at which both witnesses are extant and unambiguous after the number of their disagreements/agreements.
2419 Only applicable for tabular output formats of type "distance" or "similarity".
2420 Default value is False.
2421 """
2422 # Convert the collation to a NumPy distance or similarity matrix and get its witness labels first:
2423 matrix = None
2424 witness_labels = []
2425 # Proceed based on the table type:
2426 if table_type == TableType.distance:
2427 # Convert the collation to a NumPy array and get its row and column labels first:
2428 matrix, witness_labels = self.to_distance_matrix(
2429 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext
2430 )
2431 elif table_type == TableType.similarity:
2432 # Convert the collation to a NumPy array and get its row and column labels first:
2433 matrix, witness_labels = self.to_similarity_matrix(
2434 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext
2435 )
2436 # Generate all parent folders for this file that don't already exist:
2437 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
2438 with open(file_addr, "w", encoding="utf-8") as f:
2439 # The first line contains the number of taxa:
2440 f.write("%d\n" % len(witness_labels))
2441 # Every subsequent line contains a witness label, followed by the values in its row of the matrix:
2442 for i, wit_id in enumerate(witness_labels):
2443 wit_label = slugify(wit_id, lowercase=False, allow_unicode=True, separator='_')
2444 f.write("%s %s\n" % (wit_label, " ".join([str(v) for v in matrix[i]])))
2445 return
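# E.g. (hypothetical file name): coll.to_phylip_matrix("distances.phy", proportion=True)
# would write a file whose first line is the taxon count and whose remaining lines
# each pair a slugified witness label with its row of the distance matrix.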
2447 def get_stemma_symbols(self):
2448 """Returns a list of one-character symbols needed to represent the states of all substantive readings in stemma format.
2450 The number of symbols equals the maximum number of substantive readings at any variation unit.
2452 Returns:
2453 A list of individual characters representing states in readings.
2454 """
2455 possible_symbols = (
2456 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
2457 ) # NOTE: the maximum number of symbols allowed in stemma format (other than "?" and "-") is 62
2458 # The number of symbols needed is equal to the length of the longest substantive reading vector:
2459 nsymbols = 0
2460 # If there are no witnesses, then no symbols are needed at all:
2461 if len(self.witnesses) == 0:
2462 return []
2463 wit_id = self.witnesses[0].id
2464 for rdg_support in self.readings_by_witness[wit_id]:
2465 nsymbols = max(nsymbols, len(rdg_support))
2466 stemma_symbols = possible_symbols[:nsymbols]
2467 return stemma_symbols
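# For example, a collation whose largest variation unit has twelve substantive
# readings would yield the twelve symbols ["0", ..., "9", "a", "b"].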
2469 def to_stemma(self, file_addr: Union[Path, str]):
2470 """Writes this Collation to a stemma file without an extension and a Chron file (containing low, middle, and high dates for all witnesses) without an extension.
2472 Since this format does not support ambiguous states, all reading vectors with anything other than one nonzero entry will be interpreted as lacunose.
2473 If an interpGrp for weights is specified in the TEI XML collation, then the weights for the interp elements will be used as weights
2474 for the variation units that specify them in their ana attribute.
2476 Args:
2477 file_addr: A string representing the path to an output stemma prep file; the file should have no extension.
2478 The accompanying chron file will match this file name, except that it will have "_chron" appended to the end.
2480 """
2481 # Populate a list of sites that will correspond to columns of the sequence alignment
2482 # (by default, constant sites are dropped):
2483 substantive_variation_unit_ids = [
2484 vu_id for vu_id in self.variation_unit_ids if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2485 ]
2486 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2487 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2488 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary
2489 # to the readings' texts:
2490 reading_texts_by_indices = {}
2491 for j, vu in enumerate(self.variation_units):
2492 if vu.id not in substantive_variation_unit_ids_set:
2493 continue
2494 k = 0
2495 for rdg in vu.readings:
2496 key = tuple([vu.id, rdg.id])
2497 if key not in substantive_variation_unit_reading_tuples_set:
2498 continue
2499 indices = tuple([j, k])
2500 reading_texts_by_indices[indices] = rdg.text
2501 k += 1
2502 # In a second pass, populate another dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary
2503 # to the witnesses exclusively supporting those readings:
2504 reading_wits_by_indices = {}
2505 for indices in reading_texts_by_indices:
2506 reading_wits_by_indices[indices] = []
2507 for i, wit in enumerate(self.witnesses):
2508 for j, vu_id in enumerate(self.variation_unit_ids):
2509 if vu_id not in substantive_variation_unit_ids_set:
2510 continue
2511 rdg_support = self.readings_by_witness[wit.id][j]
2512 # If this witness does not exclusively support exactly one reading at this unit, then treat it as lacunose:
2513 if len([k for k, w in enumerate(rdg_support) if w > 0]) != 1:
2514 continue
2515 k = rdg_support.index(max(rdg_support))  # locate the single nonzero entry by value, since its weight need not be exactly 1
2516 indices = tuple([j, k])
2517 reading_wits_by_indices[indices].append(wit.id)
2518 # In a third pass, write to the stemma file:
2519 symbols = self.get_stemma_symbols()
2520 Path(file_addr).parent.mkdir(
2521 parents=True, exist_ok=True
2522 ) # generate all parent folders for this file that don't already exist
2523 chron_file_addr = str(file_addr) + "_chron"
2524 with open(file_addr, "w", encoding="utf-8") as f:
2525 # Start with the witness list:
2526 f.write(
2527 "* %s ;\n\n"
2528 % " ".join(
2529 [slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') for wit in self.witnesses]
2530 )
2531 )
2532 # f.write("^ %s\n\n" % chron_file_addr) #write the relative path to the chron file
2533 f.write(
2534 "^ %s\n\n" % ("." + os.sep + Path(chron_file_addr).name)
2535 ) # write the relative path to the chron file
2536 # Then add a line indicating that all witnesses are lacunose unless they are specified explicitly:
2537 f.write("= $? $* ;\n\n")
2538 # Then proceed for each variation unit:
2539 for j, vu_id in enumerate(self.variation_unit_ids):
2540 if vu_id not in substantive_variation_unit_ids_set:
2541 continue
2542 # Print the variation unit ID first:
2543 f.write("@ %s\n" % vu_id)
2544 # In a first pass, print the texts of all readings enclosed in brackets:
2545 f.write("[ ")
2546 k = 0
2547 while True:
2548 indices = tuple([j, k])
2549 if indices not in reading_texts_by_indices:
2550 break
2551 text = slugify(
2552 reading_texts_by_indices[indices], lowercase=False, allow_unicode=True, separator='.'
2553 )
2554 # Denote omissions by en-dashes:
2555 if text == "":
2556 text = "\u2013"
2557 # The first reading should not be preceded by anything:
2558 if k == 0:
2559 f.write(text)
2560 f.write(" |")
2561 # Add the weight of this variation unit after the pipe by comparing its analysis categories to their weights:
2562 weight = 1
2563 vu = self.variation_units[j]
2564 if len(vu.analysis_categories) > 0:
2565 weight = int(
2566 sum(
2567 [
2568 self.weights_by_id.get(ana, 1)
2569 for ana in vu.analysis_categories
2570 ]
2571 )
2572 / len(vu.analysis_categories)
2573 )
2574 f.write("*%d" % weight)
2575 # Every subsequent reading should be preceded by a space:
2576 elif k > 0:
2577 f.write(" %s" % text)
2578 k += 1
2579 f.write(" ]\n")
2580 # In a second pass, print the indices and witnesses for all readings enclosed in angle brackets:
2581 k = 0
2582 f.write("\t< ")
2583 while True:
2584 indices = tuple([j, k])
2585 if indices not in reading_wits_by_indices:
2586 break
2587 rdg_symbol = symbols[k] # get the one-character alphanumeric code for this state
2588 wits = " ".join(reading_wits_by_indices[indices])
2589 # Open the variant reading support block with an angle bracket:
2590 if k == 0:
2591 f.write("%s %s" % (rdg_symbol, wits))
2592 # Open all subsequent variant reading support blocks with pipes on the next line:
2593 else:
2594 f.write("\n\t| %s %s" % (rdg_symbol, wits))
2595 k += 1
2596 f.write(" >\n")
2597 # In a fourth pass, write to the chron file:
2598 max_id_length = max(
2599 [len(slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')) for wit in self.witnesses]
2600 )
2601 max_date_length = 0
2602 for wit in self.witnesses:
2603 if wit.date_range[0] is not None:
2604 max_date_length = max(max_date_length, len(str(wit.date_range[0])))
2605 if wit.date_range[1] is not None:
2606 max_date_length = max(max_date_length, len(str(wit.date_range[1])))
2607 # Attempt to get the minimum and maximum dates for witnesses; if we can't do this, then don't write a chron file:
2608 min_date = None
2609 max_date = None
2610 try:
2611 min_date = min([wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None])
2612 max_date = max([wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None])
2613 except ValueError:  # raised by min()/max() when no witnesses have dates
2614 print("WARNING: no witnesses have date ranges; no chron file will be written!")
2615 return
2616 with open(chron_file_addr, "w", encoding="utf-8") as f:
2617 for wit in self.witnesses:
2618 wit_label = slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')
2619 f.write(wit_label)
2620 f.write(" " * (max_id_length - len(wit.id) + 1))
2621 # If either bound on this witness's date range is empty, then use the min or max date over all witnesses as a default:
2622 date_range = wit.date_range
2623 low = date_range[0] if date_range[0] is not None else min_date
2624 high = date_range[1] if date_range[1] is not None else max_date
2625 # Then write the date range minimum, average, and maximum to the chron file:
2626 low_date = str(low)
2627 f.write(" " * (max_date_length - len(low_date) + 2))
2628 f.write(low_date)
2629 avg_date = str(int((low + high) / 2))
2630 f.write(" " * (max_date_length - len(avg_date) + 2))
2631 f.write(avg_date)
2632 high_date = str(high)
2633 f.write(" " * (max_date_length - len(high_date) + 2))
2634 f.write(high_date)
2635 f.write("\n")
2636 return
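# A hedged usage sketch (hypothetical base name): coll.to_stemma("collation")
# would write the stemma file "collation" and the chron file "collation_chron"
# in the same directory.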
2638 def to_file(
2639 self,
2640 file_addr: Union[Path, str],
2641 format: Format = None,
2642 drop_constant: bool = False,
2643 split_missing: SplitMissingType = None,
2644 char_state_labels: bool = True,
2645 frequency: bool = False,
2646 ambiguous_as_missing: bool = False,
2647 proportion: bool = False,
2648 calibrate_dates: bool = False,
2649 mrbayes: bool = False,
2650 clock_model: ClockModel = ClockModel.strict,
2651 ancestral_logger: AncestralLogger = AncestralLogger.state,
2652 table_type: TableType = TableType.matrix,
2653 show_ext: bool = False,
2654 seed: int = None,
2655 ):
2656 """Writes this Collation to the file with the given address.
2658 Args:
2659 file_addr (Union[Path, str]): The path to the output file.
2660 format (Format, optional): The desired output format.
2661 If None, then it is inferred from the file suffix.
2662 Defaults to None.
2663 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2664 Default value is False.
2665 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
2666 If not specified, then missing data is ignored (i.e., all states are 0).
2667 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
2668 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
2669 Only applicable for tabular outputs of type "matrix" or "idf".
2670 char_state_labels (bool, optional): An optional flag indicating whether to print
2671 the CharStateLabels block in NEXUS output.
2672 Default value is True.
2673 frequency (bool, optional): An optional flag indicating whether to use the StatesFormat=Frequency setting
2674 instead of the StatesFormat=StatesPresent setting
2675 (and thus represent all states with frequency vectors rather than symbols)
2676 in NEXUS output.
2677 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
2678 Default value is False.
2679 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
2680 If this flag is set, then only base symbols will be generated for the NEXUS file.
2681 It is only applied if the frequency option is False.
2682 Default value is False.
2683 proportion (bool, optional): An optional flag indicating whether to populate a distance matrix's cells
2684 with a proportion of disagreements to variation units where both witnesses are extant.
2685 It is only applied if the table_type option is "distance".
2686 Default value is False.
2687 calibrate_dates (bool, optional): An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses
2688 in NEXUS output.
2689 This option is intended for inputs to BEAST 2.
2690 Default value is False.
2691 mrbayes (bool, optional): An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses
2692 in NEXUS output.
2693 This option is intended for inputs to MrBayes.
2694 Default value is False.
2695 clock_model (ClockModel, optional): A ClockModel option indicating which type of clock model to use.
2696 This option is intended for inputs to MrBayes and BEAST 2.
2697 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
2698 Default value is "strict".
2699 ancestral_logger (AncestralLogger, optional): An AncestralLogger option indicating which class of logger (if any) to use for ancestral states.
2700 This option is intended for inputs to BEAST 2. Default value is "state".
2701 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate.
2702 Only applicable for tabular outputs and PHYLIP outputs.
2703 If the output is a PHYLIP file, then the type of tabular output must be "distance" or "similarity"; otherwise, it will be ignored.
2704 Default value is "matrix".
2705 show_ext (bool, optional): An optional flag indicating whether each cell in a distance or similarity matrix
2706 should include the number of variation units where both witnesses are extant after the number of their disagreements/agreements.
2707 Only applicable for tabular output formats of type "distance" or "similarity".
2708 Default value is False.
2709 seed (int, optional): A seed for random number generation (for setting initial values of unspecified transcriptional rates in BEAST 2 XML output).
2710 """
2711 file_addr = Path(file_addr)
2712 format = format or Format.infer(
2713 file_addr.suffix
2714 ) # an exception will be raised here if the format or suffix is invalid
2716 if format == Format.NEXUS:
2717 return self.to_nexus(
2718 file_addr,
2719 drop_constant=drop_constant,
2720 char_state_labels=char_state_labels,
2721 frequency=frequency,
2722 ambiguous_as_missing=ambiguous_as_missing,
2723 calibrate_dates=calibrate_dates,
2724 mrbayes=mrbayes,
2725 clock_model=clock_model,
2726 )
2728 if format == Format.HENNIG86:
2729 return self.to_hennig86(file_addr, drop_constant=drop_constant)
2731 if format == Format.PHYLIP:
2732 if table_type in [TableType.distance, TableType.similarity]:
2733 return self.to_phylip_matrix(
2734 file_addr,
2735 drop_constant=drop_constant,
2736 proportion=proportion,
2737 table_type=table_type,
2738 show_ext=show_ext,
2739 )
2740 return self.to_phylip(file_addr, drop_constant=drop_constant)
2742 if format == Format.FASTA:
2743 return self.to_fasta(file_addr, drop_constant=drop_constant)
2745 if format == Format.BEAST:
2746 return self.to_beast(
2747 file_addr,
2748 drop_constant=drop_constant,
2749 clock_model=clock_model,
2750 ancestral_logger=ancestral_logger,
2751 seed=seed,
2752 )
2754 if format == Format.CSV:
2755 return self.to_csv(
2756 file_addr,
2757 drop_constant=drop_constant,
2758 ambiguous_as_missing=ambiguous_as_missing,
2759 proportion=proportion,
2760 table_type=table_type,
2761 split_missing=split_missing,
2762 show_ext=show_ext,
2763 )
2765 if format == Format.TSV:
2766 return self.to_csv(
2767 file_addr,
2768 drop_constant=drop_constant,
2769 ambiguous_as_missing=ambiguous_as_missing,
2770 proportion=proportion,
2771 table_type=table_type,
2772 split_missing=split_missing,
2773 show_ext=show_ext,
2774 sep="\t",
2775 )
2777 if format == Format.EXCEL:
2778 return self.to_excel(
2779 file_addr,
2780 drop_constant=drop_constant,
2781 ambiguous_as_missing=ambiguous_as_missing,
2782 proportion=proportion,
2783 table_type=table_type,
2784 split_missing=split_missing,
2785 show_ext=show_ext,
2786 )
2788 if format == Format.STEMMA:
2789 return self.to_stemma(file_addr)
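# A minimal end-to-end sketch under stated assumptions (hypothetical file names;
# suffixes are assumed to be among those Format.infer recognizes):
#
#     from lxml import etree as et
#     xml = et.parse("collation.xml")
#     coll = Collation(xml)
#     coll.to_file("collation.nexus")  # format inferred from the file suffix
#     coll.to_file("readings.csv", table_type=TableType.long)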