# Source code for chanpy._channel

# Copyright 2019 Jake Magers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import contextlib
import random
import threading
from collections import deque
from numbers import Number
from . import _buffers as bufs
from . import transducers as xf


# Public API of this module.
__all__ = [
    'chan',
    'alt',
    'b_alt',
    'QueueSizeError',
]


# Maximum number of pending (enqueued) puts, or pending gets, a single channel
# accepts before raising QueueSizeError.
MAX_QUEUE_SIZE = 2 ** 10


class QueueSizeError(Exception):
    """Maximum pending channel operations exceeded.

    Raised when too many operations have been enqueued on a channel.
    Consider using a windowing buffer to prevent enqueuing too many puts, or
    alter your design so that fewer asynchronous "processes" access the
    channel at once.

    Note:
        This exception is an indication of a design error. It should NOT be
        caught and discarded.
    """
class Promise:
    """A thread-safe, write-once value that readers can block on.

    The blocking channel APIs pass :meth:`deliver` as the completion callback
    and then call :meth:`deref` to wait for the result.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._value = None
        self._is_realized = False
        # Shares self._lock, so holding the lock is enough to wait/notify.
        self._realized = threading.Condition(self._lock)

    def deliver(self, value):
        """Stores `value` and wakes all waiters, unless already delivered.

        Returns:
            True if `value` was stored, or False if the promise had already
            been realized (the new `value` is then discarded).
        """
        with self._lock:
            if self._is_realized:
                return False
            self._value = value
            self._is_realized = True
            self._realized.notify_all()
            return True

    def deref(self):
        """Blocks until a value has been delivered, then returns it."""
        with self._lock:
            self._realized.wait_for(lambda: self._is_realized)
            return self._value


class FlagFuture(asyncio.Future):
    """An asyncio future tied to a channel-operation flag.

    The public ``set_result``/``set_exception`` are disabled; the future is
    resolved by the channel machinery (via :func:`future_deliver_fn` or by
    calling ``asyncio.Future.set_result`` directly). Cancelling it
    deactivates the flag so the pending operation can no longer be committed.

    Must be constructed while an event loop is running in the current thread
    (``asyncio.get_running_loop`` raises RuntimeError otherwise).
    """

    def __init__(self, flag):
        self.__flag = flag
        # Result committed by a channel but possibly not yet applied to the
        # future (written by future_deliver_fn, read by cancel).
        self.__result = None
        super().__init__(loop=asyncio.get_running_loop())

    def set_result(self, result):
        raise AssertionError('cannot call set_result on a future provided by '
                             'a channel')

    def set_exception(self, exception):
        raise AssertionError('cannot call set_exception on a future provided '
                             'by a channel')

    def cancel(self):
        """Cancels the future if its operation has not been committed yet.

        If the operation was already committed but its result has not been
        applied to the future yet, the result is applied instead (so the
        committed value is not lost) and cancellation does not happen.
        """
        with self.__flag['lock']:
            if self.__flag['is_active']:
                self.__flag['is_active'] = False
            elif not super().done():
                # This case is when value has been committed but
                # future hasn't been set because call_soon_threadsafe()
                # callback hasn't been invoked yet
                super().set_result(self.__result)
        return super().cancel()


def future_deliver_fn(future):
    """Returns a callback that resolves the FlagFuture `future` with its arg.

    The callback records the result on the future (so FlagFuture.cancel can
    apply it) and schedules the actual set_result on the future's loop with
    ``call_soon_threadsafe``.
    """
    def set_result(result):
        try:
            asyncio.Future.set_result(future, result)
        except asyncio.InvalidStateError:
            # FlagFuture.cancel() already applied this same result.
            assert future.result() is result

    def deliver(result):
        future._FlagFuture__result = result
        future.get_loop().call_soon_threadsafe(set_result, result)

    return deliver


def create_flag():
    """Returns a new flag: a lock plus an ``is_active`` bool (initially True).

    All FlagHandlers created for one logical operation (e.g. every branch of
    an alt) share one flag, so committing any of them makes the others report
    ``is_active`` False.
    """
    return {'lock': threading.Lock(), 'is_active': True}


class HandlerManagerMixin:
    """Lets a handler be used as a context manager.

    Entering calls ``acquire()``; exiting calls ``release()``.
    """

    def __enter__(self):
        return self.acquire()

    def __exit__(self, e_type, e_val, traceback):
        self.release()


class FnHandler(HandlerManagerMixin):
    """A handler for a single operation that can never be abandoned.

    It is always active, needs no locking, and committing it just returns
    `cb`.
    """

    def __init__(self, cb, is_waitable=True):
        self._cb = cb
        self.is_waitable = is_waitable
        # All FnHandlers share lock_id 0; acquiring them is a no-op anyway.
        self.lock_id = 0
        self.is_active = True

    def acquire(self):
        return True

    def release(self):
        pass

    def commit(self):
        return self._cb


class FlagHandler(HandlerManagerMixin):
    """A handler whose activity is governed by a shared flag.

    Committing deactivates the flag, so every handler sharing the flag
    reports ``is_active`` False afterwards.
    """

    def __init__(self, flag, cb, is_waitable=True):
        self._flag = flag
        self._cb = cb
        self.is_waitable = is_waitable
        # Handlers sharing a flag share its lock, hence the same lock_id.
        self.lock_id = id(flag)

    @property
    def is_active(self):
        return self._flag['is_active']

    def acquire(self):
        return self._flag['lock'].acquire()

    def release(self):
        self._flag['lock'].release()

    def commit(self):
        # Callers in this module commit while holding the flag's lock.
        self._flag['is_active'] = False
        return self._cb


@contextlib.contextmanager
def acquire_handlers(*handlers):
    """Returns a context manager for acquiring `handlers` without deadlock.

    Handlers are acquired in ``lock_id`` order, so threads locking
    overlapping sets of handlers always lock them in the same order. If
    acquisition fails part-way, every handler acquired so far is released
    before the error propagates.

    Raises:
        AssertionError: If a handler's ``acquire()`` returns a false value.
    """
    acquired = []
    try:
        # Acquire locks in consistent order
        for h in sorted(handlers, key=lambda h: h.lock_id):
            if not h.acquire():
                raise AssertionError('failed to acquire handler lock')
            acquired.append(h)
        yield True
    finally:
        # Release only what was actually acquired, so an exception raised
        # mid-acquisition cannot leave earlier locks held forever.
        for h in reversed(acquired):
            h.release()


def nop_ex_handler(e):
    """Default channel ex_handler: re-raises the exception unchanged."""
    raise e
class chan:
    """A CSP channel with optional buffer, transducer, and exception handler.

    Channels support multiple producers and consumers and may be buffered or
    unbuffered. Additionally, buffered channels can optionally have a
    transformation applied to the values put to them through the use of a
    :any:`transducer`.

    Channels may be used by threads with or without a running asyncio event
    loop. The :meth:`get`, :meth:`put`, and :func:`alt` functions provide
    direct support for asyncio by returning awaitables. Channels additionally
    can be used as asynchronous generators when used with ``async for``.
    :meth:`b_get`, :meth:`b_put`, :func:`b_alt`, and :meth:`to_iter` provide
    blocking alternatives for threads which do not wish to use asyncio.
    Channels can even be used with callback based code via :meth:`f_put` and
    :meth:`f_get`. A very valuable feature of channels is that producers and
    consumers of them need not be of the same type. For example, a value
    placed onto a channel with :meth:`put` (asyncio) can be taken by a call to
    :meth:`b_get` (blocking) from a separate thread.

    A select/alt feature is also available using the :func:`alt` and
    :func:`b_alt` functions. This feature allows one to attempt many channel
    operations at once and only have the first operation to complete
    actually committed.

    Once closed, future puts will be unsuccessful but any pending puts will
    remain until consumed or until a :any:`reduced` value is returned from
    the transformation. Once exhausted, all future gets will complete with
    the value None. Because of this, None cannot be put onto a channel either
    directly or indirectly through a transformation.

    Args:
        buf_or_n: An optional buffer that may be expressed as a positive
            number. If it's a number, a fixed buffer of that capacity will be
            used. If None, the channel will be unbuffered.
        xform: An optional :any:`transducer <transducers>` for transforming
            elements put onto the channel. `buf_or_n` must not be None if
            this is provided.
        ex_handler: An optional function to handle exceptions raised during
            transformation. Must accept the raised exception as a parameter.
            Any non-None return value will be put onto the buffer.

    Raises:
        TypeError: If `xform` or `ex_handler` is given while `buf_or_n` is
            None.

    See Also:
        :any:`buffer()`
        :any:`dropping_buffer()`
        :any:`sliding_buffer()`
    """

    def __init__(self, buf_or_n=None, xform=None, ex_handler=None):
        # A transformation/ex_handler only ever feeds a buffer, so they are
        # rejected for unbuffered channels.
        if buf_or_n is None:
            if xform is not None:
                raise TypeError('unbuffered channels cannot have an xform')
            if ex_handler is not None:
                raise TypeError('unbuffered channels cannot have an ex_handler')
        # A number is shorthand for a FixedBuffer of that capacity; anything
        # else (a buffer object or None) is used as-is.
        self._buf = (bufs.FixedBuffer(buf_or_n)
                     if isinstance(buf_or_n, Number)
                     else buf_or_n)
        xform = xf.identity if xform is None else xform
        ex_handler = nop_ex_handler if ex_handler is None else ex_handler
        self._takes = deque()  # pending get handlers, FIFO
        self._puts = deque()  # pending (put handler, value) pairs, FIFO
        self._is_closed = False
        # Set once the completion arity of _buf_rf has been invoked.
        self._buf_rf_is_completed = False
        # Guards all channel state; taken by every public operation.
        self._lock = threading.Lock()

        # Step function that puts each (transformed) value onto the buffer.
        # NOTE(review): xf.completing presumably supplies the one-argument
        # completion arity used by _complete_buf_rf_if_ready — confirm in
        # the transducers module.
        @xform
        @xf.completing
        def xrf(_, val):
            if val is None:
                raise AssertionError('xform cannot produce None')
            self._buf.put(val)

        # Routes exceptions raised while transforming through ex_handler;
        # a non-None result from ex_handler is placed onto the buffer.
        def ex_handler_rf(*args):
            try:
                return xrf(*args)
            except Exception as e:
                val = ex_handler(e)
                if val is not None:
                    self._buf.put(val)

        self._buf_rf = ex_handler_rf

    def put(self, val, *, wait=True):
        """Attempts to put `val` onto the channel.

        Puts will fail in the following cases:

        * the channel is already closed
        * ``wait=False`` and `val` cannot be immediately put onto the channel
        * a :any:`reduced` value is returned during transformation

        Args:
            val: A non-None value to put onto the channel.
            wait: An optional bool that if False, fails the put operation
                when it cannot complete immediately.

        Returns:
            An awaitable that will evaluate to True if `val` is accepted onto
            the channel or False if it's not.

        Raises:
            RuntimeError: If the calling thread has no running event loop.
            QueueSizeError: If the channel has too many pending put
                operations.
            TypeError: If `val` is None.
        """
        flag = create_flag()
        future = FlagFuture(flag)
        handler = FlagHandler(flag, future_deliver_fn(future), wait)
        ret = self._p_put(handler, val)
        if ret is not None:
            # Completed immediately: _p_put does not invoke the callback in
            # this case, so resolve the future here. FlagFuture disables its
            # own set_result, hence the base-class call.
            asyncio.Future.set_result(future, ret[0])
        return future

    def get(self, *, wait=True):
        """Attempts to take a value from the channel.

        Gets will fail if the channel is exhausted or if ``wait=False`` and a
        value is not immediately available.

        Args:
            wait: An optional bool that if False, fails the get operation
                when a value is not immediately available.

        Returns:
            An awaitable that evaluates to a value taken from the channel or
            None if the operation fails.

        Raises:
            RuntimeError: If the calling thread has no running event loop.
            QueueSizeError: If the channel has too many pending get
                operations.
        """
        flag = create_flag()
        future = FlagFuture(flag)
        handler = FlagHandler(flag, future_deliver_fn(future), wait)
        ret = self._p_get(handler)
        if ret is not None:
            # Completed immediately; see put() for why the base-class
            # set_result is used.
            asyncio.Future.set_result(future, ret[0])
        return future

    def b_put(self, val, *, wait=True):
        """Same as :meth:`put` except it blocks instead of returning an awaitable.

        Does not require an event loop.
        """
        prom = Promise()
        ret = self._p_put(FnHandler(prom.deliver, wait), val)
        if ret is not None:
            return ret[0]
        # Enqueued: block until another party completes the put.
        return prom.deref()

    def b_get(self, *, wait=True):
        """Same as :meth:`get` except it blocks instead of returning an awaitable.

        Does not require an event loop.
        """
        prom = Promise()
        ret = self._p_get(FnHandler(prom.deliver, wait))
        if ret is not None:
            return ret[0]
        # Enqueued: block until another party completes the get.
        return prom.deref()

    def f_put(self, val, f=None):
        """Asynchronously puts `val` onto the channel and calls `f` when complete.

        Does not require an event loop.

        Args:
            val: A non-None value to put onto the channel.
            f: An optional non-blocking function accepting the completion
                status of the put operation.

        Returns:
            False if the channel is already closed or True if it's not.

        Raises:
            QueueSizeError: If the channel has too many pending put
                operations.
        """
        f = (lambda _: None) if f is None else f
        ret = self._p_put(FnHandler(f), val)
        if ret is None:
            return True
        # Completed immediately, so _p_put did not call f; call it here.
        f(ret[0])
        return ret[0]

    def f_get(self, f):
        """Asynchronously takes a value from the channel and calls `f` with it.

        Does not require an event loop.

        Args:
            f: A non-blocking function accepting a single argument. Will be
                passed the value taken from the channel or None if the
                channel is exhausted.

        Raises:
            QueueSizeError: If the channel has too many pending get
                operations.
        """
        ret = self._p_get(FnHandler(f))
        if ret is None:
            return
        # Completed immediately, so _p_get did not call f; call it here.
        f(ret[0])

    def offer(self, val):
        """Same as :meth:`b_put(val, wait=False) <b_put>`."""
        return self.b_put(val, wait=False)

    def poll(self):
        """Same as :meth:`b_get(wait=False) <b_get>`."""
        return self.b_get(wait=False)

    def close(self):
        """Closes the channel."""
        with self._lock:
            self._cleanup()
            self._close()

    async def __aiter__(self):
        """Returns an asynchronous iterator over the channel's values."""
        while True:
            value = await self.get()
            # None signals that the channel is exhausted.
            if value is None:
                break
            yield value

    def to_iter(self):
        """Returns an iterator over the channel's values.

        Calling ``next()`` on the returned iterator may block. Does not
        require an event loop.
        """
        while True:
            val = self.b_get()
            # None signals that the channel is exhausted.
            if val is None:
                break
            yield val

    def _p_put(self, handler, val):
        """Commits or enqueues a put operation to the channel.

        If the put operation completes immediately, then the `handler` will
        be committed but its callback will not be invoked. The completion
        status of the operation will be wrapped in a tuple and returned. The
        status will be True if `val` was accepted onto the channel or False
        otherwise.

        If the operation is unable to complete immediately, then `handler`
        and `val` will be enqueued and None will be returned. When the
        operation eventually completes, the `handler` will be committed and
        its callback will be invoked with the completion status.

        None is also returned, without enqueuing, if `handler` is already
        inactive (e.g. another branch of the same alt was committed).

        Acquires ``self._lock``, so must not be called while holding it.

        Args:
            handler: A handler that will be committed upon completion. Its
                callback will only be invoked if the operation is enqueued.
            val: A non-None value to put onto the channel.

        Returns:
            A tuple containing the completion status if the operation
            completes immediately. None if the operation is enqueued.

        Raises:
            QueueSizeError: If the channel has too many pending put
                operations.
            TypeError: If `val` is None.
        """
        if val is None:
            raise TypeError('item cannot be None')

        with self._lock:
            self._cleanup()

            if self._is_closed:
                return self._fail_op(handler, False)

            # Attempt to transfer val onto buf
            if self._buf is not None and not self._buf.is_full():
                with handler:
                    if not handler.is_active:
                        return None
                    handler.commit()

                self._buf_put(val)
                # The new value may satisfy gets that were waiting.
                self._transfer_buf_vals_to_takers()
                return True,

            # Attempt to transfer val to a taker
            if self._buf is None:
                while len(self._takes) > 0:
                    taker = self._takes[0]
                    # Lock both sides (ordered by lock_id) so the hand-off
                    # commits both handlers atomically.
                    with acquire_handlers(handler, taker):
                        if not handler.is_active:
                            return None
                        self._takes.popleft()
                        # An inactive taker is simply dropped; keep looking.
                        if taker.is_active:
                            handler.commit()
                            taker.commit()(val)
                            return True,

            if not handler.is_waitable:
                return self._fail_op(handler, False)

            # Attempt to enqueue the operation
            if len(self._puts) >= MAX_QUEUE_SIZE:
                raise QueueSizeError('channel has too many pending puts')
            self._puts.append((handler, val))

    def _p_get(self, handler):
        """Commits or enqueues a get operation to the channel.

        If the get operation completes immediately, then the `handler` will
        be committed but its callback will not be invoked. If the channel is
        not already exhausted, then the value taken from the channel will be
        wrapped in a tuple and returned. If the channel is already exhausted
        then the tuple, ``(None,)``, will be returned.

        If the operation is unable to complete immediately, then `handler`
        will be enqueued and None will be returned. When the operation
        eventually completes, the `handler` will be committed and its
        callback will be invoked with the value taken from the channel or
        None if it's exhausted.

        None is also returned, without enqueuing, if `handler` is already
        inactive (e.g. another branch of the same alt was committed).

        Acquires ``self._lock``, so must not be called while holding it.

        Args:
            handler: A handler that will be committed upon completion. Its
                callback will only be invoked if the operation is enqueued.

        Returns:
            A tuple containing the result of the get operation if it
            completes immediately. None if the operation is enqueued.

        Raises:
            QueueSizeError: If the channel has too many pending get
                operations.
        """
        with self._lock:
            self._cleanup()

            # Attempt to take val from buf
            if self._buf is not None and len(self._buf) > 0:
                with handler:
                    if not handler.is_active:
                        return None
                    handler.commit()

                ret = self._buf.get()

                # Transfer vals from putters onto buf
                while len(self._puts) > 0 and not self._buf.is_full():
                    putter, val = self._puts.popleft()
                    with putter:
                        if putter.is_active:
                            putter.commit()(True)
                            self._buf_put(val)

                # If the channel is closed and no puts remain, flush the
                # transformation's completion step onto the buffer.
                self._complete_buf_rf_if_ready()
                return ret,

            # Attempt to take val from a putter
            if self._buf is None:
                while len(self._puts) > 0:
                    putter, val = self._puts[0]
                    # Lock both sides (ordered by lock_id) so the hand-off
                    # commits both handlers atomically.
                    with acquire_handlers(handler, putter):
                        if not handler.is_active:
                            return None
                        self._puts.popleft()
                        # An inactive putter is simply dropped; keep looking.
                        if putter.is_active:
                            handler.commit()
                            putter.commit()(True)
                            return val,

            # Closed with nothing left to take (exhausted), or the caller
            # asked not to wait: complete with None.
            if self._is_closed or not handler.is_waitable:
                return self._fail_op(handler, None)

            # Attempt to enqueue the operation
            if len(self._takes) >= MAX_QUEUE_SIZE:
                raise QueueSizeError('channel has too many pending gets')
            self._takes.append(handler)

    def _cleanup(self):
        """Removes enqueued operations that are no longer active.

        Only called with ``self._lock`` held.
        """
        self._takes = deque(h for h in self._takes if h.is_active)
        self._puts = deque((h, v) for h, v in self._puts if h.is_active)

    @staticmethod
    def _fail_op(handler, val):
        """Commits `handler` without invoking its callback, if still active.

        Returns:
            ``(val,)`` if `handler` was committed, or None if it was already
            inactive.
        """
        with handler:
            if handler.is_active:
                handler.commit()
                return val,
        return None

    def _buf_put(self, val):
        """Feeds `val` through the buffer's reducing function.

        If the reducing function returns a reduced value, every pending put
        is completed with False and the channel is closed. Only called with
        ``self._lock`` held.
        """
        if xf.is_reduced(self._buf_rf(None, val)):
            # If reduced value is returned then no more input is allowed onto
            # buf. To ensure this, remove all pending puts and close ch.
            for putter, _ in self._puts:
                with putter:
                    if putter.is_active:
                        putter.commit()(False)
            self._puts.clear()
            self._close()

    def _transfer_buf_vals_to_takers(self):
        """Hands buffered values to pending takers in FIFO order.

        Stops when either the takers or the buffer run out. Inactive takers
        are dropped without consuming a value. Only called with
        ``self._lock`` held.
        """
        while len(self._takes) > 0 and len(self._buf) > 0:
            taker = self._takes.popleft()
            with taker:
                if taker.is_active:
                    taker.commit()(self._buf.get())

    def _complete_buf_rf_if_ready(self):
        """Calls buf_rf completion arity once if all input has been put to buf.

        Guarantees that the buf_rf completion arity will be invoked only
        after all input has been placed onto the buffer and that it will
        never be called more than once. Invoking the completion arity will
        flush any remaining values from the transformed reducing function
        onto buf.
        """
        if (self._is_closed and
                len(self._puts) == 0 and
                not self._buf_rf_is_completed):
            self._buf_rf_is_completed = True
            self._buf_rf(None)

    def _close(self):
        """Marks the channel closed and completes every pending take.

        For buffered channels, first flushes the transformation (if all puts
        are done) and delivers buffered values to waiting takers; any takers
        still pending afterwards receive None. Only called with
        ``self._lock`` held.
        """
        self._is_closed = True

        if self._buf is not None:
            self._complete_buf_rf_if_ready()
            self._transfer_buf_vals_to_takers()

        # Remove pending takes
        # No-op if there are pending puts or buffer isn't empty
        for taker in self._takes:
            with taker:
                if taker.is_active:
                    taker.commit()(None)
        self._takes.clear()
