Source code for Muscat.Helpers.Future

import threading
from typing import Any
import numpy as np


class Future:
    """Run a callable in a background thread and fetch its result later.

    Instead of doing:

        res = method(arg1, arg2)

    just use:

        future = Future(method, arg1, arg2)
        [...]
        res = future.get()

    Every ``np.ndarray`` argument is copied before being handed to the task,
    and the caller's array is flagged read-only until ``get()`` is called,
    at which point its original ``writeable`` flag is restored.

    note: this is useless if you are using pure python as it is sequential
    """

    # actual arguments handed to the task (np arrays are private copies)
    args_: list[Any]
    # arguments exactly as given by the caller
    originalArgs_: tuple[Any, ...]
    # original writeable flag of each np array argument, None for other types
    wasWritable_: "list[bool | None]"
    # value returned by the task (None until the task has finished)
    res_: Any
    # exception raised by the task, if any (re-raised by get())
    error_: "BaseException | None"

    def run_(self, task):
        """Thread entry point: call ``task`` and record its result or error.

        Parameters
        ----------
        task : callable
            Called as ``task(*self.args_)``.
        """
        try:
            self.res_ = task(*self.args_)
        except BaseException as error:  # handed back to the caller in get()
            self.error_ = error

    def __init__(self, task, *args):
        """Start ``task(*args)`` in a new thread.

        Parameters
        ----------
        task : callable
            The function to run in the background.
        *args
            Positional arguments for ``task``. np arrays are copied for the
            task and the caller's arrays are made read-only until ``get()``.
        """
        # original args, untouched (only the writeable flag is toggled)
        self.originalArgs_ = args
        # actual args given to task, the task can modify them
        self.args_ = list(args)
        # this contains the original value of writeable in the np arrays
        self.wasWritable_ = [None] * len(args)
        # make sure get() always finds these attributes
        self.res_ = None
        self.error_ = None

        for i, arg in enumerate(args):
            if isinstance(arg, np.ndarray):
                self.wasWritable_[i] = arg.flags.writeable
                self.args_[i] = np.array(arg, copy=True)
                arg.flags.writeable = False

        # create the task and launch it
        self.thread_ = threading.Thread(target=self.run_, args=(task,))
        self.thread_.start()

    def available(self):
        """returns if the data is available to use (not blocking)"""
        return not self.thread_.is_alive()

    def get(self):
        """returns the data returned by task (blocking)

        Waits for the thread to finish and restores the writeable flag of
        the caller's np arrays.

        Raises
        ------
        BaseException
            Whatever exception the task raised, re-raised in the calling
            thread.
        """
        # wait for the task to finish
        self.thread_.join()

        # Restore in reverse order: when the same array was passed in several
        # slots, the later slots recorded the already-frozen flag, so the
        # first slot (holding the true original value) must be applied last.
        for original, wasWritable in zip(
            reversed(self.originalArgs_), reversed(self.wasWritable_)
        ):
            if wasWritable is not None:
                original.flags.writeable = wasWritable

        if self.error_ is not None:
            raise self.error_
        return self.res_
def CheckFutureSequence():
    """Check that a Future runs its task in the background and that get() waits for it.

    The task sleeps briefly before overwriting the module-level ``asyncStr``,
    so the value written here by the calling thread must be replaced by the
    task's value once ``get()`` has returned.
    """
    import time

    global asyncStr

    def delayedWrite(value):
        # sleep first so the caller gets to write its own value beforehand
        global asyncStr
        time.sleep(0.1)
        asyncStr = value
        return 1

    pending = Future(delayedWrite, "FUTURE")

    # written while the task is still sleeping
    asyncStr = "NOW"
    assert not pending.available()

    # blocks until the task is done, by then the task has overwritten asyncStr
    assert pending.get() == 1
    print(asyncStr)
    assert asyncStr == "FUTURE"
    assert pending.available()
def CheckFutureNumpy():
    """Check the handling of the writeable flag of np array arguments.

    The array given to a Future must be read-only while the task is pending
    and writeable again once ``get()`` has returned.
    """
    import time

    mat = np.array([1, 2, 3])

    def overwriteFirst(data: np.ndarray):
        # writes into the array received by the task
        time.sleep(0.1)
        data[0] = 0
        return 1

    pending = Future(overwriteFirst, mat)

    # the caller's array is locked while the task is running
    assert mat.flags.writeable is False

    status = pending.get()
    assert status == 1

    # the original flag is back after get()
    assert mat.flags.writeable is True
def CheckIntegrity(GUI: bool = False) -> str:
    """Run every self-test of this module.

    Parameters
    ----------
    GUI : bool, optional
        Not used by this module's checks.

    Returns
    -------
    str
        "OK" when all the checks ran without raising.
    """
    for check in (CheckFutureSequence, CheckFutureNumpy):
        check()
    return "OK"
if __name__ == "__main__":
    # Run the module self-check exactly once when executed as a script
    # (the guard used to be duplicated, which ran every check twice).
    print(CheckIntegrity(True))  # pragma: no cover