"""
Provide the project API.
"""
from __future__ import annotations
from contextlib import suppress
from pathlib import Path
from reprlib import recursive_repr
from typing import Any, Generic, final, Iterable, cast, Self
from urllib.parse import urlparse
from betty.app.extension import Extension, ConfigurableExtension
from betty.classtools import repr_instance
from betty.config import (
Configuration,
Configurable,
FileBasedConfiguration,
ConfigurationMapping,
ConfigurationSequence,
)
from betty.hashid import hashid
from betty.locale import get_data, Str
from betty.model import Entity, get_entity_type_name, UserFacingEntity, EntityT
from betty.model.ancestry import Ancestry, Person, Event, Place, Source
from betty.serde.dump import (
Dump,
VoidableDump,
void_none,
minimize,
Void,
VoidableDictDump,
)
from betty.serde.load import (
AssertionFailed,
Fields,
Assertions,
Assertion,
RequiredField,
OptionalField,
Asserter,
)
from betty.warnings import deprecate
# Default value of the ``lifetime_threshold`` argument of ``ProjectConfiguration``.
DEFAULT_LIFETIME_THRESHOLD = 125
[docs]
class EntityReference(Configuration, Generic[EntityT]):
    """
    Configuration that points at an entity through its type and its ID.

    If ``entity_type_is_constrained`` is true, the ``entity_type`` setter
    refuses new values, and the reference dumps to the bare entity ID instead
    of a record.
    """

    def __init__(
        self,
        entity_type: type[EntityT] | None = None,
        entity_id: str | None = None,
        *,
        entity_type_is_constrained: bool = False,
    ):
        super().__init__()
        self._entity_type = entity_type
        self._entity_id = entity_id
        self._entity_type_is_constrained = entity_type_is_constrained

    @property
    def entity_type(self) -> type[EntityT] | None:
        """
        The type of the referenced entity, if any.

        Setting it raises ``AttributeError`` when the type is constrained.
        """
        return self._entity_type

    @entity_type.setter
    def entity_type(self, entity_type: type[EntityT]) -> None:
        if self._entity_type_is_constrained:
            raise AttributeError(
                f"The entity type cannot be set, as it is already constrained to {self._entity_type}."
            )
        self._entity_type = entity_type
        self._dispatch_change()

    @property
    def entity_id(self) -> str | None:
        """
        The ID of the referenced entity, if any.
        """
        return self._entity_id

    @entity_id.setter
    def entity_id(self, entity_id: str) -> None:
        self._entity_id = entity_id
        self._dispatch_change()

    @entity_id.deleter
    def entity_id(self) -> None:
        self._entity_id = None
        # Clearing the ID is a change like any other, so listeners are
        # notified just as they are by the setter.
        self._dispatch_change()

    @property
    def entity_type_is_constrained(self) -> bool:
        """
        Whether the entity type is fixed and cannot be reassigned.
        """
        return self._entity_type_is_constrained

    def update(self, other: Self) -> None:
        """
        Copy the entity type, its constraint flag, and the entity ID from ``other``.
        """
        self._entity_type = other._entity_type
        self._entity_type_is_constrained = other._entity_type_is_constrained
        self._entity_id = other._entity_id
        self._dispatch_change()

    @classmethod
    def load(
        cls,
        dump: Dump,
        configuration: Self | None = None,
    ) -> Self:
        """
        Load ``dump`` into ``configuration``, or into a new instance if none is given.

        A dictionary dump, or any dump for a reference whose type is not
        constrained, is asserted to be a record with a required
        ``entity_type`` and an optional ``entity_id``. Any other dump is
        asserted to be a string and assigned as the entity ID.
        """
        if configuration is None:
            configuration = cls()
        asserter = Asserter()

        expects_record = (
            isinstance(dump, dict) or not configuration.entity_type_is_constrained
        )
        if not expects_record:
            asserter.assert_str()(dump)
            asserter.assert_setattr(configuration, "entity_id")(dump)  # type: ignore[arg-type]
            return configuration

        asserter.assert_record(
            Fields(
                RequiredField(
                    "entity_type",
                    Assertions(asserter.assert_entity_type())
                    | asserter.assert_setattr(configuration, "entity_type"),
                ),
                OptionalField(
                    "entity_id",
                    Assertions(asserter.assert_str())
                    | asserter.assert_setattr(configuration, "entity_id"),
                ),
            )
        )(dump)
        return configuration

    def dump(self) -> VoidableDump:
        """
        Dump this reference.

        A constrained reference dumps its entity ID passed through
        ``void_none``. An unconstrained reference dumps ``Void`` unless both
        the entity type and the entity ID are set, in which case it dumps a
        minimized record of the two.
        """
        if self.entity_type_is_constrained:
            return void_none(self.entity_id)

        if self.entity_type is None or self.entity_id is None:
            return Void

        dump: VoidableDictDump[VoidableDump] = {
            "entity_type": (
                get_entity_type_name(self._entity_type) if self._entity_type else Void
            ),
            "entity_id": self._entity_id,
        }
        return minimize(dump)

    def __eq__(self, other: Any) -> bool:
        """
        Compare by entity type and entity ID; the constraint flag is ignored.
        """
        if not isinstance(other, EntityReference):
            return NotImplemented
        return (
            self.entity_type == other.entity_type and self.entity_id == other.entity_id
        )
