Source code for Muscat.Bridges.CGNSBridge

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
from typing import List, Optional
from collections import defaultdict
import numpy as np

from Muscat.Types import MuscatFloat, MuscatIndex
from Muscat.Containers.Mesh import Mesh, ElementsContainer
from Muscat.Containers import MeshInspectionTools as UMIT
from Muscat.Containers import MeshModificationTools as UMMT
from Muscat.Containers.Filters.FilterObjects import ElementFilter
import Muscat.Containers.ElementsDescription as ED

CGNSNameToMuscat = {}
CGNSNameToMuscat["NODE"] = ED.Point_1
# 1D
CGNSNameToMuscat["BAR_2"] = ED.Bar_2
CGNSNameToMuscat["BAR_3"] = ED.Bar_3
# 2D
CGNSNameToMuscat["TRI_3"] = ED.Triangle_3
CGNSNameToMuscat["TRI_6"] = ED.Triangle_6
CGNSNameToMuscat["QUAD_4"] = ED.Quadrangle_4
CGNSNameToMuscat["QUAD_8"] = ED.Quadrangle_8
CGNSNameToMuscat["QUAD_9"] = ED.Quadrangle_9
# 3D
CGNSNameToMuscat["TETRA_4"] = ED.Tetrahedron_4
CGNSNameToMuscat["TETRA_10"] = ED.Tetrahedron_10
CGNSNameToMuscat["PYRA_5"] = ED.Pyramid_5
CGNSNameToMuscat["PYRA_13"] = ED.Pyramid_13
CGNSNameToMuscat["PENTA_6"] = ED.Wedge_6
CGNSNameToMuscat["PENTA_15"] = ED.Wedge_15
CGNSNameToMuscat["PENTA_18"] = ED.Wedge_18
CGNSNameToMuscat["HEXA_8"] = ED.Hexahedron_8
CGNSNameToMuscat["HEXA_20"] = ED.Hexahedron_20
CGNSNameToMuscat["HEXA_27"] = ED.Hexahedron_27
# when adding a new element please add the Pertmutaion
#https://cgns.github.io/CGNS_docs_current/sids/conv.html#unstructgrid
#example  CGNSToMuscatPermutation["TETRA_4"] = [0 3 2 1]
CGNSToMuscatPermutation = {}
CGNSToMuscatPermutation["HEXA_20"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 16, 17 ,18, 19, 12, 13, 14, 15]
CGNSToMuscatPermutation["HEXA_27"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 16, 17 ,18, 19, 12, 13, 14, 15, 24, 22, 21, 23, 20, 25, 26]

CGNS_ElementType_l = ['Null', 'UserDefined', 'NODE', 'BAR_2', 'BAR_3', 'TRI_3', 'TRI_6', 'QUAD_4', 'QUAD_8', 'QUAD_9', 'TETRA_4', 'TETRA_10', 'PYRA_5', 'PYRA_14', 'PENTA_6', 'PENTA_15', 'PENTA_18', 'HEXA_8', 'HEXA_20', 'HEXA_27', 'MIXED', 'PYRA_13', 'NGON_n', 'NFACE_n', 'BAR_4', 'TRI_9', 'TRI_10', 'QUAD_12', 'QUAD_16', 'TETRA_16', 'TETRA_20', 'PYRA_21', 'PYRA_29', 'PYRA_30', 'PENTA_24', 'PENTA_38', 'PENTA_40', 'HEXA_32', 'HEXA_56', 'HEXA_64', 'BAR_5', 'TRI_12', 'TRI_15', 'QUAD_P4_16', 'QUAD_25', 'TETRA_22', 'TETRA_34', 'TETRA_35', 'PYRA_P4_29', 'PYRA_50', 'PYRA_55', 'PENTA_33', 'PENTA_66', 'PENTA_75', 'HEXA_44', 'HEXA_98', 'HEXA_125']

CGNSNumberToMuscat = { CGNS_ElementType_l.index(k):v for k,v in CGNSNameToMuscat.items() }

MuscatToCGNSNames = { y:x for x,y in CGNSNameToMuscat.items() }
MuscatToCGNSNumber = { y:x for x,y in CGNSNumberToMuscat.items() }


