Async Helpers

The machinery library provides a number of utilities to make it easier to use asyncio concepts without creating memory leaks or those annoying warnings that it produces when you have created asyncio tasks that aren’t awaited before the end of the program.

The consistent part of how this works are machinery.helpers.CTX objects that are loosely based of contexts in Go and allow us to create dependency chains where the parent being completed results in all children contexts also being completed.

protocol machinery.helpers.protocols.CTX

This object represents a chain of dependency that lets parents in the chain cancel children in the chain by cancelling themselves.

This is loosely based off how contexts work in Go and very loosely based off the ideas in languages like erlang with supervisor processes. It is very simple and is mainly about ensuring that if a parent is cancelled, so are it’s children.

It also contains some useful helpers for working with futures.

In python, asyncio.Future objects don’t have names and when you have a large program with lots of futures hanging around, it becomes very useful to be able to name them to understand what they are actually representing.

It is good practice for an object that holds onto one of these, to never cancel it’s own context and instead rely on it’s parent to cancel the context it is provided.

So usage looks like:

from machinery import helpers as hp

async def my_program() -> None:
    # The first context is created with ``hp.CTX.beginning()``
    # After this all contexts should be created using ``ctx.child``

    with ctx.child(name="SomeObject") as ctx_some_object:
        some_object = SomeObject(ctx=ctx_some_object)
        # some_object should never itself call ``self.ctx.cancel()``

        with ctx_some_object.child(name=...) as ctx_grandchild:
            ...

        ...

Objects that intend to finish should have a mechanism for signalling to itself it is done, rather than rely on cancelling the context. Stopping based off the ctx being done should only be an indication that the parent wishes to force the object to stop what it is doing.

Classes that implement this protocol must have the following methods / attributes:

property loop: AbstractEventLoop

The event loop this context is based off.

property name: str

The name associated with this context.

set_exception(exc: BaseException) None

Set an exception on this context and propagate that exception to all children contexts.

add_on_done(cb: FutureCTXCallback[None], index: FutureCallback[None] | None = None) FutureCallback[None]

Register a callback to be called when this context finishes.

The callback will be provided with both this context and the result of the future this context represents.

To add a done callback that doesn’t take in the context, use add_done_callback

If index is provided, then that will be used as the hashable object that is recognised by remove_done_callback to unregister the callback.

done() bool

Return True if the future this context represents is completed

cancel() bool

Cancel the future this context represents.

This is a safe no-op if the future is already done.

exception() BaseException | None

Get the exception this future was set with if there was one.

If the future is not complete this will raise an exception as like what happens with asyncio.Future objects.

cancelled() bool

Return True if the future represented by this context has been cancelled.

add_done_callback(cb: Callable[[FutureStatus[None]], None]) FutureCallback[None]

Register a callback to be called when this future completes.

If the future is already complete, the callback will be called straight away.

remove_done_callback(cb: Callable[[FutureStatus[None]], None]) int

Remove the callback from the callbacks on the future represented by this callback if that callback was represented.

Return the number of futures this callback was removed from (a context may hold onto many futures)

has_direct_done_callback(cb: Callable[[FutureStatus[None]], None]) bool

Return whether this context has this callback registered. If a parent context has this callback registered but this one does not, then True will not be returned.

async wait_for_first(*waits: WaitByCallback[Any] | Event) None

Given a number of futures, tasks or events, return when at least one of them is complete.

The default implementation of CTX will ensure that if any of these have been provided and one is already complete, that we will at least do an await asyncio.sleep(0) before returning.

async wait_for_all(*waits: WaitByCallback[Any] | Event) None

Given a number of futures, tasks or events, return only when all of them are complete.

The default implementation of CTX will ensure that if any of these have been provided and they are all already complete, that we will at least do an await asyncio.sleep(0) before returning.

async async_with_timeout(coro: Coroutine[object, object, T_Ret], *, name: str, silent: bool = True, timeout: int = 10, timeout_event: Event | None = None, timeout_error: BaseException | None = None) T_Ret