[docs]
class EntityReferenceSequence(
    Generic[EntityT], ConfigurationSequence[EntityReference[EntityT]]
):
    """
    A sequence of entity references, optionally restricted to one entity type.

    When ``entity_type_constraint`` is given, every added reference must be
    constrained to exactly that entity type.
    """

    def __init__(
        self,
        entity_references: Iterable[EntityReference[EntityT]] | None = None,
        *,
        entity_type_constraint: type[EntityT] | None = None,
    ):
        # The constraint is stored before calling the parent initializer,
        # which receives the initial references and may validate them
        # through ``_on_add()``.
        self._entity_type_constraint = entity_type_constraint
        super().__init__(entity_references)

    @classmethod
    def _item_type(cls) -> type[EntityReference[EntityT]]:
        return EntityReference

    def _on_add(self, configuration: EntityReference[EntityT]) -> None:
        """
        Validate an added reference against the entity type constraint.

        :raises AssertionFailed: if a constraint is set and the reference has
            no entity type, has a different entity type, or is not itself
            constrained.
        """
        super()._on_add(configuration)

        constraint = self._entity_type_constraint
        if constraint is None:
            return

        actual_entity_type = configuration._entity_type
        if (
            actual_entity_type == constraint
            and configuration.entity_type_is_constrained
        ):
            return

        expected_entity_type_name = get_entity_type_name(
            cast(type[Entity], constraint),
        )
        expected_entity_type_label = constraint.entity_type_label()

        if actual_entity_type is None:
            raise AssertionFailed(
                Str._(
                    "The entity reference must be for an entity of type {expected_entity_type_name} ({expected_entity_type_label}), but instead does not specify an entity type at all.",
                    expected_entity_type_name=expected_entity_type_name,
                    expected_entity_type_label=expected_entity_type_label,
                )
            )

        # The "actual" label must describe the reference's own entity type,
        # not the constraint, or the message reports the expected label twice.
        actual_entity_type_label = actual_entity_type.entity_type_label()
        raise AssertionFailed(
            Str._(
                "The entity reference must be for an entity of type {expected_entity_type_name} ({expected_entity_type_label}), but instead is for an entity of type {actual_entity_type_name} ({actual_entity_type_label})",
                expected_entity_type_name=expected_entity_type_name,
                expected_entity_type_label=expected_entity_type_label,
                actual_entity_type_name=get_entity_type_name(actual_entity_type),
                actual_entity_type_label=actual_entity_type_label,
            )
        )
