Source code for Muscat.IO.CodeInterface

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#

"""Class to help the execution of an external program
"""

import subprocess
import os
import platform

from Muscat.Helpers.Logger import Info

class Interface():
    """Helper that prepares and launches an external program.

    An input file is generated by substituting ``self.parameters`` into a
    text template, then the configured command is executed inside
    ``self.processDirectory``.

    The working directory is expected to contain the template file named by
    ``self.tplFilename`` (``'template.tpl'`` by default); a missing template
    is tolerated at construction time and leaves ``self.tpl`` empty.
    """

    def __init__(self, workingDirectory=None):
        """Create an interface rooted at *workingDirectory*.

        Parameters
        ----------
        workingDirectory : str, optional
            Path handed to :meth:`SetWorkingDirectory`. When omitted, the
            current working directory *at call time* is used (it used to be
            frozen at import time because it was evaluated as a default
            argument).
        """
        super().__init__()

        if workingDirectory is None:
            workingDirectory = os.getcwd()
        self.SetWorkingDirectory(workingDirectory)

        # Model parameters substituted into the template and the command line
        self.parameters = {}
        self.ptab = {}
        self.bc = {}

        # Template: loaded eagerly when present, empty otherwise
        self.tplFilename = 'template.tpl'
        try:
            self.tpl = self.ReadFile(self.workingDirectory + os.sep + self.tplFilename)
        except IOError:  # pragma: no cover
            self.tpl = ""

        # Directory in which input files are written and the command is run
        self.processDirectory = self.workingDirectory + os.sep

        # Generated input file name: inputFilename + str(idProc) + inputFileExtension
        self.inputFilename = 'calcul'
        self.inputFileExtension = '.inp'

        # Return codes that are not treated as failures
        self.acceptedReturnCodes = [0]

        # Command line description
        self.codeCommand = 'Zrun'
        self.options = []
        self.lastCommandExecuted = None

        # Run the command in a separate terminal window, and optionally keep
        # that window open once the command has finished
        self.openExternalWindows = False
        self.keepExternalWindows = False

        # Append the generated input file name to the command line
        self.withFilename = True

    @staticmethod
    def _DirectoryOf(path):
        """Return ``os.path.dirname(path)`` with a trailing separator.

        An empty directory name is returned unchanged.
        """
        directory = os.path.dirname(path)
        if directory and not directory.endswith(os.sep):
            directory += os.sep
        return directory

    @staticmethod
    def _ExpandParameters(text, values):
        """Apply ``str.format(**values)`` to *text* until it stops changing.

        Repeating the substitution lets a parameter value reference another
        parameter.
        """
        while True:
            expanded = text.format(**values)
            if expanded == text:
                return text
            text = expanded

    def WriteFile(self, idProc):
        """Render the template with ``self.parameters`` and write the input file.

        Doubled braces in the template are emitted as single literal braces.
        The file is written in ``self.processDirectory`` under the name
        ``inputFilename + str(idProc) + inputFileExtension``.

        Raises
        ------
        KeyError
            If the template references a key missing from ``self.parameters``.
        """
        # Shield the doubled braces behind placeholders so that the repeated
        # format() passes cannot consume them.
        shielded = (self.tpl
                    .replace("{{", "OPEN_DOUBLE_CURLY_BRACE")
                    .replace("}}", "CLOSE_DOUBLE_CURLY_BRACE"))
        try:
            inpString = self._ExpandParameters(shielded, self.parameters)
        except KeyError as e:  # pragma: no cover
            print("The user must supply the key: %s" % str(e))
            raise
        inpString = (inpString
                     .replace("OPEN_DOUBLE_CURLY_BRACE", "{")
                     .replace("CLOSE_DOUBLE_CURLY_BRACE", "}"))

        inpFilename = self.inputFilename + str(idProc) + self.inputFileExtension
        with open(self.processDirectory + os.sep + inpFilename, 'w') as inpFile:
            inpFile.write(inpString)

    def SetOptions(self, opts):
        """Set the list of command line options placed after the command."""
        self.options = opts

    def GenerateCommandToRun(self, idProc=0):
        """Build the command line as a list of strings.

        The list holds the command, the options and, when
        ``self.withFilename`` is true, the input file name for *idProc*.
        Every item is passed through ``str.format(**self.parameters)``.
        """
        cmd = [self.codeCommand, *self.options]
        if self.withFilename:
            cmd.append(self.inputFilename + str(idProc) + self.inputFileExtension)
        return [item.format(**self.parameters) for item in cmd]

    def SingleRunComputation(self, idProc, stdout=None):
        """Start the command without waiting for it and return the process.

        Parameters
        ----------
        idProc
            Identifier used to build the input file name.
        stdout : optional
            Destination for the standard output of the process; the output
            is discarded when omitted.

        Returns
        -------
        subprocess.Popen
            Handle on the started process.
        """
        cmd = self.GenerateCommandToRun(idProc)

        # subprocess.DEVNULL discards the output without leaving an open
        # file object behind, unlike a manually opened os.devnull.
        out = subprocess.DEVNULL if stdout is None else stdout

        onWindows = platform.system() == "Windows"

        if self.openExternalWindows:
            if onWindows:
                keepFlag = "/K" if self.keepExternalWindows else "/C"
                cmd = ["start", "/wait", "cmd", keepFlag] + cmd
            else:
                if self.keepExternalWindows:
                    # Chain a shell after the command so the terminal stays open
                    cmd = [" ".join(cmd) + " ; bash "]
                cmd = ["/usr/bin/xterm", "-e"] + cmd

        self.lastCommandExecuted = cmd
        print(cmd)
        return subprocess.Popen(cmd, cwd=self.processDirectory, stdout=out, shell=onWindows)

    def SingleRunComputationAndReturnOutput(self, idProc=0):
        """Run the command, wait for it and return its decoded standard output.

        Raises
        ------
        subprocess.CalledProcessError
            If the return code is not in ``self.acceptedReturnCodes``. The
            exception carries the captured standard output and standard
            error.
        """
        cmd = self.GenerateCommandToRun(idProc)
        self.lastCommandExecuted = cmd
        Info(self.processDirectory)
        Info(cmd)

        proc = subprocess.Popen(cmd,
                                cwd=self.processDirectory,
                                shell=platform.system() == "Windows",
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                bufsize=-1)
        stdout, stderr = proc.communicate()

        if proc.returncode not in self.acceptedReturnCodes:
            raise subprocess.CalledProcessError(proc.returncode,
                                                cmd=cmd,
                                                output=stdout.decode(),
                                                stderr=stderr.decode(errors="replace"))
        return stdout.decode()

    def ReadFile(self, filenameDir):
        """Return the whole content of the text file *filenameDir*."""
        with open(filenameDir, 'r') as File:
            return File.read()

    def SetWorkingDirectory(self, Dir):
        """Set the working directory to the directory part of *Dir*.

        NOTE(review): ``os.path.dirname`` is applied to *Dir*, so a path
        without a trailing separator resolves to its parent directory.
        """
        self.workingDirectory = self._DirectoryOf(Dir)

    def SetProcessDirectory(self, Dir):
        """Set the process directory to the directory part of *Dir*.

        NOTE(review): ``os.path.dirname`` is applied to *Dir*, so a path
        without a trailing separator resolves to its parent directory.
        """
        self.processDirectory = self._DirectoryOf(Dir)

    def SetCodeCommand(self, ccommand):
        """Set the command (first item of the command line) to execute."""
        self.codeCommand = ccommand

    def SetTemplateFile(self, filename):
        """Set the template file name, relative to the working directory."""
        self.tplFilename = filename

    def ReadTemplateFile(self, filename=None):
        """Load the template from the working directory into ``self.tpl``.

        When *filename* is given it first becomes the current template name.
        """
        if filename is not None:
            self.SetTemplateFile(filename)
        # ReadFile closes the file, the former bare open().read() did not.
        self.tpl = self.ReadFile(self.workingDirectory + os.sep + self.tplFilename)

    def CopyFile(self, filetocopy):
        """Copy *filetocopy* from the working to the process directory.

        Only the base name of *filetocopy* is kept at the destination.
        """
        import shutil

        # os.path.basename recognises every separator valid on the platform,
        # whereas splitting on '/' missed Windows back-slashes.
        shutil.copy(self.workingDirectory + os.sep + filetocopy,
                    self.processDirectory + os.sep + os.path.basename(filetocopy))
