Cách xây dựng một workflow thông qua masstransit saga state machine

Cách xây dựng một workflow thông qua masstransit saga state machine

December 16, 2020 0 By Nam Vu

Đặt vấn đề: 

Bạn đã từng xử lý một logic business chuyển trạng thái qua lại đơn giản hay phức tạp bao giờ chưa? Hãy tưởng tượng bạn đi mua hàng trên tiki thì quá trình từ khi bạn đặt hàng cho đến khi sản phẩm được giao tới tay bạn sẽ phải trải qua các trạng thái như thế nào? Không đơn thuần là đặt hàng cho và giỏ không đâu. Bên dưới xử lý lại là cả một bài toán xử lý khá là đau đầu. 

Bạn có thể xem qua một số workflow sau và cảm nhận chủ đề bài viết này nhé!

ntechdevelopers
ntechdevelopers

Giờ bạn thấy công việc xử lý workflow của một feature chính phức tạp thế nào rồi chứ. Rồi giờ thì chúng ta cùng giải quyết bài toán workflow này nào!

Ý tưởng:

– Ý tưởng 1: ý tưởng này nảy ra đầu tiên mà các anh em thường nghĩ tới đó là cứ If…Else… hay Switch…Case… thôi. Đơn giản là nếu gặp trạng thái này thì xử lý thế này, gặp trạng thái kia thì xử lý thế kia.

– Ý tưởng 2: sử dụng smart enum, nếu bạn chưa từng nghe qua hay chưa tìm hiểu về smart enum thì mình sẽ đề cập tới giải pháp này ở một bài viết khác. Đại ý của ý tưởng này là giúp bạn chuyển các state enum qua lại với những ràng buộc nhất định từ đó bạn có thể handle xem ở trạng thái đó thì mình sẽ thực thi gì. Ví dụ bạn đang có trạng thái A theo workflow thì nó phải chuyển sang trạng thái C, nếu mà gọi nó chuyển qua trạng thái B thì nó sẽ văng lỗi, khi đó bạn sẽ phân tách được trường hợp chuyển từ A sang C thì làm gì, từ C sang B thì làm gì, từ A sang B thì xử lý lỗi đó thế nào.

– Ý tưởng 3: ý tưởng này phát triển từ ý tưởng 2 tạo nên, tuy nhiên mỗi bước chuyển đổi trạng thái nó sẽ phát một event hay một tín hiệu nào đó, ở đâu đó nhận được tín hiệu này sẽ xử lý thực thi logic liên quan. Nếu bạn biết đến cơ chế publish/subcribe pattern (pub/sub) thì bạn sẽ dễ dàng nắm bắt được ý tưởng này, nó kết hợp giữa cách xử lý của smart enum khi chuyển trạng thái và cách xử lý truyền nhận tín hiệu (message) của pub/sub để thực thi xử lý. Đó chính là cơ chế hoạt động của saga pattern trong bài viết này.

Giải quyết vấn đề:

Bạn cho rằng ý tưởng 1 cũng giải quyết được mà, nhưng khi bạn If…Else… các trường hợp như vậy thì với số trạng thái lớn thì bạn phải xử lý khối công việc rất lớn. Chưa kể maintain khối code đó của bạn rất khó, nếu có một thay đổi nào đó, bạn phải biết nó sẽ được thêm vào đâu và có ảnh hưởng những đoạn ElseIf… khác không? Nhanh trước mắt nhưng mà lại gây khó khăn về sau.

Với ý tưởng số 2 cũng hay đó có thể áp dụng chẳng vấn đề gì khi thêm bớt state trong workflow. Tuy nhiên nếu bạn xử lý monolithic thì chẳng vấn đề gì, khi bạn xử lý một workflow liên service trong microservice thì nó lại là lại phát sinh ra một vấn đề khác đó là khả năng xử lý bất đồng bộ và các logic phụ thuộc rất nhiều với nhau. Bạn muốn thay thế một logic thực thi trong flow thì bạn phải can thiệp vào class smart enum này. 

Với Saga trong ý tưởng số 3 thì nó là một tính năng khá mạnh trong Masstransit (Một mã nguồn mở về message bus để xâc dựng các ứng dụng hướng microservice). Bình thường Masstransit đảm nhận nhiệm vụ giao tiếp giữa các service với nhau thông qua message queue (Một dữ liệu tín hiệu truyền nhận kiểu những lá thư trong hộp thư ấy). Điều đáng nói ở đây là khi bạn xử lý một logic phức tạp mà mỗi bước trong logic đó lại cần giao tiếp với service khác thì bạn cần phải fire event đi (phát tín hiệu). Và service nào đăng ký xử lý tín hiệu đó thì nó sẽ nhận nhiệm vụ chỉ xử lý khi có tín hiệu đó thôi, không quan tâm đến workflow tổng thể. Ở saga state machine nó sẽ giúp chúng ta đảm nhận việc gắn các step logic trong workflow và có cơ chế truyền nhận khi state thay đổi. Ngoài ra saga còn giúp chúng ta phát những tín hiệu xử lý thành công hay xử lý thất bại. Từ đó bạn có thể lắng nghe để xử lý các exception case (trường hợp ngoại lệ)

