# Copyright 2019 Jake Magers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Transducers, composable algorithmic transformations.
Notable features of transducers:
* Are decoupled from the context in which they are used. This means they
can be reused with any transducible process, including iterables and
channels.
* Are composable with simple function composition. See :func:`comp`.
* Support early termination via :any:`reduced` values.
Creating transducers:
Transducers are also known as :any:`reducing function` transformers. They
are simply functions that accept a reducing function as input and return a
new reducing function as output.
See `clojure.org <https://clojure.org/reference/transducers>`_ for more
information about transducers.
"""
import functools as _functools
import itertools as _itertools
import random as _random
from collections import deque as _deque
class _Undefined:
"""A default parameter value that a user could never pass in."""
[docs]def identity(x):
"""A NOP :any:`transducer` that simply returns its argument."""
return x
[docs]def comp(*xforms):
"""Returns a new :any:`transducer` equal to the composition of `xforms`.
The returned transducer passes values through the given transformations
from left to right.
Args:
xforms: Transducers.
"""
return _functools.reduce(lambda f, g: lambda x: f(g(x)), xforms, identity)
[docs]def multi_arity(*funcs):
"""Returns a new multi-arity function which dispatches to `funcs`.
The returned function will dispatch to the provided functions based on the
number of positional arguments it was called with. If called with zero
arguments it will dispatch to the first function in `funcs`, if called with
one argument it will dispatch to the second function in `funcs`, etc.
Args:
funcs: Functions to dispatch to. Each function represents a different
arity for the returned function. None values may be used to
represent arities that don't exist.
"""
def dispatch(*args):
try:
func = funcs[len(args)]
if func is None:
raise IndexError
except IndexError:
raise TypeError(f'wrong number of arguments, got {len(args)}')
return func(*args)
return dispatch
[docs]class reduced:
"""Wraps `x` in such a way that a reduce will terminate with `x`.
A :any:`reducing function` can return ``reduced(x)`` to terminate a
reduction early with the value `x`.
If used with a transduce function such as :func:`itransduce`, the
reduction will terminate with the result of invoking the completion arity
with `x`.
"""
def __init__(self, x):
self._value = x
[docs]def is_reduced(x):
"""Returns True if `x` is the result from a call to :any:`reduced`."""
return isinstance(x, reduced)
[docs]def ensure_reduced(x):
"""Returns ``reduced(x)`` if `x` is not already a :any:`reduced` value."""
return x if is_reduced(x) else reduced(x)
[docs]def unreduced(x):
"""Returns `x` if it's not a :any:`reduced` value else returns the unwrapped value."""
return x._value if is_reduced(x) else x
[docs]def completing(rf, cf=identity):
"""Returns a wrapper around `rf` that calls `cf` when invoked with one argument.
Args:
rf: A :any:`reducing function`.
cf: An optional function that accepts a single argument. Used as the
completion arity for the returned :any:`reducing function`.
Returns:
A :any:`reducing function` that dispatches to `cf` when called with a single
argument or `rf` when called with any other number of arguments.
"""
@_functools.wraps(rf)
def wrapper(*args):
if len(args) == 1:
return cf(*args)
return rf(*args)
return wrapper
def _ireduce(rf, init, coll):
result = init
for x in coll:
result = rf(result, x)
if is_reduced(result):
return unreduced(result)
return result
[docs]def ireduce(rf, init, coll=_Undefined):
"""
ireduce(rf, init, coll) -> reduction result
*ireduce(rf, coll) -> reduction result*
Returns the result of reducing an iterable.
Reduces `coll` by repeatedly calling `rf` with 2 arguments. If `coll` is
empty, then `init` will be returned. If `coll` is not empty, then the first
call to `rf` will be ``rf(init, first_coll_value)``. `rf` will continue to
get called as ``rf(prev_rf_return, next_coll_value)`` until either `coll`
is exhausted or `rf` returns a :any:`reduced` value.
Args:
rf: A :any:`reducing function` accepting 2 arguments. If `init` is not
provided, then `rf` must return a value to be used as `init` when
called with 0 arguments.
init: An optional initial value.
coll: An iterable.
See Also:
:any:`reduced`
:func:`itransduce`
"""
if coll is _Undefined:
return _ireduce(rf, rf(), init)
return _ireduce(rf, init, coll)
def _itransduce(xform, rf, init, coll):
xrf = xform(rf)
return xrf(ireduce(xrf, init, coll))
[docs]def itransduce(xform, rf, init, coll=_Undefined):
"""
itransduce(xform, rf, init, coll) -> reduction result
*itransduce(xform, rf, coll) -> reduction result*
Returns the result of reducing an iterable with a transformation.
Reduces `coll` using a transformed reducing function equal to
``xform(rf)``. See :func:`ireduce` for more information on reduction. After
the transformed reducing function has received all input it will be called
once more with a single argument, the result thus far.
Args:
xform: A :any:`transducer`.
rf: A :any:`reducing function` accepting both 1 and 2 arguments.
If `init` is not provided, then `rf` must return a value to be used
as `init` when called with 0 arguments.
init: An optional initial value.
coll: An iterable.
See Also:
:func:`ireduce`
"""
if coll is _Undefined:
return _itransduce(xform, rf, rf(), init)
return _itransduce(xform, rf, init, coll)
[docs]def append(appendable=_Undefined, val=_Undefined):
"""
append(appendable, val) -> appended result
| *append(appendable) -> appendable*
| *append() -> []*
A :any:`reducing function` that appends `val` to `appendable`.
"""
if appendable is _Undefined:
return []
if val is _Undefined:
return appendable
appendable.append(val)
return appendable
[docs]def into(appendable, xform, coll):
"""Transfers all values from `coll` into `appendable` with a transformation.
Same as :func:`itransduce(xform, append, appendable, coll) <itransduce>`.
"""
return itransduce(xform, append, appendable, coll)
[docs]def xiter(xform, coll):
"""Returns an iterator over the transformed elements in `coll`.
Useful for when you want to transform an iterable into another iterable
in a lazy fashion.
Args:
xform: A :any:`transducer`.
coll: A potentially infinite iterable.
"""
buffer = _deque()
def flush_buffer(buf):
assert buf is buffer, 'xform returned invalid value'
while len(buf) > 0:
yield buf.popleft()
xrf = xform(append)
for x in coll:
ret = xrf(buffer, x)
yield from flush_buffer(unreduced(ret))
if is_reduced(ret):
break
yield from flush_buffer(xrf(buffer))
def _step_safety(step):
"""A decorator for step functions to help with debugging reduced cases.
Args:
step: A :any:`reducing function` that accepts 2 arguments.
Returns:
A wrapper function that adds an assertion that the `step` function will
never get called again once a :any:`reduced` value is returned.
"""
end_of_input = False
@_functools.wraps(step)
def safe_step(result, val):
nonlocal end_of_input
assert not end_of_input, 'step cannot get called after reduced value is returned'
ret = step(result, val)
if is_reduced(ret):
end_of_input = True
return ret
return safe_step
[docs]def map(f):
"""Returns a :any:`transducer` that applies `f` to each input.
Args:
f: A function, ``f(input) -> any``.
See Also:
:func:`map_indexed`
"""
return lambda rf: multi_arity(rf, rf, lambda result, val: rf(result, f(val)))
[docs]def map_indexed(f):
"""Returns a :any:`transducer` that transforms using ``f(index, value)``.
The returned transducer applies `f` to each value with the corresponding
index. `f` will be called as ``f(index, value)`` where `index` represents
the nth `value` to be passed into the transformation starting at 0.
Args:
f: A function, ``f(index, value) -> any``.
See Also:
:func:`chanpy.transducers.map`
"""
def xform(rf):
i = -1
def step(result, val):
nonlocal i
i += 1
return rf(result, f(i, val))
return multi_arity(rf, rf, step)
return xform
[docs]def filter(pred):
"""Returns a :any:`transducer` that outputs values for which predicate returns True.
Args:
pred: A predicate function, ``pred(value) -> bool``.
See Also:
:func:`filter_indexed`
:func:`remove`
"""
return lambda rf: multi_arity(rf, rf,
lambda result, val: (rf(result, val)
if pred(val)
else result))
[docs]def filter_indexed(f):
"""Returns a :any:`transducer` which filters values based on ``f(index, value)``.
The returned transducer outputs values that return True when passed into
`f` with the corresponding index. `f` will be called as ``f(index, value)``
where `index` represents the nth `value` to be passed into the
transformation starting at 0.
Args:
f: A function, ``f(index, value) -> bool``.
See Also:
:func:`filter`
:func:`remove_indexed`
"""
return comp(map_indexed(lambda i, x: x if f(i, x) else _Undefined),
filter(lambda x: x is not _Undefined))
[docs]def remove(pred):
"""Returns a :any:`transducer` that drops values for which predicate returns True.
Args:
pred: A predicate function, ``pred(value) -> bool``.
See Also:
:func:`filter`
:func:`remove_indexed`
"""
return filter(lambda x: not pred(x))
[docs]def remove_indexed(f):
"""Returns a :any:`transducer` which drops values based on ``f(index, value)``.
The returned transducer drops values that return True when passed into `f`
with the corresponding index. `f` will be called as ``f(index, value)``
where `index` represents the nth `value` to be passed into the
transformation starting at 0.
Args:
f: A function, ``f(index, value) -> bool``.
See Also:
:func:`filter_indexed`
:func:`remove`
"""
return filter_indexed(lambda i, x: not f(i, x))
[docs]def keep(f):
"""Returns a :any:`transducer` that outputs the non-None return values of ``f(value)``.
See Also:
:func:`keep_indexed`
"""
return comp(map(f), filter(lambda x: x is not None))
[docs]def keep_indexed(f):
"""Returns a :any:`transducer` that outputs the non-None return values of ``f(index, value)``.
The returned transducer outputs the non-None return values of
``f(index, value)`` where `index` represents the nth `value` to be passed
into the transformation starting at 0.
Args:
f: A function, ``f(index, value) -> any``.
See Also:
:func:`keep`
"""
return comp(map_indexed(f), filter(lambda x: x is not None))
[docs]def cat(rf):
"""A :any:`transducer` that concatenates the contents of its inputs.
Expects each input to be an iterable, the contents of which will be
outputted one at a time.
See Also:
:func:`mapcat`
"""
def double_reduced_rf(result, val):
ret = rf(result, val)
return reduced(ret) if is_reduced(ret) else ret
return multi_arity(rf, rf, _functools.partial(ireduce, double_reduced_rf))
[docs]def mapcat(f):
"""Returns a :any:`transducer` that applies `f` to each input and concatenates the result."""
return comp(map(f), cat)
[docs]def take(n):
"""Returns a :any:`transducer` that outputs the first `n` inputs.
The returned transducer outputs the first `n` inputs if `n` < the number of
inputs. If `n` >= the number of inputs, then outputs all of them.
Args:
n: A number.
"""
def xform(rf):
remaining = n
@_step_safety
def step(result, val):
nonlocal remaining
new_result = rf(result, val) if remaining > 0 else result
remaining -= 1
return ensure_reduced(new_result) if remaining <= 0 else new_result
return multi_arity(rf, rf, step)
return xform
[docs]def take_last(n):
"""Returns a :any:`transducer` that outputs the last `n` inputs.
The returned transducer outputs the last `n` inputs if `n` < the number of
inputs. If `n` >= the number of inputs, then outputs all of them.
Note:
No values will be outputted until the completion arity is called.
Args:
n: A number.
"""
def xform(rf):
buffer = _deque()
def step(result, val):
buffer.append(val)
if len(buffer) > n:
buffer.popleft()
return result
def complete(result):
new_result = result
while len(buffer) > 0:
new_result = rf(new_result, buffer.popleft())
if is_reduced(new_result):
buffer.clear()
return rf(unreduced(new_result))
return multi_arity(rf, complete, step)
return xform
[docs]def take_nth(n):
"""Returns a :any:`transducer` that outputs every nth input starting with the first.
Args:
n: A positive int.
"""
if n < 1 or n != int(n):
raise ValueError('n must be a positive int')
return filter_indexed(lambda i, _: i % n == 0)
[docs]def take_while(pred):
"""Returns a :any:`transducer` that outputs values until the predicate returns False.
Args:
pred: A predicate function, ``f(value) -> bool``.
"""
def xform(rf):
@_step_safety
def step(result, val):
return rf(result, val) if pred(val) else reduced(result)
return multi_arity(rf, rf, step)
return xform
[docs]def drop(n):
"""Returns a :any:`transducer` that drops the first `n` inputs.
The returned transducer drops the first `n` inputs if `n` < the number of
inputs. If `n` >= the number of inputs, then drops all of them.
Args:
n: A number.
"""
def xform(rf):
remaining = n
def step(result, val):
nonlocal remaining
remaining -= 1
return result if remaining > -1 else rf(result, val)
return multi_arity(rf, rf, step)
return xform
[docs]def drop_last(n):
"""Returns a :any:`transducer` that drops the last `n` values.
The returned transducer drops the last `n` inputs if `n` < the number of
inputs. If `n` >= the number of inputs, then drops all of them.
Args:
n: A number.
Note:
No values will be outputted until `n` inputs have been received.
"""
def xform(rf):
buffer = _deque()
def step(result, val):
buffer.append(val)
if len(buffer) > n:
return rf(result, buffer.popleft())
return result
def complete(result):
buffer.clear()
return rf(result)
return multi_arity(rf, complete, step)
return xform
[docs]def drop_while(pred):
"""Returns a :any:`transducer` that drops inputs until the predicate returns False.
Args:
pred: A predicate function, ``pred(input) -> bool``.
"""
def xform(rf):
has_taken = False
def step(result, val):
nonlocal has_taken
if not has_taken and pred(val):
return result
has_taken = True
return rf(result, val)
return multi_arity(rf, rf, step)
return xform
[docs]def distinct(rf):
"""A :any:`transducer` that drops duplicate values."""
prev_vals = set()
def step(result, val):
if val in prev_vals:
return result
prev_vals.add(val)
return rf(result, val)
def complete(result):
prev_vals.clear()
return rf(result)
return multi_arity(rf, complete, step)
[docs]def dedupe(rf):
"""A :any:`transducer` that drops consecutive duplicate values."""
prev_val = _Undefined
def step(result, val):
nonlocal prev_val
if val == prev_val:
return result
prev_val = val
return rf(result, val)
return multi_arity(rf, rf, step)
[docs]def partition_all(n, step=None):
"""Returns a :any:`transducer` that partitions all values.
The returned transducer partitions values into tuples of size `n` that are
`step` items apart. Partitions at the end may have a size < `n`.
* If `step` < `n`, partitions will overlap `n` - `step` elements.
* If `step` == `n`, the default, no overlapping or dropping will occur.
* If `step` > `n`, `step` - `n` values will be dropped between partitions.
Args:
n: An optional positive int representing the size of each partition
(may be less for partitions at the end).
step: An optional positive int used as the offset between partitions.
Defaults to `n`.
See Also:
:func:`partition`
"""
if step is None:
step = n
if n < 1 or n != int(n):
raise ValueError('n must be a positive integer')
if step < 1 or step != int(step):
raise ValueError('step must be a positive integer')
def xform(rf):
buffer = []
remaining_drops = 0
def step_f(result, val):
nonlocal buffer, remaining_drops
if remaining_drops > 0:
remaining_drops -= 1
return result
buffer.append(val)
if len(buffer) < n:
return result
ret = rf(result, tuple(buffer))
remaining_drops = max(0, step - n)
buffer = [] if is_reduced(ret) else buffer[step:]
return ret
def complete(result):
nonlocal buffer
new_result = result
while len(buffer) > 0:
buf = tuple(buffer)
buffer = buffer[step:]
new_result = rf(new_result, buf)
if is_reduced(new_result):
buffer.clear()
return rf(unreduced(new_result))
return multi_arity(rf, complete, step_f)
return xform
[docs]def partition(n, step=None, pad=None):
"""Returns a :any:`transducer` that partitions values into tuples of size `n`.
The returned transducer partitions the values into tuples of size `n` that
are `step` items apart.
* If `step` < `n`, partitions will overlap `n` - `step` elements.
* If `step` == `n`, the default, no overlapping or dropping will occur.
* If `step` > `n`, `step` - `n` values will be dropped between partitions.
If the last partition size is greater than 0 but less than `n`:
* If `pad` is None, the last partition is discarded.
* If `pad` exists, its values will be used to fill the partition to a
desired size of `n`. The padded partition will be outputted even if its
size is < `n`.
Args:
n: A positive int representing the length of each partition. The last
partition may be < `n` if `pad` is provided.
step: An optional positive int used as the offset between partitions.
pad: An optional iterable of any size. If the last partition size is
greater than 0 and less than `n`, then `pad` will be applied to it.
See Also:
:func:`partition_all`
"""
def pad_xform(rf):
def step_f(result, part):
if len(part) == n:
return rf(result, part)
if pad is None:
return reduced(result)
padding = tuple(_itertools.islice(pad, n - len(part)))
return ensure_reduced(rf(result, part + tuple(padding)))
return multi_arity(rf, rf, step_f)
return comp(partition_all(n, step), pad_xform)
[docs]def partition_by(f):
"""Returns a :any:`transducer` that partitions inputs by `f`.
In this context, a partition is defined as a tuple containing consecutive
items for which ``f(item)`` returns the same value. That is to say, a new
partition will be started each time ``f(item)`` returns a different value
than the previous call.
Args:
f: A function, ``f(item) -> any``.
"""
def xform(rf):
prev_f_ret = _Undefined
buffer = []
def step(result, val):
nonlocal prev_f_ret, buffer
f_ret = f(val)
if prev_f_ret is _Undefined or f_ret == prev_f_ret:
prev_f_ret = f_ret
buffer.append(val)
return result
prev_f_ret = f_ret
rf_ret = rf(result, tuple(buffer))
buffer = [] if is_reduced(rf_ret) else [val]
return rf_ret
def complete(result):
if len(buffer) == 0:
return rf(result)
flushed_result = unreduced(rf(result, tuple(buffer)))
buffer.clear()
return rf(flushed_result)
return multi_arity(rf, complete, step)
return xform
[docs]def reductions(rf, init=_Undefined):
"""
reductions(rf, init=Undefined)
Returns a :any:`transducer` that outputs each intermediate result from a reduction.
The transformation first outputs `init`. From then on, all outputs will be
derived from ``rf(prev_output, val)`` where `val` is an input to the
transformation. `rf` will continue to get called until all input has been
exhausted or `rf` returns a :any:`reduced` value.
Args:
rf: A :any:`reducing function` accepting 2 arguments. If `init` is not
provided, then `rf` must return a value to be used as `init` when
called with 0 arguments.
init: An optional initial value.
See Also:
:func:`ireduce`
"""
if init is _Undefined:
init = rf()
def xform(xrf):
prev_state = _Undefined
def step(result, val):
nonlocal prev_state
if prev_state is _Undefined:
prev_state = init
result = xrf(result, init)
if is_reduced(result):
return result
prev_state = rf(prev_state, val)
new_result = xrf(result, unreduced(prev_state))
return (ensure_reduced(new_result)
if is_reduced(prev_state)
else new_result)
def complete(result):
if prev_state is _Undefined:
tmp_result = unreduced(xrf(result, init))
else:
tmp_result = result
return xrf(tmp_result)
return multi_arity(xrf, complete, step)
return xform
[docs]def interpose(sep):
"""Returns a :any:`transducer` that outputs each input separated by `sep`."""
def xform(rf):
is_initial = True
def step(result, val):
nonlocal is_initial
if is_initial:
is_initial = False
return rf(result, val)
sep_result = rf(result, sep)
return sep_result if is_reduced(sep_result) else rf(sep_result, val)
return multi_arity(rf, rf, step)
return xform
[docs]def replace(smap):
"""Returns a :any:`transducer` that replaces values based on the given dictionary.
The returned transducer replaces any input that's a key in `smap` with the
key's corresponding value. Inputs that aren't a key in `smap` will be
outputted without any transformation.
Args:
smap: A dictionary that maps values to their replacements.
"""
def xform(rf):
def step(result, val):
return rf(result, smap.get(val, val))
return multi_arity(rf, rf, step)
return xform
[docs]def random_sample(prob):
"""Returns a :any:`transducer` that selects inputs with the given probability.
Args:
prob: A number between 0 and 1.
"""
return filter(lambda _: _random.random() < prob)