Programmēšana

Kā izveidot valstiskas straumēšanas lietojumprogrammas, izmantojot Apache Flink

Fabians Hueske ir Apache Flink projekta izpildītājs un PMC loceklis un Data Artisans līdzdibinātājs.

Apache Flink ir ietvars, lai ieviestu valstiskas plūsmas apstrādes lietojumprogrammas un palaistu tās mērogā skaitļošanas kopā. Iepriekšējā rakstā mēs pārbaudījām, kas ir stāvokļa straumes apstrāde, kādus izmantošanas gadījumus tā risina un kāpēc jums jāievieš un jādarbina straumēšanas lietojumprogrammas, izmantojot Apache Flink.

Šajā rakstā es sniegšu piemērus diviem izplatītas straumes apstrādes gadījumiem, kā parasti, un apspriedīšu, kā tos var ieviest ar Flink. Pirmais izmantošanas gadījums ir uz notikumiem balstītas lietojumprogrammas, t.i., lietojumprogrammas, kas uzņem nepārtrauktas notikumu plūsmas un šiem notikumiem piemēro kādu biznesa loģiku. Otrais ir straumēšanas analīzes izmantošanas gadījums, kur es iepazīstināšu ar diviem analītiskajiem vaicājumiem, kas ieviesti, izmantojot Flink’s SQL API, kuri reāllaikā apkopo straumēšanas datus. Mēs, Data Artisans, sniedzam visu savu piemēru pirmkodu publiskā GitHub repozitorijā.

Pirms mēs iedziļināsimies piemēru detaļās, es iepazīstināšu ar notikumu straumi, ko uzņem lietojumprogrammu piemēri, un paskaidrošu, kā jūs varat palaist mūsu piedāvāto kodu.

Taksometru braucienu plūsma

Mūsu piemēru piemēri ir balstīti uz publisku datu kopu par taksometru braucieniem, kas notika Ņujorkā 2013. gadā. 2015. gada DEBS (ACM Starptautiskās konferences par izplatītām, uz notikumiem balstītām sistēmām) organizatori Grand Challenge pārkārtoja sākotnējo datu kopu un pārveidoja to par viens CSV fails, no kura mēs lasām šādus deviņus laukus.

  • Medaljons - taksometra MD5 summas ID
  • Hack_license - taksometra licences MD5 summas ID
  • Pickup_datetime - laiks, kad tika uzņemti pasažieri
  • Dropoff_datetime - laiks, kad pasažieri tika izlaisti
  • Pickup_longitude - uzņemšanas vietas garums
  • Pickup_latitude - saņemšanas vietas platums
  • Dropoff_longitude - nolaišanās vietas garums
  • Dropoff_latitude - nolaišanās vietas platums
  • Kopējais_summa - kopējais maksājums dolāros

CSV failā ieraksti tiek glabāti to atlaišanas laika atribūta augošā secībā. Tādējādi failu var uzskatīt par sakārtotu notikumu žurnālu, kas tika publicēts, beidzoties ceļojumam. Lai palaistu GitHub sniegtos piemērus, no Google diska ir jālejupielādē DEBS izaicinājuma datu kopa.

Visi lietojumprogrammu piemēri secīgi lasa CSV failu un uzņem to kā taksometra braucienu notikumu straumi. Turpmāk lietojumprogrammas apstrādā notikumus tāpat kā jebkuru citu straumi, t.i., tāpat kā plūsmu, kas tiek uzņemta no žurnāla balstītas publicēšanas-abonēšanas sistēmas, piemēram, Apache Kafka vai Kinesis. Faktiski faila (vai jebkura cita veida pastāvīgu datu) lasīšana un apstrāde par straumi ir Flink pieejas stūrakmens partijas un straumes apstrādes apvienošanā.

Palaist Flink piemērus

