Programmēšana

Paredzēts reāllaikam: liela datu ziņojumapmaiņa ar Apache Kafka, 2. daļa

Šī Apache Kafka ievaddaļas JavaWorld pirmajā pusē jūs izstrādājāt pāris maza mēroga ražotāju / patērētāju lietojumprogrammas, izmantojot Kafka. No šiem vingrinājumiem jums jāpārzina Apache Kafka ziņojumapmaiņas sistēmas pamati. Šajā otrajā pusē jūs uzzināsiet, kā izmantot nodalījumus, lai sadalītu slodzi un horizontāli mērogotu lietojumprogrammu, apstrādājot līdz pat miljoniem ziņojumu dienā. Jūs arī uzzināsiet, kā Kafka izmanto ziņojumu nobīdes, lai izsekotu un pārvaldītu sarežģītu ziņojumu apstrādi un kā aizsargāt savu Apache Kafka ziņojumapmaiņas sistēmu pret kļūmēm, ja patērētājs kritīs. Mēs izstrādāsim lietojumprogrammas piemēru no 1. daļas gan publicēšanas, abonēšanas, gan punkta no punkta lietošanas gadījumiem.

Starpsienas Apache Kafka

Kafkas tēmas var iedalīt starpsienās. Piemēram, izveidojot tēmu ar nosaukumu Demo, jūs varat to konfigurēt, lai tajā būtu trīs nodalījumi. Serveris izveidos trīs žurnālfailus, pa vienam katram demonstrācijas nodalījumam. Kad producents publicēja tēmai ziņojumu, tas šim ziņojumam piešķīra nodalījuma ID. Tad serveris pievienos ziņojumu žurnāla failam tikai šim nodalījumam.

Ja pēc tam esat izveidojis divus patērētājus, serveris var piešķirt 1. un 2. nodalījumu pirmajam patērētājam un 3. nodalījumu otrajam patērētājam. Katrs patērētājs lasīs tikai no tam piešķirtajiem nodalījumiem. Demonstrācijas tēmu, kas konfigurēta trim nodalījumiem, var redzēt 1. attēlā.

Lai paplašinātu scenāriju, iedomājieties Kafka kopu ar diviem brokeriem, kas izvietoti divās mašīnās. Sadalot demonstrācijas tēmu, jūs to konfigurējat ar diviem nodalījumiem un divām kopijām. Šāda veida konfigurācijai Kafka serveris piešķirs abus nodalījumus abiem jūsu klastera starpniekiem. Katrs brokeris būtu līderis vienā no starpsienām.

Kad producents publicēja ziņojumu, tas nonāca nodalījuma vadītājam. Vadītājs paņēma ziņojumu un pievienoja to lokālās mašīnas žurnāla failam. Otrais brokeris pasīvi atkārtotu šo saistību žurnālu savā mašīnā. Ja nodalījuma vadītājs samazināsies, otrais starpnieks kļūtu par jauno vadītāju un sāktu apkalpot klientu pieprasījumus. Tādā pašā veidā, kad patērētājs nosūtīja pieprasījumu nodalījumam, šis pieprasījums vispirms nonāks nodalījuma vadītājam, kurš atgriezīs pieprasītos ziņojumus.

Sadalīšanas priekšrocības

Apsveriet priekšrocības, kas saistītas ar Kafka bāzes ziņojumapmaiņas sistēmas sadalīšanu:

  1. Mērogojamība: Sistēmā, kurā ir tikai viens nodalījums, tēmā publicētie ziņojumi tiek glabāti žurnāla failā, kas eksistē vienā mašīnā. Tēmas ziņojumu skaitam jāiekļaujas vienā saistību žurnāla failā, un saglabāto ziņojumu lielums nekad nevar būt lielāks par šīs mašīnas diska vietu. Tēmas sadalīšana ļauj mērogot jūsu sistēmu, glabājot ziņojumus dažādās klastera mašīnās. Piemēram, ja vēlaties demonstrācijas tēmai saglabāt 30 gigabaitus (GB) ziņojumu, jūs varētu izveidot Kafka kopu no trim mašīnām, katrā no tām ar 10 GB diska vietas. Tad jūs konfigurētu tēmu ar trim nodalījumiem.
  2. Servera un slodzes līdzsvarošana: Ja jums ir vairāki nodalījumi, jūs varat izplatīt ziņojumu pieprasījumus starpniekiem. Piemēram, ja jums ir tēma, kas apstrādā 1 miljonu ziņojumu sekundē, varat to sadalīt 100 nodalījumos un pievienot savam klasterim 100 brokerus. Katrs brokeris būtu viena nodalījuma vadītājs, kas atbildētu tikai uz 10 000 klientu pieprasījumiem sekundē.
  3. Patērētāja un slodzes līdzsvarošana: Līdzīgi kā servera un slodzes līdzsvarošanai, vairāku patērētāju mitināšana dažādās mašīnās ļauj izplatīt patērētāja slodzi. Pieņemsim, ka jūs vēlaties patērēt 1 miljonu ziņojumu sekundē no tēmas ar 100 nodalījumiem. Jūs varētu izveidot 100 patērētājus un palaist tos paralēli. Kafka serveris katram patērētājam piešķirs vienu nodalījumu, un katrs patērētājs paralēli apstrādās 10 000 ziņojumu. Tā kā Kafka katru nodalījumu piešķir tikai vienam patērētājam, nodalījumā katrs ziņojums tiktu patērēts secībā.

Divi nodalīšanas veidi

Ražotājs ir atbildīgs par izlemšanu, uz kuru nodalījumu tiks nosūtīts ziņojums. Ražotājam ir divas iespējas kontrolēt šo uzdevumu:

  • Pielāgots sadalītājs: Jūs varat izveidot klasi, kurā ieviests org.apache.kafka.clients.producer.Partitioner interfeiss. Šī paraža Sadalītājs ieviesīs biznesa loģiku, lai izlemtu, kur tiek sūtīti ziņojumi.
  • DefaultPartitioner: Ja neveidojat pielāgotu sadalītāju klasi, pēc noklusējuma org.apache.kafka.clients.producer.internals.DefaultPartitioner klase tiks izmantota. Noklusējuma nodalītājs vairumam gadījumu ir pietiekami labs, nodrošinot trīs iespējas:
    1. Manuāli: Kad izveidojat ProducerRecord, izmantojiet pārslogotu konstruktoru jauns ProducerRecord (topicName, partitionId, messageKey, message) lai norādītu nodalījuma ID.
    2. Jaukšana (jutīga pret vietni): Kad izveidojat ProducerRecord, norādiet a messageKey, piezvanot jauns ProducerRecord (topicName, messageKey, message). DefaultPartitioner izmantos atslēgas jaukšanu, lai nodrošinātu, ka visi ziņojumi par to pašu atslēgu tiek nosūtīti vienam ražotājam. Šī ir vienkāršākā un visizplatītākā pieeja.
    3. Izsmidzināšana (nejauša slodzes līdzsvarošana): Ja nevēlaties kontrolēt, uz kuru nodalījuma ziņojumu tiek nosūtīts, vienkārši piezvaniet jauns ProducerRecord (topicName, ziņojums) lai izveidotu savu ProducerRecord. Šajā gadījumā nodalītājs sūtīs ziņojumus visiem nodalījumiem pēc kārtas, nodrošinot līdzsvarotu servera slodzi.

Apache Kafka lietojumprogrammas nodalīšana

