# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
import pickle as pickle
import socket
import sys
import os
import threading
from Muscat.Helpers.Logger import Debug, Info
"""Backport of importlib.import_module from 3.x."""
# While not critical (and in no way guaranteed!), it would be nice to keep this
# code compatible with Python 2.3.
# This backport makes import_module usable under the Python 2.6 bundled with Abaqus.
# If the Python shipped with Abaqus gets updated, please remove this backport.
def _resolve_name(name, package, level):
"""Return the absolute name of the module to be imported."""
if not hasattr(package, 'rindex'):
raise ValueError("'package' not set to a string")
dot = len(package)
for x in range(level, 1, -1):
try:
dot = package.rindex('.', 0, dot)
except ValueError:
raise ValueError("attempted relative import beyond top-level "
"package")
return "%s.%s" % (package[:dot], name)
def import_module(name, package=None):
    """Import the module called ``name`` and return it.

    A ``name`` starting with one or more dots is a relative import: the
    number of leading dots gives the level and ``package`` (required in that
    case) is the anchor used to turn it into an absolute name.

    Raises TypeError when a relative name is given without ``package``.
    """
    if name.startswith('.'):
        if not package:
            raise TypeError("relative imports require the 'package' argument")
        # The number of leading dots is the relative-import level.
        stem = name.lstrip('.')
        level = len(name) - len(stem)
        name = _resolve_name(stem, package, level)
    __import__(name)
    return sys.modules[name]
####################################
# Timeout solution taken from https://stackoverflow.com/questions/492519/timeout-on-a-function-call
def TimeOutHandler():
    """Callback run when a receive operation exceeds its timeout.

    Logs the timeout through ``Debug`` and raises ``SystemExit(1)``.

    NOTE(review): WormholeBase.Receive schedules this function on a
    ``threading.Timer``. SystemExit raised outside the main thread only ends
    that thread, so this does not terminate the process by itself. Confirm
    whether a hard process exit is wanted here.
    """
    Debug("Connection TimeOut")
    # sys.exit rather than the ``exit`` helper injected by the ``site`` module:
    # that helper is meant for interactive use, may be missing (python -S,
    # frozen/embedded interpreters) and closes sys.stdin as a side effect.
    sys.exit(1)
class WormholeBase():
    """Length-prefixed pickle transport used by both ends of a wormhole.

    A message on the wire is a 64-character, zero-padded decimal byte count
    followed by that many bytes of pickled payload. The same framing is used
    over sockets (``recv``/``sendall``) and over pipe/file objects
    (``read``/``write``/``flush``).
    """

    def __init__(self, timeout=3600):
        """Create an unconnected transport.

        timeout: seconds allowed for each Receive before TimeOutHandler is
            fired from a timer thread; None disables the timer.
        """
        super().__init__()
        # None selects the pipe/file API on the endpoints below; any other
        # value selects the socket API.
        self.socket = None
        # Endpoint messages are read from.
        self.otherSideR = None
        # Endpoint messages are written to (may be the same object as otherSideR).
        self.otherSideS = None
        # Pickle protocol used by Send.
        self.proto = 0
        self.timeout = timeout

    def _ReadExactly(self, size):
        """Read exactly ``size`` bytes from otherSideR; raise EOFError if the peer stops early."""
        chunks = []
        missing = size
        while missing > 0:
            if self.socket is None:
                chunk = self.otherSideR.read(missing)
            else:
                chunk = self.otherSideR.recv(missing)
            if not chunk:
                # An empty read means the peer closed the channel; without this
                # check the loop would spin forever on a dead connection.
                raise EOFError("connection closed with %d byte(s) still expected" % missing)
            chunks.append(chunk)
            missing -= len(chunk)
        # Joining once avoids the quadratic cost of repeated bytes concatenation.
        return b"".join(chunks)

    def Receive(self):
        """Read one framed message and return the unpickled object.

        Any failure while reading or decoding terminates via ``sys.exit(1)``
        (SystemExit), matching the module's fail-fast behaviour.
        """
        alarm = None
        if self.timeout is not None:
            alarm = threading.Timer(self.timeout, TimeOutHandler)
            alarm.start()
        try:
            size = int(self._ReadExactly(64).decode('utf8'))
            payload = self._ReadExactly(size)
            # SECURITY: unpickling executes arbitrary code chosen by the peer;
            # only use this transport with trusted endpoints.
            # encoding='latin1' is kept from the original call.
            # NOTE(review): presumably for Python 2 peers - confirm.
            return pickle.loads(payload, encoding='latin1')
        except Exception:
            sys.exit(1)
        finally:
            # Always disarm the timer: a pending non-daemon Timer thread would
            # otherwise keep the process alive until the timeout expires.
            if alarm is not None:
                alarm.cancel()

    def Send(self, data):
        """Pickle ``data`` with the negotiated protocol and write it as one framed message."""
        Debug("Sending data")
        payload = pickle.dumps(data, self.proto, fix_imports=True)
        header = str(len(payload)).zfill(64).encode('utf8')
        if self.socket is None:
            self.otherSideS.write(header)
            self.otherSideS.write(payload)
            self.otherSideS.flush()
        else:
            # sendall: socket.send may transmit only part of a large buffer.
            self.otherSideS.sendall(header)
            self.otherSideS.sendall(payload)

    def Close(self):
        """Close the write endpoint (when distinct) and the read endpoint; unset endpoints are skipped."""
        if self.otherSideS is not None and self.otherSideS is not self.otherSideR:
            self.otherSideS.close()
        if self.otherSideR is not None:
            self.otherSideR.close()