[docs]
class ExtensionConfiguration(Configuration):
    """
    Configuration for one extension: its type, whether it is enabled, and,
    optionally, the extension's own configuration.
    """

    def __init__(
        self,
        extension_type: type[Extension],
        *,
        enabled: bool = True,
        extension_configuration: Configuration | None = None,
    ):
        super().__init__()
        self._extension_type = extension_type
        self._enabled = enabled
        # Configurable extensions get their default configuration when the
        # caller does not provide one.
        if extension_configuration is None and issubclass(
            extension_type, ConfigurableExtension
        ):
            extension_configuration = extension_type.default_configuration()
        if extension_configuration is not None:
            extension_configuration.on_change(self)
        self._extension_configuration = extension_configuration

    def __eq__(self, other: Any) -> bool:
        """
        Compare by extension type, enabled state, and extension configuration.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        if self.extension_type != other.extension_type:
            return False
        if self.enabled != other.enabled:
            return False
        if self.extension_configuration != other.extension_configuration:
            return False
        return True

    @property
    def extension_type(self) -> type[Extension]:
        """
        The type of the configured extension.
        """
        return self._extension_type

    @property
    def enabled(self) -> bool:
        """
        Whether the extension is enabled.
        """
        return self._enabled

    @enabled.setter
    def enabled(self, enabled: bool) -> None:
        self._enabled = enabled
        self._dispatch_change()

    @property
    def extension_configuration(self) -> Configuration | None:
        """
        The extension's own configuration, if it has any.
        """
        return self._extension_configuration

    def update(self, other: Self) -> None:
        """
        Copy the extension type, enabled state, and extension configuration
        from ``other``.
        """
        self._extension_type = other._extension_type
        self._enabled = other._enabled
        self._extension_configuration = other._extension_configuration
        # Notify listeners, like the other ``update()`` implementations do.
        self._dispatch_change()

    @classmethod
    def load(
        cls,
        dump: Dump,
        configuration: Self | None = None,
    ) -> Self:
        """
        Load ``dump`` into ``configuration``, or into a new instance if none is given.

        The required ``extension`` field determines the extension type. The
        optional ``enabled`` and ``configuration`` fields are applied on top.
        """
        asserter = Asserter()
        extension_type = asserter.assert_field(
            RequiredField(
                "extension",
                Assertions(asserter.assert_extension_type()),
            )
        )(dump)
        if configuration is None:
            configuration = cls(extension_type)
        else:
            # This MUST NOT fail. If it does, this is a bug in the calling code that must be fixed.
            assert extension_type is configuration.extension_type
        asserter.assert_record(
            Fields(
                # Already validated above; listed here as a field of the record.
                RequiredField(
                    "extension",
                ),
                OptionalField(
                    "enabled",
                    Assertions(asserter.assert_bool())
                    | asserter.assert_setattr(configuration, "enabled"),
                ),
                OptionalField(
                    "configuration",
                    Assertions(
                        configuration._assert_load_extension_configuration(
                            configuration.extension_type
                        )
                    ),
                ),
            )
        )(dump)
        return configuration

    def _assert_load_extension_configuration(
        self, extension_type: type[Extension]
    ) -> Assertion[Any, Configuration]:
        """
        Build an assertion that loads a dump into the extension's configuration.

        The assertion raises ``AssertionFailed`` if this instance holds no
        extension configuration.
        """

        def _assertion(value: Any) -> Configuration:
            extension_configuration = self._extension_configuration
            if isinstance(extension_configuration, Configuration):
                return extension_configuration.load(value, extension_configuration)
            raise AssertionFailed(
                Str._(
                    "{extension_type} is not configurable.",
                    extension_type=extension_type.name(),
                )
            )

        return _assertion

    def dump(self) -> VoidableDump:
        """
        Dump the extension name, the enabled state, and, for configurable
        extension types that hold a configuration, that configuration's dump.
        """
        return minimize(
            {
                "extension": self.extension_type.name(),
                "enabled": self.enabled,
                "configuration": (
                    minimize(self.extension_configuration.dump())
                    if issubclass(self.extension_type, Configurable)
                    and self.extension_configuration
                    else Void
                ),
            }
        )
[docs]
class ExtensionConfigurationMapping(
    ConfigurationMapping[type[Extension], ExtensionConfiguration]
):
    """
    Extension configurations, keyed by their extension type.
    """

    def __init__(
        self,
        configurations: Iterable[ExtensionConfiguration] | None = None,
    ):
        super().__init__(configurations)

    @classmethod
    def _item_type(cls) -> type[ExtensionConfiguration]:
        return ExtensionConfiguration

    @classmethod
    def _create_default_item(
        cls, configuration_key: type[Extension]
    ) -> ExtensionConfiguration:
        return ExtensionConfiguration(configuration_key)

    def _get_key(self, configuration: ExtensionConfiguration) -> type[Extension]:
        return configuration.extension_type

    def _minimize_item_dump(self) -> bool:
        return True

    @classmethod
    def _load_key(
        cls,
        item_dump: Dump,
        key_dump: str,
    ) -> Dump:
        # Store the mapping key in the item dump under "extension".
        record = Asserter().assert_dict()(item_dump)
        record["extension"] = key_dump
        return record

    def _dump_key(self, item_dump: VoidableDump) -> tuple[VoidableDump, str]:
        # The inverse of _load_key(): take "extension" out of the item dump
        # and return it as the key.
        record = self._asserter.assert_dict()(item_dump)
        key = record.pop("extension")
        return record, key

    def enable(self, *extension_types: type[Extension]) -> None:
        """
        Enable the given extension types, adding a configuration for each
        type that has none yet.
        """
        for requested_type in extension_types:
            try:
                self._configurations[requested_type].enabled = True
            except KeyError:
                self.append(ExtensionConfiguration(requested_type))

    def disable(self, *extension_types: type[Extension]) -> None:
        """
        Disable the given extension types; types without a configuration are
        skipped.
        """
        for requested_type in extension_types:
            try:
                self._configurations[requested_type].enabled = False
            except KeyError:
                continue
[docs]
class EntityTypeConfiguration(Configuration):
    """
    Configuration for a single entity type.
    """

    def __init__(
        self,
        entity_type: type[Entity],
        *,
        generate_html_list: bool | None = None,
    ):
        super().__init__()
        self._entity_type = entity_type
        # Assigned through the property so the setter's validation applies.
        self.generate_html_list = generate_html_list  # type: ignore[assignment]

    def __eq__(self, other: Any) -> bool:
        """
        Compare by entity type and by the effective ``generate_html_list`` value.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        return not (
            self.entity_type != other.entity_type
            or self.generate_html_list != other.generate_html_list
        )

    @property
    def entity_type(self) -> type[Entity]:
        """
        The configured entity type.
        """
        return self._entity_type

    @property
    def generate_html_list(self) -> bool:
        """
        Whether to generate an HTML list; an unset (``None``) value reads as ``False``.

        Setting a truthy value raises ``AssertionFailed`` if the entity type
        is not a ``UserFacingEntity`` subclass.
        """
        return self._generate_html_list or False

    @generate_html_list.setter
    def generate_html_list(self, generate_html_list: bool | None) -> None:
        if generate_html_list:
            if not issubclass(self._entity_type, UserFacingEntity):
                raise AssertionFailed(
                    Str._(
                        "Cannot generate pages for {entity_type}, because it is not a user-facing entity type.",
                        entity_type=get_entity_type_name(self._entity_type),
                    )
                )
        self._generate_html_list = generate_html_list
        self._dispatch_change()

    def update(self, other: Self) -> None:
        """
        Copy the entity type and the raw ``generate_html_list`` value from ``other``.
        """
        self._entity_type = other._entity_type
        self._generate_html_list = other._generate_html_list
        self._dispatch_change()

    @classmethod
    def load(
        cls,
        dump: Dump,
        configuration: Self | None = None,
    ) -> Self:
        """
        Load a new instance from ``dump``.

        The required ``entity_type`` field determines the entity type, and
        the optional ``generate_html_list`` field is applied on top.
        """
        asserter = Asserter()
        loaded_entity_type = asserter.assert_field(
            RequiredField[Any, type[Entity]](
                "entity_type",
                Assertions(asserter.assert_str()) | asserter.assert_entity_type(),
            ),
        )(dump)
        # NOTE(review): the ``configuration`` argument is discarded and a new
        # instance is always built - confirm this is intended.
        configuration = cls(loaded_entity_type)
        record_assertion = asserter.assert_record(
            Fields(
                OptionalField(
                    "entity_type",
                ),
                OptionalField(
                    "generate_html_list",
                    Assertions(asserter.assert_bool())
                    | asserter.assert_setattr(configuration, "generate_html_list"),
                ),
            )
        )
        record_assertion(dump)
        return configuration

    def dump(self) -> VoidableDump:
        """
        Dump the entity type name and, if it was set, ``generate_html_list``.
        """
        raw_generate_html_list = self._generate_html_list
        dump: VoidableDictDump[VoidableDump] = {
            "entity_type": get_entity_type_name(self._entity_type),
            "generate_html_list": (
                Void if raw_generate_html_list is None else raw_generate_html_list
            ),
        }
        return minimize(dump)