class _Undefined:
    """A default parameter value that a user could never pass in."""


def _alts(flag, deliver_fn, ops, priority, default):
    """Starts the operations of an alt, committing at most one of them.

    Args:
        flag: The flag shared by every handler created for this alt; once
            any handler commits it, the others become inactive.
        deliver_fn: Called with ``(val, ch)`` when an enqueued operation
            completes later.
        ops: Get operations (a channel) and put operations (a
            ``[channel, val]`` pair).
        priority: If False, `ops` are tried in random order.
        default: Used as ``(default, 'default')`` when no operation completes
            immediately; `_Undefined` if no default was given.

    Returns:
        ``(val, ch)`` if an operation completed immediately,
        ``(default, 'default')`` if `default` was used, or None if the
        result will instead be passed to `deliver_fn`.

    Raises:
        ValueError: If `ops` is empty or contains both a get and a put for
            the same channel.
        TypeError: If a put operation's value is None.
        QueueSizeError: If a channel has too many pending operations.
    """
    ops = list(ops)
    if len(ops) == 0:
        raise ValueError('alts must have at least one channel operation')
    if not priority:
        random.shuffle(ops)

    ch_ops = {}

    # Parse ops into ch_ops
    for raw_op in ops:
        try:
            ch, val = raw_op
            op = {'type': 'put', 'value': val}
        except TypeError:
            ch = raw_op
            op = {'type': 'get'}
        if ch_ops.get(ch, op)['type'] != op['type']:
            raise ValueError('cannot get and put to same channel')
        ch_ops[ch] = op

    # Reject None puts before touching any channel, so the error does not
    # depend on the (possibly shuffled) order in which ops are started.
    for op in ch_ops.values():
        if op['type'] == 'put' and op['value'] is None:
            raise TypeError('item cannot be None')

    def create_handler(ch):
        return FlagHandler(flag, lambda val: deliver_fn((val, ch)))

    # Start ops
    try:
        for ch, op in ch_ops.items():
            if op['type'] == 'get':
                ret = ch._p_get(create_handler(ch))
            else:
                ret = ch._p_put(create_handler(ch), op['value'])
            if ret is not None:
                return ret[0], ch
    except BaseException:
        # Handlers already enqueued on other channels must not stay live:
        # they would later be committed and swallow a value destined for a
        # caller that has already received this exception.
        with flag['lock']:
            flag['is_active'] = False
        raise

    if default is not _Undefined:
        with flag['lock']:
            if flag['is_active']:
                flag['is_active'] = False
                return default, 'default'

    # Every op is enqueued (or another thread already committed one); the
    # result will arrive through deliver_fn.
    return None
