Source code for betty.model.ancestry

"""
Provide Betty's main data model.
"""

from __future__ import annotations

from collections.abc import MutableSequence
from contextlib import suppress
from enum import Enum
from pathlib import Path
from reprlib import recursive_repr
from typing import Iterable, Any, TYPE_CHECKING
from urllib.parse import quote

from geopy import Point

from betty.classtools import repr_instance
from betty.json.linked_data import (
    LinkedDataDumpable,
    dump_context,
    dump_link,
    add_json_ld,
)
from betty.json.schema import add_property, ref_json_schema
from betty.locale import Localized, Datey, Str, Localizable, ref_datey
from betty.media_type import MediaType
from betty.model import (
    many_to_many,
    Entity,
    one_to_many,
    many_to_one,
    many_to_one_to_many,
    MultipleTypesEntityCollection,
    EntityCollection,
    UserFacingEntity,
    EntityTypeAssociationRegistry,
    GeneratedEntityId,
    get_entity_type_name,
)
from betty.model.event_type import EventType, UnknownEventType
from betty.serde.dump import DictDump, Dump, dump_default
from betty.string import camel_case_to_kebab_case
from betty.warnings import deprecated

if TYPE_CHECKING:
    from betty.app import App


[docs] class Privacy(Enum): PUBLIC = 1 PRIVATE = 2 UNDETERMINED = 3
[docs] class HasPrivacy(LinkedDataDumpable): def __init__( self, *args: Any, privacy: Privacy | None = None, public: bool | None = None, private: bool | None = None, **kwargs: Any, ): super().__init__(*args, **kwargs) if [privacy, public, private].count(None) < 2: raise ValueError( f"Only one of the `privacy`, `public`, and `private` arguments to {type(self)}.__init__() may be given at a time." ) if privacy is not None: self._privacy = privacy elif public is True: self._privacy = Privacy.PUBLIC elif private is True: self._privacy = Privacy.PRIVATE else: self._privacy = Privacy.UNDETERMINED @property def own_privacy(self) -> Privacy: return self._privacy def _get_effective_privacy(self) -> Privacy: return self.own_privacy @property def privacy(self) -> Privacy: return self._get_effective_privacy() @privacy.setter def privacy(self, privacy: Privacy) -> None: self._privacy = privacy @privacy.deleter def privacy(self) -> None: self.privacy = Privacy.UNDETERMINED @property def private(self) -> bool: return self.privacy is Privacy.PRIVATE @private.setter def private(self, private: True) -> None: self.privacy = Privacy.PRIVATE @property def public(self) -> bool: # Undetermined privacy defaults to public. return self.privacy is not Privacy.PRIVATE @public.setter def public(self, public: True) -> None: self.privacy = Privacy.PUBLIC
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump["private"] = self.private return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "private", { "$ref": "#/definitions/privacy", }, ) definitions = dump_default(schema, "definitions", dict) if "privacy" not in definitions: definitions["privacy"] = { "type": "boolean", "description": "Whether this entity is private (true), or public (false).", } return schema
[docs] def is_private(target: Any) -> bool: """ Check if the given target is private. """ if isinstance(target, HasPrivacy): return target.private return False
[docs] def is_public(target: Any) -> bool: """ Check if the given target is public. """ if isinstance(target, HasPrivacy): return target.public return True
[docs] def resolve_privacy(privacy: Privacy | HasPrivacy | None) -> Privacy: """ Resolve the privacy of a value. """ if privacy is None: return Privacy.UNDETERMINED if isinstance(privacy, Privacy): return privacy return privacy.privacy
[docs] def merge_privacies(*privacies: Privacy | HasPrivacy | None) -> Privacy: """ Merge multiple privacies into one. """ privacies = {resolve_privacy(privacy) for privacy in privacies} if Privacy.PRIVATE in privacies: return Privacy.PRIVATE if Privacy.UNDETERMINED in privacies: return Privacy.UNDETERMINED return Privacy.PUBLIC
[docs] class Dated(LinkedDataDumpable): def __init__( self, *args: Any, date: Datey | None = None, **kwargs: Any, ): super().__init__(*args, **kwargs) self.date = date
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) if self.date and is_public(self): dump["date"] = await self.date.dump_linked_data(app) return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) schema["type"] = "object" schema["additionalProperties"] = False add_property(schema, "date", await ref_datey(schema, app), False) return schema
[docs] class Described(LinkedDataDumpable): def __init__( self, *args: Any, description: str | None = None, **kwargs: Any, ): super().__init__(*args, **kwargs) self.description = description
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) if self.description is not None: dump["description"] = self.description dump_context(dump, description="description") return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "description", { "$ref": "#/definitions/description", }, False, ) definitions = dump_default(schema, "definitions", dict) if "description" not in definitions: definitions["description"] = { "type": "string", } return schema
[docs] class HasMediaType(LinkedDataDumpable): def __init__( self, *args: Any, media_type: MediaType | None = None, **kwargs: Any, ): super().__init__(*args, **kwargs) self.media_type = media_type
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) if is_public(self): if self.media_type is not None: dump["mediaType"] = str(self.media_type) return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property(schema, "mediaType", ref_media_type(schema), False) return schema
[docs] def ref_media_type(root_schema: DictDump[Dump]) -> DictDump[Dump]: """ Reference the MediaType schema. """ definitions = dump_default(root_schema, "definitions", dict) if "mediaType" not in definitions: definitions["mediaType"] = { "type": "string", "description": "An IANA media type (https://www.iana.org/assignments/media-types/media-types.xhtml).", } return { "$ref": "#/definitions/mediaType", }
[docs] class HasLinksEntity(HasLinks):
[docs] async def dump_linked_data( # type: ignore[misc] self: HasLinksEntity & Entity, app: App, ) -> DictDump[Dump]: dump: DictDump[Dump] = await super().dump_linked_data(app) # type: ignore[misc] if not isinstance(self.id, GeneratedEntityId): await dump_link( dump, app, Link( app.static_url_generator.generate( f"/{camel_case_to_kebab_case(get_entity_type_name(self.type))}/{self.id}/index.json" ), relationship="canonical", media_type=MediaType("application/ld+json"), ), ) if is_public(self): await dump_link( dump, app, *( Link( app.url_generator.generate( self, media_type="text/html", locale=locale ), relationship="alternate", media_type=MediaType("text/html"), locale=locale, ) for locale in app.project.configuration.locales ), ) return dump
[docs] @many_to_one("entity", "betty.model.ancestry.HasNotes", "notes") class Note(UserFacingEntity, HasPrivacy, HasLinksEntity, Entity): entity: HasNotes def __init__( self, text: str, *, id: str | None = None, entity: HasNotes | None = None, privacy: Privacy | None = None, public: bool | None = None, private: bool | None = None, ): super().__init__( id, privacy=privacy, public=public, private=private, ) self._text = text if entity is not None: self.entity = entity
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("Note")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("Notes")
@property def text(self) -> str: return self._text @property def label(self) -> Str: return Str.plain(self.text)
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump["@type"] = "https://schema.org/Thing" if self.public: dump["text"] = self.text return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property(schema, "text", {"type": "string"}, False) return schema
[docs] @one_to_many("notes", "betty.model.ancestry.Note", "entity") class HasNotes(LinkedDataDumpable): def __init__( # type: ignore[misc] self: HasNotes & Entity, *args: Any, notes: Iterable[Note] | None = None, **kwargs: Any, ): super().__init__( # type: ignore[misc] *args, **kwargs, ) if notes is not None: self.notes = notes # type: ignore[assignment] @property def notes(self) -> EntityCollection[Note]: # type: ignore[empty-body] pass @notes.setter def notes(self, notes: Iterable[Note]) -> None: pass @notes.deleter def notes(self) -> None: pass
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump["notes"] = [ app.static_url_generator.generate(f"/note/{quote(note.id)}/index.json") for note in self.notes if not isinstance(note.id, GeneratedEntityId) ] return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "notes", { "$ref": "#/definitions/entity/noteCollection", }, ) return schema
[docs] @many_to_many("citations", "betty.model.ancestry.Citation", "facts") class HasCitations(LinkedDataDumpable): def __init__( # type: ignore[misc] self: HasCitations & Entity, *args: Any, citations: Iterable[Citation] | None = None, **kwargs: Any, ): super().__init__( # type: ignore[misc] *args, **kwargs, ) if citations is not None: self.citations = citations # type: ignore[assignment] @property def citations(self) -> EntityCollection[Citation]: # type: ignore[empty-body] pass @citations.setter def citations(self, citations: Iterable[Citation]) -> None: pass @citations.deleter def citations(self) -> None: pass
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump["citations"] = [ app.static_url_generator.generate( f"/citation/{quote(citation.id)}/index.json" ) for citation in self.citations if not isinstance(citation.id, GeneratedEntityId) ] return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "citations", { "$ref": "#/definitions/entity/citationCollection", }, ) return schema
[docs] @many_to_many("entities", "betty.model.ancestry.HasFiles", "files") class File( Described, HasPrivacy, HasLinksEntity, HasMediaType, HasNotes, HasCitations, UserFacingEntity, Entity, ): def __init__( self, path: Path, *, id: str | None = None, media_type: MediaType | None = None, description: str | None = None, notes: Iterable[Note] | None = None, citations: Iterable[Citation] | None = None, privacy: Privacy | None = None, public: bool | None = None, private: bool | None = None, links: MutableSequence[Link] | None = None, ): super().__init__( id, media_type=media_type, description=description, notes=notes, citations=citations, privacy=privacy, public=public, private=private, links=links, ) self._path = path @property def entities(self) -> EntityCollection[Entity]: # type: ignore[empty-body] pass @entities.setter def entities(self, entities: Iterable[Entity]) -> None: pass @entities.deleter def entities(self) -> None: pass
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("File")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("Files")
@property def path(self) -> Path: return self._path @property def label(self) -> Str: return Str.plain(self.description) if self.description else super().label
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump["entities"] = [ app.static_url_generator.generate( f"/{camel_case_to_kebab_case(get_entity_type_name(entity))}/{quote(entity.id)}/index.json" ) for entity in self.entities if not isinstance(entity.id, GeneratedEntityId) ] return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "entities", {"type": "array", "items": {"type": "string", "format": "uri"}}, ) return schema
[docs] @many_to_many("files", "betty.model.ancestry.File", "entities") class HasFiles: def __init__( # type: ignore[misc] self: HasFiles & Entity, *args: Any, files: Iterable[File] | None = None, **kwargs: Any, ): super().__init__( # type: ignore[misc] *args, **kwargs, ) if files is not None: self.files = files # type: ignore[assignment] @property def files(self) -> EntityCollection[File]: # type: ignore[empty-body] pass @files.setter def files(self, files: Iterable[File]) -> None: pass @files.deleter def files(self) -> None: pass @property def associated_files(self) -> Iterable[File]: return self.files
[docs] @many_to_one("contained_by", "betty.model.ancestry.Source", "contains") @one_to_many("contains", "betty.model.ancestry.Source", "contained_by") @one_to_many("citations", "betty.model.ancestry.Citation", "source") class Source( Dated, HasFiles, HasNotes, HasLinksEntity, HasPrivacy, UserFacingEntity, Entity ): contained_by: Source | None def __init__( self, name: str | None = None, *, id: str | None = None, author: str | None = None, publisher: str | None = None, contained_by: Source | None = None, contains: Iterable[Source] | None = None, notes: Iterable[Note] | None = None, date: Datey | None = None, files: Iterable[File] | None = None, links: MutableSequence[Link] | None = None, privacy: Privacy | None = None, public: bool | None = None, private: bool | None = None, ): super().__init__( id, notes=notes, date=date, files=files, links=links, privacy=privacy, public=public, private=private, ) self.name = name self.author = author self.publisher = publisher if contained_by is not None: self.contained_by = contained_by if contains is not None: self.contains = contains # type: ignore[assignment] def _get_effective_privacy(self) -> Privacy: privacy = super()._get_effective_privacy() if self.contained_by: return merge_privacies(privacy, self.contained_by.privacy) return privacy @property def contains(self) -> EntityCollection[Source]: # type: ignore[empty-body] pass @contains.setter def contains(self, contains: Iterable[Source]) -> None: pass @contains.deleter def contains(self) -> None: pass @property def citations(self) -> EntityCollection[Citation]: # type: ignore[empty-body] pass @citations.setter def citations(self, citations: Iterable[Citation]) -> None: pass @citations.deleter def citations(self) -> None: pass
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("Source")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("Sources")
@property def label(self) -> Str: return Str.plain(self.name) if self.name else super().label
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump["@type"] = "https://schema.org/Thing" dump["contains"] = [ app.static_url_generator.generate( f"/source/{quote(contained.id)}/index.json" ) for contained in self.contains if not isinstance(contained.id, GeneratedEntityId) ] dump["citations"] = [ app.static_url_generator.generate( f"/citation/{quote(citation.id)}/index.json" ) for citation in self.citations if not isinstance(citation.id, GeneratedEntityId) ] if self.contained_by is not None and not isinstance( self.contained_by.id, GeneratedEntityId ): dump["containedBy"] = app.static_url_generator.generate( f"/source/{quote(self.contained_by.id)}/index.json" ) if self.public: if self.name is not None: dump_context(dump, name="name") dump["name"] = self.name if self.author is not None: dump["author"] = self.author if self.publisher is not None: dump["publisher"] = self.publisher return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "name", { "type": "string", }, False, ) add_property( schema, "author", { "type": "string", }, False, ) add_property( schema, "publisher", { "type": "string", }, False, ) add_property( schema, "contains", { "type": "array", "items": { "type": "string", "format": "uri", }, }, ) add_property( schema, "citations", { "$ref": "#/definitions/entity/citationCollection", }, ) add_property( schema, "containedBy", { "type": "string", "format": "uri", }, False, ) return schema
[docs] @deprecated( f"This class is deprecated as of Betty 0.3.2, and will be removed in Betty 0.4.x. No direct replacement is available. Instead, set the privacy for {Source} entities accordingly." ) class AnonymousSource(Source): @property # type: ignore[override] def name(self) -> str: return "private" @name.setter def name(self, _) -> None: # This is a no-op as the name is 'hardcoded'. pass @name.deleter def name(self) -> None: # This is a no-op as the name is 'hardcoded'. pass
[docs] @many_to_many("facts", "betty.model.ancestry.HasCitations", "citations") @many_to_one("source", "betty.model.ancestry.Source", "citations") class Citation(Dated, HasFiles, HasPrivacy, HasLinksEntity, UserFacingEntity, Entity): def __init__( self, *, id: str | None = None, facts: Iterable[HasCitations] | None = None, source: Source | None = None, location: Str | None = None, date: Datey | None = None, files: Iterable[File] | None = None, privacy: Privacy | None = None, public: bool | None = None, private: bool | None = None, ): super().__init__( id, date=date, files=files, privacy=privacy, public=public, private=private, ) if facts is not None: self.facts = facts # type: ignore[assignment] self.location = location self.source = source def _get_effective_privacy(self) -> Privacy: privacy = super()._get_effective_privacy() if self.source: return merge_privacies(privacy, self.source.privacy) return privacy @property def facts(self) -> EntityCollection[HasCitations & Entity]: # type: ignore[empty-body] pass @facts.setter def facts(self, facts: Iterable[HasCitations & Entity]) -> None: pass @facts.deleter def facts(self) -> None: pass
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("Citation")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("Citations")
@property def label(self) -> Str: return self.location or Str.plain("")
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump["@type"] = "https://schema.org/Thing" dump["facts"] = [ app.static_url_generator.generate( f"/{camel_case_to_kebab_case(get_entity_type_name(fact))}/{quote(fact.id)}/index.json" ) for fact in self.facts if not isinstance(fact.id, GeneratedEntityId) ] if self.source is not None and not isinstance( self.source.id, GeneratedEntityId ): dump["source"] = app.static_url_generator.generate( f"/source/{quote(self.source.id)}/index.json" ) return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property(schema, "source", {"type": "string", "format": "uri"}, False) add_property( schema, "facts", {"type": "array", "items": {"type": "string", "format": "uri"}}, ) return schema
[docs] @deprecated( f"This class is deprecated as of Betty 0.3.2, and will be removed in Betty 0.4.x. No direct replacement is available. Instead, set the privacy for {Citation} entities accordingly." ) class AnonymousCitation(Citation): @property # type: ignore[override] def location(self) -> Str: return Str._("private (in order to protect people's privacy)") @location.setter def location(self, _) -> None: # This is a no-op as the location is 'hardcoded'. pass @location.deleter def location(self) -> None: # This is a no-op as the location is 'hardcoded'. pass
[docs] class PlaceName(Localized, Dated, LinkedDataDumpable): def __init__( self, name: str, *, locale: str | None = None, date: Datey | None = None, ): super().__init__( date=date, locale=locale, ) self._name = name def __eq__(self, other: Any) -> bool: if not isinstance(other, self.__class__): return NotImplemented # pragma: no cover return self._name == other._name and self.locale == other.locale @recursive_repr() def __repr__(self) -> str: return repr_instance(self, name=self.name, locale=self.locale) def __str__(self) -> str: return self._name @property def name(self) -> str: return self._name
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump["name"] = self.name return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property(schema, "name", {"type": "string"}) return schema
[docs] @many_to_one_to_many( "betty.model.ancestry.Place", "enclosed_by", "encloses", "enclosed_by", "betty.model.ancestry.Place", "encloses", ) class Enclosure(Dated, HasCitations, Entity): encloses: Place | None enclosed_by: Place | None def __init__( self, encloses: Place | None = None, enclosed_by: Place | None = None, ): super().__init__() self.encloses = encloses self.enclosed_by = enclosed_by
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("Enclosure")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("Enclosures")
[docs] @one_to_many("events", "betty.model.ancestry.Event", "place") @one_to_many("enclosed_by", "betty.model.ancestry.Enclosure", "encloses") @one_to_many("encloses", "betty.model.ancestry.Enclosure", "enclosed_by") class Place(HasLinksEntity, HasFiles, HasNotes, HasPrivacy, UserFacingEntity, Entity): def __init__( self, *, id: str | None = None, names: list[PlaceName] | None = None, events: Iterable[Event] | None = None, enclosed_by: Iterable[Enclosure] | None = None, encloses: Iterable[Enclosure] | None = None, notes: Iterable[Note] | None = None, coordinates: Point | None = None, links: MutableSequence[Link] | None = None, privacy: Privacy | None = None, public: bool | None = None, private: bool | None = None, ): super().__init__( id, notes=notes, links=links, privacy=privacy, public=public, private=private, ) self._names = [] if names is None else names self._coordinates = coordinates if events is not None: self.events = events # type: ignore[assignment] if enclosed_by is not None: self.enclosed_by = enclosed_by # type: ignore[assignment] if encloses is not None: self.encloses = encloses # type: ignore[assignment] @property def enclosed_by(self) -> EntityCollection[Enclosure]: # type: ignore[empty-body] pass @enclosed_by.setter def enclosed_by(self, enclosed_by: Iterable[Enclosure]) -> None: pass @enclosed_by.deleter def enclosed_by(self) -> None: pass @property def encloses(self) -> EntityCollection[Enclosure]: # type: ignore[empty-body] pass @encloses.setter def encloses(self, encloses: Iterable[Enclosure]) -> None: pass @encloses.deleter def encloses(self) -> None: pass @property def events(self) -> EntityCollection[Event]: # type: ignore[empty-body] pass @events.setter def events(self, events: Iterable[Event]) -> None: pass @events.deleter def events(self) -> None: pass
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("Place")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("Places")
@property def names(self) -> list[PlaceName]: return self._names @property def coordinates(self) -> Point | None: return self._coordinates @coordinates.setter def coordinates(self, coordinates: Point): self._coordinates = coordinates @property def label(self) -> Str: # @todo Negotiate this by locale and date. with suppress(IndexError): return Str.plain(self.names[0].name) return super().label @property def associated_files(self) -> Iterable[File]: yield from self.files for event in self.events: yield from event.files
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump_context( dump, names="name", events="event", enclosedBy="containedInPlace", encloses="containsPlace", ) dump["@type"] = "https://schema.org/Place" dump["names"] = [await name.dump_linked_data(app) for name in self.names] dump["events"] = [ app.static_url_generator.generate(f"/event/{quote(event.id)}/index.json") for event in self.events if not isinstance(event.id, GeneratedEntityId) ] dump["enclosedBy"] = [ app.static_url_generator.generate( f"/place/{quote(enclosure.enclosed_by.id)}/index.json" ) for enclosure in self.enclosed_by if enclosure.enclosed_by is not None and not isinstance(enclosure.enclosed_by.id, GeneratedEntityId) ] dump["encloses"] = [ app.static_url_generator.generate( f"/place/{quote(enclosure.encloses.id)}/index.json" ) for enclosure in self.encloses if enclosure.encloses is not None and not isinstance(enclosure.encloses.id, GeneratedEntityId) ] if self.coordinates is not None: dump["coordinates"] = { "@type": "https://schema.org/GeoCoordinates", "latitude": self.coordinates.latitude, "longitude": self.coordinates.longitude, } dump_context(dump, coordinates="geo") dump_context( dump["coordinates"], # type: ignore[arg-type] latitude="latitude", ) dump_context( dump["coordinates"], # type: ignore[arg-type] longitude="longitude", ) return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "names", { "type": "array", "items": await PlaceName.linked_data_schema(app), }, ) add_property( schema, "enclosedBy", {"$ref": "#/definitions/entity/placeCollection"}, False, ) add_property( schema, "encloses", {"$ref": "#/definitions/entity/placeCollection"} ) coordinate_schema: DictDump[Dump] = { "type": "number", } coordinates_schema: DictDump[Dump] = { "type": "object", "additionalProperties": False, } add_property(coordinates_schema, "latitude", coordinate_schema, False) add_property(coordinates_schema, "longitude", coordinate_schema, False) add_json_ld(coordinates_schema, schema) add_property(schema, "coordinates", coordinates_schema, False) add_property(schema, "events", {"$ref": "#/definitions/entity/eventCollection"}) return schema
[docs] class PresenceRole:
[docs] @classmethod def name(cls) -> str: raise NotImplementedError(repr(cls))
@property def label(self) -> Str: raise NotImplementedError(repr(self))
[docs] def ref_role(root_schema: DictDump[Dump]) -> DictDump[Dump]: """ Reference the PresenceRole schema. """ definitions = dump_default(root_schema, "definitions", dict) if "role" not in definitions: definitions["role"] = { "type": "string", "description": "A person's role in an event.", } return { "$ref": "#/definitions/role", }
[docs] class Subject(PresenceRole):
[docs] @classmethod def name(cls) -> str: return "subject"
@property def label(self) -> Str: return Str._("Subject")
[docs] class Witness(PresenceRole):
[docs] @classmethod def name(cls) -> str: return "witness"
@property def label(self) -> Str: return Str._("Witness")
[docs] class Beneficiary(PresenceRole):
[docs] @classmethod def name(cls) -> str: return "beneficiary"
@property def label(self) -> Str: return Str._("Beneficiary")
[docs] class Attendee(PresenceRole):
[docs] @classmethod def name(cls) -> str: return "attendee"
@property def label(self) -> Str: return Str._("Attendee")
[docs] class Speaker(PresenceRole):
[docs] @classmethod def name(cls) -> str: return "speaker"
@property def label(self) -> Str: return Str._("Speaker")
[docs] class Celebrant(PresenceRole):
[docs] @classmethod def name(cls) -> str: return "celebrant"
@property def label(self) -> Str: return Str._("Celebrant")
[docs] class Organizer(PresenceRole):
[docs] @classmethod def name(cls) -> str: return "organizer"
@property def label(self) -> Str: return Str._("Organizer")
[docs] @many_to_one_to_many( "betty.model.ancestry.Person", "presences", "person", "event", "betty.model.ancestry.Event", "presences", ) class Presence(HasPrivacy, Entity): person: Person | None event: Event | None role: PresenceRole def __init__( self, person: Person | None, role: PresenceRole, event: Event | None, ): super().__init__(None) self.person = person self.role = role self.event = event
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("Presence")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("Presences")
@property def label(self) -> Str: return Str._( "Presence of {person} at {event}", person=self.person.label if self.person else Str._("Unknown"), event=self.event.label if self.event else Str._("Unknown"), ) def _get_effective_privacy(self) -> Privacy: return merge_privacies( super()._get_effective_privacy(), self.person, self.event, )
[docs] @many_to_one("place", "betty.model.ancestry.Place", "events") @one_to_many("presences", "betty.model.ancestry.Presence", "event") class Event( Dated, HasFiles, HasCitations, HasNotes, Described, HasPrivacy, HasLinksEntity, UserFacingEntity, Entity, ): place: Place | None def __init__( self, *, id: str | None = None, event_type: type[EventType] = UnknownEventType, date: Datey | None = None, files: Iterable[File] | None = None, citations: Iterable[Citation] | None = None, notes: Iterable[Note] | None = None, privacy: Privacy | None = None, public: bool | None = None, private: bool | None = None, place: Place | None = None, description: str | None = None, ): super().__init__( id, date=date, files=files, citations=citations, notes=notes, privacy=privacy, public=public, private=private, description=description, ) self._event_type = event_type if place is not None: self.place = place @property def label(self) -> Str: format_kwargs: dict[str, str | Localizable] = { "event_type": self._event_type.label(), } subjects = [ presence.person for presence in self.presences if presence.public and isinstance(presence.role, Subject) and presence.person is not None and presence.person.public ] if subjects: format_kwargs["subjects"] = Str.call( lambda localizer: ", ".join( person.label.localize(localizer) for person in subjects ) ) if self.description is not None: format_kwargs["event_description"] = self.description if subjects: if self.description is None: return Str._("{event_type} of {subjects}", **format_kwargs) else: return Str._( "{event_type} ({event_description}) of {subjects}", **format_kwargs ) if self.description is None: return Str._("{event_type}", **format_kwargs) else: return Str._("{event_type} ({event_description})", **format_kwargs) @recursive_repr() def __repr__(self) -> str: return repr_instance(self, id=self._id, type=self._event_type) @property def presences(self) -> EntityCollection[Presence]: # type: ignore[empty-body] pass @presences.setter def presences(self, presences: Iterable[Presence]) -> None: pass @presences.deleter def presences(self) -> None: pass
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("Event")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("Events")
@property def event_type(self) -> type[EventType]: return self._event_type @property def associated_files(self) -> Iterable[File]: files = [ *self.files, *[ file for citation in self.citations for file in citation.associated_files ], ] # Preserve the original order. seen = set() for file in files: if file in seen: continue seen.add(file) yield file
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump_context(dump, presences="performer") dump["@type"] = "https://schema.org/Event" dump["type"] = self.event_type.name() dump["eventAttendanceMode"] = "https://schema.org/OfflineEventAttendanceMode" dump["eventStatus"] = "https://schema.org/EventScheduled" dump["presences"] = presences = [] if self.date is not None and self.public: await self.date.datey_dump_linked_data( dump["date"], # type: ignore[arg-type] "startDate", "endDate", ) for presence in self.presences: if presence.person and not isinstance( presence.person.id, GeneratedEntityId ): presences.append(self._dump_event_presence(presence, app)) if self.place is not None and not isinstance(self.place.id, GeneratedEntityId): dump["place"] = app.static_url_generator.generate( f"/place/{quote(self.place.id)}/index.json" ) dump_context(dump, place="location") return dump
def _dump_event_presence(self, presence: Presence, app: App) -> DictDump[Dump]: assert presence.person dump: DictDump[Dump] = { "@type": "https://schema.org/Person", "person": app.static_url_generator.generate( f"/person/{quote(presence.person.id)}/index.json" ), } if presence.public: dump["role"] = presence.role.name() return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "type", { "type": "string", }, ) add_property( schema, "place", { "type": "string", "format": "uri", }, False, ) presence_schema: DictDump[Dump] = { "type": "object", "additionalProperties": False, } add_property(presence_schema, "role", ref_role(schema), False) add_property( presence_schema, "person", { "type": "string", "format": "uri", }, ) add_json_ld(presence_schema, schema) add_property( schema, "presences", { "type": "array", "items": presence_schema, }, ) add_property( schema, "eventStatus", { "type": "string", }, ) add_property( schema, "eventAttendanceMode", { "type": "string", }, ) return schema
[docs] @many_to_one("person", "betty.model.ancestry.Person", "names") class PersonName(Localized, HasCitations, HasPrivacy, Entity): person: Person | None def __init__( self, *, id: str | None = None, person: Person | None = None, individual: str | None = None, affiliation: str | None = None, privacy: Privacy | None = None, public: bool | None = None, private: bool | None = None, locale: str | None = None, ): if not individual and not affiliation: raise ValueError( "The individual and affiliation names must not both be empty." ) super().__init__( id, privacy=privacy, public=public, private=private, locale=locale, ) self._individual = individual self._affiliation = affiliation # Set the person association last, because the association requires comparisons, and self.__eq__() uses the # individual and affiliation names. self.person = person def _get_effective_privacy(self) -> Privacy: privacy = super()._get_effective_privacy() if self.person: return merge_privacies(privacy, self.person.privacy) return privacy def __repr__(self) -> str: return repr_instance( self, id=self.id, individual=self.individual, affiliation=self.affiliation )
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("Person name")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("Person names")
@property def individual(self) -> str | None: return self._individual @property def affiliation(self) -> str | None: return self._affiliation @property def label(self) -> Str: return Str._( "{individual_name} {affiliation_name}", individual_name="…" if not self.individual else self.individual, affiliation_name="…" if not self.affiliation else self.affiliation, )
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) if self.public: if self.individual is not None: dump_context(dump, individual="givenName") dump["individual"] = self.individual if self.affiliation is not None: dump_context(dump, affiliation="familyName") dump["affiliation"] = self.affiliation return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "individual", { "type": "string", }, False, ) add_property( schema, "affiliation", { "type": "string", }, False, ) return schema
[docs] @many_to_many("parents", "betty.model.ancestry.Person", "children") @many_to_many("children", "betty.model.ancestry.Person", "parents") @one_to_many("presences", "betty.model.ancestry.Presence", "person") @one_to_many("names", "betty.model.ancestry.PersonName", "person") class Person( HasFiles, HasCitations, HasNotes, HasLinksEntity, HasPrivacy, UserFacingEntity, Entity, ): def __init__( self, *, id: str | None = None, files: Iterable[File] | None = None, citations: Iterable[Citation] | None = None, links: MutableSequence[Link] | None = None, notes: Iterable[Note] | None = None, privacy: Privacy | None = None, public: bool | None = None, private: bool | None = None, parents: Iterable[Person] | None = None, children: Iterable[Person] | None = None, presences: Iterable[Presence] | None = None, names: Iterable[PersonName] | None = None, ): super().__init__( id, files=files, citations=citations, links=links, notes=notes, privacy=privacy, public=public, private=private, ) if children is not None: self.children = children # type: ignore[assignment] if parents is not None: self.parents = parents # type: ignore[assignment] if presences is not None: self.presences = presences # type: ignore[assignment] if names is not None: self.names = names # type: ignore[assignment] @property def parents(self) -> EntityCollection[Person]: # type: ignore[empty-body] pass @parents.setter def parents(self, parents: Iterable[Person]) -> None: pass @parents.deleter def parents(self) -> None: pass @property def children(self) -> EntityCollection[Person]: # type: ignore[empty-body] pass @children.setter def children(self, children: Iterable[Person]) -> None: pass @children.deleter def children(self) -> None: pass @property def presences(self) -> EntityCollection[Presence]: # type: ignore[empty-body] pass @presences.setter def presences(self, presences: Iterable[Presence]) -> None: pass @presences.deleter def presences(self) -> None: pass @property def names(self) -> EntityCollection[PersonName]: # type: ignore[empty-body] pass @names.setter def names(self, names: Iterable[PersonName]) -> None: pass @names.deleter def names(self) -> None: pass
[docs] @classmethod def entity_type_label(cls) -> Str: return Str._("Person")
[docs] @classmethod def entity_type_label_plural(cls) -> Str: return Str._("People")
@property def siblings(self) -> list[Person]: siblings = [] for parent in self.parents: for sibling in parent.children: if sibling != self and sibling not in siblings: siblings.append(sibling) return siblings @property def associated_files(self) -> Iterable[File]: files = [ *self.files, *[ file for name in self.names for citation in name.citations for file in citation.associated_files ], *[ file for presence in self.presences if presence.event is not None for file in presence.event.associated_files ], ] # Preserve the original order. seen = set() for file in files: if file in seen: continue seen.add(file) yield file @property def label(self) -> Str: for name in self.names: if name.public: return name.label return super().label
[docs] async def dump_linked_data(self, app: App) -> DictDump[Dump]: dump = await super().dump_linked_data(app) dump_context( dump, names="name", parents="parent", children="child", siblings="sibling", ) dump["@type"] = "https://schema.org/Person" dump["parents"] = [ app.static_url_generator.generate(f"/person/{quote(parent.id)}/index.json") for parent in self.parents if not isinstance(parent.id, GeneratedEntityId) ] dump["children"] = [ app.static_url_generator.generate(f"/person/{quote(child.id)}/index.json") for child in self.children if not isinstance(child.id, GeneratedEntityId) ] dump["siblings"] = [ app.static_url_generator.generate(f"/person/{quote(sibling.id)}/index.json") for sibling in self.siblings if not isinstance(sibling.id, GeneratedEntityId) ] dump["presences"] = [ self._dump_person_presence(presence, app) for presence in self.presences if presence.event is not None and not isinstance(presence.event.id, GeneratedEntityId) ] if self.public: dump["names"] = [ await name.dump_linked_data(app) for name in self.names if name.public ] else: dump["names"] = [] return dump
def _dump_person_presence(self, presence: Presence, app: App) -> DictDump[Dump]: assert presence.event dump: DictDump[Dump] = { "event": app.static_url_generator.generate( f"/event/{quote(presence.event.id)}/index.json" ), } dump_context(dump, event="performerIn") if presence.public: dump["role"] = presence.role.name() return dump
[docs] @classmethod async def linked_data_schema(cls, app: App) -> DictDump[Dump]: schema = await super().linked_data_schema(app) add_property( schema, "names", { "type": "array", "items": await PersonName.linked_data_schema(app), }, ) add_property( schema, "parents", { "$ref": "#/definitions/entity/personCollection", }, ) add_property( schema, "children", { "$ref": "#/definitions/entity/personCollection", }, ) add_property( schema, "siblings", { "$ref": "#/definitions/entity/personCollection", }, ) presence_schema: DictDump[Dump] = { "type": "object", "additionalProperties": False, } add_property(presence_schema, "role", ref_role(schema), False) add_property( presence_schema, "event", { "type": "string", "format": "uri", }, ) add_json_ld(presence_schema, schema) add_property( schema, "presences", { "type": "array", "items": presence_schema, }, ) return schema
[docs] class Ancestry(MultipleTypesEntityCollection[Entity]): def __init__(self): super().__init__() self._check_graph = True
[docs] def add_unchecked_graph(self, *entities: Entity) -> None: self._check_graph = False try: self.add(*entities) finally: self._check_graph = True
def _on_add(self, *entities: Entity) -> None: super()._on_add(*entities) if self._check_graph: self.add(*self._get_associates(*entities)) def _get_associates(self, *entities: Entity) -> Iterable[Entity]: for entity in entities: for association in EntityTypeAssociationRegistry.get_all_associations( entity ): for associate in EntityTypeAssociationRegistry.get_associates( entity, association ): yield associate