Wait for the provided coroutine has completed and either return the result from that coroutine, or raise the exception if the coroutine throws an exception, or through a timeout exception if the coroutine is still running after the provided timeout.

In the default implementation of CTX, If timeout_error is provided then that error will be raised if the timeout is reached, otherwise the coroutine will be sent an asyncio.CancelledError() and that will be raised.

The timeout_event will be set if the timeout is reached before the task is complete, otherwise it is never set.

The default implementation will return only after the task is finished cleaning up, which may be some time after the timeout if the task catches the asyncio.CancelledError and does more work.

async_as_background(coro: Coroutine[object, object, T_Ret], *, silent: bool = True) Task

In the default implementation of CTX, this will create an asyncio.Task from this coroutine and provide either self.reporter or self.silent_reporter as a done callback depending on the result of silent

It is up to the user to ensure that this task is awaited at some point to avoid asyncio warnings about the task never being awaited.

For example:

task = ctx.async_as_background(my_async_function())
try:
    await ctx.wait_for_first(task, some_other_event)
finally:
    task.cancel()
    await ctx.wait_for_all(task)
child(*, name: str, log: Logger | None = None, prefix: str = '') Self

Create a child context with the provided name and prefix.

If prefix is provided then the default implementation will set the name to be [{prefix}]-->{name}

The child context will know about all the futures held by this context.

__await__() Generator[None]

Wait for this context to be complete

__enter__() Self

Using the context as a context manager will ensure that it is cancelled when it goes out of scope.

__exit__(exc_type: type[BaseException] | None, value: BaseException | None, tb: TracebackType | None) None

Ensure the context is cancelled when it goes out of scope.

set_future_name(fut: Future[Any], *, name: str) None

Given some future, give it a name. This should be done such that the name can be retrieved by the sibling get_future_name function.

get_future_name(fut: Future[Any]) str | None

Given some future, return it’s name as set by set_future_name.

If no name was set, then return None.

log_info(msg: str) None

Log a simple message somewhere. It is up to the implementation to determine what that means.

log_exception(msg: object, *, exc_info: tuple[type[BaseException], BaseException, TracebackType | None] | None = None) None

Log an exception somewhere. It is up to the implementation to determine what that means.

fut_to_string(f: Future[Any] | WithRepr, with_name: bool = True) str

Given some future, or simply an object with the abiliy to call repr on it, return a string representing the future.

It is up to the implementation to determine how that actually works.

The default implementation in machinery will report whether the future is pending, cancelled, has an exception or has a result. It will also provide the name as retrieved from get_future_name if with_name is provided as True.

property reporter: FutureCallback[Any]

Return a callable that can be provided as a done callback for a future.

It is good practice to always give a done callback to a future that looks at cancelled, exception or result depending on the status of the future, to prevent asyncio from complaining they weren’t accessed.

The implementation of this callable should log when the future was finished with an exception.

property silent_reporter: FutureCallback[Any]

Return a callable that can be provided as a done callback for a future.

It is good practice to always give a done callback to a future that looks at cancelled, exception or result depending on the status of the future, to prevent asyncio from complaining they weren’t accessed.

The implementation of this callable should not log when the future was finished with an exception.

The ticker

protocol machinery.helpers.protocols.Ticker

An object that yields on a specific tick schedule.

The default implementation is found via hp.tick.

Classes that implement this protocol must have the following methods / attributes:

__aiter__() AsyncGenerator[tuple[int, float]]

Yield according to the tick schedule.

Each value yielded should be a tuple of (iteration, time_till_next) where iteration is a counter of how many times we yield a value starting from 1 and the time_till_next is the number of seconds till the next time we yield a value.

change_after(every: int, *, set_new_every: bool = True) None

Change the tick schedule.

For example change_after(20) will mean the next tick is 20 seconds after whenever the last tick was.

Setting set_new_every=False will mean this is a once only change in the schedule, otherwise this new every will become the new schedule.