ntechdevelopers

Ok, khi bạn đã hiểu rõ ý tưởng và cách thức hoạt động của saga rồi thì cùng mình xây dựng một ứng dụng nhỏ để hiểu rõ hơn cách chuyển đổi state và handle sự kiện nhiều service nhé.

Đầu tiên thì mình có 1 workflow đơn giản như sau:

ntechdevelopers

Để implement saga state machine thì thành phần quan trọng nhất chính là MasstransitStateMachine. Class này giúp chúng ta handle logic chuyển state của toàn bộ flow trên. Đi kèm với class này đó là một model để chứa state cũng như các trường dữ liệu đi kèm với status khi chuyển đổi trạng thái. Với ví dụ demo workflow bên trên mình sẽ viết như sau

ntechdevelopers

Như hình trên thì cái quan trọng không kém MasstransitStateMachine chính là State. Tất nhiên rồi muốn chuyển trạng thái thì phải có danh sách trạng thái đúng không. Đối với Saga Pattern thông thường thì bạn phải định nghĩa cả State Intital và Final, tuy nhiên đối với Masstransit thì nó đã hỗ trợ cho mình 2 state đầu và cuối này. Điều dễ hiểu thôi, bất cứ workflow nào chẳng có điểm bắt đầu và điểm kết thúc chứ.

Một thành phần tiếp theo trong thiết kế này chính là Event. Như ngay từ đoạn ý tưởng mình đã nói, khi workflow thực thi thì nó chỉ có nhiệm vụ xâu chuỗi các logic lại với nhau thôi, còn khi chuyển từ logic này sang logic khác thì đều gọi thông qua event. Với cơ chế pub/sub thì bạn có thể đăng ký nhận và xử lý event ở bất cứ service nào. Đó là lý do mình thiết kế theo dạng Interface các event nhằm mục đích các service khác chỉ kế thừa và implement chúng. Nếu bạn trong cùng một service thì có thể implement class bình thường để đỡ phức tạp cũng không vấn đề.

Thành phần tiếp theo không thể thiếu đó là message queue rồi. Ở đây mình sử dụng rabbitMQ, bạn có thể dùng nhiều thư viên và dịch vụ message queue khác như Azure Service Bus, Amazon SQS hay Apache Kafka.

Cho những ai chưa biết về RabbitMQ.

RabbitMQ là một Message broker open-source, ban đầu được dùng cho Advanced Message Queuing Protocol (AMQP), sau đó đã được phát triển để hỗ trợ Streaming Text Oriented Messaging Protocol (STOMP), Message Queuing Telemetry Transport (MQTT), và những giao thức khác. RabbitMQ được viết bằng Erlang, một ngôn ngữ không phổ biến nhưng khá phù hợp với các công việc của Message Broker.
Message broker (hay còn gọi là integration broker hoặc interface engine) là một module trung gian trung chuyển message từ người gửi đến người nhận.

ntechdevelopers

Khi chúng ta có Event, State và có Message Queue rồi thì giờ chỉ việc khai báo Event và định nghĩa logic cho workflow thôi.

ntechdevelopers

Mình có một số lưu ý khi xây dựng logic định nghĩa này như sau:

Thứ nhất, các event phải có CorrelationId và mỗi một lời gọi hàm xuyên suốt flow thì CorrelationId phải duy nhất và xuyên suốt flow. Trường thông tin này giúp bạn xâu chuỗi loại toàn bộ flow đi qua từ bước từng state cũng như từng service, nó giúp bạn tracing dễ dàng hơn và giúp cho Masstransit nhận biết được workflow saga. Bạn tưởng tượng nó chính là hạt hồng cầu chạy trong mạch máu của bạn đi đến từng bộ phận trong cơ thể. Nó giúp bạn có thể truy vết được bạn đã đi đâu, làm gì ở từng state từng service.

Thứ hai, để khai báo một event thì bạn sử dụng cú pháp

Event(() => BookingRequestReceived, x => x.CorrelateById(context =>  context.Message.CorrelationId));

Khi này bạn khi workflow của bạn sẽ hiểu event đó khi state thay đổi, tuy nhiên bạn cũng có thể khai báo schedule để có thể set thời gian expired cũng như độ trễ khi nhận event. 

Schedule(() => BookingExpired, x => x.ExpirationId, x =>
{
    x.Delay = TimeSpan.FromSeconds(10);
    x.Received = e => e.CorrelateById(context => context.Message.BookingId);
});

Thứ 3, như mình nói ban đâu, masstransit hỗ trợ State Initial và Final giúp bạn nên bạn phải chú ý bạn phải không được bỏ quên định nghĩa logic 2 state này cho dù bạn không có state trung gian nào bên trong.

During(Initial,
        When(CreateBookingCommandReceived).Then(context =>
        {
            context.Instance.CustomerId = context.Data.CustomerId;
            context.Instance.RequestTime = context.Data.RequestTime;
            context.Instance.BookingId = context.Data.BookingId;
        })
        .Publish(ctx => new BookingRequestReceivedEvent(ctx.Instance))
        .TransitionTo(Submitted)
        .ThenAsync(context =>  Console.Out.WriteLineAsync(context.Instance.ToString()))
    );