Kā minēts iepriekš, mēs publicējām mūsu piemēru piemēru pirmkodu GitHub repozitorijā. Mēs iesakām krātuvi un klonēt krātuvi. Piemērus var viegli izpildīt no jūsu izvēlētā IDE; lai tos palaistu, nav jāiestata un jākonfigurē Flink kopa. Vispirms importējiet piemēru pirmkodu kā Maven projektu. Pēc tam izpildiet lietojumprogrammas galveno klasi un kā programmas parametru norādiet datu faila glabāšanas vietu (saiti datu lejupielādei skatiet iepriekš).

Kad esat palaidis lietojumprogrammu, tā sāks lokālu, iegultu Flink instanci lietojumprogrammas JVM procesā un iesniegs pieteikumu tās izpildei. Kamēr sāk darboties Flink un tiek plānoti darba uzdevumi, jūs redzēsiet virkni žurnāla izrakstu. Kad lietojumprogramma darbojas, tās izvads tiks ierakstīts standarta izvadē.

Uz notikumiem balstītas lietojumprogrammas veidošana Flink

Tagad ļaujiet mums apspriest mūsu pirmās lietošanas gadījumu, kas ir notikumu virzīta lietojumprogramma. Uz notikumiem balstītas lietojumprogrammas uzņem notikumu straumes, veic aprēķinus, tiklīdz notikumi tiek saņemti, un var izstarot jaunus notikumus vai izraisīt ārējas darbības. Vairākas uz notikumiem balstītas lietojumprogrammas var izveidot, savienojot tās kopā, izmantojot notikumu žurnālu sistēmas, līdzīgi tam, kā lielas sistēmas var veidot no mikropakalpojumiem. Uz notikumiem balstītas lietojumprogrammas, notikumu žurnāli un lietojumprogrammu stāvokļa momentuzņēmumi (Flinkā zināmi kā savepoint) veido ļoti spēcīgu dizaina modeli, jo jūs varat atiestatīt to stāvokli un atkārtot viņu ievadi, lai atgūtuos no kļūmes, labotu kļūdu vai migrētu lietojums citā kopā.

Šajā rakstā mēs izskatīsim notikumu virzītu lietojumprogrammu, kas atbalsta pakalpojumu, kas uzrauga taksometru vadītāju darba laiku. 2016. gadā NYC taksometru un limuzīnu komisija nolēma ierobežot taksometru vadītāju darba laiku līdz 12 stundu maiņām un pieprasīt vismaz astoņu stundu pārtraukumu, pirms var sākt nākamo maiņu. Maiņa sākas ar pirmā brauciena sākumu. Kopš tā laika autovadītājs var sākt jaunus braucienus 12 stundu laikā. Mūsu lietojumprogramma izseko braucēju braucienus, atzīmē viņu 12 stundu loga beigu laiku (t.i., laiku, kad viņi var sākt pēdējo braucienu), kā arī atzīmē braucienus, kas pārkāpj noteikumus. Pilnu šī piemēra avota kodu varat atrast mūsu GitHub krātuvē.

Mūsu lietojumprogramma tiek ieviesta, izmantojot Flink’s DataStream API un a KeyedProcessFunction. DataStream API ir funkcionāla API, kuras pamatā ir drukātu datu plūsmu jēdziens. A DataStream ir veida notikumu plūsmas loģisks attēlojums T. Straume tiek apstrādāta, tai piemērojot funkciju, kas rada citu, iespējams, cita veida datu plūsmu. Flink paralēli apstrādā straumes, izdalot notikumus partiju straumēšanai un katram nodalījumam piemērojot dažādus funkciju gadījumus.

Šis koda fragments parāda mūsu uzraudzības lietojumprogrammas augsta līmeņa plūsmu.

// uzņemt taksometru braucienu straumi.

DataStream braucieni = TaxiRides.getRides (env, inputPath);

DataStream paziņojumi = braucieni

// nodalījuma straume pēc vadītāja apliecības ID

.keyBy (r -> r.licenseId)

// pārraudzīt brauciena notikumus un ģenerēt paziņojumus

.process (new MonitorWorkTime ());

// drukāt paziņojumus

paziņojumi.druka ();

