This article provides guidelines for creating scalable, high-performance, and cost-effective messaging solutions based on Windows Azure queues. The document is intended for architects and developers of cloud solutions that use Windows Azure queues.

Introduction
Traditional queue-based messaging solutions use a repository called a message queue: a store for data that participants send and receive through an asynchronous exchange mechanism.
Queue-based data exchange is a solid foundation for building fault-tolerant, scalable messaging architectures that support a wide range of scenarios in distributed computing environments. Whether the task is dispatching large volumes of work or reliable messaging, message queue technology provides first-class capabilities for asynchronous data transfer.
This document describes the relevant functionality of the Windows Azure platform and shows how to use design patterns to build optimized, inexpensive queue-based messaging systems. It contains a detailed overview of the main techniques for implementing queue-based interaction in modern Windows Azure solutions, along with recommendations for improving performance, increasing scalability, and reducing operating costs.
Scenario
For clarity, let us ground the discussion in a real-world scenario. A SaaS provider is commissioning a new billing system, implemented as a Windows Azure application, to serve the enterprise's need to process large numbers of customer transactions. The solution is based on moving computational workloads to the cloud and exploiting the elasticity of the Windows Azure infrastructure to perform complex calculations.
The on-premises component of the integrated infrastructure performs a regular daily consolidation and dispatch of large transaction volumes for processing by a service hosted in the Windows Azure cloud. The volume of transactions per batch varies from several thousand to hundreds of thousands, and the cumulative daily volume can reach several million transactions. The solution must meet a service level agreement (SLA) that guarantees a maximum data processing delay.
The solution architecture is based on the distributed map-reduce pattern and consists of many cloud-hosted worker role instances that use Windows Azure queue storage to dispatch work. Transaction batches are accepted by a Process Initiator worker role, broken down into smaller work items, and placed into Windows Azure queues for workload distribution.
The workload is handled by a large number of instances of a Processing worker role, which retrieve work items from the queues and run the computational procedures. These processing instances use multi-threaded queue listeners to process data in parallel for maximum performance.
Processed work items are routed to a dedicated queue, from which they are retrieved by an instance of a Process Controller role for aggregation and long-term storage in a warehouse used for data mining and reporting.
The solution architecture is as follows:

The above diagram illustrates a typical architecture for scaling out large or complex computational workloads. The queue-based messaging pattern it implements is typical of many Windows Azure applications and services that need to exchange data with each other through queues, which makes it a canonical setting for studying the main components involved in queue-based messaging.
Queuing Basics
A standard messaging solution that exchanges data between distributed components includes publishers, which place messages into a queue, and one or more subscribers, which receive those messages. In most cases subscribers, sometimes referred to as queue listeners, are implemented as single-threaded or multi-threaded applications that run continuously or are started on demand according to a schedule.
At a higher level, there are two main dispatch mechanisms that allow the listener of the queue to receive messages stored in it:
- Polling (pull-based model): A listener monitors a queue, checking it for new messages at regular intervals. If the queue is empty, the listener continues to poll it, periodically entering a sleep state.
- Triggering (push-based model): A listener subscribes to an event raised (by the publisher or by the queue service manager) whenever a message arrives in the queue. The listener can then initiate processing of the message without re-polling the queue to determine whether new work is available.
It should be emphasized that each mechanism can be implemented in different ways. For example, polling can be blocking or non-blocking: a blocking request is held until a new message appears in the queue (or the timeout expires), while a non-blocking request completes immediately if the queue is empty. With the push model, a notification can be delivered to the queue listeners whenever the first message arrives in an empty queue or whenever the queue depth reaches a certain threshold.
Note The dequeue operation supported by the Windows Azure Queue Service API is non-blocking. This means that the GetMessage or GetMessages API methods return immediately if the message queue is empty. By contrast, the Durable Message Buffers (DMB) provided by the Windows Azure Service Bus use blocking retrieval operations, in which the calling thread is blocked until a message arrives in the DMB queue or a specified timeout expires.
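To make the non-blocking behavior concrete, here is a minimal sketch using the classic Microsoft.WindowsAzure.StorageClient library; the queue name and the use of the development storage account are illustrative assumptions:

// Obtain a reference to the queue (the development storage account is used for illustration).
CloudStorageAccount account = CloudStorageAccount.DevelopmentStorageAccount;
CloudQueue queue = account.CreateCloudQueueClient().GetQueueReference("inputqueue");
queue.CreateIfNotExist();

// GetMessage is non-blocking: it returns null immediately when the queue is empty.
CloudQueueMessage message = queue.GetMessage();
if (message != null)
{
    // Process the message, then remove it from the queue (a second storage transaction).
    queue.DeleteMessage(message);
}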
The most common approach to implementing Windows Azure queue listeners has the following characteristics:
- The listener is implemented as an application component that is instantiated and executed as part of a worker role instance.
- The life cycle of the listener component is often tied to the lifetime of the hosting role instance.
- The main processing logic is a loop in which messages are dequeued and dispatched for processing.
- If the queue is empty, the listener enters a sleep state whose duration is determined by an application-specific back-off algorithm.
- The receive loop keeps polling the queue until the listener is notified to exit the loop and shut down.
The following block diagram illustrates the standard implementation logic for a queue listener with a built-in polling mechanism in Windows Azure.
Note Discussion of more complex design patterns, such as those requiring a central queue manager (broker), is beyond the scope of this document.
Using a classic queue listener with a polling mechanism may not be the optimal choice on Windows Azure, because the pricing model counts storage transactions by the number of application requests to the queue, regardless of whether the queue is empty. The following sections discuss techniques for maximizing performance and minimizing the cost of messaging systems based on Windows Azure queues.
Recommendations for cost reduction, performance and scalability
This section describes design techniques that improve performance and scalability while reducing the overall cost of the solution.
An implementation pattern can be considered truly effective only if it achieves the following goals:
- Reduced operating costs, by eliminating storage transactions that do not perform useful work.
- Elimination of the unnecessary latency imposed by the polling interval when checking queues for new messages.
- Dynamic scaling up and down, adapting compute power to fluctuating volumes of work.
The pattern must achieve these goals without adding complexity that would outweigh its benefits.
Recommendations for optimizing storage transaction costs
When assessing the total cost of ownership (TCO) and return on investment (ROI) of a solution deployed on the Windows Azure platform, the volume of storage transactions is one of the most significant variables in the TCO formula. Reducing the number of transactions against Windows Azure queues reduces the operating costs of running solutions on Windows Azure.
When implementing a queue-based messaging solution, developers can reduce the number of storage transactions by following these recommendations:
- When sending messages to a queue, group related messages into a single larger batch, compress it, and store the compressed image in blob storage, keeping in the queue only a reference to the blob holding the data (see the sketch after this list).
- When retrieving messages from a queue, batch several messages into a single storage transaction. The GetMessages method in the Queue Service API dequeues the specified number of messages in a single transaction (see the note below).
- When checking a queue for work items, avoid aggressive polling intervals and implement a back-off delay that increases the polling interval while requests return no data.
- Reduce the number of queue listeners: with a pull-based model, use only one listener per role instance when a queue is empty. To reduce the number of listeners per role instance to zero, use a notification mechanism to instantiate listeners when the queue receives work items.
- If the work queues remain empty most of the time, automatically scale down the number of role instances, and continue to monitor relevant system metrics to determine when the application should scale the number of instances back up to handle increased workload.
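The first recommendation can be illustrated with a minimal sketch. The container and queue names, the pre-serialized batch, and the overall shape of the helper are assumptions; the point is that the queue carries only a small blob reference while the compressed payload lives in blob storage:

using System;
using System.IO;
using System.IO.Compression;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.StorageClient;

public static class BatchPublisher
{
    // Stores a compressed batch in blob storage and places only a blob reference on the queue.
    public static void PublishBatch(CloudStorageAccount account, byte[] serializedBatch)
    {
        using (var buffer = new MemoryStream())
        {
            // Compress the serialized batch before uploading it.
            using (var gzip = new GZipStream(buffer, CompressionMode.Compress, true))
            {
                gzip.Write(serializedBatch, 0, serializedBatch.Length);
            }
            buffer.Position = 0;

            CloudBlobContainer container = account.CreateCloudBlobClient().GetContainerReference("batches");
            container.CreateIfNotExist();

            string blobName = Guid.NewGuid().ToString("N");
            CloudBlob blob = container.GetBlobReference(blobName);
            blob.UploadFromStream(buffer);

            // The queue message carries only the blob name, not the payload itself.
            CloudQueue queue = account.CreateCloudQueueClient().GetQueueReference("inputqueue");
            queue.CreateIfNotExist();
            queue.AddMessage(new CloudQueueMessage(blobName));
        }
    }
}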
The above recommendations can be implemented as a generic mechanism that processes message batches and encapsulates most of the underlying queue, blob storage, and flow-control operations. How to implement such a mechanism is discussed below.
Important When retrieving messages via the GetMessages method, the maximum batch size supported by the Queue Service API for a single dequeue operation is 32. Exceeding this value causes a runtime exception.
Transaction costs in Windows Azure queues grow linearly with the number of queue service clients, for example as the number of role instances or the number of dequeue threads increases. To illustrate the potential costs of a solution that ignores the above recommendations, let us work through an example with concrete figures.
Impact of inefficient architecture on costs
If the billing system described above were architected without optimization mechanisms, operating costs would rise once the solution was deployed on the Windows Azure platform. This section describes the sources of these potential extra costs.
As defined in the scenario, the solution receives business transaction data at regular intervals. Suppose the solution is busy processing the workload only 25% of the time during a standard eight-hour business day. That leaves 6 hours (8 hours × 0.75) of "idle time" during which the system processes no transactions. Moreover, the solution receives no data at all during the 16 off-hours of each day.
During the idle period, totaling 22 hours per day, the solution still attempts to dequeue work items, since it receives no notification when new data arrives. During this time, each individual dequeue thread performs up to 79,200 transactions (22 hours × 60 minutes × 60 seconds × 1 transaction per second) against the input queue, assuming the default polling interval of one second.
As mentioned earlier, the Windows Azure pricing model counts individual "storage transactions" as the billable unit. A storage transaction is any request by a user application to add, read, update, or delete storage data. At the time this paper was written, storage transactions cost $0.01 per 10,000 transactions.
UPDATE: At the time of publication of this translation, the price was $0.01 per 100,000 transactions.
Important When counting queue-related transactions, remember that putting a single message on a queue is one transaction, whereas consuming a message is usually a two-step process: retrieving the message, then requesting its removal from the queue. A successful dequeue operation therefore costs two storage transactions. Note that a dequeue request is billed as a transaction even when it returns no data.
The storage transactions generated by a single dequeue thread in the scenario above add approximately $2.38 to the monthly bill (79,200 / 10,000 × $0.01 × 30 days). With 200 dequeue threads (or one thread in each of 200 worker role instances), the monthly cost grows by roughly $475.20 (UPDATE: about $47.52 at the pricing in effect when this translation was published). These costs are incurred while the system performs no useful computation at all, merely checking the queues for work items. This example is deliberately naive, since no one would implement a service this way in practice; the optimization techniques described below should be used instead.
Recommendations for eliminating unnecessary delays
To optimize the latency of messaging systems based on Windows Azure queues, you can add a publish/subscribe messaging layer built on the Windows Azure Service Bus, as described below.
In this approach, developers combine polling with push-based real-time notifications, allowing listeners to subscribe to a notification event (trigger) that is raised under certain conditions to indicate that a new workload has been put on the queue. This augments the standard polling loop with a publish/subscribe layer for dispatching notifications.
In complex distributed systems, this approach requires a "message bus" or "message-oriented middleware" to reliably deliver notifications to one or more subscribers. The Windows Azure Service Bus is well suited for messaging between loosely coupled distributed application services, whether they are deployed in Windows Azure or on-premises. It is also a natural fit for the message-bus architecture required here, which exchanges notifications between the processes involved in queue-based data transfer.
A queue-based messaging system with push notifications can follow the pattern below:

The mechanisms used for communication between Windows Azure role instances, when applied to publishers and subscribers of the queue service, satisfy most of the requirements for push-based notification delivery. The basic concepts were covered in one of our previous publications.
Important Use of the Windows Azure Service Bus is governed by a pricing scheme with two relevant components: first, a charge for data transferred into and out of the data center; second, a charge for the number of connections established between the application and the Service Bus infrastructure.
It is therefore important to perform a cost-benefit analysis weighing the pros and cons of introducing the Service Bus into a given architecture. You should evaluate whether a Service Bus-based notification dispatch layer would actually reduce costs enough to justify the investment and the additional development effort.
The negative impact of latency can thus be minimized fairly easily by adding a publish/subscribe messaging layer. Further cost reductions come from dynamic (elastic) scaling, whose implementation is described in the next section.
Dynamic scaling guidelines
The Windows Azure platform makes it easy to scale customer solutions both up and down quickly. The ability to adapt to fluctuations in workload and traffic is one of the key advantages of the cloud platform. Scalability is no longer an exotic term from the IT specialist's vocabulary, nor does supporting it require prohibitive expense: it is available to any cloud solution with a well-designed architecture.
Dynamic scaling is the ability of a solution to adapt to a variable workload by increasing and decreasing the storage space and compute power available to it at run time. The Windows Azure platform has built-in support for dynamic scaling through a distributed computing infrastructure that lets users provision the necessary capacity for a metered fee.
It is important to distinguish between two types of dynamic scaling supported by the Windows Azure platform:
- Scaling role instances: adding and removing worker or web role instances to handle the current workload. This usually means changing the instance count in the service configuration. The Windows Azure runtime responds to an increase by spinning up new instances, and to a decrease by shutting down some running instances.
- Scaling processes (threads): maintaining sufficient capacity (the number of processing threads) within a given role instance by increasing or decreasing the number of threads according to the current workload.
Implementing dynamic scaling in a queue-based messaging solution involves the following recommendations:
- Monitor key performance indicators, including CPU utilization, queue depth, response times, and message processing latency.
- Dynamically increase or decrease the number of worker role instances to handle workload spikes, whether predictable or not.
- Programmatically expand and shrink the number of processing threads to adapt to varying load conditions.
- Partition the workload and process small fragments in parallel using the .NET Framework 4 Task Parallel Library (see the sketch after this list).
- Maintain a capacity reserve in solutions with highly volatile workloads, so that sudden load spikes can be handled without the overhead of creating additional instances.
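As a minimal illustration of the partitioning recommendation, the sketch below processes the fragments of a batch in parallel with the Task Parallel Library; the WorkItem type and the per-item computation are hypothetical placeholders:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class WorkItem { public string Payload; } // hypothetical work item type

public static class BatchProcessor
{
    // Processes the fragments of a workload in parallel; the TPL partitions the
    // input sequence across thread-pool threads automatically.
    public static void ProcessBatch(IEnumerable<WorkItem> batch)
    {
        Parallel.ForEach(batch, workItem =>
        {
            // Hypothetical per-item computation.
            Console.WriteLine("Processed {0}", workItem.Payload);
        });
    }
}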
The Service Management API enables a service hosted on the Windows Azure platform to change the number of its running role instances by modifying the deployment configuration at run time.
Note By default, a standard Windows Azure subscription is limited to 20 compute instances. This "soft" limit protects platform users from large maintenance costs should they accidentally request a very large number of role instances. To increase this quota, submit a request to the Windows Azure support team.
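The instance count itself lives in the service configuration (.cscfg). As a hedged sketch, the helper below rewrites the Instances element of a given role; the updated document would then be submitted through the Service Management API's change-deployment-configuration operation:

using System.Xml.Linq;

public static class ServiceConfigurationHelper
{
    // Rewrites the instance count of the given role inside a ServiceConfiguration (.cscfg) document.
    public static string SetInstanceCount(string cscfgXml, string roleName, int instanceCount)
    {
        XDocument config = XDocument.Parse(cscfgXml);
        XNamespace ns = config.Root.Name.Namespace;

        foreach (XElement role in config.Root.Elements(ns + "Role"))
        {
            if ((string)role.Attribute("name") == roleName)
            {
                // The <Instances count="..."/> element controls how many instances the role runs.
                role.Element(ns + "Instances").SetAttributeValue("count", instanceCount);
            }
        }

        return config.ToString();
    }
}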
Dynamically scaling the number of role instances is not always the best way to absorb a sharp increase in workload. For example, a new virtual machine instance takes time to become ready, and currently no service level agreement covers the duration of this spin-up. Instead, it can be simpler to increase the number of worker threads to cope with a temporary load increase. While the workload is being processed, the solution monitors relevant performance metrics to decide when the number of worker threads should be dynamically increased or decreased.
Important Currently, the scalability target for a single Windows Azure queue is limited to 500 transactions per second. If an application attempts to exceed this threshold, for example by running queue operations from multiple role instances each hosting hundreds of dequeue threads, the storage service may return HTTP error 503, "Server Busy". When this error occurs, the application should retry the transaction using an exponential back-off delay algorithm. If HTTP 503 errors occur regularly, however, use multiple queues and apply a partitioning strategy to spread the workload across them.
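A minimal sketch of such a retry wrapper is shown below; the exception type to trap and the base delay of 200 ms are assumptions, and production code would prefer the platform's built-in retry policies where available:

using System;
using System.Threading;
using Microsoft.WindowsAzure.StorageClient;

public static class QueueRetryHelper
{
    // Retries a queue operation with an exponentially increasing delay whenever the
    // storage service reports a server-side error such as HTTP 503 ("Server Busy").
    public static T ExecuteWithBackoff<T>(Func<T> queueOperation, int maxAttempts)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return queueOperation();
            }
            catch (StorageServerException)
            {
                if (attempt >= maxAttempts) throw;

                // Exponential back-off: 200 ms, 400 ms, 800 ms, and so on.
                Thread.Sleep(TimeSpan.FromMilliseconds(200 * Math.Pow(2, attempt - 1)));
            }
        }
    }
}

A caller might wrap an individual dequeue as, for example, QueueRetryHelper.ExecuteWithBackoff(() => queue.GetMessage(), 5).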
In most cases, automated scaling of worker threads is the responsibility of an individual role instance. Scaling role instances, by contrast, often involves a central element of the solution architecture that tracks performance metrics and takes the appropriate scaling actions. The diagram below depicts a service component called the Dynamic Scaling Agent, which collects and analyzes workload metrics to decide whether new instances should be provisioned or idle instances decommissioned.

The scaling agent service can be deployed either as a worker role on the Windows Azure platform or as an on-premises service. Regardless of the deployment topology, the service can access the Windows Azure queues.
Having discussed the impact of latency, storage transaction costs, and dynamic scaling requirements, it is time to consolidate these recommendations into a practical implementation.
Technical implementation
The previous sections described the main features of a well-designed messaging architecture implemented using Windows Azure queue storage services. We looked at three key aspects of scaling: reducing data processing latency, optimizing data storage transaction costs, and improving response to workload instability.
This section is intended for developers of Windows Azure applications and describes how to implement these patterns in code.
Note This section covers building a queue listener that supports automatic scaling, as well as both the pull-based and the push-based notification models. For modern techniques of dynamic scaling at the role instance level, refer to the community projects published in the MSDN Code Gallery.
Creating a standard queue listener
First, define a contract to be implemented by the queue listener component, which is hosted by a worker role and listens on a Windows Azure queue.
/// Defines a contract to be implemented by an extension that listens on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension
{
    /// Starts a multi-threaded queue listener with the specified number of dequeue threads.
    void StartListener(int threadCount);

    /// Returns the current state of the queue listener.
    CloudQueueListenerInfo QueryState();

    /// Gets or sets the batch size used when dequeueing from the Windows Azure queue.
    int DequeueBatchSize { get; set; }

    /// Gets or sets the interval between polls while the queue is empty.
    TimeSpan DequeueInterval { get; set; }

    /// Raised whenever the queue is found to be empty.
    event WorkCompletedDelegate QueueEmpty;
}
The QueueEmpty event is intended for use by the host. It provides a mechanism by which the host can control the behavior of the queue listener when the queue is empty. The event delegate is defined as follows:
/// <summary>
/// Defines a callback delegate that is invoked whenever a dequeue task finds the queue empty.
/// </summary>
/// <param name="sender">The source of the event.</param>
/// <param name="idleCount">The number of consecutive times the queue has been found empty.</param>
/// <param name="delay">The interval the dequeue task should wait before the next poll.</param>
/// <returns>A flag indicating whether the dequeue task should stop polling and terminate.</returns>
public delegate bool WorkCompletedDelegate(object sender, int idleCount, out TimeSpan delay);
Handling queue items can be simplified if the listener supports generics, rather than relying on "bare" SDK classes such as CloudQueueMessage. Define a new interface for a queue listener that provides generic access to queue items:
Note We also allow the generic listener to forward queue items to one or more subscribers through an implementation of the Observer design pattern, using the IObservable<T> interface available in the .NET Framework 4.
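The interface declaration itself can be as small as the following sketch, which combines the base listener contract with IObservable<T> as the note suggests:

/// Defines a contract for a queue listener that provides generic access to queue items
/// and publishes them to subscribers via the Observer pattern.
public interface ICloudQueueListenerExtension<T> : ICloudQueueServiceWorkerRoleExtension, IObservable<T>
{
}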
The component implementing the ICloudQueueListenerExtension<T> interface is intended to be kept as a single instance, yet it must be able to run multiple dequeue threads (tasks) in parallel. We therefore add multi-threaded dequeue logic to the listener component, using the parallel processing features of the Task Parallel Library (TPL). The StartListener method spins up the required number of dequeue threads:
/// <summary>
/// Starts the specified number of dequeue tasks.
/// </summary>
/// <param name="threadCount">The number of dequeue tasks to start.</param>
public void StartListener(int threadCount)
{
    Guard.ArgumentNotZeroOrNegativeValue(threadCount, "threadCount");

    // The collection of dequeue tasks needs to be reinitialized on subsequent calls to this method.
    if (this.dequeueTasks.IsAddingCompleted)
    {
        this.dequeueTasks = new BlockingCollection<Task>(this.dequeueTaskList);
    }

    for (int i = 0; i < threadCount; i++)
    {
        CancellationToken cancellationToken = this.cancellationSignal.Token;
        CloudQueueListenerDequeueTaskState<T> workerState = new CloudQueueListenerDequeueTaskState<T>(Subscriptions, cancellationToken, this.queueLocation, this.queueStorage);

        // Start a new dequeue task and register it in the collection of tasks managed by this component.
        this.dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain, workerState, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
    }

    // Mark the collection as not accepting any further additions.
    this.dequeueTasks.CompleteAdding();
}
The DequeueTaskMain method implements the body of a dequeue thread. Its main operations are as follows:
/// <summary>
/// Implements a task that performs dequeue operations against a given Windows Azure queue.
/// </summary>
/// <param name="state">An object holding the data used by the task.</param>
private void DequeueTaskMain(object state)
{
    CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

    int idleStateCount = 0;
    TimeSpan sleepInterval = DequeueInterval;

    try
    {
        // Run the dequeue task until asked to terminate or until a break condition is encountered.
        while (workerState.CanRun)
        {
            try
            {
                var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                int messageCount = 0;

                // Process the dequeued messages concurrently using the above PLINQ query.
                queueMessages.ForAll((message) =>
                {
                    // Reset the count of idle iterations.
                    idleStateCount = 0;

                    // Notify all subscribers that a new message requires processing.
                    workerState.OnNext(message);

                    // Once the message has been processed, remove it from the queue.
                    workerState.QueueStorage.Delete<T>(message);

                    // Increment the number of processed messages.
                    messageCount++;
                });

                // Check whether any work was done during this iteration.
                if (0 == messageCount)
                {
                    // Increment the number of consecutive iterations in which the queue was found empty.
                    idleStateCount++;

                    // Invoke the user-supplied QueueEmpty handler, if any.
                    if (QueueEmpty != null)
                    {
                        // If the handler asks the task to terminate, break out of the dequeue loop.
                        if (QueueEmpty(this, idleStateCount, out sleepInterval))
                        {
                            break;
                        }
                    }

                    // Enter the idle state for the specified interval.
                    Thread.Sleep(sleepInterval);
                }
            }
            catch (Exception ex)
            {
                if (ex is OperationCanceledException)
                {
                    throw;
                }
                else
                {
                    // Offload the responsibility for handling or reporting the error to the subscribers.
                    workerState.OnError(ex);

                    // Sleep for the specified interval to avoid a flood of repeated errors.
                    Thread.Sleep(sleepInterval);
                }
            }
        }
    }
    finally
    {
        workerState.OnCompleted();
    }
}
A few aspects of the DequeueTaskMain implementation deserve explanation. First, Parallel LINQ (PLINQ) is used to dispatch messages for processing. The main advantage of PLINQ here is that message handling is sped up by executing the processing delegate on separate worker threads across multiple processors in parallel whenever possible.
Note Query parallelization is managed internally by PLINQ, and there is no guarantee that PLINQ will use more than one core. If PLINQ determines that the overhead of parallelization would slow the query down, it runs the query sequentially. To benefit from PLINQ, the cumulative work in the query must be large enough to outweigh the overhead of managing the thread pool.
Second, we do not issue a separate request for each individual message. Instead, the Queue Service API is used to retrieve a specified number of messages from the queue in a single call. The number of messages is controlled by the DequeueBatchSize parameter passed to the Get method. The storage abstraction layer forwards this parameter to the Queue Service API and additionally performs a safety check to ensure the batch size never exceeds the maximum allowed by the API.
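A minimal sketch of this safety check, assuming the classic StorageClient types, a queueClient field, and a hypothetical Deserialize<T> helper, might look as follows:

// The Queue Service API supports at most 32 messages per dequeue operation.
private const int MaxDequeueBatchSize = 32;

public IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout)
{
    // Clamp the requested batch size to the maximum allowed by the Queue Service API.
    int batchSize = Math.Min(count, MaxDequeueBatchSize);

    CloudQueue queue = this.queueClient.GetQueueReference(queueName);

    foreach (CloudQueueMessage message in queue.GetMessages(batchSize, visibilityTimeout))
    {
        // Deserialize<T> is a hypothetical helper that turns the raw message into a T.
        yield return Deserialize<T>(message);
    }
}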
Finally, the DequeueTaskMain method relies on the QueueEmpty event to control its idle behavior. Whenever a dequeue task finds the queue empty, it raises the QueueEmpty event and passes it the number of consecutive idle iterations. The QueueEmpty event handler serves two purposes. First, through the delay output parameter, it supplies the dequeue task with the interval to sleep before polling the queue again. Second, through its Boolean return value, it tells the task whether it should shut itself down. An implementation of the QueueEmpty handler that backs off along a random exponential curve, and that shuts down all but one dequeue task once the back-off ceiling is reached, helps reduce the number of storage transactions and thereby lower transaction costs, as discussed above.

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // The sender is an instance of ICloudQueueServiceWorkerRoleExtension, that is, the queue listener.
    ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

    // Locate the configuration extension that holds the tunable settings.
    IWorkItemProcessorConfigurationExtension config = Extensions.Find<IWorkItemProcessorConfigurationExtension>();

    // Query the current state of the queue listener.
    CloudQueueListenerInfo queueServiceState = queueService.QueryState();

    // Set up the parameters used when computing the delay interval.
    int deltaBackoffMs = 100;
    int minimumIdleIntervalMs = Convert.ToInt32(config.Settings.MinimumIdleInterval.TotalMilliseconds);
    int maximumIdleIntervalMs = Convert.ToInt32(config.Settings.MaximumIdleInterval.TotalMilliseconds);

    // Compute a new sleep interval that follows a random exponential back-off curve.
    int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2)));
    int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs);

    // Pass the computed interval back to the dequeue task so it can sleep for that period.
    delay = TimeSpan.FromMilliseconds((double)interval);

    // Tell the dequeue task to terminate once the maximum idle interval has been reached,
    // provided more than one dequeue task is still active.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs && queueServiceState.ActiveDequeueTasks > 1;
}
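For illustration, assume MinimumIdleInterval is 100 ms and MaximumIdleInterval is 30 seconds. After the fifth consecutive idle poll, the computed delta is roughly (2^5 - 1) × 100 ms ≈ 3.1 seconds, so the task sleeps for a little over three seconds; by around the ninth consecutive idle poll the computed interval exceeds 30 seconds and is capped at the maximum, at which point the handler begins shutting down surplus dequeue tasks.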
The automated scale-down mechanism works as follows:
- As soon as messages appear in the queue, the dequeue tasks process the workload immediately; batched dequeue requests are issued without delay.
- Once the input queue becomes empty, each dequeue task raises a QueueEmpty event.
- The QueueEmpty event handler computes a random exponential back-off delay and instructs the dequeue task to suspend its polling for that interval.
- The dequeue tasks keep probing the queue at the computed intervals, which grow with each consecutive idle iteration.
- Once the maximum idle interval is reached, and provided the queue is still empty, the active dequeue tasks begin shutting themselves down. Because the tasks sit at different points on the back-off timeline, they do not all terminate at once.
- Eventually only a single active dequeue task remains, so an empty queue incurs idle polling transactions from just one task, polling at the maximum idle interval.
Two additions are needed to support this mechanism. First, the queue listener must be able to report its current state to the QueueEmpty event handler. For this purpose, the following structure is defined:
/// Provides information about the current state of a queue listener.
public struct CloudQueueListenerInfo
{
    /// Returns the approximate number of items in the Windows Azure queue.
    public int CurrentQueueDepth { get; internal set; }

    /// Returns the number of dequeue tasks that are actively performing or awaiting work.
    public int ActiveDequeueTasks { get; internal set; }

    /// Returns the total number of dequeue tasks registered with the listener.
    public int TotalDequeueTasks { get; internal set; }
}
Second, the listener must expose its state through a QueryState method (see below):
/// Returns a snapshot of the queue listener's current state.
public CloudQueueListenerInfo QueryState()
{
    return new CloudQueueListenerInfo()
    {
        CurrentQueueDepth = this.queueStorage.GetCount(this.queueLocation.QueueName),
        ActiveDequeueTasks = (from task in this.dequeueTasks
                              where task.Status != TaskStatus.Canceled &&
                                    task.Status != TaskStatus.Faulted &&
                                    task.Status != TaskStatus.RanToCompletion
                              select task).Count(),
        TotalDequeueTasks = this.dequeueTasks.Count
    };
}
So far, the listener can only scale itself down. To restore its dequeue capacity when work arrives, it also needs a way to signal that new work items have been detected in the queue.
The following delegate is defined for this purpose:

/// <summary>
/// Defines a callback delegate that is invoked whenever new work is detected in a queue.
/// </summary>
/// <param name="sender">The source of the event.</param>
public delegate void WorkDetectedDelegate(object sender);

The ICloudQueueServiceWorkerRoleExtension interface is then extended with an event that is raised whenever the listener detects new work items (the rest of the interface is omitted for brevity):

public interface ICloudQueueServiceWorkerRoleExtension
{
    // ... the rest of the interface definition is omitted for brevity ...

    /// Raised whenever new work items are detected in the queue.
    event WorkDetectedDelegate QueueWorkDetected;
}
The queue listener raises the QueueWorkDetected event from within the DequeueTaskMain method, as follows:
public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // The event raised whenever new work is detected in the queue being listened to.
    public event WorkDetectedDelegate QueueWorkDetected;

    private void DequeueTaskMain(object state)
    {
        CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

        int idleStateCount = 0;
        TimeSpan sleepInterval = DequeueInterval;

        try
        {
            // Run the dequeue task until asked to terminate or until a break condition is encountered.
            while (workerState.CanRun)
            {
                try
                {
                    var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                    int messageCount = 0;

                    // Check whether this task was previously idle and new messages have now arrived.
                    if (idleStateCount > 0 && queueMessages.Count() > 0)
                    {
                        if (QueueWorkDetected != null)
                        {
                            QueueWorkDetected(this);
                        }
                    }

                    // ... the rest of the implementation is omitted for brevity ...
The QueueWorkDetected event needs a handler that restores the listener's dequeue capacity. A natural place for this handler is the worker role that hosts the queue listener: it subscribes to the event when the role starts and scales the number of dequeue tasks back up whenever new work arrives:
public class WorkItemProcessorWorkerRole : RoleEntryPoint
{
    // Invoked by the Windows Azure runtime when the role instance is being started.
    public override sealed bool OnStart()
    {
        // ... the rest of the implementation is omitted for brevity ...
In light of the above example, the GetOptimalDequeueTaskCount method deserves clarification. This method is responsible for computing the number of dequeue tasks needed to handle the workload. When invoked, it should determine, through whatever decision-making mechanism is appropriate, how much dequeue capacity the queue listener requires for the expected volume of work. A developer can take the simplest path and embed a set of static rules directly in GetOptimalDequeueTaskCount; alternatively, more elaborate rules that consider queue depth, historical throughput, or other metrics can be applied. A simple rules-based implementation might look like this:
/// <summary>
/// Returns the number of dequeue tasks appropriate for the current queue depth.
/// </summary>
/// <param name="currentDepth">The approximate number of items in the queue.</param>
/// <returns>The optimal number of dequeue tasks.</returns>
private int GetOptimalDequeueTaskCount(int currentDepth)
{
    if (currentDepth < 100) return 10;
    if (currentDepth >= 100 && currentDepth < 1000) return 50;
    if (currentDepth >= 1000) return 100;

    // Return the minimum acceptable number of dequeue tasks.
    return 1;
}
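To show how these pieces could fit together, here is a hypothetical handler that reacts to QueueWorkDetected by topping the listener back up to the computed task count; the wiring of the event subscription is assumed to happen in the role's OnStart:

// Hypothetical handler that restores dequeue capacity when new work is detected.
private void OnQueueWorkDetected(object sender)
{
    ICloudQueueServiceWorkerRoleExtension listener = (ICloudQueueServiceWorkerRoleExtension)sender;

    // Query the listener state and compute the desired degree of parallelism.
    CloudQueueListenerInfo state = listener.QueryState();
    int optimalTaskCount = GetOptimalDequeueTaskCount(state.CurrentQueueDepth);

    // Start the missing number of dequeue tasks.
    if (state.ActiveDequeueTasks < optimalTaskCount)
    {
        listener.StartListener(optimalTaskCount - state.ActiveDequeueTasks);
    }
}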
These static thresholds can, of course, be replaced with a more sophisticated model, for example one that consults configuration data or historical metrics before choosing the task count. With this in place, the queue listener can scale in both directions: it reduces its dequeue tasks to a single polling thread when the queues stay empty, and it restores the required degree of parallelism as soon as the QueueWorkDetected event reports new work. The remaining source of inefficiency, the polling interval itself, is addressed in the next section.
Implementing a publish/subscribe layer for zero-latency dequeueing
In this section, we extend the queue listener implementation above with a push-notification mechanism built on the Service Bus one-way multicast capability. The notification mechanism delivers a trigger event that signals the queue listener to start dequeueing work items. This approach removes the need to poll the queue for new messages and eliminates the delay factor.
First, define the trigger event that the queue listener receives whenever a new workload appears on a queue:

/// Implements a trigger event indicating that a new workload has been put on a queue.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class CloudQueueWorkDetectedTriggerEvent
{
    /// Returns the name of the storage account in which the queue is located.
    [DataMember]
    public string StorageAccount { get; private set; }

    /// Returns the name of the queue on which the workload was put.
    [DataMember]
    public string QueueName { get; private set; }

    /// Returns the size of the workload (for example, the number of messages).
    [DataMember]
    public long PayloadSize { get; private set; }

    // ... the rest of the implementation is omitted for brevity ...
}
The queue listener implementations will act as subscribers that receive the trigger events. The first step is to make the queue listener an observer of CloudQueueWorkDetectedTriggerEvent:
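A sketch of this first step, assuming the class declaration simply adds the observer interface:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>, IObserver<CloudQueueWorkDetectedTriggerEvent>
{
    // ... the implementation of the IObserver<T> methods is shown below ...
}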
The second step is to implement the OnNext method defined in the IObserver<T> interface. This method is called by the provider to notify the observer of a new event:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // ... the rest of the implementation is omitted for brevity ...

    /// <summary>
    /// Called by the provider to notify this observer of a new trigger event.
    /// </summary>
    /// <param name="e">The trigger event indicating that a new payload was put on a queue.</param>
    public void OnNext(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Make sure the trigger event belongs to the queue managed by this listener; otherwise, ignore it.
        if (this.queueLocation.StorageAccount == e.StorageAccount &&
            this.queueLocation.QueueName == e.QueueName)
        {
            if (QueueWorkDetected != null)
            {
                QueueWorkDetected(this);
            }
        }
    }

    // ... the rest of the implementation is omitted for brevity ...
}
Note that the OnNext method raises the same QueueWorkDetected event used earlier, so work announced through a CloudQueueWorkDetectedTriggerEvent notification is handled exactly like work discovered by polling: the listener restores its dequeue capacity and starts draining the queue. Because push notifications now tell the listener when work arrives, there is no longer any reason to keep even one dequeue task polling an empty queue. The QueueEmpty event handler is therefore simplified:
private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // The delay is computed along the same random exponential back-off curve as before.
    // ... the computation is omitted for brevity ...

    // Shut the dequeue task down once the maximum idle interval has been reached.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs;
}
Thus, we no longer check whether exactly one active dequeue task remains. The modified QueueEmpty event handler considers only whether the maximum idle interval has been exceeded; once it has, all active dequeue tasks shut down.
To receive CloudQueueWorkDetectedTriggerEvent notifications, the publish/subscribe model is used, implemented as loosely coupled messaging between Windows Azure role instances. In essence, we plug into the inter-role communication layer and handle incoming events as follows:

public class InterRoleEventSubscriberExtension : IInterRoleEventSubscriberExtension
{
    // ... some implementation details are omitted for brevity; see the samples published by the Windows Azure Customer Advisory Team ...

    public void OnNext(InterRoleCommunicationEvent e)
    {
        if (this.owner != null && e.Payload != null)
        {
            // ... the rest of the implementation is omitted for brevity ...
On the publishing side, the CloudQueueWorkDetectedTriggerEvent must be raised whenever new work is deposited into a queue. The natural publisher is the Process Initiator worker role, which enqueues the work items and therefore knows exactly when the listeners should be woken up. The trigger event is published through the inter-role communication mechanism as follows:
public class ProcessInitiatorWorkerRole : RoleEntryPoint
{
    // The instance of the role extension that provides an interface to the inter-role communication service.
    private volatile IInterRoleCommunicationExtension interRoleCommunicator;

    // ... some implementation details are omitted for brevity; see the samples published by the Windows Azure Customer Advisory Team ...

    private void HandleWorkload()
    {
        // 1. Receive an incoming workload.
        // ... (implementation is omitted for brevity) ...

        // 2. Enqueue the work items into the Windows Azure queue.
        // ... (implementation is omitted for brevity) ...

        // 3. Notify the queue listeners that they should expect work to arrive.
        // Create a trigger event referencing the storage account and the queue that received the work items.
        var trigger = new CloudQueueWorkDetectedTriggerEvent("MyStorageAccount", "InputQueue");

        // Package the trigger event into an inter-role communication event.
        var interRoleEvent = new InterRoleCommunicationEvent(CloudEnvironment.CurrentRoleInstanceId, trigger);

        // Publish the inter-role communication event via the one-way multicast mechanism.
        interRoleCommunicator.Publish(interRoleEvent);
    }
}
With the publisher in place, trigger events now reach the queue listeners with near-zero latency whenever new work appears, completing the push-based dequeue model on Windows Azure.
Conclusion
This article examined ways to maximize the scalability, performance, and cost effectiveness of queue-based messaging solutions built on the Windows Azure platform.
Solution architects should consider the following:
- Use a queue-based messaging architecture built on the Windows Azure queue storage service for high-scale asynchronous communication between the tiers and services of cloud or hybrid solutions.
- Plan for a partitioned queue architecture to scale beyond 500 transactions per second.
- Understand the fundamentals of the Windows Azure pricing model, and optimize the solution to lower transaction costs through the best practices and design patterns described here.
- Account for dynamic scaling requirements by designing an architecture that adapts to volatile and fluctuating workloads.
- Apply appropriate auto-scaling techniques to elastically expand and shrink compute power and further reduce operating expenses.
- Evaluate the costs and benefits before taking a dependency on the Service Bus for real-time push notifications.
Developers should consider the following:
- Design the messaging solution to use batching when storing and retrieving data from Windows Azure queues.
- Implement an efficient queue listener service that ensures empty queues are polled by at most one dequeue thread.
- Dynamically scale down the number of worker role instances when queues remain empty for prolonged periods.
- Apply a randomized exponential back-off algorithm to reduce the effect of idle polling on storage transaction costs.
- Adopt techniques that prevent exceeding the scalability targets of a single queue when building highly multi-threaded, multi-instance publishers and consumers.
- Employ a robust retry policy capable of handling a variety of transient conditions when publishing to and consuming from Windows Azure queues.
- Use the one-way eventing capability of the Windows Azure Service Bus for push-based notifications, reducing latency and improving the performance of the queue-based messaging solution.
- Explore the capabilities of the .NET Framework 4, such as the TPL, PLINQ, and the Observer pattern, to maximize parallelism, improve concurrency, and simplify the design of multi-threaded services.