Cześć! Po dłuższej przerwie jestem bardzo zadowolony, że mogę Was zaprosić do zapoznania się z kolejnym wpisem w Cesarstwie-Dev. Tym razem wrócę do przetwarzania bezserwerowego i omówię temat zapowiedziany we wpisie o Azure Functions. Jeśli jeszcze nie miałeś okazji przeczytać tego artykułu oraz jego temat jest Ci obcy, to zalecam zapoznanie się z jego treścią. W poniższym wpisie będę zakładał znajomość podstaw Azure Functions. Poza opisem oraz przykładową implementacją Durable Functions opiszę dzisiaj aż dwa przypadki użycia, które miałem okazję popełnić. Co więcej, w jednym z nich nie wszystko poszło tak, jak planowaliśmy. Zaciekawiony? Zapraszam do artykułu! Uwaga, autoreklama! Jednocześnie przypominam, że jeśli jesteś zainteresowany kolejnymi wpisami na blogu, to zachęcam do zapisania się do subskrypcji widocznej z prawej strony.

Dwa Srebrne Szachy Na Białej Powierzchni
https://www.pexels.com/pl-pl/zdjecie/dwa-srebrne-szachy-na-bialej-powierzchni-411207/

Wstęp do Durable Functions

Czym jest rozszerzenie Durable Functions? W pytaniu zdradziłem już jedną rzecz! Durable Functions to rozszerzenie standardowych Azure Functions pozwalające na pisanie stanowych funkcji w bezserwerowym środowisku. Jak opisuje Microsoft: Behind the scenes, the extension manages state, checkpoints, and restarts for you, allowing you to focus on your business logic. Co oznacza to w praktyce? Rozszerzenie Durable Functions pozwala nam tworzyć długotrwałe procesy, których stan możemy sprawdzać w czasie ich działania. Co więcej, możliwe jest odczytanie stanu zakończonych już funkcji.

Dla wizualizacji myśli weźmy pod lupę przykład zgłoszenia szkody w aucie. Załóżmy, że użytkownik musi zgłosić szkodę, a następnie wgrać zdjęcia oraz opis szkody. Następnie pracownik musi zaakceptować podane informacje. Po tej akceptacji projektowany system może rozpocząć przetwarzanie zlecenia.

Proces zgłaszania szkody

Z powyższego diagramu możemy wywnioskować, że proces zgłaszania szkody jest długotrwały. Rozpoczyna się w chwili zgłoszenia szkody przez użytkownika. Następnie, system oczekuje na dwa wydarzenia, a dokładniej na wgranie zdjęć oraz wgranie detali. W chwili, gdy system zarejestruje oba te zdarzenia, to poinformuje o tym odpowiedniego pracownika. Jeśli potwierdzi on zgłoszenie, to zostanie ono zaakceptowane przez system.

Koordynacja procesu

Z poprzedniego rozdziału wynika, że możemy użyć rozszerzenia Durable Functions do implementacji długotrwałych procesów. W świecie mikroserwisów utrudnione jest koordynowania różnych zadań pomiędzy serwisami. Ten styl architektoniczny zapewnia niezależność, zarówno logiczną, jak i wdrożeniową, jednak wiąże się także z wieloma wyzwaniami. Jednym z nich jest właśnie orkiestracja procesów w celu zachowania spójności danych.

Programowanie ma to w sobie, że mimo oczywistej unikalności i niepowtarzalności naszych projektów, większość problemów została już kiedyś rozwiązana. Nie inaczej jest w tym wypadku. Do dyspozycji mamy wzorce projektowe takie jak Saga czy Process Manager. Na temat tych wzorców można napisać przynajmniej trzy wpisy na tym blogu, wiec zamiast tego odeślę do specjalistów. W tym odcinku podcastu BSD Mariusz Gil oraz Jakub Pilimon rozmawiają właśnie o nich. Wspomnę jedynie, o różnicy pomiędzy orkiestracją a choreografią.

ChoreografiaKażda operacja publikuje zdarzenia, na które reaguję procesy w innych serwisach
OrkiestracjaIstnieje orkiestrator, który nadzoruje oraz przechowuje logikę oraz kolejność wykonywanych operacji. Często zapewnia również akcje kompensacyjne
Choreografia a orkiestracja

Durable Functions – implementacja

Sporo teorii za nami i ani grama kodu! Najwyższy czas to zmienić. Wpierw zobaczymy podstawy, by następnie przejrzeć implementację opisanego wyżej procesu zgłaszania szkody. Stwórzmy wpierw projekt, wybierzmy odpowiedni wzór (Azure Functions) oraz odpowiedni wyzwalacz.

