
Implementing XMPP Publish/Subscribe with Twisted

Good day! In this article I will show how to build a basic publish/subscribe implementation over XMPP using Twisted and the Wokkel library. XMPP supports pub/sub through the XEP-0060 extension. Pub/sub solves the problem of notifying all participants about an event, among many others. It is reliably known that Apple uses a Wokkel-based pub/sub implementation inside its notification server, but more on that later.


Why do I need publish / subscribe?


While developing a project, I faced the following task: build a client-server architecture in which clients perform actions on data stored on the server. The data is publicly available, and each client must always work with the newest version of it. The system should work as follows. A client subscribes to the service and, on subscribing, receives all the data stored on the server. When a client sends data to the server, the server saves it and forwards it to all subscribers. If a subscriber finds that the integrity of its data is compromised, it asks the server to send the data again, and so on.
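
The flow described above can be sketched without any networking at all. The snippet below is only a toy in-memory stand-in for the server, not XMPP and not Wokkel; the Broker class and its method names are hypothetical and exist purely for illustration.

```python
class Broker:
    """Toy in-memory stand-in for the server described above (not XMPP)."""

    def __init__(self):
        self.items = []        # everything published so far
        self.subscribers = {}  # subscriber name -> delivery callback

    def subscribe(self, name, deliver):
        # A new subscriber first receives all data already stored.
        self.subscribers[name] = deliver
        deliver(list(self.items))

    def publish(self, item):
        # Save the item, then forward it to every subscriber.
        self.items.append(item)
        for deliver in self.subscribers.values():
            deliver([item])

    def fetch_all(self):
        # For a subscriber that suspects its local copy is damaged.
        return list(self.items)


broker = Broker()
alice_inbox, bob_inbox = [], []
broker.subscribe("alice", alice_inbox.extend)
broker.publish("v1")
broker.subscribe("bob", bob_inbox.extend)  # bob catches up on "v1" here
broker.publish("v2")
```

Both inboxes end up with the same items in the same order, which is the property the real service has to provide over the network.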

Tools


The solution was to use XMPP and its XEP-0060 extension. XMPP was chosen for its popularity and rich feature set, and Twisted as one of the most powerful networking frameworks for Python. twisted.words has some basic support for the XMPP protocol but, unfortunately, none for the pub/sub extension. Wokkel was chosen as the library that adds pub/sub support to Twisted. Besides pub/sub, Wokkel also supports service discovery and a couple of other useful extensions.

Example


Let's write a small example showing how to work with the Wokkel library. We will have a service and clients subscribed to updates from it. Every 5 seconds, each client generates a message and sends it to the service. The service forwards the message to all subscribers. Let's get started.

First, create an object of the XMPPClient class, which is a service in Twisted terminology. It is responsible for logging in to the Jabber server and managing the connection. An object of the PubSubClient or PubSubService class lets us interact at the pub/sub protocol level. Calling setHandlerParent connects the protocol to the service so that they can communicate with each other. Then we start the service. The protocol begins listening to the stream for pub/sub messages and calls the appropriate handlers when they arrive. The next step is to look at the implementation of the Service class.

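    import sys

    from twisted.internet import reactor
    from twisted.python import log
    from twisted.words.protocols.jabber.jid import internJID
    from wokkel.client import XMPPClient

    def main():
        log.startLogging(sys.stdout)
        # parse_args() is defined in the full source; it returns the parsed
        # options and the three positional arguments.
        options, args = parse_args()
        jid, resource, password = args
        service = options.service
        fulljid = internJID(jid + '/' + resource)
        # XMPPClient is the Twisted service that logs in and manages the connection.
        transport = XMPPClient(fulljid, password)
        transport.logTraffic = True
        # With --service we act as a client of that JID; otherwise we are the service.
        protocol = Client(internJID(service)) if service else Service()
        protocol.setHandlerParent(transport)
        transport.startService()
        reactor.run()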


In our case, the job of the pub/sub service is simple: take a message from a subscriber and send it to all subscribers. A client begins interacting with the service by sending a subscription request. As soon as the service recognizes an incoming subscribe request, it calls the subscribe method. In this method we create a subscription associated with the client and return it through the Deferred. We will not implement access models and assume that anyone may subscribe. When a subscriber sends us data, the publish method is called. In the sendData method we send the incoming data to everyone subscribed to our service. Now let's turn to the client.

    from twisted.internet import defer
    from wokkel.pubsub import PubSubService, Subscription

    # NODE_NAME is a module-level constant holding the node identifier
    # (defined in the full source).

    class Service(PubSubService):
        def __init__(self):
            PubSubService.__init__(self)
            self._subscriptions = {}  # subscriber JID -> Subscription

        def publish(self, requestor, service, nodeIdentifier, items):
            # Called when a subscriber publishes items: fan them out to everyone.
            self.sendData(items)
            return defer.succeed(None)

        def subscribe(self, requestor, service, nodeIdentifier, subscriber):
            # Open access: reuse an existing subscription or create a new one.
            if subscriber in self._subscriptions:
                info = self._subscriptions[subscriber]
            else:
                info = Subscription(NODE_NAME, subscriber, 'subscribed')
                self._subscriptions[subscriber] = info
            return defer.succeed(info)

        def sendData(self, items):
            # One (subscriber, subscriptions, items) entry per subscriber.
            sendList = []
            for subscription in self._subscriptions.values():
                sendList.append([subscription.subscriber, None, items])
            self.notifyPublish(self.parent.jid, NODE_NAME, sendList)


When creating a client, we pass it the address of the service it will subscribe to. Once the XMPPClient service has logged in to the Jabber server and the connection is established, the connectionInitialized method is called on every protocol attached to it. We use this hook to subscribe to the service we need immediately after the connection is established. After the subscription succeeds, we start a looping call that fires every 5 seconds, generating a message and sending it to the service. The transmitted data must be well-formed XML. That is all there is to it.

    import time

    from twisted.internet import task
    from twisted.python import log
    from twisted.words.xish.domish import Element
    from wokkel.pubsub import PubSubClient

    class Client(PubSubClient):
        def __init__(self, service):
            PubSubClient.__init__(self)
            self.__service = service  # JID of the pub/sub service

        def connectionInitialized(self):
            PubSubClient.connectionInitialized(self)
            # Subscribe first; once that succeeds, publish a message every 5 seconds.
            d = self.subscribe(self.__service, NODE_NAME, self.parent.jid)
            d.addCallback(lambda success: task.LoopingCall(sendGeneratedData, self))
            d.addCallback(lambda lc: lc.start(5, now=True))

        def itemsReceived(self, event):
            # Log every item the service forwards to us.
            for item in event.items:
                log.msg(item.getAttribute('sender') + ' sends ' +
                        item.getAttribute('message') + ' at ' + item.getAttribute('time'))

        def sendData(self, items):
            self.publish(self.__service, NODE_NAME, items)

    def sendGeneratedData(protocol):
        # The payload is an <item/> element whose attributes carry the data.
        element = Element((None, 'item'))
        element.attributes['sender'] = protocol.parent.jid.full()
        element.attributes['time'] = time.strftime("%H:%M:%S", time.localtime())
        element.attributes['message'] = 'Hello!'
        protocol.sendData([element])
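
To see what well-formed XML means for this payload, the sketch below builds an equivalent item with the standard library's xml.etree.ElementTree instead of Twisted's domish, purely so that it runs without Twisted installed. build_item is a hypothetical helper and is not part of the article's source.

```python
import time
import xml.etree.ElementTree as ET


def build_item(sender, message, now=None):
    """Build an <item/> element with the same three attributes as the client."""
    if now is None:
        now = time.localtime()
    item = ET.Element("item")
    item.set("sender", sender)
    item.set("time", time.strftime("%H:%M:%S", now))
    item.set("message", message)
    return item


# A fixed timestamp keeps the example reproducible.
item = build_item("test@example.com/client1", "Hello!", time.gmtime(0))
xml_text = ET.tostring(item, encoding="unicode")
print(xml_text)

# Parsing the text back shows that the payload is well-formed.
parsed = ET.fromstring(xml_text)
```

The receiving side only needs the three attributes, which is why itemsReceived can read them back with getAttribute.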


View source code here.
To check that it works:
Start the service:
    pubsub.py test@example.com server password
Start two clients:
    pubsub.py --service test@example.com/server test@example.com client1 password
    pubsub.py --service test@example.com/server test@example.com client2 password
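
The commands above imply a parse_args() helper that accepts an optional --service option and three positional arguments. The article does not show it, so the version below is only a guess at its shape, written with optparse because main() unpacks an (options, args) pair; the help texts and the argv parameter are assumptions.

```python
from optparse import OptionParser


def parse_args(argv=None):
    """Hypothetical reconstruction of parse_args(); not the article's code."""
    parser = OptionParser(usage="%prog [--service JID] jid resource password")
    parser.add_option(
        "--service", dest="service", default=None,
        help="full JID of the pub/sub service; omit to run as the service")
    # With argv=None, optparse falls back to sys.argv[1:].
    options, args = parser.parse_args(argv)
    if len(args) != 3:
        parser.error("expected: jid resource password")
    return options, args


# Client mode: --service is given.
client_options, client_args = parse_args(
    ["--service", "test@example.com/server",
     "test@example.com", "client1", "password"])

# Service mode: no --service option.
service_options, service_args = parse_args(
    ["test@example.com", "server", "password"])
```

In main(), a missing --service leaves options.service as None, which is what selects the Service branch.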

What does Apple have to do with it?


What we have just built shows only one of the hundreds of things pub/sub can do. Judging by the specification, our example corresponds to a leaf node configured with persistItems = False and deliveryPayloads = True and an open access model, without support for affiliations, discovery, creation of other nodes, and so on. Imagine how hard it would be to implement the whole pub/sub specification, or even just the part you need. Fortunately Ralph Meijer, the author of Wokkel, has simplified the task by writing Idavoll. Idavoll is built on top of Wokkel; it supports XEP-0060 almost completely and can also communicate over HTTP. Idavoll is used by Apple as a notification server. This fact can be checked here.
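
For reference, the configuration described above can be written down using the node configuration field names that XEP-0060 defines. The dictionary and helper below are only an illustrative sketch: NODE_CONFIG and to_form_values are hypothetical names, and neither Wokkel nor Idavoll is involved.

```python
# Field names follow XEP-0060; the values describe the example in this article.
NODE_CONFIG = {
    "pubsub#node_type": "leaf",
    "pubsub#persist_items": False,    # items are not stored by the service
    "pubsub#deliver_payloads": True,  # notifications carry the item payload
    "pubsub#access_model": "open",    # anyone may subscribe
}


def to_form_values(config):
    """Render booleans as "1"/"0", one of the forms data forms accept."""
    return {
        field: ("1" if value else "0") if isinstance(value, bool) else value
        for field, value in config.items()
    }


form_values = to_form_values(NODE_CONFIG)
print(form_values)
```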

Conclusion


To study Twisted, I recommend the official documentation and krondo.com/?page_id=1327.
Unfortunately, there is very little information about Wokkel and none at all about Idavoll, so the generated documentation is all that remains.
Good luck with your learning!

Source: https://habr.com/ru/post/111681/

