# Coverage for teiphy/collation.py: 99.92% (1312 statements)
# Report generated by coverage.py v6.5.0 at 2025-01-15 16:06 +0000

#!/usr/bin/env python3

from enum import Enum
from typing import List, Union
import os
from pathlib import Path
from datetime import datetime  # for calculating the current year (for dating and tree height purposes)
import time  # to time calculations for users
import string  # for easy retrieval of character ranges
from lxml import etree as et  # for reading TEI XML inputs
import numpy as np  # for random number sampling and collation matrix outputs
import pandas as pd  # for writing to DataFrames, CSV, Excel, etc.
from slugify import slugify  # for converting Unicode text from readings to ASCII for NEXUS
from jinja2 import Environment, PackageLoader, select_autoescape  # for filling output XML templates

from .common import xml_ns, tei_ns
from .format import Format
from .witness import Witness
from .variation_unit import VariationUnit


class ParsingException(Exception):
    pass


class WitnessDateException(Exception):
    pass


class IntrinsicRelationsException(Exception):
    pass


class ClockModel(str, Enum):
    strict = "strict"
    uncorrelated = "uncorrelated"
    local = "local"


class AncestralLogger(str, Enum):
    state = "state"
    sequence = "sequence"
    none = "none"


class TableType(str, Enum):
    matrix = "matrix"
    distance = "distance"
    similarity = "similarity"
    nexus = "nexus"
    long = "long"


class Collation:
    """Base class for storing TEI XML collation data internally.

    This corresponds to the entire XML tree, rooted at the TEI element of the collation.

    Attributes:
        manuscript_suffixes: A list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
        trivial_reading_types: A set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
        missing_reading_types: A set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
        fill_corrector_lacunae: A boolean flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
        fragmentary_threshold: A float such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
        witnesses: A list of Witness instances contained in this Collation.
        witness_index_by_id: A dictionary mapping base witness ID strings to their int indices in the witnesses list.
        variation_units: A list of VariationUnit instances contained in this Collation.
        readings_by_witness: A dictionary mapping base witness ID strings to lists of reading support coefficients for all units (with at least two substantive readings).
        variation_unit_ids: A list of ID strings for variation units with at least one substantive reading.
        substantive_variation_unit_reading_tuples: A list of (variation unit ID, reading ID) tuples for substantive readings.
        verbose: A boolean flag indicating whether or not to print timing and debugging details for the user.
    """

    def __init__(
        self,
        xml: et.ElementTree,
        manuscript_suffixes: List[str] = [],
        trivial_reading_types: List[str] = [],
        missing_reading_types: List[str] = [],
        fill_corrector_lacunae: bool = False,
        fragmentary_threshold: float = None,
        dates_file: Union[Path, str] = None,
        verbose: bool = False,
    ):
        """Constructs a new Collation instance with the given settings.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
            manuscript_suffixes: An optional list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
            trivial_reading_types: An optional set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
            missing_reading_types: An optional set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
            fill_corrector_lacunae: An optional flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
            fragmentary_threshold: An optional float such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
            dates_file: An optional path to a CSV file containing witness IDs, minimum dates, and maximum dates. If specified, then for all witnesses in the first column, any existing date ranges for them in the TEI XML collation will be ignored.
            verbose: An optional flag indicating whether or not to print timing and debugging details for the user.
        """
        self.manuscript_suffixes = manuscript_suffixes
        self.trivial_reading_types = set(trivial_reading_types)
        self.missing_reading_types = set(missing_reading_types)
        self.fill_corrector_lacunae = fill_corrector_lacunae
        self.fragmentary_threshold = fragmentary_threshold
        self.verbose = verbose
        self.witnesses = []
        self.witness_index_by_id = {}
        self.variation_units = []
        self.readings_by_witness = {}
        self.variation_unit_ids = []
        self.substantive_variation_unit_reading_tuples = []
        self.substantive_readings_by_variation_unit_id = {}
        self.intrinsic_categories = []
        self.intrinsic_odds_by_id = {}
        self.transcriptional_categories = []
        self.transcriptional_rates_by_id = {}
        self.origin_date_range = []
        # Now parse the XML tree to populate these data structures:
        if self.verbose:
            print("Initializing collation...")
        t0 = time.time()
        self.parse_origin_date_range(xml)
        self.parse_list_wit(xml)
        self.validate_wits(xml)
        # If a dates file was specified, then update the witness date ranges manually:
        if dates_file is not None:
            self.update_witness_date_ranges_from_dates_file(dates_file)
        # If the upper bound on a work's date of origin is not defined, then attempt to assign it an upper bound based on the witness dates;
        # otherwise, attempt to assign lower bounds to witness dates based on it:
        if self.origin_date_range[1] is None:
            self.update_origin_date_range_from_witness_date_ranges()
        else:
            self.update_witness_date_ranges_from_origin_date_range()
        self.parse_intrinsic_odds(xml)
        self.parse_transcriptional_rates(xml)
        self.parse_apps(xml)
        self.validate_intrinsic_relations()
        self.parse_readings_by_witness()
        # If a threshold of readings for fragmentary witnesses is specified, then filter the witness list using the dictionary mapping witness IDs to readings:
        if self.fragmentary_threshold is not None:
            self.filter_fragmentary_witnesses(xml)
        t1 = time.time()
        if self.verbose:
            print("Total time to initialize collation: %0.4fs." % (t1 - t0))

    def parse_origin_date_range(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of origin date bounds.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing origin date range...")
        t0 = time.time()
        self.origin_date_range = [None, None]
        for date in xml.xpath(
            "//tei:sourceDesc//tei:bibl//tei:date|//tei:sourceDesc//tei:biblStruct//tei:date|//tei:sourceDesc//tei:biblFull//tei:date",
            namespaces={"tei": tei_ns},
        ):
            # Try the @when attribute first; if it is set, then it accounts for both ends of the date range:
            if date.get("when") is not None:
                self.origin_date_range[0] = int(date.get("when").split("-")[0])
                self.origin_date_range[1] = self.origin_date_range[0]
            # Failing that, if it has @from and @to attributes (indicating the period over which the work was completed),
            # then the completion date of the work accounts for both ends of the date range:
            elif date.get("to") is not None:
                self.origin_date_range[0] = int(date.get("to").split("-")[0])
                self.origin_date_range[1] = self.origin_date_range[0]
            # Failing that, set lower and upper bounds on the origin date using the @notBefore and @notAfter attributes:
            elif date.get("notBefore") is not None or date.get("notAfter") is not None:
                if date.get("notBefore") is not None:
                    self.origin_date_range[0] = int(date.get("notBefore").split("-")[0])
                if date.get("notAfter") is not None:
                    self.origin_date_range[1] = int(date.get("notAfter").split("-")[0])
        return

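    # Illustrative sketch (the element contents here are hypothetical, not from the original
    # module): a header date element like
    #     <sourceDesc><bibl><date notBefore="50" notAfter="80"/></bibl></sourceDesc>
    # would yield origin_date_range == [50, 80], while <date when="66"/> or
    # <date from="60" to="66"/> would both collapse the range to [66, 66].
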
    def get_base_wit(self, wit: str):
        """Given a witness siglum, strips off the specified manuscript suffixes until the siglum matches one in the witness list or until no more suffixes can be stripped.

        Args:
            wit: A string representing a witness siglum, potentially including suffixes to be stripped.
        """
        base_wit = wit
        # If our starting siglum corresponds to a siglum in the witness list, then just return it:
        if base_wit in self.witness_index_by_id:
            return base_wit
        # Otherwise, strip any suffixes we find until the siglum corresponds to a base witness in the list
        # or no more suffixes can be stripped:
        suffix_found = True
        while suffix_found:
            suffix_found = False
            for suffix in self.manuscript_suffixes:
                if base_wit.endswith(suffix):
                    suffix_found = True
                    base_wit = base_wit[: -len(suffix)]
                    break  # stop looking for other suffixes
            # If the siglum stripped of this suffix now corresponds to a siglum in the witness list, then return it:
            if base_wit in self.witness_index_by_id:
                return base_wit
        # If we get here, then all possible manuscript suffixes have been stripped, and the resulting siglum does not correspond to a siglum in the witness list:
        return base_wit

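    # Illustrative sketch (hypothetical sigla and suffixes, not from the original module):
    # with manuscript_suffixes = ["*", "C", "/1"] and a base witness "P46" in the witness
    # list, get_base_wit("P46C") strips the "C" suffix and returns "P46"; for an input like
    # "L2010/1" with no "L2010" entry, the "/1" suffix is still stripped, and "L2010" is
    # returned even though it is not in the witness list.
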
    def parse_list_wit(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates its list of witnesses from its listWit element.
        If the XML tree does not contain a listWit element, then a ParsingException is thrown listing all distinct witness sigla encountered in the collation.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing witness list...")
        t0 = time.time()
        self.witnesses = []
        self.witness_index_by_id = {}
        list_wits = xml.xpath("/tei:TEI//tei:listWit", namespaces={"tei": tei_ns})
        if len(list_wits) == 0:
            # There is no listWit element: collect all distinct witness sigla in the collation and raise a ParsingException listing them:
            distinct_sigla = set()
            sigla = []
            # Proceed for each rdg, rdgGrp, or witDetail element:
            for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
                wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
                wits = wit_str.split()
                for wit in wits:
                    siglum = wit.strip("#")  # remove the URI prefix, if there is one
                    if siglum not in distinct_sigla:
                        distinct_sigla.add(siglum)
                        sigla.append(siglum)
            sigla.sort()
            msg = ""
            msg += "An explicit listWit element must be included in the TEI XML collation.\n"
            msg += "The following sigla occur in the collation and should be included as the @xml:id or @n attributes of witness elements under the listWit element:\n"
            msg += ", ".join(sigla)
            raise ParsingException(msg)
        # Otherwise, take the first listWit element as the list of all witnesses and process it:
        list_wit = list_wits[0]
        for witness in list_wit.xpath("./tei:witness", namespaces={"tei": tei_ns}):
            wit = Witness(witness, self.verbose)
            self.witness_index_by_id[wit.id] = len(self.witnesses)
            self.witnesses.append(wit)
        t1 = time.time()
        if self.verbose:
            print("Finished processing %d witnesses in %0.4fs." % (len(self.witnesses), t1 - t0))
        return

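    # Illustrative sketch (hypothetical sigla, not from the original module): a minimal
    # witness list such as
    #     <listWit>
    #         <witness xml:id="P46"/>
    #         <witness xml:id="01"/>
    #     </listWit>
    # would (assuming Witness resolves its ID from @xml:id) populate
    # witness_index_by_id == {"P46": 0, "01": 1}, preserving document order.
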
    def validate_wits(self, xml: et.ElementTree):
        """Given an XML tree for a collation, checks if any witness siglum listed in a rdg, rdgGrp, or witDetail element,
        once stripped of ignored suffixes, is not found in the witness list.
        A warning will be issued for each distinct siglum like this.
        This method also checks if the upper bound of any witness's date is earlier than the lower bound on the collated work's date of origin
        and throws an exception if so.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Validating witness list against collation...")
        t0 = time.time()
        # Collect all distinct sigla in the collation that lack corresponding witness entries, so we can warn about them:
        distinct_extra_sigla = set()
        extra_sigla = []
        # Proceed for each rdg, rdgGrp, or witDetail element:
        for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
            wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
            wits = wit_str.split()
            for wit in wits:
                siglum = wit.strip("#")  # remove the URI prefix, if there is one
                base_siglum = self.get_base_wit(siglum)
                if base_siglum not in self.witness_index_by_id:
                    if base_siglum not in distinct_extra_sigla:
                        distinct_extra_sigla.add(base_siglum)
                        extra_sigla.append(base_siglum)
        if len(extra_sigla) > 0:
            extra_sigla.sort()
            msg = ""
            msg += "WARNING: The following sigla occur in the collation that do not have corresponding witness entries in the listWit:\n"
            msg += ", ".join(extra_sigla)
            print(msg)
        # If the lower bound on the date of origin is defined, then check each witness against it:
        if self.origin_date_range[0] is not None:
            bad_date_witness_sigla = []
            bad_date_upper_bounds_by_witness = {}
            for i, wit in enumerate(self.witnesses):
                if wit.date_range[1] is not None and wit.date_range[1] < self.origin_date_range[0]:
                    bad_date_witness_sigla.append(wit.id)
                    bad_date_upper_bounds_by_witness[wit.id] = wit.date_range[1]
            if len(bad_date_witness_sigla) > 0:
                msg = ""
                msg += (
                    "The following witnesses have their latest possible dates before the earliest date of origin %d specified for the collated work:\n"
                    % self.origin_date_range[0]
                )
                msg += ", ".join(
                    [
                        (siglum + "(" + str(bad_date_upper_bounds_by_witness[siglum]) + ")")
                        for siglum in bad_date_witness_sigla
                    ]
                )
                raise WitnessDateException(msg)
        t1 = time.time()
        if self.verbose:
            print("Finished witness validation in %0.4fs." % (t1 - t0))
        return

    def update_witness_date_ranges_from_dates_file(self, dates_file: Union[Path, str]):
        """Given a CSV-formatted dates file, updates the date ranges of all witnesses whose IDs are in the first column of the dates file
        (overwriting existing date ranges if necessary).
        """
        if self.verbose:
            print("Updating witness dates from file %s..." % (str(dates_file)))
        t0 = time.time()
        dates_df = pd.read_csv(dates_file, index_col=0, names=["id", "min", "max"])
        for witness in self.witnesses:
            wit_id = witness.id
            if wit_id in dates_df.index:
                # For every witness in the list whose ID is specified in the dates file,
                # update their date ranges (as long as the date ranges in the file are well-formed):
                min_date = int(dates_df.loc[wit_id]["min"]) if not np.isnan(dates_df.loc[wit_id]["min"]) else None
                max_date = (
                    int(dates_df.loc[wit_id]["max"])
                    if not np.isnan(dates_df.loc[wit_id]["max"])
                    else datetime.now().year
                )
                if min_date is not None and max_date is not None and min_date > max_date:
                    raise ParsingException(
                        "In dates file %s, for witness ID %s, the minimum date %d is greater than the maximum date %d."
                        % (str(dates_file), wit_id, min_date, max_date)
                    )
                witness.date_range = [min_date, max_date]
        t1 = time.time()
        if self.verbose:
            print("Finished witness date range updates in %0.4fs." % (t1 - t0))
        return

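    # Illustrative sketch (hypothetical witness IDs, not from the original module): the
    # dates file is a headerless CSV of witness ID, minimum date, and maximum date, e.g.
    #     P46,175,225
    #     01,300,
    # An empty minimum leaves the lower bound as None, and an empty maximum defaults to
    # the current year, per the np.isnan checks above.
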
    def update_origin_date_range_from_witness_date_ranges(self):
        """Conditionally updates the upper bound on the date of origin of the work represented by this Collation
        based on the bounds on the witnesses' dates.
        If none of the witnesses have bounds on their dates, then nothing is done.
        This method is only invoked if the work's date of origin does not already have its upper bound defined.
        """
        if self.verbose:
            print("Updating upper bound on origin date using witness dates...")
        t0 = time.time()
        # Set the upper bound on the origin date to the earliest date attested among the witnesses:
        witness_date_lower_bounds = [wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]
        witness_date_upper_bounds = [wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]
        min_witness_date = (
            min(witness_date_lower_bounds + witness_date_upper_bounds)
            if len(witness_date_lower_bounds + witness_date_upper_bounds) > 0
            else None
        )
        if min_witness_date is not None:
            self.origin_date_range[1] = (
                min(self.origin_date_range[1], min_witness_date)
                if self.origin_date_range[1] is not None
                else min_witness_date
            )
        t1 = time.time()
        if self.verbose:
            print("Finished updating upper bound on origin date in %0.4fs." % (t1 - t0))
        return

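    # Worked example (hypothetical dates, not from the original module): with witness date
    # ranges [125, 175], [300, 400], and [None, None], the pooled bounds are
    # [125, 300] + [175, 400] and min(...) == 125, so the work's upper origin bound becomes
    # 125: the work can be no later than the earliest date attested for any witness.
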
    def update_witness_date_ranges_from_origin_date_range(self):
        """Attempts to update the lower bounds on the witnesses' dates
        using the upper bound on the date of origin of the work represented by this Collation.
        This method is only invoked if the upper bound on the work's date of origin was already defined
        (i.e., if update_origin_date_range_from_witness_date_ranges was not invoked).
        """
        if self.verbose:
            print("Updating lower bounds on witness dates using origin date...")
        t0 = time.time()
        # Proceed for every witness:
        for i, wit in enumerate(self.witnesses):
            # Ensure that the lower bound on this witness's date is no earlier than the upper bound on the date of the work's origin:
            wit.date_range[0] = (
                max(wit.date_range[0], self.origin_date_range[1])
                if wit.date_range[0] is not None
                else self.origin_date_range[1]
            )
            # Then ensure that the upper bound on this witness's date is no earlier than its lower bound, in case we updated it:
            wit.date_range[1] = max(wit.date_range[0], wit.date_range[1])
        t1 = time.time()
        if self.verbose:
            print("Finished updating lower bounds on witness dates in %0.4fs." % (t1 - t0))
        return

    def parse_intrinsic_odds(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's list of intrinsic probability categories
        (e.g., "absolutely more likely," "highly more likely," "more likely," "slightly more likely," "equally likely")
        and its dictionary mapping these categories to numerical odds.
        If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing intrinsic odds categories...")
        t0 = time.time()
        self.intrinsic_categories = []
        self.intrinsic_odds_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"intrinsic\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            odds_category = interp.get("{%s}id" % xml_ns)
            # If this element contains a certainty subelement with a fixed odds value for this category, then set it:
            odds = None
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    odds = float(certainty.get("degree"))
                    break
            self.intrinsic_categories.append(odds_category)
            self.intrinsic_odds_by_id[odds_category] = odds
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d intrinsic odds categories in %0.4fs."
                % (len(self.intrinsic_categories), t1 - t0)
            )
        return

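    # Illustrative sketch (hypothetical IDs and values, not from the original module):
    # an intrinsic odds category with a fixed value looks like
    #     <interpGrp type="intrinsic">
    #         <interp xml:id="RatingA"><certainty degree="19"/></interp>
    #         <interp xml:id="RatingB"/>
    #     </interpGrp>
    # yielding intrinsic_odds_by_id == {"RatingA": 19.0, "RatingB": None}, where the
    # None entry is treated downstream as a parameter to be estimated.
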
    def parse_transcriptional_rates(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates this Collation's dictionary mapping transcriptional change categories
        (e.g., "aural confusion," "visual error," "clarification") to numerical rates.
        If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing transcriptional change categories...")
        t0 = time.time()
        self.transcriptional_categories = []
        self.transcriptional_rates_by_id = {}
        for interp in xml.xpath("//tei:interpGrp[@type=\"transcriptional\"]/tei:interp", namespaces={"tei": tei_ns}):
            # These must be indexed by the xml:id attribute, so skip any that do not have one:
            if interp.get("{%s}id" % xml_ns) is None:
                continue
            transcriptional_category = interp.get("{%s}id" % xml_ns)
            # If this element contains a certainty subelement with a fixed rate for this category, then set it:
            rate = None
            for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
                if certainty.get("degree") is not None:
                    rate = float(certainty.get("degree"))
                    break
            self.transcriptional_categories.append(transcriptional_category)
            self.transcriptional_rates_by_id[transcriptional_category] = rate
        t1 = time.time()
        if self.verbose:
            print(
                "Finished processing %d transcriptional change categories in %0.4fs."
                % (len(self.transcriptional_rates_by_id), t1 - t0)
            )
        return

    def validate_intrinsic_relations(self):
        """Checks if any VariationUnit's intrinsic_relations map is not a forest.
        If any is not, then an IntrinsicRelationsException is thrown describing the VariationUnit at fault.
        """
        if self.verbose:
            print("Validating intrinsic relation graphs for variation units...")
        t0 = time.time()
        for vu in self.variation_units:
            # Skip any variation units with an empty intrinsic_relations map:
            if len(vu.intrinsic_relations) == 0:
                continue
            # For all others, start by identifying all reading IDs that are not related to by some other reading ID:
            in_degree_by_reading = {}
            for edge in vu.intrinsic_relations:
                s = edge[0]
                t = edge[1]
                if s not in in_degree_by_reading:
                    in_degree_by_reading[s] = 0
                if t not in in_degree_by_reading:
                    in_degree_by_reading[t] = 0
                in_degree_by_reading[t] += 1
            # If any reading has more than one relation pointing to it, then the intrinsic relations graph is not a forest:
            excessive_in_degree_readings = [
                rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] > 1
            ]
            if len(excessive_in_degree_readings) > 0:
                msg = ""
                msg += (
                    "In variation unit %s, the following readings have more than one intrinsic relation pointing to them: %s.\n"
                    % (vu.id, ", ".join(excessive_in_degree_readings))
                )
                msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
                raise IntrinsicRelationsException(msg)
            # If every reading has another reading pointing to it, then the intrinsic relations graph contains a cycle and is not a forest:
            starting_nodes = [rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] == 0]
            if len(starting_nodes) == 0:
                msg = ""
                msg += "In variation unit %s, the intrinsic relations contain a cycle.\n" % vu.id
                msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
                raise IntrinsicRelationsException(msg)
        t1 = time.time()
        if self.verbose:
            print("Finished intrinsic relations validation in %0.4fs." % (t1 - t0))
        return

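    # Worked example (hypothetical reading IDs and edge keys, not from the original
    # module): for intrinsic_relations keyed on edges like
    #     {("a", "b"): ..., ("a", "c"): ..., ("b", "c"): ...}
    # reading "c" has in-degree 2, so an IntrinsicRelationsException is raised; for
    # {("a", "b"): ..., ("b", "a"): ...} no reading has in-degree 0, so the cycle
    # branch fires instead.
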
    def parse_apps(self, xml: et.ElementTree):
        """Given an XML tree for a collation, populates its list of variation units from its app elements.

        Args:
            xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
        """
        if self.verbose:
            print("Parsing variation units...")
        t0 = time.time()
        for a in xml.xpath('//tei:app', namespaces={'tei': tei_ns}):
            vu = VariationUnit(a, self.verbose)
            self.variation_units.append(vu)
        t1 = time.time()
        if self.verbose:
            print("Finished processing %d variation units in %0.4fs." % (len(self.variation_units), t1 - t0))
        return

    def get_readings_by_witness_for_unit(self, vu: VariationUnit):
        """Returns a dictionary mapping witness IDs to a list of their reading coefficients for a given variation unit.

        Args:
            vu: A VariationUnit to be processed.

        Returns:
            A dictionary mapping witness ID strings to a list of their coefficients for all substantive readings in this VariationUnit.
        """
        # In a first pass, populate lists of substantive (variation unit ID, reading ID) tuples and reading labels
        # and a map from reading IDs to the indices of their parent substantive reading in this unit:
        reading_id_to_index = {}
        self.substantive_readings_by_variation_unit_id[vu.id] = []
        for rdg in vu.readings:
            # If this reading is missing (e.g., lacunose or inapplicable due to an overlapping variant) or targets another reading, then skip it:
            if rdg.type in self.missing_reading_types or len(rdg.certainties) > 0:
                continue
            # If this reading is trivial, then map it to the last substantive index:
            if rdg.type in self.trivial_reading_types:
                reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
                continue
            # Otherwise, the reading is substantive: add it to the map and update the last substantive index:
            self.substantive_readings_by_variation_unit_id[vu.id].append(rdg.id)
            self.substantive_variation_unit_reading_tuples.append(tuple([vu.id, rdg.id]))
            reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
        # Report the number of substantive readings in this unit if verbose output is enabled:
        if self.verbose:
            print(
                "Variation unit %s has %d substantive readings."
                % (vu.id, len(self.substantive_readings_by_variation_unit_id[vu.id]))
            )
        readings_by_witness_for_unit = {}
        # Initialize the output dictionary with empty sets for all base witnesses:
        for wit in self.witnesses:
            readings_by_witness_for_unit[wit.id] = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
        # In a second pass, assign each base witness a set containing the readings it supports in this unit:
        for rdg in vu.readings:
            # Initialize the dictionary indicating support for this reading (or its disambiguations):
            rdg_support = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
            # If this is a missing reading (e.g., a lacuna or an overlap), then we can skip it, as its corresponding set will be empty:
            if rdg.type in self.missing_reading_types:
                continue
            # Otherwise, if this reading is trivial, then it will contain an entry for the index of its parent substantive reading:
            elif rdg.type in self.trivial_reading_types:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Otherwise, if this reading has one or more nonzero certainty degrees,
            # then set the entries for these readings to their degrees:
            elif sum(rdg.certainties.values()) > 0:
                for t in rdg.certainties:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = rdg.certainties[t]
            # Otherwise, if this reading has one or more targets (i.e., if it is an ambiguous reading),
            # then set the entries for each of its targets to 1:
            elif len(rdg.targets) > 0:
                for t in rdg.targets:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = 1
            # Otherwise, this reading is itself substantive; set the entry for the index of this reading to 1:
            else:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Proceed for each witness siglum in the support for this reading:
            for wit in rdg.wits:
                # Is this siglum a base siglum?
                base_wit = self.get_base_wit(wit)
                if base_wit not in self.witness_index_by_id:
                    # If it is not, then it is probably just because we've encountered a corrector or some other secondary witness not included in the witness list;
                    # report this if we're in verbose mode and move on:
                    if self.verbose:
                        print(
                            "Skipping unknown witness siglum %s (base siglum %s) in variation unit %s, reading %s..."
                            % (wit, base_wit, vu.id, rdg.id)
                        )
                    continue
                # If we've found a base siglum, then add this reading's contribution to the base witness's reading set for this unit;
                # normally the existing set will be empty, but if we reduce two suffixed sigla to the same base witness,
                # then that witness may attest to multiple readings in the same unit:
                readings_by_witness_for_unit[base_wit] = [
                    (min(readings_by_witness_for_unit[base_wit][i] + rdg_support[i], 1))
                    for i in range(len(rdg_support))
                ]
        return readings_by_witness_for_unit

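    # Worked example (hypothetical IDs, not from the original module): in a unit with
    # substantive readings ["1", "2"], a witness attesting reading "1" gets the support
    # vector [1, 0]; a witness with an ambiguous reading targeting both gets [1, 1]
    # (or fractional certainty degrees, if specified); and a lacunose witness keeps the
    # all-zero vector [0, 0], which the exporters below treat as missing data.
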
    def parse_readings_by_witness(self):
        """Populates the internal dictionary mapping witness IDs to a list of their reading support sets for all variation units, and then fills the empty reading support sets for witnesses of type "corrector" with the entries of the previous witness."""
        if self.verbose:
            print("Populating internal dictionary of witness readings...")
        t0 = time.time()
        # Initialize the data structures to be populated here:
        self.readings_by_witness = {}
        self.variation_unit_ids = []
        for wit in self.witnesses:
            self.readings_by_witness[wit.id] = []
        # Populate them for each variation unit:
        for vu in self.variation_units:
            readings_by_witness_for_unit = self.get_readings_by_witness_for_unit(vu)
            if len(readings_by_witness_for_unit) > 0:
                self.variation_unit_ids.append(vu.id)
            for wit in readings_by_witness_for_unit:
                self.readings_by_witness[wit].append(readings_by_witness_for_unit[wit])
        # Optionally, fill the lacunae of the correctors:
        if self.fill_corrector_lacunae:
            for i, wit in enumerate(self.witnesses):
                # If this is the first witness, then it shouldn't be a corrector (since there is no previous witness against which to compare it):
                if i == 0:
                    continue
                # Otherwise, if this witness is not a corrector, then skip it:
                if wit.type != "corrector":
                    continue
                # Otherwise, retrieve the previous witness:
                prev_wit = self.witnesses[i - 1]
                for j in range(len(self.readings_by_witness[wit.id])):
                    # If the corrector has no reading, then set it to the previous witness's reading:
                    if sum(self.readings_by_witness[wit.id][j]) == 0:
                        self.readings_by_witness[wit.id][j] = self.readings_by_witness[prev_wit.id][j]
        t1 = time.time()
        if self.verbose:
            print(
                "Populated dictionary for %d witnesses over %d substantive variation units in %0.4fs."
                % (len(self.witnesses), len(self.variation_unit_ids), t1 - t0)
            )
        return

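    # Illustrative sketch (hypothetical sigla, not from the original module): with
    # fill_corrector_lacunae=True and witnesses ordered ["01*", "01C1"] where "01C1" has
    # type "corrector", any unit at which "01C1" has an all-zero support vector inherits
    # the vector of the preceding witness "01*", reflecting the assumption that a
    # corrector agrees with the prior hand wherever it is silent.
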
    def filter_fragmentary_witnesses(self, xml):
        """Filters the original witness list and readings by witness dictionary to exclude witnesses whose proportions of extant passages fall below the fragmentary readings threshold."""
        if self.verbose:
            print(
                "Filtering fragmentary witnesses (extant in < %f of all variation units) out of internal witness list and dictionary of witness readings..."
                % self.fragmentary_threshold
            )
        t0 = time.time()
        fragmentary_witness_set = set()
        # Proceed for each witness in order:
        for wit in self.witnesses:
            wit_id = wit.id
            # We count the number of variation units at which this witness has an extant (i.e., non-missing) reading:
            extant_reading_count = 0
            total_reading_count = len(self.readings_by_witness[wit_id])
            # Proceed through all reading support lists:
            for rdg_support in self.readings_by_witness[wit_id]:
                # If the current reading support list is not all zeroes, then increment this witness's count of extant readings:
                if sum(rdg_support) != 0:
                    extant_reading_count += 1
            # If the proportion of extant readings falls below the threshold, then add this witness to the list of fragmentary witnesses:
            if extant_reading_count / total_reading_count < self.fragmentary_threshold:
                fragmentary_witness_set.add(wit_id)
        # Then filter the witness list to exclude the fragmentary witnesses:
        filtered_witnesses = [wit for wit in self.witnesses if wit.id not in fragmentary_witness_set]
        self.witnesses = filtered_witnesses
        # Then remove the entries for the fragmentary witnesses from the witnesses-to-readings dictionary:
        for wit_id in fragmentary_witness_set:
            del self.readings_by_witness[wit_id]
        t1 = time.time()
        if self.verbose:
            print(
                "Filtered out %d fragmentary witness(es) (%s) in %0.4fs."
                % (len(fragmentary_witness_set), str(list(fragmentary_witness_set)), t1 - t0)
            )
        return

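    # Worked example (hypothetical numbers, not from the original module): with
    # fragmentary_threshold=0.5 and 10 variation units, a witness extant at only 4 units
    # has proportion 0.4 < 0.5 and is removed from both self.witnesses and
    # self.readings_by_witness; a witness extant at exactly 5 units (0.5) is kept.
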
    def get_nexus_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in NEXUS.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        # NOTE: PAUP* allows up to 64 symbols, but IQTREE does not appear to support symbols outside of 0-9 and a-z, and base symbols must be case-insensitive,
        # so we will settle for a maximum of 32 singleton symbols for now
        possible_symbols = list(string.digits) + list(string.ascii_lowercase)[:22]
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        nexus_symbols = possible_symbols[:nsymbols]
        return nexus_symbols

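    # Illustrative sketch (not from the original module): if the largest unit has 12
    # substantive readings, this returns
    #     ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b']
    # i.e., digits first, then lowercase letters, up to the 32-symbol cap noted above.
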
    def to_nexus(
        self,
        file_addr: Union[Path, str],
        drop_constant: bool = False,
        char_state_labels: bool = True,
        frequency: bool = False,
        ambiguous_as_missing: bool = False,
        calibrate_dates: bool = False,
        mrbayes: bool = False,
        clock_model: ClockModel = ClockModel.strict,
    ):
        """Writes this Collation to a NEXUS file with the given address.

        Args:
            file_addr: A string representing the path to an output NEXUS file; the file type should be .nex, .nexus, or .nxs.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
            char_state_labels: An optional flag indicating whether or not to include the CharStateLabels block.
            frequency: An optional flag indicating whether to use the StatesFormat=Frequency setting
                instead of the StatesFormat=StatesPresent setting
                (and thus represent all states with frequency vectors rather than symbols).
                Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
            ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
                If this flag is set, then only base symbols will be generated for the NEXUS file.
                It is only applied if the frequency option is False.
            calibrate_dates: An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses.
                This option is intended for inputs to BEAST 2.
            mrbayes: An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses.
                This option is intended for inputs to MrBayes.
            clock_model: A ClockModel option indicating which type of clock model to use.
                This option is intended for inputs to MrBayes and BEAST 2.
                MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        charlabels = [slugify(vu_id, lowercase=False, separator='_') for vu_id in substantive_variation_unit_ids]
        missing_symbol = '?'
        symbols = self.get_nexus_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="utf-8") as f:
            # Start with the NEXUS header:
            f.write("#NEXUS\n\n")
            # Then begin the data block:
            f.write("Begin DATA;\n")
            # Write the collation matrix dimensions:
            f.write("\tDimensions ntax=%d nchar=%d;\n" % (ntax, nchar))
            # Write the format subblock:
            f.write("\tFormat\n")
            f.write("\t\tDataType=Standard\n")
            f.write("\t\tMissing=%s\n" % missing_symbol)
            if frequency:
                f.write("\t\tStatesFormat=Frequency\n")
            f.write("\t\tSymbols=\"%s\";\n" % (" ".join(symbols)))
            # If char_state_labels is set, then write the CharStateLabels block, with each character's labels on its own line:
            if char_state_labels:
                f.write("\tCharStateLabels")
                vu_ind = 1
                for vu in self.variation_units:
                    if vu.id not in substantive_variation_unit_ids_set:
                        continue
                    if vu_ind == 1:
                        f.write("\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                    else:
                        f.write(",\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
                    rdg_ind = 0
                    for rdg in vu.readings:
                        key = tuple([vu.id, rdg.id])
                        if key not in substantive_variation_unit_reading_tuples_set:
                            continue
                        ascii_rdg_text = slugify(
                            rdg.text, lowercase=False, separator='_', replacements=[['η', 'h'], ['ω', 'w']]
                        )
                        if ascii_rdg_text == "":
                            ascii_rdg_text = "om."
                        f.write(" %s" % ascii_rdg_text)
                        rdg_ind += 1
                    if rdg_ind > 0:
                        vu_ind += 1
                f.write(";\n")
            # Write the matrix subblock:
            f.write("\tMatrix")
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                if frequency:
                    sequence = "\n\t\t" + taxlabel
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        sequence += "\n\t\t\t"
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        # Otherwise, print out its frequencies for different readings in parentheses:
                        sequence += "("
                        for k, w in enumerate(rdg_support):
                            sequence += "%s:%0.4f" % (symbols[k], w)
                            if k < len(rdg_support) - 1:
                                sequence += " "
                        sequence += ")"
                else:
                    sequence = "\n\t\t" + taxlabel
                    # Add enough space after this label to ensure that all sequences are nicely aligned:
                    sequence += " " * (max_taxlabel_length - len(taxlabel) + 1)
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        rdg_inds = [
                            k for k, w in enumerate(rdg_support) if w > 0
                        ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                        # For singleton readings, just print the symbol:
                        if len(rdg_inds) == 1:
                            sequence += symbols[rdg_inds[0]]
                            continue
                        # For multiple readings, print the corresponding symbols in braces or the missing symbol depending on input settings:
                        if ambiguous_as_missing:
                            sequence += missing_symbol
                        else:
                            sequence += "{%s}" % "".join([symbols[rdg_ind] for rdg_ind in rdg_inds])
                f.write("%s" % (sequence))
            f.write(";\n")
            # End the data block:
            f.write("End;")
            # If calibrate_dates is set, then add the assumptions block:
            if calibrate_dates:
                f.write("\n\n")
                f.write("Begin ASSUMPTIONS;\n")
                # Set the scale to years:
                f.write("\tOPTIONS SCALE = years;\n\n")
                # Then calibrate the witness ages:
                calibrate_strings = []
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    date_range = wit.date_range
                    if date_range[0] is not None:
                        # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                        # depending on whether the upper and lower bounds match:
                        min_age = datetime.now().year - date_range[1]
                        max_age = datetime.now().year - date_range[0]
                        if min_age == max_age:
                            calibrate_string = "\tCALIBRATE %s = fixed(%d)" % (taxlabel, min_age)
                            calibrate_strings.append(calibrate_string)
                        else:
                            calibrate_string = "\tCALIBRATE %s = uniform(%d,%d)" % (taxlabel, min_age, max_age)
                            calibrate_strings.append(calibrate_string)
                    else:
                        # If there is no lower bound on the witness's date, then use an offset log-normal distribution:
                        min_age = datetime.now().year - date_range[1]
                        calibrate_string = "\tCALIBRATE %s = offsetlognormal(%d,0.0,1.0)" % (taxlabel, min_age)
                        calibrate_strings.append(calibrate_string)
                # Then print the calibrate strings, separated by commas and line breaks and terminated by a semicolon:
                f.write("%s;\n\n" % ",\n".join(calibrate_strings))
                # End the assumptions block:
                f.write("End;")
            # If mrbayes is set, then add the MrBayes block:
            if mrbayes:
                f.write("\n\n")
                f.write("Begin MRBAYES;\n")
                # Turn on the autoclose feature by default:
                f.write("\tset autoclose=yes;\n")
                # Set the branch lengths to be governed by a birth-death clock model, and set up the parameters for this model:
                f.write("\n")
                f.write("\tprset brlenspr = clock:birthdeath;\n")
                f.write("\tprset speciationpr = uniform(0.0,10.0);\n")
                f.write("\tprset extinctionpr = beta(2.0,4.0);\n")
                f.write("\tprset sampleprob = 0.01;\n")
                # Use the specified clock model:
                f.write("\n")
                if clock_model == ClockModel.uncorrelated:
                    f.write("\tprset clockvarpr=igr;\n")
                    f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
                    f.write("\tprset igrvarpr=exponential(1.0);\n")
                else:
                    f.write("\tprset clockvarpr=strict;\n")
                    f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
                # Set the priors on the tree age depending on the date range for the origin of the collated work:
                f.write("\n")
                if self.origin_date_range[0] is not None:
                    min_tree_age = (
                        datetime.now().year - self.origin_date_range[1]
                        if self.origin_date_range[1] is not None
                        else 0.0
                    )
                    max_tree_age = datetime.now().year - self.origin_date_range[0]
                    f.write("\tprset treeagepr = uniform(%d,%d);\n" % (min_tree_age, max_tree_age))
                else:
                    min_tree_age = (
                        datetime.now().year - self.origin_date_range[1]
                        if self.origin_date_range[1] is not None
                        else 0.0
                    )
                    f.write("\tprset treeagepr = offsetgamma(%d,1.0,1.0);\n" % (min_tree_age))
                # Then calibrate the witness ages:
                f.write("\n")
                f.write("\tprset nodeagepr = calibrated;\n")
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    date_range = wit.date_range
                    if date_range[0] is not None:
                        # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
                        # depending on whether the upper and lower bounds match:
                        min_age = datetime.now().year - date_range[1]
                        max_age = datetime.now().year - date_range[0]
                        if min_age == max_age:
                            f.write("\tcalibrate %s = fixed(%d);\n" % (taxlabel, min_age))
                        else:
                            f.write("\tcalibrate %s = uniform(%d,%d);\n" % (taxlabel, min_age, max_age))
                    else:
                        # If there is no lower bound on the witness's date, then use an offset gamma distribution:
                        min_age = datetime.now().year - date_range[1]
                        f.write("\tcalibrate %s = offsetgamma(%d,1.0,1.0);\n" % (taxlabel, min_age))
                f.write("\n")
                # Add default settings for MCMC estimation of posterior distribution:
                f.write("\tmcmcp ngen=100000;\n")
                # Write the command to run MrBayes:
                f.write("\tmcmc;\n")
                # End the MrBayes block:
                f.write("End;")
        return

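    # Illustrative sketch (hypothetical witnesses and readings, not from the original
    # module) of the default StatesPresent matrix entries: a row like
    #     P46 0{12}?
    # encodes reading 0 at the first unit, an ambiguous attestation of readings 1 and 2
    # at the second (or "?" there instead, if ambiguous_as_missing is set), and missing
    # data at the third.
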
    def get_hennig86_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in Hennig86 format.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        possible_symbols = (
            list(string.digits) + list(string.ascii_uppercase)[:22]
        )  # NOTE: the maximum number of symbols allowed in Hennig86 format is 32
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        hennig86_symbols = possible_symbols[:nsymbols]
        return hennig86_symbols

    def to_hennig86(self, file_addr: Union[Path, str], drop_constant: bool = False):
        """Writes this Collation to a file in Hennig86 format with the given address.
        Note that because Hennig86 format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = []
        for wit in self.witnesses:
            taxlabel = wit.id
            # Hennig86 format requires taxon names to start with a letter, so if this is not the case, then append "WIT_" to the start of the name:
            if taxlabel[0] not in string.ascii_letters:
                taxlabel = "WIT_" + taxlabel
            # Then replace any disallowed characters in the string with an underscore:
            taxlabel = slugify(taxlabel, lowercase=False, separator='_')
            taxlabels.append(taxlabel)
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        missing_symbol = '?'
        symbols = self.get_hennig86_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="ascii") as f:
            # Start with the nstates header:
            f.write("nstates %d;\n" % len(symbols))
            # Then begin the xread block:
            f.write("xread\n")
            # Write the dimensions:
            f.write("%d %d\n" % (nchar, ntax))
            # Now write the matrix:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                # Add enough space after this label to ensure that all sequences are nicely aligned:
                sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel) + 1))
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    rdg_support = self.readings_by_witness[wit.id][j]
                    # If this reading is lacunose in this witness, then use the missing character:
                    if sum(rdg_support) == 0:
                        sequence += missing_symbol
                        continue
                    rdg_inds = [
                        k for k, w in enumerate(rdg_support) if w > 0
                    ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                    # For singleton readings, just print the symbol:
                    if len(rdg_inds) == 1:
                        sequence += symbols[rdg_inds[0]]
                        continue
                    # For multiple readings, print the missing symbol:
                    sequence += missing_symbol
                f.write("%s\n" % (sequence))
            f.write(";")
        return

    def get_phylip_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in PHYLIP format.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        possible_symbols = (
            list(string.digits) + list(string.ascii_lowercase)[:22]
        )  # NOTE: for RAxML, multistate characters with alphabet sizes of up to 32 are supported
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        phylip_symbols = possible_symbols[:nsymbols]
        return phylip_symbols

    def to_phylip(self, file_addr: Union[Path, str], drop_constant: bool = False):
        """Writes this Collation to a file in PHYLIP format with the given address.
        Note that because PHYLIP format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = []
        for wit in self.witnesses:
            taxlabel = wit.id
            # Replace any disallowed characters in the string with an underscore:
            taxlabel = slugify(taxlabel, lowercase=False, separator='_')
            taxlabels.append(taxlabel)
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        missing_symbol = '?'
        symbols = self.get_phylip_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="ascii") as f:
            # Write the dimensions:
            f.write("%d %d\n" % (ntax, nchar))
            # Now write the matrix:
            for i, wit in enumerate(self.witnesses):
                taxlabel = taxlabels[i]
                # Add enough space after this label to ensure that all sequences are nicely aligned:
                sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel))) + "\t"
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        continue
                    rdg_support = self.readings_by_witness[wit.id][j]
                    # If this reading is lacunose in this witness, then use the missing character:
                    if sum(rdg_support) == 0:
                        sequence += missing_symbol
                        continue
                    rdg_inds = [
                        k for k, w in enumerate(rdg_support) if w > 0
                    ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                    # For singleton readings, just print the symbol:
                    if len(rdg_inds) == 1:
                        sequence += symbols[rdg_inds[0]]
                        continue
                    # For multiple readings, print the missing symbol:
                    sequence += missing_symbol
                f.write("%s\n" % (sequence))
        return

    def get_fasta_symbols(self):
        """Returns a list of one-character symbols needed to represent the states of all substantive readings in FASTA format.

        The number of symbols equals the maximum number of substantive readings at any variation unit.

        Returns:
            A list of individual characters representing states in readings.
        """
        possible_symbols = (
            list(string.digits) + list(string.ascii_lowercase)[:22]
        )  # NOTE: for RAxML, multistate characters with alphabet sizes of up to 32 are supported
        # The number of symbols needed is equal to the length of the longest substantive reading vector:
        nsymbols = 0
        # If there are no witnesses, then no symbols are needed at all:
        if len(self.witnesses) == 0:
            return []
        wit_id = self.witnesses[0].id
        for rdg_support in self.readings_by_witness[wit_id]:
            nsymbols = max(nsymbols, len(rdg_support))
        fasta_symbols = possible_symbols[:nsymbols]
        return fasta_symbols

1135 def to_fasta(self, file_addr: Union[Path, str], drop_constant: bool = False):
1136 """Writes this Collation to a file in FASTA format with the given address.
1137 Note that because FASTA format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.
1139 Args:
1140 file_addr: A string representing the path to an output file.
1141 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1142 """
1143 # Populate a list of sites that will correspond to columns of the sequence alignment:
1144 substantive_variation_unit_ids = self.variation_unit_ids
1145 if drop_constant:
1146 substantive_variation_unit_ids = [
1147 vu_id
1148 for vu_id in self.variation_unit_ids
1149 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1150 ]
1151 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1152 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1153 # Start by calculating the values we will be using here:
1154 ntax = len(self.witnesses)
1155 nchar = len(substantive_variation_unit_ids)
1156 taxlabels = []
1157 for wit in self.witnesses:
1158 taxlabel = wit.id
1159 # Then replace any disallowed characters in the string with an underscore:
1160 taxlabel = slugify(taxlabel, lowercase=False, separator='_')
1161 taxlabels.append(taxlabel)
1162 max_taxlabel_length = max(
1163 [len(taxlabel) for taxlabel in taxlabels]
1164 ) # keep track of the longest taxon label for tabular alignment purposes
1165 missing_symbol = '?'
1166 symbols = self.get_fasta_symbols()
1167 # Generate all parent folders for this file that don't already exist:
1168 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
1169 with open(file_addr, "w", encoding="ascii") as f:
1170 # Now write the matrix:
1171 for i, wit in enumerate(self.witnesses):
1172 taxlabel = taxlabels[i]
1173 # Start this witness's record with a FASTA header line containing its taxon label:
1174 sequence = ">%s\n" % taxlabel
1175 for j, vu_id in enumerate(self.variation_unit_ids):
1176 if vu_id not in substantive_variation_unit_ids_set:
1177 continue
1178 rdg_support = self.readings_by_witness[wit.id][j]
1179 # If this reading is lacunose in this witness, then use the missing character:
1180 if sum(rdg_support) == 0:
1181 sequence += missing_symbol
1182 continue
1183 rdg_inds = [
1184 k for k, w in enumerate(rdg_support) if w > 0
1185 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them
1186 # For singleton readings, just print the symbol:
1187 if len(rdg_inds) == 1:
1188 sequence += symbols[rdg_inds[0]]
1189 continue
1190 # For multiple readings, print the missing symbol:
1191 sequence += missing_symbol
1192 f.write("%s\n" % (sequence))
1193 return
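# Example usage (a sketch; the output path and witness ID are hypothetical):
#     collation.to_fasta("out/collation.fasta", drop_constant=True)
# writes one two-line record per witness, e.g.:
#     >P46
#     010?2...
# with '?' marking lacunose or ambiguous units.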
1195 def get_beast_symbols(self):
1196 """Returns a list of one-character symbols needed to represent the states of all substantive readings in BEAST format.
1198 The number of symbols equals the maximum number of substantive readings at any variation unit.
1200 Returns:
1201 A list of individual characters representing states in readings.
1202 """
1203 possible_symbols = (
1204 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
1205 ) # NOTE: for BEAST, any number of states should theoretically be permissible, but since code maps are required for some reason, we will limit the number of symbols to 62 for now
1206 # The number of symbols needed is equal to the length of the longest substantive reading vector:
1207 nsymbols = 0
1208 # If there are no witnesses, then no symbols are needed at all:
1209 if len(self.witnesses) == 0:
1210 return []
1211 wit_id = self.witnesses[0].id
1212 for rdg_support in self.readings_by_witness[wit_id]:
1213 nsymbols = max(nsymbols, len(rdg_support))
1214 beast_symbols = possible_symbols[:nsymbols]
1215 return beast_symbols
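# NOTE: digits, lowercase letters, and uppercase letters give the 62 one-character codes
# available here; a collation whose largest unit has more than 62 substantive readings
# would need more symbols than this alphabet provides.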
1217 def get_tip_date_range(self):
1218 """Gets the minimum and maximum dates attested among the witnesses.
1219 Also checks if the witness with the latest possible date has a fixed date
1220 (i.e., if the lower and upper bounds for its date are the same)
1221 and issues a warning if not, as this will cause unusual behavior in BEAST 2.
1223 Returns:
1224 A tuple containing the earliest and latest possible tip dates.
1225 """
1226 earliest_date = None
1227 earliest_wit = None
1228 latest_date = None
1229 latest_wit = None
1230 for wit in self.witnesses:
1231 wit_id = wit.id
1232 date_range = wit.date_range
1233 if date_range[0] is not None:
1234 if earliest_date is not None:
1235 earliest_wit = wit if date_range[0] < earliest_date else earliest_wit
1236 earliest_date = min(date_range[0], earliest_date)
1237 else:
1238 earliest_wit = wit
1239 earliest_date = date_range[0]
1240 if date_range[1] is not None:
1241 if latest_date is not None:
1242 latest_wit = wit if date_range[1] > latest_date else latest_wit
1243 latest_date = max(date_range[1], latest_date)
1244 else:
1245 latest_wit = wit
1246 latest_date = date_range[1]
1247 if latest_wit is not None and (latest_wit.date_range[0] is None or latest_wit.date_range[0] != latest_wit.date_range[1]):
1248 print(
1249 "WARNING: the latest witness, %s, has a variable date range; this will result in problems with time-dependent substitution models and misalignment of trees in BEAST DensiTree outputs! Please ensure that witness %s has a fixed date."
1250 % (latest_wit.id, latest_wit.id)
1251 )
1252 return (earliest_date, latest_date)
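# Worked example (hypothetical dates): for witnesses dated (150, 250), (300, 300), and
# (None, 400), this returns (150, 400); because the witness with the latest possible date
# does not have a fixed date (its lower bound is None), the warning above is printed.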
1254 def get_beast_origin_span(self, tip_date_range):
1255 """Returns a tuple containing the lower and upper bounds for the height of the origin of the Birth-Death Skyline model.
1256 The upper bound on the height of the origin is the difference between the latest tip date
1257 and the lower bound on the date of the original work, if both are defined;
1258 otherwise, it is left undefined.
1259 The lower bound on the height of the origin is the difference between the latest tip date
1260 and the upper bound on the date of the original work, if both are defined;
1261 otherwise, it is the difference between the earliest tip date and the latest, if both are defined.
1263 Args:
1264 tip_date_range: A tuple containing the earliest and latest possible tip dates.
1266 Returns:
1267 A tuple containing lower and upper bounds on the origin height for the Birth-Death Skyline model.
1268 """
1269 origin_span = [0, None]
1270 # If the upper bound on the date of the work's composition is defined, then set the lower bound on the height of the origin using it and the latest tip date
1271 # (note that if it had to be defined in terms of witness date lower bounds, then this would have happened already):
1272 if self.origin_date_range[1] is not None:
1273 origin_span[0] = tip_date_range[1] - self.origin_date_range[1]
1274 # If the lower bound on the date of the work's composition is defined, then set the upper bound on the height of the origin using it and the latest tip date:
1275 if self.origin_date_range[0] is not None:
1276 origin_span[1] = tip_date_range[1] - self.origin_date_range[0]
1277 return tuple(origin_span)
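# Worked example (hypothetical values): with tip_date_range = (150, 400) and
# self.origin_date_range = (50, 100), the origin height is bounded below by
# 400 - 100 = 300 and above by 400 - 50 = 350, so (300, 350) is returned.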
1279 def get_beast_date_map(self, taxlabels):
1280 """Returns a string representing witness-to-date mappings in BEAST format.
1282 Since this format requires single dates as opposed to date ranges,
1283 witnesses with closed date ranges will be mapped to the average of their lower and upper bounds,
1284 and witnesses with open date ranges will not be mapped.
1286 Args:
1287 taxlabels: A list of slugified taxon labels.
1289 Returns:
1290 A string containing comma-separated date calibrations of the form witness_id=date.
1291 """
1292 calibrate_strings = []
1293 for i, wit in enumerate(self.witnesses):
1294 taxlabel = taxlabels[i]
1295 date_range = wit.date_range
1296 # If either end of this witness's date range is empty, then do not include it:
1297 if date_range[0] is None or date_range[1] is None:
1298 continue
1299 # Otherwise, take the midpoint of its date range as its date:
1300 date = int((date_range[0] + date_range[1]) / 2)
1301 calibrate_string = "%s=%d" % (taxlabel, date)
1302 calibrate_strings.append(calibrate_string)
1303 # Then output the full date map string:
1304 date_map = ",".join(calibrate_strings)
1305 return date_map
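# Worked example (hypothetical witness data): a witness labeled "P46" dated (175, 225)
# contributes the calibration "P46=200" (the midpoint of its range), while a witness
# with an open-ended range like (300, None) is omitted from the returned string.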
1307 def get_beast_code_map_for_unit(self, symbols, missing_symbol, vu_ind):
1308 """Returns a string containing state/reading code mappings in BEAST format using the given single-state and missing state symbols for the character/variation unit at the given index.
1309 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a code for a dummy state will be included.
1311 Args:
symbols: A list of one-character symbols for single states.
missing_symbol: A one-character symbol for the missing state.
1312 vu_ind: An integer index for the desired unit.
1314 Returns:
1315 A string containing comma-separated code mappings.
1316 """
1317 vu = self.variation_units[vu_ind]
1318 vu_id = vu.id
1319 code_map = {}
1320 for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])):
1321 code_map[symbols[k]] = str(k)
1322 # If this site is a singleton site, then add a code mapping for the dummy state:
1323 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1324 code_map[symbols[1]] = str(1)
1325 # Then add a mapping for the missing state, including a dummy state if this is a singleton site:
1326 code_map[missing_symbol] = " ".join(
1327 str(k) for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id]))
1328 )
1329 # If this site is a singleton site, then add the dummy state to the missing state mapping:
1330 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1331 code_map[missing_symbol] = code_map[missing_symbol] + " " + str(1)
1332 # Then combine all of the mappings into a single string:
1333 code_map_string = ", ".join([code + "=" + code_map[code] for code in code_map])
1334 return code_map_string
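# Worked example: for a unit with three substantive readings, symbols starting
# ['0', '1', '2', ...], and missing symbol '?', this returns "0=0, 1=1, 2=2, ?=0 1 2";
# for a singleton unit it returns "0=0, 1=1, ?=0 1" (the second state being the dummy).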
1336 def get_beast_equilibrium_frequencies_for_unit(self, vu_ind):
1337 """Returns a string containing state/reading equilibrium frequencies in BEAST format for the character/variation unit at the given index.
1338 Since the equilibrium frequencies are not used with the substitution models, they simply correspond to a uniform distribution over the states.
1339 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then the frequencies will be split evenly between the substantive reading and a dummy state (i.e., "0.5 0.5").
1341 Args:
1342 vu_ind: An integer index for the desired unit.
1344 Returns:
1345 A string containing space-separated equilibrium frequencies.
1346 """
1347 vu = self.variation_units[vu_ind]
1348 vu_id = vu.id
1349 # If this unit is a singleton, then return the string "0.5 0.5":
1350 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1:
1351 return "0.5 0.5"
1352 # Otherwise, set the equilibrium frequencies according to a uniform distribution:
1353 equilibrium_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len(
1354 self.substantive_readings_by_variation_unit_id[vu_id]
1355 )
1356 equilibrium_frequencies_string = " ".join([str(w) for w in equilibrium_frequencies])
1357 return equilibrium_frequencies_string
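# Worked example: a unit with four substantive readings yields "0.25 0.25 0.25 0.25",
# while a singleton unit yields "0.5 0.5" (the second state being the dummy state).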
1359 def get_beast_root_frequencies_for_unit(self, vu_ind):
1360 """Returns a string containing state/reading root frequencies in BEAST format for the character/variation unit at the given index.
1361 The root frequencies are calculated from the intrinsic odds at this unit.
1362 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a root frequency of 0 will be added for a dummy state.
1363 If no intrinsic odds are specified, then a uniform distribution over all states is assumed.
1365 Args:
1366 vu_ind: An integer index for the desired unit.
1368 Returns:
1369 A string containing space-separated root frequencies.
1370 """
1371 vu = self.variation_units[vu_ind]
1372 vu_id = vu.id
1373 intrinsic_relations = vu.intrinsic_relations
1374 intrinsic_odds_by_id = self.intrinsic_odds_by_id
1375 # If this unit is a singleton, then return the string "1 0":
1376 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1:
1377 return "1 0"
1378 # If this unit has no intrinsic odds, then assume a uniform distribution over all readings:
1379 if len(intrinsic_relations) == 0:
1380 root_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len(
1381 self.substantive_readings_by_variation_unit_id[vu_id]
1382 )
1383 root_frequencies_string = " ".join([str(w) for w in root_frequencies])
1384 return root_frequencies_string
1385 # We will populate the root frequencies based on the intrinsic odds of the readings:
1386 root_frequencies_by_id = {}
1387 for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]:
1388 root_frequencies_by_id[rdg_id] = 0
1389 # First, construct an adjacency list for efficient edge iteration:
1390 neighbors_by_source = {}
1391 for edge in intrinsic_relations:
1392 s = edge[0]
1393 t = edge[1]
1394 if s not in neighbors_by_source:
1395 neighbors_by_source[s] = []
1396 if t not in neighbors_by_source:
1397 neighbors_by_source[t] = []
1398 neighbors_by_source[s].append(t)
1399 # Next, identify all readings that are not targeted by any intrinsic odds relation:
1400 in_degree_by_reading = {}
1401 for edge in intrinsic_relations:
1402 s = edge[0]
1403 t = edge[1]
1404 if s not in in_degree_by_reading:
1405 in_degree_by_reading[s] = 0
1406 if t not in in_degree_by_reading:
1407 in_degree_by_reading[t] = 0
1408 in_degree_by_reading[t] += 1
1409 starting_nodes = [t for t in in_degree_by_reading if in_degree_by_reading[t] == 0]
1410 # Set the root frequencies for these readings to 1 (they will be normalized later):
1411 for starting_node in starting_nodes:
1412 root_frequencies_by_id[starting_node] = 1.0
1413 # Next, set the frequencies for the remaining readings recursively using the adjacency list:
1414 def update_root_frequencies(s):
1415 for t in neighbors_by_source[s]:
1416 intrinsic_category = intrinsic_relations[(s, t)]
1417 odds = (
1418 intrinsic_odds_by_id[intrinsic_category]
1419 if intrinsic_odds_by_id[intrinsic_category] is not None
1420 else 1.0
1421 ) # TODO: This needs to be handled using parameters once we have it implemented in BEAST
1422 root_frequencies_by_id[t] = root_frequencies_by_id[s] / odds
1423 update_root_frequencies(t)
1424 return
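# NOTE: this recursion assumes that the intrinsic relations graph is acyclic (a cycle would
# recurse without bound and would leave no starting node with in-degree 0); cyclic relations
# are presumably rejected during parsing (cf. IntrinsicRelationsException).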
1426 for starting_node in starting_nodes:
1427 update_root_frequencies(starting_node)
1428 # Then produce a normalized vector of root frequencies that corresponds to a probability distribution:
1429 root_frequencies = [
1430 root_frequencies_by_id[rdg_id] for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]
1431 ]
1432 total_frequencies = sum(root_frequencies)
1433 for k in range(len(root_frequencies)):
1434 root_frequencies[k] = root_frequencies[k] / total_frequencies
1435 root_frequencies_string = " ".join([str(w) for w in root_frequencies])
1436 return root_frequencies_string
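# Worked example (hypothetical readings and odds): for readings a, b, c with relations
# (a, b) and (b, c), both in a category with odds 2.0, a is the starting node with
# frequency 1.0, b gets 1.0 / 2.0 = 0.5, and c gets 0.5 / 2.0 = 0.25; normalizing by
# the total 1.75 returns approximately "0.571 0.286 0.143".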
1438 def to_beast(
1439 self,
1440 file_addr: Union[Path, str],
1441 drop_constant: bool = False,
1442 clock_model: ClockModel = ClockModel.strict,
1443 ancestral_logger: AncestralLogger = AncestralLogger.state,
1444 seed: int = None,
1445 ):
1446 """Writes this Collation to a file in BEAST format with the given address.
1448 Args:
1449 file_addr: A string representing the path to an output file.
1450 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1451 clock_model: A ClockModel option indicating which clock model to use.
1452 ancestral_logger: An AncestralLogger option indicating which class of logger (if any) to use for ancestral states.
1453 seed: A seed for random number generation (for setting initial values of unspecified transcriptional rates).
1454 """
1455 # Populate a list of sites that will correspond to columns of the sequence alignment:
1456 substantive_variation_unit_ids = self.variation_unit_ids
1457 if drop_constant:
1458 substantive_variation_unit_ids = [
1459 vu_id
1460 for vu_id in self.variation_unit_ids
1461 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1462 ]
1463 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1464 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1465 # First, calculate the values we will be using for the main template:
1466 taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
1467 missing_symbol = '?'
1468 symbols = self.get_beast_symbols()
1469 tip_date_range = self.get_tip_date_range()
1470 origin_span = self.get_beast_origin_span(tip_date_range)
1471 date_map = self.get_beast_date_map(taxlabels)
1472 # Then populate the necessary objects for the BEAST XML Jinja template:
1473 witness_objects = []
1474 variation_unit_objects = []
1475 intrinsic_category_objects = []
1476 transcriptional_category_objects = []
1477 # Start with witnesses:
1478 for wit in self.witnesses:
1479 witness_object = {}
1480 # Copy the ID for this witness:
1481 witness_object["id"] = wit.id
1482 # Copy its date bounds:
1483 witness_object["min_date"] = wit.date_range[0]
1484 witness_object["max_date"] = wit.date_range[1]
1485 # Populate its sequence from its entries in the witness's readings dictionary:
1486 sequence = ""
1487 for j, rdg_support in enumerate(self.readings_by_witness[wit.id]):
1488 vu_id = self.variation_unit_ids[j]
1489 # Skip any variation units deemed non-substantive:
1490 if vu_id not in substantive_variation_unit_ids_set:
1491 continue
1492 # If this witness has a certainty of 0 for all readings, then it is a gap; assign a likelihood of 1 to each reading:
1493 if sum(rdg_support) == 0:
1494 for k, w in enumerate(rdg_support):
1495 sequence += "1"
1496 if k < len(rdg_support) - 1:
1497 sequence += ", "
1498 else:
1499 if len(rdg_support) > 1:
1500 sequence += "; "
1501 else:
1502 # If this site is a singleton site, then add a dummy state:
1503 sequence += ", 0; "
1504 # Otherwise, read the probabilities as they are given:
1505 else:
1506 for k, w in enumerate(rdg_support):
1507 sequence += str(w)
1508 if k < len(rdg_support) - 1:
1509 sequence += ", "
1510 else:
1511 if len(rdg_support) > 1:
1512 sequence += "; "
1513 else:
1514 # If this site is a singleton site, then add a dummy state:
1515 sequence += ", 0; "
1516 # Strip the final semicolon and space from the sequence:
1517 sequence = sequence.strip("; ")
1518 # Then set the witness object's sequence attribute to this string:
1519 witness_object["sequence"] = sequence
1520 witness_objects.append(witness_object)
1521 # Then proceed to variation units:
1522 for j, vu in enumerate(self.variation_units):
1523 if vu.id not in substantive_variation_unit_ids_set:
1524 continue
1525 variation_unit_object = {}
1526 # Copy the ID of this variation unit:
1527 variation_unit_object["id"] = vu.id
1528 # Copy this variation unit's number of substantive readings,
1529 # setting it to 2 if it is a singleton unit:
1530 variation_unit_object["nstates"] = (
1531 len(self.substantive_readings_by_variation_unit_id[vu.id])
1532 if len(self.substantive_readings_by_variation_unit_id[vu.id]) > 1
1533 else 2
1534 )
1535 # Then construct the code map for this unit:
1536 variation_unit_object["code_map"] = self.get_beast_code_map_for_unit(symbols, missing_symbol, j)
1537 # Then populate a comma-separated string of reading labels for this unit:
1538 rdg_texts = []
1539 vu_label = vu.id
1540 for rdg in vu.readings:
1541 key = tuple([vu.id, rdg.id])
1542 if key not in substantive_variation_unit_reading_tuples_set:
1543 continue
1544 rdg_text = slugify(rdg.text, lowercase=False, allow_unicode=True, separator='_')
1545 # Replace any empty reading text with an omission marker:
1546 if rdg_text == "":
1547 rdg_text = "om."
1548 rdg_texts.append(rdg_text)
1549 # If this site is a singleton site, then add a dummy reading for the dummy state:
1550 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1551 rdg_texts.append("DUMMY")
1552 rdg_texts_string = ", ".join(rdg_texts)
1553 variation_unit_object["rdg_texts"] = rdg_texts_string
1554 # Then populate this unit's equilibrium frequency string and its root frequency string:
1555 variation_unit_object["equilibrium_frequencies"] = self.get_beast_equilibrium_frequencies_for_unit(j)
1556 variation_unit_object["root_frequencies"] = self.get_beast_root_frequencies_for_unit(j)
1557 # Then populate a dictionary mapping epoch height ranges to lists of off-diagonal entries for substitution models:
1558 rate_objects_by_epoch_height_range = {}
1559 epoch_height_ranges = []
1560 # Then proceed based on whether the transcriptional relations for this variation unit have been defined:
1561 if len(vu.transcriptional_relations_by_date_range) == 0:
1562 # If there are no transcriptional relations, then map the epoch range of (None, None) to their list of off-diagonal entries:
1563 epoch_height_ranges.append((None, None))
1564 rate_objects_by_epoch_height_range[(None, None)] = []
1565 rate_objects = rate_objects_by_epoch_height_range[(None, None)]
1566 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1567 # If this is a singleton site, then use an arbitrary 2x2 rate matrix:
1568 rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
1569 rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
1570 else:
1571 # If this is a site with multiple substantive readings, but no transcriptional relations list,
1572 # then use a Lewis Mk substitution matrix with the appropriate number of states:
1573 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
1574 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
1575 # Skip diagonal elements:
1576 if k_1 == k_2:
1577 continue
1578 rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
1579 else:
1580 # Otherwise, proceed for every date range:
1581 for date_range in vu.transcriptional_relations_by_date_range:
1582 # Get the map of transcriptional relations for reference later:
1583 transcriptional_relations = vu.transcriptional_relations_by_date_range[date_range]
1584 # Now get the epoch height range corresponding to this date range, and initialize its list in the dictionary:
1585 epoch_height_range = [None, None]
1586 epoch_height_range[0] = tip_date_range[1] - date_range[1] if date_range[1] is not None else None
1587 epoch_height_range[1] = tip_date_range[1] - date_range[0] if date_range[0] is not None else None
1588 epoch_height_range = tuple(epoch_height_range)
1589 epoch_height_ranges.append(epoch_height_range)
1590 rate_objects_by_epoch_height_range[epoch_height_range] = []
1591 rate_objects = rate_objects_by_epoch_height_range[epoch_height_range]
1592 # Then proceed for every pair of readings in this unit:
1593 for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
1594 for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
1595 # Skip diagonal elements:
1596 if k_1 == k_2:
1597 continue
1598 # If the first reading has no transcriptional relation to the second in this unit, then use the default rate:
1599 if (rdg_id_1, rdg_id_2) not in transcriptional_relations:
1600 rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
1601 continue
1602 # Otherwise, if only one category of transcriptional relations holds between the first and second readings,
1603 # then use its rate:
1604 if len(transcriptional_relations[(rdg_id_1, rdg_id_2)]) == 1:
1605 # If there is only one such category, then add its rate as a standalone var element:
1606 transcriptional_category = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])[0]
1607 rate_objects.append(
1608 {"transcriptional_categories": [transcriptional_category], "expression": None}
1609 )
1610 continue
1611 # If there is more than one, then add a var element that is a sum of the individual categories' rates:
1612 transcriptional_categories = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])
1613 args = []
1614 for transcriptional_category in transcriptional_categories:
1615 args.append("%s_rate" % transcriptional_category)
1616 args_string = " ".join(args)
1617 ops = ["+"] * (len(args) - 1)
1618 ops_string = " ".join(ops)
1619 expression_string = " ".join([args_string, ops_string])
1620 rate_objects.append(
1621 {
1622 "transcriptional_categories": transcriptional_categories,
1623 "expression": expression_string,
1624 }
1625 )
1626 # Now reorder the list of epoch height ranges, and get a list of non-null epoch heights in ascending order:
1627 epoch_height_ranges.reverse()
1628 epoch_heights = [
1629 epoch_height_range[0] for epoch_height_range in epoch_height_ranges if epoch_height_range[0] is not None
1630 ]
1631 # Then add all of these data structures to the variation unit object:
1632 variation_unit_object["epoch_heights"] = epoch_heights
1633 variation_unit_object["epoch_heights_string"] = " ".join(
1634 [str(epoch_height) for epoch_height in epoch_heights]
1635 )
1636 variation_unit_object["epoch_height_ranges"] = epoch_height_ranges
1637 variation_unit_object["epoch_rates"] = [
1638 rate_objects_by_epoch_height_range[epoch_height_range] for epoch_height_range in epoch_height_ranges
1639 ]
1640 variation_unit_objects.append(variation_unit_object)
1641 # Then proceed to intrinsic odds categories:
1642 for intrinsic_category in self.intrinsic_categories:
1643 intrinsic_category_object = {}
1644 # Copy the ID of this intrinsic category:
1645 intrinsic_category_object["id"] = intrinsic_category
1646 # Then copy the odds factor associated with this intrinsic category,
1647 # setting it to 1.0 if it is not specified and setting the estimate attribute accordingly:
1648 odds = self.intrinsic_odds_by_id[intrinsic_category]
1649 intrinsic_category_object["odds"] = odds if odds is not None else 1.0
1650 intrinsic_category_object["estimate"] = "false" if odds is not None else "true"
1651 intrinsic_category_objects.append(intrinsic_category_object)
1652 # Then proceed to transcriptional rate categories:
1653 rng = np.random.default_rng(seed)
1654 for transcriptional_category in self.transcriptional_categories:
1655 transcriptional_category_object = {}
1656 # Copy the ID of this transcriptional category:
1657 transcriptional_category_object["id"] = transcriptional_category
1658 # Then copy the rate of this transcriptional category,
1659 # setting it to a random number sampled from a Gamma distribution if it is not specified and setting the estimate attribute accordingly:
1660 rate = self.transcriptional_rates_by_id[transcriptional_category]
1661 transcriptional_category_object["rate"] = rate if rate is not None else rng.gamma(5.0, 2.0)
1662 transcriptional_category_object["estimate"] = "false" if rate is not None else "true"
1663 transcriptional_category_objects.append(transcriptional_category_object)
1664 # Now render the output XML file using the Jinja template:
1665 env = Environment(loader=PackageLoader("teiphy", "templates"), autoescape=select_autoescape())
1666 template = env.get_template("beast_template.xml")
1667 rendered = template.render(
1668 nsymbols=len(symbols),
1669 date_map=date_map,
1670 origin_span=origin_span,
1671 clock_model=clock_model.value,
1672 clock_rate_categories=2 * len(self.witnesses) - 1,
1673 ancestral_logger=ancestral_logger.value,
1674 witnesses=witness_objects,
1675 variation_units=variation_unit_objects,
1676 intrinsic_categories=intrinsic_category_objects,
1677 transcriptional_categories=transcriptional_category_objects,
1678 )
1679 # Generate all parent folders for this file that don't already exist:
1680 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
1681 with open(file_addr, "w", encoding="utf-8") as f:
1682 f.write(rendered)
1683 return
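# Example usage (a sketch; the output path and seed are hypothetical):
#     collation.to_beast(
#         "out/collation_beast.xml", clock_model=ClockModel.uncorrelated, seed=42
#     )
# renders beast_template.xml with an uncorrelated clock and reproducible random
# starting values for any transcriptional rates left unspecified in the input.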
1685 def to_numpy(self, drop_constant: bool = False, split_missing: bool = True):
1686 """Returns this Collation in the form of a NumPy array, along with arrays of its row and column labels.
1688 Args:
1689 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1690 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings; if False, then missing data is ignored (i.e., all states are 0). Default value is True.
1692 Returns:
1693 A NumPy array with a row for each substantive reading and a column for each witness.
1694 A list of substantive reading ID strings.
1695 A list of witness ID strings.
1696 """
1697 # Populate a list of sites that will correspond to columns of the sequence alignment:
1698 substantive_variation_unit_ids = self.variation_unit_ids
1699 if drop_constant:
1700 substantive_variation_unit_ids = [
1701 vu_id
1702 for vu_id in self.variation_unit_ids
1703 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1704 ]
1705 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1706 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1707 # Initialize the output array with the appropriate dimensions:
1708 reading_labels = []
1709 for vu in self.variation_units:
1710 if vu.id not in substantive_variation_unit_ids_set:
1711 continue
1712 for rdg in vu.readings:
1713 key = tuple([vu.id, rdg.id])
1714 if key in substantive_variation_unit_reading_tuples_set:
1715 reading_labels.append(vu.id + ", " + rdg.text)
1716 witness_labels = [wit.id for wit in self.witnesses]
1717 matrix = np.zeros((len(reading_labels), len(witness_labels)), dtype=float)
1718 # Then populate it with the appropriate values:
1719 col_ind = 0
1720 for wit in self.witnesses:
1721 row_ind = 0
1722 for j, vu_id in enumerate(self.variation_unit_ids):
1723 if vu_id not in substantive_variation_unit_ids_set:
1724 continue
1725 rdg_support = self.readings_by_witness[wit.id][j]
1726 # If this reading support vector sums to 0, then this is missing data; handle it as specified:
1727 if sum(rdg_support) == 0:
1728 if split_missing:
1729 for _ in range(len(rdg_support)):
1730 matrix[row_ind, col_ind] = 1 / len(rdg_support)
1731 row_ind += 1
1732 else:
1733 row_ind += len(rdg_support)
1734 # Otherwise, add its coefficients normally:
1735 else:
1736 for k in range(len(rdg_support)):
1737 matrix[row_ind, col_ind] = rdg_support[k]
1738 row_ind += 1
1739 col_ind += 1
1740 return matrix, reading_labels, witness_labels
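# Illustrative sketch: with split_missing=True (the default), a witness that is lacunose
# at a unit with two substantive readings contributes 0.5 to both of that unit's rows;
# with split_missing=False, both entries are left at 0.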
1742 def to_distance_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False):
1743 """Transforms this Collation into a NumPy distance matrix between witnesses, along with an array of its labels for the witnesses.
1744 Distances can be computed either as counts of disagreements (the default setting), or as proportions of disagreements over all variation units where both witnesses have singleton readings.
1745 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of disagreements.
1747 Args:
1748 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1749 Default value is False.
1750 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
1751 Default value is False.
1752 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix
1753 should include the number of their extant, unambiguous variation units after the number of their disagreements.
1754 Default value is False.
1756 Returns:
1757 A NumPy distance matrix with a row and column for each witness.
1758 A list of witness ID strings.
1759 """
1760 # Populate a list of sites that will correspond to columns of the sequence alignment:
1761 substantive_variation_unit_ids = self.variation_unit_ids
1762 if drop_constant:
1763 substantive_variation_unit_ids = [
1764 vu_id
1765 for vu_id in self.variation_unit_ids
1766 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1767 ]
1768 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1770 # Initialize the output array with the appropriate dimensions:
1771 witness_labels = [wit.id for wit in self.witnesses]
1772 # The type of the matrix will depend on the input options:
1773 matrix = None
1774 if show_ext:
1775 matrix = np.full(
1776 (len(witness_labels), len(witness_labels)), "NA", dtype=object
1777 ) # strings of the form "disagreements/extant"
1778 elif proportion:
1779 matrix = np.full(
1780 (len(witness_labels), len(witness_labels)), 0.0, dtype=float
1781 ) # floats of the form disagreements/extant
1782 else:
1783 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form disagreements
1784 for i, wit_1 in enumerate(witness_labels):
1785 for j, wit_2 in enumerate(witness_labels):
1786 extant_units = 0
1787 disagreements = 0
1788 # If either of the cells for this pair of witnesses has been populated already,
1789 # then just copy the entry from the other side of the diagonal without recalculating:
1790 if i > j:
1791 matrix[i, j] = matrix[j, i]
1792 continue
1793 # Otherwise, calculate the number of units where both witnesses have unambiguous readings
1794 # and the number of units where they disagree:
1795 for k, vu_id in enumerate(self.variation_unit_ids):
1796 if vu_id not in substantive_variation_unit_ids_set:
1797 continue
1798 wit_1_rdg_support = self.readings_by_witness[wit_1][k]
1799 wit_2_rdg_support = self.readings_by_witness[wit_2][k]
1800 wit_1_rdg_inds = [m for m, w in enumerate(wit_1_rdg_support) if w > 0]
1801 wit_2_rdg_inds = [m for m, w in enumerate(wit_2_rdg_support) if w > 0]
1802 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1:
1803 continue
1804 extant_units += 1
1805 if wit_1_rdg_inds[0] != wit_2_rdg_inds[0]:
1806 disagreements += 1
1807 cell_entry = None
1808 if proportion:
1809 cell_entry = disagreements / max(
1810 extant_units, 1
1811 ) # the max in the denominator is to prevent division by 0; the distance entry will be 0 if the two witnesses have no overlap
1812 else:
1813 cell_entry = disagreements
1814 if show_ext:
1815 cell_entry = str(cell_entry) + "/" + str(extant_units)
1816 matrix[i, j] = cell_entry
1817 return matrix, witness_labels
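# Worked example (hypothetical counts): if two witnesses disagree at 3 of the 12 units
# where both have singleton readings, their cell is 3 by default, 0.25 with
# proportion=True, or the string "3/12" with show_ext=True.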
1819 def to_similarity_matrix(self, drop_constant: bool = False, proportion: bool = False, show_ext: bool = False):
1820 """Transforms this Collation into a NumPy similarity matrix between witnesses, along with an array of its labels for the witnesses.
1821 Similarities can be computed either as counts of agreements (the default setting), or as proportions of agreements over all variation units where both witnesses have singleton readings.
1822 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of agreements.
1824 Args:
1825 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1826 Default value is False.
1827 proportion (bool, optional): An optional flag indicating whether or not to calculate similarities as proportions over extant, unambiguous variation units.
1828 Default value is False.
1829 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix
1830 should include the number of their extant, unambiguous variation units after the number of agreements.
1831 Default value is False.
1833 Returns:
1834 A NumPy similarity matrix with a row and column for each witness.
1835 A list of witness ID strings.
1836 """
1837 # Populate a list of sites that will correspond to columns of the sequence alignment:
1838 substantive_variation_unit_ids = self.variation_unit_ids
1839 if drop_constant:
1840 substantive_variation_unit_ids = [
1841 vu_id
1842 for vu_id in self.variation_unit_ids
1843 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1844 ]
1845 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1847 # Initialize the output array with the appropriate dimensions:
1848 witness_labels = [wit.id for wit in self.witnesses]
1849 # The type of the matrix will depend on the input options:
1850 matrix = None
1851 if show_ext:
1852 matrix = np.full(
1853 (len(witness_labels), len(witness_labels)), "NA", dtype=object
1854 ) # strings of the form "agreements/extant"
1855 elif proportion:
1856 matrix = np.full(
1857 (len(witness_labels), len(witness_labels)), 0.0, dtype=float
1858 ) # floats of the form agreements/extant
1859 else:
1860 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form agreements
1861 for i, wit_1 in enumerate(witness_labels):
1862 for j, wit_2 in enumerate(witness_labels):
1863 extant_units = 0
1864 agreements = 0
1865 # If either of the cells for this pair of witnesses has been populated already,
1866 # then just copy the entry from the other side of the diagonal without recalculating:
1867 if i > j:
1868 matrix[i, j] = matrix[j, i]
1869 continue
1870 # Otherwise, calculate the number of units where both witnesses have unambiguous readings
1871 # and the number of units where they agree:
1872 for k, vu_id in enumerate(self.variation_unit_ids):
1873 if vu_id not in substantive_variation_unit_ids_set:
1874 continue
1875 wit_1_rdg_support = self.readings_by_witness[wit_1][k]
1876 wit_2_rdg_support = self.readings_by_witness[wit_2][k]
1877 wit_1_rdg_inds = [m for m, w in enumerate(wit_1_rdg_support) if w > 0]
1878 wit_2_rdg_inds = [m for m, w in enumerate(wit_2_rdg_support) if w > 0]
1879 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1:
1880 continue
1881 extant_units += 1
1882 if wit_1_rdg_inds[0] == wit_2_rdg_inds[0]:
1883 agreements += 1
1884 cell_entry = None
1885 if proportion:
1886 cell_entry = agreements / max(
1887 extant_units, 1
1888 ) # the max in the denominator is to prevent division by 0; the similarity entry will be 0 if the two witnesses have no overlap
1889 else:
1890 cell_entry = agreements
1891 if show_ext:
1892 cell_entry = str(cell_entry) + "/" + str(extant_units)
1893 matrix[i, j] = cell_entry
1894 return matrix, witness_labels
1896 def to_nexus_table(self, drop_constant: bool = False, ambiguous_as_missing: bool = False):
1897 """Returns this Collation in the form of a table with rows for taxa, columns for characters, and reading IDs in cells.
1899 Args:
1900 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1901 Default value is False.
1902 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
1903 Default value is False.
1905 Returns:
1906 A NumPy array with rows for taxa, columns for characters, and reading IDs in cells.
1907 A list of substantive reading ID strings.
1908 A list of witness ID strings.
1909 """
1910 # Populate a list of sites that will correspond to columns of the sequence alignment:
1911 substantive_variation_unit_ids = self.variation_unit_ids
1912 if drop_constant:
1913 substantive_variation_unit_ids = [
1914 vu_id
1915 for vu_id in self.variation_unit_ids
1916 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1917 ]
1918 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1919 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1920 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary
1921 # to the readings' IDs:
1922 reading_ids_by_indices = {}
1923 for j, vu in enumerate(self.variation_units):
1924 if vu.id not in substantive_variation_unit_ids_set:
1925 continue
1926 k = 0
1927 for rdg in vu.readings:
1928 key = tuple([vu.id, rdg.id])
1929 if key not in substantive_variation_unit_reading_tuples_set:
1930 continue
1931 indices = tuple([j, k])
1932 reading_ids_by_indices[indices] = rdg.id
1933 k += 1
1934 # Initialize the output array with the appropriate dimensions:
1935 missing_symbol = '?'
1936 witness_labels = [wit.id for wit in self.witnesses]
1937 matrix = np.full(
1938 (len(witness_labels), len(substantive_variation_unit_ids)), missing_symbol, dtype=object
1939 ) # use dtype=object because the maximum string length is not known up front
1940 # Then populate it with the appropriate values:
1941 row_ind = 0
1942 for wit in self.witnesses:
1943 col_ind = 0
1944 for j, vu in enumerate(self.variation_units):
1945 if vu.id not in substantive_variation_unit_ids_set:
1946 continue
1947 rdg_support = self.readings_by_witness[wit.id][j]
1948 # If this reading support vector sums to 0, then this is missing data; handle it as specified:
1949 if sum(rdg_support) == 0:
1950 matrix[row_ind, col_ind] = missing_symbol
1951 # Otherwise, add its coefficients normally:
1952 else:
1953 rdg_inds = [
1954 k for k, w in enumerate(rdg_support) if w > 0
1955 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them
1956 # For singleton readings, just print the reading ID:
1957 if len(rdg_inds) == 1:
1958 k = rdg_inds[0]
1959 matrix[row_ind, col_ind] = reading_ids_by_indices[(j, k)]
1960 # For multiple readings, print the corresponding reading IDs in braces or the missing symbol depending on input settings:
1961 else:
1962 if ambiguous_as_missing:
1963 matrix[row_ind, col_ind] = missing_symbol
1964 else:
1965 matrix[row_ind, col_ind] = "{%s}" % " ".join(
1966 [reading_ids_by_indices[(j, k)] for k in rdg_inds]
1967 )
1968 col_ind += 1
1969 row_ind += 1
1970 return matrix, witness_labels, substantive_variation_unit_ids
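# Illustrative sketch (hypothetical reading IDs): a witness unambiguously supporting
# reading "b" at a unit gets the cell "b"; a witness ambiguous between "a" and "b"
# gets "{a b}", or "?" if ambiguous_as_missing=True.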
1972 def to_long_table(self, drop_constant: bool = False):
1973 """Returns this Collation in the form of a long table with columns for taxa, characters, reading indices, and reading values.
1974 Note that this method treats ambiguous readings as missing data.
1976 Args:
1977 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1978 Default value is False.
1980 Returns:
1981 A NumPy array with columns for taxa, characters, reading indices, and reading values, and rows for each combination of these values in the matrix.
1982 A list of column label strings.
1983 """
1984 # Populate a list of sites that will correspond to columns of the sequence alignment:
1985 substantive_variation_unit_ids = self.variation_unit_ids
1986 if drop_constant:
1987 substantive_variation_unit_ids = [
1988 vu_id
1989 for vu_id in self.variation_unit_ids
1990 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1991 ]
1992 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1993 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1994 # Initialize the outputs:
1995 column_labels = ["taxon", "character", "state", "value"]
1996 long_table_list = []
1997 # Populate a dictionary mapping (variation unit index, reading index) tuples to reading texts:
1998 reading_texts_by_indices = {}
1999 for j, vu in enumerate(self.variation_units):
2000 if vu.id not in substantive_variation_unit_ids_set:
2001 continue
2002 k = 0
2003 for rdg in vu.readings:
2004 key = tuple([vu.id, rdg.id])
2005 if key not in substantive_variation_unit_reading_tuples_set:
2006 continue
2007 indices = tuple([j, k])
2008 reading_texts_by_indices[indices] = rdg.text
2009 k += 1
2010 # Then populate the output list with the appropriate values:
2011 witness_labels = [wit.id for wit in self.witnesses]
2012 missing_symbol = '?'
2013 for wit in self.witnesses:
2015 for j, vu_id in enumerate(self.variation_unit_ids):
2016 if vu_id not in substantive_variation_unit_ids_set:
2017 continue
2018 rdg_support = self.readings_by_witness[wit.id][j]
2019 # Populate a list of nonzero coefficients for this reading support vector:
2020 rdg_inds = [k for k, w in enumerate(rdg_support) if w > 0]
2021 # If this list does not consist of exactly one reading, then treat it as missing data:
2022 if len(rdg_inds) != 1:
2023 long_table_list.append([wit.id, vu_id, missing_symbol, missing_symbol])
2024 continue
2025 k = rdg_inds[0]
2026 rdg_text = reading_texts_by_indices[(j, k)]
2027 # Replace empty reading texts with the omission placeholder:
2028 if rdg_text == "":
2029 rdg_text = "om."
2030 long_table_list.append([wit.id, vu_id, k, rdg_text])
2031 # Then convert the long table entries list to a NumPy array:
2032 long_table = np.array(long_table_list)
2033 return long_table, column_labels
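# Illustrative sketch (hypothetical IDs): a witness "A" attesting the reading at index 1
# of unit "B10K1V1U2", whose text is empty (an omission), produces the row
# ["A", "B10K1V1U2", 1, "om."].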
2035 def to_dataframe(
2036 self,
2037 drop_constant: bool = False,
2038 ambiguous_as_missing: bool = False,
2039 proportion: bool = False,
2040 table_type: TableType = TableType.matrix,
2041 split_missing: bool = True,
2042 show_ext: bool = False,
2043 ):
2044 """Returns this Collation in the form of a Pandas DataFrame array, including the appropriate row and column labels.
2046 Args:
2047 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2048 Default value is False.
2049 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
2050 Default value is False.
2051 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2052 Default value is False.
2053 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate.
2054 Only applicable for tabular outputs.
2055 Default value is "matrix".
2056 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings;
2057 if False, then missing data is ignored (i.e., all states are 0).
2058 Default value is True.
2059 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix
2060 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements.
2061 Only applicable for tabular output formats of type "distance" or "similarity".
2062 Default value is False.
2064 Returns:
2065 A Pandas DataFrame corresponding to a collation matrix with reading frequencies, a distance or similarity matrix, a NEXUS-style table of reading IDs, or a long table with discrete reading states, depending on the table type.
2066 """
2067 df = None
2068 # Proceed based on the table type:
2069 if table_type == TableType.matrix:
2070 # Convert the collation to a NumPy array and get its row and column labels first:
2071 matrix, reading_labels, witness_labels = self.to_numpy(
2072 drop_constant=drop_constant, split_missing=split_missing
2073 )
2074 df = pd.DataFrame(matrix, index=reading_labels, columns=witness_labels)
2075 elif table_type == TableType.distance:
2076 # Convert the collation to a NumPy array and get its row and column labels first:
2077 matrix, witness_labels = self.to_distance_matrix(
2078 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext
2079 )
2080 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels)
2081 elif table_type == TableType.similarity:
2082 # Convert the collation to a NumPy array and get its row and column labels first:
2083 matrix, witness_labels = self.to_similarity_matrix(
2084 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext
2085 )
2086 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels)
2087 elif table_type == TableType.nexus:
2088 # Convert the collation to a NumPy array and get its row and column labels first:
2089 matrix, witness_labels, vu_labels = self.to_nexus_table(
2090 drop_constant=drop_constant, ambiguous_as_missing=ambiguous_as_missing
2091 )
2092 df = pd.DataFrame(matrix, index=witness_labels, columns=vu_labels)
2093 elif table_type == TableType.long:
2094 # Convert the collation to a long table and get its column labels first:
2095 long_table, column_labels = self.to_long_table(drop_constant=drop_constant)
2096 df = pd.DataFrame(long_table, columns=column_labels)
2097 return df
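# Example usage (a sketch): the different table types reuse the converters above, e.g.
#     df = collation.to_dataframe(table_type=TableType.similarity, proportion=True)
# yields a witness-by-witness DataFrame of proportional agreements.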
2099 def to_csv(
2100 self,
2101 file_addr: Union[Path, str],
2102 drop_constant: bool = False,
2103 ambiguous_as_missing: bool = False,
2104 proportion: bool = False,
2105 table_type: TableType = TableType.matrix,
2106 split_missing: bool = True,
2107 show_ext: bool = False,
2108 **kwargs
2109 ):
2110 """Writes this Collation to a comma-separated value (CSV) file with the given address.
2112 If your witness IDs are numeric (e.g., Gregory-Aland numbers), then they will be written in full to the CSV file, but Excel will likely interpret them as numbers and truncate any leading zeroes!
2114 Args:
2115 file_addr: A string representing the path to an output CSV file; the file type should be .csv.
2116 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
2117 Default value is False.
2118 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
2119 Default value is False.
2120 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2121 Default value is False.
2122 table_type: A TableType option indicating which type of tabular output to generate.
2123 Only applicable for tabular outputs.
2124 Default value is "matrix".
2125 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings;
2126 if False, then missing data is ignored (i.e., all states are 0).
2127 Default value is True.
2128 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix
2129 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements.
2130 Only applicable for tabular output formats of type "distance" or "similarity".
2131 Default value is False.
2132 **kwargs: Keyword arguments for pandas.DataFrame.to_csv.
2133 """
2134 # Convert the collation to a Pandas DataFrame first:
2135 df = self.to_dataframe(
2136 drop_constant=drop_constant,
2137 ambiguous_as_missing=ambiguous_as_missing,
2138 proportion=proportion,
2139 table_type=table_type,
2140 split_missing=split_missing,
2141 show_ext=show_ext,
2142 )
2143 # Generate all parent folders for this file that don't already exist:
2144 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
2145 # Proceed based on the table type:
2146 if table_type == TableType.long:
2147 return df.to_csv(
2148 file_addr, encoding="utf-8-sig", index=False, **kwargs
2149 ) # add BOM to start of file so that Excel will know to read it as Unicode
2150 return df.to_csv(
2151 file_addr, encoding="utf-8-sig", **kwargs
2152 ) # add BOM to start of file so that Excel will know to read it as Unicode
2154 def to_excel(
2155 self,
2156 file_addr: Union[Path, str],
2157 drop_constant: bool = False,
2158 ambiguous_as_missing: bool = False,
2159 proportion: bool = False,
2160 table_type: TableType = TableType.matrix,
2161 split_missing: bool = True,
2162 show_ext: bool = False,
2163 ):
2164 """Writes this Collation to an Excel (.xlsx) file with the given address.
2166 Since Pandas is deprecating its support for xlwt, writing output in the old Excel (.xls) format is not recommended.
2168 Args:
2169 file_addr: A string representing the path to an output Excel file; the file type should be .xlsx.
2170 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
2171 Default value is False.
2172 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
2173 Default value is False.
2174 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2175 Default value is False.
2176 table_type: A TableType option indicating which type of tabular output to generate.
2177 Only applicable for tabular outputs.
2178 Default value is "matrix".
2179 split_missing: An optional flag indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings;
2180 if False, then missing data is ignored (i.e., all states are 0).
2181 Default value is True.
2182 show_ext: An optional flag indicating whether each cell in a distance or similarity matrix
2183 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements.
2184 Only applicable for tabular output formats of type "distance" or "similarity".
2185 Default value is False.
2186 """
2187 # Convert the collation to a Pandas DataFrame first:
2188 df = self.to_dataframe(
2189 drop_constant=drop_constant,
2190 ambiguous_as_missing=ambiguous_as_missing,
2191 proportion=proportion,
2192 table_type=table_type,
2193 split_missing=split_missing,
2194 show_ext=show_ext,
2195 )
2196 # Generate all parent folders for this file that don't already exist:
2197 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
2198 # Proceed based on the table type:
2199 if table_type == TableType.long:
2200 return df.to_excel(file_addr, index=False)
2201 return df.to_excel(file_addr)
2203 def get_stemma_symbols(self):
2204 """Returns a list of one-character symbols needed to represent the states of all substantive readings in STEMMA format.
2206 The number of symbols equals the maximum number of substantive readings at any variation unit.
2208 Returns:
2209 A list of individual characters representing states in readings.
2210 """
2211 possible_symbols = (
2212 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
2213 ) # NOTE: the maximum number of symbols allowed in STEMMA format (other than "?" and "-") is 62
2214 # The number of symbols needed is equal to the length of the longest substantive reading vector:
2215 nsymbols = 0
2216 # If there are no witnesses, then no symbols are needed at all:
2217 if len(self.witnesses) == 0:
2218 return []
2219 wit_id = self.witnesses[0].id
2220 for rdg_support in self.readings_by_witness[wit_id]:
2221 nsymbols = max(nsymbols, len(rdg_support))
2222 stemma_symbols = possible_symbols[:nsymbols]
2223 return stemma_symbols
2225 def to_stemma(self, file_addr: Union[Path, str]):
2226 """Writes this Collation to a STEMMA file without an extension and a Chron file (containing low, middle, and high dates for all witnesses) without an extension.
2228 Since this format does not support ambiguous states, all reading vectors with anything other than one nonzero entry will be interpreted as lacunose.
2230 Args:
2231 file_addr: A string representing the path to an output STEMMA prep file; the file should have no extension.
2232 The accompanying chron file will match this file name, except that it will have "_chron" appended to the end.
2234 """
2235 # Populate a list of sites that will correspond to columns of the sequence alignment
2236 # (constant sites are always dropped for this format):
2237 substantive_variation_unit_ids = [
2238 vu_id for vu_id in self.variation_unit_ids if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2239 ]
2240 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2241 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2242 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary
2243 # to the readings' texts:
2244 reading_texts_by_indices = {}
2245 for j, vu in enumerate(self.variation_units):
2246 if vu.id not in substantive_variation_unit_ids_set:
2247 continue
2248 k = 0
2249 for rdg in vu.readings:
2250 key = tuple([vu.id, rdg.id])
2251 if key not in substantive_variation_unit_reading_tuples_set:
2252 continue
2253 indices = tuple([j, k])
2254 reading_texts_by_indices[indices] = rdg.text
2255 k += 1
2256 # In a second pass, populate another dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary
2257 # to the witnesses exclusively supporting those readings:
2258 reading_wits_by_indices = {}
2259 for indices in reading_texts_by_indices:
2260 reading_wits_by_indices[indices] = []
2261 for i, wit in enumerate(self.witnesses):
2262 for j, vu_id in enumerate(self.variation_unit_ids):
2263 if vu_id not in substantive_variation_unit_ids_set:
2264 continue
2265 rdg_support = self.readings_by_witness[wit.id][j]
2266 # If this witness does not exclusively support exactly one reading at this unit, then treat it as lacunose:
2267 rdg_inds = [k for k, w in enumerate(rdg_support) if w > 0]
2268 if len(rdg_inds) != 1:
continue
2269 k = rdg_inds[0] # safer than rdg_support.index(1), which assumes the lone nonzero coefficient is exactly 1
2270 indices = tuple([j, k])
2271 reading_wits_by_indices[indices].append(wit.id)
2272 # In a third pass, write to the STEMMA file:
2273 symbols = self.get_stemma_symbols()
2274 Path(file_addr).parent.mkdir(
2275 parents=True, exist_ok=True
2276 ) # generate all parent folders for this file that don't already exist
2277 chron_file_addr = str(file_addr) + "_chron"
2278 with open(file_addr, "w", encoding="utf-8") as f:
2279 # Start with the witness list:
2280 f.write("* %s ;\n\n" % " ".join([wit.id for wit in self.witnesses]))
2281 # f.write("^ %s\n\n" % chron_file_addr) #write the relative path to the chron file
2282 f.write(
2283 "^ %s\n\n" % ("." + os.sep + Path(chron_file_addr).name)
2284 ) # write the relative path to the chron file
2285 # Then add a line indicating that all witnesses are lacunose unless they are specified explicitly:
2286 f.write("= $? $* ;\n\n")
2287 # Then proceed for each variation unit:
2288 for j, vu_id in enumerate(self.variation_unit_ids):
2289 if vu_id not in substantive_variation_unit_ids_set:
2290 continue
2291 # Print the variation unit ID first:
2292 f.write("@ %s\n" % vu_id)
2293 # In a first pass, print the texts of all readings enclosed in brackets:
2294 f.write("[ ")
2295 k = 0
2296 while True:
2297 indices = tuple([j, k])
2298 if indices not in reading_texts_by_indices:
2299 break
2300 text = slugify(
2301 reading_texts_by_indices[indices], lowercase=False, allow_unicode=True, separator='.'
2302 )
2303 # Denote omissions by en-dashes:
2304 if text == "":
2305 text = "\u2013"
2306 # The first reading should not be preceded by anything:
2307 if k == 0:
2308 f.write(text)
2309 f.write(" |")
2310 # Every subsequent reading should be preceded by a space:
2311 elif k > 0:
2312 f.write(" %s" % text)
2313 k += 1
2314 f.write(" ]\n")
2315 # In a second pass, print the indices and witnesses for all readings enclosed in angle brackets:
2316 k = 0
2317 f.write("\t< ")
2318 while True:
2319 indices = tuple([j, k])
2320 if indices not in reading_wits_by_indices:
2321 break
2322 rdg_symbol = symbols[k] # get the one-character alphanumeric code for this state
2323 wits = " ".join(reading_wits_by_indices[indices])
2324 # Open the variant reading support block with an angle bracket:
2325 if k == 0:
2326 f.write("%s %s" % (rdg_symbol, wits))
2327 # Open all subsequent variant reading support blocks with pipes on the next line:
2328 else:
2329 f.write("\n\t| %s %s" % (rdg_symbol, wits))
2330 k += 1
2331 f.write(" >\n")
2332 # In a fourth pass, write to the chron file:
2333 max_id_length = max(
2334 [len(slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')) for wit in self.witnesses]
2335 )
2336 max_date_length = 0
2337 for wit in self.witnesses:
2338 if wit.date_range[0] is not None:
2339 max_date_length = max(max_date_length, len(str(wit.date_range[0])))
2340 if wit.date_range[1] is not None:
2341 max_date_length = max(max_date_length, len(str(wit.date_range[1])))
2342 # Attempt to get the minimum and maximum dates for witnesses; if we can't do this, then don't write a chron file:
2343 min_date = None
2344 max_date = None
2345 try:
2346 min_date = min([wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None])
2347 max_date = max([wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None])
2348 except Exception:
2349 print("WARNING: no witnesses have date ranges; no chron file will be written!")
2350 return
2351 with open(chron_file_addr, "w", encoding="utf-8") as f:
2352 for wit in self.witnesses:
2353 wit_label = slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')
2354 f.write(wit_label)
2355 f.write(" " * (max_id_length - len(wit.id) + 1))
2356 # If either end of this witness's date range is empty, then use the min and max dates over all witnesses as defaults:
2357 date_range = wit.date_range
2358 if date_range[0] is None:
2359 date_range = tuple([min_date, date_range[1]])
if date_range[1] is None:
date_range = tuple([date_range[0], max_date])
2360 # Then write the date range minimum, average, and maximum to the chron file:
2361 low_date = str(date_range[0])
2362 f.write(" " * (max_date_length - len(low_date) + 2))
2363 f.write(low_date)
2364 avg_date = str(int((date_range[0] + date_range[1]) / 2))
2365 f.write(" " * (max_date_length - len(avg_date) + 2)) # avg_date is already a str, so no conversion is needed
2366 f.write(avg_date)
2367 high_date = str(date_range[1])
2368 f.write(" " * (max_date_length - len(high_date) + 2))
2369 f.write(high_date)
2370 f.write("\n")
2371 return
2373 def to_file(
2374 self,
2375 file_addr: Union[Path, str],
2376 format: Format = None,
2377 drop_constant: bool = False,
2378 split_missing: bool = True,
2379 char_state_labels: bool = True,
2380 frequency: bool = False,
2381 ambiguous_as_missing: bool = False,
2382 proportion: bool = False,
2383 calibrate_dates: bool = False,
2384 mrbayes: bool = False,
2385 clock_model: ClockModel = ClockModel.strict,
2386 ancestral_logger: AncestralLogger = AncestralLogger.state,
2387 table_type: TableType = TableType.matrix,
2388 show_ext: bool = False,
2389 seed: int = None,
2390 ):
2391 """Writes this Collation to the file with the given address.
2393 Args:
2394 file_addr (Union[Path, str]): The path to the output file.
2395 format (Format, optional): The desired output format.
2396 If None, then it is inferred from the file suffix.
2397 Defaults to None.
2398 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2399 Default value is False.
2400 split_missing (bool, optional): An optional flag indicating whether to treat
2401 missing characters/variation units as having a contribution of 1 split over all states/readings;
2402 if False, then missing data is ignored (i.e., all states are 0).
2403 Not applicable for NEXUS, HENNIG86, PHYLIP, FASTA, or STEMMA format.
2404 Default value is True.
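For example, at a variation unit with four substantive readings, a missing witness would contribute 0.25 to each reading under this setting.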
2405 char_state_labels (bool, optional): An optional flag indicating whether to print
2406 the CharStateLabels block in NEXUS output.
2407 Default value is True.
2408 frequency (bool, optional): An optional flag indicating whether to use the StatesFormat=Frequency setting
2409 instead of the StatesFormat=StatesPresent setting
2410 (and thus represent all states with frequency vectors rather than symbols)
2411 in NEXUS output.
2412 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
2413 Default value is False.
2414 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
2415 If this flag is set, then only base symbols will be generated for the NEXUS file.
2416 It is only applied if the frequency option is False.
2417 Default value is False.
2418 proportion (bool, optional): An optional flag indicating whether to populate a distance matrix's cells
2419 with the proportion of disagreements relative to the number of variation units where both witnesses are extant.
2420 It is only applied if the table_type option is "distance".
2421 Default value is False.
2422 calibrate_dates (bool, optional): An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses
2423 in NEXUS output.
2424 This option is intended for inputs to BEAST 2.
2425 Default value is False.
2426 mrbayes (bool, optional): An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses
2427 in NEXUS output.
2428 This option is intended for inputs to MrBayes.
2429 Default value is False.
2430 clock_model (ClockModel, optional): A ClockModel option indicating which type of clock model to use.
2431 This option is intended for inputs to MrBayes and BEAST 2.
2432 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
2433 Default value is "strict".
2434 ancestral_logger (AncestralLogger, optional): An AncestralLogger option indicating which class of logger (if any) to use for ancestral states.
2435 This option is intended for inputs to BEAST 2.
Default value is "state".
2436 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate.
2437 Only applicable for tabular outputs.
2438 Default value is "matrix".
2439 show_ext (bool, optional): An optional flag indicating whether each cell in a distance or similarity matrix
2440 should include the number of variation units where both witnesses are extant after the number of their disagreements/agreements.
2441 Only applicable for tabular output formats of type "distance" or "similarity".
2442 Default value is False.
2443 seed (int, optional): A seed for random number generation (for setting initial values of unspecified transcriptional rates in BEAST 2 XML output). Defaults to None.
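Example:
    A minimal sketch (the "collation" variable and file names are hypothetical);
    when no format is given, it is inferred from the file suffix:

        collation.to_file("out.nex", drop_constant=True)  # NEXUS output
        collation.to_file("out.csv", table_type=TableType.distance, proportion=True)  # distance table
        collation.to_file("out.xml", format=Format.BEAST, seed=42)  # BEAST 2 XML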
2444 """
2445 file_addr = Path(file_addr)
2446 format = format or Format.infer(
2447 file_addr.suffix
2448 ) # an exception will be raised here if the format or suffix is invalid
2450 if format == Format.NEXUS:
2451 return self.to_nexus(
2452 file_addr,
2453 drop_constant=drop_constant,
2454 char_state_labels=char_state_labels,
2455 frequency=frequency,
2456 ambiguous_as_missing=ambiguous_as_missing,
2457 calibrate_dates=calibrate_dates,
2458 mrbayes=mrbayes,
2459 clock_model=clock_model,
2460 )
2462 if format == Format.HENNIG86:
2463 return self.to_hennig86(file_addr, drop_constant=drop_constant)
2465 if format == Format.PHYLIP:
2466 return self.to_phylip(file_addr, drop_constant=drop_constant)
2468 if format == Format.FASTA:
2469 return self.to_fasta(file_addr, drop_constant=drop_constant)
2471 if format == Format.BEAST:
2472 return self.to_beast(
2473 file_addr,
2474 drop_constant=drop_constant,
2475 clock_model=clock_model,
2476 ancestral_logger=ancestral_logger,
2477 seed=seed,
2478 )
2480 if format == Format.CSV:
2481 return self.to_csv(
2482 file_addr,
2483 drop_constant=drop_constant,
2484 ambiguous_as_missing=ambiguous_as_missing,
2485 proportion=proportion,
2486 table_type=table_type,
2487 split_missing=split_missing,
2488 show_ext=show_ext,
2489 )
2491 if format == Format.TSV:
2492 return self.to_csv(
2493 file_addr,
2494 drop_constant=drop_constant,
2495 ambiguous_as_missing=ambiguous_as_missing,
2496 proportion=proportion,
2497 table_type=table_type,
2498 split_missing=split_missing,
2499 show_ext=show_ext,
2500 sep="\t",
2501 )
2503 if format == Format.EXCEL:
2504 return self.to_excel(
2505 file_addr,
2506 drop_constant=drop_constant,
2507 ambiguous_as_missing=ambiguous_as_missing,
2508 proportion=proportion,
2509 table_type=table_type,
2510 split_missing=split_missing,
2511 show_ext=show_ext,
2512 )
2514 if format == Format.STEMMA:
2515 return self.to_stemma(file_addr, drop_constant=drop_constant)