[docs]
class EntityTypeConfigurationMapping(
    ConfigurationMapping[type[Entity], EntityTypeConfiguration]
):
    """
    Entity type configurations, keyed by their entity type.
    """

    @classmethod
    def _item_type(cls) -> type[EntityTypeConfiguration]:
        return EntityTypeConfiguration

    @classmethod
    def _create_default_item(
        cls, configuration_key: type[Entity]
    ) -> EntityTypeConfiguration:
        return EntityTypeConfiguration(configuration_key)

    def _get_key(self, configuration: EntityTypeConfiguration) -> type[Entity]:
        return configuration.entity_type

    def _minimize_item_dump(self) -> bool:
        return True

    @classmethod
    def _load_key(
        cls,
        item_dump: Dump,
        key_dump: str,
    ) -> Dump:
        # Validate the key as an entity type, then store it in the item dump
        # under "entity_type".
        asserter = Asserter()
        record = asserter.assert_dict()(item_dump)
        asserter.assert_entity_type()(key_dump)
        record["entity_type"] = key_dump
        return record

    def _dump_key(self, item_dump: VoidableDump) -> tuple[VoidableDump, str]:
        # The inverse of _load_key(): take "entity_type" out of the item dump
        # and return it as the key.
        record = self._asserter.assert_dict()(item_dump)
        key = record.pop("entity_type")
        return record, key
