Głównie specjalizuję się w budowaniu procesów ETL odpowiedzialnych za integracje różnych źródeł danych za pomocą Apache NiFi.
  • Bezpieczna komunikacja pomiędzy NiFi, Registry oraz MiNiFi

Budowa prostego i automatycznego przepływu danych

W tym artykule przedstawię jak zbudować prosty przepływ danych wykorzystując następującej elementy: atrybuty flowfile, zmienne oraz wyrażenia regularne. Dobrą praktyką jest budowanie przepływów w osobnych Process Group oraz wersjonowanie ich za pomocą Apache NiFi Registry, umożliwi to w przyszłości łatwe i intuicyjne poruszanie się po Apache NiFi, gdy tych przepływów będzie znacznie więcej oraz kontrolowanie wersji flow między instancjami.

Utworzenie Process Group

Zaczynamy od stworzenia Process Group oraz dodania go do NiFi Registry, w celu wersjonowania zmian. W tym celu z górnego Menu wybieramy ikonkę o nazwie Process Group i przeciągamy na Canvas, wpisujemy nazwę i klikamy Add.

Utworzenie nowej Process Group

W kolejnym kroku dodamy ją do NiFi Registry, zatem naciskamy prawym przyciskiem myszy na utworzoną wcześniej grupę i z listy wybieramy Version, a następnie klikamy Start version control.

Utworzenie kontroli wersji

Apache NiFi wyświetli okno dialogowe, w którym zostaniemy poproszeni o określenie, który Bucket ma zostać wykorzystany do zapisania przepływu oraz uzupełnić jego nazwę. Dodatkowo, jeżeli mamy taką potrzebę to uzupełniamy Flow Description, w którym zawieramy dodatkowe informację na temat przepływu. Pole Version Comments, może posłużyć jako krótka informacja na temat danej wersji przepływu, jest to niezwykle przydatne w momencie, gdy rozwijamy przepływ i opisujemy co się zmieniło w konkretnej wersji przepływu. Po wszystkich tych czynnościach klikamy Save i obok naszej Process Group powinien pojawić się zielony ptaszek, który sygnalizuje, że przepływ danych jest zgodny z wersją zapisaną w NiFi Registry.

Wersjonowanie Flow

Konstrukcja przepływu danych

W tym przykładzie, działanie Flow będzie następujące: FlowFile będzie generowany w interwale 5-sekundowym, a następnie wykorzystując UpdateAttribute zostanie dodany atrybut operacja, do której przypiszemy dwie wartości(dodawanie, odejmowanie). W kolejnym kroku zostanie utworzony kolejny atrybut o nazwie wynik, który wykorzystując wyrażenia regularne NiFi, w zależności od wcześniej zdefiniowanej operacji będzie dodawał lub odejmował. Dodawanie będzie obejmowało wykorzystanie zmiennej składnikA, do której dodamy losowo wygenerowaną liczbę z zakresu od 1 do 10. Odejmowanie wykorzysta losową liczbę z zakresu od 1 do 20 i odejmie zmienną składnikB. W ostatnim kroku procesorem RouteOnAttribute rozdzielimy wygenerowane FlowFile na dwie kolejki w zależności od tego czy wynik jest dodatni lub ujemny.

Zmienne

W pierwszym kroku należy zdefiniować dwie zmienne, które będą wykorzystywane przy dodawaniu i odejmowaniu. W tym celu, należy prawym przyciskiem myszy kliknąć na Canvas naszej Process Group i wybrać opcję Variables. Plusikiem dodajemy obie zmienne, a rezultat tych działań powinien wyglądać następująco:

Zmienne składnikA i składnikB

GenerateFlowFile

Z menu wybieramy ikonę o nazwie Processor i przeciągamy na Canvas, następnie wyświetli się okno dialogowe z listą dostępnych procesorów. W filtrze wpisujemy GenerateFlowFile i dodajemy wyświetlony procesor.

Zawężona lista z dostępnymi procesorami

Aby uzyskać generowanie flowfile w 5-sekundowym interwale, należy w konfiguracji procesora wybrać zakładkę Scheduling i wpisać w Run Schedule wartość 5 sec.

Konfiguracja GenerateFlowFile

UpdateAttribute

W kolejnym kroku utworzymy trzy procesory, które znajdziemy pod nazwą UpdateAttribute. Kolejno w procesorach, należy dodać atrybut operacja o wartościach(dodawanie, odejmowanie).

UpdateAttribute z operacją odejmowania
UpdateAttribute z operacją dodawania

Ostatni UpdateAttribute zawiera wyrażenie regularne NiFi, które będzie odpowiadało za wygenerowanie atrybutu wynik zgodnie z wcześniej przedstawioną metodyką jego obliczania.

UpdateAttribute z wyrażeniem regularnym NiFi
${operacja:equals('dodawanie'):ifElse(${składnikA:plus(${random():mod(10):plus(1)})}, ${random():mod(20):plus(1):minus(${składnikB})})}

RouteOnAttribute

W ostatnim kroku utworzymy procesor RouteOnAttribute, który po atrybucie wynik będzie rozdzielał przychodzące flowfile na dodatnie lub ujemne. W tym celu, należy dodać dwie kolejki z dodatnie i ujemne z wyrażenia regularnym NiFi gt(większe niż) i lt(mniejsze niż).

RouteOnAttribute z wyrażeniem regularnym

Podsumowanie

Wszystko łączymy w całość i w efekcie przepływ danych powinien prezentować się, jak na poniższym zrzucie ekranu.

Finalna wersja przepływu danych

Na koniec prawym przyciskiem myszy wybieramy Version i Commit local changes, aby zaktualizować wersję przepływu danych z NiFi Registry.

Zapisywanie nowej wersji przepływu danych do NiFi Registry
Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *