Source code for fileseq.frameset

"""
frameset - A set-like object representing a frame range for fileseq.
"""
from __future__ import annotations

import dataclasses
import decimal
import numbers
import typing
import warnings
from collections.abc import Set, Sized, Iterable
from typing import Union, overload

from . import constants  # constants.MAX_FRAME_SIZE updated during tests
from .constants import PAD_MAP, FRANGE_RE, PAD_RE
from .exceptions import MaxSizeException, ParseException
from .utils import (asString, xfrange, unique, pad, quantize,
                    normalizeFrame, normalizeFrames, batchIterable, _islice)

# Type alias for a single frame value: a frame number may be an int, a float,
# or a decimal.Decimal.
FrameValue = Union[int, float, decimal.Decimal]

# Internal type alias for an inclusive (start, end) interval pair used in
# coverage/gap calculations
_Interval = typing.Tuple[decimal.Decimal, decimal.Decimal]

# Type alias for everything the FrameSet constructor accepts
FrameSetInput = Union[
    str,                                    # Frame range string like "1-10", "1-100x5"
    'FrameSet',                            # Another FrameSet (copy constructor)
    typing.Iterable[FrameValue],           # General iterable of frame numbers
    FrameValue,                            # Individual frame number
]

# The subscripted ABC is only used while type checking; at runtime the plain
# collections.abc.Set is used as the base class.
if typing.TYPE_CHECKING:
    BaseFrameSet = Set[FrameValue]
else:
    BaseFrameSet = Set


@dataclasses.dataclass(frozen=True)
class Range:
    """Compact, immutable representation of a frame range.

    A range is described by its ``start``, ``end`` and ``step`` instead of
    the expanded list of frames.  The direction of iteration is taken from
    the relative order of ``start`` and ``end``; ``end`` is only an upper
    (or lower) bound and need not be reachable from ``start`` by ``step``.

    A range whose ``step`` is zero is treated as empty everywhere: it has
    length 0, yields no frames and contains nothing.
    """
    __slots__ = ('start', 'end', 'step')

    start: decimal.Decimal
    end: decimal.Decimal
    step: decimal.Decimal

    def __contains__(self, frame: FrameValue) -> bool:
        """Check if frame is in this range.

        Args:
            frame: the candidate frame number.

        Returns:
            bool: True if ``frame`` lies within the bounds and is aligned
            with the step.  Values that cannot be interpreted as a finite
            number are never members (no exception is raised).
        """
        # keep membership consistent with __len__, which reports 0 frames
        # for a zero step (and avoid a modulo-by-zero below)
        if self.step == 0:
            return False
        try:
            frame_dec = decimal.Decimal(str(frame))
        except (decimal.InvalidOperation, TypeError, ValueError):
            # not a number at all (e.g. None, "abc"): cannot be a member
            return False
        if not frame_dec.is_finite():
            # NaN / Infinity cannot be ordered against the bounds
            return False

        if self.step > 0:
            if not (self.start <= frame_dec <= self.end):
                return False
        else:
            if not (self.end <= frame_dec <= self.start):
                return False

        # check if frame aligns with step
        offset = abs(frame_dec - self.start)
        return offset % abs(self.step) == 0

    def __iter__(self) -> typing.Iterator[FrameValue]:
        """Iterate through frames in this range.

        Integer ranges yield ``int`` values; ranges with a fractional
        start, end or step yield ``decimal.Decimal`` values.
        """
        step = abs(self.step)
        if step == 0:
            # empty by definition, see __len__; also prevents an endless loop
            return

        has_subframes = (self.start % 1 != 0 or self.end % 1 != 0 or step % 1 != 0)
        if not has_subframes:
            # use native int arithmetic for integer ranges
            start, end, istep = int(self.start), int(self.end), int(step)
            if start <= end:
                yield from range(start, end + 1, istep)
            else:
                yield from range(start, end - 1, -istep)
            return

        current = self.start
        if self.start <= self.end:
            while current <= self.end:
                yield current
                current += step
        else:
            while current >= self.end:
                yield current
                current -= step

    def __len__(self) -> int:
        """Return number of frames in this range (0 for a zero step)."""
        if self.step == 0:
            return 0
        return int(abs(self.end - self.start) / abs(self.step)) + 1
_D0 = decimal.Decimal(0)
_D1 = decimal.Decimal(1)


def _all_ranges_contiguous(ranges: list[Range]) -> bool:
    """Tell whether every range steps by exactly 1 or -1.

    An empty list is considered contiguous.
    """
    return all(r.step == _D1 or r.step == -_D1 for r in ranges)


def _merged_coverage(ranges: list[Range]) -> list[_Interval]:
    """Collapse ranges into sorted, non-touching bounding intervals.

    Every range contributes one ``(lo, hi)`` pair with ``lo <= hi``.
    Pairs that overlap, or that are direct neighbours (e.g. [1,5] and
    [6,10]), are fused into a single interval.
    """
    if not ranges:
        return []

    # one (lo, hi) bounding pair per range, in ascending order
    spans = sorted((min(r.start, r.end), max(r.start, r.end)) for r in ranges)

    fused: list[_Interval] = []
    for lo, hi in spans:
        # fuse when overlapping or adjacent (frames n and n+1 are neighbours)
        if fused and lo <= fused[-1][1] + _D1:
            last_lo, last_hi = fused[-1]
            fused[-1] = (last_lo, max(last_hi, hi))
        else:
            fused.append((lo, hi))
    return fused


def _gaps_in_range(lo: decimal.Decimal, hi: decimal.Decimal,
                   coverage: list[_Interval]) -> list[_Interval]:
    """Compute the parts of [lo, hi] that no coverage interval touches.

    ``coverage`` is walked in the order given; the uncovered sub-intervals
    are returned in ascending order.
    """
    holes: list[_Interval] = []
    next_free = lo
    for covered_lo, covered_hi in coverage:
        if covered_hi < lo:
            # interval lies entirely before the window
            continue
        if covered_lo > hi:
            # interval (and everything after it) lies beyond the window
            break
        if next_free < covered_lo:
            holes.append((next_free, covered_lo - _D1))
        next_free = max(next_free, covered_hi + _D1)
        if next_free > hi:
            break
    if next_free <= hi:
        holes.append((next_free, hi))
    return holes
class FrameSet(BaseFrameSet):
    """
    A ``FrameSet`` is an immutable representation of the ordered, unique
    set of frames in a given frame range.

    The frame range can be expressed in the following ways:

        - 1-5
        - 1-5,10-20
        - 1-100x5 (every fifth frame)
        - 1-100y5 (opposite of above, fills in missing frames)
        - 1-100:4 (same as 1-100x4,1-100x3,1-100x2,1-100)
        - 1-2x0.333333 (subframes)

    A ``FrameSet`` is effectively an ordered frozenset, with
    FrameSet-returning versions of frozenset methods:

        >>> FrameSet('1-5').union(FrameSet('5-10'))
        FrameSet("1-10")
        >>> FrameSet('1-5').intersection(FrameSet('5-10'))
        FrameSet("5")

    Because a FrameSet is hashable, it can be used as the key to a dictionary:

        >>> d = {FrameSet("1-20"): 'good'}

    A FrameSet can be created from an iterable of frame numbers, and will
    construct an appropriate string representation:

        >>> FrameSet([1,2,3,4,5]).frange
        '1-5'
        >>> FrameSet([0, '0.1429', '0.2857', '0.4286', '0.5714', '0.7143', '0.8571', 1]).frange
        '0-1x0.142857'

    Caveats:
        1. Because the internal storage of a ``FrameSet`` contains the discrete
           values of the entire range, an exception will be thrown if the range
           exceeds a large reasonable limit, which could lead to huge memory
           allocations or memory failures. See ``fileseq.constants.MAX_FRAME_SIZE``.
        2. All frozenset operations return a normalized ``FrameSet``:
           internal frames are in numerically increasing order.
        3. Equality is based on the contents and order, NOT the frame range
           string (there are a finite, but potentially
           extremely large, number of strings that can represent any given range,
           only a "best guess" can be made).
        4. Human-created frame ranges (ie 1-100x5) will be reduced to the
           actual internal frames (ie 1-96x5).
        5. The "null" ``Frameset`` (``FrameSet('')``) is now a valid thing
           to create, it is required by set operations, but may cause confusion
           as both its start and end methods will raise IndexError.  The
           :meth:`is_null` property allows you to guard against this.

    Args:
        frange (str or FrameSet or collections.Iterable of str, int, float, or
            decimal.Decimal): the frame range as a string (ie "1-100x5") or
            iterable of frame numbers.

    Raises:
        :class:`.ParseException`: if the frame range
            (or a portion of it) could not be parsed.
        :class:`fileseq.exceptions.MaxSizeException`: if the range exceeds
            ``fileseq.constants.MAX_FRAME_SIZE``
    """

    FRANGE_RE = FRANGE_RE
    PAD_MAP = PAD_MAP
    PAD_RE = PAD_RE

    __slots__ = ('_frange', '_ranges', '_normalized_cache', '_hash_cache', '_has_subframes', '_subframe_type')

    _ranges: list[Range]
    _normalized_cache: list[Range] | None
    _hash_cache: int | None
    _has_subframes: bool
    _subframe_type: type[float] | type[decimal.Decimal] | None

    def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> FrameSet:
        """
        Allocate a bare :class:`FrameSet`; all parsing happens in ``__init__``.

        The arguments are accepted only so that construction with a frame
        range works; they are not inspected here.

        Args:
            frange (str or :class:`FrameSet`): the frame range as a string (ie "1-100x5")
        """
        # zero-argument super() resolves relative to this class, so it also
        # works when ``cls`` is a subclass of FrameSet
        self = super().__new__(cls)
        return self

    def __init__(self, frange: FrameSetInput) -> None:
        """Initialize the :class:`FrameSet` object.

        Args:
            frange: a frame range string, another :class:`FrameSet`, a set or
                sized iterable of frame numbers, or a single frame number.
                Anything else is cast to a string and parsed.

        Raises:
            :class:`.ParseException`: if the input could not be parsed.
            :class:`fileseq.exceptions.MaxSizeException`: if the range exceeds
                ``fileseq.constants.MAX_FRAME_SIZE``
        """

        def catch_parse_err(fn, *a, **kw):  # type: ignore
            # funnel conversion errors into the package's ParseException
            try:
                return fn(*a, **kw)
            except (TypeError, ValueError) as e:
                raise ParseException('FrameSet args parsing error: {}'.format(e)) from e

        # initialize caches
        self._normalized_cache = None
        self._hash_cache = None
        self._has_subframes = False
        self._subframe_type = None

        # if the user provides anything but a string, short-circuit the build
        if not isinstance(frange, (str,)):
            # if it's apparently a FrameSet already, short-circuit the build
            if set(dir(frange)).issuperset(self.__slots__):
                for attr in self.__slots__:
                    setattr(self, attr, getattr(frange, attr))
                return
            # if it's inherently disordered, sort and build
            elif isinstance(frange, Set):
                self._maxSizeCheck(frange)
                frames_raw = list(frange)
                # Normalize first
                frames = sorted(catch_parse_err(normalizeFrames, frames_raw))  # type: ignore
                # detect subframe type after normalization
                self._detect_subframe_type(frames)
                self._frange = catch_parse_err(  # type: ignore
                    self.framesToFrameRange, frames, sort=False, compress=False)
                self._ranges = self._frames_to_ranges(frames)
                return
            # if it's ordered, find unique and build
            elif isinstance(frange, Sized) and isinstance(frange, Iterable):
                self._maxSizeCheck(frange)
                # convert to list first to allow multiple iterations
                frange_list = list(frange)
                # normalize first
                normalized = catch_parse_err(normalizeFrames, frange_list)  # type: ignore
                # detect subframe type after normalization
                self._detect_subframe_type(normalized)
                seen_items: typing.Set[FrameValue] = set()
                order = list(unique(seen_items, normalized))  # type: ignore
                self._frange = catch_parse_err(  # type: ignore
                    self.framesToFrameRange, order, sort=False, compress=False)
                self._ranges = self._frames_to_ranges(order)
                return
            # if it's an individual number build directly
            elif isinstance(frange, (int, float, decimal.Decimal)):
                frame = normalizeFrame(frange)
                # record both the subframe flag and the value type, exactly as
                # the iterable branches do, so iteration yields the same type
                # for FrameSet(1.5) as for FrameSet([1.5])
                self._detect_subframe_type([frame])
                self._frange = catch_parse_err(  # type: ignore
                    self.framesToFrameRange, [frame], sort=False, compress=False)
                frame_dec = decimal.Decimal(str(frame))
                self._ranges = [Range(frame_dec, frame_dec, decimal.Decimal(1))]
                return
            # in all other cases, cast to a string
            else:
                try:
                    frange = asString(frange)
                except Exception as err:
                    msg = 'Could not parse "{0}": cast to string raised: {1}'
                    raise ParseException(msg.format(frange, err)) from err

        # we're willing to trim padding characters from consideration
        frange = str(frange)
        for key in self.PAD_MAP:
            frange = frange.replace(key, '')
        frange = ''.join(frange.split())
        self._frange = asString(frange)

        # because we're acting like a set, we need to support the empty set
        if not self._frange:
            self._ranges = []
            return

        # parse frame range into Range objects
        self._ranges = []
        maxSize = constants.MAX_FRAME_SIZE

        frange_parts: typing.List[typing.Any] = []
        frange_types: typing.List[typing.Any] = []
        has_decimal_notation = False
        for part in self._frange.split(","):
            if not part:
                continue
            start, end, modifier, chunk = self._parse_frange_part(part)
            frange_parts.append((start, end, modifier, chunk))
            frange_types.extend(map(type, (start, end, chunk)))
            if '.' in part:
                has_decimal_notation = True

        # any decimal value or decimal notation marks the whole set as
        # carrying subframes
        if decimal.Decimal in frange_types or has_decimal_notation:
            self._has_subframes = True

        for start, end, modifier, chunk in frange_parts:
            # convert to Decimal for Range storage
            start_dec = decimal.Decimal(str(start))
            end_dec = decimal.Decimal(str(end))
            chunk_dec = decimal.Decimal(str(chunk))

            # handle batched frames (1-100x5)
            if modifier == 'x':
                # calculate size mathematically without expansion
                range_size = int(abs(end_dec - start_dec) / abs(chunk_dec)) + 1
                self._maxSizeCheck(range_size + len(self))

                # check for overlap with existing ranges
                new_lo = min(start_dec, end_dec)
                new_hi = max(start_dec, end_dec)
                overlaps = any(
                    new_lo <= max(r.start, r.end) and new_hi >= min(r.start, r.end)
                    for r in self._ranges
                )

                if not overlaps:
                    actual_chunk = chunk_dec if end_dec >= start_dec else -chunk_dec
                    self._ranges.append(Range(start_dec, end_dec, actual_chunk))
                else:
                    frame_range = xfrange(start, end, chunk, maxSize=maxSize)
                    unique_frames: list[FrameValue] = [f for f in frame_range if f not in self]
                    if unique_frames:
                        self._ranges.extend(self._frames_to_ranges(unique_frames))

            # handle staggered frames (1-100:5)
            elif modifier == ':':
                if '.' in str(chunk):
                    raise ValueError("Unable to stagger subframes")
                # staggered frames must be expanded
                frames = []
                seen = set(self)  # track what's already in the frameset
                for stagger in range(chunk, 0, -1):
                    frame_range = xfrange(start, end, stagger, maxSize=maxSize)
                    for f in frame_range:
                        if f not in seen:
                            frames.append(f)
                            seen.add(f)  # mark as seen to avoid duplicates within stagger
                self._maxSizeCheck(len(frames) + len(self))
                if frames:
                    self._ranges.extend(self._frames_to_ranges(frames))

            # handle filled frames (1-100y5)
            elif modifier == 'y':
                if '.' in str(chunk):
                    raise ValueError("Unable to fill subframes")
                # filled frames must be expanded
                not_good = frozenset(xfrange(start, end, chunk, maxSize=maxSize))
                all_frames = xfrange(start, end, 1, maxSize=maxSize)
                frames = [f for f in all_frames if f not in not_good and f not in self]
                self._maxSizeCheck(len(frames) + len(self))
                if frames:
                    self._ranges.extend(self._frames_to_ranges(frames))

            # handle full ranges and single frames
            else:
                # calculate size mathematically without expansion
                range_size = int(abs(end_dec - start_dec)) + 1
                self._maxSizeCheck(range_size + len(self))

                # check for overlap with existing ranges
                new_lo = min(start_dec, end_dec)
                new_hi = max(start_dec, end_dec)
                overlaps = any(
                    new_lo <= max(r.start, r.end) and new_hi >= min(r.start, r.end)
                    for r in self._ranges
                )

                actual_step = decimal.Decimal(1) if start < end else decimal.Decimal(-1)
                if not overlaps:
                    self._ranges.append(Range(start_dec, end_dec, actual_step))
                elif not self._has_subframes and _all_ranges_contiguous(self._ranges):
                    # fast path: all existing ranges are step-1 or step-(-1), so we can
                    # compute the uncovered sub-intervals directly without per-frame iteration.
                    # It is restricted to integer-only input because the interval arithmetic
                    # treats every whole number between the bounds as covered, which does
                    # not hold for a range such as 1.5-3.5.
                    coverage = _merged_coverage(self._ranges)
                    gaps = _gaps_in_range(new_lo, new_hi, coverage)
                    if actual_step < _D0:
                        # descending: append gaps in reverse so frame order matches start→end
                        for gap_lo, gap_hi in reversed(gaps):
                            self._ranges.append(Range(gap_hi, gap_lo, actual_step))
                    else:
                        for gap_lo, gap_hi in gaps:
                            self._ranges.append(Range(gap_lo, gap_hi, actual_step))
                else:
                    istep = 1 if start < end else -1
                    frame_range = xfrange(start, end, istep, maxSize=maxSize)
                    unique_frames2: list[FrameValue] = [f for f in frame_range if f not in self]
                    if unique_frames2:
                        self._ranges.extend(self._frames_to_ranges(unique_frames2))

    @property
    def is_null(self) -> bool:
        """
        Read-only access to determine if the :class:`FrameSet` is the null or
        empty :class:`FrameSet`.

        Returns:
            bool:
        """
        return not (self._frange and self._ranges)

    @property
    def frange(self) -> str:
        """
        Read-only access to the frame range used to create this :class:`FrameSet`.

        Returns:
            str:
        """
        return self._frange or ''

    @property
    def items(self) -> frozenset[FrameValue]:
        """
        Read-only access to the unique frames that form this :class:`FrameSet`.

        .. deprecated:: 3.0
            Direct access to `.items` triggers full expansion of the frame range.
            For large ranges, prefer iteration: `for frame in frameset`
            or membership testing: `frame in frameset`.

        Returns:
            frozenset:
        """
        warnings.warn(
            "FrameSet.items triggers full frame expansion. "
            "For large ranges, use iteration or membership testing instead.",
            DeprecationWarning,
            stacklevel=2
        )
        return frozenset(self)

    @property
    def order(self) -> tuple[FrameValue, ...]:
        """
        Read-only access to the ordered frames that form this :class:`FrameSet`.

        .. deprecated:: 3.0
            Direct access to `.order` triggers full expansion of the frame range.
            For large ranges, prefer iteration: `for frame in frameset`
            or indexing: `frameset[i]`.

        Returns:
            tuple:
        """
        warnings.warn(
            "FrameSet.order triggers full frame expansion. "
            "For large ranges, use iteration or indexing instead.",
            DeprecationWarning,
            stacklevel=2
        )
        return tuple(self)
@classmethod
def from_iterable(cls, frames: typing.Iterable[FrameValue], sort: bool = False) -> FrameSet:
    """
    Build a :class:`FrameSet` from an iterable of frames.

    One-shot iterables (generators, iterators) are materialised into a
    list first: the constructor only treats *sized* iterables as a
    collection of frames and would otherwise cast the object to a string.

    Args:
        frames (collections.Iterable): an iterable object containing frames as integers
        sort (bool): True to sort frames before creation, default is False

    Returns:
        :class:`FrameSet`:
    """
    if sort:
        frames = sorted(frames)
    elif isinstance(frames, Iterable) and not isinstance(frames, Sized):
        frames = list(frames)
    return cls(frames)
def _detect_subframe_type(self, frames: typing.Iterable[FrameValue]) -> None:
    """Record whether ``frames`` holds a fractional value, and of which type.

    The first float or Decimal with a non-zero fractional part decides:
    ``_has_subframes`` is set to True and ``_subframe_type`` to that value's
    type.  Nothing is changed when no such value is found.
    """
    for value in frames:
        for kind in (float, decimal.Decimal):
            if isinstance(value, kind) and value % 1 != 0:
                self._has_subframes = True
                self._subframe_type = kind
                return


@staticmethod
def _frames_to_ranges(frames: typing.List[FrameValue]) -> list[Range]:
    """Group an ordered list of frames into Range objects.

    Consecutive frames sharing the same difference are folded into one
    Range; a change of difference closes the current Range and starts a
    new one at the frame where the change was seen.  A Range made of a
    single frame is given a step of 1.
    """
    if not frames:
        return []

    collected: list[Range] = []
    run_start = decimal.Decimal(str(frames[0]))
    previous = run_start
    run_step: decimal.Decimal | None = None

    for raw in frames[1:]:
        current = decimal.Decimal(str(raw))
        delta = current - previous
        if run_step is None:
            # second frame of a run fixes its step
            run_step = delta
        elif delta != run_step:
            # step changed: close the run and restart from this frame
            collected.append(Range(run_start, previous, run_step))
            run_start, run_step = current, None
        previous = current

    # close the trailing run
    final_step = decimal.Decimal(1) if run_step is None else run_step
    collected.append(Range(run_start, previous, final_step))
    return collected
@classmethod
def from_range(cls, start: int, end: int, step: int = 1) -> FrameSet:
    """
    Build a :class:`FrameSet` from given start and end frames (inclusive).

    Args:
        start (int): The first frame of the :class:`FrameSet`.
        end (int): The last frame of the :class:`FrameSet`.
        step (int, optional): Range step (default 1).

    Returns:
        :class:`FrameSet`:

    Raises:
        TypeError: if ``step`` is not an integer
        ValueError: if ``step`` is zero
    """
    # mirror the exceptions raised by the built-in range()
    if not isinstance(step, int):
        raise TypeError("integer step argument expected, got {}."
                        .format(type(step)))
    if step == 0:
        raise ValueError("step argument must not be zero")

    fstart, fend = normalizeFrames([start, end])
    if step == 1:
        # a unit step needs no explicit "x" modifier
        range_str = "{0}-{1}".format(fstart, fend)
    else:
        fstep = normalizeFrame(step)
        range_str = "{0}-{1}x{2}".format(fstart, fend, fstep)
    return FrameSet(range_str)
@classmethod
def _cast_to_frameset(cls, other: typing.Any) -> FrameSet:
    """
    Private helper that coerces the operand of a comparison.

    Args:
        other (:class:`FrameSet` or set or frozenset or iterable): item to be compared

    Returns:
        :class:`FrameSet`: ``other`` itself when it already is a
        :class:`FrameSet`, otherwise a new one built from it;
        :class:`NotImplemented` when the construction raises.
    """
    if not isinstance(other, FrameSet):
        try:
            other = FrameSet(other)
        except Exception:
            # any construction failure means the operands are not comparable
            return NotImplemented  # type: ignore
    return other
def index(self, frame: int) -> int:
    """
    Return the index of the given frame number within the :class:`FrameSet`.

    Args:
        frame (int): the frame number to find the index for

    Returns:
        int:

    Raises:
        :class:`ValueError`: if frame is not in self
    """
    preceding = 0  # number of frames held by the ranges already skipped
    for rng in self._ranges:
        if frame not in rng:
            preceding += len(rng)
            continue
        # position inside the range = distance from its start, in steps
        distance = abs(decimal.Decimal(str(frame)) - rng.start)
        return preceding + int(distance / abs(rng.step))
    raise ValueError(f"{frame} is not in FrameSet")
def frame(self, index: int) -> FrameValue:
    """
    Return the frame at the given index.

    Negative indices count from the end.

    Args:
        index (int): the index to find the frame for

    Returns:
        int: the frame; whole values are returned as ``int``, fractional
        values are returned as stored.

    Raises:
        :class:`IndexError`: if index is out of bounds
    """
    total = len(self)
    if index < 0:
        index += total
    if not 0 <= index < total:
        raise IndexError("index out of range")

    remaining = index  # index relative to the range being inspected
    for rng in self._ranges:
        count = len(rng)
        if remaining < count:
            value = rng.start + (rng.step * remaining)
            return int(value) if value % 1 == 0 else value
        remaining -= count

    raise IndexError("index out of range")
def hasFrame(self, frame: int) -> bool:
    """
    Tell whether the given frame or subframe belongs to the :class:`FrameSet`.

    Equivalent to ``frame in frameset``.

    Args:
        frame (int): the frame number to search for

    Returns:
        bool:
    """
    present = frame in self
    return present
def hasSubFrames(self) -> bool:
    """
    Tell whether the :class:`FrameSet` contains any subframes.

    The answer is taken from the cached flag when it is already set;
    otherwise the ranges are inspected and a positive result is cached.

    Returns:
        bool:
    """
    if self._has_subframes:
        return True

    fractional = any(
        r.start % 1 != 0 or r.end % 1 != 0 or r.step % 1 != 0
        for r in self._ranges
    )
    if fractional:
        # remember the result so the ranges are not scanned again
        self._has_subframes = True
        return True
    return False
def start(self) -> FrameValue:
    """
    The first frame in the :class:`FrameSet`.

    Returns:
        int: the start of the first stored range, as ``int`` when it is a
        whole number.

    Raises:
        :class:`IndexError`: (with the empty :class:`FrameSet`)
    """
    ranges = self._ranges
    if not ranges:
        raise IndexError("FrameSet is empty")
    first = ranges[0].start
    if first % 1 == 0:
        return int(first)
    return first
def end(self) -> FrameValue:
    """
    The last frame in the :class:`FrameSet`.

    Returns:
        int: the final frame produced by the last stored range, as ``int``
        when it is a whole number.

    Raises:
        :class:`IndexError`: (with the empty :class:`FrameSet`)
    """
    ranges = self._ranges
    if not ranges:
        raise IndexError("FrameSet is empty")

    last = ranges[-1]
    # the range's own ``end`` is only a bound; the real last frame is the
    # one reached after (length - 1) steps from its start
    final = last.start + last.step * (len(last) - 1)
    if final % 1 == 0:
        return int(final)
    return final
def isConsecutive(self) -> bool:
    """
    Return whether the frame range represents consecutive integers,
    as opposed to having a stepping >= 2

    Examples:
        >>> FrameSet('1-100').isConsecutive()
        True
        >>> FrameSet('1-100x2').isConsecutive()
        False
        >>> FrameSet('1-50,60-100').isConsecutive()
        False

    Returns:
        bool: always False for an empty set or one containing subframes
    """
    ranges = self._ranges
    if self.hasSubFrames() or not ranges:
        return False

    # every range must advance one frame at a time, in either direction
    if any(abs(r.step) != 1 for r in ranges):
        return False

    bounds = [
        (int(min(r.start, r.end)), int(max(r.start, r.end)))
        for r in ranges
    ]

    # Walk the ranges in order while tracking the covered [lo, hi] window.
    # Each further range has to touch or overlap the window, and the window
    # may only ever grow towards one side.
    lo, hi = bounds[0]
    grew_down = grew_up = False
    for r_lo, r_hi in bounds[1:]:
        touches = r_lo <= hi + 1 and r_hi >= lo - 1
        if not touches:
            return False
        if r_lo < lo:
            if grew_up:
                return False
            grew_down = True
        if r_hi > hi:
            if grew_down:
                return False
            grew_up = True
        lo, hi = min(lo, r_lo), max(hi, r_hi)

    return True
def frameRange(self, zfill: int = 0, decimal_places: int | None = None) -> str:
    """
    Return the frame range used to create this :class:`FrameSet`, padded if
    desired.

    Examples:
        >>> FrameSet('1-100').frameRange()
        '1-100'
        >>> FrameSet('1-100').frameRange(5)
        '00001-00100'
        >>> FrameSet('1-100').frameRange(0, 1)
        '1.0-100.0'
        >>> FrameSet('1.0-100.0').frameRange()
        '1.0-100.0'

    Args:
        zfill (int): the width to use to zero-pad the frame range string
        decimal_places (int or None): the number of decimal places to use
            in frame range string

    Returns:
        str:
    """
    source = self.frange
    padded = self.padFrameRange(source, zfill, decimal_places)
    return padded
def invertedFrameRange(self, zfill: int = 0, decimal_places: int | None = None) -> str:
    """
    Return the inverse of the :class:`FrameSet` 's frame range, padded if
    desired.
    The inverse is every frame missing from the set within the full extent
    of the range.

    Examples:
        >>> FrameSet('1-100x2').invertedFrameRange()
        '2-98x2'
        >>> FrameSet('1-100x2').invertedFrameRange(5)
        '00002-00098x2'

    If the inverted frame size exceeds ``fileseq.constants.MAX_FRAME_SIZE``,
    a ``MaxSizeException`` will be raised.

    Args:
        zfill (int): the width to use to zero-pad the frame range string
        decimal_places (int or None): the number of decimal places to use
            in frame range string (accepted but not used by this method)

    Returns:
        str: empty when the set has subframes or has no holes

    Raises:
        :class:`fileseq.exceptions.MaxSizeException`:
    """
    # No inverted frame range when range includes subframes
    if self.hasSubFrames():
        return ''

    ordered = sorted(int(f) for f in self)
    missing: list[FrameValue] = []
    for current, following in zip(ordered, ordered[1:]):
        if following - current == 1:
            continue
        hole = range(current + 1, following)
        # guard the running total before growing the result
        self._maxSizeCheck(len(hole) + len(missing))
        missing.extend(hole)

    if not missing:
        return ''

    return self.framesToFrameRange(
        missing, zfill=zfill, sort=False, compress=False)
def normalize(self) -> FrameSet:
    """
    Returns a new normalized (sorted and compacted) :class:`FrameSet`.

    The normalized ranges are computed once and cached on this instance.

    Returns:
        :class:`FrameSet`:
    """
    cached = self._normalized_cache
    if cached is None:
        cached = self._normalized_cache = self._normalize()

    # build the copy without going through __init__
    klass = self.__class__
    clone = klass.__new__(klass)
    clone._ranges = cached[:]
    clone._normalized_cache = clone._ranges[:]
    clone._hash_cache = None
    clone._has_subframes = self._has_subframes
    clone._subframe_type = self._subframe_type
    # the range string is derived from the clone itself, so every other
    # attribute has to be in place before this call
    clone._frange = FrameSet.framesToFrameRange(clone, sort=False, compress=False)
    return clone
@overload
def batches(self, batch_size: int, frames: typing.Literal[True]) -> typing.Iterator[_islice[FrameValue]]:
    ...


@overload
def batches(self, batch_size: int, frames: typing.Literal[False] = ...) -> typing.Iterator[FrameSet]:
    ...


def batches(self, batch_size: int, frames: bool = False) -> typing.Iterator[_islice[FrameValue]] | typing.Iterator[FrameSet]:
    """
    Returns a generator that yields sub-batches of frames, up to ``batch_size``.
    If ``frames=False``, each batch is a new ``FrameSet`` subrange.
    If ``frames=True``, each batch is an islice generator object of the sub-range.

    Args:
        batch_size (int): max frame values in each batch
        frames (bool): if True, generate islice sub-ranges instead of FrameSets

    Returns:
        generator: yields batches of islice or FrameSet sub-ranges
    """
    chunks = batchIterable(self, batch_size)
    if not frames:
        # wrap every chunk of frame values in its own FrameSet
        return (self.from_iterable(chunk) for chunk in chunks)
    # caller asked for the raw chunks of frame values
    return chunks
def __getstate__(self) -> tuple[str]:
    """
    Allows for serialization to a pickled :class:`FrameSet`.

    Returns:
        tuple: (frame range string,)
    """
    # The state is wrapped in a 1-tuple on purpose: Python skips
    # __setstate__ when bool(__getstate__()) is False, which a bare ''
    # (the empty FrameSet) would be.  ('',) is truthy.
    state = (self.frange,)
    return state
