Praktyczny przewodnik po platformie Apache Kafka
Systemy komunikacji sterowanej zdarzeniami (Message Brokers) umożliwiają luźne powiązanie usług i komponentów w organizacji lub projekcie, zapewniając jednocześnie komunikację asynchroniczną, skalowalność, wysoką przepustowość, niezawodność i bezpieczeństwo przesyłanych danych.
Apache Kafka to jeden z najpopularniejszych brokerów wiadomości, znany ze swojej wydajności i szeroko stosowany przez firmy takie jak Netflix, Uber i LinkedIn. Ale czy rzeczywiście spełnia te oczekiwania? Czy zapewnia bezpieczeństwo, skalowalność i niezawodność danych? Czy działa optymalnie zaraz po instalacji, czy wymaga dodatkowej konfiguracji?
Architektura platformy Apache Kafka
W tej sekcji przedstawię i pokrótce omówię komponenty, które składają się na typową instalację Apache Kafka. Zrozumienie, jak działają, ułatwi dalszą analizę funkcjonalności i opcji konfiguracji platformy Kafka.
Jakie są podstawowe składniki platformy Apache Kafka?
Powyższy diagram ilustruje typowe składniki instalacji Apache Kafka. Kluczowym elementem jest klaster Kafka, który składa się z brokerów – serwerów odpowiedzialnych za przechowywanie i przetwarzanie danych.
Klaster jest zarządzany przez Apache Zookeeper (począwszy od wersji 2.8.0 wprowadzono alternatywne rozwiązanie o nazwie Kafka Kraft, eliminując potrzebę korzystania z Zookeepera; od wersji 3.5.0 Zookeeper został oznaczony jako przestarzałe).
Każdy broker przechowuje dane w tematach, które są podzielone na partycje. Partycjonowanie odgrywa kluczową rolę w skalowalności i wydajności platformy Kafka, ponieważ umożliwia wielu odbiorcom równoległe przetwarzanie danych.
Co to jest partycjonowanie i kolejność wiadomości?
Podczas pisania wiadomości do tematu platforma Kafka przypisuje je do określonej partycji na podstawie jednej z dwóch metod:
- Klucz partycji — jeśli klucz zostanie określony, wszystkie wiadomości z tym samym kluczem są wysyłane do tej samej partycji.
- Algorytm działania okrężnego — jeśli nie określono klucza, platforma Kafka równomiernie dystrybuuje komunikaty między dostępnymi partycjami.
W ramach jednej partycji platforma Kafka gwarantuje kolejność komunikatów (na podstawie czasu zapisu). Nie gwarantuje jednak globalnego porządku na różnych partycjach w ramach tego samego tematu.
Każda wiadomość w partycji ma unikatowy identyfikator nazywany przesunięciem, który umożliwia użytkownikom śledzenie przetwarzania danych i zarządzanie nim.
Co to jest grupa odbiorców i przetwarzanie równoległe?
Użytkownicy platformy Kafka są przypisywani do grup odbiorców. Każdy odbiorca w grupie odczytuje komunikaty z co najmniej jednej partycji, ale żadna partycja nie może być obsługiwana przez więcej niż jednego odbiorcę w tej samej grupie.
Przykłady:
- Temat ma 3 partycje, a grupa odbiorców składa się z jednego odbiorcy, → odbiorca odbiera komunikaty ze wszystkich partycji. (Schemat powyżej)
- Temat ma 3 partycje, a grupa składa się z 3 odbiorców, → każdy odbiorca odbiera komunikaty z jednej partycji. (Schemat powyżej).
- Temat ma 3 partycje, a grupa składa się z 4 odbiorców → jeden z nich pozostaje nieaktywny. (Schemat powyżej).
Grupa konsumentów może być traktowana jako aplikacja Spring Boot lub Quarkus, podczas gdy indywidualny konsument może być traktowany jako wątek w tej aplikacji. W związku z tym podczas planowania przepływności wdrożonego systemu należy odpowiednio wybrać liczbę partycji w tematach — liczba partycji określa maksymalną liczbę wątków, które mogą przetwarzać komunikaty równolegle.
Rola brokera w klastrze Kafka
Każdy broker pełni kilka kluczowych funkcji:
- Partycje wiodące — broker może pełnić rolę lidera dla wybranych partycji i obsługiwać ich operacje odczytu/zapisu.
- Replikacja danych — broker przesyła dane partycji do innych serwerów w klastrze, zapewniając nadmiarowość.
- Zarządzanie offsetami konsumenta (offset konsumenta = punkt, w którym dany konsument zakończył czytanie komunikatu) – Broker przechowuje informacje o tym, które komunikaty zostały już przeczytane.
- Obsługa przechowywania danych w tematach — broker kontroluje usuwanie starszych danych zgodnie z zasadami przechowywania ustawionymi dla danego tematu w partycji.
Co to jest Kafka Connect? Jak Kafka integruje się z innymi systemami?
Kolejnym składnikiem architektury jest Kafka Connect. Jest to mechanizm, który umożliwia Kafce integrację z innymi systemami, umożliwiając komunikację ze źródłami zewnętrznymi. Składa się z dwóch rodzajów złączy:
- Źródło — łączniki, które odczytują dane z systemów źródłowych (np. baz danych, systemów plików, S3) i wysyłają je do platformy Kafka.
- Ujście — łączniki, które pobierają/odczytują dane z określonych tematów platformy Kafka i zapisują je w systemach docelowych (np. bazach danych, systemach plików, S3).
Na rynku dostępnych jest wiele gotowych do użycia konektorów. Do najpopularniejszych dostawców należą:
- Łączniki Confluent → Confluent (niektóre wymagają licencji).
- Łączniki Camel → Apache Camel (licencja Apache 2.0).
- Lenses.io → Lenses.io konektorów (licencja Open Source).
Warto zauważyć, że w przypadku Confluent licencja może się różnić w zależności od konektora – niektóre wymagają zapłaty, podczas gdy inne są open source. Czasami nawet łączniki źródła i ujścia dla tego samego systemu mogą mieć różne licencje.
Ciekawą opcją są Lenses.io konektorów, które oferują między innymi konektor JMS – zastosowaliśmy go w jednym z naszych projektów.
Rola aplikacji strumieniowych – przetwarzanie danych w Kafce
Ostatnim komponentem pokazanym na diagramie architektury jest Stream Application. Jest to komponent, który pobiera dane z tematu, przetwarza je i zapisuje w temacie docelowym. Pełni rolę zarówno producenta, jak i konsumenta komunikatów.
Aplikacje strumieniowe mogą być tworzone przy użyciu różnych frameworków, takich jak:
- Wiosenny rozruch
- Quarkus (Quarkus)
Typowe operacje wykonywane przez aplikacje strumieniowe obejmują:
- Przekształcenia wiadomości
- Obliczenia
- Agregacja, scalanie wiadomości z wielu tematów
Ostatnim komponentem pokazanym na diagramie architektury jest Stream Application. Jest to komponent, który pobiera dane z tematu, przetwarza je i wysyła do tematu docelowego. Innymi słowy, łączy w sobie funkcjonalności konsumenta i producenta Kafki.
Aplikacje te mogą być tworzone przy użyciu różnych frameworków, takich jak Spring Boot lub Quarkus, i mogą wykonywać transformacje komunikatów, obliczenia, agregacje i scalanie komunikatów z wielu tematów.
Omówione powyżej komponenty stanowią podstawę architektury Apache Kafka. W kolejnych sekcjach przyjrzymy się ich konfiguracji i optymalizacji.
Platforma Kafka ma obszerną dokumentację, a wiele dodatkowych zasobów można znaleźć w Internecie. Warto na przykład zapoznać się z takimi elementami jak Schema Registry, który umożliwia zarządzanie schematami wiadomości przesyłanych w Kafce – nie zostało to omówione powyżej.
Obszary funkcjonalne platformy Apache Kafka
Zanim zagłębimy się w szczegółową analizę, warto omówić kluczowe obszary funkcjonalne, które powinien zapewnić broker komunikatów:
- Wdrożenie i konfiguracja – broker (lub otaczający go ekosystem narzędzi) powinien umożliwiać łatwe wdrażanie komponentów, zarządzanie kolejkami, kontrolę dostępu i elastyczną konfigurację systemu.
- Monitorowanie – Kluczowym aspektem działania brokera (lub jego ekosystemu) jest możliwość monitorowania komponentów i zbierania kluczowych wskaźników, takich jak przepustowość, wykorzystanie zasobów (CPU/RAM), rozmiar kolejki i inne parametry operacyjne.
- Bezpieczeństwo – broker powinien obsługiwać bezpieczne protokoły transmisji danych oraz implementować mechanizmy uwierzytelniania i autoryzacji w celu kontroli dostępu do kolejek i operacji.
- High Availability and Disaster Recovery – broker powinien wspierać mechanizmy zapewniające ciągłość działania, takie jak replikacja danych, strategie disaster recovery i tworzenie kopii zapasowych.
- Wysoka wydajność przetwarzania komunikatów – Broker powinien zapewniać przetwarzanie komunikatów o wysokiej przepustowości przy optymalnym wykorzystaniu zasobów systemowych (CPU/RAM).
W kolejnych sekcjach przeanalizujemy te obszary w kontekście platformy Apache Kafka, omawiając jej możliwości i podkreślając elementy, które należy wziąć pod uwagę, aby uniknąć potencjalnych problemów.
Wdrażanie i konfigurowanie platformy Kafka
Apache Kafka oferuje różne opcje instalacji. W naszych projektach najczęściej wdrażamy Kafkę na klastrach Kubernetes lub RedHat OpenShift, ponieważ zapewniają one wysoką dostępność i bezpieczeństwo (szczegółowe omówienie tych aspektów wykracza poza zakres tego artykułu).
Jednym z najpopularniejszych sposobów instalowania platformy Apache Kafka na platformie OpenShift jest Strimzi (strimzi.io), który jest zgodny ze wzorcem operatora.
Strimzi wprowadza zestaw dedykowanych zasobów (Custom Resource Definitions – CRDs), które pozwalają na opisanie instalacji klastra Kafka, Kafka Connect, konektorów, definiowanie tematów i zarządzanie uprawnieniami dostępu.
Jedną z głównych zalet Strimzi jest to, że znacznie upraszcza instalację całego ekosystemu Apache Kafka, który – jak omówiono w sekcji architektury – składa się z wielu komponentów.
W poniższych przykładach używamy szablonów programu Helm (helm.sh) do definiowania zasobów dla klastra OpenShift.
Instalowanie klastra Apache Kafka
Podstawowym CRD jest Kafka CRD, który umożliwia instalację klastra Kafka:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: {{ include "broker.fullname" . }} labels: {{- include "broker.labels" . | nindent 4 }} spec: kafka: resources: (1) requests: cpu: {{ .Values.kafka.resources.requests.cpu }} memory: {{ .Values.kafka.resources.requests.memory }} limits: cpu: {{ .Values.kafka.resources.limits.cpu }} memory: {{ .Values.kafka.resources.limits.memory }} version: 3.4.0 replicas: {{ .Values.kafka.replicas}} (2) …
W (1) określamy zasoby obliczeniowe dla brokerów platformy Kafka, takie jak żądane limity procesora CPU/RAM oraz procesora CPU i pamięci RAM.
W (2) definiujemy liczbę brokerów, którzy będą tworzyć klaster Apache Kafka.
W tym samym CRD możemy również określić parametry instalacji dla Zookeepera.
zookeeper: resources: (1) requests: cpu: {{ .Values.zookeeper.resources.requests.cpu }} memory: {{ .Values.zookeeper.resources.requests.memory }} limits: cpu: {{ .Values.zookeeper.resources.limits.cpu }} memory: {{ .Values.zookeeper.resources.limits.memory }} replicas: {{ .Values.zookeeper.replicas }} (2)
W (1) określamy zasoby procesora/pamięci RAM przydzielone dla usługi Zookeeper, a w (2) liczbę jego wystąpień.
Dodatkowo w tym samym CRD możemy skonfigurować parametry dysku dla Apache Kafka, a także szczegółowe ustawienia konfiguracyjne.
storage: (1) type: jbod volumes: - id: 0 type: persistent-claim size: {{ .Values.kafka.storage.size }} deleteClaim: true class: {{ .Values.kafka.storage.class }} - id: 1 type: persistent-claim size: {{ .Values.kafka.storage.size }} deleteClaim: true class: {{ .Values.kafka.storage.class }} config: (2) message.max.bytes:{{ .Values.kafka.config.messageMaxBytes }} (3) num.partitions:{{.Values.kafka.config.defaultNumPartitions }}(4) log.retention.hours:{{.Values.kafka.config.logRetentionHours}}(5) num.network.threads: {{.Values.kafka.config.networkThreads }} (6) num.io.threads: {{.Values.kafka.config.ioThreads }} (7)
W (1) znajduje się definicja magazynu dla tworzonego klastra Apache Kafka.
W (2) definiujemy szczegółowe parametry konfiguracyjne dla brokera. W powyższym przykładzie określono kilka podstawowych parametrów, takich jak:
- (3) maksymalny rozmiar wiadomości,
- (4) domyślna liczba partycji,
- (5) domyślny okres przechowywania tematów,
- (6) liczba wątków sieciowych,
- (7) liczba gwintów I/O.
Po załadowaniu powyższej konfiguracji do klastra Kubernetes lub OpenShift i przetworzeniu jej poprzez operatora Strimzi, otrzymamy w pełni funkcjonalny klaster Apache Kafka zarządzany przez Zookeepera.
Jego działanie można zweryfikować za pomocą odpowiednich narzędzi:
Po wystawieniu PODów zobaczymy zarówno brokerów, jak i Zookeepera:
Kolejnym niezbędnym składnikiem instalacji jest Kafka Connect, który można uzyskać za pomocą innego CRD dostarczonego przez firmę Strimzi, o nazwie KafkaConnect:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: connect-cluster-{{ include "broker.fullname" . }} annotations: strimzi.io/use-connector-resources: "true" spec: image:{{ .Values.kafkaConnect.image.repository}}:{{ .Values.kafkaConnect.image.tag}} replicas: {{ .Values.kafkaConnect.replicas}} (1) bootstrapServers: {{ include "broker.fullname" . }}-kafka-bootstrap:9093 (2)
W (1) definiujemy liczbę wystąpień składających się na klaster Kafka Connect, a w (2) określamy odwołanie do klastra Apache Kafka.
KafkaConnect CRD pozwala nam również na definiowanie innych elementów, takich jak ustawienia zabezpieczeń, pamięć masowa, alokacja zasobów i parametry konfiguracyjne Zookeepera.
Po wgraniu tego CRD do klastra OpenShift uzyskamy w pełni funkcjonalny klaster Kafka Connect, który można zweryfikować za pomocą następującego polecenia:
W ten sposób sprawnie i szybko zainstalowaliśmy klaster Apache Kafka oraz Kafka Connect. Ostatnim krokiem jest wdrożenie przykładowego tematu i przykładowego łącznika.
Zacznijmy od definicji tematu, do której Strimzi udostępnia dedykowany CRD – KafkaTopic:
apiVersion: kafka.strimzi.io/v1beta2
uprzejmy: KafkaTopic
METADANE:
Nazwa: {{ .Values.kafkaTopics.file.data.name }} (1)
Etykiety:
strimzi.io/cluster: {{ . Values.kafkaTopics.cluster }}
Spec:
partycje: {{. Values.kafkaTopics.file.data.partitions }} (2)
repliki: {{. Values.kafkaTopics.file.data.replicas }} (3)
Konfiguracja:
retention.ms:{{. Values.kafkaTopics.file.data.config.retention}} (4)
Ten prosty plik pozwala nam określić:
- (1) Nazwa tematu.
- (2) Liczba partycji dla tematu — jeśli nie zostanie określona, platforma Kafka użyje wartości domyślnej skonfigurowanej na poziomie brokera (zgodnie z opisem w CRD dla klastra Kafka).
- (3) Liczba replik dla tematu – określa, ile kopii danych powinno być przechowywanych w klastrze.
- (4) Okres przechowywania dla danego tematu – czas, przez jaki dane są przechowywane przed ich usunięciem. Wiadomości starsze niż określona wartość zostaną usunięte.
Parametr przechowywania ma kluczowe znaczenie dla zarządzania przestrzenią dyskową klastra platformy Kafka. Różne tematy mogą mieć różne wartości przechowywania w zależności od konkretnych potrzeb — tematy o większym znaczeniu biznesowym mogą mieć przypisane dłuższe okresy przechowywania.
Teraz przygotujmy przykładową definicję łącznika przy użyciu KafkaConnector CRD.
apiVersion: kafka.strimzi.io/v1beta2
uprzejmy: KafkaConnector
METADANE:
Nazwa: jmssourceconnector-{{ $connector.name }}
Etykiety:
strimzi.io/cluster: {{ $. Values.kafkaConnectors.kccluster }}
Spec:
klasa: com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector (1)
tasksMax: {{ . Values.connector.tasksMax }} (2)
Automatyczne ponowne uruchomienie:
włączone: {{ $. Values.connector.autoRestart }} (3)
Konfiguracja (4)
connect.jms.connection.factory: {{ . Values.connector.connectionFactory }}
connect.jms.destination.selector: {{ . Values.connector.selector }}
connect.jms.initial.context.factory:{{. Values.connector.contextFactoryClass }}
connect.jms.kcql: WSTAW DO test_topic WYBIERZ * Z {{ . Wartości .connector.queue }} WITHTYPE QUEUE
connect.jms.queues: {{ . Wartości.łącznik.kolejka }}
connect.jms.url: {{ . Wartości.connector.urls }}
connect.progess.enabled: {{ . Values.connector.progessEnabled }}
connect.jms.scale.type: {{ . Wartości.scaleType }}
connect.jms.username: {{ . Wartości.nazwa użytkownika }}
connect.jms.password: {{ . Wartości.hasło }}
Ten plik definiuje łącznik źródłowy JMS, który jest odpowiedzialny za odczytywanie danych z kolejki JMS i przekazywanie ich do tematu platformy Kafka:
- (1) Klasa implementująca konektor – określa komponent, który będzie uruchamiany w celu odbierania wiadomości z serwera JMS. Każdy łącznik ma własną implementację w postaci plików JAR dostarczonych przez dostawcę łącznika. Te pliki muszą znajdować się w klastrze Kafka Connect, aby mogły być używane w czasie wykonywania.
- (2) Liczba zadań łącznika — określa liczbę wystąpień zadań uruchomionych w klastrze Kafka Connect. Parametr ten pozwala na kontrolowanie liczby instancji równoległych, co wpływa na przepustowość przetwarzania danych.
- (3) Parametry konfiguracyjne konektora – specyficzne dla danej implementacji. Należy je ustawić zgodnie z dokumentacją dostarczoną przez dostawcę łącznika. W podanym przykładzie są to parametry wymagane do nawiązania połączenia z zewnętrznym serwerem JMS.
W ten sposób zakończyliśmy proces instalowania klastra Apache Kafka i Kafka Connect, a także wdrożenia przykładowego tematu i przykładowego łącznika.
Jak widzimy, Kafka oferuje wiele możliwości konfiguracyjnych, a Strimzi pozwala na ich definiowanie za pomocą dedykowanych CRD. Zastosowanie Helma umożliwia parametryzację tych CRD (poprzez dedykowane pliki Values), co pozwala na utrzymanie wielu konfiguracji, np. dla środowisk deweloperskich, testowych i produkcyjnych.
Jedną z kluczowych zalet tego podejścia do zarządzania instalacją i konfiguracją klastra Kafka jest możliwość przechowywania plików konfiguracyjnych (szablonów) w systemie kontroli wersji – są to pliki YAML przechowywane w dedykowanym repozytorium. Dzięki temu możliwy jest łatwy audyt zmian i śledzenie historii modyfikacji.
Tę konfigurację można następnie wdrożyć w środowiskach przy użyciu potoków w narzędziach takich jak GitLab lub Jenkins. Dodatkowo możliwe jest zastosowanie systemów automatycznej synchronizacji, takich jak ArgoCD (https://argo-cd.readthedocs.io/en/stable/), które monitorują repozytorium pod kątem zmian i automatycznie propagują je do środowisk docelowych.
Po omówieniu procesu wdrażania i konfiguracji Kafki przejdźmy teraz do tematu bezpieczeństwa.
Bezpieczeństwo systemu komunikacji sterowanej zdarzeniami
Każdy system przeznaczony do komunikacji sterowanej zdarzeniami powinien zapewniać bezpieczeństwo przesyłanych danych, w tym szyfrowanie, uwierzytelnianie i autoryzację. W związku z tym Apache Kafka, zgodnie z oficjalną dokumentacją, oferuje następujące mechanizmy bezpieczeństwa:
- Uwierzytelnianie połączeń klienta z brokerem przy użyciu protokołów takich jak SSL lub SASL.
- Uwierzytelnianie komunikacji między brokerem a opiekunem zoo.
- Szyfrowanie przesyłanych danych – zarówno pomiędzy klientami a brokerem, jak i pomiędzy brokerami podczas replikacji.
- Autoryzacja operacji wykonywanych przez klientów w celu kontroli dostępu do tematów i zasobów.
Dostosowanie platformy Apache Kafka do wymagań zabezpieczeń wymaga skonfigurowania wielu parametrów. Na przykład konieczne jest skonfigurowanie bezpiecznego odbiornika:
listeners=BROKER://localhost:9092
listener.security.protocol.map=BROKER:SASL_SSL,KONTROLER:SASL_SSL
oraz generowanie kluczy SSL za pomocą narzędzi takich jak keytool. Proces ten wymaga szczególnej uwagi i skupienia, ale zdecydowanie warto go wdrożyć, aby zapewnić pełne bezpieczeństwo przesyłanych danych.
Jeśli platforma Kafka jest wdrażana w klastrze Kubernetes lub OpenShift i korzysta z narzędzia Strimzi, konfiguracja uwierzytelniania staje się łatwiejsza. Strimzi automatyzuje wiele procesów, zapewniając odpowiednie wpisy w plikach zasobów niestandardowych (CR), a także dodatkowe CRD (definicje zasobów niestandardowych), które upraszczają zarządzanie zabezpieczeniami.
Zacznijmy od konfiguracji uwierzytelniania dla brokera – plik definiujący klaster Kafka zawiera następujące opcje:
apiVersion: kafka.strimzi.io/v1beta2
uprzejmy: Kafka
METADANE:
Nazwa: {{ include “broker.fullname” . }}
Etykiety:
{{- include “broker.labels” . | nindent 4 }}
Spec:
Kafka:
…
Słuchacze: (1)
– Nazwa: TLS (2)
port: 9093
Rodzaj: Wewnętrzny
TLS: prawda
uwierzytelnianie:
Typ: TLS
– Nazwa: zewnętrzna (3)
port: 9094
Typ: Trasa
TLS: prawda
uwierzytelnianie:
Typ: TLS
…
- (1) Definicja słuchaczy.
- (2) Odbiornik wewnętrzny (dla klastra OpenShift) działający na porcie 9093 z włączonym protokołem TLS i uwierzytelnianiem opartym na certyfikatach.
- (3) Port zewnętrzny 9094 (uwidoczniony za pośrednictwem trasy OpenShift) z włączonym protokołem TLS, ale bez uwierzytelniania opartego na certyfikatach.
Po załadowaniu tej konfiguracji operator Strimzi automatycznie wygeneruje odpowiednie wpisy tajne OpenShift, które będą służyły do zarządzania uwierzytelnianiem i szyfrowaniem komunikacji:
Jeśli teraz spojrzymy na wcześniej przygotowany i wdrożony klaster Kafka Connect, zauważymy, że nie będzie on w stanie połączyć się z brokerem Kafka. Aby przywrócić komunikację między tymi komponentami, konieczne jest dostosowanie konfiguracji Kafka Connect z uwzględnieniem aspektów bezpieczeństwa.
W tym celu należy wprowadzić odpowiednie zmiany w pliku zasobu niestandardowego (CR) dla platformy KafkaConnect:
apiVersion: kafka.strimzi.io/v1beta2
uprzejmy: KafkaConnect
METADANE:
Nazwa: Connect-Cluster-{{ include “broker.fullname” . }}
Spec:
regał:
…
bootstrapServers:{{include “broker.fullname” .}}-kafka-bootstrap:9093
Uwierzytelnianie: (1)
Typ: TLS
certyfikatAndKey:
secretName: kafka-connect-tls-user
Certyfikat: User.crt
Legenda: user.key
TLS: (2)
trustedCertyfikaty:
– secretName: {{ include “broker.fullname” . }}-cluster-ca-cert
Certyfikat: CA.CRT
- (1) W tym miejscu definiujemy, w jaki sposób platforma Kafka Connect będzie łączyć się i uwierzytelniać za pomocą brokera platformy Kafka. Określamy użycie protokołu TLS i wskazujemy, że Kafka Connect powinna “prezentować się” za pomocą certyfikatu.
- (2) W tym miejscu skonfigurujemy sposób, w jaki platforma Kafka Connect będzie “ufać” certyfikatowi, którego broker platformy Kafka używa do uwierzytelniania podczas nawiązywania połączenia. Odwołuje się do wpisu tajnego OpenShift, który został wygenerowany przez operatora Strimzi w poprzednim kroku.
Pozostaje jednak kluczowe pytanie: skąd platforma Kafka Connect pobiera certyfikat w sekcji (1)? W końcu nigdzie nie zdefiniowaliśmy wpisu tajnego kafka-connect-tls-user.
Odpowiedź jest prosta — ten certyfikat reprezentuje użytkownika lub aplikację, która łączy się z brokerem platformy Kafka. Strimzi udostępnia dedykowany CRD o nazwie KafkaUser, który umożliwia zdefiniowanie takiego użytkownika:
apiVersion: kafka.strimzi.io/v1beta2
uprzejmy: KafkaUser
METADANE:
Nazwa: kafka-connect-tls-user
Etykiety:
strimzi.io/cluster: {{ include “broker.fullname” . }}
Spec:
Uwierzytelnianie: (1)
Typ: TLS
Autoryzacja:(2)
Typ: prosty
Listy acl:
– Zasób: (3)
Typ: temat (3.1)
Nazwa: event-data (3.2)
patternType: literał (3,3)
Operacje: (3.4)
-Tworzyć
-Opisać
– Opisz konfiguracje
-Czytać
-Pisać
Gospodarz: “*” (3,5)
# Grupa konsumencka Łącznik zlewu AWS S3
-zasób:
Typ: Grupa (4)
Nazwa: connect-sinkconnector
patternType: literał
Operacji:
-Tworzyć
-Opisać
-Czytać
-Pisać
host: “*”
Rzućmy okiem na zawartość tego pliku:
- (1) Definiujemy uwierzytelnianie TLS dla utworzonego użytkownika.
- (2) Określamy uprawnienia użytkownika – jest to lista elementów definiujących dostęp do zasobów.
- (3) Definiujemy pierwszy zasób typu temat:
- (3.1) Określamy typ zasobu jako temat.
- (3.2) Ustalamy jego nazwę.
- (3.3) Wskazujemy, że nazwę należy traktować dosłownie – możliwe jest również użycie wartości prefiksu, która pozwala na przypisanie uprawnień do wszystkich zasobów, których nazwa zaczyna się od określonego ciągu znaków.
- (3.4) Wymieniamy operacje, które użytkownik może wykonywać.
- (3.5) Określamy adres/host, z którego użytkownik może się połączyć.
- (4) Definiujemy inny typ zasobu – grupę odbiorców o określonej nazwie i liście dozwolonych operacji dla tej grupy.
Po załadowaniu tej definicji operator Strimzi automatycznie wygeneruje odpowiedni wpis tajny, którego Kafka Connect użyje do uwierzytelnienia u brokera, a który zostanie rozpoznany przez brokera:
Jeśli aplikacja (KafkaUser) korzystająca z takiego certyfikatu podejmie próbę wykonania nieautoryzowanej operacji na brokerze (tj. takiej, której nie uwzględniliśmy w definicji KafkaUser), broker automatycznie ją zablokuje.
W logach zobaczymy wtedy komunikat o błędzie:
2024-10-02 13:42:21,932 INFO Principal = User:CN=kafka-connect-tls-user is Denied Operation = Opisz z hosta = 10.253.23.147 w zasobie = Cluster:LITERAL:kafka-cluster dla żądania = ListPartitionReassignments with resourceRefCount = 1
Powyższy błąd wskazuje, że użytkownik kafka-connect-tls-user nie ma uprawnień do pobierania informacji o klastrze. Jest to oczekiwane zachowanie, ponieważ nie udzieliliśmy tego dostępu w konfiguracji.
Aby rozwiązać ten błąd, musimy dodać odpowiednie uprawnienie do pliku konfiguracyjnego:
-zasób:
Typ: Klaster
Operacji:
-Opisać
host: “*”
W podanym przykładzie zostanie wyświetlony nowy typ zasobu reprezentujący klaster platformy Kafka.
Jak widać, Apache Kafka oferuje rozbudowane możliwości konfiguracji zabezpieczeń — od uwierzytelniania i szyfrowania po autoryzację. Jeśli korzystamy ze Strimzi, konfiguracja tych mechanizmów staje się znacznie łatwiejsza, ponieważ wiele elementów jest generowanych automatycznie przez operatora Strimzi.
Tym samym potwierdziliśmy, że pod względem bezpieczeństwa Apache Kafka jest rozwiązaniem klasy enterprise, które spełnia wymagania nawet najbardziej rygorystycznych działów bezpieczeństwa.
Przejdźmy teraz do analizy wysokiej dostępności platformy Kafka i metod zapewnienia jej niezawodności.
Wysoka dostępność i odzyskiwanie po awarii
Systemy komunikacji sterowane zdarzeniami są odpowiedzialne za przesyłanie komunikatów między wieloma systemami i usługami. Im więcej komponentów opiera się na takiej komunikacji, tym bardziej krytyczna staje się rola brokera w całym przepływie informacji.
Ale co się stanie, jeśli broker upadnie? Czy może stać się pojedynczym punktem awarii (SPOF)?
Odpowiedź brzmi: tak — jeśli system jest źle wdrożony i skonfigurowany, może stać się SPOF.
Jak Apache Kafka radzi sobie z tym wyzwaniem?
Podstawowym mechanizmem zapewniającym wysoką dostępność na platformie Kafka jest replikacja.
Replikacja oznacza, że dane partycji z tematu są przechowywane w wielu węzłach w klastrze. Jeśli jeden z węzłów ulegnie awarii, jego rola może zostać przejęta przez inne węzły, ponieważ dane są replikowane.
Poniższy diagram ilustruje ten proces:
Klaster Kafka składa się z trzech brokerów. Na poniższym diagramie liderzy partycji są oznaczeni kolorem niebieskim.
Na przykład broker 1 służy jako lider partycji 0 w temacie 1 i jest odpowiedzialny za replikowanie swoich danych do innych węzłów w klastrze:
- Broker 2 jest pierwszą repliką partycji 0 w temacie 1 (oznaczonej kolorem pomarańczowym).
- Broker 3 jest drugą repliką partycji 0 w temacie 1 (oznaczoną na zielono).
Broker 2 i Broker 3 są nazywane replikami w synchronizacji (ISR), co oznacza, że pozostają zsynchronizowane z liderem.
Jeśli broker 1 zakończy się niepowodzeniem, jedna z replik synchronizacji zostanie automatycznie wybrana jako nowy lider dla partycji 0 tematu 1, dzięki czemu klienci będą mogli nadal korzystać z tematu bez zakłóceń. Dzięki temu mechanizmowi Kafka zapewnia odporność na awarie.
Dodatkowo, jeśli Kafka jest wdrożona na klastrze OpenShift, który jest rozproszony pomiędzy dwoma centrami danych (głównym – Data Center i backupowym – Disaster Recovery Center), możliwe jest wymuszenie odpowiedniego rozmieszczenia brokerów w celu zwiększenia dostępności systemu. Węzły są również umieszczane w centrum zapasowym, co poprawia dostępność naszej instalacji.
W tym celu warto skorzystać z konfiguracji affinity, która pozwala na kontrolę nad tym, gdzie wdrażane są konkretne węzły klastra.
powinowactwo:
nodeAffinity:
wymaganeDuringSchedulingIgnoredDuringExecution:
…
Kolejnym kluczowym elementem w zapewnieniu niezawodności jest możliwość przywrócenia systemu po awarii i tworzenia kopii zapasowych. W tym zakresie dostępnych jest kilka metod:
Opcja 1:
Niektórzy użytkownicy korzystają z narzędzia Kafka MirrorMaker, które umożliwia replikację danych między klastrami Kafka. W tym modelu istnieje klaster podstawowy i klaster zapasowy znajdujące się w innej lokalizacji. MirrorMaker służy do synchronizacji danych między tymi klastrami. W przypadku awarii klastra podstawowego ruch zostanie przekierowany do klastra kopii zapasowych.
Opcja 2:
W przypadku instalacji w klastrze OpenShift, w którym dyski klastrowe platformy Kafka są dostarczane przez usługę OpenShift, istnieje opcja tworzenia ich kopii zapasowej (trwałe oświadczenie woluminu) przy użyciu migawek. Jednym z narzędzi umożliwiających to jest Commvault.
W jednym z projektów firmy Inteca sprawdziliśmy, czy możliwe jest tworzenie kopii zapasowych płytek PVC klastra Kafka bez wyłączania jego węzłów. Sam proces tworzenia kopii zapasowej zakończył się sukcesem, ale odzyskiwanie danych okazało się problematyczne i nie zawsze się udało. Głównym wyzwaniem była spójność danych po przywróceniu.
Aby efektywnie korzystać z takich narzędzi do tworzenia kopii zapasowych, zaleca się zamknięcie klastra Kafka podczas procesu tworzenia migawki. Chociaż powoduje to krótkotrwały przestój systemu, może znacznie zwiększyć niezawodność procesu przywracania danych.
Opcja 3:
Powszechnie używaną metodą tworzenia kopii zapasowej platformy Apache Kafka jest wysyłanie danych do S3. W tym podejściu łącznik ujścia S3 jest używany w programie Kafka Connect, który pobiera dane z tematów i wysyła je do S3 (łącznik źródłowy S3 jest wyłączony):
Po przywróceniu danych z kopii zapasowej łączniki powracają do swoich oryginalnych ustawień. Należy zauważyć, że zmiany stanu łączników w trakcie i po przywróceniu kopii zapasowej są operacjami ręcznymi — zadaniami administracyjnymi.
Opisane powyżej podejście jest powszechnie stosowanym rozwiązaniem do tworzenia kopii zapasowych dla platformy Apache Kafka. Należy jednak zwrócić uwagę na jeden kluczowy aspekt, a mianowicie na właściwe przywrócenie danych i odzyskanie kompensacji dla grup konsumentów.
Po przywróceniu danych z kopii zapasowej łączniki powracają do swoich oryginalnych ustawień. Jest to popularne podejście do procesu tworzenia kopii zapasowych i odzyskiwania dla platformy Apache Kafka. Należy jednak wziąć pod uwagę jeden kluczowy aspekt: właściwe przywrócenie danych i odzyskanie kompensacji grup odbiorców:
- W celu przywrócenia danych należy upewnić się, że dane są przypisane do odpowiednich partycji we właściwej kolejności. Jeśli to się nie powiedzie, grupy konsumentów mogą rozpocząć ponowne przetwarzanie już przetworzonych danych lub, w niektórych przypadkach, całkowicie zrezygnować z przetwarzania niektórych danych.
- Drugim kluczowym aspektem jest zachowanie informacji o tym, gdzie dana grupa odbiorców zakończyła przetwarzanie, tj. utrzymanie informacji o przesunięciu dla każdej partycji tematu.
Zilustrujemy to na przykładzie:
Konsument odczytuje dane z tematu 1. Z partycji 0 pobrał i przetworzył wiadomość 0, tak jak ma to miejsce w przypadku partycji 1. Po przetworzeniu konsument informuje brokera, że te komunikaty zostały przetworzone, a informacje te są przechowywane w specjalnym wewnętrznym temacie platformy Kafka o nazwie __consumer_offsets.
Komunikat 1 z partycji 0 nie został potwierdzony, co oznacza, że przesunięcie grupy dla tej partycji jest mniejsze niż przesunięcie tego komunikatu.
Jeśli po awarii odczytamy dane z kopii zapasowej, a w trakcie procesu przywracania komunikaty zamienią partycje lub zmienią kolejność, przechowywana informacja o tym, gdzie każdy konsument zakończył przetwarzanie, stanie się bezużyteczna. Co więcej, używanie tych nieprawidłowych danych może prowadzić do błędów.
Jeśli na przykład po odzyskaniu partycja 1 zawiera zarówno fioletowy, jak i pomarańczowy komunikat, podczas gdy partycja 0 zawiera tylko zielony komunikat, mogą wystąpić następujące problemy:
- Zielony komunikat nigdy nie zostanie przetworzony, ponieważ przesunięcie dla tej partycji wskazuje, że komunikat z przesunięciem 0 został już przetworzony — nawet jeśli w rzeczywistości nie został.
- Pomarańczowy komunikat, który został nieprawidłowo umieszczony w partycji 1 zamiast w partycji 0, może mieć przesunięcie 1. W kontekście przechowywanego offsetu dla tej partycji oznacza to, że musi ona zostać przetworzona, nawet jeśli została już przetworzona wcześniej.
Dlatego bardzo ważne jest, aby podczas przywracania zostało zachowane zarówno prawidłowe przypisanie partycji, jak i kolejność komunikatów. Dodatkowo musi zostać poprawnie odtworzony stan przetwarzania każdej grupy odbiorców, czyli ich offsety.
Jakie konektory można wykorzystać do realizacji takiej kopii zapasowej?
Confluent udostępnia dedykowane łączniki, takie jak Confluent S3 Source Connector. Należy jednak pamiętać, że wymagają one licencji.
Alternatywą są łączniki typu open source, takie jak Apache Camel S3 Sink i Apache Camel S3 Source, ale mają one pewne ograniczenia implementacji:
- Nie zapisują one informacji o partycji, co oznacza, że oryginalna partycja wiadomości jest tracona.
- Każda wiadomość jest przechowywana jako oddzielny obiekt w S3, co zwiększa koszty i złożoność magazynowania.
Ze względu na te ograniczenia używanie ich w środowisku produkcyjnym wymaga dostosowania, takiego jak zaimplementowanie dedykowanego transformatora komunikatów dla łącznika ujścia (przekształcenia pojedynczego komunikatu). Ten transformator propagowałby partycję i przesunął informacje do S3. Ponadto wymagany byłby drugi transformator dla złącza źródłowego, aby użyć informacji o partycji i przesunięciu przechowywanych w S3 w celu poprawnego przywrócenia komunikatów.
Omówiliśmy teraz aspekty związane z wysoką dostępnością i kopiami zapasowymi. Przejdźmy do monitorowania naszej instalacji Kafki w środowisku produkcyjnym.
Monitorowanie operacji platformy Kafka
W przypadku każdego systemu komunikacji sterowanej zdarzeniami kluczowe znaczenie ma monitorowanie jego wydajności produkcyjnej, aby zapobiec problemom , takim jak utrata danych, przestoje w przetwarzaniu lub niewystarczająca moc przetwarzania.
Ze względu na swoją architekturę i model działania Apache Kafka wymaga monitorowania kilku kluczowych aspektów, w tym:
- Kondycja klastra,
- Wykorzystanie zasobów,
- Replikacja
- Złącza
- Wydajność przetwarzania.
Platforma Apache Kafka może uwidocznić te informacje w usłudze Prometheus (https://prometheus.io/), skąd dane można pobrać i zwizualizować przy użyciu narzędzi takich jak Grafana (https://grafana.com/).
Aby zaimplementować takie monitorowanie dla klastra Kafka wdrożonego w usłudze OpenShift, można zdefiniować element PodMonitor. Jest to jednostka z Prometheus Operator, która jest wbudowana w platformę OpenShift:
apiVersion: monitoring.coreos.com/v1
uprzejmy: PodMonitor
METADANE:
Nazwa: kafka-resources-metrics
Etykiety:
Aplikacja: Strimzi
Spec:
Selektor:
matchExpressions:
-: “strimzi.io/kind”
operator: W
wartości: [“Kafka”, “KafkaConnect”, “KafkaMirrorMaker”, “KafkaMirrorMaker2”]
namespaceSelector:
Nazwy meczów:
– {{ . Przestrzeń nazw wydania }}
podMetricsEndpoints:
– ścieżka: /metrics
port: TCP-Prometheus
Interwał: 1m
Ponowne etykietowanie:
-separator:;
wyrażenie regularne: __meta_kubernetes_pod_label_(strimzi_io_.+)
Wymiana: $1
Akcja: Labelmap
…
Po wgraniu tej definicji i skonfigurowaniu Grafana uzyskamy następujący widok:
Zawiera on takie informacje jak:
- Liczba replik,
- liczba zsynchronizowanych replik,
- Szczegółowe dane o partycjach,
- Liczba komunikatów publikowanych na sekundę,
- Liczba komunikatów przetwarzanych przez grupę odbiorców na sekundę,
- Opóźnienie, czyli różnica między bieżącym przesunięciem komunikatu w partycjach tematu a przesunięciem przetworzonym przez grupę odbiorców. Im większe opóźnienie, tym gorzej, ponieważ może to oznaczać, że grupa konsumentów nie jest w stanie nadążyć z przetwarzaniem wiadomości.
Dodatkowo alerty można definiować w oparciu o Prometheus Alert Manager lub Grafana Alerting, co pozwala na otrzymywanie powiadomień o problemach.
Innym zalecanym narzędziem w tym obszarze jest Redpanda Console. To narzędzie umożliwia weryfikację stanu klastra, łączników, grup odbiorców i tematów:
Dodatkowo narzędzie to pozwala na wykonywanie aktywnych operacji, takich jak:
- Ponowne uruchamianie łącznika,
- Usuwanie wiadomości z tematu,
- Usuwanie tematu.
Wszystkie te funkcjonalności są przydatne w codziennej konserwacji i operacjach administracyjnych platformy Kafka.
Wyposażeni w narzędzia do monitorowania, możemy teraz przejść do ostatniego tematu: wysokiej wydajności przetwarzania.
Wysoka wydajność przetwarzania
Systemy komunikacji sterowane zdarzeniami muszą być zdolne do wysokowydajnego przetwarzania danych. Systemy te integrują i przesyłają wiadomości między różnymi innymi systemami, a powolne przetwarzanie wiadomości może prowadzić do poważnych problemów, zwłaszcza w rozwiązaniach działających w czasie rzeczywistym lub prawie w czasie rzeczywistym.
Jak wspomniano w sekcji architektury, ważnym czynnikiem zapewniającym wydajność przetwarzania jest wybór odpowiedniej liczby partycji dla tematów. Liczba partycji określa maksymalną liczbę wątków, które mogą przetwarzać dane z tematu równolegle. Nie jest to jednak jedyny parametr, który należy wziąć pod uwagę.
Kafka obsługuje ogromne ilości danych, które są stale przesyłane między klientami a brokerami, a także między brokerami podczas replikacji. Dlatego ważne jest, aby obciążenie sieci nie powodowało problemów z wydajnością. Narzut komunikacji sieciowej ma duży wpływ na ogólną wydajność przetwarzania. Platforma Apache Kafka udostępnia kilka parametrów do zarządzania tym:
- Kompresja (compression.type) – umożliwia włączenie kompresji po stronie producenta, np. konektora lub aplikacji strumieniowej. Kompresja może znacznie zmniejszyć rozmiar przesyłanych danych. Platforma Kafka obsługuje następujące typy kompresji: none, gzip, snappy, lz4, zstd.
- Domyślnie kompresja jest ustawiona na “none” (brak kompresji). Nie jest to zalecane w przypadku wdrożeń produkcyjnych, chyba że istnieje uzasadniony powód takiej konfiguracji.
- Kompresja jest włączona po stronie producenta platformy Kafka. Użytkownicy nie wymagają specjalnej konfiguracji, a dekompresja jest obsługiwana automatycznie przez biblioteki klienckie platformy Kafka.
- Włączenie kompresji zwiększa obciążenie związane z przetwarzaniem zarówno dla producenta, jak i konsumenta, ale mimo to znacznie zwiększa przepustowość producenta.
- Batch size (batch.size) – kolejny parametr po stronie producenta, który definiuje rozmiar partii wiadomości wysyłanych przez producenta. Wartość domyślna to 16 KB.
- Oznacza to, że jeśli wiadomości przekroczą 16 KB, każda wiadomość zostanie wysłana osobno, co zwiększy obciążenie sieci i negatywnie wpłynie na przepływność.
- Lepiej jest wysyłać wiadomości rzadziej w większych partiach – dlatego zaleca się dostosowanie tego parametru do rozmiaru wiadomości używanego w systemie.
- Linger time (linger.ms) – kolejny parametr po stronie producenta, który jest ściśle powiązany z batch.size.
- Ten parametr określa, jak długo producent czeka przed wysłaniem partii.
- Jeśli partia zostanie wypełniona wiadomościami przed upływem tego czasu, zostanie wysłana natychmiast.
- Jeśli partia nie jest pełna, ale upływa czas oczekiwania, jest ona wysyłana mimo to.
- Wartość domyślna to 0 ms, co oznacza, że wiadomości są wysyłane natychmiast, co może zwiększyć obciążenie sieci i zmniejszyć wydajność przepustowości.
Poniżej przykład z jednego z wdrożonych systemów:
Jak widać na załączonych wykresach, system wykorzystuje trzy aplikacje strumieniowe.
- Jedna z tych aplikacji (żółty wykres) przetwarza około 1 250 komunikatów na sekundę.
- Zielona aplikacja przetwarza 750 komunikatów na sekundę.
- Niebieska aplikacja przetwarza najmniej — poniżej 500 komunikatów na sekundę.
W efekcie niebieska aplikacja ma największe opóźnienie (widoczne na wykresie po prawej stronie), które stale rośnie. Może to prowadzić do opóźnień w dostarczaniu danych do systemów docelowych.
Po zmodyfikowaniu parametrów w następujący sposób:
- compression.type=lz4 dla wszystkich producentów,
- batch.size=128KB dla wszystkich producentów,
- linger.ms=100ms dla wszystkich producentów,
Uzyskaliśmy następujące wyniki:
Na wykresie przedstawiającym liczbę komunikatów zużywanych na sekundę (po lewej stronie) wykresy wszystkich aplikacji nakładają się na siebie, co wskazuje, że te aplikacje przetwarzają teraz prawie taką samą liczbę komunikatów na sekundę.
Na wykresie przedstawiającym lagi poszczególnych konsumentów (prawa strona) widać, że żadne z opóźnień nie wzrasta – pozostają na niskim i stabilnym poziomie, co jest pożądanym zachowaniem.
Wynik jest niezwykły, ponieważ między pierwszym a drugim testem nie zmieniono żadnych innych parametrów, nie wprowadzono żadnych modyfikacji w aplikacji, a test został przeprowadzony na tych samych serwerach. Wpływ tylko tych trzech parametrów na przepustowość jest znaczący.
Przedstawione powyżej wartości parametrów nie są wzorcem odniesienia ani jedynym słusznym rozwiązaniem. W każdym przypadku wartości tych parametrów mogą się różnić i powinny być one dobrane w sposób przemyślany, w oparciu o testy wydajnościowe i specyficzne wymagania rozwiązania.
Dodatkowo, zanim dostosujemy te parametry, powinniśmy najpierw zadbać o wydajność naszych aplikacji. W kontekście Kafki aplikacje – zwłaszcza aplikacje strumieniowe – powinny przetwarzać komunikaty bardzo szybko, w zakresie milisekund lub kilkudziesięciu milisekund.
Dłuższe czasy przetwarzania mogą spowodować, że nawet dopracowanie powyższych parametrów nie przyniesie żadnej poprawy.
Podsumowanie
Apache Kafka to rozwiązanie, które z powodzeniem może być wykorzystywane do zapewnienia wydajnej i bezpiecznej komunikacji sterowanej zdarzeniami. W artykule starałem się poruszyć różne obszary funkcjonalne, które są fundamentem działania takich systemów i muszą być brane pod uwagę podczas ich wdrażania. Mam nadzieję, że zebrane tu informacje pozwolą Ci efektywnie korzystać z Kafki. Celem artykułu było podzielenie się prawdziwymi doświadczeniami projektowymi, a fakt, że Kafka doskonale radzi sobie w wielu aspektach, jest wyłącznie jego zasługą, a nie formą lokowania 😊 produktu.
Dowiedz się więcej o usłudze zarządzanej Apache Kafka firmy Inteca