Vienkāršajam ražotāja / patērētāja piemēram 1. daļā mēs izmantojām a DefaultPartitioner. Tagad mēs mēģināsim tā vietā izveidot pielāgotu sadalītāju. Pieņemsim, ka šajā piemērā mums ir mazumtirdzniecības vietne, kuru patērētāji var izmantot, lai pasūtītu produktus jebkur pasaulē. Pamatojoties uz lietojumu, mēs zinām, ka lielākā daļa patērētāju ir vai nu Amerikas Savienotajās Valstīs, vai Indijā. Mēs vēlamies sadalīt mūsu lietojumprogrammu, lai nosūtītu pasūtījumus no ASV vai Indijas saviem attiecīgajiem patērētājiem, savukārt pasūtījumi no jebkuras citas vietas tiks nodoti trešajam patērētājam.

Lai sāktu, mēs izveidosim CountryPartitioner kas īsteno org.apache.kafka.clients.producer.Partitioner interfeiss. Mums jāievieš šādas metodes:

  1. Kafka piezvanīs konfigurēt () kad mēs inicializēsim Sadalītājs klase, ar a Karte no konfigurācijas īpašībām. Šī metode inicializē funkcijas, kas raksturīgas lietojumprogrammas biznesa loģikai, piemēram, savienojuma izveidi ar datu bāzi. Šajā gadījumā mēs vēlamies diezgan vispārīgu sadalītāju, kas ņem countryName kā īpašums. Pēc tam mēs varam izmantot configProperties.put ("partition.0", "USA") lai kartētu ziņojumu plūsmu starpsienās. Nākotnē mēs varam izmantot šo formātu, lai mainītu, kuras valstis saņem savu nodalījumu.
  2. The Producents API izsaukumi nodalījums () reizi ziņojumā. Šajā gadījumā mēs to izmantosim, lai lasītu ziņojumu un parsētu valsts nosaukumu no ziņojuma. Ja valsts nosaukums ir countryToPartitionMap, tas atgriezīsies partitionId glabājas Karte. Ja nē, tas jauc valsts vērtību un izmantos to, lai aprēķinātu, kurā nodalījumā tai vajadzētu iet.
  3. Mēs saucam aizvērt () lai izslēgtu sadalītāju. Šīs metodes izmantošana nodrošina, ka visi inicializācijas laikā iegūtie resursi tiek iztīrīti izslēgšanas laikā.

Ņemiet vērā, ka, kad Kafka zvana konfigurēt (), Kafka ražotājs nodos visas īpašības, kuras mēs esam konfigurējuši ražotājam Sadalītājs klasē. Ir svarīgi, lai mēs lasītu tikai tās īpašības, kas sākas starpsienas., parsējiet tos, lai iegūtu partitionIdun saglabājiet ID mapē countryToPartitionMap.

Zemāk ir mūsu pielāgotā Sadalītājs interfeiss.

