Hangfire is a library for .NET (including .NET Core) that lets you run code asynchronously on a "fire and forget" basis. Typical examples are sending e-mail, processing video, or synchronizing with another system. In addition to "fire and forget", it supports delayed tasks and recurring tasks scheduled in Cron format.
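The three kinds of tasks mentioned above correspond to three static entry points in Hangfire. The snippet below is a minimal sketch, assuming that job storage has already been configured elsewhere (for example in Startup); the Notifications class, the e-mail address and the "nightly-email" identifier are hypothetical placeholders.

```csharp
using System;
using Hangfire;

// Hypothetical job class used only for illustration; not part of Hangfire.
public static class Notifications
{
    public static void SendEmail(string address) =>
        Console.WriteLine($"Sending e-mail to {address}");
}

public static class JobExamples
{
    // Assumes Hangfire job storage is already configured;
    // without it these calls fail at run time.
    public static void CreateJobs()
    {
        // Fire and forget: executed as soon as a worker is free.
        BackgroundJob.Enqueue(
            () => Notifications.SendEmail("user@example.com"));

        // Delayed: executed once, no earlier than 10 minutes from now.
        BackgroundJob.Schedule(
            () => Notifications.SendEmail("user@example.com"),
            TimeSpan.FromMinutes(10));

        // Recurring: executed on the given Cron schedule (daily at 03:00).
        RecurringJob.AddOrUpdate(
            "nightly-email",
            () => Notifications.SendEmail("user@example.com"),
            "0 3 * * *");
    }
}
```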
Currently, there are a lot of similar libraries. A few advantages in favor of Hangfire:
I won't go into too much detail, since there are already quite a few good articles about Hangfire and how to use it. In this article I will explain how to use multiple queues (or task pools), how to fix the standard retry functionality, and how to give each queue its own configuration.
Important note: in the title I used the term "pseudo-queue" because Hangfire does not guarantee that tasks are executed in a specific order. In other words, the "First In, First Out" principle does not apply, and we will not rely on it. Moreover, the author of the library recommends making tasks idempotent, i.e. resistant to unexpected repeated execution. From here on I will simply use the word "queue", since that is the term Hangfire itself uses.
Hangfire has simple queue support. Although it does not offer the flexibility of message queue systems such as RabbitMQ or Azure Service Bus, it is often enough to solve a wide range of tasks.
Each task has a "Queue" property, that is, the name of the queue in which it should run. Unless otherwise specified, the task is sent to the queue named "default". Support for multiple queues is needed in order to manage the execution of different types of tasks separately. For example, we may want video processing tasks to go to the "video_queue" queue and e-mail sending tasks to go to the "email_queue" queue. This way, we can process these two types of tasks independently. If we want to move video processing to a dedicated server, we can easily do so by running a separate Hangfire server as a console application that processes only the "video_queue" queue.
Setting up a Hangfire server in ASP.NET Core looks like this:
public void Configure(IApplicationBuilder app)
{
    app.UseHangfireServer(new BackgroundJobServerOptions
    {
        WorkerCount = 2,
        Queues = new[] { "email_queue", "video_queue" }
    });
}
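The dedicated server for "video_queue" mentioned earlier could be a small console application along the following lines. This is a sketch under assumptions: it uses SQL Server storage from the Hangfire.SqlServer package, the connection string is a placeholder, and the VideoWorkerProgram class name is made up for the example. Any storage shared with the web application would serve the same purpose.

```csharp
using System;
using Hangfire;

public static class VideoWorkerProgram
{
    public static void Main()
    {
        // Assumption: the same storage the web application uses.
        // Replace the placeholder with a real connection string.
        GlobalConfiguration.Configuration
            .UseSqlServerStorage("<connection string>");

        var options = new BackgroundJobServerOptions
        {
            // This server instance picks up tasks from "video_queue" only.
            Queues = new[] { "video_queue" },
            WorkerCount = 2
        };

        // The server processes tasks until it is disposed.
        using (new BackgroundJobServer(options))
        {
            Console.WriteLine("Processing video_queue. Press Enter to exit.");
            Console.ReadLine();
        }
    }
}
```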
As I mentioned above, Hangfire has a default queue called "default". If a task queued to, say, "video_queue" fails and needs to be retried, it is sent to the "default" queue rather than to "video_queue". As a result, the task will not be executed on the Hangfire server instance we intended, if it is executed at all. I established this behavior experimentally, and it is probably a bug in Hangfire itself.
Hangfire lets us extend its functionality with so-called filters (Job Filters), which are similar in principle to Action Filters in ASP.NET MVC. Internally, Hangfire's logic is implemented as a state machine: an engine that moves tasks from one state to another (for example, created -> enqueued -> processing -> succeeded). Filters allow us to "intercept" a task each time its state changes and to manipulate it. A filter is implemented as an attribute that can be applied to a particular method, to a class, or globally.
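To make the idea of intercepting state changes more concrete, here is a minimal sketch of a filter that only logs each transition. The LogStateChangesAttribute name is hypothetical, and writing to the console is just a stand-in for real logging.

```csharp
using System;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

// Hypothetical filter: it does not change anything, it only reports
// the state transitions that the state machine applies to a task.
public class LogStateChangesAttribute : JobFilterAttribute, IApplyStateFilter
{
    public void OnStateApplied(
        ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        // Called when the new state is being applied to the task.
        Console.WriteLine(
            $"Job {context.BackgroundJob.Id}: " +
            $"{context.OldStateName} -> {context.NewState.Name}");
    }

    public void OnStateUnapplied(
        ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        // Called when the task leaves its previous state.
        Console.WriteLine(
            $"Job {context.BackgroundJob.Id}: leaving {context.OldStateName}");
    }
}
```

Such a filter can be placed on a job method as [LogStateChanges] or registered for all tasks with GlobalJobFilters.Filters.Add(new LogStateChangesAttribute()).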
The filter method receives an ElectStateContext object as its argument. This object contains complete information about the task being processed. Among other things, it has the methods GetJobParameter<>(...) and SetJobParameter<>(...). Job parameters let you store task-related information in the database. The name of the queue to which the task was originally sent is stored in the job parameters, but for some reason this information is ignored when the task is retried.
So, we have a task that failed with an error and must be sent for re-execution to the right queue (the same one that was assigned to it when it was first created). Retrying a task that ended with an error is a transition from the "failed" state to the "enqueued" state. To solve the problem, we create a filter that, when the task moves to the "enqueued" state, checks which queue the task was initially sent to and sets the "QueueName" parameter to the required value:
public class HangfireUseCorrectQueueFilter : JobFilterAttribute, IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        if (context.CandidateState is EnqueuedState enqueuedState)
        {
            var queueName = context.GetJobParameter<string>("QueueName");
            if (string.IsNullOrWhiteSpace(queueName))
            {
                // First enqueue: remember the queue the task was sent to.
                context.SetJobParameter("QueueName", enqueuedState.Queue);
            }
            else
            {
                // Re-enqueue: restore the original queue.
                enqueuedState.Queue = queueName;
            }
        }
    }
}
To apply the filter to all tasks by default (that is, globally), add the following code to the configuration:
GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });
Another small snag is that, by default, the GlobalJobFilters collection contains an instance of the AutomaticRetryAttribute class. This is the standard filter responsible for re-executing failed tasks. It also sends the task to the "default" queue, ignoring the original queue. For our homegrown solution to work, we need to remove this filter from the collection and let our own filter take responsibility for retrying tasks. As a result, the configuration code looks like this:
var defaultRetryFilter = GlobalJobFilters.Filters
    .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute);

if (defaultRetryFilter != null && defaultRetryFilter.Instance != null)
{
    GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance);
}

GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });
Note that AutomaticRetryAttribute implements an automatically increasing interval between attempts (the interval grows with each subsequent attempt). By removing AutomaticRetryAttribute from the GlobalJobFilters collection, we give up this functionality (see the implementation of its ScheduleAgainLater method).
So, our tasks can now run in different queues, which lets us manage their execution independently, including processing different queues on different machines. However, we no longer know how many times and at what intervals our tasks will be retried after an error, because we removed AutomaticRetryAttribute from the filter collection.
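If a growing interval matters, it can be reintroduced in a custom filter. The helper below is only an illustration of the idea, assuming simple doubling with an upper bound; it is not the formula that AutomaticRetryAttribute uses, and the RetryDelays name and its parameters are hypothetical.

```csharp
using System;

public static class RetryDelays
{
    // Hypothetical helper: doubles the base delay on every attempt and
    // caps the result so that late attempts do not wait too long.
    public static TimeSpan ForAttempt(
        int retryAttempt, int baseDelayInSeconds, int maxDelayInSeconds)
    {
        if (retryAttempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(retryAttempt));
        }

        // Attempt 1 -> base, attempt 2 -> 2 * base, attempt 3 -> 4 * base...
        var seconds = baseDelayInSeconds * Math.Pow(2, retryAttempt - 1);
        return TimeSpan.FromSeconds(Math.Min(seconds, maxDelayInSeconds));
    }
}
```

With a base of 10 seconds and a cap of 300 seconds, the first three attempts wait 10, 20 and 40 seconds, and from the sixth attempt on the delay stays at 300 seconds.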
We want to be able to configure the interval and the number of retries separately for each queue, and if we have not explicitly specified values for a queue, we want default values to be applied. To do this, we will implement another filter and call it HangfireRetryJobFilter.
Ideally, the configuration code should look something like this:
GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter
{
    Order = 2,
    ["email_queue"] = new HangfireQueueSettings
    {
        DelayInSeconds = 120,
        RetryAttempts = 3
    },
    ["video_queue"] = new HangfireQueueSettings
    {
        DelayInSeconds = 60,
        RetryAttempts = 5
    }
});
To do this, we first add the HangfireQueueSettings class, which will serve as a container for our settings.
public sealed class HangfireQueueSettings
{
    public int RetryAttempts { get; set; }
    public int DelayInSeconds { get; set; }
}
Then we add the filter itself. When a task is retried after an error, the filter applies the settings configured for the task's queue and keeps track of the number of retries:
using System;
using System.Collections.Generic;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

public class HangfireRetryJobFilter : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
{
    private readonly HangfireQueueSettings _defaultQueueSettings =
        new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 };

    private readonly IDictionary<string, HangfireQueueSettings> _settings =
        new Dictionary<string, HangfireQueueSettings>();

    // Returns the settings for the given queue, or the defaults
    // if the queue has no explicit configuration.
    public HangfireQueueSettings this[string queueName]
    {
        get
        {
            return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings)
                ? queueSettings
                : _defaultQueueSettings;
        }
        set
        {
            _settings[queueName] = value;
        }
    }

    public void OnStateElection(ElectStateContext context)
    {
        if (!(context.CandidateState is FailedState failedState))
        {
            // This filter accepts only failed job state.
            return;
        }

        var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1;
        var queueName = context.GetJobParameter<string>("QueueName");

        if (retryAttempt <= this[queueName].RetryAttempts)
        {
            ScheduleAgainLater(context, retryAttempt, failedState, queueName);
        }
        else
        {
            TransitionToDeleted(context, failedState, queueName);
        }
    }

    public void OnStateApplied(
        ApplyStateContext context,
        IWriteOnlyTransaction transaction)
    {
        if (context.NewState is ScheduledState &&
            context.NewState.Reason != null &&
            context.NewState.Reason.StartsWith("Retry attempt"))
        {
            transaction.AddToSet("retries", context.BackgroundJob.Id);
        }
    }

    public void OnStateUnapplied(
        ApplyStateContext context,
        IWriteOnlyTransaction transaction)
    {
        if (context.OldStateName == ScheduledState.StateName)
        {
            transaction.RemoveFromSet("retries", context.BackgroundJob.Id);
        }
    }

    private void ScheduleAgainLater(
        ElectStateContext context,
        int retryAttempt,
        FailedState failedState,
        string queueName)
    {
        context.SetJobParameter("RetryCount", retryAttempt);

        var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds);

        const int maxMessageLength = 50;
        var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength
            ? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…"
            : failedState.Exception.Message;

        // If attempt number is less than max attempts, we should
        // schedule the job to run again later.
        var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}";

        context.CandidateState = delay == TimeSpan.Zero
            ? (IState)new EnqueuedState { Reason = reason }
            : new ScheduledState(delay) { Reason = reason };
    }

    private void TransitionToDeleted(
        ElectStateContext context,
        FailedState failedState,
        string queueName)
    {
        context.CandidateState = new DeletedState
        {
            Reason = this[queueName].RetryAttempts > 0
                ? "Exceeded the maximum number of retry attempts."
                : "Retries were disabled for this job."
        };
    }
}
A note on the code: the HangfireRetryJobFilter class is based on the AutomaticRetryAttribute class from Hangfire, so the implementation of some of its methods partially coincides with the corresponding methods of that class.
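The per-queue lookup with a fallback to defaults can be tried out in isolation, without Hangfire. The sketch below mirrors the indexer of the filter with one assumption added: if the "QueueName" parameter happens to be missing and the queue name is null, the defaults are returned instead of passing null to the dictionary, which would throw. The QueueSettings and QueueSettingsMap names are hypothetical.

```csharp
using System.Collections.Generic;

public sealed class QueueSettings
{
    public int RetryAttempts { get; set; }
    public int DelayInSeconds { get; set; }
}

public sealed class QueueSettingsMap
{
    private readonly QueueSettings _defaults =
        new QueueSettings { RetryAttempts = 3, DelayInSeconds = 10 };

    private readonly Dictionary<string, QueueSettings> _settings =
        new Dictionary<string, QueueSettings>();

    public QueueSettings this[string queueName]
    {
        get
        {
            // Dictionary.TryGetValue throws on a null key, so a missing
            // queue name is mapped to the defaults before the lookup.
            if (queueName != null &&
                _settings.TryGetValue(queueName, out var found))
            {
                return found;
            }

            return _defaults;
        }
        set { _settings[queueName] = value; }
    }

    // True while another retry is still allowed for the given queue.
    public bool ShouldRetry(string queueName, int retryAttempt) =>
        retryAttempt <= this[queueName].RetryAttempts;
}
```

For example, after map["email_queue"] = new QueueSettings { RetryAttempts = 1, DelayInSeconds = 120 }, the call map.ShouldRetry("email_queue", 2) returns false, while an unconfigured queue still gets the three default attempts.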
I managed to find two ways to assign a queue to a task: one documented and one not.
Method 1: apply the Queue attribute to the method.
[Queue("video_queue")]
public void SomeMethod() { }

BackgroundJob.Enqueue(() => SomeMethod());
http://docs.hangfire.io/en/latest/background-processing/configuring-queues.html
Method 2 (undocumented): use the BackgroundJobClient class.
var client = new BackgroundJobClient();
client.Create(() => MyMethod(), new EnqueuedState("video_queue"));
The advantage of the second method is that it does not add unnecessary dependencies on Hangfire to the job code and lets you decide at run time which queue a task should go to. Unfortunately, I did not find any mention of the BackgroundJobClient class or how to use it in the official documentation. I used the second method in my own solution, so it has been tested in practice.
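Choosing the queue at run time with the second method could look roughly like this. It is a sketch under assumptions: the MediaJobs and JobDispatcher classes and the rule based on the file extension are hypothetical, and the client is passed in through the IBackgroundJobClient interface so the queue selection can be checked separately.

```csharp
using System;
using Hangfire;
using Hangfire.States;

// Hypothetical job class used only for illustration.
public static class MediaJobs
{
    public static void Process(string fileName) =>
        Console.WriteLine($"Processing {fileName}");
}

public class JobDispatcher
{
    private readonly IBackgroundJobClient _client;

    public JobDispatcher(IBackgroundJobClient client)
    {
        _client = client;
    }

    // Hypothetical rule: video files go to "video_queue",
    // everything else goes to the default queue.
    public static string QueueFor(string fileName) =>
        fileName.EndsWith(".mp4", StringComparison.OrdinalIgnoreCase)
            ? "video_queue"
            : EnqueuedState.DefaultQueue;

    // Creates the task directly in the enqueued state of the chosen
    // queue and returns the identifier of the created task.
    public string Dispatch(string fileName) =>
        _client.Create(
            () => MediaJobs.Process(fileName),
            new EnqueuedState(QueueFor(fileName)));
}
```

In an application, the dispatcher would be constructed with new BackgroundJobClient() or with a client supplied by the dependency injection container.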
In this article, we used Hangfire's support for multiple queues to separate the processing of different types of tasks. We implemented our own mechanism for retrying failed tasks with an individual configuration for each queue, extending Hangfire with Job Filters, and also learned how to send tasks to the required queue for execution.
I hope this article will be useful to someone. I would welcome comments.
Hangfire Documentation
Hangfire source code
Scott Hanselman - How to Run Background Tasks in ASP.NET
Source: https://habr.com/ru/post/434364/