# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""Ut file reader (Zset result file)
"""
from Muscat.IO.IOFactory import RegisterReaderClass
import os
import numpy as np
from typing import List, Dict
from Muscat.Types import MuscatFloat, MuscatIndex, ArrayLike
from Muscat.Containers.Mesh import Mesh
from Muscat.Helpers.Logger import Debug, Info
from Muscat.IO.ReaderBase import ReaderBase
[docs]def ReadFieldFromUt(fileName: str = "", fieldname: str = None, time: MuscatFloat = None, timeIndex: MuscatIndex = None, string: str = "", atIntegrationPoints: bool = False) -> ArrayLike:
"""Function API for reading a field defined in an ut file
Parameters
----------
fileName : str, optional
name of the file to be read, by default ""
fieldname : str, optional
name of the field to be read, by default None
time : float, optional
time at which the field is read, by default None
timeIndex : int, optional
time index at which the field is read, by default None
string : str, optional
data to be read as a string instead of a file, by default ""
atIntegrationPoints : bool, optional
if True, field is read at integration points (from .integ file), by default False
Returns
-------
np.ndarray
field read
"""
reader = UtReader()
reader.SetFileName(fileName)
reader.SetStringToRead(string)
reader.atIntegrationPoints = atIntegrationPoints
reader.ReadMetaData()
reader.SetFieldNameToRead(fieldname)
reader.SetTimeToRead(time=time, timeIndex=timeIndex)
return reader.ReadField()
[docs]class UtReader(ReaderBase):
"""Ut Reader class"""
def __init__(self):
super().__init__()
self.commentHeader = "#"
self.meshfile = None
self.node = []
self.integ = []
self.element = []
self.time = None
self.fieldNameToRead = None
self.timeToRead = -1
self.atIntegrationPoints = False
self.meshMetadata = None
self.cache = None
self.oldtimeindex = None
self.canHandleTemporal = True
[docs] def Reset(self):
"""Resets the reader"""
self.meshfile = None
self.node = []
self.integ = []
self.element = []
self.time = []
[docs] def SetFieldNameToRead(self, fieldname: str):
"""Sets the name of the field to read
Parameters
----------
fieldname : str
name of the field to read
"""
if fieldname is not None:
self.fieldNameToRead = fieldname
[docs] def SetTimeToRead(self, time: MuscatFloat = None, timeIndex: MuscatIndex = None) -> MuscatIndex :
"""Sets the time at which the data is read
Parameters
----------
time : float, optional
time at which the data is read, by default None
timeIndex : int, optional
time index at which the data is read, by default None
Returns
-------
int
time index at which the data is read
"""
if (time is None) and (timeIndex is None):
if self.timeToRead == -1:
self.timeToRead = self.time[-1][4]
return len(self.time) - 1
else:
return [data[4] for data in self.time].index(self.timeToRead)
if (time is not None) and (timeIndex is not None):
raise (Exception("Cannot specify both time and timeIndex"))
if time is None:
self.timeToRead = self.time[timeIndex][4]
return timeIndex
elif time == -1:
self.timeToRead = self.time[-1][4]
return timeIndex
else:
self.timeToRead = time
return [data[4] for data in self.time].index(self.timeToRead)
[docs] def GetAvailableTimes(self) -> MuscatFloat:
"""Returns the available times at which data can be read
Returns
-------
np.ndarray
available times at which data can be read
"""
return self.time[:, 4]
[docs] def Read(self, fieldNames :List[str] = None) -> Mesh:
"""Function that performs the reading of the data defined in an ut file
Parameters
----------
fieldNames : List[str], optional
list of field names to read
Returns
-------
Mesh
output unstructured mesh object containing reading result
"""
if self.meshfile[-5:] == ".geof":
from Muscat.IO.GeofReader import GeofReader
GR = GeofReader()
else:
from Muscat.IO.GeoReader import GeoReader
GR = GeoReader()
if os.path.isabs(self.meshfile):
GR.SetFileName(self.meshfile)
else:
GR.SetFileName(self.filePath + self.meshfile)
mesh = GR.Read()
if fieldNames is None:
for nodefield in self.node:
mesh.nodeFields[nodefield] = self.ReadField(fieldname=nodefield)
for integfield in self.integ:
mesh.nodeFields[integfield] = self.ReadField(fieldname=integfield)
else:
for nodefield in fieldNames:
mesh.nodeFields[nodefield] = self.ReadField(fieldname=nodefield)
return mesh
[docs] def ReadField(self, fieldname: str = None, time: float =None, timeIndex: int = None) -> ArrayLike:
"""Function that performs the reading of a field defined in an ut file
Parameters
----------
fieldname : str, optional
name of the field to be read, by default None
time : float, optional
time at which the field is read, by default None
timeIndex : int, optional
time index at which the field is read, by default None
Returns
-------
np.ndarray
field read
"""
self.ReadMetaData()
postfix = ""
if self.fileName[-1] == "p":
postfix = "p"
self.SetFieldNameToRead(fieldname)
timeIndex = self.SetTimeToRead(time, timeIndex)
if timeIndex != self.oldtimeindex:
self.cache = None
self.oldtimeindex = timeIndex
Debug("Reading timeIndex : " + str(timeIndex))
basename = ".".join(self.fileName.split(".")[0:-1])
# find the field to read
idx = None
res = None
nbUsednodes = self.meshMetadata["nbNodes"]
nbNodes = self.meshMetadata["nbNodes"]
nbIntegrationPoints = self.meshMetadata["nbIntegrationPoints"]
nbElements = self.meshMetadata["nbElements"]
IPPerElement = self.meshMetadata["IPPerElement"]
if self.atIntegrationPoints:
try:
idx = self.integ.index(self.fieldNameToRead)
offset = nbIntegrationPoints * len(self.integ) * timeIndex
count = nbIntegrationPoints
ffn = basename + ".integ"
except:
raise RuntimeError("unable to find field " + str(self.fieldNameToRead))
Debug("Opening file : " + str(ffn))
res = np.empty(count, dtype=MuscatFloat)
try:
if len(self.integ) == 1:
with open(ffn + postfix, "rb") as datafile:
datafile.seek(offset * 4)
res = np.fromfile(datafile, count=count, dtype=np.float32).byteswap()
elif np.min(IPPerElement) == np.max(IPPerElement):
# the .intef file is homogenius
with open(ffn + postfix, "rb") as datafile:
datafile.seek(offset * 4)
nip = IPPerElement[0]
if self.cache is None:
self.cache = np.fromfile(datafile, count=nip * nbElements * len(self.integ), dtype=np.float32).byteswap()
self.cache.shape = (nbElements, nip * len(self.integ))
res = self.cache[:, idx * nip : (idx + 1) * nip].flatten()
else:
with open(ffn + postfix, "rb") as datafile:
# Debug("Offset : " + str(offset*4))
# Debug("count : " + str(count))
datafile.seek(offset * 4)
cpt = 0
oldStep = 0
for el in range(nbElements):
# for sv in range(len(self.integ)):
# for ip in range(IPPerElement[el]):
# res[cpt] = np.fromfile(datafile ,count=1, dtype=np.float32).byteswap()
nip = IPPerElement[el]
oldStep += nip * idx
datafile.seek(4 * (oldStep), 1)
res[cpt : cpt + nip] = np.fromfile(datafile, count=nip, dtype=np.float32).byteswap()
cpt += nip
oldStep = (len(self.integ) - idx - 1) * nip
except: # pragma: no cover
print("Error Reading field : " + str(self.fieldNameToRead) + " (not read)")
raise
else:
try:
idx = self.node.index(self.fieldNameToRead)
offset = nbNodes * len(self.node) * timeIndex + idx * nbNodes
count = nbNodes
ffn = basename + ".node"
except:
try:
idx = self.integ.index(self.fieldNameToRead)
offset = nbUsednodes * len(self.integ) * timeIndex + idx * nbUsednodes
count = nbUsednodes
ffn = basename + ".ctnod"
except:
raise RuntimeError("unable to find field " + str(self.fieldNameToRead) + " in file " + self.fileName)
Debug("Opening file : " + str(ffn))
res = None
try:
with open(ffn + postfix, "rb") as datafile:
# Debug("Offset : " + str(offset*4))
datafile.seek(offset * 4)
res = np.fromfile(datafile, count=count, dtype=np.float32).byteswap()
except: # pragma: no cover
print("Error Reading field : " + str(self.fieldNameToRead) + " (not read)")
return np.asarray(res, dtype=MuscatFloat)
def __str__(self):
res = ""
res += "class UtReader (" + str(id(self)) + ")\n"
res += " meshfile : " + str(self.meshfile) + "\n"
res += " node : " + str(self.node) + "\n"
res += " integ : " + str(self.integ) + "\n"
res += " element : " + str(self.element) + "\n"
res += " times : \n"
for i in self.time:
res += " " + str(i) + "\n"
return res
RegisterReaderClass(".ut", UtReader)
RegisterReaderClass(".utp", UtReader)
[docs]def CheckIntegrity(GUI=True):
import shutil
from Muscat.Helpers.CheckTools import MustFailFunctionWith
from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
from Muscat.Helpers.IO.FileTools import WriteTempFile
import Muscat.TestData as MuscatTestData
__teststring = """
**meshfile cube.geof
**node U1 U2 U3
**integ sig11 sig22 sig33 sig12 sig23 sig31
**element
1 1 1 1 0.000000000000000e+00
1 1 1 1 1.000000000000000e+00
"""
__teststringII = f"""
**meshfile
{TemporaryDirectory.GetTempPath()}cube.geo
**node U1 U2 U3
**integ sig11
**element
1 1 1 1 0.000000000000000e+00
1 1 1 1 1.000000000000000e+00
"""
numberOfnodeVariables = 3
def GenerateDataAndTest(filename, fileNameBase, postfix="", numberOfIntegVariables=6):
reader = UtReader()
reader.SetFileName(filename)
reader.ReadMetaData()
nbNodes = reader.meshMetadata["nbNodes"]
nbIP = reader.meshMetadata["nbIntegrationPoints"]
nbElements = reader.meshMetadata["nbElements"]
IPPerElement = reader.meshMetadata["IPPerElement"]
np.arange(nbNodes * numberOfnodeVariables * 2, dtype=np.float32).byteswap().tofile(TemporaryDirectory.GetTempPath() + f"{fileNameBase}.node" + postfix)
off1 = nbNodes * numberOfnodeVariables * 2
(np.arange(nbNodes * numberOfIntegVariables * 2, dtype=np.float32) + off1).byteswap().tofile(TemporaryDirectory.GetTempPath() + f"{fileNameBase}.ctnod" + postfix)
off2 = nbNodes * numberOfnodeVariables * 2 + nbNodes * numberOfIntegVariables * 2
ipdata = np.empty((nbElements, numberOfIntegVariables, 27), dtype=int)
for el in range(nbElements):
nip = IPPerElement[el]
for sv in range(numberOfIntegVariables):
for ip in range(nip):
ipdata[el, sv, ip] = el * 10000 + sv * 100 + ip
with open(TemporaryDirectory.GetTempPath() + f"{fileNameBase}.integ" + postfix, "wb") as datafile:
for i in range(2):
for el in range(nbElements):
nip = IPPerElement[el]
for sv in range(numberOfIntegVariables):
for ip in range(nip):
np.array(ipdata[el, sv, ip] + i * 1000000).astype(np.float32).byteswap().tofile(datafile)
reader.SetTimeToRead()
MustFailFunctionWith(Exception, reader.SetTimeToRead, 0, 0)
reader.SetTimeToRead(time=-1)
np.testing.assert_equal(reader.GetAvailableTimes(), [0.0, 1.0])
reader.Read()
MustFailFunctionWith(RuntimeError, reader.ReadField, "invalidField")
offset = 0
for t in [0.0, 1.0]:
for f in ["U1", "U2", "U3"]:
ref = np.arange(offset, offset + nbNodes, dtype=np.float32)
np.testing.assert_equal(reader.ReadField(fieldname=f, time=t), ref)
offset += nbNodes
offset = 0
for t in [0.0, 1.0]:
for f in ["sig11", "sig22", "sig33", "sig12", "sig23", "sig31"][0:numberOfIntegVariables]:
data = reader.ReadField(fieldname=f, time=t)
ref = np.arange(offset, offset + nbNodes, dtype=np.float32) + off1
np.testing.assert_equal(data, ref)
offset += nbNodes
reader.atIntegrationPoints = True
MustFailFunctionWith(RuntimeError, reader.ReadField, "invalidField")
offset = 0
for t in [0.0, 1.0]:
cpt = 0
for f in ["sig11", "sig22", "sig33", "sig12", "sig23", "sig31"][0:numberOfIntegVariables]:
data = reader.ReadField(fieldname=f, time=t)
ref = ipdata[:, cpt, :].ravel() + t * 1000000
np.testing.assert_equal(data, ref)
offset += nbIP
cpt += 1
ReadFieldFromUt(filename, fieldname="U1", time=0.0)
ReadUTMetaData(filename)
print(reader)
tempfileName = WriteTempFile("UtReaderTest.ut", content=__teststring)
shutil.copy(MuscatTestData.GetTestDataPath() + "cube.geof", TemporaryDirectory.GetTempPath() + "cube.geof")
GenerateDataAndTest(tempfileName, "UtReaderTest")
shutil.copy(MuscatTestData.GetTestDataPath() + "cube.geo", TemporaryDirectory.GetTempPath() + "cube.geo")
tempfileNameII = WriteTempFile("UtReaderTestII.utp", content=__teststringII)
GenerateDataAndTest(tempfileNameII, "UtReaderTestII", postfix="p", numberOfIntegVariables=1)
return "ok"
if __name__ == "__main__": # pragma: no cover
#
import time
a = time.time()
print(CheckIntegrity()) # pragma: no cover
print(time.time() - a)