Source code for fileseq.utils

"""
utils - General tools of use to fileseq operations.
"""
from __future__ import annotations

import collections.abc
import decimal
import os
import typing

import sys
from itertools import chain, count, islice

from . import exceptions


FILESYSTEM_ENCODING = sys.getfilesystemencoding() or 'utf-8'


[docs] def quantize( number: decimal.Decimal, decimal_places: int, rounding: str = decimal.ROUND_HALF_EVEN ) -> decimal.Decimal: """ Round a decimal value to given number of decimal places Args: number (decimal.Decimal): Decimal number to round decimal_places (int): Number of decimal places in return value rounding (str): decimal.Decimal rounding mode. See rounding argument of https://docs.python.org/2/library/decimal.html#decimal.Context Returns: decimal.Decimal: """ quantize_exponent = decimal.Decimal(1).scaleb(-decimal_places) nq = number.quantize(quantize_exponent, rounding=rounding) if nq.is_zero(): return nq.copy_abs() return nq
[docs] def lenRange(start: int, stop: int, step: int = 1) -> int: """ Get the length of values for a given range, exclusive of the stop Args: start (int): stop (int): step (int): """ if not step: raise ValueError('step argument must not be zero') if step > 0: result = (stop - start + step - 1) // step else: result = (stop - start + step + 1) // step return max(0, result)
[docs] class xrange2(object): """ An itertools-based replacement for xrange which does not exhibit the OverflowError issue on some platforms, when a value exceeds a C long size. Provides the features of an islice, with the added support for checking the length of the range. """ __slots__ = ['_len', '_islice', '_start', '_stop', '_step'] def __init__(self, start: int, stop: typing.Optional[int] = None, step: int = 1): if stop is None: start, stop = 0, start self._len = lenRange(start, stop, step) self._islice = islice(count(start, step), self._len) self._start = start self._stop = stop self._step = step def __repr__(self) -> str: if self._step == 1: return 'range({}, {})'.format(self._start, self._stop) else: return 'range({}, {}, {})'.format(self._start, self._stop, self._step) def __len__(self) -> int: return self._len def __next__(self) -> int: return next(self._islice) def __iter__(self) -> typing.Iterable[typing.Any]: return self._islice.__iter__() @property def start(self) -> int: return self._start @property def stop(self) -> int: return self._stop @property def step(self) -> int: return self._step
# Issue #44 # On Windows platform, it is possible for xrange to get an # OverflowError if a value passed to xrange exceeds the size of a C long. # Switch to an alternate implementation. if os.name == 'nt': xrange = range = xrange2 else: xrange = range class _islice(object): def __init__(self, gen: typing.Iterable[typing.Any], start: int, stop: int, step: int = 1): self._gen = gen self._start = start self._stop = stop self._step = step def __len__(self) -> int: return lenRange(self._start, self._stop, self._step) def __next__(self) -> typing.Any: # noinspection PyTypeChecker return next(self._gen) # type:ignore def __iter__(self) -> typing.Iterable[typing.Any]: return self._gen.__iter__() @property def start(self) -> int: return self._start @property def stop(self) -> int: return self._stop @property def step(self) -> int: return self._step class _xfrange(_islice): def __len__(self) -> int: stop = self._stop + (1 if self._start <= self._stop else -1) return lenRange(self._start, stop, self._step)
[docs] def xfrange(start: int, stop: int, step: int = 1, maxSize: int = -1) -> typing.Generator[typing.Any, None, None]: """ Returns a generator that yields the frames from start to stop, inclusive. In other words it adds or subtracts a frame, as necessary, to return the stop value as well, if the stepped range would touch that value. Args: start (int): stop (int): step (int): Note that the sign will be ignored maxSize (int): Returns: generator: Raises: :class:`fileseq.exceptions.MaxSizeException`: if size is exceeded """ if not step: raise ValueError('xfrange() step argument must not be zero') start, stop, step = normalizeFrames([start, stop, step]) # type:ignore[assignment] if start <= stop: step = abs(step) else: step = -abs(step) if isinstance(start, int): size = (stop - start) // step + 1 else: size = int((stop - start) / step) + 1 if 0 <= maxSize < size: raise exceptions.MaxSizeException( "Size %d > %s (MAX_FRAME_SIZE)" % (size, maxSize)) # because an xrange is an odd object all its own, we wrap it in a # generator expression to get a proper Generator if isinstance(start, int): offset = step // abs(step) gen = (f for f in range(start, stop + offset, step)) # type:ignore else: gen = (start + i * step for i in range(size)) return _xfrange(gen, start, stop, step) # type:ignore
[docs] def batchFrames(start: int, stop: int, batch_size: int) -> typing.Iterable[typing.Any]: """ Returns a generator that yields batches of frames from start to stop, inclusive. Each batch value is a ``range`` generator object, also providing start, stop, and step properties. The last batch frame length may be smaller if the batches cannot be divided evenly. start value is allowed to be greater than stop value, to generate decreasing frame values. Args: start (int): start frame value stop (int): stop frame value batch_size (int): max size of each batch Yields: range(sub_start, sub_stop) """ if batch_size <= 0: return for i in xfrange(start, stop, batch_size): if start <= stop: sub_stop = min(i - 1 + batch_size, stop) else: sub_stop = max(i + 1 - batch_size, stop) yield xfrange(i, sub_stop)
[docs] def batchIterable(it: typing.Iterable[typing.Any], batch_size: int) -> typing.Iterable[typing.Any]: """ Returns a generator that yields batches of items returned by the given iterable. The last batch frame length may be smaller if the batches cannot be divided evenly. Args: it (iterable): An iterable from which to yield batches of values batch_size (int): max size of each batch Yields: iterable: a subset of batched items """ if batch_size <= 0: return # Try to get the length. If it is a generator with no # known length, then we have to use a less efficient # method that builds results by exhausting the generator try: length = len(it) # type:ignore except TypeError: for b in _batchGenerator(it, batch_size): yield b return # We can use the known length to yield slices for start in xrange(0, length, batch_size): # type:ignore stop = start + batch_size gen = islice(it, start, stop) yield _islice(gen, start, stop)
def _batchGenerator(gen: typing.Iterable[typing.Any], batch_size: int) -> typing.Generator[typing.Any, None, None]: """ A batching generator function that handles a generator type, where the length isn't known. Args: gen: generator object batch_size (int): max size of each batch Yields: iterable: a subset of batched items """ batch = [] for item in gen: batch.append(item) if len(batch) == batch_size: yield batch batch = [] if batch: yield batch
[docs] def normalizeFrame(frame: int | float | decimal.Decimal | str) -> int | float | decimal.Decimal | None: """ Convert a frame number to the most appropriate type - the most compact type that doesn't affect precision, for example numbers that convert exactly to integer values will be converted to int Args: frame (int, float, decimal.Decimal, str): frame number to normalize Returns: frame (int, float, or decimal.Decimal): """ if frame is None: return None elif isinstance(frame, int): return frame elif isinstance(frame, float): frame_int = int(frame) if frame == frame_int: return frame_int return frame elif isinstance(frame, decimal.Decimal): frame_int = int(frame) if frame == frame_int: return frame_int return frame.normalize() else: try: return int(frame) except ValueError: try: frame = decimal.Decimal(frame) except decimal.DecimalException: return frame # type:ignore[return-value] else: return normalizeFrame(frame)
[docs] def normalizeFrames(frames: typing.Iterable[typing.Any]) -> list[int | float | decimal.Decimal]: """ Convert a sequence of frame numbers to the most appropriate type for the overall sequence, where all members of the result are of the same type. Args: frames (iterable of int, float, decimal.Decimal, or str): frame numbers to normalize Returns: frames (iterable of int, float, or decimal.Decimal): """ # Normalise all frame values and find their type frames = [normalizeFrame(frame) for frame in frames] frame_types = set(type(frame) for frame in frames) FrameType: object # Determine best overall type for frames if float in frame_types: FrameType = float elif decimal.Decimal in frame_types: FrameType = decimal.Decimal else: FrameType = int if len(frame_types) == 1: return frames # Convert all frames to chosen type frames = [FrameType(frame) for frame in frames] # Ensure all decimal frames have same exponent if FrameType is decimal.Decimal: maximum_decimal_places = max( -frame.as_tuple().exponent for frame in frames ) frames = [quantize(frame, maximum_decimal_places) for frame in frames] return frames
[docs] def unique( seen: typing.Set[typing.Any], *iterables: typing.Iterable[typing.Any] ) -> typing.Generator[typing.Any, None, None]: """ Get the unique items in iterables while preserving order. Note that this mutates the seen set provided only when the returned generator is used. Args: seen (set): either an empty set, or the set of things already seen *iterables: one or more iterable lists to chain together Returns: generator: """ _add = seen.add # return a generator of the unique items and the set of the seen items # the seen set will mutate when the generator is iterated over return (i for i in chain(*iterables) if i not in seen and not _add(i))
[docs] def pad(number: typing.Any, width: typing.Optional[int] = 0, decimal_places: typing.Optional[int] = None) -> str: """ Return the zero-padded string of a given number. Args: number (str, int, float, or decimal.Decimal): the number to pad width (int): width for zero padding the integral component decimal_places (int): number of decimal places to use in frame range Returns: str: """ # Make the common case fast. Truncate to integer value as USD does. # https://graphics.pixar.com/usd/docs/api/_usd__page__value_clips.html # See _DeriveClipTimeString for formatting of templateAssetPath # https://github.com/PixarAnimationStudios/USD/blob/release/pxr/usd/usd/clipSetDefinition.cpp if decimal_places == 0: try: number = round(number) or 0 except TypeError: pass return str(number).partition(".")[0].zfill(width) # type:ignore[arg-type] # USD ultimately uses vsnprintf to format floats for templateAssetPath: # _DeriveClipTimeString -> TfStringPrintf -> ArchVStringPrintf -> ArchVsnprintf -> vsnprintf # Since glibc 2.17 the printf family of functions rounds floats using the # current IEEE rounding mode, by default bankers' rounding (FE_TONEAREST). # See https://sourceware.org/bugzilla/show_bug.cgi?id=5044 and man(3) fegetround # Also https://www.exploringbinary.com/inconsistent-rounding-of-printed-floating-point-numbers/ if decimal_places is not None: if not isinstance(number, decimal.Decimal): number = decimal.Decimal(number) number = quantize(number, decimal_places, decimal.ROUND_HALF_EVEN) number = str(number) parts = number.split(".", 1) parts[0] = parts[0].zfill(width) return ".".join(parts)
def _getPathSep(path: str) -> str: """ Abstracts returning the appropriate path separator for the given path string. This implementation always returns ``os.sep`` Abstracted to make test mocking easier. Args: path (str): A path to check for the most common sep Returns: str: """ return os.sep _STR_TYPES = frozenset((str, bytes))
[docs] def asString(obj: object) -> str: """ Ensure an object is explicitly str type and not some derived type that can change semantics. If the object is str, return str. Otherwise, return the string conversion of the object. Args: obj: Object to return as str Returns: str: """ typ = type(obj) # explicit type check as faster path if typ in _STR_TYPES: if typ is bytes: obj = os.fsdecode(obj) # type: ignore return obj # type: ignore # derived type check elif isinstance(obj, bytes): obj = obj.decode(FILESYSTEM_ENCODING) else: obj = str(obj) return str(obj)