class WormholeServer():
    """Server end of a wormhole: stores values and executes code sent by a client.

    The server talks to its client either over a TCP port (``port``) or over
    the process' standard streams (``cmd``). Received values are stored in
    ``self.globals``, which is also the namespace used to execute received
    statements.

    SECURITY: the server unpickles and ``exec``s whatever the peer sends; it
    must only be exposed to trusted clients.
    """

    def __init__(self, port=None, cmd=None, dry=False, timeout=3600, autoStart=True):
        """Configure the server and, when ``autoStart`` is true, run it immediately.

        port: TCP port to listen on; takes precedence over ``cmd``.
        cmd: any non-None value selects communication over stdin/stdout.
        dry: when true, nothing is stored or executed; requests are only logged.
        timeout: per-receive timeout in seconds handed to WormholeBase (None disables).
        autoStart: call Start() from the constructor (blocks until the session ends).
        """
        super().__init__()
        # Namespace holding received values and used by exec.
        self.globals = {}
        # True once the server is able to accept a client.
        self.ready = False
        # In dry mode no code is executed; requests are only logged.
        self.drymode = dry
        self.port = port
        self.cmd = cmd
        self.timeout = timeout
        if autoStart:
            self.Start()

    def Start(self):
        """Serve one client session over the configured channel; no-op if neither port nor cmd is set."""
        if self.port is not None:
            self.communicator = WormholeBase(timeout=self.timeout)
            try:
                self.ListenUsingPort(self.port)
                self.MainLoop()
            finally:
                # Release the listening socket even when the session fails.
                self.ready = False
                if self.communicator.socket is not None:
                    self.communicator.socket.close()
        elif self.cmd is not None:
            self.communicator = WormholeBase(timeout=self.timeout)
            from Muscat.Helpers.PrintBypass import PrintBypass
            from Muscat.Helpers.IO.FileTools import GetUniqueTempFile
            self.printBypass = PrintBypass()
            out = GetUniqueTempFile(".log", "output_")[1]
            err = GetUniqueTempFile(".log2", "output_")[1]
            # stdout/stderr are redirected to files through PrintBypass.ToDisk
            # before the pipe session starts.
            # NOTE(review): presumably so that prints cannot corrupt the
            # binary message stream - confirm.
            self.printBypass.ToDisk(out, err)
            try:
                self.StartUsingPipe()
                self.MainLoop()
            finally:
                # Always undo the redirection, also when the session fails.
                self.ready = False
                self.printBypass.Restore()

    def StartUsingPipe(self):
        """Bind the communicator to the binary buffers of the bypassed stdin/stdout."""
        self.communicator.otherSideR = self.printBypass.stdin_.buffer
        self.communicator.otherSideS = self.printBypass.stdout_.buffer
        self.ready = True

    def ListenUsingPort(self, port=None):
        """Listen on ``port`` (default 12345) and block until one client connects."""
        if port is None:
            port = 12345
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.communicator.socket = listener
        listener.bind(('', port))
        listener.listen(0)
        self.ready = True
        connection, address = listener.accept()
        # A connected socket is used for both directions.
        self.communicator.otherSideR = connection
        self.communicator.otherSideS = connection
        Debug("(s) {0} connected".format(address))

    def ProtocolNegotiation(self):
        """Agree with the client on the highest pickle protocol both sides support."""
        ClientHighestProtocol = self.communicator.Receive()
        proto = min(pickle.HIGHEST_PROTOCOL, ClientHighestProtocol)
        Debug("(s) Using Protocol " + str(proto))
        # Reply with the current protocol first, then switch.
        self.communicator.Send(proto)
        self.communicator.proto = proto

    def MainLoop(self):
        """Process client requests until an exit request or an unknown action.

        Actions: "p" negotiate protocol, "s" store a value, "c" execute a
        statement (answers "OK" or the error text), "r" send a stored value
        back, "x" close the connection and stop.
        """
        while True:
            action = self.communicator.Receive()
            Debug(action)
            if action == "p":
                self.ProtocolNegotiation()
            elif action == "s":
                key = self.communicator.Receive()
                value = self.communicator.Receive()
                Debug(str(key) + " = " + str(value))
                if self.drymode:
                    Debug("(s)" + str(key) + " = " + str(value))
                else:
                    self.globals[key] = value
            elif action == "c":
                expression = self.communicator.Receive()
                Debug(expression)
                if self.drymode:
                    Debug("(s) Exec: " + str(expression))
                else:
                    try:
                        exec(str(expression), self.globals)
                    except Exception as e:
                        # Report the failure to the client, then let it
                        # propagate (bare raise keeps the traceback).
                        self.communicator.Send(str(e))
                        raise
                # The client always waits for an answer, in dry mode too.
                self.communicator.Send("OK")
            elif action == "r":
                # The key must be consumed before it can be logged, and also in
                # dry mode, otherwise the next Receive would read it as an action.
                key = self.communicator.Receive()
                Debug("(s) Sending data back : " + str(key))
                if self.drymode:
                    self.communicator.Send(None)
                else:
                    self.communicator.Send(self.globals[key])
            elif action == "x":
                Debug("(s) exit")
                self.communicator.Close()
                return
            else:
                Debug("Dont know how to treat " + str(action))
                return
