Głównie specjalizuję się w budowaniu procesów ETL odpowiedzialnych za integracje różnych źródeł danych za pomocą Apache NiFi.
  • Budowa prostego i automatycznego przepływu danych

Ekstrakcja danych z plików Excel do MySQL

Przepływ wykonany zostanie z wykorzystaniem dodatkowych procesorów jak np. ConvertExcelToCSVProcessor czy PutDatabaseRecord oraz wsadowego FlowFile z danymi w formacie JSON z parametrami połączenia do serwera SFTP i informacją, które pliki Excel należy zaimportować do bazy danych MySQL. Pracę nad przepływem będą sukcesywnie zapisywane na poszczególnych etapach pracy do Apache NiFi Registry.

Podgląd danych z plików XLSX

Poniżej przedstawiam przykładowy plik Excel, który zawiera w sobie dwa arkusze z danymi sprzedażowymi oraz z listą kontrahentów. Całość została podzielona na dwa pliki, które zostaną zaimportowane do MySQL.

Arkusz Sales z pliku example_1.xlsx
Arkusz Customer_List z pliku example_1.xlsx

Konstrukcja przepływu danych

Początek przepływu został zaczerpnięty po części z poprzedniego artykułu Proste scalanie plików CSV pobranych z SFTP, czyli operować będziemy na informacjach jakie dostarczy nam wsadowy FlowFile z danymi w formacie JSON. Informacje w wygenerowanym FlowFile z JSON zostaną odpowiednio wyodrębnione i zapisane w postaci atrybutów za pomocą procesorów EvaluateJsonPath. Kolejne procesory SplitContent / ExtractText podzielą listę z plikami Excel do zaimportowania, przez co otrzymamy FF z nazwą dla każdego pliku, który chcemy przeprocesować. Dane z JSON pozwolą nam na połączenie i pobranie plików Excel z serwera SFTP przy wykorzystaniu procesora FetchSFTP. Posiadając już pliki xlsx z danymi, użyjemy procesora ConvertExcelToCSVProcessor, który wyodrębni każdy arkusz do osobnego FlowFile. Otrzymane dane w FF zaimportujemy przy użyciu procesora PutDatabaseRecord do serwera MySQL.

GenerateFlowFile z danymi w formacie JSON

We właściwościach procesora w atrybucie Custom Text umieszczamy, poniższe dane.

{
   "sftp":{
      "Hostname":"10.10.0.132",
      "Port":"22",
      "User":"sftp",
      "Password":"Test123",
      "remoteDir":"/home/sftp/xlsx",
      "listFiles":"example_1.xlsx,example_2.xlsx"	  
   }
}
Właściwość procesora GenerateFlowFile

EvaluateJsonPath

Pierwszy procesor jest odpowiedzialny za wyodrębnienie wszelkich informacji zawartych w JSON, a następnie zapisze w atrybutach FlowFile.

Wyodrębnienie informacji z JSON do atrybutów FlowFile

Kolejny, wyodrębni zawartość listFiles do zawartości FlowFile co pozwoli nam na podzielenie FF na osobne pliki, które potrzebne są do dalszego procesowania.

Procesor, który wyodrębnia listFile do zawartości FlowFile

SplitContent / ExtractText

Kolejne procesory podzielą pierwotny FlowFile i utworzą osobne FF dla każdego pliku xlsx wraz z ich oryginalnymi nazwami. Parametr Byte Sequence wskazuje na seperator, po którym zostanie podzielona lista.

Właściwości procesora SplitContent
Zmiana nazwy FlowFile na nazwę pliku xlsx

FetchSFTP

Konfiguracja procesora oparta jest o parametry, które wcześniej zostały wyodrębnione z wejściowego FlowFile zawierającego dane w formacie JSON. Atrybut ${sftpPassword}, należy uzupełnić we właściwości Password procesora.

Właściwości procesora FetchSFTP odpowiedzialnego za pobranie plików xlsx

ConvertExcelToCSVProcessor

Wskazane, poniżej ustawienia procesora ConvertExcelToCSVProcessor pozwolą na poprawne wyodrębnienie arkuszy z plików Excel do osobnych FlowFile, co pozwoli w kolejnym etapie na rozróżnienie, które arkusze mają zostać załadowane do poszczególnych tabel.

Właściwość procesora ConvertExcelToCSVProcessor

PutDatabaseRecord

Oprócz konfiguracji tego procesora, niezbędne jest wykonanie dodatkowych akcji w postaci utworzenia CSVReader oraz DBCPConnectionPool. Table Name, jak można zauważyć na poniższym zrzucie budowana jest wyrażeniem regularnym, na podstawie nazwy FlowFile.

Konfiguracja procesora PutDatabaseRecord z dodatkowymi dwoma Controller Services

Wyrażenie regularne użyte do zbudowania nazwy tabeli docelowej, do której będą importowane dane działa w oparciu o nazwę FlowFile. Arkusze z nazwą Customer_List będą ładowane do tabeli customer, a dane sprzedażowe z arkusza Sales do tabeli sales.

${filename:contains('Sales'):ifElse('sales', ${filename:contains('Customer_List'):ifElse('customer',${filename})})}

Record Reader

W pierwszej kolejność przechodzimy na pole Record Reader i lewym przyciskiem z menu wybieramy Create new service. Z listy dostępnym kontrolerów, należy wybrać CSVReader, a następnie Create.

Okno dialogowe z wyborem utworzenia nowego kontrolera

Przechodzimy do ustawień utworzonego wcześniej kontrolera i konfigurujemy jego parametry zgodnie z załączonym zrzutem. Ważne jest, aby właściwość Schema Access Strategy został ustawiony na Use String Fields From Header.

Ustawienia kontrolera CSVReader

DBCPConnectionPool

Zanim przejdziemy do konfigurowania kontrolera DBCPConnectionPool, należy pobrać wcześniej prawidłowy konektor bazy danych, który pozwoli na podłączenie się do serwera bazodanowego MySQL. W moim przypadku jest to MySQL Connector/J 8.0.30, pobiorę go do wcześniej utworzonego katalogu custom_lib w /opt/nifi.

sudo wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar -P /opt/nifi/custom_lib

Analogicznie do poprzednich kroków, należy utworzyć kontroler, który posłuży do połączenia się z bazą danych. Klikamy zatem we właściwości Database Connection Pooling Service, wybieramy Create new service i domyślnie pojawi się DBCPConnectionPool. Uzupełniamy właściwości kontrolera, jak poniżej ze wskazaniem własnego serwera i bazy danych, do której chcecie się połączyć.

Ustawienia kontrolera DBCPConnectionPool

Podsumowanie

Wszystkie wymagane procesory zostały przez Nas odpowiednio skonfigurowane, a więc nie pozostaje nic innego jak ich należyte połączenie odpowiednimi kolejkami, jak na poniższym zrzucie ekranu.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *