Source code for Muscat.Bridges.CGNSBridge

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
from typing import Any, Dict, List, Optional, Union
from numpy.typing import NDArray

import numpy as np

try:
    import CGNS.MAP as CGM
    import CGNS.PAT.cgnsutils as CGU
    import CGNS.PAT.cgnslib as CGL
    import CGNS.PAT.cgnskeywords as CGK

    CGK.IntegrationPoint_s ="IntegrationPoint"
    if CGK.GridLocation_l[-1] != CGK.IntegrationPoint_s:
        CGK.GridLocation_l.append(CGK.IntegrationPoint_s)

except ImportError:
    from Muscat.Helpers.Logger import Warning
    Warning("CGNS library not found. Please install CGNS python package")

from Muscat.Types import MuscatFloat, MuscatIndex
from Muscat.MeshContainers.Mesh import Mesh, ElementsContainer
from Muscat.MeshContainers.PartitionedMesh import PartitionedMesh, GlobalIdsFieldName
from Muscat.MeshTools import MeshModificationTools as MMT
from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter
import Muscat.MeshContainers.ElementsDescription as ED
from Muscat.MeshContainers.ElementsContainers import StructuredElementsContainer
import itertools

CGNSNameToMuscat = {}
CGNSNameToMuscat["NODE"] = ED.Point_1
# 1D
CGNSNameToMuscat["BAR_2"] = ED.Bar_2
CGNSNameToMuscat["BAR_3"] = ED.Bar_3
# 2D
CGNSNameToMuscat["TRI_3"] = ED.Triangle_3
CGNSNameToMuscat["TRI_6"] = ED.Triangle_6
CGNSNameToMuscat["QUAD_4"] = ED.Quadrangle_4
CGNSNameToMuscat["QUAD_8"] = ED.Quadrangle_8
CGNSNameToMuscat["QUAD_9"] = ED.Quadrangle_9
# 3D
CGNSNameToMuscat["TETRA_4"] = ED.Tetrahedron_4
CGNSNameToMuscat["TETRA_10"] = ED.Tetrahedron_10
CGNSNameToMuscat["PYRA_5"] = ED.Pyramid_5
CGNSNameToMuscat["PYRA_13"] = ED.Pyramid_13
CGNSNameToMuscat["PENTA_6"] = ED.Wedge_6
CGNSNameToMuscat["PENTA_15"] = ED.Wedge_15
CGNSNameToMuscat["PENTA_18"] = ED.Wedge_18
CGNSNameToMuscat["HEXA_8"] = ED.Hexahedron_8
CGNSNameToMuscat["HEXA_20"] = ED.Hexahedron_20
CGNSNameToMuscat["HEXA_27"] = ED.Hexahedron_27
# when adding a new element please add the Pertmutaion
# https://cgns.github.io/CGNS_docs_current/sids/conv.html#unstructgrid
# example  CGNSToMuscatPermutation["TETRA_4"] = [0 3 2 1]
CGNSToMuscatPermutation = {}
CGNSToMuscatPermutation["PENTA_15"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 13, 14, 9, 10, 11]
CGNSToMuscatPermutation["PENTA_18"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 13, 14, 9, 10, 11, 15, 16, 17]
CGNSToMuscatPermutation["HEXA_20"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 16, 17, 18, 19, 12, 13, 14, 15]
CGNSToMuscatPermutation["HEXA_27"] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 16, 17, 18, 19, 12, 13, 14, 15, 24, 22, 21, 23, 20, 25, 26]

CGNS_ElementType_l = ['Null', 'UserDefined', 'NODE', 'BAR_2', 'BAR_3', 'TRI_3', 'TRI_6', 'QUAD_4', 'QUAD_8', 'QUAD_9', 'TETRA_4', 'TETRA_10', 'PYRA_5', 'PYRA_14', 'PENTA_6', 'PENTA_15', 'PENTA_18', 'HEXA_8', 'HEXA_20', 'HEXA_27', 'MIXED', 'PYRA_13', 'NGON_n', 'NFACE_n', 'BAR_4', 'TRI_9', 'TRI_10', 'QUAD_12', 'QUAD_16', 'TETRA_16', 'TETRA_20', 'PYRA_21', 'PYRA_29', 'PYRA_30', 'PENTA_24', 'PENTA_38', 'PENTA_40', 'HEXA_32', 'HEXA_56', 'HEXA_64', 'BAR_5', 'TRI_12', 'TRI_15', 'QUAD_P4_16', 'QUAD_25', 'TETRA_22', 'TETRA_34', 'TETRA_35', 'PYRA_P4_29', 'PYRA_50', 'PYRA_55', 'PENTA_33', 'PENTA_66', 'PENTA_75', 'HEXA_44', 'HEXA_98', 'HEXA_125']

CGNSNumberToMuscat = {CGNS_ElementType_l.index(k): v for k, v in CGNSNameToMuscat.items()}

MuscatToCGNSNames = {y: x for x, y in CGNSNameToMuscat.items()}
MuscatToCGNSNumber = {y: x for x, y in CGNSNumberToMuscat.items()}

CGNS_MAX_NAME_LENGTH = 32
#use the proposal to write integration point data, if false we use a UserDefined
USE_CGNS_PROPOSAL_CPEX0047 = False


def _string_to_cgns_array(value: str) -> np.ndarray:
    """Convert python string to CGNS C1-like array."""
    if value is None:
        value = ""
    return np.frombuffer(value.encode("ascii"), dtype="|S1")


def _strings_to_fixed_cgns_matrix(values: List[str], width: int = CGNS_MAX_NAME_LENGTH) -> np.ndarray:
    """Convert a list of strings to a fixed-width CGNS C1 2D matrix (width, n)."""
    res = np.zeros((width, len(values)), dtype="|S1", order="F")
    for j, name in enumerate(values):
        _check_cgns_name(name, context="MapName")
        encoded = name.encode("ascii")[:width]
        if len(encoded):
            res[:len(encoded), j] = np.frombuffer(encoded, dtype="|S1")
    return res


def _resolve_abs_path(pyTree: list, absPath: str):
    """Resolve a CGNS absolute path like '/Base/Zone/Node'."""
    if absPath is None:
        return None
    return CGU.getNodeByPath(pyTree, absPath.lstrip("/"))


def _get_or_create_child(parent: list, name: str, label: str, value: Any = None):
    node = CGU.getNodeByPath(parent, name)
    if node is None:
        node = CGU.newNode(name, value, [], label, parent)
    return node


def _new_element_association(parent: list, path: str, ids: np.ndarray, nodeName: str = "ItgRules"):
    assoc = CGU.newNode(nodeName, None, [], "ElementAssociation_t", parent)
    CGU.newNode("Path", _string_to_cgns_array(path), [], "DataArray_t", assoc)
    CGU.newNode("Ids", np.asarray(ids, dtype=np.int32), [], "DataArray_t", assoc)
    return assoc


def _read_element_association(node: Optional[list]) -> Optional[Dict[str, Any]]:
    if node is None:
        return None
    pathNode = CGU.getNodeByPath(node, "Path")
    idsNode = CGU.getNodeByPath(node, "Ids")
    path = CGU.getValueAsString(pathNode) if pathNode is not None else None
    ids = np.asarray(idsNode[1], dtype=np.int32).ravel() if idsNode is not None and idsNode[1] is not None else None
    return {"path": path, "ids": ids}