class WormholeClient():
    """Client end of a wormhole: sends values and statements to a WormholeServer.

    The client connects over TCP (``port``/``host``) or uses the standard
    streams of an already started server process (``proc``).
    """

    def __init__(self, port=None, host=None, proc=None):
        """Connect immediately when ``port`` or ``proc`` is given.

        port: TCP port of the server; takes precedence over ``proc``.
        host: server host name used with ``port`` (Connect defaults it to "localhost").
        proc: process object exposing ``stdin``/``stdout`` pipes to the server.
        """
        super().__init__()
        self.communicator = None
        # Negotiated pickle protocol; stays 0 until ProtocolNegotiation has run.
        self.proto = 0
        if port is not None:
            self.Connect(port=port, host=host)
        elif proc is not None:
            self.communicator = WormholeBase()
            self.StartUsingPipe(proc)

    def StartUsingPipe(self, proc):
        """Use ``proc.stdin``/``proc.stdout`` as the channel and negotiate the protocol."""
        self.communicator.otherSideS = proc.stdin
        self.communicator.otherSideR = proc.stdout
        self.ProtocolNegotiation()
        self.communicator.proto = self.proto

    def Connect(self, port=None, host=None):
        """Open a TCP connection to ``host``:``port`` (defaults "localhost":12345) and negotiate the protocol."""
        if port is None:
            port = 12345
        if host is None:
            host = "localhost"
        self.communicator = WormholeBase()
        connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.communicator.otherSideS = connection
        self.communicator.otherSideR = connection
        # Any non-None value switches the communicator to the socket API.
        self.communicator.socket = True
        Debug("(c) connecting to " + str((host, port)))
        try:
            connection.connect((host, port))
        except Exception:
            # Do not leak the socket when the server cannot be reached.
            connection.close()
            raise
        Debug("(c) Connection on {0}".format(port))
        self.ProtocolNegotiation()
        Debug("(c) Using protocol " + str(self.proto))
        self.communicator.proto = self.proto

    def ProtocolNegotiation(self):
        """Send our highest pickle protocol and store the server's answer in ``self.proto``."""
        Debug("pickle.HIGHEST_PROTOCOL " + str(pickle.HIGHEST_PROTOCOL))
        self.communicator.Send("p")
        self.communicator.Send(pickle.HIGHEST_PROTOCOL)
        self.proto = self.communicator.Receive()
        Debug("self.proto " + str(self.proto))

    def SendData(self, key, data):
        """Store ``data`` under ``key`` on the server."""
        Debug("(c) sending " + str(key) + " :: " + str(data))
        self.communicator.Send("s")
        self.communicator.Send(key)
        self.communicator.Send(data)

    def RemoteExec(self, expression):
        """Execute ``expression`` on the server; raise Exception with the server's answer unless it is "OK"."""
        Debug("Remote Exec : '" + str(expression) + "'")
        self.communicator.Send("c")
        self.communicator.Send(expression)
        responce = self.communicator.Receive()
        if responce != "OK":
            Debug(responce)
            raise Exception(responce)

    def RetrieveData(self, variable):
        """Return the value stored under ``variable`` on the server."""
        self.communicator.Send("r")
        self.communicator.Send(variable)
        return self.communicator.Receive()

    def Exit(self):
        """Ask the server to stop and close this side of the channel."""
        self.communicator.Send("x")
        self.communicator.Close()
