# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""CGNS file reader
"""
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
import os
import numpy as np
from Muscat.Types import MuscatFloat, MuscatIndex
from Muscat.Bridges.CGNSBridge import CGNSNameToMuscat, CGNSToMesh, ReadIntegrationPointPayload
from Muscat.MeshContainers.Mesh import Mesh
from Muscat.IO.ReaderBase import FieldSupports
from Muscat.FE.Fields.FEField import FEField
from Muscat.FE.Fields.IPField import IPField
[docs]
def ReadCGNS(fileName, time: Optional[MuscatFloat] = None, baseNumberOrName: Union[MuscatIndex, str] = MuscatIndex(0), zoneNumberOrName: Union[MuscatIndex, str] = MuscatIndex(0), keepPartitionedMesh=False) -> Mesh:
"""Function API for reading a CGNS file
Parameters
----------
fileName : str
name of the file to be read
time : float, optional
not coded yet, by default None
baseNumberOrName : int or str, optional
name of the base to use, by default 0 (first)
zoneNumberOrName : int or str, optional
name of the zone to be read, by default 0 (first)
Returns
-------
Mesh
output unstructured mesh object containing reading result
"""
reader = CGNSReader()
reader.SetFileName(fileName)
reader.baseNumberOrName = baseNumberOrName
reader.zoneNumberOrName = zoneNumberOrName
reader.SetTimeToRead(time)
reader.keepPartitionedMesh = keepPartitionedMesh
res = reader.Read(fileName=fileName)
return res
[docs]
class CGNSReader:
"""CGNS Reader class"""
def __init__(self) -> None:
super().__init__()
self.fileName: Optional[str] = None
self.fieldName = None
self.baseNumberOrName: Optional[Union[MuscatIndex, str]] = None
self.zoneNumberOrName: Optional[Union[MuscatIndex, str]] = None
self.timeToRead: MuscatFloat = MuscatFloat(-1.0)
self.canHandleTemporal = False
self.keepPartitionedMesh = False
self.readIntegrationPointPayload = False
self.integrationPointPayload = None
self.mesh_cache = None
[docs]
def SetFileName(self, fileName: Optional[str]) -> None:
"""Function to set fileName to read
Parameters
----------
fileName : str
name of the file to be read
"""
self.fileName = fileName
self.mesh_cache = None
if fileName is None:
self.__path = None
else:
self.filePath = os.path.abspath(os.path.dirname(fileName)) + os.sep
def _get_loaded_data(self, partitionedMesh: bool, needIntegrationPointPayload: bool = False):
"""Load and cache CGNS raw/converted data for the current reader configuration.
Parameters
----------
partitionedMesh : bool
Forwarded to :func:`CGNSToMesh` to control conversion mode.
needIntegrationPointPayload : bool, optional
If ``True``, also read integration-point payload and cache it. If the
mesh is already cached but payload is missing, payload is computed
lazily from the cached CGNS tree.
Returns
-------
tuple
``(node, mesh, ipPayload)`` where:
- ``node`` is the CGNS tree root returned by ``CGM.load(...)[0]``
- ``mesh`` is the converted Muscat mesh
- ``ipPayload`` is integration-point payload or ``None`` when not requested
Notes
-----
Cache entries are keyed by current ``fileName`` and ``partitionedMesh``.
Cache invalidation is handled in :meth:`SetFileName`.
"""
if self.fileName is None:
raise RuntimeError("No fileName set. Please call SetFileName before reading")
cache = self.mesh_cache
if cache is not None:
sameFile = cache.get("fileName", None) == self.fileName
samePartition = cache.get("partitionedMesh", None) == partitionedMesh
if sameFile and samePartition:
node = cache["node"]
mesh = cache["mesh"]
ipPayload = cache.get("integrationPointPayload", None)
if needIntegrationPointPayload and ipPayload is None:
ipPayload = ReadIntegrationPointPayload(node)
cache["integrationPointPayload"] = ipPayload
return node, mesh, ipPayload
import CGNS.MAP as CGM
node = CGM.load(self.fileName)[0]
mesh = CGNSToMesh(node, partitionedMesh=partitionedMesh)
ipPayload = ReadIntegrationPointPayload(node) if needIntegrationPointPayload else None
self.mesh_cache = {
"fileName": self.fileName,
"partitionedMesh": partitionedMesh,
"node": node,
"mesh": mesh,
"integrationPointPayload": ipPayload,
}
return node, mesh, ipPayload
[docs]
def SetTimeToRead(self, time: Optional[MuscatFloat] = None) -> None:
"""Function to set time value to read
Parameters
----------
time : float, optional
not coded yet, by default None
"""
if time is None:
self.timeToRead = MuscatFloat(0.0)
else: # pragma: no cover
raise NotImplementedError("not coded yet")
self.timeToRead = time
[docs]
def Read(self, fileName: Optional[str] = None, time: Optional[MuscatFloat] = None) -> Mesh:
"""Function that performs the reading of a CGNS result file
Parameters
----------
fileName : str
name of the file to be read
time : float, optional
not coded yet, by default None
baseNumberOrName : int or str, optional
name of the base to use, by default 0 (first)
zoneNumberOrName : int or str, optional
name of the zone to be read, by default 0 (first)
Returns
-------
Mesh
output unstructured mesh object containing reading result
"""
if fileName is not None:
self.SetFileName(fileName)
self.SetTimeToRead(time)
node, res, ipPayload = self._get_loaded_data(
partitionedMesh=self.keepPartitionedMesh,
needIntegrationPointPayload=self.readIntegrationPointPayload,
)
self.CGNSTree = node
self.integrationPointPayload = ipPayload if self.readIntegrationPointPayload else None
res.PrepareForOutput()
return res
def _build_ip_field_from_payload(self, mesh: Mesh, payload: Dict, fieldName: str):
from Muscat.FE.Fields.IPField import IPField, RestrictedIPField
from Muscat.FE.IntegrationRules import ElementaryQuadrature, MeshQuadrature
offsets = np.asarray(payload["itg_point_start_offset"], dtype=MuscatIndex).ravel()
flat = np.asarray(payload["data_arrays"][fieldName], dtype=MuscatFloat).ravel()
assoc = payload.get("itg_rules", None)
resolvedRules = payload.get("resolved_rules", {})
if assoc is None or assoc.get("ids", None) is None:
raise RuntimeError(f"Integration-point payload for '{fieldName}' has no integration-rule association")
ruleIdsPerElem = np.asarray(assoc["ids"], dtype=np.int32).ravel()
if ruleIdsPerElem.size + 1 != offsets.size:
raise RuntimeError(
f"Incompatible integration-point metadata for '{fieldName}': "
f"len(ids)={ruleIdsPerElem.size}, len(offsets)={offsets.size}"
)
md = mesh.GetElementsDimensionality()
from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter
bulk_filter = ElementFilter(dimensionality=mesh.GetElementsDimensionality())
globalOffset = mesh.ComputeGlobalOffset()
quadrature = MeshQuadrature(name=f"CGNS_{fieldName}_Quadrature")
dataByElementType = {}
for selection in bulk_filter(mesh):
startElem = globalOffset[selection.elementType]
endElem = startElem + selection.Size()
localRuleIds = ruleIdsPerElem[startElem:endElem]
uniqueRuleIds = np.unique(localRuleIds)
if uniqueRuleIds.size != 1:
raise NotImplementedError(
f"Field '{fieldName}' uses multiple integration rules within element type {selection.elementType}."
)
rid = int(uniqueRuleIds[0])
if rid not in resolvedRules:
raise RuntimeError(f"Cannot resolve integration rule id {rid} for field '{fieldName}'")
ruleInfo = resolvedRules[rid]
cgnsElementType = ruleInfo.get("element_type", None)
if cgnsElementType is not None and cgnsElementType in CGNSNameToMuscat:
if CGNSNameToMuscat[cgnsElementType] != selection.elementType:
raise RuntimeError(
f"Rule element type mismatch for field '{fieldName}': "
f"rule={CGNSNameToMuscat[cgnsElementType]} mesh={selection.elementType}"
)
raw_points = np.asarray(ruleInfo["parametric_integration_points"], dtype=MuscatFloat).T
points = np.pad(raw_points , pad_width=((0,0),(0,3-raw_points.shape[1])) )
weights = np.asarray(ruleInfo["weights"], dtype=MuscatFloat)
quadrature.AddElementaryQuadrature(ElementaryQuadrature(elementType=selection.elementType, points=points, weights=weights))
firstStart = int(offsets[startElem])
firstEnd = int(offsets[startElem + 1])
nIp = firstEnd - firstStart
if nIp <= 0:
raise RuntimeError(f"Invalid number of integration points ({nIp}) for field '{fieldName}'")
localData = np.empty((selection.Size(), nIp), dtype=MuscatFloat)
for i in range(selection.Size()):
g = startElem + i
s = int(offsets[g])
e = int(offsets[g + 1])
if e - s != nIp:
raise NotImplementedError(
f"Variable number of integration points in field '{fieldName}' for element type {selection.elementType}."
)
localData[i, :] = flat[s:e]
dataByElementType[selection.elementType] = localData
return RestrictedIPField(name=fieldName, mesh=mesh, rule=quadrature, data=dataByElementType, elementFilter=bulk_filter)
from Muscat.IO.IOFactory import RegisterReaderClass
RegisterReaderClass(".cgns", CGNSReader,withError=False)
[docs]
def CheckIntegrity(GUI: bool = False):
try:
import CGNS.MAP as CGM
except ImportError:
print("CGNS Python module not found, skipping test")
return "skip"
import Muscat.IO.CGNSWriter as CW
CW.CheckIntegrity()
from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
tempdir = TemporaryDirectory.GetTempPath()
mesh = ReadCGNS(fileName=tempdir + os.sep + "toto.cgns")
print("Read mesh from cgns:", mesh)
reader = CGNSReader()
reader.SetFileName(None)
from Muscat.TestData import GetTestDataPath
print("Read mesh from cgns:", ReadCGNS(GetTestDataPath() + "CGNSExample/2TriMesh.cgns"))
# Extra fields API with integration-point payload
from Muscat.IO.CGNSWriter import CGNSWriter
from Muscat.FE.Fields.IPField import IPField
from Muscat.FE.IntegrationRules import LagrangeP1Quadrature
w = CGNSWriter()
ipf = IPField(name="IPScalar", mesh=mesh, rule=LagrangeP1Quadrature)
ipf.Allocate()
for _, arr in ipf.data.items():
nE, nIp = arr.shape
arr[:, :] = np.arange(nE, dtype=float)[:, None] + np.linspace(0.0, 1.0, nIp, dtype=float)[None, :]
withIp = tempdir + os.sep + "reader_extra_with_ip.cgns"
w.Write(mesh=mesh, fileName=withIp, IntegrationPointPayload=[ipf])
r2 = CGNSReader()
r2.SetFileName(withIp)
available = r2.GetExtraAvailableFields()
assert ("IPScalar", FieldSupports.IPField) in available
extra = r2.ReadExtraFields([("IPScalar", FieldSupports.IPField)])
key = ("IPScalar", FieldSupports.IPField)
assert key in extra
ipField = extra[key]
from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter
np.testing.assert_equal(ipField.Flatten().size, ipf.Flatten(elementFilter=ElementFilter(dimensionality=mesh.GetElementsDimensionality())).size)
np.testing.assert_equal(ipField.Flatten(), ipf.GetRestrictedIPField(ElementFilter(dimensionality=mesh.GetElementsDimensionality())).Flatten())
np.testing.assert_equal(ipField.Flatten(), ipf.Flatten(ElementFilter(dimensionality=mesh.GetElementsDimensionality())))
return "ok"
if __name__ == "__main__":
print(CheckIntegrity(True)) # pragma: no cover