Async Helpers
The machinery library provides a number of utilities to make it easier to
use asyncio concepts without creating memory leaks or those annoying warnings
that it produces when you have created asyncio tasks that aren’t awaited before
the end of the program.
The consistent part of how this works is the machinery.helpers.CTX object,
which is loosely based on contexts in Go and allows us to create dependency
chains where completing a parent results in all of its child contexts also
being completed.
- protocol machinery.helpers.protocols.CTX
This object represents a chain of dependency that lets parents in the chain cancel children in the chain by cancelling themselves.
This is loosely based on how contexts work in Go and very loosely based on the ideas in languages like Erlang with supervisor processes. It is very simple and is mainly about ensuring that if a parent is cancelled, so are its children.
It also contains some useful helpers for working with futures.
In Python, asyncio.Future objects don’t have names and when you have a large program with lots of futures hanging around, it becomes very useful to be able to name them to understand what they are actually representing.
It is good practice for an object that holds onto one of these to never cancel its own context and instead rely on its parent to cancel the context it is provided.
So usage looks like:

    from machinery import helpers as hp


    async def my_program() -> None:
        # The first context is created with ``hp.CTX.beginning()``
        ctx: hp.CTX = ...

        # After this all contexts should be created using ``ctx.child``
        with ctx.child(name="SomeObject") as ctx_some_object:
            some_object = SomeObject(ctx=ctx_some_object)
            # some_object should never itself call ``self.ctx.cancel()``

            with ctx_some_object.child(name=...) as ctx_grandchild:
                ...

        ...

An object that intends to finish should have its own mechanism for signalling that it is done, rather than relying on cancelling the context. The ctx being done should only ever mean that the parent wishes to force the object to stop what it is doing.
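The dependency chain described above can be pictured with plain asyncio futures. The following is only a minimal sketch and not how machinery implements it: SketchCTX is a hypothetical class that models two ideas from the text, namely that a finished parent cancels its children and that leaving a with block cancels the context.

```python
import asyncio


class SketchCTX:
    """Hypothetical stand-in for a context; this is not the machinery CTX."""

    def __init__(self, name: str, parent: "SketchCTX | None" = None) -> None:
        self.name = name
        self.fut = asyncio.get_running_loop().create_future()
        if parent is not None:
            # When the parent finishes for any reason, cancel this child too.
            parent.fut.add_done_callback(lambda _: self.fut.cancel())

    def child(self, *, name: str) -> "SketchCTX":
        return SketchCTX(name, parent=self)

    def done(self) -> bool:
        return self.fut.done()

    def cancel(self) -> bool:
        # A no-op that returns False when the future is already done.
        return self.fut.cancel()

    def __enter__(self) -> "SketchCTX":
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Leaving the block cancels this context and therefore its children.
        self.cancel()


async def demo() -> tuple:
    root = SketchCTX("root")
    with root.child(name="parent") as parent:
        grandchild = parent.child(name="grandchild")
    # Done callbacks are scheduled on the loop, so yield once to let them run.
    await asyncio.sleep(0)
    return root.done(), parent.done(), grandchild.done()
```

Running demo() leaves the root untouched while the parent and grandchild are both done, which is the direction of cancellation the protocol asks for: parents cancel children, never the reverse.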
Classes that implement this protocol must have the following methods / attributes:
- property loop: AbstractEventLoop
The event loop this context is based off.
- property name: str
The name associated with this context.
- set_exception(exc: BaseException) None
Set an exception on this context and propagate that exception to all children contexts.
- add_on_done(cb: FutureCTXCallback[None], index: FutureCallback[None] | None = None) FutureCallback[None]
Register a callback to be called when this context finishes.
The callback will be provided with both this context and the result of the future this context represents.
To add a done callback that doesn’t take in the context, use add_done_callback.
If index is provided, then that will be used as the hashable object that is recognised by remove_done_callback to unregister the callback.
- done() bool
Return True if the future this context represents is completed
- cancel() bool
Cancel the future this context represents.
This is a safe no-op if the future is already done.
- exception() BaseException | None
Get the exception this future was set with if there was one.
If the future is not complete this will raise an exception, as happens with asyncio.Future objects.
- cancelled() bool
Return True if the future represented by this context has been cancelled.
- add_done_callback(cb: Callable[[FutureStatus[None]], None]) FutureCallback[None]
Register a callback to be called when this future completes.
If the future is already complete, the callback will be called straight away.
- remove_done_callback(cb: Callable[[FutureStatus[None]], None]) int
Remove the callback from the callbacks on the future represented by this context, if that callback was registered.
Return the number of futures this callback was removed from (a context may hold onto many futures)
- has_direct_done_callback(cb: Callable[[FutureStatus[None]], None]) bool
Return whether this context has this callback registered. If a parent context has this callback registered but this one does not, then True will not be returned.
- async wait_for_first(*waits: WaitByCallback[Any] | Event) None
Given a number of futures, tasks or events, return when at least one of them is complete.
The default implementation of CTX will ensure that if any of these have been provided and one is already complete, we will at least do an await asyncio.sleep(0) before returning.
- async wait_for_all(*waits: WaitByCallback[Any] | Event) None
Given a number of futures, tasks or events, return only when all of them are complete.
The default implementation of CTX will ensure that if any of these have been provided and they are all already complete, we will at least do an await asyncio.sleep(0) before returning.
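To make the behaviour of wait_for_first concrete, here is a rough sketch built only on the standard library. The name wait_for_first_sketch is hypothetical and the body is an assumption about one reasonable way to do it, not the default implementation: events are wrapped in helper tasks, and the function always yields to the loop once before waiting.

```python
import asyncio


async def wait_for_first_sketch(*waits: "asyncio.Future | asyncio.Event") -> None:
    """Return once at least one future, task or event is complete."""
    if not waits:
        return

    # Always give the event loop one chance to run other work first.
    await asyncio.sleep(0)

    waiters = []
    created = []
    for w in waits:
        if isinstance(w, asyncio.Event):
            # Events have no done callbacks, so wait on them through a task.
            task = asyncio.ensure_future(w.wait())
            created.append(task)
            waiters.append(task)
        else:
            waiters.append(w)

    try:
        await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Clean up the helper tasks so none of them are left pending.
        for task in created:
            task.cancel()
        await asyncio.gather(*created, return_exceptions=True)


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    event = asyncio.Event()
    loop.call_later(0.01, event.set)

    await wait_for_first_sketch(fut, event)
    result = (fut.done(), event.is_set())
    fut.cancel()
    return result
```

A wait_for_all equivalent would differ only in using asyncio.ALL_COMPLETED for return_when.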
- async async_with_timeout(coro: Coroutine[object, object, T_Ret], *, name: str, silent: bool = True, timeout: int = 10, timeout_event: Event | None = None, timeout_error: BaseException | None = None) T_Ret
Wait until the provided coroutine has completed and either return the result from that coroutine, raise the exception if the coroutine throws one, or raise a timeout exception if the coroutine is still running after the provided timeout.
In the default implementation of CTX, if timeout_error is provided then that error will be raised if the timeout is reached, otherwise the coroutine will be sent an asyncio.CancelledError() and that will be raised.
The timeout_event will be set if the timeout is reached before the task is complete, otherwise it is never set.
The default implementation will return only after the task is finished cleaning up, which may be some time after the timeout if the task catches the asyncio.CancelledError and does more work.
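The timeout rules above can be sketched with the standard library alone. This is an illustration under assumptions, not the code in machinery: with_timeout_sketch is a hypothetical name, and the sketch does not try to protect the inner task if the caller itself is cancelled while waiting.

```python
import asyncio
from collections.abc import Coroutine
from typing import Optional, TypeVar

T = TypeVar("T")


async def with_timeout_sketch(
    coro: Coroutine[object, object, T],
    *,
    timeout: float = 10,
    timeout_event: Optional[asyncio.Event] = None,
    timeout_error: Optional[BaseException] = None,
) -> T:
    task = asyncio.ensure_future(coro)
    done, _ = await asyncio.wait([task], timeout=timeout)
    if done:
        # Returns the result, or re-raises whatever the coroutine raised.
        return task.result()

    # The timeout was reached: record it, cancel the task, wait for cleanup.
    if timeout_event is not None:
        timeout_event.set()
    task.cancel()
    await asyncio.wait([task])
    if not task.cancelled():
        task.exception()  # mark any late exception as retrieved

    if timeout_error is not None:
        raise timeout_error
    raise asyncio.CancelledError()


async def demo() -> tuple:
    async def fast() -> int:
        return 2

    async def slow() -> int:
        await asyncio.sleep(10)
        return 1

    event = asyncio.Event()
    quick_result = await with_timeout_sketch(fast(), timeout=1)
    try:
        await with_timeout_sketch(
            slow(), timeout=0.01, timeout_event=event, timeout_error=TimeoutError("late")
        )
    except TimeoutError as error:
        return quick_result, event.is_set(), str(error)
    return quick_result, event.is_set(), None
```

Waiting on the task a second time after cancelling it is what gives the "return only after the task is finished cleaning up" behaviour.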
- async_as_background(coro: Coroutine[object, object, T_Ret], *, silent: bool = True) Task
In the default implementation of CTX, this will create an asyncio.Task from this coroutine and provide either self.reporter or self.silent_reporter as a done callback depending on the value of silent.
It is up to the user to ensure that this task is awaited at some point to avoid asyncio warnings about the task never being awaited.
For example:

    task = ctx.async_as_background(my_async_function())
    try:
        await ctx.wait_for_first(task, some_other_event)
    finally:
        task.cancel()
        await ctx.wait_for_all(task)

- child(*, name: str, log: Logger | None = None, prefix: str = '') Self
Create a child context with the provided name and prefix.
If prefix is provided then the default implementation will set the name to be [{prefix}]-->{name}.
The child context will know about all the futures held by this context.
- __await__() Generator[None]
Wait for this context to be complete
- __enter__() Self
Using the context as a context manager will ensure that it is cancelled when it goes out of scope.
- __exit__(exc_type: type[BaseException] | None, value: BaseException | None, tb: TracebackType | None) None
Ensure the context is cancelled when it goes out of scope.
- set_future_name(fut: Future[Any], *, name: str) None
Given some future, give it a name. This should be done such that the name can be retrieved by the sibling get_future_name function.
- get_future_name(fut: Future[Any]) str | None
Given some future, return its name as set by set_future_name.
If no name was set, then return None.
- log_info(msg: str) None
Log a simple message somewhere. It is up to the implementation to determine what that means.
- log_exception(msg: object, *, exc_info: tuple[type[BaseException], BaseException, TracebackType | None] | None = None) None
Log an exception somewhere. It is up to the implementation to determine what that means.
- fut_to_string(f: Future[Any] | WithRepr, with_name: bool = True) str
Given some future, or simply an object with the ability to call repr on it, return a string representing the future.
It is up to the implementation to determine how that actually works.
The default implementation in machinery will report whether the future is pending, cancelled, has an exception or has a result. It will also provide the name as retrieved from get_future_name if with_name is provided as True.
- property reporter: FutureCallback[Any]
Return a callable that can be provided as a done callback for a future.
It is good practice to always give a done callback to a future that looks at cancelled, exception or result depending on the status of the future, to prevent asyncio from complaining they weren’t accessed.
The implementation of this callable should log when the future was finished with an exception.
- property silent_reporter: FutureCallback[Any]
Return a callable that can be provided as a done callback for a future.
It is good practice to always give a done callback to a future that looks at cancelled, exception or result depending on the status of the future, to prevent asyncio from complaining they weren’t accessed.
The implementation of this callable should not log when the future was finished with an exception.
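A reporter of this kind can be quite small. The sketch below is an assumption about its general shape rather than the callables machinery provides: make_reporter_sketch is a hypothetical factory, and the only difference between the two flavours is whether an exception is logged.

```python
import asyncio
import logging

log = logging.getLogger("reporter_sketch")


def make_reporter_sketch(*, silent: bool):
    """Build a done callback that inspects the future it is given."""

    def reporter(fut) -> bool:
        # Check cancellation first: exception() and result() would raise
        # CancelledError on a cancelled future.
        if fut.cancelled():
            return False
        exc = fut.exception()
        if exc is not None:
            if not silent:
                log.error("future finished with an exception: %r", exc)
            return False
        fut.result()
        return True

    return reporter


async def demo() -> list:
    async def ok() -> int:
        return 1

    async def boom() -> None:
        raise ValueError("nope")

    silent_reporter = make_reporter_sketch(silent=True)
    tasks = [asyncio.ensure_future(ok()), asyncio.ensure_future(boom())]
    for task in tasks:
        task.add_done_callback(silent_reporter)
    await asyncio.wait(tasks)
    # Calling the reporter directly shows what it concluded for each task.
    return [silent_reporter(task) for task in tasks]
```

Because the callback calls exception() on the failed task, asyncio no longer reports "Task exception was never retrieved" for it.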
The ticker
- protocol machinery.helpers.protocols.Ticker
An object that yields on a specific tick schedule.
The default implementation is found via hp.tick.
Classes that implement this protocol must have the following methods / attributes:
- __aiter__() AsyncGenerator[tuple[int, float]]
Yield according to the tick schedule.
Each value yielded should be a tuple of (iteration, time_till_next) where iteration is a counter of how many times we yield a value starting from 1 and time_till_next is the number of seconds till the next time we yield a value.
- change_after(every: int, *, set_new_every: bool = True) None
Change the tick schedule.
For example change_after(20) will mean the next tick is 20 seconds after whenever the last tick was.
Setting set_new_every=False will mean this is a once only change in the schedule, otherwise this new every will become the new schedule.
- property pauser: Semaphore | None
Pause the ticking whilst this pauser is locked.
- machinery.helpers.tick(every: int, *, ctx: CTX, max_iterations: int | None = None, max_time: int | None = None, min_wait: float = 0.1, name: str = '', pauser: Semaphore | None = None) AsyncGenerator[Ticker]
This object gives you an async generator that yields at an interval of every seconds, taking into account how long it takes for your code to finish for the next yield.
For example:

    import asyncio
    import time

    from machinery import helpers as hp

    start = time.time()
    timing = []

    ctx: hp.CTX = ...

    async with hp.tick(10, ctx=ctx) as ticker:
        async for _ in ticker:
            timing.append(time.time() - start)
            await asyncio.sleep(8)
            if len(timing) >= 5:
                break

    # Rounded, because wall-clock measurements are never exact
    assert [round(t) for t in timing] == [0, 10, 20, 30, 40]

The value that is yielded is a tuple of (iteration, time_till_next) where iteration is a counter of how many times we yield a value starting from 1 and time_till_next is the number of seconds till the next time we yield a value.
Note that the schedule value can be changed during iteration:

    from machinery import helpers as hp

    ctx: hp.CTX = ...

    async with hp.tick(10, ctx=ctx) as ticker:
        done = 0

        async for _ in ticker:
            done += 1
            if done == 3:
                # This will mean the next tick will be 20 seconds after the last
                # tick and future ticks will be 20 seconds apart
                ticker.change_after(20)
            elif done == 5:
                # This will mean the next tick will be 40 seconds after the last
                # tick, but ticks after that will go back to 20 seconds apart.
                ticker.change_after(40, set_new_every=False)

There are other options:
- ctx
If this ctx is completed then the iteration will stop
- max_iterations
Iterations after this number will cause the loop to finish. By default there is no limit
- max_time
After this many seconds the loop will stop. By default there is no limit
- min_wait
The minimum amount of time to wait after a tick.
If this is False then we will always tick at the next expected time, otherwise we ensure this amount of time at a minimum between ticks
- pauser
If not None, we use this as a semaphore that will pause the ticks when it is locked.
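The phrase "taking into account how long it takes for your code to finish" comes down to a small piece of arithmetic. The function below is a sketch under assumptions: time_till_next is a hypothetical helper, and treating min_wait as a simple lower bound on the sleep is a guess at the intent rather than a description of what hp.tick does internally.

```python
import math


def time_till_next(start: float, every: float, now: float, min_wait: float = 0.1) -> float:
    """Seconds to sleep so that ticks stay aligned to start + n * every."""
    elapsed = now - start
    # Index of the next scheduled tick strictly after ``now``.
    next_index = math.floor(elapsed / every) + 1
    wait = start + next_index * every - now
    # Never sleep for less than min_wait (an assumption about its meaning).
    return max(wait, min_wait)
```

With a 10 second schedule and a loop body that takes 8 seconds, time_till_next(0, 10, 8) gives 2, so the next tick still lands on the 10 second mark instead of drifting to 18.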
Task holder
- protocol machinery.helpers.protocols.TaskHolder
An object that can be used to create asyncio.Task objects and ensure that they are cleaned up correctly without causing asyncio to complain about tasks that are never awaited.
The default implementation is provided by using hp.task_holder:

    from machinery import helpers as hp

    ctx: hp.CTX = ...

    async with hp.task_holder(ctx=ctx) as ts:
        task = ts.add_coroutine(some_async_function())

When the context manager is exited then tasks will not be cancelled unless the parent ctx has been cancelled. During this time, more tasks may be added to the task holder.
Once the parent ctx has been cancelled, then all the tasks that are held will be cancelled and awaited.
Classes that implement this protocol must have the following methods / attributes:
- __contains__(task: Task[object]) bool
Return whether this holds onto the provided task.
- __iter__() Iterator[WaitByCallback[object]]
Yield all the tasks currently held by this object.
- add_coroutine(coro: Coroutine[object, object, T_Ret], *, silent: bool = False) Task
Create a task from this coroutine and ensure that it gets cleaned up eventually.
The silent argument should be used to say whether exceptions from this coroutine should be logged or not.
- add_task(task: Task) Task
Track this asyncio.Task object and ensure it is cleaned up correctly eventually.
- property pending: int
Return the number of pending tasks that are held by this object.
- machinery.helpers.task_holder(*, ctx: CTX, name: str = '') AsyncGenerator[TaskHolder]
An object for managing asynchronous coroutines.
Usage looks like:

    import asyncio

    from machinery import helpers as hp


    async def my_async_program(ctx: hp.CTX) -> None:
        async def something():
            await asyncio.sleep(5)

        async with hp.task_holder(ctx=ctx) as ts:
            ts.add_coroutine(something())
            ts.add_coroutine(something())

If you don’t want to use the context manager, you can say:

    import asyncio
    import contextlib

    from machinery import helpers as hp


    async def something():
        await asyncio.sleep(5)


    async def my_async_program(ctx: hp.CTX) -> None:
        exit_stack = contextlib.AsyncExitStack()

        ts = await exit_stack.enter_async_context(hp.task_holder(ctx=ctx))

        try:
            ts.add_coroutine(something())
            ts.add_coroutine(something())
        finally:
            await exit_stack.aclose()

Once your block in the context manager is done the context manager won’t exit until all coroutines have finished. During this time you may still use ts.add_coroutine or ts.add_task on the holder.
If the ctx is cancelled before all the tasks have completed then the tasks will be cancelled and properly waited on so their finally blocks run before the context manager finishes.
ts.add_coroutine will also return the task object that is made from the coroutine.
ts.add_coroutine also takes a silent=False parameter, that when True will not log any errors that happen. Otherwise errors will be logged.
If you already have a task object, you can give it to the holder with ts.add_task(my_task).
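The exit behaviour described above (wait for everything, unless the parent is cancelled, in which case cancel and still wait) can be sketched as follows. Everything here is hypothetical: HolderSketch and holder_sketch are illustrative names, and a plain future called stop stands in for the parent ctx.

```python
import asyncio
import contextlib
from collections.abc import AsyncIterator


class HolderSketch:
    def __init__(self) -> None:
        self._tasks = []

    def add_coroutine(self, coro) -> asyncio.Task:
        return self.add_task(asyncio.ensure_future(coro))

    def add_task(self, task: asyncio.Task) -> asyncio.Task:
        self._tasks.append(task)
        return task

    @property
    def pending(self) -> int:
        return sum(1 for task in self._tasks if not task.done())

    async def finish(self, stop: asyncio.Future) -> None:
        # Loop, because tasks may be added while we are still waiting.
        while self.pending:
            waiting = [task for task in self._tasks if not task.done()]
            if stop.done():
                # The parent is finished: cancel, then wait for cleanup.
                for task in waiting:
                    task.cancel()
                await asyncio.wait(waiting)
            else:
                await asyncio.wait([stop, *waiting], return_when=asyncio.FIRST_COMPLETED)
        for task in self._tasks:
            if not task.cancelled():
                task.exception()  # mark exceptions as retrieved


@contextlib.asynccontextmanager
async def holder_sketch(stop: asyncio.Future) -> AsyncIterator[HolderSketch]:
    holder = HolderSketch()
    try:
        yield holder
    finally:
        await holder.finish(stop)


async def demo() -> tuple:
    finished = []

    async def quick() -> None:
        await asyncio.sleep(0.01)
        finished.append("quick")

    async def forever() -> None:
        try:
            await asyncio.sleep(3600)
        finally:
            finished.append("forever cleaned up")

    loop = asyncio.get_running_loop()
    stop = loop.create_future()
    loop.call_later(0.05, stop.cancel)

    async with holder_sketch(stop) as ts:
        ts.add_coroutine(quick())
        task = ts.add_coroutine(forever())
    return finished, task.cancelled()
```

In demo() the quick task finishes on its own, while the long one is only cancelled once stop is cancelled, and its finally block has run by the time the async with block exits.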
Queues
- class machinery.helpers.EnsureItemGetter
Used to return a QueueItemDef object that pretends to ensure that the item is T_Item.
This is useful when we want to create a type safe Queue without the extra CPU cycles to ensure that the items added to the queue are indeed of T_Item.
Usage is:

    item_ensurer = EnsureItemGetter[MyAmazingType].get()
    reveal_type(item_ensurer)  # _protocols.QueueItemDef[MyAmazingType]

- protocol machinery.helpers.protocols.SyncQueue
Represents an object that can be used as a synchronous queue.
The default implementation is via hp.sync_queue and wraps the standard library queue.Queue class.
Classes that implement this protocol must have the following methods / attributes:
- __iter__() Iterator[T_Item]
Iterate over the items in the queue. This is re-entrant if the iteration is stopped and restarted.
- __len__() int
Return how many items are in the queue.
- __type_params__ = (T_Item,)
- append(item: T_Item) None
Add something to the queue
- get_all() Iterator[T_Item]
Returns an iterator over all the items in the queue. This is re-entrant if the iteration is stopped and restarted.
- is_empty() bool
Return true if the queue is currently empty
- remaining() Iterator[T_Item]
Returns an iterator that returns everything that remains in the queue.
Useful after the queue has stopped and there are still items remaining.
This iteration will not wait for new items to be added to the queue when all the remaining items are yielded
- machinery.helpers.sync_queue(*, ctx: CTX, timeout: float = 0.05, empty_on_finished: bool = False, name: str = '', item_ensurer: QueueItemDef | None = None) Iterator[SyncQueue] | Iterator[SyncQueue[object]]
A simple wrapper around the standard library non async queue.
Usage is:

    from machinery import helpers as hp

    ctx: hp.CTX = ...

    with hp.sync_queue(ctx=ctx) as sync_queue:

        async def results():
            for result in sync_queue:
                print(result)

        ...

        sync_queue.append(something)
        sync_queue.append(another)

If empty_on_finished is set to True, then the queue will keep yielding what items remain after ctx is complete.
The item_ensurer can be passed in as a function that takes a single object and returns an object matching T_Item. This allows us to ensure the return type is a Queue that yields T_Item objects. If there is a strong guarantee that the objects provided to the queue will always be the correct type then hp.EnsureItemGetter can be used to return an object for item_ensurer that is typed as returning T_Item but will never be executed by the queue.
If no item_ensurer is provided then the queue will be yielding objects of type object.
If a timeout is provided, then that timeout will be used when waiting for a new result on the standard library Queue.get(timeout=...)
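The role of timeout and empty_on_finished is easier to see in a stripped-down loop over a standard library queue. This is a sketch under assumptions, not the hp.sync_queue code: iter_until_stopped is a hypothetical function and a threading.Event stands in for the ctx.

```python
import queue
import threading
from collections.abc import Iterator


def iter_until_stopped(
    q: "queue.Queue[object]",
    stop: threading.Event,
    *,
    timeout: float = 0.05,
    empty_on_finished: bool = False,
) -> Iterator[object]:
    # A short timeout on get() lets the loop notice ``stop`` regularly.
    while not stop.is_set():
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            continue
        yield item

    if empty_on_finished:
        # After stopping, drain what is left without waiting for more.
        while True:
            try:
                yield q.get_nowait()
            except queue.Empty:
                return
```

For example, with two items queued, taking the first item and then setting stop leaves the second item to be yielded only when empty_on_finished=True.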
- protocol machinery.helpers.protocols.Queue
These objects represent an asynchronous queue of values.
Classes that implement this protocol must have the following methods / attributes:
- __aiter__() AsyncGenerator[T_Item]
Asynchronously yield the values in the queue as they are added
- __len__() int
Return the number of items left in the queue.
- __type_params__ = (T_Item,)
- append(item: T_Item, *, priority: bool = False) None
Add an object to the end of the queue.
If priority is set to True, then the item is added to the front of the queue.
- append_instruction(cb: Callable[[], None], *, priority: bool = False) None
Add a callable to the end of the queue.
If priority is set to True, then the callable is added to the front of the queue.
When the queue gets to these callables, the callable is called and nothing is directly yielded as a result.
- property breaker: Event
When this is set, the iteration will exit.
The default implementation will always clear this when iteration is restarted.
- property get_all: Callable[[], AsyncGenerator[T_Item]]
Return an async generator that yields all the values in the queue as they are added.
- is_empty() bool
Return True if there are no items left in the queue
- process_after_yielded(process: Callable[[LimitedQueue[T_Item]], None], /) None
Register a callback to be called whenever a value has been yielded.
The callback will be provided the queue itself and is typed to only accept a limited API for that queue.
- remaining() Iterator[T_Item]
Yield all the remaining values in the queue. This is useful when the queue is stopped but there are still values left.
This will not wait for another value when it runs out of values to yield.
- machinery.helpers.queue(*, ctx: CTX, empty_on_finished: bool = False, name: str = '', item_ensurer: QueueItemDef | None = None) Iterator[Queue] | Iterator[Queue[object]]
Returns an object that can asynchronously yield the values it gets given.
Usage is:

    from machinery import helpers as hp

    ctx: hp.CTX = ...

    with hp.queue(ctx=ctx) as queue:

        async def results():
            # This will continue forever until ctx is done
            async for result in queue:
                print(result)

        ...

        queue.append(something)
        queue.append(another)

Note that the main difference between this and the standard library asyncio.Queue, other than a slightly different API surface, is that this one does not have the ability to impose limits.
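As a rough picture of how an unbounded async queue with priority appends and a breaker can work, consider the sketch below. QueueSketch is a hypothetical class written for illustration only. It assumes a deque for storage and an internal event for waking the consumer, which may differ from what hp.queue does.

```python
import asyncio
import collections


class QueueSketch:
    def __init__(self) -> None:
        self._items = collections.deque()
        self._waiter = asyncio.Event()
        self.breaker = asyncio.Event()

    def append(self, item: object, *, priority: bool = False) -> None:
        if priority:
            self._items.appendleft(item)  # front of the queue
        else:
            self._items.append(item)
        self._waiter.set()

    def __len__(self) -> int:
        return len(self._items)

    async def __aiter__(self):
        # The breaker is cleared whenever iteration is (re)started.
        self.breaker.clear()
        while not self.breaker.is_set():
            if not self._items:
                self._waiter.clear()
                # Sleep until either an item arrives or the breaker is set.
                waits = [
                    asyncio.ensure_future(self._waiter.wait()),
                    asyncio.ensure_future(self.breaker.wait()),
                ]
                try:
                    await asyncio.wait(waits, return_when=asyncio.FIRST_COMPLETED)
                finally:
                    for wait in waits:
                        wait.cancel()
                continue
            yield self._items.popleft()


async def demo() -> list:
    q = QueueSketch()
    q.append("a")
    q.append("b")
    q.append("c", priority=True)

    seen = []
    async for item in q:
        seen.append(item)
        if len(seen) == 3:
            q.breaker.set()
    return seen
```

The priority item jumps ahead of the two items that were appended before it, and setting the breaker ends the loop instead of waiting forever for more items.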
Queue Manager
- protocol machinery.helpers.protocols.QueueFeeder
This represents one of the two objects returned by hp.queue_manager and is used to feed items into the streamer it also creates.
In the default implementation, sources may return “extendable” values which will be then used themselves as input sources rather than yielded.
These include:
callables that take no arguments
async generators
sync generators
Coroutine objects
asyncio.Task objects
It does not include normal iterable objects like lists.
Classes that implement this protocol must have the following methods / attributes:
- __type_params__ = (T_QueueContext,)
- add_async_generator(agen: AsyncGenerator[object], *, context: T_QueueContext | None = None) None
Add an async generator as a source.
In the default implementation, any “extendable” result will be used as an input source rather than given to the streamer as a value.
In the default implementation, every iteration will be added to the end of the streamer as they happen.
- add_coroutine(coro: Coroutine[object, object, object], *, context: T_QueueContext | None = None) None
Add a coroutine as a source.
In the default implementation, this coroutine is used to create an asyncio.Task object that is passed into
add_task.
- add_sync_function(func: Callable[[], object], *, context: T_QueueContext | None = None) None
Register a synchronous function as an input source.
The value from this function will be provided to the streamer with the provided context.
In the default implementation, an “extendable” result will be used as an input source rather than given to the streamer as a value.
- add_sync_iterator(iterator: Iterable[object] | Iterator[object], *, context: T_QueueContext | None = None) None
Register a synchronous iterator as an input source.
This can either be a normal Generator object or any other iterable object like a list.
In the default implementation, any “extendable” result will be used as an input source rather than given to the streamer as a value.
In the default implementation, every iteration will be added to the end of the streamer as they happen.
- add_task(task: asyncio.Task[object], *, context: T_QueueContext | None = None) None
Add an asyncio.Task as a source.
In the default implementation, if the result is “extendable” then it will be used as an input source rather than given to the streamer as a value
- add_value(value: object, *, context: T_QueueContext | None = None) None
Add a single value to the streamer
In the default implementation, if the value is “extendable” then it will be used as an input source rather than given to the streamer as a value
- set_as_finished_if_out_of_sources() None
Calling this will ensure that the streamer stops waiting for more input after all sources have dried up.
- protocol machinery.helpers.protocols.Streamer
This represents one of the two objects returned by hp.queue_manager and is used to yield the objects that are fed into it by the feeder.
Classes that implement this protocol must have the following methods / attributes:
- __aiter__() AsyncGenerator[T_Item]
Yield the values in the streamer as they come in
- __type_params__ = (T_Item,)
- property breaker: Event
When this event is set, the streamer will finish
- remaining() Iterator[T_Item]
Yield whatever remains in the streamer until no more values are left.
Useful if the iteration is exited early and there are items left.
- machinery.helpers.queue_manager(*, ctx: CTX, make_empty_context: Callable[[], T_QueueContext], name: str = '') AsyncGenerator[tuple[Streamer[QueueManagerResult], QueueFeeder]]
Create and manage a (streamer, feeder) pair that can be used to manage a stream of values.
Usage is as follows:

    from typing import assert_never

    from machinery import helpers as hp

    ctx: hp.CTX = ...

    async with hp.queue_manager(ctx=ctx, make_empty_context=lambda: None) as (streamer, feeder):
        feeder.add...
        feeder.add...
        feeder.add...
        feeder.set_as_finished_if_out_of_sources()

        async for result in streamer:
            match result:
                case hp.QueueManagerSuccess():
                    ...
                case hp.QueueManagerFailure():
                    ...
                case hp.QueueManagerIterationStop():
                    ...
                case hp.QueueManagerStopped():
                    ...
                case _:
                    assert_never(result)

The feeder can be added to with a number of methods as found on the hp.protocols.QueueFeeder protocol. These can be used at any time, even after the set_as_finished_if_out_of_sources method has been called.
The set_as_finished_if_out_of_sources method says that the streamer will not keep waiting when all its sources are exhausted and nothing is left to stream.
The QueueManagerStopped will be sent to the queue when the ctx is complete and may be received before the streamer is finished yielding values.
When values are added to the feeder, a context can be provided that will accompany all results.
To finish a streamer early, streamer.breaker.set() can be called.
Values that come from input sources may be “extended” where they are used as additional sources rather than added as results.
These include:
callables that take no arguments
async generators
sync generators
Coroutine objects
asyncio.Task objects
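One way to picture how a value is recognised as "extendable" is a small classifier over the kinds listed above. This is a hypothetical sketch, not the check that machinery performs: extendable_kind is an invented name, the returned strings simply reuse the QueueInput member names, and the sketch treats any callable as extendable without inspecting its signature.

```python
import asyncio
import inspect
from typing import Optional


def extendable_kind(value: object) -> Optional[str]:
    """Return a label for extendable values, or None for plain values."""
    if isinstance(value, asyncio.Task):
        return "TASK"
    if inspect.iscoroutine(value):
        return "COROUTINE"
    if inspect.isasyncgen(value):
        return "ASYNC_GENERATOR"
    if inspect.isgenerator(value):
        return "SYNC_GENERATOR"
    if callable(value):
        return "SYNC_FUNCTION"
    # Lists and other ordinary iterables are treated as plain values.
    return None
```

A list such as [1, 2] returns None here, which matches the note that normal iterable objects are not treated as sources.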
- enum machinery.helpers.QueueInput(value)
An enum of the different source types provided to the feeder
Valid values are as follows:
- SYNC_FUNCTION = <QueueInput.SYNC_FUNCTION: 'SYNC_FUNCTION'>
- SYNC_GENERATOR = <QueueInput.SYNC_GENERATOR: 'SYNC_GENERATOR'>
- SYNC_ITERATOR = <QueueInput.SYNC_ITERATOR: 'SYNC_ITERATOR'>
- VALUE = <QueueInput.VALUE: 'VALUE'>
- COROUTINE = <QueueInput.COROUTINE: 'COROUTINE'>
- TASK = <QueueInput.TASK: 'TASK'>
- ASYNC_GENERATOR = <QueueInput.ASYNC_GENERATOR: 'ASYNC_GENERATOR'>
- class machinery.helpers.QueueManagerSuccess(*, sources: Sequence[tuple[QueueInput, object]], value: object, context: T_QueueContext)
These are yielded when we have a value being successfully provided by an input source.
The sources list will be a list of tuples of the source type and the object used as that source.
The context will be the object provided by the first source that led to this value
- class machinery.helpers.QueueManagerFailure(*, sources: Sequence[tuple[QueueInput, object]], exception: BaseException, context: T_QueueContext)
These are yielded when we have an input source raising an exception.
The sources list will be a list of tuples of the source type and the object used as that source.
The context will be the object provided by the first source that led to this value
- class machinery.helpers.QueueManagerIterationStop(*, sources: Sequence[tuple[QueueInput, object]], exception: BaseException | None, context: T_QueueContext)
These are yielded when we have an iteration (either sync or async) reaching its end.
The sources list will be a list of tuples of the source type and the object used as that source.
The context will be the object provided by the first source that led to this value
- class machinery.helpers.QueueManagerStopped(*, exception: BaseException | None = None)
This is yielded when the queue itself has stopped. If it reached a natural end then the exception will be None.
Also provided is the type alias for the union of types that the streamer may
produce as machinery.helpers.QueueManagerResult:
QueueManagerResult
    type QueueManagerResult[T_QueueContext] = (
        QueueManagerSuccess[T_QueueContext]
        | QueueManagerFailure[T_QueueContext]
        | QueueManagerIterationStop[T_QueueContext]
        | QueueManagerStopped
    )
Odd helpers
There are a few standalone helpers for some odd functionality:
- machinery.helpers.ensure_aexit(instance: AbstractAsyncContextManager[object]) AbstractAsyncContextManager[None]
Used to make sure a manual async context manager calls __aexit__ if __aenter__ fails.
Turns out if __aenter__ raises an exception, then __aexit__ doesn’t get called. This is a helper to make it easy to ensure that does happen.
Usage is as follows:

    import types

    from machinery import helpers as hp


    class MyCM:
        async def __aenter__(self) -> None:
            async with hp.ensure_aexit(self):
                return await self.start()

        async def start(self) -> ...:
            ...

        async def __aexit__(
            self,
            exc_typ: type[BaseException] | None,
            value: BaseException | None,
            tb: types.TracebackType | None,
        ) -> None:
            await self.finish(exc_typ, value, tb)

        async def finish(
            self,
            exc_typ: type[BaseException] | None = None,
            value: BaseException | None = None,
            tb: types.TracebackType | None = None,
        ) -> None:
            ...

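For readers curious how such a helper can work, a minimal sketch using contextlib is shown here. It is an assumption about the general technique, not the source of hp.ensure_aexit: ensure_aexit_sketch and FailingCM are hypothetical names, and the sketch always re-raises the original error after calling __aexit__.

```python
import asyncio
import contextlib
import sys
from collections.abc import AsyncIterator


@contextlib.asynccontextmanager
async def ensure_aexit_sketch(instance) -> AsyncIterator[None]:
    try:
        yield
    except BaseException:
        # __aenter__ failed, so call __aexit__ ourselves with the active error.
        await instance.__aexit__(*sys.exc_info())
        raise


class FailingCM:
    def __init__(self) -> None:
        self.events = []

    async def __aenter__(self) -> None:
        async with ensure_aexit_sketch(self):
            self.events.append("enter")
            raise RuntimeError("could not start")

    async def __aexit__(self, exc_typ, value, tb) -> None:
        name = exc_typ.__name__ if exc_typ is not None else None
        self.events.append(f"exit:{name}")


async def demo() -> list:
    cm = FailingCM()
    try:
        async with cm:
            cm.events.append("body")
    except RuntimeError:
        pass
    return cm.events
```

The body of the async with block never runs, yet the recorded events show that __aexit__ was still called with the RuntimeError.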
- async machinery.helpers.stop_async_generator(gen: AsyncGenerator[object, T_Send | None], provide: T_Send | None = None, exc: BaseException | None = None) None
This will ensure an async generator has stopped.
It will first throw an exception into the generator using gen.athrow. This is either an asyncio.CancelledError or the exc that is provided if that is not None.
Then we will send provide with gen.asend.
Finally, regardless of exceptions, gen.aclose will be called.
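The three steps (throw, send, close) can be sketched as follows. This is a hedged illustration, not the implementation in machinery: stop_async_generator_sketch is a hypothetical name, and the decision to swallow the thrown exception when the generator lets it propagate is an assumption made so the caller is not itself interrupted.

```python
import asyncio


async def stop_async_generator_sketch(gen, provide=None, exc=None) -> None:
    """Throw into ``gen``, send ``provide``, then always close it."""
    thrown = exc if exc is not None else asyncio.CancelledError()
    try:
        try:
            await gen.athrow(thrown)
        except StopAsyncIteration:
            pass  # the generator handled the exception and finished
        except BaseException as error:
            if error is not thrown:
                raise  # the generator failed with a different error

        try:
            await gen.asend(provide)
        except StopAsyncIteration:
            pass  # nothing left to run
    finally:
        await gen.aclose()


async def demo() -> list:
    log = []

    async def numbers():
        try:
            while True:
                yield 1
        except asyncio.CancelledError:
            log.append("cancelled")
        finally:
            log.append("closed")

    gen = numbers()
    await gen.__anext__()  # start the generator so it is paused at a yield
    await stop_async_generator_sketch(gen)
    return log
```

In demo() the generator sees the asyncio.CancelledError at its yield, runs its finally block, and is fully finished by the time the helper returns.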
- machinery.helpers.noncancelled_results_from_futs(futs: Sequence[FutureStatus]) tuple[BaseException | BaseExceptionGroup | None, Sequence]
Get back (exception, results) from a list of futures
- exception
A single exception if all the errors are the same type or if there is only one exception, otherwise it is None
- results
A list of the results that exist
- machinery.helpers.find_and_apply_result(final: Future, futs: Sequence[Future]) bool
Find a future in futs with a result or exception and set that result/exception on both final, and all the settable futures in futs.
As a bonus, if final is set, then we set that result/exception on all futures in futs,
and if final is cancelled, we cancel all the futures in futs.
Return True if we’ve changed final.
And some standalone protocols for some concepts:
- protocol machinery.helpers.protocols.FutureStatus
Represents a read-only view of a Future
Classes that implement this protocol must have the following methods / attributes:
- __type_params__ = (T_Ret,)
- cancelled() bool
- done() bool
- exception() BaseException | None
- result() T_Ret
- protocol machinery.helpers.protocols.FutureCallback
Represents an object that’s used as a done_callback for a future.
It can be hashed and it is a callable object that takes in the status of a completed future.
Classes that implement this protocol must have the following methods / attributes:
- __call__(res: FutureStatus[T_Ret], /) None
Call self as a function.
- __hash__() int
Return hash(self).
- __type_params__ = (T_Ret,)
- protocol machinery.helpers.protocols.FutureCTXCallback
Represents an object that’s used as a done_callback for a future but also takes in a CTX object.
Classes that implement this protocol must have the following methods / attributes:
- __call__(ctx: CTX, res: FutureStatus[T_Ret], /) None
Call self as a function.
- __hash__() int
Return hash(self).
- __type_params__ = (T_Ret,)
- protocol machinery.helpers.protocols.WaitByCallback
Represents an object that supports adding and removing callbacks for when the object is done.
Classes that implement this protocol must have the following methods / attributes:
- __type_params__ = (T_Ret,)
- add_done_callback(cb: Callable[[FutureStatus[T_Ret]], None]) None | FutureCallback[T_Ret]
- cancel() bool
- done() bool
- remove_done_callback(cb: Callable[[FutureStatus[T_Ret]], None]) int