/// <summary>
/// Sends a sample <see cref="SendSmsCommand"/> (fixed phone number and text, fresh
/// <c>CommandId</c>) to the endpoint addressed by <c>AppSettings.CommandEndpoint</c>.
/// </summary>
/// <param name="busControl">Bus used to resolve the send endpoint.</param>
/// <returns>A task that completes when the awaited <c>Send</c> call completes.</returns>
private static async Task SendSmsCommand(IBus busControl)
{
    var sms = new SendSmsCommand
    {
        CommandId = Guid.NewGuid(),
        PhoneNumber = "89031112233",
        Message = "Thank you for your reply"
    };

    // Commands are sent to one specific endpoint rather than published.
    var sendEndpoint = await busControl.GetSendEndpoint(AppSettings.CommandEndpoint);
    await sendEndpoint.Send(sms);
}
/// <summary>
/// Read-only message contract describing a request to send an SMS.
/// </summary>
public interface ISendSms
{
    /// <summary>Identifier of this command.</summary>
    Guid CommandId { get; }

    /// <summary>Phone number the SMS is addressed to.</summary>
    string PhoneNumber { get; }

    /// <summary>Text of the SMS.</summary>
    string Message { get; }
}
/// <summary>
/// Read-only message contract for the "SMS sent" event.
/// </summary>
public interface ISmsSent
{
    /// <summary>Identifier of this event.</summary>
    Guid EventId { get; }

    /// <summary>Timestamp of the send; the name indicates it is expressed in UTC.</summary>
    DateTime SentAtUtc { get; }
}
// Receive endpoint on the explicitly named queue "play_with_masstransit_queue";
// the endpoint itself is configured by LoadFrom(container).
cfg.ReceiveEndpoint(
    host,
    "play_with_masstransit_queue",
    endpoint => endpoint.LoadFrom(container));
// Receive endpoint registered without an explicit queue name; the endpoint is
// configured by LoadFrom(container).
// NOTE(review): presumably MassTransit generates a temporary queue name for this overload — confirm.
cfg.ReceiveEndpoint(
    host,
    endpoint => endpoint.LoadFrom(container));
/// <summary>
/// Marker interface for command contracts; declares no members.
/// </summary>
/// <typeparam name="T">Type argument carried by the command contract (unused by the marker itself).</typeparam>
public interface ICommand<T>
{
}

/// <summary>
/// Generic "send SMS" command contract built on top of <see cref="ICommand{T}"/>.
/// </summary>
/// <typeparam name="T">Type argument forwarded to <see cref="ICommand{T}"/>; no member here uses it.</typeparam>
public interface ISendSms<T> : ICommand<T>
{
    /// <summary>Identifier of this command.</summary>
    Guid CommandId { get; }

    /// <summary>Phone number the SMS is addressed to.</summary>
    string PhoneNumber { get; }

    /// <summary>Text of the SMS.</summary>
    string Message { get; }
}

/// <summary>
/// Concrete, settable implementation of <see cref="ISendSms{T}"/> closed over <see cref="string"/>.
/// </summary>
class SendSmsCommand : ISendSms<string>
{
    /// <inheritdoc />
    public Guid CommandId { get; set; }

    /// <inheritdoc />
    public string PhoneNumber { get; set; }

    /// <inheritdoc />
    public string Message { get; set; }
}
"messageType": [ "urn:message:PlayWithMassTransit30.Extension:SendSmsCommand", "urn:message:PlayWithMassTransit30.Contract.Command:ISendSms[[System:String]]", "urn:message:PlayWithMassTransit30.Contract.Command:ICommand[[System:String]]" ]
"host": { "machineName": "DESKTOP-SI9OHUR", "processName": "PlayWithMassTransit30.vshost", "processId": 1092, "assembly": "PlayWithMassTransit30", "assemblyVersion": "1.0.0.0", "frameworkVersion": "4.0.30319.42000", "massTransitVersion": "3.4.1.808", "operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0" }
"message": { "commandId": "7388f663-82dc-403a-8bf9-8952f2ff262e", "phoneNumber": "89031112233", "message": "Thank you for your reply" }
/// <summary>
/// Consumer of <see cref="ISendSms{T}"/> commands closed over <see cref="string"/>.
/// Writes the command's fields to the console, then calls the <c>SmsSent</c> bus
/// extension with the current UTC time.
/// </summary>
public class SendSmsConsumer : IConsumer<ISendSms<string>>
{
    // Prefix shared by every console line this consumer writes.
    private const string LogPrefix = "[IConsumer<ISendSms>]";

