# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
from typing import Any, Dict, List, Optional, Union
from numpy.typing import NDArray
import numpy as np
try:
import CGNS.MAP as CGM
import CGNS.PAT.cgnsutils as CGU
import CGNS.PAT.cgnslib as CGL
import CGNS.PAT.cgnskeywords as CGK
CGK.IntegrationPoint_s ="IntegrationPoint"
if CGK.GridLocation_l[-1] != CGK.IntegrationPoint_s:
CGK.GridLocation_l.append(CGK.IntegrationPoint_s)
except ImportError:
from Muscat.Helpers.Logger import Warning
Warning("CGNS library not found. Please install CGNS python package")
from Muscat.Types import MuscatFloat, MuscatIndex
from Muscat.MeshContainers.Mesh import Mesh, ElementsContainer
from Muscat.MeshContainers.PartitionedMesh import PartitionedMesh, GlobalIdsFieldName
from Muscat.MeshTools import MeshModificationTools as MMT
from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter
import Muscat.MeshContainers.ElementsDescription as ED
from Muscat.MeshContainers.ElementsContainers import StructuredElementsContainer
import itertools
CGNSNameToMuscat = {}
CGNSNameToMuscat["NODE"] = ED.Point_1
# 1D
CGNSNameToMuscat["BAR_2"] = ED.Bar_2
CGNSNameToMuscat["BAR_3"] = ED.Bar_3
# 2D
CGNSNameToMuscat["TRI_3"] = ED.Triangle_3
CGNSNameToMuscat["TRI_6"] = ED.Triangle_6
CGNSNameToMuscat["QUAD_4"] = ED.Quadrangle_4
CGNSNameToMuscat["QUAD_8"] = ED.Quadrangle_8
CGNSNameToMuscat["QUAD_9"] = ED.Quadrangle_9
# 3D
CGNSNameToMuscat["TETRA_4"] = ED.Tetrahedron_4
CGNSNameToMuscat["TETRA_10"] = ED.Tetrahedron_10
CGNSNameToMuscat["PYRA_5"] = ED.Pyramid_5
CGNSNameToMuscat["PYRA_13"] = ED.Pyramid_13
CGNSNameToMuscat["PENTA_6"] = ED.Wedge_6
CGNSNameToMuscat["PENTA_15"] = ED.Wedge_15
CGNSNameToMuscat["PENTA_18"] = ED.Wedge_18
CGNSNameToMuscat["HEXA_8"] = ED.Hexahedron_8
CGNSNameToMuscat["HEXA_20"] = ED.Hexahedron_20
CGNSNameToMuscat["HEXA_27"] = ED.Hexahedron_27
# when adding a new element please add the Pertmutaion
# https://cgns.github.io/CGNS_docs_current/sids/conv.html#unstructgrid
# example CGNSToMuscatPermutation["TETRA_4"] = [0 3 2 1]
CGNSToMuscatPermutation = {}
CGNSToMuscatPermutation["PENTA_15"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 13, 14, 9, 10, 11]
CGNSToMuscatPermutation["PENTA_18"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 13, 14, 9, 10, 11, 15, 16, 17]
CGNSToMuscatPermutation["HEXA_20"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 16, 17, 18, 19, 12, 13, 14, 15]
CGNSToMuscatPermutation["HEXA_27"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 16, 17, 18, 19, 12, 13, 14, 15, 24, 22, 21, 23, 20, 25, 26]
CGNS_ElementType_l = ['Null', 'UserDefined', 'NODE', 'BAR_2', 'BAR_3', 'TRI_3', 'TRI_6', 'QUAD_4', 'QUAD_8', 'QUAD_9', 'TETRA_4', 'TETRA_10', 'PYRA_5', 'PYRA_14', 'PENTA_6', 'PENTA_15', 'PENTA_18', 'HEXA_8', 'HEXA_20', 'HEXA_27', 'MIXED', 'PYRA_13', 'NGON_n', 'NFACE_n', 'BAR_4', 'TRI_9', 'TRI_10', 'QUAD_12', 'QUAD_16', 'TETRA_16', 'TETRA_20', 'PYRA_21', 'PYRA_29', 'PYRA_30', 'PENTA_24', 'PENTA_38', 'PENTA_40', 'HEXA_32', 'HEXA_56', 'HEXA_64', 'BAR_5', 'TRI_12', 'TRI_15', 'QUAD_P4_16', 'QUAD_25', 'TETRA_22', 'TETRA_34', 'TETRA_35', 'PYRA_P4_29', 'PYRA_50', 'PYRA_55', 'PENTA_33', 'PENTA_66', 'PENTA_75', 'HEXA_44', 'HEXA_98', 'HEXA_125']
CGNSNumberToMuscat = {CGNS_ElementType_l.index(k): v for k, v in CGNSNameToMuscat.items()}
MuscatToCGNSNames = {y: x for x, y in CGNSNameToMuscat.items()}
MuscatToCGNSNumber = {y: x for x, y in CGNSNumberToMuscat.items()}
CGNS_MAX_NAME_LENGTH = 32
#use the proposal to write integration point data, if false we use a UserDefined
USE_CGNS_PROPOSAL_CPEX0047 = False
def _string_to_cgns_array(value: str) -> np.ndarray:
"""Convert python string to CGNS C1-like array."""
if value is None:
value = ""
return np.frombuffer(value.encode("ascii"), dtype="|S1")
def _strings_to_fixed_cgns_matrix(values: List[str], width: int = CGNS_MAX_NAME_LENGTH) -> np.ndarray:
"""Convert a list of strings to a fixed-width CGNS C1 2D matrix (width, n)."""
res = np.zeros((width, len(values)), dtype="|S1", order="F")
for j, name in enumerate(values):
_check_cgns_name(name, context="MapName")
encoded = name.encode("ascii")[:width]
if len(encoded):
res[:len(encoded), j] = np.frombuffer(encoded, dtype="|S1")
return res
def _resolve_abs_path(pyTree: list, absPath: str):
"""Resolve a CGNS absolute path like '/Base/Zone/Node'."""
if absPath is None:
return None
return CGU.getNodeByPath(pyTree, absPath.lstrip("/"))
def _get_or_create_child(parent: list, name: str, label: str, value: Any = None):
node = CGU.getNodeByPath(parent, name)
if node is None:
node = CGU.newNode(name, value, [], label, parent)
return node
def _new_element_association(parent: list, path: str, ids: np.ndarray, nodeName: str = "ItgRules"):
assoc = CGU.newNode(nodeName, None, [], "ElementAssociation_t", parent)
CGU.newNode("Path", _string_to_cgns_array(path), [], "DataArray_t", assoc)
CGU.newNode("Ids", np.asarray(ids, dtype=np.int32), [], "DataArray_t", assoc)
return assoc
def _read_element_association(node: Optional[list]) -> Optional[Dict[str, Any]]:
if node is None:
return None
pathNode = CGU.getNodeByPath(node, "Path")
idsNode = CGU.getNodeByPath(node, "Ids")
path = CGU.getValueAsString(pathNode) if pathNode is not None else None
ids = np.asarray(idsNode[1], dtype=np.int32).ravel() if idsNode is not None and idsNode[1] is not None else None
return {"path": path, "ids": ids}
[docs]
def AddIntegrationRuleCollection(baseNode: list, collectionName: str, idToName: Dict[int, str], rules: Dict[str, Dict[str, Any]]) -> list:
"""Create (or replace) an IntegrationRulesCollection_t under a CGNS base."""
_check_cgns_name(collectionName, context="IntegrationRulesCollection")
old = CGU.getNodeByPath(baseNode, collectionName)
if old is not None:
CGU.nodeDelete(baseNode, old)
collection = CGU.newNode(collectionName, None, [], "IntegrationRulesCollection_t", baseNode)
# IdToQualifier (MapName_t)
ids = np.asarray(list(idToName.keys()), dtype=np.int32)
names = list(idToName.values())
mapNode = CGU.newNode("IdToQualifier", ids, [], "MapName_t", collection)
CGU.newNode("Names", _strings_to_fixed_cgns_matrix(names), [], "DataArray_t", mapNode)
for ruleName, rule in rules.items():
_check_cgns_name(ruleName, context="IntegrationRule")
rnode = CGU.newNode(ruleName, None, [], "IntegrationRule_t", collection)
CGU.newNode("IntegrationName", _string_to_cgns_array(ruleName), [], "ElementType_t", rnode)
CGU.newNode("ElementType", _string_to_cgns_array(rule.get("element_type", "TRI_3")), [], "ElementType_t", rnode)
CGU.newNode("ReferenceSpace", _string_to_cgns_array(rule.get("reference_space", "Parametric")), [], "ElementSpace_t", rnode)
points = np.asarray(rule["parametric_integration_points"], dtype=MuscatFloat)
if points.ndim != 2:
raise ValueError("parametric_integration_points must be a 2D array")
weights = np.asarray(rule["weights"], dtype=MuscatFloat).ravel()
if weights.size != points.shape[0]:
raise ValueError("weights length must be equal to number of integration points")
CGU.newNode("NumberOfIntegrationPoint", np.array([points.shape[0]], dtype=np.int32), [], "DataArray_t", rnode)
CGU.newNode("ParametricDim", np.array([points.shape[1]], dtype=np.int32), [], "DataArray_t", rnode)
if "integration_name" in rule:
CGU.newNode("IntegrationName", _string_to_cgns_array(rule["integration_name"]), [], "DataArray_t", rnode)
CGU.newNode("ParametricIntegrationPoint", points, [], "DataArray_t", rnode)
CGU.newNode("Weights", weights, [], "DataArray_t", rnode)
return collection
[docs]
def AddIntegrationPointFlowSolution(zoneNode: list,
flowName: str,
dataArrays: Dict[str, np.ndarray],
itgPointStartOffset: np.ndarray,
itgRulesPath: str,
itgRulesIds: np.ndarray,
elementsFallbackAssociation: Optional[Dict[str, Any]] = None) -> list:
"""Create one FlowSolution_t with GridLocation=IntegrationPoint and associated metadata."""
_check_cgns_name(flowName, context="FlowSolution")
old = CGU.getNodeByPath(zoneNode, flowName)
if old is not None:
CGU.nodeDelete(zoneNode, old)
# for the moment we use userDefinedData to store IntegrationPoint data
# because pycng does not implement IntegrationPoint as a gridlocation
if USE_CGNS_PROPOSAL_CPEX0047:
flow = CGL.newFlowSolution(None, name=flowName, gridlocation="IntegrationPoint")
else:
flow = CGL.newUserDefinedData(None, name=flowName)
CGL.newDescriptor(flow, 'gridlocation', 'IntegrationPoint')
_new_element_association(flow, itgRulesPath, np.asarray(itgRulesIds, dtype=np.int32), nodeName="ItgRules")
CGU.newNode("ItgPointStartOffset", np.asarray(itgPointStartOffset, dtype=np.int32), [], "Offset_t", flow)
for fieldName, fieldData in dataArrays.items():
_check_cgns_name(fieldName, context="DataArray")
CGL.newDataArray(flow, fieldName, value=np.asarray(fieldData))
CGU.setAsChild(zoneNode, flow)
if elementsFallbackAssociation is not None:
elemName = elementsFallbackAssociation["elements_name"]
elemNode = CGU.getNodeByPath(zoneNode, elemName)
if elemNode is not None:
_new_element_association(
elemNode,
elementsFallbackAssociation["path"],
np.asarray(elementsFallbackAssociation["ids"], dtype=np.int32),
nodeName="ItgRules",
)
return flow
[docs]
def ReadIntegrationPointPayload(pyTree: list,
baseNames: Optional[List[str]] = None,
zoneNames: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Read raw IntegrationPoint payload from CGNS FlowSolution_t nodes."""
out: List[Dict[str, Any]] = []
def _get_integration_point_location(node: list) -> Optional[str]:
"""Return grid location for a flow-like node supporting legacy and UDD payloads."""
glPath = CGU.getPathsByTypeSet(node, ["GridLocation_t"])
if glPath:
return CGU.getValueAsString(CGU.getNodeByPath(node, glPath[0]))
# New payload format stores location as a descriptor on UserDefinedData_t
# (e.g. CGL.newDescriptor(flow, 'gridlocation', 'IntegrationPoint')).
descNode = CGU.getNodeByPath(node, "gridlocation")
if descNode is not None:
return CGU.getValueAsString(descNode)
return None
if baseNames is not None:
basePaths = CGU.getPathsByNameSet(pyTree, baseNames)
else:
basePaths = CGU.getPathsByTypeSet(pyTree, ["CGNSBase_t"])
for basePath in basePaths:
baseNode = CGU.getNodeByPath(pyTree, basePath)
if zoneNames is not None:
zonePaths = CGU.getPathsByNameSet(baseNode, zoneNames)
else:
zonePaths = CGU.getPathsByTypeSet(baseNode, ["Zone_t"])
for zonePath in zonePaths:
zoneNode = CGU.getNodeByPath(baseNode, zonePath)
flowPaths = CGU.getPathsByTypeSet(zoneNode, ["FlowSolution_t", "UserDefinedData_t"])
for flowPath in flowPaths:
flowNode = CGU.getNodeByPath(zoneNode, flowPath)
location = _get_integration_point_location(flowNode)
if location != "IntegrationPoint":
continue
offsetNode = CGU.getNodeByPath(flowNode, "ItgPointStartOffset")
if offsetNode is None or offsetNode[1] is None:
continue
offsets = np.asarray(offsetNode[1], dtype=np.int32).ravel()
assoc = _read_element_association(CGU.getNodeByPath(flowNode, "ItgRules"))
if assoc is None:
elemPaths = CGU.getPathsByTypeSet(zoneNode, ["Elements_t"])
for ep in elemPaths:
elem = CGU.getNodeByPath(zoneNode, ep)
assoc = _read_element_association(CGU.getNodeByPath(elem, "ItgRules"))
if assoc is not None:
break
dataArrays = {}
for fp in CGU.getPathsByTypeSet(flowNode, ["DataArray_t"]):
n = CGU.getNodeByPath(flowNode, fp)
if n[0] in ["Path", "Ids", "IntegrationName"]:
continue
if n[1] is None:
continue
dataArrays[n[0]] = np.asarray(n[1])
resolvedRules = {}
if assoc is not None and assoc["path"]:
collection = _resolve_abs_path(pyTree, assoc["path"])
if collection is not None:
mapNode = CGU.getNodeByPath(collection, "IdToQualifier")
idMap: Dict[int, str] = {}
if mapNode is not None:
ids = np.asarray(mapNode[1], dtype=np.int32).ravel() if mapNode[1] is not None else np.array([], dtype=np.int32)
namesNode = CGU.getNodeByPath(mapNode, "Names")
if namesNode is not None and namesNode[1] is not None:
namesRaw = np.asarray(namesNode[1])
for j, rid in enumerate(ids):
idMap[int(rid)] = b"".join(namesRaw[:, j].tolist()).decode("ascii", errors="ignore").strip("\x00 ")
for rid in np.unique(assoc["ids"] if assoc["ids"] is not None else np.array([], dtype=np.int32)):
ruleName = idMap.get(int(rid))
if not ruleName:
continue
ruleNode = CGU.getNodeByPath(collection, ruleName)
if ruleNode is None:
continue
etNode = CGU.getNodeByPath(ruleNode, "ElementType")
rsNode = CGU.getNodeByPath(ruleNode, "ReferenceSpace")
pNode = CGU.getNodeByPath(ruleNode, "ParametricIntegrationPoint")
wNode = CGU.getNodeByPath(ruleNode, "Weights")
resolvedRules[int(rid)] = {
"name": ruleName,
"element_type": CGU.getValueAsString(etNode) if etNode is not None else None,
"reference_space": CGU.getValueAsString(rsNode) if rsNode is not None else None,
"parametric_integration_points": np.asarray(pNode[1]) if pNode is not None and pNode[1] is not None else None,
"weights": np.asarray(wNode[1]) if wNode is not None and wNode[1] is not None else None,
}
out.append(
{
"base": baseNode[0],
"zone": zoneNode[0],
"flow_solution": flowNode[0],
"itg_point_start_offset": offsets,
"itg_rules": assoc,
"data_arrays": dataArrays,
"resolved_rules": resolvedRules,
}
)
return out
CGNSNumberToMuscatPermutation = {
k: CGNSToMuscatPermutation[name]
for k in MuscatToCGNSNumber.values()
if (name := MuscatToCGNSNames[CGNSNumberToMuscat[k]]) in CGNSToMuscatPermutation
}
MuscatElementTypeToCGNS = {
k: np.argsort(CGNSToMuscatPermutation[name])
for k in CGNSNameToMuscat.values()
if (name := MuscatToCGNSNames[k]) in CGNSToMuscatPermutation
}
def _check_cgns_name(name: str, context: str = "CGNS node") -> None:
"""Validate a CGNS node name according to CGNS naming convention.
Args:
name (str): name to validate
context (str, optional): extra context for error message. Defaults to "CGNS node".
Raises:
ValueError: if name is not a string or if its length is greater than 32 chars
Returns:
str: validated name
"""
if not isinstance(name, str):
raise ValueError(f"{context} name must be a string, got {type(name).__name__}")
if len(name) > CGNS_MAX_NAME_LENGTH:
raise ValueError(
f"{context} name '{name}' has length {len(name)} which exceeds CGNS limit "
f"({CGNS_MAX_NAME_LENGTH})"
)
[docs]
def GetCGNSNumberToMuscat(cgnsNumber: int):
"""Get Muscat ElementType from CGNS element type number
Args:
cgnsNumber (int): CGNS element type number
Raises:
Exception: if there is no equivalent to CGNS element type in Muscat
Returns:
res (ElementType): Corresponding elment type in Muscat
"""
res = CGNSNumberToMuscat.get(cgnsNumber, None)
if res is None: # pragma: no cover
raise Exception(f"CGNC elements of number : {cgnsNumber} not coded yet")
return res
def __ReadIndex(pyTree: list, dim: List[int]):
"""Read Index Array or Index Range from CGNS
Args:
pyTree (list): CGNS node which has a child Index to read
dim (list): dimensions of the coordinates
Returns:
indices in Muscat
"""
a = __ReadIndexArray(pyTree)
b = __ReadIndexRange(pyTree, dim)
return np.hstack((a, b))
def __ReadIndexArray(pyTree: list):
"""Read Index Array from CGNS
Args:
pyTree (list): CGNS node which has a child of type IndexArray_t to read
Returns:
indices in Muscat
"""
indexArrayPaths = CGU.getPathsByTypeSet(pyTree, ['IndexArray_t'])
res = []
for indexArrayPath in indexArrayPaths:
data = CGU.getNodeByPath(pyTree, indexArrayPath)
if data[1] is None:
continue
else:
res.extend(data[1].ravel() - 1) # CGNS ids start at 1
return np.array(res, dtype=MuscatIndex).ravel()
def __ReadIndexRange(pyTree: list, dim: List[int]):
"""Read Index Range from CGNS
Args:
pyTree (list): CGNS node which has a child of type IndexRange_t to read
dim (List[str]): dimensions of the coordinates
Returns:
indices in Muscat
"""
indexRangePaths = CGU.getPathsByTypeSet(pyTree, ['IndexRange_t'])
res = []
for indexRangePath in indexRangePaths: # Is it possible there are several ?
indexRange = CGU.getValueByPath(pyTree, indexRangePath)
if indexRange.shape == (3, 2) and len(dim) == 3: # 3D
ix = np.arange(indexRange[0, 0]-1, indexRange[0, 1]) # CGNS ids start at 1
iy = np.arange(indexRange[1, 0]-1, indexRange[1, 1]) # CGNS ids start at 1
iz = np.arange(indexRange[2, 0]-1, indexRange[2, 1]) # CGNS ids start at 1
ix_grid, iy_grid, iz_grid = np.meshgrid(ix, iy, iz, indexing='ij')
global_id = iz_grid + dim[2] * (iy_grid + dim[1] * ix_grid)
res.extend(global_id)
elif indexRange.shape == (2, 2) or len(dim) == 2: # 2D
ix = np.arange(indexRange[0, 0]-1, indexRange[0, 1]) # CGNS ids start at 1
iy = np.arange(indexRange[1, 0]-1, indexRange[1, 1]) # CGNS ids start at 1
ix_grid, iy_grid = np.meshgrid(ix, iy, indexing='ij')
global_id = iy_grid + dim[1] * ix_grid
res.extend(global_id)
else:
begin = indexRange[0]-1 # CGNS ids start at 1
end = indexRange[1]
res.extend(np.arange(begin, end).ravel())
return np.array(res, dtype=MuscatIndex).ravel()
[docs]
def StructuredZoneSurfaceIds(zone: list, offset: int):
"""Get global ids of the nodes on the surface of CGNS structured zone
Args:
zone (list): zone node to treat
offset (int): offset for the zone node ids in the global mesh
Returns:
res (list): global ids of node on each boundary surface of the zone
"""
dim = zone[1].shape[0]
imax, jmax = zone[1][0][0], zone[1][1][0]
kmax = 1
if dim == 3:
kmax = zone[1][2][0]
ix, iy, iz = np.arange(imax), np.arange(jmax), np.arange(kmax)
ix_grid, iy_grid, iz_grid = np.meshgrid(ix, iy, iz, indexing='ij')
global_id = offset + iz_grid + kmax * (iy_grid + jmax * ix_grid)
res = []
res.extend(global_id[0,:,:].ravel())
res.extend(global_id[imax-1,:,:].ravel())
res.extend(global_id[:,0,:].ravel())
res.extend(global_id[:,jmax-1,:].ravel())
if dim == 3:
res.extend(global_id[:,:,0].ravel())
res.extend(global_id[:,:,kmax-1].ravel())
return res
[docs]
def newZoneSubRegion(parent: list, name: str, pointList: NDArray, dim: int, gridLocation: str = "CellCenter", family: str = ""):
"""Create a new ZoneSubRegion in CGNS pytree
Args:
parent (list): CGNS pytree where to add the new ZoneSubRegion node
name (str): name of the CGNS ZoneSubRegion node
pointList (NDArray): list of nodes or elements to add to the ZoneSubRegion
dim (int): physical dimension of the CGNS pytree (2 for 2D, 3 for 3D)
gridLocation (str, optional): Determine if it is a selection of nodes or elements. Defaults to "CellCenter".
family (str, optional): Corresponding familyName. Defaults to "".
Returns:
znode (list): ZoneSubRegion node
"""
_check_cgns_name(name, context="ZoneSubRegion")
CGU.checkDuplicatedName(parent, name)
arange = np.array(pointList, dtype=np.int32, order="F")
zsize = np.array([[1, dim]])
CGU.checkArray(zsize, dienow=True)
znode = CGU.newNode(name, zsize, [], "ZoneSubRegion_t", parent)
CGU.newNode("PointList", arange, [], "IndexArray_t", znode)
CGL.newGridLocation(znode, value=gridLocation)
if family:
_check_cgns_name(family, context="Family")
CGL.newFamilyName(znode, family)
return znode
[docs]
def newFamilyName(parent: list, family: str, base: list = None):
"""Add a FamilyName to a node
Args:
parent (list): CGNS pytree where to add the new FamilyName node
family (str): Name of the family to set
base (list, optional): Ancestor CGNSBase_t node to automatically create the corresponding family. Defaults to None.
"""
_check_cgns_name(family, context="Family")
fnpath = CGU.getPathsByTypeOrNameList(parent, [parent[0], 'FamilyName_t'])
if not fnpath:
CGL.newFamilyName(parent, family=family)
else:
fn = CGU.getNodeByPath(parent, fnpath[0])
familyName = CGU.getValueAsString(fn)
if familyName != family:
afn = CGL.newAdditionalFamilyName(parent, family)
afn[0] = "AdditionalFamilyName"
else:
return
if base:
fn = CGU.getNodeByPath(base, family)
if not fn:
CGL.newFamily(base, family)
[docs]
def createMissingFamilies(tree: list):
"""Check and create if missing corresponding Family_t node for every FamilyName_t node found.
Args:
tree (list): CGNSTree_t or CGNSBase_t to check and complete
"""
fn_paths = CGU.getPathsByTypeSet(tree, ['FamilyName_t', 'AdditionalFamilyName_t'])
for path in fn_paths:
fn = CGU.getNodeByPath(tree, path)
base = CGU.getAncestorByType(tree, fn, "CGNSBase_t")
try:
familyName = CGU.getValueAsString(fn)
_check_cgns_name(familyName, context="Family")
if not CGU.getPathsByTypeOrNameList(base, ["CGNSBase_t", familyName]) and familyName != "Null":
CGL.newFamily(base, familyName)
except TypeError:
print("Couldn't create associated family to", path)
[docs]
def CGNSToMesh(pyTree: list, baseNames: Optional[List[str]] = None, zoneNames: Optional[List[str]] = None, fieldsAsTags: bool = False, partitionedMesh= False ) -> Union[Mesh, PartitionedMesh]:
"""Read a CGNS tree and return a corresponding Muscat Mesh
Args:
pyTree (list): CGNS tree to read
baseNames (Optional[List[str]], optional): Names of the CGNS Bases to treat, if None every CGNS Bases are treated. Defaults to None.
zoneNames (Optional[List[str]], optional): Names of the CGNS Zones to treat, if None every CGNS Zones are treated. Defaults to None.
fieldsAsTags (bool, optional): _description_. Defaults to False.
Raises:
ValueError: if cell dimension not in [2, 3]
Returns:
Mesh: Corresponding Muscat Mesh
"""
if baseNames is not None:
basepaths = CGU.getPathsByNameSet(pyTree, baseNames)
else:
basepaths = CGU.getPathsByTypeSet(pyTree, ["CGNSBase_t"])
if partitionedMesh:
res = PartitionedMesh()
else:
res = Mesh()
nodesSurfaces = []
for basepath in basepaths:
basePyTree = CGU.getNodeByPath(pyTree, basepath)
cell_dim = CGU.getValueByPath(pyTree, basepath)[0]
physical_dim = CGU.getValueByPath(pyTree, basepath)[1]
if zoneNames is not None:
zonepaths = CGU.getPathsByNameSet(basePyTree, zoneNames)
else:
zonepaths = CGU.getPathsByTypeSet(basePyTree, ["Zone_t"])
for zonepath in zonepaths:
zonePyTree = CGU.getNodeByPath(basePyTree, zonepath)
zonetype = CGU.getValueAsString(CGU.getNodeByPath(zonePyTree, 'ZoneType'))
if zonetype == "Structured":
if partitionedMesh :
nbNodes = 0
surfaceIds = StructuredZoneSurfaceIds(zonePyTree, offset=nbNodes)
else:
nbNodes = res.GetNumberOfNodes()
surfaceIds = StructuredZoneSurfaceIds(zonePyTree, offset=nbNodes)
nodesSurfaces.extend(surfaceIds)
else :
surfaceIds = []
gridCoordinatesPath = CGU.getPathsByTypeSet(zonePyTree, ["GridCoordinates_t"])[0]
gx = CGU.getNodeByPath(zonePyTree, gridCoordinatesPath+'/CoordinateX')[1]
dim = gx.shape
local_mesh = Mesh()
local_mesh.nodes = np.empty((gx.size, physical_dim), dtype=MuscatFloat)
local_mesh.nodes[:, 0] = gx.ravel()
local_mesh.nodes[:, 1] = CGU.getNodeByPath(zonePyTree, gridCoordinatesPath+'/CoordinateY')[1].ravel()
if physical_dim == 3:
local_mesh.nodes[:, 2] = CGU.getNodeByPath(zonePyTree, gridCoordinatesPath+'/CoordinateZ')[1].ravel()
local_mesh.originalIDNodes = np.arange(1, len(local_mesh.nodes)+1, dtype=MuscatIndex)
# elements
elementsPaths = CGU.getPathsByTypeSet(zonePyTree, ["Elements_t"])
if not elementsPaths and zonetype == "Structured":
if cell_dim == 3:
ec = StructuredElementsContainer(ED.Hexahedron_8)
ec.SetDimensions([dim[0], dim[1], dim[2]])
elif cell_dim == 2:
ec = StructuredElementsContainer(ED.Quadrangle_4)
ec.SetDimensions([dim[0], dim[1]])
else:
raise ValueError(f"cell_dim equal to {cell_dim} is not functional")
conn = ec.connectivity
local_mesh.elements.AddContainer(ec)
ec.tags.CreateTag(zonePyTree[0]).SetIds(np.arange(ec.GetNumberOfElements()))
for elementsPath in elementsPaths:
cgnsElements = CGU.getNodeByPath(zonePyTree, elementsPath)
cgnsUserName = cgnsElements[0]
cgnsElemType = cgnsElements[1][0]
originalIds = __ReadIndexRange(cgnsElements, dim)
if cgnsElemType == 20: # we are in mixed mode: #TODO: with mixed element CGNS
cpt = 0
elementConnectivity = CGU.getNodeByPath(cgnsElements, cgnsUserName+'/ElementConnectivity')[0][1]
for oid in originalIds:
muscatElemType = GetCGNSNumberToMuscat(elementConnectivity[cpt])
cpt += 1
nbp = ED.numberOfNodes[muscatElemType]
conn = elementConnectivity[cpt:cpt+nbp] - 1
if CGNSNumberToMuscatPermutation.get(cgnsElemType, None) is not None:
conn = conn[CGNSNumberToMuscatPermutation[cgnsElemType]]
cpt += nbp
elems: ElementsContainer = local_mesh.elements.GetElementsOfType(muscatElemType)
elcpt = elems.AddNewElements(conn[None, :], [oid])
if not cgnsUserName.endswith(MuscatToCGNSNames[muscatElemType]):
elems.tags.CreateTag(cgnsUserName,False).AddToTag(elcpt-1)
else:
muscatElemType = GetCGNSNumberToMuscat(cgnsElemType)
elementConnectivity = np.asarray(CGU.getNodeByPath(cgnsElements, 'ElementConnectivity')[1], dtype=MuscatIndex).reshape( (-1, ED.numberOfNodes[muscatElemType] ) )-1
elems: ElementsContainer = local_mesh.elements.GetElementsOfType(muscatElemType)
if CGNSNumberToMuscatPermutation.get(cgnsElemType, None) is not None:
elementConnectivity = elementConnectivity[:, CGNSNumberToMuscatPermutation[cgnsElemType]]
elcpt = elems.cpt
nelcpt = elems.AddNewElements(elementConnectivity, originalIds)
if not cgnsUserName.endswith(MuscatToCGNSNames[muscatElemType]):
elems.tags.CreateTag(cgnsUserName,False).AddToTag(np.arange(elcpt,nelcpt))
# tags
elemTags = []
nodeTags = []
fnpath = CGU.getPathsByTypeList(zonePyTree, ["Zone_t", 'FamilyName_t'])
afnpath = CGU.getPathsByTypeList(zonePyTree, ["Zone_t", 'AdditionalFamilyName_t'])
ef_vol = ElementFilter(dimensionality=local_mesh.GetPointsDimensionality())
cell_ids = ef_vol.GetGlobalIds(local_mesh)
for path in fnpath + afnpath:
fn = CGU.getNodeByPath(zonePyTree, path)
if fn:
familyName = CGU.getValueAsString(fn)
local_mesh.AddElementsToTag(cell_ids, familyName)
elemTags.append(familyName)
BCPaths = CGU.getPathsByTypeList(zonePyTree, ["Zone_t", "ZoneBC_t", "BC_t"])
for BCPath in BCPaths:
BCNode = CGU.getNodeByPath(zonePyTree, BCPath)
bcfnpath = CGU.getPathsByTypeList(BCNode, [ "BC_t",'FamilyName_t'])
if len(bcfnpath):
fn = CGU.getNodeByPath(BCNode, bcfnpath[0])
familyName = CGU.getValueAsString(fn)
BCName = BCNode[0]
indices = __ReadIndex(BCNode, dim)
if len(indices) == 0:
continue
gl = CGU.getPathsByTypeSet(BCNode, ["GridLocation_t"])
if gl:
location = CGU.getValueAsString(CGU.getNodeByPath(BCNode, gl[0]))
else:
location = "Vertex"
if location in ["CellCenter", "FaceCenter", "EdgeCenter"]:
local_mesh.AddElementsToTag(indices, BCName)
elemTags.append(BCName)
elif location == "Vertex":
if len(bcfnpath) and familyName != BCName:
tag = local_mesh.nodesTags.CreateTag(familyName, errorIfAlreadyCreated=False)
tag.AddToTag(indices)
tag.RemoveDoubles()
local_mesh.nodesTags.CreateTag(BCName).SetIds(indices)
nodeTags.append(BCName)
ZSRPaths = CGU.getPathsByTypeList(zonePyTree, ["Zone_t", "ZoneSubRegion_t"])
for path in ZSRPaths:
ZSRNode = CGU.getNodeByPath(zonePyTree, path)
fnpath = CGU.getPathsByTypeList(ZSRNode, ['ZoneSubRegion_t', 'FamilyName_t'])
if fnpath:
fn = CGU.getNodeByPath(ZSRNode, fnpath[0])
familyName = CGU.getValueAsString(fn)
else:
familyName = ZSRNode[0]
indices = __ReadIndex(ZSRNode, dim)
if len(indices) == 0:
continue
gl = CGU.getPathsByTypeSet(ZSRNode, ["GridLocation_t"])[0]
location = CGU.getValueAsString(CGU.getNodeByPath(ZSRNode, gl))
if gl and location in ["CellCenter", "FaceCenter"]:
local_mesh.AddElementsToTag(indices, familyName)
elemTags.append(familyName)
elif not gl or location == "Vertex":
local_mesh.nodesTags.CreateTag(familyName).SetIds(indices)
nodeTags.append(familyName)
# fields
datasPaths = CGU.getPathsByTypeSet(zonePyTree, ["FlowSolution_t"])
for dataPath in datasPaths:
datas = CGU.getNodeByPath(zonePyTree, dataPath)
gl = CGU.getPathsByTypeSet(datas, ["GridLocation_t"])
store = local_mesh.nodeFields
location = "Vertex"
if gl:
location = CGU.getValueAsString(CGU.getNodeByPath(zonePyTree, gl[0]))
if location == "Vertex":
store = local_mesh.nodeFields
elif location in ["FaceCenter", "EdgeCenter", "CellCenter"]:
store = local_mesh.elemFields
fieldPaths = CGU.getPathsByTypeSet(datas, ["DataArray_t"])
Ids = 0
if location in ["CellCenter", "FaceCenter", "EdgeCenter"] and f"{datas[0]}/OriginalIds" in fieldPaths:
fieldData = CGU.getNodeByPath(datas, f"{datas[0]}/OriginalIds")
if fieldData[1] is None:
Ids = None
else:
Ids = np.asarray(fieldData[1]-1, dtype=MuscatIndex)
# for data in local_mesh.elements.values():
# print(data.__dict__)
# local_mesh.SetElementsOriginalIDs(Ids) #How to concatenate ids of elemVol and elemSurf
fieldPaths.remove(f"{datas[0]}/OriginalIds")
for fieldPath in fieldPaths:
fieldData = CGU.getNodeByPath(datas, fieldPath)
dataName = fieldData[0].split("_as_field")[0]
data = fieldData[1]
if zonetype == "Structured":
data = data.ravel()
if dataName == "OriginalIds" and store is local_mesh.nodeFields:
local_mesh.originalIDNodes = np.asarray(data, dtype=MuscatIndex, order='C')
elif store is local_mesh.nodeFields:
if (fieldsAsTags is True) or (fieldsAsTags is False and dataName not in nodeTags):
# The node field size is a multiple of the number of node (ie a scalar or vectorial field)
# Beware vectorial field is not compliant with CGNS standard
store[dataName] = data.ravel()
elif store is local_mesh.elemFields:
if (fieldsAsTags is True) or (fieldsAsTags is False and dataName not in elemTags):
if dataName not in store.keys():
store[dataName] = np.zeros((local_mesh.GetNumberOfElements(), *data.shape[1:]))
if isinstance(Ids, np.ndarray):
store[dataName][Ids] = data
else:
# The element field size is a multiple of the number of elements (ie a scalar or vectorial field)
# Beware vectorial field is not compliant with CGNS standard
nb_elem = data.shape[0]
store[dataName][np.arange(nb_elem)] = data
if partitionedMesh:
local_mesh.nodesTags.CreateTag("SurfaceIds").SetIds(surfaceIds)
local_mesh.nodesTags.CreateTag(f"Part_{len(res.storage)}").SetIds(np.arange(local_mesh.GetNumberOfNodes(), dtype=MuscatIndex))
for ec in local_mesh.elements:
ec.tags.CreateTag(f"Part_{len(res.storage)}").SetIds(np.arange(ec.GetNumberOfElements(), dtype=MuscatIndex))
local_mesh.nodeFields[GlobalIdsFieldName] = None
local_mesh.elemFields[GlobalIdsFieldName] = None
res.AddMesh(local_mesh)
else:
res.Merge(local_mesh)
if partitionedMesh:
res.GenerateGlobalIdsBasedOnNodePositions()
elif not partitionedMesh and nodesSurfaces and nbNodes > 0:
res = CleanDoubleNodesOnMatch(res, nodesSurfaces)
res.PrepareForOutput()
return res
[docs]
def MeshToCGNS(mesh: Mesh, outputPyTree: Optional[list] = None, tagsAsFields: bool = False, exportOriginalIDs: bool = True, updateZone: Union[bool, str] = False, defaultFamilyName: str = "Bulk"):
"""Write a CGNS Tree from a Muscat Mesh
Args:
mesh (Mesh): Muscat Mesh to export in CGNS.
outputPyTree (list, optional): If the mesh has to be written in an pre-existing CGNS tree. Defaults to None.
tagsAsFields (bool, optional): Create a field FlowSolution with tags for paraview visualisation. Defaults to False.
exportOriginalIDs (bool, optional): Create a OriginalIds data array in the CGNS. Defaults to True.
updateZone (Union[bool, str], optional):If outputPyTree, zones to update can be specified: True -> all zones, False -> no update, zone name -> only this zone. Defaults to False.
defaultFamilyName (str, optional): Default family name. Defaults to "Bulk".
Raises:
RuntimeError: Mesh points dimensionality should be in [2, 3]
Returns:
outputPyTree (list): CGNS tree including Muscat Mesh
"""
if outputPyTree is None:
update = False
outputPyTree = CGL.newCGNSTree()
def ConstructZone(outputPyTree, local_mesh: Mesh, zoneName: str, familyName: Optional[str], update: bool) -> list:
_check_cgns_name(zoneName, context="Zone")
if familyName is not None:
_check_cgns_name(familyName, context="Family")
cell_dim = local_mesh.GetElementsDimensionality()
physical_dim = mesh.GetPointsDimensionality()
if physical_dim == 2:
coordNames = ["X", "Y"]
elif physical_dim == 3:
coordNames = ["X", "Y", "Z"]
else: # pragma: no cover
raise RuntimeError('mesh.GetPointsDimensionality() should be 2 or 3')
ztype = "Unstructured"
zone = CGL.newZone(None, zoneName, zsize=np.array([[local_mesh.GetNumberOfNodes()], [local_mesh.GetNumberOfElements(dim=cell_dim)], [0]]).reshape(1, 3), ztype=ztype, family=familyName) # totalNumberOfTags
#add nodes
for i, coord in enumerate(coordNames):
CGL.newCoordinates(zone, name="Coordinate"+coord, value=np.copy(local_mesh.nodes[:, i]))
#add elements
globalOffsets = mesh.ComputeGlobalOffset()
for elementType, data in local_mesh.elements.items():
nbelem = data.connectivity.shape[0]
if nbelem == 0:
continue
nbelem = data.GetNumberOfElements()
if elementType in MuscatElementTypeToCGNS:
connectivity = data.connectivity[:,MuscatElementTypeToCGNS[elementType]]
else:
connectivity = data.connectivity
CGL.newElements(zone, "Elements_"+ MuscatToCGNSNames[data.elementType],
etype=MuscatToCGNSNumber[data.elementType],
erange=np.array((globalOffsets[elementType] + 1, globalOffsets[elementType] + nbelem)),
econnectivity=(connectivity+1).ravel()
)
location = "Vertex"
flowSolution = CGL.newFlowSolution(None, name=f"{location}Fields", gridlocation=location)
## add nodeFields
if exportOriginalIDs and len(local_mesh.originalIDNodes)>0:
CGL.newDataArray(flowSolution, "OriginalIds", value=local_mesh.originalIDNodes+1)
if len(local_mesh.nodeFields) > 0:
for pointFieldName, pointData in local_mesh.nodeFields.items():
_check_cgns_name(pointFieldName, context="DataArray")
if pointData.dtype.char == "U":
print(f"skipping nodeFields '{pointFieldName}' because it is of type: {pointData.dtype}")
continue
if len(pointData.shape) == 1:
CGL.newDataArray(flowSolution, pointFieldName, value=pointData)
else:
for indexes in itertools.product(*[range(dim) for dim in pointData.shape[1:]]):
CGL.newDataArray(flowSolution, f"{pointFieldName}_{'_'.join([str(i) for i in indexes])}", value=pointData[:,tuple(indexes)].ravel())
if len(CGU.hasChildType(flowSolution, "DataArray_t")) > 0:
CGU.setAsChild(zone, flowSolution)
## add elemFields for volum cells
location = "CellCenter"
ef_vol = ElementFilter(dimensionality=cell_dim)
flowSolutionCell = CGL.newFlowSolution(None, name=f"{location}Fields", gridlocation=location)
if exportOriginalIDs and len(local_mesh.GetElementsOriginalIDs(ef_vol))>0:
CGL.newDataArray(flowSolutionCell, "OriginalIds", value=local_mesh.GetElementsOriginalIDs(ef_vol)+1)
cell_ids = ef_vol.GetGlobalIds(local_mesh)
if cell_ids.shape != (0,):
for pointFieldName, cellData in local_mesh.elemFields.items():
_check_cgns_name(pointFieldName, context="DataArray")
if cellData.dtype.char == "U":
if pointFieldName != 'FE Names':
print(f"skipping elemFields '{pointFieldName}' because it is of type: {cellData.dtype}")
continue
valueArray = local_mesh.elemFields[pointFieldName][cell_ids]
if len(valueArray.shape)==1:
CGL.newDataArray(flowSolutionCell, pointFieldName, value=valueArray)
else:
for indexes in itertools.product(*[range(dim) for dim in valueArray.shape[1:]]):
local_pointFieldName = f"{pointFieldName}_{'_'.join([str(i) for i in indexes])}"
_check_cgns_name(local_pointFieldName, context="DataArray")
CGL.newDataArray(flowSolutionCell, local_pointFieldName, value=valueArray[:,tuple(indexes)].ravel())
if len(CGU.hasChildType(flowSolutionCell, "DataArray_t")) > 0:
CGU.setAsChild(zone, flowSolutionCell)
## add elemFields for surface cells
location = "FaceCenter"
# if cellDim == 3:
# location = "FaceCenter"
# elif cellDim == 2: # Not yet supported by cgnscheck
# location = "EdgeCenter"
ef_surf = ElementFilter(dimensionality=cell_dim - 1)
flowSolutionSCell = CGL.newFlowSolution(None, name=f"{location}Fields", gridlocation=location)
if exportOriginalIDs and len(local_mesh.GetElementsOriginalIDs(ef_surf)) > 0:
CGL.newDataArray(flowSolutionSCell, "OriginalIds", value=local_mesh.GetElementsOriginalIDs(ef_surf)+1)
if len(local_mesh.elemFields) > 0:
cellS_ids = ef_surf.GetGlobalIds(local_mesh)
if cellS_ids.shape != (0,):
for pointFieldName, cellData in local_mesh.elemFields.items():
_check_cgns_name(pointFieldName, context="DataArray")
if cellData.dtype.char == "U":
if pointFieldName != 'FE Names':
print(f"skipping elemFields '{pointFieldName}' because it is of type: {cellData.dtype}")
continue
valueArray = local_mesh.elemFields[pointFieldName][cellS_ids]
if len(valueArray.shape) == 1:
CGL.newDataArray(flowSolutionSCell, pointFieldName, value=valueArray)
else:
for indexes in itertools.product(*[range(dim) for dim in valueArray.shape[1:]]):
local_pointFieldName = f"{pointFieldName}_{'_'.join([str(i) for i in indexes])}"
_check_cgns_name(local_pointFieldName, context="DataArray")
CGL.newDataArray(flowSolutionSCell, local_pointFieldName, value=valueArray[:,tuple(indexes)].ravel())
if len(CGU.hasChildType(flowSolutionSCell, "DataArray_t")) > 0:
CGU.setAsChild(zone, flowSolutionSCell)
zbg = CGL.newZoneBC(None)
# add elements tags (as field also)
num_elt_surf = ElementFilter(dimensionality=cell_dim-1).GetGlobalIds(local_mesh)
for elTag in local_mesh.elements.GetTagsNames():
_check_cgns_name(elTag, context="Tag/Family")
ef_tagV = ElementFilter(dimensionality=cell_dim, eTag=elTag)
ids = ef_tagV.GetGlobalIds(local_mesh)
if ids.size > 0:
if np.array_equal(ids, cell_ids): # all volum elem have this tag - it is their family
newFamilyName(zone, family=elTag)
else:
_check_cgns_name(elTag+'_ZSR', context="ZoneSubRegion")
newZoneSubRegion(zone, elTag+'_ZSR', ids+1, cell_dim, family=elTag)
ef_tagS = ElementFilter(dimensionality=cell_dim-1, eTag=elTag)
ids = ef_tagS.GetGlobalIds(local_mesh)
if ids.size > 0:
bcNode = CGL.newBC(parent=zbg, bname=elTag, brange=[ids+1], family=None, pttype="PointList")
if cell_dim == 3:
CGL.newGridLocation(bcNode, value='FaceCenter')
elif cell_dim == 2:
CGL.newGridLocation(bcNode, value='EdgeCenter')
if tagsAsFields:
_check_cgns_name(elTag+"_as_field", context="DataArray"),
cellData = np.zeros(local_mesh.GetNumberOfElements(dim=cell_dim-1), dtype=np.int64)
cellData[np.isin(num_elt_surf, ids)] = 1
CGL.newDataArray(flowSolutionSCell, elTag+"_as_field", cellData)
# add node tags (as field also)
for tag in local_mesh.nodesTags:
_check_cgns_name(tag.name, context="BC")
ids = tag.GetIds()
if update:
try:
bcpath = CGU.getPathsByTypeOrNameList(outputPyTree, ['CGNSTree_t', 'CGNSBase_t', 'Zone_t', 'ZoneBC_t', tag.name])[0]
bc = CGU.getNodeByPath(outputPyTree, bcpath)
btype = CGU.getValueAsString(bc)
node = CGU.getNodeByPath(bc, 'FamilyName')
except IndexError:
node = None
btype = "Null"
if node:
fnbc = CGU.getValueAsString(node)
else:
fnbc = None
else:
btype = "Null"
fnbc = None
bcNode = CGL.newBC(parent=zbg, bname=tag.name, brange=[ids+1], btype=btype, family=fnbc, pttype="IndexArray_t")
CGL.newGridLocation(bcNode, value='Vertex')
if tagsAsFields:
_check_cgns_name(tag.name+"_as_field", context="DataArray")
pointData = np.zeros(local_mesh.GetNumberOfNodes())
pointData[ids] = 1
CGL.newDataArray(flowSolution, tag.name+"_as_field", np.copy(pointData))
if zbg[2]:
CGU.setAsChild(zone, zbg)
return zone
cell_dim = mesh.GetElementsDimensionality()
physical_dim = mesh.GetPointsDimensionality()
if updateZone:
basepath = CGU.getPathsByTypeSet(outputPyTree, ["CGNSBase_t"])[0]
base = CGU.getNodeByPath(outputPyTree, basepath)
if isinstance(updateZone, str):
zone_name = updateZone
_check_cgns_name(updateZone, context="Zone")
zonepaths = CGU.getPathsByTypeOrNameList(base, ["CGNSBase_t", zone_name])
fnpath = CGU.getPathsByTypeOrNameList(base, ['CGNSBase_t', zone_name, 'FamilyName_t'])[0]
fn = CGU.getNodeByPath(base, fnpath)
familyName = CGU.getValueAsString(fn)
else:
zonepaths = CGU.getPathsByTypeSet(outputPyTree, ["Zone_t"])
fnpaths = CGU.getPathsByTypeOrNameList(base, ['CGNSBase_t', zonepaths, 'FamilyName_t'])
if len(zonepaths) == 1:
zone_name = zonepaths[0].split('/')[-1]
else: # Structured CGNS
zone_name = "Zone"
fnpaths = CGU.getPathsByTypeList(base, ['CGNSBase_t', 'Zone_t', 'FamilyName_t'])
fn = []
for path in fnpaths:
node = CGU.getNodeByPath(base, path)
fn.append(CGU.getValueAsString(node))
fn = np.unique(fn)
if fn.size == 1:
familyName = fn[0]
else:
familyName = defaultFamilyName
_check_cgns_name(familyName, context="Family")
update = True
else:
baseName = "Base_"+str(cell_dim)+"_"+str(physical_dim)
base = CGL.newCGNSBase(outputPyTree, baseName, cell_dim, physical_dim)
zone_name = "Zone"
familyName = None
update = False
zone = ConstructZone(outputPyTree, mesh, zone_name, familyName=familyName, update=update)
fnpath = CGU.getPathsByTypeList(zone, ['Zone_t', 'FamilyName_t'])
if fnpath:
fn = CGU.getNodeByPath(zone, fnpath[0])
familyName = CGU.getValueAsString(fn)
_check_cgns_name(familyName, context="Family")
CGL.newFamily(base, familyName)
else:
newFamilyName(zone, defaultFamilyName, base)
if update:
# Clean old zones
for path in zonepaths:
node = CGU.getNodeByPath(outputPyTree, path)
CGU.nodeDelete(outputPyTree, node)
CGU.addChild(base, zone)
createMissingFamilies(outputPyTree)
return outputPyTree
[docs]
def printTree(pyTree: list, pre: str = ''):
"""Pretty print for CGNS Tree
Args:
pyTree (list): CGNS tree to print
pre (str, optional): indentation of print. Defaults to ''.
"""
np.set_printoptions(threshold=5, edgeitems=1)
def printValue(node):
if node[1].dtype == "|S1":
return CGU.getValueAsString(node)
else:
return f"{node[1]}".replace("\n", "")
for child in pyTree[2]:
try:
print(pre, child[0], ":", child[1].shape, printValue(child), child[1].dtype, child[3])
except AttributeError:
print(pre, child[0], ":", child[1], child[3])
if child[2]:
printTree(child, ' '*len(pre)+'|_ ')
np.set_printoptions(edgeitems=3, threshold=1000)
[docs]
def CleanDoubleNodesOnMatch(mesh: Mesh, nodesSurfaces: list):
"""Remove double nodes of a structured multi-zones mesh merged
Args:
mesh (Mesh): Mesh from where to find and delete double nodes
nodesSurfaces (list): Nodes on every boundary surface of each zone
Returns:
Mesh: Mesh without double nodes
"""
mask = np.zeros(mesh.GetNumberOfNodes())
mask[nodesSurfaces] = 1
MMT.CleanDoubleNodes(mesh, tol=1e-15, nodesToTestMask=np.array(mask, dtype=bool), propagateToNodeFields=True)
return mesh
[docs]
def CheckIntegrity(GUI = False):
try:
import CGNS.MAP as CGM
except ImportError:
print("CGNS Python module not found, skipping test")
return "skip"
import Muscat.TestData as MuscatTestData
from Muscat.MeshTools.MeshTools import IsClose
from Muscat.MeshTools.MeshModificationTools import ComputeSkin
from Muscat.IO.GeofReader import ReadGeof
# print("\n Cube unstructured")
# print("############################")
from Muscat.MeshTools.MeshCreationTools import CreateCube, CreateSquare, SubDivideMesh
res = CreateCube()
res.GenerateManufacturedOriginalIDs()
res.AddElementsToTag(np.arange(res.GetNumberOfElements(dim=res.GetPointsDimensionality())), "3D")
res.AddElementsToTag(np.arange(res.GetNumberOfElements(dim=res.GetPointsDimensionality())), "Cube")
# print(res)
res.VerifyIntegrity()
resI = MeshToCGNS(res)
# CGM.save('cube_uns3D.cgns', resI)
resI = MeshToCGNS(res, exportOriginalIDs=False)
zonepath = CGU.getPathsByTypeSet(resI, ["Zone_t"])[0]
zone = CGU.getNodeByPath(resI, zonepath)
resII = CGNSToMesh(resI, zoneNames=zone[0])
resII.VerifyIntegrity()
# print(resII)
IsClose(res, resII)
MeshToCGNS(resII, outputPyTree=resI, updateZone=zone[0], tagsAsFields=True)
# CGM.save('cube_uns3D_2.cgns', resI)
# print("\n Square unstructured")
# print("############################")
res = CreateSquare()
# print(res)
res = SubDivideMesh(res, 2)
res.GenerateManufacturedOriginalIDs()
ef_vol = ElementFilter(dimensionality=res.GetPointsDimensionality())
cell_ids = ef_vol.GetGlobalIds(res)
res.AddElementsToTag(cell_ids, "2D")
res.nodesTags.CreateTag("toto").SetIds([1, 3, 6, 12, 15])
resI = MeshToCGNS(res, tagsAsFields=True)
# CGM.save('toto.cgns', resI)
basepath = CGU.getPathsByTypeSet(resI, ["CGNSBase_t"])[0]
base = CGU.getNodeByPath(resI, basepath)
resII = CGNSToMesh(resI, baseNames=base[0])
# print(resII)
IsClose(res, resII)
resI = MeshToCGNS(resII, tagsAsFields=True)
# CGM.save('toto2.cgns', resI)
# print("\n Structured cgns 3D")
# print("############################")
tree = CGM.load(MuscatTestData.GetTestDataPath() + "CGNSExample/structured.cgns")[0]
res = CGNSToMesh(tree)
res3domaines = CGNSToMesh(tree, partitionedMesh=True)
#export Multidomain
#from Muscat.IO.XdmfWriter import XdmfWriter
#tt = XdmfWriter("toto.xdmf")
#tt.SetMultidomain(True)
#tt.Open()
#for m in res3domaines.storage:
# tt.Write(m, PointFields=list(m.nodeFields.values()), PointFieldsNames=list(m.nodeFields.keys()))
#tt.Close()
res.AddElementsToTag(np.arange(3), "Toto")
res2 = res.View()
ComputeSkin(res2, inPlace=True)
for fieldName in res.elemFields.keys():
res2.elemFields[fieldName] = np.zeros(res2.GetNumberOfElements())
res2.elemFields[fieldName][:res.elemFields[fieldName].shape[0]] = res.elemFields[fieldName]
res2.GenerateManufacturedOriginalIDs()
# print(res2)
resI = MeshToCGNS(res2)
# CGM.save('struct3D.cgns', resI)
resI2 = MeshToCGNS(res2, outputPyTree=tree, updateZone=True)
resII = CGNSToMesh(resI)
resII2 = CGNSToMesh(resI2)
# print(resII)
IsClose(res2, resII)
IsClose(res2, resII2)
resI = MeshToCGNS(resII, outputPyTree=tree, updateZone=True)
# print("\n Structured cgns 2D")
# print("############################")
tree = CGM.load(MuscatTestData.GetTestDataPath() + "CGNSExample/outcart.cgns")[0]
res = CGNSToMesh(tree)
# print(res)
res2 = res.View()
skin = ComputeSkin(res2, md=2, inPlace=False)
res2.MergeElements(skin, force=True)
# res2.Merge(skin)
# MMT.CleanDoubleNodes(res2, tol=1e-15)
res2.GenerateManufacturedOriginalIDs()
# print(res2)
resI = MeshToCGNS(res2)
# CGM.save('struct2D.cgns', resI)
resI2 = MeshToCGNS(res2, outputPyTree=tree, updateZone=True)
# CGM.save('struct2D_2.cgns', resI)
resII = CGNSToMesh(resI)
resII2 = CGNSToMesh(resI2)
# print(resII)
IsClose(res2, resII)
IsClose(res2, resII2)
res_part = CGNSToMesh(tree, partitionedMesh=True)
# print(res_part)
# print("\n HEXA_27 check")
# print("############################")
from Muscat.MeshTools.MeshCreationTools import ToQuadraticMesh
mesh = ToQuadraticMesh(CreateCube())
tree = MeshToCGNS(mesh)
elem_paths = [elp for elp in CGU.getAllNodesByTypeSet(tree, ["Elements_t"]) if 'HEXA_27' in elp][0]
elem_node = CGU.getNodeByPath(tree, elem_paths)
conn = CGU.getValueByPath(elem_node, "ElementConnectivity")-1
assert np.allclose(conn, CGNSToMuscatPermutation["HEXA_27"])
res = CGNSToMesh(tree)
assert np.allclose(res.elements[ED.Hexahedron_27].connectivity[0,:], np.arange(27))
IsClose(mesh, res)
mesh = ToQuadraticMesh(CreateCube(dimensions=[2,3,4]))
tree = MeshToCGNS(mesh)
elem_paths = [elp for elp in CGU.getAllNodesByTypeSet(tree, ["Elements_t"]) if 'HEXA_27' in elp][0]
elem_node = CGU.getNodeByPath(tree, elem_paths)
conn = CGU.getValueByPath(elem_node, "ElementConnectivity")-1
assert np.allclose(conn[:18], [0, 6, 9, 3, 1, 7, 10, 4, 24, 30, 27, 39, 52, 58, 61, 55, 25, 31])
res = CGNSToMesh(tree)
assert np.allclose(res.elements[ED.Hexahedron_27].connectivity, mesh.elements[ED.Hexahedron_27].connectivity)
IsClose(mesh, res)
# print("\n Multielement 3D")
filename = MuscatTestData.GetTestDataPath() + "multielements3D.geof"
mesh = ReadGeof(filename)
tree = MeshToCGNS(mesh)
# print("\n Multielement 2D")
filename = MuscatTestData.GetTestDataPath() + "multielements2D.geof"
mesh = ReadGeof(filename)
tree = MeshToCGNS(mesh)
# IntegrationPoint payload helpers
mesh = CreateSquare(dimensions=[2, 2], ofTriangles=True)
tree = MeshToCGNS(mesh)
basepath = CGU.getPathsByTypeSet(tree, ["CGNSBase_t"])[0]
zonepath = CGU.getPathsByTypeSet(tree, ["Zone_t"])[0]
base = CGU.getNodeByPath(tree, basepath)
zone = CGU.getNodeByPath(tree, zonepath)
nCells = mesh.GetNumberOfElements(dim=2)
AddIntegrationRuleCollection(
base,
collectionName="IntegrationGauss",
idToName={1: "TriCentroidRule"},
rules={
"TriCentroidRule": {
"element_type": "TRI_3",
"reference_space": "Parametric",
"integration_name": "ElementCenter",
"parametric_integration_points": np.array([[1.0 / 3.0, 1.0 / 3.0]], dtype=MuscatFloat),
"weights": np.array([0.5], dtype=MuscatFloat),
}
},
)
AddIntegrationPointFlowSolution(
zone,
flowName="IntegrationPointFields",
dataArrays={"IPScalar": np.arange(nCells, dtype=MuscatFloat)},
itgPointStartOffset=np.arange(nCells + 1, dtype=np.int32),
itgRulesPath=f"/{base[0]}/IntegrationGauss",
itgRulesIds=np.array([1], dtype=np.int32),
)
ipPayload = ReadIntegrationPointPayload(tree)
assert len(ipPayload) == 1
np.testing.assert_equal(ipPayload[0]["itg_point_start_offset"], np.arange(nCells + 1, dtype=np.int32))
np.testing.assert_equal(ipPayload[0]["data_arrays"]["IPScalar"], np.arange(nCells, dtype=MuscatFloat))
return 'ok'
if __name__ == '__main__':
print(CheckIntegrity(True))# pragma: no cover