Source code for Muscat.IO.UtReader

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""Ut file reader (Zset result file)
"""
from Muscat.IO.IOFactory import RegisterReaderClass
import os

import numpy as np
from typing import List, Dict
from Muscat.Types import MuscatFloat, MuscatIndex, ArrayLike
from Muscat.Containers.Mesh import Mesh
from Muscat.Helpers.Logger import Debug, Info
from Muscat.IO.ReaderBase import ReaderBase

[docs]def ReadFieldFromUt(fileName: str = "", fieldname: str = None, time: MuscatFloat = None, timeIndex: MuscatIndex = None, string: str = "", atIntegrationPoints: bool = False) -> ArrayLike: """Function API for reading a field defined in an ut file Parameters ---------- fileName : str, optional name of the file to be read, by default "" fieldname : str, optional name of the field to be read, by default None time : float, optional time at which the field is read, by default None timeIndex : int, optional time index at which the field is read, by default None string : str, optional data to be read as a string instead of a file, by default "" atIntegrationPoints : bool, optional if True, field is read at integration points (from .integ file), by default False Returns ------- np.ndarray field read """ reader = UtReader() reader.SetFileName(fileName) reader.SetStringToRead(string) reader.atIntegrationPoints = atIntegrationPoints reader.ReadMetaData() reader.SetFieldNameToRead(fieldname) reader.SetTimeToRead(time=time, timeIndex=timeIndex) return reader.ReadField()
[docs]def ReadUTMetaData(fileName: str) -> Dict: """Function API for reading of metadate of an ut file Parameters ---------- fileName : str, optional name of the file to be read, by default None Returns ------- dict metadate of the ut file """ reader = UtReader() reader.SetFileName(fileName) reader.ReadUTMetaData() metaData = {} metaData["meshFile"] = reader.meshfile metaData["node"] = reader.node metaData["integ"] = reader.integ metaData["element"] = reader.element metaData["time"] = reader.time return metaData
[docs]class UtReader(ReaderBase): """Ut Reader class""" def __init__(self): super().__init__() self.commentHeader = "#" self.meshfile = None self.node = [] self.integ = [] self.element = [] self.time = None self.fieldNameToRead = None self.timeToRead = -1 self.atIntegrationPoints = False self.meshMetadata = None self.cache = None self.oldtimeindex = None self.canHandleTemporal = True
[docs] def Reset(self): """Resets the reader""" self.meshfile = None self.node = [] self.integ = [] self.element = [] self.time = []
[docs] def SetFieldNameToRead(self, fieldname: str): """Sets the name of the field to read Parameters ---------- fieldname : str name of the field to read """ if fieldname is not None: self.fieldNameToRead = fieldname
[docs] def SetTimeToRead(self, time: MuscatFloat = None, timeIndex: MuscatIndex = None) -> MuscatIndex : """Sets the time at which the data is read Parameters ---------- time : float, optional time at which the data is read, by default None timeIndex : int, optional time index at which the data is read, by default None Returns ------- int time index at which the data is read """ if (time is None) and (timeIndex is None): if self.timeToRead == -1: self.timeToRead = self.time[-1][4] return len(self.time) - 1 else: return [data[4] for data in self.time].index(self.timeToRead) if (time is not None) and (timeIndex is not None): raise (Exception("Cannot specify both time and timeIndex")) if time is None: self.timeToRead = self.time[timeIndex][4] return timeIndex elif time == -1: self.timeToRead = self.time[-1][4] return timeIndex else: self.timeToRead = time return [data[4] for data in self.time].index(self.timeToRead)
[docs] def GetAvailableTimes(self) -> MuscatFloat: """Returns the available times at which data can be read Returns ------- np.ndarray available times at which data can be read """ return self.time[:, 4]
[docs] def ReadUTMetaData(self): """Function that performs the reading of the metadata of an ut file""" self.StartReading() self.Reset() while True: line = self.ReadCleanLine() if line == None: break if line.find("**meshfile") > -1: s = line.split() if len(s) == 1: line = self.ReadCleanLine() self.meshfile = line else: self.meshfile = s[-1] continue if line.find("**node") > -1: s = line.split() self.node = s[1:] continue if line.find("**integ") > -1: s = line.split() self.integ = s[1:] continue if line.find("**element") > -1: s = line.split() self.element = s[1:] continue # all the rest are the times s = line.split() self.time.append([b(a) for a, b in zip(s, [int, int, int, int, float])]) self.time = np.array(self.time) self.EndReading()
[docs] def ReadMetaData(self) -> Dict: """Function that performs the reading of the metadata of an ut file, and of the mesh defined in it Returns ------- dict global information on the mesh defined in the ut file """ self.ReadUTMetaData() if self.meshMetadata is not None: return self.meshMetadata if self.meshfile.endswith(".geof"): from Muscat.IO.GeofReader import GeofReader GR = GeofReader() else: from Muscat.IO.GeoReader import GeoReader GR = GeoReader() if os.path.isabs(self.meshfile): GR.SetFileName(self.meshfile) else: GR.SetFileName(self.filePath + self.meshfile) self.SetMeshMetaData(GR.ReadMetaData()) return self.meshMetadata
[docs] def SetMeshMetaData(self, meshMetadata): """ To prevent reading the mesh file again in case multiple ut files are related to the same mesh file """ self.meshMetadata = meshMetadata
[docs] def Read(self, fieldNames :List[str] = None) -> Mesh: """Function that performs the reading of the data defined in an ut file Parameters ---------- fieldNames : List[str], optional list of field names to read Returns ------- Mesh output unstructured mesh object containing reading result """ if self.meshfile[-5:] == ".geof": from Muscat.IO.GeofReader import GeofReader GR = GeofReader() else: from Muscat.IO.GeoReader import GeoReader GR = GeoReader() if os.path.isabs(self.meshfile): GR.SetFileName(self.meshfile) else: GR.SetFileName(self.filePath + self.meshfile) mesh = GR.Read() if fieldNames is None: for nodefield in self.node: mesh.nodeFields[nodefield] = self.ReadField(fieldname=nodefield) for integfield in self.integ: mesh.nodeFields[integfield] = self.ReadField(fieldname=integfield) else: for nodefield in fieldNames: mesh.nodeFields[nodefield] = self.ReadField(fieldname=nodefield) return mesh
[docs] def ReadField(self, fieldname: str = None, time: float =None, timeIndex: int = None) -> ArrayLike: """Function that performs the reading of a field defined in an ut file Parameters ---------- fieldname : str, optional name of the field to be read, by default None time : float, optional time at which the field is read, by default None timeIndex : int, optional time index at which the field is read, by default None Returns ------- np.ndarray field read """ self.ReadMetaData() postfix = "" if self.fileName[-1] == "p": postfix = "p" self.SetFieldNameToRead(fieldname) timeIndex = self.SetTimeToRead(time, timeIndex) if timeIndex != self.oldtimeindex: self.cache = None self.oldtimeindex = timeIndex Debug("Reading timeIndex : " + str(timeIndex)) basename = ".".join(self.fileName.split(".")[0:-1]) # find the field to read idx = None res = None nbUsednodes = self.meshMetadata["nbNodes"] nbNodes = self.meshMetadata["nbNodes"] nbIntegrationPoints = self.meshMetadata["nbIntegrationPoints"] nbElements = self.meshMetadata["nbElements"] IPPerElement = self.meshMetadata["IPPerElement"] if self.atIntegrationPoints: try: idx = self.integ.index(self.fieldNameToRead) offset = nbIntegrationPoints * len(self.integ) * timeIndex count = nbIntegrationPoints ffn = basename + ".integ" except: raise RuntimeError("unable to find field " + str(self.fieldNameToRead)) Debug("Opening file : " + str(ffn)) res = np.empty(count, dtype=MuscatFloat) try: if len(self.integ) == 1: with open(ffn + postfix, "rb") as datafile: datafile.seek(offset * 4) res = np.fromfile(datafile, count=count, dtype=np.float32).byteswap() elif np.min(IPPerElement) == np.max(IPPerElement): # the .intef file is homogenius with open(ffn + postfix, "rb") as datafile: datafile.seek(offset * 4) nip = IPPerElement[0] if self.cache is None: self.cache = np.fromfile(datafile, count=nip * nbElements * len(self.integ), dtype=np.float32).byteswap() self.cache.shape = (nbElements, nip * len(self.integ)) res = self.cache[:, idx * nip : (idx + 1) * nip].flatten() else: with open(ffn + postfix, "rb") as datafile: # Debug("Offset : " + str(offset*4)) # Debug("count : " + str(count)) datafile.seek(offset * 4) cpt = 0 oldStep = 0 for el in range(nbElements): # for sv in range(len(self.integ)): # for ip in range(IPPerElement[el]): # res[cpt] = np.fromfile(datafile ,count=1, dtype=np.float32).byteswap() nip = IPPerElement[el] oldStep += nip * idx datafile.seek(4 * (oldStep), 1) res[cpt : cpt + nip] = np.fromfile(datafile, count=nip, dtype=np.float32).byteswap() cpt += nip oldStep = (len(self.integ) - idx - 1) * nip except: # pragma: no cover print("Error Reading field : " + str(self.fieldNameToRead) + " (not read)") raise else: try: idx = self.node.index(self.fieldNameToRead) offset = nbNodes * len(self.node) * timeIndex + idx * nbNodes count = nbNodes ffn = basename + ".node" except: try: idx = self.integ.index(self.fieldNameToRead) offset = nbUsednodes * len(self.integ) * timeIndex + idx * nbUsednodes count = nbUsednodes ffn = basename + ".ctnod" except: raise RuntimeError("unable to find field " + str(self.fieldNameToRead) + " in file " + self.fileName) Debug("Opening file : " + str(ffn)) res = None try: with open(ffn + postfix, "rb") as datafile: # Debug("Offset : " + str(offset*4)) datafile.seek(offset * 4) res = np.fromfile(datafile, count=count, dtype=np.float32).byteswap() except: # pragma: no cover print("Error Reading field : " + str(self.fieldNameToRead) + " (not read)") return np.asarray(res, dtype=MuscatFloat)
def __str__(self): res = "" res += "class UtReader (" + str(id(self)) + ")\n" res += " meshfile : " + str(self.meshfile) + "\n" res += " node : " + str(self.node) + "\n" res += " integ : " + str(self.integ) + "\n" res += " element : " + str(self.element) + "\n" res += " times : \n" for i in self.time: res += " " + str(i) + "\n" return res
RegisterReaderClass(".ut", UtReader) RegisterReaderClass(".utp", UtReader)
[docs]def CheckIntegrity(GUI=True): import shutil from Muscat.Helpers.CheckTools import MustFailFunctionWith from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory from Muscat.Helpers.IO.FileTools import WriteTempFile import Muscat.TestData as MuscatTestData __teststring = """ **meshfile cube.geof **node U1 U2 U3 **integ sig11 sig22 sig33 sig12 sig23 sig31 **element 1 1 1 1 0.000000000000000e+00 1 1 1 1 1.000000000000000e+00 """ __teststringII = f""" **meshfile {TemporaryDirectory.GetTempPath()}cube.geo **node U1 U2 U3 **integ sig11 **element 1 1 1 1 0.000000000000000e+00 1 1 1 1 1.000000000000000e+00 """ numberOfnodeVariables = 3 def GenerateDataAndTest(filename, fileNameBase, postfix="", numberOfIntegVariables=6): reader = UtReader() reader.SetFileName(filename) reader.ReadMetaData() nbNodes = reader.meshMetadata["nbNodes"] nbIP = reader.meshMetadata["nbIntegrationPoints"] nbElements = reader.meshMetadata["nbElements"] IPPerElement = reader.meshMetadata["IPPerElement"] np.arange(nbNodes * numberOfnodeVariables * 2, dtype=np.float32).byteswap().tofile(TemporaryDirectory.GetTempPath() + f"{fileNameBase}.node" + postfix) off1 = nbNodes * numberOfnodeVariables * 2 (np.arange(nbNodes * numberOfIntegVariables * 2, dtype=np.float32) + off1).byteswap().tofile(TemporaryDirectory.GetTempPath() + f"{fileNameBase}.ctnod" + postfix) off2 = nbNodes * numberOfnodeVariables * 2 + nbNodes * numberOfIntegVariables * 2 ipdata = np.empty((nbElements, numberOfIntegVariables, 27), dtype=int) for el in range(nbElements): nip = IPPerElement[el] for sv in range(numberOfIntegVariables): for ip in range(nip): ipdata[el, sv, ip] = el * 10000 + sv * 100 + ip with open(TemporaryDirectory.GetTempPath() + f"{fileNameBase}.integ" + postfix, "wb") as datafile: for i in range(2): for el in range(nbElements): nip = IPPerElement[el] for sv in range(numberOfIntegVariables): for ip in range(nip): np.array(ipdata[el, sv, ip] + i * 1000000).astype(np.float32).byteswap().tofile(datafile) reader.SetTimeToRead() MustFailFunctionWith(Exception, reader.SetTimeToRead, 0, 0) reader.SetTimeToRead(time=-1) np.testing.assert_equal(reader.GetAvailableTimes(), [0.0, 1.0]) reader.Read() MustFailFunctionWith(RuntimeError, reader.ReadField, "invalidField") offset = 0 for t in [0.0, 1.0]: for f in ["U1", "U2", "U3"]: ref = np.arange(offset, offset + nbNodes, dtype=np.float32) np.testing.assert_equal(reader.ReadField(fieldname=f, time=t), ref) offset += nbNodes offset = 0 for t in [0.0, 1.0]: for f in ["sig11", "sig22", "sig33", "sig12", "sig23", "sig31"][0:numberOfIntegVariables]: data = reader.ReadField(fieldname=f, time=t) ref = np.arange(offset, offset + nbNodes, dtype=np.float32) + off1 np.testing.assert_equal(data, ref) offset += nbNodes reader.atIntegrationPoints = True MustFailFunctionWith(RuntimeError, reader.ReadField, "invalidField") offset = 0 for t in [0.0, 1.0]: cpt = 0 for f in ["sig11", "sig22", "sig33", "sig12", "sig23", "sig31"][0:numberOfIntegVariables]: data = reader.ReadField(fieldname=f, time=t) ref = ipdata[:, cpt, :].ravel() + t * 1000000 np.testing.assert_equal(data, ref) offset += nbIP cpt += 1 ReadFieldFromUt(filename, fieldname="U1", time=0.0) ReadUTMetaData(filename) print(reader) tempfileName = WriteTempFile("UtReaderTest.ut", content=__teststring) shutil.copy(MuscatTestData.GetTestDataPath() + "cube.geof", TemporaryDirectory.GetTempPath() + "cube.geof") GenerateDataAndTest(tempfileName, "UtReaderTest") shutil.copy(MuscatTestData.GetTestDataPath() + "cube.geo", TemporaryDirectory.GetTempPath() + "cube.geo") tempfileNameII = WriteTempFile("UtReaderTestII.utp", content=__teststringII) GenerateDataAndTest(tempfileNameII, "UtReaderTestII", postfix="p", numberOfIntegVariables=1) return "ok"
if __name__ == "__main__": # pragma: no cover # import time a = time.time() print(CheckIntegrity()) # pragma: no cover print(time.time() - a)