Šī Apache Kafka ievaddaļas JavaWorld pirmajā pusē jūs izstrādājāt pāris maza mēroga ražotāju / patērētāju lietojumprogrammas, izmantojot Kafka. No šiem vingrinājumiem jums jāpārzina Apache Kafka ziņojumapmaiņas sistēmas pamati. Šajā otrajā pusē jūs uzzināsiet, kā izmantot nodalījumus, lai sadalītu slodzi un horizontāli mērogotu lietojumprogrammu, apstrādājot līdz pat miljoniem ziņojumu dienā. Jūs arī uzzināsiet, kā Kafka izmanto ziņojumu nobīdes, lai izsekotu un pārvaldītu sarežģītu ziņojumu apstrādi un kā aizsargāt savu Apache Kafka ziņojumapmaiņas sistēmu pret kļūmēm, ja patērētājs kritīs. Mēs izstrādāsim lietojumprogrammas piemēru no 1. daļas gan publicēšanas, abonēšanas, gan punkta no punkta lietošanas gadījumiem.
Starpsienas Apache Kafka
Kafkas tēmas var iedalīt starpsienās. Piemēram, izveidojot tēmu ar nosaukumu Demo, jūs varat to konfigurēt, lai tajā būtu trīs nodalījumi. Serveris izveidos trīs žurnālfailus, pa vienam katram demonstrācijas nodalījumam. Kad producents publicēja tēmai ziņojumu, tas šim ziņojumam piešķīra nodalījuma ID. Tad serveris pievienos ziņojumu žurnāla failam tikai šim nodalījumam.
Ja pēc tam esat izveidojis divus patērētājus, serveris var piešķirt 1. un 2. nodalījumu pirmajam patērētājam un 3. nodalījumu otrajam patērētājam. Katrs patērētājs lasīs tikai no tam piešķirtajiem nodalījumiem. Demonstrācijas tēmu, kas konfigurēta trim nodalījumiem, var redzēt 1. attēlā.
Lai paplašinātu scenāriju, iedomājieties Kafka kopu ar diviem brokeriem, kas izvietoti divās mašīnās. Sadalot demonstrācijas tēmu, jūs to konfigurējat ar diviem nodalījumiem un divām kopijām. Šāda veida konfigurācijai Kafka serveris piešķirs abus nodalījumus abiem jūsu klastera starpniekiem. Katrs brokeris būtu līderis vienā no starpsienām.
Kad producents publicēja ziņojumu, tas nonāca nodalījuma vadītājam. Vadītājs paņēma ziņojumu un pievienoja to lokālās mašīnas žurnāla failam. Otrais brokeris pasīvi atkārtotu šo saistību žurnālu savā mašīnā. Ja nodalījuma vadītājs samazināsies, otrais starpnieks kļūtu par jauno vadītāju un sāktu apkalpot klientu pieprasījumus. Tādā pašā veidā, kad patērētājs nosūtīja pieprasījumu nodalījumam, šis pieprasījums vispirms nonāks nodalījuma vadītājam, kurš atgriezīs pieprasītos ziņojumus.
Sadalīšanas priekšrocības
Apsveriet priekšrocības, kas saistītas ar Kafka bāzes ziņojumapmaiņas sistēmas sadalīšanu:
- Mērogojamība: Sistēmā, kurā ir tikai viens nodalījums, tēmā publicētie ziņojumi tiek glabāti žurnāla failā, kas eksistē vienā mašīnā. Tēmas ziņojumu skaitam jāiekļaujas vienā saistību žurnāla failā, un saglabāto ziņojumu lielums nekad nevar būt lielāks par šīs mašīnas diska vietu. Tēmas sadalīšana ļauj mērogot jūsu sistēmu, glabājot ziņojumus dažādās klastera mašīnās. Piemēram, ja vēlaties demonstrācijas tēmai saglabāt 30 gigabaitus (GB) ziņojumu, jūs varētu izveidot Kafka kopu no trim mašīnām, katrā no tām ar 10 GB diska vietas. Tad jūs konfigurētu tēmu ar trim nodalījumiem.
- Servera un slodzes līdzsvarošana: Ja jums ir vairāki nodalījumi, jūs varat izplatīt ziņojumu pieprasījumus starpniekiem. Piemēram, ja jums ir tēma, kas apstrādā 1 miljonu ziņojumu sekundē, varat to sadalīt 100 nodalījumos un pievienot savam klasterim 100 brokerus. Katrs brokeris būtu viena nodalījuma vadītājs, kas atbildētu tikai uz 10 000 klientu pieprasījumiem sekundē.
- Patērētāja un slodzes līdzsvarošana: Līdzīgi kā servera un slodzes līdzsvarošanai, vairāku patērētāju mitināšana dažādās mašīnās ļauj izplatīt patērētāja slodzi. Pieņemsim, ka jūs vēlaties patērēt 1 miljonu ziņojumu sekundē no tēmas ar 100 nodalījumiem. Jūs varētu izveidot 100 patērētājus un palaist tos paralēli. Kafka serveris katram patērētājam piešķirs vienu nodalījumu, un katrs patērētājs paralēli apstrādās 10 000 ziņojumu. Tā kā Kafka katru nodalījumu piešķir tikai vienam patērētājam, nodalījumā katrs ziņojums tiktu patērēts secībā.
Divi nodalīšanas veidi
Ražotājs ir atbildīgs par izlemšanu, uz kuru nodalījumu tiks nosūtīts ziņojums. Ražotājam ir divas iespējas kontrolēt šo uzdevumu:
- Pielāgots sadalītājs: Jūs varat izveidot klasi, kurā ieviests
org.apache.kafka.clients.producer.Partitioner
interfeiss. Šī paražaSadalītājs
ieviesīs biznesa loģiku, lai izlemtu, kur tiek sūtīti ziņojumi. - DefaultPartitioner: Ja neveidojat pielāgotu sadalītāju klasi, pēc noklusējuma
org.apache.kafka.clients.producer.internals.DefaultPartitioner
klase tiks izmantota. Noklusējuma nodalītājs vairumam gadījumu ir pietiekami labs, nodrošinot trīs iespējas:- Manuāli: Kad izveidojat
ProducerRecord
, izmantojiet pārslogotu konstruktorujauns ProducerRecord (topicName, partitionId, messageKey, message)
lai norādītu nodalījuma ID. - Jaukšana (jutīga pret vietni): Kad izveidojat
ProducerRecord
, norādiet amessageKey
, piezvanotjauns ProducerRecord (topicName, messageKey, message)
.DefaultPartitioner
izmantos atslēgas jaukšanu, lai nodrošinātu, ka visi ziņojumi par to pašu atslēgu tiek nosūtīti vienam ražotājam. Šī ir vienkāršākā un visizplatītākā pieeja. - Izsmidzināšana (nejauša slodzes līdzsvarošana): Ja nevēlaties kontrolēt, uz kuru nodalījuma ziņojumu tiek nosūtīts, vienkārši piezvaniet
jauns ProducerRecord (topicName, ziņojums)
lai izveidotu savuProducerRecord
. Šajā gadījumā nodalītājs sūtīs ziņojumus visiem nodalījumiem pēc kārtas, nodrošinot līdzsvarotu servera slodzi.
- Manuāli: Kad izveidojat
Apache Kafka lietojumprogrammas nodalīšana
Vienkāršajam ražotāja / patērētāja piemēram 1. daļā mēs izmantojām a DefaultPartitioner
. Tagad mēs mēģināsim tā vietā izveidot pielāgotu sadalītāju. Pieņemsim, ka šajā piemērā mums ir mazumtirdzniecības vietne, kuru patērētāji var izmantot, lai pasūtītu produktus jebkur pasaulē. Pamatojoties uz lietojumu, mēs zinām, ka lielākā daļa patērētāju ir vai nu Amerikas Savienotajās Valstīs, vai Indijā. Mēs vēlamies sadalīt mūsu lietojumprogrammu, lai nosūtītu pasūtījumus no ASV vai Indijas saviem attiecīgajiem patērētājiem, savukārt pasūtījumi no jebkuras citas vietas tiks nodoti trešajam patērētājam.
Lai sāktu, mēs izveidosim CountryPartitioner
kas īsteno org.apache.kafka.clients.producer.Partitioner
interfeiss. Mums jāievieš šādas metodes:
- Kafka piezvanīs konfigurēt () kad mēs inicializēsim
Sadalītājs
klase, ar aKarte
no konfigurācijas īpašībām. Šī metode inicializē funkcijas, kas raksturīgas lietojumprogrammas biznesa loģikai, piemēram, savienojuma izveidi ar datu bāzi. Šajā gadījumā mēs vēlamies diezgan vispārīgu sadalītāju, kas ņemcountryName
kā īpašums. Pēc tam mēs varam izmantotconfigProperties.put ("partition.0", "USA")
lai kartētu ziņojumu plūsmu starpsienās. Nākotnē mēs varam izmantot šo formātu, lai mainītu, kuras valstis saņem savu nodalījumu. - The
Producents
API izsaukumi nodalījums () reizi ziņojumā. Šajā gadījumā mēs to izmantosim, lai lasītu ziņojumu un parsētu valsts nosaukumu no ziņojuma. Ja valsts nosaukums ircountryToPartitionMap
, tas atgriezīsiespartitionId
glabājasKarte
. Ja nē, tas jauc valsts vērtību un izmantos to, lai aprēķinātu, kurā nodalījumā tai vajadzētu iet. - Mēs saucam aizvērt () lai izslēgtu sadalītāju. Šīs metodes izmantošana nodrošina, ka visi inicializācijas laikā iegūtie resursi tiek iztīrīti izslēgšanas laikā.
Ņemiet vērā, ka, kad Kafka zvana konfigurēt ()
, Kafka ražotājs nodos visas īpašības, kuras mēs esam konfigurējuši ražotājam Sadalītājs
klasē. Ir svarīgi, lai mēs lasītu tikai tās īpašības, kas sākas starpsienas.
, parsējiet tos, lai iegūtu partitionId
un saglabājiet ID mapē countryToPartitionMap
.
Zemāk ir mūsu pielāgotā Sadalītājs
interfeiss.
Sarakstā 1. CountryPartitioner
publiskā klase CountryPartitioner ievieš Partitioner {private static Map countryToPartitionMap; public void configure (Map configs) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = jauns HashMap (); par (Map.Entry ieraksts: configs.entrySet ()) {if (entry.getKey (). startsWith ("nodalījumi.")) {String keyName = ieraksts.getKey (); Virknes vērtība = (virkne) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (vērtība, paritionId); }}} publiskais int nodalījums (virknes tēma, objekta atslēga, baits [] keyBytes, objekta vērtība, baits [] valueBytes, klastera kopa) {List partitions = cluster.availablePartitionsForTopic (topic); String valueStr = (virknes) vērtība; String countryName = ((Virknes) vērtība) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Ja valsts ir kartēta uz konkrētu nodalījumu, atgrieziet to return countryToPartitionMap.get (countryName); } else {// Ja nevienai valstij nav kartēts konkrēts nodalījums, sadaliet starp atlikušajiem nodalījumiem int noOfPartitions = cluster.topics (). size (); return value.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} public void close () {}}
The Producents
klase 2. sarakstā (zemāk) ir ļoti līdzīga mūsu vienkāršajam ražotājam no 1. daļas, ar divām izmaiņām, kas atzīmētas treknrakstā:
- Mēs iestatījām config rekvizītu ar atslēgu, kas vienāda ar vērtību
ProducerConfig.PARTITIONER_CLASS_CONFIG
, kas atbilst pilnībā kvalificētajam mūsu vārdamCountryPartitioner
klasē. Mēs arī iestatījāmcountryName
uzpartitionId
, tādējādi kartējot īpašības, kurām mēs vēlamies nodotCountryPartitioner
. - Mēs nokārtojam klases eksemplāru, kas ievieš
org.apache.kafka.clients.producer.Callback
interfeiss kā otrais argumentsproducents.sūtīt ()
metodi. Kafka klients piezvanīs savamonCompletion ()
metodi, kad ziņojums ir veiksmīgi publicēts, pievienojot aRecordMetadata
objekts. Mēs varēsim izmantot šo objektu, lai uzzinātu, kuram nodalījumam tika nosūtīts ziņojums, kā arī publicētajam ziņojumam piešķirto nobīdi.
Saraksts 2. Sadalīts ražotājs
publiskā klase Producer {private static Scanner in; public static void main (String [] argv) izmet izņēmumu {if (argv.length! = 1) {System.err.println ("Lūdzu, norādiet 1 parametrus"); System.exit (-1); } Virknes tēmaNosaukums = argv [0]; in = jauns skeneris (System.in); System.out.println ("Ievadiet ziņojumu (ierakstiet exit, lai beigtu)"); // Konfigurēt ražotāja rekvizītus configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("partition.1", "USA"); configProperties.put ("partition.2", "India"); org.apache.kafka.clients.producer.Producer producer = jauns KafkaProducer (configProperties); Virknes līnija = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = jauns ProducerRecord (topicName, null, line); producer.send (rec, jauns atzvans () {public void onCompletion (RecordMetadata metadati, izņēmuma izņēmums) {System.out.println ("Ziņojums nosūtīts tēmai ->" + metadata.topic () + ", parition->" + metadata.partition () + "saglabāts pie nobīdes->" + metadati.offset ()); ; }}); līnija = in.nextLine (); } in.close (); producents.slēgt (); }}
Starpsienu piešķiršana patērētājiem
Kafka serveris garantē, ka nodalījums tiek piešķirts tikai vienam patērētājam, tādējādi garantējot ziņojumu patēriņa kārtību. Jūs varat manuāli piešķirt nodalījumu vai arī to automātiski piešķirt.
Ja jūsu biznesa loģika prasa lielāku kontroli, tad jums būs manuāli jāpiešķir nodalījumi. Šajā gadījumā jūs izmantojat KafkaConsumer.assign ()
nodot Kakfa serverim partiju sarakstu, kas interesēja katru patērētāju.
Automātiska nodalījumu piešķiršana ir noklusējuma un visizplatītākā izvēle. Šajā gadījumā Kafka serveris katram patērētājam piešķirs nodalījumu un pārdalīs nodalījumus jaunajiem patērētājiem.
Pieņemsim, ka veidojat jaunu tēmu ar trim nodalījumiem. Uzsākot pirmo patērētāju jaunajai tēmai, Kafka piešķirs visus trīs nodalījumus vienam patērētājam. Ja pēc tam jūs izveidojat otru patērētāju, Kafka pārdalīs visus nodalījumus, piešķirot vienu nodalījumu pirmajam patērētājam un pārējos divus nodalījumus otrajam patērētājam. Ja pievienosiet trešo patērētāju, Kafka atkārtoti piešķirs nodalījumus, lai katram patērētājam tiktu piešķirts viens nodalījums. Visbeidzot, ja sākat ceturto un piekto patērētāju, tad trim patērētājiem būs piešķirts nodalījums, bet pārējie nesaņems nevienu ziņojumu. Ja kāds no trim sākotnējiem nodalījumiem iet uz leju, Kafka izmantos to pašu sadalīšanas loģiku, lai šī patērētāja nodalījumu piešķirtu vienam no papildu patērētājiem.