# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
from typing import List, Optional
from collections import defaultdict
import numpy as np
from Muscat.Types import MuscatFloat, MuscatIndex
from Muscat.Containers.Mesh import Mesh, ElementsContainer
from Muscat.Containers import MeshInspectionTools as UMIT
from Muscat.Containers import MeshModificationTools as UMMT
from Muscat.Containers.Filters.FilterObjects import ElementFilter
import Muscat.Containers.ElementsDescription as ED
CGNSNameToMuscat = {}
CGNSNameToMuscat["NODE"] = ED.Point_1
# 1D
CGNSNameToMuscat["BAR_2"] = ED.Bar_2
CGNSNameToMuscat["BAR_3"] = ED.Bar_3
# 2D
CGNSNameToMuscat["TRI_3"] = ED.Triangle_3
CGNSNameToMuscat["TRI_6"] = ED.Triangle_6
CGNSNameToMuscat["QUAD_4"] = ED.Quadrangle_4
CGNSNameToMuscat["QUAD_8"] = ED.Quadrangle_8
CGNSNameToMuscat["QUAD_9"] = ED.Quadrangle_9
# 3D
CGNSNameToMuscat["TETRA_4"] = ED.Tetrahedron_4
CGNSNameToMuscat["TETRA_10"] = ED.Tetrahedron_10
CGNSNameToMuscat["PYRA_5"] = ED.Pyramid_5
CGNSNameToMuscat["PYRA_13"] = ED.Pyramid_13
CGNSNameToMuscat["PENTA_6"] = ED.Wedge_6
CGNSNameToMuscat["PENTA_15"] = ED.Wedge_15
CGNSNameToMuscat["PENTA_18"] = ED.Wedge_18
CGNSNameToMuscat["HEXA_8"] = ED.Hexahedron_8
CGNSNameToMuscat["HEXA_20"] = ED.Hexahedron_20
CGNSNameToMuscat["HEXA_27"] = ED.Hexahedron_27
# when adding a new element please add the Pertmutaion
#https://cgns.github.io/CGNS_docs_current/sids/conv.html#unstructgrid
#example CGNSToMuscatPermutation["TETRA_4"] = [0 3 2 1]
CGNSToMuscatPermutation = {}
CGNSToMuscatPermutation["HEXA_20"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 16, 17 ,18, 19, 12, 13, 14, 15]
CGNSToMuscatPermutation["HEXA_27"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 16, 17 ,18, 19, 12, 13, 14, 15, 24, 22, 21, 23, 20, 25, 26]
CGNS_ElementType_l = ['Null', 'UserDefined', 'NODE', 'BAR_2', 'BAR_3', 'TRI_3', 'TRI_6', 'QUAD_4', 'QUAD_8', 'QUAD_9', 'TETRA_4', 'TETRA_10', 'PYRA_5', 'PYRA_14', 'PENTA_6', 'PENTA_15', 'PENTA_18', 'HEXA_8', 'HEXA_20', 'HEXA_27', 'MIXED', 'PYRA_13', 'NGON_n', 'NFACE_n', 'BAR_4', 'TRI_9', 'TRI_10', 'QUAD_12', 'QUAD_16', 'TETRA_16', 'TETRA_20', 'PYRA_21', 'PYRA_29', 'PYRA_30', 'PENTA_24', 'PENTA_38', 'PENTA_40', 'HEXA_32', 'HEXA_56', 'HEXA_64', 'BAR_5', 'TRI_12', 'TRI_15', 'QUAD_P4_16', 'QUAD_25', 'TETRA_22', 'TETRA_34', 'TETRA_35', 'PYRA_P4_29', 'PYRA_50', 'PYRA_55', 'PENTA_33', 'PENTA_66', 'PENTA_75', 'HEXA_44', 'HEXA_98', 'HEXA_125']
CGNSNumberToMuscat = { CGNS_ElementType_l.index(k):v for k,v in CGNSNameToMuscat.items() }
MuscatToCGNSNames = { y:x for x,y in CGNSNameToMuscat.items() }
MuscatToCGNSNumber = { y:x for x,y in CGNSNumberToMuscat.items() }
[docs]def GetCGNSNumberToMuscat(cgnsNumber):
res = CGNSNumberToMuscat.get(cgnsNumber,None)
if res is None: # pragma: no cover
raise Exception(f"CGNC elements of number : {cgnsNumber} not coded yet")
return res
def __ReadIndex(pyTree):
a = __ReadIndexArray(pyTree)
b = __ReadIndexRange(pyTree)
return np.hstack( (a,b) )
def __ReadIndexArray(pyTree):
indexArrayPaths = GetAllNodesByTypeSet(pyTree, 'IndexArray_t')
res =[]
for indexArrayPath in indexArrayPaths:
data = GetNodeByPath(pyTree, indexArrayPath)[0]
if data[1] is None:
continue
else:
res.extend(data[1].ravel())
return np.array(res, dtype=MuscatIndex).ravel()
def __ReadIndexRange(pyTree):
indexRangePaths = GetAllNodesByTypeSet(pyTree, 'IndexRange_t')
res =[]
#if len(indexRangePaths) == 0:
# return np.zeros((0), dtype = MuscatIndex)
for indexRangePath in indexRangePaths:
indexRange = GetNodeByPath(pyTree, indexRangePath)[0][1]
begin = indexRange[0]#[:,0]
end = indexRange[1]#[:,1]
res.extend(np.arange(begin, end+1).ravel())
return np.array(res, dtype = MuscatIndex).ravel()
[docs]def GetRecursiveObjects(pyTree, filter, state= None):
res = list()
state, lres = filter(state, pyTree)
res.extend(lres)
for subTree in pyTree[2]:
lstate, lres = GetRecursiveObjects(subTree,filter,state)
res.extend(lres)
return state, res
[docs]def GetAllNodesByTypeSet(pyTree, typeset):
def fil(state, data):
if state is None:
state = ""
state += "/" + data[0]
if data[3] == typeset:
return state, [state]
else:
return state, []
return GetRecursiveObjects(pyTree, fil)[1]
[docs]def GetNodeByPath(pyTree, path):
def fil(state, data):
if len(state)==1 and data[0] == state[0]:
return state[1:], [data]
else:
return state[1:], []
return GetRecursiveObjects(pyTree, fil, path.strip("/").split("/"))[1]
[docs]def CGNSToMesh(pyTree, baseNames:Optional[List[str]] = None, zoneNames:Optional[List[str]] = None, FieldsAsTags:bool=False)-> Mesh:
if baseNames is not None:
basepaths0 = GetAllNodesByTypeSet(pyTree,"CGNSBase_t")
availableBaseNames = [b.split('/')[-1] for b in basepaths0]
basepaths = []
for baseName in baseNames:
if baseName in availableBaseNames:
basepaths.append(basepaths0[availableBaseNames.index(baseName)])
else: # pragma: no cover
raise RuntimeError("baseName "+baseName+" not available in CGNS tree")
else:
basepaths = GetAllNodesByTypeSet(pyTree,"CGNSBase_t")
res = None
for basepath in basepaths:
basePyTree = GetNodeByPath(pyTree, basepath)[0]
if zoneNames is not None:
zonepaths0 = GetAllNodesByTypeSet(basePyTree,"Zone_t")
availableZoneNames = [b.split('/')[-1] for b in zonepaths0]
zonepaths = []
for zoneName in zoneNames:
if zoneName in availableZoneNames:
zonepaths.append(zonepaths0[availableZoneNames.index(zoneName)])
else: # pragma: no cover
raise("zoneName "+zoneName+" not available in base "+basepath)
else:
zonepaths = GetAllNodesByTypeSet(basePyTree,"Zone_t")
for zonepath in zonepaths:
zonePyTree = GetNodeByPath(basePyTree, zonepath)[0]
gridCoordinatesPath = GetAllNodesByTypeSet(zonePyTree, "GridCoordinates_t")[0]
#nodes
if len(GetNodeByPath(zonePyTree,gridCoordinatesPath+'/CoordinateZ')) == 0:
node_dim = 2
else:
node_dim = 3
gx = GetNodeByPath(zonePyTree,gridCoordinatesPath+'/CoordinateX')[0][1]
local_mesh = Mesh()
local_mesh.nodes = np.empty((gx.shape[0],node_dim), dtype= MuscatFloat)
local_mesh.nodes[:,0] = gx
local_mesh.nodes[:,1] = GetNodeByPath(zonePyTree,gridCoordinatesPath+'/CoordinateY')[0][1]
if node_dim == 3:
local_mesh.nodes[:,2] = GetNodeByPath(zonePyTree,gridCoordinatesPath+'/CoordinateZ')[0][1]
local_mesh.originalIDNodes = np.arange(1, local_mesh.nodes.shape[0]+1, dtype= MuscatIndex)
#elements
elementsPaths = GetAllNodesByTypeSet(zonePyTree, "Elements_t")
for elementsPath in elementsPaths:
cgnsElements = GetNodeByPath(zonePyTree,elementsPath)[0]
cgnsUserName = cgnsElements[0]
cgnsElemType = cgnsElements[1][0]
originalIds = __ReadIndexRange(cgnsElements)
if cgnsElemType == 20: # we are in mixed mode:
cpt = 0
elementConnectivity = GetNodeByPath(cgnsElements,cgnsUserName+'/ElementConnectivity')[0][1]
for oid in originalIds:
muscatElemType = GetCGNSNumberToMuscat(elementConnectivity[cpt])
cpt +=1
nbp = ED.numberOfNodes[muscatElemType]
coon = elementConnectivity[cpt:cpt+nbp] - 1
if CGNSToMuscatPermutation.get(cgnsElemType,None) is not None:
coon = coon[CGNSToMuscatPermutation[cgnsElemType]]
cpt += nbp
elems: ElementsContainer = local_mesh.elements.GetElementsOfType(muscatElemType)
elcpt = elems.AddNewElements(coon[None,:], [oid] )
if not cgnsUserName.endswith(MuscatToCGNSNames[muscatElemType]):
elems.tags.CreateTag(cgnsUserName,False).AddToTag(elcpt-1)
else:
muscatElemType = GetCGNSNumberToMuscat(cgnsElemType)
elementConnectivity = np.asarray(GetNodeByPath(cgnsElements,cgnsUserName+'/ElementConnectivity')[0][1], dtype=MuscatIndex).reshape( (-1, ED.numberOfNodes[muscatElemType] ) )-1
elems: ElementsContainer = local_mesh.elements.GetElementsOfType(muscatElemType)
if CGNSToMuscatPermutation.get(cgnsElemType,None) is not None:
elementConnectivity = elementConnectivity[:,CGNSToMuscatPermutation[cgnsElemType]]
elcpt = elems.cpt
nelcpt = elems.AddNewElements(elementConnectivity, originalIds)
if not cgnsUserName.endswith(MuscatToCGNSNames[muscatElemType]):
elems.tags.CreateTag(cgnsUserName,False).AddToTag(np.arange(elcpt,nelcpt))
#tags
elemTags = []
nodeTags = []
ZoneBCPaths = GetAllNodesByTypeSet(zonePyTree, "ZoneBC_t")
for ZoneBCPath in ZoneBCPaths:
ZoneBC = GetNodeByPath(zonePyTree,ZoneBCPath)[0]
BCPaths = GetAllNodesByTypeSet(ZoneBC, "BC_t")
for BCPath in BCPaths:
BCNode = GetNodeByPath(ZoneBC,BCPath)[0]
BCName = str(BCNode[0])
indices = __ReadIndex(BCNode)
if len(indices) == 0:
continue
gl = GetAllNodesByTypeSet(BCNode, "GridLocation_t")
if "".join(np.array(GetNodeByPath(BCNode,gl[0])[0][1], dtype=str) )== "CellCenter":
#local_mesh.AddElementToTagUsingOriginalId(indices-1, BCName)
local_mesh.AddElementsToTag(indices-1, BCName)
elemTags.append(BCName)
pass
if "".join(np.array(GetNodeByPath(BCNode,gl[0])[0][1], dtype=str) )== "Vertex":
local_mesh.nodesTags.CreateTag(BCName).SetIds(indices-1)
nodeTags.append(BCName)
#fields
datasPaths = GetAllNodesByTypeSet(zonePyTree, "FlowSolution_t")
for dataPath in datasPaths :
datas = GetNodeByPath(zonePyTree,dataPath)[0]
gl = GetAllNodesByTypeSet(datas, "GridLocation_t")
store = local_mesh.nodeFields
if len(gl) > 0:
if "".join(np.array(GetNodeByPath(datas,gl[0])[0][1], dtype=str) )== "CellCenter":
store = local_mesh.elemFields
if "".join(np.array(GetNodeByPath(datas,gl[0])[0][1], dtype=str) )== "Vertex":
store = local_mesh.nodeFields
fieldPaths = GetAllNodesByTypeSet(datas, "DataArray_t")
for fieldPath in fieldPaths:
fieldData = GetNodeByPath(datas,fieldPath)[0]
dataName = fieldData[0]
data = fieldData[1]
if dataName == "OriginalIds" and store is local_mesh.nodeFields:
local_mesh.originalIDNodes = np.asarray(data, dtype=MuscatIndex,order='C')
elif dataName == "OriginalIds" and store is local_mesh.elemFields:
local_mesh.SetElementsOriginalIDs(np.asarray(data, dtype=MuscatIndex))
elif store is local_mesh.nodeFields:
if (FieldsAsTags == True) or (FieldsAsTags == False and dataName not in nodeTags):
local_mesh.nodeFields[dataName] = np.asarray(data)
elif store is local_mesh.elemFields:
if (FieldsAsTags == True) or (FieldsAsTags == False and dataName not in elemTags):
local_mesh.elemFields[dataName] = np.asarray(data)
if res is None:
res = local_mesh
else:
res.Merge(local_mesh)
#UMMT.CleanDoubleElements(res)
UMMT.CleanDoubleNodes(res)
return res
[docs]def NewDataArray(father, name, data):
res = [name, data, [], 'DataArray_t']
father[2].append(res)
return res
[docs]def NewBC(father,name,data, pttype, onPoints=True,listType='PointList'):
res = [name, np.array([b'F', b'a', b'm', b'i', b'l', b'y', b'S', b'p', b'e', b'c', b'i', b'f', b'i', b'e', b'd'], dtype='|S1'), [], 'BC_t']
plist = [listType, data[None,:], [], pttype]
res[2].append(plist)
family = ['FamilyName', np.array([b'N', b'u', b'l', b'l'], dtype='|S1'), [], 'FamilyName_t']
res[2].append(family)
if onPoints:
glocation = ['GridLocation', np.array([b'V', b'e', b'r', b't', b'e', b'x'], dtype='|S1'), [], 'GridLocation_t']
else:
glocation = ['GridLocation', np.array([b'C', b'e', b'l', b'l', b'C', b'e', b'n', b't', b'e', b'r'], dtype='|S1'), [], 'GridLocation_t']
res[2].append(glocation)
father[2].append(res)
return res
[docs]def NewElements(father, data, globalOffsets):
name = data.elementType
nbelem = data.GetNumberOfElements()
res = ["Elements_"+ MuscatToCGNSNames[name], np.array([MuscatToCGNSNumber[name], 0], dtype=np.int32), [['ElementRange', np.array((globalOffsets+1, globalOffsets+nbelem) ), [], 'IndexRange_t'], ['ElementConnectivity', (data.connectivity+1).ravel(), [], 'DataArray_t']], 'Elements_t']
father[2].append(res)
return res
[docs]def NewFlowSolution(father,name,gridlocation ):
res = [name, None,[['GridLocation', np.array([ c for c in gridlocation], dtype='|S1'), [], 'GridLocation_t']] , 'FlowSolution_t']
father[2].append(res)
return res
[docs]def MeshToCGNS( mesh: Mesh, outputPyTree = None, OneBasePerTopoDim:bool = False, TagsAsFields:bool=False, ExportOriginalIDs:bool=True) :
if outputPyTree is None:
outputPyTree=['CGNSTree', None, [], 'CGNSTree_t']
version = ['CGNSLibraryVersion', np.array([3.4], dtype=np.float32), [], 'CGNSLibraryVersion_t']
outputPyTree[2].append(version)
if mesh.GetPointsDimensionality() == 2:
physicalDim = 2
coordNames = ["X", "Y"]
elif mesh.GetPointsDimensionality() == 3:
physicalDim = 3
coordNames = ["X", "Y", "Z"]
else: # pragma: no cover
raise RuntimeError('mesh.GetPointsDimensionality() should be 2 or 3')
def ConstructZone(outputPyTree, local_mesh:Mesh, topologicalDim:int, physicalDim:int, baseName:str, zoneName:str)->None:
base = [baseName, np.array([topologicalDim, physicalDim], dtype=np.int32), [], 'CGNSBase_t'] # name, physical dim, topological dim
outputPyTree[2].append(base)
family = ['Bulk', np.array([b'B', b'u', b'l', b'k'], dtype='|S1'), [], 'FamilyName_t']
base[2].append(family)
nbElemTags = len(local_mesh.GetNamesOfElementTags())
nbNodesTags = len(local_mesh.nodesTags)
totalNumberOfTags = nbElemTags +nbNodesTags
totalNumberOfElem = local_mesh.GetNumberOfElements()
s = np.array([[local_mesh.GetNumberOfNodes(),local_mesh.GetNumberOfElements(),totalNumberOfTags]],dtype=np.int32)
grid = [zoneName, s, [['ZoneType', np.array([b'U', b'n', b's', b't', b'r', b'u', b'c', b't', b'u', b'r', b'e',b'd'], dtype='|S1'), [], 'ZoneType_t']], 'Zone_t']
base[2].append(grid)
zone_family = ['FamilyName', np.array([b'B', b'u', b'l', b'k'], dtype='|S1'), [], 'FamilyName_t']
grid[2].append(zone_family)
#add nodes
gridCoordinates = ['GridCoordinates', None, [], 'GridCoordinates_t']
grid[2].append(gridCoordinates)
for i,coord in enumerate(coordNames):
da = NewDataArray(gridCoordinates, "Coordinate"+coord, np.copy(local_mesh.nodes[:,i]) )
#add elements
globalOffsets = mesh.ComputeGlobalOffset()
for elementType, data in local_mesh.elements.items():
nbelem = data.connectivity.shape[0]
if nbelem == 0:
continue
NewElements(grid, data, globalOffsets[elementType])
## add nodeFields
flowSolution = NewFlowSolution(grid, "PointData", gridlocation="Vertex")
if ExportOriginalIDs:
NewDataArray(flowSolution, "OriginalIds", local_mesh.originalIDNodes)
if len(local_mesh.nodeFields) > 0:
for pointFieldName, pointData in local_mesh.nodeFields.items():
if pointData.dtype.char == "U":
print(f"skipping nodeFields '{pointFieldName}' because if of type: {pointData.dtype}" )
continue
NewDataArray(flowSolution, pointFieldName,pointData)
## add elemFields
flowSolutionCell = NewFlowSolution(grid, "CellData", gridlocation="CellCenter")
if ExportOriginalIDs:
NewDataArray(flowSolutionCell, "OriginalIds", local_mesh.GetElementsOriginalIDs() )
if len(local_mesh.elemFields) > 0:
for pointFieldName, cellData in local_mesh.elemFields.items():
if cellData.dtype.char == "U":
if pointFieldName != 'FE Names':
print(f"skipping elemFields '{pointFieldName}' because if of type: {cellData.dtype}" )
continue
NewDataArray(flowSolutionCell, pointFieldName, np.copy(cellData))
#add elements tags (as field also)
has_elem_select = False
for pointFieldName, data in local_mesh.elements.items():
if len(data.tags.keys())>0:
has_elem_select = True
if has_elem_select:
ezbg = ['Elements_Selections', None, [], 'ZoneBC_t']
grid[2].append(ezbg)
tagListing=defaultdict(lambda : np.array([],dtype=np.int64))
for pointFieldName, data in local_mesh.elements.items():
for elTag in data.tags.keys():
ids = data.tags[elTag].GetIds()
tagListing[elTag]=np.unique(np.append(tagListing[elTag],ids + globalOffsets[pointFieldName]))
for elTag in tagListing:
NewBC(ezbg,elTag,tagListing[elTag]+1, pttype="IndexArray_t", onPoints=False,listType="ElementList")
if TagsAsFields:
cellData = np.zeros(totalNumberOfElem, dtype=np.int64)
cellData[tagListing[elTag]] = 1
NewDataArray(flowSolutionCell, elTag+"_as_field", cellData)
#add node tags (as field also)
if len(local_mesh.nodesTags):
zbg = ['Points_Selections', None, [], 'ZoneBC_t']
grid[2].append(zbg)
for tag in local_mesh.nodesTags:
ids = tag.GetIds()
NewBC(zbg,tag.name,ids+1,pttype="IndexArray_t")
if TagsAsFields:
pointData = np.zeros(local_mesh.GetNumberOfNodes())
pointData[ids] = 1
NewDataArray(flowSolution, tag.name+"_as_field", np.copy(pointData))
el_topo_dim={1:[],2:[],3:[]}
maxTopoDim = 0
for name in mesh.elements.keys():
el_topo_dim[ED.dimensionality[name]].append(name)
maxTopoDim = max(maxTopoDim, ED.dimensionality[name])
if OneBasePerTopoDim:
for td in range(3):
topologicalDim = td+1
if len(el_topo_dim[topologicalDim])>0:
ff = ElementFilter(dimensionality=topologicalDim)
local_mesh = UMIT.ExtractElementsByElementFilter(mesh, ff)
ConstructZone(outputPyTree, local_mesh, topologicalDim, physicalDim, "Base_"+str(topologicalDim)+"_"+str(physicalDim), "Zone")
else:
#ff = F.ElementFilter(mesh, dimensionality=maxTopoDim)
#local_mesh = UMIT.ExtractElementsByElementFilter(mesh, ff)
ConstructZone(outputPyTree, mesh, maxTopoDim, physicalDim, "Base_"+str(maxTopoDim)+"_"+str(physicalDim), "Zone")
return outputPyTree
[docs]def CheckIntegrity(GUI=False):
import Muscat.TestData as MuscatTestData
import Muscat.IO.GeofReader as GR
res = GR.ReadGeof(fileName=MuscatTestData.GetTestDataPath() + "UtExample/cube.geof")
from Muscat.Containers.MeshCreationTools import CreateCube
res = CreateCube()
res.VerifyIntegrity()
resII = CGNSToMesh(MeshToCGNS(res))
resII.VerifyIntegrity()
from Muscat.Containers.MeshTools import IsClose
#IsClose(res,resII)
#for the moment the element tags are not exported
return 'ok'
if __name__ == '__main__':
print(CheckIntegrity(True))# pragma: no cover