def CheckIntegrityNetWork():
    """Round-trip test of the wormhole over a local TCP port.

    Starts a WormholeServer in a background thread on a free port, then uses
    a WormholeClient to store ``Hola = 5``, execute ``Hola += 3`` remotely and
    read the value back.

    Returns 'ok' when the retrieved value is 8, "Not ok" otherwise.
    """
    import threading
    import time

    testport = GetAnFreePortNumber()
    data = {"server": None}

    def runServer():
        print("(s) Starting Server Side ", testport)
        data["server"] = WormholeServer(testport, dry=False, timeout=None, autoStart=False)
        data["server"].Start()

    TT = threading.Thread(target=runServer)
    # Daemon thread: a server stuck in accept() (e.g. because the client
    # failed) must not prevent the interpreter from exiting.
    TT.daemon = True
    TT.start()

    # Wait (about half a second at most) until the server reports it is listening.
    for _ in range(52):
        server = data["server"]
        if server is not None and server.ready:
            break
        time.sleep(0.01)

    print("(c) Starting Client Side ", testport)
    client = WormholeClient(testport)
    client.SendData("Hola", 5)
    client.RemoteExec("Hola += 3")
    newhola = client.RetrieveData("Hola")
    client.Exit()
    # Give the server thread a chance to finish after the exit request.
    TT.join(5)
    print("Done")
    if newhola == 8:
        return 'ok'
    return "Not ok"
def GetPipeWormholeScript():
    """Return the Python source that starts a pipe-based WormholeServer.

    The script sets the temporary directory of the new interpreter to the
    current one (with backslashes turned into forward slashes), creates a
    ``WormholeServer(cmd="")`` and then exits.
    """
    from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory

    template = """
from Muscat.IO.Wormhole import WormholeServer
from Muscat.Helpers.IO.TemporaryDirectory import TemporaryDirectory
TemporaryDirectory.SetTempPath(r"{0}")
a = WormholeServer(cmd="")
exit()
"""
    tempPath = TemporaryDirectory.GetTempPath().replace("\\", "/")
    return template.format(tempPath)