Lietojumprogramma sāk uzņemt taksometru braucienu plūsmu. Mūsu piemērā notikumi tiek nolasīti no teksta faila, parsēti un saglabāti mapē TaxiRide POJO objekti. Reālās lietojumprogrammas parasti uzņem notikumus no ziņojumu rindas vai notikumu žurnāla, piemēram, Apache Kafka vai Pravega. Nākamais solis ir taustiņa TaxiRide notikumi licences ID vadītāja. The keyBy operācija nodala plūsmu deklarētajā laukā tā, ka visus notikumus ar to pašu atslēgu apstrādā nākamās funkcijas viens un tas pats paralēlais gadījums. Mūsu gadījumā mēs sadalām licences ID jomā, jo mēs vēlamies uzraudzīt katra atsevišķa autovadītāja darba laiku.

Tālāk mēs izmantojam MonitorWorkTime funkcija sadalītajā TaxiRide notikumiem. Funkcija izseko braucienus vienam vadītājam un uzrauga viņu maiņas un pārtraukumu laikus. Tas izstaro veida notikumus Tuple2, kur katrs kopa apzīmē paziņojumu, kas sastāv no vadītāja apliecības ID un ziņojuma. Visbeidzot, mūsu lietojumprogramma izstaro ziņojumus, izdrukājot tos standarta izvadā. Reālās lietojumprogrammas paziņojumus ierakstītu ārējā ziņojumā vai krātuves sistēmā, piemēram, Apache Kafka, HDFS vai datu bāzes sistēmā, vai arī izraisītu ārēju zvanu, lai tos nekavējoties izstumtu.

Tagad, kad mēs esam apsprieduši lietojumprogrammas kopējo plūsmu, apskatīsim MonitorWorkTime funkcija, kas satur lielāko daļu lietojumprogrammas faktiskās biznesa loģikas. The MonitorWorkTime funkcija ir stāvokļa KeyedProcessFunction ka apēd TaxiRide notikumus un izstaro Tuple2 ieraksti. The KeyedProcessFunction saskarnē ir divas datu apstrādes metodes: processElement () un onTimer (). The processElement () metode ir paredzēta katram ierašanās notikumam. The onTimer () metode tiek izsaukta, kad tiek aktivizēts iepriekš reģistrēts taimeris. Nākamajā fragmentā parādīts MonitorWorkTime funkcija un viss, kas tiek deklarēts ārpus apstrādes metodēm.

publiskā statiskā klase MonitorWorkTime

paplašina KeyedProcessFunction {

// laika konstantes milisekundēs

privāts statisks galīgais garais ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; 12 stundas

privāts statisks galīgais garais REQ_BREAK_TIME = 8 * 60 * 60 * 1000; 8 stundas

privāts statisks gala garais CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 stundas

privāts pārejošs DateTimeFormatter formatētājs;

// norādiet rokturi, lai saglabātu maiņas sākuma laiku

ValueState shiftStart;

@ Pārvarēt

public void open (Configuration conf) {

// reģistrēt valsts rokturi

shiftStart = getRuntimeContext (). getState (

jauns ValueStateDescriptor (“shiftStart”, Types.LONG));

// inicializēt laika formatētāju

this.formatter = DateTimeFormat.forPattern (“gggg-MM-dd HH: mm: ss”);

  }

// processElement () un onTimer () sīkāk aplūkoti tālāk.

}

Funkcija deklarē dažas konstantes laika intervāliem milisekundēs, laika formatētāju un stāvokļa rokturi atslēgas statusam, kuru pārvalda Flinks. Pārvalda stāvokli periodiski pārbauda un neveiksmes gadījumā automātiski atjauno. Atslēgas stāvoklis ir sakārtots katram taustiņam, kas nozīmē, ka funkcija saglabās vienu vērtību katram rokturim un taustiņam. Mūsu gadījumā MonitorWorkTime funkcija uztur a Garš vērtība katrai atslēgai, t.i., katrai licences ID. The shiftStart štats saglabā autovadītāja maiņas sākuma laiku. Valsts rokturis tiek inicializēts atvērts () metodi, kas tiek izsaukta vienu reizi pirms pirmā notikuma apstrādes.

Tagad ieskatīsimies processElement () metodi.

@ Pārvarēt

public void processElement (

TaxiRide brauciens,

Konteksts ctx,

Kolekcionārs out) izmet izņēmumu {

// meklēt pēdējās maiņas sākuma laiku

Long startTs = shiftStart.value ();

ja (startTs == null ||

startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

// tas ir pirmais brauciens ar jaunu maiņu.

startTs = ride.pickUpTime;

shiftStart.update (startTs);

garie beigu termiņi = startTs + ALLOWED_WORK_TIME;

out.collect (Tuple2.of (ride.licenseId,

“Jums ir atļauts pieņemt jaunus pasažierus līdz“ + formatter.print (endTs)));

// reģistrējiet taimeri, lai 24 stundas atjaunotu stāvokli

ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

} else if (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

// šis brauciens sākās pēc atļautā darba laika beigām.

// tas ir noteikumu pārkāpums!

out.collect (Tuple2.of (ride.licenseId,

“Šis brauciens pārkāpa darba laika noteikumus.”));

  }

}

The processElement () metode tiek izsaukta katram TaxiRide notikumu. Pirmkārt, metode no vadītāja roktura iegūst vadītāja maiņas sākuma laiku. Ja valstī nav sākuma laika (startTs == null) vai ja pēdējā maiņa sākās vairāk nekā 20 stundas (ALLOWED_WORK_TIME + REQ_BREAK_TIME) agrāk nekā pašreizējais brauciens, pašreizējais brauciens ir pirmais brauciens ar jaunu maiņu. Jebkurā gadījumā funkcija sāk jaunu maiņu, atjauninot maiņas sākuma laiku uz pašreizējā brauciena sākuma laiku, izsūta vadītājam ziņojumu ar jaunās maiņas beigu laiku un reģistrē taimeri, lai notīrītu 24 stundu laikā.

Ja pašreizējais brauciens nav pirmais jaunās maiņas brauciens, funkcija pārbauda, ​​vai tas pārkāpj darba laika regulējumu, t.i., vai tas sākās vairāk nekā 12 stundas vēlāk nekā vadītāja pašreizējās maiņas sākums. Tādā gadījumā funkcija izsūta ziņojumu, lai informētu vadītāju par pārkāpumu.

The processElement () metode MonitorWorkTime funkcija reģistrē taimeri stāvokļa attīrīšanai 24 stundas pēc maiņas sākuma. Ir svarīgi noņemt vairs nevajadzīgu stāvokli, lai novērstu stāvokļa pieaugumu noplūdes stāvokļa dēļ. Taimeris tiek aktivizēts, kad lietojumprogrammas laiks pārsniedz taimera laika zīmogu. Tajā brīdī onTimer () metodi sauc. Līdzīgi kā stāvoklī, katram taustiņam tiek saglabāti taimeri, un funkcija tiek ievietota saistītās atslēgas kontekstā pirms onTimer () metodi sauc. Tādējādi visa stāvokļa piekļuve tiek novirzīta uz atslēgu, kas bija aktīva, kad taimeris tika reģistrēts.

Apskatīsim onTimer () metode MonitorWorkTime.

@ Pārvarēt

public void onTimer (

ilgi taimeri,

OnTimerContext ctx,

Kolekcionārs out) izmet izņēmumu {

// noņemt maiņas stāvokli, ja jau nav sākta jauna maiņa.

Long startTs = shiftStart.value ();

ja (startTs == taimerisTs - CLEAN_UP_INTERVAL) {

shiftStart.clear ();

  }

}

The processElement () metode 24 stundas pēc tam, kad maiņa sāk tīrīt stāvokli, kas vairs nav vajadzīgs, reģistrē taimerus. Valsts attīrīšana ir vienīgā loģika, ko onTimer () metodi īsteno. Kad taimeris aizdegas, mēs pārbaudām, vai vadītājs pa to laiku sāka jaunu maiņu, t.i., vai mainījās maiņas sākuma laiks. Ja tas tā nav, mēs noņemam vadītāja maiņas stāvokli.

$config[zx-auto] not found$config[zx-overlay] not found