Programmēšana

Paredzēts reāllaikam: liela datu ziņojumapmaiņa ar Apache Kafka, 1. daļa

Kad sākās lielo datu kustība, tā galvenokārt bija vērsta uz pakešu apstrādi. Izplatītie datu glabāšanas un vaicājumu rīki, piemēram, MapReduce, Hive un Pig, visi tika izstrādāti, lai datus apstrādātu vairākkārt, nevis nepārtraukti. Uzņēmumi katru vakaru vadītu vairākus darbus, lai iegūtu datus no datu bāzes, pēc tam analizētu, pārveidotu un galu galā saglabātu datus. Nesen uzņēmumi ir atklājuši iespēju analizēt un apstrādāt datus un notikumus kā tie notiek, ne tikai reizi pāris stundās. Lielākā daļa tradicionālo ziņojumapmaiņas sistēmu tomēr netiek paplašinātas, lai reāllaikā apstrādātu lielos datus. Tātad LinkedIn inženieri uzbūvēja un atklāti ieguva Apache Kafka: izplatītu ziņojumapmaiņas sistēmu, kas atbilst lielo datu prasībām, mērogojot preču aparatūru.

Dažu pēdējo gadu laikā Apache Kafka ir parādījusies, lai atrisinātu dažādus lietošanas gadījumus. Vienkāršākajā gadījumā tas varētu būt vienkāršs buferis lietojumprogrammu žurnālu glabāšanai. Apvienojumā ar tādu tehnoloģiju kā Spark Streaming, to var izmantot, lai izsekotu datu izmaiņām un rīkotos ar šiem datiem pirms to saglabāšanas galamērķī. Kafka jutīgais režīms padara to par spēcīgu līdzekli krāpšanas atklāšanai, piemēram, pārbaudot kredītkartes darījuma derīgumu, kad tas notiek, un negaidot partijas apstrādes stundas vēlāk.

Šī divu daļu apmācība iepazīstina ar Kafka, sākot ar to, kā to instalēt un palaist jūsu attīstības vidē. Jūs saņemsiet pārskatu par Kafka arhitektūru, kam sekos ievads Apache Kafka ziņojumapmaiņas sistēmas pilnveidošanai. Visbeidzot, jūs izveidosiet pielāgotu ražotāja / patērētāja lietojumprogrammu, kas sūta un patērē ziņojumus, izmantojot Kafka serveri. Apmācības otrajā pusē jūs uzzināsit, kā sadalīt un grupēt ziņojumus un kā kontrolēt, kādus ziņojumus patērēs Kafka patērētājs.

Kas ir Apache Kafka?

Apache Kafka ir ziņojumapmaiņas sistēma, kas izveidota lieliem datiem. Līdzīgi kā Apache ActiveMQ vai RabbitMq, Kafka ļauj lietojumprogrammām, kas izveidotas uz dažādām platformām, sazināties, izmantojot asinhronu ziņojumu pārsūtīšanu. Bet Kafka no šīm tradicionālākajām ziņojumapmaiņas sistēmām atšķiras galvenajos veidos:

  • Tas ir paredzēts mērogošanai horizontāli, pievienojot vairāk preču serveru.
  • Tas nodrošina daudz lielāku caurlaidi gan ražotāja, gan patērētāja procesiem.
  • To var izmantot gan sērijveida, gan reāllaika lietošanas gadījumu atbalstam.
  • Tas neatbalsta JMS, Java uz ziņām orientētu starpprogrammatūras API.

Apache Kafka arhitektūra

Pirms mēs izpētīsim Kafka arhitektūru, jums jāzina tās pamatterminoloģija:

  • A ražotājs ir process, kas var publicēt ziņojumu par tēmu.
  • a patērētājs ir process, kas var abonēt vienu vai vairākas tēmas un patērēt ziņojumus, kas publicēti tēmām.
  • A tēmas kategorija ir plūsmas nosaukums, kurā tiek publicēti ziņojumi.
  • A brokeris ir process, kas darbojas vienā mašīnā.
  • A kopa ir brokeru grupa, kas strādā kopā.

