Source code for Muscat.IO.RstReader

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#

"""
rst based ansys mesh reader.
returns an unstructured mesh (class from BasicTools)
"""
from typing import List

import numpy as np

from Muscat.IO.ReaderBase import ReaderBase
from Muscat.IO.IOFactory import RegisterReaderClass
from Muscat.Types import MuscatFloat, MuscatIndex
from Muscat.MeshContainers.Mesh import Mesh
from Muscat.Helpers.Logger import Debug
import Muscat.MeshContainers.ElementsDescription as ED
from Muscat.IO.AnsysTools import nbIntegrationsPoints, AnsysElementDescriptorToMuscatElementType
from Muscat.FE.Fields.FEField import FEField
from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter

[docs] def ReadRst(fileName: str, time: MuscatFloat = None, timeIndex: MuscatIndex = None, fromElementalNodalToNodal = True): """Function mesh and solution fields (noda, elemental and elemental nodal) from a rst file Parameters ---------- fileName : str, optional name of the file to be read, by default None time : float, optional time at which the field is read, by default None timeIndex :int, optional time index at which the field is read, by default None Returns ------- Mesh output Mesh object containing the results """ reader = RstReader() reader.SetFileName(fileName) reader.ReadMetaData() return reader.Read(time=time, timeIndex=timeIndex, fromElementalNodalToNodal=fromElementalNodalToNodal)
[docs] class RstReader(ReaderBase): """Rst Reader class """ def __init__(self): """ Initialize the RstReader Raises: ModuleNotFoundError: if the import ansys.dpf.core could not be load """ super().__init__() self.Reset() self.canHandleTemporal = True self.modal = False
[docs] def SetFileName(self, fileName: str) -> None: """Function to set the file to read Parameters ---------- fileName : str path to the simulation """ if self.fileName != fileName: self.Reset() self.fileName = fileName
[docs] def Reset(self): """Resets the reader """ self.fileName = None self.model = None self.meshMetadata = None self.nodal = {} self.elemental_nodal = {} self.elementary = {} self.atIntegrationPoints = False self.IdsToNodes = None self.timeToRead = -1 self.output = None self.to_nodal = None
[docs] def GetModel(self): """Function to recover the dpf Model associated to the rst file Raises: RuntimeError: if the fileName is not already set Returns: Model: dpf model """ if self.model is None: if self.fileName is None: raise RuntimeError("Need to set the fileName first") from ansys.dpf.core import Model self.model = Model(self.fileName) return self.model
[docs] def MeshRead(self) -> Mesh: """Function that performs the reading of a mesh in a rst file Returns ------- Mesh mesh containing reading result """ res = Mesh() rst = self.GetModel() metadata = rst.metadata ansys_mesh = metadata.meshed_region oidToElementContainerAndIndex = {} nbNodes = ansys_mesh.nodes.n_nodes if nbNodes: # ------------ Reading the NODES ------------ res.originalIDNodes = np.asarray(ansys_mesh.nodes.scoping.get_ids(), dtype=MuscatIndex) self.IdsToNodes = np.empty(np.max(res.originalIDNodes)+1, dtype=MuscatIndex) self.IdsToNodes[res.originalIDNodes] = np.arange(res.originalIDNodes.shape[0]) res.nodes = np.asarray(ansys_mesh.nodes.coordinates_field.data, dtype=MuscatFloat) # ------------ Reading the GROUPS/SELECTIONS ------------ group_names = ansys_mesh.available_named_selections for group_name in group_names: group_container = ansys_mesh.named_selection(group_name) if group_container.location == 'Nodal': nsetname = group_name if len(nsetname[0]) and group_container.size: tag = res.GetNodalTag(nsetname) tag.SetIds(np.asarray(group_container.ids, dtype=MuscatIndex)-1) nbElements = ansys_mesh.elements.n_elements if nbElements: j = 0 element_types = ansys_mesh.elements.element_types_field.data used_types, return_inverse, return_counts = np.unique(element_types, return_inverse=True, return_counts=True) connectivity_buffer = ansys_mesh.elements.connectivities_field.data element_ids = ansys_mesh.elements.scoping.ids nbnodes_per_type = np.array([ED.numberOfNodes[AnsysElementDescriptorToMuscatElementType[used_type]] for used_type in used_types]) offsets = np.empty(nbElements+1, dtype=MuscatIndex) offsets[0] = 0 offsets[1:] = np.add.accumulate(nbnodes_per_type[return_inverse]) for i, used_type in enumerate(used_types): nameType = AnsysElementDescriptorToMuscatElementType[used_type] elements = res.GetElementsOfType(nameType) cpt0 = elements.GetNumberOfElements() elements.Reserve(cpt0+return_counts[i]) numberOfNodesPerElement = elements.GetNumberOfNodesPerElement() index = np.where(element_types == used_type)[0] mask = np.arange(numberOfNodesPerElement, dtype=MuscatIndex) * np.ones((len(index), numberOfNodesPerElement), dtype=MuscatIndex) + offsets[index, None] localConnectivity = connectivity_buffer[mask] local_ids = element_ids[index] elements.AddNewElements(localConnectivity, local_ids) for j, oid in enumerate(local_ids): oidToElementContainerAndIndex[oid] = (elements, j+cpt0) eolOriginalIds = res.GetElementsOriginalIDs() self.IdsToElements = np.empty(np.max(eolOriginalIds)+1, dtype=MuscatIndex) self.IdsToElements[eolOriginalIds] = np.arange(eolOriginalIds.shape[0]) # ------------ Reading the GROUPS/SELECTIONS ------------ group_names = ansys_mesh.available_named_selections for group_name in group_names: group_container = ansys_mesh.named_selection(group_name) if group_container.location == 'Elemental': if group_container.size != 0: # res.AddElementToTagUsingOriginalId(group_container.ids, group_name) for oid_elem in group_container.ids: elementContainer, index = oidToElementContainerAndIndex[oid_elem] elementContainer.tags.CreateTag(group_name, False).AddToTag(index) res.PrepareForOutput() self.output = res return res
[docs] def ReadMetaData(self): """Function that performs the reading of the metadata of an rst file (number of points, number of elements, timesteps, fields) """ if self.meshMetadata is not None: return self.meshMetadata self.meshMetadata = {} rst = self.GetModel() metadata = rst.metadata ansys_mesh = metadata.meshed_region # nodes info self.meshMetadata["nbNodes"] = ansys_mesh.nodes.n_nodes self.meshMetadata["dimensionality"] = ansys_mesh.nodes.coordinates_field.shape[1] # elements info self.meshMetadata['nbElements'] = ansys_mesh.elements.n_elements # for the moment workbench does not export the integration point information (work must be done here) # the only solution is to export element-points wise information # this means the number of integration points is set to the number of nodes per element element_types = ansys_mesh.elements.element_types_field.data used_types, return_inverse, return_counts = np.unique(element_types, return_inverse=True, return_counts=True) integrationPointPerUsedElementTypes = np.asarray([nbIntegrationsPoints[AnsysElementDescriptorToMuscatElementType[used_type]] for used_type in used_types]) self.meshMetadata['IPPerElement'] = integrationPointPerUsedElementTypes[return_inverse] self.meshMetadata['nbIntegrationPoints'] = np.sum(self.meshMetadata['IPPerElement']) # self.nodal = {} self.elementary = {} self.elemental_nodal = {} storageTarget = {'Nodal': self.nodal, 'Elemental': self.elementary, 'ElementalNodal': self.elemental_nodal} if rst.metadata.result_info.n_results > 0: for property_res in rst.metadata.result_info.available_results: storage = storageTarget[property_res.native_location] if len(property_res.sub_results) > 0: op_list = [] for comp in property_res.sub_results[:property_res.n_components]: op_list.append(comp['operator name']) storage[property_res.name] = op_list else: storage[property_res.name] = [property_res.operator_name] self.time = np.array(rst.metadata.time_freq_support.time_frequencies.data, dtype=MuscatFloat)
[docs] def Read(self, time: MuscatFloat = None, timeIndex: MuscatIndex = None, fromElementalNodalToNodal = True) -> Mesh: """Function that performs the reading of the mesh and the fields in the rst Parameters ---------- time : float, optional time at which the field is read, by default None timeIndex :int, optional time index at which the field is read, by default None Returns ------- Mesh output Mesh object containing the results """ try: import ansys.dpf.core as dpf from ansys.dpf import post except ImportError: raise ModuleNotFoundError("To use this module ansys-dpf-core and ansys/pydpf-post must be installed") # If the simulation is modal then it sets self.modal to True because it needs # to accomodate to the 1 index of modal simulations self.modal = int(isinstance(post.load_simulation(self.fileName), post.ModalMechanicalSimulation)) mesh = self.MeshRead() for nodefield, operators in self.nodal.items(): field = self.ReadField(fieldname=nodefield, time=time, timeIndex=timeIndex) if field is not None: for i, op in enumerate(operators): mesh.nodeFields[op] = field[:, i] for elemField, operators in self.elementary.items(): field = self.ReadField(fieldname=elemField, time=time, timeIndex=timeIndex) if field is not None: for i, op in enumerate(operators): mesh.elemFields[op] = field[:, i] if fromElementalNodalToNodal: for elemField, operators in self.elemental_nodal.items(): field = self.ReadField(fieldname=elemField, time=time, timeIndex=timeIndex) if field is not None: for i, op in enumerate(operators): mesh.nodeFields[op] = field[:, i] else: for elemField, operators in self.elemental_nodal.items(): FEfield = self.ReadField(fieldname=elemField, time=time, timeIndex=timeIndex, fromElementalNodalToNodal=False) if FEfield is not None: for i, op in enumerate(operators): mesh.elemFields[op] = FEfield[i].GetCellRepresentation() return mesh
[docs] def ReadField(self, fieldname, time: MuscatFloat = None, timeIndex: MuscatIndex = None, fromElementalNodalToNodal = True) -> np.ndarray: """Function that performs the reading of a field Parameters ---------- fieldname : str, optional name of the field to be read, by default None time : float, optional time at which the field is read, by default None timeIndex : int, optional time index at which the field is read, by default None Returns ------- np.ndarray for an ElementalNodal field: if fromElementalNodalToNodal = True (default), this function returns a numpy array of the averaged value at the nodes if fromElementalNodalToNodal = False, this function returns a list of discontinuous FEfields (one for each component of the field) """ self.ReadMetaData() # need to shift the index in case of a modal study timeIndex = self.SetTimeToRead(time, timeIndex) + self.modal Debug("Reading timeIndex : " + str(timeIndex)) rst = self.GetModel() try: import ansys.dpf.core as dpf except ImportError: raise ModuleNotFoundError("To use this module ansys-dpf-core and ansys/pydpf-post must be installed") if fieldname in self.nodal: scoping = rst.metadata.meshed_region.nodes.scoping nbentities = self.output.GetNumberOfNodes() mapping = self.IdsToNodes elif fieldname in self.elementary: scoping = rst.metadata.meshed_region.elements.scoping nbentities = self.output.GetNumberOfElements() mapping = self.IdsToElements elif fieldname in self.elemental_nodal: elementFilterSolid = ElementFilter(dimensionality=self.output.GetElementsDimensionality()) solid_ids = [] for selection in elementFilterSolid(self.output): solid_ids.extend(selection.elements.originalIds) solid_ids = np.array(solid_ids, dtype=MuscatIndex) scoping = dpf.mesh_scoping_factory.elemental_scoping(solid_ids) nbentities = self.output.GetNumberOfNodes() mapping = self.IdsToNodes else: raise (Exception("unable to find field " + str(fieldname))) field = getattr(rst.results, fieldname)(mesh_scoping=scoping, time_scoping=int(timeIndex)).outputs.fields_container()[0] if fieldname in self.elemental_nodal: if not fromElementalNodalToNodal: ############ # generate a discontinues space to hold the field "elemental nodal non moyenné" from Muscat.FE.DofNumbering import ComputeDofNumbering from Muscat.FE.Spaces.FESpaces import LagrangeSpaceP1, LagrangeSpaceP2, LagrangeSpaceGeo support_ids = np.asarray(field.scoping.ids) data = field.data if data.ndim == 1: data = data[:, None] muscat_offset = self.output.ComputeGlobalOffset() # warning : depending of the nature of the field (stress, temperature...), the number of entries in field.data can correspond to the order of the mesh or to a linear version count = 0 for selection in elementFilterSolid.IterOnTypesOnly(self.output): count += ED.numberOfNodes[selection.elementType]*selection.elements.GetNumberOfElements() if count == data.shape[0]: space = LagrangeSpaceGeo else: space = LagrangeSpaceP1 numbering = ComputeDofNumbering(self.output, space=space, elementFilter=elementFilterSolid, discontinuous=True) elementsOriginalIDs = self.output.GetElementsOriginalIDs() nodesPerElements = np.zeros((np.max(elementsOriginalIDs)+1), dtype=MuscatIndex) elementsTypes = np.zeros((np.max(elementsOriginalIDs)+1), dtype=MuscatIndex)-1 for selection in elementFilterSolid.IterOnTypesOnly(self.output): nodesPerElements[selection.elements.originalIds] = space[selection.elementType].GetNumberOfShapeFunctions() elementsTypes[selection.elements.originalIds] = selection.elementType.value nodesPerElements_extraction = nodesPerElements[support_ids] elementsTypes_extraction = elementsTypes[support_ids] ansys_offset = np.empty(len(support_ids)+1, dtype=MuscatIndex) ansys_offset[0] = 0 ansys_offset[1:] = np.add.accumulate(nodesPerElements_extraction) res = [] for i in range(data.shape[1]): res_i = FEField(name=fieldname, mesh=self.output, space=space, numbering=numbering) res_i.Allocate() cpt = 0 for selection in elementFilterSolid(self.output): index = np.where(elementsTypes_extraction == selection.elementType.value)[0] numberOfNodesPerElement = space[selection.elementType].GetNumberOfShapeFunctions() fillingMask = np.arange(numberOfNodesPerElement, dtype=MuscatIndex) * np.ones((len(index), numberOfNodesPerElement), dtype=MuscatIndex) ansys_mask = fillingMask + ansys_offset[index, None] newData = data[ansys_mask, i] targetIndices = (self.IdsToElements[support_ids[index]] - muscat_offset[selection.elementType])*numberOfNodesPerElement + cpt muscat_mask = fillingMask + targetIndices[:, None] res_i.data[muscat_mask.ravel()] = newData.ravel() cpt += selection.elements.GetNumberOfElements()*numberOfNodesPerElement res.append(res_i) return res ############################# else: ############################# # construct the nodal averaged field to_nodal = dpf.operators.averaging.elemental_nodal_to_nodal() to_nodal.inputs.field.connect(field) field_nodal = to_nodal.outputs.field() support_ids = np.asarray(field_nodal.scoping.ids) data = field_nodal.data ############################# else: support_ids = np.asarray(field.scoping.ids) data = field.data if data.ndim == 1: data = data[:, None] res = np.zeros((nbentities, data.shape[1])) try: res[mapping[support_ids]] = data except ValueError: print(f"{fieldname} of shape {data.shape} could not be broadcast to indexing result of shape", support_ids.shape) return None return res
[docs] def SetTimeToRead(self, time: MuscatFloat = None, timeIndex: MuscatIndex = None) -> MuscatIndex: """Sets the time at which the data is read Parameters ---------- time : float, optional time at which the data is read, by default None timeIndex : int, optional time index at which the data is read, by default None Returns ------- int time index at which the data is read """ if (time is None) and (timeIndex is None): if self.timeToRead == -1: self.timeToRead = self.time[-1] return len(self.time)-1 else: return np.where(self.time == self.timeToRead)[0] if (time is not None) and (timeIndex is not None): raise (Exception("Cannot specify both time and timeIndex")) if time is None: self.timeToRead = self.time[timeIndex] return timeIndex elif time == -1: self.timeToRead = self.time[-1] return timeIndex else: self.timeToRead = time return np.where(self.time == self.timeToRead)[0][0]
[docs] def GetAvailableTimes(self) -> np.ndarray: """Returns the available times at which data can be read Returns ------- np.ndarray available times at which data can be read """ self.ReadMetaData() return self.time
[docs] def GetAvailableResultsAndOperators(self) -> List: """Returns the available results and its operator name Returns ------- list contains a namedTuple with the results. attributes are name, operator_name, native_location, n_components """ rst = self.GetModel() class Results(): __slots__ = ("name", "operator_name", "native_location", "n_components") def __init__(self, name, operator_name, native_location, n_components): self.name = name self.operator_name = operator_name self.native_location = native_location self.n_components = n_components def __repr__(self): return f"{self.name}({self.operator_name}, {self.native_location}, {self.n_components})" return [Results(r.name, r.operator_name, r.native_location, r.n_components) for r in rst.metadata.result_info.available_results]
[docs] def GetAvailablePostResults(self) -> List: try: import ansys.dpf.core from ansys.dpf import post except ImportError: raise ModuleNotFoundError("To use this module ansys-dpf-core and ansys/pydpf-post must be installed") simulation = post.load_simulation(self.fileName) return [attr for attr in dir(simulation) if callable(getattr(simulation, attr)) and not attr.startswith("__")]
[docs] def GetPostResults(self, post_names=[], locations=[]): try: from ansys.dpf import post except ImportError: raise ModuleNotFoundError("To use this module ansys/pydpf-post must be installed") simulation = post.load_simulation(self.fileName) for attribute, location in zip(post_names, locations): result = getattr(simulation, attribute)(location=getattr(post.locations, location)) if location == "nodal": res = np.empty_like(result.array) res[result.mesh_index.values-1] = result.array self.output.nodeFields[attribute] = res elif location == "elemental": res = np.empty_like(result.array) res[result.mesh_index.values-1] = result.array self.output.elemFields[attribute] = res
RegisterReaderClass(".rst", RstReader)
[docs] def CheckIntegrity(GUI: bool = False): import Muscat.TestData as MuscatTestData try: import ansys.dpf.core as dpf except ImportError: return 'skip: ansys.dpf.core not available ' file = MuscatTestData.GetTestDataPath()+"file.rst" ReadRst(file) reader = RstReader() reader.SetFileName(file) times = reader.GetAvailableTimes() reader.Read(time=times[-1], fromElementalNodalToNodal=False) reader.GetAvailableResultsAndOperators() reader.GetAvailablePostResults() reader.GetPostResults(['stress'], ["nodal"]) return "ok"
if __name__ == '__main__': print(CheckIntegrity(True))# pragma: no cover