Source code for Muscat.Helpers.CPU

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#

import os
from typing import Union

from Muscat.Types import MuscatIndex
import Muscat.Helpers.ParserHelper as PH


[docs]def SetNumberOfThreadsPerInstance(nbThreads: MuscatIndex) -> None: """ Function to enforce the number of threads for various multithreaded libraries. This function should be called before importing these multithreaded libraries. """ os.environ["OMP_NUM_THREADS"] = str(nbThreads) os.environ["OPENBLAS_NUM_THREADS"] = str(nbThreads) os.environ["MKL_NUM_THREADS"] = str(nbThreads) os.environ["VECLIB_MAXIMUM_THREADS"] = str(nbThreads) os.environ["NUMEXPR_NUM_THREADS"] = str(nbThreads)
[docs]def GetNumberOfAvailableCores() -> MuscatIndex: """ Function to discover the number of computing cores available. the value of the number of core available is stored in the environment variable 'SLURM_JOB_CPU_PER_NODE' """ SLURM_JOB_CPU_PER_NODE = os.environ.get("SLURM_JOB_CPU_PER_NODE") if SLURM_JOB_CPU_PER_NODE is None: import subprocess import platform try: # only posix coverage out = subprocess.check_output(["/usr/bin/squeue", "-h", "-t", "r", "-u", str(os.environ.get("USER")), "-w", platform.node(), "-o", "'%C'"], shell=False).decode("utf-8", "ignore") out = out.strip().strip("'") res = PH.ReadInt(out) os.environ["SLURM_JOB_CPU_PER_NODE"] = str(res) return res except: try: import psutil res = psutil.cpu_count(logical=False) except: # pragma: no coverage import multiprocessing res = multiprocessing.cpu_count() os.environ["SLURM_JOB_CPU_PER_NODE"] = str(res) return res return PH.ReadInt(SLURM_JOB_CPU_PER_NODE)
[docs]class ComputingCores: """Class to help doing multithreading without using to many cpu""" coresAvailable = GetNumberOfAvailableCores() def __init__(self, nbCoresRequested: Union[int, MuscatIndex] = -1, WithError: bool = True) -> None: self.nbCoresRequested: MuscatIndex = nbCoresRequested self.nbCoresAllocated: MuscatIndex = 0 self.withError: bool = WithError def __enter__(self): if self.nbCoresRequested == -1: if ComputingCores.coresAvailable == 0: if self.withError: raise Exception("No more cores available") # pragma: no cover else: self.nbCoresAllocated = 0 else: self.nbCoresAllocated = self.coresAvailable ComputingCores.coresAvailable = 0 return self if self.nbCoresRequested <= ComputingCores.coresAvailable: self.nbCoresAllocated = self.nbCoresRequested ComputingCores.coresAvailable -= self.nbCoresRequested else: if ComputingCores.coresAvailable == 0: if self.withError: raise Exception("No more cores available") # pragma: no cover else: self.nbCoresAllocated = 0 else: self.nbCoresAllocated = self.coresAvailable ComputingCores.coresAvailable = 0 return self def __exit__(self, type, value, traceback): ComputingCores.coresAvailable += self.nbCoresAllocated self.nbCoresAllocated = 0
[docs]def CheckIntegrity(GUI: bool = False) -> str: nbCores = GetNumberOfAvailableCores() print("GetNumberOfAvailableCores", GetNumberOfAvailableCores()) SetNumberOfThreadsPerInstance(GetNumberOfAvailableCores()) with ComputingCores(nbCores + 1) as cpu: pass if nbCores < 2: # pragma: no cover print("We have only one Core, cant test everything") with ComputingCores(1) as cpu: print("CPU ask 2 Allocated :" + str(cpu.nbCoresAllocated)) with ComputingCores(WithError=False) as cpuII: print("CPU ask max Allocated :" + str(cpuII.nbCoresAllocated)) pass return "ok" else: with ComputingCores(2) as cpu: print("CPU ask 2 Allocated :" + str(cpu.nbCoresAllocated)) print("CPU available " + str(ComputingCores.coresAvailable)) if nbCores < 3: # pragma: no cover print("We have only two Core, cant test everything") return "ok" with ComputingCores() as cpu2: print("cpu2 ask max available, Allocated : " + str(cpu2.nbCoresAllocated)) print("CPU2 available " + str(ComputingCores.coresAvailable)) with ComputingCores(nbCoresRequested=7, WithError=False) as cpu3: print("cpu3 ask 7 Allocated " + str(cpu3.nbCoresAllocated)) print("CPU3 available " + str(ComputingCores.coresAvailable)) with ComputingCores(WithError=False) as cpu4: pass print("CPU available " + str(ComputingCores.coresAvailable)) return "ok"
if __name__ == "__main__": # pragma: no cover print(CheckIntegrity())