def CheckIntegrityPipe():
    """Round-trip test of the wormhole over the pipes of a child interpreter.

    Launches ``sys.executable -c <script>`` with the script from
    GetPipeWormholeScript, then uses a WormholeClient on the child's
    stdin/stdout to store ``Hola = 5``, execute ``Hola += 3`` and read the
    value back.

    Returns 'ok' when the retrieved value is 8, "Not ok" otherwise.
    """
    import os
    import subprocess
    import time

    def runServerPipe(cmd):
        script = GetPipeWormholeScript()
        print("(s) Starting Server Side")
        return subprocess.Popen([cmd, '-c', script], cwd=os.getcwd(),
                                stdout=subprocess.PIPE, stdin=subprocess.PIPE)

    proc = runServerPipe(sys.executable)
    finished = False
    try:
        print("(c) Starting Client Side")
        time.sleep(0.1)
        client = WormholeClient(proc=proc)
        client.SendData("Hola", 5)
        client.RemoteExec("Hola += 3")
        newhola = client.RetrieveData("Hola")
        client.Exit()
        finished = True
    finally:
        # Never leave the child interpreter behind: kill it if the session
        # failed, and reap it in every case.
        if not finished:
            proc.kill()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
    print("Done")
    if newhola == 8:
        return 'ok'
    return "Not ok"
def GetAnFreePortNumber():
    """Return a TCP port number that was free at the time of the call.

    A temporary socket is bound to port 0 so that the operating system picks
    an unused port; the socket is closed before returning, so another process
    may take the port before the caller binds it.
    """
    import socket

    # The context manager closes the socket even if bind/getsockname raises.
    with socket.socket() as probe:
        probe.bind(("", 0))
        return probe.getsockname()[1]
def CheckIntegrity(GU=False):
    """Run the network check, then the pipe check.

    Returns the network result as soon as it is not "ok" (case-insensitive);
    otherwise returns the result of the pipe check. ``GU`` is unused here.
    """
    networkStatus = CheckIntegrityNetWork()
    if str(networkStatus).lower() == "ok":
        return CheckIntegrityPipe()
    return networkStatus
if __name__ == '__main__':

    def RunClient(testport):
        """Run a demo client session against a server listening on ``testport``."""
        print("Client Side")
        session = WormholeClient(testport)
        session.SendData("Hola", 5)
        session.RemoteExec("Hola += 3")
        print("new Hola :", session.RetrieveData("Hola"))
        # Import the writer module on the server side.
        for statement in ("from Muscat.IO import XdmfWriter",
                          "from Muscat.IO.XdmfWriter import WriteMeshToXdmf"):
            session.RemoteExec(statement)
        import numpy as np
        session.SendData("p", np.array([5, 10, 4]))
        session.RemoteExec("v = XdmfWriter.ArrayToString(p)")
        session.RemoteExec("a =2 ")
        # Fetch the remote variables back and show their type and value.
        for variable in ("v", "p"):
            fetched = session.RetrieveData(variable)
            print(type(fetched))
            print(variable + " " + str(fetched))
        session.Exit()

    # "-s": server only, "-c": client only, anything else: both in one process.
    defaultPort = 12348
    mode = sys.argv[1] if len(sys.argv) > 1 else None
    if mode == "-s":
        print("Server Side")
        WormholeServer(defaultPort, dry=False)
    elif mode == "-c":
        RunClient(defaultPort)
    else:
        import threading
        import time
        freePort = GetAnFreePortNumber()
        worker = threading.Thread(target=WormholeServer, name="WormHoleServer",
                                  kwargs={"port": freePort, "timeout": None},
                                  daemon=True)
        worker.start()
        # Leave the server time to start listening before connecting.
        time.sleep(1)
        RunClient(freePort)
        worker.join(5)