Source code for Muscat.IO.XdmfWriter

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#

"""Xdmf file writer
"""

import numpy as np
import os

from Muscat.Helpers.TextFormatHelper import TFormat

import Muscat.Containers.ElementsDescription as ED
from Muscat.Containers.Mesh import Mesh
from Muscat.IO.WriterBase import WriterBase as WriterBase
from Muscat.Helpers.MPIInterface import MPIInterface as MPI
from Muscat.Types import MuscatIndex, MuscatFloat
from Muscat.IO.XdmfTools import XdmfName,XdmfNumber, HasHdf5Support
from Muscat.FE.IntegrationRules import MeshQuadrature
[docs]def ArrayToString(data): return " ".join(str(x) for x in data)
#* Xdmf supports the following topology types: # * NoTopologyType # * Polyvertex - Unconnected Points # * Polyline - Line Segments # * Polygon - N Edge Polygon # * Triangle - 3 Edge Polygon # * Quadrilateral - 4 Edge Polygon # * Tetrahedron - 4 Triangular Faces # * Wedge - 4 Triangular Faces, Quadrilateral Base # * Hexahedron - 6 Quadrilateral Faces # * Edge_3 - 3 Node Quadratic Line # * Triangle_6 - 6 Node Quadratic Triangle # * Quadrilateral_8 - 8 Node Quadratic Quadrilateral # * Quadrilateral_9 - 9 Node Bi-Quadratic Quadrilateral # * Tetrahedron_10 - 10 Node Quadratic Tetrahedron # * Pyramid_13 - 13 Node Quadratic Pyramid # * Wedge_15 - 15 Node Quadratic Wedge # * Wedge_18 - 18 Node Bi-Quadratic Wedge # * Hexahedron_20 - 20 Node Quadratic Hexahedron # * Hexahedron_24 - 24 Node Bi-Quadratic Hexahedron # * Hexahedron_27 - 27 Node Tri-Quadratic Hexahedron # * Hexahedron_64 - 64 Node Tri-Cubic Hexahedron # * Hexahedron_125 - 125 Node Tri-Quartic Hexahedron # * Hexahedron_216 - 216 Node Tri-Quintic Hexahedron # * Hexahedron_343 - 343 Node Tri-Hexic Hexahedron # * Hexahedron_512 - 512 Node Tri-Septic Hexahedron # * Hexahedron_729 - 729 Node Tri-Octic Hexahedron # * Hexahedron_1000 - 1000 Node Tri-Nonic Hexahedron # * Hexahedron_1331 - 1331 Node Tri-Decic Hexahedron # * Hexahedron_Spectral_64 - 64 Node Spectral Tri-Cubic Hexahedron # * Hexahedron_Spectral_125 - 125 Node Spectral Tri-Quartic Hexahedron # * Hexahedron_Spectral_216 - 216 Node Spectral Tri-Quintic Hexahedron # * Hexahedron_Spectral_343 - 343 Node Spectral Tri-Hexic Hexahedron # * Hexahedron_Spectral_512 - 512 Node Spectral Tri-Septic Hexahedron # * Hexahedron_Spectral_729 - 729 Node Spectral Tri-Octic Hexahedron # * Hexahedron_Spectral_1000 - 1000 Node Spectral Tri-Nonic Hexahedron # * Hexahedron_Spectral_1331 - 1331 Node Spectral Tri-Decic Hexahedron # * Mixed - Mixture of Unstructured Topologies
[docs]def WriteMeshToXdmf(filename, baseMeshObject, PointFields = None, CellFields = None, GridFields= None, IntegrationPointData=None, PointFieldsNames = None, CellFieldsNames=None, GridFieldsNames=None, IntegrationPointDataNames=None, IntegrationRule=None, Binary= True, HDF5 = True): """Function API for writing mesh in the xdmf format file. Parameters ---------- fileName : str name of the file to be written baseMeshObject : Mesh the mesh to be written PointFields : list[np.ndarray], optional fields to write defined at the vertices of the mesh, by default None CellFields : list[np.ndarray], optional fields to write defined at the elements of the mesh, by default None GridFields : list[np.ndarray], optional grid fields to write, by default None IntegrationPointData : list[np.ndarray], optional fields defined at the integration points, by default None PointFieldsNames : list[str], optional names of the fields to write defined at the vertices of the mesh, by default None CellFieldsNames : list[str], optional names of the fields to write defined at the elements of the mesh, by default None GridFieldsNames : list[str], optional names of the grid fields to write, by default None IntegrationPointDataNames : list[str], optional names of the fields defined at the integration points, by default None IntegrationRule : dict[IntegrationRuleType], optional integration rules associated to the integration point data, by default None Binary : bool, optional if True, file is written as binary, by default True HDF5 : bool, optional if True, file is written as hdf5 format, by default True """ if PointFields is None: PointFields = [] if CellFields is None: CellFields = [] if GridFields is None: GridFields = [] if PointFieldsNames is None: PointFieldsNames = [] if CellFieldsNames is None: CellFieldsNames = [] if GridFieldsNames is None: GridFieldsNames = [] if IntegrationPointData is None: IntegrationPointData = [] if IntegrationPointDataNames is None: IntegrationPointDataNames = [] writer = XdmfWriter(filename) writer.SetBinary(Binary) writer.SetHdf5(HDF5) writer.Open() writer.Write(baseMeshObject, PointFields= PointFields, CellFields = CellFields, GridFields = GridFields, IntegrationPointData=IntegrationPointData, PointFieldsNames = PointFieldsNames, CellFieldsNames = CellFieldsNames, GridFieldsNames = GridFieldsNames, IntegrationPointDataNames=IntegrationPointDataNames, IntegrationRule=IntegrationRule, ) writer.Close()
#
[docs]class InMemoryFile(): """ Helper class to write the xmf part of the file into memory """ def __init__(self,saveFilePointer): self.data = "" self.saveFilePointer = saveFilePointer
[docs] def write(self,data): self.data += data
[docs] def tell(self): return 0
[docs] def seek(self,pos): return 0
[docs] def close(self): pass
[docs] def flush(self): pass
[docs]class Hdf5Storage(): """Class to deal with hdf5 storage""" def __init__(self,data, filename, datasetName): self.filename = filename self.type = None self.itemSize = 0 self.vectorSize = 0 self.usedByNInstances = 0 if data is not None: self.usedByNInstances += 1 self.type = data.dtype self.itemSize = data.dtype.itemsize self.vectorSize = data.size self.datasetName = datasetName def __str__(self): return str(self.vectorSize) +":"+str(self.usedByNInstances)
[docs] def GetData(self): from h5py import File as __File f = __File(self.filename, 'r') data = np.array(f[self.datasetName]) f.close() return data
[docs] def UpdateHeavyStorage(self,data): if self.usedByNInstances > 1: # pragma: no cover raise Exception("This pointer is used for more than 1 field please (overwrite or setup the writer with the option maxStorageSize=0") from h5py import File as __File if data.size != self.vectorSize: # pragma: no cover raise Exception("Size of data and storage not compatible") f = __File(self.filename, 'r+') dataset = f[self.datasetName] dataset.write_direct(data) #f.create_dataset(self.datasetName, data=data) f.close()
[docs]class BinaryStorage(): """Class to deal with binary storage""" def __init__(self,data=None, filePointer=None): self.filename = "" self.offset = 0 self.type = None self.itemSize = 0 self.vectorSize = 0 self.usedByNInstances = 0 if data is not None: self.usedByNInstances += 1 self.type = data.dtype self.itemSize = data.dtype.itemsize self.vectorSize = data.size if filePointer is not None: self.filename = filePointer.name self.offset = filePointer.tell() def __str__(self): return str(self.vectorSize) +":"+str(self.usedByNInstances)
[docs] def ChangePathOfBinaryStorage(self,newPath): self.filename = newPath + os.sep + os.path.basename(self.filename)
[docs] def GetData(self): f = open(self.filename,'rb') f.seek(self.offset,0) data = np.fromfile(f,self.type,self.vectorSize,sep="") f.close() return data
[docs] def UpdateHeavyStorage(self,data): if self.usedByNInstances > 1: # pragma: no cover raise Exception("This pointer is used for more than 1 field please (overwrite or setup the writer with the option maxStorageSize=0") if data.size != self.vectorSize: # pragma: no cover raise Exception("Size of data and storage not compatible") f = open(self.filename,'r+b') f.seek(self.offset,0) data.astype(self.type).ravel().tofile(f) f.close()
[docs]class XdmfWriter(WriterBase): """ Class to Write Xdmf files for: - classic finite element solutions - domain decomposition problem (multi mesh) - transient solution (the mesh changes in time) - solution written in parafac format (monolithic or in ddm mpi) """ def __init__(self, fileName = None): super().__init__() self.canHandleTemporal = True self.canHandleAppend = True self.canHandleMultidomain = True self.fileName = None self.timeSteps = [] self.parafacCpt = 0 self.ddmCpt = 0 self.currentTime = 0 self.__XmlSizeLimit = 0 self.__chunkSize = 2**30 self.automaticOpen = False try: import h5py self.__isHdf5 = True except: # pragma: no cover self.__isHdf5 = False self.__hdf5FileName = "" self.__hdf5FileNameOnly = None self.__hdf5FilePointer = None self.__hdf5NameCpt = 0 self.SetBinary(True) self.__binFileName = "" self.__filePointer = None #self.__isOpen = False self.__binaryCpt = 0 self.__hdf5cpt = 0 self.__binFileCounter = 0 self.__hdf5FileCounter = 0 self.__keepXmlFileInSaneState = True self.__isParafacFormat = False #set to off is you what to put the time at the end of the temporal grid #keep this option True to be compatible with the XDMF3 reader of ParaView self.__printTimeInsideEachGrid = True self.SetFileName(fileName) self.pointFieldsStorage = {} self.cellFieldsStorage = {} self.gridFieldsStorage = {} self.ipStorage = {} self.globalStorage = {} self.maxStorageSize = 50
[docs] def IsHdf5(self): return self.GetHdf5()
[docs] def SetBinary(self, val = True): """Sets the binary status of the file to read Parameters ---------- val : bool, optional if True, sets the file to read as binary, by default True """ super().SetBinary(val)
[docs] def isBinary(self): return super().isBinary() and not self.__isHdf5
[docs] def GetHdf5(self): return self.__isHdf5 and super().isBinary()
[docs] def SetHdf5(self, val=True): if val : self.SetBinary(True) try: import h5py self.__isHdf5 = val except: # pragma: no cover self.__isHdf5 = False if val: print("h5py not available using binary file for output") else: self.__isHdf5 = False
[docs] def SetChunkSize(self,size): self.__chunkSize = size
def __str__(self): res = 'XdmfWriter : \n' res += ' FileName : '+ str(self.fileName) +'\n' res += ' isParafacFormat : '+ ('True' if self.__isParafacFormat else "False") +'\n' if self.isBinary(): res += ' Binary output Active \n' if self.__binFileName is not None: res += ' Binary FileName : '+ self.__binFileName +'\n' if self.IsHdf5(): res += ' Hdf5 output Active \n' res += ' Hdf5 FileName : '+ self.__hdf5FileName +'\n' if self.IsTemporalOutput(): res += ' Temporal output Active \n' res += ' TimeSteps : '+ str(self.timeSteps) + '\n' if self.isOpen(): res += ' The File is Open!! \n' return res
[docs] def SetFileName(self, fileName ): """Sets the fileName parameter of the class Parameters ---------- string : str fileName to set """ if fileName is None : self.fileName = None self.__path = None return self.fileName = fileName self.__path = os.path.abspath(os.path.dirname(fileName)) self.binFileCounter = 0 self.NewBinaryFilename() self.NewHdf5Filename()
[docs] def SetParafac(self,val = True): self.__isParafacFormat = val
[docs] def IsParafacOutput(self): return self.__isParafacFormat
[docs] def NewBinaryFilename(self): name = os.path.splitext(os.path.abspath(self.fileName))[0] name += "" +str(self.__binFileCounter) if MPI.IsParallel(): # pragma: no cover name += "D"+ str(MPI.Rank()) name += ".bin" self.__binFileName = name self.__binFileNameOnly = os.path.basename(self.__binFileName) self.__binFileCounter +=1
[docs] def NewHdf5Filename(self): name = os.path.splitext(os.path.abspath(self.fileName))[0] name += "" +str(self.__hdf5FileCounter) if MPI.IsParallel(): # pragma: no cover name += "D"+ str(MPI.Rank()) name += ".h5" self.__hdf5FileName = name self.__hdf5FileNameOnly = os.path.basename(self.__hdf5FileName) self.__hdf5FileCounter +=1
[docs] def Step(self, dt = 1): self.currentTime += dt self.timeSteps.append(self.currentTime)
[docs] def SetXmlSizeLimit(self,val): self.__XmlSizeLimit= val
[docs] def Open(self, filename = None): """Open file for writing Parameters ---------- filename : str, optional name of the file to write """ # we don't use the open from WriterBase because the set binary is used # for the .bin file and not for the .xdmf file if self.isOpen() : print(TFormat.InRed("The file is already open !!!!!")) return if filename is not None: self.SetFileName(filename) ## we use unbuffered so we can repairer broken files easily try : # in python 3 we cant use unbuffered text I/O (bug???) #self.filePointer = open(self.fileName, 'w',0) if self.InAppendMode(): if self.isBinary() == False: raise(Exception("Append Mode only works in binary mode") ) self.filePointer = open(self.fileName, 'r+') self.filePointer.seek(-100,2) lines = self.filePointer.readlines() for line in lines: if line.find("<!--Temporal") != -1: l = line.find('pos="') r = line.find('" ',l+1) pos = int(line[l+5:r]) #__binFileCounter l = line.find('ter="') r = line.find('" ',l+1) binFileCounter = int(line[l+5:r]) self.NewBinaryFilename() self.__binaryFilePointer = open (self.__binFileName, "wb") self.__binaryCpt = 0 #time l = line.find('ime="') r = line.find('" ',l+1) currentTime = float(line[l+5:r]) self.filePointer.seek(pos) self._isOpen = True self.__binFileCounter = binFileCounter self.currentTime = currentTime return raise Exception("Unable Open file in append mode ") else: mpi = MPI() if mpi.IsParallel(): # pragma: no cover self.__keepXmlFileInSaneState = False if mpi.rank > 0: self.filePointer = InMemoryFile(self.filePointer) else: self.filePointer = open(self.fileName, 'w') else: self.filePointer = open(self.fileName, 'w') except: # pragma: no cover print(TFormat.InRed("Error File Not Open")) raise self._isOpen = True self.filePointer.write('<?xml version="1.0" encoding="utf-8"?>\n') self.filePointer.write('<Xdmf xmlns:xi="http://www.w3.org/2001/XInclude" Version="2.92">\n') self.filePointer.write('<Domain>\n') if self.IsTemporalOutput(): self.filePointer.write('<Grid Name="Grid_T" GridType="Collection" CollectionType="Temporal" >\n') if self.IsMultidomainOutput(): self.filePointer.write('<Grid Name="Grid_S" GridType="Collection" CollectionType="Spatial" >\n') ## here we recover the output for the slave nodes (rank> 0) and then we ## write this information in the master node (rank = 0) self._OpenParallelWriter() if self.IsParafacOutput(): self.filePointer.write('<Grid Name="Grid_P" GridType="Collection" CollectionType="None" >\n') if self.isBinary(): self.__binaryFilePointer = open (self.__binFileName, "wb") if self.IsHdf5(): import h5py self.__hdf5FilePointer = h5py.File(self.__hdf5FileName, 'w') if self.__keepXmlFileInSaneState: self.WriteTail()
def _OpenParallelWriter(self): mpi = MPI() if mpi.mpiOK: self.__keepXmlFileInSaneState = False if mpi.rank > 0: self.filePointer = InMemoryFile(self.filePointer ) def _CloseParallelWriter(self): mpi = MPI() if mpi.mpiOK and mpi.rank > 0: mpi.comm.send(self.filePointer.data, dest=0) self.filePointer = self.filePointer.saveFilePointer else: for i in range(1,mpi.size): data = mpi.comm.recv( source=i) self.filePointer.write(data)
[docs] def Close(self): """Closes writen file after writing operations are done """ if self.isOpen(): self.WriteTail() self.filePointer.close() self._isOpen = False if self.isBinary(): self.__binaryFilePointer.close() if self.IsHdf5(): self.__hdf5FilePointer.close()
#print("File : '" + self.fileName + "' is close.")
[docs] def WriteTail(self): """Writes the end of the file """ if self.isOpen(): filePosition = self.filePointer.tell() if self.IsParafacOutput(): self.filePointer.write('</Grid> <!-- Parafac grid -->\n') self._CloseParallelWriter() if self.IsMultidomainOutput() : self.filePointer.write('</Grid> <!-- collection grid -->\n') if self.IsTemporalOutput(): self.__WriteTime() self.filePointer.write(' </Grid><!--Temporal pos="'+str(filePosition )+'" __binFileCounter="'+str(self.__binFileCounter)+'" time="'+str(self.currentTime)+'" -->\n') self.filePointer.write(' </Domain>\n') self.filePointer.write('</Xdmf>\n') # we put the pointer just before the tail so we can continue writing # to the file for a new time step self.filePointer.seek(filePosition )
def __WriteGeoAndTopo(self, baseMeshObject: Mesh, name=None): if self.__isParafacFormat: if "ParafacDims" in baseMeshObject.props: self.filePointer.write(' <Information Name="Dims" Value="'+str(baseMeshObject.props["ParafacDims"])+'" /> \n') for i in range(baseMeshObject.props["ParafacDims"]): self.filePointer.write(' <Information Name="Dim'+str(i)+'" Value="'+baseMeshObject.props["ParafacDim"+str(i)]+'" /> \n') if "ParafacUnit0" in baseMeshObject.props: for i in range(baseMeshObject.props["ParafacDims"]): self.filePointer.write(' <Information Name="Unit'+str(i)+'" Value="'+baseMeshObject.props["ParafacUnit"+str(i)]+'" /> \n') self.filePointer.write(' <Geometry Type="XYZ">\n') if ( baseMeshObject.GetPointsDimensionality() == 2 ): nodes = baseMeshObject.GetPosOfNodes() nodes = np.concatenate((nodes,np.zeros((baseMeshObject.GetNumberOfNodes(),1))), axis=1 ) self.__WriteDataItem(nodes.ravel(), (baseMeshObject.GetNumberOfNodes(),3) , name="GEO_U_"+str(name) ) else: self.__WriteDataItem(baseMeshObject.GetPosOfNodes().ravel(), (baseMeshObject.GetNumberOfNodes(),3) , name="GEO_U_"+str(name) ) self.filePointer.write(' </Geometry>\n') if len(baseMeshObject.elements) > 1: self.filePointer.write(' <Topology TopologyType="Mixed" NumberOfElements="{0}">\n'.format(baseMeshObject.GetNumberOfElements())) nbTotalEntries = 0 for elemType, data in baseMeshObject.elements.items(): nbTotalEntries += data.GetNumberOfElements()*(data.GetNumberOfNodesPerElement()+1) if data.elementType == ED.ElementType.Bar_2 or data.elementType == ED.ElementType.Point_1: nbTotalEntries += data.GetNumberOfElements() dataArray = np.empty((nbTotalEntries,),dtype=MuscatIndex) cpt =0 for elemType, data in baseMeshObject.elements.items(): xdmfElemTypeNumber = XdmfNumber[elemType] numberOfNodePerElem = data.GetNumberOfNodesPerElement() # Init container extraSize = 1 + (xdmfElemTypeNumber in (0x1,0x2)) totalSize = data.GetNumberOfElements() * (numberOfNodePerElem + extraSize) #tempContainer = np.zeros((data.GetNumberOfElements() ,numberOfNodePerElem + extraSize),dtype=MuscatIndex) tempContainer = dataArray[cpt:cpt+totalSize] tempContainer.shape = (data.GetNumberOfElements() ,numberOfNodePerElem + extraSize) # First row is ElemTypeNumber tempContainer[:,0] = xdmfElemTypeNumber # Second row is (1,2) is ElemTypeNumber in (0x1,0x2) or doesn't exist if xdmfElemTypeNumber == 0x2 : tempContainer[:,1] = 2 elif xdmfElemTypeNumber == 0x1 : tempContainer[:,1] = 1 # Last row are the connectivity tempContainer[:,extraSize:] = data.connectivity # Unravel to dataArray and update counter cpt += totalSize self.__WriteDataItem(dataArray, name="Topo_U_"+str(name) ) elif len(baseMeshObject.elements): elements = list(baseMeshObject.elements.keys())[0] elementType = XdmfName[elements] self.filePointer.write(' <Topology TopologyType="{}" NumberOfElements="{}" '.format(elementType,baseMeshObject.GetNumberOfElements())) if XdmfNumber[elements] == 0x2: self.filePointer.write('NodesPerElement="2" ') if XdmfNumber[elements] == 0x1: self.filePointer.write('NodesPerElement="1" ') self.filePointer.write(' >\n') self.__WriteDataItem(baseMeshObject.elements[elements].connectivity.ravel(), name="Topo_U_"+str(name) ) else: self.filePointer.write(' <Topology TopologyType="mixed" NumberOfElements="0" >\n') self.filePointer.write(' </Topology> \n') def __WriteAttribute(self,data,name,center,baseMeshObject): data = data.view() shape = None if center == "Node": nbData = baseMeshObject.GetNumberOfNodes() elif center == "Cell": nbData = baseMeshObject.GetNumberOfElements() elif center == "Grid": nbData = 1 else: raise Exception('Cant treat this type of field support' + center ) # pragma: no cover if data.size == nbData: arrayType = "Scalar" elif data.size == nbData*3: arrayType = "Vector" elif data.size == nbData*6: arrayType = "Vector" elif data.size == nbData*9: arrayType = "Vector" elif data.size == nbData*2: # add an extra zeros to the data and print it out as Vector 3 if baseMeshObject.GetPointsDimensionality() == 2: shape = (data.size//2,1,2,) else: shape = (data.size//2,2,) if len(data.shape) <= 2: data.shape = shape #data = data.transpose(1,0,2) shape = list(data.shape) shape [-1] = 3 shape = tuple(shape) data1 = np.zeros(shape, dtype=data.dtype) data1[...,0:2] = data self.__WriteAttribute(data1.ravel(),name,center,baseMeshObject) return else: print(TFormat.InRed("I don't kow how to treat fields '"+ str(name)+"' with " +str(data.size/nbData) +" components")) # pragma: no cover print(TFormat.InRed("Data has size : " + str(data.size) )) # pragma: no cover print(TFormat.InRed("But support has size : " + str(nbData) )) # pragma: no cover raise Exception # pragma: no cover self.filePointer.write(' <Attribute Center="'+center+'" Name="'+name+'" Type="'+arrayType+'">\n')# try: self.__WriteDataItem(data.ravel(),shape,name=name) except: # pragma: no cover print("Error Writing heavy data of field: " + str(name)) raise self.filePointer.write(' </Attribute>\n')
[docs] def WriteIntegrationsPoints(self, quadrature: MeshQuadrature, mesh: Mesh): """Writes integration points information to file, this will write only for elements present on the mesh """ for elemName, elems in mesh.elements.items(): if elems.GetNumberOfElements() == 0 : continue data = quadrature[elemName] points = data.points npData = np.asarray(points,dtype=float).copy() nip = npData.shape[0] if npData.shape[1] < 3: b = np.zeros((nip,3)) b[:,:-1] = npData npData = b self.filePointer.write(' <Information Name="QP" Value="')# self.filePointer.write(str(ED.numberOfNodes[elemName]) + " ") self.filePointer.write(str(XdmfNumber[elemName]) + " ") self.filePointer.write(str(nip) + '" > \n') self.__WriteDataItem(npData.ravel(),[npData.size],name="ip") self.filePointer.write('</Information> \n')#
[docs] def WriteIntegrationsPointData(self, names, allData): """Writes integration points data to file """ for i, name in enumerate(names): data = allData[i] self.filePointer.write(' <Information Name="IPF" Value="'+str(name)+'" > \n')# self.ipStorage[name] = self.__WriteDataItem(data.ravel(),[data.size],name='IPD_'+name) self.filePointer.write('</Information> \n')#
[docs] def NextDomain(self): self.parafacCpt = 0 self.ddmCpt += 1 if self.IsMultidomainOutput() : if self.IsParafacOutput(): self.filePointer.write('</Grid> <!-- Parafac grid -->\n') self.filePointer.write('<Grid Name="Grid_P" GridType="Collection" CollectionType="None" >\n') else: raise Exception("Cant make a new domain without a multi domain output")
[docs] def MakeStep(self, Time = None, TimeStep = None): """Increases time step in output files Parameters ---------- Time : float, optional time value of the time step, by default None TimeStep : int, optional number of the time step, by default None """ if self.IsMultidomainOutput(): self.filePointer.write('</Grid> <!-- collection grid -->\n') self.filePointer.write('<Grid Name="Collection" GridType="Collection" CollectionType="Spatial" >\n') dt = 1 if Time is not None: dt = Time - self.currentTime elif TimeStep is not None: dt = TimeStep if self.IsTemporalOutput(): self.Step(dt) if self.IsTemporalOutput() and self.__printTimeInsideEachGrid and (self.IsMultidomainOutput() == False) : self.filePointer.write(' <Time Value="'+str(self.currentTime)+'" /> \n')
[docs] def Write(self,mesh, PointFields = None, CellFields = None, GridFields= None, PointFieldsNames = None, CellFieldsNames= None, GridFieldsNames=None , Time= None, TimeStep = None,domainName=None, IntegrationRule=None, IntegrationPointData=None, IntegrationPointDataNames=None ): """Write data to file in xdmf format Parameters ---------- mesh : Mesh the mesh to be written PointFields : list[np.ndarray], optional fields to write defined at the vertices of the mesh, by default None CellFields : list[np.ndarray], optional fields to write defined at the elements of the mesh, by default None GridFields : list[np.ndarray], optional grid fields to write, by default None PointFieldsNames : list[str], optional names of the fields to write defined at the vertices of the mesh, by default None CellFieldsNames : list[str], optional names of the fields to write defined at the elements of the mesh, by default None GridFieldsNames : list[str], optional names of the grid fields to write, by default None Time : float, optional time at which the data is read, by default None TimeStep : int, optional time index at which the data is read, by default None domainName : str, optional Not Used, by default None IntegrationRule : dict[IntegrationRuleType], optional integration rules associated to the integration point data, by default None IntegrationPointData : list[np.ndarray], optional fields defined at the integration points, by default None IntegrationPointDataNames : list[str], optional names of the fields defined at the integration points, by default None """ if (Time is not None or TimeStep is not None) and self.IsMultidomainOutput(): raise(Exception("set time using MakeStep, not the Write option") ) if PointFields is None: PointFields = [] if CellFields is None: CellFields = [] if GridFields is None: GridFields = [] if PointFieldsNames is None: PointFieldsNames = [] if CellFieldsNames is None: CellFieldsNames = [] if GridFieldsNames is None: GridFieldsNames = [] if IntegrationPointData is None: IntegrationPointData = [] if IntegrationPointDataNames is None: IntegrationPointDataNames = [] self.pointFieldsStorage = {} self.cellFieldsStorage = {} self.gridFieldsStorage = {} self.ipStorage = {} if not self.isOpen() : if self.automaticOpen: self.Open() else: print(TFormat.InRed("Please Open The writer First!!!")) raise Exception if not self.IsMultidomainOutput(): dt = 1 if Time is not None: dt = Time - self.currentTime elif TimeStep is not None: dt = TimeStep if self.IsTemporalOutput(): self.Step(dt) if self.IsParafacOutput (): suffix = "P" + str(self.parafacCpt) self.parafacCpt += 1 else: suffix = str(len(self.timeSteps)) if self.IsMultidomainOutput(): if MPI.IsParallel(): # pragma: no cover suffix += "_D" + str(MPI.Rank()) else: suffix += "_D" + str(self.ddmCpt) #if we have a constantRectilinear mesh with more than only "bulk" # elements, we add a collection to add all the rest of the elements if False and mesh.IsConstantRectilinear() and len(mesh.elements) > 1: self.filePointer.write('<Grid Name="Grid_S'+suffix+'" GridType="Collection" CollectionType="Spatial" >\n') if self.IsTemporalOutput() and self.__printTimeInsideEachGrid and (self.IsMultidomainOutput() == False) : self.filePointer.write(' <Time Value="'+str(self.currentTime)+'" /> \n') self.filePointer.write(' <Grid Name="Bulk">\n') else: self.filePointer.write(' <Grid Name="Grid_'+suffix+'">\n') if self.IsTemporalOutput() and self.__printTimeInsideEachGrid and (self.IsMultidomainOutput() == False) : self.filePointer.write(' <Time Value="'+str(self.currentTime)+'" /> \n') self.__WriteGeoAndTopo(mesh,name=suffix) self.__WriteNodesTagsElementsTags(mesh,PointFieldsNames,CellFieldsNames) self.__WriteNodesFieldsElementsFieldsGridFields(mesh, PointFieldsNames,PointFields, CellFieldsNames,CellFields, GridFieldsNames,GridFields) if IntegrationRule is not None: self.WriteIntegrationsPoints(IntegrationRule, mesh) self.WriteIntegrationsPointData(IntegrationPointDataNames,IntegrationPointData) if False and mesh.IsConstantRectilinear() and len(mesh.elements) > 1: self.filePointer.write(' </Grid>\n') from Muscat.Containers.Mesh import Mesh tempMesh = Mesh() tempMesh.nodes = mesh.nodes tempMesh.originalIDNodes = mesh.originalIDNodes for name, data in mesh.elements.items(): if data.mutable : tempMesh.elements[name] = data self.filePointer.write(' <Grid Name="Sets">\n') self.__WriteGeoAndTopo(tempMesh) self.__WriteNodesTagsElementsTags(tempMesh,[],[]) self.filePointer.write(' </Grid>\n') self.filePointer.write(' </Grid>\n') if(self.__keepXmlFileInSaneState): self.WriteTail() self.filePointer.flush() if self.isBinary() :# pragma: no cover self.__binaryFilePointer.flush()
def __WriteNodesFieldsElementsFieldsGridFields(self,baseMeshObject, PointFieldsNames,PointFields, CellFieldsNames,CellFields, GridFieldsNames,GridFields): for i in range(len(PointFields)): name = 'PField'+str(i) if len(PointFields) == len(PointFieldsNames): name = PointFieldsNames[i] self.pointFieldsStorage[name] = self.__WriteAttribute(np.asarray(PointFields[i]), name, "Node",baseMeshObject) for i in range(len(CellFields)): name = 'CField'+str(i) if len(CellFields) == len(CellFieldsNames): name = CellFieldsNames[i] self.cellFieldsStorage[name] = self.__WriteAttribute(np.asarray(CellFields[i]), name, "Cell",baseMeshObject) for i in range(len(GridFields)): name = 'GField'+str(i) if len(GridFields) == len(GridFieldsNames): name = GridFieldsNames[i] self.gridFieldsStorage[name] = self.__WriteAttribute(np.asarray(GridFields[i]), name, "Grid",baseMeshObject) def __WriteNodesTagsElementsTags(self,baseMeshObject,PointFieldsNames,CellFieldsNames): for tag in baseMeshObject.nodesTags: if tag.name in PointFieldsNames: name = "Tag_" + tag.name else: name = tag.name data = np.zeros((baseMeshObject.GetNumberOfNodes(),1),dtype=np.int8) data[baseMeshObject.nodesTags[tag.name].GetIds()] = 1 self.__WriteAttribute(np.asarray(data), name, "Node",baseMeshObject) #Cell Tags baseMeshObject.PrepareForOutput() if False and baseMeshObject.IsConstantRectilinear(): cellTags = baseMeshObject.GetNamesOfElemTagsBulk() GetElementsInTag = baseMeshObject.GetElementsInTagBulk else: cellTags = baseMeshObject.GetNamesOfElementTags() GetElementsInTag = baseMeshObject.GetElementsInTag for tagname in cellTags: if tagname in CellFieldsNames: name = "Tag_" + tagname else: name = tagname data = GetElementsInTag(tagname) res = np.zeros((baseMeshObject.GetNumberOfElements(),1),dtype=np.int8) res[data] = 1 self.__WriteAttribute(np.asarray(res), name, "Cell", baseMeshObject) def __WriteTime(self): """ this function is called by the WriteTail, this function must NOT change the state of the instance, also no writing to binary neither hdf5 files """ if self.isOpen() and self.__printTimeInsideEachGrid == False: #self.filePointer.write('<Time TimeType="List">\n') #self.__WriteDataItem(self.timeSteps) #self.filePointer.write('</Time>\n') self.filePointer.write('<Time Value="'+ (" ".join(str(x) for x in self.timeSteps)) +'"/>\n') def __WriteDataItem(self,_data, _shape= None,name=None): import numpy as np data = np.asarray(_data) if _shape is None: _shape = _data.shape shape = np.asarray(_shape) if self.isOpen(): if data.dtype == np.float64: typename = 'Float' s = data.dtype.itemsize elif data.dtype == np.float32: typename = 'Float' s = data.dtype.itemsize elif data.dtype == np.int32: typename = 'Int' s = data.dtype.itemsize elif data.dtype == np.int64: typename = 'Int' s = data.dtype.itemsize elif data.dtype == np.int8: typename = 'Char' s = data.dtype.itemsize else: print(f"Warning : skipping field '{name}' of type '{type(data[0])}' not supported'") return None dimension = ArrayToString(shape) if self.isBinary() and len(data) > self.__XmlSizeLimit:# pragma: no cover # to test this feature a big file must be created (so we don't test it) if self.__binaryCpt > self.__chunkSize : self.__binaryFilePointer.close() self.NewBinaryFilename() self.__binaryFilePointer = open (self.__binFileName, "wb") self.__binaryCpt = 0 gsData,gsStorage = self.globalStorage.get(str(name),(None,None)) dataToWrite = data.ravel() if np.array_equal(gsData, dataToWrite): binaryFile = gsStorage.filenameOnly seek = gsStorage.offset gsStorage.usedByNInstances += 1 self.globalStorage[str(name)] = (gsData.copy(),gsStorage) res = gsStorage else: res = BinaryStorage(data=data,filePointer=self.__binaryFilePointer) res.filenameOnly = self.__binFileNameOnly binaryFile = self.__binFileNameOnly seek = self.__binaryCpt data.ravel().tofile(self.__binaryFilePointer) self.__binaryCpt += s*len(dataToWrite) if len(self.globalStorage) > self.maxStorageSize-1: usage = [x[1].usedByNInstances for x in self.globalStorage.values()] #print(usage) minUsage = min(usage) newGlobalStorage = {} GT = len(self.globalStorage) for i,d in self.globalStorage.items(): if d[1].usedByNInstances == minUsage: GT -= 1 if GT < self.maxStorageSize-1: break continue newGlobalStorage[i]=d self.globalStorage = newGlobalStorage self.globalStorage[str(name)] = (dataToWrite.copy(),res) self.filePointer.write(' <DataItem Format="Binary"'+ ' NumberType="'+typename+'"'+ ' Dimensions="'+dimension+'" '+ ' Seek="'+str(seek)+'" '+ ' Endian="Native" '+ ' Precision="'+str(s)+'" '+ ' Compression="Raw" >') self.filePointer.write(binaryFile) self.filePointer.write('</DataItem>\n') return res elif self.IsHdf5(): if self.__hdf5cpt > self.__chunkSize : import h5py self.__hdf5FilePointer.close() self.NewHdf5Filename() self.__hdf5FilePointer = h5py.File(self.__hdf5FileName, 'w') self.__hdf5cpt = 0 if name is None: name = "dataset_"+ str(self.__hdf5NameCpt) else: name += "_"+ str(self.__hdf5NameCpt) self.__hdf5NameCpt += 1 self.__hdf5FilePointer.create_dataset(name, data=data) self.__hdf5cpt += s*data.size self.filePointer.write(f' <DataItem Format="HDF" NumberType="{typename}" Dimensions="{dimension}" Precision="{s}" >') self.filePointer.write(f"{self.__hdf5FileNameOnly}:{name}") self.filePointer.write('</DataItem>\n') res = Hdf5Storage(data=data,filename=self.__hdf5FileName ,datasetName=name) return res else: self.filePointer.write(' <DataItem Format="XML" NumberType="'+typename+'" Dimensions="'+dimension+'">') self.filePointer.write(" ".join(str(x) for x in data.ravel())) self.filePointer.write('</DataItem>\n') return None
from Muscat.IO.IOFactory import RegisterWriterClass RegisterWriterClass(".xdmf",XdmfWriter) RegisterWriterClass(".xmf",XdmfWriter)
[docs]def WriteTest(tempdir, Temporal, Binary, Hdf5 ): from Muscat.Containers.ConstantRectilinearMeshTools import CreateConstantRectilinearMesh myMesh = CreateConstantRectilinearMesh(dimensions=[2,3,4], spacing=[0.1, 0.1, 0.1], origin=[-2.5,-1.2,-1.5]) print(myMesh.GetNumberOfElements()) dataT = np.arange(24,dtype=np.float32) dataT.shape = (2,3,4) dataDep = np.arange(24*3)+0.3 dataDep.shape = (2,3,4,3) writer = XdmfWriter(tempdir + 'TestOutput_Bin_'+str(Binary)+'_hdf5_'+str(Hdf5)+'_Temp_'+str(Temporal)+'.xmf') writer.SetTemporal(Temporal) writer.SetBinary(Binary) writer.SetHdf5(Hdf5) writer.Open() print(writer) writer.Write(myMesh,PointFields=[dataT, dataDep], PointFieldsNames=["Temp","Dep"],CellFields=[np.arange(6)],CellFieldsNames=['S'], Time=0) writer.Write(myMesh,GridFields=[0, 1], GridFieldsNames=['K','P'], TimeStep = 1) writer.Close()
[docs]def WriteTestAppend(tempdir, Temporal, Binary): from Muscat.Containers.ConstantRectilinearMeshTools import CreateConstantRectilinearMesh myMesh = CreateConstantRectilinearMesh(dimensions=[2,3,4], spacing=[0.1, 0.1, 0.1], origin=[-2.5,-1.2,-1.5]) dataT = np.arange(24,dtype=np.float32) dataT.shape = (2,3,4) dataDep = np.arange(24*3)+0.3 dataDep.shape = (2,3,4,3) writer = XdmfWriter(tempdir + 'TestOutput_Bin_'+str(Binary)+'_Temp_'+str(Temporal)+'.xmf') writer.SetAppendMode(True) writer.SetTemporal(Temporal) writer.SetBinary(Binary) writer.Open() print(writer) writer.Write(myMesh,PointFields=[dataT, dataDep], PointFieldsNames=["Temp","Dep"],CellFields=[np.arange(6)],CellFieldsNames=['S']) writer.Write(myMesh,GridFields=[0, 1], GridFieldsNames=['K','P'], TimeStep = 1) writer.Close()
[docs]def CheckIntegrity(GUI:bool=False): imf = InMemoryFile(0) imf.write("Hello ") imf.write("World.") imf.tell() imf.seek(0) imf.flush() imf.close() np.testing.assert_equal(imf.data, "Hello World.") from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory from Muscat.Containers.ConstantRectilinearMeshTools import CreateConstantRectilinearMesh import Muscat.Containers.Mesh as UM from Muscat.Containers.MeshCreationTools import CreateMeshOfTriangles, CreateMeshOf tempdir = TemporaryDirectory.GetTempPath() #test with empty elements from Muscat.Containers.MeshCreationTools import CreateMeshOfTriangles, CreateMeshOf dim0 = CreateMeshOf([[0.,0.,0],[1.,2.,0],[1, 3, 0]], np.arange(3,dtype=MuscatIndex).reshape((3,1)), ED.Point_1) dim0.props['ParafacDims'] = 2 dim0.props['ParafacDim0'] = "x" dim0.props['ParafacDim1'] = "y" dim1 = CreateMeshOf([[0.,0.,0],[1.,0,0],[2, 0, 0]], np.arange(3,dtype=MuscatIndex).reshape((3,1)), ED.Point_1) dim1.props['ParafacDims'] = 1 dim1.props['ParafacDim0'] = "T" writer = XdmfWriter() writer.SetFileName(None) writer.SetXmlSizeLimit(0) writer.SetBinary(True) writer.SetHdf5(False) writer.SetParafac(True) writer.Open(filename=tempdir+'parafacNoElements.pxdmf') mode0 = np.arange(3*3,dtype=MuscatFloat).reshape((3,3)) writer.Write(dim0,PointFields=[ mode0 ],PointFieldsNames=[ "VField_0"] ) writer.Write(dim1,PointFields=[ mode0 ],PointFieldsNames=[ "VField_0"]) writer.Close() res = CreateMeshOfTriangles([[0.,0.,0],[1.,2.,3],[1, 3, 2]], np.asarray([[0,1,2]])) print(res) WriteMeshToXdmf(tempdir+"TestUnstructured.xdmf", res, PointFields = [np.array([1.,2,3])], CellFields =[ np.array([1])] ,GridFields= [[0], np.array([1,2,3]).astype(np.int64) ], PointFieldsNames = ["PS"], CellFieldsNames = ["CS"], GridFieldsNames = ["GS", "GV"] , Binary= True) elements = res.GetElementsOfType(ED.Bar_2) elements.AddNewElement([1,2],1) elements = res.GetElementsOfType(ED.Point_1) elements.AddNewElement([0],2) res.nodes = np.ascontiguousarray(res.nodes[:,0:2]) res.GetNodalTag("First_Point").AddToTag(0) res.AddElementToTagUsingOriginalId(1,"bars") WriteMeshToXdmf(tempdir+"TestUnstructured_multiElements.xdmf", res, PointFields = [np.array([1.,2,3])], CellFields =[ np.array([1,0,4])] ,GridFields= [[0]], PointFieldsNames = ["PS"], CellFieldsNames = ["CS"], GridFieldsNames = ["GS"] , Binary= True) #---------------------- res = UM.Mesh() WriteMeshToXdmf(tempdir+"TestUnstructured_EmptyMesh.xdmf", res) res.SetNodes([[0,0,0],[1,0,0]],np.arange(2)) elements = res.GetElementsOfType(ED.Point_1) elements.AddNewElement([0],1) WriteMeshToXdmf(tempdir+"TestUnstructured_OnlyPoints.xdmf", res) res = UM.Mesh() res.nodes = np.array([[0,0,0],[1,0,0]],dtype=MuscatFloat) res.originalIDNodes = np.arange(res.GetNumberOfNodes(), dtype= MuscatIndex) elements = res.GetElementsOfType(ED.Bar_2) elements.AddNewElement([0,1],1) WriteMeshToXdmf(tempdir+"TestUnstructured_OnlyBars.xdmf", res) #---------------------- WriteTest(tempdir, False, False, False) WriteTest(tempdir, False, True, False) WriteTest(tempdir, False, False, True) WriteTest(tempdir,True, False, False) WriteTest(tempdir,True, True, False) WriteTest(tempdir,True, False, True) WriteTestAppend(tempdir,True, True) WriteMeshToXdmf(tempdir+'testDirect.xdmf',CreateConstantRectilinearMesh(dimensions=[1,1,1],origin=[0,0,0], spacing= [0,0,0] ) ) writer = XdmfWriter() writer.SetFileName(None) writer.SetFileName(tempdir+'testErrors.xdmf') writer.SetXmlSizeLimit(0) writer.Open() ## test of the errors try: writer.SetTemporal() return 'Not ok'# pragma: no cover except: pass try: writer.SetBinary() return 'Not ok'# pragma: no cover except: pass # no error anymore just a warning #try: writer.Open() #except: # pass writer.Close() # for the moment we comment this test (verification of "unable to open file") # need to find a way to raise an exception in linux and in windows with the # same filename # #try: # writer.Open("c:\\") # in windows this will raise an exception # writer.Open("\") # in linux this will raise an exception # return 'Not ok'# pragma: no cover #except: # pass print("ConstantRectilinearMesh in 2D") CRM2D = CreateConstantRectilinearMesh(dimensions=[2,3], origin=[0,0], spacing=[1,1]) writer = XdmfWriter() writer.SetFileName(None) writer.SetXmlSizeLimit(0) writer.SetBinary(True) writer.Open(filename=tempdir+'testDirect.xdmf') writer.Write(CRM2D, PointFields = [ np.zeros((CRM2D.GetNumberOfNodes(),) ).astype(np.float32), np.zeros((CRM2D.GetNumberOfNodes(),) ).astype(int) ], PointFieldsNames = ["Test", "testInt"], CellFields= [ np.arange(CRM2D.GetNumberOfElements()*2 ).astype(np.float64)], CellFieldsNames = [ "TestV"] ) writer.Close() CRM2D = CreateConstantRectilinearMesh(dimensions=[2,3], origin=[0,0], spacing=[1,1]) writer = XdmfWriter() writer.SetFileName(None) writer.SetXmlSizeLimit(0) writer.SetBinary(True) writer.SetMultidomain() writer.Open(filename=tempdir+'testDirectTwoDomains.xdmf') writer.Write(CRM2D, PointFields = [ np.zeros((CRM2D.GetNumberOfNodes(),) ).astype(np.float32), np.zeros((CRM2D.GetNumberOfNodes(),) ).astype(int) ], PointFieldsNames = ["Test", "testInt"], CellFields= [ np.arange(CRM2D.GetNumberOfElements()*2 ).astype(np.float64)], CellFieldsNames = [ "TestV"] ) CRM3D = CreateConstantRectilinearMesh(dimensions=[2,3,4], origin=[0,0,0], spacing=[1,1,1]) writer.Write(CRM3D) writer.Close() if GUI : from Muscat.Actions.OpenInParaView import OpenInParaView OpenInParaView(filename = tempdir+'testDirectTwoDomains.xdmf') writer = XdmfWriter() writer.SetFileName(None) writer.SetXmlSizeLimit(0) writer.SetBinary(True) writer.SetMultidomain() writer.SetTemporal() writer.Open(filename=tempdir+'testDirectTwoDomainsTwoSteps.xdmf') writer.Write(CRM2D) writer.Write(CRM3D) writer.MakeStep(Time=1.5) writer.Write(CRM3D) writer.Write(CRM2D) writer.Close() if GUI: from Muscat.Actions.OpenInParaView import OpenInParaView OpenInParaView(filename = tempdir+'testDirectTwoDomainsTwoSteps.xdmf') ####### work for PXDMF 2.0 ################# writer = XdmfWriter() writer.SetFileName(None) writer.SetXmlSizeLimit(0) writer.SetBinary(True) writer.SetHdf5(False) writer.SetParafac(True) writer.Open(filename=tempdir+'parafac.pxdmf') from Muscat.Containers.MeshCreationTools import CreateUniformMeshOfBars mesh1DTime = CreateUniformMeshOfBars(2,5,10) mesh1DTime.props['ParafacDims'] = 1 mesh1DTime.props['ParafacDim0'] = "T" mesh1DTime.props["ParafacUnit0"] = "s" mesh2DParameters = CreateConstantRectilinearMesh(dimensions=[4,4], origin=[0,0], spacing=[1,1]) mesh2DParameters.props['ParafacDims'] = 2 mesh2DParameters.props['ParafacDim0'] = "Px" mesh2DParameters.props['ParafacDim1'] = "Py" mesh3DSpace = CreateConstantRectilinearMesh(dimensions=[8,8,8], origin=[0,0,0], spacing=[1,1,1]) from Muscat.FE.IntegrationRules import LagrangeIsoParamQuadrature elementaryQuadrature = LagrangeIsoParamQuadrature[ED.Quadrangle_4] IntegrationPointData = np.arange(mesh2DParameters.GetNumberOfElements()*elementaryQuadrature.GetNumberOfPoints() )+0.1 IntegrationPointAllData = [IntegrationPointData] writer.Write(mesh2DParameters, IntegrationRule=LagrangeIsoParamQuadrature, IntegrationPointData=IntegrationPointAllData, IntegrationPointDataNames=["IPId_0"]) print(IntegrationPointAllData) print(writer.ipStorage["IPId_0"].GetData()) writer.ipStorage["IPId_0"].UpdateHeavyStorage(IntegrationPointData+10) print(str(writer.ipStorage["IPId_0"])) writer.Write(mesh1DTime, CellFields = [np.arange(mesh1DTime.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"]) writer.Write(mesh3DSpace, CellFields = [np.arange(mesh3DSpace.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"]) writer.Close() ####### work for PXDMF 2.0 HDF5 On ################# writer = XdmfWriter() writer.SetFileName(None) writer.SetXmlSizeLimit(0) writer.SetBinary(True) writer.SetHdf5(True) writer.SetParafac(True) writer.Open(filename=tempdir+'parafachdf5.pxdmf') from Muscat.Containers.MeshCreationTools import CreateUniformMeshOfBars mesh1DTime = CreateUniformMeshOfBars(2,5,10) mesh1DTime.props['ParafacDims'] = 1 mesh1DTime.props['ParafacDim0'] = "T" mesh2DParameters = CreateConstantRectilinearMesh(dimensions=[4,4], origin=[0,0], spacing=[1,1]) mesh2DParameters.props['ParafacDims'] = 2 mesh2DParameters.props['ParafacDim0'] = "Px" mesh2DParameters.props['ParafacDim1'] = "Py" mesh3DSpace = CreateConstantRectilinearMesh(dimensions=[8,8,8], origin=[0,0,0], spacing=[1,1,1]) from Muscat.FE.IntegrationRules import LagrangeIsoParamQuadrature elementaryQuadrature = LagrangeIsoParamQuadrature[ED.Quadrangle_4] IntegrationPointData = np.arange(mesh2DParameters.GetNumberOfElements()*elementaryQuadrature.GetNumberOfPoints() )+0.1 IntegrationPointAllData = [IntegrationPointData] writer.Write(mesh1DTime, CellFields = [np.arange(mesh1DTime.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"]) writer.Write(mesh3DSpace, CellFields = [np.arange(mesh3DSpace.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"]) writer.Write(mesh2DParameters, IntegrationRule=LagrangeIsoParamQuadrature, IntegrationPointData=IntegrationPointAllData, IntegrationPointDataNames=["IPId_0"]) writer.Close() print(IntegrationPointAllData) writer.ipStorage["IPId_0"].UpdateHeavyStorage(IntegrationPointData+10) print(str(writer.ipStorage["IPId_0"])) np.testing.assert_equal(writer.ipStorage["IPId_0"].GetData(),IntegrationPointData+10) if GUI : from Muscat.Actions.OpenInParaView import OpenInParaView OpenInParaView(filename = tempdir+'parafac.pxdmf') from Muscat.IO.XdmfReader import XdmfReader as XR f = XR(filename = tempdir+'testDirect.xdmf' ) f.lazy = False f.Read() print(tempdir+'testDirect.xdmf') f.xdmf.GetDomain(0).GetGrid(0).GetFieldsOfType("Cell") print(f.xdmf.GetDomain(0).GetGrid(0).attributes ) domain = f.xdmf.GetDomain(0) grid = domain.GetGrid(0) grid.GetFieldsOfType("Cell") ### Domain Decomposition and Parafac res = CheckIntegrityDDM(GUI=GUI) if res.lower() != "ok": return res CheckIntegrityHdf5(tempdir) return 'ok'
[docs]def CheckIntegrityHdf5(tempdir): from Muscat.Containers.ConstantRectilinearMeshTools import CreateConstantRectilinearMesh myMesh = CreateConstantRectilinearMesh(dimensions=[20,30,40], spacing=[0.1, 0.1, 0.1], origin=[-2.5,-1.2,-1.5] ) dataT = np.arange(24000,dtype=np.float32) dataT.shape = (20,30,40) dataDep = np.arange(24000*3)+0.3 dataDep.shape = (20,30,40,3) writer = XdmfWriter(tempdir + 'TestHDF5_.xmf') writer.SetChunkSize(2**20) writer.SetTemporal(True) writer.SetHdf5(True) writer.Open() print(writer) for i in range(3): dataT = np.random.rand(24000) dataT.shape = (20,30,40) dataDep = np.random.rand(24000*3)+0.3 dataDep.shape = (20,30,40,3) writer.Write(myMesh,PointFields=[dataT, dataDep], PointFieldsNames=["Temp","Dep"],CellFields=[np.arange(19*29*39)],CellFieldsNames=['S']) writer.Close()
[docs]def CheckIntegrityDDM(GUI:bool=False): """ this test function can be launched using the mpirun -n 2 ... to test the writer in mpi mode """ from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory tempdir = TemporaryDirectory.GetTempPath() from Muscat.Containers.MeshCreationTools import CreateUniformMeshOfBars mesh1D = CreateUniformMeshOfBars(0,5,10) mesh1D.props['ParafacDims'] = 1 mesh1D.props['ParafacDim0'] = "T" from Muscat.Helpers.MPIInterface import MPIInterface as MPI writer = XdmfWriter() writer.SetFileName(None) writer.SetXmlSizeLimit(0) writer.SetBinary(True) writer.SetMultidomain() writer.SetParafac(True) mpi = MPI() print("rank ", mpi.rank) writer.Open(filename=tempdir+'DDM_parafac.pxdmf') mpi = MPI() if mpi.Rank() == 0: mesh1D.props['ParafacDim0'] = "D0_P0" writer.Write(mesh1D, CellFields = [np.arange(mesh1D.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"]) mesh1D.nodes[:,0] += 1 mesh1D.props['ParafacDim0'] = "D0_P1" writer.Write(mesh1D, CellFields = [np.arange(mesh1D.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"]) if mpi.IsParallel() == 0 : writer.NextDomain() if mpi.Rank() == mpi.size-1 : mesh1D.nodes[:,0] += 1 mesh1D.props['ParafacDim0'] = "D1_P0" writer.Write(mesh1D, CellFields = [np.arange(mesh1D.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"]) mesh1D.nodes[:,0] += 1 mesh1D.props['ParafacDim0'] = "D1_P1" writer.Write(mesh1D, CellFields = [np.arange(mesh1D.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"]) writer.Close() if HasHdf5Support(): return "ok" else: return "ok, but no hdf5 support"
if __name__ == '__main__': #from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory #TemporaryDirectory.SetTempPath("/tmp/Muscatrectory__is42mzi_safe_to_delete/") print(CheckIntegrity(GUI=False)) # pragma: no cover