Apache Kafka arhitektūra ir ļoti vienkārša, kā rezultātā dažās sistēmās var sasniegt labāku veiktspēju un caurlaidspēju. Katra Kafka tēma ir kā vienkāršs žurnāla fails. Kad producents publicē ziņojumu, Kafka serveris to pievieno žurnāla faila beigām savai norādītajai tēmai. Serveris arī piešķir kompensēt, kas ir skaitlis, ko izmanto, lai neatgriezeniski identificētu katru ziņojumu. Pieaugot ziņojumu skaitam, palielinās katra nobīdes vērtība; piemēram, ja producents publicē trīs ziņojumus, pirmais var iegūt nobīdi 1, otrais - 2 un trešais - 3.

Kad Kafka patērētājs pirmo reizi startē, tas serverim nosūtīs pieprasījumu, lūdzot izgūt visus ziņojumus par konkrētu tēmu ar nobīdes vērtību, kas lielāka par 0. Serveris pārbaudīs šīs tēmas žurnālfailu un atgriezīs trīs jaunos ziņojumus. . Patērētājs apstrādās ziņojumus, pēc tam nosūtīs ziņojumu pieprasījumu ar nobīdi augstāk nekā 3 utt.

Kafkā klients ir atbildīgs par nobīdes skaita atcerēšanos un ziņojumu izgūšanu. Kafka serveris neizseko un nepārvalda ziņojumu patēriņu. Pēc noklusējuma Kafka serveris saglabās ziņojumu septiņas dienas. Fona pavediens serverī pārbauda un izdzēš ziņojumus, kas ir septiņas dienas vai vecāki. Patērētājs var piekļūt ziņojumiem, kamēr tie atrodas serverī. Tas var lasīt ziņojumu vairākas reizes un pat lasīt ziņojumus apgrieztā saņemšanas secībā. Bet, ja patērētājs neizdodas atgūt ziņojumu pirms septiņu dienu beigām, viņš to nokavēs.

Kafka etaloni

LinkedIn un citu uzņēmumu produkcijas izmantošana ir parādījusi, ka ar pareizu konfigurāciju Apache Kafka katru dienu spēj apstrādāt simtiem gigabaitu datu. 2011. gadā trīs LinkedIn inženieri izmantoja etalonu testēšanu, lai parādītu, ka Kafka varētu sasniegt daudz lielāku caurlaidspēju nekā ActiveMQ un RabbitMQ.

Apache Kafka ātra iestatīšana un demonstrācija

Šajā apmācībā mēs izveidosim pielāgotu lietojumprogrammu, bet sāksim ar Kafka instances instalēšanu un testēšanu pie ārpuses ražotāja un patērētāja.

  1. Apmeklējiet Kafka lejupielādes lapu, lai instalētu jaunāko versiju (0.9 šī raksta laikā).
  2. Izvelciet bināros failus a programmatūra / kafka mapi. Pašreizējai versijai tas ir programmatūra / kafka_2.11-0.9.0.0.
  3. Mainiet pašreizējo direktoriju, lai norādītu uz jauno mapi.
  4. Palaidiet Zookeeper serveri, izpildot komandu: bin / zookeeper-server-start.sh config / zookeeper.properties.
  5. Sāciet Kafka serveri, izpildot: bin / kafka-server-start.sh config / server.properties.
  6. Izveidojiet testa tēmu, kuru varat izmantot testēšanai: bin / kafka-topics.sh --create --zookeeper localhost: 2181 - replikācijas koeficients 1 - nodalījumi 1 - tēma javaworld.
  7. Sāciet vienkāršu konsoles patērētāju, kas var patērēt ziņojumus, kas publicēti noteiktā tēmā, piemēram javaworld: bin / kafka-console-consumer.sh --zookeeper localhost: 2181 - topic javaworld - no sākuma.
  8. Sāciet vienkāršu ražotāja konsoli, kas var publicēt ziņojumus testa tēmā: bin / kafka-console-producer.sh - brokeru saraksta vietējais saimnieks: 9092 - topic javaworld.
  9. Mēģiniet ražotāja konsolē ierakstīt vienu vai divus ziņojumus. Jūsu ziņojumiem vajadzētu būt redzamiem patērētāju konsolē.

