# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""CSV file writer
"""
from typing import Optional, List
import numpy as np
from Muscat.Types import MuscatFloat
from Muscat.IO.IOFactory import RegisterWriterClass
from Muscat.Containers.Filters.FilterObjects import ElementFilter, NodeFilter
from Muscat.Containers.Mesh import Mesh
from Muscat.IO.WriterBase import WriterBase as WriterBase
def WriteMeshToCsv(filename, mesh: Mesh,
                   PointFields: Optional[List[np.ndarray]] = None, CellFields: Optional[List[np.ndarray]] = None, GridFields: Optional[List[MuscatFloat]] = None,
                   PointFieldsNames: Optional[List[str]] = None, CellFieldsNames: Optional[List[str]] = None, GridFieldsNames: Optional[List[str]] = None,
                   nFilter: Optional[NodeFilter] = None, eFilter: Optional[ElementFilter] = None):
    """Function API for writing data into a CSV file

    A CsvWriter is created for ``filename``, the two filters are attached to
    it, and a single row (preceded by the header when the file is empty) is
    written.

    Parameters
    ----------
    filename : str
        name of the file to be written
    mesh : Mesh
        support of the data to be written
    PointFields : list[np.ndarray], optional
        list of fields defined at the vertices of the mesh, by default None
    CellFields : list[np.ndarray], optional
        list of fields defined at the cells of the mesh, by default None
    GridFields : list, optional
        list of grid data, by default None
    PointFieldsNames : list[str], optional
        list of field names defined at the vertices of the mesh, by default None
    CellFieldsNames : list[str], optional
        list of field names defined at the cells of the mesh, by default None
    GridFieldsNames : list[str], optional
        list of grid data names, by default None
    nFilter : NodeFilter, optional
        node filter selecting the single node to report, by default None
    eFilter : ElementFilter, optional
        element filter selecting the single element to report, by default None
    """
    # Replace every missing list by an empty one so the writer always
    # receives real sequences.
    PointFields = [] if PointFields is None else PointFields
    CellFields = [] if CellFields is None else CellFields
    GridFields = [] if GridFields is None else GridFields
    PointFieldsNames = [] if PointFieldsNames is None else PointFieldsNames
    CellFieldsNames = [] if CellFieldsNames is None else CellFieldsNames
    GridFieldsNames = [] if GridFieldsNames is None else GridFieldsNames

    writer = CsvWriter(filename)
    writer.nodalFilter = nFilter
    writer.elementFilter = eFilter
    writer.Open()
    try:
        writer.Write(mesh,
                     PointFields=PointFields,
                     CellFields=CellFields,
                     GridFields=GridFields,
                     PointFieldsNames=PointFieldsNames,
                     CellFieldsNames=CellFieldsNames,
                     GridFieldsNames=GridFieldsNames
                     )
    finally:
        # Always give the writer the chance to release its resources, even
        # when Write fails (e.g. a filter selecting more than one entity).
        writer.Close()
class CsvWriter(WriterBase):
    """Class to write a CSV file on disk

    Each call to :meth:`Write` appends one row. The header row is produced
    by :meth:`WriteHead` when the file position is 0. When a node filter
    and/or an element filter is set, it must select exactly one entity; the
    field values at that entity are written in the row.
    """

    def __init__(self, fileName='') -> None:
        super().__init__(fileName=fileName)
        self.canHandleTemporal = True
        self.canHandleAppend = True
        self.canHandleMultidomain = False
        self.automaticOpen = False
        self.fileName = None
        self.SetBinary(False)
        self.SetFileName(fileName)
        # column separator; a blank is appended after it when writing
        self.separator = ","
        self.nodalFilter: Optional[NodeFilter] = None
        self.elementFilter: Optional[ElementFilter] = None
        self.currentTime = 0.
        # number of rows written so far, reported in the "Step" column
        self.cpt = 0

    def __str__(self) -> str:
        res = 'CsvWriter : \n'
        res += ' FileName : ' + str(self.fileName) + '\n'
        if self.isOpen():
            res += ' The File is Open!! \n'
        res += str(self.nodalFilter) + '\n'
        res += str(self.elementFilter)
        return res

    def SetETag(self, string: str) -> None:
        """Replace the element filter by a new one selecting the tag "string"

        Parameters
        ----------
        string : str
            element tag to select
        """
        self.elementFilter = ElementFilter()
        self.elementFilter.SetETags(string)

    def SetNTag(self, string: str):
        """Replace the node filter by a new one selecting the tag "string"

        Parameters
        ----------
        string : str
            node tag to select
        """
        self.nodalFilter = NodeFilter()
        self.nodalFilter.SetNTags(string)

    def WriteHead(self, mesh: Mesh, PointFieldsNames: List[str], CellFieldsNames: List[str], GridFieldsNames: List[str]) -> None:
        """Function to write the header of the output CSV file

        The column names are also stored (``globalFields``, ``nodalFields``,
        ``elementFields``) so that later rows keep the same column layout.

        Parameters
        ----------
        mesh : Mesh
            support of the data to be written
        PointFieldsNames : list[str]
            list of field names defined at the vertices of the mesh
        CellFieldsNames : list[str]
            list of field names defined at the cells of the mesh
        GridFieldsNames : list[str]
            list of grid data names

        Raises
        ------
        RuntimeError
            if a filter is set and does not select exactly one entity
        """
        sep = self.separator + " "
        self.writeText("Step")
        self.writeText(sep)

        self.nodalFields = []
        self.elementFields = []
        self.globalFields = []

        for name in GridFieldsNames:
            self.writeText(name)
            self.writeText(sep)
            self.globalFields.append(name)

        if self.nodalFilter is not None:
            indices = self.nodalFilter.GetNodesIndices(mesh)
            if len(indices) != 1:  # pragma: no cover
                raise RuntimeError(f"Need a filter extracting only one point: current selection = {len(indices)}")
            self.writeText("PointId")
            self.writeText(sep)
            for name in PointFieldsNames:
                self.writeText(name)
                self.nodalFields.append(name)
                self.writeText(sep)

        if self.elementFilter is not None:
            nbSelected = 0
            for selection in self.elementFilter(mesh):
                nbSelected += len(selection.indices)
            if nbSelected != 1:  # pragma: no cover
                raise RuntimeError("Need a filter extracting only one Element")
            self.writeText("CellId")
            self.writeText(sep)
            self.writeText("CellGlobalId")
            self.writeText(sep)
            self.writeText("CellType")
            self.writeText(sep)
            # cell columns close the line: separator goes between names only
            needSeparator = False
            for name in CellFieldsNames:
                if needSeparator:
                    self.writeText(sep)
                self.writeText(name)
                self.elementFields.append(name)
                needSeparator = True

        self.writeText("\n")

    def Step(self, dt: MuscatFloat = MuscatFloat(1.)) -> None:
        """Function to increase current time by a time increment

        Parameters
        ----------
        dt : float, optional
            time increment, by default 1.
        """
        self.currentTime += dt

    @staticmethod
    def _FieldEntryAsText(field, index) -> str:
        """Return the text of entry ``index`` of ``field`` (scalar for 1-D fields, full row otherwise)"""
        if len(field.shape) == 1:
            return str(field[index])
        return str(field[index, :])

    def Write(self, mesh: Mesh, PointFields: Optional[List[np.ndarray]] = None, CellFields: Optional[List[np.ndarray]] = None, GridFields: Optional[List[MuscatFloat]] = None,
              PointFieldsNames: Optional[List[str]] = None, CellFieldsNames: Optional[List[str]] = None, GridFieldsNames: Optional[List[str]] = None, Time: Optional[MuscatFloat] = None,
              TimeStep: Optional[MuscatFloat] = None, domainName: Optional[str] = None):
        """Function to write one row of data into the CSV file

        The header is written first when the file position is 0. Columns
        declared in the header but absent from the current call are filled
        with "nan".

        Parameters
        ----------
        mesh : Mesh
            support of the data to be written
        PointFields : list[np.ndarray], optional
            list of fields defined at the vertices of the mesh, by default None
        CellFields : list[np.ndarray], optional
            list of fields defined at the cells of the mesh, by default None
        GridFields : list, optional
            list of grid data, by default None
        PointFieldsNames : list[str], optional
            list of field names defined at the vertices of the mesh, by default None
        CellFieldsNames : list[str], optional
            list of field names defined at the cells of the mesh, by default None
        GridFieldsNames : list[str], optional
            list of grid data names, by default None
        Time : float, optional
            time at which the data is written, by default None
        TimeStep : float, optional
            time increment used to advance the current time when Time is not
            given, by default None
        domainName : str, optional
            name of domain to write (not used by this writer), by default None

        Raises
        ------
        RuntimeError
            if the writer is not open and automaticOpen is False
        """
        sep = self.separator + " "

        PointFields = [] if PointFields is None else PointFields
        CellFields = [] if CellFields is None else CellFields
        GridFields = [] if GridFields is None else GridFields
        PointFieldsNames = [] if PointFieldsNames is None else PointFieldsNames
        CellFieldsNames = [] if CellFieldsNames is None else CellFieldsNames
        GridFieldsNames = [] if GridFieldsNames is None else GridFieldsNames

        if not self.isOpen():
            if self.automaticOpen:
                self.Open()
            else:  # pragma: no cover
                raise RuntimeError("Please Open The writer First!!!")

        # Nothing written yet: start with the header line.
        if self.filePointer.tell() == 0:
            self.WriteHead(mesh, PointFieldsNames=PointFieldsNames, CellFieldsNames=CellFieldsNames, GridFieldsNames=GridFieldsNames)

        # An absolute Time takes precedence over a TimeStep increment.
        dt = MuscatFloat(1)
        if Time is not None:
            dt = Time - self.currentTime
        elif TimeStep is not None:
            dt = TimeStep

        if self.IsTemporalOutput():
            self.Step(dt)

        self.writeText(str(self.cpt))
        self.writeText(sep)

        gridData = dict(zip(GridFieldsNames, GridFields))
        for name in self.globalFields:
            if name in gridData:
                self.writeText(str(gridData[name]))
            else:  # pragma: no cover
                self.writeText("nan")
            self.writeText(sep)

        if self.nodalFilter is not None:
            index = self.nodalFilter.GetNodesIndices(mesh)[0]
            self.writeText(str(index))
            self.writeText(sep)
            pointData = dict(zip(PointFieldsNames, PointFields))
            for name in self.nodalFields:
                if name in pointData:
                    self.writeText(self._FieldEntryAsText(pointData[name], index))
                else:  # pragma: no cover
                    self.writeText("nan")
                self.writeText(sep)

        if self.elementFilter is not None:
            index = 0
            globalId = 0
            elementType = ""
            # WriteHead enforces a single selected element; keep its ids.
            for selection in self.elementFilter(mesh):
                index = selection.indices[0]
                globalId = list(selection.GetMeshSlice())[0]
                elementType = selection.elementType
            # "CellId" column: local index of the element in its container
            # (the builtin ``id`` was written here by mistake before).
            self.writeText(str(index))
            self.writeText(sep)
            self.writeText(str(globalId))
            self.writeText(sep)
            self.writeText(str(elementType))
            self.writeText(sep)
            cellData = dict(zip(CellFieldsNames, CellFields))
            needSeparator = False
            for name in self.elementFields:
                if needSeparator:
                    self.writeText(sep)
                if name in cellData:
                    # cell fields are indexed with the global element id
                    self.writeText(self._FieldEntryAsText(cellData[name], globalId))
                else:  # pragma: no cover
                    self.writeText("nan")
                needSeparator = True

        self.writeText("\n")
        self.filePointer.flush()
        self.cpt += 1

    def Close(self) -> None:
        """Do nothing: this override leaves the file pointer untouched"""
        # NOTE(review): the base-class Close is not called here, so the file
        # opened by Open() is presumably never closed explicitly - confirm
        # against WriterBase whether this is intentional.
        pass
# Register CsvWriter with the IO factory under the ".csv" extension.
RegisterWriterClass(".csv", CsvWriter)
def CheckIntegrity(GUI: bool = False) -> str:
    """Self-test of the CSV writer

    Builds a small mesh (one triangle, one tagged bar, one point element
    and one tagged node), then writes it through WriteMeshToCsv (with and
    without fields) and through a temporal CsvWriter used directly.

    Parameters
    ----------
    GUI : bool, optional
        not used by this test, by default False

    Returns
    -------
    str
        'ok' when every step ran without raising
    """
    from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
    from Muscat.Containers.MeshCreationTools import CreateMeshOfTriangles
    import Muscat.Containers.ElementsDescription as ED

    outputDir = TemporaryDirectory.GetTempPath()

    # --- mesh construction -------------------------------------------------
    mesh = CreateMeshOfTriangles([[0., 0., 0], [1., 2., 3], [1, 3, 2]], np.array([[0, 1, 2]]))
    print(mesh)

    bars = mesh.GetElementsOfType(ED.Bar_2)
    bars.AddNewElement([1, 2], 1)
    bars.tags.CreateTag("First_bar").SetIds([0])
    print(bars.tags.CreateTag("First_bar", False).GetIds())

    points = mesh.GetElementsOfType(ED.Point_1)
    points.AddNewElement([0], 2)
    mesh.GetNodalTag("First_Point").AddToTag(0)

    # --- filters selecting one node and one element --------------------------
    nodeFilter = NodeFilter()
    nodeFilter.AddNTag("First_Point")
    elementFilter = ElementFilter()
    elementFilter.AddETag("First_bar")

    # --- data shared by the function API and the class API -------------------
    nbElements = mesh.GetNumberOfElements()
    fieldData = {
        "PointFields": [np.array([1., 2, 3]), mesh.nodes, mesh.nodes[:, 0]],
        "CellFields": [np.array([1, 0]), np.zeros((nbElements, 3))],
        "GridFields": [[0.], np.array([1, 2, 3]).astype(MuscatFloat)],
        "PointFieldsNames": ["PS", "nodes", "x_pos"],
        "CellFieldsNames": ["CS", "CV"],
        "GridFieldsNames": ["GS", "GV"],
    }

    # --- function API --------------------------------------------------------
    WriteMeshToCsv(outputDir + "TestUnstructured.csv", mesh,
                   nFilter=nodeFilter, eFilter=elementFilter, **fieldData)
    WriteMeshToCsv(outputDir + "TestUnstructuredEmpty.csv", mesh,
                   nFilter=nodeFilter, eFilter=elementFilter)

    # --- class API, temporal output with automatic opening -------------------
    csvWriter = CsvWriter(outputDir + "TestUnstructuredII.csv")
    csvWriter.SetETag("First_bar")
    csvWriter.SetNTag("First_Point")
    csvWriter.SetTemporal()
    csvWriter.automaticOpen = True

    csvWriter.Write(mesh, Time=0., **fieldData)
    print(str(csvWriter))

    # second row without any field: header columns are filled with "nan"
    csvWriter.Write(mesh, TimeStep=0.5)
    csvWriter.Close()

    return 'ok'
if __name__ == '__main__':
    # Run the self-test when the module is executed as a script.
    status = CheckIntegrity(GUI=True)  # pragma: no cover
    print(status)  # pragma: no cover