Source code for Muscat.Helpers.Future
import threading
from typing import Any
import numpy as np
[docs]
class Future:
    """Run a task in a background thread and collect its result later.

    Instead of doing:
        res = method(arg1, arg2)
    just use:
        future = Future(method, arg1, arg2)
        [...]
        res = future.get()

    Every numpy array argument is copied before being handed to the task, and
    the caller's array is flagged read-only until ``get()`` is called, at
    which point its original ``writeable`` flag is restored.

    If the task raises, the exception is stored and re-raised by ``get()``.

    note: this is useless if you are using pure python as it is sequential
    """

    # arguments actually handed to the task (numpy arrays replaced by copies)
    args_: list[Any]
    # arguments exactly as received from the caller
    originalArgs_: tuple[Any, ...]
    # original ``writeable`` flag per argument, None when nothing must be restored
    wasWritable_: "list[bool | None]"
    # value returned by the task (None until the task has finished)
    res_: Any
    # exception raised by the task, None if it did not raise
    error_: "BaseException | None"

    def run_(self, task):
        """Thread body: call ``task`` and record its result or its exception."""
        try:
            self.res_ = task(*self.args_)
        except BaseException as error:
            # Keep the failure so that get() can re-raise it in the caller's
            # thread instead of failing later on a missing result.
            self.error_ = error

    def __init__(self, task, *args):
        """Prepare the arguments, then start ``task(*args)`` in a new thread.

        Parameters
        ----------
        task : callable
            The function to execute in the background.
        *args
            Positional arguments forwarded to ``task``. numpy arrays are
            copied for the task and the originals are made read-only.
        """
        # original args, untouched
        self.originalArgs_ = args
        # actual args given to task, can modify
        self.args_ = list(args)
        # this contains the original value of writable in the np arrays
        self.wasWritable_ = [None] * len(args)
        self.res_ = None
        self.error_ = None

        # ids of arrays whose flag has already been recorded: an array passed
        # several times must be recorded only once, otherwise the second
        # record would capture the read-only state set just below.
        alreadyLocked = set()
        for i, arg in enumerate(args):
            if not isinstance(arg, np.ndarray):
                continue
            self.args_[i] = np.array(arg, copy=True)
            if id(arg) in alreadyLocked:
                continue
            alreadyLocked.add(id(arg))
            self.wasWritable_[i] = arg.flags.writeable
            arg.flags.writeable = False

        # create the task and launch it
        self.thread_ = threading.Thread(target=self.run_, args=(task,))
        self.thread_.start()

    def available(self):
        """returns if the data is available to use (not blocking)"""
        return not self.thread_.is_alive()

    def get(self):
        """returns the data returned by task (blocking)

        Restores the ``writeable`` flag of the caller's numpy arrays, then
        re-raises the exception raised by the task, if any.
        """
        # wait for the task to finish
        self.thread_.join()
        for original, writable in zip(self.originalArgs_, self.wasWritable_):
            if writable is not None:
                original.flags.writeable = writable
        if self.error_ is not None:
            raise self.error_
        return self.res_
[docs]
def CheckFutureSequence():
    """Check that a Future runs concurrently and that get() waits for it.

    The task blocks until the main thread releases it, which lets the check
    observe the "not yet available" state before the result is collected.
    """
    # Event the worker waits on; avoids spinning on a shared flag.
    release = threading.Event()
    # Shared slot written first by the main thread, then by the task.
    state = {"asyncStr": None}

    def wait(val):
        release.wait()
        state["asyncStr"] = val
        return 1

    future = Future(wait, "FUTURE")
    try:
        state["asyncStr"] = "NOW"
        assert not future.available()
    finally:
        # Always release the worker, even when the assertion above fails,
        # so that its thread cannot stay blocked forever.
        release.set()

    assert future.get() == 1
    print(state["asyncStr"])
    assert state["asyncStr"] == "FUTURE"
    assert future.available()
[docs]
def CheckFutureNumpy():
    """Check the read-only locking of numpy arguments around a Future.

    The caller's array must be read-only while the Future is pending and
    writable again once get() has returned.
    """
    import time

    def zeroFirstEntry(arr: np.ndarray):
        # Sleep first so the Future is still pending when the flag is checked.
        time.sleep(0.1)
        arr[0] = 0
        return 1

    source = np.array([1, 2, 3])
    pending = Future(zeroFirstEntry, source)

    assert source.flags.writeable is False
    outcome = pending.get()
    assert outcome == 1
    assert source.flags.writeable is True
[docs]
def CheckIntegrity(GUI: bool = False) -> str:
    """Run every Future self-check in order and return "OK" on success.

    ``GUI`` is accepted but not used by this function.
    """
    for check in (CheckFutureSequence, CheckFutureNumpy):
        check()
    return "OK"
if __name__ == "__main__":
    # Run the self-checks once when the module is executed as a script.
    print(CheckIntegrity(True))  # pragma: no cover