Source code for betty.fs

"""
Provide file system utilities.
"""

from __future__ import annotations

import asyncio
import hashlib
import os
from collections import deque
from contextlib import suppress
from os.path import getmtime
from pathlib import Path
from shutil import copy2
from types import TracebackType
from typing import AsyncIterable, AsyncContextManager, Sequence

import aiofiles
from aiofiles.os import makedirs
from aiofiles.threadpool.text import AsyncTextIOWrapper

from betty import _ROOT_DIRECTORY_PATH
from betty.warnings import deprecated

ROOT_DIRECTORY_PATH = _ROOT_DIRECTORY_PATH


ASSETS_DIRECTORY_PATH = ROOT_DIRECTORY_PATH / "betty" / "assets"


PREBUILT_ASSETS_DIRECTORY_PATH = ROOT_DIRECTORY_PATH / "prebuild"


HOME_DIRECTORY_PATH = Path.home() / ".betty"


CACHE_DIRECTORY_PATH = HOME_DIRECTORY_PATH / "cache"
"""
Define the path to the cache directory.

.. deprecated:: 0.3.3
   This constant is deprecated as of Betty 0.3.3, and will be removed in Betty 0.4.x.
   Instead, use :py:class:`betty.cache.file.BinaryFileCache`.
"""


[docs] async def iterfiles(path: Path) -> AsyncIterable[Path]: """ Recursively iterate over any files found in a directory. """ for dir_path, _, filenames in os.walk(str(path)): for filename in filenames: yield Path(dir_path) / filename
[docs] @deprecated( "This function is deprecated as of Betty 0.3.4, and will be removed in Betty 0.4.x. Instead, use `betty.hashid.hashid_file_meta()`." ) def hashfile(path: Path) -> str: """ Get a hash for a file. This function relies on the file path and last modified time for uniqueness. File contents are ignored. """ return hashlib.md5( ":".join([str(getmtime(path)), str(path)]).encode("utf-8") ).hexdigest()
class _Open: def __init__(self, fs: FileSystem, file_paths: tuple[Path, ...]): self._fs = fs self._file_paths = file_paths self._file: AsyncContextManager[AsyncTextIOWrapper] | None = None async def __aenter__(self) -> AsyncTextIOWrapper: for file_path in map(Path, self._file_paths): for fs_path, fs_encoding in self._fs._paths: with suppress(FileNotFoundError): self._file = aiofiles.open( fs_path / file_path, encoding=fs_encoding ) return await self._file.__aenter__() raise FileNotFoundError async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: if self._file is not None: await self._file.__aexit__(None, None, None)
[docs] class FileSystem: def __init__(self, *paths: tuple[Path, str | None]): self._paths = deque(paths) def __len__(self) -> int: return len(self._paths) @property def paths(self) -> Sequence[tuple[Path, str | None]]: return list(self._paths)
[docs] def prepend(self, path: Path, fs_encoding: str | None = None) -> None: self._paths.appendleft((path, fs_encoding))
[docs] def clear(self) -> None: self._paths.clear()
[docs] def open(self, *file_paths: Path) -> _Open: return _Open(self, file_paths)
[docs] async def copy2(self, source_path: Path, destination_path: Path) -> Path: for fs_path, _ in self._paths: with suppress(FileNotFoundError): await asyncio.to_thread(copy2, fs_path / source_path, destination_path) return destination_path tried_paths = [str(fs_path / source_path) for fs_path, _ in self._paths] raise FileNotFoundError("Could not find any of %s." % ", ".join(tried_paths))
[docs] async def copytree( self, source_path: Path, destination_path: Path ) -> AsyncIterable[Path]: file_destination_paths = set() for fs_path, _ in self._paths: async for file_source_path in iterfiles(fs_path / source_path): file_destination_path = destination_path / file_source_path.relative_to( fs_path / source_path ) if file_destination_path not in file_destination_paths: file_destination_paths.add(file_destination_path) await makedirs(file_destination_path.parent, exist_ok=True) await asyncio.to_thread( copy2, file_source_path, file_destination_path ) yield file_destination_path