Source code for Muscat.Helpers.CPU
# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
import os
from typing import Union
from Muscat.Types import MuscatIndex
import Muscat.Helpers.ParserHelper as PH
[docs]
def SetNumberOfThreadsPerInstance(nbThreads: MuscatIndex) -> None:
"""
Function to enforce the number of threads for various multithreaded libraries.
This function should be called before importing these multithreaded libraries.
"""
os.environ["OMP_NUM_THREADS"] = str(nbThreads)
os.environ["OPENBLAS_NUM_THREADS"] = str(nbThreads)
os.environ["MKL_NUM_THREADS"] = str(nbThreads)
os.environ["VECLIB_MAXIMUM_THREADS"] = str(nbThreads)
os.environ["NUMEXPR_NUM_THREADS"] = str(nbThreads)
[docs]
def GetNumberOfAvailableCores() -> MuscatIndex:
"""
Function to discover the number of computing cores available.
the value of the number of core available is stored in the environment variable
'SLURM_JOB_CPU_PER_NODE'
"""
SLURM_JOB_CPU_PER_NODE = os.environ.get("SLURM_JOB_CPU_PER_NODE")
if SLURM_JOB_CPU_PER_NODE is None:
import subprocess
import platform
try: # only posix coverage
out = subprocess.check_output(["/usr/bin/squeue", "-h", "-t", "r", "-u", str(os.environ.get("USER")), "-w", platform.node(), "-o", "'%C'"], shell=False).decode("utf-8", "ignore")
out = out.strip().strip("'")
res = PH.ReadInt(out)
os.environ["SLURM_JOB_CPU_PER_NODE"] = str(res)
return res
except:
try:
import psutil
res = psutil.cpu_count(logical=False)
except: # pragma: no coverage
import multiprocessing
res = multiprocessing.cpu_count()
os.environ["SLURM_JOB_CPU_PER_NODE"] = str(res)
return res
return PH.ReadInt(SLURM_JOB_CPU_PER_NODE)
[docs]
class ComputingCores:
"""Class to help doing multithreading without using to many cpu"""
coresAvailable = GetNumberOfAvailableCores()
def __init__(self, nbCoresRequested: Union[int, MuscatIndex] = -1, WithError: bool = True) -> None:
self.nbCoresRequested: MuscatIndex = nbCoresRequested
self.nbCoresAllocated: MuscatIndex = 0
self.withError: bool = WithError
def __enter__(self):
if self.nbCoresRequested == -1:
if ComputingCores.coresAvailable == 0:
if self.withError:
raise Exception("No more cores available") # pragma: no cover
else:
self.nbCoresAllocated = 0
else:
self.nbCoresAllocated = self.coresAvailable
ComputingCores.coresAvailable = 0
return self
if self.nbCoresRequested <= ComputingCores.coresAvailable:
self.nbCoresAllocated = self.nbCoresRequested
ComputingCores.coresAvailable -= self.nbCoresRequested
else:
if ComputingCores.coresAvailable == 0:
if self.withError:
raise Exception("No more cores available") # pragma: no cover
else:
self.nbCoresAllocated = 0
else:
self.nbCoresAllocated = self.coresAvailable
ComputingCores.coresAvailable = 0
return self
def __exit__(self, type, value, traceback):
ComputingCores.coresAvailable += self.nbCoresAllocated
self.nbCoresAllocated = 0
[docs]
def CheckIntegrity(GUI: bool = False) -> str:
nbCores = GetNumberOfAvailableCores()
print("GetNumberOfAvailableCores", GetNumberOfAvailableCores())
SetNumberOfThreadsPerInstance(GetNumberOfAvailableCores())
with ComputingCores(nbCores + 1) as cpu:
pass
if nbCores < 2: # pragma: no cover
print("We have only one Core, cant test everything")
with ComputingCores(1) as cpu:
print("CPU ask 2 Allocated :" + str(cpu.nbCoresAllocated))
with ComputingCores(WithError=False) as cpuII:
print("CPU ask max Allocated :" + str(cpuII.nbCoresAllocated))
pass
return "ok"
else:
with ComputingCores(2) as cpu:
print("CPU ask 2 Allocated :" + str(cpu.nbCoresAllocated))
print("CPU available " + str(ComputingCores.coresAvailable))
if nbCores < 3: # pragma: no cover
print("We have only two Core, cant test everything")
return "ok"
with ComputingCores() as cpu2:
print("cpu2 ask max available, Allocated : " + str(cpu2.nbCoresAllocated))
print("CPU2 available " + str(ComputingCores.coresAvailable))
with ComputingCores(nbCoresRequested=7, WithError=False) as cpu3:
print("cpu3 ask 7 Allocated " + str(cpu3.nbCoresAllocated))
print("CPU3 available " + str(ComputingCores.coresAvailable))
with ComputingCores(WithError=False) as cpu4:
pass
print("CPU available " + str(ComputingCores.coresAvailable))
return "ok"
if __name__ == "__main__": # pragma: no cover
print(CheckIntegrity())