Skip to content

asyn_iter Module Reference

amap(f, xs) async

map but f is async

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def amap(f: Callable[[A], Awaitable[B]], xs: AsyncIterable[A]) -> AsyncIterable[B]:
  """`map` but `f` is async"""
  async for x in xs:
    yield await f(x)

asyncify(xs) async

Asyncify an iterable

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def asyncify(xs: Iterable[A]) -> AsyncIterable[A]:
  """Asyncify an iterable"""
  for x in xs:
    yield x

batch(batch_size, xs, yield_remaining=True) async

Batch elements of xs into tuples of size batch_size

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def batch(
  batch_size: int, xs: AsyncIterable[A], yield_remaining: bool = True
) -> AsyncIterable[tuple[A, ...]]:
  """Batch elements of `xs` into tuples of size `batch_size`"""
  batch = []
  async for x in xs:
    if len(batch) == batch_size:
      yield tuple(batch)
      batch = []
    batch.append(x)
  if yield_remaining and len(batch) > 0:
    yield tuple(batch)

enumerate(xs) async

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def enumerate(xs: AsyncIterable[A]) -> AsyncIterable[tuple[int, A]]:
  """"""
  i = 0
  async for x in xs:
    yield i, x
    i += 1

every(n, xs) async

Take every nth element of xs - every(3, arange(10)) == AsyncIter([0, 3, 6, 9])

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def every(n: int, xs: AsyncIterable[A]) -> AsyncIterable[A]:
	"""Take every `n`th element of `xs`
	- `every(3, arange(10)) == AsyncIter([0, 3, 6, 9])`
	"""
	async for i, x in enumerate(xs):
		if i % n == 0:
			yield x

filter(p, xs) async

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def filter(p: Callable[[A], bool], xs: AsyncIterable[A]) -> AsyncIterable[A]:
  """"""
  async for x in xs:
    if p(x):
      yield x

flatmap(f, xs) async

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def flatmap(f: Callable[[A], AsyncIterable[B]], xs: AsyncIterable[A]) -> AsyncIterable[B]:
  """"""
  async for x in xs:
    async for y in f(x):
      yield y

flatten(xxs) async

Single-level flatten

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def flatten(xxs: AsyncIterable[Iterable[A]]) -> AsyncIterable[A]:
  """Single-level flatten"""
  async for xs in xxs:
    for x in xs:
      yield x

head(xs) async

Get the first element of xs

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
async def head(xs: AsyncIterable[A]) -> A | None:
  """Get the first element of `xs`"""
  async for x in xs:
    return x

map(f, xs) async

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def map(f: Callable[[A], B], xs: AsyncIterable[A]) -> AsyncIterable[B]:
  """"""
  async for x in xs:
    yield f(x)

skip(n, xs) async

Skip the first n elements of xs

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def skip(n: int, xs: AsyncIterable[A]) -> AsyncIterable[A]:
  """Skip the first `n` elements of `xs`"""
  async for i, x in enumerate(xs):
    if i >= n:
      yield x

split(n, xs) async

Split xs into xs[:n], xs[n:]

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
async def split(n: int, xs: AsyncIterable[A]) -> tuple[list[A], AsyncIterable[A]]:
  """Split `xs` into `xs[:n], xs[n:]`"""
  head = await syncify(take(n, xs))
  return head, xs

starmap(f, xs) async

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def starmap(f: Callable[[Unpack[As]], B], xs: AsyncIterable[tuple[Unpack[As]]]) -> AsyncIterable[B]:
  """"""
  async for x in xs:
    yield f(*x)

syncify(xs) async

Await all xs

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
async def syncify(xs: AsyncIterable[A]) -> list[A]:
  """Await all `xs`"""
  return [x async for x in xs]

take(n, xs) async

Take the first n elements of xs

Source code in haskellian/src/haskellian/asyn_iter/funcs.py
@AI.lift
async def take(n: int, xs: AsyncIterable[A]) -> AsyncIterable[A]:
  """Take the first `n` elements of `xs`"""
  if n == 0:
    return
  async for i, x in enumerate(xs):
    if i < n - 1:
      yield x
    elif i >= n - 1:
      yield x
      return

lift(f)

Lift a function f to return an AsyncIter

Source code in haskellian/src/haskellian/asyn_iter/lifting.py
def lift(f: Callable[P, AsyncIterable[A]]) -> Callable[P, AI.AsyncIter[A]]:
  """Lift a function `f` to return an `AsyncIter`"""
  @wraps(f)
  def _f(*args: P.args, **kwargs: P.kwargs) -> AI.AsyncIter[A]:
    return AI.AsyncIter(f(*args, **kwargs))
  return _f

ManagedAsync

Bases: AsyncIter[A], Generic[A]

Managed async iterator

Source code in haskellian/src/haskellian/asyn_iter/managed.py
class ManagedAsync(AI.AsyncIter[A], Generic[A]):
  """Managed async iterator"""

  def __init__(self):
    self.xs: list[A] = []
    self._next = P.ManagedPromise()
    self.ended: bool = False

  def __repr__(self):
    return f'ManagedAsync({self.xs})'

  def push(self, value: A):
    self.xs.append(value)
    if not self._next.resolved:
      self._next.resolve()

  def end(self):
    self.ended = True
    if not self._next.resolved:
      self._next.resolve()

  def clear(self):
    self.xs = []
    self.ended = False

  def __aiter__(self) -> AsyncIterator[A]:
    return self

  async def __anext__(self) -> A:
    if len(self.xs) > 0:
      return self.xs.pop(0)
    elif self.ended:
      raise StopAsyncIteration()
    else:
      await self._next
      self._next = P.ManagedPromise()
      return await self.__anext__()

prefetched(prefetch, xs)

Prefetch prefetch elements from xs - If prefetched < 1, it'll be clipped to 1

Source code in haskellian/src/haskellian/asyn_iter/prefetching.py
@AI.lift
def prefetched(prefetch: int, xs: AsyncIterable[A]) -> AsyncIterable[A]:
  """Prefetch `prefetch` elements from `xs`
  - If `prefetched < 1`, it'll be clipped to `1`
  """
  buffer = asyncio.Queue(maxsize=max(prefetch, 1))
  sentinel = object()

  async def producer():
    async for x in xs:
      await buffer.put(x)
    await buffer.put(sentinel)

  async def consumer():
    while True:
      item = await buffer.get()
      if item is sentinel:
        break
      yield item

  asyncio.create_task(producer())
  return consumer()