def __setstate__(self, state: typing.Any) -> None:
    """
    Allows for de-serialization from a pickled :class:`FrameSet`.

    Args:
        state (tuple or str or dict): A string/dict can be used for
            backwards compatibility

    Raises:
        ValueError: if state is not an appropriate type
    """
    unrecognized = "Unrecognized state data from which to deserialize FrameSet"

    if isinstance(state, tuple):
        # "3rd generation" FrameSets: immutable, may be empty
        frange = state[0]
    elif isinstance(state, str):
        # "2nd generation" FrameSets: mutable, could not be empty
        frange = state
    elif isinstance(state, dict):
        # "1st generation" FrameSets stored the full __dict__; only the
        # frame range is used, the old key taking precedence
        for key in ('__frange', '_frange'):
            if key in state:
                frange = state[key]
                break
        else:
            raise ValueError(unrecognized)
    else:
        raise ValueError(unrecognized)

    # rebuild everything else from the frame range
    self.__init__(frange)  # type: ignore[misc]
def __getitem__(self, index: int | slice) -> FrameValue | FrameSet:
    """
    Allows indexing into the ordered frames of this :class:`FrameSet`.

    Args:
        index (int or slice): the index to retrieve

    Returns:
        int or FrameSet: a single frame for an integer index, a new
        :class:`FrameSet` for a slice

    Raises:
        :class:`IndexError`: if index is out of bounds
    """
    if not isinstance(index, slice):
        return self.frame(index)

    # resolve the slice against the current length and fetch only the
    # selected frames, one lookup per position
    first, last, stride = index.indices(len(self))
    picked = [self.frame(pos) for pos in range(first, last, stride)]
    return self.from_iterable(picked)
def __len__(self) -> int:
    """
    Returns the length of the ordered frames of this :class:`FrameSet`.

    The count is the sum of the lengths of the stored ranges; no frames
    are expanded.

    Returns:
        int:
    """
    total = 0
    for rng in self._ranges:
        total += len(rng)
    return total
def __str__(self) -> str:
    """
    Returns the frame range string of this :class:`FrameSet`.

    Returns:
        str: same value as the ``frange`` property
    """
    text = self.frange
    return text
def __repr__(self) -> str:
    """
    Returns a long-form representation of this :class:`FrameSet`.

    The result has the form ``ClassName("frange")``.

    Returns:
        str:
    """
    template = '{0}("{1}")'
    class_name = self.__class__.__name__
    return template.format(class_name, self.frange)
def __iter__(self) -> typing.Iterator[FrameValue]:
    """
    Allows for iteration over the ordered frames of this :class:`FrameSet`.

    Values are coerced to one type per set: ``int`` when the set has no
    subframes, ``float`` when its subframe type is float, and
    ``decimal.Decimal`` otherwise.

    Returns:
        generator:
    """
    for rng in self._ranges:
        for value in rng:
            is_decimal = isinstance(value, decimal.Decimal)
            if not self._has_subframes:
                # integer-only FrameSet
                yield int(value) if is_decimal else value
            elif self._subframe_type == float:
                yield float(value)
            else:
                # decimal type
                yield value if is_decimal else decimal.Decimal(value)
def __reversed__(self) -> typing.Iterator[FrameValue]:
    """
    Allows for reversed iteration over the ordered frames of this
    :class:`FrameSet`.

    The frames are yielded with the same value types as forward
    iteration: ``int`` when the set has no subframes, ``float`` when its
    subframe type is float, and ``decimal.Decimal`` otherwise.

    Returns:
        generator:
    """
    for rng in reversed(self._ranges):
        # a range only iterates forwards, so expand it before reversing
        for value in reversed(list(rng)):
            is_decimal = isinstance(value, decimal.Decimal)
            if not self._has_subframes:
                # integer-only FrameSet
                yield int(value) if is_decimal else value
            elif self._subframe_type == float:
                yield float(value)
            else:
                # decimal type
                yield value if is_decimal else decimal.Decimal(value)
def __contains__(self, item: object) -> bool:
    """
    Check if item is a member of this :class:`FrameSet`.

    Args:
        item (int): the frame number to check for

    Returns:
        bool: False for any object that cannot be interpreted as a frame
        number (e.g. ``None`` or an arbitrary string); membership tests
        never raise for such values.
    """
    for r in self._ranges:
        try:
            if item in r:  # type: ignore[operator]
                return True
        except decimal.InvalidOperation:
            # the range could not evaluate this value as a decimal number,
            # so it cannot be one of its frames
            continue
    return False
def _normalize(self) -> list[Range]:
    """
    Produce the canonical range list for this :class:`FrameSet`.

    Every frame is expanded and sorted, then compacted back into ranges
    via ``_frames_to_ranges``. An empty set normalizes to an empty list.
    """
    ordered_frames = sorted(self)
    if not ordered_frames:
        return []
    return self._frames_to_ranges(ordered_frames)
[docs] def __hash__(self) -> int: """ Builds the hash of this :class:`FrameSet` for equality checking and to allow use as a dictionary key. Returns: int: """ if self._hash_cache is None: if self._normalized_cache is None: self._normalized_cache = self._normalize() self._hash_cache = hash(tuple( (r.start, r.end, r.step) for r in self._normalized_cache )) return self._hash_cache
def __lt__(self, other: object) -> typing.Any:
    """
    Check if `self` < `other` by comparing contents.

    `other` is first passed through ``_cast_to_frameset``; when that
    yields :class:`NotImplemented`, so does this method.

    Note:
        A :class:`FrameSet` is less than `other` when it holds fewer
        frames and every one of them is also in `other`, OR when both hold
        the same number of frames and the first differing pair of frames,
        taken in iteration order, compares as less.

        .. code-block:: python
            :caption: Same contents, but (1,2,3,4,5) sorts below (5,4,3,2,1)

            >>> FrameSet("1-5") < FrameSet("5-1")
            True

    Args:
        other (:class:`FrameSet`): Can also be an object that can be cast to a :class:`FrameSet`

    Returns:
        bool:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented

    own_size, other_size = len(self), len(candidate)
    if own_size < other_size:
        # fewer frames: less-than only as a proper subset
        return all(frame in candidate for frame in self)
    if own_size > other_size:
        return False

    # same size: decide on the first pair of frames that differ
    mismatch = next(((a, b) for a, b in zip(self, candidate) if a != b), None)
    return mismatch is not None and mismatch[0] < mismatch[1]
def __le__(self, other: object) -> typing.Any:
    """
    Check if `self` <= `other`, i.e. whether `self` is a subset of `other`.

    `other` is first passed through ``_cast_to_frameset``; when that
    yields :class:`NotImplemented`, so does this method.

    Args:
        other (:class:`FrameSet`): Also accepts an object that can be cast to a :class:`FrameSet`

    Returns:
        bool:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    return NotImplemented if candidate is NotImplemented else self.issubset(candidate)
