Source code for Muscat.IO.CGNSWriter

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#

"""CGNS file writer
"""
import os
import numpy as np
from copy import deepcopy

from Muscat.MeshContainers.Mesh import Mesh
from Muscat.Bridges.CGNSBridge import (
    MeshToCGNS,
    MuscatToCGNSNames,
    AddIntegrationRuleCollection,
    AddIntegrationPointFlowSolution,
)
from Muscat.IO.WriterBase import WriterBase as WriterBase
from Muscat.IO.IOFactory import RegisterWriterClass
from Muscat.FE.Fields.IPField import IPField
from Muscat.MeshContainers.ElementsDescription import ElementsInfo

[docs] class CGNSWriter(WriterBase): """Class to writes a CGNS file on disk""" def __init__(self): super().__init__() self.canHandleTemporal = True self.canHandleAppend = False def __str__(self): res = "CGNSWriter" return res def _get_base_zone(self, tree, baseName=None, zoneName=None): import CGNS.PAT.cgnsutils as CGU if baseName is None: basePath = CGU.getPathsByTypeSet(tree, ["CGNSBase_t"])[0] else: basePath = CGU.getPathsByNameSet(tree, [baseName])[0] baseNode = CGU.getNodeByPath(tree, basePath) if zoneName is None: zonePath = CGU.getPathsByTypeSet(baseNode, ["Zone_t"])[0] else: zonePath = CGU.getPathsByNameSet(baseNode, [zoneName])[0] zoneNode = CGU.getNodeByPath(baseNode, zonePath) return baseNode, zoneNode def _inject_ip_from_tree(self, outTree, payloadTree): import CGNS.PAT.cgnsutils as CGU outBase, outZone = self._get_base_zone(outTree) srcBase, srcZone = self._get_base_zone(payloadTree) for p in CGU.getPathsByTypeSet(srcBase, ["IntegrationRulesCollection_t"]): node = deepcopy(CGU.getNodeByPath(srcBase, p)) old = CGU.getNodeByPath(outBase, node[0]) if old is not None: CGU.nodeDelete(outBase, old) CGU.addChild(outBase, node) for p in CGU.getPathsByTypeSet(srcZone, ["FlowSolution_t", "UserDefinedData_t"]): flow = CGU.getNodeByPath(srcZone, p) gl = CGU.getPathsByTypeSet(flow, ["GridLocation_t"]) if gl: loc = CGU.getValueAsString(CGU.getNodeByPath(flow, gl[0])) else: glDesc = CGU.getNodeByPath(flow, "gridlocation") loc = CGU.getValueAsString(glDesc) if glDesc is not None else None if loc != "IntegrationPoint": continue node = deepcopy(flow) old = CGU.getNodeByPath(outZone, node[0]) if old is not None: CGU.nodeDelete(outZone, old) CGU.addChild(outZone, node) def _inject_ip_from_fields(self, outTree, ipFields: list[IPField]): from Muscat.FE.Fields.IPField import IPField if len(ipFields) == 0: return outBase, outZone = self._get_base_zone(outTree) # Build one collection from all encountered element rules idToName = {} rules = {} nextId = 1 ruleIdByName = {} for ipf in ipFields: if not isinstance(ipf, IPField): raise TypeError("IntegrationPointPayload list must contain IPField objects") if ipf.name == '': raise(RuntimeError("all IPFields must have names")) def GetCGNSRuleName(ipf, et): return f"{ipf.name}_{MuscatToCGNSNames[et]}_Rule" for ipf in ipFields: for et in ipf.data.keys(): eRule = ipf.GetRuleFor(et) ruleName = GetCGNSRuleName(ipf, et) if ruleName in ruleIdByName: continue idToName[nextId] = ruleName rules[ruleName] = { "element_type": MuscatToCGNSNames[et], "reference_space": "Parametric", "parametric_integration_points": np.asarray(eRule.points)[:,0:ElementsInfo[et].dimension], "weights": np.asarray(eRule.weights), } ruleIdByName[ruleName] = nextId nextId += 1 collectionName = "IntegrationRules" AddIntegrationRuleCollection(outBase, collectionName=collectionName, idToName=idToName, rules=rules) #need to output data only for the 3D # Build offsets and ids in global cell order nCells = ipFields[0].mesh.GetNumberOfElements(dim=ipFields[0].mesh.GetElementsDimensionality()) itgIds = np.zeros(nCells, dtype=np.int32) globalOffset = ipFields[0].mesh.ComputeGlobalOffset() from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter for ipf in ipFields: bulk_filter = ElementFilter(dimensionality=ipf.mesh.GetElementsDimensionality()) offset = ipf.GetFlattenOffset(bulk_filter) vals = ipf.Flatten(bulk_filter) for selection in bulk_filter(ipFields[0].mesh): ruleName = GetCGNSRuleName(ipf, selection.elementType) itgIds[selection.GetSelectionSlice()] = ruleIdByName[ruleName] AddIntegrationPointFlowSolution( outZone, flowName=f"{ipf.name}_IntegrationPointFields", dataArrays={ipf.name: vals}, itgPointStartOffset=offset, itgRulesPath=f"/{outBase[0]}/{collectionName}", itgRulesIds=itgIds, )
[docs] def Write(self, mesh: Mesh, fileName=None, outputPyTree=None, PointFields=None, PointFieldsNames=None, CellFieldsNames=None, CellFields=None, GridFieldsNames=None, GridFields=None, updateZone=False, IntegrationPointPayload=None): """Function to write a CGNS File on disk Parameters ---------- mesh : Mesh support of the data to be written fileName : str filename of the file to be read outpuPyTree : list existing pyTree in which the data structure in mesh will be appended updateZone : bool or str, only with outputPyTree if False: data are added to a new CGNS base if True: all zones pre existing in outPyTree base are updated (replace all structured zones with one unstructured zone, or update unique zone) if zoneName: only zone with zoneName is updated (for unstructured multi zones computation) """ import CGNS.MAP as CGM # make a outputMesh = mesh.View() if PointFields is not None and PointFieldsNames is not None: outputMesh.nodeFields = {k: v for k, v in zip(PointFieldsNames, PointFields)} if CellFields is not None and CellFieldsNames is not None: outputMesh.elemFields = {k: v for k, v in zip(CellFieldsNames, CellFields)} newPyTree = MeshToCGNS(outputMesh, deepcopy(outputPyTree), updateZone=updateZone) if IntegrationPointPayload is not None: from Muscat.FE.Fields.IPField import IPField if isinstance(IntegrationPointPayload, list) and ( len(IntegrationPointPayload) == 0 or all(isinstance(x, IPField) for x in IntegrationPointPayload) ): self._inject_ip_from_fields(newPyTree, IntegrationPointPayload) elif isinstance(IntegrationPointPayload, list): # CGNS pyTree is also a python list self._inject_ip_from_tree(newPyTree, IntegrationPointPayload) else: raise TypeError("IntegrationPointPayload must be a CGNSTree (list) or a list[IPField]") if fileName is None: fileName = self.fileName CGM.save(fileName, newPyTree)
[docs] def CheckIntegrity(GUI: bool = False): try: import CGNS.MAP as CGM except ImportError: print("CGNS Python module not found, skipping test") return "skip" from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory tempdir = TemporaryDirectory.GetTempPath() import Muscat.TestData as MuscatTestData import Muscat.IO.UtReader as UR reader = UR.UtReader() reader.SetFileName(MuscatTestData.GetTestDataPath() + "UtExample/cube.ut") reader.ReadMetaData() reader.atIntegrationPoints = False myMesh = reader.Read() myMesh.nodeFields["Nodes arange"] = np.arange(myMesh.GetNumberOfNodes(), dtype=float) myMesh.elemFields["Element arange"] = np.arange(myMesh.GetNumberOfElements(), dtype=float) from Muscat.MeshContainers.Tags import Tags ################################## # EXEMPLE SYNTAXE DU WRITER # import Muscat.IO.CGNSWriter as CW # CgW = CW.CGNSWriter() CgW = CGNSWriter() CgW.Write(mesh=myMesh, fileName=tempdir + os.sep + "toto.cgns") totoC = np.random.rand(myMesh.GetNumberOfElements()) totoV = np.random.rand(myMesh.GetNumberOfNodes()) CgW.Write(mesh=myMesh, fileName=tempdir + os.sep + "toto.cgns", CellFieldsNames=["toto"], CellFields=[totoC], PointFieldsNames=["toto"], PointFields=[totoV]) ################################## # IntegrationPointPayload as list[IPField] from Muscat.FE.Fields.IPField import IPField from Muscat.FE.IntegrationRules import LagrangeP1Quadrature from Muscat.IO.CGNSReader import CGNSReader ipf = IPField(name="IPScalar", mesh=myMesh, rule=LagrangeP1Quadrature) ipf.Allocate() for _, arr in ipf.data.items(): nE, nIp = arr.shape arr[:, :] = np.arange(nE, dtype=float)[:, None] + np.linspace(0.0, 1.0, nIp, dtype=float)[None, :] withIp = tempdir + os.sep + "toto_with_ip.cgns" CgW.Write(mesh=myMesh, fileName=withIp, IntegrationPointPayload=[ipf]) rd = CGNSReader() rd.readIntegrationPointPayload = True _ = rd.Read(fileName=withIp) assert rd.integrationPointPayload is not None and len(rd.integrationPointPayload) > 0 # IntegrationPointPayload as CGNSTree sourceTree = CGM.load(withIp)[0] copiedIp = tempdir + os.sep + "toto_with_ip_copied.cgns" CgW.Write(mesh=myMesh, fileName=copiedIp, IntegrationPointPayload=sourceTree) rd2 = CGNSReader() rd2.readIntegrationPointPayload = True _ = rd2.Read(fileName=copiedIp) assert rd2.integrationPointPayload is not None and len(rd2.integrationPointPayload) > 0 print(tempdir + os.sep + "toto.cgns") print(withIp) print(copiedIp) return "ok"
RegisterWriterClass(".cgns", CGNSWriter) if __name__ == "__main__": print(CheckIntegrity(GUI=True)) # pragma: no cover