# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""Xdmf file writer
"""
import numpy as np
import os
from Muscat.Helpers.TextFormatHelper import TFormat
import Muscat.Containers.ElementsDescription as ED
from Muscat.Containers.Mesh import Mesh
from Muscat.IO.WriterBase import WriterBase as WriterBase
from Muscat.Helpers.MPIInterface import MPIInterface as MPI
from Muscat.Types import MuscatIndex, MuscatFloat
from Muscat.IO.XdmfTools import XdmfName,XdmfNumber, HasHdf5Support
from Muscat.FE.IntegrationRules import MeshQuadrature
[docs]def ArrayToString(data):
return " ".join(str(x) for x in data)
#* Xdmf supports the following topology types:
# * NoTopologyType
# * Polyvertex - Unconnected Points
# * Polyline - Line Segments
# * Polygon - N Edge Polygon
# * Triangle - 3 Edge Polygon
# * Quadrilateral - 4 Edge Polygon
# * Tetrahedron - 4 Triangular Faces
# * Wedge - 4 Triangular Faces, Quadrilateral Base
# * Hexahedron - 6 Quadrilateral Faces
# * Edge_3 - 3 Node Quadratic Line
# * Triangle_6 - 6 Node Quadratic Triangle
# * Quadrilateral_8 - 8 Node Quadratic Quadrilateral
# * Quadrilateral_9 - 9 Node Bi-Quadratic Quadrilateral
# * Tetrahedron_10 - 10 Node Quadratic Tetrahedron
# * Pyramid_13 - 13 Node Quadratic Pyramid
# * Wedge_15 - 15 Node Quadratic Wedge
# * Wedge_18 - 18 Node Bi-Quadratic Wedge
# * Hexahedron_20 - 20 Node Quadratic Hexahedron
# * Hexahedron_24 - 24 Node Bi-Quadratic Hexahedron
# * Hexahedron_27 - 27 Node Tri-Quadratic Hexahedron
# * Hexahedron_64 - 64 Node Tri-Cubic Hexahedron
# * Hexahedron_125 - 125 Node Tri-Quartic Hexahedron
# * Hexahedron_216 - 216 Node Tri-Quintic Hexahedron
# * Hexahedron_343 - 343 Node Tri-Hexic Hexahedron
# * Hexahedron_512 - 512 Node Tri-Septic Hexahedron
# * Hexahedron_729 - 729 Node Tri-Octic Hexahedron
# * Hexahedron_1000 - 1000 Node Tri-Nonic Hexahedron
# * Hexahedron_1331 - 1331 Node Tri-Decic Hexahedron
# * Hexahedron_Spectral_64 - 64 Node Spectral Tri-Cubic Hexahedron
# * Hexahedron_Spectral_125 - 125 Node Spectral Tri-Quartic Hexahedron
# * Hexahedron_Spectral_216 - 216 Node Spectral Tri-Quintic Hexahedron
# * Hexahedron_Spectral_343 - 343 Node Spectral Tri-Hexic Hexahedron
# * Hexahedron_Spectral_512 - 512 Node Spectral Tri-Septic Hexahedron
# * Hexahedron_Spectral_729 - 729 Node Spectral Tri-Octic Hexahedron
# * Hexahedron_Spectral_1000 - 1000 Node Spectral Tri-Nonic Hexahedron
# * Hexahedron_Spectral_1331 - 1331 Node Spectral Tri-Decic Hexahedron
# * Mixed - Mixture of Unstructured Topologies
[docs]def WriteMeshToXdmf(filename,
baseMeshObject,
PointFields = None,
CellFields = None,
GridFields= None,
IntegrationPointData=None,
PointFieldsNames = None,
CellFieldsNames=None,
GridFieldsNames=None,
IntegrationPointDataNames=None,
IntegrationRule=None,
Binary= True,
HDF5 = True):
"""Function API for writing mesh in the xdmf format file.
Parameters
----------
fileName : str
name of the file to be written
baseMeshObject : Mesh
the mesh to be written
PointFields : list[np.ndarray], optional
fields to write defined at the vertices of the mesh, by default None
CellFields : list[np.ndarray], optional
fields to write defined at the elements of the mesh, by default None
GridFields : list[np.ndarray], optional
grid fields to write, by default None
IntegrationPointData : list[np.ndarray], optional
fields defined at the integration points, by default None
PointFieldsNames : list[str], optional
names of the fields to write defined at the vertices of the mesh, by default None
CellFieldsNames : list[str], optional
names of the fields to write defined at the elements of the mesh, by default None
GridFieldsNames : list[str], optional
names of the grid fields to write, by default None
IntegrationPointDataNames : list[str], optional
names of the fields defined at the integration points, by default None
IntegrationRule : dict[IntegrationRuleType], optional
integration rules associated to the integration point data, by default None
Binary : bool, optional
if True, file is written as binary, by default True
HDF5 : bool, optional
if True, file is written as hdf5 format, by default True
"""
if PointFields is None:
PointFields = []
if CellFields is None:
CellFields = []
if GridFields is None:
GridFields = []
if PointFieldsNames is None:
PointFieldsNames = []
if CellFieldsNames is None:
CellFieldsNames = []
if GridFieldsNames is None:
GridFieldsNames = []
if IntegrationPointData is None:
IntegrationPointData = []
if IntegrationPointDataNames is None:
IntegrationPointDataNames = []
writer = XdmfWriter(filename)
writer.SetBinary(Binary)
writer.SetHdf5(HDF5)
writer.Open()
writer.Write(baseMeshObject,
PointFields= PointFields,
CellFields = CellFields,
GridFields = GridFields,
IntegrationPointData=IntegrationPointData,
PointFieldsNames = PointFieldsNames,
CellFieldsNames = CellFieldsNames,
GridFieldsNames = GridFieldsNames,
IntegrationPointDataNames=IntegrationPointDataNames,
IntegrationRule=IntegrationRule,
)
writer.Close()
#
[docs]class InMemoryFile():
"""
Helper class to write the xmf part of the file into memory
"""
def __init__(self,saveFilePointer):
self.data = ""
self.saveFilePointer = saveFilePointer
[docs] def write(self,data):
self.data += data
[docs] def tell(self):
return 0
[docs] def seek(self,pos):
return 0
[docs]class Hdf5Storage():
"""Class to deal with hdf5 storage"""
def __init__(self,data, filename, datasetName):
self.filename = filename
self.type = None
self.itemSize = 0
self.vectorSize = 0
self.usedByNInstances = 0
if data is not None:
self.usedByNInstances += 1
self.type = data.dtype
self.itemSize = data.dtype.itemsize
self.vectorSize = data.size
self.datasetName = datasetName
def __str__(self):
return str(self.vectorSize) +":"+str(self.usedByNInstances)
[docs] def GetData(self):
from h5py import File as __File
f = __File(self.filename, 'r')
data = np.array(f[self.datasetName])
f.close()
return data
[docs] def UpdateHeavyStorage(self,data):
if self.usedByNInstances > 1: # pragma: no cover
raise Exception("This pointer is used for more than 1 field please (overwrite or setup the writer with the option maxStorageSize=0")
from h5py import File as __File
if data.size != self.vectorSize: # pragma: no cover
raise Exception("Size of data and storage not compatible")
f = __File(self.filename, 'r+')
dataset = f[self.datasetName]
dataset.write_direct(data)
#f.create_dataset(self.datasetName, data=data)
f.close()
[docs]class BinaryStorage():
"""Class to deal with binary storage"""
def __init__(self,data=None, filePointer=None):
self.filename = ""
self.offset = 0
self.type = None
self.itemSize = 0
self.vectorSize = 0
self.usedByNInstances = 0
if data is not None:
self.usedByNInstances += 1
self.type = data.dtype
self.itemSize = data.dtype.itemsize
self.vectorSize = data.size
if filePointer is not None:
self.filename = filePointer.name
self.offset = filePointer.tell()
def __str__(self):
return str(self.vectorSize) +":"+str(self.usedByNInstances)
[docs] def ChangePathOfBinaryStorage(self,newPath):
self.filename = newPath + os.sep + os.path.basename(self.filename)
[docs] def GetData(self):
f = open(self.filename,'rb')
f.seek(self.offset,0)
data = np.fromfile(f,self.type,self.vectorSize,sep="")
f.close()
return data
[docs] def UpdateHeavyStorage(self,data):
if self.usedByNInstances > 1: # pragma: no cover
raise Exception("This pointer is used for more than 1 field please (overwrite or setup the writer with the option maxStorageSize=0")
if data.size != self.vectorSize: # pragma: no cover
raise Exception("Size of data and storage not compatible")
f = open(self.filename,'r+b')
f.seek(self.offset,0)
data.astype(self.type).ravel().tofile(f)
f.close()
[docs]class XdmfWriter(WriterBase):
"""
Class to Write Xdmf files for:
- classic finite element solutions
- domain decomposition problem (multi mesh)
- transient solution (the mesh changes in time)
- solution written in parafac format (monolithic or in ddm mpi)
"""
def __init__(self, fileName = None):
super().__init__()
self.canHandleTemporal = True
self.canHandleAppend = True
self.canHandleMultidomain = True
self.fileName = None
self.timeSteps = []
self.parafacCpt = 0
self.ddmCpt = 0
self.currentTime = 0
self.__XmlSizeLimit = 0
self.__chunkSize = 2**30
self.automaticOpen = False
try:
import h5py
self.__isHdf5 = True
except: # pragma: no cover
self.__isHdf5 = False
self.__hdf5FileName = ""
self.__hdf5FileNameOnly = None
self.__hdf5FilePointer = None
self.__hdf5NameCpt = 0
self.SetBinary(True)
self.__binFileName = ""
self.__filePointer = None
#self.__isOpen = False
self.__binaryCpt = 0
self.__hdf5cpt = 0
self.__binFileCounter = 0
self.__hdf5FileCounter = 0
self.__keepXmlFileInSaneState = True
self.__isParafacFormat = False
#set to off is you what to put the time at the end of the temporal grid
#keep this option True to be compatible with the XDMF3 reader of ParaView
self.__printTimeInsideEachGrid = True
self.SetFileName(fileName)
self.pointFieldsStorage = {}
self.cellFieldsStorage = {}
self.gridFieldsStorage = {}
self.ipStorage = {}
self.globalStorage = {}
self.maxStorageSize = 50
[docs] def IsHdf5(self):
return self.GetHdf5()
[docs] def SetBinary(self, val = True):
"""Sets the binary status of the file to read
Parameters
----------
val : bool, optional
if True, sets the file to read as binary, by default True
"""
super().SetBinary(val)
[docs] def isBinary(self):
return super().isBinary() and not self.__isHdf5
[docs] def GetHdf5(self):
return self.__isHdf5 and super().isBinary()
[docs] def SetHdf5(self, val=True):
if val :
self.SetBinary(True)
try:
import h5py
self.__isHdf5 = val
except: # pragma: no cover
self.__isHdf5 = False
if val:
print("h5py not available using binary file for output")
else:
self.__isHdf5 = False
[docs] def SetChunkSize(self,size):
self.__chunkSize = size
def __str__(self):
res = 'XdmfWriter : \n'
res += ' FileName : '+ str(self.fileName) +'\n'
res += ' isParafacFormat : '+ ('True' if self.__isParafacFormat else "False") +'\n'
if self.isBinary():
res += ' Binary output Active \n'
if self.__binFileName is not None:
res += ' Binary FileName : '+ self.__binFileName +'\n'
if self.IsHdf5():
res += ' Hdf5 output Active \n'
res += ' Hdf5 FileName : '+ self.__hdf5FileName +'\n'
if self.IsTemporalOutput():
res += ' Temporal output Active \n'
res += ' TimeSteps : '+ str(self.timeSteps) + '\n'
if self.isOpen():
res += ' The File is Open!! \n'
return res
[docs] def SetFileName(self, fileName ):
"""Sets the fileName parameter of the class
Parameters
----------
string : str
fileName to set
"""
if fileName is None :
self.fileName = None
self.__path = None
return
self.fileName = fileName
self.__path = os.path.abspath(os.path.dirname(fileName))
self.binFileCounter = 0
self.NewBinaryFilename()
self.NewHdf5Filename()
[docs] def SetParafac(self,val = True):
self.__isParafacFormat = val
[docs] def IsParafacOutput(self):
return self.__isParafacFormat
[docs] def NewBinaryFilename(self):
name = os.path.splitext(os.path.abspath(self.fileName))[0]
name += "" +str(self.__binFileCounter)
if MPI.IsParallel(): # pragma: no cover
name += "D"+ str(MPI.Rank())
name += ".bin"
self.__binFileName = name
self.__binFileNameOnly = os.path.basename(self.__binFileName)
self.__binFileCounter +=1
[docs] def NewHdf5Filename(self):
name = os.path.splitext(os.path.abspath(self.fileName))[0]
name += "" +str(self.__hdf5FileCounter)
if MPI.IsParallel(): # pragma: no cover
name += "D"+ str(MPI.Rank())
name += ".h5"
self.__hdf5FileName = name
self.__hdf5FileNameOnly = os.path.basename(self.__hdf5FileName)
self.__hdf5FileCounter +=1
[docs] def Step(self, dt = 1):
self.currentTime += dt
self.timeSteps.append(self.currentTime)
[docs] def SetXmlSizeLimit(self,val):
self.__XmlSizeLimit= val
[docs] def Open(self, filename = None):
"""Open file for writing
Parameters
----------
filename : str, optional
name of the file to write
"""
# we don't use the open from WriterBase because the set binary is used
# for the .bin file and not for the .xdmf file
if self.isOpen() :
print(TFormat.InRed("The file is already open !!!!!"))
return
if filename is not None:
self.SetFileName(filename)
## we use unbuffered so we can repairer broken files easily
try :
# in python 3 we cant use unbuffered text I/O (bug???)
#self.filePointer = open(self.fileName, 'w',0)
if self.InAppendMode():
if self.isBinary() == False:
raise(Exception("Append Mode only works in binary mode") )
self.filePointer = open(self.fileName, 'r+')
self.filePointer.seek(-100,2)
lines = self.filePointer.readlines()
for line in lines:
if line.find("<!--Temporal") != -1:
l = line.find('pos="')
r = line.find('" ',l+1)
pos = int(line[l+5:r])
#__binFileCounter
l = line.find('ter="')
r = line.find('" ',l+1)
binFileCounter = int(line[l+5:r])
self.NewBinaryFilename()
self.__binaryFilePointer = open (self.__binFileName, "wb")
self.__binaryCpt = 0
#time
l = line.find('ime="')
r = line.find('" ',l+1)
currentTime = float(line[l+5:r])
self.filePointer.seek(pos)
self._isOpen = True
self.__binFileCounter = binFileCounter
self.currentTime = currentTime
return
raise Exception("Unable Open file in append mode ")
else:
mpi = MPI()
if mpi.IsParallel(): # pragma: no cover
self.__keepXmlFileInSaneState = False
if mpi.rank > 0:
self.filePointer = InMemoryFile(self.filePointer)
else:
self.filePointer = open(self.fileName, 'w')
else:
self.filePointer = open(self.fileName, 'w')
except: # pragma: no cover
print(TFormat.InRed("Error File Not Open"))
raise
self._isOpen = True
self.filePointer.write('<?xml version="1.0" encoding="utf-8"?>\n')
self.filePointer.write('<Xdmf xmlns:xi="http://www.w3.org/2001/XInclude" Version="2.92">\n')
self.filePointer.write('<Domain>\n')
if self.IsTemporalOutput():
self.filePointer.write('<Grid Name="Grid_T" GridType="Collection" CollectionType="Temporal" >\n')
if self.IsMultidomainOutput():
self.filePointer.write('<Grid Name="Grid_S" GridType="Collection" CollectionType="Spatial" >\n')
## here we recover the output for the slave nodes (rank> 0) and then we
## write this information in the master node (rank = 0)
self._OpenParallelWriter()
if self.IsParafacOutput():
self.filePointer.write('<Grid Name="Grid_P" GridType="Collection" CollectionType="None" >\n')
if self.isBinary():
self.__binaryFilePointer = open (self.__binFileName, "wb")
if self.IsHdf5():
import h5py
self.__hdf5FilePointer = h5py.File(self.__hdf5FileName, 'w')
if self.__keepXmlFileInSaneState:
self.WriteTail()
def _OpenParallelWriter(self):
mpi = MPI()
if mpi.mpiOK:
self.__keepXmlFileInSaneState = False
if mpi.rank > 0:
self.filePointer = InMemoryFile(self.filePointer )
def _CloseParallelWriter(self):
mpi = MPI()
if mpi.mpiOK and mpi.rank > 0:
mpi.comm.send(self.filePointer.data, dest=0)
self.filePointer = self.filePointer.saveFilePointer
else:
for i in range(1,mpi.size):
data = mpi.comm.recv( source=i)
self.filePointer.write(data)
[docs] def Close(self):
"""Closes writen file after writing operations are done
"""
if self.isOpen():
self.WriteTail()
self.filePointer.close()
self._isOpen = False
if self.isBinary():
self.__binaryFilePointer.close()
if self.IsHdf5():
self.__hdf5FilePointer.close()
#print("File : '" + self.fileName + "' is close.")
[docs] def WriteTail(self):
"""Writes the end of the file
"""
if self.isOpen():
filePosition = self.filePointer.tell()
if self.IsParafacOutput():
self.filePointer.write('</Grid> <!-- Parafac grid -->\n')
self._CloseParallelWriter()
if self.IsMultidomainOutput() :
self.filePointer.write('</Grid> <!-- collection grid -->\n')
if self.IsTemporalOutput():
self.__WriteTime()
self.filePointer.write(' </Grid><!--Temporal pos="'+str(filePosition )+'" __binFileCounter="'+str(self.__binFileCounter)+'" time="'+str(self.currentTime)+'" -->\n')
self.filePointer.write(' </Domain>\n')
self.filePointer.write('</Xdmf>\n')
# we put the pointer just before the tail so we can continue writing
# to the file for a new time step
self.filePointer.seek(filePosition )
def __WriteGeoAndTopo(self, baseMeshObject: Mesh, name=None):
if self.__isParafacFormat:
if "ParafacDims" in baseMeshObject.props:
self.filePointer.write(' <Information Name="Dims" Value="'+str(baseMeshObject.props["ParafacDims"])+'" /> \n')
for i in range(baseMeshObject.props["ParafacDims"]):
self.filePointer.write(' <Information Name="Dim'+str(i)+'" Value="'+baseMeshObject.props["ParafacDim"+str(i)]+'" /> \n')
if "ParafacUnit0" in baseMeshObject.props:
for i in range(baseMeshObject.props["ParafacDims"]):
self.filePointer.write(' <Information Name="Unit'+str(i)+'" Value="'+baseMeshObject.props["ParafacUnit"+str(i)]+'" /> \n')
self.filePointer.write(' <Geometry Type="XYZ">\n')
if ( baseMeshObject.GetPointsDimensionality() == 2 ):
nodes = baseMeshObject.GetPosOfNodes()
nodes = np.concatenate((nodes,np.zeros((baseMeshObject.GetNumberOfNodes(),1))), axis=1 )
self.__WriteDataItem(nodes.ravel(), (baseMeshObject.GetNumberOfNodes(),3) , name="GEO_U_"+str(name) )
else:
self.__WriteDataItem(baseMeshObject.GetPosOfNodes().ravel(), (baseMeshObject.GetNumberOfNodes(),3) , name="GEO_U_"+str(name) )
self.filePointer.write(' </Geometry>\n')
if len(baseMeshObject.elements) > 1:
self.filePointer.write(' <Topology TopologyType="Mixed" NumberOfElements="{0}">\n'.format(baseMeshObject.GetNumberOfElements()))
nbTotalEntries = 0
for elemType, data in baseMeshObject.elements.items():
nbTotalEntries += data.GetNumberOfElements()*(data.GetNumberOfNodesPerElement()+1)
if data.elementType == ED.ElementType.Bar_2 or data.elementType == ED.ElementType.Point_1:
nbTotalEntries += data.GetNumberOfElements()
dataArray = np.empty((nbTotalEntries,),dtype=MuscatIndex)
cpt =0
for elemType, data in baseMeshObject.elements.items():
xdmfElemTypeNumber = XdmfNumber[elemType]
numberOfNodePerElem = data.GetNumberOfNodesPerElement()
# Init container
extraSize = 1 + (xdmfElemTypeNumber in (0x1,0x2))
totalSize = data.GetNumberOfElements() * (numberOfNodePerElem + extraSize)
#tempContainer = np.zeros((data.GetNumberOfElements() ,numberOfNodePerElem + extraSize),dtype=MuscatIndex)
tempContainer = dataArray[cpt:cpt+totalSize]
tempContainer.shape = (data.GetNumberOfElements() ,numberOfNodePerElem + extraSize)
# First row is ElemTypeNumber
tempContainer[:,0] = xdmfElemTypeNumber
# Second row is (1,2) is ElemTypeNumber in (0x1,0x2) or doesn't exist
if xdmfElemTypeNumber == 0x2 : tempContainer[:,1] = 2
elif xdmfElemTypeNumber == 0x1 : tempContainer[:,1] = 1
# Last row are the connectivity
tempContainer[:,extraSize:] = data.connectivity
# Unravel to dataArray and update counter
cpt += totalSize
self.__WriteDataItem(dataArray, name="Topo_U_"+str(name) )
elif len(baseMeshObject.elements):
elements = list(baseMeshObject.elements.keys())[0]
elementType = XdmfName[elements]
self.filePointer.write(' <Topology TopologyType="{}" NumberOfElements="{}" '.format(elementType,baseMeshObject.GetNumberOfElements()))
if XdmfNumber[elements] == 0x2:
self.filePointer.write('NodesPerElement="2" ')
if XdmfNumber[elements] == 0x1:
self.filePointer.write('NodesPerElement="1" ')
self.filePointer.write(' >\n')
self.__WriteDataItem(baseMeshObject.elements[elements].connectivity.ravel(), name="Topo_U_"+str(name) )
else:
self.filePointer.write(' <Topology TopologyType="mixed" NumberOfElements="0" >\n')
self.filePointer.write(' </Topology> \n')
def __WriteAttribute(self,data,name,center,baseMeshObject):
data = data.view()
shape = None
if center == "Node":
nbData = baseMeshObject.GetNumberOfNodes()
elif center == "Cell":
nbData = baseMeshObject.GetNumberOfElements()
elif center == "Grid":
nbData = 1
else:
raise Exception('Cant treat this type of field support' + center ) # pragma: no cover
if data.size == nbData:
arrayType = "Scalar"
elif data.size == nbData*3:
arrayType = "Vector"
elif data.size == nbData*6:
arrayType = "Vector"
elif data.size == nbData*9:
arrayType = "Vector"
elif data.size == nbData*2:
# add an extra zeros to the data and print it out as Vector 3
if baseMeshObject.GetPointsDimensionality() == 2:
shape = (data.size//2,1,2,)
else:
shape = (data.size//2,2,)
if len(data.shape) <= 2:
data.shape = shape
#data = data.transpose(1,0,2)
shape = list(data.shape)
shape [-1] = 3
shape = tuple(shape)
data1 = np.zeros(shape, dtype=data.dtype)
data1[...,0:2] = data
self.__WriteAttribute(data1.ravel(),name,center,baseMeshObject)
return
else:
print(TFormat.InRed("I don't kow how to treat fields '"+ str(name)+"' with " +str(data.size/nbData) +" components")) # pragma: no cover
print(TFormat.InRed("Data has size : " + str(data.size) )) # pragma: no cover
print(TFormat.InRed("But support has size : " + str(nbData) )) # pragma: no cover
raise Exception # pragma: no cover
self.filePointer.write(' <Attribute Center="'+center+'" Name="'+name+'" Type="'+arrayType+'">\n')#
try:
self.__WriteDataItem(data.ravel(),shape,name=name)
except: # pragma: no cover
print("Error Writing heavy data of field: " + str(name))
raise
self.filePointer.write(' </Attribute>\n')
[docs] def WriteIntegrationsPoints(self, quadrature: MeshQuadrature, mesh: Mesh):
"""Writes integration points information to file, this will write only for elements
present on the mesh
"""
for elemName, elems in mesh.elements.items():
if elems.GetNumberOfElements() == 0 :
continue
data = quadrature[elemName]
points = data.points
npData = np.asarray(points,dtype=float).copy()
nip = npData.shape[0]
if npData.shape[1] < 3:
b = np.zeros((nip,3))
b[:,:-1] = npData
npData = b
self.filePointer.write(' <Information Name="QP" Value="')#
self.filePointer.write(str(ED.numberOfNodes[elemName]) + " ")
self.filePointer.write(str(XdmfNumber[elemName]) + " ")
self.filePointer.write(str(nip) + '" > \n')
self.__WriteDataItem(npData.ravel(),[npData.size],name="ip")
self.filePointer.write('</Information> \n')#
[docs] def WriteIntegrationsPointData(self, names, allData):
"""Writes integration points data to file
"""
for i, name in enumerate(names):
data = allData[i]
self.filePointer.write(' <Information Name="IPF" Value="'+str(name)+'" > \n')#
self.ipStorage[name] = self.__WriteDataItem(data.ravel(),[data.size],name='IPD_'+name)
self.filePointer.write('</Information> \n')#
[docs] def NextDomain(self):
self.parafacCpt = 0
self.ddmCpt += 1
if self.IsMultidomainOutput() :
if self.IsParafacOutput():
self.filePointer.write('</Grid> <!-- Parafac grid -->\n')
self.filePointer.write('<Grid Name="Grid_P" GridType="Collection" CollectionType="None" >\n')
else:
raise Exception("Cant make a new domain without a multi domain output")
[docs] def MakeStep(self, Time = None, TimeStep = None):
"""Increases time step in output files
Parameters
----------
Time : float, optional
time value of the time step, by default None
TimeStep : int, optional
number of the time step, by default None
"""
if self.IsMultidomainOutput():
self.filePointer.write('</Grid> <!-- collection grid -->\n')
self.filePointer.write('<Grid Name="Collection" GridType="Collection" CollectionType="Spatial" >\n')
dt = 1
if Time is not None:
dt = Time - self.currentTime
elif TimeStep is not None:
dt = TimeStep
if self.IsTemporalOutput():
self.Step(dt)
if self.IsTemporalOutput() and self.__printTimeInsideEachGrid and (self.IsMultidomainOutput() == False) :
self.filePointer.write(' <Time Value="'+str(self.currentTime)+'" /> \n')
[docs] def Write(self,mesh, PointFields = None, CellFields = None, GridFields= None, PointFieldsNames = None, CellFieldsNames= None, GridFieldsNames=None , Time= None, TimeStep = None,domainName=None, IntegrationRule=None, IntegrationPointData=None, IntegrationPointDataNames=None ):
"""Write data to file in xdmf format
Parameters
----------
mesh : Mesh
the mesh to be written
PointFields : list[np.ndarray], optional
fields to write defined at the vertices of the mesh, by default None
CellFields : list[np.ndarray], optional
fields to write defined at the elements of the mesh, by default None
GridFields : list[np.ndarray], optional
grid fields to write, by default None
PointFieldsNames : list[str], optional
names of the fields to write defined at the vertices of the mesh, by default None
CellFieldsNames : list[str], optional
names of the fields to write defined at the elements of the mesh, by default None
GridFieldsNames : list[str], optional
names of the grid fields to write, by default None
Time : float, optional
time at which the data is read, by default None
TimeStep : int, optional
time index at which the data is read, by default None
domainName : str, optional
Not Used, by default None
IntegrationRule : dict[IntegrationRuleType], optional
integration rules associated to the integration point data, by default None
IntegrationPointData : list[np.ndarray], optional
fields defined at the integration points, by default None
IntegrationPointDataNames : list[str], optional
names of the fields defined at the integration points, by default None
"""
if (Time is not None or TimeStep is not None) and self.IsMultidomainOutput():
raise(Exception("set time using MakeStep, not the Write option") )
if PointFields is None:
PointFields = []
if CellFields is None:
CellFields = []
if GridFields is None:
GridFields = []
if PointFieldsNames is None:
PointFieldsNames = []
if CellFieldsNames is None:
CellFieldsNames = []
if GridFieldsNames is None:
GridFieldsNames = []
if IntegrationPointData is None:
IntegrationPointData = []
if IntegrationPointDataNames is None:
IntegrationPointDataNames = []
self.pointFieldsStorage = {}
self.cellFieldsStorage = {}
self.gridFieldsStorage = {}
self.ipStorage = {}
if not self.isOpen() :
if self.automaticOpen:
self.Open()
else:
print(TFormat.InRed("Please Open The writer First!!!"))
raise Exception
if not self.IsMultidomainOutput():
dt = 1
if Time is not None:
dt = Time - self.currentTime
elif TimeStep is not None:
dt = TimeStep
if self.IsTemporalOutput():
self.Step(dt)
if self.IsParafacOutput ():
suffix = "P" + str(self.parafacCpt)
self.parafacCpt += 1
else:
suffix = str(len(self.timeSteps))
if self.IsMultidomainOutput():
if MPI.IsParallel(): # pragma: no cover
suffix += "_D" + str(MPI.Rank())
else:
suffix += "_D" + str(self.ddmCpt)
#if we have a constantRectilinear mesh with more than only "bulk"
# elements, we add a collection to add all the rest of the elements
if False and mesh.IsConstantRectilinear() and len(mesh.elements) > 1:
self.filePointer.write('<Grid Name="Grid_S'+suffix+'" GridType="Collection" CollectionType="Spatial" >\n')
if self.IsTemporalOutput() and self.__printTimeInsideEachGrid and (self.IsMultidomainOutput() == False) :
self.filePointer.write(' <Time Value="'+str(self.currentTime)+'" /> \n')
self.filePointer.write(' <Grid Name="Bulk">\n')
else:
self.filePointer.write(' <Grid Name="Grid_'+suffix+'">\n')
if self.IsTemporalOutput() and self.__printTimeInsideEachGrid and (self.IsMultidomainOutput() == False) :
self.filePointer.write(' <Time Value="'+str(self.currentTime)+'" /> \n')
self.__WriteGeoAndTopo(mesh,name=suffix)
self.__WriteNodesTagsElementsTags(mesh,PointFieldsNames,CellFieldsNames)
self.__WriteNodesFieldsElementsFieldsGridFields(mesh,
PointFieldsNames,PointFields,
CellFieldsNames,CellFields,
GridFieldsNames,GridFields)
if IntegrationRule is not None:
self.WriteIntegrationsPoints(IntegrationRule, mesh)
self.WriteIntegrationsPointData(IntegrationPointDataNames,IntegrationPointData)
if False and mesh.IsConstantRectilinear() and len(mesh.elements) > 1:
self.filePointer.write(' </Grid>\n')
from Muscat.Containers.Mesh import Mesh
tempMesh = Mesh()
tempMesh.nodes = mesh.nodes
tempMesh.originalIDNodes = mesh.originalIDNodes
for name, data in mesh.elements.items():
if data.mutable :
tempMesh.elements[name] = data
self.filePointer.write(' <Grid Name="Sets">\n')
self.__WriteGeoAndTopo(tempMesh)
self.__WriteNodesTagsElementsTags(tempMesh,[],[])
self.filePointer.write(' </Grid>\n')
self.filePointer.write(' </Grid>\n')
if(self.__keepXmlFileInSaneState):
self.WriteTail()
self.filePointer.flush()
if self.isBinary() :# pragma: no cover
self.__binaryFilePointer.flush()
def __WriteNodesFieldsElementsFieldsGridFields(self,baseMeshObject,
PointFieldsNames,PointFields,
CellFieldsNames,CellFields,
GridFieldsNames,GridFields):
for i in range(len(PointFields)):
name = 'PField'+str(i)
if len(PointFields) == len(PointFieldsNames):
name = PointFieldsNames[i]
self.pointFieldsStorage[name] = self.__WriteAttribute(np.asarray(PointFields[i]), name, "Node",baseMeshObject)
for i in range(len(CellFields)):
name = 'CField'+str(i)
if len(CellFields) == len(CellFieldsNames):
name = CellFieldsNames[i]
self.cellFieldsStorage[name] = self.__WriteAttribute(np.asarray(CellFields[i]), name, "Cell",baseMeshObject)
for i in range(len(GridFields)):
name = 'GField'+str(i)
if len(GridFields) == len(GridFieldsNames):
name = GridFieldsNames[i]
self.gridFieldsStorage[name] = self.__WriteAttribute(np.asarray(GridFields[i]), name, "Grid",baseMeshObject)
def __WriteNodesTagsElementsTags(self,baseMeshObject,PointFieldsNames,CellFieldsNames):
for tag in baseMeshObject.nodesTags:
if tag.name in PointFieldsNames:
name = "Tag_" + tag.name
else:
name = tag.name
data = np.zeros((baseMeshObject.GetNumberOfNodes(),1),dtype=np.int8)
data[baseMeshObject.nodesTags[tag.name].GetIds()] = 1
self.__WriteAttribute(np.asarray(data), name, "Node",baseMeshObject)
#Cell Tags
baseMeshObject.PrepareForOutput()
if False and baseMeshObject.IsConstantRectilinear():
cellTags = baseMeshObject.GetNamesOfElemTagsBulk()
GetElementsInTag = baseMeshObject.GetElementsInTagBulk
else:
cellTags = baseMeshObject.GetNamesOfElementTags()
GetElementsInTag = baseMeshObject.GetElementsInTag
for tagname in cellTags:
if tagname in CellFieldsNames:
name = "Tag_" + tagname
else:
name = tagname
data = GetElementsInTag(tagname)
res = np.zeros((baseMeshObject.GetNumberOfElements(),1),dtype=np.int8)
res[data] = 1
self.__WriteAttribute(np.asarray(res), name, "Cell", baseMeshObject)
def __WriteTime(self):
""" this function is called by the WriteTail, this function must NOT change
the state of the instance, also no writing to binary neither hdf5 files """
if self.isOpen() and self.__printTimeInsideEachGrid == False:
#self.filePointer.write('<Time TimeType="List">\n')
#self.__WriteDataItem(self.timeSteps)
#self.filePointer.write('</Time>\n')
self.filePointer.write('<Time Value="'+ (" ".join(str(x) for x in self.timeSteps)) +'"/>\n')
def __WriteDataItem(self,_data, _shape= None,name=None):
import numpy as np
data = np.asarray(_data)
if _shape is None:
_shape = _data.shape
shape = np.asarray(_shape)
if self.isOpen():
if data.dtype == np.float64:
typename = 'Float'
s = data.dtype.itemsize
elif data.dtype == np.float32:
typename = 'Float'
s = data.dtype.itemsize
elif data.dtype == np.int32:
typename = 'Int'
s = data.dtype.itemsize
elif data.dtype == np.int64:
typename = 'Int'
s = data.dtype.itemsize
elif data.dtype == np.int8:
typename = 'Char'
s = data.dtype.itemsize
else:
print(f"Warning : skipping field '{name}' of type '{type(data[0])}' not supported'")
return None
dimension = ArrayToString(shape)
if self.isBinary() and len(data) > self.__XmlSizeLimit:# pragma: no cover
# to test this feature a big file must be created (so we don't test it)
if self.__binaryCpt > self.__chunkSize :
self.__binaryFilePointer.close()
self.NewBinaryFilename()
self.__binaryFilePointer = open (self.__binFileName, "wb")
self.__binaryCpt = 0
gsData,gsStorage = self.globalStorage.get(str(name),(None,None))
dataToWrite = data.ravel()
if np.array_equal(gsData, dataToWrite):
binaryFile = gsStorage.filenameOnly
seek = gsStorage.offset
gsStorage.usedByNInstances += 1
self.globalStorage[str(name)] = (gsData.copy(),gsStorage)
res = gsStorage
else:
res = BinaryStorage(data=data,filePointer=self.__binaryFilePointer)
res.filenameOnly = self.__binFileNameOnly
binaryFile = self.__binFileNameOnly
seek = self.__binaryCpt
data.ravel().tofile(self.__binaryFilePointer)
self.__binaryCpt += s*len(dataToWrite)
if len(self.globalStorage) > self.maxStorageSize-1:
usage = [x[1].usedByNInstances for x in self.globalStorage.values()]
#print(usage)
minUsage = min(usage)
newGlobalStorage = {}
GT = len(self.globalStorage)
for i,d in self.globalStorage.items():
if d[1].usedByNInstances == minUsage:
GT -= 1
if GT < self.maxStorageSize-1:
break
continue
newGlobalStorage[i]=d
self.globalStorage = newGlobalStorage
self.globalStorage[str(name)] = (dataToWrite.copy(),res)
self.filePointer.write(' <DataItem Format="Binary"'+
' NumberType="'+typename+'"'+
' Dimensions="'+dimension+'" '+
' Seek="'+str(seek)+'" '+
' Endian="Native" '+
' Precision="'+str(s)+'" '+
' Compression="Raw" >')
self.filePointer.write(binaryFile)
self.filePointer.write('</DataItem>\n')
return res
elif self.IsHdf5():
if self.__hdf5cpt > self.__chunkSize :
import h5py
self.__hdf5FilePointer.close()
self.NewHdf5Filename()
self.__hdf5FilePointer = h5py.File(self.__hdf5FileName, 'w')
self.__hdf5cpt = 0
if name is None:
name = "dataset_"+ str(self.__hdf5NameCpt)
else:
name += "_"+ str(self.__hdf5NameCpt)
self.__hdf5NameCpt += 1
self.__hdf5FilePointer.create_dataset(name, data=data)
self.__hdf5cpt += s*data.size
self.filePointer.write(f' <DataItem Format="HDF" NumberType="{typename}" Dimensions="{dimension}" Precision="{s}" >')
self.filePointer.write(f"{self.__hdf5FileNameOnly}:{name}")
self.filePointer.write('</DataItem>\n')
res = Hdf5Storage(data=data,filename=self.__hdf5FileName ,datasetName=name)
return res
else:
self.filePointer.write(' <DataItem Format="XML" NumberType="'+typename+'" Dimensions="'+dimension+'">')
self.filePointer.write(" ".join(str(x) for x in data.ravel()))
self.filePointer.write('</DataItem>\n')
return None
from Muscat.IO.IOFactory import RegisterWriterClass
RegisterWriterClass(".xdmf",XdmfWriter)
RegisterWriterClass(".xmf",XdmfWriter)
[docs]def WriteTest(tempdir, Temporal, Binary, Hdf5 ):
from Muscat.Containers.ConstantRectilinearMeshTools import CreateConstantRectilinearMesh
myMesh = CreateConstantRectilinearMesh(dimensions=[2,3,4], spacing=[0.1, 0.1, 0.1], origin=[-2.5,-1.2,-1.5])
print(myMesh.GetNumberOfElements())
dataT = np.arange(24,dtype=np.float32)
dataT.shape = (2,3,4)
dataDep = np.arange(24*3)+0.3
dataDep.shape = (2,3,4,3)
writer = XdmfWriter(tempdir + 'TestOutput_Bin_'+str(Binary)+'_hdf5_'+str(Hdf5)+'_Temp_'+str(Temporal)+'.xmf')
writer.SetTemporal(Temporal)
writer.SetBinary(Binary)
writer.SetHdf5(Hdf5)
writer.Open()
print(writer)
writer.Write(myMesh,PointFields=[dataT, dataDep], PointFieldsNames=["Temp","Dep"],CellFields=[np.arange(6)],CellFieldsNames=['S'], Time=0)
writer.Write(myMesh,GridFields=[0, 1], GridFieldsNames=['K','P'], TimeStep = 1)
writer.Close()
[docs]def WriteTestAppend(tempdir, Temporal, Binary):
from Muscat.Containers.ConstantRectilinearMeshTools import CreateConstantRectilinearMesh
myMesh = CreateConstantRectilinearMesh(dimensions=[2,3,4], spacing=[0.1, 0.1, 0.1], origin=[-2.5,-1.2,-1.5])
dataT = np.arange(24,dtype=np.float32)
dataT.shape = (2,3,4)
dataDep = np.arange(24*3)+0.3
dataDep.shape = (2,3,4,3)
writer = XdmfWriter(tempdir + 'TestOutput_Bin_'+str(Binary)+'_Temp_'+str(Temporal)+'.xmf')
writer.SetAppendMode(True)
writer.SetTemporal(Temporal)
writer.SetBinary(Binary)
writer.Open()
print(writer)
writer.Write(myMesh,PointFields=[dataT, dataDep], PointFieldsNames=["Temp","Dep"],CellFields=[np.arange(6)],CellFieldsNames=['S'])
writer.Write(myMesh,GridFields=[0, 1], GridFieldsNames=['K','P'], TimeStep = 1)
writer.Close()
[docs]def CheckIntegrity(GUI:bool=False):
imf = InMemoryFile(0)
imf.write("Hello ")
imf.write("World.")
imf.tell()
imf.seek(0)
imf.flush()
imf.close()
np.testing.assert_equal(imf.data, "Hello World.")
from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
from Muscat.Containers.ConstantRectilinearMeshTools import CreateConstantRectilinearMesh
import Muscat.Containers.Mesh as UM
from Muscat.Containers.MeshCreationTools import CreateMeshOfTriangles, CreateMeshOf
tempdir = TemporaryDirectory.GetTempPath()
#test with empty elements
from Muscat.Containers.MeshCreationTools import CreateMeshOfTriangles, CreateMeshOf
dim0 = CreateMeshOf([[0.,0.,0],[1.,2.,0],[1, 3, 0]], np.arange(3,dtype=MuscatIndex).reshape((3,1)), ED.Point_1)
dim0.props['ParafacDims'] = 2
dim0.props['ParafacDim0'] = "x"
dim0.props['ParafacDim1'] = "y"
dim1 = CreateMeshOf([[0.,0.,0],[1.,0,0],[2, 0, 0]], np.arange(3,dtype=MuscatIndex).reshape((3,1)), ED.Point_1)
dim1.props['ParafacDims'] = 1
dim1.props['ParafacDim0'] = "T"
writer = XdmfWriter()
writer.SetFileName(None)
writer.SetXmlSizeLimit(0)
writer.SetBinary(True)
writer.SetHdf5(False)
writer.SetParafac(True)
writer.Open(filename=tempdir+'parafacNoElements.pxdmf')
mode0 = np.arange(3*3,dtype=MuscatFloat).reshape((3,3))
writer.Write(dim0,PointFields=[ mode0 ],PointFieldsNames=[ "VField_0"] )
writer.Write(dim1,PointFields=[ mode0 ],PointFieldsNames=[ "VField_0"])
writer.Close()
res = CreateMeshOfTriangles([[0.,0.,0],[1.,2.,3],[1, 3, 2]], np.asarray([[0,1,2]]))
print(res)
WriteMeshToXdmf(tempdir+"TestUnstructured.xdmf", res, PointFields = [np.array([1.,2,3])], CellFields =[ np.array([1])] ,GridFields= [[0], np.array([1,2,3]).astype(np.int64) ],
PointFieldsNames = ["PS"],
CellFieldsNames = ["CS"],
GridFieldsNames = ["GS", "GV"] , Binary= True)
elements = res.GetElementsOfType(ED.Bar_2)
elements.AddNewElement([1,2],1)
elements = res.GetElementsOfType(ED.Point_1)
elements.AddNewElement([0],2)
res.nodes = np.ascontiguousarray(res.nodes[:,0:2])
res.GetNodalTag("First_Point").AddToTag(0)
res.AddElementToTagUsingOriginalId(1,"bars")
WriteMeshToXdmf(tempdir+"TestUnstructured_multiElements.xdmf", res, PointFields = [np.array([1.,2,3])], CellFields =[ np.array([1,0,4])] ,GridFields= [[0]],
PointFieldsNames = ["PS"],
CellFieldsNames = ["CS"],
GridFieldsNames = ["GS"] , Binary= True)
#----------------------
res = UM.Mesh()
WriteMeshToXdmf(tempdir+"TestUnstructured_EmptyMesh.xdmf", res)
res.SetNodes([[0,0,0],[1,0,0]],np.arange(2))
elements = res.GetElementsOfType(ED.Point_1)
elements.AddNewElement([0],1)
WriteMeshToXdmf(tempdir+"TestUnstructured_OnlyPoints.xdmf", res)
res = UM.Mesh()
res.nodes = np.array([[0,0,0],[1,0,0]],dtype=MuscatFloat)
res.originalIDNodes = np.arange(res.GetNumberOfNodes(), dtype= MuscatIndex)
elements = res.GetElementsOfType(ED.Bar_2)
elements.AddNewElement([0,1],1)
WriteMeshToXdmf(tempdir+"TestUnstructured_OnlyBars.xdmf", res)
#----------------------
WriteTest(tempdir, False, False, False)
WriteTest(tempdir, False, True, False)
WriteTest(tempdir, False, False, True)
WriteTest(tempdir,True, False, False)
WriteTest(tempdir,True, True, False)
WriteTest(tempdir,True, False, True)
WriteTestAppend(tempdir,True, True)
WriteMeshToXdmf(tempdir+'testDirect.xdmf',CreateConstantRectilinearMesh(dimensions=[1,1,1],origin=[0,0,0], spacing= [0,0,0] ) )
writer = XdmfWriter()
writer.SetFileName(None)
writer.SetFileName(tempdir+'testErrors.xdmf')
writer.SetXmlSizeLimit(0)
writer.Open()
## test of the errors
try:
writer.SetTemporal()
return 'Not ok'# pragma: no cover
except:
pass
try:
writer.SetBinary()
return 'Not ok'# pragma: no cover
except:
pass
# no error anymore just a warning
#try:
writer.Open()
#except:
# pass
writer.Close()
# for the moment we comment this test (verification of "unable to open file")
# need to find a way to raise an exception in linux and in windows with the
# same filename
#
#try:
# writer.Open("c:\\") # in windows this will raise an exception
# writer.Open("\") # in linux this will raise an exception
# return 'Not ok'# pragma: no cover
#except:
# pass
print("ConstantRectilinearMesh in 2D")
CRM2D = CreateConstantRectilinearMesh(dimensions=[2,3], origin=[0,0], spacing=[1,1])
writer = XdmfWriter()
writer.SetFileName(None)
writer.SetXmlSizeLimit(0)
writer.SetBinary(True)
writer.Open(filename=tempdir+'testDirect.xdmf')
writer.Write(CRM2D, PointFields = [ np.zeros((CRM2D.GetNumberOfNodes(),) ).astype(np.float32), np.zeros((CRM2D.GetNumberOfNodes(),) ).astype(int) ],
PointFieldsNames = ["Test", "testInt"],
CellFields= [ np.arange(CRM2D.GetNumberOfElements()*2 ).astype(np.float64)],
CellFieldsNames = [ "TestV"] )
writer.Close()
CRM2D = CreateConstantRectilinearMesh(dimensions=[2,3], origin=[0,0], spacing=[1,1])
writer = XdmfWriter()
writer.SetFileName(None)
writer.SetXmlSizeLimit(0)
writer.SetBinary(True)
writer.SetMultidomain()
writer.Open(filename=tempdir+'testDirectTwoDomains.xdmf')
writer.Write(CRM2D, PointFields = [ np.zeros((CRM2D.GetNumberOfNodes(),) ).astype(np.float32), np.zeros((CRM2D.GetNumberOfNodes(),) ).astype(int) ],
PointFieldsNames = ["Test", "testInt"],
CellFields= [ np.arange(CRM2D.GetNumberOfElements()*2 ).astype(np.float64)],
CellFieldsNames = [ "TestV"] )
CRM3D = CreateConstantRectilinearMesh(dimensions=[2,3,4], origin=[0,0,0], spacing=[1,1,1])
writer.Write(CRM3D)
writer.Close()
if GUI :
from Muscat.Actions.OpenInParaView import OpenInParaView
OpenInParaView(filename = tempdir+'testDirectTwoDomains.xdmf')
writer = XdmfWriter()
writer.SetFileName(None)
writer.SetXmlSizeLimit(0)
writer.SetBinary(True)
writer.SetMultidomain()
writer.SetTemporal()
writer.Open(filename=tempdir+'testDirectTwoDomainsTwoSteps.xdmf')
writer.Write(CRM2D)
writer.Write(CRM3D)
writer.MakeStep(Time=1.5)
writer.Write(CRM3D)
writer.Write(CRM2D)
writer.Close()
if GUI:
from Muscat.Actions.OpenInParaView import OpenInParaView
OpenInParaView(filename = tempdir+'testDirectTwoDomainsTwoSteps.xdmf')
####### work for PXDMF 2.0 #################
writer = XdmfWriter()
writer.SetFileName(None)
writer.SetXmlSizeLimit(0)
writer.SetBinary(True)
writer.SetHdf5(False)
writer.SetParafac(True)
writer.Open(filename=tempdir+'parafac.pxdmf')
from Muscat.Containers.MeshCreationTools import CreateUniformMeshOfBars
mesh1DTime = CreateUniformMeshOfBars(2,5,10)
mesh1DTime.props['ParafacDims'] = 1
mesh1DTime.props['ParafacDim0'] = "T"
mesh1DTime.props["ParafacUnit0"] = "s"
mesh2DParameters = CreateConstantRectilinearMesh(dimensions=[4,4], origin=[0,0], spacing=[1,1])
mesh2DParameters.props['ParafacDims'] = 2
mesh2DParameters.props['ParafacDim0'] = "Px"
mesh2DParameters.props['ParafacDim1'] = "Py"
mesh3DSpace = CreateConstantRectilinearMesh(dimensions=[8,8,8], origin=[0,0,0], spacing=[1,1,1])
from Muscat.FE.IntegrationRules import LagrangeIsoParamQuadrature
elementaryQuadrature = LagrangeIsoParamQuadrature[ED.Quadrangle_4]
IntegrationPointData = np.arange(mesh2DParameters.GetNumberOfElements()*elementaryQuadrature.GetNumberOfPoints() )+0.1
IntegrationPointAllData = [IntegrationPointData]
writer.Write(mesh2DParameters,
IntegrationRule=LagrangeIsoParamQuadrature,
IntegrationPointData=IntegrationPointAllData,
IntegrationPointDataNames=["IPId_0"])
print(IntegrationPointAllData)
print(writer.ipStorage["IPId_0"].GetData())
writer.ipStorage["IPId_0"].UpdateHeavyStorage(IntegrationPointData+10)
print(str(writer.ipStorage["IPId_0"]))
writer.Write(mesh1DTime, CellFields = [np.arange(mesh1DTime.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"])
writer.Write(mesh3DSpace, CellFields = [np.arange(mesh3DSpace.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"])
writer.Close()
####### work for PXDMF 2.0 HDF5 On #################
writer = XdmfWriter()
writer.SetFileName(None)
writer.SetXmlSizeLimit(0)
writer.SetBinary(True)
writer.SetHdf5(True)
writer.SetParafac(True)
writer.Open(filename=tempdir+'parafachdf5.pxdmf')
from Muscat.Containers.MeshCreationTools import CreateUniformMeshOfBars
mesh1DTime = CreateUniformMeshOfBars(2,5,10)
mesh1DTime.props['ParafacDims'] = 1
mesh1DTime.props['ParafacDim0'] = "T"
mesh2DParameters = CreateConstantRectilinearMesh(dimensions=[4,4], origin=[0,0], spacing=[1,1])
mesh2DParameters.props['ParafacDims'] = 2
mesh2DParameters.props['ParafacDim0'] = "Px"
mesh2DParameters.props['ParafacDim1'] = "Py"
mesh3DSpace = CreateConstantRectilinearMesh(dimensions=[8,8,8], origin=[0,0,0], spacing=[1,1,1])
from Muscat.FE.IntegrationRules import LagrangeIsoParamQuadrature
elementaryQuadrature = LagrangeIsoParamQuadrature[ED.Quadrangle_4]
IntegrationPointData = np.arange(mesh2DParameters.GetNumberOfElements()*elementaryQuadrature.GetNumberOfPoints() )+0.1
IntegrationPointAllData = [IntegrationPointData]
writer.Write(mesh1DTime, CellFields = [np.arange(mesh1DTime.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"])
writer.Write(mesh3DSpace, CellFields = [np.arange(mesh3DSpace.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"])
writer.Write(mesh2DParameters,
IntegrationRule=LagrangeIsoParamQuadrature,
IntegrationPointData=IntegrationPointAllData,
IntegrationPointDataNames=["IPId_0"])
writer.Close()
print(IntegrationPointAllData)
writer.ipStorage["IPId_0"].UpdateHeavyStorage(IntegrationPointData+10)
print(str(writer.ipStorage["IPId_0"]))
np.testing.assert_equal(writer.ipStorage["IPId_0"].GetData(),IntegrationPointData+10)
if GUI :
from Muscat.Actions.OpenInParaView import OpenInParaView
OpenInParaView(filename = tempdir+'parafac.pxdmf')
from Muscat.IO.XdmfReader import XdmfReader as XR
f = XR(filename = tempdir+'testDirect.xdmf' )
f.lazy = False
f.Read()
print(tempdir+'testDirect.xdmf')
f.xdmf.GetDomain(0).GetGrid(0).GetFieldsOfType("Cell")
print(f.xdmf.GetDomain(0).GetGrid(0).attributes )
domain = f.xdmf.GetDomain(0)
grid = domain.GetGrid(0)
grid.GetFieldsOfType("Cell")
### Domain Decomposition and Parafac
res = CheckIntegrityDDM(GUI=GUI)
if res.lower() != "ok": return res
CheckIntegrityHdf5(tempdir)
return 'ok'
[docs]def CheckIntegrityHdf5(tempdir):
from Muscat.Containers.ConstantRectilinearMeshTools import CreateConstantRectilinearMesh
myMesh = CreateConstantRectilinearMesh(dimensions=[20,30,40], spacing=[0.1, 0.1, 0.1], origin=[-2.5,-1.2,-1.5] )
dataT = np.arange(24000,dtype=np.float32)
dataT.shape = (20,30,40)
dataDep = np.arange(24000*3)+0.3
dataDep.shape = (20,30,40,3)
writer = XdmfWriter(tempdir + 'TestHDF5_.xmf')
writer.SetChunkSize(2**20)
writer.SetTemporal(True)
writer.SetHdf5(True)
writer.Open()
print(writer)
for i in range(3):
dataT = np.random.rand(24000)
dataT.shape = (20,30,40)
dataDep = np.random.rand(24000*3)+0.3
dataDep.shape = (20,30,40,3)
writer.Write(myMesh,PointFields=[dataT, dataDep], PointFieldsNames=["Temp","Dep"],CellFields=[np.arange(19*29*39)],CellFieldsNames=['S'])
writer.Close()
[docs]def CheckIntegrityDDM(GUI:bool=False):
""" this test function can be launched using the mpirun -n 2 ...
to test the writer in mpi mode
"""
from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
tempdir = TemporaryDirectory.GetTempPath()
from Muscat.Containers.MeshCreationTools import CreateUniformMeshOfBars
mesh1D = CreateUniformMeshOfBars(0,5,10)
mesh1D.props['ParafacDims'] = 1
mesh1D.props['ParafacDim0'] = "T"
from Muscat.Helpers.MPIInterface import MPIInterface as MPI
writer = XdmfWriter()
writer.SetFileName(None)
writer.SetXmlSizeLimit(0)
writer.SetBinary(True)
writer.SetMultidomain()
writer.SetParafac(True)
mpi = MPI()
print("rank ", mpi.rank)
writer.Open(filename=tempdir+'DDM_parafac.pxdmf')
mpi = MPI()
if mpi.Rank() == 0:
mesh1D.props['ParafacDim0'] = "D0_P0"
writer.Write(mesh1D, CellFields = [np.arange(mesh1D.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"])
mesh1D.nodes[:,0] += 1
mesh1D.props['ParafacDim0'] = "D0_P1"
writer.Write(mesh1D, CellFields = [np.arange(mesh1D.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"])
if mpi.IsParallel() == 0 :
writer.NextDomain()
if mpi.Rank() == mpi.size-1 :
mesh1D.nodes[:,0] += 1
mesh1D.props['ParafacDim0'] = "D1_P0"
writer.Write(mesh1D, CellFields = [np.arange(mesh1D.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"])
mesh1D.nodes[:,0] += 1
mesh1D.props['ParafacDim0'] = "D1_P1"
writer.Write(mesh1D, CellFields = [np.arange(mesh1D.GetNumberOfElements())+0.1 ], CellFieldsNames=["IPId_0"])
writer.Close()
if HasHdf5Support():
return "ok"
else:
return "ok, but no hdf5 support"
if __name__ == '__main__':
#from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
#TemporaryDirectory.SetTempPath("/tmp/Muscatrectory__is42mzi_safe_to_delete/")
print(CheckIntegrity(GUI=False)) # pragma: no cover