Coverage for teiphy/collation.py: 100.00%
1658 statements
« prev ^ index » next coverage.py v7.9.2, created at 2026-05-07 02:39 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2026-05-07 02:39 +0000
1#!/usr/bin/env python3
3from enum import Enum
4from typing import List, Union
5import os
6from pathlib import Path
7from datetime import datetime # for calculating the current year (for dating and tree height purposes)
8import math # for special functions
9import time # to time calculations for users
10import string # for easy retrieval of character ranges
11from lxml import etree as et # for reading TEI XML inputs
12import numpy as np # for random number sampling and collation matrix outputs
13import pandas as pd # for writing to DataFrames, CSV, Excel, etc.
14from slugify import slugify # for converting Unicode text from readings to ASCII for NEXUS
15from jinja2 import Environment, PackageLoader, select_autoescape # for filling output XML templates
16from tqdm import tqdm # for progress bars
18from .common import xml_ns, tei_ns
19from .format import Format
20from .witness import Witness
21from .variation_unit import VariationUnit
class ParsingException(Exception):
    """Raised when the TEI XML collation (or a supplementary input like a dates file) cannot be parsed as expected,
    e.g., when the collation lacks a listWit element or a dates file contains a minimum date greater than its maximum date."""

    pass
class WitnessDateException(Exception):
    """Raised when a witness's latest possible date is earlier than the lower bound on the collated work's date of origin."""

    pass
class IntrinsicRelationsException(Exception):
    """Raised when a variation unit's intrinsic relations graph is not a forest
    (i.e., some reading has more than one incoming relation, or the relations contain a cycle)."""

    pass
class ClockModel(str, Enum):
    """Supported clock model options: "strict", "uncorrelated", or "local"."""

    strict = "strict"
    uncorrelated = "uncorrelated"
    local = "local"
class AncestralLogger(str, Enum):
    """Supported ancestral-logging options: "state", "sequence", or "none"."""

    state = "state"
    sequence = "sequence"
    none = "none"
class TableType(str, Enum):
    """Supported tabular output types (e.g., collation matrix, distance/similarity tables, NEXUS, long format)."""

    matrix = "matrix"
    distance = "distance"
    similarity = "similarity"
    idf = "idf"
    mean_idf = "mean-idf"
    mi = "mi"
    mean_mi = "mean-mi"
    nexus = "nexus"
    long = "long"
class SplitMissingType(str, Enum):
    """Options ("uniform", "proportional") for splitting support over readings where data is missing.
    NOTE(review): exact semantics are determined by the consumers of this type — confirm against the methods that accept it."""

    uniform = "uniform"
    proportional = "proportional"
class TransformMatrixType(str, Enum):
    """Options ("stddev", "mad") for transforming a matrix.
    NOTE(review): presumably standard deviation vs. median absolute deviation scaling — confirm against the methods that accept it."""

    stddev = "stddev"
    mad = "mad"
70class Collation:
71 """Base class for storing TEI XML collation data internally.
73 This corresponds to the entire XML tree, rooted at the TEI element of the collation.
75 Attributes:
76 manuscript_suffixes: A list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
77 trivial_reading_types: A set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
78 missing_reading_types: A set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
79 fill_corrector_lacunae: A boolean flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
80 fragmentary_threshold: A float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
81 fill_correctors_threshold: A float representing the proportion such that all correctors extant at fewer than this proportion of variation units are not filled in.
82 witnesses: A list of Witness instances contained in this Collation.
83 witness_index_by_id: A dictionary mapping base witness ID strings to their int indices in the witnesses list.
84 variation_units: A list of VariationUnit instances contained in this Collation.
85 readings_by_witness: A dictionary mapping base witness ID strings to lists of reading support coefficients for all units (with at least two substantive readings).
86 substantive_variation_unit_ids: A list of ID strings for variation units with two or more substantive readings.
87 substantive_variation_unit_reading_tuples: A list of (variation unit ID, reading ID) tuples for substantive readings.
88 verbose: A boolean flag indicating whether or not to print timing and debugging details for the user.
89 """
91 def __init__(
92 self,
93 xml: et.ElementTree,
94 manuscript_suffixes: List[str] = [],
95 trivial_reading_types: List[str] = [],
96 missing_reading_types: List[str] = [],
97 fill_corrector_lacunae: bool = False,
98 fragmentary_threshold: float = None,
99 fill_correctors_threshold: float = None,
100 dates_file: Union[Path, str] = None,
101 verbose: bool = False,
102 ):
103 """Constructs a new Collation instance with the given settings.
105 Args:
106 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
107 manuscript_suffixes: An optional list of suffixes used to distinguish manuscript subwitnesses like first hands, correctors, main texts, alternate texts, and multiple attestations from their base witnesses.
108 trivial_reading_types: An optional set of reading types (e.g., "reconstructed", "defective", "orthographic", "subreading") whose readings should be collapsed under the previous substantive reading.
109 missing_reading_types: An optional set of reading types (e.g., "lac", "overlap") whose readings should be treated as missing data.
110 fill_corrector_lacunae: An optional flag indicating whether or not to fill "lacunae" in witnesses with type "corrector".
111 fragmentary_threshold: An optional float representing the proportion such that all witnesses extant at fewer than this proportion of variation units are filtered out of the collation.
112 fill_correctors_threshold: An optional float representing the proportion such that all correctors extant at fewer than this proportion of variation units are not filled in.
113 dates_file: An optional path to a CSV file containing witness IDs, minimum dates, and maximum dates. If specified, then for all witnesses in the first column, any existing date ranges for them in the TEI XML collation will be ignored.
114 verbose: An optional flag indicating whether or not to print timing and debugging details for the user.
115 """
116 self.manuscript_suffixes = manuscript_suffixes
117 self.trivial_reading_types = set(trivial_reading_types)
118 self.missing_reading_types = set(missing_reading_types)
119 self.fill_corrector_lacunae = fill_corrector_lacunae
120 self.fragmentary_threshold = fragmentary_threshold
121 self.fill_correctors_threshold = fill_correctors_threshold
122 self.verbose = verbose
123 self.witnesses = []
124 self.witness_index_by_id = {}
125 self.variation_units = []
126 self.readings_by_witness = {}
127 self.variation_unit_ids = []
128 self.substantive_variation_unit_reading_tuples = []
129 self.substantive_readings_by_variation_unit_id = {}
130 self.weight_categories = []
131 self.weights_by_id = {}
132 self.intrinsic_categories = []
133 self.intrinsic_odds_by_id = {}
134 self.transcriptional_categories = []
135 self.transcriptional_rates_by_id = {}
136 self.origin_date_range = []
137 # Now parse the XML tree to populate these data structures:
138 if self.verbose:
139 print("Initializing collation...")
140 t0 = time.time()
141 self.parse_origin_date_range(xml)
142 self.parse_list_wit(xml)
143 self.validate_wits(xml)
144 # If a dates file was specified, then update the witness date ranges manually:
145 if dates_file is not None:
146 self.update_witness_date_ranges_from_dates_file(dates_file)
147 # If the upper bound on a work's date of origin is not defined, then attempt to assign it an upper bound based on the witness dates;
148 # otherwise, attempt to assign lower bounds to witness dates based on it:
149 if self.origin_date_range[1] is None:
150 self.update_origin_date_range_from_witness_date_ranges()
151 else:
152 self.update_witness_date_ranges_from_origin_date_range()
153 self.parse_weights(xml)
154 self.parse_intrinsic_odds(xml)
155 self.parse_transcriptional_rates(xml)
156 self.parse_apps(xml)
157 self.validate_intrinsic_relations()
158 self.parse_readings_by_witness()
159 # If a threshold of readings for fragmentary witnesses is specified, then filter the witness list using the dictionary mapping witness IDs to readings:
160 if self.fragmentary_threshold is not None:
161 self.filter_fragmentary_witnesses(xml)
162 t1 = time.time()
163 if self.verbose:
164 print("Total time to initialize collation: %0.4fs." % (t1 - t0))
166 def parse_origin_date_range(self, xml: et.ElementTree):
167 """Given an XML tree for a collation, populates this Collation's list of origin date bounds.
169 Args:
170 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
171 """
172 if self.verbose:
173 print("Parsing origin date range...")
174 t0 = time.time()
175 self.origin_date_range = [None, None]
176 for date in xml.xpath(
177 "//tei:sourceDesc//tei:bibl//tei:date|//tei:sourceDesc//tei:biblStruct//tei:date|//tei:sourceDesc//tei:biblFull//tei:date",
178 namespaces={"tei": tei_ns},
179 ):
180 # Try the @when attribute first; if it is set, then it accounts for both ends of the date range:
181 if date.get("when") is not None:
182 self.origin_date_range[0] = int(date.get("when").split("-")[0])
183 self.origin_date_range[1] = self.origin_date_range[0]
184 # Failing that, if it has @from and @to attributes (indicating the period over which the work was completed),
185 # then the completion date of the work accounts for both ends of the date range:
186 elif date.get("to") is not None:
187 self.origin_date_range[0] = int(date.get("to").split("-")[0])
188 self.origin_date_range[1] = self.origin_date_range[0]
189 # Failing that, set lower and upper bounds on the origin date using the the @notBefore and @notAfter attributes:
190 elif date.get("notBefore") is not None or date.get("notAfter") is not None:
191 if date.get("notBefore") is not None:
192 self.origin_date_range[0] = int(date.get("notBefore").split("-")[0])
193 if date.get("notAfter") is not None:
194 self.origin_date_range[1] = int(date.get("notAfter").split("-")[0])
195 return
197 def get_base_wit(self, wit: str):
198 """Given a witness siglum, strips of the specified manuscript suffixes until the siglum matches one in the witness list or until no more suffixes can be stripped.
200 Args:
201 wit: A string representing a witness siglum, potentially including suffixes to be stripped.
202 """
203 base_wit = wit
204 # If our starting siglum corresponds to a siglum in the witness list, then just return it:
205 if base_wit in self.witness_index_by_id:
206 return base_wit
207 # Otherwise, strip any suffixes we find until the siglum corresponds to a base witness in the list
208 # or no more suffixes can be stripped:
209 suffix_found = True
210 while suffix_found:
211 suffix_found = False
212 for suffix in self.manuscript_suffixes:
213 if base_wit.endswith(suffix):
214 suffix_found = True
215 base_wit = base_wit[: -len(suffix)]
216 break # stop looking for other suffixes
217 # If the siglum stripped of this suffix now corresponds to a siglum in the witness list, then return it:
218 if base_wit in self.witness_index_by_id:
219 return base_wit
220 # If we get here, then all possible manuscript suffixes have been stripped, and the resulting siglum does not correspond to a siglum in the witness list:
221 return base_wit
223 def parse_list_wit(self, xml: et.ElementTree):
224 """Given an XML tree for a collation, populates its list of witnesses from its listWit element.
225 If the XML tree does not contain a listWit element, then a ParsingException is thrown listing all distinct witness sigla encountered in the collation.
227 Args:
228 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
229 """
230 if self.verbose:
231 print("Parsing witness list...")
232 t0 = time.time()
233 self.witnesses = []
234 self.witness_index_by_id = {}
235 list_wits = xml.xpath("/tei:TEI//tei:listWit", namespaces={"tei": tei_ns})
236 if len(list_wits) == 0:
237 # There is no listWit element: collect all distinct witness sigla in the collation and raise a ParsingException listing them:
238 distinct_sigla = set()
239 sigla = []
240 # Proceed for each rdg, rdgGrp, or witDetail element:
241 for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
242 wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
243 wits = wit_str.split()
244 for wit in wits:
245 siglum = wit.strip("#") # remove the URI prefix, if there is one
246 if siglum not in distinct_sigla:
247 distinct_sigla.add(siglum)
248 sigla.append(siglum)
249 sigla.sort()
250 msg = ""
251 msg += "An explicit listWit element must be included in the TEI XML collation.\n"
252 msg += "The following sigla occur in the collation and should be included as the @xml:id or @n attributes of witness elements under the listWit element:\n"
253 msg += ", ".join(sigla)
254 raise ParsingException(msg)
255 # Otherwise, take the first listWit element as the list of all witnesses and process it:
256 list_wit = list_wits[0]
257 for witness in list_wit.xpath("./tei:witness", namespaces={"tei": tei_ns}):
258 wit = Witness(witness, self.verbose)
259 self.witness_index_by_id[wit.id] = len(self.witnesses)
260 self.witnesses.append(wit)
261 t1 = time.time()
262 if self.verbose:
263 print("Finished processing %d witnesses in %0.4fs." % (len(self.witnesses), t1 - t0))
264 return
266 def validate_wits(self, xml: et.ElementTree):
267 """Given an XML tree for a collation, checks if any witness sigla listed in a rdg, rdgGrp, or witDetail element,
268 once stripped of ignored suffixes, is not found in the witness list.
269 A warning will be issued for each distinct siglum like this.
270 This method also checks if the upper bound of any witness's date is earlier than the lower bound on the collated work's date of origin
271 and throws an exception if so.
273 Args:
274 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
275 """
276 if self.verbose:
277 print("Validating witness list against collation...")
278 t0 = time.time()
279 # There is no listWit element: collect all distinct witness sigla in the collation and raise an exception listing them:
280 distinct_extra_sigla = set()
281 extra_sigla = []
282 # Proceed for each rdg, rdgGrp, or witDetail element:
283 for rdg in xml.xpath("//tei:rdg|//tei:rdgGrp|//tei:witDetail", namespaces={"tei": tei_ns}):
284 wit_str = rdg.get("wit") if rdg.get("wit") is not None else ""
285 wits = wit_str.split()
286 for wit in wits:
287 siglum = wit.strip("#") # remove the URI prefix, if there is one
288 base_siglum = self.get_base_wit(siglum)
289 if base_siglum not in self.witness_index_by_id:
290 if base_siglum not in distinct_extra_sigla:
291 distinct_extra_sigla.add(base_siglum)
292 extra_sigla.append(base_siglum)
293 if len(extra_sigla) > 0:
294 extra_sigla.sort()
295 msg = ""
296 msg += "WARNING: The following sigla occur in the collation that do not have corresponding witness entries in the listWit:\n"
297 msg += ", ".join(extra_sigla)
298 print(msg)
299 # If the lower bound on the date of origin is defined, then check each witness against it:
300 if self.origin_date_range[0] is not None:
301 bad_date_witness_sigla = []
302 bad_date_upper_bounds_by_witness = {}
303 for i, wit in enumerate(self.witnesses):
304 if wit.date_range[1] is not None and wit.date_range[1] < self.origin_date_range[0]:
305 bad_date_witness_sigla.append(wit.id)
306 bad_date_upper_bounds_by_witness[wit.id] = wit.date_range[1]
307 if len(bad_date_witness_sigla) > 0:
308 msg = ""
309 msg += "The following witnesses have their latest possible dates before the earliest date of origin %d specified for the collated work:\n"
310 msg += ", ".join(
311 [
312 (siglum + "(" + str(bad_date_upper_bounds_by_witness[siglum]) + ")")
313 for siglum in bad_date_witness_sigla
314 ]
315 )
316 raise WitnessDateException(msg)
317 t1 = time.time()
318 if self.verbose:
319 print("Finished witness validation in %0.4fs." % (t1 - t0))
320 return
322 def update_witness_date_ranges_from_dates_file(self, dates_file: Union[Path, str]):
323 """Given a CSV-formatted dates file, update the date ranges of all witnesses whose IDs are in the first column of the dates file
324 (overwriting existing date ranges if necessary).
325 """
326 if self.verbose:
327 print("Updating witness dates from file %s..." % (str(dates_file)))
328 t0 = time.time()
329 dates_df = pd.read_csv(dates_file, index_col=0, names=["id", "min", "max"])
330 for witness in self.witnesses:
331 wit_id = witness.id
332 if wit_id in dates_df.index:
333 # For every witness in the list whose ID is specified in the dates file,
334 # update their date ranges (as long as the date ranges in the file are are well-formed):
335 min_date = int(dates_df.loc[wit_id]["min"]) if not np.isnan(dates_df.loc[wit_id]["min"]) else None
336 max_date = (
337 int(dates_df.loc[wit_id]["max"])
338 if not np.isnan(dates_df.loc[wit_id]["max"])
339 else datetime.now().year
340 )
341 print(min_date, max_date)
342 if min_date is not None and max_date is not None and min_date > max_date:
343 raise ParsingException(
344 "In dates file %s, for witness ID %s, the minimum date %d is greater than the maximum date %d."
345 % (str(dates_file), wit_id, min_date, max_date)
346 )
347 witness.date_range = [min_date, max_date]
348 t1 = time.time()
349 if self.verbose:
350 print("Finished witness date range updates in %0.4fs." % (t1 - t0))
351 return
353 def update_origin_date_range_from_witness_date_ranges(self):
354 """Conditionally updates the upper bound on the date of origin of the work represented by this Collation
355 based on the bounds on the witnesses' dates.
356 If none of the witnesses have bounds on their dates, then nothing is done.
357 This method is only invoked if the work's date of origin does not already have its upper bound defined.
358 """
359 if self.verbose:
360 print("Updating upper bound on origin date using witness dates...")
361 t0 = time.time()
362 # Set the origin date to the earliest witness date:
363 witness_date_lower_bounds = [wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None]
364 witness_date_upper_bounds = [wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None]
365 min_witness_date = (
366 min(witness_date_lower_bounds + witness_date_upper_bounds)
367 if len(witness_date_lower_bounds + witness_date_upper_bounds) > 0
368 else None
369 )
370 if min_witness_date is not None:
371 self.origin_date_range[1] = (
372 min(self.origin_date_range[1], min_witness_date)
373 if self.origin_date_range[1] is not None
374 else min_witness_date
375 )
376 t1 = time.time()
377 if self.verbose:
378 print("Finished updating upper bound on origin date in %0.4fs." % (t1 - t0))
379 return
381 def update_witness_date_ranges_from_origin_date_range(self):
382 """Attempts to update the lower bounds on the witnesses' dates of origin of the work represented by this Collation
383 using the upper bound on the date of origin of the work represented by this Collation.
384 This method is only invoked if the upper bound on the work's date of origin was not already defined
385 (i.e., if update_origin_date_range_from_witness_date_ranges was not invoked).
386 """
387 if self.verbose:
388 print("Updating lower bounds on witness dates using origin date...")
389 t0 = time.time()
390 # Proceed for every witness:
391 for i, wit in enumerate(self.witnesses):
392 # Ensure that the lower bound on this witness's date is no earlier than the upper bound on the date of the work's origin:
393 wit.date_range[0] = (
394 max(wit.date_range[0], self.origin_date_range[1])
395 if wit.date_range[0] is not None
396 else self.origin_date_range[1]
397 )
398 # Then ensure that the upper bound on this witness's date is no earlier than its lower bound, in case we updated it:
399 wit.date_range[1] = max(wit.date_range[0], wit.date_range[1])
400 t1 = time.time()
401 if self.verbose:
402 print("Finished updating lower bounds on witness dates in %0.4fs." % (t1 - t0))
403 return
405 def parse_weights(self, xml: et.ElementTree):
406 """Given an XML tree for a collation, populates this Collation's list of variation unit weight categories
407 (associated with types of variation that may have different expected frequencies)
408 and its dictionary mapping these categories to integer weights.
410 Args:
411 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
412 """
413 if self.verbose:
414 print("Parsing variation unit weight categories...")
415 t0 = time.time()
416 self.weight_categories = []
417 self.weights_by_id = {}
418 for interp in xml.xpath("//tei:interpGrp[@type=\"weight\"]/tei:interp", namespaces={"tei": tei_ns}):
419 # These must be indexed by the xml:id attribute, so skip any that do not have one:
420 if interp.get("{%s}id" % xml_ns) is None:
421 continue
422 weight_category = interp.get("{%s}id" % xml_ns)
423 # Retrieve this element's text value and coerce it to an integer, defaulting to 1 if it has none:
424 weight = 1
425 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
426 if certainty.get("degree") is not None:
427 weight = int(certainty.get("degree"))
428 break
429 self.weight_categories.append(weight_category)
430 self.weights_by_id[weight_category] = weight
431 t1 = time.time()
432 if self.verbose:
433 print(
434 "Finished processing %d variation unit weight categories in %0.4fs."
435 % (len(self.weight_categories), t1 - t0)
436 )
437 return
439 def parse_intrinsic_odds(self, xml: et.ElementTree):
440 """Given an XML tree for a collation, populates this Collation's list of intrinsic probability categories
441 (e.g., "absolutely more likely," "highly more likely," "more likely," "slightly more likely," "equally likely")
442 and its dictionary mapping these categories to numerical odds.
443 If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.
445 Args:
446 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
447 """
448 if self.verbose:
449 print("Parsing intrinsic odds categories...")
450 t0 = time.time()
451 self.intrinsic_categories = []
452 self.intrinsic_odds_by_id = {}
453 for interp in xml.xpath("//tei:interpGrp[@type=\"intrinsic\"]/tei:interp", namespaces={"tei": tei_ns}):
454 # These must be indexed by the xml:id attribute, so skip any that do not have one:
455 if interp.get("{%s}id" % xml_ns) is None:
456 continue
457 odds_category = interp.get("{%s}id" % xml_ns)
458 # If this element contains a certainty subelement with a fixed odds value for this category, then set it:
459 odds = None
460 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
461 if certainty.get("degree") is not None:
462 odds = float(certainty.get("degree"))
463 break
464 self.intrinsic_categories.append(odds_category)
465 self.intrinsic_odds_by_id[odds_category] = odds
466 t1 = time.time()
467 if self.verbose:
468 print(
469 "Finished processing %d intrinsic odds categories in %0.4fs."
470 % (len(self.intrinsic_categories), t1 - t0)
471 )
472 return
474 def parse_transcriptional_rates(self, xml: et.ElementTree):
475 """Given an XML tree for a collation, populates this Collation's dictionary mapping transcriptional change categories
476 (e.g., "aural confusion," "visual error," "clarification") to numerical rates.
477 If a category does not contain a certainty element specifying its number, then it will be assumed to be a parameter to be estimated.
479 Args:
480 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
481 """
482 if self.verbose:
483 print("Parsing transcriptional change categories...")
484 t0 = time.time()
485 self.transcriptional_categories = []
486 self.transcriptional_rates_by_id = {}
487 for interp in xml.xpath("//tei:interpGrp[@type=\"transcriptional\"]/tei:interp", namespaces={"tei": tei_ns}):
488 # These must be indexed by the xml:id attribute, so skip any that do not have one:
489 if interp.get("{%s}id" % xml_ns) is None:
490 continue
491 transcriptional_category = interp.get("{%s}id" % xml_ns)
492 # If this element contains a certainty subelement with a fixed rate for this category, then set it:
493 rate = None
494 for certainty in interp.xpath("./tei:certainty", namespaces={"tei": tei_ns}):
495 if certainty.get("degree") is not None:
496 rate = float(certainty.get("degree"))
497 break
498 self.transcriptional_categories.append(transcriptional_category)
499 self.transcriptional_rates_by_id[transcriptional_category] = rate
500 t1 = time.time()
501 if self.verbose:
502 print(
503 "Finished processing %d transcriptional change categories in %0.4fs."
504 % (len(self.transcriptional_rates_by_id), t1 - t0)
505 )
506 return
508 def validate_intrinsic_relations(self):
509 """Checks if any VariationUnit's intrinsic_relations map is not a forest.
510 If any is not, then an IntrinsicRelationsException is thrown describing the VariationUnit at fault.
511 """
512 if self.verbose:
513 print("Validating intrinsic relation graphs for variation units...")
514 t0 = time.time()
515 for vu in self.variation_units:
516 # Skip any variation units with an empty intrinsic_relations map:
517 if len(vu.intrinsic_relations) == 0:
518 continue
519 # For all others, start by identifying all reading IDs that are not related to by some other reading ID:
520 in_degree_by_reading = {}
521 for edge in vu.intrinsic_relations:
522 s = edge[0]
523 t = edge[1]
524 if s not in in_degree_by_reading:
525 in_degree_by_reading[s] = 0
526 if t not in in_degree_by_reading:
527 in_degree_by_reading[t] = 0
528 in_degree_by_reading[t] += 1
529 # If any reading has more than one relation pointing to it, then the intrinsic relations graph is not a forest:
530 excessive_in_degree_readings = [
531 rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] > 1
532 ]
533 if len(excessive_in_degree_readings) > 0:
534 msg = ""
535 msg += (
536 "In variation unit %s, the following readings have more than one intrinsic relation pointing to them: %s.\n"
537 % (vu.id, ", ".join(excessive_in_degree_readings))
538 )
539 msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
540 raise IntrinsicRelationsException(msg)
541 # If every reading has another reading pointing to it, then the intrinsic relations graph contains a cycle and is not a forest:
542 starting_nodes = [rdg_id for rdg_id in in_degree_by_reading if in_degree_by_reading[rdg_id] == 0]
543 if len(starting_nodes) == 0:
544 msg = ""
545 msg += "In variation unit %s, the intrinsic relations contain a cycle.\n" % vu.id
546 msg += "Please ensure that at least one reading has no relations pointing to it and that every reading has no more than one relation pointing to it."
547 raise IntrinsicRelationsException(msg)
548 t1 = time.time()
549 if self.verbose:
550 print("Finished intrinsic relations validation in %0.4fs." % (t1 - t0))
551 return
553 def parse_apps(self, xml: et.ElementTree):
554 """Given an XML tree for a collation, populates its list of variation units from its app elements.
556 Args:
557 xml: An lxml.etree.ElementTree representing an XML tree rooted at a TEI element.
558 """
559 if self.verbose:
560 print("Parsing variation units...")
561 t0 = time.time()
562 for a in xml.xpath('//tei:app', namespaces={'tei': tei_ns}):
563 vu = VariationUnit(a, self.verbose)
564 self.variation_units.append(vu)
565 t1 = time.time()
566 if self.verbose:
567 print("Finished processing %d variation units in %0.4fs." % (len(self.variation_units), t1 - t0))
568 return
    def get_readings_by_witness_for_unit(self, vu: VariationUnit):
        """Returns a dictionary mapping witness IDs to a list of their reading coefficients for a given variation unit.

        Args:
            vu: A VariationUnit to be processed.

        Returns:
            A dictionary mapping witness ID strings to a list of their coefficients for all substantive readings in this VariationUnit.
        """
        # In a first pass, populate lists of substantive (variation unit ID, reading ID) tuples and reading labels
        # and a map from reading IDs to the indices of their parent substantive reading in this unit:
        reading_id_to_index = {}
        self.substantive_readings_by_variation_unit_id[vu.id] = []
        for rdg in vu.readings:
            # If this reading is missing (e.g., lacunose or inapplicable due to an overlapping variant) or targets another reading, then skip it:
            if rdg.type in self.missing_reading_types or len(rdg.certainties) > 0:
                continue
            # If this reading is trivial, then map it to the last substantive index
            # (i.e., collapse it under the most recently seen substantive reading):
            if rdg.type in self.trivial_reading_types:
                reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
                continue
            # Otherwise, the reading is substantive: add it to the map and update the last substantive index:
            self.substantive_readings_by_variation_unit_id[vu.id].append(rdg.id)
            self.substantive_variation_unit_reading_tuples.append(tuple([vu.id, rdg.id]))
            reading_id_to_index[rdg.id] = len(self.substantive_readings_by_variation_unit_id[vu.id]) - 1
        # If the list of substantive readings only contains one entry, then this variation unit is not informative;
        # return an empty dictionary and add nothing to the list of substantive reading labels:
        # NOTE(review): the comment above appears stale — the code below always returns a populated dictionary
        # for every witness; filtering of uninformative units presumably happens in the caller. Confirm.
        if self.verbose:
            print(
                "Variation unit %s has %d substantive readings."
                % (vu.id, len(self.substantive_readings_by_variation_unit_id[vu.id]))
            )
        readings_by_witness_for_unit = {}
        # Initialize the output dictionary with empty sets for all base witnesses:
        for wit in self.witnesses:
            readings_by_witness_for_unit[wit.id] = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
        # In a second pass, assign each base witness a set containing the readings it supports in this unit:
        for rdg in vu.readings:
            # Initialize the dictionary indicating support for this reading (or its disambiguations):
            rdg_support = [0] * len(self.substantive_readings_by_variation_unit_id[vu.id])
            # If this is a missing reading (e.g., a lacuna or an overlap), then we can skip it, as its corresponding set will be empty:
            if rdg.type in self.missing_reading_types:
                continue
            # Otherwise, if this reading is trivial, then it will contain an entry for the index of its parent substantive reading:
            elif rdg.type in self.trivial_reading_types:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Otherwise, if this reading has one or more nonzero certainty degrees,
            # then set the entries for these readings to their degrees:
            elif sum(rdg.certainties.values()) > 0:
                for t in rdg.certainties:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = rdg.certainties[t]
            # Otherwise, if this reading has one or more targets (i.e., if it is an ambiguous reading),
            # then set the entries for each of its targets to 1:
            elif len(rdg.targets) > 0:
                for t in rdg.targets:
                    # Skip any reading whose ID is unrecognized in this unit:
                    if t in reading_id_to_index:
                        rdg_support[reading_id_to_index[t]] = 1
            # Otherwise, this reading is itself substantive; set the entry for the index of this reading to 1:
            else:
                rdg_support[reading_id_to_index[rdg.id]] = 1
            # Proceed for each witness siglum in the support for this reading:
            for wit in rdg.wits:
                # Is this siglum a base siglum?
                base_wit = self.get_base_wit(wit)
                if base_wit not in self.witness_index_by_id:
                    # If it is not, then it is probably just because we've encountered a corrector or some other secondary witness not included in the witness list;
                    # report this if we're in verbose mode and move on:
                    if self.verbose:
                        print(
                            "Skipping unknown witness siglum %s (base siglum %s) in variation unit %s, reading %s..."
                            % (wit, base_wit, vu.id, rdg.id)
                        )
                    continue
                # If we've found a base siglum, then add this reading's contribution to the base witness's reading set for this unit;
                # normally the existing set will be empty, but if we reduce two suffixed sigla to the same base witness,
                # then that witness may attest to multiple readings in the same unit
                # (support values are clamped at 1 so repeated attestations do not inflate coefficients):
                readings_by_witness_for_unit[base_wit] = [
                    (min(readings_by_witness_for_unit[base_wit][i] + rdg_support[i], 1))
                    for i in range(len(rdg_support))
                ]
        return readings_by_witness_for_unit
    def parse_readings_by_witness(self):
        """Populates the internal dictionary mapping witness IDs to a list of their reading support sets for all variation units, and then fills the empty reading support sets for witnesses of type "corrector" with the entries of the previous witness.

        Populates:
            self.readings_by_witness: a dictionary mapping each witness ID to a list of reading support vectors, one per substantive variation unit.
            self.variation_unit_ids: a list of the IDs of variation units that have at least one witness attesting a reading.

        Note:
            An all-zero reading support vector encodes a lacuna (missing data) at that unit.
        """
        if self.verbose:
            print("Populating internal dictionary of witness readings...")
        t0 = time.time()
        # Initialize the data structures to be populated here:
        self.readings_by_witness = {}
        self.variation_unit_ids = []
        for wit in self.witnesses:
            self.readings_by_witness[wit.id] = []
        # Populate them for each variation unit:
        for vu in self.variation_units:
            readings_by_witness_for_unit = self.get_readings_by_witness_for_unit(vu)
            if len(readings_by_witness_for_unit) > 0:
                self.variation_unit_ids.append(vu.id)
            for wit in readings_by_witness_for_unit:
                self.readings_by_witness[wit].append(readings_by_witness_for_unit[wit])
        # Optionally, fill the lacunae of the correctors:
        if self.fill_corrector_lacunae:
            # filled_readings is a running snapshot of the latest known reading support vectors
            # (from the most recent first hand or corrector processed so far):
            filled_readings = []
            for i, wit in enumerate(self.witnesses):
                # If this is the first witness, then it shouldn't be a corrector (since there is no previous witness against which to compare it):
                if i == 0:
                    filled_readings = list(self.readings_by_witness[wit.id])
                    continue
                # Otherwise, if this witness is not a corrector, then skip it:
                if wit.type != "corrector":
                    filled_readings = list(self.readings_by_witness[wit.id])
                    continue
                # Otherwise, add this corrector's extant readings to the filled readings list:
                # (this happens BEFORE the threshold check below, so even a corrector that is not
                # filled still contributes its extant readings to the running snapshot)
                for j in range(len(self.readings_by_witness[wit.id])):
                    if sum(self.readings_by_witness[wit.id][j]) != 0:
                        filled_readings[j] = list(self.readings_by_witness[wit.id][j])
                # If a threshold of extant readings is specified, then check if this corrector meets it, and skip it if not:
                if self.fill_correctors_threshold is not None:
                    # If there is a threshold, then first check the proportion of variation units at which this witness is not lacunose:
                    proportion_extant = sum(
                        [
                            1
                            for j in range(len(self.readings_by_witness[wit.id]))
                            if sum(self.readings_by_witness[wit.id][j]) != 0
                        ]
                    ) / len(self.readings_by_witness[wit.id])
                    # If this corrector does not exceed the threshold, then don't fill it,
                    # but do update the running set of readings for the previous corrector and first hand:
                    if proportion_extant < self.fill_correctors_threshold:
                        continue
                # Otherwise, fill every lacuna in this corrector based on the filled readings list:
                # (copy each vector so the corrector does not share list objects with its predecessors)
                for j in range(len(self.readings_by_witness[wit.id])):
                    if sum(self.readings_by_witness[wit.id][j]) == 0:
                        self.readings_by_witness[wit.id][j] = list(filled_readings[j])
        t1 = time.time()
        if self.verbose:
            print(
                "Populated dictionary for %d witnesses over %d substantive variation units in %0.4fs."
                % (len(self.witnesses), len(self.variation_unit_ids), t1 - t0)
            )
        return
