# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
import numpy as np
import os
from Muscat.IO.WriterBase import WriterBase as WriterBase
import Muscat.IO.GeofReader as GR
import Muscat.IO.UtReader as UR
import Muscat.Containers.ElementsDescription as ED
from Muscat.FE.IntegrationRules import LagrangeIsoParamQuadrature
from Muscat.Helpers.TextFormatHelper import TFormat
from Muscat.IO.Parallel.UtMerger import Tag3D, Return3DElements
class UtSplitter(WriterBase):
    """Split a monolithic Z-set solution into per-subdomain files.

    Starting from ``<dataFolder><name>.ut`` (and the ``.geof``, ``.node`` and
    ``.integ`` data it refers to) and from the subdomain meshes stored in
    ``<dataFolder><name>-pmeshes``, :meth:`Split` writes one ``.ut``, one
    ``.node`` and one ``.integ`` file per subdomain in the output folder,
    plus a single ``.cut`` file describing the decomposition.

    Typical usage: ``SetName``, ``SetdataFolder``, ``SetOutputFolder``,
    optionally ``SetTimeSteps``, then ``Split``.

    Folders are concatenated as plain strings with the file names, so they
    are expected to end with a path separator.
    """

    def __init__(self):
        super().__init__()
        # Either the string "all" or an index usable on the first axis of
        # the reader time table (e.g. a list of row indices).
        self.timeSteps = "all"
        # Base name of the computation (file names without extension).
        self.name = ""
        # Folder holding the monolithic solution and the "-pmeshes" folder.
        self.dataFolder = ""
        # Folder receiving the generated files; set by SetOutputFolder.
        self.outputFolder = ""
        # Number of subdomains; computed by SetdataFolder.
        self.nbsd = 0

    def __str__(self):
        return (
            "UtSplit : \n"
            + " Name : " + str(self.name) + "\n"
            + " timeSteps : " + str(self.timeSteps) + "\n"
            + " dataFolder : " + str(self.dataFolder) + "\n"
        )

    def SetName(self, name):
        """Set the base name of the computation (must precede SetdataFolder)."""
        self.name = name

    def SetdataFolder(self, folder):
        """Set the input folder and count the subdomains it contains.

        The files of ``<folder><name>-pmeshes`` are listed and every file
        name is tokenised on ``.`` and ``-``. A file is taken into account
        when its tokens contain both ``"geof"`` and the current name; every
        integer token of such a file is a subdomain number and ``nbsd`` is
        the largest one found. Because of this tokenisation, a name that
        itself contains ``.`` or ``-`` is never matched.

        Parameters
        ----------
        folder : str
            Folder containing the monolithic solution.

        Raises
        ------
        ValueError
            If no subdomain number can be extracted from the folder content.
        """
        self.dataFolder = folder
        meshFolder = self.dataFolder + self.name + "-pmeshes" + os.sep

        subdomains = []
        for fileName in os.listdir(meshFolder):
            if not os.path.isfile(os.path.join(meshFolder, fileName)):
                continue
            # Tokens are converted to int whenever possible, the others are
            # kept as strings.
            tokens = []
            for part in fileName.split("."):
                for piece in part.split("-"):
                    try:
                        tokens.append(int(piece))
                    except ValueError:
                        tokens.append(piece)
            if "geof" in tokens and self.name in tokens:
                subdomains.extend(tok for tok in tokens if isinstance(tok, int))

        if not subdomains:
            raise ValueError("No subdomain .geof file found for '" + str(self.name) + "' in " + meshFolder)

        self.nbsd = np.max(subdomains)
        print("Number of found subdomains:", self.nbsd)

    def SetOutputFolder(self, outputFolder):
        """Set the folder in which the split files are written."""
        self.outputFolder = outputFolder

    def SetTimeSteps(self, iterator):
        """Select the time steps to extract.

        Parameters
        ----------
        iterator : str or index
            ``"all"`` to keep every time step, otherwise an index applied to
            the first axis of the reader time table.
        """
        self.timeSteps = iterator

    @staticmethod
    def _ReadFieldsAtTimeSteps(reader, fieldNames, nbEntities, timeIndices):
        """Read every field of fieldNames at every requested time index.

        Returns a dict mapping each field name to an array of shape
        ``(nbEntities, len(timeIndices))``. The values of ``timeIndices``
        are 1-based: ``ReadField`` is called with ``timeIndex - 1``.
        """
        nTimeSteps = timeIndices.shape[0]
        data = {}
        for fieldName in fieldNames:
            values = np.empty((nbEntities, nTimeSteps))
            for col in range(nTimeSteps):
                values[:, col] = reader.ReadField(fieldname=fieldName, timeIndex=timeIndices[col] - 1)
            data[fieldName] = values
        return data

    def Split(self):
        """Write the per-subdomain ``.integ``, ``.node`` and ``.ut`` files and the ``.cut`` file.

        The complete monolithic solution (all selected time steps, all nodal
        and integration point fields) is loaded in memory first, then each
        subdomain mesh ``<name>-XXX.geof`` (``XXX`` from 001 to ``nbsd``) is
        read and the matching values are extracted and written.
        """
        reader = UR.UtReader()
        reader.SetFileName(self.dataFolder + self.name + ".ut")
        reader.ReadMetaData()

        # The isinstance guard avoids an element-wise comparison when
        # timeSteps is a NumPy array.
        if isinstance(self.timeSteps, str) and self.timeSteps == "all":
            timeIndices = np.array(reader.time[:, 0], dtype=int)
        else:
            timeIndices = np.array(reader.time[self.timeSteps, 0], dtype=int)
        nTimeSteps = timeIndices.shape[0]

        reader.atIntegrationPoints = True
        globaldataInteg = self._ReadFieldsAtTimeSteps(reader, reader.integ, reader.meshMetadata["nbIntegrationPoints"], timeIndices)
        reader.atIntegrationPoints = False
        globaldataNode = self._ReadFieldsAtTimeSteps(reader, reader.node, reader.meshMetadata["nbNodes"], timeIndices)

        globalMesh = GR.ReadGeof(fileName=self.dataFolder + self.name + ".geof", readElset=False, readFaset=False, printNotRead=False)
        Tag3D(globalMesh)
        _, metaDataglobalMesh3D = Return3DElements(globalMesh)
        # NOTE(review): the shifted ids are stored on the mesh but not read
        # again in this method.
        globalMesh.originalIDNodes = np.array(globalMesh.originalIDNodes - 1, dtype=int)
        nGpE = metaDataglobalMesh3D.NGaussperEl

        # Everything below up to the subdomain loop is identical for all
        # subdomains, so it is computed once. It also guarantees that
        # relativePath exists when the .cut file is written.
        try:
            relativePath = os.path.relpath(self.dataFolder, self.outputFolder)
        except ValueError:  # pragma: no cover
            print("Error using relative path. Using absolute path (files generated are not relocatable)")
            relativePath = self.dataFolder

        with open(self.dataFolder + self.name + ".ut", "r") as inFile:
            # The first line is dropped: a subdomain specific "**meshfile"
            # line is written in its place. The next three are copied as is.
            inFile.readline()
            utHeader = "".join(inFile.readline() for _ in range(3))

        # NOTE(review): rows 0..nTimeSteps-1 of the time table are written,
        # whatever the selected time steps are - confirm this is the
        # intended content when timeSteps is a subset.
        timeLines = []
        for timeStep in range(nTimeSteps):
            row = reader.time[timeStep]
            timeLines.append(" ".join(str(int(row[i])) for i in range(4)) + " " + str(row[4]) + "\n")

        nbIntegFields = len(reader.integ)
        nbNodeFields = len(reader.node)

        from Muscat.Helpers.ProgressBar import PrintProgressBar

        PrintProgressBar(0, self.nbsd, prefix="Progress:", suffix="Complete", length=50)
        for sd in range(1, self.nbsd + 1):
            sdString = "-" + str(sd).zfill(3)
            localMesh = GR.ReadGeof(fileName=self.dataFolder + self.name + "-pmeshes" + os.sep + self.name + sdString + ".geof", readElset=False, readFaset=False, printNotRead=False)
            Tag3D(localMesh)
            localIdstotreat, metaDatalocalMesh3D = Return3DElements(localMesh)
            # 1-based ids stored in the local mesh -> 0-based rows of the
            # global nodal arrays.
            localOriginalIDNodes = np.array(localMesh.originalIDNodes - 1, dtype=int)

            # write .integ
            # Layout: time step, then element, then field, then the nGpE
            # values of that element.
            nbLocalGauss = metaDatalocalMesh3D.NGauss
            data_integ = np.empty(nbIntegFields * nbLocalGauss * nTimeSteps)
            cursor = 0
            for timeStep in range(nTimeSteps):
                # Gather, field by field, the values of the local elements.
                field = np.empty((nbIntegFields, nbLocalGauss))
                for k, fieldName in enumerate(reader.integ):
                    start = 0
                    for el in localIdstotreat:
                        field[k, start : start + nGpE] = globaldataInteg[fieldName][el * nGpE : (el + 1) * nGpE, timeStep]
                        start += nGpE
                # Interleave the fields element by element.
                for m in range(len(localIdstotreat)):
                    for k in range(nbIntegFields):
                        data_integ[cursor : cursor + nGpE] = field[k, nGpE * m : nGpE * (m + 1)]
                        cursor += nGpE
            # Stored as float32 with the bytes swapped with respect to the
            # native order.
            data_integ.astype(np.float32).byteswap().tofile(self.outputFolder + self.name + sdString + ".integ")

            # write .node
            # Layout: time step, then field, then the local nodes.
            nbLocalNodes = metaDatalocalMesh3D.Nodes
            data_node = np.zeros(nbNodeFields * nbLocalNodes * nTimeSteps)
            cursor = 0
            for timeStep in range(nTimeSteps):
                for fieldName in reader.node:
                    data_node[cursor : cursor + nbLocalNodes] = globaldataNode[fieldName][localOriginalIDNodes, timeStep]
                    cursor += nbLocalNodes
            data_node.astype(np.float32).byteswap().tofile(self.outputFolder + self.name + sdString + ".node")

            # write .ut
            meshLine = "**meshfile " + relativePath + os.sep + self.name + "-pmeshes" + os.sep + self.name + sdString + ".geof\n"
            with open(self.outputFolder + self.name + sdString + ".ut", "w") as outFile:
                outFile.write(meshLine + utHeader)
                outFile.writelines(timeLines)

            PrintProgressBar(sd, self.nbsd, prefix="Progress:", suffix="Complete", length=50)

        # write .cut
        cutContent = "***decomposition\n" + " **global_mesh " + relativePath + os.sep + self.name + ".geof\n" + " **domains " + str(self.nbsd) + "\n"
        with open(self.outputFolder + self.name + ".cut", "w") as outFile:
            outFile.write(cutContent)