[docs] def AddIntegrationRuleCollection(baseNode: list, collectionName: str, idToName: Dict[int, str], rules: Dict[str, Dict[str, Any]]) -> list: """Create (or replace) an IntegrationRulesCollection_t under a CGNS base.""" _check_cgns_name(collectionName, context="IntegrationRulesCollection") old = CGU.getNodeByPath(baseNode, collectionName) if old is not None: CGU.nodeDelete(baseNode, old) collection = CGU.newNode(collectionName, None, [], "IntegrationRulesCollection_t", baseNode) # IdToQualifier (MapName_t) ids = np.asarray(list(idToName.keys()), dtype=np.int32) names = list(idToName.values()) mapNode = CGU.newNode("IdToQualifier", ids, [], "MapName_t", collection) CGU.newNode("Names", _strings_to_fixed_cgns_matrix(names), [], "DataArray_t", mapNode) for ruleName, rule in rules.items(): _check_cgns_name(ruleName, context="IntegrationRule") rnode = CGU.newNode(ruleName, None, [], "IntegrationRule_t", collection) CGU.newNode("IntegrationName", _string_to_cgns_array(ruleName), [], "ElementType_t", rnode) CGU.newNode("ElementType", _string_to_cgns_array(rule.get("element_type", "TRI_3")), [], "ElementType_t", rnode) CGU.newNode("ReferenceSpace", _string_to_cgns_array(rule.get("reference_space", "Parametric")), [], "ElementSpace_t", rnode) points = np.asarray(rule["parametric_integration_points"], dtype=MuscatFloat) if points.ndim != 2: raise ValueError("parametric_integration_points must be a 2D array") weights = np.asarray(rule["weights"], dtype=MuscatFloat).ravel() if weights.size != points.shape[0]: raise ValueError("weights length must be equal to number of integration points") CGU.newNode("NumberOfIntegrationPoint", np.array([points.shape[0]], dtype=np.int32), [], "DataArray_t", rnode) CGU.newNode("ParametricDim", np.array([points.shape[1]], dtype=np.int32), [], "DataArray_t", rnode) if "integration_name" in rule: CGU.newNode("IntegrationName", _string_to_cgns_array(rule["integration_name"]), [], "DataArray_t", rnode) CGU.newNode("ParametricIntegrationPoint", points, [], "DataArray_t", rnode) CGU.newNode("Weights", weights, [], "DataArray_t", rnode) return collection
[docs] def AddIntegrationPointFlowSolution(zoneNode: list, flowName: str, dataArrays: Dict[str, np.ndarray], itgPointStartOffset: np.ndarray, itgRulesPath: str, itgRulesIds: np.ndarray, elementsFallbackAssociation: Optional[Dict[str, Any]] = None) -> list: """Create one FlowSolution_t with GridLocation=IntegrationPoint and associated metadata.""" _check_cgns_name(flowName, context="FlowSolution") old = CGU.getNodeByPath(zoneNode, flowName) if old is not None: CGU.nodeDelete(zoneNode, old) # for the moment we use userDefinedData to store IntegrationPoint data # because pycng does not implement IntegrationPoint as a gridlocation if USE_CGNS_PROPOSAL_CPEX0047: flow = CGL.newFlowSolution(None, name=flowName, gridlocation="IntegrationPoint") else: flow = CGL.newUserDefinedData(None, name=flowName) CGL.newDescriptor(flow, 'gridlocation', 'IntegrationPoint') _new_element_association(flow, itgRulesPath, np.asarray(itgRulesIds, dtype=np.int32), nodeName="ItgRules") CGU.newNode("ItgPointStartOffset", np.asarray(itgPointStartOffset, dtype=np.int32), [], "Offset_t", flow) for fieldName, fieldData in dataArrays.items(): _check_cgns_name(fieldName, context="DataArray") CGL.newDataArray(flow, fieldName, value=np.asarray(fieldData)) CGU.setAsChild(zoneNode, flow) if elementsFallbackAssociation is not None: elemName = elementsFallbackAssociation["elements_name"] elemNode = CGU.getNodeByPath(zoneNode, elemName) if elemNode is not None: _new_element_association( elemNode, elementsFallbackAssociation["path"], np.asarray(elementsFallbackAssociation["ids"], dtype=np.int32), nodeName="ItgRules", ) return flow
[docs] def ReadIntegrationPointPayload(pyTree: list, baseNames: Optional[List[str]] = None, zoneNames: Optional[List[str]] = None) -> List[Dict[str, Any]]: """Read raw IntegrationPoint payload from CGNS FlowSolution_t nodes.""" out: List[Dict[str, Any]] = [] def _get_integration_point_location(node: list) -> Optional[str]: """Return grid location for a flow-like node supporting legacy and UDD payloads.""" glPath = CGU.getPathsByTypeSet(node, ["GridLocation_t"]) if glPath: return CGU.getValueAsString(CGU.getNodeByPath(node, glPath[0])) # New payload format stores location as a descriptor on UserDefinedData_t # (e.g. CGL.newDescriptor(flow, 'gridlocation', 'IntegrationPoint')). descNode = CGU.getNodeByPath(node, "gridlocation") if descNode is not None: return CGU.getValueAsString(descNode) return None if baseNames is not None: basePaths = CGU.getPathsByNameSet(pyTree, baseNames) else: basePaths = CGU.getPathsByTypeSet(pyTree, ["CGNSBase_t"]) for basePath in basePaths: baseNode = CGU.getNodeByPath(pyTree, basePath) if zoneNames is not None: zonePaths = CGU.getPathsByNameSet(baseNode, zoneNames) else: zonePaths = CGU.getPathsByTypeSet(baseNode, ["Zone_t"]) for zonePath in zonePaths: zoneNode = CGU.getNodeByPath(baseNode, zonePath) flowPaths = CGU.getPathsByTypeSet(zoneNode, ["FlowSolution_t", "UserDefinedData_t"]) for flowPath in flowPaths: flowNode = CGU.getNodeByPath(zoneNode, flowPath) location = _get_integration_point_location(flowNode) if location != "IntegrationPoint": continue offsetNode = CGU.getNodeByPath(flowNode, "ItgPointStartOffset") if offsetNode is None or offsetNode[1] is None: continue offsets = np.asarray(offsetNode[1], dtype=np.int32).ravel() assoc = _read_element_association(CGU.getNodeByPath(flowNode, "ItgRules")) if assoc is None: elemPaths = CGU.getPathsByTypeSet(zoneNode, ["Elements_t"]) for ep in elemPaths: elem = CGU.getNodeByPath(zoneNode, ep) assoc = _read_element_association(CGU.getNodeByPath(elem, "ItgRules")) if assoc is not None: break dataArrays = {} for fp in CGU.getPathsByTypeSet(flowNode, ["DataArray_t"]): n = CGU.getNodeByPath(flowNode, fp) if n[0] in ["Path", "Ids", "IntegrationName"]: continue if n[1] is None: continue dataArrays[n[0]] = np.asarray(n[1]) resolvedRules = {} if assoc is not None and assoc["path"]: collection = _resolve_abs_path(pyTree, assoc["path"]) if collection is not None: mapNode = CGU.getNodeByPath(collection, "IdToQualifier") idMap: Dict[int, str] = {} if mapNode is not None: ids = np.asarray(mapNode[1], dtype=np.int32).ravel() if mapNode[1] is not None else np.array([], dtype=np.int32) namesNode = CGU.getNodeByPath(mapNode, "Names") if namesNode is not None and namesNode[1] is not None: namesRaw = np.asarray(namesNode[1]) for j, rid in enumerate(ids): idMap[int(rid)] = b"".join(namesRaw[:, j].tolist()).decode("ascii", errors="ignore").strip("\x00 ") for rid in np.unique(assoc["ids"] if assoc["ids"] is not None else np.array([], dtype=np.int32)): ruleName = idMap.get(int(rid)) if not ruleName: continue ruleNode = CGU.getNodeByPath(collection, ruleName) if ruleNode is None: continue etNode = CGU.getNodeByPath(ruleNode, "ElementType") rsNode = CGU.getNodeByPath(ruleNode, "ReferenceSpace") pNode = CGU.getNodeByPath(ruleNode, "ParametricIntegrationPoint") wNode = CGU.getNodeByPath(ruleNode, "Weights") resolvedRules[int(rid)] = { "name": ruleName, "element_type": CGU.getValueAsString(etNode) if etNode is not None else None, "reference_space": CGU.getValueAsString(rsNode) if rsNode is not None else None, "parametric_integration_points": np.asarray(pNode[1]) if pNode is not None and pNode[1] is not None else None, "weights": np.asarray(wNode[1]) if wNode is not None and wNode[1] is not None else None, } out.append( { "base": baseNode[0], "zone": zoneNode[0], "flow_solution": flowNode[0], "itg_point_start_offset": offsets, "itg_rules": assoc, "data_arrays": dataArrays, "resolved_rules": resolvedRules, } ) return out
CGNSNumberToMuscatPermutation = { k: CGNSToMuscatPermutation[name] for k in MuscatToCGNSNumber.values() if (name := MuscatToCGNSNames[CGNSNumberToMuscat[k]]) in CGNSToMuscatPermutation } MuscatElementTypeToCGNS = { k: np.argsort(CGNSToMuscatPermutation[name]) for k in CGNSNameToMuscat.values() if (name := MuscatToCGNSNames[k]) in CGNSToMuscatPermutation } def _check_cgns_name(name: str, context: str = "CGNS node") -> None: """Validate a CGNS node name according to CGNS naming convention. Args: name (str): name to validate context (str, optional): extra context for error message. Defaults to "CGNS node". Raises: ValueError: if name is not a string or if its length is greater than 32 chars Returns: str: validated name """ if not isinstance(name, str): raise ValueError(f"{context} name must be a string, got {type(name).__name__}") if len(name) > CGNS_MAX_NAME_LENGTH: raise ValueError( f"{context} name '{name}' has length {len(name)} which exceeds CGNS limit " f"({CGNS_MAX_NAME_LENGTH})" )
[docs] def GetCGNSNumberToMuscat(cgnsNumber: int): """Get Muscat ElementType from CGNS element type number Args: cgnsNumber (int): CGNS element type number Raises: Exception: if there is no equivalent to CGNS element type in Muscat Returns: res (ElementType): Corresponding elment type in Muscat """ res = CGNSNumberToMuscat.get(cgnsNumber, None) if res is None: # pragma: no cover raise Exception(f"CGNC elements of number : {cgnsNumber} not coded yet") return res
def __ReadIndex(pyTree: list, dim: List[int]): """Read Index Array or Index Range from CGNS Args: pyTree (list): CGNS node which has a child Index to read dim (list): dimensions of the coordinates Returns: indices in Muscat """ a = __ReadIndexArray(pyTree) b = __ReadIndexRange(pyTree, dim) return np.hstack((a, b)) def __ReadIndexArray(pyTree: list): """Read Index Array from CGNS Args: pyTree (list): CGNS node which has a child of type IndexArray_t to read Returns: indices in Muscat """ indexArrayPaths = CGU.getPathsByTypeSet(pyTree, ['IndexArray_t']) res = [] for indexArrayPath in indexArrayPaths: data = CGU.getNodeByPath(pyTree, indexArrayPath) if data[1] is None: continue else: res.extend(data[1].ravel() - 1) # CGNS ids start at 1 return np.array(res, dtype=MuscatIndex).ravel() def __ReadIndexRange(pyTree: list, dim: List[int]): """Read Index Range from CGNS Args: pyTree (list): CGNS node which has a child of type IndexRange_t to read dim (List[str]): dimensions of the coordinates Returns: indices in Muscat """ indexRangePaths = CGU.getPathsByTypeSet(pyTree, ['IndexRange_t']) res = [] for indexRangePath in indexRangePaths: # Is it possible there are several ? indexRange = CGU.getValueByPath(pyTree, indexRangePath) if indexRange.shape == (3, 2) and len(dim) == 3: # 3D ix = np.arange(indexRange[0, 0]-1, indexRange[0, 1]) # CGNS ids start at 1 iy = np.arange(indexRange[1, 0]-1, indexRange[1, 1]) # CGNS ids start at 1 iz = np.arange(indexRange[2, 0]-1, indexRange[2, 1]) # CGNS ids start at 1 ix_grid, iy_grid, iz_grid = np.meshgrid(ix, iy, iz, indexing='ij') global_id = iz_grid + dim[2] * (iy_grid + dim[1] * ix_grid) res.extend(global_id) elif indexRange.shape == (2, 2) or len(dim) == 2: # 2D ix = np.arange(indexRange[0, 0]-1, indexRange[0, 1]) # CGNS ids start at 1 iy = np.arange(indexRange[1, 0]-1, indexRange[1, 1]) # CGNS ids start at 1 ix_grid, iy_grid = np.meshgrid(ix, iy, indexing='ij') global_id = iy_grid + dim[1] * ix_grid res.extend(global_id) else: begin = indexRange[0]-1 # CGNS ids start at 1 end = indexRange[1] res.extend(np.arange(begin, end).ravel()) return np.array(res, dtype=MuscatIndex).ravel()
[docs] def StructuredZoneSurfaceIds(zone: list, offset: int): """Get global ids of the nodes on the surface of CGNS structured zone Args: zone (list): zone node to treat offset (int): offset for the zone node ids in the global mesh Returns: res (list): global ids of node on each boundary surface of the zone """ dim = zone[1].shape[0] imax, jmax = zone[1][0][0], zone[1][1][0] kmax = 1 if dim == 3: kmax = zone[1][2][0] ix, iy, iz = np.arange(imax), np.arange(jmax), np.arange(kmax) ix_grid, iy_grid, iz_grid = np.meshgrid(ix, iy, iz, indexing='ij') global_id = offset + iz_grid + kmax * (iy_grid + jmax * ix_grid) res = [] res.extend(global_id[0,:,:].ravel()) res.extend(global_id[imax-1,:,:].ravel()) res.extend(global_id[:,0,:].ravel()) res.extend(global_id[:,jmax-1,:].ravel()) if dim == 3: res.extend(global_id[:,:,0].ravel()) res.extend(global_id[:,:,kmax-1].ravel()) return res
[docs] def newZoneSubRegion(parent: list, name: str, pointList: NDArray, dim: int, gridLocation: str = "CellCenter", family: str = ""): """Create a new ZoneSubRegion in CGNS pytree Args: parent (list): CGNS pytree where to add the new ZoneSubRegion node name (str): name of the CGNS ZoneSubRegion node pointList (NDArray): list of nodes or elements to add to the ZoneSubRegion dim (int): physical dimension of the CGNS pytree (2 for 2D, 3 for 3D) gridLocation (str, optional): Determine if it is a selection of nodes or elements. Defaults to "CellCenter". family (str, optional): Corresponding familyName. Defaults to "". Returns: znode (list): ZoneSubRegion node """ _check_cgns_name(name, context="ZoneSubRegion") CGU.checkDuplicatedName(parent, name) arange = np.array(pointList, dtype=np.int32, order="F") zsize = np.array([[1, dim]]) CGU.checkArray(zsize, dienow=True) znode = CGU.newNode(name, zsize, [], "ZoneSubRegion_t", parent) CGU.newNode("PointList", arange, [], "IndexArray_t", znode) CGL.newGridLocation(znode, value=gridLocation) if family: _check_cgns_name(family, context="Family") CGL.newFamilyName(znode, family) return znode
[docs] def newFamilyName(parent: list, family: str, base: list = None): """Add a FamilyName to a node Args: parent (list): CGNS pytree where to add the new FamilyName node family (str): Name of the family to set base (list, optional): Ancestor CGNSBase_t node to automatically create the corresponding family. Defaults to None. """ _check_cgns_name(family, context="Family") fnpath = CGU.getPathsByTypeOrNameList(parent, [parent[0], 'FamilyName_t']) if not fnpath: CGL.newFamilyName(parent, family=family) else: fn = CGU.getNodeByPath(parent, fnpath[0]) familyName = CGU.getValueAsString(fn) if familyName != family: afn = CGL.newAdditionalFamilyName(parent, family) afn[0] = "AdditionalFamilyName" else: return if base: fn = CGU.getNodeByPath(base, family) if not fn: CGL.newFamily(base, family)
[docs] def createMissingFamilies(tree: list): """Check and create if missing corresponding Family_t node for every FamilyName_t node found. Args: tree (list): CGNSTree_t or CGNSBase_t to check and complete """ fn_paths = CGU.getPathsByTypeSet(tree, ['FamilyName_t', 'AdditionalFamilyName_t']) for path in fn_paths: fn = CGU.getNodeByPath(tree, path) base = CGU.getAncestorByType(tree, fn, "CGNSBase_t") try: familyName = CGU.getValueAsString(fn) _check_cgns_name(familyName, context="Family") if not CGU.getPathsByTypeOrNameList(base, ["CGNSBase_t", familyName]) and familyName != "Null": CGL.newFamily(base, familyName) except TypeError: print("Couldn't create associated family to", path)
[docs] def CGNSToMesh(pyTree: list, baseNames: Optional[List[str]] = None, zoneNames: Optional[List[str]] = None, fieldsAsTags: bool = False, partitionedMesh= False ) -> Union[Mesh, PartitionedMesh]: """Read a CGNS tree and return a corresponding Muscat Mesh Args: pyTree (list): CGNS tree to read baseNames (Optional[List[str]], optional): Names of the CGNS Bases to treat, if None every CGNS Bases are treated. Defaults to None. zoneNames (Optional[List[str]], optional): Names of the CGNS Zones to treat, if None every CGNS Zones are treated. Defaults to None. fieldsAsTags (bool, optional): _description_. Defaults to False. Raises: ValueError: if cell dimension not in [2, 3] Returns: Mesh: Corresponding Muscat Mesh """ if baseNames is not None: basepaths = CGU.getPathsByNameSet(pyTree, baseNames) else: basepaths = CGU.getPathsByTypeSet(pyTree, ["CGNSBase_t"]) if partitionedMesh: res = PartitionedMesh() else: res = Mesh() nodesSurfaces = [] for basepath in basepaths: basePyTree = CGU.getNodeByPath(pyTree, basepath) cell_dim = CGU.getValueByPath(pyTree, basepath)[0] physical_dim = CGU.getValueByPath(pyTree, basepath)[1] if zoneNames is not None: zonepaths = CGU.getPathsByNameSet(basePyTree, zoneNames) else: zonepaths = CGU.getPathsByTypeSet(basePyTree, ["Zone_t"]) for zonepath in zonepaths: zonePyTree = CGU.getNodeByPath(basePyTree, zonepath) zonetype = CGU.getValueAsString(CGU.getNodeByPath(zonePyTree, 'ZoneType')) if zonetype == "Structured": if partitionedMesh : nbNodes = 0 surfaceIds = StructuredZoneSurfaceIds(zonePyTree, offset=nbNodes) else: nbNodes = res.GetNumberOfNodes() surfaceIds = StructuredZoneSurfaceIds(zonePyTree, offset=nbNodes) nodesSurfaces.extend(surfaceIds) else : surfaceIds = [] gridCoordinatesPath = CGU.getPathsByTypeSet(zonePyTree, ["GridCoordinates_t"])[0] gx = CGU.getNodeByPath(zonePyTree, gridCoordinatesPath+'/CoordinateX')[1] dim = gx.shape local_mesh = Mesh() local_mesh.nodes = np.empty((gx.size, physical_dim), dtype=MuscatFloat) local_mesh.nodes[:, 0] = gx.ravel() local_mesh.nodes[:, 1] = CGU.getNodeByPath(zonePyTree, gridCoordinatesPath+'/CoordinateY')[1].ravel() if physical_dim == 3: local_mesh.nodes[:, 2] = CGU.getNodeByPath(zonePyTree, gridCoordinatesPath+'/CoordinateZ')[1].ravel() local_mesh.originalIDNodes = np.arange(1, len(local_mesh.nodes)+1, dtype=MuscatIndex) # elements elementsPaths = CGU.getPathsByTypeSet(zonePyTree, ["Elements_t"]) if not elementsPaths and zonetype == "Structured": if cell_dim == 3: ec = StructuredElementsContainer(ED.Hexahedron_8) ec.SetDimensions([dim[0], dim[1], dim[2]]) elif cell_dim == 2: ec = StructuredElementsContainer(ED.Quadrangle_4) ec.SetDimensions([dim[0], dim[1]]) else: raise ValueError(f"cell_dim equal to {cell_dim} is not functional") conn = ec.connectivity local_mesh.elements.AddContainer(ec) ec.tags.CreateTag(zonePyTree[0]).SetIds(np.arange(ec.GetNumberOfElements())) for elementsPath in elementsPaths: cgnsElements = CGU.getNodeByPath(zonePyTree, elementsPath) cgnsUserName = cgnsElements[0] cgnsElemType = cgnsElements[1][0] originalIds = __ReadIndexRange(cgnsElements, dim) if cgnsElemType == 20: # we are in mixed mode: #TODO: with mixed element CGNS cpt = 0 elementConnectivity = CGU.getNodeByPath(cgnsElements, cgnsUserName+'/ElementConnectivity')[0][1] for oid in originalIds: muscatElemType = GetCGNSNumberToMuscat(elementConnectivity[cpt]) cpt += 1 nbp = ED.numberOfNodes[muscatElemType] conn = elementConnectivity[cpt:cpt+nbp] - 1 if CGNSNumberToMuscatPermutation.get(cgnsElemType, None) is not None: conn = conn[CGNSNumberToMuscatPermutation[cgnsElemType]] cpt += nbp elems: ElementsContainer = local_mesh.elements.GetElementsOfType(muscatElemType) elcpt = elems.AddNewElements(conn[None, :], [oid]) if not cgnsUserName.endswith(MuscatToCGNSNames[muscatElemType]): elems.tags.CreateTag(cgnsUserName,False).AddToTag(elcpt-1) else: muscatElemType = GetCGNSNumberToMuscat(cgnsElemType) elementConnectivity = np.asarray(CGU.getNodeByPath(cgnsElements, 'ElementConnectivity')[1], dtype=MuscatIndex).reshape( (-1, ED.numberOfNodes[muscatElemType] ) )-1 elems: ElementsContainer = local_mesh.elements.GetElementsOfType(muscatElemType) if CGNSNumberToMuscatPermutation.get(cgnsElemType, None) is not None: elementConnectivity = elementConnectivity[:, CGNSNumberToMuscatPermutation[cgnsElemType]] elcpt = elems.cpt nelcpt = elems.AddNewElements(elementConnectivity, originalIds) if not cgnsUserName.endswith(MuscatToCGNSNames[muscatElemType]): elems.tags.CreateTag(cgnsUserName,False).AddToTag(np.arange(elcpt,nelcpt)) # tags elemTags = [] nodeTags = [] fnpath = CGU.getPathsByTypeList(zonePyTree, ["Zone_t", 'FamilyName_t']) afnpath = CGU.getPathsByTypeList(zonePyTree, ["Zone_t", 'AdditionalFamilyName_t']) ef_vol = ElementFilter(dimensionality=local_mesh.GetPointsDimensionality()) cell_ids = ef_vol.GetGlobalIds(local_mesh) for path in fnpath + afnpath: fn = CGU.getNodeByPath(zonePyTree, path) if fn: familyName = CGU.getValueAsString(fn) local_mesh.AddElementsToTag(cell_ids, familyName) elemTags.append(familyName) BCPaths = CGU.getPathsByTypeList(zonePyTree, ["Zone_t", "ZoneBC_t", "BC_t"]) for BCPath in BCPaths: BCNode = CGU.getNodeByPath(zonePyTree, BCPath) bcfnpath = CGU.getPathsByTypeList(BCNode, [ "BC_t",'FamilyName_t']) if len(bcfnpath): fn = CGU.getNodeByPath(BCNode, bcfnpath[0]) familyName = CGU.getValueAsString(fn) BCName = BCNode[0] indices = __ReadIndex(BCNode, dim) if len(indices) == 0: continue gl = CGU.getPathsByTypeSet(BCNode, ["GridLocation_t"]) if gl: location = CGU.getValueAsString(CGU.getNodeByPath(BCNode, gl[0])) else: location = "Vertex" if location in ["CellCenter", "FaceCenter", "EdgeCenter"]: local_mesh.AddElementsToTag(indices, BCName) elemTags.append(BCName) elif location == "Vertex": if len(bcfnpath) and familyName != BCName: tag = local_mesh.nodesTags.CreateTag(familyName, errorIfAlreadyCreated=False) tag.AddToTag(indices) tag.RemoveDoubles() local_mesh.nodesTags.CreateTag(BCName).SetIds(indices) nodeTags.append(BCName) ZSRPaths = CGU.getPathsByTypeList(zonePyTree, ["Zone_t", "ZoneSubRegion_t"]) for path in ZSRPaths: ZSRNode = CGU.getNodeByPath(zonePyTree, path) fnpath = CGU.getPathsByTypeList(ZSRNode, ['ZoneSubRegion_t', 'FamilyName_t']) if fnpath: fn = CGU.getNodeByPath(ZSRNode, fnpath[0]) familyName = CGU.getValueAsString(fn) else: familyName = ZSRNode[0] indices = __ReadIndex(ZSRNode, dim) if len(indices) == 0: continue gl = CGU.getPathsByTypeSet(ZSRNode, ["GridLocation_t"])[0] location = CGU.getValueAsString(CGU.getNodeByPath(ZSRNode, gl)) if gl and location in ["CellCenter", "FaceCenter"]: local_mesh.AddElementsToTag(indices, familyName) elemTags.append(familyName) elif not gl or location == "Vertex": local_mesh.nodesTags.CreateTag(familyName).SetIds(indices) nodeTags.append(familyName) # fields datasPaths = CGU.getPathsByTypeSet(zonePyTree, ["FlowSolution_t"]) for dataPath in datasPaths: datas = CGU.getNodeByPath(zonePyTree, dataPath) gl = CGU.getPathsByTypeSet(datas, ["GridLocation_t"]) store = local_mesh.nodeFields location = "Vertex" if gl: location = CGU.getValueAsString(CGU.getNodeByPath(zonePyTree, gl[0])) if location == "Vertex": store = local_mesh.nodeFields elif location in ["FaceCenter", "EdgeCenter", "CellCenter"]: store = local_mesh.elemFields fieldPaths = CGU.getPathsByTypeSet(datas, ["DataArray_t"]) Ids = 0 if location in ["CellCenter", "FaceCenter", "EdgeCenter"] and f"{datas[0]}/OriginalIds" in fieldPaths: fieldData = CGU.getNodeByPath(datas, f"{datas[0]}/OriginalIds") if fieldData[1] is None: Ids = None else: Ids = np.asarray(fieldData[1]-1, dtype=MuscatIndex) # for data in local_mesh.elements.values(): # print(data.__dict__) # local_mesh.SetElementsOriginalIDs(Ids) #How to concatenate ids of elemVol and elemSurf fieldPaths.remove(f"{datas[0]}/OriginalIds") for fieldPath in fieldPaths: fieldData = CGU.getNodeByPath(datas, fieldPath) dataName = fieldData[0].split("_as_field")[0] data = fieldData[1] if zonetype == "Structured": data = data.ravel() if dataName == "OriginalIds" and store is local_mesh.nodeFields: local_mesh.originalIDNodes = np.asarray(data, dtype=MuscatIndex, order='C') elif store is local_mesh.nodeFields: if (fieldsAsTags is True) or (fieldsAsTags is False and dataName not in nodeTags): # The node field size is a multiple of the number of node (ie a scalar or vectorial field) # Beware vectorial field is not compliant with CGNS standard store[dataName] = data.ravel() elif store is local_mesh.elemFields: if (fieldsAsTags is True) or (fieldsAsTags is False and dataName not in elemTags): if dataName not in store.keys(): store[dataName] = np.zeros((local_mesh.GetNumberOfElements(), *data.shape[1:])) if isinstance(Ids, np.ndarray): store[dataName][Ids] = data else: # The element field size is a multiple of the number of elements (ie a scalar or vectorial field) # Beware vectorial field is not compliant with CGNS standard nb_elem = data.shape[0] store[dataName][np.arange(nb_elem)] = data if partitionedMesh: local_mesh.nodesTags.CreateTag("SurfaceIds").SetIds(surfaceIds) local_mesh.nodesTags.CreateTag(f"Part_{len(res.storage)}").SetIds(np.arange(local_mesh.GetNumberOfNodes(), dtype=MuscatIndex)) for ec in local_mesh.elements: ec.tags.CreateTag(f"Part_{len(res.storage)}").SetIds(np.arange(ec.GetNumberOfElements(), dtype=MuscatIndex)) local_mesh.nodeFields[GlobalIdsFieldName] = None local_mesh.elemFields[GlobalIdsFieldName] = None res.AddMesh(local_mesh) else: res.Merge(local_mesh) if partitionedMesh: res.GenerateGlobalIdsBasedOnNodePositions() elif not partitionedMesh and nodesSurfaces and nbNodes > 0: res = CleanDoubleNodesOnMatch(res, nodesSurfaces) res.PrepareForOutput() return res
[docs] def MeshToCGNS(mesh: Mesh, outputPyTree: Optional[list] = None, tagsAsFields: bool = False, exportOriginalIDs: bool = True, updateZone: Union[bool, str] = False, defaultFamilyName: str = "Bulk"): """Write a CGNS Tree from a Muscat Mesh Args: mesh (Mesh): Muscat Mesh to export in CGNS. outputPyTree (list, optional): If the mesh has to be written in an pre-existing CGNS tree. Defaults to None. tagsAsFields (bool, optional): Create a field FlowSolution with tags for paraview visualisation. Defaults to False. exportOriginalIDs (bool, optional): Create a OriginalIds data array in the CGNS. Defaults to True. updateZone (Union[bool, str], optional):If outputPyTree, zones to update can be specified: True -> all zones, False -> no update, zone name -> only this zone. Defaults to False. defaultFamilyName (str, optional): Default family name. Defaults to "Bulk". Raises: RuntimeError: Mesh points dimensionality should be in [2, 3] Returns: outputPyTree (list): CGNS tree including Muscat Mesh """ if outputPyTree is None: update = False outputPyTree = CGL.newCGNSTree() def ConstructZone(outputPyTree, local_mesh: Mesh, zoneName: str, familyName: Optional[str], update: bool) -> list: _check_cgns_name(zoneName, context="Zone") if familyName is not None: _check_cgns_name(familyName, context="Family") cell_dim = local_mesh.GetElementsDimensionality() physical_dim = mesh.GetPointsDimensionality() if physical_dim == 2: coordNames = ["X", "Y"] elif physical_dim == 3: coordNames = ["X", "Y", "Z"] else: # pragma: no cover raise RuntimeError('mesh.GetPointsDimensionality() should be 2 or 3') ztype = "Unstructured" zone = CGL.newZone(None, zoneName, zsize=np.array([[local_mesh.GetNumberOfNodes()], [local_mesh.GetNumberOfElements(dim=cell_dim)], [0]]).reshape(1, 3), ztype=ztype, family=familyName) # totalNumberOfTags #add nodes for i, coord in enumerate(coordNames): CGL.newCoordinates(zone, name="Coordinate"+coord, value=np.copy(local_mesh.nodes[:, i])) #add elements globalOffsets = mesh.ComputeGlobalOffset() for elementType, data in local_mesh.elements.items(): nbelem = data.connectivity.shape[0] if nbelem == 0: continue nbelem = data.GetNumberOfElements() if elementType in MuscatElementTypeToCGNS: connectivity = data.connectivity[:,MuscatElementTypeToCGNS[elementType]] else: connectivity = data.connectivity CGL.newElements(zone, "Elements_"+ MuscatToCGNSNames[data.elementType], etype=MuscatToCGNSNumber[data.elementType], erange=np.array((globalOffsets[elementType] + 1, globalOffsets[elementType] + nbelem)), econnectivity=(connectivity+1).ravel() ) location = "Vertex" flowSolution = CGL.newFlowSolution(None, name=f"{location}Fields", gridlocation=location) ## add nodeFields if exportOriginalIDs and len(local_mesh.originalIDNodes)>0: CGL.newDataArray(flowSolution, "OriginalIds", value=local_mesh.originalIDNodes+1) if len(local_mesh.nodeFields) > 0: for pointFieldName, pointData in local_mesh.nodeFields.items(): _check_cgns_name(pointFieldName, context="DataArray") if pointData.dtype.char == "U": print(f"skipping nodeFields '{pointFieldName}' because it is of type: {pointData.dtype}") continue if len(pointData.shape) == 1: CGL.newDataArray(flowSolution, pointFieldName, value=pointData) else: for indexes in itertools.product(*[range(dim) for dim in pointData.shape[1:]]): CGL.newDataArray(flowSolution, f"{pointFieldName}_{'_'.join([str(i) for i in indexes])}", value=pointData[:,tuple(indexes)].ravel()) if len(CGU.hasChildType(flowSolution, "DataArray_t")) > 0: CGU.setAsChild(zone, flowSolution) ## add elemFields for volum cells location = "CellCenter" ef_vol = ElementFilter(dimensionality=cell_dim) flowSolutionCell = CGL.newFlowSolution(None, name=f"{location}Fields", gridlocation=location) if exportOriginalIDs and len(local_mesh.GetElementsOriginalIDs(ef_vol))>0: CGL.newDataArray(flowSolutionCell, "OriginalIds", value=local_mesh.GetElementsOriginalIDs(ef_vol)+1) cell_ids = ef_vol.GetGlobalIds(local_mesh) if cell_ids.shape != (0,): for pointFieldName, cellData in local_mesh.elemFields.items(): _check_cgns_name(pointFieldName, context="DataArray") if cellData.dtype.char == "U": if pointFieldName != 'FE Names': print(f"skipping elemFields '{pointFieldName}' because it is of type: {cellData.dtype}") continue valueArray = local_mesh.elemFields[pointFieldName][cell_ids] if len(valueArray.shape)==1: CGL.newDataArray(flowSolutionCell, pointFieldName, value=valueArray) else: for indexes in itertools.product(*[range(dim) for dim in valueArray.shape[1:]]): local_pointFieldName = f"{pointFieldName}_{'_'.join([str(i) for i in indexes])}" _check_cgns_name(local_pointFieldName, context="DataArray") CGL.newDataArray(flowSolutionCell, local_pointFieldName, value=valueArray[:,tuple(indexes)].ravel()) if len(CGU.hasChildType(flowSolutionCell, "DataArray_t")) > 0: CGU.setAsChild(zone, flowSolutionCell) ## add elemFields for surface cells location = "FaceCenter" # if cellDim == 3: # location = "FaceCenter" # elif cellDim == 2: # Not yet supported by cgnscheck # location = "EdgeCenter" ef_surf = ElementFilter(dimensionality=cell_dim - 1) flowSolutionSCell = CGL.newFlowSolution(None, name=f"{location}Fields", gridlocation=location) if exportOriginalIDs and len(local_mesh.GetElementsOriginalIDs(ef_surf)) > 0: CGL.newDataArray(flowSolutionSCell, "OriginalIds", value=local_mesh.GetElementsOriginalIDs(ef_surf)+1) if len(local_mesh.elemFields) > 0: cellS_ids = ef_surf.GetGlobalIds(local_mesh) if cellS_ids.shape != (0,): for pointFieldName, cellData in local_mesh.elemFields.items(): _check_cgns_name(pointFieldName, context="DataArray") if cellData.dtype.char == "U": if pointFieldName != 'FE Names': print(f"skipping elemFields '{pointFieldName}' because it is of type: {cellData.dtype}") continue valueArray = local_mesh.elemFields[pointFieldName][cellS_ids] if len(valueArray.shape) == 1: CGL.newDataArray(flowSolutionSCell, pointFieldName, value=valueArray) else: for indexes in itertools.product(*[range(dim) for dim in valueArray.shape[1:]]): local_pointFieldName = f"{pointFieldName}_{'_'.join([str(i) for i in indexes])}" _check_cgns_name(local_pointFieldName, context="DataArray") CGL.newDataArray(flowSolutionSCell, local_pointFieldName, value=valueArray[:,tuple(indexes)].ravel()) if len(CGU.hasChildType(flowSolutionSCell, "DataArray_t")) > 0: CGU.setAsChild(zone, flowSolutionSCell) zbg = CGL.newZoneBC(None) # add elements tags (as field also) num_elt_surf = ElementFilter(dimensionality=cell_dim-1).GetGlobalIds(local_mesh) for elTag in local_mesh.elements.GetTagsNames(): _check_cgns_name(elTag, context="Tag/Family") ef_tagV = ElementFilter(dimensionality=cell_dim, eTag=elTag) ids = ef_tagV.GetGlobalIds(local_mesh) if ids.size > 0: if np.array_equal(ids, cell_ids): # all volum elem have this tag - it is their family newFamilyName(zone, family=elTag) else: _check_cgns_name(elTag+'_ZSR', context="ZoneSubRegion") newZoneSubRegion(zone, elTag+'_ZSR', ids+1, cell_dim, family=elTag) ef_tagS = ElementFilter(dimensionality=cell_dim-1, eTag=elTag) ids = ef_tagS.GetGlobalIds(local_mesh) if ids.size > 0: bcNode = CGL.newBC(parent=zbg, bname=elTag, brange=[ids+1], family=None, pttype="PointList") if cell_dim == 3: CGL.newGridLocation(bcNode, value='FaceCenter') elif cell_dim == 2: CGL.newGridLocation(bcNode, value='EdgeCenter') if tagsAsFields: _check_cgns_name(elTag+"_as_field", context="DataArray"), cellData = np.zeros(local_mesh.GetNumberOfElements(dim=cell_dim-1), dtype=np.int64) cellData[np.isin(num_elt_surf, ids)] = 1 CGL.newDataArray(flowSolutionSCell, elTag+"_as_field", cellData) # add node tags (as field also) for tag in local_mesh.nodesTags: _check_cgns_name(tag.name, context="BC") ids = tag.GetIds() if update: try: bcpath = CGU.getPathsByTypeOrNameList(outputPyTree, ['CGNSTree_t', 'CGNSBase_t', 'Zone_t', 'ZoneBC_t', tag.name])[0] bc = CGU.getNodeByPath(outputPyTree, bcpath) btype = CGU.getValueAsString(bc) node = CGU.getNodeByPath(bc, 'FamilyName') except IndexError: node = None btype = "Null" if node: fnbc = CGU.getValueAsString(node) else: fnbc = None else: btype = "Null" fnbc = None bcNode = CGL.newBC(parent=zbg, bname=tag.name, brange=[ids+1], btype=btype, family=fnbc, pttype="IndexArray_t") CGL.newGridLocation(bcNode, value='Vertex') if tagsAsFields: _check_cgns_name(tag.name+"_as_field", context="DataArray") pointData = np.zeros(local_mesh.GetNumberOfNodes()) pointData[ids] = 1 CGL.newDataArray(flowSolution, tag.name+"_as_field", np.copy(pointData)) if zbg[2]: CGU.setAsChild(zone, zbg) return zone cell_dim = mesh.GetElementsDimensionality() physical_dim = mesh.GetPointsDimensionality() if updateZone: basepath = CGU.getPathsByTypeSet(outputPyTree, ["CGNSBase_t"])[0] base = CGU.getNodeByPath(outputPyTree, basepath) if isinstance(updateZone, str): zone_name = updateZone _check_cgns_name(updateZone, context="Zone") zonepaths = CGU.getPathsByTypeOrNameList(base, ["CGNSBase_t", zone_name]) fnpath = CGU.getPathsByTypeOrNameList(base, ['CGNSBase_t', zone_name, 'FamilyName_t'])[0] fn = CGU.getNodeByPath(base, fnpath) familyName = CGU.getValueAsString(fn) else: zonepaths = CGU.getPathsByTypeSet(outputPyTree, ["Zone_t"]) fnpaths = CGU.getPathsByTypeOrNameList(base, ['CGNSBase_t', zonepaths, 'FamilyName_t']) if len(zonepaths) == 1: zone_name = zonepaths[0].split('/')[-1] else: # Structured CGNS zone_name = "Zone" fnpaths = CGU.getPathsByTypeList(base, ['CGNSBase_t', 'Zone_t', 'FamilyName_t']) fn = [] for path in fnpaths: node = CGU.getNodeByPath(base, path) fn.append(CGU.getValueAsString(node)) fn = np.unique(fn) if fn.size == 1: familyName = fn[0] else: familyName = defaultFamilyName _check_cgns_name(familyName, context="Family") update = True else: baseName = "Base_"+str(cell_dim)+"_"+str(physical_dim) base = CGL.newCGNSBase(outputPyTree, baseName, cell_dim, physical_dim) zone_name = "Zone" familyName = None update = False zone = ConstructZone(outputPyTree, mesh, zone_name, familyName=familyName, update=update) fnpath = CGU.getPathsByTypeList(zone, ['Zone_t', 'FamilyName_t']) if fnpath: fn = CGU.getNodeByPath(zone, fnpath[0]) familyName = CGU.getValueAsString(fn) _check_cgns_name(familyName, context="Family") CGL.newFamily(base, familyName) else: newFamilyName(zone, defaultFamilyName, base) if update: # Clean old zones for path in zonepaths: node = CGU.getNodeByPath(outputPyTree, path) CGU.nodeDelete(outputPyTree, node) CGU.addChild(base, zone) createMissingFamilies(outputPyTree) return outputPyTree
[docs] def printTree(pyTree: list, pre: str = ''): """Pretty print for CGNS Tree Args: pyTree (list): CGNS tree to print pre (str, optional): indentation of print. Defaults to ''. """ np.set_printoptions(threshold=5, edgeitems=1) def printValue(node): if node[1].dtype == "|S1": return CGU.getValueAsString(node) else: return f"{node[1]}".replace("\n", "") for child in pyTree[2]: try: print(pre, child[0], ":", child[1].shape, printValue(child), child[1].dtype, child[3]) except AttributeError: print(pre, child[0], ":", child[1], child[3]) if child[2]: printTree(child, ' '*len(pre)+'|_ ') np.set_printoptions(edgeitems=3, threshold=1000)
[docs] def CleanDoubleNodesOnMatch(mesh: Mesh, nodesSurfaces: list): """Remove double nodes of a structured multi-zones mesh merged Args: mesh (Mesh): Mesh from where to find and delete double nodes nodesSurfaces (list): Nodes on every boundary surface of each zone Returns: Mesh: Mesh without double nodes """ mask = np.zeros(mesh.GetNumberOfNodes()) mask[nodesSurfaces] = 1 MMT.CleanDoubleNodes(mesh, tol=1e-15, nodesToTestMask=np.array(mask, dtype=bool), propagateToNodeFields=True) return mesh
[docs] def CheckIntegrity(GUI = False): try: import CGNS.MAP as CGM except ImportError: print("CGNS Python module not found, skipping test") return "skip" import Muscat.TestData as MuscatTestData from Muscat.MeshTools.MeshTools import IsClose from Muscat.MeshTools.MeshModificationTools import ComputeSkin from Muscat.IO.GeofReader import ReadGeof # print("\n Cube unstructured") # print("############################") from Muscat.MeshTools.MeshCreationTools import CreateCube, CreateSquare, SubDivideMesh res = CreateCube() res.GenerateManufacturedOriginalIDs() res.AddElementsToTag(np.arange(res.GetNumberOfElements(dim=res.GetPointsDimensionality())), "3D") res.AddElementsToTag(np.arange(res.GetNumberOfElements(dim=res.GetPointsDimensionality())), "Cube") # print(res) res.VerifyIntegrity() resI = MeshToCGNS(res) # CGM.save('cube_uns3D.cgns', resI) resI = MeshToCGNS(res, exportOriginalIDs=False) zonepath = CGU.getPathsByTypeSet(resI, ["Zone_t"])[0] zone = CGU.getNodeByPath(resI, zonepath) resII = CGNSToMesh(resI, zoneNames=zone[0]) resII.VerifyIntegrity() # print(resII) IsClose(res, resII) MeshToCGNS(resII, outputPyTree=resI, updateZone=zone[0], tagsAsFields=True) # CGM.save('cube_uns3D_2.cgns', resI) # print("\n Square unstructured") # print("############################") res = CreateSquare() # print(res) res = SubDivideMesh(res, 2) res.GenerateManufacturedOriginalIDs() ef_vol = ElementFilter(dimensionality=res.GetPointsDimensionality()) cell_ids = ef_vol.GetGlobalIds(res) res.AddElementsToTag(cell_ids, "2D") res.nodesTags.CreateTag("toto").SetIds([1, 3, 6, 12, 15]) resI = MeshToCGNS(res, tagsAsFields=True) # CGM.save('toto.cgns', resI) basepath = CGU.getPathsByTypeSet(resI, ["CGNSBase_t"])[0] base = CGU.getNodeByPath(resI, basepath) resII = CGNSToMesh(resI, baseNames=base[0]) # print(resII) IsClose(res, resII) resI = MeshToCGNS(resII, tagsAsFields=True) # CGM.save('toto2.cgns', resI) # print("\n Structured cgns 3D") # print("############################") tree = CGM.load(MuscatTestData.GetTestDataPath() + "CGNSExample/structured.cgns")[0] res = CGNSToMesh(tree) res3domaines = CGNSToMesh(tree, partitionedMesh=True) #export Multidomain #from Muscat.IO.XdmfWriter import XdmfWriter #tt = XdmfWriter("toto.xdmf") #tt.SetMultidomain(True) #tt.Open() #for m in res3domaines.storage: # tt.Write(m, PointFields=list(m.nodeFields.values()), PointFieldsNames=list(m.nodeFields.keys())) #tt.Close() res.AddElementsToTag(np.arange(3), "Toto") res2 = res.View() ComputeSkin(res2, inPlace=True) for fieldName in res.elemFields.keys(): res2.elemFields[fieldName] = np.zeros(res2.GetNumberOfElements()) res2.elemFields[fieldName][:res.elemFields[fieldName].shape[0]] = res.elemFields[fieldName] res2.GenerateManufacturedOriginalIDs() # print(res2) resI = MeshToCGNS(res2) # CGM.save('struct3D.cgns', resI) resI2 = MeshToCGNS(res2, outputPyTree=tree, updateZone=True) resII = CGNSToMesh(resI) resII2 = CGNSToMesh(resI2) # print(resII) IsClose(res2, resII) IsClose(res2, resII2) resI = MeshToCGNS(resII, outputPyTree=tree, updateZone=True) # print("\n Structured cgns 2D") # print("############################") tree = CGM.load(MuscatTestData.GetTestDataPath() + "CGNSExample/outcart.cgns")[0] res = CGNSToMesh(tree) # print(res) res2 = res.View() skin = ComputeSkin(res2, md=2, inPlace=False) res2.MergeElements(skin, force=True) # res2.Merge(skin) # MMT.CleanDoubleNodes(res2, tol=1e-15) res2.GenerateManufacturedOriginalIDs() # print(res2) resI = MeshToCGNS(res2) # CGM.save('struct2D.cgns', resI) resI2 = MeshToCGNS(res2, outputPyTree=tree, updateZone=True) # CGM.save('struct2D_2.cgns', resI) resII = CGNSToMesh(resI) resII2 = CGNSToMesh(resI2) # print(resII) IsClose(res2, resII) IsClose(res2, resII2) res_part = CGNSToMesh(tree, partitionedMesh=True) # print(res_part) # print("\n HEXA_27 check") # print("############################") from Muscat.MeshTools.MeshCreationTools import ToQuadraticMesh mesh = ToQuadraticMesh(CreateCube()) tree = MeshToCGNS(mesh) elem_paths = [elp for elp in CGU.getAllNodesByTypeSet(tree, ["Elements_t"]) if 'HEXA_27' in elp][0] elem_node = CGU.getNodeByPath(tree, elem_paths) conn = CGU.getValueByPath(elem_node, "ElementConnectivity")-1 assert np.allclose(conn, CGNSToMuscatPermutation["HEXA_27"]) res = CGNSToMesh(tree) assert np.allclose(res.elements[ED.Hexahedron_27].connectivity[0,:], np.arange(27)) IsClose(mesh, res) mesh = ToQuadraticMesh(CreateCube(dimensions=[2,3,4])) tree = MeshToCGNS(mesh) elem_paths = [elp for elp in CGU.getAllNodesByTypeSet(tree, ["Elements_t"]) if 'HEXA_27' in elp][0] elem_node = CGU.getNodeByPath(tree, elem_paths) conn = CGU.getValueByPath(elem_node, "ElementConnectivity")-1 assert np.allclose(conn[:18], [0, 6, 9, 3, 1, 7, 10, 4, 24, 30, 27, 39, 52, 58, 61, 55, 25, 31]) res = CGNSToMesh(tree) assert np.allclose(res.elements[ED.Hexahedron_27].connectivity, mesh.elements[ED.Hexahedron_27].connectivity) IsClose(mesh, res) # print("\n Multielement 3D") filename = MuscatTestData.GetTestDataPath() + "multielements3D.geof" mesh = ReadGeof(filename) tree = MeshToCGNS(mesh) # print("\n Multielement 2D") filename = MuscatTestData.GetTestDataPath() + "multielements2D.geof" mesh = ReadGeof(filename) tree = MeshToCGNS(mesh) # IntegrationPoint payload helpers mesh = CreateSquare(dimensions=[2, 2], ofTriangles=True) tree = MeshToCGNS(mesh) basepath = CGU.getPathsByTypeSet(tree, ["CGNSBase_t"])[0] zonepath = CGU.getPathsByTypeSet(tree, ["Zone_t"])[0] base = CGU.getNodeByPath(tree, basepath) zone = CGU.getNodeByPath(tree, zonepath) nCells = mesh.GetNumberOfElements(dim=2) AddIntegrationRuleCollection( base, collectionName="IntegrationGauss", idToName={1: "TriCentroidRule"}, rules={ "TriCentroidRule": { "element_type": "TRI_3", "reference_space": "Parametric", "integration_name": "ElementCenter", "parametric_integration_points": np.array([[1.0 / 3.0, 1.0 / 3.0]], dtype=MuscatFloat), "weights": np.array([0.5], dtype=MuscatFloat), } }, ) AddIntegrationPointFlowSolution( zone, flowName="IntegrationPointFields", dataArrays={"IPScalar": np.arange(nCells, dtype=MuscatFloat)}, itgPointStartOffset=np.arange(nCells + 1, dtype=np.int32), itgRulesPath=f"/{base[0]}/IntegrationGauss", itgRulesIds=np.array([1], dtype=np.int32), ) ipPayload = ReadIntegrationPointPayload(tree) assert len(ipPayload) == 1 np.testing.assert_equal(ipPayload[0]["itg_point_start_offset"], np.arange(nCells + 1, dtype=np.int32)) np.testing.assert_equal(ipPayload[0]["data_arrays"]["IPScalar"], np.arange(nCells, dtype=MuscatFloat)) return 'ok'
if __name__ == '__main__': print(CheckIntegrity(True))# pragma: no cover