Source code for Muscat.FE.Integrators.PythonIntegration

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#


# distutils: language = c++
from typing import Optional, Dict, Union

import numpy as np
from scipy.sparse import coo_matrix

from Muscat.Helpers.Decorators import  froze_it
from Muscat.FE.Spaces.FESpaces import LagrangeSpaceGeo
from Muscat.FE.SymWeakForm import testcharacter
from Muscat.FE.Fields.FEField import FEField
from Muscat.Types import MuscatFloat, MuscatIndex
from Muscat.FE.IntegrationRules import MeshQuadrature

[docs] @froze_it class MonoElementsIntegral(): """ Class to assembly a formulation (weak form) into a matrix or a vector * unknownDofsOffset : offset for the unknown dofs * __ufs__ : unknown fields * testDofsOffset : offset for the test dofs * __tfs__ : unknown dofs * totalTestDofs : Total number fo test dofs (computed Automatically) * totalUnknownDofs : Total number fo unknown dofs (computed Automatically) * geoSpace : Geometry approximation space (computed Automatically) * __efs__ : Extra Fields * __cfs__ : (dic(str:float) ) Constants * integrationRule : integration rule for the integration * onlyEvaluation : To force the integrator to not multiply by the detJac * numberOfVIJ = 0 * F : rhs vector * vK, iK, jK : Vectors containing the values and indices for the entries of the operator * totalvijcpt : Number of non zero entries in the self.vK iK ant jK vector * maxNumberOfTerms : maximal number of terms in a monom (computed Automatically) * hasnormal : (computed Automatically) * __usedSpaces__ * geoSpaceNumber """ def __init__(self): super().__init__() self.unknownDofsOffset = None self.__ufs__ = None self.testDofsOffset = None self.__tfs__ = None self.totalTestDofs = 0 self.totalUnknownDofs= 0 self.geoSpace = None self.__efs__ = None self.__ipefs__ = None self.__cfs__ = {} self.integrationRule = None self.onlyEvaluation = False """ For the evaluation we only add the contribution without doing the integration the user is responsible of dividing by the mass matrix to get the correct values also the user can use a discontinues field to generate element surface stress (for example) """ self.numberOfVIJ = 0 self.F = None self.vK = None self.iK = None self.jK = None self.totalvijcpt = 0 self.maxNumberOfTerms = 0 self.hasnormal = False self.__usedSpaces__ = None self.__usedNumbering__ = None self.__usedValues__ = None self.__usedValuesAtIP__ = None self.geoSpaceNumber = 0 # internal variables dependent on the current element type been treated # (internal use only ) self.localSpaces = None self.localNumbering = None self.NumberOfShapeFunctionForEachSpace = None self.p = None self.w = None self.nodes = None self.connectivity = None self.NumberOfShapeFunctionTest = 0 self.NumberOfShapeFunctionUnknown = 0
[docs] def IsMultiThread(self): """In pure python the GIL block so a multiThread is useless""" return False
[docs] def Reset(self): self.numberOfVIJ = 0 self.totalvijcpt = 0
[docs] def SetUnknownFields(self,ufs): """ Set the fields used for the unknown space ufs : list(FEField) list of fields """ self.__ufs__ = ufs self.unknownDofsOffset = np.zeros(len(ufs),dtype=int) self.totalUnknownDofs = 0 cpt = 0 for uf in ufs: self.unknownDofsOffset[cpt] = self.totalUnknownDofs self.totalUnknownDofs += uf.numbering.size cpt += 1
[docs] def SetTestFields(self, tfs=None): """ Set the fields used for the test space tfs : list(FEField) list of fields if tfs is none then the unknown fields are used (Galerkin projection) """ if tfs is None: tfs = [ uf.GetTestField() for uf in self.__ufs__ ] self.__tfs__ = tfs self.testDofsOffset = np.zeros(len(tfs),dtype=int) self.totalTestDofs = 0 cpt =0 for tf in tfs: self.testDofsOffset[cpt] = self.totalTestDofs self.totalTestDofs += tf.numbering.size cpt += 1
[docs] def SetExtraFields(self,efs): """ Set the extra fields used in the weak formulation efs : list(FEField or IPField) list of fields """ self.__efs__ = [] self.__ipefs__ = [] for ef in efs: if isinstance(ef,FEField): self.__efs__.append(ef) else: self.__ipefs__.append(ef)
[docs] def SetConstants(self,cfs): """ Set The constants used in the weak formulation cfs : dic(str:float) constants dictionary """ self.__cfs__ = cfs
[docs] def ComputeNumberOfVIJ(self, mesh, elementFilter): """ Compute and return the number triplets to be calculated during integration """ self.numberOfVIJ = 0 for selection in elementFilter(mesh): if len(selection.indices) == 0: continue numberOfUsedElements = len(selection.indices) us = np.sum([f.space[selection.elementType].GetNumberOfShapeFunctions() for f in self.__ufs__] ) ts = np.sum([f.space[selection.elementType].GetNumberOfShapeFunctions() for f in self.__tfs__ ] ) self.numberOfVIJ += numberOfUsedElements*(us*ts) return self.numberOfVIJ
[docs] def SetIntegrationRule(self, integrationRuleOrName: Optional[Union[Dict,str]] =None): """ Function to set the integration Rule """ if integrationRuleOrName is None : integrationRuleOrName = "LagrangeIsoParamQuadrature" if isinstance( integrationRuleOrName, dict): self.integrationRule = integrationRuleOrName elif isinstance(integrationRuleOrName, str): from Muscat.FE.IntegrationRules import GetIntegrationRuleByName self.integrationRule = GetIntegrationRuleByName(integrationRuleOrName) elif isinstance(integrationRuleOrName, MeshQuadrature): self.integrationRule = dict(integrationRuleOrName.items()) else: raise Exception("Error setting the integration rule..")
[docs] def PrepareFastIntegration(self,mesh, wform, vK,iK,jK, cpt, F): """ Function to prepare the integration procedure, this function checks: - if the weak form needs the normal at each integration point - prepare the fields to be used - fills each term in the weak formulation with the data about the fields for fast access mesh : a mesh wform: the weak form to be integrated vK,iK,jK = the vectors to store the calculated values for the K op cpt """ self.vK = vK self.iK = iK self.jK = jK self.F = F ##we modified the internal structure (varialbe ending with _) for fast access self.hasnormal = False for monom in wform: for term in monom: if "Normal" in term.fieldName: self.hasnormal = True break if self.hasnormal is True: break constantNames = [] for x in self.__cfs__: constantNames.append(x) ## spaces treatement spacesId = {} spacesNames = {} spacesId[id(self.geoSpace)] = self.geoSpace spacesNames["Geometry_Space_internal"] = id(self.geoSpace) for uf in self.__ufs__: spacesId[id(uf.space)] = uf.space spacesNames[uf.name] = id(uf.space) for tf in self.__tfs__: spacesId[id(tf.space)] = tf.space spacesNames[tf.name] = id(tf.space) for ef in self.__efs__: spacesId[id(ef.space)] = ef.space spacesNames[ef.name] = id(ef.space) sId = list(spacesId.keys()) self.__usedSpaces__ = [ spacesId[k] for k in sId] self.geoSpaceNumber = sId.index(spacesNames["Geometry_Space_internal"]) spacesNames = { sn:sId.index(spacesNames[sn]) for sn in spacesNames } # Numbering treatement numberingId = {} numberingNames = {} for uf in self.__ufs__: numberingId[id(uf.numbering)] = uf.numbering numberingNames[uf.name] = id(uf.numbering) for tf in self.__tfs__: numberingId[id(tf.numbering)] = tf.numbering numberingNames[tf.name] = id(tf.numbering) for ef in self.__efs__: numberingId[id(ef.numbering)] = ef.numbering numberingNames[ef.name] = id(ef.numbering) nId = list(numberingId.keys()) self.__usedNumbering__ = [ numberingId[k] for k in nId] numberingNames = { sn:nId.index(numberingNames[sn]) for sn in numberingNames} # Values treatement valuesId = {} valuesNames = {} for ef in self.__efs__: valuesId[id(ef.data)] = ef.data valuesNames[ef.name] = id(ef.data) vId = list(valuesId.keys()) self.__usedValues__ = [ valuesId[k] for k in vId] valuesNames = { vnk:vId.index(vnv) for vnk, vnv in valuesNames.items() } self.maxNumberOfTerms = 0 for monom in wform: self.maxNumberOfTerms = max(self.maxNumberOfTerms, monom.GetNumberOfProds()) for term in monom: if "Normal" in term.fieldName : term.internalType = term.EnumNormal elif term.constant: if term.fieldName in constantNames: term.valuesIndex_ = constantNames.index(term.fieldName) term.internalType = term.EnumConstant elif term.fieldName in [f.name for f in self.__efs__]: term.spaceIndex_= spacesNames[term.fieldName] term.numberingIndex_= numberingNames[term.fieldName] term.valuesIndex_= valuesNames[term.fieldName] term.internalType = term.EnumExtraField elif term.fieldName in [f.name for f in self.__ipefs__]: term.valuesIndex_= [ef.name for ef in self.__ipefs__ ].index(term.fieldName) term.internalType = term.EnumExtraIPField else: raise Exception( f"Field : '{term.fieldName}' not found in Constants nor FEField nor IPFIelds" ) elif term.fieldName in [f.name for f in self.__ufs__] : term.spaceIndex_= spacesNames[term.fieldName] term.numberingIndex_= numberingNames[term.fieldName] #used for the offset term.valuesIndex_= [uf.name for uf in self.__ufs__ ].index(term.fieldName) term.internalType = term.EnumUnknownField elif term.fieldName in [f.name for f in self.__tfs__]: term.spaceIndex_= spacesNames[term.fieldName] term.numberingIndex_= numberingNames[term.fieldName] #term.valuesIndex_= valuesNames[term.fieldName] term.valuesIndex_= [uf.name for uf in self.__tfs__ ].index(term.fieldName) term.internalType = term.EnumTestField elif term.fieldName in [f.name for f in self.__efs__]: term.spaceIndex_= spacesNames[term.fieldName] term.numberingIndex_= numberingNames[term.fieldName] term.valuesIndex_= valuesNames[term.fieldName] term.internalType = term.EnumExtraField elif term.fieldName in [f.name for f in self.__ipefs__]: term.valuesIndex_= [ef.name for ef in self.__ipefs__ ].index(term.fieldName) term.internalType = term.EnumExtraIPField else : term.internalType = term.EnumError raise(Exception("Term " +str(term.fieldName) + " not found in the database " )) self.SetPoints(mesh.nodes)
[docs] def SetPoints(self,nodes): """ ## from https://github.com/cython/cython/wiki/tutorials-NumpyPointerToC multiply (arr, value) Takes a numpy arry as input, and multiplies each elemetn by value, in place param: array -- a 2-d numpy array of np.float64 """ self.nodes = nodes return None
[docs] def SetOnlyEvaluation(self,onlyEvaluation= True): """ To activate the Only Evaluation functionality For the evaluation we only add the constribution without doing the integration (multiplication by the detjac ) the user is responsible of dividing by the mass matrix to get the correct values . Ffr example the user can use a discontinues field to generate element surface stress """ self.onlyEvaluation = onlyEvaluation
[docs] def ActivateElementType(self,domain): """ Function to prepared the integration for a type of element domain : (ElementsContainer) """ self.localNumbering = [] for numbering in self.__usedNumbering__: self.localNumbering.append( numbering.get(domain.elementType,None) ) self.p = self.integrationRule[domain.elementType].points self.w = self.integrationRule[domain.elementType].weights self.geoSpace = LagrangeSpaceGeo[domain.elementType].GetSpaceEvaluatedAt(self.p,self.w) self.NumberOfShapeFunctionForEachSpace = np.zeros(len(self.__usedSpaces__), dtype=int) self.NumberOfShapeFunctionTest = np.sum([ f.space[domain.elementType].GetNumberOfShapeFunctions() for f in self.__tfs__ ]) self.NumberOfShapeFunctionUnknown = np.sum([ f.space[domain.elementType].GetNumberOfShapeFunctions() for f in self.__ufs__ ]) cpt = 0 self.localSpaces = list() for space in self.__usedSpaces__: if space is None : self.NumberOfShapeFunctionForEachSpace[cpt] = 0 self.localSpaces.append(None) else: self.localSpaces.append(space[domain.elementType].GetSpaceEvaluatedAt(self.p,self.w)) self.NumberOfShapeFunctionForEachSpace[cpt] = space[domain.elementType].GetNumberOfShapeFunctions() cpt += 1 self.connectivity = domain.connectivity self.__usedValuesAtIP__ = [ipef.data[domain.elementType] for ipef in self.__ipefs__ ]
[docs] def Integrate(self,wform,idstotreat): """ Main function to execute the integration wform: (PyWeakForm) Python Or C++ version of the weak form to be integrated idstotreat: list like (int) ids of the element to treat """ constantsNumerical = np.empty(len(self.__cfs__), dtype=MuscatFloat) cpt =0 for x in self.__cfs__: constantsNumerical[cpt] = self.__cfs__[x] cpt += 1 numberOfIntegrationPoints = len(self.w) maxNumberOfElementVIJ = self.NumberOfShapeFunctionTest*self.NumberOfShapeFunctionUnknown ev = np.empty(maxNumberOfElementVIJ*wform.GetNumberOfTerms()*numberOfIntegrationPoints,dtype=MuscatFloat) ei = np.empty(maxNumberOfElementVIJ*wform.GetNumberOfTerms()*numberOfIntegrationPoints,dtype=MuscatIndex) ej = np.empty(maxNumberOfElementVIJ*wform.GetNumberOfTerms()*numberOfIntegrationPoints,dtype=MuscatIndex) numberOfFields = len(self.__usedSpaces__) BxByBzI = [None] *numberOfFields NxNyNzI = [None] *numberOfFields for n in idstotreat: fillcpt =0 xcoor = self.nodes[self.connectivity[n],:] for ip in range(numberOfIntegrationPoints): #we recover the jacobian matrix Jacobian, Jdet, Jinv = self.geoSpace.GetJacobianDeterminantAndInverseAtIP(ip,xcoor) for i in range(numberOfFields): if self.localSpaces[i] is not None: NxNyNzI[i] = self.localSpaces[i].valN[ip] BxByBzI[i] = Jinv(self.localSpaces[i].valdphidxi[ip]) if self.hasnormal: normal = self.geoSpace.GetNormal(Jacobian) for monom in wform: factor = monom.prefactor if self.onlyEvaluation : # For the evaluation we only add the constribution without doing the integration # the user is responsible of dividing by the mass matrix to get the correct values # also the user can use a discontinues field to generate element surface stress (for example) pass else: # for the integration we multiply by the deteminant of the jac factor *= Jdet factor *= self.w[ip] hasright = False for term in monom: if term.internalType == term.EnumNormal : factor *= normal[term.derDegree] continue elif term.internalType == term.EnumConstant : factor *= constantsNumerical[term.valuesIndex_] continue elif term.internalType == term.EnumUnknownField : if term.derDegree == 1: right = BxByBzI[term.spaceIndex_][[term.derCoordIndex_],:] else: right = np.array([NxNyNzI[term.spaceIndex_],]) rightNumbering = self.localNumbering[term.numberingIndex_][n,:] + self.unknownDofsOffset[term.valuesIndex_] hasright = True l2 = self.NumberOfShapeFunctionForEachSpace[term.spaceIndex_] continue elif term.internalType == term.EnumTestField : if term.derDegree == 1: left = BxByBzI[term.spaceIndex_][term.derCoordIndex_] else: left = NxNyNzI[term.spaceIndex_] leftNumbering = self.localNumbering[term.numberingIndex_][n,:] + self.testDofsOffset[term.valuesIndex_] l1 = self.NumberOfShapeFunctionForEachSpace[term.spaceIndex_] continue elif term.internalType == term.EnumExtraField : if term.derDegree == 1: func = BxByBzI[term.spaceIndex_][term.derCoordIndex_] else: func = NxNyNzI[term.spaceIndex_] centerNumbering = self.localNumbering[term.numberingIndex_][n,:] vals = self.__usedValues__[term.valuesIndex_][centerNumbering] factor *= np.dot(func,vals) continue elif term.internalType == term.EnumExtraIPField : if term.derDegree == 1: raise Exception("Integration point field cant be derivated") val = self.__usedValuesAtIP__[term.valuesIndex_][n,ip] factor *= val else : raise(Exception("Cant treat term " + str(term.fieldName))) if factor == 0: continue if hasright: l = l1*l2 l2cpt = fillcpt for i in range(l1): for j in range(l2) : ev[l2cpt] = left[i]*right[0,j]*factor l2cpt +=1 l2cpt = fillcpt for i in range(l1): for j in range(l2) : ej[l2cpt] = rightNumbering[j] l2cpt += 1 l2cpt = fillcpt for j in range(l2) : for i in range(l1): ei[l2cpt] = leftNumbering[j] l2cpt += 1 fillcpt += l else: for i in range(l1): self.F[leftNumbering[i]] += left[i]*factor if fillcpt: data = coo_matrix((ev[:fillcpt], (ei[:fillcpt],ej[:fillcpt])), shape=( self.totalTestDofs,self.totalUnknownDofs)) data.sum_duplicates() data.eliminate_zeros() start = self.totalvijcpt stop = start+len(data.data) self.vK[start:stop] = data.data self.iK[start:stop] = data.row self.jK[start:stop] = data.col self.totalvijcpt += len(data.data)
[docs] def GetNumberOfUsedIvij(self): """ Return number of non zero values in the vectors vK,iK,jK """ return self.totalvijcpt
[docs] def AddToNumbefOfUsedIvij(self,data): """ add a value to the UsedIvij """ self.totalvijcpt += data
[docs] def GetTotalTestDofs(self): """ Return the number of dofs of the test space (number of rows of the K matrix) """ return self.totalTestDofs
[docs] def GetTotalUnknownDofs(self): """ Return the number of dofs of the Unknown space (number of cols of the K matrix) """ return self.totalUnknownDofs
[docs] def CheckIntegrity(GUI:bool=False): import Muscat.FE.Integration as Integration backup = Integration.UseCpp Integration.UseCpp = False res = "ok" try: from Muscat.FE.UnstructuredFeaSym import CheckIntegrity as CI res = CI(GUI) except: Integration.UseCpp = backup raise Integration.UseCpp = backup return res
if __name__ == '__main__':# pragma: no cover print(CheckIntegrity(True))