[docs]classFuture:""" A class used to parallelize a task Instead of doing: res = method(arg1, arg2) just use: future = Future(method, arg1, arg2) [...] res = future.get() note: this is useless if you are using pure python as it is sequential """args_:tuple[Any]originalArgs_:list[Any]wasWritable_:list[bool]res_:Any
def__init__(self,task,*args):# original args, untouchedself.originalArgs_=args# actual args given to task, can modifyself.args_=list(args)# this contains the original value of writable in the np arraysself.wasWritable_=[None]*len(args)foriinrange(len(args)):ifisinstance(args[i],np.ndarray):self.wasWritable_[i]=self.args_[i].flags.writeableself.args_[i]=np.array(self.args_[i],copy=True)self.originalArgs_[i].flags.writeable=False# create the task and launch itself.thread_=threading.Thread(target=self.run_,args=(task,))self.thread_.start()
[docs]defavailable(self):"""returns if the data is available to use (not blocking)"""returnnotself.thread_.is_alive()
[docs]defget(self):"""returns the data returned by task (blocking)"""# wait for the task to finishself.thread_.join()foriinrange(len(self.args_)):iftype(self.args_[i])isnp.ndarray:self.originalArgs_[i].flags.writeable=self.wasWritable_[i]returnself.res_
[docs]defCheckFutureSequence():# Global Future testimporttimeglobalasyncStrgloballockedlocked=Truedefwait(val):globalasyncStrgloballockedwhilelocked:passasyncStr=valreturn1future=Future(wait,"FUTURE")asyncStr="NOW"assertnotfuture.available()locked=Falseassertfuture.get()==1print(asyncStr)assertasyncStr=="FUTURE"assertfuture.available()
[docs]defCheckFutureNumpy():# Numpy specific testimporttimemat=np.array([1,2,3])defmodifyMat(mat:np.ndarray):time.sleep(0.1)mat[0]=0return1future=Future(modifyMat,mat)assertmat.flags.writeableisFalseassertfuture.get()==1assertmat.flags.writeableisTrue