    // Held as IBus although the constructor receives an IBusControl.
    private readonly IBus _busControl;

    /// <summary>
    /// Creates the consumer.
    /// </summary>
    /// <param name="busControl">Bus used after consumption to call <c>SmsSent</c>.</param>
    public SendSmsConsumer(IBusControl busControl)
    {
        _busControl = busControl;
    }

    /// <summary>
    /// Handles one incoming command: logs it and then awaits <c>SmsSent</c> on the bus.
    /// </summary>
    /// <param name="context">Consume context carrying the command in <c>Message</c>.</param>
    /// <returns>A task that completes when the awaited <c>SmsSent</c> call completes.</returns>
    public async Task Consume(ConsumeContext<ISendSms<string>> context)
    {
        var command = context.Message;

        Console.WriteLine($"{LogPrefix} Send sms command consumed");
        Console.WriteLine($"{LogPrefix} CommandId: {command.CommandId}");
        Console.WriteLine($"{LogPrefix} Phone number: {command.PhoneNumber}");
        Console.WriteLine($"{LogPrefix} Message: {command.Message}");
        Console.Write(Environment.NewLine);
        Console.WriteLine(" : ");

        // NOTE(review): this goes through the injected bus rather than `context`;
        // presumably publishing via the ConsumeContext would carry message headers over — confirm.
        await _busControl.SmsSent(DateTime.UtcNow);
    }
}
/// <summary>
/// Extension methods on <see cref="IBus"/> that hide message construction for the
/// SMS command and the "SMS sent" event.
/// </summary>
public static class BusExtensions
{
    /// <summary>
    /// Builds a <see cref="SendSmsCommand"/> with a freshly generated <c>CommandId</c>
    /// and sends it to the endpoint at <paramref name="host"/>.
    /// </summary>
    /// <param name="bus">Bus used to resolve the send endpoint.</param>
    /// <param name="host">Address of the endpoint that receives the command.</param>
    /// <param name="phoneNumber">Phone number copied into the command.</param>
    /// <param name="message">SMS text copied into the command.</param>
    /// <returns>A task that completes when the command has been sent.</returns>
    public static async Task SendSms(
        this IBus bus,
        Uri host,
        string phoneNumber,
        string message)
    {
        var sms = new SendSmsCommand
        {
            CommandId = Guid.NewGuid(),
            PhoneNumber = phoneNumber,
            Message = message
        };

        await bus.SendCommand(host, sms);
    }

    /// <summary>
    /// Builds a <see cref="SmsSentEvent"/> with a freshly generated <c>EventId</c>
    /// and publishes it on the bus.
    /// </summary>
    /// <param name="bus">Bus the event is published on.</param>
    /// <param name="sentAtUtc">Timestamp copied into the event.</param>
    /// <returns>A task that completes when the event has been published.</returns>
    public static async Task SmsSent(
        this IBus bus,
        DateTime sentAtUtc)
    {
        var smsSent = new SmsSentEvent
        {
            EventId = Guid.NewGuid(),
            SentAtUtc = sentAtUtc
        };

        await bus.PublishEvent(smsSent);
    }

    /// <summary>
    /// Resolves the send endpoint for <paramref name="address"/> and sends
    /// <paramref name="command"/> to it.
    /// </summary>
    /// <typeparam name="T">Command type; must be a reference type.</typeparam>
    /// <param name="bus">Bus used to resolve the send endpoint.</param>
    /// <param name="address">Address of the destination endpoint.</param>
    /// <param name="command">Command instance to send.</param>
    /// <returns>A task that completes when the send has completed.</returns>
    private static async Task SendCommand<T>(this IBus bus, Uri address, T command)
        where T : class
    {
        var sendEndpoint = await bus.GetSendEndpoint(address);
        await sendEndpoint.Send(command);
    }

    /// <summary>
    /// Publishes <paramref name="message"/> through <c>bus.Publish</c>.
    /// </summary>
    /// <typeparam name="T">Event type; must be a reference type.</typeparam>
    /// <param name="bus">Bus the event is published on.</param>
    /// <param name="message">Event instance to publish.</param>
    /// <returns>A task that completes when the publish has completed.</returns>
    private static async Task PublishEvent<T>(this IBus bus, T message)
        where T : class
    {
        await bus.Publish(message);
    }
}

