API¶
Channels, the center around any CSP library. The core
module provides
all the essential functions for creating and managing them. For convenience,
all of the public members of core
exist at the top-level of the package.
Like Clojure’s core.async, ChanPy channels have direct support for
transformations via transducers. The transducers
module provides many
transducers as well as functions to help create and use them.
Core¶
Core functions for working with channels.
ChanPy’s channels
have full support for use with asyncio
coroutines, callback based code, and multi-threaded designs. The functions in
this module are designed to primarily accept and produce channels and by doing
so, can be used almost identically with each of the aforementioned styles.
An extremely valuable feature from Clojure’s core.async library is the ability to cheaply create asynchronous “processes” using go blocks. ChanPy, like aiochan, is able to do something similar by leveraging Python’s own asyncio library. Channels can easily be used from within coroutines which can then be added as tasks to an event loop. ChanPy additionally offers ways for these tasks to be added from threads without a running event loop.
Note
Unless explicitly stated otherwise, any function involving asynchronous
work should be assumed to require an asyncio event loop. Many of these
functions leverage the use of an event loop for the efficiency reasons
stated earlier. Threads with a running event loop will be able to directly
call these functions but threads without one will be required to register
one to themselves using set_loop()
prior to doing so. Calling
set_loop()
will be unnecessary for threads that were created with
thread()
as those threads will have already been registered.
-
exception
chanpy.core.
QueueSizeError
[source]¶ Maximum pending channel operations exceeded.
Raised when too many operations have been enqueued on a channel. Consider using a windowing buffer to prevent enqueuing too many puts or altering your design to have less asynchronous “processes” access the channel at once.
Note
This exception is an indication of a design error. It should NOT be caught and discarded.
-
chanpy.core.
alt
(*ops, priority=False, default=Undefined)[source]¶ Returns an awaitable representing the first and only channel operation to finish.
Accepts a variable number of operations that either get from or put to a channel and commits only one of them. If no default is provided, then only the first op to finish will be committed. If default is provided and none of the ops finish immediately, then no operation will be committed and default will instead be used to complete the returned awaitable.
- Parameters
ops – Operations that either get from or put to a channel. A get operation is represented as simply a channel to get from. A put operation is represented as an iterable of the form
[channel, val]
, where val is an item to put onto channel.priority – An optional bool. If True, operations will be tried in order. If False, operations will be tried in random order.
default – An optional value to use in case no operation finishes immediately.
- Returns
An awaitable that evaluates to a tuple of the form
(val, ch)
. If default is not provided, then val will be what the first successful operation returned and ch will be the channel used in that operation. If default is provided and none of the operations complete immediately, then the awaitable will evaluate to(default, 'default')
.- Raises
ValueError – If ops is empty or contains both a get and put operation to the same channel.
RuntimeError – If the calling thread has no running event loop.
See also
-
chanpy.core.
b_alt
(*ops, priority=False, default=Undefined)[source]¶ Same as
alt()
except it blocks instead of returning an awaitable.Does not require an event loop.
-
chanpy.core.
buffer
(n)[source]¶ Returns a fixed buffer with a capacity of n.
Puts to channels with this buffer will block if the capacity is reached.
- Parameters
n – A positive number.
-
class
chanpy.core.
chan
(buf_or_n=None, xform=None, ex_handler=None)[source]¶ A CSP channel with optional buffer, transducer, and exception handler.
Channels support multiple producers and consumers and may be buffered or unbuffered. Additionally, buffered channels can optionally have a transformation applied to the values put to them through the use of a
transducer
.Channels may be used by threads with or without a running asyncio event loop. The
get()
,put()
, andalt()
functions provide direct support for asyncio by returning awaitables. Channels additionally can be used as asynchronous generators when used withasync for
.b_get()
,b_put()
,b_alt()
, andto_iter()
provide blocking alternatives for threads which do not wish to use asyncio. Channels can even be used with callback based code viaf_put()
andf_get()
. A very valuable feature of channels is that producers and consumers of them need not be of the same type. For example, a value placed onto a channel withput()
(asyncio) can be taken by a call tob_get()
(blocking) from a separate thread.A select/alt feature is also available using the
alt()
andb_alt()
functions. This feature allows one to attempt many operations on a channel at once and only have the first operation to complete actually committed.Once closed, future puts will be unsuccessful but any pending puts will remain until consumed or until a
reduced
value is returned from the transformation. Once exhausted, all future gets will complete with the value None. Because of this, None cannot be put onto a channel either directly or indirectly through a transformation.- Parameters
buf_or_n – An optional buffer that may be expressed as a positive number. If it’s a number, a fixed buffer of that capacity will be used. If None, the channel will be unbuffered.
xform – An optional
transducer
for transforming elements put onto the channel. buf_or_n must not be None if this is provided.ex_handler – An optional function to handle exceptions raised during transformation. Must accept the raised exception as a parameter. Any non-None return value will be put onto the buffer.
See also
-
b_get
(*, wait=True)[source]¶ Same as
get()
except it blocks instead of returning an awaitable.Does not require an event loop.
-
b_put
(val, *, wait=True)[source]¶ Same as
put()
except it blocks instead of returning an awaitable.Does not require an event loop.
-
f_get
(f)[source]¶ Asynchronously takes a value from the channel and calls f with it.
Does not require an event loop.
- Parameters
f – A non-blocking function accepting a single argument. Will be passed the value taken from the channel or None if the channel is exhausted.
- Raises
QueueSizeError – If the channel has too many pending get operations.
-
f_put
(val, f=None)[source]¶ Asynchronously puts val onto the channel and calls f when complete.
Does not require an event loop.
- Parameters
val – A non-None value to put onto the channel.
f – An optional non-blocking function accepting the completion status of the put operation.
- Returns
False if the channel is already closed or True if it’s not.
- Raises
QueueSizeError – If the channel has too many pending put operations.
-
get
(*, wait=True)[source]¶ Attempts to take a value from the channel.
Gets will fail if the channel is exhausted or if
wait=False
and a value is not immediately available.- Parameters
wait – An optional bool that if False, fails the get operation when a value is not immediately available.
- Returns
An awaitable that evaluates to a value taken from the channel or None if the operation fails.
- Raises
RuntimeError – If the calling thread has no running event loop.
QueueSizeError – If the channel has too many pending get operations.
-
offer
(val)[source]¶ Same as
b_put(val, wait=False)
.
-
poll
()[source]¶ Same as
b_get(wait=False)
.
-
put
(val, *, wait=True)[source]¶ Attempts to put val onto the channel.
Puts will fail in the following cases:
the channel is already closed
wait=False
and val cannot be immediately put onto the channela
reduced
value is returned during transformation
- Parameters
val – A non-None value to put onto the channel.
wait – An optional bool that if False, fails the put operation when it cannot complete immediately.
- Returns
An awaitable that will evaluate to True if val is accepted onto the channel or False if it’s not.
- Raises
RuntimeError – If the calling thread has no running event loop.
QueueSizeError – If the channel has too many pending put operations.
-
chanpy.core.
dropping_buffer
(n)[source]¶ Returns a windowing buffer that drops inputs when capacity is reached.
Puts to channels with this buffer will appear successful after the capacity is reached but nothing will be added to the buffer.
- Parameters
n – A positive number representing the buffer capacity.
-
chanpy.core.
get_loop
()[source]¶ Returns the event loop for the current thread.
If
set_loop()
has been used to register an event loop to the current thread, then that loop will be returned. If no such event loop exists, then returns the running loop in the current thread.- Raises
RuntimeError – If no event loop has been registered and no loop is running in the current thread.
See also
-
chanpy.core.
go
(coro)[source]¶ Adds a coroutine object as a task to the current event loop.
coro will be added as a task to the event loop returned from
get_loop()
.- Parameters
coro – A coroutine object.
- Returns
A channel containing the return value of coro.
-
chanpy.core.
map
(f, chs, buf_or_n=None)[source]¶ Repeatedly takes a value from each channel and applies f.
Asynchronously takes one value per source channel and passes the resulting list of values as positional arguments to f. Each return value of f will be put onto the returned channel. The returned channel closes if any one of the source channels closes.
- Parameters
f – A non-blocking function accepting
len(chs)
positional arguments.chs – An iterable of source channels.
buf_or_n – An optional buffer to use with the returned channel. Can also be represented as a positive number. See
chan
.
- Returns
A channel containing the return values of f.
-
chanpy.core.
merge
(chs, buf_or_n=None)[source]¶ Returns a channel that emits values from the provided source channels.
Transfers all values from chs onto the returned channel. The returned channel closes after the transfer finishes.
- Parameters
chs – An iterable of source channels.
buf_or_n – An optional buffer to use with the returned channel. Can also be represented as a positive number. See
chan
.
See also
-
class
chanpy.core.
mix
(ch)[source]¶ Consumes values from each of its source channels and puts them onto ch.
A source channel can be added with
admix()
and removed withunmix()
orunmix_all()
.A source channel can be given a set of attribute flags to modify how it is consumed with
toggle()
. If a channel has its'pause'
attribute set to True, then the mix will stop consuming from it. Else if its'mute'
attribute is set, then the channel will still be consumed but its values discarded.A source channel may also be soloed by setting the
'solo'
attribute. If any source channel is soloed, then all of its other attributes will be ignored. Furthermore, non-soloed channels will have their attributes ignored and instead will take on whatever attribute has been set withsolo_mode()
(defaults to'mute'
ifsolo_mode()
hasn’t been invoked).- Parameters
ch – A channel to put values onto.
See also
-
solo_mode
(mode)[source]¶ Sets the mode for non-soloed source channels.
For as long as there is at least one soloed channel, non-soloed source channels will have their attributes ignored and will instead take on the provided mode.
- Parameters
mode – Either
'pause'
or'mute'
. Seemix
for behaviors.
-
toggle
(state_map)[source]¶ Merges state_map with the current state of the mix.
state_map will be used to update the attributes of the mix’s source channels by merging its contents with the current state of the mix. If state_map contains a channel that is not currently in the mix, then that channel will be added with the given attributes.
- Parameters
state_map – A dictionary of the form
{channel: attribute_map}
where attribute_map is a dictionary of the form{attribute: bool}
. Supported attributes are{'solo', 'pause', 'mute'}
. Seemix
for corresponding behaviors.
-
class
chanpy.core.
mult
(ch)[source]¶ A mult(iple) of the source channel that puts each of its values to its taps.
tap()
can be used to subscribe a channel to the mult and therefore receive copies of the values from ch. Taps can later be unsubscribed usinguntap()
oruntap_all()
.No tap will receive the next value from ch until all taps have accepted the current value. If no tap exists, values will still be consumed from ch but will be discarded.
- Parameters
ch – A channel to get values from.
-
chanpy.core.
onto_chan
(ch, coll, *, close=True)[source]¶ Asynchronously transfers values from an iterable to a channel.
- Parameters
ch – A channel to put values onto.
coll – An iterable to get values from.
close – An optional bool. If True, ch will be closed after transfer finishes.
- Returns
A channel that closes after the transfer finishes.
See also
-
chanpy.core.
pipe
(from_ch, to_ch, *, close=True)[source]¶ Asynchronously transfers all values from from_ch to to_ch.
- Parameters
from_ch – A channel to get values from.
to_ch – A channel to put values onto.
close – An optional bool. If True, to_ch will be closed after transfer finishes.
- Returns
to_ch.
-
chanpy.core.
pipeline
(n, to_ch, xform, from_ch, *, close=True, ex_handler=None, mode='thread', chunksize=1)[source]¶ Transforms values from from_ch to to_ch in parallel.
Values from from_ch will be transformed in parallel using a pool of threads or processes. The transducer will be applied to values from from_ch independently (not across values) and may produce zero or more outputs per input. The transformed values will be put onto to_ch in order relative to the inputs. If to_ch closes, then from_ch will no longer be consumed from.
- Parameters
n – A positive int specifying the maximum number of workers to run in parallel.
to_ch – A channel to put the transformed values onto.
xform – A
transducer
that will be applied to each value independently (not across values).from_ch – A channel to get values from.
close – An optional bool. If True, to_ch will be closed after transfer finishes.
ex_handler – An optional exception handler. See
chan
.mode – Either
'thread'
or'process'
. Specifies whether to use a thread or process pool to parallelize work.chunksize – An optional positive int that’s only relevant when
mode='process'
. Specifies the approximate amount of values each worker will receive at once.
- Returns
A channel that closes after the transfer finishes.
Note
If CPython is being used with
mode='thread'
, then xform must release the GIL at some point in order to achieve any parallelism.See also
-
chanpy.core.
pipeline_async
(n, to_ch, af, from_ch, *, close=True)[source]¶ Transforms values from from_ch to to_ch in parallel using an async function.
Values will be gathered from from_ch and passed to af along with a channel for its outputs to be placed onto. af will be called as
af(val, result_ch)
and should return immediately, having spawned some asynchronous operation that will place zero or more outputs onto result_ch. Up to n of these asynchronous “processes” will be run at once, each of which will be required to close their corresponding result_ch when finished. Values from these result channels will be placed onto to_ch in order relative to the inputs from from_ch. If to_ch closes, then from_ch will no longer be consumed from and any unclosed result channels will be closed.- Parameters
n – A positive int representing the maximum number of asynchronous “processes” to run at once.
to_ch – A channel to place the results onto.
af – A non-blocking function that will be called as
af(val, result_ch)
. This function will presumably spawn some kind of asynchronous operation that will place outputs onto result_ch. result_ch must be closed before the asynchronous operation finishes.from_ch – A channel to get values from.
close – An optional bool. If True, to_ch will be closed after transfer finishes.
- Returns
A channel that closes after the transfer finishes.
See also
-
chanpy.core.
promise_chan
(xform=None, ex_handler=None)[source]¶ Returns a channel that emits the same value forever.
Creates a channel with an optional
transducer
and exception handler that always returns the same value to consumers. The value emitted will be the first item put onto the channel or None if the channel was closed before the first put.
-
class
chanpy.core.
pub
(ch, topic_fn, buf_fn=None)[source]¶ A pub(lication) of the source channel divided into topics.
The values of ch will be categorized into topics defined by topic_fn. Each topic will be given its own
mult
for channels to subscribe to. Channels can be subscribed to a given topic withsub()
and unsubscribed withunsub()
orunsub_all()
.- Parameters
ch – A channel to get values from.
topic_fn – A function that given a value from ch returns a topic identifier.
buf_fn – An optional function that given a topic returns a buffer to be used with that topic’s
mult
channel. If not provided, channels will be unbuffered.
See also
-
chanpy.core.
reduce
(rf, init, ch) → result_ch[source]¶ reduce(rf, ch) -> result_ch
Asynchronously reduces a channel.
Asynchronously collects values from ch and reduces them using rf, placing the final result in the returned channel. If ch is exhausted, then init will be used as the result. If ch is not exhausted, then the first call to rf will be
rf(init, val)
where val is taken from ch. rf will continue to get called asrf(prev_rf_return, next_val)
until either ch is exhausted or rf returns areduced
value.- Parameters
rf – A
reducing function
accepting 2 args. If init is not provided, then rf must return a value to be used as init when called with 0 args.init – An optional initial value.
ch – A channel to get values from.
- Returns
A channel containing the result of the reduction.
See also
-
chanpy.core.
set_loop
(loop)[source]¶ Registers an event loop to the current thread.
Any thread not running an asyncio event loop will be required to run this function before any asynchronous functions are used. This is because most of the functions in this library that involve asynchronous work are designed to do so through an event loop.
A single event loop may be registered to any number of threads at once.
- Parameters
loop – An asyncio event loop.
- Returns
A context manager that on exit will unregister loop and reregister the event loop that was originally set.
See also
-
chanpy.core.
sliding_buffer
(n)[source]¶ Returns a windowing buffer that evicts the oldest element when capacity is reached.
Puts to channels with this buffer will complete successfully after the capacity is reached but will evict the oldest element in the buffer.
- Parameters
n – A positive number representing the buffer capacity.
-
chanpy.core.
split
(pred, ch, true_buf=None, false_buf=None)[source]¶ Splits the values of a channel into two channels based on a predicate.
Returns a tuple of the form
(true_ch, false_ch)
where true_ch contains all the values from ch where the predicate returns True and false_ch contains all the values that return False.
-
chanpy.core.
thread
(f, executor=None)[source]¶ Registers current loop to a separate thread and then calls f from it.
Calls f in another thread, returning immediately to the calling thread. The separate thread will have the loop from the calling thread registered to it while f runs.
- Parameters
f – A function accepting no arguments.
executor – An optional
ThreadPoolExecutor
to submit f to.
- Returns
A channel containing the return value of f.
-
chanpy.core.
to_chan
(coll)[source]¶ Returns a channel that emits all values from an iterable and then closes.
- Parameters
coll – An iterable to get values from.
See also
-
chanpy.core.
to_list
(ch)[source]¶ Asynchronously reduces the values from a channel to a list.
- Returns
A channel containing a list of values from ch.
-
chanpy.core.
transduce
(xform, rf, init, ch) → result_ch[source]¶ transduce(xform, rf, ch) -> result_ch
Asynchronously reduces a channel with a transformation.
Asynchronously collects values from ch and reduces them using a transformed reducing function equal to
xform(rf)
. Seereduce()
for more information on reduction. After the transformed reducing function has received all input it will be called once more with a single argument, the accumulated result.- Parameters
xform – A
transducer
.rf – A
reducing function
accepting both 1 and 2 arguments. If init is not provided, then rf must return a value to be used as init when called with 0 arguments.init – An optional initial value.
ch – A channel to get values from.
- Returns
A channel containing the result of the reduction.
See also
Transducers¶
Transducers, composable algorithmic transformations.
- Notable features of transducers:
- Creating transducers:
Transducers are also known as
reducing function
transformers. They are simply functions that accept a reducing function as input and return a new reducing function as output.
See clojure.org for more information about transducers.
-
chanpy.transducers.
append
(appendable, val) → appended result[source]¶ - append(appendable) -> appendableappend() -> []
A
reducing function
that appends val to appendable.
-
chanpy.transducers.
cat
(rf)[source]¶ A
transducer
that concatenates the contents of its inputs.Expects each input to be an iterable, the contents of which will be outputted one at a time.
See also
-
chanpy.transducers.
comp
(*xforms)[source]¶ Returns a new
transducer
equal to the composition of xforms.The returned transducer passes values through the given transformations from left to right.
- Parameters
xforms – Transducers.
-
chanpy.transducers.
completing
(rf, cf=<function identity>)[source]¶ Returns a wrapper around rf that calls cf when invoked with one argument.
- Parameters
rf – A
reducing function
.cf – An optional function that accepts a single argument. Used as the completion arity for the returned
reducing function
.
- Returns
A
reducing function
that dispatches to cf when called with a single argument or rf when called with any other number of arguments.
-
chanpy.transducers.
dedupe
(rf)[source]¶ A
transducer
that drops consecutive duplicate values.
-
chanpy.transducers.
distinct
(rf)[source]¶ A
transducer
that drops duplicate values.
-
chanpy.transducers.
drop
(n)[source]¶ Returns a
transducer
that drops the first n inputs.The returned transducer drops the first n inputs if n < the number of inputs. If n >= the number of inputs, then drops all of them.
- Parameters
n – A number.
-
chanpy.transducers.
drop_last
(n)[source]¶ Returns a
transducer
that drops the last n values.The returned transducer drops the last n inputs if n < the number of inputs. If n >= the number of inputs, then drops all of them.
- Parameters
n – A number.
Note
No values will be outputted until n inputs have been received.
-
chanpy.transducers.
drop_while
(pred)[source]¶ Returns a
transducer
that drops inputs until the predicate returns False.- Parameters
pred – A predicate function,
pred(input) -> bool
.
-
chanpy.transducers.
ensure_reduced
(x)[source]¶ Returns
reduced(x)
if x is not already areduced
value.
-
chanpy.transducers.
filter
(pred)[source]¶ Returns a
transducer
that outputs values for which predicate returns True.- Parameters
pred – A predicate function,
pred(value) -> bool
.
See also
-
chanpy.transducers.
filter_indexed
(f)[source]¶ Returns a
transducer
which filters values based onf(index, value)
.The returned transducer outputs values that return True when passed into f with the corresponding index. f will be called as
f(index, value)
where index represents the nth value to be passed into the transformation starting at 0.- Parameters
f – A function,
f(index, value) -> bool
.
See also
-
chanpy.transducers.
identity
(x)[source]¶ A NOP
transducer
that simply returns its argument.
-
chanpy.transducers.
interpose
(sep)[source]¶ Returns a
transducer
that outputs each input separated by sep.
-
chanpy.transducers.
into
(appendable, xform, coll)[source]¶ Transfers all values from coll into appendable with a transformation.
-
chanpy.transducers.
ireduce
(rf, init, coll) → reduction result[source]¶ ireduce(rf, coll) -> reduction result
Returns the result of reducing an iterable.
Reduces coll by repeatedly calling rf with 2 arguments. If coll is empty, then init will be returned. If coll is not empty, then the first call to rf will be
rf(init, first_coll_value)
. rf will continue to get called asrf(prev_rf_return, next_coll_value)
until either coll is exhausted or rf returns areduced
value.- Parameters
rf – A
reducing function
accepting 2 arguments. If init is not provided, then rf must return a value to be used as init when called with 0 arguments.init – An optional initial value.
coll – An iterable.
See also
-
chanpy.transducers.
itransduce
(xform, rf, init, coll) → reduction result[source]¶ itransduce(xform, rf, coll) -> reduction result
Returns the result of reducing an iterable with a transformation.
Reduces coll using a transformed reducing function equal to
xform(rf)
. Seeireduce()
for more information on reduction. After the transformed reducing function has received all input it will be called once more with a single argument, the result thus far.- Parameters
xform – A
transducer
.rf – A
reducing function
accepting both 1 and 2 arguments. If init is not provided, then rf must return a value to be used as init when called with 0 arguments.init – An optional initial value.
coll – An iterable.
See also
-
chanpy.transducers.
keep
(f)[source]¶ Returns a
transducer
that outputs the non-None return values off(value)
.See also
-
chanpy.transducers.
keep_indexed
(f)[source]¶ Returns a
transducer
that outputs the non-None return values off(index, value)
.The returned transducer outputs the non-None return values of
f(index, value)
where index represents the nth value to be passed into the transformation starting at 0.- Parameters
f – A function,
f(index, value) -> any
.
See also
-
chanpy.transducers.
map
(f)[source]¶ Returns a
transducer
that applies f to each input.- Parameters
f – A function,
f(input) -> any
.
See also
-
chanpy.transducers.
map_indexed
(f)[source]¶ Returns a
transducer
that transforms usingf(index, value)
.The returned transducer applies f to each value with the corresponding index. f will be called as
f(index, value)
where index represents the nth value to be passed into the transformation starting at 0.- Parameters
f – A function,
f(index, value) -> any
.
See also
-
chanpy.transducers.
mapcat
(f)[source]¶ Returns a
transducer
that applies f to each input and concatenates the result.
-
chanpy.transducers.
multi_arity
(*funcs)[source]¶ Returns a new multi-arity function which dispatches to funcs.
The returned function will dispatch to the provided functions based on the number of positional arguments it was called with. If called with zero arguments it will dispatch to the first function in funcs, if called with one argument it will dispatch to the second function in funcs, etc.
- Parameters
funcs – Functions to dispatch to. Each function represents a different arity for the returned function. None values may be used to represent arities that don’t exist.
-
chanpy.transducers.
partition
(n, step=None, pad=None)[source]¶ Returns a
transducer
that partitions values into tuples of size n.The returned transducer partitions the values into tuples of size n that are step items apart.
If step < n, partitions will overlap n - step elements.
If step == n, the default, no overlapping or dropping will occur.
If step > n, step - n values will be dropped between partitions.
If the last partition size is greater than 0 but less than n:
If pad is None, the last partition is discarded.
If pad exists, its values will be used to fill the partition to a desired size of n. The padded partition will be outputted even if its size is < n.
- Parameters
n – A positive int representing the length of each partition. The last partition may be < n if pad is provided.
step – An optional positive int used as the offset between partitions.
pad – An optional iterable of any size. If the last partition size is greater than 0 and less than n, then pad will be applied to it.
See also
-
chanpy.transducers.
partition_all
(n, step=None)[source]¶ Returns a
transducer
that partitions all values.The returned transducer partitions values into tuples of size n that are step items apart. Partitions at the end may have a size < n.
If step < n, partitions will overlap n - step elements.
If step == n, the default, no overlapping or dropping will occur.
If step > n, step - n values will be dropped between partitions.
- Parameters
n – An optional positive int representing the size of each partition (may be less for partitions at the end).
step – An optional positive int used as the offset between partitions. Defaults to n.
See also
-
chanpy.transducers.
partition_by
(f)[source]¶ Returns a
transducer
that partitions inputs by f.In this context, a partition is defined as a tuple containing consecutive items for which
f(item)
returns the same value. That is to say, a new partition will be started each timef(item)
returns a different value than the previous call.- Parameters
f – A function,
f(item) -> any
.
-
chanpy.transducers.
random_sample
(prob)[source]¶ Returns a
transducer
that selects inputs with the given probability.- Parameters
prob – A number between 0 and 1.
-
class
chanpy.transducers.
reduced
(x)[source]¶ Wraps x in such a way that a reduce will terminate with x.
A
reducing function
can returnreduced(x)
to terminate a reduction early with the value x.If used with a transduce function such as
itransduce()
, the reduction will terminate with the result of invoking the completion arity with x.
-
chanpy.transducers.
reductions
(rf, init=Undefined)[source]¶ Returns a
transducer
that outputs each intermediate result from a reduction.The transformation first outputs init. From then on, all outputs will be derived from
rf(prev_output, val)
where val is an input to the transformation. rf will continue to get called until all input has been exhausted or rf returns areduced
value.- Parameters
rf – A
reducing function
accepting 2 arguments. If init is not provided, then rf must return a value to be used as init when called with 0 arguments.init – An optional initial value.
See also
-
chanpy.transducers.
remove
(pred)[source]¶ Returns a
transducer
that drops values for which predicate returns True.- Parameters
pred – A predicate function,
pred(value) -> bool
.
See also
-
chanpy.transducers.
remove_indexed
(f)[source]¶ Returns a
transducer
which drops values based onf(index, value)
.The returned transducer drops values that return True when passed into f with the corresponding index. f will be called as
f(index, value)
where index represents the nth value to be passed into the transformation starting at 0.- Parameters
f – A function,
f(index, value) -> bool
.
See also
-
chanpy.transducers.
replace
(smap)[source]¶ Returns a
transducer
that replaces values based on the given dictionary.The returned transducer replaces any input that’s a key in smap with the key’s corresponding value. Inputs that aren’t a key in smap will be outputted without any transformation.
- Parameters
smap – A dictionary that maps values to their replacements.
-
chanpy.transducers.
take
(n)[source]¶ Returns a
transducer
that outputs the first n inputs.The returned transducer outputs the first n inputs if n < the number of inputs. If n >= the number of inputs, then outputs all of them.
- Parameters
n – A number.
-
chanpy.transducers.
take_last
(n)[source]¶ Returns a
transducer
that outputs the last n inputs.The returned transducer outputs the last n inputs if n < the number of inputs. If n >= the number of inputs, then outputs all of them.
Note
No values will be outputted until the completion arity is called.
- Parameters
n – A number.
-
chanpy.transducers.
take_nth
(n)[source]¶ Returns a
transducer
that outputs every nth input starting with the first.- Parameters
n – A positive int.
-
chanpy.transducers.
take_while
(pred)[source]¶ Returns a
transducer
that outputs values until the predicate returns False.- Parameters
pred – A predicate function,
f(value) -> bool
.
-
chanpy.transducers.
unreduced
(x)[source]¶ Returns x if it’s not a
reduced
value else returns the unwrapped value.
-
chanpy.transducers.
xiter
(xform, coll)[source]¶ Returns an iterator over the transformed elements in coll.
Useful for when you want to transform an iterable into another iterable in a lazy fashion.
- Parameters
xform – A
transducer
.coll – A potentially infinite iterable.