[docs] def __eq__(self, other: object) -> typing.Any: """ Check if `self` == `other` via a comparison of the hash of their contents. If `other` is not a :class:`FrameSet`, but is a set, frozenset, or is iterable, it will be cast to a :class:`FrameSet`. Args: other (:class:`FrameSet`): Also accepts an object that can be cast to a :class:`FrameSet` Returns: bool: :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet` """ if not isinstance(other, FrameSet): if not isinstance(other, typing.Iterable): return NotImplemented other = self.from_iterable(other) # normalize both on first comparison, cache forever if self._normalized_cache is None: self._normalized_cache = self._normalize() if other._normalized_cache is None: other._normalized_cache = other._normalize() return self._normalized_cache == other._normalized_cache
[docs] def __ne__(self, other: object) -> typing.Any: """ Check if `self` != `other` via a comparison of the hash of their contents. If `other` is not a :class:`FrameSet`, but is a set, frozenset, or is iterable, it will be cast to a :class:`FrameSet`. Args: other (:class:`FrameSet`): Also accepts an object that can be cast to a :class:`FrameSet` Returns: bool: :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet` """ is_equals = self == other if is_equals != NotImplemented: return not is_equals return is_equals
def __ge__(self, other: object) -> typing.Any:
    """
    Check if `self` >= `other`, i.e. whether `self` is a superset of `other`.

    `other` is first passed through ``_cast_to_frameset``; when that
    yields :class:`NotImplemented`, so does this method.

    Args:
        other (:class:`FrameSet`): Also accepts an object that can be cast to a :class:`FrameSet`

    Returns:
        bool:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    return NotImplemented if candidate is NotImplemented else self.issuperset(candidate)
def __gt__(self, other: object) -> typing.Any:
    """
    Check if `self` > `other` by comparing contents.

    `other` is first passed through ``_cast_to_frameset``; when that
    yields :class:`NotImplemented`, so does this method.

    Note:
        A :class:`FrameSet` is greater than `other` when it holds more
        frames and contains every frame of `other`, OR when both hold the
        same number of frames and the first differing pair of frames,
        taken in iteration order, compares as greater.

        .. code-block:: python
            :caption: Same contents, but (1,2,3,4,5) sorts below (5,4,3,2,1)

            >>> FrameSet("1-5") > FrameSet("5-1")
            False

    Args:
        other (:class:`FrameSet`): Also accepts an object that can be cast to a :class:`FrameSet`

    Returns:
        bool:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented

    own_size, other_size = len(self), len(candidate)
    if own_size > other_size:
        # more frames: greater-than only as a proper superset
        return all(frame in self for frame in candidate)
    if own_size < other_size:
        return False

    # same size: decide on the first pair of frames that differ
    mismatch = next(((a, b) for a, b in zip(self, candidate) if a != b), None)
    return mismatch is not None and mismatch[0] > mismatch[1]
def __and__(self, other: object) -> typing.Any:
    """
    Overloads the ``&`` operator.
    Builds a new :class:`FrameSet` from the frames of `self` that are
    also found in `other`.

    Note:
        The order of operations is irrelevant:
        ``(self & other) == (other & self)``

    Args:
        other (:class:`FrameSet`):

    Returns:
        :class:`FrameSet`:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented

    # membership is tested frame by frame; `candidate` is never expanded
    shared = [frame for frame in self if frame in candidate]
    return self.from_iterable(shared)

__rand__ = __and__
def __sub__(self, other: object) -> typing.Any:
    """
    Overloads the ``-`` operator.
    Builds a new :class:`FrameSet` from the frames of `self` that are
    absent from `other`.

    Note:
        This is for left-hand subtraction (``self - other``).

    Args:
        other (:class:`FrameSet`):

    Returns:
        :class:`FrameSet`:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented

    # membership is tested frame by frame; `candidate` is never expanded
    remaining = [frame for frame in self if frame not in candidate]
    return self.from_iterable(remaining)
def __rsub__(self, other: object) -> typing.Any:
    """
    Overloads the ``-`` operator.
    Builds a new :class:`FrameSet` from the frames of `other` that are
    absent from `self`.

    Note:
        This is for right-hand subtraction (``other - self``).

    Args:
        other (:class:`FrameSet`):

    Returns:
        :class:`FrameSet`:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented

    # membership is tested frame by frame; `self` is never expanded
    remaining = [frame for frame in candidate if frame not in self]
    return self.from_iterable(remaining)
def __or__(self, other: object) -> typing.Any:
    """
    Overloads the ``|`` operator.
    Builds a new, sorted :class:`FrameSet` from every frame found in
    `self`, in `other`, or in both.

    Note:
        The order of operations is irrelevant:
        ``(self | other) == (other | self)``

    Args:
        other (:class:`FrameSet`):

    Returns:
        :class:`FrameSet`:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented

    # dict keys drop repeated frames while keeping first-seen order
    merged = dict.fromkeys(self)
    merged.update(dict.fromkeys(candidate))
    return self.from_iterable(list(merged), sort=True)

__ror__ = __or__
def __xor__(self, other: object) -> typing.Any:
    """
    Overloads the ``^`` operator.
    Builds a new, sorted :class:`FrameSet` from the frames found in
    exactly one of `self` and `other`.

    Note:
        The order of operations is irrelevant:
        ``(self ^ other) == (other ^ self)``

    Args:
        other (:class:`FrameSet`):

    Returns:
        :class:`FrameSet`:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented

    only_in_self = [frame for frame in self if frame not in candidate]
    only_in_other = [frame for frame in candidate if frame not in self]
    return self.from_iterable(only_in_self + only_in_other, sort=True)

__rxor__ = __xor__
def isdisjoint(self, other: typing.Any) -> bool | NotImplemented:  # type: ignore
    """
    Check that `self` and `other` have no frame in common.

    Args:
        other (:class:`FrameSet`):

    Returns:
        bool:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented  # type: ignore

    # stops at the first shared frame
    return not any(frame in candidate for frame in self)
def issubset(self, other: typing.Any) -> bool | NotImplemented:  # type: ignore
    """
    Check that every frame of `self` is also a frame of `other`.

    Args:
        other (:class:`FrameSet`):

    Returns:
        bool:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented  # type: ignore

    # stops at the first frame missing from `candidate`
    return all(frame in candidate for frame in self)
def issuperset(self, other: typing.Any) -> bool | NotImplemented:  # type: ignore
    """
    Check that every frame of `other` is also a frame of `self`.

    Args:
        other (:class:`FrameSet`):

    Returns:
        bool:
        :class:`NotImplemented`: if `other` fails to convert to a :class:`FrameSet`
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented  # type: ignore

    # stops at the first frame missing from `self`
    return all(frame in self for frame in candidate)
def union(self, *other: typing.Iterable[FrameValue]) -> FrameSet:
    """
    Build a new, sorted :class:`FrameSet` holding the frames of `self`
    together with the frames of every iterable in `other`.

    Args:
        other (:class:`FrameSet`): or objects that can cast to :class:`FrameSet`

    Returns:
        :class:`FrameSet`:
    """
    # dict keys drop repeated frames while keeping first-seen order
    merged = dict.fromkeys(self)
    for frames in other:
        merged.update(dict.fromkeys(frames))
    return self.from_iterable(list(merged), sort=True)
def intersection(self, *other: typing.Iterable[FrameValue]) -> FrameSet:
    """
    Build a new :class:`FrameSet` holding the frames of `self` that are
    present in every iterable in `other`.

    Args:
        other (:class:`FrameSet`): or objects that can cast to :class:`FrameSet`

    Returns:
        :class:`FrameSet`:
    """
    survivors = list(self)
    for frames in other:
        # hash the operand once so each membership test is a set lookup
        allowed = frozenset(frames)
        survivors = [frame for frame in survivors if frame in allowed]
    return self.from_iterable(survivors)
def difference(self, *other: typing.Iterable[FrameValue]) -> FrameSet:
    """
    Build a new :class:`FrameSet` holding the frames of `self` that do
    not appear in any iterable in `other`.

    Args:
        other (:class:`FrameSet`): or objects that can cast to :class:`FrameSet`

    Returns:
        :class:`FrameSet`:
    """
    # pool every frame to drop into one set before filtering
    excluded: set[FrameValue] = set().union(*other)
    kept = [frame for frame in self if frame not in excluded]
    return self.from_iterable(kept)
def symmetric_difference(self, other: typing.Any) -> FrameSet:
    """
    Build a new, sorted :class:`FrameSet` holding the frames found in
    exactly one of `self` and `other`.

    :class:`NotImplemented` is returned when ``_cast_to_frameset`` cannot
    convert `other`.

    Args:
        other (:class:`FrameSet`):

    Returns:
        :class:`FrameSet`:
    """
    candidate = self._cast_to_frameset(other)
    if candidate is NotImplemented:
        return NotImplemented  # type: ignore

    only_in_self = [frame for frame in self if frame not in candidate]
    only_in_other = [frame for frame in candidate if frame not in self]
    return self.from_iterable(only_in_self + only_in_other, sort=True)
def copy(self) -> FrameSet:
    """
    Create an independent copy of this :class:`FrameSet`.

    The new instance is created without running ``__init__``. The range
    list and the normalized-range cache are duplicated with a slice copy;
    the remaining attributes are carried over as they are. An empty or
    unset normalized cache is copied as ``None``.

    Returns:
        :class:`.FrameSet`:
    """
    clone = self.__class__.__new__(self.__class__)
    for attr in ('_frange', '_hash_cache', '_has_subframes', '_subframe_type'):
        setattr(clone, attr, getattr(self, attr))

    clone._ranges = self._ranges[:]
    cached = self._normalized_cache
    clone._normalized_cache = cached[:] if cached else None
    return clone
