AspNetCore&MassTransit Courier實現分布式事務的詳細過程
目錄
- 分布式事務
- Saga模式
- 執行過程
- 恢復策略
- 協作方式
- 編排式(Orchestrator)
- 協同式(Choreography)
- MassTransit Courier
- 補償服務
- 服務建立
- 服務配置
- 服務編排
- 執行請求
- 執行成功
- 執行補償
- 參考文獻
在之前的一篇博文中,CAP框架可以方便我們實現非實時、異步場景下的最終一致性,而有些用例總是無法避免的需要在實時、同步場景下進行,可以借助Saga事務來解決這一困擾。在一些博文和倉庫中也搜尋到了.Net下實現Saga模式的解決方案MassTransit,這就省得自己再造輪子了。
分布式事務
分布式系統中,分布式事務是一個不能避免的問題,如何保證不同節點間的數據一致性。舉個常見的例子,下訂單、減庫存、扣余額,三者在單個節點時,可以借助本地事務,實現要么成功要么失敗。而當三者處于不同節點時,又參雜了如網絡環境、節點自身環境、服務環境等各種因素,使得三個節點想要實現要么成功、要么失敗就增加了許多困難。
CAP理論和BASE理論很好的詮釋了這一問題,也有了許多的解決分布式事務的方案,如2PC、3PC、TCC、本地消息表、Saga等一系列解決方案,面對不同場景、不同要求等可選擇不同的解決方案。
在之前提到過一個基于本地消息表的CAP框架,借助最終一致性很方便的解決了異步非實時請求下的分布式事務,而對于大部分場景雖然可以直接或者妥協方式使用著異步非實時,如同步實時場景的下訂單且減庫存變更到異步非實時場景的下訂單后發事件減庫存,但是總有那么一些場景,不得不去考慮同步實時請求下的分布式事務。
Saga模式
Saga模式又叫做長時間運行事務(Long-running-transaction), 由普林斯頓大學的 Hector Garcia-Molina和Kenneth Salem 1987年發表的論文《Sagas》。核心思想是將長事務拆分為多個本地短事務,通過保證所有短事務的成功或失敗來決定整體的成功或失敗,由Saga事務協調器協調管理,所有節點執行成功,則成功,如有節點失敗,則反向執行前置節點的補償操作。
- 每個Saga事務由一系列冪等的有序子事務(sub-transaction) Ti 組成。
- 每個Ti 都有對應的冪等補償動作Ci,補償動作用于撤銷Ti造成的結果。
執行過程
當正常執行時,依照T1、T2、T3三個短事務正常執行下去,直到最后一個Tn事務執行完畢,宣告整個事務的成功。
而當執行到某個Tj出現故障時,則反向補償之前的Tj-1..T1,每個對應的補償操作Cj-1...C1,其中Tj事務由于在執行階段就已失敗,所以Tj對應的補償動作Cj不需要執行,即也確定了最后一個Tn事務可以不設置補償動作Cn。
恢復策略
- 向前恢復(forward recovery):對于Ti事務的執行,部分場景下可能因為數據庫的連接、網絡的波動等導致短暫的失敗,對Ti事務重試執行,以確保整個事務的執行,如執行T1, T2, T3,當執行T3失敗時,不直接宣告失敗,對T3執行重試以排除部分不穩定因素,如在若干次重試無效后,再考慮向后恢復。
- 向后恢復(backward recovery):按照執行順序方式作為向前的指向,則向后為反向補償,對已執行過的節點順序倒退執行各Ti的補償動作Ci,也就是把走過的路往回走,對執行過的操作執行業務上的反操作,如正向流程執行減庫存則補償操作時執行加庫存。
協作方式
對于服務與服務間的協作,我們通常有兩種模式:Orchestration(編排式) 和 Choreography(協同式),在Saga模式中也有著這兩種的實現。
- 編排式(Orchestrator):把 Saga 的決策和執行順序邏輯集中在一個 Saga 編排器類中。Saga 編排器發出命令式消息給各個 Saga 參與方,指示這些參與方服務完成具體操作(本地事務)。
- 協同式(Choreography):把 Saga 的決策和執行順序邏輯分布在 Saga 的每個參與方中,它們通過交換事件的方式來進行溝通。
編排式與協同式的差異僅在于服務之間的協作方式,每個參與服務的接口定義卻沒有任何區別。
編排式(Orchestrator)
編排式的 Saga 需要開發人員定義一個編排器類,用于編排一個Saga中多個參與服務執行的流程。如果整個業務流程正常結束,業務就成功完成,一旦這個過程的任何環節出現失敗,Saga編排器類就會以相反的順序調用補償操作,重新進行業務回滾。
對于每個參與的服務而言,需要做的事情是
- 訂閱并處理命令消息
- 執行命令后返回響應消息
- 設計執行邏輯和補償邏輯
以提交訂單為例,假設場景是分布式系統下,進程間以消息傳遞進行通信:
1、事務發起方的主業務邏輯請求預先定義好的Saga編排器類(內部編排了執行順序)。
2、Saga編排器類向MQ發送減庫存事件,庫存服務訂閱事件、執行處理并返回MQ處理結果。
3、Saga編排器類向MQ發送減余額事件,支付服務訂閱事件、執行處理并返回MQ處理結果。
4、Saga編排器類向MQ發送創建訂單命令,訂單服務訂閱事件并按照命令創建訂單。
5、主業務邏輯接收并處理Saga編排器類處理結果。
6、整個過程由Saga 編排器類對接收到的回復進行判決,來決定是繼續執行還是懸崖勒馬。
協同式(Choreography)
沒有集中式的編排類,而是各參與方間相互訂閱,一個服務訂閱另一個服務的事件。
先由事務發起方執行邏輯并發布一個事件,該事件被一個或多個服務進行訂閱,這些服務執行本地數據庫操作并發布(或不發布)新的事件,該部分需要保證本地數據庫的操作成功且寫入MQ的消息也成功,可考慮使用本地消息表或是基于MQ事務。當最后一個服務執行本地事務并且不發布任何事件或者發布的事件沒有被任何Saga參與者訂閱意味著事務結束,則整個業務流程的分布式事務完成。如果某一服務出現故障,那么則反向發布事件,執行補償操作,以此回滾。
以提交訂單為例,假設場景是分布式系統下,進程間以消息傳遞進行通信:
1、事務發起方執行主業務邏輯發送提交訂單命令。
2、庫存服務訂閱事件、扣減庫存并發布已扣減事件。
3、訂單服務訂閱庫存已扣減事件,創建訂單并發布訂單已創建事件。
4、支付服務訂閱訂單已創建事件,執行支付并發布訂單已支付事件。
5、主業務邏輯訂閱訂單已支付事件并處理。
當某服務內執行時如存在異常,則反向發布事件,如訂單創建失敗,則發布OrderCreatedFailed事件,庫存服務訂閱該事件并執行補償操作。
相比而言,編排式中參與服務無需向協同式中訂閱上游服務的事件,減少了服務間對事件協議的依賴,而只需要關心集權的編排器類發送的消息。
MassTransit Courier
補償服務
當開啟一個事務前,需要做一些準備,準備一個事務Id,記錄整個事務執行情況,各Tj事務執行情況,當前請求上下文參數,入參參數記錄等,以方便執行補償操作時需要用到。如當Tj事務執行失敗時,需要對Cj-1到C1執行補償操作,此時各補償操作需要一些正向執行T1,Tj-1的請求參數或執行結果,因此都需要記錄下來。
在Courier中,通過Routing Slip來完成這些記錄,創建一個Guid,記錄請求上下文參數信息,可以綁定幾個內置事件,在各階段到來時會發送事件,如有需要可以訂閱。
var builder = new RoutingSlipBuilder(NewId.NextGuid());builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed);builder.AddVariable("RequestId", context.RequestId);builder.AddVariable("ResponseAddress", context.ResponseAddress);builder.AddVariable("FaultAddress", context.FaultAddress);builder.AddVariable("Request", context.Message);//組合一系列Activityvar routingSlip = builder.Build();await context.Execute(routingSlip).ConfigureAwait(false);
服務建立
弄了個Demo,建立了三個服務,此處我使用編排式來完成,但無論是選用編排式還是協同式,都借助RabbitMQ實現消息傳遞。
每個服務都安裝了MassTransit相關的包
MassTransit.AspNetCoreMassTransit.RabbitMQ
將Saga編排器類放置在OrderService中了,對于編排器類的放置,個人認為是應該看用例的主服務是誰而放置,想過放在BFF去協調三個服務,但是總是感覺不是BFF的職責范圍。
服務配置
在各服務中對MassTransit配置,如下在OrderService中對MassTransit需要使用到的RabbitMQ配置,對需要進行多個服務協作的用例配置Routing Slip,對消息隊列偵聽訂閱需要的事件并配置相應的Activity處理。
services.AddMassTransit(x =>{ var currentAssembly = Assembly.GetExecutingAssembly(); x.AddActivities(currentAssembly); x.AddConsumers(currentAssembly); x.AddRequestClient<createordercommand>(); x.UsingRabbitMq((context, cfg) => {// 配置RabbitMQcfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{ h.Username(Configuration["RabbitmqConfig:Username"]); h.Password(Configuration["RabbitmqConfig:Password"]);});//配置Routing Slipcfg.ReceiveEndpoint("CreateOrderCommand", ep =>{ ep.ConfigureConsumer<createorderrequestproxy>(context); ep.ConfigureConsumer<createorderresponseproxy>(context);});// 配置訂閱隊列及Handler處理cfg.ReceiveEndpoint("CreateOrder_execute", ep =>{ ep.ExecuteActivityHost<createorderactivity, createordermodel="">(context);}); });});services.AddMassTransitHostedService();
服務編排
構建Routing Slip,此處依據用例的需求,對需要協作的服務編排,組合一系列的Activity。
Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<createordercommand> request){ builder.AddActivity("ReduceStock", new Uri("..."), new {}); builder.AddActivity("DeductBalance", new Uri("..."), new {}); builder.AddActivity("CreateOrder", new Uri("..."), new { }); return Task.CompletedTask;}
執行請求
當請求進入后,通過RequestClient發送CreateOrderCommand,同步等待執行結果,再由編排器類負責協調預設好的Activity,發送事件到消息隊列,經各Activity訂閱處理最終返回結果。
[Route("[controller]")]public class OrderController : ControllerBase{ private readonly IRequestClient<createordercommand> _createOrderClient; public OrderController(IRequestClient<createordercommand> createOrderClient) {_createOrderClient = createOrderClient; } [HttpGet("CreateOrder")] public async Task<commoncommandresponse<createorderresult>> CreateOrder() {var result = await _createOrderClient.GetResponse<commoncommandresponse<createorderresult>>(new CreateOrderCommand(){ // ...});return result.Message; }}
各服務中對于Activity設置偵聽隊列以及請求信息,調用Execute執行邏輯,當出現異常時返回到MQ通知編排器類,在對之前執行的Activity執行Compensate。如在CreateOrderActivity中執行異常,由編排器類執行補償,ReduceStockActivity調用Compensate,執行增加庫存邏輯
public class ReduceStockActivity : IActivity<ReduceStockModel, ReduceStockLog>{ public async Task<ExecutionResult> Execute(ExecuteContext<ReduceStockModel> context) {var argument = context.Arguments;// 扣減庫存await Task.Delay(100);return context.Completed(new ReduceStockLog() { ProductId = argument.ProductId, Amount = 1 }); } public async Task<CompensationResult> Compensate(CompensateContext<ReduceStockLog> context) {// 增加庫存await Task.Delay(100);return context.Compensated(); }}
執行成功
用例請求執行后,先由Controller發送請求,再由庫存服務扣減庫存,支付服務扣減余額,最后由訂單服務創建訂單,當創建失敗時,執行補償操作,庫存服務增加庫存,支付服務增加余額。
執行補償
用例請求執行后,先由Controller發送請求,再由庫存服務扣減庫存,支付服務扣減余額,最后由訂單服務創建訂單,當創建失敗時,執行補償操作,庫存服務增加庫存,支付服務增加余額。
在整個事務失敗后,先會返回異常,再由編排器執行補償操作,實現最終的數據一致性。MassTransit也提供了重試機制以實現向前恢復,避免因數據庫連接超時、網絡波動等問題造成的失敗。
參考文獻
到此這篇關于AspNetCore&MassTransit Courier實現分布式事務的文章就介紹到這了,更多相關AspNetCore分布式事務內容請搜索以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持!
