# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""Class to help the execution of an external program
"""
import subprocess
import os
import platform
from Muscat.Helpers.Logger import Info
[docs]class Interface():
def __init__(self, workingDirectory=os.getcwd()):
super().__init__()
# Working Folder
"""Working folder must contain:
a folder 'self.tpl_directory' containing :
'self.tpl_filename'
"""
self.SetWorkingDirectory(workingDirectory)
# Model parameters
self.parameters = {}
self.ptab = {}
self.bc = {}
# Template
self.tplFilename = 'template.tpl'
try:
self.tpl = self.ReadFile(self.workingDirectory + os.sep + self.tplFilename)
except IOError: # pragma: no cover
self.tpl = ""
# Temporary files folder creation
self.processDirectory = self.workingDirectory + os.sep
# Output file name
self.inputFilename = 'calcul'
self.inputFileExtension = '.inp'
self.acceptedReturnCodes = [0]
# Code command
self.codeCommand = 'Zrun'
self.options = []
self.lastCommandExecuted = None
self.openExternalWindows = False
self.keepExternalWindows = False
self.withFilename = True
[docs] def WriteFile(self, idProc):
# Write code input file
try:
def expand_vars(string, vars):
while True:
expanded = string.format(**vars)
if expanded == string:
break
string = expanded
return string
inpString = expand_vars(self.tpl.replace("{{", "OPEN_DOUBLE_CURLY_BRACE").replace("}}", "CLOSE_DOUBLE_CURLY_BRACE"), self.parameters)
inpString = inpString.replace("OPEN_DOUBLE_CURLY_BRACE", "{").replace("CLOSE_DOUBLE_CURLY_BRACE", "}")
except KeyError as e: # pragma: no cover
print("The user must supply the key: %s" % str(e))
raise
inpFilename = self.inputFilename + str(idProc) + self.inputFileExtension
with open(self.processDirectory + os.sep + inpFilename, 'w') as inpFile:
inpFile.write(inpString)
[docs] def SetOptions(self, opts):
self.options = opts
[docs] def GenerateCommandToRun(self, idProc=0):
# Command to execute
cmd = []
cmd.append(self.codeCommand)
for opt in self.options:
cmd.append(opt)
if self.withFilename:
inpFilename = self.inputFilename + str(idProc) + self.inputFileExtension
cmd.append(inpFilename)
for i in range(len(cmd)):
cmd[i] = cmd[i].format(**self.parameters)
return cmd
[docs] def SingleRunComputation(self, idProc, stdout=None):
cmd = self.GenerateCommandToRun(idProc)
if stdout is None:
out = open(os.devnull, 'w')
else:
out = stdout
# Commande execution
shell = False
if platform.system() == "Windows":
shell = True
if self.openExternalWindows:
if platform.system() == "Windows":
if self.keepExternalWindows:
cmd.insert(0, "/K")
else:
cmd.insert(0, "/C")
cmd.insert(0, "cmd")
cmd.insert(0, "/wait")
cmd.insert(0, "start")
else:
if self.keepExternalWindows:
cmd = [" ".join(cmd)+" ; bash "]
cmd.insert(0, "-e")
cmd.insert(0, "/usr/bin/xterm")
self.lastCommandExecuted = cmd
print(cmd)
proc = subprocess.Popen(cmd, cwd=self.processDirectory, stdout=out, shell=shell)
return proc
[docs] def SingleRunComputationAndReturnOutput(self, idProc=0):
cmd = self.GenerateCommandToRun(idProc)
# Commande execution
self.lastCommandExecuted = cmd
Info(self.processDirectory)
Info(cmd)
shell = False
if platform.system() == "Windows":
shell = True
from subprocess import Popen, PIPE
p = Popen(cmd, cwd=self.processDirectory,shell=shell, stdin=PIPE, stdout=PIPE, stderr=PIPE, bufsize=-1)
stdout, stderr = p.communicate()
if p.returncode not in self.acceptedReturnCodes:
raise subprocess.CalledProcessError(p.returncode,cmd=cmd,output=stdout.decode() )
#try:
# out = subprocess.check_output(cmd, cwd=self.processDirectory, shell=shell).decode("utf-8", "ignore")
#except subprocess.CalledProcessError as grepexc:
# print(f"error code ({grepexc.returncode}): output {grepexc.output.decode()}" )
# raise
return stdout.decode()
[docs] def ReadFile(self, filenameDir):
# Template file read
with open(filenameDir, 'r') as File:
string = File.read()
return string
[docs] def SetWorkingDirectory(self, Dir):
self.workingDirectory = os.path.dirname(Dir)
if len(self.workingDirectory) and self.workingDirectory[-1] != os.sep:
self.workingDirectory += os.sep
[docs] def SetProcessDirectory(self, Dir):
self.processDirectory = os.path.dirname(Dir)
if len(self.processDirectory) and self.processDirectory[-1] != os.sep:
self.processDirectory += os.sep
[docs] def SetCodeCommand(self, ccommand): # Code command
self.codeCommand = ccommand
[docs] def SetTemplateFile(self, filename):
self.tplFilename = filename
[docs] def ReadTemplateFile(self, filename=None):
if filename is not None:
self.SetTemplateFile(filename)
self.tpl = open(self.workingDirectory + os.sep + self.tplFilename).read()
[docs] def CopyFile(self, filetocopy):
import shutil
shutil.copy(self.workingDirectory + os.sep + filetocopy,
self.processDirectory + os.sep + filetocopy.split('/')[-1])
[docs]def CheckIntegrity(GUI: bool = False):
from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
import Muscat.TestData as MuscatTestData
interface = Interface(MuscatTestData.GetTestDataPath())
interface.keepExternalWindows = GUI
interface.parameters['calcul'] = 'thermal_transient'
interface.parameters['Ti'] = 1000.0
interface.parameters['mesh'] = 'Cube_3D.geof'
interface.parameters['solver'] = 'mumps'
interface.parameters['python_script'] = 'reduction'
interface.parameters['sequence'] = 1
interface.parameters['time'] = 2000.
interface.parameters['increment'] = 20
interface.parameters['iteration'] = 1000
interface.parameters['ratio'] = 0.001
interface.parameters['algorithm'] = 'p1p2p3'
interface.parameters['bc'] = 'myBC'
interface.parameters['table'] = 'myTable'
interface.parameters['conductivity'] = '280.+100.*atan(0.01*(1100-temperature));'
interface.parameters['coefficient'] = '8.*(430.+40.*atan(0.01*(temperature-500.)))*(1.5-0.5*exp(-200./((temperature-1200.)*(temperature-1200.))));'
import sys
interface.SetProcessDirectory(TemporaryDirectory.GetTempPath())
interface.SetCodeCommand("echo")
interface.ReadTemplateFile('template.tpl')
interface.WriteFile(1)
interface.openExternalWindows = GUI
interface.keepExternalWindows = GUI
interface.SingleRunComputation(1, sys.stdout).wait()
print("lastCommandExecuted: " + str(interface.lastCommandExecuted))
interface.SetCodeCommand("echo")
interface.SetOptions(["{filter}"])
interface.parameters['filter'] = 'calcul1.inp'
interface.withFilename = False
print("output is :" + str(interface.SingleRunComputationAndReturnOutput(1).encode("ascii", "ignore")))
print("lastCommandExecuted: " + str(interface.lastCommandExecuted))
return 'ok'
if __name__ == '__main__':
print(CheckIntegrity(True)) # pragma: no cover