property pauser: Semaphore | None

Pause the ticking whilst this pauser is locked.

machinery.helpers.tick(every: int, *, ctx: CTX, max_iterations: int | None = None, max_time: int | None = None, min_wait: float = 0.1, name: str = '', pauser: Semaphore | None = None) AsyncGenerator[Ticker]

This object gives you an async generator that yields every every seconds, taking into account how long it takes for your code to finish for the next yield.

For example:

from machinery import helpers as hp

import time


start = time.time()
timing = []

ctx: hp.CTX = ...

async with hp.tick(10, ctx=ctx) as ticker:
    async for _ in ticker:
        timing.append(time.time() - start)
        asyncio.sleep(8)
        if len(timing) >= 5:
            break

assert timing == [0, 10, 20, 30, 40]

The value that is yielded is a tuple of (iteration, time_till_next) where iteration is a counter of how many times we yield a value starting from 1 and the time_till_next is the number of seconds till the next time we yield a value.

Note that the schedule value can be changed during iteration:

from machinery import helpers as hp

ctx: hp.CTX = ...

async with hp.tick(10, ctx=ctx) as ticker:
    done = 0

    async for _ in ticker:
        done += 1
        if done == 3:
            # This will mean the next tick will be 20 seconds after the last
            # tick and future ticks will be 20 seconds apart
            ticker.change_after(20)
        elif done == 5:
            # This will mean the next tick will be 40 seconds after the last
            # tick, but ticks after that will go back to 20 seconds apart.
            ticker.change_after(40, set_new_every=False)

There are other options:

ctx

If this ctx is completed then the iteration will stop

max_iterations

Iterations after this number will cause the loop to finish. By default there is no limit

max_time

After this many iterations the loop will stop. By default there is no limit

min_wait

The minimum amount of time to wait after a tick.

If this is False then we will always tick at the next expected time, otherwise we ensure this amount of time at a minimum between ticks

pauser

If not None, we use this as a semaphore that will pause the ticks when it is locked.

Task holder

protocol machinery.helpers.protocols.TaskHolder

An object that can be used to create asyncio.Task objects and ensure that they are cleaned up correctly without causing asyncio to complain about tasks that are never awaited.

The default implementation is provided by using hp.task_holder:

from machinery import helpers as hp

ctx: hp.CTX = ...

async with hp.task_holder(ctx=ctx) as ts:
    task = ts.add_coroutine(some_async_function())

When the context manager is exited then tasks will not be cancelled unless the parent ctx has been cancelled. During this time, more tasks may be added to the task holder.

Once the parent ctx has been cancelled, then all the tasks that are held will be cancelled and awaited.

Classes that implement this protocol must have the following methods / attributes:

__contains__(task: Task[object]) bool

Return whether this holds onto the provided task.

__iter__() Iterator[WaitByCallback[object]]

Yield all the tasks currently held by this object.

add_coroutine(coro: Coroutine[object, object, T_Ret], *, silent: bool = False) Task

Create a task from this coroutine and ensure that it gets cleaned up eventually.

The silent argument should be used to say whether exceptions from this coroutine should be logged or not.

add_task(task: Task) Task

Track this asyncio.Task object and ensure it is cleaned up correctly eventually.

property pending: int

Return the number of pending tasks that are held by this object.

machinery.helpers.task_holder(*, ctx: CTX, name: str = '') AsyncGenerator[TaskHolder]

An object for managing asynchronous coroutines.

Usage looks like:

from machinery import helpers as hp


async def my_async_program(ctx: hp.CTX) -> None:
    async def something():
        await asyncio.sleep(5)

    async with hp.task_holder(ctx=ctx) as ts:
        ts.add_coroutine(something())
        ts.add_coroutine(something())

If you don’t want to use the context manager, you can say:

from machinery import helpers as hp
import contextlib


async def something():
    await asyncio.sleep(5)

