""" Utility file for server side asynchronous task running and config objects """ import asyncio import contextlib from functools import partial from threading import Lock from typing import ContextManager, Generic, TypeVar def run_in_executor(func, *args, executor=None, **kwargs): """ Run the function in an executor, unblocking the main loop :param func: Function to be run in executor :param args: function parameters :param executor: executor instance [Thread | Process] :param kwargs: Additional parameters :return: Future of function result upon completion """ callback = partial(func, *args, **kwargs) loop = asyncio.get_event_loop() return loop.run_in_executor(executor, callback) # Genetic type template T = TypeVar("T") class Mutex(Generic[T]): """ Mutex class to implement lock/release of a shared protected variable """ def __init__(self, value: T): """ Create an instance of Mutex wrapper for the given resource :param value: Shared resources to be thread protected """ self.__value = value self.__lock = Lock() @contextlib.contextmanager def lock(self) -> ContextManager[T]: """ Lock the resource with a mutex to be used within a context block The lock is automatically released on context exit :return: Shared resource """ self.__lock.acquire() try: yield self.__value finally: self.__lock.release()