/// <summary>
/// Settable implementation of <see cref="ISendSms{T}"/> closed over <see cref="string"/>.
/// </summary>
class SendSmsCommand : ISendSms<string>
{
    /// <summary>Identifier of this command.</summary>
    public Guid CommandId { get; set; }

    /// <summary>Phone number the SMS is addressed to.</summary>
    public string PhoneNumber { get; set; }

    /// <summary>Text of the SMS.</summary>
    public string Message { get; set; }
}

/// <summary>
/// Settable implementation of <see cref="ISmsSent"/>.
/// </summary>
class SmsSentEvent : ISmsSent
{
    /// <summary>Identifier of this event.</summary>
    public Guid EventId { get; set; }

    /// <summary>Timestamp supplied by the caller of <c>SmsSent</c>.</summary>
    public DateTime SentAtUtc { get; set; }
}
// Signature-only listing of the Unity integration helper used by the ReceiveEndpoint calls.
// NOTE(review): no method body is visible here, so this reads like a metadata view rather
// than compilable source — confirm where the real implementation lives.
public static class UnityExtensions
{
    // Extension method on IReceiveEndpointConfigurator that takes a Unity container.
    // NOTE(review): presumably registers the consumers known to the container on the
    // endpoint — verify against the MassTransit Unity integration.
    public static void LoadFrom(this IReceiveEndpointConfigurator configurator, IUnityContainer container);
}
/// <summary>
/// Creates a RabbitMQ-backed bus with one receive endpoint on
/// "play_with_masstransit_queue" and attaches the consume, receive, send and
/// publish observers to it.
/// </summary>
/// <param name="container">Unity container handed to <c>LoadFrom</c> when configuring the endpoint.</param>
/// <returns>The configured bus control; this method does not start it.</returns>
/// <exception cref="ArgumentNullException"><paramref name="container"/> is <c>null</c>.</exception>
public static IBusControl GetConfiguredFactory(IUnityContainer container)
{
    if (container == null)
    {
        throw new ArgumentNullException(nameof(container));
    }

    var bus = Bus.Factory.CreateUsingRabbitMq(configurator =>
    {
        var rabbitHost = configurator.Host(AppSettings.Host, hostSettings => { });

        // The endpoint is bound to an explicitly named queue; the ReceiveEndpoint
        // overload that takes no queue name is the alternative that was disabled here.
        configurator.ReceiveEndpoint(
            rabbitHost,
            "play_with_masstransit_queue",
            endpoint => endpoint.LoadFrom(container));
    });

    // Observers are attached in the same order as before.
    bus.ConnectConsumeObserver(new ConsumeObserver());
    bus.ConnectReceiveObserver(new ReceiveObserver());
    bus.ConnectConsumeMessageObserver(new ConsumeObserverSendSmsCommand());
    bus.ConnectSendObserver(new SendObserver());
    bus.ConnectPublishObserver(new PublishObserver());

    return bus;
}
/// <summary>
/// Consume observer that writes one console line per consume-pipeline event
/// (before consume, after consume, on fault), tagged with the message id.
/// </summary>
public class ConsumeObserver : IConsumeObserver
{
    /// <summary>
    /// Logs that a message is about to be consumed.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="context">Consume context of the message.</param>
    /// <returns>An already completed task.</returns>
    public Task PreConsume<T>(ConsumeContext<T> context) where T : class
    {
        Console.WriteLine($"[ConsumeObserver] PreConsume {context.MessageId}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Logs that a message has been consumed.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="context">Consume context of the message.</param>
    /// <returns>An already completed task.</returns>
    public Task PostConsume<T>(ConsumeContext<T> context) where T : class
    {
        Console.WriteLine($"[ConsumeObserver] PostConsume {context.MessageId}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Logs that consuming a message faulted, including the exception type and message.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="context">Consume context of the message.</param>
    /// <param name="exception">Exception reported for the fault.</param>
    /// <returns>An already completed task.</returns>
    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
    {
        // The exception was previously dropped, leaving the fault line without any
        // diagnostic detail; the null-conditional keeps the logger itself from throwing.
        Console.WriteLine(
            $"[ConsumeObserver] ConsumeFault {context.MessageId}: {exception?.GetType().Name}: {exception?.Message}");
        return Task.CompletedTask;
    }
}
Source: https://habr.com/ru/post/314080/
All Articles