async def my_async_program(ctx: hp.CTX) -> None:
    exit_stack = contextlib.AsyncExitStack()

    ts = await exit_stack.enter_async_context(hp.task_holder(ctx=ctx))

    try:
        ts.add_coroutine(something())
        ts.add_coroutine(something())
    finally:
        await exit_stack.aclose()

Once your block in the context manager is done the context manager won’t exit until all coroutines have finished. During this time you may still use ts.add or ts.add_task on the holder.

If the ctx is cancelled before all the tasks have completed then the tasks will be cancelled and properly waited on so their finally blocks run before the context manager finishes.

ts.add_coroutine will also return the task object that is made from the coroutine.

ts.add_coroutine also takes a silent=False parameter, that when True will not log any errors that happen. Otherwise errors will be logged.

If you already have a task object, you can give it to the holder with ts.add_task(my_task).

Queues

class machinery.helpers.EnsureItemGetter

Used to return a QueueItemDef object that pretends to ensure that the item is T_Item.

This is useful when we want to create a type safe Queue without the extra CPU cycles to ensure that the items added to the queue are indeed of T_Item.

Usage is:

item_ensurer = EnsureItemGetter[MyAmazingType].get()

reveal_type(item_ensurer) # _protocols.QueueItemDef[MyAmazingType]
protocol machinery.helpers.protocols.SyncQueue

Represents an object that can be used as a synchronous queue.

The default implementation is via hp.sync_queue and wraps the standard library queue.Queue class.

Classes that implement this protocol must have the following methods / attributes:

__iter__() Iterator[T_Item]

Iterate over the items in the queue. This is re-entrant if the iteration is stopped and restarted.

__len__() int

Return how many items are in the queue.

__type_params__ = (T_Item,)
append(item: T_Item) None

Add something to the queue

get_all() Iterator[T_Item]

Returns an iterator over all the items in the queue. This is re-entrant if the iteration is stopped and restarted.

is_empty() bool

Return true if the queue is currently empty

remaining() Iterator[T_Item]

Returns an iterator that returns everything that remains in the queue.

Useful after the queue has stopped and there are still items remaining.

This iteration will not wait for new items to be added to the queue when all the remaining items are yielded

machinery.helpers.sync_queue(*, ctx: CTX, timeout: float = 0.05, empty_on_finished: bool = False, name: str = '', item_ensurer: QueueItemDef | None = None) Iterator[SyncQueue] | Iterator[SyncQueue[object]]

A simple wrapper around the standard library non async queue.

Usage is:

from machinery import helpers as hp

ctx: hp.CTX = ...

with hp.sync_queue(ctx=ctx) as sync_queue:
    async def results():
        for result in sync_queue:
            print(result)

    ...

    sync_queue.append(something)
    sync_queue.append(another)

If empty_on_finished is set to True, then the queue will keep yielding what items remain after ctx is complete.

The item_ensurer can be passed in as a function that takes a single object and returns an object matching T_Item. This allows us to ensure the return type is a Queue that yields T_Item objects. If there is a strong guarantee that the objects provided to the queue will always be the correct type then hp.EnsureItemGetter can be used to return an object for item_ensurer that is typed as returning T_Item but will never be executed by the queue.

If no item_ensurer is provided then the queue will be yielding objects of type object.

If a timeout is provided, then that time out will be used when waiting for a new result on the standard library Queue.get(timeout=...)

protocol machinery.helpers.protocols.Queue

These objects represent an asynchronous queue of values.

Classes that implement this protocol must have the following methods / attributes:

__aiter__() AsyncGenerator[T_Item]

Asynchronously yield the values in the queue as they are added

__len__() int

Return the number of items left in the queue.

__type_params__ = (T_Item,)
append(item: T_Item, *, priority: bool = False) None

Add an object to the end of the queue.

If priority is set to True, then the item is added to the front of the queue.

append_instruction(cb: Callable[[], None], *, priority: bool = False) None

Add a callable to the end of the queue.

If priority is set to True, then the callable is added to the front of the queue.

