"" " solver.py Original version by Bruce Eckel Solves one part of the problem. "" " import sys, random, math from twisted.spread import pb from twisted.internet import reactor class Solver (pb.Root): def __init __ (self, id): print "solver.py% s: solver init"% id self.id = id def __str __ (self): # String representation return "Solver% s"% self.id def remote_initialize (self, initArg): return "% s initialized"% self def step (self, arg): print "solver.py% s: solver step"% self.id "Simulate work and return result" result = 0 for i in range (random.randint (1,000,000, 3,000,000)): angle = math.radians (random.randint (0, 45)) result + = math.tanh (angle) /math.cosh (angle) return "% s,% s, result:% .2f"% (self, str (arg), result) # Alias ββmethods, for demonstration version: remote_step1 = step remote_step2 = step remote_step3 = step def remote_status (self): print "solver.py% s: remote_status"% self.id Return "% s operational"% self def remote_terminate (self): print "solver.py% s: remote_terminate"% self.id reactor.callLater (0.5, reactor.stop) return "% s terminating ..."% self if __name__ == "__main__": port = int (sys.argv [1]) reactor.listenTCP (port, pb.PBServerFactory (Solver (sys.argv [1]))) reactor.run ()
"" " Controller.py Original version by Bruce Eckel Starts and manages solvers in separate processes for parallel processing. "" " import sys from subprocess import Popen from twisted.spread import pb from twisted.internet import reactor, defer START_PORT = 5566 MAX_PROCESSES = 2 class Controller (object): def broadcastCommand (self, remoteMethodName, arguments, nextStep, failureMessage): print "controller.py: broadcasting ..." deferreds = [solver.callRemote (remoteMethodName, arguments) for solver in self.solvers.values ββ()] print "controller.py: broadcasted" reactor.callLater (3, self.checkStatus) defer.DeferredList (deferreds, consumeErrors = True) .addCallbacks ( nextStep, self.failed, errbackArgs = (failureMessage)) def checkStatus (self): print "controller.py: checkStatus" for solver in self.solvers.values ββ(): solver.callRemote ("status"). addCallbacks ( lambda r: sys.stdout.write (r + "\ n"), self.failed, errbackArgs = ("Status Check Failed")) def failed (self, results, failureMessage = "Call Failed"): print "controller.py: failed" for (success, returnValue), (address, port) in zip (results, self.solvers): if not success: raise Exception ("address:% s port:% d% s"% (address, port, failureMessage)) def __init __ (self): print "controller.py: init" self.solvers = dict.fromkeys ( [("localhost", i) for i in range (START_PORT, START_PORT + MAX_PROCESSES)]) self.pids = [Popen (["python", "solver.py", str (port)]). pid for ip, port in self.solvers] print "PIDS:", self.pids self.connected = False reactor.callLater (1, self.connect) def connect (self): print "controller.py: connect" connections = [] for address port in self.solvers: factory = pb.PBClientFactory () reactor.connectTCP (address, port, factory) connections.append (factory.getRootObject ()) defer.DeferredList (connections, consumeErrors = True) .addCallbacks ( self.storeConnections, self.failed, errbackArgs = ("Failed to Connect")) print "controller.py: starting parallel jobs" self.start () def storeConnections (self, results): print "controller.py: storeconnections" for (success, solver), (address, port) in zip (results, self.solvers): self.solvers [address, port] = solver print "controller.py: Connected; self.solvers:", self.solvers self.connected = True def start (self): "controller.py: Begin the solving process" if not self.connected: return reactor.callLater (0.5, self.start) self.broadcastCommand ("step1", ("step 1"), self.step2, "Failed Step 1") def step2 (self, results): print "controller.py: step 1 results:", results self.broadcastCommand ("step2", ("step 2"), self.step3, "Failed Step 2") def step3 (self, results): print "controller.py: step 2 results:", results self.broadcastCommand ("step3", ("step 3"), self.collectResults, "Failed Step 3") def collectResults (self, results): print "controller.py: step 3 results:", results self.terminate () def terminate (self): print "controller.py: terminate" for solver in self.solvers.values ββ(): solver.callRemote ("terminate"). addErrback (self.failed, "Termination Failed") reactor.callLater (1, reactor.stop) return "Terminating remote solvers" if __name__ == "__main__": controller = Controller () reactor.run ()
python controller.py
controller.py: init PIDS: [12173, 12174] solver.py 5567: solver init solver.py 5566: solver init controller.py: connect controller.py: starting parallel jobs controller.py: storeconnections controller.py: Connected; self.solvers: {('localhost', 5567):, ('localhost', 5566):} controller.py: broadcasting ... controller.py: broadcasted solver.py 5566: solver step solver.py 5567: solver step controller.py: checkStatus solver.py 5566: remote_status Solver 5566 operational solver.py 5567: remote_status controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')] controller.py: broadcasting ... controller.py: broadcasted Solver 5567 operational solver.py 5566: solver step solver.py 5567: solver step controller.py: checkStatus solver.py 5566: remote_status Solver 5566 operational solver.py 5567: remote_status controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')] controller.py: broadcasting ... controller.py: broadcasted Solver 5567 operational solver.py 5566: solver step solver.py 5567: solver step controller.py: checkStatus solver.py 5566: remote_status Solver 5566 operational solver.py 5567: remote_status controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')] controller.py: terminate Solver 5567 operational solver.py 5566: remote_terminate solver.py 5567: remote_terminate
Source: https://habr.com/ru/post/97201/
All Articles