Lietojumprogrammas piemērs ar Apache Kafka

Jūs esat redzējis, kā Apache Kafka darbojas ārpus kastes. Tālāk izstrādāsim pielāgotu ražotāja / patērētāja lietojumprogrammu. Ražotājs izgūs lietotāja ievadi no konsoles un katru jaunu rindu nosūtīs kā ziņojumu uz Kafka serveri. Patērētājs izgūs ziņojumus par noteiktu tēmu un izdrukās tos konsolē. Šajā gadījumā ražotāja un patērētāja komponenti ir jūsu pašu īstenoti kafka-console-producer.sh un kafka-console-consumer.sh.

Sāksim ar a Producents.java klasē. Šajā klienta klasē ir loģika, lai lasītu lietotāja ievadi no konsoles un nosūtītu šo ievadi kā ziņojumu uz Kafka serveri.

Mēs konfigurējam ražotāju, izveidojot objektu no java.util.Īpašības klase un tās īpašību iestatīšana. ProducerConfig klase nosaka visas pieejamās dažādās īpašības, taču Kafka noklusējuma vērtības ir pietiekamas lielākajai daļai lietojumu. Noklusējuma konfigurācijai mums jāiestata tikai trīs obligātie rekvizīti:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) nosaka saimniekdatoru: portu pāru sarakstu, ko izmanto, lai izveidotu sākotnējos savienojumus ar Kakfa kopu resursdators1: ports1, resursdators2: ports2, ... formātā. Pat ja mūsu Kafka klasterī ir vairāk nekā viens brokeris, mums jānorāda tikai pirmā brokera vērtība resursdators: osta. Kafka klients izmantos šo vērtību, lai veiktu atklāšanas zvanu brokerim, kas atgriezīs visu klastera brokeru sarakstu. Ieteicams programmā norādīt vairāk nekā vienu brokeri BOOTSTRAP_SERVERS_CONFIG, tā ka, ja pirmais brokeris nedarbosies, klients varēs izmēģināt citus brokerus.

Kafka serveris sagaida ziņojumus baita [] atslēga, baita [] vērtība formātā. Tā vietā, lai pārveidotu katru atslēgu un vērtību, Kafka klienta bibliotēka ļauj mums izmantot tādus draudzīgākus veidus kā Stīga un int ziņu sūtīšanai. Bibliotēka tos pārveidos atbilstošajā tipā. Piemēram, lietotnes paraugam nav ziņojumam specifiskas atslēgas, tāpēc mēs to izmantosim nulle par atslēgu. Vērtībai, kuru izmantosim a Stīga, kas ir lietotāja ievadītie dati konsolē.

Lai konfigurētu ziņojuma atslēga, mēs iestatījām vērtību KEY_SERIALIZER_CLASS_CONFIG uz org.apache.kafka.common.serialization.ByteArraySerializer. Tas darbojas tāpēc, ka nulle nav jāpārvērš baits []. Priekš ziņojuma vērtība, mēs iestatījām VALUE_SERIALIZER_CLASS_CONFIG uz org.apache.kafka.common.serialization.StringSerializer, jo šī klase zina, kā pārveidot a Stīga uz a baits [].

Pielāgotas atslēgas / vērtības objekti

Līdzīgs StringSerializer, Kafka nodrošina seriālizatorus citiem primitīviem, piemēram, int un ilgi. Lai atslēgai vai vērtībai izmantotu pielāgotu objektu, mums jāizveido klases ieviešana org.apache.kafka.common.serialization.Serializer. Pēc tam mēs varētu pievienot loģiku, lai klasificētu klasi baits []. Mums arī mūsu patērētāja kodā būtu jāizmanto atbilstošs deserializators.

