Source code for Muscat.IO.PipeIO

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#

"""Classes for input/output pipe handling
"""

import sys

from Muscat.IO.IOFactory import RegisterWriterClass,RegisterReaderClass

class PipeReader(object):
    """Reader that unpickles one object from the process standard input.

    The reader follows the Open / Read / Close protocol used by the other
    IO classes registered in the IO factory. ``SetFileName`` and
    ``SetBinary`` are accepted for interface compatibility and ignored.
    """

    def __init__(self):
        super().__init__()
        # Binary stream the pickled payload is read from; bound by Open().
        self.inbuffer = None

    def SetFileName(self, val):
        """Accept and ignore a file name; the data source is stdin."""
        pass

    def SetBinary(self, val):
        """Accept and ignore the binary-mode flag."""
        pass

    def Open(self):
        """Bind ``self.inbuffer`` to the binary layer of ``sys.stdin``."""
        # Defined once only: a second, empty Open() later in the class body
        # would silently replace this one and leave inbuffer unset.
        self.inbuffer = sys.stdin.buffer

    def Read(self):
        """Unpickle and return the next object available on the input stream.

        If ``Open`` has not been called yet, the stream is opened on demand.

        Returns:
            The object reconstructed by ``pickle.load``.
        """
        import pickle

        if self.inbuffer is None:
            self.Open()
        # NOTE(security): unpickling can execute arbitrary code. Only use
        # this reader on a pipe fed by a trusted process.
        return pickle.load(self.inbuffer, encoding='latin1')

    def Close(self):
        """Do nothing; the underlying stream is left open."""
        pass
class PipeWriter(object):
    """Writer that pickles an object to the process standard output.

    The writer follows the Open / Write / Close protocol used by the other
    IO classes registered in the IO factory. ``SetFileName`` and
    ``SetBinary`` are accepted for interface compatibility and ignored.
    """

    def __init__(self):
        super().__init__()
        # Binary stream the pickled payload is written to; bound by Open().
        self.outbuffer = None
        self.canHandleBinaryChange = False

    def SetFileName(self, val):
        """Accept and ignore a file name; the data sink is stdout."""
        pass

    def SetBinary(self, val):
        """Accept and ignore the binary-mode flag."""
        pass

    def Write(self, outmesh, PointFieldsNames=None, PointFields=None, CellFieldsNames=None, CellFields=None):
        """Pickle ``outmesh`` to the output stream and flush it.

        If ``Open`` has not been called yet, the stream is opened on demand
        instead of handing ``None`` to ``pickle.dump``.

        Args:
            outmesh: object to serialise (pickle protocol 0).
            PointFieldsNames, PointFields, CellFieldsNames, CellFields:
                accepted for signature compatibility; not used here.
        """
        import pickle

        if self.outbuffer is None:
            self.Open()
        pickle.dump(outmesh, self.outbuffer, 0)
        # Flush right away so the process at the other end of the pipe
        # is not left waiting on buffered data.
        self.outbuffer.flush()

    def Open(self):
        """Bind ``self.outbuffer`` to the binary layer of ``sys.stdout``."""
        self.outbuffer = sys.stdout.buffer

    def Close(self):
        """Do nothing; the underlying stream is left open."""
        pass
# Make both pipe classes available through the IO factory under the
# ".PIPE" pseudo-extension.
RegisterReaderClass(".PIPE", PipeReader)
RegisterWriterClass(".PIPE", PipeWriter)
def CheckIntegrity(GUI:bool=False):
    """Minimal self-check for this module.

    Builds a cube mesh, prints it, and instantiates a ``PipeReader``.
    No data is sent through a pipe.

    Args:
        GUI: not used by this check.

    Returns:
        The string "OK".
    """
    from Muscat.Containers.MeshCreationTools import CreateCube

    print(CreateCube())
    PipeReader()
    return "OK"
# Run the module self-check when executed as a script.
if __name__ == '__main__':
    print(CheckIntegrity())  # pragma: no cover