# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""
rst based ansys mesh reader.
returns an unstructured mesh (class from BasicTools)
"""
from typing import List
import numpy as np
from Muscat.IO.ReaderBase import ReaderBase
from Muscat.IO.IOFactory import RegisterReaderClass
from Muscat.Types import MuscatFloat, MuscatIndex
from Muscat.MeshContainers.Mesh import Mesh
from Muscat.Helpers.Logger import Debug
import Muscat.MeshContainers.ElementsDescription as ED
from Muscat.IO.AnsysTools import nbIntegrationsPoints, AnsysElementDescriptorToMuscatElementType
from Muscat.FE.Fields.FEField import FEField
from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter
[docs]
def ReadRst(fileName: str, time: MuscatFloat = None, timeIndex: MuscatIndex = None, fromElementalNodalToNodal = True):
"""Function mesh and solution fields (noda, elemental and elemental nodal) from a rst file
Parameters
----------
fileName : str, optional
name of the file to be read, by default None
time : float, optional
time at which the field is read, by default None
timeIndex :int, optional
time index at which the field is read, by default None
Returns
-------
Mesh
output Mesh object containing the results
"""
reader = RstReader()
reader.SetFileName(fileName)
reader.ReadMetaData()
return reader.Read(time=time, timeIndex=timeIndex, fromElementalNodalToNodal=fromElementalNodalToNodal)
[docs]
class RstReader(ReaderBase):
"""Rst Reader class
"""
def __init__(self):
""" Initialize the RstReader
Raises:
ModuleNotFoundError: if the import ansys.dpf.core could not be load
"""
super().__init__()
self.Reset()
self.canHandleTemporal = True
self.modal = False
[docs]
def SetFileName(self, fileName: str) -> None:
"""Function to set the file to read
Parameters
----------
fileName : str
path to the simulation
"""
if self.fileName != fileName:
self.Reset()
self.fileName = fileName
[docs]
def Reset(self):
"""Resets the reader
"""
self.fileName = None
self.model = None
self.meshMetadata = None
self.nodal = {}
self.elemental_nodal = {}
self.elementary = {}
self.atIntegrationPoints = False
self.IdsToNodes = None
self.timeToRead = -1
self.output = None
self.to_nodal = None
[docs]
def GetModel(self):
"""Function to recover the dpf Model associated to the rst file
Raises:
RuntimeError: if the fileName is not already set
Returns:
Model: dpf model
"""
if self.model is None:
if self.fileName is None:
raise RuntimeError("Need to set the fileName first")
from ansys.dpf.core import Model
self.model = Model(self.fileName)
return self.model
[docs]
def MeshRead(self) -> Mesh:
"""Function that performs the reading of a mesh in a rst file
Returns
-------
Mesh
mesh containing reading result
"""
res = Mesh()
rst = self.GetModel()
metadata = rst.metadata
ansys_mesh = metadata.meshed_region
oidToElementContainerAndIndex = {}
nbNodes = ansys_mesh.nodes.n_nodes
if nbNodes:
# ------------ Reading the NODES ------------
res.originalIDNodes = np.asarray(ansys_mesh.nodes.scoping.get_ids(), dtype=MuscatIndex)
self.IdsToNodes = np.empty(np.max(res.originalIDNodes)+1, dtype=MuscatIndex)
self.IdsToNodes[res.originalIDNodes] = np.arange(res.originalIDNodes.shape[0])
res.nodes = np.asarray(ansys_mesh.nodes.coordinates_field.data, dtype=MuscatFloat)
# ------------ Reading the GROUPS/SELECTIONS ------------
group_names = ansys_mesh.available_named_selections
for group_name in group_names:
group_container = ansys_mesh.named_selection(group_name)
if group_container.location == 'Nodal':
nsetname = group_name
if len(nsetname[0]) and group_container.size:
tag = res.GetNodalTag(nsetname)
tag.SetIds(np.asarray(group_container.ids, dtype=MuscatIndex)-1)
nbElements = ansys_mesh.elements.n_elements
if nbElements:
j = 0
element_types = ansys_mesh.elements.element_types_field.data
used_types, return_inverse, return_counts = np.unique(element_types, return_inverse=True, return_counts=True)
connectivity_buffer = ansys_mesh.elements.connectivities_field.data
element_ids = ansys_mesh.elements.scoping.ids
nbnodes_per_type = np.array([ED.numberOfNodes[AnsysElementDescriptorToMuscatElementType[used_type]] for used_type in used_types])
offsets = np.empty(nbElements+1, dtype=MuscatIndex)
offsets[0] = 0
offsets[1:] = np.add.accumulate(nbnodes_per_type[return_inverse])
for i, used_type in enumerate(used_types):
nameType = AnsysElementDescriptorToMuscatElementType[used_type]
elements = res.GetElementsOfType(nameType)
cpt0 = elements.GetNumberOfElements()
elements.Reserve(cpt0+return_counts[i])
numberOfNodesPerElement = elements.GetNumberOfNodesPerElement()
index = np.where(element_types == used_type)[0]
mask = np.arange(numberOfNodesPerElement, dtype=MuscatIndex) * np.ones((len(index), numberOfNodesPerElement), dtype=MuscatIndex) + offsets[index, None]
localConnectivity = connectivity_buffer[mask]
local_ids = element_ids[index]
elements.AddNewElements(localConnectivity, local_ids)
for j, oid in enumerate(local_ids):
oidToElementContainerAndIndex[oid] = (elements, j+cpt0)
eolOriginalIds = res.GetElementsOriginalIDs()
self.IdsToElements = np.empty(np.max(eolOriginalIds)+1, dtype=MuscatIndex)
self.IdsToElements[eolOriginalIds] = np.arange(eolOriginalIds.shape[0])
# ------------ Reading the GROUPS/SELECTIONS ------------
group_names = ansys_mesh.available_named_selections
for group_name in group_names:
group_container = ansys_mesh.named_selection(group_name)
if group_container.location == 'Elemental':
if group_container.size != 0:
# res.AddElementToTagUsingOriginalId(group_container.ids, group_name)
for oid_elem in group_container.ids:
elementContainer, index = oidToElementContainerAndIndex[oid_elem]
elementContainer.tags.CreateTag(group_name, False).AddToTag(index)
res.PrepareForOutput()
self.output = res
return res
[docs]
def Read(self, time: MuscatFloat = None, timeIndex: MuscatIndex = None, fromElementalNodalToNodal = True) -> Mesh:
"""Function that performs the reading of the mesh and the fields in the rst
Parameters
----------
time : float, optional
time at which the field is read, by default None
timeIndex :int, optional
time index at which the field is read, by default None
Returns
-------
Mesh
output Mesh object containing the results
"""
try:
import ansys.dpf.core as dpf
from ansys.dpf import post
except ImportError:
raise ModuleNotFoundError("To use this module ansys-dpf-core and ansys/pydpf-post must be installed")
# If the simulation is modal then it sets self.modal to True because it needs
# to accomodate to the 1 index of modal simulations
self.modal = int(isinstance(post.load_simulation(self.fileName), post.ModalMechanicalSimulation))
mesh = self.MeshRead()
for nodefield, operators in self.nodal.items():
field = self.ReadField(fieldname=nodefield, time=time, timeIndex=timeIndex)
if field is not None:
for i, op in enumerate(operators):
mesh.nodeFields[op] = field[:, i]
for elemField, operators in self.elementary.items():
field = self.ReadField(fieldname=elemField, time=time, timeIndex=timeIndex)
if field is not None:
for i, op in enumerate(operators):
mesh.elemFields[op] = field[:, i]
if fromElementalNodalToNodal:
for elemField, operators in self.elemental_nodal.items():
field = self.ReadField(fieldname=elemField, time=time, timeIndex=timeIndex)
if field is not None:
for i, op in enumerate(operators):
mesh.nodeFields[op] = field[:, i]
else:
for elemField, operators in self.elemental_nodal.items():
FEfield = self.ReadField(fieldname=elemField, time=time, timeIndex=timeIndex, fromElementalNodalToNodal=False)
if FEfield is not None:
for i, op in enumerate(operators):
mesh.elemFields[op] = FEfield[i].GetCellRepresentation()
return mesh
[docs]
def ReadField(self, fieldname, time: MuscatFloat = None, timeIndex: MuscatIndex = None, fromElementalNodalToNodal = True) -> np.ndarray:
"""Function that performs the reading of a field
Parameters
----------
fieldname : str, optional
name of the field to be read, by default None
time : float, optional
time at which the field is read, by default None
timeIndex : int, optional
time index at which the field is read, by default None
Returns
-------
np.ndarray
for an ElementalNodal field:
if fromElementalNodalToNodal = True (default), this function returns a numpy array of the averaged value at the nodes
if fromElementalNodalToNodal = False, this function returns a list of discontinuous FEfields (one for each component of the field)
"""
self.ReadMetaData()
# need to shift the index in case of a modal study
timeIndex = self.SetTimeToRead(time, timeIndex) + self.modal
Debug("Reading timeIndex : " + str(timeIndex))
rst = self.GetModel()
try:
import ansys.dpf.core as dpf
except ImportError:
raise ModuleNotFoundError("To use this module ansys-dpf-core and ansys/pydpf-post must be installed")
if fieldname in self.nodal:
scoping = rst.metadata.meshed_region.nodes.scoping
nbentities = self.output.GetNumberOfNodes()
mapping = self.IdsToNodes
elif fieldname in self.elementary:
scoping = rst.metadata.meshed_region.elements.scoping
nbentities = self.output.GetNumberOfElements()
mapping = self.IdsToElements
elif fieldname in self.elemental_nodal:
elementFilterSolid = ElementFilter(dimensionality=self.output.GetElementsDimensionality())
solid_ids = []
for selection in elementFilterSolid(self.output):
solid_ids.extend(selection.elements.originalIds)
solid_ids = np.array(solid_ids, dtype=MuscatIndex)
scoping = dpf.mesh_scoping_factory.elemental_scoping(solid_ids)
nbentities = self.output.GetNumberOfNodes()
mapping = self.IdsToNodes
else:
raise (Exception("unable to find field " + str(fieldname)))
field = getattr(rst.results, fieldname)(mesh_scoping=scoping, time_scoping=int(timeIndex)).outputs.fields_container()[0]
if fieldname in self.elemental_nodal:
if not fromElementalNodalToNodal:
############
# generate a discontinues space to hold the field "elemental nodal non moyenné"
from Muscat.FE.DofNumbering import ComputeDofNumbering
from Muscat.FE.Spaces.FESpaces import LagrangeSpaceP1, LagrangeSpaceP2, LagrangeSpaceGeo
support_ids = np.asarray(field.scoping.ids)
data = field.data
if data.ndim == 1:
data = data[:, None]
muscat_offset = self.output.ComputeGlobalOffset()
# warning : depending of the nature of the field (stress, temperature...), the number of entries in field.data can correspond to the order of the mesh or to a linear version
count = 0
for selection in elementFilterSolid.IterOnTypesOnly(self.output):
count += ED.numberOfNodes[selection.elementType]*selection.elements.GetNumberOfElements()
if count == data.shape[0]:
space = LagrangeSpaceGeo
else:
space = LagrangeSpaceP1
numbering = ComputeDofNumbering(self.output, space=space, elementFilter=elementFilterSolid, discontinuous=True)
elementsOriginalIDs = self.output.GetElementsOriginalIDs()
nodesPerElements = np.zeros((np.max(elementsOriginalIDs)+1), dtype=MuscatIndex)
elementsTypes = np.zeros((np.max(elementsOriginalIDs)+1), dtype=MuscatIndex)-1
for selection in elementFilterSolid.IterOnTypesOnly(self.output):
nodesPerElements[selection.elements.originalIds] = space[selection.elementType].GetNumberOfShapeFunctions()
elementsTypes[selection.elements.originalIds] = selection.elementType.value
nodesPerElements_extraction = nodesPerElements[support_ids]
elementsTypes_extraction = elementsTypes[support_ids]
ansys_offset = np.empty(len(support_ids)+1, dtype=MuscatIndex)
ansys_offset[0] = 0
ansys_offset[1:] = np.add.accumulate(nodesPerElements_extraction)
res = []
for i in range(data.shape[1]):
res_i = FEField(name=fieldname, mesh=self.output, space=space, numbering=numbering)
res_i.Allocate()
cpt = 0
for selection in elementFilterSolid(self.output):
index = np.where(elementsTypes_extraction == selection.elementType.value)[0]
numberOfNodesPerElement = space[selection.elementType].GetNumberOfShapeFunctions()
fillingMask = np.arange(numberOfNodesPerElement, dtype=MuscatIndex) * np.ones((len(index), numberOfNodesPerElement), dtype=MuscatIndex)
ansys_mask = fillingMask + ansys_offset[index, None]
newData = data[ansys_mask, i]
targetIndices = (self.IdsToElements[support_ids[index]] - muscat_offset[selection.elementType])*numberOfNodesPerElement + cpt
muscat_mask = fillingMask + targetIndices[:, None]
res_i.data[muscat_mask.ravel()] = newData.ravel()
cpt += selection.elements.GetNumberOfElements()*numberOfNodesPerElement
res.append(res_i)
return res
#############################
else:
#############################
# construct the nodal averaged field
to_nodal = dpf.operators.averaging.elemental_nodal_to_nodal()
to_nodal.inputs.field.connect(field)
field_nodal = to_nodal.outputs.field()
support_ids = np.asarray(field_nodal.scoping.ids)
data = field_nodal.data
#############################
else:
support_ids = np.asarray(field.scoping.ids)
data = field.data
if data.ndim == 1:
data = data[:, None]
res = np.zeros((nbentities, data.shape[1]))
try:
res[mapping[support_ids]] = data
except ValueError:
print(f"{fieldname} of shape {data.shape} could not be broadcast to indexing result of shape", support_ids.shape)
return None
return res
[docs]
def SetTimeToRead(self, time: MuscatFloat = None, timeIndex: MuscatIndex = None) -> MuscatIndex:
"""Sets the time at which the data is read
Parameters
----------
time : float, optional
time at which the data is read, by default None
timeIndex : int, optional
time index at which the data is read, by default None
Returns
-------
int
time index at which the data is read
"""
if (time is None) and (timeIndex is None):
if self.timeToRead == -1:
self.timeToRead = self.time[-1]
return len(self.time)-1
else:
return np.where(self.time == self.timeToRead)[0]
if (time is not None) and (timeIndex is not None):
raise (Exception("Cannot specify both time and timeIndex"))
if time is None:
self.timeToRead = self.time[timeIndex]
return timeIndex
elif time == -1:
self.timeToRead = self.time[-1]
return timeIndex
else:
self.timeToRead = time
return np.where(self.time == self.timeToRead)[0][0]
[docs]
def GetAvailableTimes(self) -> np.ndarray:
"""Returns the available times at which data can be read
Returns
-------
np.ndarray
available times at which data can be read
"""
self.ReadMetaData()
return self.time
[docs]
def GetAvailableResultsAndOperators(self) -> List:
"""Returns the available results and its operator name
Returns
-------
list
contains a namedTuple with the results. attributes are name, operator_name, native_location, n_components
"""
rst = self.GetModel()
class Results():
__slots__ = ("name", "operator_name", "native_location", "n_components")
def __init__(self, name, operator_name, native_location, n_components):
self.name = name
self.operator_name = operator_name
self.native_location = native_location
self.n_components = n_components
def __repr__(self):
return f"{self.name}({self.operator_name}, {self.native_location}, {self.n_components})"
return [Results(r.name, r.operator_name, r.native_location, r.n_components) for r in rst.metadata.result_info.available_results]
[docs]
def GetAvailablePostResults(self) -> List:
try:
import ansys.dpf.core
from ansys.dpf import post
except ImportError:
raise ModuleNotFoundError("To use this module ansys-dpf-core and ansys/pydpf-post must be installed")
simulation = post.load_simulation(self.fileName)
return [attr for attr in dir(simulation) if callable(getattr(simulation, attr)) and not attr.startswith("__")]
[docs]
def GetPostResults(self, post_names=[], locations=[]):
try:
from ansys.dpf import post
except ImportError:
raise ModuleNotFoundError("To use this module ansys/pydpf-post must be installed")
simulation = post.load_simulation(self.fileName)
for attribute, location in zip(post_names, locations):
result = getattr(simulation, attribute)(location=getattr(post.locations, location))
if location == "nodal":
res = np.empty_like(result.array)
res[result.mesh_index.values-1] = result.array
self.output.nodeFields[attribute] = res
elif location == "elemental":
res = np.empty_like(result.array)
res[result.mesh_index.values-1] = result.array
self.output.elemFields[attribute] = res
RegisterReaderClass(".rst", RstReader)
[docs]
def CheckIntegrity(GUI: bool = False):
import Muscat.TestData as MuscatTestData
try:
import ansys.dpf.core as dpf
except ImportError:
return 'skip: ansys.dpf.core not available '
file = MuscatTestData.GetTestDataPath()+"file.rst"
ReadRst(file)
reader = RstReader()
reader.SetFileName(file)
times = reader.GetAvailableTimes()
reader.Read(time=times[-1], fromElementalNodalToNodal=False)
reader.GetAvailableResultsAndOperators()
reader.GetAvailablePostResults()
reader.GetPostResults(['stress'], ["nodal"])
return "ok"
if __name__ == '__main__':
print(CheckIntegrity(True))# pragma: no cover