# -*- coding: utf-8 -*-## This file is subject to the terms and conditions defined in# file 'LICENSE.txt', which is part of this source code package.#importosfromtypingimportUnionfromMuscat.TypesimportMuscatIndeximportMuscat.Helpers.ParserHelperasPH
[docs]defSetNumberOfThreadsPerInstance(nbThreads:MuscatIndex)->None:""" Function to enforce the number of threads for various multithreaded libraries. This function should be called before importing these multithreaded libraries. """os.environ["OMP_NUM_THREADS"]=str(nbThreads)os.environ["OPENBLAS_NUM_THREADS"]=str(nbThreads)os.environ["MKL_NUM_THREADS"]=str(nbThreads)os.environ["VECLIB_MAXIMUM_THREADS"]=str(nbThreads)os.environ["NUMEXPR_NUM_THREADS"]=str(nbThreads)
[docs]defGetNumberOfAvailableCores()->MuscatIndex:""" Function to discover the number of computing cores available. the value of the number of core available is stored in the environment variable 'SLURM_JOB_CPU_PER_NODE' """SLURM_JOB_CPU_PER_NODE=os.environ.get("SLURM_JOB_CPU_PER_NODE")ifSLURM_JOB_CPU_PER_NODEisNone:importsubprocessimportplatformtry:# only posix coverageout=subprocess.check_output(["/usr/bin/squeue","-h","-t","r","-u",str(os.environ.get("USER")),"-w",platform.node(),"-o","'%C'"],shell=False).decode("utf-8","ignore")out=out.strip().strip("'")res=PH.ReadInt(out)os.environ["SLURM_JOB_CPU_PER_NODE"]=str(res)returnresexcept:try:importpsutilres=psutil.cpu_count(logical=False)except:# pragma: no coverageimportmultiprocessingres=multiprocessing.cpu_count()os.environ["SLURM_JOB_CPU_PER_NODE"]=str(res)returnresreturnPH.ReadInt(SLURM_JOB_CPU_PER_NODE)
[docs]classComputingCores:"""Class to help doing multithreading without using to many cpu"""coresAvailable=GetNumberOfAvailableCores()def__init__(self,nbCoresRequested:Union[int,MuscatIndex]=-1,WithError:bool=True)->None:self.nbCoresRequested:MuscatIndex=nbCoresRequestedself.nbCoresAllocated:MuscatIndex=0self.withError:bool=WithErrordef__enter__(self):ifself.nbCoresRequested==-1:ifComputingCores.coresAvailable==0:ifself.withError:raiseException("No more cores available")# pragma: no coverelse:self.nbCoresAllocated=0else:self.nbCoresAllocated=self.coresAvailableComputingCores.coresAvailable=0returnselfifself.nbCoresRequested<=ComputingCores.coresAvailable:self.nbCoresAllocated=self.nbCoresRequestedComputingCores.coresAvailable-=self.nbCoresRequestedelse:ifComputingCores.coresAvailable==0:ifself.withError:raiseException("No more cores available")# pragma: no coverelse:self.nbCoresAllocated=0else:self.nbCoresAllocated=self.coresAvailableComputingCores.coresAvailable=0returnselfdef__exit__(self,type,value,traceback):ComputingCores.coresAvailable+=self.nbCoresAllocatedself.nbCoresAllocated=0
[docs]defCheckIntegrity(GUI:bool=False)->str:nbCores=GetNumberOfAvailableCores()print("GetNumberOfAvailableCores",GetNumberOfAvailableCores())SetNumberOfThreadsPerInstance(GetNumberOfAvailableCores())withComputingCores(nbCores+1)ascpu:passifnbCores<2:# pragma: no coverprint("We have only one Core, cant test everything")withComputingCores(1)ascpu:print("CPU ask 2 Allocated :"+str(cpu.nbCoresAllocated))withComputingCores(WithError=False)ascpuII:print("CPU ask max Allocated :"+str(cpuII.nbCoresAllocated))passreturn"ok"else:withComputingCores(2)ascpu:print("CPU ask 2 Allocated :"+str(cpu.nbCoresAllocated))print("CPU available "+str(ComputingCores.coresAvailable))ifnbCores<3:# pragma: no coverprint("We have only two Core, cant test everything")return"ok"withComputingCores()ascpu2:print("cpu2 ask max available, Allocated : "+str(cpu2.nbCoresAllocated))print("CPU2 available "+str(ComputingCores.coresAvailable))withComputingCores(nbCoresRequested=7,WithError=False)ascpu3:print("cpu3 ask 7 Allocated "+str(cpu3.nbCoresAllocated))print("CPU3 available "+str(ComputingCores.coresAvailable))withComputingCores(WithError=False)ascpu4:passprint("CPU available "+str(ComputingCores.coresAvailable))return"ok"
if__name__=="__main__":# pragma: no coverprint(CheckIntegrity())