def CheckIntegrity(GUI: bool = False):
    """Exercise the Interface class on the packaged test template.

    A template is rendered into a temporary directory, then ``echo`` is run
    twice: once through ``SingleRunComputation`` and once through
    ``SingleRunComputationAndReturnOutput``.

    Parameters
    ----------
    GUI : bool
        Value assigned to both ``openExternalWindows`` and
        ``keepExternalWindows`` of the tested interface.

    Returns
    -------
    str
        ``'ok'`` when every step completed.
    """
    import sys

    from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
    import Muscat.TestData as MuscatTestData

    interface = Interface(MuscatTestData.GetTestDataPath())

    # Values substituted into the template
    interface.parameters.update({
        'calcul': 'thermal_transient',
        'Ti': 1000.0,
        'mesh': 'Cube_3D.geof',
        'solver': 'mumps',
        'python_script': 'reduction',
        'sequence': 1,
        'time': 2000.,
        'increment': 20,
        'iteration': 1000,
        'ratio': 0.001,
        'algorithm': 'p1p2p3',
        'bc': 'myBC',
        'table': 'myTable',
        'conductivity': '280.+100.*atan(0.01*(1100-temperature));',
        'coefficient': '8.*(430.+40.*atan(0.01*(temperature-500.)))*(1.5-0.5*exp(-200./((temperature-1200.)*(temperature-1200.))));',
    })

    # First scenario: write the input file and run "echo <input file>"
    interface.SetProcessDirectory(TemporaryDirectory.GetTempPath())
    interface.SetCodeCommand("echo")
    interface.ReadTemplateFile('template.tpl')
    interface.WriteFile(1)

    interface.openExternalWindows = GUI
    interface.keepExternalWindows = GUI

    process = interface.SingleRunComputation(1, sys.stdout)
    process.wait()
    print("lastCommandExecuted: " + str(interface.lastCommandExecuted))

    # Second scenario: the file name comes from a formatted option instead
    interface.SetCodeCommand("echo")
    interface.SetOptions(["{filter}"])
    interface.parameters['filter'] = 'calcul1.inp'
    interface.withFilename = False

    output = interface.SingleRunComputationAndReturnOutput(1)
    print("output is :" + str(output.encode("ascii", "ignore")))
    print("lastCommandExecuted: " + str(interface.lastCommandExecuted))

    return 'ok'
# Script entry point: run the self-check with the GUI flag enabled.
if __name__ == '__main__':  # pragma: no cover
    print(CheckIntegrity(True))