[docs]def GetCGNSNumberToMuscat(cgnsNumber): res = CGNSNumberToMuscat.get(cgnsNumber,None) if res is None: # pragma: no cover raise Exception(f"CGNC elements of number : {cgnsNumber} not coded yet") return res
def __ReadIndex(pyTree): a = __ReadIndexArray(pyTree) b = __ReadIndexRange(pyTree) return np.hstack( (a,b) ) def __ReadIndexArray(pyTree): indexArrayPaths = GetAllNodesByTypeSet(pyTree, 'IndexArray_t') res =[] for indexArrayPath in indexArrayPaths: data = GetNodeByPath(pyTree, indexArrayPath)[0] if data[1] is None: continue else: res.extend(data[1].ravel()) return np.array(res, dtype=MuscatIndex).ravel() def __ReadIndexRange(pyTree): indexRangePaths = GetAllNodesByTypeSet(pyTree, 'IndexRange_t') res =[] #if len(indexRangePaths) == 0: # return np.zeros((0), dtype = MuscatIndex) for indexRangePath in indexRangePaths: indexRange = GetNodeByPath(pyTree, indexRangePath)[0][1] begin = indexRange[0]#[:,0] end = indexRange[1]#[:,1] res.extend(np.arange(begin, end+1).ravel()) return np.array(res, dtype = MuscatIndex).ravel()
[docs]def GetRecursiveObjects(pyTree, filter, state= None): res = list() state, lres = filter(state, pyTree) res.extend(lres) for subTree in pyTree[2]: lstate, lres = GetRecursiveObjects(subTree,filter,state) res.extend(lres) return state, res
[docs]def GetAllNodesByTypeSet(pyTree, typeset): def fil(state, data): if state is None: state = "" state += "/" + data[0] if data[3] == typeset: return state, [state] else: return state, [] return GetRecursiveObjects(pyTree, fil)[1]
[docs]def GetNodeByPath(pyTree, path): def fil(state, data): if len(state)==1 and data[0] == state[0]: return state[1:], [data] else: return state[1:], [] return GetRecursiveObjects(pyTree, fil, path.strip("/").split("/"))[1]
[docs]def CGNSToMesh(pyTree, baseNames:Optional[List[str]] = None, zoneNames:Optional[List[str]] = None, FieldsAsTags:bool=False)-> Mesh: if baseNames is not None: basepaths0 = GetAllNodesByTypeSet(pyTree,"CGNSBase_t") availableBaseNames = [b.split('/')[-1] for b in basepaths0] basepaths = [] for baseName in baseNames: if baseName in availableBaseNames: basepaths.append(basepaths0[availableBaseNames.index(baseName)]) else: # pragma: no cover raise RuntimeError("baseName "+baseName+" not available in CGNS tree") else: basepaths = GetAllNodesByTypeSet(pyTree,"CGNSBase_t") res = None for basepath in basepaths: basePyTree = GetNodeByPath(pyTree, basepath)[0] if zoneNames is not None: zonepaths0 = GetAllNodesByTypeSet(basePyTree,"Zone_t") availableZoneNames = [b.split('/')[-1] for b in zonepaths0] zonepaths = [] for zoneName in zoneNames: if zoneName in availableZoneNames: zonepaths.append(zonepaths0[availableZoneNames.index(zoneName)]) else: # pragma: no cover raise("zoneName "+zoneName+" not available in base "+basepath) else: zonepaths = GetAllNodesByTypeSet(basePyTree,"Zone_t") for zonepath in zonepaths: zonePyTree = GetNodeByPath(basePyTree, zonepath)[0] gridCoordinatesPath = GetAllNodesByTypeSet(zonePyTree, "GridCoordinates_t")[0] #nodes if len(GetNodeByPath(zonePyTree,gridCoordinatesPath+'/CoordinateZ')) == 0: node_dim = 2 else: node_dim = 3 gx = GetNodeByPath(zonePyTree,gridCoordinatesPath+'/CoordinateX')[0][1] local_mesh = Mesh() local_mesh.nodes = np.empty((gx.shape[0],node_dim), dtype= MuscatFloat) local_mesh.nodes[:,0] = gx local_mesh.nodes[:,1] = GetNodeByPath(zonePyTree,gridCoordinatesPath+'/CoordinateY')[0][1] if node_dim == 3: local_mesh.nodes[:,2] = GetNodeByPath(zonePyTree,gridCoordinatesPath+'/CoordinateZ')[0][1] local_mesh.originalIDNodes = np.arange(1, local_mesh.nodes.shape[0]+1, dtype= MuscatIndex) #elements elementsPaths = GetAllNodesByTypeSet(zonePyTree, "Elements_t") for elementsPath in elementsPaths: cgnsElements = GetNodeByPath(zonePyTree,elementsPath)[0] cgnsUserName = cgnsElements[0] cgnsElemType = cgnsElements[1][0] originalIds = __ReadIndexRange(cgnsElements) if cgnsElemType == 20: # we are in mixed mode: cpt = 0 elementConnectivity = GetNodeByPath(cgnsElements,cgnsUserName+'/ElementConnectivity')[0][1] for oid in originalIds: muscatElemType = GetCGNSNumberToMuscat(elementConnectivity[cpt]) cpt +=1 nbp = ED.numberOfNodes[muscatElemType] coon = elementConnectivity[cpt:cpt+nbp] - 1 if CGNSToMuscatPermutation.get(cgnsElemType,None) is not None: coon = coon[CGNSToMuscatPermutation[cgnsElemType]] cpt += nbp elems: ElementsContainer = local_mesh.elements.GetElementsOfType(muscatElemType) elcpt = elems.AddNewElements(coon[None,:], [oid] ) if not cgnsUserName.endswith(MuscatToCGNSNames[muscatElemType]): elems.tags.CreateTag(cgnsUserName,False).AddToTag(elcpt-1) else: muscatElemType = GetCGNSNumberToMuscat(cgnsElemType) elementConnectivity = np.asarray(GetNodeByPath(cgnsElements,cgnsUserName+'/ElementConnectivity')[0][1], dtype=MuscatIndex).reshape( (-1, ED.numberOfNodes[muscatElemType] ) )-1 elems: ElementsContainer = local_mesh.elements.GetElementsOfType(muscatElemType) if CGNSToMuscatPermutation.get(cgnsElemType,None) is not None: elementConnectivity = elementConnectivity[:,CGNSToMuscatPermutation[cgnsElemType]] elcpt = elems.cpt nelcpt = elems.AddNewElements(elementConnectivity, originalIds) if not cgnsUserName.endswith(MuscatToCGNSNames[muscatElemType]): elems.tags.CreateTag(cgnsUserName,False).AddToTag(np.arange(elcpt,nelcpt)) #tags elemTags = [] nodeTags = [] ZoneBCPaths = GetAllNodesByTypeSet(zonePyTree, "ZoneBC_t") for ZoneBCPath in ZoneBCPaths: ZoneBC = GetNodeByPath(zonePyTree,ZoneBCPath)[0] BCPaths = GetAllNodesByTypeSet(ZoneBC, "BC_t") for BCPath in BCPaths: BCNode = GetNodeByPath(ZoneBC,BCPath)[0] BCName = str(BCNode[0]) indices = __ReadIndex(BCNode) if len(indices) == 0: continue gl = GetAllNodesByTypeSet(BCNode, "GridLocation_t") if "".join(np.array(GetNodeByPath(BCNode,gl[0])[0][1], dtype=str) )== "CellCenter": #local_mesh.AddElementToTagUsingOriginalId(indices-1, BCName) local_mesh.AddElementsToTag(indices-1, BCName) elemTags.append(BCName) pass if "".join(np.array(GetNodeByPath(BCNode,gl[0])[0][1], dtype=str) )== "Vertex": local_mesh.nodesTags.CreateTag(BCName).SetIds(indices-1) nodeTags.append(BCName) #fields datasPaths = GetAllNodesByTypeSet(zonePyTree, "FlowSolution_t") for dataPath in datasPaths : datas = GetNodeByPath(zonePyTree,dataPath)[0] gl = GetAllNodesByTypeSet(datas, "GridLocation_t") store = local_mesh.nodeFields if len(gl) > 0: if "".join(np.array(GetNodeByPath(datas,gl[0])[0][1], dtype=str) )== "CellCenter": store = local_mesh.elemFields if "".join(np.array(GetNodeByPath(datas,gl[0])[0][1], dtype=str) )== "Vertex": store = local_mesh.nodeFields fieldPaths = GetAllNodesByTypeSet(datas, "DataArray_t") for fieldPath in fieldPaths: fieldData = GetNodeByPath(datas,fieldPath)[0] dataName = fieldData[0] data = fieldData[1] if dataName == "OriginalIds" and store is local_mesh.nodeFields: local_mesh.originalIDNodes = np.asarray(data, dtype=MuscatIndex,order='C') elif dataName == "OriginalIds" and store is local_mesh.elemFields: local_mesh.SetElementsOriginalIDs(np.asarray(data, dtype=MuscatIndex)) elif store is local_mesh.nodeFields: if (FieldsAsTags == True) or (FieldsAsTags == False and dataName not in nodeTags): local_mesh.nodeFields[dataName] = np.asarray(data) elif store is local_mesh.elemFields: if (FieldsAsTags == True) or (FieldsAsTags == False and dataName not in elemTags): local_mesh.elemFields[dataName] = np.asarray(data) if res is None: res = local_mesh else: res.Merge(local_mesh) #UMMT.CleanDoubleElements(res) UMMT.CleanDoubleNodes(res) return res
[docs]def NewDataArray(father, name, data): res = [name, data, [], 'DataArray_t'] father[2].append(res) return res
[docs]def NewBC(father,name,data, pttype, onPoints=True,listType='PointList'): res = [name, np.array([b'F', b'a', b'm', b'i', b'l', b'y', b'S', b'p', b'e', b'c', b'i', b'f', b'i', b'e', b'd'], dtype='|S1'), [], 'BC_t'] plist = [listType, data[None,:], [], pttype] res[2].append(plist) family = ['FamilyName', np.array([b'N', b'u', b'l', b'l'], dtype='|S1'), [], 'FamilyName_t'] res[2].append(family) if onPoints: glocation = ['GridLocation', np.array([b'V', b'e', b'r', b't', b'e', b'x'], dtype='|S1'), [], 'GridLocation_t'] else: glocation = ['GridLocation', np.array([b'C', b'e', b'l', b'l', b'C', b'e', b'n', b't', b'e', b'r'], dtype='|S1'), [], 'GridLocation_t'] res[2].append(glocation) father[2].append(res) return res
[docs]def NewElements(father, data, globalOffsets): name = data.elementType nbelem = data.GetNumberOfElements() res = ["Elements_"+ MuscatToCGNSNames[name], np.array([MuscatToCGNSNumber[name], 0], dtype=np.int32), [['ElementRange', np.array((globalOffsets+1, globalOffsets+nbelem) ), [], 'IndexRange_t'], ['ElementConnectivity', (data.connectivity+1).ravel(), [], 'DataArray_t']], 'Elements_t'] father[2].append(res) return res
[docs]def NewFlowSolution(father,name,gridlocation ): res = [name, None,[['GridLocation', np.array([ c for c in gridlocation], dtype='|S1'), [], 'GridLocation_t']] , 'FlowSolution_t'] father[2].append(res) return res
[docs]def MeshToCGNS( mesh: Mesh, outputPyTree = None, OneBasePerTopoDim:bool = False, TagsAsFields:bool=False, ExportOriginalIDs:bool=True) : if outputPyTree is None: outputPyTree=['CGNSTree', None, [], 'CGNSTree_t'] version = ['CGNSLibraryVersion', np.array([3.4], dtype=np.float32), [], 'CGNSLibraryVersion_t'] outputPyTree[2].append(version) if mesh.GetPointsDimensionality() == 2: physicalDim = 2 coordNames = ["X", "Y"] elif mesh.GetPointsDimensionality() == 3: physicalDim = 3 coordNames = ["X", "Y", "Z"] else: # pragma: no cover raise RuntimeError('mesh.GetPointsDimensionality() should be 2 or 3') def ConstructZone(outputPyTree, local_mesh:Mesh, topologicalDim:int, physicalDim:int, baseName:str, zoneName:str)->None: base = [baseName, np.array([topologicalDim, physicalDim], dtype=np.int32), [], 'CGNSBase_t'] # name, physical dim, topological dim outputPyTree[2].append(base) family = ['Bulk', np.array([b'B', b'u', b'l', b'k'], dtype='|S1'), [], 'FamilyName_t'] base[2].append(family) nbElemTags = len(local_mesh.GetNamesOfElementTags()) nbNodesTags = len(local_mesh.nodesTags) totalNumberOfTags = nbElemTags +nbNodesTags totalNumberOfElem = local_mesh.GetNumberOfElements() s = np.array([[local_mesh.GetNumberOfNodes(),local_mesh.GetNumberOfElements(),totalNumberOfTags]],dtype=np.int32) grid = [zoneName, s, [['ZoneType', np.array([b'U', b'n', b's', b't', b'r', b'u', b'c', b't', b'u', b'r', b'e',b'd'], dtype='|S1'), [], 'ZoneType_t']], 'Zone_t'] base[2].append(grid) zone_family = ['FamilyName', np.array([b'B', b'u', b'l', b'k'], dtype='|S1'), [], 'FamilyName_t'] grid[2].append(zone_family) #add nodes gridCoordinates = ['GridCoordinates', None, [], 'GridCoordinates_t'] grid[2].append(gridCoordinates) for i,coord in enumerate(coordNames): da = NewDataArray(gridCoordinates, "Coordinate"+coord, np.copy(local_mesh.nodes[:,i]) ) #add elements globalOffsets = mesh.ComputeGlobalOffset() for elementType, data in local_mesh.elements.items(): nbelem = data.connectivity.shape[0] if nbelem == 0: continue NewElements(grid, data, globalOffsets[elementType]) ## add nodeFields flowSolution = NewFlowSolution(grid, "PointData", gridlocation="Vertex") if ExportOriginalIDs: NewDataArray(flowSolution, "OriginalIds", local_mesh.originalIDNodes) if len(local_mesh.nodeFields) > 0: for pointFieldName, pointData in local_mesh.nodeFields.items(): if pointData.dtype.char == "U": print(f"skipping nodeFields '{pointFieldName}' because if of type: {pointData.dtype}" ) continue NewDataArray(flowSolution, pointFieldName,pointData) ## add elemFields flowSolutionCell = NewFlowSolution(grid, "CellData", gridlocation="CellCenter") if ExportOriginalIDs: NewDataArray(flowSolutionCell, "OriginalIds", local_mesh.GetElementsOriginalIDs() ) if len(local_mesh.elemFields) > 0: for pointFieldName, cellData in local_mesh.elemFields.items(): if cellData.dtype.char == "U": if pointFieldName != 'FE Names': print(f"skipping elemFields '{pointFieldName}' because if of type: {cellData.dtype}" ) continue NewDataArray(flowSolutionCell, pointFieldName, np.copy(cellData)) #add elements tags (as field also) has_elem_select = False for pointFieldName, data in local_mesh.elements.items(): if len(data.tags.keys())>0: has_elem_select = True if has_elem_select: ezbg = ['Elements_Selections', None, [], 'ZoneBC_t'] grid[2].append(ezbg) tagListing=defaultdict(lambda : np.array([],dtype=np.int64)) for pointFieldName, data in local_mesh.elements.items(): for elTag in data.tags.keys(): ids = data.tags[elTag].GetIds() tagListing[elTag]=np.unique(np.append(tagListing[elTag],ids + globalOffsets[pointFieldName])) for elTag in tagListing: NewBC(ezbg,elTag,tagListing[elTag]+1, pttype="IndexArray_t", onPoints=False,listType="ElementList") if TagsAsFields: cellData = np.zeros(totalNumberOfElem, dtype=np.int64) cellData[tagListing[elTag]] = 1 NewDataArray(flowSolutionCell, elTag+"_as_field", cellData) #add node tags (as field also) if len(local_mesh.nodesTags): zbg = ['Points_Selections', None, [], 'ZoneBC_t'] grid[2].append(zbg) for tag in local_mesh.nodesTags: ids = tag.GetIds() NewBC(zbg,tag.name,ids+1,pttype="IndexArray_t") if TagsAsFields: pointData = np.zeros(local_mesh.GetNumberOfNodes()) pointData[ids] = 1 NewDataArray(flowSolution, tag.name+"_as_field", np.copy(pointData)) el_topo_dim={1:[],2:[],3:[]} maxTopoDim = 0 for name in mesh.elements.keys(): el_topo_dim[ED.dimensionality[name]].append(name) maxTopoDim = max(maxTopoDim, ED.dimensionality[name]) if OneBasePerTopoDim: for td in range(3): topologicalDim = td+1 if len(el_topo_dim[topologicalDim])>0: ff = ElementFilter(dimensionality=topologicalDim) local_mesh = UMIT.ExtractElementsByElementFilter(mesh, ff) ConstructZone(outputPyTree, local_mesh, topologicalDim, physicalDim, "Base_"+str(topologicalDim)+"_"+str(physicalDim), "Zone") else: #ff = F.ElementFilter(mesh, dimensionality=maxTopoDim) #local_mesh = UMIT.ExtractElementsByElementFilter(mesh, ff) ConstructZone(outputPyTree, mesh, maxTopoDim, physicalDim, "Base_"+str(maxTopoDim)+"_"+str(physicalDim), "Zone") return outputPyTree
[docs]def CheckIntegrity(GUI=False): import Muscat.TestData as MuscatTestData import Muscat.IO.GeofReader as GR res = GR.ReadGeof(fileName=MuscatTestData.GetTestDataPath() + "UtExample/cube.geof") from Muscat.Containers.MeshCreationTools import CreateCube res = CreateCube() res.VerifyIntegrity() resII = CGNSToMesh(MeshToCGNS(res)) resII.VerifyIntegrity() from Muscat.Containers.MeshTools import IsClose #IsClose(res,resII) #for the moment the element tags are not exported return 'ok'
if __name__ == '__main__': print(CheckIntegrity(True))# pragma: no cover