# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
"""Odb file reader (Abaqus result file)
"""
import os
import numpy as np
from collections import defaultdict
from Muscat.IO.AbaqusTools import GetAbaqusExec
from Muscat.Containers.Mesh import Mesh
from Muscat.IO.AbaqusTools import InpNameToMuscat, permutation
from Muscat.Helpers.Logger import Info, Debug, Error
abaqus_EXEC = os.environ.get("ABAQUS_EXEC","abaqus")
[docs]class OdbReader():
"""Obd Reader class
"""
def __init__(self):
super().__init__()
self.canHandleTemporal = True
self.fieldsNamesToRead = []
self.intanceNumberToRead = 0
self.timeToRead = -1
self.filename =None
self.time = None
self.stepData = None
self.__VariablesToPush = ["fieldsNamesToRead","filename","timeToRead"]
self.__FunctionToBypass = ["SetFileName","SetFieldNameToRead","SetTimeToRead"]
self.proc = None
self.client = None
self.odb = None
self.output = None
self.encoding = None
[docs] def Reset(self):
"""Resets the reader (field name, instance number, TimeToRead, time and stepata)
"""
self.fieldsNamesToRead = []
self.intanceNumberToRead = 0
self.timeToRead = None
self.time = None
self.stepData =None
[docs] def GetAvailableTimes(self):
"""Returns the reading time
Returns
-------
int
reading time
"""
return self.time
def __getattribute__(self, name):
attr = object.__getattribute__(self, name)
if hasattr(attr, '__call__'):
if name in self.__FunctionToBypass:
return attr
try:
import odbAccess as OA
return attr
except:
if self.proc == None:
from Muscat.IO.Wormhole import WormholeClient,GetPipeWormholeScript
from Muscat.Helpers.IO.FileTools import WriteTempFile
import subprocess
script = GetPipeWormholeScript()
fn = WriteTempFile("WormholeServer.py",script)
self.proc = subprocess.Popen([abaqus_EXEC,"python","-B",fn], stdout=subprocess.PIPE,stdin=subprocess.PIPE)
self.client = WormholeClient(proc=self.proc)
self.client.RemoteExec("from Muscat.IO.OdbReader import OdbReader")
self.client.RemoteExec("reader = OdbReader()")
def newfunc(*args, **kwargs):
for var in self.__VariablesToPush:
val = object.__getattribute__(self, var)
self.client.SendData(var,object.__getattribute__(self, var))
if type(val) == str:
self.client.RemoteExec("{0} = str({0})".format(var) )
self.client.RemoteExec("reader.{0} = {0}".format(var) )
self.client.SendData("args",args)
self.client.SendData("kwargs",kwargs)
self.client.RemoteExec("res = reader.{0}(*args, **kwargs)".format(name) )
res = self.client.RetrieveData("res")
return res
return newfunc
else:
return attr
[docs] def SetFileName(self, fn):
"""Sets fileName
Parameters
----------
fn : str
fileName to set
"""
self.filename = fn
[docs] def SetIntanceNumberToRead(self, i):
"""Sets instance number to read
Parameters
----------
i : int
instance number to read
"""
self.intanceNumberToRead = i
[docs] def SetFieldsNamesToRead(self, val):
"""Sets field name to read
Parameters
----------
val : str
field name to read
"""
self.fieldsNamesToRead = val
[docs] def SetTimeToRead(self, time, timeIndex=None):
"""Sets time to read
Parameters
----------
time : float
time at which the data are read
timeIndex : int, optional
time index to read, by default None
"""
if time is not None:
self.timeToRead = time
elif timeIndex is not None:
self.timeToRead = self.time[timeIndex]
[docs] def ConvertInstanceToMuscat(self,instance):
"""Converts odb instance to Muscat objects
Parameters
----------
instance : odb.rootAssembly.instances
odb instance
Returns
-------
Mesh, dict, dict
conversion of the odb instance in objects that can be handled
by Muscat
"""
res = Mesh()
nodes = instance.nodes
nbnodes = len(nodes)
res.nodes = np.empty((nbnodes,3),dtype=float)
res.originalIDNodes = np.empty((nbnodes),dtype=int)
abaToMeshNode = {}
cpt = 0
Debug("Reading Nodes")
for i in nodes:
res.nodes[cpt,:] = i.coordinates
res.originalIDNodes[cpt] = i.label
abaToMeshNode[i.label] = cpt
cpt += 1
Debug("Reading Nodes Keys")
res.PrepareForOutput()
nSets = instance.nodeSets
for nSetK in nSets.keys():
nSet = nSets[nSetK]
name = nSet.name
tag = res.nodesTags.CreateTag(name,False)
buffer = []
for node in nSet.nodes:
buffer.append(node.label)
enum = [abaToMeshNode[x] for x in buffer ]
tag.AddToTag(enum)
tag.RemoveDoubles()
elements = instance.elements
Debug("Reading Elements")
elemToMeshElem = {}
for elem in elements:
eltype = InpNameToMuscat[elem.type]
conn = [abaToMeshNode[n] for n in elem.connectivity ]
elems = res.GetElementsOfType(eltype)
per = permutation.get(elem.type,None)
enum = elems.AddNewElement(conn,elem.label)-1
elemToMeshElem[elem.label] = (eltype,enum)
for name,data in res.elements.items():
per = permutation.get(name,None)
if per is not None :
data.connectivity = data.connectivity[:,per]
Debug("Reading Elements Keys")
res.PrepareForOutput()
eSets = instance.elementSets
for eSetK in eSets.keys():
eSet =eSets[eSetK]
Debug("Reading set {eSetK}".format(eSetK=eSetK))
name = eSet.name
cpt = 0
temptagdata = defaultdict(lambda : list() )
for elem in eSet.elements:
temptagdata[elem.type].append(elem.label)
for elemstype,label in temptagdata.items():
res.GetElementsOfType(InpNameToMuscat[elemstype]).GetTag(name).SetIds( [ elemToMeshElem[x][1] for x in label ] )
for name,data in res.elements.items():
for tag in data.tags:
tag.RemoveDoubles()
res.PrepareForOutput()
return res, abaToMeshNode, elemToMeshElem
[docs] def Read(self):
"""Function that performs the reading of an odb file
Returns
-------
Mesh
output unstructured mesh object containing reading result
"""
if self.output == None:
odb = self.Open()
self.ReadMetaData(odb)
instance = odb.rootAssembly.instances
instancename = list(instance.keys())[self.intanceNumberToRead]
self.__currentInstance = instance[instancename]
res, abaToMeshNode, elemToMeshElem = self.ConvertInstanceToMuscat(instance[instancename])
self.abatomesh = abaToMeshNode
self.elemMap = elemToMeshElem
self.output = res
Debug("Reading Fields")
self.output.nodeFields,self.output.elemFields = self.ReadFields(self.abatomesh,self.elemMap,self.output)
return self.output
[docs] def GetActiveFrame(self):
"""Returns odb active frame
Returns
-------
odb.steps.frames
active frame
"""
if self.timeToRead == -1.:
timeIndex = len(self.time)-1
else:
timeIndex = np.argmin(abs(self.time - self.timeToRead ))
name, val = self.stepData[timeIndex]
odb = self.Open()
frame = odb.steps[name].frames[val]
return frame
[docs] def Open(self):
"""Opens odb object using odbAccess
Returns
-------
Odb
odb object
"""
if not(self.odb is None):
return self.odb
import odbAccess as OA
self.odb = OA.openOdb(self.filename,readOnly=True)
self.ReadMetaData()
return self.odb
[docs] def ReadFields(self, nodeMap, elemMap, res):
"""Function that reads the fields defined in an odb file
Parameters
----------
nodeMap : dict
node map generated by ConvertInstanceToMuscat
elemMap : dict
element map generated by ConvertInstanceToMuscat
res : Mesh
unstructured mesh generated by ConvertInstanceToMuscat
Returns
-------
dict, dict
fields read from an odb file
"""
frame = self.GetActiveFrame()
import odbAccess as OA
nodalFields = {}
elemFields = {}
s1 = 0
s2 = 1
for name,data in frame.fieldOutputs.items():
if len(self.fieldsNamesToRead) != 0 and name not in self.fieldsNamesToRead:
continue
if len(self.fieldsNamesToRead) == 0:
if name in [ "COORD" ]:
Debug("skip field {}".format(name))
continue
Debug("Reading {name} (type:{type})".format(name=name,type=str(data.type)))
if data.type == OA.SCALAR:
s2 = 1
elif data.type == OA.VECTOR:
s2 = 3
elif data.type == OA.TENSOR_3D_FULL:
s2 = 6
else:
Debug("do not how to treat {}".format(data.type))
raise(Exception("error"))
if data.locations[0].position == OA.CENTROID:
s1 = len(elemMap)
storage = elemFields
storage[name] = self.ReadFieldWithMapElement(elemMap,data,s1,s2,res)
elif data.locations[0].position == OA.NODAL:
s1 = len(nodeMap)
storage = nodalFields
storage[name] = self.ReadFieldWithMapNode(nodeMap,data,s1,s2)
elif data.locations[0].position == OA.INTEGRATION_POINT:
sdata = data.getSubset(position=OA.CENTROID)
s1 = len(elemMap)
storage = elemFields
storage[name] = self.ReadFieldWithMapElement(elemMap,data,s1,s2,res)
else:
sdata = data.getSubset(position=OA.NODAL)
s1 = len(nodeMap)
storage = nodalFields
storage[name] = self.ReadFieldWithMapNode(nodeMap,sdata,s1,s2)
return nodalFields, elemFields
[docs] def ReadFieldWithMapNode(self,entityMap,field,s1,s2):
res = np.zeros((s1,s2))
fieldValues = field.values
for v in fieldValues :
if self.__currentInstance != v.instance:
continue
try:
nid = entityMap[v.nodeLabel]
except:
continue
res[nid,:] = v.data
return res
[docs] def ReadFieldWithMapElement(self,entityMap,field,s1,s2,mesh):
res = np.zeros((s1,s2))
fieldValues = field.values
for v in fieldValues :
if self.__currentInstance != v.instance:
continue
eltype, localnumb = entityMap[v.elementLabel]
nid = localnumb + mesh.elements[eltype].globaloffset
res[nid,:] = v.data
return res
from Muscat.IO.IOFactory import RegisterReaderClass
RegisterReaderClass(".odb",OdbReader)
[docs]def CheckIntegrity(GUI:bool = False):
from Muscat.Helpers.CheckTools import SkipTest
if SkipTest("ABAQUS_NO_FAIL"): return "ok"
if not GetAbaqusExec():
return "skip (Abaqus exec not found)"
import time as tt
at = tt.time()
reader = OdbReader()
# no .odb in the database for test for the moment
return "ok"
#reader.SetFileName("path/Job-1.odb")
#time, stepData = reader.ReadMetaData()
#print("time")
#print(time)
#print(stepData)
#reader.timeToRead = 2.0
#reader.SetFieldsNamesToRead(["U"])
#print(reader.Read())
#print(tt.time() - at)
#return "OK"
if __name__ == '__main__':
print(CheckIntegrity(True))