# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""Base reader object from which all the readers of Muscat
"""
from typing import Union, Tuple, Type, Optional, IO, Any
import os
import sys
import struct
import locale
import io
import numpy as np
from Muscat.Types import MuscatIndex
[docs]class ReaderBase():
""" Base class for all the readers"""
def __init__(self,fileName: str = '') -> None:
super().__init__()
self.fileName: str = ""
self.string: Union[str,bytes] = ""
self.pipe:bool = False
self.readFormat:str = 'r'
self.binary: bool = False
self.commentHeader: Union[None,str] = None
self.filePointer: Union[None, IO[Any]] = None
self.lineCounter:MuscatIndex = 0
self.encoding: str = locale.getpreferredencoding(False)
# to be defined in the child class
self.canHandleTemporal: bool = False
self.canHandlePartitioned: bool = False
self.canHandleDistributed: bool = False
# output, some outputs are cached
self.extraOutput = None
self.nodalFields:dict = {}
self.elementsFields:dict = {}
self.SetFileName(fileName)
[docs] def SetBinary(self, binary:bool = True) -> None:
"""Sets the binary status of the file to read
Parameters
----------
binary : bool, optional
if True, sets the file to read as binary, by default True
"""
self.binary = binary
if binary:
if self.readFormat.find("b") >= 0:
return
self.readFormat += "b"
else:
if self.readFormat.find("b") >= 0:
self.readFormat = self.readFormat.replace("b","")
[docs] def StartReading(self):
if len(self.fileName) > 0 :
if self.readFormat.find('b') > -1 :
self.filePointer = open(self.fileName, self.readFormat)
self.text_stream = self.filePointer
else:
self.filePointer = open(self.fileName, self.readFormat,encoding=self.encoding)
elif len(self.string) > 0:
if self.readFormat.find('b') > -1 :
if type(self.string) is str:
self.filePointer = io.BytesIO(bytearray(self.string,self.encoding)) # pragma: no cover
else:
self.filePointer = io.BytesIO(self.string)
self.text_stream = self.filePointer
else:
self.filePointer = io.StringIO(self.string)
self.text_stream = self.filePointer
elif self.pipe: # pragma: no cover
r, w = os.pipe()
if self.readFormat.find('b') > -1 :
self.filePointer = sys.stdin.buffer
self.text_stream = self.filePointer
else:
self.filePointer = sys.stdin
else:
raise RuntimeError('Need a file, a string or set pipe True to read')
self.lineCounter = 0
[docs] def GetFilePointer(self):
return self.filePointer
[docs] def EndReading(self):
if self.filePointer is not None:
self.filePointer.close()
[docs] def SetFileName(self, fileName:str) -> None:
"""Sets the name of file to read.
if fileName is "PIPE" then call self.SetReadFromPipe()
Parameters
----------
fileName : str
file name to set
"""
fileName = str(fileName)
if not(fileName is None) and len(fileName) >= 4 and fileName[0:4] == "PIPE" :
self.SetReadFromPipe() # pragma: no cover
else:
self.fileName = fileName
if len(fileName) == 0 :
self.__path = None
self.string = ""
else:
self.filePath = os.path.abspath(os.path.dirname(fileName))+os.sep;
self.string = ""
self.pipe = False
[docs] def SetStringToRead(self,string:Union[str,bytes]) ->None:
"""Sets data to be read as a string instead of a file
Parameters
----------
string : str
data to be read
"""
self.string = string
if len(self.string) > 0:
self.fileName = ""
self.pipe = False
[docs] def SetReadFromPipe(self)->None: # pragma: no cover
"""Set this reader to read data from the sys.stdin.buffer
"""
self.string = ""
self.fileName = ""
self.pipe = True
[docs] def PeekLine(self)-> str:
"""Read a line without advancing
Returns
-------
str
the next line in the input buffer
"""
pos = self.filePointer.tell()
line = self.filePointer.readline()
self.filePointer.seek(pos)
return line
[docs] def Peek(self,length:int=1) -> str:
"""Read a length number of chars without advancing the file
Parameters
----------
length : int, optional
number of chars to read, by default 1
Returns
-------
str
a string of size length the the next chars in the buffer
"""
pos = self.filePointer.tell()
data = self.filePointer.read(length) # Might try/except this line, and finally: f.seek(pos)
self.filePointer.seek(pos)
return data
[docs] def ReadCleanLine(self) -> Union[str,None]:
"""Return the next (non comment) line in the file
Returns
-------
str
Return the next clean line
"""
while(True):
string = self.filePointer.readline()
self.lineCounter +=1
#end of file
if string == "" :
return None
string = string.replace(u'\ufeff', '').strip(u' \r\n')
#empty line
if len(string) == 0 :
continue
if self.commentHeader is None:
break# pragma: no cover
else :
if not string.startswith(self.commentHeader):
break
return string
##binary interface
[docs] def RawRead(self,cpt:int,withError:bool=False) -> bytes :
"""Read raw data (binary from the input buffer )
Parameters
----------
cpt : int
number of char to read
withError : bool, optional
raise EOFError if not able to read cpt chars from the buffer, by default False
Returns
-------
bytes
the raw data
Raises
------
EOFError
_description_
"""
res = self.filePointer.read(cpt)
if withError and len(res) < cpt:
raise EOFError("Problem reading file :" +str(self.fileName) + " EOF") # pragma: no cover
else:
return res
[docs] def ReadInt32(self) -> int:
"""Read one int 32
Returns
-------
int
"""
rawData = self.RawRead(4,withError=True)
data = struct.unpack("i", rawData)[0]
return data
[docs] def ReadInt64(self) -> int:
"""Read on int 64
Returns
-------
int
"""
rawdata = self.RawRead(8,withError=True)
data = struct.unpack("q", rawdata)[0]
return data
[docs] def ReadData(self,cpt:int,datatype:Type) -> np.ndarray:
"""Read data cpt instances of type datatype from the input buffer
first try to use the np.fromfile for speed if it fail will try
np.frombuffer
Parameters
----------
cpt : int
number of data to read
datatype : Type
data type
Returns
-------
np.ndarray
the data read
"""
try:
return np.fromfile(self.filePointer,dtype=datatype,count=cpt,sep="")
except:
s = np.dtype(datatype).itemsize*cpt
data = self.filePointer.read(s)
return np.frombuffer(data,dtype=datatype)
def __reshapeData(self,data,finalShape=None):
if finalShape is None:
return data
else:
data.shape = finalShape
return data
[docs] def ReadFloats32(self,cpt:int,finalShape:Tuple=None) -> np.ndarray:
return self.__reshapeData(self.ReadData(cpt,np.float32), finalShape)
[docs] def ReadFloats64(self,cpt:int,finalShape:Tuple=None) -> np.ndarray:
return self.__reshapeData(self.ReadData(cpt,np.float64), finalShape)
[docs] def Seek(self,pos:int)->None:
"""Move pointer to the internal file descriptor to the position pos
Parameters
----------
cpt : int
_description_
"""
self.filePointer.seek(pos)
[docs]def CheckIntegrity():
obj = ReaderBase()
obj.commentHeader = "#"
obj.SetBinary(False)
try:
obj.StartReading()
raise # pragma: no cover
except :
pass
testString = """0
1
2
3
#this is a comment
4"""
obj.SetStringToRead(testString)
def checkBaseReaderAscii(obj):
obj.StartReading()
print("file Pointer: ", str(obj.GetFilePointer() ) )
if obj.PeekLine() != "0\n": # pragma: no cover
raise Exception("first line not correct ")
if obj.Peek() != "0": # pragma: no cover
raise
for i in range(5):
data = obj.ReadCleanLine()
print(f"-> {data}")
if i != int(data):
raise # pragma: no cover
#before last
if obj.ReadCleanLine() != None:
raise # pragma: no cover
from Muscat.Helpers.CheckTools import MustFailFunction
MustFailFunction(obj.ReadCleanLine, True)
obj.EndReading()
checkBaseReaderAscii(obj)
from Muscat.Helpers.IO.FileTools import WriteTempFile
import pathlib
fn = WriteTempFile('ReaderBaseTestString',testString)
obj.SetFileName(pathlib.Path(fn))
checkBaseReaderAscii(obj)
binarydata = np.array([0], dtype=np.int32).tobytes()
binarydata += np.array([1], dtype=np.int64).tobytes()
binarydata += np.array([2], dtype=np.float32).tobytes()
binarydata += np.array([3], dtype=np.float64).tobytes()
fn = WriteTempFile('ReaderBaseTestbinary', binarydata, mode='wb')
obj.SetBinary(False)
obj.SetBinary(True)
obj.SetBinary(False)
obj.SetBinary(False)
obj.SetBinary(True)
obj.SetBinary(True)
obj.SetFileName(fn)
print(obj.readFormat)
def checkBaseReaderBinary(obj):
obj.StartReading()
if obj.ReadInt32() != 0: raise
if obj.ReadInt64() != 1: raise
if obj.ReadFloats32(1, (1,1)) != 2.: raise
if obj.ReadFloats64(1) != 3.: raise
obj.EndReading()
checkBaseReaderBinary(obj)
obj.SetStringToRead(binarydata)
checkBaseReaderBinary(obj)
obj.StartReading()
obj.Seek(4)
return "ok"
if __name__ == '__main__':
print(CheckIntegrity())# pragma: no cover