// Define other state
...

SetCompletedWhenFinalized();

Bạn nhìn cú pháp của masstransit bên trên cũng khá dễ hiểu, định nghĩa During khi nhận được event CreateBookingCommandReceived chuyển từ state Initial sang state Submitted (với câu lệnh TransitionTo) thì nó sẽ publish event BookingRequestReceivedEvent  sau đó in ra thông tin dữ liệu (với câu lệnh ThenAsync… WriteLine…)
Đến đây thì dừng lại một chút, bạn sẽ phải handle 2 event từ bên ngoài service khác.
Đó là CreateBookingCommandReceived và BookingRequestReceivedEvent .
Với CreateBookingCommandReceived thì bạn sẽ gọi từ trên UI, đây chính là nơi phát tín hiệu nguồn đi vào workflow này của bạn. Nhận được tín hiệu này thì workflow sẽ khởi động chuyển từ trạng thái Inital sang trạng thái mình định nghĩa đầu tiên là Submitted. Sau When là Then thì bạn truyền input đầu vào của tín hiệu nguồn trên cho workflow. Khi chạy đến đây thỏa điều kiện đúng event, đúng state nguồn, đúng state đích, thì bạn sẽ thực phát tín hiệu BookingRequestReceivedEvent cho service khác nhận và handle xử lý nó. Đơn giản đúng không? Tương tự cho các step khác trong flow Submitted => Processed, Processed => Final, Processed => Cancelled, Cancelled => Final.

Điều lưu ý nữa đó chính là Ignore, hàm này giúp bạn hạn chế và chặn chuyển đổi sai flow giống như Smart Enum. Khi bạn muốn Processed => Cancelled, mà trong lúc này bạn có tín hiệu event BookingCreated thì nó sẽ văng lỗi và không cho bạn đi sai flow. Nhiệm vụ của bạn là chụp lấy lỗi đó và handle xử lý nó để đảm bảo logic workflow của bạn không đi ngược dẫn tới sai yêu cầu.

Được rồi, mình có workflow hoàn chỉnh rồi thì bạn đừng quên đăng ký nhận tín hiệu (RecieiveEndPoint) nhé. 

ntechdevelopers

Ở bước đăng ký event này thì mình có 2 vấn đề cần lưu ý. 

Điều đầu tiên là bạn đăng ký một endpoint cho message queue saga độc lập với các event còn lại. Như hình vẽ trên thì để mà state machine saga có thể tự vận hành chuyển đổi các state thì bạn phải đăng ký một endpoint riêng để saga có thể trao đổi message nội bộ.

// Register endpoint handle saga
var bus = BusConfigurator.ConfigureBus((cfg, host) =>
{
    Console.WriteLine("Register endpoint handle saga...");
    cfg.ReceiveEndpoint(host, RabbitMqConstants.SagaQueue, e =>
    {
        e.StateMachineSaga(sagaStateMachine, repository);
    });
});

Điều thứ hai là khi bạn đăng khi các event consumer khác mà cùng một endpoint message queue key thì bạn hãy tách độc lập mỗi bus ra. Hoặc nếu dùng chung thì bạn phải đăng ký với message queue key khác nhau. Vì các message state trao đổi bất đồng bộ nên nếu chung 1 endpoint với cùng một bus instance, message của bạn sẽ bị tắc trên rabbitMQ mà không consumer được.

ntechdevelopers

Bước cuối cùng là handler consumer cho các event publish của bạn và 2 event quan trọng là Successfull và Failed nhằm mục đích monitor theo dõi được workflow đi được đến đâu và có xảy ra lỗi gì không?
Đối với microservice này thì bạn handle consumer ở service nào cũng được, chỉ cần đăng ký nhận và có handle consumer là được thôi. 
Nếu bạn muốn tracking lại state workflow để có thể rollback hay tạo một job phụ để retry lại một step nào đó trong workflow thì bạn nên lưu thông tin xuống database, và tạo một con job xử lý vấn đề này để đảm bảo sự an toàn trong workflow của bạn.

Mình xin tổng kết lại bằng kết quả chạy từ ứng dụng demo của mình như sau:

– Start all service: Ntech.Saga.Service.Api (Đại diện cho service gateway call từ UI), Ntech.Saga.Service.Management (Đại diện cho service chứa saga state machine), Ntech.Saga.Service.Handlling (Đại diện cho service handle nhận event xử lý từ workflow)

ntechdevelopers

– Trigger event luồng booking happycase từ endpoint /booking trong Ntech.Saga.Service.Api

ntechdevelopers

– Trigger event luồng booking cancelled từ endpoint /booking/cancel trong Ntech.Saga.Service.Api

ntechdevelopers

– Trigger event luồng exception case từ endpoint /booking/fail trong Ntech.Saga.Service.Api

Dưới đây là source code demo các bạn có thể tham khảo.
# Github

Hi vọng bài viết có thể giúp ích cho các bạn hiểu được cách xây dựng một workflow thông qua masstransit saga state machine.

Chúc bạn may mắn!