# Source code for Muscat.IO.ReaderBase

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#

"""Base reader object from which all the readers of Muscat
"""
from typing import Union, Tuple, Type, Optional, IO, Any
import os
import sys
import struct
import locale
import io

import numpy as np

from Muscat.Types import MuscatIndex

class ReaderBase:
    """Base class for all the readers.

    The data source is selected with ``SetFileName``, ``SetStringToRead`` or
    ``SetReadFromPipe``; ``StartReading`` then opens the matching stream and
    stores it in ``filePointer``. The class offers a small text interface
    (``PeekLine``, ``Peek``, ``ReadCleanLine``) and a binary interface
    (``RawRead``, ``ReadInt32``, ``ReadInt64``, ``ReadData``, ...).
    """

    def __init__(self, fileName: str = '') -> None:
        super().__init__()
        self.fileName: str = ""
        # directory (absolute, with trailing separator) of fileName, None when no file is set
        self.filePath: Optional[str] = None
        self.string: Union[str, bytes] = ""
        self.pipe: bool = False
        self.readFormat: str = 'r'
        self.binary: bool = False
        self.commentHeader: Union[None, str] = None
        self.filePointer: Union[None, IO[Any]] = None
        # alias of filePointer, refreshed by every call to StartReading
        self.text_stream: Union[None, IO[Any]] = None
        self.lineCounter: MuscatIndex = 0
        self.encoding: str = locale.getpreferredencoding(False)

        # to be defined in the child class
        self.canHandleTemporal: bool = False
        self.canHandlePartitioned: bool = False
        self.canHandleDistributed: bool = False

        # output, some outputs are cached
        self.extraOutput = None
        self.nodalFields: dict = {}
        self.elementsFields: dict = {}

        self.SetFileName(fileName)

    def SetBinary(self, binary: bool = True) -> None:
        """Sets the binary status of the file to read

        Parameters
        ----------
        binary : bool, optional
            if True, sets the file to read as binary, by default True
        """
        self.binary = binary
        if binary:
            # never append a second "b" to the open mode
            if "b" not in self.readFormat:
                self.readFormat += "b"
        else:
            self.readFormat = self.readFormat.replace("b", "")

    def StartReading(self) -> None:
        """Open the input stream and reset the line counter

        The source is chosen in this order: file name, string, pipe (stdin).
        The opened stream is stored in ``filePointer`` (and ``text_stream``).

        Raises
        ------
        RuntimeError
            if no file name, no string and no pipe were set
        """
        isBinary = "b" in self.readFormat

        if len(self.fileName) > 0:
            if isBinary:
                self.filePointer = open(self.fileName, self.readFormat)
            else:
                self.filePointer = open(self.fileName, self.readFormat, encoding=self.encoding)
        elif len(self.string) > 0:
            if isBinary:
                if isinstance(self.string, str):
                    # a str was supplied for a binary read: encode it first
                    self.filePointer = io.BytesIO(self.string.encode(self.encoding))  # pragma: no cover
                else:
                    self.filePointer = io.BytesIO(self.string)
            else:
                self.filePointer = io.StringIO(self.string)
        elif self.pipe:  # pragma: no cover
            # read straight from stdin; no extra pipe descriptors are needed
            self.filePointer = sys.stdin.buffer if isBinary else sys.stdin
        else:
            raise RuntimeError('Need a file, a string or set pipe True to read')

        self.text_stream = self.filePointer
        self.lineCounter = 0

    def GetFilePointer(self):
        """Return the current input stream (None before the first StartReading)"""
        return self.filePointer

    def EndReading(self) -> None:
        """Close the input stream if one was opened"""
        if self.filePointer is not None:
            self.filePointer.close()

    def SetFileName(self, fileName: str) -> None:
        """Sets the name of file to read.

        if fileName starts with "PIPE" then call self.SetReadFromPipe()

        Parameters
        ----------
        fileName : str
            file name to set (any object convertible with str(), e.g. a
            pathlib.Path, is accepted; None is treated as an empty name)
        """
        # test for None *before* converting, otherwise None becomes "None"
        fileName = "" if fileName is None else str(fileName)

        if fileName.startswith("PIPE"):
            self.SetReadFromPipe()  # pragma: no cover
            return

        self.fileName = fileName
        self.string = ""
        if len(fileName) == 0:
            self.filePath = None
        else:
            self.filePath = os.path.abspath(os.path.dirname(fileName)) + os.sep
            self.pipe = False

    def SetStringToRead(self, string: Union[str, bytes]) -> None:
        """Sets data to be read as a string instead of a file

        Parameters
        ----------
        string : Union[str, bytes]
            data to be read
        """
        self.string = string
        # the string only takes over from the file/pipe when it is not empty
        if len(self.string) > 0:
            self.fileName = ""
            self.pipe = False

    def SetReadFromPipe(self) -> None:  # pragma: no cover
        """Set this reader to read data from the sys.stdin.buffer
        """
        self.string = ""
        self.fileName = ""
        self.pipe = True

    def PeekLine(self) -> Union[str, bytes]:
        """Read a line without advancing

        Returns
        -------
        Union[str, bytes]
            the next line in the input buffer
        """
        pos = self.filePointer.tell()
        try:
            return self.filePointer.readline()
        finally:
            # restore the position even if the read fails
            self.filePointer.seek(pos)

    def Peek(self, length: int = 1) -> Union[str, bytes]:
        """Read a length number of chars without advancing the file

        Parameters
        ----------
        length : int, optional
            number of chars to read, by default 1

        Returns
        -------
        Union[str, bytes]
            the next (at most) length chars in the buffer
        """
        pos = self.filePointer.tell()
        try:
            return self.filePointer.read(length)
        finally:
            # restore the position even if the read fails
            self.filePointer.seek(pos)

    def ReadCleanLine(self) -> Optional[str]:
        """Return the next (non comment) line in the file

        Blank lines and lines starting with ``commentHeader`` are skipped;
        the byte-order mark and surrounding spaces/line ends are removed.
        ``lineCounter`` is incremented for every line consumed. The end of
        file test compares against the empty str, so this method is meant
        for text-mode input.

        Returns
        -------
        Optional[str]
            the next clean line, or None at the end of the file
        """
        while True:
            line = self.filePointer.readline()
            self.lineCounter += 1

            # end of file
            if line == "":
                return None

            line = line.replace('\ufeff', '').strip(' \r\n')

            # empty line
            if len(line) == 0:
                continue

            if self.commentHeader is None or not line.startswith(self.commentHeader):
                return line

    ##binary interface
    def RawRead(self, cpt: int, withError: bool = False) -> bytes:
        """Read raw data (binary from the input buffer )

        Parameters
        ----------
        cpt : int
            number of char to read
        withError : bool, optional
            raise EOFError if not able to read cpt chars from the buffer, by default False

        Returns
        -------
        bytes
            the raw data

        Raises
        ------
        EOFError
            if withError is True and fewer than cpt chars were available
        """
        res = self.filePointer.read(cpt)
        if withError and len(res) < cpt:
            raise EOFError("Problem reading file :" + str(self.fileName) + " EOF")  # pragma: no cover
        return res

    def ReadInt32(self) -> int:
        """Read one int 32 (struct format "i", native byte order)

        Returns
        -------
        int
        """
        rawData = self.RawRead(4, withError=True)
        return struct.unpack("i", rawData)[0]

    def ReadInt64(self) -> int:
        """Read one int 64 (struct format "q", native byte order)

        Returns
        -------
        int
        """
        rawData = self.RawRead(8, withError=True)
        return struct.unpack("q", rawData)[0]

    def ReadData(self, cpt: int, datatype: Type) -> np.ndarray:
        """Read data cpt instances of type datatype from the input buffer
        first try to use the np.fromfile for speed if it fail will try np.frombuffer

        Parameters
        ----------
        cpt : int
            number of data to read
        datatype : Type
            data type

        Returns
        -------
        np.ndarray
            the data read
        """
        try:
            return np.fromfile(self.filePointer, dtype=datatype, count=cpt, sep="")
        except Exception:
            # np.fromfile needs a real file (e.g. it fails on in-memory
            # streams); fall back on reading the bytes ourselves.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            nbBytes = np.dtype(datatype).itemsize * cpt
            data = self.filePointer.read(nbBytes)
            return np.frombuffer(data, dtype=datatype)

    def __reshapeData(self, data, finalShape=None):
        """Return data with the shape finalShape (data itself if finalShape is None)"""
        if finalShape is None:
            return data
        return data.reshape(finalShape)

    def ReadFloats32(self, cpt: int, finalShape: Optional[Tuple] = None) -> np.ndarray:
        """Read cpt float32 values, optionally reshaped to finalShape"""
        return self.__reshapeData(self.ReadData(cpt, np.float32), finalShape)

    def ReadFloats64(self, cpt: int, finalShape: Optional[Tuple] = None) -> np.ndarray:
        """Read cpt float64 values, optionally reshaped to finalShape"""
        return self.__reshapeData(self.ReadData(cpt, np.float64), finalShape)

    def Seek(self, pos: int) -> None:
        """Move pointer to the internal file descriptor to the position pos

        Parameters
        ----------
        pos : int
            position passed to the seek method of the input stream
        """
        self.filePointer.seek(pos)