When the queue gets to these callables, the callable is returned and nothing is directly yielded as a result.

property breaker: Event

When this is set, the iteration will exit.

The default implementation will always clear this when iteration is restarted.

property get_all: Callable[[], AsyncGenerator[T_Item]]

Return an async generator that yields all the values in the queue as they are added.

is_empty() bool

Return True if there are no items left in the queue

process_after_yielded(process: Callable[[LimitedQueue[T_Item]], None], /) None

Register a callback to be called whenever a value has been yielded.

The callback will be provided the queue itself and is typed to only accept a limited API for that queue.

remaining() Iterator[T_Item]

Yield all the remaining values in the queue. This is useful when the queue is stopped but there are still values left.

This will not wait for another value when it runs out of values to yield.

machinery.helpers.queue(*, ctx: CTX, empty_on_finished: bool = False, name: str = '', item_ensurer: QueueItemDef | None = None) Iterator[Queue] | Iterator[Queue[object]]

Returns an object that can asynchronously yield the values it gets given.

Usage is:

from machinery import helpers as hp

ctx: hp.CTX = ...

with hp.queue(ctx=ctx_queue) as queue:

    async def results():
        # This will continue forever until ctx is done
        async for result in queue:
            print(result)

    ...

    queue.append(something)
    queue.append(another)

Note that the main difference between this and the standard library asyncio.Queue other than a slighly different API surface, is that this one does not have the ability to impose limits.

Queue Manager

protocol machinery.helpers.protocols.QueueFeeder

This represents one of the two objects returned by hp.queue_manager and is used to feed items into the streamer it also creates.

In the default implementation, sources may return “extendable” values which will be then used themselves as input sources rather than yielded.

These include:

  • callables that only take on argument

  • async generators

  • sync generators

  • Coroutine objects

  • asyncio.Task objects

It does not include normal iterable objects like lists.

Classes that implement this protocol must have the following methods / attributes:

__type_params__ = (T_QueueContext,)
add_async_generator(agen: AsyncGenerator[object], *, context: T_QueueContext | None = None) None

Add an async generator as a source.

In the default implementation, any “extendable” result will be used as an input source rather than given to the streamer as a value.

In the default implementation, every iteration will be added to the end of the streamer as they happen.

add_coroutine(coro: Coroutine[object, object, object], *, context: T_QueueContext | None = None) None

Add a coroutine as a source.

In the default implementation, this coroutine is used to create an asyncio.Task object that is passed into add_task.

add_sync_function(func: Callable[[], object], *, context: T_QueueContext | None = None) None

Register a synchronous function as an input source.

The value from this function will be provided to the streamer with the provided context.

In the default implementation, an “extendable” result will be used as an input source rather than given to the streamer as a value.

add_sync_iterator(iterator: Iterable[object] | Iterator[object], *, context: T_QueueContext | None = None) None

Register a synchronous iterator as an input source.

This can either be a normal Generator object or any other iterable object like a list.

In the default implementation, any “extendable” result will be used as an input source rather than given to the streamer as a value.

In the default implementation, every iteration will be added to the end of the streamer as they happen.

add_task(task: asyncio.Task[object], *, context: T_QueueContext | None = None) None

Add an asyncio.Task as a source.

In the default implementation, if the result is “extendable” then it will be used as an input source rather than given to the streamer as a value

add_value(value: object, *, context: T_QueueContext | None = None) None

Add a single value to the streamer

In the default implementation, if the value is “extendable” then it will be used as an input source rather than given to the streamer as a value

set_as_finished_if_out_of_sources() None

Calling this will ensure that the streamer stops waiting for more input after all sources have dried up.

protocol machinery.helpers.protocols.Streamer

This represents one of the two objects returned by hp.queue_manager and is used to yield the objects that are fed into it by the feeder.

Classes that implement this protocol must have the following methods / attributes:

__aiter__() AsyncGenerator[T_Item]

Yield the values in the streamer as they come in

__type_params__ = (T_Item,)
property breaker: Event

When this event is set, the streamer will finish

remaining() Iterator[T_Item]

Yield whatever remains in the streamer until no more values are left.

Useful if the iteration is exited early and there are items left.

machinery.helpers.queue_manager(*, ctx: CTX, make_empty_context: Callable[[], T_QueueContext], name: str = '') AsyncGenerator[tuple[Streamer[QueueManagerResult], QueueFeeder]]

Create and manager a (streamer, feeder) pair that can be used to manage a stream of values.

Usage is as follows:

from machinery import helpers as hp
from typing import assert_never

ctx: hp.CTX = ...

async with hp.queue_manager(ctx=ctx) as (streamer, feeder):
    feeder.add...
    feeder.add...
    feeder.add...
    feeder.set_as_finished_if_out_of_sources()

    async for result in streamer:
        match result:
            case hp.QueueManagerSuccess():
                ...
            case hp.QueueManagerFailure():
                ...
            case hp.QueueManagerIterationStop():
                ...
            case hp.QueueManagerStopped():
                ...
            case _:
                assert_never(result)

The feeder can be added to with a number of methods as found on the hp.protocols.QueueFeeder protocol. These can be used even after the set_as_finished_if_out_of_sources method has been called and at any time.

The set_as_finished_if_out_of_sources method says that the streamer will not keep waiting when all it’s sources are exhausted and nothing is left to stream.

The QueueManagerStopped will be sent to the queue when the ctx is complete and may be received before the streamer is finished yielding values.

When values are added to the feeder, a context can be provided that will accompany all results.

To finish a streamer early, streamer.breaker.set() can be called.

Values that come from input sources may be “extended” where they are used as additional sources rather than added as results.

These include:

  • callables that only take on argument

  • async generators

  • sync generators

  • Coroutine objects

  • asyncio.Task objects

enum machinery.helpers.QueueInput(value)

An enum of the different source types provided to the feeder

Valid values are as follows:

SYNC_FUNCTION = <QueueInput.SYNC_FUNCTION: 'SYNC_FUNCTION'>
SYNC_GENERATOR = <QueueInput.SYNC_GENERATOR: 'SYNC_GENERATOR'>
SYNC_ITERATOR = <QueueInput.SYNC_ITERATOR: 'SYNC_ITERATOR'>
VALUE = <QueueInput.VALUE: 'VALUE'>
COROUTINE = <QueueInput.COROUTINE: 'COROUTINE'>
TASK = <QueueInput.TASK: 'TASK'>
ASYNC_GENERATOR = <QueueInput.ASYNC_GENERATOR: 'ASYNC_GENERATOR'>
class machinery.helpers.QueueManagerSuccess(*, sources: Sequence[tuple[QueueInput, object]], value: object, context: T_QueueContext)

These are yielded when we have a value being successfully provided by an input source.

The sources list will be a list of tuples of the source type and the object used as that source.

The context will be the object provided by the first source that led to this value

class machinery.helpers.QueueManagerFailure(*, sources: Sequence[tuple[QueueInput, object]], exception: BaseException, context: T_QueueContext)

These are yielded when we have an input source raising an exception.

The sources list will be a list of tuples of the source type and the object used as that source.

The context will be the object provided by the first source that led to this value

class machinery.helpers.QueueManagerIterationStop(*, sources: Sequence[tuple[QueueInput, object]], exception: BaseException | None, context: T_QueueContext)

These are yielded when we have an iteration (either sync or async) reaching it’s end.

The sources list will be a list of tuples of the source type and the object used as that source.

The context will be the object provided by the first source that led to this value

class machinery.helpers.QueueManagerStopped(*, exception: BaseException | None = None)

This is yielded when the queue itself has stopped. If it reached a natural end then the exception will be None.

Also provided is the type alias for the union of types that the streamer may produce as machinery.helpers.QueueManagerResult:

QueueManagerResult

type QueueManagerResult[T_QueueContext] = (
    QueueManagerSuccess[T_QueueContext]
    | QueueManagerFailure[T_QueueContext]
    | QueueManagerIterationStop[T_QueueContext]
    | QueueManagerStopped
)

Odd helpers

There are few standalone helpers for some odd functionality:

machinery.helpers.ensure_aexit(instance: AbstractAsyncContextManager[object]) AbstractAsyncContextManager[None]

Used to make sure a manual async context manager calls __aexit__ if __aenter__ fails.

Turns out if __aenter__ raises an exception, then __aexit__ doesn’t get called. This is a helper to make it easy to ensure that does happen.

Usage is as follows:

import types

from machinery import helpers as hp


class MyCM:
    async def __aenter__(self) -> None:
        async with hp.ensure_aexit(self):
            return await self.start()

    async def start(self) -> ...:
        ...

    async def __aexit__(
        self,
        exc_typ: type[BaseException] | None,
        value: BaseException | None,
        tb: types.TracebackType | None,
    ) -> None:
        await self.finish(exc_typ, value, tb)

    async def finish(
        self,
        exc_typ: type[BaseException] | None = None,
        value: BaseException | None = None,
        tb: types.TracebackType | None = None,
    ) -> None:
        ...
async machinery.helpers.stop_async_generator(gen: AsyncGenerator[object, T_Send | None], provide: T_Send | None = None, exc: BaseException | None = None) None

This will ensure an async generator has stopped.

It will first throw an exception into the generator using gen.athrow. This is either an asyncio.CancelledError or the exc that is provided if that is not None.

Then we will send either provide with gen.asend.

Finally, regardless of exceptions, gen.aclose will be called.

machinery.helpers.noncancelled_results_from_futs(futs: Sequence[FutureStatus]) tuple[BaseException | BaseExceptionGroup | None, Sequence]

Get back (exception, results) from a list of futures

exception

A single exception if all the errors are the same type or if there is only one exception

otherwise it is None

results

A list of the results that exist

machinery.helpers.find_and_apply_result(final: Future, futs: Sequence[Future]) bool

Find a result in futs with a result or exception and set that result/exception on both final, and all the settable futures in futs.

As a bonus, if final is set, then we set that result/exception on all futures in futs.

and if final is cancelled, we cancel all the futures in futs

Return True if we’ve changed final

And some standalone protocols for some concepts:

protocol machinery.helpers.protocols.FutureStatus

Represents a read-only view of a Future

Classes that implement this protocol must have the following methods / attributes:

__type_params__ = (T_Ret,)
cancelled() bool
done() bool
exception() BaseException | None
result() T_Ret
protocol machinery.helpers.protocols.FutureCallback

Represents an object that’s used as a done_callback for a future.

It can be hashed and it is a callable object that takes in the status of a completed future.

Classes that implement this protocol must have the following methods / attributes:

__call__(res: FutureStatus[T_Ret], /) None

Call self as a function.

__hash__() int

Return hash(self).

__type_params__ = (T_Ret,)
protocol machinery.helpers.protocols.FutureCTXCallback

Represents an object that’s used as a done_callback for a future but also takes in a CTX object.

Classes that implement this protocol must have the following methods / attributes:

__call__(ctx: CTX, res: FutureStatus[T_Ret], /) None

Call self as a function.

__hash__() int

Return hash(self).

__type_params__ = (T_Ret,)
protocol machinery.helpers.protocols.WaitByCallback

Represents an object that supports adding and removing callbacks for when the object is done.

Classes that implement this protocol must have the following methods / attributes:

__type_params__ = (T_Ret,)
add_done_callback(cb: Callable[[FutureStatus[T_Ret]], None]) None | FutureCallback[T_Ret]
cancel() bool
done() bool
remove_done_callback(cb: Callable[[FutureStatus[T_Ret]], None]) int
protocol machinery.helpers.protocols.WithRepr

Represents an object we can call repr with.

Classes that implement this protocol must have the following methods / attributes:

__repr__() str

Return repr(self).