"""
Provide Betty's default Jinja2 filters.
"""
from __future__ import annotations
import json as stdjson
import re
import warnings
from asyncio import get_running_loop, run
from base64 import b64encode
from collections.abc import Awaitable
from contextlib import suppress
from io import BytesIO
from pathlib import Path
from typing import Callable, Iterable, Any, Iterator, TypeVar, AsyncIterator
from urllib.parse import quote
import aiofiles
from PIL import Image
from PIL.Image import DecompressionBombWarning
from aiofiles.os import makedirs
from geopy import units
from geopy.format import DEGREES_FORMAT
from jinja2 import pass_context, pass_eval_context
from jinja2.async_utils import auto_aiter, auto_await
from jinja2.filters import prepare_map, make_attrgetter
from jinja2.nodes import EvalContext
from jinja2.runtime import Context, Macro
from markupsafe import Markup, escape
from pdf2image.pdf2image import convert_from_path
from betty import _resizeimage
from betty.functools import walk
from betty.hashid import hashid_file_meta, hashid
from betty.locale import (
negotiate_localizeds,
Localized,
Datey,
negotiate_locale,
Localey,
get_data,
Localizable,
)
from betty.media_type import MediaType
from betty.model import get_entity_type_name
from betty.model.ancestry import File, Dated
from betty.os import link_or_copy
from betty.serde.dump import minimize, none_void, void_none
from betty.string import (
camel_case_to_snake_case,
camel_case_to_kebab_case,
upper_camel_case_to_lower_camel_case,
)
from betty.warnings import deprecated
T = TypeVar("T")
[docs]
@pass_context
def filter_url(
context: Context,
resource: Any,
media_type: str | None = None,
*args: Any,
locale: Localey | None = None,
**kwargs: Any,
) -> str:
"""
Generate a localized URL for a localizable resource.
"""
from betty.jinja2 import context_app, context_localizer
return context_app(context).url_generator.generate(
resource,
media_type or "text/html",
*args,
locale=locale or context_localizer(context).locale, # type: ignore[misc]
**kwargs,
)
[docs]
@pass_context
def filter_static_url(
context: Context,
resource: Any,
absolute: bool = False,
) -> str:
"""
Generate a static URL for a static resource.
"""
from betty.jinja2 import context_app
return context_app(context).static_url_generator.generate(
resource,
absolute=absolute,
)
[docs]
@pass_context
def filter_localize(
context: Context,
localizable: Localizable,
) -> str:
"""
Localize a value using the context's current localizer.
"""
from betty.jinja2 import context_localizer
return localizable.localize(context_localizer(context))
[docs]
def filter_json(data: Any, indent: int | None = None) -> str:
"""
Convert a value to a JSON string.
"""
return stdjson.dumps(data, indent=indent)
[docs]
async def filter_flatten(values_of_values: Iterable[Iterable[T]]) -> AsyncIterator[T]:
"""
Flatten an iterable of iterables into a single iterable.
"""
async for values in auto_aiter(values_of_values):
async for value in auto_aiter(values):
yield value
[docs]
def filter_walk(value: Any, attribute_name: str) -> Iterable[Any]:
"""
Walk over a data structure.
"""
return walk(value, attribute_name)
_paragraph_re = re.compile(r"(?:\r\n|\r|\n){2,}")
[docs]
@pass_eval_context
def filter_paragraphs(eval_ctx: EvalContext, text: str) -> str | Markup:
"""
Convert newlines to <p> and <br> tags.
Taken from http://jinja.pocoo.org/docs/2.10/api/#custom-filters.
"""
result = "\n\n".join(
"<p>%s</p>" % p.replace("\n", Markup("<br>\n"))
for p in _paragraph_re.split(escape(text))
)
if eval_ctx.autoescape:
result = Markup(result)
return result
[docs]
async def filter_unique(value: Iterable[T]) -> AsyncIterator[T]:
"""
Iterate over an iterable of values and only yield those values that have not been yielded before.
"""
seen = []
async for value in auto_aiter(value):
if value not in seen:
yield value
seen.append(value)
[docs]
@pass_context
async def filter_map(
context: Context, values: Iterable[Any], *args: Any, **kwargs: Any
) -> Any:
"""
Map an iterable's values.
This mimics Jinja2's built-in map filter, but allows macros as callbacks.
"""
if len(args) > 0 and isinstance(args[0], Macro):
func: Macro | Callable[[Any], bool] = args[0]
else:
func = prepare_map(context, args, kwargs)
async for value in auto_aiter(values):
yield await auto_await(func(value))
[docs]
@pass_context
async def filter_file(context: Context, file: File) -> str:
"""
Preprocess a file for use in a page.
:return: The public path to the preprocessed file. This can be used on a web page.
"""
from betty.jinja2 import context_app, context_job_context
app = context_app(context)
job_context = context_job_context(context)
execute_filter = True
if job_context:
job_cache_item_id = f"filter_file:{file.id}"
async with job_context.cache.getset(job_cache_item_id, wait=False) as (
cache_item,
setter,
):
if cache_item is None and setter is not None:
await setter(None)
else:
execute_filter = False
if execute_filter:
file_destination_path = (
app.project.configuration.www_directory_path
/ "file"
/ file.id
/ "file"
/ file.path.name
)
await makedirs(file_destination_path.parent, exist_ok=True)
await link_or_copy(file.path, file_destination_path)
return f"/file/{quote(file.id)}/file/{quote(file.path.name)}"
[docs]
@pass_context
async def filter_image(
context: Context,
file: File,
width: int | None = None,
height: int | None = None,
) -> str:
"""
Preprocess an image file for use in a page.
:return: The public path to the preprocessed file. This can be embedded in a web page.
"""
from betty.jinja2 import context_app, context_job_context
# Treat SVGs as regular files.
if (
file.media_type
and file.media_type.type == "image"
and "svg+xml" == file.media_type.subtype
):
return await filter_file(context, file)
app = context_app(context)
job_context = context_job_context(context)
destination_name = f"{file.id}-"
if height and width:
destination_name += f"{width}x{height}"
elif height:
destination_name += f"-x{height}"
elif width:
destination_name += f"{width}x-"
else:
raise ValueError("At least the width or height must be given.")
file_directory_path = app.project.configuration.www_directory_path / "file"
if file.media_type:
if file.media_type.type == "image":
image_loader = _load_image_image
destination_name += file.path.suffix
elif file.media_type.type == "application" and file.media_type.subtype == "pdf":
image_loader = _load_image_application_pdf
destination_name += "." + "jpg"
else:
raise ValueError(
f'Cannot convert a file of media type "{file.media_type}" to an image.'
)
else:
raise ValueError("Cannot convert a file without a media type to an image.")
cache_item_id = f'{await hashid_file_meta(file.path)}:{"" if width is None else width}:{"" if height is None else height}'
execute_filter = True
if job_context:
async with job_context.cache.with_scope("filter_image").getset(
cache_item_id, wait=False
) as (cache_item, setter):
if cache_item is None and setter is not None:
await setter(True)
else:
execute_filter = False
if execute_filter:
loop = get_running_loop()
await loop.run_in_executor(
app.process_pool,
_execute_filter_image,
image_loader,
file.path,
file.media_type,
app.binary_file_cache.with_scope("image").cache_item_file_path(
cache_item_id
),
file_directory_path,
destination_name,
width,
height,
)
destination_public_path = f"/file/{quote(destination_name)}"
return destination_public_path
async def _load_image_image(
file_path: Path,
media_type: MediaType,
) -> Image.Image:
# We want to read the image asynchronously and prevent Pillow from keeping too many file
# descriptors open simultaneously, so we read the image ourselves and store the contents
# in a synchronous file object.
async with aiofiles.open(file_path, "rb") as f:
image_f = BytesIO(await f.read())
# Ignore warnings about decompression bombs, because we know where the files come from.
with warnings.catch_warnings(action="ignore", category=DecompressionBombWarning):
image = Image.open(image_f, formats=[media_type.subtype])
return image
async def _load_image_application_pdf(
file_path: Path,
media_type: MediaType,
) -> Image.Image:
# Ignore warnings about decompression bombs, because we know where the files come from.
with warnings.catch_warnings(action="ignore", category=DecompressionBombWarning):
image = convert_from_path(file_path, fmt="jpeg")[0]
return image
def _execute_filter_image(
image_loader: Callable[[Path, MediaType], Awaitable[Image.Image]],
file_path: Path,
media_type: MediaType,
cache_item_file_path: Path,
destination_directory_path: Path,
destination_name: str,
width: int | None,
height: int | None,
) -> None:
run(
__execute_filter_image(
image_loader,
file_path,
media_type,
cache_item_file_path,
destination_directory_path,
destination_name,
width,
height,
)
)
async def __execute_filter_image(
image_loader: Callable[[Path, MediaType], Awaitable[Image.Image]],
file_path: Path,
media_type: MediaType,
cache_item_file_path: Path,
destination_directory_path: Path,
destination_name: str,
width: int | None,
height: int | None,
) -> None:
destination_file_path = destination_directory_path / destination_name
await makedirs(destination_directory_path, exist_ok=True)
try:
await link_or_copy(cache_item_file_path, destination_file_path)
except FileNotFoundError:
image = await image_loader(file_path, media_type)
try:
if width is not None:
width = min(width, image.width)
if height is not None:
height = min(height, image.height)
await makedirs(cache_item_file_path.parent, exist_ok=True)
converted_image = await _execute_filter_image_convert(image, width, height)
converted_image.save(cache_item_file_path, format=media_type.subtype)
del converted_image
finally:
image.close()
del image
await link_or_copy(cache_item_file_path, destination_file_path)
async def _execute_filter_image_convert(
image: Image.Image,
width: int | None,
height: int | None,
) -> Image.Image:
if width is not None and height is not None:
return _resizeimage.resize_cover(image, (width, height))
if width is not None:
return _resizeimage.resize_width(image, width)
if height is not None:
return _resizeimage.resize_height(image, height)
raise ValueError("Width and height cannot both be None.")
[docs]
@pass_context
def filter_negotiate_localizeds(
context: Context, localizeds: Iterable[Localized]
) -> Localized | None:
"""
Try to find an object whose locale matches the context's current locale.
"""
from betty.jinja2 import context_localizer
return negotiate_localizeds(context_localizer(context).locale, list(localizeds))
[docs]
@pass_context
def filter_sort_localizeds(
context: Context,
localizeds: Iterable[Localized],
localized_attribute: str,
sort_attribute: str,
) -> Iterable[Localized]:
"""
Sort localized objects.
"""
from betty.jinja2 import context_localizer
get_localized_attr = make_attrgetter(context.environment, localized_attribute)
get_sort_attr = make_attrgetter(context.environment, sort_attribute)
def _get_sort_key(x: Localized) -> Any:
return get_sort_attr(
negotiate_localizeds(
context_localizer(context).locale, get_localized_attr(x)
)
)
return sorted(localizeds, key=_get_sort_key)
[docs]
@pass_context
def filter_select_localizeds(
context: Context, localizeds: Iterable[Localized], include_unspecified: bool = False
) -> Iterable[Localized]:
"""
Select all objects whose locale matches the context's current locale.
:param include_unspecified: If True, the return value includes all objects that do not have a locale specified.
"""
from betty.jinja2 import context_localizer
for localized in localizeds:
if include_unspecified and localized.locale in {
None,
"mis",
"mul",
"und",
"zxx",
}:
yield localized
if (
localized.locale is not None
and negotiate_locale(context_localizer(context).locale, [localized.locale])
is not None
):
yield localized
[docs]
@pass_context
def filter_negotiate_dateds(
context: Context, dateds: Iterable[Dated], date: Datey | None
) -> Dated | None:
"""
Try to find an object whose date falls in the given date.
:param date: A date to select by. If None, then today's date is used.
"""
with suppress(StopIteration):
return next(filter_select_dateds(context, dateds, date))
return None
[docs]
@pass_context
def filter_select_dateds(
context: Context, dateds: Iterable[Dated], date: Datey | None
) -> Iterator[Dated]:
"""
Select all objects whose date falls in the given date.
:param date: A date to select by. If None, then today's date is used.
"""
if date is None:
date = context.resolve_or_missing("today")
return filter(
lambda dated: dated.date is None
or dated.date.comparable
and dated.date in date,
dateds,
)
[docs]
@deprecated(
"This function is deprecated as of Betty 0.3.4, and will be removed in Betty 0.4.x. Instead, use the `hashid` filter."
)
def filter_base64(input: str) -> str:
"""
Base-64-encode a string.
"""
return b64encode(input.encode("utf-8")).decode("utf-8")
[docs]
def filter_hashid(input: str) -> str:
"""
Create a hash ID.
"""
return hashid(input)
[docs]
@pass_context
def filter_public_css(context: Context, public_path: str) -> None:
"""
Add a CSS file to the current page.
"""
public_css_paths = context.resolve_or_missing("public_css_paths")
if public_path in public_css_paths:
return
public_css_paths.append(public_path)
[docs]
@pass_context
def filter_public_js(context: Context, public_path: str) -> None:
"""
Add a JavaScript file to the current page.
"""
public_js_paths = context.resolve_or_missing("public_js_paths")
if public_path in public_js_paths:
return
public_js_paths.append(public_path)
FILTERS = {
"base64": filter_base64,
"camel_case_to_kebab_case": camel_case_to_kebab_case,
"camel_case_to_snake_case": camel_case_to_snake_case,
"entity_type_name": get_entity_type_name,
"file": filter_file,
"flatten": filter_flatten,
"format_datey": filter_format_datey,
"format_degrees": filter_format_degrees,
"hashid": filter_hashid,
"image": filter_image,
"json": filter_json,
"locale_get_data": get_data,
"localize": filter_localize,
"map": filter_map,
"minimize": minimize,
"negotiate_dateds": filter_negotiate_dateds,
"negotiate_localizeds": filter_negotiate_localizeds,
"none_void": none_void,
"paragraphs": filter_paragraphs,
"select_dateds": filter_select_dateds,
"select_localizeds": filter_select_localizeds,
"static_url": filter_static_url,
"sort_localizeds": filter_sort_localizeds,
"str": str,
"unique": filter_unique,
"upper_camel_case_to_lower_camel_case": upper_camel_case_to_lower_camel_case,
"url": filter_url,
"void_none": void_none,
"walk": filter_walk,
"public_css": filter_public_css,
"public_js": filter_public_js,
}