Source code for Muscat.IO.CGNSReader

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#

"""CGNS file reader
"""
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
import os

import numpy as np

from Muscat.Types import MuscatFloat, MuscatIndex
from Muscat.Bridges.CGNSBridge import CGNSNameToMuscat, CGNSToMesh, ReadIntegrationPointPayload
from Muscat.MeshContainers.Mesh import Mesh
from Muscat.IO.ReaderBase import FieldSupports
from Muscat.FE.Fields.FEField import FEField
from Muscat.FE.Fields.IPField import IPField


[docs] def ReadCGNS(fileName, time: Optional[MuscatFloat] = None, baseNumberOrName: Union[MuscatIndex, str] = MuscatIndex(0), zoneNumberOrName: Union[MuscatIndex, str] = MuscatIndex(0), keepPartitionedMesh=False) -> Mesh: """Function API for reading a CGNS file Parameters ---------- fileName : str name of the file to be read time : float, optional not coded yet, by default None baseNumberOrName : int or str, optional name of the base to use, by default 0 (first) zoneNumberOrName : int or str, optional name of the zone to be read, by default 0 (first) Returns ------- Mesh output unstructured mesh object containing reading result """ reader = CGNSReader() reader.SetFileName(fileName) reader.baseNumberOrName = baseNumberOrName reader.zoneNumberOrName = zoneNumberOrName reader.SetTimeToRead(time) reader.keepPartitionedMesh = keepPartitionedMesh res = reader.Read(fileName=fileName) return res
[docs] class CGNSReader: """CGNS Reader class""" def __init__(self) -> None: super().__init__() self.fileName: Optional[str] = None self.fieldName = None self.baseNumberOrName: Optional[Union[MuscatIndex, str]] = None self.zoneNumberOrName: Optional[Union[MuscatIndex, str]] = None self.timeToRead: MuscatFloat = MuscatFloat(-1.0) self.canHandleTemporal = False self.keepPartitionedMesh = False self.readIntegrationPointPayload = False self.integrationPointPayload = None self.mesh_cache = None
[docs] def SetFileName(self, fileName: Optional[str]) -> None: """Function to set fileName to read Parameters ---------- fileName : str name of the file to be read """ self.fileName = fileName self.mesh_cache = None if fileName is None: self.__path = None else: self.filePath = os.path.abspath(os.path.dirname(fileName)) + os.sep
def _get_loaded_data(self, partitionedMesh: bool, needIntegrationPointPayload: bool = False): """Load and cache CGNS raw/converted data for the current reader configuration. Parameters ---------- partitionedMesh : bool Forwarded to :func:`CGNSToMesh` to control conversion mode. needIntegrationPointPayload : bool, optional If ``True``, also read integration-point payload and cache it. If the mesh is already cached but payload is missing, payload is computed lazily from the cached CGNS tree. Returns ------- tuple ``(node, mesh, ipPayload)`` where: - ``node`` is the CGNS tree root returned by ``CGM.load(...)[0]`` - ``mesh`` is the converted Muscat mesh - ``ipPayload`` is integration-point payload or ``None`` when not requested Notes ----- Cache entries are keyed by current ``fileName`` and ``partitionedMesh``. Cache invalidation is handled in :meth:`SetFileName`. """ if self.fileName is None: raise RuntimeError("No fileName set. Please call SetFileName before reading") cache = self.mesh_cache if cache is not None: sameFile = cache.get("fileName", None) == self.fileName samePartition = cache.get("partitionedMesh", None) == partitionedMesh if sameFile and samePartition: node = cache["node"] mesh = cache["mesh"] ipPayload = cache.get("integrationPointPayload", None) if needIntegrationPointPayload and ipPayload is None: ipPayload = ReadIntegrationPointPayload(node) cache["integrationPointPayload"] = ipPayload return node, mesh, ipPayload import CGNS.MAP as CGM node = CGM.load(self.fileName)[0] mesh = CGNSToMesh(node, partitionedMesh=partitionedMesh) ipPayload = ReadIntegrationPointPayload(node) if needIntegrationPointPayload else None self.mesh_cache = { "fileName": self.fileName, "partitionedMesh": partitionedMesh, "node": node, "mesh": mesh, "integrationPointPayload": ipPayload, } return node, mesh, ipPayload
[docs] def SetTimeToRead(self, time: Optional[MuscatFloat] = None) -> None: """Function to set time value to read Parameters ---------- time : float, optional not coded yet, by default None """ if time is None: self.timeToRead = MuscatFloat(0.0) else: # pragma: no cover raise NotImplementedError("not coded yet") self.timeToRead = time
[docs] def Read(self, fileName: Optional[str] = None, time: Optional[MuscatFloat] = None) -> Mesh: """Function that performs the reading of a CGNS result file Parameters ---------- fileName : str name of the file to be read time : float, optional not coded yet, by default None baseNumberOrName : int or str, optional name of the base to use, by default 0 (first) zoneNumberOrName : int or str, optional name of the zone to be read, by default 0 (first) Returns ------- Mesh output unstructured mesh object containing reading result """ if fileName is not None: self.SetFileName(fileName) self.SetTimeToRead(time) node, res, ipPayload = self._get_loaded_data( partitionedMesh=self.keepPartitionedMesh, needIntegrationPointPayload=self.readIntegrationPointPayload, ) self.CGNSTree = node self.integrationPointPayload = ipPayload if self.readIntegrationPointPayload else None res.PrepareForOutput() return res
[docs] def GetExtraAvailableFields(self) -> List[Tuple[str, FieldSupports]]: """return a list of tuples of (field name,field support)""" if self.fileName is None: raise RuntimeError("No fileName set. Please call SetFileName before GetExtraFieldsAvailable") if self.keepPartitionedMesh: raise NotImplementedError("GetExtraFieldsAvailable does not support keepPartitionedMesh=True") _, mesh, ipPayload = self._get_loaded_data(partitionedMesh=False, needIntegrationPointPayload=True) if not isinstance(mesh, Mesh): # pragma: no cover raise RuntimeError("Internal error: expected Mesh when partitionedMesh=False") if ipPayload is None: # pragma: no cover ipPayload = [] out: List[Tuple[str, FieldSupports]] = [] for name in mesh.nodeFields.keys(): out.append((name, FieldSupports.Nodal)) for name in mesh.elemFields.keys(): out.append((name, FieldSupports.Elementary)) for payload in ipPayload: for name in payload["data_arrays"].keys(): out.append((name, FieldSupports.IPField)) dedup: List[Tuple[str, FieldSupports]] = [] seen = set() for item in out: if item in seen: continue seen.add(item) dedup.append(item) return dedup
def _build_ip_field_from_payload(self, mesh: Mesh, payload: Dict, fieldName: str): from Muscat.FE.Fields.IPField import IPField, RestrictedIPField from Muscat.FE.IntegrationRules import ElementaryQuadrature, MeshQuadrature offsets = np.asarray(payload["itg_point_start_offset"], dtype=MuscatIndex).ravel() flat = np.asarray(payload["data_arrays"][fieldName], dtype=MuscatFloat).ravel() assoc = payload.get("itg_rules", None) resolvedRules = payload.get("resolved_rules", {}) if assoc is None or assoc.get("ids", None) is None: raise RuntimeError(f"Integration-point payload for '{fieldName}' has no integration-rule association") ruleIdsPerElem = np.asarray(assoc["ids"], dtype=np.int32).ravel() if ruleIdsPerElem.size + 1 != offsets.size: raise RuntimeError( f"Incompatible integration-point metadata for '{fieldName}': " f"len(ids)={ruleIdsPerElem.size}, len(offsets)={offsets.size}" ) md = mesh.GetElementsDimensionality() from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter bulk_filter = ElementFilter(dimensionality=mesh.GetElementsDimensionality()) globalOffset = mesh.ComputeGlobalOffset() quadrature = MeshQuadrature(name=f"CGNS_{fieldName}_Quadrature") dataByElementType = {} for selection in bulk_filter(mesh): startElem = globalOffset[selection.elementType] endElem = startElem + selection.Size() localRuleIds = ruleIdsPerElem[startElem:endElem] uniqueRuleIds = np.unique(localRuleIds) if uniqueRuleIds.size != 1: raise NotImplementedError( f"Field '{fieldName}' uses multiple integration rules within element type {selection.elementType}." ) rid = int(uniqueRuleIds[0]) if rid not in resolvedRules: raise RuntimeError(f"Cannot resolve integration rule id {rid} for field '{fieldName}'") ruleInfo = resolvedRules[rid] cgnsElementType = ruleInfo.get("element_type", None) if cgnsElementType is not None and cgnsElementType in CGNSNameToMuscat: if CGNSNameToMuscat[cgnsElementType] != selection.elementType: raise RuntimeError( f"Rule element type mismatch for field '{fieldName}': " f"rule={CGNSNameToMuscat[cgnsElementType]} mesh={selection.elementType}" ) raw_points = np.asarray(ruleInfo["parametric_integration_points"], dtype=MuscatFloat).T points = np.pad(raw_points , pad_width=((0,0),(0,3-raw_points.shape[1])) ) weights = np.asarray(ruleInfo["weights"], dtype=MuscatFloat) quadrature.AddElementaryQuadrature(ElementaryQuadrature(elementType=selection.elementType, points=points, weights=weights)) firstStart = int(offsets[startElem]) firstEnd = int(offsets[startElem + 1]) nIp = firstEnd - firstStart if nIp <= 0: raise RuntimeError(f"Invalid number of integration points ({nIp}) for field '{fieldName}'") localData = np.empty((selection.Size(), nIp), dtype=MuscatFloat) for i in range(selection.Size()): g = startElem + i s = int(offsets[g]) e = int(offsets[g + 1]) if e - s != nIp: raise NotImplementedError( f"Variable number of integration points in field '{fieldName}' for element type {selection.elementType}." ) localData[i, :] = flat[s:e] dataByElementType[selection.elementType] = localData return RestrictedIPField(name=fieldName, mesh=mesh, rule=quadrature, data=dataByElementType, elementFilter=bulk_filter)
[docs] def ReadExtraFields(self, query : List[Tuple[str, FieldSupports]]) -> Dict[Tuple[str, FieldSupports],Union[np.ndarray, FEField, IPField]]: """ query is of the same type (structure) of the output of GetExtraFieldsAvailable and the output is a dict with keys of type of every entry of query, and the value the actual field""" if self.fileName is None: raise RuntimeError("No fileName set. Please call SetFileName before ReadExtraFields") if self.keepPartitionedMesh: raise NotImplementedError("ReadExtraFields does not support keepPartitionedMesh=True") if query is None: return {} _, mesh, ipPayload = self._get_loaded_data(partitionedMesh=False, needIntegrationPointPayload=True) if not isinstance(mesh, Mesh): # pragma: no cover raise RuntimeError("Internal error: expected Mesh when partitionedMesh=False") if ipPayload is None: # pragma: no cover ipPayload = [] payloadByFieldName: Dict[str, List[Dict]] = {} for payload in ipPayload: for name in payload["data_arrays"].keys(): payloadByFieldName.setdefault(name, []).append(payload) if isinstance(query, tuple) and len(query) == 2: query = [query] res = {} for item in query: if not isinstance(item, tuple) or len(item) != 2: raise TypeError(f"Each query entry must be a tuple (field_name, field_support), got {item}") fieldName, fieldSupport = item if isinstance(fieldSupport, str): fieldSupport = FieldSupports(fieldSupport) if fieldSupport == FieldSupports.Nodal: if fieldName not in mesh.nodeFields: raise KeyError(f"Node field '{fieldName}' not available") res[item] = mesh.nodeFields[fieldName] elif fieldSupport == FieldSupports.Elementary: if fieldName not in mesh.elemFields: raise KeyError(f"Element field '{fieldName}' not available") res[item] = mesh.elemFields[fieldName] elif fieldSupport == FieldSupports.IPField: if fieldName not in payloadByFieldName: raise KeyError(f"Integration-point field '{fieldName}' not available") payloads = payloadByFieldName[fieldName] if len(payloads) != 1: raise NotImplementedError( f"Field '{fieldName}' is present in multiple integration-point payloads; " "disambiguation is not implemented yet." ) res[item] = self._build_ip_field_from_payload(mesh=mesh, payload=payloads[0], fieldName=fieldName) else: raise NotImplementedError(f"ReadExtraFields is not implemented for support '{fieldSupport}'") return res
from Muscat.IO.IOFactory import RegisterReaderClass RegisterReaderClass(".cgns", CGNSReader,withError=False)
[docs] def CheckIntegrity(GUI: bool = False): try: import CGNS.MAP as CGM except ImportError: print("CGNS Python module not found, skipping test") return "skip" import Muscat.IO.CGNSWriter as CW CW.CheckIntegrity() from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory tempdir = TemporaryDirectory.GetTempPath() mesh = ReadCGNS(fileName=tempdir + os.sep + "toto.cgns") print("Read mesh from cgns:", mesh) reader = CGNSReader() reader.SetFileName(None) from Muscat.TestData import GetTestDataPath print("Read mesh from cgns:", ReadCGNS(GetTestDataPath() + "CGNSExample/2TriMesh.cgns")) # Extra fields API with integration-point payload from Muscat.IO.CGNSWriter import CGNSWriter from Muscat.FE.Fields.IPField import IPField from Muscat.FE.IntegrationRules import LagrangeP1Quadrature w = CGNSWriter() ipf = IPField(name="IPScalar", mesh=mesh, rule=LagrangeP1Quadrature) ipf.Allocate() for _, arr in ipf.data.items(): nE, nIp = arr.shape arr[:, :] = np.arange(nE, dtype=float)[:, None] + np.linspace(0.0, 1.0, nIp, dtype=float)[None, :] withIp = tempdir + os.sep + "reader_extra_with_ip.cgns" w.Write(mesh=mesh, fileName=withIp, IntegrationPointPayload=[ipf]) r2 = CGNSReader() r2.SetFileName(withIp) available = r2.GetExtraAvailableFields() assert ("IPScalar", FieldSupports.IPField) in available extra = r2.ReadExtraFields([("IPScalar", FieldSupports.IPField)]) key = ("IPScalar", FieldSupports.IPField) assert key in extra ipField = extra[key] from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter np.testing.assert_equal(ipField.Flatten().size, ipf.Flatten(elementFilter=ElementFilter(dimensionality=mesh.GetElementsDimensionality())).size) np.testing.assert_equal(ipField.Flatten(), ipf.GetRestrictedIPField(ElementFilter(dimensionality=mesh.GetElementsDimensionality())).Flatten()) np.testing.assert_equal(ipField.Flatten(), ipf.Flatten(ElementFilter(dimensionality=mesh.GetElementsDimensionality()))) return "ok"
if __name__ == "__main__": print(CheckIntegrity(True)) # pragma: no cover