Source code for Muscat.Containers.Filters.FilterOperators

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
from __future__ import annotations
from typing import Optional, List, Union, Any, Collection, Generator, Sequence
# from collections.abc import Sequence
from functools import reduce

import numpy as np

from Muscat.Types import MuscatIndex, ScalarIntLike
import Muscat.Containers.ElementsDescription as ED
from Muscat.Containers.Mesh import Mesh
from Muscat.Containers.Filters.FilterBase import FilterBase, ElementsSelection, IntersectOrdered1D, UnionOrdered1D
from Muscat.Containers.Filters.FilterObjects import ElementFilter, NodeFilter

FilterLike = Union[FilterBase, "FilterOperatorBase"]


[docs]class FilterOperatorBase(): """Base class to construct derived Filters (Union filter for example) Parameters ---------- filters : List[Union[Filter,FilterOperatorBase]], optional the list of filters to use, by default None """ def __init__(self, filters: Optional[Union[FilterLike,List[FilterLike]]] = None, minNumberOfInternalFilters: int = 1, maxNumberOfInternalFilters: int = 100) -> None: super().__init__() self.filters: List[Union[FilterOperatorBase, ElementFilter, NodeFilter]] = [] self.withError = False self._minNumberOfInternalFilters = minNumberOfInternalFilters self._maxNumberOfInternalFilters = maxNumberOfInternalFilters if isinstance(filters, (FilterOperatorBase, FilterBase)) and self._minNumberOfInternalFilters == 1 and self._maxNumberOfInternalFilters == 1: filters = [filters] if filters is not None: self.AddToFiltersList(filters)
[docs] def IsEquivalent(self, other: Any) -> bool: """To check if 2 element filter are equivalent () Parameters ---------- other : Any other object to check the equivalency Returns ------- bool True if the two filters are equal """ if id(self) == id(other): return True if isinstance(other, self.__class__): if self.filters != other.filters: return False return True else: return False
[docs] def ClearFilterList(self) -> None: """Clear the internal list of filter to be used to compute the operator """ self.filters = []
[docs] def AddToFiltersList(self, filters: Union[FilterLike, Sequence[FilterLike]]) -> None: """Add filter(s) to the internal list of filter to be used by the operators Parameters ---------- filters : Union[FilterLike, Sequence[FilterLike]] a filter or a list of filters Raises ------ TypeError if filters is not of the correct type RuntimeError if the operator cant accept more internal filters """ if isinstance(filters, (FilterBase, FilterOperatorBase)): filters = [filters] elif not isinstance(filters, (list, tuple)): raise TypeError(f"filters must be a list or tuple of filters not a : {type(filters)}") if len(filters) + len(self.filters) > self._maxNumberOfInternalFilters: raise RuntimeError(f"can't hold ({len(filters) + len(self.filters)}) filters, max is ({self._maxNumberOfInternalFilters}) ") self.filters.extend(filters)
def __call__(self, mesh: Mesh) -> Generator[ElementsSelection, None, None]: """ Iteration interface to ease the use of the filter :example: myFilter = UnionFilter() # <- or a IntersectionFilter for example myFilter.AddToFiltersList([myOtherFilter1,myOtherFilter2]) for selection in myFilter: print("This function is called on the union of the 2 filters") print("Number of element of type " + str(name)+ " is : " + str(len(ids) ) """ self.Check() elementsFound = False meshOffset = 0 selectionOffset = 0 for elements in mesh.elements: indices = self.GetIdsToTreat(mesh, elements.elementType) if len(indices) > 0: elementsFound = True yield ElementsSelection(mesh=mesh, indices=indices, elements=elements, meshOffset=meshOffset, selectionOffset=selectionOffset) meshOffset += elements.GetNumberOfElements() selectionOffset += len(indices) if selectionOffset == 0: if self.withError and elementsFound == False: raise Exception("Error!! Zero element in the element filter : \n" + str(self)) else: Warning("Error!! Zero element in the element filter : \n" + str(self))
[docs] def Check(self) -> None: if len(self.filters) < self._minNumberOfInternalFilters: raise RuntimeError(f"I need at least ({self._minNumberOfInternalFilters}) filters")
[docs] def GetIdsToTreat(self, mesh: Mesh, elementType: Union[ED.ElementType, str]) -> Union[np.ndarray, Collection]: """return the indices of the element selected by the filter for the ElementsContainer Parameters ---------- mesh : Mesh the mesh to filter elementType : str the element Type to work on Returns ------- np.ndarray the indices of the selected elements by the filter """ raise NotImplementedError("Cant use the base class as an operator")
[docs] def GetNodesIndices(self, mesh: Mesh) -> np.ndarray: """return the indices of the node selected by the filter Parameters ---------- mesh : Mesh elementType : str Returns ------- np.ndarray the indices of the selected nodes by the filter Raises ------ NotImplementedError _description_ """ raise NotImplementedError("Cant use the base class as an operator")
[docs]class UnionFilter(FilterOperatorBase): """ Specialized class to compute the union of filter (add) """ def __init__(self, filters: Optional[Union[FilterLike,List[FilterLike]]] = None) -> None: super().__init__(filters=filters, minNumberOfInternalFilters=1, maxNumberOfInternalFilters= 1000)
[docs] def GetIdsToTreat(self, mesh: Mesh, elementType: Union[ED.ElementType, str]) -> Union[np.ndarray, Collection]: """return the ids of the element selected by the filter for the ElementsContainer Parameters ---------- elementType : str the element Type to work on Returns ------- np.ndarray the ids selected by the filter """ return reduce(UnionOrdered1D, ( ff.GetIdsToTreat(mesh, elementType).astype(MuscatIndex,copy=False) for ff in self.filters))
[docs] def GetNodesIndices(self, mesh: Mesh) -> np.ndarray: self.Check() return reduce(UnionOrdered1D, (ff.GetNodesIndices(mesh).astype(MuscatIndex,copy=False) for ff in self.filters))
[docs]class IntersectionFilter(FilterOperatorBase): """ Specialized class to compute the intersection of filters """ def __init__(self, filters: Optional[Union[FilterLike,List[FilterLike]]] = None) -> None: super().__init__(filters=filters, minNumberOfInternalFilters=1, maxNumberOfInternalFilters=int(1e10))
[docs] def GetIdsToTreat(self, mesh: Mesh, elementType: Union[ED.ElementType, str]) -> Union[np.ndarray, Collection]: """return the ids of the element selected by the filter for the ElementsContainer Parameters ---------- elementType : str the element Type to work on Returns ------- np.ndarray the ids selected by the filter """ ids = None for ff in self.filters: if ids is None: ids = ff.GetIdsToTreat(mesh, elementType) else: ids = np.intersect1d(ids, ff.GetIdsToTreat(mesh, elementType), assume_unique=True) if len(ids) == 0: return ids return ids
[docs] def GetNodesIndices(self, mesh: Mesh) -> np.ndarray: self.Check() ids: np.ndarray = None for ff in self.filters: if ids is None: ids = ff.GetNodesIndices(mesh) else: ids = np.intersect1d(ids, ff.GetNodesIndices(mesh), assume_unique=True) if len(ids) == 0: return ids return ids
[docs]class DifferenceFilter(FilterOperatorBase): """ Specialized class to compute the difference between two filters """ def __init__(self, filters: Optional[Union[FilterLike,List[FilterLike]]] =None) -> None: super().__init__(filters=filters, minNumberOfInternalFilters=2, maxNumberOfInternalFilters=2)
[docs] def GetIdsToTreat(self, mesh: Mesh, elementType: Union[ED.ElementType, str]) -> Union[np.ndarray, Collection]: """return the ids of the element selected by the filter for the ElementsContainer Parameters ---------- elementType : str the element Type to work on Returns ------- np.ndarray the ids selected by the filter """ ids = None idsA = self.filters[0].GetIdsToTreat(mesh, elementType) idsB = self.filters[1].GetIdsToTreat(mesh, elementType) return np.setdiff1d(idsA, idsB, assume_unique=True)
[docs] def GetNodesIndices(self, mesh: Mesh) -> np.ndarray: self.Check() idsA = self.filters[0].GetNodesIndices(mesh) idsB = self.filters[1].GetNodesIndices(mesh) return np.setdiff1d(idsA, idsB, assume_unique=True)
[docs]class ComplementaryFilter(FilterOperatorBase): """Class to generate the complementary part of a filter """ def __init__(self, filters: Optional[Union[FilterLike,List[FilterLike]]]=None) -> None: """Create a complementary filter Parameters ---------- filters : list, optional A list containing only one filter to compute the complementary part, by default None Raises ------ Exception if the list contains more than one filter """ super().__init__(filters=filters, minNumberOfInternalFilters=1, maxNumberOfInternalFilters=1)
[docs] def GetIdsToTreat(self, mesh: Mesh, elementType: Union[ED.ElementType, str]) -> Union[np.ndarray, Collection]: """return the ids of the element selected by the filter for the ElementsContainer Parameters ---------- elementType : str the element Type to work on Returns ------- np.ndarray the ids selected by the filter Raises ------ Exception if the list contains more than one filter """ f = self.filters[0] ids = f.GetIdsToTreat(mesh, elementType) nbElements = mesh.GetElementsOfType(elementType).GetNumberOfElements() if len(ids) == nbElements: return [] mask = np.ones(nbElements, dtype=bool) mask[ids] = False return np.where(mask)[0]
[docs] def GetNodesIndices(self, mesh: Mesh) -> np.ndarray: self.Check() # We compute the nodes of the complementary elements in the case of an element filter if isinstance(self.filters[0], ElementFilter): return reduce(UnionOrdered1D,map(lambda selection : selection.elements.GetNodesIndexFor(selection.indices), self(mesh)),np.empty((0), dtype=MuscatIndex)) else: indices = self.filters[0].GetNodesIndices(mesh) mask = np.ones(mesh.GetNumberOfNodes(), dtype=bool) mask[indices] = False return np.where(mask)[0]
[docs]class FilterWithMesh(FilterOperatorBase): """Filter operator to ignore the incoming mesh and use a internally stored mesh """ def __init__(self, filters: Union[FilterLike,List[FilterLike]], mesh: Union[Mesh,None], ignoreIncomingMesh= False) -> None: """Create a instance of filter with a stored mesh Parameters ---------- filters : List[FilterBase] a list of only one filter mesh : Mesh the mesh to be used by this filter if self.ignoreIncomingMesh is True ignoreIncomingMesh : bool if true the mesh used Raises ------ Exception if the filters list contain more than one filter """ super().__init__(filters=filters, minNumberOfInternalFilters=1, maxNumberOfInternalFilters=1) self.mesh: Union[Mesh,None] = mesh self.ignoreIncomingMesh = ignoreIncomingMesh def _GetMeshToWorkOn_(self, mesh: Union[Mesh,None]) -> Mesh: """Internal member function to get the mesh to work on Parameters ---------- mesh : Mesh _description_ Returns ------- Mesh _description_ Raises ------ RuntimeError _description_ RuntimeError _description_ """ if self.mesh is None: if mesh is None: raise RuntimeError("Need a mesh to work on ") else: if self.ignoreIncomingMesh: raise RuntimeError("ignoreIncomingMesh is True but self.mesh is None") else: return mesh else: if mesh is None: return self.mesh else: if self.ignoreIncomingMesh: return self.mesh else: return mesh
[docs] def GetIdsToTreat(self, mesh: Mesh, elementType: Union[ED.ElementType, str]) -> Union[np.ndarray, Collection]: """return the ids of the element selected by the filter for the ElementsContainer Parameters ---------- mesh : Mesh Not Used! this argument is ignored, the mesh used is stored in self.mesh elementType : str the element Type to work on Returns ------- np.ndarray the ids selected by the filter """ f = self.filters[0] return f.GetIdsToTreat(self._GetMeshToWorkOn_(mesh), elementType)
def __call__(self, mesh: Optional[Mesh] = None) -> Generator[ElementsSelection, None, None]: """ Iteration interface to ease the use of the filter :example: myFilter = FilterOperatorBase(myMesh) # <- UnionElementFilter for example myFilter.AddToFiltersList([myOtherFilter1,myOtherFilter2]) for selection in myFilter: print("This function is called on the union of the 2 filters") print("Number of element of type " + str(name)+ " is : " + str(len(ids) ) """ self.Check() mesh = self._GetMeshToWorkOn_(mesh) return super().__call__(mesh)
[docs] def GetNodesIndices(self, mesh: Optional[Mesh] = None) -> np.ndarray: self.Check() mesh = self._GetMeshToWorkOn_(mesh) return self.filters[0].GetNodesIndices(mesh)
[docs]class PartialFilter(FilterOperatorBase): """ Utility class to create a partition of a ElementFilter """ def __init__(self, elementFilter:Union[FilterLike,List[FilterLike]], partitions: ScalarIntLike, partitionNumber: ScalarIntLike) -> None: """Create a Partial Element Filter. The partition is done using the numpy.array_split over the ids, so no geometrical compactness of the ids. Parameters ---------- elementFilter : a filter to extract the ids to treat partitions : int the number of partition. partitionNumber : int the selected partition for this instance """ super().__init__([elementFilter], minNumberOfInternalFilters=1, maxNumberOfInternalFilters=1) self.partitions = partitions self.partitionNumber = partitionNumber
[docs] def GetIdsToTreat(self, mesh: Mesh, elementType: Union[ED.ElementType, str]) -> Union[np.ndarray, Collection]: """return the ids of the element selected by the filter for the elementType Parameters ---------- elementType : str the element Type to work on Returns ------- np.ndarray the ids selected by the filter Raises ------ Exception if the list contains more than one filter """ res = self.filters[0].GetIdsToTreat(mesh, elementType) return np.array_split(res, self.partitions)[self.partitionNumber]
[docs] def GetNodesIndices(self, mesh: Mesh) -> np.ndarray: self.Check() res = self.filters[0].GetNodesIndices(mesh) return np.array_split(res, self.partitions)[self.partitionNumber]
[docs]class FrozenFilter(FilterOperatorBase): def __init__(self, filters: FilterLike, mesh, onElements: bool = True, onNodes: bool = False, withError: bool = False) -> None: """Class to hold a pre-evaluated filter Parameters ---------- mesh : Optional[Mesh], optional The mesh to work on, by default None filters : Union[Filter,FilterOperatorBase], optional the list with only one filter to use, by default None """ super().__init__(filters=filters, minNumberOfInternalFilters=1, maxNumberOfInternalFilters=1) self.checkMesh = True # we hold an instance of the mesh to check that the incoming mesh is the same self.mesh = mesh self.__frozenElementData = {} if onElements: meshOffset = 0 selectionOffset = 0 for elementType, elements in mesh.elements.items(): ids = self.filters[0].GetIdsToTreat(mesh, elementType=elementType) ids.flags.writeable = False res = ElementsSelection(mesh=mesh, indices=ids, elements=elements, meshOffset=meshOffset, selectionOffset=selectionOffset) self.SetIdsToTreatFor(res) meshOffset += elements.GetNumberOfElements() selectionOffset += len(ids) self.withError = withError if selectionOffset == 0: if self.withError: raise Exception("Error!! Zero element in the element filter : \n" + str(self)) else: Warning("Error!! Zero element in the element filter : \n" + str(self)) self.__frozenNodalData: Union[np.ndarray,None] = None if onNodes: self.__frozenNodalData = self.filters[0].GetNodesIndices(mesh) if (onNodes == False) and (onElements == False): raise RuntimeError("onNodes and onElements are False. this generate an useless frozen filter")
[docs] def IsEquivalent(self, other: Union[FrozenFilter, Any]) -> bool: if not super().IsEquivalent(other): return False if self is other: return True if not (self.__frozenElementData == other.__frozenElementData): return False if not np.array_equal(self.__frozenNodalData, other.__frozenNodalData): return False return True
[docs] def SetIdsToTreatFor(self, selection: ElementsSelection): """Set the ids for a specific element type. this filter keeps a reference to elements internally Parameters ---------- selection : ElementsSelection ElementsSelection to extract the element type """ self.__frozenElementData[selection.elements.elementType] = selection
[docs] def GetIdsToTreat(self, mesh: Mesh, elementType: Union[ED.ElementType, str]) -> Union[np.ndarray, Collection]: """return the ids of the element selected by the filter for the elementType Parameters ---------- mesh: Mesh this is a frozen filter so mesh is ignored elementType : str the element Type to work on Returns ------- np.ndarray the ids to selected by this filter """ if self.checkMesh and (self.mesh is not mesh): raise RuntimeError("Mesh are not the same, this frozen filter can be used on the original mesh only (checkMesh = True) ") return self.__frozenElementData[elementType].indices # (data, ids)
[docs] def GetElementSelectionSize(self) -> int: """Return the size of the element selection Returns ------- int number of elements in the selection """ cpt = 0 for selection in self.__frozenElementData.values(): cpt += selection.Size() return cpt
[docs] def GetNodeSelectionSize(self) -> int: """Return the size of the element selection Returns ------- int number of elements in the selection """ if self.__frozenNodalData is None: raise RuntimeError("Frozen filter not evaluated on nodes! Cant compute the size of the selection") return len(self.__frozenNodalData)
[docs] def GetNodesIndices(self, mesh: Optional[Mesh] = None) -> np.ndarray: self.Check() if self.checkMesh and mesh is not None and (self.mesh is not mesh): raise RuntimeError("Mesh are not the same, this frozen filter can be used on the original mesh only (checkMesh = True) ") if self.__frozenNodalData is None: raise RuntimeError("Nodal indices absent in this frozen filter, please rerun with onNodes = True") return self.__frozenNodalData
[docs]def CheckIntegrity(GUI: bool = False): from Muscat.Helpers.CheckTools import MustFailFunction, MustFailFunctionWith from Muscat.Containers import MeshCreationTools as UMCT from Muscat.Containers.Filters.FilterObjects import ElementFilter import Muscat.Containers.ElementsDescription as ED mesh = UMCT.CreateSquare(dimensions=[10, 10], origin=[0, 0], spacing=[10/9, 10/9], ofTriangles=True) mesh2 = UMCT.CreateSquare(dimensions=[10, 10], origin=[0, 0], spacing=[10/9, 10/9], ofTriangles=True) MustFailFunctionWith(NotImplementedError, FilterOperatorBase().GetIdsToTreat, mesh, ED.Bar_2) MustFailFunctionWith(NotImplementedError, FilterOperatorBase().GetNodesIndices, mesh) f1 = ElementFilter(elementType=ED.Bar_2) f2 = ElementFilter(elementType=ED.Triangle_3) uf1 = UnionFilter(filters=[f1, f2]) uf2 = UnionFilter(filters=[f1]) uf3 = UnionFilter(filters=[f1]) uf3.ClearFilterList() uf3.AddToFiltersList(f1) assert (uf1.IsEquivalent(uf1)) assert (uf1.IsEquivalent(uf2) == False) assert (uf1.IsEquivalent("hello") == False) assert (uf2.IsEquivalent(uf3)) def ElementCount(fil, local_mesh=None): if local_mesh == None: local_mesh = mesh if local_mesh == 0: local_mesh = None cpt = 0 for selection in fil(mesh=local_mesh): cpt += selection.Size() pass return cpt def NodeCount(fil: FilterOperatorBase) -> int: return len(fil.GetNodesIndices(mesh=mesh)) np.testing.assert_equal(ElementCount(uf1), mesh.GetNumberOfElements()) uf3.ClearFilterList() MustFailFunction(ElementCount, uf3) # check empty filter list ff1 = FrozenFilter(uf1, mesh, onElements=True, onNodes=True) np.testing.assert_equal(ff1.GetElementSelectionSize(), mesh.GetNumberOfElements()) np.testing.assert_equal(ff1.GetNodeSelectionSize(), mesh.GetNumberOfNodes()) np.testing.assert_equal(ElementCount(ff1), mesh.GetNumberOfElements()) np.testing.assert_equal(NodeCount(ff1), mesh.GetNumberOfNodes()) MustFailFunction(FrozenFilter, uf1, mesh, onElements=False, onNodes=False) MustFailFunctionWith(TypeError, uf1.AddToFiltersList, "not a filter") df1 = DifferenceFilter() MustFailFunction(df1.AddToFiltersList, [f1, f1, f1]) if1 = IntersectionFilter() if1.AddToFiltersList([f1, f2]) ElementCount(if1) if1.withError = True MustFailFunction(ElementCount, if1) # check empty filter NodeCount(if1) if2 = IntersectionFilter([f1, f1, f1]) ElementCount(if2) NodeCount(if2) if2 = IntersectionFilter([ElementFilter(elementType=ED.Bar_2, eTag="X0"), ElementFilter(elementType=ED.Bar_2, eTag="X1")]) np.testing.assert_equal(ElementCount(if2), 0) np.testing.assert_equal(NodeCount(if2), 0) MustFailFunction(FrozenFilter, if2, mesh, onElements=True, onNodes=False, withError=True) ff2 = FrozenFilter(if2, mesh, onElements=True, onNodes=False, withError=False) MustFailFunctionWith(RuntimeError, ff2.GetNodeSelectionSize) ff3 = FrozenFilter(if2, mesh, onElements=True, onNodes=True, withError=False) ff4 = FrozenFilter(if2, mesh, onElements=False, onNodes=True, withError=False) ff5 = FrozenFilter(if2, mesh, onElements=True, onNodes=False, withError=False) ff6 = FrozenFilter(if2, mesh2, onElements=True, onNodes=True, withError=False) ff6.checkMesh = True assert (ff2.IsEquivalent(ff1) == False) assert (ff2.IsEquivalent(ff2)) assert (ff2.IsEquivalent(ff3) == False) assert (ff2.IsEquivalent(ff4) == False) assert (ff2.IsEquivalent(ff5)) MustFailFunction(ff2.GetNodesIndices) MustFailFunction(ff6.GetNodesIndices, mesh) MustFailFunction(ElementCount, ff6, mesh) df1 = DifferenceFilter([ElementFilter(), f1]) # all the mesh without the bars np.testing.assert_equal(ElementCount(df1), mesh.GetElementsOfType(ED.Triangle_3).GetNumberOfElements()) np.testing.assert_equal(NodeCount(df1), 64) cf1 = ComplementaryFilter(df1) np.testing.assert_equal(ElementCount(cf1), mesh.GetElementsOfType(ED.Bar_2).GetNumberOfElements()) np.testing.assert_equal(NodeCount(cf1), 100-64) fwm1 = FilterWithMesh(if2, mesh=None) MustFailFunction(ElementCount, fwm1, 0) ElementCount(fwm1, mesh) fwm1.mesh = mesh ElementCount(fwm1, 0) ElementCount(fwm1, mesh) fwm1.ignoreIncomingMesh = True fwm1.mesh = None MustFailFunction(ElementCount, fwm1, 0) MustFailFunction(ElementCount, fwm1, mesh) fwm1.mesh = mesh ElementCount(fwm1, 0) ElementCount(fwm1, mesh) fwm1.GetNodesIndices() pf1 = PartialFilter(f2, 2, 0) pf2 = PartialFilter(f2, 2, 1) np.testing.assert_equal(ElementCount(pf1) + ElementCount(pf2), mesh.GetElementsOfType(ED.Triangle_3).GetNumberOfElements()) np.testing.assert_equal(NodeCount(pf1) + NodeCount(pf2), mesh.GetNumberOfNodes()) return 'ok'
if __name__ == '__main__': print(CheckIntegrity(GUI=True)) # pragma: no cover