
Python and Twisted - Notes on parallel data processing (multiprocessing)

Twisted is a Python framework for developing network applications that, among many other uses, can also be used for parallel data processing (multiprocessing). This is great, but it took me some effort to find what I needed.

I looked through the Twisted documentation and the O'Reilly Twisted book. There is also a recipe in the Python Cookbook. However, the most interesting material I found was Bruce Eckel's article Parallelism with Python, Twisted and Flex. Bruce Eckel's original articles about Twisted, Grokking Twisted, are also worth reading.

Here are my notes on Bruce's example.
I removed the Flex part, partly because I don't need it and don't want to learn it. In the example, a controller starts several separate worker processes (called solvers), each of which runs some computationally expensive work, and the controller communicates with them. Although this example runs on only one machine, the principles described in the article are not difficult to extend to a system of several computers.
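In the controller shown below, each solver is identified only by a (host, port) pair, so extending the example to several machines mostly means building that list from more than one host and starting solver.py on each machine by some other means (the controller's Popen call only launches local processes). A minimal sketch of the address list, assuming the same START_PORT and MAX_PROCESSES values as controller.py; solver_addresses() and the host names are hypothetical:

```python
START_PORT = 5566     # same values as in controller.py
MAX_PROCESSES = 2


def solver_addresses(hosts, start_port=START_PORT, per_host=MAX_PROCESSES):
    """Return the (host, port) keys the controller would connect to.

    Hypothetical helper: each host is assumed to run `per_host` solvers on
    consecutive ports starting at `start_port`. With hosts=["localhost"]
    this reproduces the list built in Controller.__init__().
    """
    return [(host, port)
            for host in hosts
            for port in range(start_port, start_port + per_host)]
```

With hosts=["node1", "node2"] (hypothetical names), passing the result to dict.fromkeys() would give the controller four solvers to connect to, assuming a solver.py process is already listening on each of those ports.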

For a good explanation of how this works, see the original article.

Here is solver.py, copied from the original. The real "work" takes place in the step() method. I only added some debug output for myself.
 """
 solver.py
 Original version by Bruce Eckel
 Solves one part of the problem.
 """
 import sys, random, math
 from twisted.spread import pb
 from twisted.internet import reactor

 class Solver(pb.Root):
     # PB exposes methods prefixed with remote_ to callRemote("name")

     def __init__(self, id):
         print("solver.py %s: solver init" % id)
         self.id = id

     def __str__(self):  # String representation
         return "Solver %s" % self.id

     def remote_initialize(self, initArg):
         return "%s initialized" % self

     def step(self, arg):
         "Simulate work and return result"
         print("solver.py %s: solver step" % self.id)
         result = 0
         for i in range(random.randint(1000000, 3000000)):
             angle = math.radians(random.randint(0, 45))
             result += math.tanh(angle) / math.cosh(angle)
         return "%s, %s, result: %.2f" % (self, str(arg), result)

     # Alias methods for the demonstration: all three steps do the same work
     remote_step1 = step
     remote_step2 = step
     remote_step3 = step

     def remote_status(self):
         print("solver.py %s: remote_status" % self.id)
         return "%s operational" % self

     def remote_terminate(self):
         print("solver.py %s: remote_terminate" % self.id)
         # Stop shortly afterwards so this reply can still reach the controller
         reactor.callLater(0.5, reactor.stop)
         return "%s terminating..." % self

 if __name__ == "__main__":
     port = int(sys.argv[1])
     reactor.listenTCP(port, pb.PBServerFactory(Solver(sys.argv[1])))
     reactor.run()
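
The computation inside step() does not depend on Twisted at all, so it can be tried and timed on its own before wiring it into a solver. Below is a minimal sketch of the same loop as a plain function; the name simulate_work() and the iterations and seed parameters are my own illustrative additions (the original draws a random iteration count between 1,000,000 and 3,000,000):

```python
import math
import random


def simulate_work(iterations, seed=None):
    """Same arithmetic as Solver.step(): sum tanh(a)/cosh(a) over random angles.

    Hypothetical helper: `iterations` and `seed` are not part of the original
    solver. Each angle is a whole number of degrees between 0 and 45.
    """
    rng = random.Random(seed)  # private generator, so a run can be reproduced
    result = 0.0
    for _ in range(iterations):
        angle = math.radians(rng.randint(0, 45))
        result += math.tanh(angle) / math.cosh(angle)
    return result
```

Calling simulate_work(1000000) does roughly the work of one solver step at the lower end of the original range, which gives a quick feel for how long each step keeps a CPU busy.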



Here is controller.py. It is also copied from the original article, but I removed Flex and moved the start and terminate logic into the Controller class (the terminate method originally lived in FlexInterface). I'm not sure this is the cleanest design, but at least it let me run the example without Flex.
 """
 controller.py
 Original version by Bruce Eckel
 Starts and manages solvers in separate processes for parallel processing.
 """
 import sys
 from subprocess import Popen
 from twisted.spread import pb
 from twisted.internet import reactor, defer

 START_PORT = 5566
 MAX_PROCESSES = 2

 class Controller(object):

     def broadcastCommand(self, remoteMethodName, arguments, nextStep, failureMessage):
         print("controller.py: broadcasting...")
         deferreds = [solver.callRemote(remoteMethodName, arguments)
                      for solver in self.solvers.values()]
         print("controller.py: broadcasted")
         reactor.callLater(3, self.checkStatus)

         # Without fireOnOneErrback, DeferredList always fires its callback
         # with a list of (success, result) pairs, one per solver.
         defer.DeferredList(deferreds, consumeErrors=True).addCallbacks(
             nextStep, self.failed, errbackArgs=(failureMessage,))

     def checkStatus(self):
         print("controller.py: checkStatus")
         for solver in self.solvers.values():
             solver.callRemote("status").addCallbacks(
                 lambda r: sys.stdout.write(r + "\n"), self.failed,
                 errbackArgs=("Status Check Failed",))

     def failed(self, failure, failureMessage="Call Failed"):
         # Used as an errback, so the first argument is a Twisted Failure
         print("controller.py: failed: %s: %s"
               % (failureMessage, failure.getErrorMessage()))

     def __init__(self):
         print("controller.py: init")
         self.solvers = dict.fromkeys(
             [("localhost", i) for i in range(START_PORT, START_PORT + MAX_PROCESSES)])
         # sys.executable starts the solvers with the controller's own interpreter
         self.pids = [Popen([sys.executable, "solver.py", str(port)]).pid
                      for ip, port in self.solvers]
         print("PIDS: %s" % self.pids)
         self.connected = False
         reactor.callLater(1, self.connect)

     def connect(self):
         print("controller.py: connect")
         connections = []
         for address, port in self.solvers:
             factory = pb.PBClientFactory()
             reactor.connectTCP(address, port, factory)
             connections.append(factory.getRootObject())
         defer.DeferredList(connections, consumeErrors=True).addCallbacks(
             self.storeConnections, self.failed, errbackArgs=("Failed to Connect",))

         print("controller.py: starting parallel jobs")
         self.start()

     def storeConnections(self, results):
         print("controller.py: storeconnections")
         # Replace each placeholder value with the solver's remote reference
         for (success, solver), (address, port) in zip(results, self.solvers):
             self.solvers[address, port] = solver
         print("controller.py: Connected; self.solvers: %s" % self.solvers)
         self.connected = True

     def start(self):
         "controller.py: Begin the solving process"
         # Poll every 0.5 s until storeConnections() has run
         if not self.connected:
             return reactor.callLater(0.5, self.start)
         self.broadcastCommand("step1", "step 1", self.step2, "Failed Step 1")

     def step2(self, results):
         print("controller.py: step 1 results: %s" % results)
         self.broadcastCommand("step2", "step 2", self.step3, "Failed Step 2")

     def step3(self, results):
         print("controller.py: step 2 results: %s" % results)
         self.broadcastCommand("step3", "step 3", self.collectResults, "Failed Step 3")

     def collectResults(self, results):
         print("controller.py: step 3 results: %s" % results)
         self.terminate()

     def terminate(self):
         print("controller.py: terminate")
         for solver in self.solvers.values():
             solver.callRemote("terminate").addErrback(self.failed, "Termination Failed")
         reactor.callLater(1, reactor.stop)
         return "Terminating remote solvers"

 if __name__ == "__main__":
     controller = Controller()
     reactor.run()
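
broadcastCommand() waits for all solvers through defer.DeferredList. Because the list is created without fireOnOneErrback, its callback fires once every call has finished, with one (success, result) pair per solver in call order, and consumeErrors=True keeps failed calls from being reported as unhandled errors. That is the shape of the "step N results" lines in the output. To show the same fan-out and barrier pattern without Twisted, here is a rough standard-library analogue using concurrent.futures; it is a swapped-in technique, not how Twisted works internally, and broadcast() and run_steps() are hypothetical helpers:

```python
from concurrent.futures import ThreadPoolExecutor


def broadcast(executor, workers, method_name, arg):
    """Call getattr(worker, method_name)(arg) on every worker concurrently.

    Returns one (success, value) pair per worker, in submission order, like
    a DeferredList result; an exception becomes (False, exc) instead of
    propagating.
    """
    futures = [executor.submit(getattr(w, method_name), arg) for w in workers]
    results = []
    for future in futures:  # waiting on every future acts as a barrier
        exc = future.exception()
        if exc is not None:
            results.append((False, exc))
        else:
            results.append((True, future.result()))
    return results


def run_steps(executor, workers, steps=("step1", "step2", "step3")):
    """Run the steps in order; a step starts only after every worker has
    finished the previous one, similar to the step2()/step3() chaining."""
    all_results = []
    for number, name in enumerate(steps, start=1):
        results = broadcast(executor, workers, name, "step %d" % number)
        print("step %d results: %s" % (number, results))
        all_results.append(results)
    return all_results
```

Passing a ProcessPoolExecutor instead of a ThreadPoolExecutor runs the calls in separate processes, but each call then receives a pickled copy of the worker object, so unlike the long-lived PB solvers, no state carries over between steps.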



To run the program, put both files in the same folder and run:
 python controller.py


You should see the load on both processors (assuming you have two, of course ;-)) rise to 100%. Here is the script's output:
 controller.py: init
 PIDS: [12173, 12174]
 solver.py 5567: solver init
 solver.py 5566: solver init
 controller.py: connect
 controller.py: starting parallel jobs
 controller.py: storeconnections
 controller.py: Connected;  self.solvers: {('localhost', 5567):, ('localhost', 5566):}
 controller.py: broadcasting ...
 controller.py: broadcasted
 solver.py 5566: solver step
 solver.py 5567: solver step
 controller.py: checkStatus
 solver.py 5566: remote_status
 Solver 5566 operational
 solver.py 5567: remote_status
 controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')]
 controller.py: broadcasting ...
 controller.py: broadcasted
 Solver 5567 operational
 solver.py 5566: solver step
 solver.py 5567: solver step
 controller.py: checkStatus
 solver.py 5566: remote_status
 Solver 5566 operational
 solver.py 5567: remote_status
 controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')]
 controller.py: broadcasting ...
 controller.py: broadcasted
 Solver 5567 operational
 solver.py 5566: solver step
 solver.py 5567: solver step
 controller.py: checkStatus
 solver.py 5566: remote_status
 Solver 5566 operational
 solver.py 5567: remote_status
 controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')]
 controller.py: terminate
 Solver 5567 operational
 solver.py 5566: remote_terminate
 solver.py 5567: remote_terminate





Source: https://habr.com/ru/post/97201/