@classmethod
def _maxSizeCheck(cls, obj: int | float | decimal.Decimal | Sized | typing.Any) -> None:
    """
    Raise a MaxSizeException if ``obj`` exceeds MAX_FRAME_SIZE

    A number is compared by value and an object with ``__len__`` by its
    length; anything else passes unchecked.

    Args:
        obj (numbers.Number or collection):

    Raises:
        :class:`fileseq.exceptions.MaxSizeException`:
    """
    if isinstance(obj, numbers.Number):
        measured = obj
    elif hasattr(obj, '__len__'):
        measured = len(obj)
    else:
        return

    # read through the module so a MAX_FRAME_SIZE changed at runtime is honoured
    limit = constants.MAX_FRAME_SIZE
    if measured > limit:  # type: ignore
        raise MaxSizeException('Frame size %s > %s (MAX_FRAME_SIZE)' % (measured, limit))
@classmethod
def isFrameRange(cls, frange: str) -> bool:
    """
    Return True if the given string is a frame range. Any padding
    characters, such as '#' and '@' are ignored.

    A string that is empty once the padding characters are removed counts
    as a frame range, and empty comma-separated parts are skipped.

    Args:
        frange (str): a frame range to test

    Returns:
        bool:
    """
    # padding characters are not part of the range itself, so drop them first
    stripped = str(frange)
    for pad_char in cls.PAD_MAP:
        stripped = stripped.replace(pad_char, '')

    if not stripped:
        return True

    for part in asString(stripped).split(','):
        if part:
            try:
                cls._parse_frange_part(part)
            except ParseException:
                return False

    return True
@classmethod
def padFrameRange(cls, frange: str, zfill: int, decimal_places: int | None = None) -> str:
    """
    Return the zero-padded version of the frame range string.

    Every match of ``PAD_RE`` in `frange` has its start frame, and its
    end frame when there is one, re-written through ``pad``.

    Args:
        frange (str): a frame range to test
        zfill (int):
        decimal_places (int or None):

    Returns:
        str:
    """

    def _do_pad(match: typing.Any) -> str:
        """
        Substitutes padded for unpadded frames.
        """
        pieces = list(match.groups())

        # the leading (sign, start) pair collapses into one padded token
        sign, first = pieces[:2]
        pieces[:2] = [pad(sign + first, zfill, decimal_places)]

        # after that collapse the (sign, end) pair sits at positions 2-3
        sign, last = pieces[2:4]
        if last:
            pieces[2:4] = [pad(sign + last, zfill, decimal_places)]

        return ''.join(piece for piece in pieces if piece)

    return cls.PAD_RE.sub(_do_pad, frange)
@classmethod
def _parse_frange_part(cls, frange: str) -> tuple[int, int, str, int]:
    """
    Internal method: parse a discrete frame range part.

    Args:
        frange (str): single part of a frame range as a string
            (ie "1-100x5")

    Returns:
        tuple: (start, end, modifier, chunk); ``end`` falls back to
        ``start`` and ``chunk`` to 1 when the string leaves them out,
        and ``chunk`` is always returned as an absolute value

    Raises:
        :class:`.ParseException`: if the frame range can
            not be parsed
    """
    match = cls.FRANGE_RE.match(frange)
    if not match:
        msg = 'Could not parse "{0}": did not match {1}'
        raise ParseException(msg.format(frange, cls.FRANGE_RE.pattern))

    start, end, modifier, chunk = match.groups()
    start = normalizeFrame(start)
    end = normalizeFrame(end) if end is not None else start
    chunk = normalizeFrame(chunk) if chunk is not None else 1

    # a negative chunk is only rejected for an ascending range
    if end > start and chunk is not None and chunk < 0:  # type: ignore[operator]
        msg = 'Could not parse "{0}": chunk can not be negative'
        raise ParseException(msg.format(frange))

    # a zero chunk is just plain illogical
    if chunk == 0:
        msg = 'Could not parse "{0}": chunk cannot be 0'
        raise ParseException(msg.format(frange))

    return start, end, modifier, abs(chunk)  # type: ignore

@staticmethod
def _build_frange_part(start: object, stop: object, stride: FrameValue | None, zfill: int = 0) -> str:
    """
    Private method: builds a proper and padded frame range string.

    Args:
        start (int or decimal.Decimal): first frame
        stop (int or or decimal.Decimal or None): last frame
        stride (int or None): increment
        zfill (int): width for zero padding

    Returns:
        str: an empty string when ``stop`` is None; the padded start alone
        when there is no stride or start equals stop; ``start-stop`` for a
        stride of 1 (either sign); ``start-stopxstride`` otherwise
    """
    if stop is None:
        return ''
    pad_start = pad(start, zfill)
    pad_stop = pad(stop, zfill)
    if stride is None or start == stop:
        return '{0}'.format(pad_start)
    elif abs(stride) == 1:
        return '{0}-{1}'.format(pad_start, pad_stop)
    else:
        stride = normalizeFrame(stride)
        return '{0}-{1}x{2}'.format(pad_start, pad_stop, stride)

@staticmethod
def _build_frange_part_decimal(
    start: decimal.Decimal,
    stop: decimal.Decimal,
    count: int,
    stride: decimal.Decimal | None,
    min_stride: decimal.Decimal,
    max_stride: decimal.Decimal,
    zfill: int = 0
) -> str:
    """
    Private method: builds a proper and padded subframe range string from
    decimal values.

    ``min_stride`` and ``max_stride`` are only consulted when ``stride``
    is None and has to be derived.

    Args:
        start (decimal.Decimal): first frame
        stop (decimal.Decimal): last frame
        count (int): number of frames in range (inclusive)
        stride (decimal.Decimal or None): stride to use if known else None
        min_stride (decimal.Decimal): minimum increment that will produce
            correctly rounded frames
        max_stride (decimal.Decimal): maximum increment that will produce
            correctly rounded frames
        zfill (int): width for zero padding

    Returns:
        str:
    """
    if stride is None:
        # Use an exact stride value if within allowed limits for
        # range, otherwise use midpoint of stride limits
        stride = (stop - start) / (count - 1)
        if not min_stride <= stride <= max_stride:
            stride = (min_stride + max_stride) / 2

        # Minimise number of decimal places in stride
        stride_range = max_stride - min_stride
        stride_range_tup = stride_range.as_tuple()
        leading_zeros = abs(len(stride_range_tup.digits) + int(stride_range_tup.exponent))
        stride = abs(quantize(stride, leading_zeros + 1)).normalize()

    assert isinstance(stride, decimal.Decimal)

    # Adjust end frame if required so correct number of steps is
    # calculated when recreating FrameSet from frange string
    while abs(stop - start) / stride + 1 < count:
        exponent = int(stop.as_tuple().exponent)
        delta = decimal.Decimal(1).scaleb(exponent)
        stop += delta.copy_sign(stop)

    start, stop = normalizeFrames([start, stop])  # type:ignore[assignment]
    return FrameSet._build_frange_part(start, stop, stride, zfill=zfill)

@staticmethod
def _framesToFrameRangesFloat(
    frames: list[int | float],
    zfill: int = 0
) -> typing.Iterator[str]:
    """
    Converts a sequence of int/float frames to a series of padded
    frame range strings.

    Consecutive frames that share one stride are grouped into a single
    range. A group of only two frames whose stride is not 1 is emitted as
    two separate frames instead of a stepped range.

    Args:
        frames (list[int | float]): sequence of frames to process
        zfill (int): width for zero padding

    Yields:
        str:
    """
    _build = FrameSet._build_frange_part
    curr_start: int | float | None = None
    curr_stride: int | float | None = None
    curr_frame: int | float
    last_frame: int | float | None = None
    curr_count = 0

    if not frames:
        return

    for curr_frame in frames:
        # the first frame only opens a range
        if curr_start is None:
            curr_start = curr_frame
            last_frame = curr_frame
            curr_count += 1
            continue

        if curr_stride is None:
            curr_stride = abs(curr_frame - curr_start)

        assert last_frame is not None
        new_stride = abs(curr_frame - last_frame)

        if curr_stride == new_stride:
            curr_count += 1
        elif curr_count == 2 and curr_stride != 1:
            # two frames do not make a stepped range: emit the first one
            # alone and restart the range from the second
            yield _build(curr_start, curr_start, None, zfill)
            curr_start = last_frame
            curr_stride = new_stride
        else:
            yield _build(curr_start, last_frame, curr_stride, zfill)
            curr_stride = None
            curr_start = curr_frame
            curr_count = 1

        last_frame = curr_frame

    # flush whatever range is still open
    if curr_count == 2 and curr_stride != 1:
        yield _build(curr_start, curr_start, None, zfill)
        yield _build(curr_frame, curr_frame, None, zfill)
    else:
        yield _build(curr_start, curr_frame, curr_stride, zfill)