Kafka producents

Pēc pildīšanas Rekvizīti klasi ar nepieciešamajām konfigurācijas īpašībām, mēs varam to izmantot, lai izveidotu objektu KafkaRažotājs. Kad mēs pēc tam vēlamies nosūtīt ziņojumu Kafka serverim, mēs izveidosim objektu ProducerRecord un piezvaniet KafkaRažotājs's nosūtīt () metodi ar šo ierakstu, lai nosūtītu ziņojumu. The ProducerRecord ir divi parametri: tēmas nosaukums, kurai jāpublicē ziņojums, un faktiskais ziņojums. Neaizmirstiet piezvanīt Producer.close () metode, kad esat izmantojis ražotāju:

Saraksts 1. KafkaProducer

 publiskā klase Producer {private static Scanner in; public static void main (String [] argv) izmet izņēmumu {if (argv.length! = 1) {System.err.println ("Lūdzu, norādiet 1 parametrus"); System.exit (-1); } Virknes tēmaNosaukums = argv [0]; in = jauns skeneris (System.in); System.out.println ("Ievadiet ziņojumu (ierakstiet exit, lai beigtu)"); // Konfigurēt ražotāja rekvizītus configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = jauns KafkaProducer (configProperties); Virknes līnija = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = jauns ProducerRecord (topicName, line); producents.sūtīt (rec); līnija = in.nextLine (); } in.close (); producents.slēgt (); }} 

Konfigurēt ziņojuma patērētāju

Tālāk mēs izveidosim vienkāršu patērētāju, kurš abonēs tēmu. Ikreiz, kad tēmai tiek publicēts jauns ziņojums, tā nolasīs šo ziņojumu un izdrukās konsolē. Patērētāja kods ir diezgan līdzīgs ražotāja kodam. Mēs vispirms izveidojam objektu java.util.Īpašības, nosakot tā patērētājam raksturīgās īpašības un pēc tam izmantojot to, lai izveidotu jaunu objektu KafkaPatērētājs. ConsumerConfig klase nosaka visas īpašības, kuras mēs varam iestatīt. Ir tikai četras obligātās īpašības:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Tāpat kā mēs to darījām ražotāju klasei, mēs to izmantosim BOOTSTRAP_SERVERS_CONFIG lai konfigurētu resursdatora / porta pārus patērētāju klasei. Šī konfigurācija ļauj mums izveidot sākotnējos savienojumus ar Kakfa kopu resursdators1: ports1, resursdators2: ports2, ... formātā.

Kā jau iepriekš minēju, Kafka serveris sagaida ziņojumus baits [] taustiņu un baits [] vērtību formātus, un tai ir sava ieviešana dažādu veidu sērijveidošanai baits []. Tāpat kā mēs to darījām ar ražotāju, no patērētāja puses, lai pārveidotu, mums būs jāizmanto pielāgots deserializators baits [] atpakaļ attiecīgajā tipā.

Lietojumprogrammas parauga gadījumā mēs zinām, ka ražotājs to izmanto ByteArraySerializer par atslēgu un StringSerializer par vērtību. Tāpēc klienta pusē mums tas ir jāizmanto org.apache.kafka.common.serialization.ByteArrayDeserializer par atslēgu un org.apache.kafka.common.serialization.StringDeserializer par vērtību. Iestatot šīs klases kā vērtības KEY_DESERIALIZER_CLASS_CONFIG un VALUE_DESERIALIZER_CLASS_CONFIG ļaus patērētājam deserializēt baits [] kodēti veidi, kurus sūtījis ražotājs.

Visbeidzot, mums jāiestata GROUP_ID_CONFIG. Tam vajadzētu būt grupas nosaukumam virknes formātā. Sīkāk par šo konfigurāciju paskaidrošu minūtes laikā. Pagaidām paskatieties uz Kafka patērētāju ar četriem obligātajiem īpašumiem:

$config[zx-auto] not found$config[zx-overlay] not found