Sarakstā 1. CountryPartitioner

 publiskā klase CountryPartitioner ievieš Partitioner {private static Map countryToPartitionMap; public void configure (Map configs) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = jauns HashMap (); par (Map.Entry ieraksts: configs.entrySet ()) {if (entry.getKey (). startsWith ("nodalījumi.")) {String keyName = ieraksts.getKey (); Virknes vērtība = (virkne) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (vērtība, paritionId); }}} publiskais int nodalījums (virknes tēma, objekta atslēga, baits [] keyBytes, objekta vērtība, baits [] valueBytes, klastera kopa) {List partitions = cluster.availablePartitionsForTopic (topic); String valueStr = (virknes) vērtība; String countryName = ((Virknes) vērtība) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Ja valsts ir kartēta uz konkrētu nodalījumu, atgrieziet to return countryToPartitionMap.get (countryName); } else {// Ja nevienai valstij nav kartēts konkrēts nodalījums, sadaliet starp atlikušajiem nodalījumiem int noOfPartitions = cluster.topics (). size (); return value.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} public void close () {}} 

The Producents klase 2. sarakstā (zemāk) ir ļoti līdzīga mūsu vienkāršajam ražotājam no 1. daļas, ar divām izmaiņām, kas atzīmētas treknrakstā:

  1. Mēs iestatījām config rekvizītu ar atslēgu, kas vienāda ar vērtību ProducerConfig.PARTITIONER_CLASS_CONFIG, kas atbilst pilnībā kvalificētajam mūsu vārdam CountryPartitioner klasē. Mēs arī iestatījām countryName uz partitionId, tādējādi kartējot īpašības, kurām mēs vēlamies nodot CountryPartitioner.
  2. Mēs nokārtojam klases eksemplāru, kas ievieš org.apache.kafka.clients.producer.Callback interfeiss kā otrais arguments producents.sūtīt () metodi. Kafka klients piezvanīs savam onCompletion () metodi, kad ziņojums ir veiksmīgi publicēts, pievienojot a RecordMetadata objekts. Mēs varēsim izmantot šo objektu, lai uzzinātu, kuram nodalījumam tika nosūtīts ziņojums, kā arī publicētajam ziņojumam piešķirto nobīdi.

Saraksts 2. Sadalīts ražotājs

 publiskā klase Producer {private static Scanner in; public static void main (String [] argv) izmet izņēmumu {if (argv.length! = 1) {System.err.println ("Lūdzu, norādiet 1 parametrus"); System.exit (-1); } Virknes tēmaNosaukums = argv [0]; in = jauns skeneris (System.in); System.out.println ("Ievadiet ziņojumu (ierakstiet exit, lai beigtu)"); // Konfigurēt ražotāja rekvizītus configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");  configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("partition.1", "USA"); configProperties.put ("partition.2", "India");  org.apache.kafka.clients.producer.Producer producer = jauns KafkaProducer (configProperties); Virknes līnija = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = jauns ProducerRecord (topicName, null, line); producer.send (rec, jauns atzvans () {public void onCompletion (RecordMetadata metadati, izņēmuma izņēmums) {System.out.println ("Ziņojums nosūtīts tēmai ->" + metadata.topic () + ", parition->" + metadata.partition () + "saglabāts pie nobīdes->" + metadati.offset ()); ; }}); līnija = in.nextLine (); } in.close (); producents.slēgt (); }} 

Starpsienu piešķiršana patērētājiem

Kafka serveris garantē, ka nodalījums tiek piešķirts tikai vienam patērētājam, tādējādi garantējot ziņojumu patēriņa kārtību. Jūs varat manuāli piešķirt nodalījumu vai arī to automātiski piešķirt.

Ja jūsu biznesa loģika prasa lielāku kontroli, tad jums būs manuāli jāpiešķir nodalījumi. Šajā gadījumā jūs izmantojat KafkaConsumer.assign () nodot Kakfa serverim partiju sarakstu, kas interesēja katru patērētāju.

Automātiska nodalījumu piešķiršana ir noklusējuma un visizplatītākā izvēle. Šajā gadījumā Kafka serveris katram patērētājam piešķirs nodalījumu un pārdalīs nodalījumus jaunajiem patērētājiem.

Pieņemsim, ka veidojat jaunu tēmu ar trim nodalījumiem. Uzsākot pirmo patērētāju jaunajai tēmai, Kafka piešķirs visus trīs nodalījumus vienam patērētājam. Ja pēc tam jūs izveidojat otru patērētāju, Kafka pārdalīs visus nodalījumus, piešķirot vienu nodalījumu pirmajam patērētājam un pārējos divus nodalījumus otrajam patērētājam. Ja pievienosiet trešo patērētāju, Kafka atkārtoti piešķirs nodalījumus, lai katram patērētājam tiktu piešķirts viens nodalījums. Visbeidzot, ja sākat ceturto un piekto patērētāju, tad trim patērētājiem būs piešķirts nodalījums, bet pārējie nesaņems nevienu ziņojumu. Ja kāds no trim sākotnējiem nodalījumiem iet uz leju, Kafka izmantos to pašu sadalīšanas loģiku, lai šī patērētāja nodalījumu piešķirtu vienam no papildu patērētājiem.

$config[zx-auto] not found$config[zx-overlay] not found