W bazowym projekcie stworzone zostały trzy funkcje o różnych wyzwalaczach. Przed analizą kodu, uruchommy program i zobaczmy jego działanie. W konsoli wyświetlą się istniejące funkcje – jedna z nich jest HttpTrigger. Skopiujmy z niej link oraz wklejmy w przeglądarkę. W konsoli powinniśmy widzieć:

Spójrzmy teraz na wygenerowane logi oraz przejrzyjmy kod.

[FunctionName("Function1")]
public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var outputs = new List<string>();

    // Replace "hello" with the name of your Durable Activity Function.
    outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Tokyo"));
    outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Seattle"));
    outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "London"));

    // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
    return outputs;
}

[FunctionName("Function1_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
    log.LogInformation($"Saying hello to {name}.");
    return $"Hello {name}!";
}

[FunctionName("Function1_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    // Function input comes from the request content.
    string instanceId = await starter.StartNewAsync("Function1", null);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

Jedynym wyzwalaczem, za pomocą którego możemy uruchomić proces, jest HttpTrigger. Właśnie od ostatniej metody zaczynamy wędrówkę po kodzie. Warto zauważyć, że przyjmuje ona w parametrze klienta oznaczonego parametrem [DurableClient]. Za pomocą tego klienta możemy rozpocząć nową orkiestrację za pomocą metody StartNewAsync. Jednym z parametrów tej metody jest nazwa innej funkcji, która powinna być oznaczona atrybutem FunctionName oraz jest wyzwalana za pomocą atrybutu OrchestrationTrigger.

Metoda HttpStart zwraca obiekt zawierający informacje na temat orkiestracji, w tym linki. Za pomocą tych linków możemy między innymi sprawdzić status orkiestracji. Polecam zapoznać się z nimi we własnym zakresie! Wewnątrz wspomnianej metody RunOrchestrator mamy dostęp do kontekstu (context), za pomocą którego możemy wywoływać aktywności (funkcje wyzwalane ActivityTrigger) bądź sub-orkiestracje. Aktywności te są następnie kolejkowane oraz wywoływane. W tym miejscu możemy też poczekać na pewne zdarzenie, zamiast wywoływać konkretną aktywność. Wykonujemy to za pomocą metody WaitForExternalEvent – pokażę zastosowanie w przykładzie.

Jak to działa pod spodem?

Poniższa wiedza nie jest obowiązkowa do implementacji Durable Functions. Niemniej, warto wiedzieć jak to naprawdę działa. Przy aktywnościach wspomniałem, że ich wywołania są kolejkowane. Byłem bardzo dosłowny w swoich słowach. Możemy użyć wbudowanego w Visual Studio widoku Cloud Explorer i przekonać się, co zostało stworzone bez naszej wiedzy.

Skupmy się na kolejkach oraz tabelach, gdyż odgrywają one ważną rolę.

TypNazwaZastosowanie
Table_InstancesZawiera historię orkiestracji. Posiada takie dane jak nazwa, daty, status czy wynik.
Table_HistoryBardziej zaawansowana niż powyższa, gdyż zawiera więcej danych dla każdej orkiestracji (między innymi aktywności)
Queue_workitemsDo tej kolejki trafiają żądania wysłane przez wywołanie funkcji aktywności.
Queue_control-XTych kolejek mamy więcej, gdyż służą one do działania orkiestracji. Dlaczego jest ich więcej? W porównaniu do aktywności, orkiestracje są stanowe i muszą być wykonane na konkretnej instancji. Trafiają tutaj miedzy innymi odpowiedzi z aktywności.

Podsumujmy teraz dla uporządkowania trzy najważniejsze aspekty. Po pierwsze, mamy klienta orkiestracji (OrchestrationClient), który odpowiedzialny jest za jej rozpoczynanie oraz monitorowanie statusu. Następnie mamy orkiestrację, czyli funkcje wyzwalaną za pomocą OrchestrationTrigger. Służy ona do koordynowania procesu, który składa się z aktywności oraz (ewentualnie) sub-orkiestracji. Ostatecznie, mamy również funkcje aktywności (wyzwalane ActivityTrigger), które są pojedynczymi krokami procesu. Co ważne, aktywności są bezstanowe.

Durable Functions na przykładzie

Przejdźmy do wspomnianego już przykładu zgłaszania szkody! Zacznijmy of funkcji wejściowej.

        [FunctionName("ReportDamage")]
        public static async Task<HttpResponseMessage> ReportDamage(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestMessage req,
            [DurableClient] IDurableOrchestrationClient starter)
        {
            var id = await starter.StartNewAsync("UserReportedDamage");
            return starter.CreateCheckStatusResponse(req, id);
        }

Przejrzyjmy również wszystkie funkcje aktywności (w uproszczonej postaci).

        [FunctionName("CreateNewDamageReport")]
        public static async Task<string> CreateNewDamageReport(
            [ActivityTrigger] object reportedDamage, ILogger log)
        {
            // logic with creating new damage report
            var id = Guid.NewGuid().ToString();
            log.LogInformation($"Created new damage report with id: {id}");
            return id;
        }

        [FunctionName("SendApprovalRequest")]
        public static void SendApprovalRequest(
            [ActivityTrigger] string id, ILogger log)
        {
            // logic with creating new damage report
            log.LogInformation($"Send approval request for damage report with id: {id}");
        }

        [FunctionName("PublishApprovedDamageEvent")]
        public static void PublishApprovedDamageEvent(
            [ActivityTrigger] string id, ILogger log)
        {
            // logic with creating new damage report
            log.LogInformation($"Publishing approved damage report with id: {id}");
        }

Jak widać, implementacja aktywności jest maksymalne uproszczona w celu ułatwienia przekazu. Przejdźmy teraz do najważniejszej metody – orkiestratora!

        [FunctionName("UserReportedDamage")]
        public static async Task UserReportedDamageSaga(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var id = await context.CallActivityAsync<string>(nameof(CreateNewDamageReport), new { });

            // Prepare tasks for data
            var imageTask = context.WaitForExternalEvent("ImageFilled");
            var detailsTask = context.WaitForExternalEvent("DetailsFilled");
            var bothTasks = Task.WhenAll(imageTask, detailsTask);

            //  Prepare task for timeout
            using var cts = new CancellationTokenSource();
            var timeoutAt = context.CurrentUtcDateTime.AddDays(5);
            var timeoutTask = context.CreateTimer(timeoutAt, cts.Token);

            var winner = await Task.WhenAny(bothTasks, timeoutTask);
            if (winner == bothTasks)
            {
                // Cancel timeout task
                cts.Cancel();
                await context.CallActivityAsync(nameof(SendApprovalRequest), id);
                await context.WaitForExternalEvent<bool>("DamageApprovalResponded");
                // skipping timeout logic to simplify
                await context.CallActivityAsync(nameof(PublishApprovedDamageEvent), id);
            }
            else
            {
                // Timeout happened
            }
        }

Metoda ta rozpoczyna swoje działanie od wywołania aktywności stworzenia nowego zgłoszenia szkody. Następnie zaczyna się coś ciekawego. Tworzymy taski, które odpowiadają czekaniem na zdarzenia związane z uzupełnieniem danych przez użytkownika. W kolejnym kroku taski te są łączone w jeden oraz tworzony jest task odpowiadający za kontrolę czasu. Następnie czekamy, aż któryś z powyższych tasków zostanie skończony. Jeśli jest to timeout to powinniśmy pewnie wysłać maila do klienta, jednak w razie poprawnie wprowadzonych danych wywołujemy aktywność, która prosi pewnego pracownika o potwierdzenie zgłoszenia. Przy czekaniu na jego odpowiedź pominąłem kod związany z maksymalnym czasem oczekiwania dla zwiększenia przejrzystości. Wydaje mi się, że powyższy kod dobrze oddaje intencje biznesowe procesu, jednak jestem pewny, że zastanawiasz się, skąd przychodzą te zdarzenia.

Przyjmowanie zdarzeń

        [FunctionName("SendImages")]
        public static async Task<HttpResponseMessage> SendImages(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestMessage req,
            [DurableClient] IDurableOrchestrationClient client)
        {
            var instanceId = req.RequestUri.ParseQueryString()["instanceId"];
            await client.RaiseEventAsync(instanceId, "ImageFilled");
            return new HttpResponseMessage(HttpStatusCode.OK);
        }

        [FunctionName("SendDetails")]
        public static async Task<HttpResponseMessage> SendDetails(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestMessage req,
            [DurableClient] IDurableOrchestrationClient client)
        {
            var instanceId = req.RequestUri.ParseQueryString()["instanceId"];
            await client.RaiseEventAsync(instanceId, "DetailsFilled");
            return new HttpResponseMessage(HttpStatusCode.OK);
        }

        [FunctionName("ApproveDamageReport")]
        public static async Task<HttpResponseMessage> ApproveDamageReport(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestMessage req,
            [DurableClient] IDurableOrchestrationClient client)
        {
            var instanceId = req.RequestUri.ParseQueryString()["instanceId"];
            await client.RaiseEventAsync(instanceId, "DamageApprovalResponded", true);
            return new HttpResponseMessage(HttpStatusCode.OK);
        }

Powyższe funkcje są wyzwalane za pomocą Http. Co ważne, muszą one przyjąć w żądaniu id odpowiedniej orkiestracji. Oznacza to, że przy dodawaniu zgłoszenia do systemu musimy być w stanie zapamiętać tę wartość. Każda z tych metod wykorzystuje klienta orkiestracji do wyzwolenia odpowiedniego zdarzenia, które następnie wędruje do naszej orkiestracji. Proste i przyjemne, prawda? Załączam również screen z konsoli.

Z powyższego screenu wynika, że funkcja UserReportedDamage jest wywoływana kilka razy. Wynika to z faktu, że kiedy orkiestrator dojdzie do niewywołanej wcześniej aktywności (co wie na podstawie tabelki History), to wyśle wiadomość na kolejkę workitems i zakończy działanie. W kolejnym kroku budzona wiadomością na kolejce jest funkcja aktywności. Ta, kończąc swoje zadanie, wysyła wiadomość do kolejki control. Następnie orkiestrator ponownie jest uruchamiany – po pobraniu wiadomości z odpowiedniej kolejki control. Wtedy też wynik aktywności pobierany jest z tabelki History. Trzymanie takich danych umożliwia również opisywanemu rozszerzeniu wykonywania powtórzeń. Osiągniemy to wywołując metodę CallActivityWithRetryAsync zamiast CallActivityAsync.

Jako programiści musimy pamiętać, ze orkiestrator wywoływany jest wielokrotnie. Jego kod musi być deterministyczny, tak więc nie możemy w nim używać danych losowych (Guid.NewGuid), operacji I/O czy operacji asynchronicznych (poza wywoływaniem aktywności).

Z życia wzięte

Na koniec obiecane dwa przykłady! Zacznijmy od udanego. W pewnym projekcie potrzebowaliśmy usuwać stare dane pochodzące od różnych dostawców. W tym celu użyliśmy Durable Functions, które dla każdego dostawcy uruchamiały osobną sub-orkiestrację. Sub-orkiestracja ta pobierała dane do usunięcia i usuwała je, jeśli ich liczba była mniejsza od pewnego ustalonego procenta wszystkich danych. W przeciwnym wypadku wysyłała wiadomość na Slacku i czekała na potwierdzenie od jednego z programistów. Rozwiązanie to sprawdziło się idealnie i jesteśmy z niego bardzo zadowoleni.

Niestety, zdarzyło mi się też potknąć na używaniu tego rozszerzenia. Potrzebowaliśmy stworzyć narzędzie do pobierania danych w różnych etapach (przyjmijmy dwóch, jednak było ich więcej). Idea polegała na tym, że pierwszy etap zwracał nam listę obiektów, które powinniśmy załadować w drugim etapie. Tutaj również zdecydowaliśmy się do orkiestracji tego procesu poprzez Durable Functions. Danych było bardzo dużo, setki tysięcy, a czasami miliony. Zaimplementowane rozwiązanie okazało się zdecydowanie nieoptymalne czasowo, a co najgorsze potrafiło „zgubić” dane po drodze (między innymi przez ograniczone wielkości kolejek). W późniejszym czasie w czeluściach dokumentacji znaleźliśmy fragment mówiący, że nie zaleca się używać Durable Functions do przetwarzania dużej liczby danych. No cóż…

Zakończenie

Podsumowując, Durable Functions to rozszerzenie Azure Functions pozwalające nam pisać stanowy kod w bezserwerowym środowisku. Dobrze sprawdza się do koordynacji długotrwałych procesów, jak i łatwo je wykorzystać przy dodawaniu ludzkiej interakcji do kodu. Warto zapoznać się z innymi wzorcami tutaj.

Mam nadzieję, że udało Ci się dotrwać do końca. Powyższy wpis zawiera dużo informacji i przykładów, więc jestem przekonany, że każdy może z niego coś wynieść. Jeśli chcesz się podzielić jakimiś przemyśleniami na temat tego wpisu, to serdecznie zapraszam do dyskusji w komentarzach bądź mailowo (admin@cesarstwo-dev.pl).

Niezależnie, bardzo dziękuję za przeczytanie tego wpisu, jak i obecność na moim blogu. Serdecznie zapraszam do przejrzenia innych wpisów, jak i regularnego odwiedzania Cesarstwa-Dev. A ja tymczasem się żegnam, Cześć!


3 Komentarze

Mr K. · 2021-01-27 o 18:45

Bardzo fajny i merytorycznie na wysokim poziomie artykuł. Ciekawe wspomnienie o wzorcu Choreografii – może o tym kolejny wpis. Oby tak dalej!

    Cesarz · 2021-02-05 o 09:24

    Cześć! Dziękuję za komentarz i miłe słowa! 🙂

dotnetomaniak.pl · 2021-01-26 o 23:08

Durable Functions – podstawy i przykłady

Dziękujemy za dodanie artykułu – Trackback z dotnetomaniak.pl

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *