# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""CGNS file writer
"""
import os
import numpy as np
from copy import deepcopy
from Muscat.MeshContainers.Mesh import Mesh
from Muscat.Bridges.CGNSBridge import (
MeshToCGNS,
MuscatToCGNSNames,
AddIntegrationRuleCollection,
AddIntegrationPointFlowSolution,
)
from Muscat.IO.WriterBase import WriterBase as WriterBase
from Muscat.IO.IOFactory import RegisterWriterClass
from Muscat.FE.Fields.IPField import IPField
from Muscat.MeshContainers.ElementsDescription import ElementsInfo
[docs]
class CGNSWriter(WriterBase):
"""Class to writes a CGNS file on disk"""
def __init__(self):
super().__init__()
self.canHandleTemporal = True
self.canHandleAppend = False
def __str__(self):
res = "CGNSWriter"
return res
def _get_base_zone(self, tree, baseName=None, zoneName=None):
import CGNS.PAT.cgnsutils as CGU
if baseName is None:
basePath = CGU.getPathsByTypeSet(tree, ["CGNSBase_t"])[0]
else:
basePath = CGU.getPathsByNameSet(tree, [baseName])[0]
baseNode = CGU.getNodeByPath(tree, basePath)
if zoneName is None:
zonePath = CGU.getPathsByTypeSet(baseNode, ["Zone_t"])[0]
else:
zonePath = CGU.getPathsByNameSet(baseNode, [zoneName])[0]
zoneNode = CGU.getNodeByPath(baseNode, zonePath)
return baseNode, zoneNode
def _inject_ip_from_tree(self, outTree, payloadTree):
import CGNS.PAT.cgnsutils as CGU
outBase, outZone = self._get_base_zone(outTree)
srcBase, srcZone = self._get_base_zone(payloadTree)
for p in CGU.getPathsByTypeSet(srcBase, ["IntegrationRulesCollection_t"]):
node = deepcopy(CGU.getNodeByPath(srcBase, p))
old = CGU.getNodeByPath(outBase, node[0])
if old is not None:
CGU.nodeDelete(outBase, old)
CGU.addChild(outBase, node)
for p in CGU.getPathsByTypeSet(srcZone, ["FlowSolution_t", "UserDefinedData_t"]):
flow = CGU.getNodeByPath(srcZone, p)
gl = CGU.getPathsByTypeSet(flow, ["GridLocation_t"])
if gl:
loc = CGU.getValueAsString(CGU.getNodeByPath(flow, gl[0]))
else:
glDesc = CGU.getNodeByPath(flow, "gridlocation")
loc = CGU.getValueAsString(glDesc) if glDesc is not None else None
if loc != "IntegrationPoint":
continue
node = deepcopy(flow)
old = CGU.getNodeByPath(outZone, node[0])
if old is not None:
CGU.nodeDelete(outZone, old)
CGU.addChild(outZone, node)
def _inject_ip_from_fields(self, outTree, ipFields: list[IPField]):
from Muscat.FE.Fields.IPField import IPField
if len(ipFields) == 0:
return
outBase, outZone = self._get_base_zone(outTree)
# Build one collection from all encountered element rules
idToName = {}
rules = {}
nextId = 1
ruleIdByName = {}
for ipf in ipFields:
if not isinstance(ipf, IPField):
raise TypeError("IntegrationPointPayload list must contain IPField objects")
if ipf.name == '':
raise(RuntimeError("all IPFields must have names"))
def GetCGNSRuleName(ipf, et):
return f"{ipf.name}_{MuscatToCGNSNames[et]}_Rule"
for ipf in ipFields:
for et in ipf.data.keys():
eRule = ipf.GetRuleFor(et)
ruleName = GetCGNSRuleName(ipf, et)
if ruleName in ruleIdByName:
continue
idToName[nextId] = ruleName
rules[ruleName] = {
"element_type": MuscatToCGNSNames[et],
"reference_space": "Parametric",
"parametric_integration_points": np.asarray(eRule.points)[:,0:ElementsInfo[et].dimension],
"weights": np.asarray(eRule.weights),
}
ruleIdByName[ruleName] = nextId
nextId += 1
collectionName = "IntegrationRules"
AddIntegrationRuleCollection(outBase, collectionName=collectionName, idToName=idToName, rules=rules)
#need to output data only for the 3D
# Build offsets and ids in global cell order
nCells = ipFields[0].mesh.GetNumberOfElements(dim=ipFields[0].mesh.GetElementsDimensionality())
itgIds = np.zeros(nCells, dtype=np.int32)
globalOffset = ipFields[0].mesh.ComputeGlobalOffset()
from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter
for ipf in ipFields:
bulk_filter = ElementFilter(dimensionality=ipf.mesh.GetElementsDimensionality())
offset = ipf.GetFlattenOffset(bulk_filter)
vals = ipf.Flatten(bulk_filter)
for selection in bulk_filter(ipFields[0].mesh):
ruleName = GetCGNSRuleName(ipf, selection.elementType)
itgIds[selection.GetSelectionSlice()] = ruleIdByName[ruleName]
AddIntegrationPointFlowSolution(
outZone,
flowName=f"{ipf.name}_IntegrationPointFields",
dataArrays={ipf.name: vals},
itgPointStartOffset=offset,
itgRulesPath=f"/{outBase[0]}/{collectionName}",
itgRulesIds=itgIds,
)
[docs]
def Write(self, mesh: Mesh, fileName=None, outputPyTree=None, PointFields=None, PointFieldsNames=None, CellFieldsNames=None, CellFields=None, GridFieldsNames=None, GridFields=None, updateZone=False, IntegrationPointPayload=None):
"""Function to write a CGNS File on disk
Parameters
----------
mesh : Mesh
support of the data to be written
fileName : str
filename of the file to be read
outpuPyTree : list
existing pyTree in which the data structure in mesh will be appended
updateZone : bool or str, only with outputPyTree
if False: data are added to a new CGNS base
if True: all zones pre existing in outPyTree base are updated (replace all structured zones with one unstructured zone, or update unique zone)
if zoneName: only zone with zoneName is updated (for unstructured multi zones computation)
"""
import CGNS.MAP as CGM
# make a
outputMesh = mesh.View()
if PointFields is not None and PointFieldsNames is not None:
outputMesh.nodeFields = {k: v for k, v in zip(PointFieldsNames, PointFields)}
if CellFields is not None and CellFieldsNames is not None:
outputMesh.elemFields = {k: v for k, v in zip(CellFieldsNames, CellFields)}
newPyTree = MeshToCGNS(outputMesh, deepcopy(outputPyTree), updateZone=updateZone)
if IntegrationPointPayload is not None:
from Muscat.FE.Fields.IPField import IPField
if isinstance(IntegrationPointPayload, list) and (
len(IntegrationPointPayload) == 0 or all(isinstance(x, IPField) for x in IntegrationPointPayload)
):
self._inject_ip_from_fields(newPyTree, IntegrationPointPayload)
elif isinstance(IntegrationPointPayload, list):
# CGNS pyTree is also a python list
self._inject_ip_from_tree(newPyTree, IntegrationPointPayload)
else:
raise TypeError("IntegrationPointPayload must be a CGNSTree (list) or a list[IPField]")
if fileName is None:
fileName = self.fileName
CGM.save(fileName, newPyTree)
[docs]
def CheckIntegrity(GUI: bool = False):
try:
import CGNS.MAP as CGM
except ImportError:
print("CGNS Python module not found, skipping test")
return "skip"
from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
tempdir = TemporaryDirectory.GetTempPath()
import Muscat.TestData as MuscatTestData
import Muscat.IO.UtReader as UR
reader = UR.UtReader()
reader.SetFileName(MuscatTestData.GetTestDataPath() + "UtExample/cube.ut")
reader.ReadMetaData()
reader.atIntegrationPoints = False
myMesh = reader.Read()
myMesh.nodeFields["Nodes arange"] = np.arange(myMesh.GetNumberOfNodes(), dtype=float)
myMesh.elemFields["Element arange"] = np.arange(myMesh.GetNumberOfElements(), dtype=float)
from Muscat.MeshContainers.Tags import Tags
##################################
# EXEMPLE SYNTAXE DU WRITER
# import Muscat.IO.CGNSWriter as CW
# CgW = CW.CGNSWriter()
CgW = CGNSWriter()
CgW.Write(mesh=myMesh, fileName=tempdir + os.sep + "toto.cgns")
totoC = np.random.rand(myMesh.GetNumberOfElements())
totoV = np.random.rand(myMesh.GetNumberOfNodes())
CgW.Write(mesh=myMesh, fileName=tempdir + os.sep + "toto.cgns", CellFieldsNames=["toto"], CellFields=[totoC], PointFieldsNames=["toto"], PointFields=[totoV])
##################################
# IntegrationPointPayload as list[IPField]
from Muscat.FE.Fields.IPField import IPField
from Muscat.FE.IntegrationRules import LagrangeP1Quadrature
from Muscat.IO.CGNSReader import CGNSReader
ipf = IPField(name="IPScalar", mesh=myMesh, rule=LagrangeP1Quadrature)
ipf.Allocate()
for _, arr in ipf.data.items():
nE, nIp = arr.shape
arr[:, :] = np.arange(nE, dtype=float)[:, None] + np.linspace(0.0, 1.0, nIp, dtype=float)[None, :]
withIp = tempdir + os.sep + "toto_with_ip.cgns"
CgW.Write(mesh=myMesh, fileName=withIp, IntegrationPointPayload=[ipf])
rd = CGNSReader()
rd.readIntegrationPointPayload = True
_ = rd.Read(fileName=withIp)
assert rd.integrationPointPayload is not None and len(rd.integrationPointPayload) > 0
# IntegrationPointPayload as CGNSTree
sourceTree = CGM.load(withIp)[0]
copiedIp = tempdir + os.sep + "toto_with_ip_copied.cgns"
CgW.Write(mesh=myMesh, fileName=copiedIp, IntegrationPointPayload=sourceTree)
rd2 = CGNSReader()
rd2.readIntegrationPointPayload = True
_ = rd2.Read(fileName=copiedIp)
assert rd2.integrationPointPayload is not None and len(rd2.integrationPointPayload) > 0
print(tempdir + os.sep + "toto.cgns")
print(withIp)
print(copiedIp)
return "ok"
RegisterWriterClass(".cgns", CGNSWriter)
if __name__ == "__main__":
print(CheckIntegrity(GUI=True)) # pragma: no cover