def CheckIntegrity():
    """Self test of ReaderBase: text and binary reading, from a string and from a file

    Returns
    -------
    str
        "ok" when every check passes

    Raises
    ------
    Exception, RuntimeError
        when a check fails
    """
    import pathlib

    from Muscat.Helpers.CheckTools import MustFailFunction
    from Muscat.Helpers.IO.FileTools import WriteTempFile

    obj = ReaderBase()
    obj.commentHeader = "#"
    obj.SetBinary(False)

    # no file, no string, no pipe: StartReading must refuse to start
    try:
        obj.StartReading()
    except RuntimeError:
        pass
    else:  # pragma: no cover
        raise RuntimeError("StartReading must fail when nothing is set to be read")

    testString = "0\n1\n2\n3\n#this is a comment\n4"
    obj.SetStringToRead(testString)

    def checkBaseReaderAscii(obj):
        obj.StartReading()
        print("file Pointer: ", str(obj.GetFilePointer()))

        if obj.PeekLine() != "0\n":  # pragma: no cover
            raise Exception("first line not correct ")

        if obj.Peek() != "0":  # pragma: no cover
            raise RuntimeError("first char not correct")

        for i in range(5):
            data = obj.ReadCleanLine()
            print(f"-> {data}")
            if i != int(data):
                raise RuntimeError(f"line {i} not correct")  # pragma: no cover

        # the input is exhausted: the next clean line must be None
        if obj.ReadCleanLine() is not None:
            raise RuntimeError("end of input not detected")  # pragma: no cover

        MustFailFunction(obj.ReadCleanLine, True)

        obj.EndReading()

    checkBaseReaderAscii(obj)

    fn = WriteTempFile('ReaderBaseTestString', testString)
    obj.SetFileName(pathlib.Path(fn))
    checkBaseReaderAscii(obj)

    binarydata = np.array([0], dtype=np.int32).tobytes()
    binarydata += np.array([1], dtype=np.int64).tobytes()
    binarydata += np.array([2], dtype=np.float32).tobytes()
    binarydata += np.array([3], dtype=np.float64).tobytes()

    fn = WriteTempFile('ReaderBaseTestbinary', binarydata, mode='wb')

    # exercise every transition of the binary flag
    obj.SetBinary(False)
    obj.SetBinary(True)
    obj.SetBinary(False)
    obj.SetBinary(False)
    obj.SetBinary(True)
    obj.SetBinary(True)

    obj.SetFileName(fn)
    print(obj.readFormat)

    def checkBaseReaderBinary(obj):
        obj.StartReading()
        if obj.ReadInt32() != 0:
            raise RuntimeError("int32 value not correct")
        if obj.ReadInt64() != 1:
            raise RuntimeError("int64 value not correct")
        if obj.ReadFloats32(1, (1, 1)) != 2.:
            raise RuntimeError("float32 value not correct")
        if obj.ReadFloats64(1) != 3.:
            raise RuntimeError("float64 value not correct")
        obj.EndReading()

    checkBaseReaderBinary(obj)

    obj.SetStringToRead(binarydata)
    checkBaseReaderBinary(obj)

    obj.StartReading()
    obj.Seek(4)
    # release the stream opened just above
    obj.EndReading()
    return "ok"
# Run the self test when this module is executed as a script.
if __name__ == '__main__':
    print(CheckIntegrity())  # pragma: no cover