714 def filter_fragmentary_witnesses(self, xml):
715 """Filters the original witness list and readings by witness dictionary to exclude witnesses whose proportions of extant passages fall below the fragmentary readings threshold."""
716 if self.verbose:
717 print(
718 "Filtering fragmentary witnesses (extant in < %f of all variation units) out of internal witness list and dictionary of witness readings..."
719 % self.fragmentary_threshold
720 )
721 t0 = time.time()
722 fragmentary_witness_set = set()
723 # Proceed for each witness in order:
724 for wit in self.witnesses:
725 wit_id = wit.id
726 # We count the number of variation units at which this witness has an extant (i.e., non-missing) reading:
727 extant_reading_count = 0
728 total_reading_count = len(self.readings_by_witness[wit.id])
729 # Proceed through all reading support lists:
730 for rdg_support in self.readings_by_witness[wit_id]:
731 # If the current reading support list is not all zeroes, then increment this witness's count of extant readings:
732 if sum(rdg_support) != 0:
733 extant_reading_count += 1
734 # If the proportion of extant readings falls below the threshold, then add this witness to the list of fragmentary witnesses:
735 if extant_reading_count / total_reading_count < self.fragmentary_threshold:
736 fragmentary_witness_set.add(wit_id)
737 # Then filter the witness list to exclude the fragmentary witnesses:
738 filtered_witnesses = [wit for wit in self.witnesses if wit.id not in fragmentary_witness_set]
739 self.witnesses = filtered_witnesses
740 # Then remove the entries for the fragmentary witnesses from the witnesses-to-readings dictionary:
741 for wit_id in fragmentary_witness_set:
742 del self.readings_by_witness[wit_id]
743 t1 = time.time()
744 if self.verbose:
745 print(
746 "Filtered out %d fragmentary witness(es) (%s) in %0.4fs."
747 % (len(fragmentary_witness_set), str(list(fragmentary_witness_set)), t1 - t0)
748 )
749 return
751 def get_nexus_symbols(self):
752 """Returns a list of one-character symbols needed to represent the states of all substantive readings in NEXUS.
754 The number of symbols equals the maximum number of substantive readings at any variation unit.
756 Returns:
757 A list of individual characters representing states in readings.
758 """
759 # NOTE: IQTREE does not appear to support symbols outside of 0-9 and a-z, and its base symbols must be case-insensitive.
760 # The official version of MrBayes is likewise limited to 32 symbols.
761 # But PAUP* allows up to 64 symbols, and Andrew Edmondson's fork of MrBayes does, as well.
762 # So this method will support symbols from 0-9, a-z, and A-Z (for a total of 62 states)
763 possible_symbols = list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
764 # The number of symbols needed is equal to the length of the longest substantive reading vector:
765 nsymbols = 0
766 # If there are no witnesses, then no symbols are needed at all:
767 if len(self.witnesses) == 0:
768 return []
769 wit_id = self.witnesses[0].id
770 for rdg_support in self.readings_by_witness[wit_id]:
771 nsymbols = max(nsymbols, len(rdg_support))
772 nexus_symbols = possible_symbols[:nsymbols]
773 return nexus_symbols
775 def to_nexus(
776 self,
777 file_addr: Union[Path, str],
778 drop_constant: bool = False,
779 char_state_labels: bool = True,
780 frequency: bool = False,
781 ambiguous_as_missing: bool = False,
782 calibrate_dates: bool = False,
783 mrbayes: bool = False,
784 clock_model: ClockModel = ClockModel.strict,
785 ):
786 """Writes this Collation to a NEXUS file with the given address.
788 Args:
789 file_addr: A string representing the path to an output NEXUS file; the file type should be .nex, .nexus, or .nxs.
790 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
791 char_state_labels: An optional flag indicating whether or not to include the CharStateLabels block.
792 frequency: An optional flag indicating whether to use the StatesFormat=Frequency setting
793 instead of the StatesFormat=StatesPresent setting
794 (and thus represent all states with frequency vectors rather than symbols).
795 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
796 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
797 If this flag is set, then only base symbols will be generated for the NEXUS file.
798 It is only applied if the frequency option is False.
799 calibrate_dates: An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses.
800 This option is intended for inputs to BEAST 2.
801 mrbayes: An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses.
802 This option is intended for inputs to MrBayes.
803 clock_model: A ClockModel option indicating which type of clock model to use.
804 This option is intended for inputs to MrBayes and BEAST 2.
805 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
806 """
807 # Populate a list of sites that will correspond to columns of the sequence alignment:
808 substantive_variation_unit_ids = self.variation_unit_ids
809 if drop_constant:
810 substantive_variation_unit_ids = [
811 vu_id
812 for vu_id in self.variation_unit_ids
813 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
814 ]
815 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
816 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
817 # Start by calculating the values we will be using here:
818 ntax = len(self.witnesses)
819 nchar = len(substantive_variation_unit_ids)
820 taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
821 max_taxlabel_length = max(
822 [len(taxlabel) for taxlabel in taxlabels]
823 ) # keep track of the longest taxon label for tabular alignment purposes
824 charlabels = [slugify(vu_id, lowercase=False, separator='_') for vu_id in substantive_variation_unit_ids]
825 missing_symbol = '?'
826 symbols = self.get_nexus_symbols()
827 # Generate all parent folders for this file that don't already exist:
828 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
829 # Then write the file:
830 pbar = tqdm()
831 with open(file_addr, "w", encoding="utf-8") as f:
832 # Start with the NEXUS header:
833 f.write("#NEXUS\n\n")
834 # Then begin the data block:
835 f.write("Begin DATA;\n")
836 # Write the collation matrix dimensions:
837 f.write("\tDimensions ntax=%d nchar=%d;\n" % (ntax, nchar))
838 # Write the format subblock:
839 f.write("\tFormat\n")
840 f.write("\t\tDataType=Standard\n")
841 f.write("\t\tMissing=%s\n" % missing_symbol)
842 if frequency:
843 f.write("\t\tStatesFormat=Frequency\n")
844 f.write("\t\tSymbols=\"%s\";\n" % (" ".join(symbols)))
845 # If the char_state_labels is set, then write the labels for character-state labels, with each on its own line:
846 if char_state_labels:
847 f.write("\tCharStateLabels")
848 vu_ind = 1
849 for vu in self.variation_units:
850 if vu.id not in substantive_variation_unit_ids_set:
851 continue
852 if vu_ind == 1:
853 f.write("\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
854 else:
855 f.write(",\n\t\t%d %s /" % (vu_ind, slugify(vu.id, lowercase=False, separator='_')))
856 rdg_ind = 0
857 for rdg in vu.readings:
858 key = tuple([vu.id, rdg.id])
859 if key not in substantive_variation_unit_reading_tuples_set:
860 continue
861 ascii_rdg_text = slugify(
862 rdg.text, lowercase=False, separator='_', replacements=[['η', 'h'], ['ω', 'w']]
863 )
864 if ascii_rdg_text == "":
865 ascii_rdg_text = "om."
866 f.write(" %s" % ascii_rdg_text)
867 rdg_ind += 1
868 if rdg_ind > 0:
869 vu_ind += 1
870 f.write(";\n")
871 # Write the matrix subblock:
872 f.write("\tMatrix")
873 with tqdm(total=len(self.witnesses)) as pbar:
874 for i, wit in enumerate(self.witnesses):
875 taxlabel = taxlabels[i]
876 if frequency:
877 sequence = "\n\t\t" + taxlabel
878 for j, vu_id in enumerate(self.variation_unit_ids):
879 if vu_id not in substantive_variation_unit_ids_set:
880 continue
881 rdg_support = self.readings_by_witness[wit.id][j]
882 sequence += "\n\t\t\t"
883 # If this reading is lacunose in this witness, then use the missing character:
884 if sum(rdg_support) == 0:
885 sequence += missing_symbol
886 continue
887 # Otherwise, print out its frequencies for different readings in parentheses:
888 sequence += "("
889 for k, w in enumerate(rdg_support):
890 sequence += "%s:%0.4f" % (symbols[k], w)
891 if k < len(rdg_support) - 1:
892 sequence += " "
893 sequence += ")"
894 else:
895 sequence = "\n\t\t" + taxlabel
896 # Add enough space after this label ensure that all sequences are nicely aligned:
897 sequence += " " * (max_taxlabel_length - len(taxlabel) + 1)
898 for j, vu_id in enumerate(self.variation_unit_ids):
899 if vu_id not in substantive_variation_unit_ids_set:
900 continue
901 rdg_support = self.readings_by_witness[wit.id][j]
902 # If this reading is lacunose in this witness, then use the missing character:
903 if sum(rdg_support) == 0:
904 sequence += missing_symbol
905 continue
906 rdg_inds = [
907 k for k, w in enumerate(rdg_support) if w > 0
908 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them
909 # For singleton readings, just print the symbol:
910 if len(rdg_inds) == 1:
911 sequence += symbols[rdg_inds[0]]
912 continue
913 # For multiple readings, print the corresponding readings in braces or the missing symbol depending on input settings:
914 if ambiguous_as_missing:
915 sequence += missing_symbol
916 else:
917 sequence += "{%s}" % "".join([str(rdg_ind) for rdg_ind in rdg_inds])
918 f.write("%s" % (sequence))
919 pbar.update(1)
920 f.write(";\n")
921 # End the data block:
922 f.write("End;")
923 # If calibrate_dates is set, then add the assumptions block:
924 if calibrate_dates:
925 f.write("\n\n")
926 f.write("Begin ASSUMPTIONS;\n")
927 # Set the scale to years:
928 f.write("\tOPTIONS SCALE = years;\n\n")
929 # Then calibrate the witness ages:
930 calibrate_strings = []
931 for i, wit in enumerate(self.witnesses):
932 taxlabel = taxlabels[i]
933 date_range = wit.date_range
934 if date_range[0] is not None:
935 # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
936 # depending on whether the upper and lower bounds match:
937 min_age = datetime.now().year - date_range[1]
938 max_age = datetime.now().year - date_range[0]
939 if min_age == max_age:
940 calibrate_string = "\tCALIBRATE %s = fixed(%d)" % (taxlabel, min_age)
941 calibrate_strings.append(calibrate_string)
942 else:
943 calibrate_string = "\tCALIBRATE %s = uniform(%d,%d)" % (taxlabel, min_age, max_age)
944 calibrate_strings.append(calibrate_string)
945 else:
946 # If there is no lower bound on the witness's date, then use an offset log-normal distribution:
947 min_age = datetime.now().year - date_range[1]
948 calibrate_string = "\tCALIBRATE %s = offsetlognormal(%d,0.0,1.0)" % (taxlabel, min_age)
949 calibrate_strings.append(calibrate_string)
950 # Then print the calibrate strings, separated by commas and line breaks and terminated by a semicolon:
951 f.write("%s;\n\n" % ",\n".join(calibrate_strings))
952 # End the assumptions block:
953 f.write("End;")
954 # If mrbayes is set, then add the mrbayes block:
955 if mrbayes:
956 f.write("\n\n")
957 f.write("Begin MRBAYES;\n")
958 # Turn on the autoclose feature by default:
959 f.write("\tset autoclose=yes;\n")
960 # Set the branch lengths to be governed by a birth-death clock model, and set up the parameters for this model:
961 f.write("\n")
962 f.write("\tprset brlenspr = clock:birthdeath;\n")
963 f.write("\tprset speciationpr = uniform(0.0,10.0);\n")
964 f.write("\tprset extinctionpr = beta(2.0,4.0);\n")
965 f.write("\tprset sampleprob = 0.01;\n")
966 # Use the specified clock model:
967 f.write("\n")
968 if clock_model == clock_model.uncorrelated:
969 f.write("\tprset clockvarpr=igr;\n")
970 f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
971 f.write("\tprset igrvarpr=exponential(1.0);\n")
972 else:
973 f.write("\tprset clockvarpr=strict;\n")
974 f.write("\tprset clockratepr=lognormal(0.0,1.0);\n")
975 # Set the priors on the tree age depending on the date range for the origin of the collated work:
976 f.write("\n")
977 if self.origin_date_range[0] is not None:
978 min_tree_age = (
979 datetime.now().year - self.origin_date_range[1]
980 if self.origin_date_range[1] is not None
981 else 0.0
982 )
983 max_tree_age = datetime.now().year - self.origin_date_range[0]
984 f.write("\tprset treeagepr = uniform(%d,%d);\n" % (min_tree_age, max_tree_age))
985 else:
986 min_tree_age = (
987 datetime.now().year - self.origin_date_range[1]
988 if self.origin_date_range[1] is not None
989 else 0.0
990 )
991 f.write("\tprset treeagepr = offsetgamma(%d,1.0,1.0);\n" % (min_tree_age))
992 # Then calibrate the witness ages:
993 f.write("\n")
994 f.write("\tprset nodeagepr = calibrated;\n")
995 for i, wit in enumerate(self.witnesses):
996 taxlabel = taxlabels[i]
997 date_range = wit.date_range
998 if date_range[0] is not None:
999 # If there is a lower bound on the witness's date, then use either a fixed or uniform distribution,
1000 # depending on whether the upper and lower bounds match:
1001 min_age = datetime.now().year - date_range[1]
1002 max_age = datetime.now().year - date_range[0]
1003 if min_age == max_age:
1004 f.write("\tcalibrate %s = fixed(%d);\n" % (taxlabel, min_age))
1005 else:
1006 f.write("\tcalibrate %s = uniform(%d,%d);\n" % (taxlabel, min_age, max_age))
1007 else:
1008 # If there is no lower bound on the witness's date, then use an offset gamma distribution:
1009 min_age = datetime.now().year - date_range[1]
1010 f.write("\tcalibrate %s = offsetgamma(%d,1.0,1.0);\n" % (taxlabel, min_age))
1011 f.write("\n")
1012 # Add default settings for MCMC estimation of posterior distribution:
1013 f.write("\tmcmcp ngen=100000;\n")
1014 # Write the command to run MrBayes:
1015 f.write("\tmcmc;\n")
1016 # End the assumptions block:
1017 f.write("End;")
1018 return
1020 def get_hennig86_symbols(self):
1021 """Returns a list of one-character symbols needed to represent the states of all substantive readings in Hennig86 format.
1023 The number of symbols equals the maximum number of substantive readings at any variation unit.
1025 Returns:
1026 A list of individual characters representing states in readings.
1027 """
1028 possible_symbols = (
1029 list(string.digits) + list(string.ascii_uppercase)[:22]
1030 ) # NOTE: the maximum number of symbols allowed in Hennig86 format is 32
1031 # The number of symbols needed is equal to the length of the longest substantive reading vector:
1032 nsymbols = 0
1033 # If there are no witnesses, then no symbols are needed at all:
1034 if len(self.witnesses) == 0:
1035 return []
1036 wit_id = self.witnesses[0].id
1037 for rdg_support in self.readings_by_witness[wit_id]:
1038 nsymbols = max(nsymbols, len(rdg_support))
1039 hennig86_symbols = possible_symbols[:nsymbols]
1040 return hennig86_symbols
    def to_hennig86(self, file_addr: Union[Path, str], drop_constant: bool = False):
        """Writes this Collation to a file in Hennig86 format with the given address.
        Note that because Hennig86 format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        # NOTE(review): the set below is computed but not referenced in this method; confirm whether it can be dropped:
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = []
        for wit in self.witnesses:
            taxlabel = wit.id
            # Hennig86 format requires taxon names to start with a letter, so if this is not the case, then append "WIT_" to the start of the name:
            if taxlabel[0] not in string.ascii_letters:
                taxlabel = "WIT_" + taxlabel
            # Then replace any disallowed characters in the string with an underscore:
            taxlabel = slugify(taxlabel, lowercase=False, separator='_')
            taxlabels.append(taxlabel)
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        missing_symbol = '?'
        symbols = self.get_hennig86_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="ascii") as f:
            # Start with the nstates header:
            f.write("nstates %d;\n" % len(symbols))
            # Then begin the xread block:
            f.write("xread\n")
            # Write the dimensions (note that Hennig86 places nchar before ntax):
            f.write("%d %d\n" % (nchar, ntax))
            # Now write the matrix, one row per witness:
            with tqdm(total=len(self.witnesses)) as pbar:
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    # Add enough space after this label ensure that all sequences are nicely aligned:
                    sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel) + 1))
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        rdg_inds = [
                            k for k, w in enumerate(rdg_support) if w > 0
                        ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                        # For singleton readings, just print the symbol:
                        if len(rdg_inds) == 1:
                            sequence += symbols[rdg_inds[0]]
                            continue
                        # For multiple readings, print the missing symbol (Hennig86 has no ambiguity notation):
                        sequence += missing_symbol
                    f.write("%s\n" % (sequence))
                    pbar.update(1)
            f.write(";")
        return
1114 def get_phylip_symbols(self):
1115 """Returns a list of one-character symbols needed to represent the states of all substantive readings in PHYLIP format.
1117 The number of symbols equals the maximum number of substantive readings at any variation unit.
1119 Returns:
1120 A list of individual characters representing states in readings.
1121 """
1122 possible_symbols = (
1123 list(string.digits) + list(string.ascii_lowercase)[:22]
1124 ) # NOTE: for RAxML, multistate characters with an alphabet sizes up to 32 are supported
1125 # The number of symbols needed is equal to the length of the longest substantive reading vector:
1126 nsymbols = 0
1127 # If there are no witnesses, then no symbols are needed at all:
1128 if len(self.witnesses) == 0:
1129 return []
1130 wit_id = self.witnesses[0].id
1131 for rdg_support in self.readings_by_witness[wit_id]:
1132 nsymbols = max(nsymbols, len(rdg_support))
1133 phylip_symbols = possible_symbols[:nsymbols]
1134 return phylip_symbols
1136 def to_phylip(self, file_addr: Union[Path, str], drop_constant: bool = False):
1137 """Writes this Collation to a file in PHYLIP format with the given address.
1138 Note that because PHYLIP format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.
1140 Args:
1141 file_addr: A string representing the path to an output file.
1142 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1143 """
1144 # Populate a list of sites that will correspond to columns of the sequence alignment:
1145 substantive_variation_unit_ids = self.variation_unit_ids
1146 if drop_constant:
1147 substantive_variation_unit_ids = [
1148 vu_id
1149 for vu_id in self.variation_unit_ids
1150 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1151 ]
1152 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1153 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1154 # Start by calculating the values we will be using here:
1155 ntax = len(self.witnesses)
1156 nchar = len(substantive_variation_unit_ids)
1157 taxlabels = []
1158 for wit in self.witnesses:
1159 taxlabel = wit.id
1160 # Then replace any disallowed characters in the string with an underscore:
1161 taxlabel = slugify(taxlabel, lowercase=False, separator='_')
1162 taxlabels.append(taxlabel)
1163 max_taxlabel_length = max(
1164 [len(taxlabel) for taxlabel in taxlabels]
1165 ) # keep track of the longest taxon label for tabular alignment purposes
1166 missing_symbol = '?'
1167 symbols = self.get_phylip_symbols()
1168 # Generate all parent folders for this file that don't already exist:
1169 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
1170 with open(file_addr, "w", encoding="ascii") as f:
1171 # Write the dimensions:
1172 f.write("%d %d\n" % (ntax, nchar))
1173 # Now write the matrix:
1174 for i, wit in enumerate(self.witnesses):
1175 taxlabel = taxlabels[i]
1176 # Add enough space after this label ensure that all sequences are nicely aligned:
1177 sequence = taxlabel + (" " * (max_taxlabel_length - len(taxlabel))) + "\t"
1178 for j, vu_id in enumerate(self.variation_unit_ids):
1179 if vu_id not in substantive_variation_unit_ids_set:
1180 continue
1181 rdg_support = self.readings_by_witness[wit.id][j]
1182 # If this reading is lacunose in this witness, then use the missing character:
1183 if sum(rdg_support) == 0:
1184 sequence += missing_symbol
1185 continue
1186 rdg_inds = [
1187 k for k, w in enumerate(rdg_support) if w > 0
1188 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them
1189 # For singleton readings, just print the symbol:
1190 if len(rdg_inds) == 1:
1191 sequence += symbols[rdg_inds[0]]
1192 continue
1193 # For multiple readings, print the missing symbol:
1194 sequence += missing_symbol
1195 f.write("%s\n" % (sequence))
1196 return
1198 def get_fasta_symbols(self):
1199 """Returns a list of one-character symbols needed to represent the states of all substantive readings in FASTA format.
1201 The number of symbols equals the maximum number of substantive readings at any variation unit.
1203 Returns:
1204 A list of individual characters representing states in readings.
1205 """
1206 possible_symbols = (
1207 list(string.digits) + list(string.ascii_lowercase)[:22]
1208 ) # NOTE: for RAxML, multistate characters with an alphabet sizes up to 32 are supported
1209 # The number of symbols needed is equal to the length of the longest substantive reading vector:
1210 nsymbols = 0
1211 # If there are no witnesses, then no symbols are needed at all:
1212 if len(self.witnesses) == 0:
1213 return []
1214 wit_id = self.witnesses[0].id
1215 for rdg_support in self.readings_by_witness[wit_id]:
1216 nsymbols = max(nsymbols, len(rdg_support))
1217 fasta_symbols = possible_symbols[:nsymbols]
1218 return fasta_symbols
    def to_fasta(self, file_addr: Union[Path, str], drop_constant: bool = False):
        """Writes this Collation to a file in FASTA format with the given address.
        Note that because FASTA format does not support NEXUS-style ambiguities, such ambiguities will be treated as missing data.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        # NOTE(review): the set below is computed but not referenced in this method; confirm whether it can be dropped:
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # Start by calculating the values we will be using here:
        # NOTE(review): ntax, nchar, and max_taxlabel_length are computed but never written,
        # since FASTA carries no dimension header; confirm whether they can be dropped:
        ntax = len(self.witnesses)
        nchar = len(substantive_variation_unit_ids)
        taxlabels = []
        for wit in self.witnesses:
            taxlabel = wit.id
            # Then replace any disallowed characters in the string with an underscore:
            taxlabel = slugify(taxlabel, lowercase=False, separator='_')
            taxlabels.append(taxlabel)
        max_taxlabel_length = max(
            [len(taxlabel) for taxlabel in taxlabels]
        )  # keep track of the longest taxon label for tabular alignment purposes
        missing_symbol = '?'
        symbols = self.get_fasta_symbols()
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="ascii") as f:
            # Now write the matrix, one FASTA record per witness:
            with tqdm(total=len(self.witnesses)) as pbar:
                for i, wit in enumerate(self.witnesses):
                    taxlabel = taxlabels[i]
                    # Start this witness's record with a FASTA header line (">label"):
                    sequence = ">%s\n" % taxlabel
                    for j, vu_id in enumerate(self.variation_unit_ids):
                        if vu_id not in substantive_variation_unit_ids_set:
                            continue
                        rdg_support = self.readings_by_witness[wit.id][j]
                        # If this reading is lacunose in this witness, then use the missing character:
                        if sum(rdg_support) == 0:
                            sequence += missing_symbol
                            continue
                        rdg_inds = [
                            k for k, w in enumerate(rdg_support) if w > 0
                        ]  # the index list consists of the indices of all readings with any degree of certainty assigned to them
                        # For singleton readings, just print the symbol:
                        if len(rdg_inds) == 1:
                            sequence += symbols[rdg_inds[0]]
                            continue
                        # For multiple readings, print the missing symbol (FASTA has no ambiguity notation):
                        sequence += missing_symbol
                    f.write("%s\n" % (sequence))
                    pbar.update(1)
        return
1282 def get_beast_symbols(self):
1283 """Returns a list of one-character symbols needed to represent the states of all substantive readings in BEAST format.
1285 The number of symbols equals the maximum number of substantive readings at any variation unit.
1287 Returns:
1288 A list of individual characters representing states in readings.
1289 """
1290 possible_symbols = (
1291 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
1292 ) # NOTE: for BEAST, any number of states should theoretically be permissible, but since code maps are required for some reason, we will limit the number of symbols to 62 for now
1293 # The number of symbols needed is equal to the length of the longest substantive reading vector:
1294 nsymbols = 0
1295 # If there are no witnesses, then no symbols are needed at all:
1296 if len(self.witnesses) == 0:
1297 return []
1298 wit_id = self.witnesses[0].id
1299 for rdg_support in self.readings_by_witness[wit_id]:
1300 nsymbols = max(nsymbols, len(rdg_support))
1301 beast_symbols = possible_symbols[:nsymbols]
1302 return beast_symbols
1304 def get_tip_date_range(self):
1305 """Gets the minimum and maximum dates attested among the witnesses.
1306 Also checks if the witness with the latest possible date has a fixed date
1307 (i.e, if the lower and upper bounds for its date are the same)
1308 and issues a warning if not, as this will cause unusual behavior in BEAST 2.
1310 Returns:
1311 A tuple containing the earliest and latest possible tip dates.
1312 """
1313 earliest_date = None
1314 earliest_wit = None
1315 latest_date = None
1316 latest_wit = None
1317 for wit in self.witnesses:
1318 wit_id = wit.id
1319 date_range = wit.date_range
1320 if date_range[0] is not None:
1321 if earliest_date is not None:
1322 earliest_wit = wit if date_range[0] < earliest_date else earliest_wit
1323 earliest_date = min(date_range[0], earliest_date)
1324 else:
1325 earliest_wit = wit
1326 earliest_date = date_range[0]
1327 if date_range[1] is not None:
1328 if latest_date is not None:
1329 latest_wit = (
1330 wit
1331 if (date_range[1] > latest_date or (date_range[0] == date_range[1] == latest_date))
1332 else latest_wit
1333 ) # the second check ensures that a witness with a fixed date is preferred to a witness with a date range that ends with the same date
1334 latest_date = max(date_range[1], latest_date)
1335 else:
1336 latest_wit = wit
1337 latest_date = date_range[1]
1338 if latest_wit.date_range[0] is None or latest_wit.date_range[0] != latest_wit.date_range[1]:
1339 print(
1340 "WARNING: the latest witness, %s, has a variable date range; this will result in problems with time-dependent substitution models and misalignment of trees in BEAST DensiTree outputs! Please ensure that witness %s has a fixed date."
1341 % (latest_wit.id, latest_wit.id)
1342 )
1343 return (earliest_date, latest_date)
1345 def get_beast_origin_span(self, tip_date_range):
1346 """Returns a tuple containing the lower and upper bounds for the height of the origin of the Birth-Death Skyline model.
1347 The upper bound on the height of the tree is the difference between the latest tip date
1348 and the lower bound on the date of the original work, if both are defined;
1349 otherwise, it is left undefined.
1350 The lower bound on the height of the tree is the difference between the latest tip date
1351 and the upper bound on the date of the original work, if both are defined;
1352 otherwise, it is the difference between the earliest tip date and the latest, if both are defined.
1354 Args:
1355 tip_date_range: A tuple containing the earliest and latest possible tip dates.
1357 Returns:
1358 A tuple containing lower and upper bounds on the origin height for the Birth-Death Skyline model.
1359 """
1360 origin_span = [0, None]
1361 # If the upper bound on the date of the work's composition is defined, then set the lower bound on the height of the origin using it and the latest tip date
1362 # (note that if it had to be defined in terms of witness date lower bounds, then this would have happened already):
1363 if self.origin_date_range[1] is not None:
1364 origin_span[0] = tip_date_range[1] - self.origin_date_range[1]
1365 # If the lower bound on the date of the work's composition is defined, then set the upper bound on the height of the origin using it and the latest tip date:
1366 if self.origin_date_range[0] is not None:
1367 origin_span[1] = tip_date_range[1] - self.origin_date_range[0]
1368 return tuple(origin_span)
1370 def get_beast_date_map(self, taxlabels):
1371 """Returns a string representing witness-to-date mappings in BEAST format.
1373 Since this format requires single dates as opposed to date ranges,
1374 witnesses with closed date ranges will be mapped to the average of their lower and upper bounds,
1375 and witnesses with open date ranges will not be mapped.
1377 Args:
1378 taxlabels: A list of slugified taxon labels.
1380 Returns:
1381 A string containing comma-separated date calibrations of the form witness_id=date.
1382 """
1383 calibrate_strings = []
1384 for i, wit in enumerate(self.witnesses):
1385 taxlabel = taxlabels[i]
1386 date_range = wit.date_range
1387 # If either end of this witness's date range is empty, then do not include it:
1388 if date_range[0] is None or date_range[1] is None:
1389 continue
1390 # Otherwise, take the midpoint of its date range as its date:
1391 date = int((date_range[0] + date_range[1]) / 2)
1392 calibrate_string = "%s=%d" % (taxlabel, date)
1393 calibrate_strings.append(calibrate_string)
1394 # Then output the full date map string:
1395 date_map = ",".join(calibrate_strings)
1396 return date_map
1398 def get_beast_code_map_for_unit(self, symbols, missing_symbol, vu_ind):
1399 """Returns a string containing state/reading code mappings in BEAST format using the given single-state and missing state symbols for the character/variation unit at the given index.
1400 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a code for a dummy state will be included.
1402 Args:
1403 vu_ind: An integer index for the desired unit.
1405 Returns:
1406 A string containing comma-separated code mappings.
1407 """
1408 vu = self.variation_units[vu_ind]
1409 vu_id = vu.id
1410 code_map = {}
1411 for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id])):
1412 code_map[symbols[k]] = str(k)
1413 # If this site is a singleton site, then add a code mapping for the dummy state:
1414 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1415 code_map[symbols[1]] = str(1)
1416 # Then add a mapping for the missing state, including a dummy state if this is a singleton site:
1417 code_map[missing_symbol] = " ".join(
1418 str(k) for k in range(len(self.substantive_readings_by_variation_unit_id[vu.id]))
1419 )
1420 # If this site is a singleton site, then add the dummy state to the missing state mapping:
1421 if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
1422 code_map[missing_symbol] = code_map[missing_symbol] + " " + str(1)
1423 # Then combine all of the mappings into a single string:
1424 code_map_string = ", ".join([code + "=" + code_map[code] for code in code_map])
1425 return code_map_string
1427 def get_beast_equilibrium_frequencies_for_unit(self, vu_ind):
1428 """Returns a string containing state/reading equilibrium frequencies in BEAST format for the character/variation unit at the given index.
1429 Since the equilibrium frequencies are not used with the substitution models, the equilibrium frequencies simply correspond to a uniform distribution over the states.
1430 If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then an equilibrium frequency of 0 will be added for a dummy state.
1432 Args:
1433 vu_ind: An integer index for the desired unit.
1435 Returns:
1436 A string containing space-separated equilibrium frequencies.
1437 """
1438 vu = self.variation_units[vu_ind]
1439 vu_id = vu.id
1440 # If this unit is a singleton, then return the string "0.5 0.5":
1441 if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1:
1442 return "0.5 0.5"
1443 # Otherwise, set the equilibrium frequencies according to a uniform distribution:
1444 equilibrium_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len(
1445 self.substantive_readings_by_variation_unit_id[vu_id]
1446 )
1447 equilibrium_frequencies_string = " ".join([str(w) for w in equilibrium_frequencies])
1448 return equilibrium_frequencies_string
    def get_beast_root_frequencies_for_unit(self, vu_ind):
        """Returns a string containing state/reading root frequencies in BEAST format for the character/variation unit at the given index.

        The root frequencies are calculated from the intrinsic odds at this unit.
        If the variation unit at the given index is a singleton unit (i.e., if it has only one substantive reading), then a root frequency of 0 will be added for a dummy state.
        If no intrinsic odds are specified, then a uniform distribution over all states is assumed.

        Args:
            vu_ind: An integer index for the desired unit.

        Returns:
            A string containing space-separated root frequencies.
        """
        vu = self.variation_units[vu_ind]
        vu_id = vu.id
        # The intrinsic relations map (source reading ID, target reading ID) pairs to intrinsic category IDs:
        intrinsic_relations = vu.intrinsic_relations
        intrinsic_odds_by_id = self.intrinsic_odds_by_id
        # If this unit is a singleton, then return the string "1 0":
        if len(self.substantive_readings_by_variation_unit_id[vu_id]) == 1:
            return "1 0"
        # If this unit has no intrinsic odds, then assume a uniform distribution over all readings:
        if len(intrinsic_relations) == 0:
            root_frequencies = [1.0 / len(self.substantive_readings_by_variation_unit_id[vu_id])] * len(
                self.substantive_readings_by_variation_unit_id[vu_id]
            )
            root_frequencies_string = " ".join([str(w) for w in root_frequencies])
            return root_frequencies_string
        # We will populate the root frequencies based on the intrinsic odds of the readings:
        root_frequencies_by_id = {}
        for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]:
            root_frequencies_by_id[rdg_id] = 0
        # First, construct an adjacency list for efficient edge iteration:
        neighbors_by_source = {}
        for edge in intrinsic_relations:
            s = edge[0]
            t = edge[1]
            if s not in neighbors_by_source:
                neighbors_by_source[s] = []
            if t not in neighbors_by_source:
                neighbors_by_source[t] = []
            neighbors_by_source[s].append(t)
        # Next, identify all readings that are not targeted by any intrinsic odds relation
        # (i.e., the roots of the intrinsic relations graph, found by counting in-degrees):
        in_degree_by_reading = {}
        for edge in intrinsic_relations:
            s = edge[0]
            t = edge[1]
            if s not in in_degree_by_reading:
                in_degree_by_reading[s] = 0
            if t not in in_degree_by_reading:
                in_degree_by_reading[t] = 0
            in_degree_by_reading[t] += 1
        starting_nodes = [t for t in in_degree_by_reading if in_degree_by_reading[t] == 0]
        # Set the root frequencies for these readings to 1 (they will be normalized later):
        for starting_node in starting_nodes:
            root_frequencies_by_id[starting_node] = 1.0
        # Next, set the frequencies for the remaining readings recursively using the adjacency list:
        # NOTE(review): a reading targeted by multiple relations has its frequency overwritten on each visit,
        # so the last traversal wins; presumably the relations are expected to form a tree — TODO confirm.
        def update_root_frequencies(s):
            for t in neighbors_by_source[s]:
                intrinsic_category = intrinsic_relations[(s, t)]
                # A target reading's frequency is its source's frequency divided by the category's odds
                # (unspecified odds default to 1.0):
                odds = (
                    intrinsic_odds_by_id[intrinsic_category]
                    if intrinsic_odds_by_id[intrinsic_category] is not None
                    else 1.0
                )  # TODO: This needs to be handled using parameters once we have it implemented in BEAST
                root_frequencies_by_id[t] = root_frequencies_by_id[s] / odds
                update_root_frequencies(t)
            return

        for starting_node in starting_nodes:
            update_root_frequencies(starting_node)
        # Then produce a normalized vector of root frequencies that corresponds to a probability distribution:
        root_frequencies = [
            root_frequencies_by_id[rdg_id] for rdg_id in self.substantive_readings_by_variation_unit_id[vu_id]
        ]
        total_frequencies = sum(root_frequencies)
        for k in range(len(root_frequencies)):
            root_frequencies[k] = root_frequencies[k] / total_frequencies
        root_frequencies_string = " ".join([str(w) for w in root_frequencies])
        return root_frequencies_string
    def to_beast(
        self,
        file_addr: Union[Path, str],
        drop_constant: bool = False,
        clock_model: ClockModel = ClockModel.strict,
        ancestral_logger: AncestralLogger = AncestralLogger.state,
        seed: int = None,
    ):
        """Writes this Collation to a file in BEAST format with the given address.

        The output is rendered from this package's "beast_template.xml" Jinja template,
        populated with witness sequences, per-unit substitution model data,
        and intrinsic/transcriptional category parameters derived from this Collation.

        Args:
            file_addr: A string representing the path to an output file.
            drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
            clock_model: A ClockModel option indicating which clock model to use.
            ancestral_logger: An AncestralLogger option indicating which class of logger (if any) to use for ancestral states.
            seed: A seed for random number generation (for setting initial values of unspecified transcriptional rates).
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment:
        substantive_variation_unit_ids = self.variation_unit_ids
        if drop_constant:
            substantive_variation_unit_ids = [
                vu_id
                for vu_id in self.variation_unit_ids
                if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
            ]
        # Populate sets of substantive variation unit IDs and substantive variant reading tuples:
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # First, calculate the values we will be using for the main template:
        taxlabels = [slugify(wit.id, lowercase=False, separator='_') for wit in self.witnesses]
        missing_symbol = '?'
        symbols = self.get_beast_symbols()
        tip_date_range = self.get_tip_date_range()
        origin_span = self.get_beast_origin_span(tip_date_range)
        date_map = self.get_beast_date_map(taxlabels)
        # Then populate the necessary objects for the BEAST XML Jinja template:
        witness_objects = []
        variation_unit_objects = []
        intrinsic_category_objects = []
        transcriptional_category_objects = []
        with tqdm(total=len(self.witnesses)) as pbar:
            # Start with witnesses:
            for i, wit in enumerate(self.witnesses):
                witness_object = {}
                # Copy the ID for this witness:
                witness_object["id"] = wit.id
                # Copy its date bounds:
                witness_object["min_date"] = wit.date_range[0]
                witness_object["max_date"] = wit.date_range[1]
                # Populate its sequence from its entries in the witness's readings dictionary:
                # (each unit is serialized as comma-separated per-reading likelihoods, with "; " separating units)
                sequence = ""
                for j, rdg_support in enumerate(self.readings_by_witness[wit.id]):
                    vu_id = self.variation_unit_ids[j]
                    # Skip any variation units deemed non-substantive:
                    if vu_id not in substantive_variation_unit_ids:
                        continue
                    # If this witness has a certainty of 0 for all readings, then it is a gap; assign a likelihood of 1 to each reading:
                    if sum(rdg_support) == 0:
                        for k, w in enumerate(rdg_support):
                            sequence += "1"
                            if k < len(rdg_support) - 1:
                                sequence += ", "
                            else:
                                if len(rdg_support) > 1:
                                    sequence += "; "
                                else:
                                    # If this site is a singleton site, then add a dummy state:
                                    sequence += ", 0; "
                    # Otherwise, read the probabilities as they are given:
                    else:
                        for k, w in enumerate(rdg_support):
                            sequence += str(w)
                            if k < len(rdg_support) - 1:
                                sequence += ", "
                            else:
                                if len(rdg_support) > 1:
                                    sequence += "; "
                                else:
                                    # If this site is a singleton site, then add a dummy state:
                                    sequence += ", 0; "
                # Strip the final semicolon and space from the sequence:
                sequence = sequence.strip("; ")
                # Then set the witness object's sequence attribute to this string:
                witness_object["sequence"] = sequence
                witness_objects.append(witness_object)
                pbar.update(1)
        # Then proceed to variation units:
        for j, vu in enumerate(self.variation_units):
            if vu.id not in substantive_variation_unit_ids_set:
                continue
            variation_unit_object = {}
            # Copy the one-based index of this variation unit:
            variation_unit_object["index"] = j + 1
            # Copy the ID of this variation unit:
            variation_unit_object["id"] = vu.id
            # Set a flag indicating if this variation unit is constant:
            variation_unit_object["is_constant"] = (
                True if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1 else False
            )
            # Copy this variation unit's number of substantive readings,
            # setting it to 2 if it is a singleton unit:
            variation_unit_object["nstates"] = (
                len(self.substantive_readings_by_variation_unit_id[vu.id])
                if len(self.substantive_readings_by_variation_unit_id[vu.id]) > 1
                else 2
            )
            # Then construct the code map for this unit:
            variation_unit_object["code_map"] = self.get_beast_code_map_for_unit(symbols, missing_symbol, j)
            # Then populate a comma-separated string of reading labels for this unit:
            rdg_texts = []
            vu_label = vu.id  # NOTE(review): vu_label appears to be unused in this method
            for rdg in vu.readings:
                key = tuple([vu.id, rdg.id])
                if key not in substantive_variation_unit_reading_tuples_set:
                    continue
                rdg_text = slugify(rdg.text, lowercase=False, allow_unicode=True, separator='_')
                # Replace any empty reading text with an omission marker:
                if rdg_text == "":
                    rdg_text = "om."
                rdg_texts.append(rdg_text)
            # If this site is a singleton site, then add a dummy reading for the dummy state:
            if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
                rdg_texts.append("DUMMY")
            rdg_texts_string = ", ".join(rdg_texts)
            variation_unit_object["rdg_texts"] = rdg_texts_string
            # Then populate this unit's equilibrium frequency string and its root frequency string:
            variation_unit_object["equilibrium_frequencies"] = self.get_beast_equilibrium_frequencies_for_unit(j)
            variation_unit_object["root_frequencies"] = self.get_beast_root_frequencies_for_unit(j)
            # Then populate a dictionary mapping epoch height ranges to lists of off-diagonal entries for substitution models:
            rate_objects_by_epoch_height_range = {}
            epoch_height_ranges = []
            # Then proceed based on whether the transcriptional relations for this variation unit have been defined:
            if len(vu.transcriptional_relations_by_date_range) == 0:
                # If there are no transcriptional relations, then map the epoch range of (None, None) to their list of off-diagonal entries:
                epoch_height_ranges.append((None, None))
                rate_objects_by_epoch_height_range[(None, None)] = []
                rate_objects = rate_objects_by_epoch_height_range[(None, None)]
                if len(self.substantive_readings_by_variation_unit_id[vu.id]) == 1:
                    # If this is a singleton site, then use an arbitrary 2x2 rate matrix:
                    rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
                    rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
                else:
                    # If this is a site with multiple substantive readings, but no transcriptional relations list,
                    # then use a Lewis Mk substitution matrix with the appropriate number of states:
                    for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                        for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                            # Skip diagonal elements:
                            if k_1 == k_2:
                                continue
                            rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
            else:
                # Otherwise, proceed for every date range:
                for date_range in vu.transcriptional_relations_by_date_range:
                    # Get the map of transcriptional relations for reference later:
                    transcriptional_relations = vu.transcriptional_relations_by_date_range[date_range]
                    # Now get the epoch height range corresponding to this date range, and initialize its list in the dictionary:
                    # (heights are measured backward from the latest tip date, so the range endpoints swap)
                    epoch_height_range = [None, None]
                    epoch_height_range[0] = tip_date_range[1] - date_range[1] if date_range[1] is not None else None
                    epoch_height_range[1] = tip_date_range[1] - date_range[0] if date_range[0] is not None else None
                    epoch_height_range = tuple(epoch_height_range)
                    epoch_height_ranges.append(epoch_height_range)
                    rate_objects_by_epoch_height_range[epoch_height_range] = []
                    rate_objects = rate_objects_by_epoch_height_range[epoch_height_range]
                    # Then proceed for every pair of readings in this unit:
                    for k_1, rdg_id_1 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                        for k_2, rdg_id_2 in enumerate(self.substantive_readings_by_variation_unit_id[vu.id]):
                            # Skip diagonal elements:
                            if k_1 == k_2:
                                continue
                            # If the first reading has no transcriptional relation to the second in this unit, then use the default rate:
                            if (rdg_id_1, rdg_id_2) not in transcriptional_relations:
                                rate_objects.append({"transcriptional_categories": ["default"], "expression": None})
                                continue
                            # Otherwise, if only one category of transcriptional relations holds between the first and second readings,
                            # then use its rate:
                            if len(transcriptional_relations[(rdg_id_1, rdg_id_2)]) == 1:
                                # If there is only one such category, then add its rate as a standalone var element:
                                transcriptional_category = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])[0]
                                rate_objects.append(
                                    {"transcriptional_categories": [transcriptional_category], "expression": None}
                                )
                                continue
                            # If there is more than one, then add a var element that is a sum of the individual categories' rates:
                            # NOTE(review): the expression places all operands before the "+" operators (postfix order);
                            # presumably this is the format the BEAST template's expression parser expects — TODO confirm
                            transcriptional_categories = list(transcriptional_relations[(rdg_id_1, rdg_id_2)])
                            args = []
                            for transcriptional_category in transcriptional_categories:
                                args.append("%s_rate" % transcriptional_category)
                            args_string = " ".join(args)
                            ops = ["+"] * (len(args) - 1)
                            ops_string = " ".join(ops)
                            expression_string = " ".join([args_string, ops_string])
                            rate_objects.append(
                                {
                                    "transcriptional_categories": transcriptional_categories,
                                    "expression": expression_string,
                                }
                            )
            # Now reorder the list of epoch height ranges, and get a list of non-null epoch dates in ascending order from the dictionary:
            epoch_height_ranges.reverse()
            epoch_heights = [
                epoch_height_range[0] for epoch_height_range in epoch_height_ranges if epoch_height_range[0] is not None
            ]
            # Then add all of these data structures to the variation unit object:
            variation_unit_object["epoch_heights"] = epoch_heights
            variation_unit_object["epoch_heights_string"] = " ".join(
                [str(epoch_height) for epoch_height in epoch_heights]
            )
            variation_unit_object["epoch_height_ranges"] = epoch_height_ranges
            variation_unit_object["epoch_rates"] = [
                rate_objects_by_epoch_height_range[epoch_height_range] for epoch_height_range in epoch_height_ranges
            ]
            variation_unit_objects.append(variation_unit_object)
        # Then proceed to intrinsic odds categories:
        for intrinsic_category in self.intrinsic_categories:
            intrinsic_category_object = {}
            # Copy the ID of this intrinsic category:
            intrinsic_category_object["id"] = intrinsic_category
            # Then copy the odds factors associated with this intrinsic category,
            # setting it to 1.0 if it is not specified and setting the estimate attribute accordingly:
            odds = self.intrinsic_odds_by_id[intrinsic_category]
            intrinsic_category_object["odds"] = odds if odds is not None else 1.0
            intrinsic_category_object["estimate"] = "false" if odds is not None else "true"
            intrinsic_category_objects.append(intrinsic_category_object)
        # Then proceed to transcriptional rate categories:
        rng = np.random.default_rng(seed)
        for transcriptional_category in self.transcriptional_categories:
            transcriptional_category_object = {}
            # Copy the ID of this transcriptional category:
            transcriptional_category_object["id"] = transcriptional_category
            # Then copy the rate of this transcriptional category,
            # setting it to a random number sampled from a Gamma distribution if it is not specified and setting the estimate attribute accordingly:
            rate = self.transcriptional_rates_by_id[transcriptional_category]
            transcriptional_category_object["rate"] = rate if rate is not None else rng.gamma(5.0, 2.0)
            transcriptional_category_object["estimate"] = "false" if rate is not None else "true"
            transcriptional_category_objects.append(transcriptional_category_object)
        # Now render the output XML file using the Jinja template:
        env = Environment(loader=PackageLoader("teiphy", "templates"), autoescape=select_autoescape())
        template = env.get_template("beast_template.xml")
        rendered = template.render(
            nsymbols=len(symbols),
            date_map=date_map,
            origin_span=origin_span,
            clock_model=clock_model.value,
            clock_rate_categories=2 * len(self.witnesses) - 1,
            ancestral_logger=ancestral_logger.value,
            witnesses=witness_objects,
            variation_units=variation_unit_objects,
            non_constant_variation_units=[
                variation_unit_object
                for variation_unit_object in variation_unit_objects
                if not variation_unit_object["is_constant"]
            ],
            constant_variation_unit_filter=",".join(
                [
                    str(variation_unit_object["index"])
                    for variation_unit_object in variation_unit_objects
                    if variation_unit_object["is_constant"]
                ]
            ),
            intrinsic_categories=intrinsic_category_objects,
            transcriptional_categories=transcriptional_category_objects,
        )
        # Generate all parent folders for this file that don't already exist:
        Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
        with open(file_addr, "w", encoding="utf-8") as f:
            f.write(rendered)
        return
1797 def to_numpy(self, drop_constant: bool = False, split_missing: SplitMissingType = None):
1798 """Returns this Collation in the form of a NumPy array, along with arrays of its row and column labels.
1800 Args:
1801 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1802 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
1803 If not specified, then missing data is ignored (i.e., all states are 0).
1804 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
1805 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
1807 Returns:
1808 A NumPy array with a row for each substantive reading and a column for each witness.
1809 A list of substantive reading ID strings.
1810 A list of witness ID strings.
1811 """
1812 # Populate a list of sites that will correspond to columns of the sequence alignment:
1813 substantive_variation_unit_ids = self.variation_unit_ids
1814 if drop_constant:
1815 substantive_variation_unit_ids = [
1816 vu_id
1817 for vu_id in self.variation_unit_ids
1818 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1819 ]
1820 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1821 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
1822 # Initialize the output array with the appropriate dimensions:
1823 reading_labels = []
1824 for vu in self.variation_units:
1825 if vu.id not in substantive_variation_unit_ids_set:
1826 continue
1827 for rdg in vu.readings:
1828 key = tuple([vu.id, rdg.id])
1829 if key in substantive_variation_unit_reading_tuples_set:
1830 reading_labels.append(vu.id + ", " + rdg.text)
1831 witness_labels = [wit.id for wit in self.witnesses]
1832 matrix = np.zeros((len(reading_labels), len(witness_labels)), dtype=float)
1833 # For each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings:
1834 support_proportions_by_unit = {}
1835 for j, vu_id in enumerate(self.variation_unit_ids):
1836 if vu_id not in substantive_variation_unit_ids_set:
1837 continue
1838 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
1839 for i, wit in enumerate(self.witnesses):
1840 rdg_support = self.readings_by_witness[wit.id][j]
1841 for l, w in enumerate(rdg_support):
1842 support_proportions[l] += w
1843 norm = (
1844 sum(support_proportions) if sum(support_proportions) > 0 else 1.0
1845 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
1846 for l in range(len(support_proportions)):
1847 support_proportions[l] = support_proportions[l] / norm
1848 support_proportions_by_unit[vu_id] = support_proportions
1849 # Then populate it with the appropriate values:
1850 col_ind = 0
1851 with tqdm(total=len(self.witnesses)) as pbar:
1852 for i, wit in enumerate(self.witnesses):
1853 row_ind = 0
1854 for j, vu_id in enumerate(self.variation_unit_ids):
1855 if vu_id not in substantive_variation_unit_ids_set:
1856 continue
1857 rdg_support = self.readings_by_witness[wit.id][j]
1858 # If this reading support vector sums to 0, then this is missing data; handle it as specified:
1859 if sum(rdg_support) == 0:
1860 if split_missing == SplitMissingType.uniform:
1861 for l in range(len(rdg_support)):
1862 matrix[row_ind, col_ind] = 1 / len(rdg_support)
1863 row_ind += 1
1864 elif split_missing == SplitMissingType.proportional:
1865 for l in range(len(rdg_support)):
1866 matrix[row_ind, col_ind] = support_proportions_by_unit[vu_id][l]
1867 row_ind += 1
1868 else:
1869 row_ind += len(rdg_support)
1870 # Otherwise, add its coefficients normally:
1871 else:
1872 for l in range(len(rdg_support)):
1873 matrix[row_ind, col_ind] = rdg_support[l]
1874 row_ind += 1
1875 col_ind += 1
1876 pbar.update(1)
1877 return matrix, reading_labels, witness_labels
1879 def get_ext_matrix(self, drop_constant: bool = False, split_missing: SplitMissingType = None):
1880 """Returns a NumPy matrix containing a row and column for each witness and the number of variation units shared by the row and column witnesses in each cell.
1881 Note that if the split_missing option is specified, all variation units are counted.
1883 Args:
1884 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1885 Default value is False.
1886 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
1887 If not specified, then missing data is ignored (i.e., all states are 0).
1888 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
1889 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
1891 Returns:
1892 A NumPy matrix with a row and column for each witness and the number of variation units shared by the row and column witnesses in each cell.
1893 """
1894 # Populate a list of sites that will correspond to columns of the sequence alignment:
1895 substantive_variation_unit_ids = self.variation_unit_ids
1896 if drop_constant:
1897 substantive_variation_unit_ids = [
1898 vu_id
1899 for vu_id in self.variation_unit_ids
1900 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
1901 ]
1902 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
1903 # Then initialize the output matrix:
1904 witness_labels = [wit.id for wit in self.witnesses]
1905 ext_matrix = ext_matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int)
1906 # If the split_missing option has been specified, then all entries in the matrix will be the number of substantive variation units,
1907 # so we can just fill the matrix with this value and return it:
1908 if split_missing is not None:
1909 ext_matrix = ext_matrix = np.full(
1910 (len(witness_labels), len(witness_labels)), len(substantive_variation_unit_ids), dtype=int
1911 )
1912 return ext_matrix
1913 # Otherwise, populate the matrix for all pairs of witnesses:
1914 with tqdm(total=len(self.witnesses) ** 2) as pbar:
1915 # Then calculate the mutual information contribution for each pair of witnesses:
1916 for i, wit_1 in enumerate(witness_labels):
1917 for j, wit_2 in enumerate(witness_labels):
1918 shared_ext_units = 0
1919 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
1920 # so we only have to calculate it once:
1921 if i > j:
1922 pbar.update(1)
1923 continue
1924 # Otherwise, calculate the number of substantive variation units at which both of these witnesses are extant:
1925 for k, vu_id in enumerate(self.variation_unit_ids):
1926 if vu_id not in substantive_variation_unit_ids_set:
1927 continue
1928 wit_1_rdg_support = self.readings_by_witness[wit_1][k]
1929 wit_2_rdg_support = self.readings_by_witness[wit_2][k]
1930 if sum(wit_1_rdg_support) == 0.0 or sum(wit_2_rdg_support) == 0.0:
1931 continue
1932 shared_ext_units += 1
1933 ext_matrix[i][j] = shared_ext_units
1934 ext_matrix[j][i] = shared_ext_units
1935 pbar.update(1)
1936 return ext_matrix
1938 def transform_matrix(self, matrix: np.ndarray, transform_matrix: TransformMatrixType = None):
1939 """Transforms a given matrix's columns based on the specified transform_matrix option.
1941 Args:
1942 matrix (ndarray): The matrix whose columns are to be transformed.
1943 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed.
1944 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation.
1946 Returns:
1947 A Pandas DataFrame corresponding to a collation matrix with reading frequencies or a long table with discrete reading states.
1948 """
1949 # If no transform_matrix option was supplied, then return the matrix as-is:
1950 if transform_matrix is None:
1951 return matrix
1952 # Otherwise, apply the specified column transformation:
1953 if transform_matrix == TransformMatrixType.stddev:
1954 means = np.mean(matrix, 0) # get the means of the columns (axis 0)
1955 stddevs = np.std(matrix, 0) # get the standard deviations of the columns (axis 0)
1956 transformed_matrix = np.full(
1957 matrix.shape, 0.0, dtype=float
1958 ) # the standard deviation can only be 0 if all values equal the mean, so a default value of 0 is acceptable
1959 np.divide(matrix - means, stddevs, out=transformed_matrix, where=(stddevs != 0))
1960 return transformed_matrix
1961 if transform_matrix == TransformMatrixType.mad:
1962 medians = np.median(matrix, 0) # get the medians of the columns (axis 0)
1963 mads = np.median(np.abs(matrix - medians), 0) # get the median absolute deviations of the columns (axis 0)
1964 transformed_matrix = np.full(
1965 matrix.shape, np.nan, dtype=float
1966 ) # the MAD can be 0 even if all values are not equal to the median, so NaN should be used if a column's MAD is 0
1967 np.divide(matrix - medians, mads, out=transformed_matrix, where=(mads != 0))
1968 return transformed_matrix
1970 def to_distance_matrix(
1971 self,
1972 drop_constant: bool = False,
1973 proportion: bool = False,
1974 show_ext: bool = False,
1975 transform_matrix: TransformMatrixType = None,
1976 ):
1977 """Transforms this Collation into a NumPy distance matrix between witnesses, along with an array of its labels for the witnesses.
1978 Distances can be computed either as counts of disagreements (the default setting), or as proportions of disagreements over all variation units where both witnesses have singleton readings.
1979 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of disagreements.
1981 Args:
1982 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
1983 Default value is False.
1984 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
1985 Default value is False.
1986 show_ext: An optional flag indicating whether each cell in the matrix
1987 should include the number of their extant, unambiguous variation units after the number of their disagreements.
1988 Default value is False.
1989 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of the matrix should be transformed.
1991 Returns:
1992 A NumPy distance matrix with a row and column for each witness.
1993 A list of witness ID strings.
1994 """
1995 # Populate a list of sites that will correspond to columns of the sequence alignment:
1996 substantive_variation_unit_ids = self.variation_unit_ids
1997 if drop_constant:
1998 substantive_variation_unit_ids = [
1999 vu_id
2000 for vu_id in self.variation_unit_ids
2001 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2002 ]
2003 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2004 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2005 # Initialize the output array with the appropriate dimensions:
2006 witness_labels = [wit.id for wit in self.witnesses]
2007 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form disagreements
2008 with tqdm(total=len(self.witnesses) ** 2) as pbar:
2009 for i, wit_1 in enumerate(witness_labels):
2010 for j, wit_2 in enumerate(witness_labels):
2011 disagreements = 0
2012 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
2013 # so we only have to calculate it once:
2014 if i > j:
2015 pbar.update(1)
2016 continue
2017 # Otherwise, calculate the number of units where both witnesses disagree:
2018 for k, vu_id in enumerate(self.variation_unit_ids):
2019 if vu_id not in substantive_variation_unit_ids_set:
2020 continue
2021 wit_1_rdg_support = self.readings_by_witness[wit_1][k]
2022 wit_2_rdg_support = self.readings_by_witness[wit_2][k]
2023 # If either witness is lacunose, then move on:
2024 if sum(wit_1_rdg_support) == 0.0 or sum(wit_2_rdg_support) == 0.0:
2025 continue
2026 # Otherwise, if the (potential) readings of the two witnesses do not overlap, then count them as disagreeing:
2027 if (
2028 sum([wit_1_rdg_support[l] * wit_2_rdg_support[l] for l in range(len(wit_1_rdg_support))])
2029 == 0.0
2030 ):
2031 disagreements += 1
2032 matrix[i, j] = disagreements
2033 matrix[j, i] = disagreements
2034 pbar.update(1)
2035 # Initialize a matrix for shared extant variation units for witnesses, and populate it if the proportion or show_ext option is specified:
2036 ext_matrix = None
2037 if proportion or show_ext:
2038 ext_matrix = self.get_ext_matrix(drop_constant=drop_constant)
2039 # If the proportion option is set, then divide every value in the matrix by the corresponding entry in the matrix of shared extant variation units:
2040 if proportion:
2041 proportion_matrix = np.full((len(witness_labels), len(witness_labels)), 0.0, dtype=float)
2042 np.divide(
2043 matrix, ext_matrix, out=proportion_matrix, where=(ext_matrix != 0)
2044 ) # division by 0 can occur if two witnesses have no overlapping units; leave their proportion as 0.0
2045 matrix = proportion_matrix
2046 # Then transform the columns of the main matrix as specified:
2047 matrix = self.transform_matrix(matrix, transform_matrix)
2048 # If the show_ext option is set, then append the number of shared extant variation units after the matrix's values:
2049 if show_ext:
2050 serialized_values = []
2051 for i, wit_1 in enumerate(witness_labels):
2052 serialized_values.append([])
2053 for j, wit_2 in enumerate(witness_labels):
2054 serialized_values[-1].append("/".join([str(matrix[i][j]), str(ext_matrix[i][j])]))
2055 matrix = np.array(serialized_values)
2056 return matrix, witness_labels
2058 def to_similarity_matrix(
2059 self,
2060 drop_constant: bool = False,
2061 proportion: bool = False,
2062 show_ext: bool = False,
2063 transform_matrix: TransformMatrixType = None,
2064 ):
2065 """Transforms this Collation into a NumPy similarity matrix between witnesses, along with an array of its labels for the witnesses.
2066 Similarities can be computed either as counts of agreements (the default setting), or as proportions of agreements over all variation units where both witnesses have singleton readings.
2067 Optionally, the count of units where both witnesses have singleton readings can be included after the count/proportion of agreements.
2069 Args:
2070 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2071 Default value is False.
2072 proportion (bool, optional): An optional flag indicating whether or not to calculate similarities as proportions over extant, unambiguous variation units.
2073 Default value is False.
2074 show_ext: An optional flag indicating whether each cell in the matrix
2075 should include the number of their extant, unambiguous variation units after the number of agreements.
2076 Default value is False.
2077 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of the matrix should be transformed.
2079 Returns:
2080 A NumPy agreement matrix with a row and column for each witness.
2081 A list of witness ID strings.
2082 """
2083 # Populate a list of sites that will correspond to columns of the sequence alignment:
2084 substantive_variation_unit_ids = self.variation_unit_ids
2085 if drop_constant:
2086 substantive_variation_unit_ids = [
2087 vu_id
2088 for vu_id in self.variation_unit_ids
2089 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2090 ]
2091 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2092 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2093 # Initialize the output array with the appropriate dimensions:
2094 witness_labels = [wit.id for wit in self.witnesses]
2095 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=int) # ints of the form agreements
2096 with tqdm(total=len(self.witnesses) ** 2) as pbar:
2097 for i, wit_1 in enumerate(witness_labels):
2098 for j, wit_2 in enumerate(witness_labels):
2099 agreements = 0
2100 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
2101 # so we only have to calculate it once:
2102 if i > j:
2103 pbar.update(1)
2104 continue
2105 # Otherwise, calculate the number of units where both witnesses unambiguously agree:
2106 for k, vu_id in enumerate(self.variation_unit_ids):
2107 if vu_id not in substantive_variation_unit_ids_set:
2108 continue
2109 wit_1_rdg_support = self.readings_by_witness[wit_1][k]
2110 wit_2_rdg_support = self.readings_by_witness[wit_2][k]
2111 wit_1_rdg_inds = [l for l, w in enumerate(wit_1_rdg_support) if w > 0]
2112 wit_2_rdg_inds = [l for l, w in enumerate(wit_2_rdg_support) if w > 0]
2113 if len(wit_1_rdg_inds) != 1 or len(wit_2_rdg_inds) != 1:
2114 continue
2115 if wit_1_rdg_inds[0] == wit_2_rdg_inds[0]:
2116 agreements += 1
2117 matrix[i, j] = agreements
2118 matrix[j, i] = agreements
2119 pbar.update(1)
2120 # Initialize a matrix for shared extant variation units for witnesses, and populate it if the proportion or show_ext option is specified:
2121 ext_matrix = None
2122 if proportion or show_ext:
2123 ext_matrix = self.get_ext_matrix(drop_constant=drop_constant)
2124 # If the proportion option is set, then divide every value in the matrix by the corresponding entry in the matrix of shared extant variation units:
2125 if proportion:
2126 proportion_matrix = np.full((len(witness_labels), len(witness_labels)), 0.0, dtype=float)
2127 np.divide(
2128 matrix, ext_matrix, out=proportion_matrix, where=(ext_matrix != 0)
2129 ) # division by 0 can occur if two witnesses have no overlapping units; leave their proportion as 0.0
2130 matrix = proportion_matrix
2131 # Then transform the columns of the main matrix as specified:
2132 matrix = self.transform_matrix(matrix, transform_matrix)
2133 # If the show_ext option is set, then append the number of shared extant variation units after the matrix's values:
2134 if show_ext:
2135 serialized_values = []
2136 for i, wit_1 in enumerate(witness_labels):
2137 serialized_values.append([])
2138 for j, wit_2 in enumerate(witness_labels):
2139 serialized_values[-1].append("/".join([str(matrix[i][j]), str(ext_matrix[i][j])]))
2140 matrix = np.array(serialized_values)
2141 return matrix, witness_labels
2143 def to_idf_matrix(
2144 self,
2145 drop_constant: bool = False,
2146 split_missing: SplitMissingType = None,
2147 proportion: bool = False,
2148 show_ext: bool = False,
2149 transform_matrix: TransformMatrixType = None,
2150 ):
2151 """Transforms this Collation into a NumPy matrix of agreements between witnesses weighted by inverse document frequency (IDF), along with an array of its labels for the witnesses.
2152 The IDF weight of an agreement on a given reading is the information content -log(Pr(R)) of the event R of randomly sampling a witness with that reading.
2153 The IDF-weighted agreement score for two witnesses is the sum of the IDF weights for the readings at which they agree.
2154 Where any witness is ambiguous, it contributes to its potential readings' sampling probabilities in proportion to its degrees of support for those readings.
2155 Similarly, where one or both target witnesses are ambiguous, the expected information content of their agreement is calculated based on the probabilities of their having the same reading.
2156 If a split_missing argument is supplied, then lacunae are handled in the same way.
2157 If the proportion option is set to True, then each cell will contain the mean IDF weight for its corresponding pair of witnesses over all variation units at which both are extant.
2158 (If the split_missing argument is also specified, then the mean is taken over all substantive variation units.)
2160 Args:
2161 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2162 Default value is False.
2163 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
2164 If not specified, then missing data is ignored (i.e., all states are 0).
2165 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
2166 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
2167 proportion (bool, optional): An optional flag indicating whether or not to calculate the mean IDF weight of each pair of witnesses' agreements as opposed to the total IDF weight of their agreements.
2168 Default value is False.
2169 show_ext: An optional flag indicating whether each cell in the matrix
2170 should include the number of their extant, unambiguous variation units after the number of agreements.
2171 Default value is False.
2172 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of the matrix should be transformed.
2174 Returns:
2175 A NumPy IDF-weighted agreement matrix with a row and column for each witness.
2176 A list of witness ID strings.
2177 """
2178 # Populate a list of sites that will correspond to columns of the sequence alignment:
2179 substantive_variation_unit_ids = self.variation_unit_ids
2180 if drop_constant:
2181 substantive_variation_unit_ids = [
2182 vu_id
2183 for vu_id in self.variation_unit_ids
2184 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2185 ]
2186 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2187 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2188 # Initialize the output array with the appropriate dimensions:
2189 witness_labels = [wit.id for wit in self.witnesses]
2190 # If the split_missing option is "proportional", then for each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings:
2191 support_proportions_by_unit = {}
2192 if split_missing == SplitMissingType.proportional:
2193 for k, vu_id in enumerate(self.variation_unit_ids):
2194 # Skip this variation unit if it is a dropped constant site:
2195 if vu_id not in substantive_variation_unit_ids_set:
2196 continue
2197 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
2198 for i, wit in enumerate(witness_labels):
2199 rdg_support = self.readings_by_witness[wit][k]
2200 for l, w in enumerate(rdg_support):
2201 support_proportions[l] += w
2202 norm = (
2203 sum(support_proportions) if sum(support_proportions) > 0 else 1.0
2204 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
2205 for l in range(len(support_proportions)):
2206 support_proportions[l] = support_proportions[l] / norm
2207 support_proportions_by_unit[vu_id] = support_proportions
2208 # Then populate data structures mapping each variation unit's ID to normalized reading support dictionaries
2209 # and vectors of sampling probabilities for its substantive readings:
2210 normalized_reading_support_dicts_by_vu_id = {}
2211 sampling_probabilities_by_vu_id = {}
2212 for k, vu_id in enumerate(self.variation_unit_ids):
2213 # Skip this variation unit if it is a dropped constant site:
2214 if vu_id not in substantive_variation_unit_ids_set:
2215 continue
2216 # Otherwise, populate normalized reading support vector dictionaries and sampling probability vectors in this unit:
2217 normalized_reading_support_by_wit = {}
2218 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
2219 for i, wit in enumerate(witness_labels):
2220 rdg_support = self.readings_by_witness[wit][k]
2221 # Check if this reading support vector represents missing data:
2222 norm = sum(rdg_support)
2223 if norm == 0:
2224 # If this reading support vector sums to 0, then this is missing data; handle it as specified:
2225 if split_missing == SplitMissingType.uniform:
2226 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))]
2227 elif split_missing == SplitMissingType.proportional:
2228 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))]
2229 else:
2230 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1:
2231 rdg_support = [w / norm for l, w in enumerate(rdg_support)]
2232 normalized_reading_support_by_wit[wit] = rdg_support
2233 # Then add this witness's contributions to the readings' sampling probabilities:
2234 for l, w in enumerate(normalized_reading_support_by_wit[wit]):
2235 sampling_probabilities[l] += w
2236 norm = (
2237 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0
2238 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
2239 # Otherwise, normalize the sampling probabilities so they sum to 1:
2240 sampling_probabilities = [w / norm for w in sampling_probabilities]
2241 normalized_reading_support_dicts_by_vu_id[vu_id] = normalized_reading_support_by_wit
2242 sampling_probabilities_by_vu_id[vu_id] = sampling_probabilities
2243 # Then populate the matrix with the total expected information content for agreements between each pair of witnesses:
2244 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float)
2245 with tqdm(total=len(self.witnesses) ** 2) as pbar:
2246 for i, wit_1 in enumerate(witness_labels):
2247 for j, wit_2 in enumerate(witness_labels):
2248 total_information_content = 0.0
2249 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
2250 # so we only have to calculate it once:
2251 if i > j:
2252 pbar.update(1)
2253 continue
2254 # Otherwise, calculate the expected information content of agreements between these witnesses in each substantive variation unit
2255 # based on the sampling probabilities of the substantive readings in the unit:
2256 for k, vu_id in enumerate(self.variation_unit_ids):
2257 if vu_id not in substantive_variation_unit_ids_set:
2258 continue
2259 wit_1_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_1]
2260 wit_2_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_2]
2261 sampling_probabilities = sampling_probabilities_by_vu_id[vu_id]
2262 # First, calculate the probability that these two witnesses agree:
2263 probability_of_agreement = sum(
2264 [wit_1_rdg_support[l] * wit_2_rdg_support[l] for l in range(len(sampling_probabilities))]
2265 )
2266 # If these witnesses do not agree at this variation unit, then this unit contributes nothing to their total score:
2267 if probability_of_agreement == 0.0:
2268 continue
2269 # Otherwise, calculate the expected information content (in bits) of their agreement given their agreement on that reading
2270 # (skipping readings with a sampling probability of 0):
2271 expected_information_content = sum(
2272 [
2273 -math.log2(sampling_probabilities[l])
2274 * (wit_1_rdg_support[l] * wit_2_rdg_support[l] / probability_of_agreement)
2275 for l in range(len(sampling_probabilities))
2276 if sampling_probabilities[l] > 0.0
2277 ]
2278 )
2279 # Then add this contribution to the total score for these two witnesses:
2280 total_information_content += expected_information_content
2281 matrix[i, j] = total_information_content
2282 matrix[j, i] = total_information_content
2283 pbar.update(1)
2284 # Initialize a matrix for shared extant variation units for witnesses, and populate it if the proportion or show_ext option is specified:
2285 ext_matrix = None
2286 if proportion or show_ext:
2287 ext_matrix = self.get_ext_matrix(drop_constant=drop_constant, split_missing=split_missing)
2288 # If the proportion option is set, then divide every value in the matrix by the corresponding entry in the matrix of shared extant variation units:
2289 if proportion:
2290 proportion_matrix = np.full((len(witness_labels), len(witness_labels)), 0.0, dtype=float)
2291 np.divide(
2292 matrix, ext_matrix, out=proportion_matrix, where=(ext_matrix != 0)
2293 ) # division by 0 can occur if two witnesses have no overlapping units; leave their proportion as 0.0
2294 matrix = proportion_matrix
2295 # Then transform the columns of the main matrix as specified:
2296 matrix = self.transform_matrix(matrix, transform_matrix)
2297 # If the show_ext option is set, then append the number of shared extant variation units after the matrix's values:
2298 if show_ext:
2299 serialized_values = []
2300 for i, wit_1 in enumerate(witness_labels):
2301 serialized_values.append([])
2302 for j, wit_2 in enumerate(witness_labels):
2303 serialized_values[-1].append("/".join([str(matrix[i][j]), str(ext_matrix[i][j])]))
2304 matrix = np.array(serialized_values)
2305 return matrix, witness_labels
2307 def to_mi_matrix(
2308 self,
2309 drop_constant: bool = False,
2310 split_missing: SplitMissingType = None,
2311 proportion: bool = False,
2312 show_ext: bool = False,
2313 transform_matrix: TransformMatrixType = None,
2314 ):
2315 """Transforms this Collation into a NumPy matrix of the total mutual information (MI), in bits, between witnesses over all variation units, along with an array of its labels for the witnesses.
2316 This is equivalent to the total Kullback-Leibler divergence of the joint distribution of the witnesses' observed readings
2317 from the joint distribution of their expected readings under the assumption that the witnesses are independent, taken over all variation units.
2318 The value of 0 if and only if the witnesses are completely independent.
2320 Args:
2321 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2322 Default value is False.
2323 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
2324 If not specified, then missing data is ignored (i.e., all states are 0).
2325 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
2326 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
2327 proportion (bool, optional): An optional flag indicating whether or not to calculate the mean IDF weight of each pair of witnesses' agreements as opposed to the total IDF weight of their agreements.
2328 Default value is False.
2329 show_ext: An optional flag indicating whether each cell in the matrix
2330 should include the number of their extant, unambiguous variation units after the number of agreements.
2331 Default value is False.
2332 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of the matrix should be transformed.
2334 Returns:
2335 A NumPy MI matrix with a row and column for each witness.
2336 A list of witness ID strings.
2337 """
2338 # Populate a list of sites that will correspond to columns of the sequence alignment:
2339 substantive_variation_unit_ids = self.variation_unit_ids
2340 if drop_constant:
2341 substantive_variation_unit_ids = [
2342 vu_id
2343 for vu_id in self.variation_unit_ids
2344 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2345 ]
2346 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2347 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2348 # Initialize the output array with the appropriate dimensions:
2349 witness_labels = [wit.id for wit in self.witnesses]
2350 # If the split_missing option is "proportional", then for each variation unit, keep a record of the proportion of non-missing witnesses supporting the substantive variant readings:
2351 support_proportions_by_unit = {}
2352 if split_missing == SplitMissingType.proportional:
2353 for k, vu_id in enumerate(self.variation_unit_ids):
2354 # Skip this variation unit if it is a dropped constant site:
2355 if vu_id not in substantive_variation_unit_ids_set:
2356 continue
2357 support_proportions = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
2358 for i, wit in enumerate(witness_labels):
2359 rdg_support = self.readings_by_witness[wit][k]
2360 for l, w in enumerate(rdg_support):
2361 support_proportions[l] += w
2362 norm = (
2363 sum(support_proportions) if sum(support_proportions) > 0 else 1.0
2364 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
2365 for l in range(len(support_proportions)):
2366 support_proportions[l] = support_proportions[l] / norm
2367 support_proportions_by_unit[vu_id] = support_proportions
2368 # Then populate data structures mapping each variation unit's ID to normalized reading support dictionaries,
2369 # vectors of sampling probabilities for its substantive readings, and expected joint probability matrices:
2370 normalized_reading_support_dicts_by_vu_id = {}
2371 sampling_probabilities_by_vu_id = {}
2372 expected_joint_probabilities_by_vu_id = {}
2373 for k, vu_id in enumerate(self.variation_unit_ids):
2374 # Skip this variation unit if it is a dropped constant site:
2375 if vu_id not in substantive_variation_unit_ids_set:
2376 continue
2377 # Otherwise, populate normalized reading support vector dictionaries and sampling probability vectors in this unit:
2378 normalized_reading_support_by_wit = {}
2379 sampling_probabilities = [0.0] * len(self.substantive_readings_by_variation_unit_id[vu_id])
2380 for i, wit in enumerate(witness_labels):
2381 rdg_support = self.readings_by_witness[wit][k]
2382 # Check if this reading support vector represents missing data:
2383 norm = sum(rdg_support)
2384 if norm == 0:
2385 # If this reading support vector sums to 0, then this is missing data; handle it as specified:
2386 if split_missing == SplitMissingType.uniform:
2387 rdg_support = [1 / len(rdg_support) for l in range(len(rdg_support))]
2388 elif split_missing == SplitMissingType.proportional:
2389 rdg_support = [support_proportions_by_unit[vu_id][l] for l in range(len(rdg_support))]
2390 else:
2391 # Otherwise, the data is present, though it may be ambiguous; normalize the reading probabilities to sum to 1:
2392 rdg_support = [w / norm for l, w in enumerate(rdg_support)]
2393 normalized_reading_support_by_wit[wit] = rdg_support
2394 # Then add this witness's contributions to the readings' sampling probabilities:
2395 for l, w in enumerate(normalized_reading_support_by_wit[wit]):
2396 sampling_probabilities[l] += w
2397 norm = (
2398 sum(sampling_probabilities) if sum(sampling_probabilities) > 0 else 1.0
2399 ) # if this variation unit has no extant witnesses (e.g., if its only witnesses are fragmentary and we have excluded them), then assume a norm of 1 to avoid division by zero
2400 # Otherwise, normalize the sampling probabilities so they sum to 1:
2401 sampling_probabilities = [w / norm for w in sampling_probabilities]
2402 normalized_reading_support_dicts_by_vu_id[vu_id] = normalized_reading_support_by_wit
2403 sampling_probabilities_by_vu_id[vu_id] = sampling_probabilities
2404 # Then populate a contingency table for the expected probabilities of joint support in this unit:
2405 expected_joint_probabilities = np.full(
2406 (len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float
2407 )
2408 for l1 in range(len(sampling_probabilities)):
2409 for l2 in range(len(sampling_probabilities)):
2410 expected_joint_probabilities[l1, l2] = sampling_probabilities[l1] * sampling_probabilities[l2]
2411 expected_joint_probabilities_by_vu_id[vu_id] = expected_joint_probabilities
2412 # Then populate the matrix one variation unit at a time:
2413 matrix = np.full((len(witness_labels), len(witness_labels)), 0, dtype=float)
2414 with tqdm(total=len(self.witnesses) ** 2) as pbar:
2415 # Then calculate the mutual information contribution for each pair of witnesses:
2416 for i, wit_1 in enumerate(witness_labels):
2417 for j, wit_2 in enumerate(witness_labels):
2418 total_mutual_information = 0.0
2419 # The contribution to the entry for these witnesses will be identical regardless of the order in which they are specified,
2420 # so we only have to calculate it once:
2421 if i > j:
2422 pbar.update(1)
2423 continue
2424 # Otherwise, calculate the mutual information between these witnesses in each substantive variation unit:
2425 for k, vu_id in enumerate(self.variation_unit_ids):
2426 if vu_id not in substantive_variation_unit_ids_set:
2427 continue
2428 wit_1_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_1]
2429 wit_2_rdg_support = normalized_reading_support_dicts_by_vu_id[vu_id][wit_2]
2430 sampling_probabilities = sampling_probabilities_by_vu_id[vu_id]
2431 expected_joint_probabilities = expected_joint_probabilities_by_vu_id[vu_id]
2432 # If either witness has an all-zeroes vector (because it is lacunose in this unit), then we can skip these witnesses here:
2433 if sum(wit_1_rdg_support) == 0 or sum(wit_2_rdg_support) == 0:
2434 continue
2435 # Otherwise, populate a contingency table for the observed probabilities of joint support in this unit:
2436 observed_joint_probabilities = np.full(
2437 (len(sampling_probabilities), len(sampling_probabilities)), 0, dtype=float
2438 )
2439 for l1, w1 in enumerate(wit_1_rdg_support):
2440 for l2, w2 in enumerate(wit_2_rdg_support):
2441 observed_joint_probabilities[l1, l2] = w1 * w2
2442 # Then calculate the mutual information using the expected and observed distribution matrices:
2443 mutual_information = 0.0
2444 for l1 in range(len(sampling_probabilities)):
2445 for l2 in range(len(sampling_probabilities)):
2446 observed = observed_joint_probabilities[l1, l2]
2447 expected = expected_joint_probabilities[l1, l2]
2448 if observed == 0:
2449 continue
2450 mutual_information += observed * math.log2(observed / expected)
2451 # Then add this mutual information to the total for these two witnesses:
2452 total_mutual_information += mutual_information
2453 matrix[i, j] += total_mutual_information
2454 matrix[j, i] += total_mutual_information
2455 pbar.update(1)
2456 # Initialize a matrix for shared extant variation units for witnesses, and populate it if the proportion or show_ext option is specified:
2457 ext_matrix = None
2458 if proportion or show_ext:
2459 ext_matrix = self.get_ext_matrix(drop_constant=drop_constant, split_missing=split_missing)
2460 # If the proportion option is set, then divide every value in the matrix by the corresponding entry in the matrix of shared extant variation units:
2461 if proportion:
2462 proportion_matrix = np.full((len(witness_labels), len(witness_labels)), 0.0, dtype=float)
2463 np.divide(
2464 matrix, ext_matrix, out=proportion_matrix, where=(ext_matrix != 0)
2465 ) # division by 0 can occur if two witnesses have no overlapping units; leave their proportion as 0.0
2466 matrix = proportion_matrix
2467 # Then transform the columns of the main matrix as specified:
2468 matrix = self.transform_matrix(matrix, transform_matrix)
2469 # If the show_ext option is set, then append the number of shared extant variation units after the matrix's values:
2470 if show_ext:
2471 serialized_values = []
2472 for i, wit_1 in enumerate(witness_labels):
2473 serialized_values.append([])
2474 for j, wit_2 in enumerate(witness_labels):
2475 serialized_values[-1].append("/".join([str(matrix[i][j]), str(ext_matrix[i][j])]))
2476 matrix = np.array(serialized_values)
2477 return matrix, witness_labels
2479 def to_nexus_table(self, drop_constant: bool = False, ambiguous_as_missing: bool = False):
2480 """Returns this Collation in the form of a table with rows for taxa, columns for characters, and reading IDs in cells.
2482 Args:
2483 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2484 Default value is False.
2485 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
2486 Default value is False.
2488 Returns:
2489 A NumPy array with rows for taxa, columns for characters, and reading IDs in cells.
2490 A list of substantive reading ID strings.
2491 A list of witness ID strings.
2492 """
2493 # Populate a list of sites that will correspond to columns of the sequence alignment:
2494 substantive_variation_unit_ids = self.variation_unit_ids
2495 if drop_constant:
2496 substantive_variation_unit_ids = [
2497 vu_id
2498 for vu_id in self.variation_unit_ids
2499 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2500 ]
2501 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2502 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2503 # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary
2504 # to the readings' IDs:
2505 reading_ids_by_indices = {}
2506 for j, vu in enumerate(self.variation_units):
2507 if vu.id not in substantive_variation_unit_ids_set:
2508 continue
2509 k = 0
2510 for rdg in vu.readings:
2511 key = tuple([vu.id, rdg.id])
2512 if key not in substantive_variation_unit_reading_tuples_set:
2513 continue
2514 indices = tuple([j, k])
2515 reading_ids_by_indices[indices] = rdg.id
2516 k += 1
2517 # Initialize the output array with the appropriate dimensions:
2518 missing_symbol = '?'
2519 witness_labels = [wit.id for wit in self.witnesses]
2520 matrix = np.full(
2521 (len(witness_labels), len(substantive_variation_unit_ids)), missing_symbol, dtype=object
2522 ) # use dtype=object because the maximum string length is not known up front
2523 # Then populate it with the appropriate values:
2524 with tqdm(total=len(self.witnesses)) as pbar:
2525 row_ind = 0
2526 for i, wit in enumerate(self.witnesses):
2527 col_ind = 0
2528 for j, vu in enumerate(self.variation_units):
2529 if vu.id not in substantive_variation_unit_ids_set:
2530 continue
2531 rdg_support = self.readings_by_witness[wit.id][j]
2532 # If this reading support vector sums to 0, then this is missing data; handle it as specified:
2533 if sum(rdg_support) == 0:
2534 matrix[row_ind, col_ind] = missing_symbol
2535 # Otherwise, add its coefficients normally:
2536 else:
2537 rdg_inds = [
2538 k for k, w in enumerate(rdg_support) if w > 0
2539 ] # the index list consists of the indices of all readings with any degree of certainty assigned to them
2540 # For singleton readings, just print the reading ID:
2541 if len(rdg_inds) == 1:
2542 k = rdg_inds[0]
2543 matrix[row_ind, col_ind] = reading_ids_by_indices[(j, k)]
2544 # For multiple readings, print the corresponding reading IDs in braces or the missing symbol depending on input settings:
2545 else:
2546 if ambiguous_as_missing:
2547 matrix[row_ind, col_ind] = missing_symbol
2548 else:
2549 matrix[row_ind, col_ind] = "{%s}" % " ".join(
2550 [reading_ids_by_indices[(j, k)] for k in rdg_inds]
2551 )
2552 col_ind += 1
2553 row_ind += 1
2554 pbar.update(1)
2555 return matrix, witness_labels, substantive_variation_unit_ids
2557 def to_long_table(self, drop_constant: bool = False):
2558 """Returns this Collation in the form of a long table with columns for taxa, characters, reading indices, and reading values.
2559 Note that this method treats ambiguous readings as missing data.
2561 Args:
2562 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2563 Default value is False.
2565 Returns:
2566 A NumPy array with columns for taxa, characters, reading indices, and reading values, and rows for each combination of these values in the matrix.
2567 A list of column label strings.
2568 """
2569 # Populate a list of sites that will correspond to columns of the sequence alignment:
2570 substantive_variation_unit_ids = self.variation_unit_ids
2571 if drop_constant:
2572 substantive_variation_unit_ids = [
2573 vu_id
2574 for vu_id in self.variation_unit_ids
2575 if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
2576 ]
2577 substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
2578 substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
2579 # Initialize the outputs:
2580 column_labels = ["taxon", "character", "state", "value"]
2581 long_table_list = []
2582 # Populate a dictionary mapping (variation unit index, reading index) tuples to reading texts:
2583 reading_texts_by_indices = {}
2584 for j, vu in enumerate(self.variation_units):
2585 if vu.id not in substantive_variation_unit_ids_set:
2586 continue
2587 k = 0
2588 for rdg in vu.readings:
2589 key = tuple([vu.id, rdg.id])
2590 if key not in substantive_variation_unit_reading_tuples_set:
2591 continue
2592 indices = tuple([j, k])
2593 reading_texts_by_indices[indices] = rdg.text
2594 k += 1
2595 # Then populate the output list with the appropriate values:
2596 witness_labels = [wit.id for wit in self.witnesses]
2597 missing_symbol = '?'
2598 with tqdm(total=len(self.witnesses)) as pbar:
2599 for i, wit in enumerate(self.witnesses):
2600 row_ind = 0
2601 for j, vu_id in enumerate(self.variation_unit_ids):
2602 if vu_id not in substantive_variation_unit_ids_set:
2603 continue
2604 rdg_support = self.readings_by_witness[wit.id][j]
2605 # Populate a list of nonzero coefficients for this reading support vector:
2606 rdg_inds = [k for k, w in enumerate(rdg_support) if w > 0]
2607 # If this list does not consist of exactly one reading, then treat it as missing data:
2608 if len(rdg_inds) != 1:
2609 long_table_list.append([wit.id, vu_id, missing_symbol, missing_symbol])
2610 continue
2611 k = rdg_inds[0]
2612 rdg_text = reading_texts_by_indices[(j, k)]
2613 # Replace empty reading texts with the omission placeholder:
2614 if rdg_text == "":
2615 rdg_text = "om."
2616 long_table_list.append([wit.id, vu_id, k, rdg_text])
2617 pbar.update(1)
2618 # Then convert the long table entries list to a NumPy array:
2619 long_table = np.array(long_table_list)
2620 return long_table, column_labels
2622 def to_dataframe(
2623 self,
2624 drop_constant: bool = False,
2625 ambiguous_as_missing: bool = False,
2626 proportion: bool = False,
2627 table_type: TableType = TableType.matrix,
2628 split_missing: SplitMissingType = None,
2629 transform_matrix: TransformMatrixType = None,
2630 show_ext: bool = False,
2631 ):
2632 """Returns this Collation in the form of a Pandas DataFrame array, including the appropriate row and column labels.
2634 Args:
2635 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
2636 Default value is False.
2637 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
2638 Default value is False.
2639 proportion (bool, optional): An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2640 Default value is False.
2641 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate.
2642 Only applicable for tabular outputs.
2643 Default value is "matrix".
2644 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
2645 If not specified, then missing data is ignored (i.e., all states are 0).
2646 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
2647 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
2648 Only applicable for table types "matrix" and "idf".
2649 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed.
2650 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation.
2651 show_ext: An optional flag indicating whether each cell in the matrix
2652 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements.
2653 Only applicable for tabular output formats of type \"distance\" or \"similarity\".
2654 Default value is False.
2656 Returns:
2657 A Pandas DataFrame corresponding to a collation matrix with reading frequencies or a long table with discrete reading states.
2658 """
2659 df = None
2660 # Proceed based on the table type:
2661 if table_type == TableType.matrix:
2662 # Convert the collation to a NumPy array and get its row and column labels first:
2663 matrix, reading_labels, witness_labels = self.to_numpy(
2664 drop_constant=drop_constant, split_missing=split_missing
2665 )
2666 df = pd.DataFrame(matrix, index=reading_labels, columns=witness_labels)
2667 elif table_type == TableType.distance:
2668 # Convert the collation to a NumPy array and get its row and column labels first:
2669 matrix, witness_labels = self.to_distance_matrix(
2670 drop_constant=drop_constant, proportion=proportion, transform_matrix=transform_matrix, show_ext=show_ext
2671 )
2672 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels)
2673 elif table_type == TableType.similarity:
2674 # Convert the collation to a NumPy array and get its row and column labels first:
2675 matrix, witness_labels = self.to_similarity_matrix(
2676 drop_constant=drop_constant, proportion=proportion, transform_matrix=transform_matrix, show_ext=show_ext
2677 )
2678 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels)
2679 elif table_type == TableType.idf:
2680 # Convert the collation to a NumPy array and get its row and column labels first:
2681 matrix, witness_labels = self.to_idf_matrix(
2682 drop_constant=drop_constant,
2683 split_missing=split_missing,
2684 proportion=proportion,
2685 transform_matrix=transform_matrix,
2686 show_ext=show_ext,
2687 )
2688 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels)
2689 elif table_type == TableType.mi:
2690 # Convert the collation to a NumPy array and get its row and column labels first:
2691 matrix, witness_labels = self.to_mi_matrix(
2692 drop_constant=drop_constant,
2693 split_missing=split_missing,
2694 proportion=proportion,
2695 transform_matrix=transform_matrix,
2696 show_ext=show_ext,
2697 )
2698 df = pd.DataFrame(matrix, index=witness_labels, columns=witness_labels)
2699 elif table_type == TableType.nexus:
2700 # Convert the collation to a NumPy array and get its row and column labels first:
2701 matrix, witness_labels, vu_labels = self.to_nexus_table(
2702 drop_constant=drop_constant, ambiguous_as_missing=ambiguous_as_missing
2703 )
2704 df = pd.DataFrame(matrix, index=witness_labels, columns=vu_labels)
2705 elif table_type == TableType.long:
2706 # Convert the collation to a long table and get its column labels first:
2707 long_table, column_labels = self.to_long_table(drop_constant=drop_constant)
2708 df = pd.DataFrame(long_table, columns=column_labels)
2709 return df
2711 def to_csv(
2712 self,
2713 file_addr: Union[Path, str],
2714 drop_constant: bool = False,
2715 ambiguous_as_missing: bool = False,
2716 proportion: bool = False,
2717 table_type: TableType = TableType.matrix,
2718 split_missing: SplitMissingType = None,
2719 transform_matrix: TransformMatrixType = None,
2720 show_ext: bool = False,
2721 **kwargs
2722 ):
2723 """Writes this Collation to a comma-separated value (CSV) file with the given address.
2725 If your witness IDs are numeric (e.g., Gregory-Aland numbers), then they will be written in full to the CSV file, but Excel will likely interpret them as numbers and truncate any leading zeroes!
2727 Args:
2728 file_addr: A string representing the path to an output CSV file; the file type should be .csv.
2729 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
2730 Default value is False.
2731 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
2732 Default value is False.
2733 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2734 Default value is False.
2735 table_type: A TableType option indicating which type of tabular output to generate.
2736 Only applicable for tabular outputs.
2737 Default value is "matrix".
2738 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
2739 If not specified, then missing data is ignored (i.e., all states are 0).
2740 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
2741 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
2742 Only applicable for table types "matrix" and "idf".
2743 transform_matrix: A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed.
2744 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation.
2745 show_ext: An optional flag indicating whether each cell in the matrix
2746 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements.
2747 Only applicable for tabular output formats of type \"distance\" or \"similarity\".
2748 Default value is False.
2749 **kwargs: Keyword arguments for pandas.DataFrame.to_csv.
2750 """
2751 # Convert the collation to a Pandas DataFrame first:
2752 df = self.to_dataframe(
2753 drop_constant=drop_constant,
2754 ambiguous_as_missing=ambiguous_as_missing,
2755 proportion=proportion,
2756 table_type=table_type,
2757 split_missing=split_missing,
2758 show_ext=show_ext,
2759 transform_matrix=transform_matrix,
2760 )
2761 # Generate all parent folders for this file that don't already exist:
2762 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
2763 # Proceed based on the table type:
2764 if table_type == TableType.long:
2765 return df.to_csv(
2766 file_addr, encoding="utf-8-sig", index=False, **kwargs
2767 ) # add BOM to start of file so that Excel will know to read it as Unicode
2768 return df.to_csv(
2769 file_addr, encoding="utf-8-sig", **kwargs
2770 ) # add BOM to start of file so that Excel will know to read it as Unicode
2772 def to_excel(
2773 self,
2774 file_addr: Union[Path, str],
2775 drop_constant: bool = False,
2776 ambiguous_as_missing: bool = False,
2777 proportion: bool = False,
2778 table_type: TableType = TableType.matrix,
2779 split_missing: SplitMissingType = None,
2780 transform_matrix: TransformMatrixType = None,
2781 show_ext: bool = False,
2782 ):
2783 """Writes this Collation to an Excel (.xlsx) file with the given address.
2785 Since Pandas is deprecating its support for xlwt, specifying an output in old Excel (.xls) output is not recommended.
2787 Args:
2788 file_addr: A string representing the path to an output Excel file; the file type should be .xlsx.
2789 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
2790 Default value is False.
2791 ambiguous_as_missing: An optional flag indicating whether to treat all ambiguous states as missing data.
2792 Default value is False.
2793 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2794 Default value is False.
2795 table_type: A TableType option indicating which type of tabular output to generate.
2796 Only applicable for tabular outputs.
2797 Default value is "matrix".
2798 split_missing: An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
2799 If not specified, then missing data is ignored (i.e., all states are 0).
2800 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
2801 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
2802 Only applicable for table types "matrix" and "idf".
2803 transform_matrix: A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed.
2804 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation.
2805 show_ext: An optional flag indicating whether each cell in the matrix
2806 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements.
2807 Only applicable for tabular output formats of type \"distance\" or \"similarity\".
2808 Default value is False.
2809 """
2810 # Convert the collation to a Pandas DataFrame first:
2811 df = self.to_dataframe(
2812 drop_constant=drop_constant,
2813 ambiguous_as_missing=ambiguous_as_missing,
2814 proportion=proportion,
2815 table_type=table_type,
2816 split_missing=split_missing,
2817 show_ext=show_ext,
2818 transform_matrix=transform_matrix,
2819 )
2820 # Generate all parent folders for this file that don't already exist:
2821 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
2822 # Proceed based on the table type:
2823 if table_type == TableType.long:
2824 return df.to_excel(file_addr, index=False)
2825 return df.to_excel(file_addr)
2827 def to_phylip_matrix(
2828 self,
2829 file_addr: Union[Path, str],
2830 drop_constant: bool = False,
2831 proportion: bool = False,
2832 table_type: TableType = TableType.distance,
2833 show_ext: bool = False,
2834 ):
2835 """Writes this Collation as a PHYLIP-formatted distance/similarity matrix to the file with the given address.
2837 Args:
2838 file_addr: A string representing the path to an output PHYLIP file; the file type should be .ph or .phy.
2839 drop_constant: An optional flag indicating whether to ignore variation units with one substantive reading.
2840 Default value is False.
2841 proportion: An optional flag indicating whether or not to calculate distances as proportions over extant, unambiguous variation units.
2842 Default value is False.
2843 table_type: A TableType option indicating which type of tabular output to generate.
2844 For PHYLIP-formatted outputs, distance and similarity matrices are the only supported table types.
2845 Default value is "distance".
2846 show_ext: An optional flag indicating whether each cell in the matrix
2847 should include the number of their extant, unambiguous variation units after the number of their disagreements/agreements.
2848 Only applicable for tabular output formats of type \"distance\" or \"similarity\".
2849 Default value is False.
2850 """
2851 # Convert the collation to a Pandas DataFrame first:
2852 matrix = None
2853 witness_labels = []
2854 # Proceed based on the table type:
2855 if table_type == TableType.distance:
2856 # Convert the collation to a NumPy array and get its row and column labels first:
2857 matrix, witness_labels = self.to_distance_matrix(
2858 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext
2859 )
2860 elif table_type == TableType.similarity:
2861 # Convert the collation to a NumPy array and get its row and column labels first:
2862 matrix, witness_labels = self.to_similarity_matrix(
2863 drop_constant=drop_constant, proportion=proportion, show_ext=show_ext
2864 )
2865 # Generate all parent folders for this file that don't already exist:
2866 Path(file_addr).parent.mkdir(parents=True, exist_ok=True)
2867 with open(file_addr, "w", encoding="utf-8") as f:
2868 # The first line contains the number of taxa:
2869 f.write("%d\n" % len(witness_labels))
2870 # Every subsequent line contains a witness label, followed by the values in its row of the matrix:
2871 for i, wit_id in enumerate(witness_labels):
2872 wit_label = slugify(wit_id, lowercase=False, allow_unicode=True, separator='_')
2873 f.write("%s %s\n" % (wit_label, " ".join([str(v) for v in matrix[i]])))
2874 return
2876 def get_stemma_symbols(self):
2877 """Returns a list of one-character symbols needed to represent the states of all substantive readings in stemma format.
2879 The number of symbols equals the maximum number of substantive readings at any variation unit.
2881 Returns:
2882 A list of individual characters representing states in readings.
2883 """
2884 possible_symbols = (
2885 list(string.digits) + list(string.ascii_lowercase) + list(string.ascii_uppercase)
2886 ) # NOTE: the maximum number of symbols allowed in stemma format (other than "?" and "-") is 62
2887 # The number of symbols needed is equal to the length of the longest substantive reading vector:
2888 nsymbols = 0
2889 # If there are no witnesses, then no symbols are needed at all:
2890 if len(self.witnesses) == 0:
2891 return []
2892 wit_id = self.witnesses[0].id
2893 for rdg_support in self.readings_by_witness[wit_id]:
2894 nsymbols = max(nsymbols, len(rdg_support))
2895 stemma_symbols = possible_symbols[:nsymbols]
2896 return stemma_symbols
    def to_stemma(self, file_addr: Union[Path, str]):
        """Writes this Collation to a stemma file without an extension and a Chron file (containing low, middle, and high dates for all witnesses) without an extension.

        Since this format does not support ambiguous states, all reading vectors with anything other than one nonzero entry will be interpreted as lacunose.
        If an interpGrp for weights is specified in the TEI XML collation, then the weights for the interp elements will be used as weights
        for the variation units that specify them in their ana attribute.

        Note that constant variation units (i.e., those with only one substantive reading) are always dropped in this output format.

        Args:
            file_addr: A string representing the path to an output stemma prep file; the file should have no extension.
                The accompanying chron file will match this file name, except that it will have "_chron" appended to the end.
        """
        # Populate a list of sites that will correspond to columns of the sequence alignment
        # (by default, constant sites are dropped):
        substantive_variation_unit_ids = [
            vu_id for vu_id in self.variation_unit_ids if len(self.substantive_readings_by_variation_unit_id[vu_id]) > 1
        ]
        substantive_variation_unit_ids_set = set(substantive_variation_unit_ids)
        substantive_variation_unit_reading_tuples_set = set(self.substantive_variation_unit_reading_tuples)
        # In a first pass, populate a dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary
        # to the readings' texts:
        reading_texts_by_indices = {}
        for j, vu in enumerate(self.variation_units):
            if vu.id not in substantive_variation_unit_ids_set:
                continue
            k = 0
            for rdg in vu.readings:
                key = tuple([vu.id, rdg.id])
                if key not in substantive_variation_unit_reading_tuples_set:
                    continue
                indices = tuple([j, k])
                reading_texts_by_indices[indices] = rdg.text
                k += 1
        # In a second pass, populate another dictionary mapping (variation unit index, reading index) tuples from the readings_by_witness dictionary
        # to the witnesses exclusively supporting those readings:
        reading_wits_by_indices = {}
        for indices in reading_texts_by_indices:
            reading_wits_by_indices[indices] = []
        for i, wit in enumerate(self.witnesses):
            for j, vu_id in enumerate(self.variation_unit_ids):
                if vu_id not in substantive_variation_unit_ids_set:
                    continue
                rdg_support = self.readings_by_witness[wit.id][j]
                # If this witness does not exclusively support exactly one reading at this unit, then treat it as lacunose:
                if len([k for k, w in enumerate(rdg_support) if w > 0]) != 1:
                    continue
                # NOTE(review): this assumes the single nonzero coefficient is exactly 1;
                # a fractional singleton coefficient would raise ValueError here — confirm that
                # upstream processing guarantees unit coefficients in this case.
                k = rdg_support.index(1)
                indices = tuple([j, k])
                reading_wits_by_indices[indices].append(wit.id)
        # In a third pass, write to the stemma file:
        symbols = self.get_stemma_symbols()
        Path(file_addr).parent.mkdir(
            parents=True, exist_ok=True
        )  # generate all parent folders for this file that don't already exist
        chron_file_addr = str(file_addr) + "_chron"
        with open(file_addr, "w", encoding="utf-8") as f:
            # Start with the witness list:
            f.write(
                "* %s ;\n\n"
                % " ".join(
                    [slugify(wit.id, lowercase=False, allow_unicode=True, separator='_') for wit in self.witnesses]
                )
            )
            # f.write("^ %s\n\n" % chron_file_addr) #write the relative path to the chron file
            f.write(
                "^ %s\n\n" % ("." + os.sep + Path(chron_file_addr).name)
            )  # write the relative path to the chron file
            # Then add a line indicating that all witnesses are lacunose unless they are specified explicitly:
            f.write("= $? $* ;\n\n")
            with tqdm(total=len(self.variation_unit_ids)) as pbar:
                # Then proceed for each variation unit:
                for j, vu_id in enumerate(self.variation_unit_ids):
                    if vu_id not in substantive_variation_unit_ids_set:
                        pbar.update(1)
                        continue
                    # Print the variation unit ID first:
                    f.write("@ %s\n" % vu_id)
                    # In a first pass, print the texts of all readings enclosed in brackets,
                    # walking consecutive reading indices until one is absent from the dictionary:
                    f.write("[ ")
                    k = 0
                    while True:
                        indices = tuple([j, k])
                        if indices not in reading_texts_by_indices:
                            break
                        text = slugify(
                            reading_texts_by_indices[indices], lowercase=False, allow_unicode=True, separator='.'
                        )
                        # Denote omissions by en-dashes:
                        if text == "":
                            text = "\u2013"
                        # The first reading should not be preceded by anything:
                        if k == 0:
                            f.write(text)
                            f.write(" |")
                            # Add the weight of this variation unit after the pipe by comparing its analysis categories to their weights
                            # (categories without an entry in weights_by_id count as weight 1, and the result is the truncated mean):
                            weight = 1
                            vu = self.variation_units[j]
                            if len(vu.analysis_categories) > 0:
                                weight = int(
                                    sum(
                                        [
                                            self.weights_by_id[ana] if ana in self.weights_by_id else 1
                                            for ana in vu.analysis_categories
                                        ]
                                    )
                                    / len(vu.analysis_categories)
                                )
                            f.write("*%d" % weight)
                        # Every subsequent reading should be preceded by a space:
                        elif k > 0:
                            f.write(" %s" % text)
                        k += 1
                    f.write(" ]\n")
                    # In a second pass, print the indices and witnesses for all readings enclosed in angle brackets:
                    k = 0
                    f.write("\t< ")
                    while True:
                        indices = tuple([j, k])
                        if indices not in reading_wits_by_indices:
                            break
                        rdg_symbol = symbols[k]  # get the one-character alphanumeric code for this state
                        wits = " ".join(reading_wits_by_indices[indices])
                        # Open the variant reading support block with an angle bracket:
                        if k == 0:
                            f.write("%s %s" % (rdg_symbol, wits))
                        # Open all subsequent variant reading support blocks with pipes on the next line:
                        else:
                            f.write("\n\t| %s %s" % (rdg_symbol, wits))
                        k += 1
                    f.write(" >\n")
                    pbar.update(1)
        # In a fourth pass, write to the chron file.
        # Column widths are sized from the longest slugified witness label and the longest date string:
        max_id_length = max(
            [len(slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')) for wit in self.witnesses]
        )
        max_date_length = 0
        for wit in self.witnesses:
            if wit.date_range[0] is not None:
                max_date_length = max(max_date_length, len(str(wit.date_range[0])))
            if wit.date_range[1] is not None:
                max_date_length = max(max_date_length, len(str(wit.date_range[1])))
        # Attempt to get the minimum and maximum dates for witnesses; if we can't do this, then don't write a chron file:
        min_date = None
        max_date = None
        try:
            min_date = min([wit.date_range[0] for wit in self.witnesses if wit.date_range[0] is not None])
            max_date = max([wit.date_range[1] for wit in self.witnesses if wit.date_range[1] is not None])
        except Exception as e:
            print("WARNING: no witnesses have date ranges; no chron file will be written!")
            return
        with open(chron_file_addr, "w", encoding="utf-8") as f:
            for wit in self.witnesses:
                wit_label = slugify(wit.id, lowercase=False, allow_unicode=True, separator='_')
                f.write(wit_label)
                # NOTE(review): the padding below is computed from the raw ID length, but the label
                # written above is slugified; if slugification changes the length, columns may
                # misalign — confirm whether len(wit_label) was intended here.
                f.write(" " * (max_id_length - len(wit.id) + 1))
                # If the lower bound on this witness's date is empty, then use the minimum date over all witnesses as a default:
                date_range = wit.date_range
                if date_range[0] is None:
                    date_range = tuple([min_date, date_range[1]])
                # NOTE(review): only a missing lower bound gets a default here; if date_range[1] is
                # None, the averaging below raises a TypeError and str(None) would be written as the
                # high date — confirm that witnesses always carry an upper date bound.
                # Then write the date range minimum, average, and maximum to the chron file:
                low_date = str(date_range[0])
                f.write(" " * (max_date_length - len(low_date) + 2))
                f.write(low_date)
                avg_date = str(int(((date_range[0] + date_range[1]) / 2)))
                f.write(" " * (max_date_length - len(str(avg_date)) + 2))
                f.write(avg_date)
                high_date = str(date_range[1])
                f.write(" " * (max_date_length - len(high_date) + 2))
                f.write(high_date)
                f.write("\n")
        return
3070 def to_file(
3071 self,
3072 file_addr: Union[Path, str],
3073 format: Format = None,
3074 drop_constant: bool = False,
3075 split_missing: SplitMissingType = None,
3076 char_state_labels: bool = True,
3077 frequency: bool = False,
3078 ambiguous_as_missing: bool = False,
3079 proportion: bool = False,
3080 calibrate_dates: bool = False,
3081 mrbayes: bool = False,
3082 clock_model: ClockModel = ClockModel.strict,
3083 ancestral_logger: AncestralLogger = AncestralLogger.state,
3084 table_type: TableType = TableType.matrix,
3085 transform_matrix: TransformMatrixType = None,
3086 show_ext: bool = False,
3087 seed: int = None,
3088 ):
3089 """Writes this Collation to the file with the given address.
3091 Args:
3092 file_addr (Union[Path, str]): The path to the output file.
3093 format (Format, optional): The desired output format.
3094 If None then it is infered from the file suffix.
3095 Defaults to None.
3096 drop_constant (bool, optional): An optional flag indicating whether to ignore variation units with one substantive reading.
3097 Default value is False.
3098 split_missing (SplitMissingType, optional): An option indicating whether or not to treat missing characters/variation units as having a contribution of 1 split over all states/readings.
3099 If not specified, then missing data is ignored (i.e., all states are 0).
3100 If "uniform", then the contribution of 1 is divided evenly over all substantive readings.
3101 If "proportional", then the contribution of 1 is divided between the readings in proportion to their support among the witnesses that are not missing.
3102 Only applicable for tabular outputs of type "matrix", "idf", "mean-idf", "mi", and "mean-mi".
3103 char_state_labels (bool, optional): An optional flag indicating whether to print
3104 the CharStateLabels block in NEXUS output.
3105 Default value is True.
3106 frequency (bool, optional): An optional flag indicating whether to use the StatesFormat=Frequency setting
3107 instead of the StatesFormat=StatesPresent setting
3108 (and thus represent all states with frequency vectors rather than symbols)
3109 in NEXUS output.
3110 Note that this setting is necessary to make use of certainty degrees assigned to multiple ambiguous states in the collation.
3111 Default value is False.
3112 ambiguous_as_missing (bool, optional): An optional flag indicating whether to treat all ambiguous states as missing data.
3113 If this flag is set, then only base symbols will be generated for the NEXUS file.
3114 It is only applied if the frequency option is False.
3115 Default value is False.
3116 proportion (bool, optional): An optional flag indicating whether to populate a distance matrix's cells
3117 with a proportion of disagreements to variation units where both witnesses are extant.
3118 It is only applied if the table_type option is "distance".
3119 Default value is False.
3120 calibrate_dates (bool, optional): An optional flag indicating whether to add an Assumptions block that specifies date distributions for witnesses
3121 in NEXUS output.
3122 This option is intended for inputs to BEAST 2.
3123 Default value is False.
3124 mrbayes (bool, optional): An optional flag indicating whether to add a MrBayes block that specifies model settings and age calibrations for witnesses
3125 in NEXUS output.
3126 This option is intended for inputs to MrBayes.
3127 Default value is False.
3128 clock_model (ClockModel, optional): A ClockModel option indicating which type of clock model to use.
3129 This option is intended for inputs to MrBayes and BEAST 2.
3130 MrBayes does not presently support a local clock model, so it will default to a strict clock model if a local clock model is specified.
3131 Default value is "strict".
3132 ancestral_logger (AncestralLogger, optional): An AncestralLogger option indicating which class of logger (if any) to use for ancestral states.
3133 This option is intended for inputs to BEAST 2.
3134 table_type (TableType, optional): A TableType option indicating which type of tabular output to generate.
3135 Only applicable for tabular outputs and PHYLIP outputs.
3136 If the output is a PHYLIP file, then the type of tabular output must be "distance" or "similarity"; otherwise, it will be ignored.
3137 Default value is "matrix".
3138 transform_matrix (TransformMatrixType, optional): A TransformMatrixType option indicating how the columns of a witness-to-witness matrix output should be transformed.
3139 Only applicable for tabular outputs in which the rows and columns correspond to the witnesses in the collation.
3140 show_ext (bool, optional): An optional flag indicating whether each cell in the matrix
3141 should include the number of variation units where both witnesses are extant after the number of their disagreements/agreements.
3142 Only applicable for tabular output formats of type "distance" or "similarity".
3143 Default value is False.
3144 seed (optional, int): A seed for random number generation (for setting initial values of unspecified transcriptional rates in BEAST 2 XML output).
3145 """
3146 file_addr = Path(file_addr)
3147 format = format or Format.infer(
3148 file_addr.suffix
3149 ) # an exception will be raised here if the format or suffix is invalid
3151 if format == Format.NEXUS:
3152 return self.to_nexus(
3153 file_addr,
3154 drop_constant=drop_constant,
3155 char_state_labels=char_state_labels,
3156 frequency=frequency,
3157 ambiguous_as_missing=ambiguous_as_missing,
3158 calibrate_dates=calibrate_dates,
3159 mrbayes=mrbayes,
3160 clock_model=clock_model,
3161 )
3163 if format == format.HENNIG86:
3164 return self.to_hennig86(file_addr, drop_constant=drop_constant)
3166 if format == format.PHYLIP:
3167 if table_type in [TableType.distance, TableType.similarity]:
3168 return self.to_phylip_matrix(
3169 file_addr,
3170 drop_constant=drop_constant,
3171 proportion=proportion,
3172 table_type=table_type,
3173 show_ext=show_ext,
3174 )
3175 return self.to_phylip(file_addr, drop_constant=drop_constant)
3177 if format == format.FASTA:
3178 return self.to_fasta(file_addr, drop_constant=drop_constant)
3180 if format == format.BEAST:
3181 return self.to_beast(
3182 file_addr,
3183 drop_constant=drop_constant,
3184 clock_model=clock_model,
3185 ancestral_logger=ancestral_logger,
3186 seed=seed,
3187 )
3189 if format == Format.CSV:
3190 return self.to_csv(
3191 file_addr,
3192 drop_constant=drop_constant,
3193 ambiguous_as_missing=ambiguous_as_missing,
3194 proportion=proportion,
3195 table_type=table_type,
3196 split_missing=split_missing,
3197 transform_matrix=transform_matrix,
3198 show_ext=show_ext,
3199 )
3201 if format == Format.TSV:
3202 return self.to_csv(
3203 file_addr,
3204 drop_constant=drop_constant,
3205 ambiguous_as_missing=ambiguous_as_missing,
3206 proportion=proportion,
3207 table_type=table_type,
3208 split_missing=split_missing,
3209 transform_matrix=transform_matrix,
3210 show_ext=show_ext,
3211 sep="\t",
3212 )
3214 if format == Format.EXCEL:
3215 return self.to_excel(
3216 file_addr,
3217 drop_constant=drop_constant,
3218 ambiguous_as_missing=ambiguous_as_missing,
3219 proportion=proportion,
3220 table_type=table_type,
3221 split_missing=split_missing,
3222 transform_matrix=transform_matrix,
3223 show_ext=show_ext,
3224 )
3226 if format == Format.STEMMA:
3227 return self.to_stemma(file_addr)