[docs]
class LocaleConfiguration(Configuration):
    """
    Configuration for a single locale and its optional alias.
    """

    def __init__(
        self,
        locale: str,
        *,
        alias: str | None = None,
    ):
        super().__init__()
        self._locale = locale
        self._assert_alias(alias)
        self._alias = alias

    @staticmethod
    def _assert_alias(alias: str | None) -> None:
        """
        Raise ``AssertionFailed`` if ``alias`` contains a slash.
        """
        if alias is not None and "/" in alias:
            raise AssertionFailed(Str._("Locale aliases must not contain slashes."))

    @recursive_repr()
    def __repr__(self) -> str:
        return repr_instance(self, locale=self.locale, alias=self.alias)

    def __eq__(self, other: Any) -> bool:
        """
        Compare by locale and effective alias.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self.locale == other.locale and self.alias == other.alias

    def __hash__(self) -> int:
        return hash((self._locale, self._alias))

    @property
    def locale(self) -> str:
        """
        The locale this configuration is for.
        """
        return self._locale

    @property
    def alias(self) -> str:
        """
        The locale's alias, falling back to the locale itself if no alias is set.

        Setting an alias that contains a slash raises ``AssertionFailed``.
        """
        if self._alias is None:
            return self.locale
        return self._alias

    @alias.setter
    def alias(self, alias: str | None) -> None:
        # Enforce the same rule as the initializer, so an invalid alias
        # cannot be introduced after construction or while loading.
        self._assert_alias(alias)
        self._alias = alias
        self._dispatch_change()

    def update(self, other: Self) -> None:
        """
        Copy the locale and the raw alias from ``other``.
        """
        self._locale = other._locale
        self._alias = other._alias
        # Notify listeners, like the other ``update()`` implementations do.
        self._dispatch_change()

    @classmethod
    def load(
        cls,
        dump: Dump,
        configuration: Self | None = None,
    ) -> Self:
        """
        Load ``dump`` into ``configuration``, or into a new instance if none is given.

        The required ``locale`` field is used to build a new instance. The
        optional ``alias`` field is assigned through the ``alias`` setter.
        """
        asserter = Asserter()
        locale = asserter.assert_field(
            RequiredField("locale", Assertions(asserter.assert_locale())),
        )(dump)
        if configuration is None:
            configuration = cls(locale)
        asserter.assert_record(
            Fields(
                RequiredField("locale"),
                OptionalField(
                    "alias",
                    Assertions(asserter.assert_str())
                    | asserter.assert_setattr(configuration, "alias"),
                ),
            )
        )(dump)
        return configuration

    def dump(self) -> VoidableDump:
        """
        Dump the locale and the raw alias.
        """
        return minimize({"locale": self.locale, "alias": void_none(self._alias)})
[docs]
class LocaleConfigurationMapping(ConfigurationMapping[str, LocaleConfiguration]):
    """
    Locale configurations, keyed by locale.

    A mapping initialized without any locale gets an ``en-US`` configuration,
    and the last remaining locale cannot be removed.
    """

    def __init__(
        self,
        configurations: Iterable[LocaleConfiguration] | None = None,
    ):
        super().__init__(configurations)
        has_no_locales = len(self) == 0
        if has_no_locales:
            self.append(LocaleConfiguration("en-US"))

    @classmethod
    def _item_type(cls) -> type[LocaleConfiguration]:
        return LocaleConfiguration

    @classmethod
    def _create_default_item(cls, configuration_key: str) -> LocaleConfiguration:
        return LocaleConfiguration(configuration_key)

    def _get_key(self, configuration: LocaleConfiguration) -> str:
        return configuration.locale

    @classmethod
    def _load_key(
        cls,
        item_dump: Dump,
        key_dump: str,
    ) -> Dump:
        # Store the mapping key in the item dump under "locale".
        record = Asserter().assert_dict()(item_dump)
        record["locale"] = key_dump
        return record

    def _dump_key(self, item_dump: VoidableDump) -> tuple[VoidableDump, str]:
        # The inverse of _load_key(): take "locale" out of the item dump and
        # return it as the key.
        record = self._asserter.assert_dict()(item_dump)
        key = record.pop("locale")
        return record, key

    def _on_remove(self, configuration: LocaleConfiguration) -> None:
        # Removal is only refused when one locale or fewer is left.
        if len(self._configurations) > 1:
            return
        raise AssertionFailed(
            Str._(
                "Cannot remove the last remaining locale {locale}",
                locale=get_data(configuration.locale).get_display_name()
                or configuration.locale,
            )
        )

    @property
    def default(self) -> LocaleConfiguration:
        """
        The default locale configuration, which is the first one in the mapping.

        It can be set to a configuration or to the locale of an existing one.
        """
        all_configurations = iter(self._configurations.values())
        return next(all_configurations)

    @default.setter
    def default(self, configuration: LocaleConfiguration | str) -> None:
        if isinstance(configuration, str):
            configuration = self[configuration]
        locale = configuration.locale
        self._configurations[locale] = configuration
        # Passing False moves the entry to the front, which makes it the default.
        self._configurations.move_to_end(locale, False)
        self._dispatch_change()

    @property
    def multilingual(self) -> bool:
        """
        Whether more than one locale is configured.
        """
        return len(self) > 1
[docs]
@final
class ProjectConfiguration(FileBasedConfiguration):
    """
    The file-based configuration of a project.
    """

    def __init__(
        self,
        base_url: str | None = None,
        root_path: str = "",
        clean_urls: bool = False,
        title: str = "Betty",
        author: str | None = None,
        entity_types: Iterable[EntityTypeConfiguration] | None = None,
        extensions: Iterable[ExtensionConfiguration] | None = None,
        debug: bool = False,
        locales: Iterable[LocaleConfiguration] | None = None,
        lifetime_threshold: int = DEFAULT_LIFETIME_THRESHOLD,
        name: str | None = None,
    ):
        super().__init__()
        self._name = name
        self._computed_name: str | None = None
        self._base_url = "https://example.com" if base_url is None else base_url
        self._root_path = root_path
        self._clean_urls = clean_urls
        self._title = title
        self._author = author
        # Without explicit entity type configurations, HTML lists are
        # generated for people, events, places, and sources.
        self._entity_types = EntityTypeConfigurationMapping(
            entity_types
            or [
                EntityTypeConfiguration(
                    entity_type=default_entity_type,
                    generate_html_list=True,
                )
                for default_entity_type in (Person, Event, Place, Source)
            ]
        )
        self._entity_types.on_change(self)
        self._extensions = ExtensionConfigurationMapping(extensions or ())
        self._extensions.on_change(self)
        self._debug = debug
        self._locales = LocaleConfigurationMapping(locales or ())
        self._locales.on_change(self)
        self._lifetime_threshold = lifetime_threshold

    @property
    def name(self) -> str | None:
        """
        The configured project name, if any.
        """
        return self._name

    @name.setter
    def name(self, name: str) -> None:
        self._name = name
        self._dispatch_change()

    @property
    def project_directory_path(self) -> Path:
        """
        The directory that contains the configuration file.
        """
        return self.configuration_file_path.parent

    @property
    def output_directory_path(self) -> Path:
        """
        The ``output`` directory inside the project directory.
        """
        return self.project_directory_path / "output"

    @property
    def assets_directory_path(self) -> Path:
        """
        The ``assets`` directory inside the project directory.
        """
        return self.project_directory_path / "assets"

    @property
    def www_directory_path(self) -> Path:
        """
        The ``www`` directory inside the output directory.
        """
        return self.output_directory_path / "www"

    def localize_www_directory_path(self, locale: str) -> Path:
        """
        Get the ``www`` directory for a locale.

        For multilingual projects this is a subdirectory named after the
        locale's alias; otherwise it is the ``www`` directory itself.
        """
        if self.locales.multilingual:
            return self.www_directory_path / self.locales[locale].alias
        return self.www_directory_path

    @property
    def title(self) -> str:
        """
        The project title.
        """
        return self._title

    @title.setter
    def title(self, title: str) -> None:
        self._title = title
        self._dispatch_change()

    @property
    def author(self) -> str | None:
        """
        The project author, if any.
        """
        return self._author

    @author.setter
    def author(self, author: str | None) -> None:
        self._author = author
        self._dispatch_change()

    @property
    def base_url(self) -> str:
        """
        The base URL.

        Setting it keeps only the scheme and network location of the given
        URL, and raises ``AssertionFailed`` if either is missing.
        """
        return self._base_url

    @base_url.setter
    def base_url(self, base_url: str) -> None:
        base_url_parts = urlparse(base_url)
        if not base_url_parts.scheme:
            raise AssertionFailed(
                Str._(
                    "The base URL must start with a scheme such as https://, http://, or file://."
                )
            )
        # NOTE(review): this branch fires on a missing network location
        # (host), although the message mentions a path - confirm the intended
        # wording before changing this translatable string.
        if not base_url_parts.netloc:
            raise AssertionFailed(Str._("The base URL must include a path."))
        self._base_url = f"{base_url_parts.scheme}://{base_url_parts.netloc}"
        self._dispatch_change()

    @property
    def root_path(self) -> str:
        """
        The root path; the setter strips leading and trailing slashes.
        """
        return self._root_path

    @root_path.setter
    def root_path(self, root_path: str) -> None:
        self._root_path = root_path.strip("/")
        self._dispatch_change()

    @property
    def clean_urls(self) -> bool:
        """
        The ``clean_urls`` setting.
        """
        return self._clean_urls

    @clean_urls.setter
    def clean_urls(self, clean_urls: bool) -> None:
        self._clean_urls = clean_urls
        self._dispatch_change()

    @property
    def locales(self) -> LocaleConfigurationMapping:
        """
        The locale configurations.
        """
        return self._locales

    @property
    def entity_types(self) -> EntityTypeConfigurationMapping:
        """
        The entity type configurations.
        """
        return self._entity_types

    @property
    def extensions(self) -> ExtensionConfigurationMapping:
        """
        The extension configurations.
        """
        return self._extensions

    @property
    def debug(self) -> bool:
        """
        The ``debug`` setting.
        """
        return self._debug

    @debug.setter
    def debug(self, debug: bool) -> None:
        self._debug = debug
        self._dispatch_change()

    @property
    def lifetime_threshold(self) -> int:
        """
        The lifetime threshold; the setter asserts a positive number.
        """
        return self._lifetime_threshold

    @lifetime_threshold.setter
    def lifetime_threshold(self, lifetime_threshold: int) -> None:
        self._asserter.assert_positive_number()(lifetime_threshold)
        self._lifetime_threshold = lifetime_threshold
        self._dispatch_change()

    def update(self, other: Self) -> None:
        """
        Copy all settings from ``other`` and update the nested mappings in place.
        """
        # The name is a setting like any other and must not be lost.
        self._name = other._name
        self._base_url = other._base_url
        self._title = other._title
        self._author = other._author
        self._root_path = other._root_path
        self._clean_urls = other._clean_urls
        self._debug = other._debug
        self._lifetime_threshold = other._lifetime_threshold
        self._locales.update(other._locales)
        self._extensions.update(other._extensions)
        self._entity_types.update(other._entity_types)
        self._dispatch_change()

    @classmethod
    def load(
        cls,
        dump: Dump,
        configuration: Self | None = None,
    ) -> Self:
        """
        Load ``dump`` into ``configuration``, or into a new instance if none is given.

        Only ``base_url`` is required. Scalar fields are assigned through
        their property setters, so the setters' validation applies.
        """
        if configuration is None:
            configuration = cls()
        asserter = Asserter()
        asserter.assert_record(
            Fields(
                OptionalField(
                    "name",
                    Assertions(asserter.assert_str())
                    | asserter.assert_setattr(configuration, "name"),
                ),
                RequiredField(
                    "base_url",
                    Assertions(asserter.assert_str())
                    | asserter.assert_setattr(configuration, "base_url"),
                ),
                OptionalField(
                    "title",
                    Assertions(asserter.assert_str())
                    | asserter.assert_setattr(configuration, "title"),
                ),
                OptionalField(
                    "author",
                    Assertions(asserter.assert_str())
                    | asserter.assert_setattr(configuration, "author"),
                ),
                OptionalField(
                    "root_path",
                    Assertions(asserter.assert_str())
                    | asserter.assert_setattr(configuration, "root_path"),
                ),
                OptionalField(
                    "clean_urls",
                    Assertions(asserter.assert_bool())
                    | asserter.assert_setattr(configuration, "clean_urls"),
                ),
                OptionalField(
                    "debug",
                    Assertions(asserter.assert_bool())
                    | asserter.assert_setattr(configuration, "debug"),
                ),
                OptionalField(
                    "lifetime_threshold",
                    Assertions(asserter.assert_int())
                    | asserter.assert_setattr(configuration, "lifetime_threshold"),
                ),
                OptionalField(
                    "locales",
                    Assertions(
                        configuration._locales.assert_load(configuration.locales)
                    ),
                ),
                OptionalField(
                    "extensions",
                    Assertions(
                        configuration._extensions.assert_load(configuration.extensions)
                    ),
                ),
                OptionalField(
                    "entity_types",
                    Assertions(
                        configuration._entity_types.assert_load(
                            configuration.entity_types
                        )
                    ),
                ),
            )
        )(dump)
        return configuration

    def dump(self) -> VoidableDictDump[Dump]:
        """
        Dump all settings, including the nested mappings, as a minimized record.
        """
        return minimize(
            {  # type: ignore[return-value]
                "name": void_none(self.name),
                "base_url": self.base_url,
                "title": self.title,
                "root_path": void_none(self.root_path),
                "clean_urls": void_none(self.clean_urls),
                "author": void_none(self.author),
                "debug": void_none(self.debug),
                "lifetime_threshold": void_none(self.lifetime_threshold),
                "locales": self.locales.dump(),
                "extensions": self.extensions.dump(),
                "entity_types": self.entity_types.dump(),
            },
            True,
        )
[docs]
class Project(Configurable[ProjectConfiguration]):
    """
    A project, which combines a ``ProjectConfiguration`` with an ``Ancestry``.
    """

    def __init__(
        self,
        *,
        project_id: str | None = None,
        ancestry: Ancestry | None = None,
    ):
        super().__init__()
        if project_id is not None:
            deprecate(
                f"Initializing {type(self)} with a project ID is deprecated as of Betty 0.3.2, and will be removed in Betty 0.4.x. Instead, set {type(self)}.configuration.name.",
                stacklevel=2,
            )
        self._id = project_id
        self._configuration = ProjectConfiguration()
        self._ancestry = Ancestry() if ancestry is None else ancestry

    @property
    def id(self) -> str:
        """
        The deprecated project ID, falling back to ``name`` if no ID was given.
        """
        deprecate(
            f"{type(self)}.id is deprecated as of Betty 0.3.2, and will be removed in Betty 0.4.x. Instead, use {type(self)}.name."
        )
        if self._id is None:
            return self.name
        return self._id

    @property
    def name(self) -> str:
        """
        The configured project name, or, if none is configured, a hash ID of
        the configuration file path.
        """
        configured_name = self._configuration.name
        if configured_name is None:
            return hashid(str(self._configuration.configuration_file_path))
        return configured_name

    @property
    def ancestry(self) -> Ancestry:
        """
        The project's ancestry.
        """
        return self._ancestry