def CheckIntegrity(GUI: bool = False):
    """Run the splitter on the "UtParExample" test data and compare the output.

    The "cube" example is split twice in a temporary folder (first time step
    only, then all time steps). The generated ``.node`` and ``.integ`` files
    of every subdomain are then compared byte by byte with the reference
    files shipped with the test data. The result of each comparison is only
    printed: the function returns ``"ok"`` whatever the outcome.

    Parameters
    ----------
    GUI : bool
        Not used by this function.

    Returns
    -------
    str
        Always ``"ok"``.
    """
    import filecmp

    from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory

    tempdir = TemporaryDirectory.GetTempPath()

    import Muscat.TestData as MuscatTestData

    referenceFolder = MuscatTestData.GetTestDataPath() + "UtParExample" + os.sep

    ##################################
    # Example of the splitter syntax
    import Muscat.IO.Parallel.UtSplitter as US

    splitter = US.UtSplitter()
    print(splitter)
    splitter.SetName("cube")
    splitter.SetdataFolder(referenceFolder)
    splitter.SetOutputFolder(tempdir)
    for steps in ([0], "all"):
        splitter.SetTimeSteps(steps)
        splitter.Split()
    ##################################

    print("tempdir =", tempdir)

    for sd in range(1, splitter.nbsd + 1):
        sdString = "-" + str(sd).zfill(3)
        for ext in ("node", "integ"):
            generatedFile = tempdir + "cube" + sdString + "." + ext
            referenceFile = referenceFolder + "cube" + sdString + "." + ext
            identical = filecmp.cmp(generatedFile, referenceFile, shallow=False)
            print(TFormat.InRed(ext + " files for subdomain " + str(sd) + " equals ? " + str(identical)))

    print(tempdir)
    return "ok"
if __name__ == "__main__":
    # Run the self check and display its status when executed as a script.
    print(CheckIntegrity())  # pragma: no cover