def alt(*ops, priority=False, default=_Undefined):
    """
    alt(*ops, priority=False, default=Undefined)

    Returns an awaitable representing the first and only channel operation
    to finish.

    Accepts a variable number of operations that either get from or put to a
    channel and commits only one of them. If no `default` is provided, then
    only the first op to finish will be committed. If `default` is provided
    and none of the `ops` finish immediately, then no operation will be
    committed and `default` will instead be used to complete the returned
    awaitable.

    Args:
        ops: Operations that either get from or put to a channel.
            A get operation is represented as simply a channel to get from.
            A put operation is represented as an iterable of the form
            ``[channel, val]``, where `val` is an item to put onto `channel`.
        priority: An optional bool. If True, operations will be tried in
            order. If False, operations will be tried in random order.
        default: An optional value to use in case no operation finishes
            immediately.

    Returns:
        An awaitable that evaluates to a tuple of the form ``(val, ch)``.
        If `default` is not provided, then `val` will be what the first
        successful operation returned and `ch` will be the channel used in
        that operation. If `default` is provided and none of the operations
        complete immediately, then the awaitable will evaluate to
        ``(default, 'default')``.

    Raises:
        ValueError: If `ops` is empty or contains both a get and put
            operation to the same channel.
        RuntimeError: If the calling thread has no running event loop.
        QueueSizeError: If a channel has too many pending operations.

    See Also:
        :func:`b_alt`
    """
    alt_flag = create_flag()
    awaitable = FlagFuture(alt_flag)
    on_complete = future_deliver_fn(awaitable)
    immediate = _alts(alt_flag, on_complete, ops, priority, default)
    if immediate is None:
        # Nothing finished yet; on_complete will resolve the future later.
        return awaitable
    # FlagFuture disables its own set_result, so use the base class.
    asyncio.Future.set_result(awaitable, immediate)
    return awaitable
def b_alt(*ops, priority=False, default=_Undefined):
    """
    b_alt(*ops, priority=False, default=Undefined)

    Same as :func:`alt` except it blocks instead of returning an awaitable.

    Does not require an event loop.
    """
    waiter = Promise()
    immediate = _alts(create_flag(), waiter.deliver, ops, priority, default)
    if immediate is not None:
        return immediate
    # Every op was enqueued: block until one of them delivers.
    return waiter.deref()