# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
import numpy as np
import concurrent.futures
from scipy.sparse import coo_matrix
from Muscat.Helpers.Decorators import froze_it
from Muscat.Helpers.CPU import GetNumberOfAvailableCores
from Muscat.Helpers.Logger import Debug, Error
from Muscat.Types import MuscatIndex, MuscatFloat
from Muscat.MeshContainers.Filters.FilterObjects import ElementFilter
from Muscat.MeshContainers.Filters.FilterOperators import FrozenFilter
from Muscat.MeshContainers.Filters.FilterTools import GetListOfPartialElementFilter, GetFrozenFilter
from Muscat.FE.DofNumbering import ComputeDofNumbering
from Muscat.FE.Spaces.FESpaces import LagrangeSpaceGeo
from Muscat.FE.Fields.FEField import FEField
from Muscat.FE.Fields.IPField import IPField
from Muscat.FE.IntegrationRules import LagrangeIsoParamQuadrature, IntegrationRulesFactory, GetIntegrationRuleByName
import Muscat.FE.WeakForms.NumericalWeakForm as WeakForm
import Muscat.FE.Integration
[docs]
@froze_it
class IntegrationBaseClass():
""" Class to define and execute an integration of a weak form
"""
def __init__(self, other=None):
super().__init__()
if other is None:
# inputs
self.mesh = None
self.elementFilter = None
self.weakForm = None
self.numericalWeakForm = None
self.integrator = None
self.integrationRule = None
self.extraFields = []
self.posFields = []
self.unknownFields = None
self.testFields = None
self.nbCPUs = GetNumberOfAvailableCores()
self.onlyEvaluation = False
self.constants = {}
# ----
self.vK = None
self.iK = None
self.jK = None
self.numberOfUsedvij = 0
self.rhs = None
# ----
#self.SetIntegrator()
else:
"""this is a internal use to prepare a class instance for multithread integration """
self.nbCPUs = 1
self.mesh = other.mesh
self.integrator.SetIntegrationRule(other.integrationRule)
self.elementFilter = other.elementFilter
self.onlyEvaluation = other.onlyEvaluation
self.integrator.SetOnlyEvaluation(other.onlyEvaluation)
self.constants = other.constants
self.integrator.SetConstants(other.constants)
self.SetUnknownFields(other.unknownFields)
self.SetTestFields(other.testFields)
self.extraFields = other.extraFields
self.integrator.SetExtraFields(other.extraFields + other.posFields)
self.integrationRule = other.integrationRule
self.numericalWeakForm = other.numericalWeakForm
self.posFields = other.posFields
# ----
self.vK = None
self.iK = None
self.jK = None
self.numberOfUsedvij = 0
self.rhs = None
# ----
[docs]
def Reset(self):
self.integrator.Reset()
[docs]
def SetConstants(self, constants):
"""Set the contacts to be used in the weak form
Parameters
----------
constant : dict
dictionary with the constants key : string , value: float
"""
self.constants = constants
self.integrator.SetConstants(constants)
[docs]
def SetOnlyEvaluation(self, onlyEvaluation):
"""Set the onlyEvaluation option. If true the contribution of the determinant of the
transformation and the weight of the integration points is ignored. the user is
responsible of dividing by the mass matrix (if necessary) to get the correct values.
Parameters
----------
onlyEvaluation : bool
True to activate this option
"""
self.onlyEvaluation = onlyEvaluation
self.integrator.SetOnlyEvaluation(onlyEvaluation)
[docs]
def SetUnknownFields(self, unknownFields):
"""Set the fields used for the unknown space
Parameters
----------
unknownFields : list(FEField) list of fields
"""
if unknownFields is None:
unknownFields = []
self.unknownFields = unknownFields
self.integrator.SetUnknownFields(unknownFields)
[docs]
def SetTestFields(self, testFields):
"""Set the fields used for the test space
Parameters
----------
tfs : list(FEField) list of fields
if tfs is none then the unknown fields are used (Galerkin projection)
"""
self.testFields = testFields
self.integrator.SetTestFields(testFields)
[docs]
def SetIntegrationRule(self, integrationRuleName=None, integrationRule=None):
"""Set the Integration rule to be used during integration
Parameters
----------
integrationRuleName : str, optional
name of the integrationRule
integrationRule : dict, optional
integration rule for every element type key->str: value: tuple(intPoints ndarray, intWeights )
"""
if integrationRuleName is None:
if integrationRule is None:
self.integrationRule = LagrangeIsoParamQuadrature
else:
self.integrationRule = integrationRule
else:
if integrationRule is None:
self.integrationRule = GetIntegrationRuleByName(integrationRuleName)
else:
raise Exception("must give integrationRuleName or integrationRule not both")
self.integrator.SetIntegrationRule(self.integrationRule)
[docs]
def SetMesh(self, mesh):
"""Set the mesh defining the integration domain
Parameters
----------
mesh : Mesh
mesh containing the geometry
"""
self.mesh = mesh
fields = list()
LSGNum = ComputeDofNumbering(self.mesh, space=LagrangeSpaceGeo, fromConnectivity=True)
for i in range(self.mesh.GetPointsDimensionality()):
pos_x = FEField(f"Pos_{i}", mesh=self.mesh, space=LagrangeSpaceGeo, numbering=LSGNum, data=np.ascontiguousarray(self.mesh.nodes[:, i]))
fields.append(pos_x)
fields.append(FEField(f"Coordinate_{i}", mesh=self.mesh, space=LagrangeSpaceGeo, numbering=LSGNum, data=pos_x.data ))
self.posFields = fields
[docs]
def SetElementFilter(self, elementFilter=None):
"""Set the element filter to select the elements of the integration
"""
if elementFilter is None:
elementFilter = ElementFilter(dimensionality=self.mesh.GetElementsDimensionality())
if self.mesh is None:
raise RuntimeError("Need to set the mesh first")
self.elementFilter = GetFrozenFilter(elementFilter, mesh=self.mesh)
[docs]
def PreStartCheck(self):
""" verification of the integration rule for the ip fields:
"""
for f in self.extraFields:
if isinstance(f, IPField):
if f.rule != self.integrationRule:
print("f.rule")
print(f.rule)
print("integrationRule")
print(self.integrationRule)
raise Exception(f"Integration rule of field {f.GetName()} not compatible with the integration")
from Muscat.MeshContainers.Mesh import Mesh
if not isinstance(self.mesh, Mesh):
self.mesh.GetPosOfNodes()
derDir = self.numericalWeakForm.GetMaxDerivativeDimensionality()
if self.mesh.GetPointsDimensionality() < derDir:
Error(f"Cant compute the derivative in the direction number : {derDir}, but with a mesh of dimensionality : {self.mesh.GetPointsDimensionality()}")
raise RuntimeError(f"Cant compute the derivative in the direction number : {derDir}, but with a mesh of dimensionality : {self.mesh.GetPointsDimensionality()}")
[docs]
def SetOutputObjects(self, vK, iK, jK, rhs):
""" This is an advace feature, the user must put objects of the correct size"""
self.vK = vK
self.iK = iK
self.jK = jK
self.rhs = rhs
[docs]
def Allocate(self):
""" Function to allocate the memory to do the integration
This function must be called right before the integration
"""
numberOfVIJ = self.integrator.ComputeNumberOfVIJ(self.mesh, self.elementFilter)
if numberOfVIJ == 0 and (self.testFields is not None and len(self.testFields)*len(self.unknownFields)) > 0:
print("Warning!!! System with zero dofs")
raise Exception("Error!!! System with zero dofs")
# be sure to have valid pointer so we allocate at least one element
if numberOfVIJ == 0:
numberOfVIJ = 1
vK = np.zeros(numberOfVIJ, dtype=MuscatFloat)
iK = np.zeros(numberOfVIJ, dtype=MuscatIndex)
jK = np.zeros(numberOfVIJ, dtype=MuscatIndex)
rhs = np.zeros(self.integrator.GetTotalTestDofs(), dtype=MuscatFloat)
self.SetOutputObjects(vK, iK, jK, rhs)
[docs]
def Compute(self, nbThreads=None):
"""Execute the integration in multithread
Parameters
----------
forceMonoThread : bool
true to force the use of only one thread
"""
numberOfElementToTreat = self.elementFilter.GetElementSelectionSize()
if numberOfElementToTreat == 0:
print("Warning no elements selected for integration. Please Check your Element Filter")
return
elif not self.integrator.IsMultiThread() or nbThreads==1 or self.nbCPUs == 1:
Debug(f"Integration nbThreads={nbThreads}, nbCPUs={self.nbCPUs}")
return self.ComputeMonoThread()
elif numberOfElementToTreat < Muscat.FE.Integration.MultiThreadThreshold:
return self.ComputeMonoThread()
def InitSpaces(fields):
for f in fields:
if isinstance(f, IPField):
continue
f.space.InitSpaces()
LagrangeSpaceGeo.InitSpaces()
if self.unknownFields is not None:
InitSpaces(self.unknownFields)
InitSpaces(self.extraFields)
if self.testFields is not None:
InitSpaces(self.testFields)
allWorkload = GetListOfPartialElementFilter(self.elementFilter, self.nbCPUs, self.mesh)
workload = []
cpt = 0
totalTestDofs = self.integrator.GetTotalTestDofs()
for f in allWorkload:
if f.GetElementSelectionSize() > 0:
numberOfVIJ = self.integrator.ComputeNumberOfVIJ(self.mesh, f)
workload.append((f, self.vK[cpt:numberOfVIJ+cpt], self.iK[cpt:numberOfVIJ+cpt], self.jK[cpt:numberOfVIJ+cpt], totalTestDofs))
cpt += numberOfVIJ
with concurrent.futures.ThreadPoolExecutor(max_workers=self.nbCPUs) as executor:
results = executor.map(self._InternalComputeMonoThreadSafe, workload)
for rhs in results:
self.rhs += rhs
self.numberOfUsedvij = cpt
def _InternalComputeMonoThreadSafe(self, elementFilter_vK_iK_jK):
elementFilter, vK, iK, jK, totalTestDofs = elementFilter_vK_iK_jK
res = type(self)(self)
res.SetElementFilter(elementFilter)
res.SetOutputObjects(vK, iK, jK, np.zeros(totalTestDofs, dtype=MuscatFloat))
res.ComputeMonoThread()
return res.GetRhs()
[docs]
def ComputeMonoThread(self, elementFilter=None):
"""Execute the integration using only one tread (this function is no thread safe)
"""
self.integrator.PrepareFastIntegration(self.mesh, self.numericalWeakForm, self.vK, self.iK, self.jK, 0, self.rhs)
if elementFilter is None:
elementFilter = self.elementFilter
for selection in elementFilter(self.mesh):
self.integrator.ActivateElementType(selection.elements)
self.integrator.Integrate(self.numericalWeakForm, np.asarray(selection.indices, dtype=MuscatIndex))
self.numberOfUsedvij = self.integrator.GetNumberOfUsedIvij()
[docs]
def GetKvij(self):
"""Get the values to build the operator
Returns
-------
values : ndarray
values of the operator
tuple : (ndarray,ndarray)
indices (i,j)
"""
data = (self.vK[0:self.numberOfUsedvij], (self.iK[0:self.numberOfUsedvij], self.jK[0:self.numberOfUsedvij]))
return data
[docs]
def GetLinearSystemSize(self):
"""Get the size of the Linear System
Returns
-------
nbrows : int
Number of rows of the linear system
nbcols : int
Number of columns of the linear system
"""
return (self.integrator.GetTotalTestDofs(), self.integrator.GetTotalUnknownDofs())
[docs]
def GetK(self):
""" Get K as a scipy.sparse.coo_matrix
Returns
-------
K : coo_matrix
the assembled matrix
"""
return coo_matrix(self.GetKvij(), shape=self.GetLinearSystemSize())
[docs]
def GetRhs(self):
""" Get the right hand side term
Returns
-------
rhs : ndarray
Array with the values of the right hand side term
"""
return self.rhs
[docs]
def CheckIntegrity(GUI:bool=False):
obj = IntegrationBaseClass()
print(obj)
return "OK"
if __name__ == '__main__':# pragma: no cover
print(CheckIntegrity())