Good day! In this article I will show how to build a basic publish / subscribe implementation over XMPP using Twisted and the Wokkel library. XMPP supports pub / sub through the XEP-0060 extension. With pub / sub you can notify every participant about an event and solve many similar tasks. It is reliably known that Apple uses a Wokkel-based pub / sub inside its notification server, but more on that later.
Why do I need publish / subscribe?
While developing a project, I ran into the following task: build a client-server architecture in which clients act on data stored on the server. The data is publicly available, and each client must always work with its newest version. Here is how the system should work. A client subscribes to the service and, on subscribing, receives all the data stored on the server. When a client sends data to the server, the server saves it and forwards it to all subscribers. If a subscriber finds that the integrity of its data is compromised, it asks the server to send the data again, and so on.
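The flow described above can be sketched without any XMPP machinery. The following is a minimal in-memory model, not the implementation built later in this article; the `Hub` class and its method names are hypothetical and exist only for illustration.

```python
class Hub:
    """Toy stand-in for the server: stores data and fans it out (hypothetical)."""

    def __init__(self):
        self.items = []          # everything published so far
        self.subscribers = {}    # subscriber name -> callback

    def subscribe(self, name, callback):
        # A new subscriber is registered and immediately gets the full data set.
        self.subscribers[name] = callback
        return list(self.items)

    def publish(self, item):
        # The server saves the data, then notifies every subscriber.
        self.items.append(item)
        for callback in self.subscribers.values():
            callback(item)

    def snapshot(self):
        # A subscriber that suspects it missed something can re-request everything.
        return list(self.items)


hub = Hub()
received_by_a, received_by_b = [], []
initial_a = hub.subscribe("a", received_by_a.append)
hub.publish("first")
initial_b = hub.subscribe("b", received_by_b.append)
hub.publish("second")
```

Subscriber "b" joins late, so it gets "first" as part of its initial data and only "second" as a live notification.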
Tools
The solution was to use XMPP and its XEP-0060 extension. XMPP was chosen for its popularity and rich feature set, and Twisted as one of the most powerful networking frameworks in Python. twisted.words has some basic support for the XMPP protocol but, unfortunately, none for the pub / sub extension. Wokkel is the library that adds pub / sub support to Twisted. Besides pub / sub, Wokkel also supports service discovery and a couple of other useful extensions.
Example
Let's write a small example showing how to work with the Wokkel library. We will have a service and clients subscribed to updates from it. Every 5 seconds, a client generates a message and sends it to the service, and the service forwards the message to all subscribers. Let's get started.
First, create an object of class XMPPClient, which is a service in Twisted terminology. It is responsible for logging in to the Jabber server and managing the connection. An object of the PubSubClient or PubSubService class lets us interact at the pub / sub protocol level. Call setHandlerParent so that the protocol and the service can communicate with each other, then start the service. The protocol begins listening to the stream for pub / sub stanzas and calls the appropriate handlers when they arrive. The next step is to look at the implementation of the Service class.
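import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.words.protocols.jabber.jid import internJID
from wokkel.client import XMPPClient

# parse_args, Service and Client are defined elsewhere in the script.
def main():
    log.startLogging(sys.stdout)
    options, args = parse_args()
    jid, resource, password = args
    service = options.service
    fulljid = internJID(jid + '/' + resource)
    # XMPPClient logs in to the Jabber server and manages the connection.
    transport = XMPPClient(fulljid, password)
    transport.logTraffic = True
    # With --service we run as a client of that service, otherwise as the service.
    protocol = Client(internJID(service)) if service else Service()
    protocol.setHandlerParent(transport)
    transport.startService()
    reactor.run()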
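The `parse_args` helper is not shown in this article. A minimal sketch of what it might look like follows, assuming it returns an `(options, args)` pair the way `main` unpacks it and accepts the `--service` option used in the commands at the end of the article. The usage string, help text and error message are my own assumptions.

```python
from optparse import OptionParser


def parse_args(argv=None):
    """Hypothetical helper: returns (options, args) in the shape main() expects."""
    parser = OptionParser(usage="usage: %prog [--service JID] jid resource password")
    parser.add_option("--service", dest="service", default=None,
                      help="full JID of the pub/sub service; omit to run as the service")
    # argv=None makes optparse fall back to sys.argv[1:].
    options, args = parser.parse_args(argv)
    if len(args) != 3:
        parser.error("expected exactly three arguments: jid resource password")
    return options, args


options, args = parse_args(["--service", "test@example.com/server",
                            "test@example.com", "client1", "password"])
```

Leaving out `--service` keeps `options.service` as `None`, which is what makes `main` start a Service instead of a Client.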
In our case, the job of the pub / sub service is simple: take a message from a subscriber and forward it to all subscribers. A client begins interacting with the service by sending a subscription request. As soon as the service recognizes an incoming subscribe request, it calls the subscribe method. In this method we create a subscription associated with the client and return it through a Deferred. We will not implement access models and assume that anyone can subscribe. When a subscriber sends us some data, the publish method is called. In the sendData method we send the incoming data to everyone subscribed to our service. Now let's turn to the client.
from twisted.internet import defer
from wokkel.pubsub import PubSubService, Subscription

# NODE_NAME (the node identifier) is defined elsewhere in the script.
class Service(PubSubService):
    def __init__(self):
        PubSubService.__init__(self)
        # Maps each subscriber JID to its Subscription.
        self._subscriptions = {}

    def publish(self, requestor, service, nodeIdentifier, items):
        # Forward whatever was published to every subscriber.
        self.sendData(items)
        return defer.succeed(None)

    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        # No access model: anyone may subscribe; a repeated request reuses the subscription.
        if subscriber in self._subscriptions:
            info = self._subscriptions[subscriber]
        else:
            info = Subscription(NODE_NAME, subscriber, 'subscribed')
            self._subscriptions[subscriber] = info
        return defer.succeed(info)

    def sendData(self, items):
        sendList = []
        for subscription in self._subscriptions.values():
            sendList.append([subscription.subscriber, None, items])
        self.notifyPublish(self.parent.jid, NODE_NAME, sendList)
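For reference, the subscription request that triggers the subscribe method is, on the wire, an IQ stanza defined by XEP-0060. The sketch below builds one with the standard library only, to show its shape. `build_subscribe_iq` is an illustrative helper and not part of Wokkel, and the node name "example-node" is a placeholder, since the value of NODE_NAME is not given in this article.

```python
import xml.etree.ElementTree as ET

PUBSUB_NS = "http://jabber.org/protocol/pubsub"


def build_subscribe_iq(service_jid, node, subscriber_jid, stanza_id="sub1"):
    """Build an XEP-0060 subscribe request (illustrative helper, not a Wokkel API)."""
    iq = ET.Element("iq", {"type": "set", "to": service_jid,
                           "from": subscriber_jid, "id": stanza_id})
    pubsub = ET.SubElement(iq, "{%s}pubsub" % PUBSUB_NS)
    # The <subscribe/> child names the node and the JID that wants notifications.
    ET.SubElement(pubsub, "{%s}subscribe" % PUBSUB_NS,
                  {"node": node, "jid": subscriber_jid})
    return iq


iq = build_subscribe_iq("test@example.com/server", "example-node",
                        "test@example.com/client1")
print(ET.tostring(iq, encoding="unicode"))
```

Wokkel parses such a stanza for us and passes the node and subscriber JID to subscribe as nodeIdentifier and subscriber.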
When creating a client, we give it the address of the service it will subscribe to. Once the XMPPClient service has logged in to the Jabber server and the connection is established, the connectionInitialized method is called on every protocol attached to it. We use this to subscribe to the service we need immediately after the connection is established. After the subscription succeeds, we start a task that fires every 5 seconds, generating a message and sending it to the service. The data sent must be well-formed XML. That's all there is to it.
import time

from twisted.internet import task
from twisted.python import log
from twisted.words.xish.domish import Element
from wokkel.pubsub import PubSubClient

# NODE_NAME (the node identifier) is defined elsewhere in the script.
class Client(PubSubClient):
    def __init__(self, service):
        PubSubClient.__init__(self)
        self.__service = service

    def connectionInitialized(self):
        PubSubClient.connectionInitialized(self)
        # Subscribe first, then start publishing every 5 seconds.
        d = self.subscribe(self.__service, NODE_NAME, self.parent.jid)
        d.addCallback(lambda success: task.LoopingCall(sendGeneratedData, self))
        d.addCallback(lambda lc: lc.start(5, now=True))

    def itemsReceived(self, event):
        # Called for every notification; log each item's attributes.
        for item in event.items:
            log.msg('%s sends %s at %s' % (item.getAttribute('sender'),
                    item.getAttribute('message'), item.getAttribute('time')))

    def sendData(self, items):
        self.publish(self.__service, NODE_NAME, items)

def sendGeneratedData(protocol):
    # The payload is an <item/> element carrying its data as attributes.
    element = Element((None, 'item'))
    element.attributes['sender'] = protocol.parent.jid.full()
    element.attributes['time'] = time.strftime("%H:%M:%S", time.localtime())
    element.attributes['message'] = 'Hello!'
    protocol.sendData([element])
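To see what the payload looks like without connecting to a server, here is a standard-library sketch that builds the same item, serializes it, parses it back and formats the line a subscriber would log. `make_item` and `describe` are hypothetical helpers for illustration; the real code above uses Twisted's domish Element instead of ElementTree.

```python
import time
import xml.etree.ElementTree as ET


def make_item(sender, message, now=None):
    """Build an <item/> carrying its data as attributes, like sendGeneratedData."""
    item = ET.Element("item")
    item.set("sender", sender)
    item.set("time", time.strftime("%H:%M:%S", now or time.localtime()))
    item.set("message", message)
    return item


def describe(item):
    """Format the line a subscriber would log for one received item."""
    return "%s sends %s at %s" % (item.get("sender"), item.get("message"),
                                  item.get("time"))


# A fixed timestamp (12:30:05) keeps the example deterministic.
fixed = time.struct_time((2024, 1, 1, 12, 30, 5, 0, 1, -1))
item = make_item("test@example.com/client1", "Hello!", fixed)
xml_text = ET.tostring(item, encoding="unicode")  # what travels inside the stanza
line = describe(ET.fromstring(xml_text))          # what the receiver reconstructs
```

Round-tripping through a string is a quick way to confirm that the payload is well-formed XML before it is published.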
View source code here.
To check that it works:
Start the server:
pubsub.py test@example.com server password
Start two clients:
pubsub.py --service test@example.com/server test@example.com client1 password
pubsub.py --service test@example.com/server test@example.com client2 password
What does Apple have to do with it?
What we have just built shows only one of the hundreds of things pub / sub can do. If you look at the specification, you can see that our example corresponds to a Leaf node with the configuration persistItems = False and deliveryPayloads = True and an open access model, without support for affiliations, discovery, creation of other nodes, and so on. Imagine how difficult it would be to implement the entire pub / sub specification, or even just the part you need. But Ralph Meijer, the author of Wokkel, has simplified the task by writing Idavoll. Idavoll is an add-on built on top of Wokkel. It supports XEP-0060 almost completely and can also communicate over HTTP. Apple uses Idavoll as a notification server. This fact can be checked here.
Conclusion
To study Twisted, I recommend the official documentation and krondo.com/?page_id=1327. Unfortunately, there is very little information on Wokkel and none at all on Idavoll, so only the generated documentation remains.
Good luck in learning!