
Mail collector (making simple things difficult)

As a preface


Many of you have probably faced the task of collecting mail from a number of mailboxes. Why would that be necessary? Because e-mail is a universal mechanism for exchanging data between systems: every language has libraries implementing SMTP, POP3 and IMAP, the mailbox itself acts as a ready-made message queue of sorts, and so on.

So it is not surprising that many integration tasks are implemented over mail. And then a service enters the picture that can quickly pick up the messages, categorize them and perform the necessary actions.

If the following code is enough for your needs, you can stop reading here:

foreach (var mailbox in mailboxes)
    using (var client = new Pop3Client())
    {
        client.Connect(Hostname, Port, false);
        client.Authenticate(User, Password);
        var count = client.GetMessageCount();
        for (var i = 0; i < count; i++)
        {
            Mail = client.GetMessage(i + 1);
            var cat = SortMail(Mail);
            DoSomething(Mail, cat);
        }
    }


What do we do


Let's make a number of assumptions right away:
1) We collect mail for several systems, and maybe a few more in the future. In short, the solution should be universal;
2) There will probably be a lot of mail — this follows from point 1 (otherwise I would not be writing this post);
3) The mail will have to be parsed;
4) All the mailboxes are service mailboxes — no human users poke around in them.

What will we use


The system should work 24/7, so we will implement it as a Windows service. For that I suggest using TopShelf right away.

Of course, everything must be parallelized. This is where my favorite TPL DataFlow library comes onto the scene.

We will fetch mail over POP3. All the IMAP niceties are superfluous for this task — we just need to grab the raw message as quickly and simply as possible and delete it from the server, and POP3 is more than enough for that. We use OpenPop.NET.

As already mentioned, the mail has to be parsed. Maybe with regexes, maybe with custom logic — you never know in advance. That is why we need a way to flexibly and quickly plug in new rules, i.e. plug-ins. This is where the Managed Extensibility Framework (MEF) helps.

We write logs via NLog.

Optionally, we hook up monitoring in Zabbix. (We promise to run 24/7 at the advertised speed — that has to be watched.)

Go


Create a regular console application, open the NuGet console and install all the necessary packages:

Install-Package NLog
Install-Package OpenPop.NET
Install-Package Topshelf
Install-Package Microsoft.Tpl.Dataflow

Go to the project folder and create App.Debug.config and App.Release.config. Unload the project from Visual Studio and open its code for editing (hereafter, TopCrawler.csproj). In the section with the configs add:

Configurations
<None Include="App.Debug.config">
  <DependentUpon>App.config</DependentUpon>
</None>
<None Include="App.Release.config">
  <DependentUpon>App.config</DependentUpon>
</None>


And below its own target for MSBuild:

Transform target
<UsingTask TaskName="TransformXml"
           AssemblyFile="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Web\Microsoft.Web.Publishing.Tasks.dll" />
<Target Name="AfterCompile" Condition="Exists('App.$(Configuration).config')">
  <!-- Generate transformed app config in the intermediate directory -->
  <TransformXml Source="App.config"
                Destination="$(IntermediateOutputPath)$(TargetFileName).config"
                Transform="App.$(Configuration).config" />
  <!-- Force build process to use the transformed configuration file from now on -->
  <ItemGroup>
    <AppConfigWithTargetPath Remove="App.config" />
    <AppConfigWithTargetPath Include="$(IntermediateOutputPath)$(TargetFileName).config">
      <TargetPath>$(TargetFileName).config</TargetPath>
    </AppConfigWithTargetPath>
  </ItemGroup>
</Target>


Personally, I use this old-school approach to add config transformations for separating environments.
For convenience, I suggest strongly-typed configs: a separate class will read the configuration. (We can argue about the theoretical merits of that decision in the comments.) Configs, logs and monitoring are an excellent occasion to implement the Singleton pattern.

Create a folder of the same name in the project (order must be maintained). Inside, create three classes: Config, Logger, Zabbix. Our logger:

Logger
static class Logger
{
    public static NLog.Logger Log { get; private set; }
    public static NLog.Logger Archive { get; private set; }

    static Logger()
    {
        Log = LogManager.GetLogger("Global");
        Archive = LogManager.GetLogger("Archivator");
    }
}


Monitoring with Zabbix deserves a separate post, so I'll just leave here the class that implements the sender:

Zabbix
namespace TopCrawler.Singleton
{
    /// <summary>
    /// Singleton: zabbix sender class
    /// </summary>
    static class Zabbix
    {
        public static ZabbixSender Sender { get; private set; }

        static Zabbix()
        {
            Sender = new ZabbixSender(Config.ZabbixServer, Config.ZabbixPort);
        }
    }

    struct ZabbixItem
    {
        public string Host;
        public string Key;
        public string Value;
    }

    class ZabbixSender
    {
        internal struct SendItem
        {
            // ReSharper disable InconsistentNaming - Zabbix is case sensitive
            public string host;
            public string key;
            public string value;
            public string clock;
            // ReSharper restore InconsistentNaming
        }

#pragma warning disable 0649
        internal struct ZabbixResponse
        {
            public string Response;
            public string Info;
        }
#pragma warning restore 0649

        #region --- Constants ---

        public const string DefaultHeader = "ZBXD\x01";
        public const string SendRequest = "sender data";
        public const int DefaultTimeout = 10000;

        #endregion

        #region --- Fields ---

        private readonly DateTime _dtUnixMinTime = DateTime.SpecifyKind(new DateTime(1970, 1, 1), DateTimeKind.Utc);
        private readonly int _timeout;
        private readonly string _zabbixserver;
        private readonly int _zabbixport;

        #endregion

        #region --- Constructors ---

        public ZabbixSender(string zabbixserver, int zabbixport)
            : this(zabbixserver, zabbixport, DefaultTimeout)
        {
        }

        public ZabbixSender(string zabbixserver, int zabbixport, int timeout)
        {
            _zabbixserver = zabbixserver;
            _zabbixport = zabbixport;
            _timeout = timeout;
        }

        #endregion

        #region --- Methods ---

        public string SendData(ZabbixItem itm)
        {
            return SendData(new List<ZabbixItem>(1) { itm });
        }

        public string SendData(List<ZabbixItem> lstData)
        {
            try
            {
                var serializer = new JavaScriptSerializer();
                var values = new List<SendItem>(lstData.Count);
                values.AddRange(lstData.Select(itm => new SendItem
                {
                    host = itm.Host,
                    key = itm.Key,
                    value = itm.Value,
                    clock = Math.Floor((DateTime.Now.ToUniversalTime() - _dtUnixMinTime).TotalSeconds)
                        .ToString(CultureInfo.InvariantCulture)
                }));
                var json = serializer.Serialize(new { request = SendRequest, data = values.ToArray() });
                var header = Encoding.ASCII.GetBytes(DefaultHeader);
                var length = BitConverter.GetBytes((long)json.Length);
                var data = Encoding.ASCII.GetBytes(json);
                var packet = new byte[header.Length + length.Length + data.Length];
                Buffer.BlockCopy(header, 0, packet, 0, header.Length);
                Buffer.BlockCopy(length, 0, packet, header.Length, length.Length);
                Buffer.BlockCopy(data, 0, packet, header.Length + length.Length, data.Length);

                using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                {
                    socket.Connect(_zabbixserver, _zabbixport);
                    socket.Send(packet);

                    //Header
                    var buffer = new byte[5];
                    ReceivData(socket, buffer, 0, buffer.Length, _timeout);
                    if (DefaultHeader != Encoding.ASCII.GetString(buffer, 0, buffer.Length))
                        throw new Exception("Invalid header");

                    //Message length
                    buffer = new byte[8];
                    ReceivData(socket, buffer, 0, buffer.Length, _timeout);
                    var dataLength = BitConverter.ToInt32(buffer, 0);
                    if (dataLength == 0)
                        throw new Exception("Invalid data length");

                    //Message
                    buffer = new byte[dataLength];
                    ReceivData(socket, buffer, 0, buffer.Length, _timeout);
                    var response = serializer.Deserialize<ZabbixResponse>(Encoding.ASCII.GetString(buffer, 0, buffer.Length));
                    return string.Format("Response: {0}, Info: {1}", response.Response, response.Info);
                }
            }
            catch (Exception e)
            {
                return string.Format("Exception: {0}", e);
            }
        }

        private static void ReceivData(Socket pObjSocket, byte[] buffer, int offset, int size, int timeout)
        {
            var startTickCount = Environment.TickCount;
            var received = 0;
            do
            {
                if (Environment.TickCount > startTickCount + timeout)
                    throw new TimeoutException();
                try
                {
                    received += pObjSocket.Receive(buffer, offset + received, size - received, SocketFlags.None);
                }
                catch (SocketException ex)
                {
                    if (ex.SocketErrorCode == SocketError.WouldBlock ||
                        ex.SocketErrorCode == SocketError.IOPending ||
                        ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable)
                        Thread.Sleep(30);
                    else
                        throw;
                }
            } while (received < size);
        }

        #endregion
    }
}


Configs... Time for something interesting. First, we will keep the mailboxes we poll in the config. Second, the DataFlow settings. I suggest this:

Configs
<CredentialsList>
  <credentials hostname="8.8.8.8" username="popka@example.com" password="123" port="110" type="fbl" />
  <credentials hostname="8.8.8.8" username="kesha@example.com" password="123" port="110" type="bounce" />
</CredentialsList>
<DataFlowOptionsList>
  <datablockoptions name="_sortMailDataBlock" maxdop="4" boundedcapacity="4" />
  <datablockoptions name="_spamFilterDataBlock" maxdop="4" boundedcapacity="4" />
  <datablockoptions name="_checkBounceDataBlock" maxdop="16" boundedcapacity="16" />
  <datablockoptions name="_identifyDataBlock" maxdop="16" boundedcapacity="16" />
  <datablockoptions name="_addToCrmDataBlock" maxdop="16" boundedcapacity="16" />
  <datablockoptions name="_addToFblDataBlock" maxdop="16" boundedcapacity="16" />
  <datablockoptions name="_addToBounceDataBlock" maxdop="16" boundedcapacity="16" />
</DataFlowOptionsList>


Host, port, user and password are self-explanatory. Next comes the mailbox type. Suppose the service is used by marketing (among other departments): they have mailboxes where auto-replies to mass mailings land, as well as FBL spam reports. The mailbox itself thus already categorizes the letter, so for such situations we specify the mailbox type directly in the config. The DataFlow settings will become clear later, when we create the blocks. We will have our own sections in the config; there are plenty of manuals on how to do this, so here is just the result:

Determine Types
#region --- Types ---

static class MailboxType
{
    public const string Bo = "bo";
    public const string Crm = "crm";
    public const string Fbl = "fbl";
    public const string Bounce = "bounce";
}

class MailboxInfo
{
    public string Type { get; set; }
    public string Hostname { get; set; }
    public string User { get; set; }
    public string Password { get; set; }
    public int Port { get; set; }
}

class DataBlockOptions
{
    public int Maxdop { get; set; }
    public int BoundedCapacity { get; set; }

    public DataBlockOptions()
    {
        Maxdop = 1;
        BoundedCapacity = 1;
    }
}

#endregion


Create sections
/// <summary>
/// Custom config section
/// </summary>
public class CustomSettingsConfigSection : ConfigurationSection
{
    [ConfigurationProperty("CredentialsList")]
    public CredentialsCollection CredentialItems
    {
        get { return base["CredentialsList"] as CredentialsCollection; }
    }

    [ConfigurationProperty("DataFlowOptionsList")]
    public DataBlockOptionsCollection DataFlowOptionsItems
    {
        get { return base["DataFlowOptionsList"] as DataBlockOptionsCollection; }
    }
}


Now we teach it to read the values from these sections:
/// <summary>
/// Custom collection - credentials list
/// </summary>
[ConfigurationCollection(typeof(CredentialsElement), AddItemName = "credentials")]
public class CredentialsCollection : ConfigurationElementCollection, IEnumerable<CredentialsElement>
{
    protected override ConfigurationElement CreateNewElement()
    {
        return new CredentialsElement();
    }

    protected override object GetElementKey(ConfigurationElement element)
    {
        return ((CredentialsElement)element).Username;
    }

    public CredentialsElement this[int index]
    {
        get { return BaseGet(index) as CredentialsElement; }
    }

    public new IEnumerator<CredentialsElement> GetEnumerator()
    {
        for (var i = 0; i < Count; i++)
            yield return BaseGet(i) as CredentialsElement;
    }
}

/// <summary>
/// Custom credentials item
/// </summary>
public class CredentialsElement : ConfigurationElement
{
    [ConfigurationProperty("hostname", DefaultValue = "")]
    public string Hostname
    {
        get { return base["hostname"] as string; }
    }

    [ConfigurationProperty("username", DefaultValue = "", IsKey = true)]
    public string Username
    {
        get { return base["username"] as string; }
    }

    [ConfigurationProperty("password", DefaultValue = "")]
    public string Password
    {
        get { return base["password"] as string; }
    }

    [ConfigurationProperty("type", DefaultValue = "")]
    public string Type
    {
        get { return base["type"] as string; }
    }

    [ConfigurationProperty("port", DefaultValue = "")]
    public string Port
    {
        get { return base["port"] as string; }
    }
}

/// <summary>
/// Custom collection - DataBlock options list
/// </summary>
[ConfigurationCollection(typeof(DataBlockOptionsElement), AddItemName = "datablockoptions")]
public class DataBlockOptionsCollection : ConfigurationElementCollection, IEnumerable<DataBlockOptionsElement>
{
    protected override ConfigurationElement CreateNewElement()
    {
        return new DataBlockOptionsElement();
    }

    protected override object GetElementKey(ConfigurationElement element)
    {
        return ((DataBlockOptionsElement)element).Name;
    }

    public DataBlockOptionsElement this[int index]
    {
        get { return BaseGet(index) as DataBlockOptionsElement; }
    }

    public new IEnumerator<DataBlockOptionsElement> GetEnumerator()
    {
        for (var i = 0; i < Count; i++)
            yield return BaseGet(i) as DataBlockOptionsElement;
    }
}

/// <summary>
/// Custom DataBlock options item
/// </summary>
public class DataBlockOptionsElement : ConfigurationElement
{
    [ConfigurationProperty("name", DefaultValue = "", IsKey = true)]
    public string Name
    {
        get { return base["name"] as string; }
    }

    [ConfigurationProperty("maxdop", DefaultValue = "")]
    public string Maxdop
    {
        get { return base["maxdop"] as string; }
    }

    [ConfigurationProperty("boundedcapacity", DefaultValue = "")]
    public string BoundedCapacity
    {
        get { return base["boundedcapacity"] as string; }
    }
}


I will not write out the full Config implementation; the assumption is that the necessary parameters will be added there as development goes on.

Our custom settings read as follows:

We read
public List<MailboxInfo> CredentialsList { get; private set; }
public Dictionary<string, DataBlockOptions> DataFlowOptionsList { get; private set; }
...

static Config()
{
    try
    {
        var customConfig = (CustomSettingsConfigSection)ConfigurationManager.GetSection("CustomSettings");

        //Collections must be created before we fill them
        CredentialsList = new List<MailboxInfo>();
        DataFlowOptionsList = new Dictionary<string, DataBlockOptions>();

        //Get mailboxes
        foreach (var item in customConfig.CredentialItems)
            CredentialsList.Add(new MailboxInfo
            {
                Hostname = item.Hostname,
                Port = Convert.ToInt32(item.Port),
                User = item.Username,
                Type = item.Type,
                Password = item.Password
            });

        //Get DataFlow settings
        foreach (var item in customConfig.DataFlowOptionsItems)
            DataFlowOptionsList.Add(item.Name, new DataBlockOptions
            {
                Maxdop = Convert.ToInt32(item.Maxdop),
                BoundedCapacity = Convert.ToInt32(item.BoundedCapacity)
            });
    }
    catch (Exception ex)
    {
        Logger.Log.Fatal("Error at reading config: {0}", ex.Message);
        throw;
    }
}


That dragged on a bit, and we have not even reached the interesting part yet.

For now, let's leave aside the TopShelf wiring, performance counters and database access, and get down to business. Create the core class, Crawler. First, reading the mail:

private volatile bool _stopPipeline;
...

public void Start()
{
    do
    {
        var getMailsTasks = _config.CredentialsList
            .Select(credentials => Task.Run(() => GetMails(credentials)))
            .ToList();

        foreach (var task in getMailsTasks)
            task.Wait();

        Thread.Sleep(2000);
    } while (!_stopPipeline);

    //Stop pipeline - wait for completion of all DataFlow endpoints
    if (_stopPipeline)
        Logger.Log.Warn("Pipeline has been stopped by user");
}

Here laziness took its toll and I decided not to bother: with 20-30 mailboxes you can simply launch a task per mailbox and not worry about the number of threads. (Feel free to throw tomatoes.)
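If the mailbox count ever grows past "20-30", the task-per-mailbox approach can be bounded with a SemaphoreSlim. This is only a sketch of that idea, not code from the article — the limit of 4 and the counting stand-in for GetMails are made up:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class MailboxThrottleDemo
{
    // Poll `total` hypothetical mailboxes, at most `limit` at a time,
    // instead of starting one unbounded Task per mailbox.
    public static int Run(int total, int limit)
    {
        var processed = 0;
        using (var gate = new SemaphoreSlim(limit))
        {
            var tasks = Enumerable.Range(1, total).Select(_ => Task.Run(() =>
            {
                gate.Wait();        // at most `limit` tasks pass at once
                try
                {
                    // GetMails(credentials) would go here; we just count.
                    Interlocked.Increment(ref processed);
                }
                finally
                {
                    gate.Release();
                }
            })).ToArray();
            Task.WaitAll(tasks);
        }
        return processed;
    }

    static void Main()
    {
        Console.WriteLine(Run(30, 4)); // prints 30
    }
}
```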

On to the reading itself:

private void GetMails(MailboxInfo info)
{
    try
    {
        using (var client = new Pop3Client())
        {

Right away we measure the mailbox access timings — useful for diagnosing the network and server load.

//Get Zabbix metrics
var stopwatch = new Stopwatch();
stopwatch.Start();

//Get mail count
client.Connect(info.Hostname, info.Port, false);
client.Authenticate(info.User, info.Password);
stopwatch.Stop();

We send the data to Zabbix. Everything is simple: we specify the host name (exactly as registered in Zabbix), the key (again, exactly as in Zabbix) and the string value.

//Send it to Zabbix
Zabbix.Sender.SendData(new ZabbixItem
{
    Host = Config.HostKey,
    Key = info.Type + Config.TimingKey,
    Value = stopwatch.ElapsedMilliseconds.ToString()
});
Logger.Log.Debug("Send [{0}] timing to Zabbix: connected to '{1}' as '{2}', timing {3}ms",
    info.Type, info.Hostname, info.User, stopwatch.ElapsedMilliseconds);

var count = client.GetMessageCount();
if (count == 0)
    return;

Logger.Log.Debug("We've got new {0} messages in '{1}'", count, info.User);

//Send messages to sorting block
for (var i = 0; i < count; i++)
{
    try
    {
        var mailInfo = new MessageInfo
        {
            IsSpam = false,
            Mail = client.GetMessage(i + 1),
            Type = MessageType.UNKNOWN,
            Subtype = null,
            Recipient = null,
            Mailbox = info
        };
        Logger.Log.Debug("Download message from '{0}'. Size: {1}b", info.User, mailInfo.Mail.RawMessage.Length);

The DataFlow pipeline is created when the Crawler class is constructed. Our first stage is sorting the letter.

while (!_sortMailDataBlock.Post(mailInfo))
    Thread.Sleep(500);

See how simple it is: there is a single pipeline, and all the mail-reading tasks post messages into it one by one. If the block is busy, Post returns false and we just wait until it frees up; the other tasks keep working in the meantime. This is what I call concurrency without worries.
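This backpressure behavior is easy to watch in isolation. A minimal sketch (assuming the same Microsoft.Tpl.Dataflow package; the capacities and delays are arbitrary): Post returns false while the bounded block is full, and the producer simply retries — nothing is ever dropped.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

static class BackpressureDemo
{
    public static int Run()
    {
        var processed = 0;

        // A slow block with room for only one queued message.
        var block = new ActionBlock<int>(i =>
        {
            Thread.Sleep(50);                  // simulate slow parsing
            Interlocked.Increment(ref processed);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        for (var i = 0; i < 10; i++)
        {
            // Post returns false while the block is full: spin-wait,
            // exactly like the crawler does with _sortMailDataBlock.
            while (!block.Post(i))
                Thread.Sleep(10);
        }

        block.Complete();
        block.Completion.Wait();
        return processed;
    }

    static void Main() => Console.WriteLine(Run()); // prints 10
}
```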

The message went into the pipeline; now we can save it in the raw archive with a clear conscience. (Yes, we archive everything we read into a file archive — the support team will thank us later.)

Set up, for example, archive rotation:

NLog.config
<targets>
  <!-- add your targets here -->
  <target name="logfile" xsi:type="File"
          fileName="${basedir}\logs\${shortdate}-message.log" />
  <target name="Archivefile" xsi:type="File"
          fileName="${basedir}\archive\${shortdate}-archive.dat" />
</targets>


Then you can point Logstash at it, but that's another story...

        //Save every mail to archive
        Logger.Log.Debug("Archive message");
        Logger.Archive.Info(Functions.MessageToString(mailInfo.Mail));
    }
    catch (Exception ex)
    {
        Logger.Log.Error("Parse email error: {0}", ex.Message);
        Functions.ErrorsCounters[info.Type].Increment();

        //Archive mail anyway
        Logger.Log.Debug("Archive message");
        Logger.Archive.Info(Encoding.Default.GetString(client.GetMessageAsBytes(i + 1)));
    }

    if (_config.DeleteMail)
        client.DeleteMessage(i + 1);

    if (_stopPipeline)
        break;
}
Logger.Log.Debug("Done with '{0}'", info.User);
        }
    }
    catch (Exception ex)
    {
        Logger.Log.Error("General error - type: {0}, message: {1}", ex, ex.Message);
        Functions.ErrorsCounters[info.Type].Increment();
    }
}

Here we used static error counters (one per mailbox type), where ErrorsCounters is:

 public static Dictionary<string, Counter> ErrorsCounters = new Dictionary<string, Counter>(); 

And the counters themselves can be implemented like this:

Counter.cs
class Counter
{
    private long _counter;

    public Counter()
    {
        _counter = 0;
    }

    public void Increment()
    {
        Interlocked.Increment(ref _counter);
    }

    public long Read()
    {
        return _counter;
    }

    public long Refresh()
    {
        return Interlocked.Exchange(ref _counter, 0);
    }

    public void Add(long value)
    {
        Interlocked.Add(ref _counter, value);
    }

    public void Set(long value)
    {
        Interlocked.Exchange(ref _counter, value);
    }
}


Now to building the pipeline. Suppose we have mailboxes where auto-replies pour in. Such letters must be parsed (what kind of auto-reply, from whom, for which mailing list, etc.) and the result put into storage (a database). Suppose there are also mailboxes where FBL reports land; those letters go straight to the database. All other letters we consider "useful": they should be checked for spam and sent to an external system, for example a CRM.

As you have probably guessed, this example mainly covers using the collector for marketing tasks: gathering statistics on mail delivery and information about spam.

So, the workflow is decided. We declare the necessary blocks in the Crawler class:

class MessageInfo
{
    public bool IsSpam { get; set; }
    public Message Mail { get; set; }
    public string Subtype { get; set; }
    public string Recipient { get; set; }
    public MessageType Type { get; set; }
    public MailboxInfo Mailbox { get; set; }
}

class Crawler
{
    //Pipeline
    private TransformBlock<MessageInfo, MessageInfo> _sortMailDataBlock;
    private TransformBlock<MessageInfo, MessageInfo> _spamFilterDataBlock;
    private TransformBlock<MessageInfo, MessageInfo> _checkBounceDataBlock;
    private TransformBlock<MessageInfo, MessageInfo> _identifyDataBlock;
    private ActionBlock<MessageInfo> _addToCrmDataBlock;
    private ActionBlock<MessageInfo> _addToFblDataBlock;
    private ActionBlock<MessageInfo> _addToBounceDataBlock;
    ...

We create an initialization method that builds the pipeline blocks (using our wonderful config sections to initialize them):

public void Init()
{
    //*** Create pipeline ***

    //Create TransformBlock to get message type
    var blockOptions = _config.GetDataBlockOptions("_sortMailDataBlock");
    _sortMailDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => SortMail(mail),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = blockOptions.Maxdop,
            BoundedCapacity = blockOptions.BoundedCapacity
        });

    //Create TransformBlock to filter spam
    blockOptions = _config.GetDataBlockOptions("_spamFilterDataBlock");
    _spamFilterDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => FilterSpam(mail),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = blockOptions.Maxdop,
            BoundedCapacity = blockOptions.BoundedCapacity
        });

    //Create TransformBlock to sort bounces
    blockOptions = _config.GetDataBlockOptions("_checkBounceDataBlock");
    _checkBounceDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => BounceTypeCheck(mail),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = blockOptions.Maxdop,
            BoundedCapacity = blockOptions.BoundedCapacity
        });

    //Create TransformBlock to identify bounce owner
    blockOptions = _config.GetDataBlockOptions("_identifyDataBlock");
    _identifyDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => GetRecipient(mail),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = blockOptions.Maxdop,
            BoundedCapacity = blockOptions.BoundedCapacity
        });

    //Create ActionBlock to send mail to CRM
    blockOptions = _config.GetDataBlockOptions("_addToCrmDataBlock");
    _addToCrmDataBlock = new ActionBlock<MessageInfo>(mail => AddToCrm(mail),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = blockOptions.Maxdop,
            BoundedCapacity = blockOptions.BoundedCapacity
        });

    //Create ActionBlock to send FBL to MailWH
    blockOptions = _config.GetDataBlockOptions("_addToFblDataBlock");
    _addToFblDataBlock = new ActionBlock<MessageInfo>(mail => AddToFbl(mail),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = blockOptions.Maxdop,
            BoundedCapacity = blockOptions.BoundedCapacity
        });

    //Create ActionBlock to send Bounce to MailWH
    blockOptions = _config.GetDataBlockOptions("_addToBounceDataBlock");
    _addToBounceDataBlock = new ActionBlock<MessageInfo>(mail => AddToBounce(mail),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = blockOptions.Maxdop,
            BoundedCapacity = blockOptions.BoundedCapacity
        });

We assemble the pipeline according to our scheme:

//*** Build pipeline ***
_sortMailDataBlock.LinkTo(_spamFilterDataBlock, info => info.Type == MessageType.GENERAL);
_sortMailDataBlock.LinkTo(_addToFblDataBlock, info => info.Type == MessageType.FBL);
_sortMailDataBlock.LinkTo(_checkBounceDataBlock, info => info.Type == MessageType.BOUNCE);
_sortMailDataBlock.LinkTo(DataflowBlock.NullTarget<MessageInfo>(), info => info.Type == MessageType.UNKNOWN); /*STUB*/

_checkBounceDataBlock.LinkTo(_identifyDataBlock);
_identifyDataBlock.LinkTo(_addToBounceDataBlock);

_spamFilterDataBlock.LinkTo(_addToCrmDataBlock, info => !info.IsSpam);
_spamFilterDataBlock.LinkTo(DataflowBlock.NullTarget<MessageInfo>(), info => info.IsSpam); /*STUB*/

As you can see, everything is extremely simple: we link a block to the next one, optionally with a condition on the link. All blocks execute in parallel. Each block has its own degree of parallelism and capacity (the capacity lets you tune the queue in front of a block — messages the block has accepted but not yet processed). This way you can give a high degree of parallelism to "heavy", long operations such as parsing the contents of a letter.

I will not retell the Dataflow fundamentals here — it is better to read them in the original TPL Dataflow documentation.

Next, we set up completion propagation between the blocks:

_sortMailDataBlock.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)_spamFilterDataBlock).Fault(t.Exception);
    else _spamFilterDataBlock.Complete();
});
_sortMailDataBlock.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)_addToFblDataBlock).Fault(t.Exception);
    else _addToFblDataBlock.Complete();
});
_sortMailDataBlock.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)_checkBounceDataBlock).Fault(t.Exception);
    else _checkBounceDataBlock.Complete();
});
_spamFilterDataBlock.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)_addToCrmDataBlock).Fault(t.Exception);
    else _addToCrmDataBlock.Complete();
});
_checkBounceDataBlock.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)_identifyDataBlock).Fault(t.Exception);
    else _identifyDataBlock.Complete();
});
_identifyDataBlock.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)_addToBounceDataBlock).Fault(t.Exception);
    else _addToBounceDataBlock.Complete();
});
}
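A side note on these hand-written ContinueWith chains: in topologies like ours, where every block has a single upstream source, the same effect (forwarding both Complete() and faults downstream) can be had from LinkTo itself via DataflowLinkOptions.PropagateCompletion. A sketch with toy blocks, not the crawler's own:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

static class PropagateCompletionDemo
{
    public static int Run()
    {
        var doubler = new TransformBlock<int, int>(x => x * 2);
        var sum = 0;
        // Default MaxDegreeOfParallelism is 1, so `sum += x` is safe here.
        var sink = new ActionBlock<int>(x => sum += x);

        // Let LinkTo forward completion and faults downstream
        // instead of wiring ContinueWith by hand.
        doubler.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= 3; i++)
            doubler.Post(i);

        doubler.Complete();
        sink.Completion.Wait();   // finishes once both blocks drain
        return sum;               // 2 + 4 + 6
    }

    static void Main() => Console.WriteLine(Run()); // prints 12
}
```

Note that this shortcut fits single-source links; when several sources feed one target, the first completed source would complete the target prematurely, which is one reason to keep explicit continuations.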

That's basically it — the pipeline is working and you can post messages to it. It remains only to stop it properly by extending our Start method:

Start
public void Start()
{
    do
    {
        var getMailsTasks = _config.CredentialsList
            .Select(credentials => Task.Run(() => GetMails(credentials)))
            .ToList();

        foreach (var task in getMailsTasks)
            task.Wait();

        Thread.Sleep(2000);
    } while (!_stopPipeline);

    //Stop pipeline - wait for completion of all endpoints
    _sortMailDataBlock.Complete();
    _addToCrmDataBlock.Completion.Wait();
    _addToFblDataBlock.Completion.Wait();
    _addToBounceDataBlock.Completion.Wait();

    if (_stopPipeline)
        Logger.Log.Warn("Pipeline has been stopped by user");
}


On to the delegates.
Sorting... Well, let's say everything is simple for now (we can always complicate it later):

private MessageInfo SortMail(MessageInfo mail)
{
    switch (mail.Mailbox.Type)
    {
        case MailboxType.Crm:
            mail.Type = MessageType.GENERAL;
            break;
        case MailboxType.Bounce:
            mail.Type = MessageType.BOUNCE;
            break;
        case MailboxType.Fbl:
            mail.Type = MessageType.FBL;
            break;
    }
    return mail;
}

The spam filter is homework: hook up SpamAssassin.
Here is the delegate:

private MessageInfo FilterSpam(MessageInfo mail)
{
    //TODO: Add SpamAssassin logic
    return mail;
}
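To make the homework more concrete, here is a hedged sketch of the two pieces a SpamAssassin check needs: framing a SPAMC `CHECK` request for the spamd daemon and parsing the `Spam:` header of its reply. The protocol details (the `SPAMC/1.5` version string, header names) are my recollection of the spamd protocol, not something from this article — verify them against the SpamAssassin documentation; the network I/O itself (a plain TCP socket, conventionally port 783) is left out:

```csharp
using System;
using System.Text;
using System.Text.RegularExpressions;

static class SpamAssassinClient
{
    // Build a SPAMC CHECK request for a raw message.
    // Framing per the spamd protocol docs (version string is an assumption).
    public static byte[] BuildCheckRequest(byte[] rawMessage)
    {
        var header = Encoding.ASCII.GetBytes(
            "CHECK SPAMC/1.5\r\n" +
            "Content-length: " + rawMessage.Length + "\r\n\r\n");
        var packet = new byte[header.Length + rawMessage.Length];
        Buffer.BlockCopy(header, 0, packet, 0, header.Length);
        Buffer.BlockCopy(rawMessage, 0, packet, header.Length, rawMessage.Length);
        return packet;
    }

    // Parse the "Spam: True ; 15.2 / 5.0" line of a spamd reply.
    public static bool IsSpam(string spamdResponse)
    {
        var m = Regex.Match(spamdResponse, @"Spam:\s*(True|False)", RegexOptions.IgnoreCase);
        return m.Success && m.Groups[1].Value.Equals("True", StringComparison.OrdinalIgnoreCase);
    }
}

static class SpamDemo
{
    static void Main()
    {
        const string reply = "SPAMD/1.1 0 EX_OK\r\nSpam: True ; 15.2 / 5.0\r\n\r\n";
        Console.WriteLine(SpamAssassinClient.IsSpam(reply)); // prints True
    }
}
```

In FilterSpam you would send BuildCheckRequest(mail.Mail.RawMessage) to spamd and set mail.IsSpam from IsSpam(response).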

And the classes for working with the SpamAssassin API (link to the project).
Now we turn to parsing the letters — specifically, parsing the auto-replies. This is where MEF comes into play.
We create a project (a dll) with the interfaces for our plug-ins (let's call it Interfaces).
We add an interface:

public interface ICondition
{
    string Check(Message mimeMessage);
}

public interface IConditionMetadata
{
    Type Type { get; }
}

And... that's all. Our TopCrawler depends on this project, and the project with the plug-ins will use it too.
We create a new project (also a dll), let's call it Conditions.
We add the types of auto-replies:

#region --- Types ---

static class BounceType
{
    public const string Full = "BounceTypeFull";
    public const string Timeout = "BounceTypeTimeout";
    public const string Refused = "BounceTypeRefused";
    public const string NotFound = "BounceTypeNotFound";
    public const string Inactive = "BounceTypeInactive";
    public const string OutOfOffice = "BounceTypeOutOfOffice";
    public const string HostNotFound = "BounceTypeHostNotFound";
    public const string NotAuthorized = "BounceTypeNotAuthorized";
    public const string ManyConnections = "BounceTypeManyConnections";
}

#endregion

And the classes that implement our interface:

[Export(typeof(ICondition))]
[ExportMetadata("Type", typeof(ConditionNotFound1))]
public class ConditionNotFound1 : ICondition
{
    public string Check(Message mimeMessage)
    {
        if (!mimeMessage.MessagePart.IsMultiPart)
            return null;

        const string pattern = "Diagnostic-Code:.+smtp.+550";
        var regexp = new Regex(pattern, RegexOptions.IgnoreCase);
        return mimeMessage.MessagePart.MessageParts.Any(part =>
            part.ContentType.MediaType == "message/delivery-status" && regexp.IsMatch(part.GetBodyAsText()))
            ? BounceType.NotFound
            : null;
    }
}
...

[Export(typeof(ICondition))]
[ExportMetadata("Type", typeof(ConditionTimeout2))]
public class ConditionTimeout2 : ICondition
{
    public string Check(Message mimeMessage)
    {
        /* ... */
        return BounceType.Timeout;
    }
}
...

As you can see, it's all done with attributes.
Loading the plug-ins into the Crawler looks like this:

class Crawler
{
    ...
    //Plugins
    [ImportMany]
    public IEnumerable<Lazy<ICondition, IConditionMetadata>> BounceTypeConditions { get; set; }

    private void LoadPlugins()
    {
        try
        {
            var container = new CompositionContainer(new DirectoryCatalog(_config.PluginDirectory), true);
            container.ComposeParts(this);
        }
        catch (Exception ex)
        {
            Logger.Log.Error("Unable to load plugins: {0}", ex.Message);
        }
    }
    ...

LoadPlugins is called when the Crawler is initialized — MEF scans the plug-in directory and fills BounceTypeConditions with every exported condition it finds.

Back to the bounces. The delegate that determines the bounce type simply runs the message through all the loaded conditions:

private MessageInfo BounceTypeCheck(MessageInfo mailInfo)
{
    try
    {
        foreach (var condition in BounceTypeConditions)
        {
            var res = condition.Value.Check(mailInfo.Mail);
            if (res == null)
                continue;

            mailInfo.Subtype = res;
            Logger.Log.Debug("Bounce type condition [{0}] triggered for message [{1}]",
                condition.Metadata.Type, mailInfo.Mail.Headers.MessageId);
            break;
        }
    }
    catch (Exception ex)
    {
        Logger.Log.Error("Failed to determine bounce type for message '{0}': {1}",
            mailInfo.Mail.Headers.MessageId, ex.Message);
        Functions.ErrorsCounters[MailboxType.Bounce].Increment();
    }
    return mailInfo;
}

Notice how convenient this is: to support a new kind of auto-reply, you just drop a new plug-in dll into the folder (without rebuilding the service itself) and restart it.

The terminal blocks of the pipeline write the results to storage. For example:

private void AddToBounce(MessageInfo mail)
{
    try
    {
        MailWH.BounceAdd(mail);
        Functions.ProcessedCounters[MailboxType.Bounce].Increment();
        Logger.Log.Debug("Send Bounce to MailWH");
    }
    catch (Exception ex)
    {
        Logger.Log.Error("Error saving Bounce message '{0}' to MailWH: {1}",
            mail.Mail.Headers.MessageId, ex.Message);
        Functions.ErrorsCounters[MailboxType.Bounce].Increment();
    }
}

BounceAdd
public static long BounceAdd(MessageInfo message)
{
    using (var conn = new SqlConnection(ConnectionString))
    using (var cmd = new SqlDataAdapter("BounceAdd", conn))
    {
        var body = message.Mail.FindFirstPlainTextVersion() == null
            ? message.Mail.FindFirstHtmlVersion().GetBodyAsText()
            : message.Mail.FindFirstPlainTextVersion().GetBodyAsText();

        var outId = new SqlParameter("@ID", SqlDbType.BigInt) { Direction = ParameterDirection.Output };

        cmd.SelectCommand.CommandType = CommandType.StoredProcedure;
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@RawMessage", message.Mail.RawMessage));
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@Message", body));
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@Subject", message.Mail.Headers.Subject ?? ""));
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@MessageID", message.Mail.Headers.MessageId ?? ""));
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@AddressTo", message.Mail.Headers.To[0].Address ?? ""));
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@AddressFrom", message.Mail.Headers.From.Address ?? ""));
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@DateRecieved", DateTime.Now));
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@BounceTypeSysName", (object)message.Subtype ?? DBNull.Value));
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@SourceFrom", (object)message.Recipient ?? DBNull.Value));
        // TODO: Add ListId support
        cmd.SelectCommand.Parameters.Add(new SqlParameter("@ListId", DBNull.Value));
        cmd.SelectCommand.Parameters.Add(outId);

        conn.Open();
        cmd.SelectCommand.ExecuteNonQuery();
        return outId.Value as long? ?? 0;
    }
}


Wrapping all of this into a TopShelf service is routine boilerplate, so I leave it out.

Conclusions


That's it, really. The core of the solution is the DataFlow pipeline: it is easy to extend and rearrange, and DataFlow itself takes care of the parallelism and throttling (within the limits we configure). TopShelf turns the whole thing into a Windows service with minimal ceremony.

Beyond that there is delivery — Continuous Integration, VS Release Management — but that's another story.

Source: https://habr.com/ru/post/262613/

