Twisted is a Python framework for developing network applications that, among many other applications, can also be used for parallel data processing β multiprocessing. This is great, but I had to sweat in order to find what I needed. "" "
solver.py
Original version by Bruce Eckel
Solves one part of the problem.
"" "
import sys, random, math
from twisted.spread import pb
from twisted.internet import reactor
class Solver (pb.Root):
def __init __ (self, id):
print "solver.py% s: solver init"% id
self.id = id
def __str __ (self): # String representation
return "Solver% s"% self.id
def remote_initialize (self, initArg):
return "% s initialized"% self
def step (self, arg):
print "solver.py% s: solver step"% self.id
"Simulate work and return result"
result = 0
for i in range (random.randint (1,000,000, 3,000,000)):
angle = math.radians (random.randint (0, 45))
result + = math.tanh (angle) /math.cosh (angle)
return "% s,% s, result:% .2f"% (self, str (arg), result)
# Alias ββmethods, for demonstration version:
remote_step1 = step
remote_step2 = step
remote_step3 = step
def remote_status (self):
print "solver.py% s: remote_status"% self.id
Return "% s operational"% self
def remote_terminate (self):
print "solver.py% s: remote_terminate"% self.id
reactor.callLater (0.5, reactor.stop)
return "% s terminating ..."% self
if __name__ == "__main__":
port = int (sys.argv [1])
reactor.listenTCP (port, pb.PBServerFactory (Solver (sys.argv [1])))
reactor.run ()
"" "
Controller.py
Original version by Bruce Eckel
Starts and manages solvers in separate processes for parallel processing.
"" "
import sys
from subprocess import Popen
from twisted.spread import pb
from twisted.internet import reactor, defer
START_PORT = 5566
MAX_PROCESSES = 2
class Controller (object):
def broadcastCommand (self, remoteMethodName, arguments, nextStep, failureMessage):
print "controller.py: broadcasting ..."
deferreds = [solver.callRemote (remoteMethodName, arguments)
for solver in self.solvers.values ββ()]
print "controller.py: broadcasted"
reactor.callLater (3, self.checkStatus)
defer.DeferredList (deferreds, consumeErrors = True) .addCallbacks (
nextStep, self.failed, errbackArgs = (failureMessage))
def checkStatus (self):
print "controller.py: checkStatus"
for solver in self.solvers.values ββ():
solver.callRemote ("status"). addCallbacks (
lambda r: sys.stdout.write (r + "\ n"), self.failed,
errbackArgs = ("Status Check Failed"))
def failed (self, results, failureMessage = "Call Failed"):
print "controller.py: failed"
for (success, returnValue), (address, port) in zip (results, self.solvers):
if not success:
raise Exception ("address:% s port:% d% s"% (address, port, failureMessage))
def __init __ (self):
print "controller.py: init"
self.solvers = dict.fromkeys (
[("localhost", i) for i in range (START_PORT, START_PORT + MAX_PROCESSES)])
self.pids = [Popen (["python", "solver.py", str (port)]). pid
for ip, port in self.solvers]
print "PIDS:", self.pids
self.connected = False
reactor.callLater (1, self.connect)
def connect (self):
print "controller.py: connect"
connections = []
for address port in self.solvers:
factory = pb.PBClientFactory ()
reactor.connectTCP (address, port, factory)
connections.append (factory.getRootObject ())
defer.DeferredList (connections, consumeErrors = True) .addCallbacks (
self.storeConnections, self.failed, errbackArgs = ("Failed to Connect"))
print "controller.py: starting parallel jobs"
self.start ()
def storeConnections (self, results):
print "controller.py: storeconnections"
for (success, solver), (address, port) in zip (results, self.solvers):
self.solvers [address, port] = solver
print "controller.py: Connected; self.solvers:", self.solvers
self.connected = True
def start (self):
"controller.py: Begin the solving process"
if not self.connected:
return reactor.callLater (0.5, self.start)
self.broadcastCommand ("step1", ("step 1"), self.step2, "Failed Step 1")
def step2 (self, results):
print "controller.py: step 1 results:", results
self.broadcastCommand ("step2", ("step 2"), self.step3, "Failed Step 2")
def step3 (self, results):
print "controller.py: step 2 results:", results
self.broadcastCommand ("step3", ("step 3"), self.collectResults, "Failed Step 3")
def collectResults (self, results):
print "controller.py: step 3 results:", results
self.terminate ()
def terminate (self):
print "controller.py: terminate"
for solver in self.solvers.values ββ():
solver.callRemote ("terminate"). addErrback (self.failed, "Termination Failed")
reactor.callLater (1, reactor.stop)
return "Terminating remote solvers"
if __name__ == "__main__":
controller = Controller ()
reactor.run ()
python controller.py
controller.py: init
PIDS: [12173, 12174]
solver.py 5567: solver init
solver.py 5566: solver init
controller.py: connect
controller.py: starting parallel jobs
controller.py: storeconnections
controller.py: Connected; self.solvers: {('localhost', 5567):, ('localhost', 5566):}
controller.py: broadcasting ...
controller.py: broadcasted
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')]
controller.py: broadcasting ...
controller.py: broadcasted
Solver 5567 operational
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')]
controller.py: broadcasting ...
controller.py: broadcasted
Solver 5567 operational
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')]
controller.py: terminate
Solver 5567 operational
solver.py 5566: remote_terminate
solver.py 5567: remote_terminate
Source: https://habr.com/ru/post/97201/
All Articles