@staticmethod
def _framesToFrameRangesDecimal(
    frames: list[decimal.Decimal],
    zfill: int = 0
) -> typing.Iterator[str]:
    """
    Converts a sequence of Decimal frames to a series of padded
    frame range strings.

    Besides grouping frames that share a stride, this tolerates stride
    differences that could be caused by rounding of the frame values. For
    that it tracks the minimum and maximum stride that would still round
    to every frame of the current range.

    Args:
        frames (list[decimal.Decimal]): sequence of frames to process
        zfill (int): width for zero padding

    Yields:
        str:
    """
    _build = FrameSet._build_frange_part
    _build_decimal = FrameSet._build_frange_part_decimal
    curr_start: decimal.Decimal | None = None
    curr_stride: decimal.Decimal | None = None
    curr_strides: set[decimal.Decimal] = set()
    curr_min_stride: decimal.Decimal | None = None
    curr_max_stride: decimal.Decimal | None = None
    curr_frame: decimal.Decimal
    last_frame: decimal.Decimal | None = None
    curr_count = 0

    if not frames:
        return

    for curr_frame in frames:
        # the first frame only opens a range
        if curr_start is None:
            curr_start = curr_frame
            last_frame = curr_frame
            curr_count += 1
            continue

        if curr_stride is None:
            curr_stride = abs(curr_frame - curr_start)
            curr_strides = {curr_stride}

        assert last_frame is not None
        new_stride = abs(curr_frame - last_frame)

        # Handle decimal strides and frame rounding
        # Check whether stride difference could be caused by rounding
        max_stride_delta: decimal.Decimal
        if len(curr_strides) == 1:
            stride_delta = abs(curr_stride - new_stride)
            exponent = int(stride_delta.as_tuple().exponent)
            max_stride_delta = decimal.Decimal(1).scaleb(exponent)
            if stride_delta <= max_stride_delta:
                curr_strides.add(new_stride)

        if new_stride in curr_strides:
            # Find minimum frame value that rounds to current
            min_frame = (curr_frame - max_stride_delta / 2)  # type: ignore[possibly-undefined]
            while min_frame.quantize(curr_frame) != curr_frame:
                min_frame = min_frame.next_plus()

            # Find maximum frame value that rounds to current
            max_frame = (curr_frame + max_stride_delta / 2)
            while max_frame.quantize(curr_frame) != curr_frame:
                max_frame = max_frame.next_minus()

            # Adjust min stride limit until frame rounds to current
            while True:
                new_min_stride = (min_frame - curr_start) / curr_count
                test_frame = curr_start + new_min_stride * curr_count
                if test_frame.quantize(curr_frame) == curr_frame:
                    break
                min_frame = min_frame.next_plus()

            # Adjust max stride limit until frame rounds to current
            while True:
                new_max_stride = (max_frame - curr_start) / curr_count
                test_frame = curr_start + new_max_stride * curr_count
                if test_frame.quantize(curr_frame) == curr_frame:
                    break
                max_frame = max_frame.next_minus()

            # Update minimum and maximum stride values for overall range
            if curr_min_stride is not None:
                new_min_stride = max(curr_min_stride, new_min_stride)
            if curr_max_stride is not None:
                new_max_stride = min(curr_max_stride, new_max_stride)

            # A stride exists that rounds all frame values correctly
            if new_min_stride <= new_max_stride:
                new_stride = curr_stride
                curr_min_stride = new_min_stride
                curr_max_stride = new_max_stride

        if curr_stride == new_stride:
            curr_count += 1
        elif curr_count == 2 and curr_stride != 1:
            # two frames do not make a stepped range: emit the first one
            # alone and restart the range from the second
            yield _build(curr_start, curr_start, None, zfill)
            curr_start = last_frame
            curr_stride = new_stride
            curr_strides = {new_stride}
            curr_min_stride = None
            curr_max_stride = None
        else:
            stride = curr_strides.pop() if len(curr_strides) == 1 else None
            assert curr_start is not None
            if curr_stride is None:
                yield _build(curr_start, curr_frame, curr_stride, zfill)
            elif stride is not None:
                # An exact stride is known, so _build_decimal never reads
                # the stride limits. They can still be unset here, because
                # restarting a range above clears them, so they must not
                # be asserted; the stride itself stands in for both.
                yield _build_decimal(curr_start, last_frame, curr_count, stride,
                                     stride, stride, zfill)
            else:
                assert curr_min_stride is not None
                assert curr_max_stride is not None
                yield _build_decimal(curr_start, last_frame, curr_count, stride,
                                     curr_min_stride, curr_max_stride, zfill)
            curr_stride = None
            curr_strides = set()
            curr_min_stride = None
            curr_max_stride = None
            curr_start = curr_frame
            curr_count = 1

        last_frame = curr_frame

    # flush whatever range is still open
    if curr_count == 2 and curr_stride != 1:
        yield _build(curr_start, curr_start, None, zfill)
        yield _build(curr_frame, curr_frame, None, zfill)
    else:
        stride = curr_strides.pop() if len(curr_strides) == 1 else None
        assert curr_start is not None
        if curr_stride is None:
            yield _build(curr_start, curr_frame, curr_stride, zfill)
        elif stride is not None:
            # exact stride known: the limits are unused and may be unset
            yield _build_decimal(curr_start, curr_frame, curr_count, stride,
                                 stride, stride, zfill)
        else:
            assert curr_min_stride is not None
            assert curr_max_stride is not None
            yield _build_decimal(curr_start, curr_frame, curr_count, stride,
                                 curr_min_stride, curr_max_stride, zfill)
@staticmethod
def framesToFrameRanges(
    frames: typing.Iterable[int | float | decimal.Decimal | str],
    zfill: int = 0
) -> typing.Iterator[str]:
    """
    Converts a sequence of frames to a series of padded
    frame range strings.

    The frames are first passed through ``normalizeFrames``. The Decimal
    converter is used when the first normalized frame is a Decimal; every
    other case, including no frames at all, goes to the int/float one.

    Args:
        frames (collections.Iterable): sequence of frames to process
        zfill (int): width for zero padding

    Yields:
        str:
    """
    # Ensure all frame values are of same type
    normalized = normalizeFrames(frames)

    use_decimal = bool(normalized) and isinstance(normalized[0], decimal.Decimal)
    if use_decimal:
        converter = FrameSet._framesToFrameRangesDecimal
    else:
        converter = FrameSet._framesToFrameRangesFloat
    yield from converter(normalized, zfill)  # type: ignore[arg-type]
@staticmethod
def framesToFrameRange(
    frames: typing.Iterable[int | float | decimal.Decimal | str],
    sort: bool = True,
    zfill: int = 0,
    compress: bool = False
) -> str:
    """
    Converts an iterator of frames into a
    frame range string.

    No frames give an empty string, and a single frame is simply padded.
    Otherwise the range strings from ``framesToFrameRanges`` are joined
    with commas.

    Args:
        frames (collections.Iterable): sequence of frames to process
        sort (bool): sort the sequence before processing
        zfill (int): width for zero padding
        compress (bool): remove any duplicates before processing

    Returns:
        str:
    """
    if compress:
        frames = unique(set(), frames)

    frame_list = list(frames)
    if not frame_list:
        return ''
    if len(frame_list) == 1:
        return pad(frame_list[0], zfill)

    if sort:
        frame_list.sort()

    joined = ','.join(FrameSet.framesToFrameRanges(frame_list, zfill))
    return str(joined)