RDD menggunakan Spark: Blok Bangunan Apache Spark



Blog mengenai RDD yang menggunakan Spark ini akan memberi anda pengetahuan terperinci dan komprehensif mengenai RDD, yang merupakan unit asas Spark & ​​Betapa bergunanya.

, Kata itu sendiri sudah cukup untuk menghasilkan percikan dalam fikiran setiap jurutera Hadoop. KE n dalam ingatan alat pemprosesan yang pantas dalam pengkomputeran kluster. Berbanding dengan MapReduce, perkongsian data dalam memori menjadikan RDD 10-100x lebih pantas daripada perkongsian rangkaian dan cakera dan semua ini mungkin berlaku kerana RDD (set Data Terdistribusi Berdaya Tahan). Perkara utama yang kami fokuskan hari ini dalam RDD ini menggunakan artikel Spark adalah:

Perlukan RDD?

Mengapa kita memerlukan RDD? -RDD menggunakan Spark





Dunia berkembang dengan dan Sains Data kerana kemajuan di . Algoritma berdasarkan Regresi , , dan yang terus berjalan Diagihkan Pengiraan Iteratif ion fesyen yang merangkumi Penggunaan Semula dan Perkongsian data di antara beberapa unit pengkomputeran.

Yang tradisional teknik memerlukan penyimpanan Stabil Menengah dan Teragih seperti HDFS terdiri dari pengiraan berulang dengan replikasi data dan serialisasi data, yang menjadikan prosesnya jauh lebih lambat. Tidak semestinya mencari jalan penyelesaian.



Di sinilah dimana RDD (Set Data Terdistribusi yang Berdaya Tahan) datang ke gambaran besar.

RDD mudah digunakan dan tidak mudah dibuat kerana data diimport dari sumber data dan dimasukkan ke RDD. Selanjutnya, operasi diterapkan untuk memprosesnya. Mereka adalah koleksi memori yang diedarkan dengan kebenaran sebagai Baca sahaja dan yang paling penting, mereka Bertolak ansur dengan kesalahan .



Sekiranya ada partition data daripada RDD adalah hilang , ia dapat dijana semula dengan menerapkan yang sama penjelmaan operasi pada partisi yang hilang di keturunan , bukannya memproses semua data dari awal. Pendekatan semacam ini dalam senario masa nyata dapat membuat keajaiban berlaku dalam situasi kehilangan data atau ketika sistem tergendala.

Apa itu RDD?

RDD atau ( Set Data Teragih yang berdaya tahan ) adalah asas struktur data di Spark. Istilah Berdaya tahan mentakrifkan keupayaan yang menghasilkan data secara automatik atau data bergolek ke belakang kepada keadaan asal apabila malapetaka yang tidak dijangka berlaku dengan kebarangkalian kehilangan data.

Data yang ditulis ke dalam RDD adalah berpisah dan disimpan ke dalam pelbagai nod yang boleh dilaksanakan . Sekiranya nod pelaksanaan gagal dalam jangka masa berjalan, maka ia akan segera mendapat sokongan dari nod yang boleh dilaksanakan seterusnya . Inilah sebabnya mengapa RDD dianggap sebagai jenis struktur data yang maju jika dibandingkan dengan struktur data tradisional yang lain. RDD dapat menyimpan data berstruktur, tidak berstruktur dan separa berstruktur.

Mari maju dengan RDD kami menggunakan blog Spark dan pelajari tentang ciri unik RDD yang memberikan kelebihan berbanding jenis struktur data yang lain.

Ciri RDD

  • Dalam kenangan (RAM) Pengiraan : Konsep pengiraan In-Memory membawa pemprosesan data ke tahap yang lebih cepat dan efisien di mana keseluruhannya prestasi sistem adalah dinaik taraf.
  • L Penilaiannya : Istilah penilaian malas mengatakan bahawa transformasi diterapkan pada data dalam RDD, tetapi outputnya tidak dihasilkan. Sebaliknya, transformasi yang berlaku adalah dicatat.
  • Kegigihan : RDD yang dihasilkan selalu boleh digunakan semula.
  • Operasi Kasar : Pengguna dapat menerapkan transformasi ke semua elemen dalam set data melalui peta, tapis atau kumpulan mengikut operasi.
  • Toleran Kesalahan : Sekiranya terdapat kehilangan data, sistem dapat kembalikan kepada yang keadaan asal dengan menggunakan log transformasi .
  • Ketidakkekalan : Data yang ditentukan, diambil atau dibuat tidak boleh berubah setelah masuk ke dalam sistem. Sekiranya anda perlu mengakses dan mengubah RDD yang ada, anda mesti membuat RDD baru dengan menggunakan satu set Transformasi berfungsi ke RDD semasa atau sebelumnya.
  • Pembahagian : Ia adalah unit penting paralelisme dalam Spark RDD. Secara lalai, bilangan partisi yang dibuat berdasarkan sumber data anda. Anda juga boleh menentukan jumlah partisi yang ingin anda gunakan partition tersuai fungsi.

Penciptaan RDD menggunakan Spark

RDD boleh dibuat di tiga cara:

  1. Membaca data dari koleksi selari
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () hasilRDD.collect () ) .foreach (println)
  1. Memohon penjelmaan pada RDD sebelumnya
kata val = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'powerful', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Membaca data dari simpanan luaran atau laluan fail seperti HDFS atau HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operasi yang dilakukan pada RDD:

Terdapat terutamanya dua jenis operasi yang dilakukan pada RDD, iaitu:

  • Transformasi
  • Tindakan

Transformasi : The operasi kami menggunakan RDD untuk penapis, akses dan ubah suai data dalam RDD induk untuk menghasilkan a RDD berturut-turut dipanggil penjelmaan . RDD baru mengembalikan penunjuk ke RDD sebelumnya memastikan pergantungan antara mereka.

Transformasi adalah Penilaian Malas, dengan kata lain, operasi yang diterapkan pada RDD yang anda jalankan akan dicatat tetapi tidak dilaksanakan. Sistem membuang hasil atau pengecualian setelah mencetuskan Tindakan .

Kita boleh membahagikan transformasi kepada dua jenis seperti di bawah:

  • Transformasi Sempit
  • Transformasi Luas

Transformasi Sempit Kami menerapkan transformasi sempit pada a partition tunggal RDD induk untuk menghasilkan RDD baru kerana data yang diperlukan untuk memproses RDD tersedia pada satu partisi dari ASD ibu bapa . Contoh untuk transformasi sempit adalah:

  • peta ()
  • penapis ()
  • peta rata ()
  • partition ()
  • peta Bahagian ()

Transformasi Luas: Kami menggunakan transformasi luas pada berbilang partition untuk menghasilkan RDD baru. Data yang diperlukan untuk memproses RDD tersedia di beberapa partisi dari ASD ibu bapa . Contoh untuk transformasi yang luas adalah:

pasang php pada windows 7
  • kurangkanDengan ()
  • kesatuan ()

Tindakan : Tindakan mengarahkan Apache Spark untuk mengaplikasikannya pengiraan dan lulus keputusan atau pengecualian kembali kepada RDD pemandu. Beberapa tindakan merangkumi:

  • kumpulkan ()
  • kira ()
  • ambil ()
  • pertama ()

Mari kita praktikkan operasi pada RDD:

IPL (Liga Perdana India) adalah kejohanan kriket dengan tahap yang sangat baik. Oleh itu, mari kita hari ini menggunakan set data IPL dan melaksanakan RDD kami menggunakan Spark.

  • Pertama, mari memuat turun data padanan CSV IPL. Setelah memuat turunnya, ia mula kelihatan sebagai fail EXCEL dengan baris dan lajur.

Pada langkah seterusnya, kami menyalakan percikan api dan memuatkan file match.csv dari lokasinya, dalam kes sayacsvlokasi fail adalah “/ Pengguna/edureka_566977/test/matches.csv”

Sekarang mari kita mulakan dengan Transformasi bahagian pertama:

  • peta ():

Kami guna Transformasi Peta untuk menerapkan operasi transformasi khusus pada setiap elemen RDD. Di sini kami membuat RDD dengan nama CKfile di mana menyimpan kamicsvfail. Kami akan mewujudkan RDD lain yang disebut Negara untuk simpan perincian bandar .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val state = CKfile.map (_. split (',') (2)) menyatakan.koleksi (). foreach (println)

  • penapis ():

Transformasi penapis, nama itu sendiri menggambarkan penggunaannya. Kami menggunakan operasi transformasi ini untuk menyaring data selektif dari kumpulan data yang diberikan. Kami memohon operasi penapis di sini untuk mendapatkan rekod perlawanan IPL tahun ini 2017 dan simpan dalam fil RDD.

menyediakan php pada windows
val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • peta rata ():

Kami menerapkan flatMap adalah operasi transformasi pada setiap elemen RDD untuk membuatRDD baru. Ia serupa dengan transformasi Peta. di sini kami memohonPeta Pelbagaike melancarkan pertandingan kota Hyderabad dan simpan data kefilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). kumpulkan ()

  • partition ():

Setiap data yang kami tulis menjadi RDD dibahagikan kepada sebilangan partisi. Kami menggunakan transformasi ini untuk mencari bilangan partition data sebenarnya terbahagi kepada.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • peta Bahagian ():

Kami menganggap MapPatitions sebagai alternatif Peta () danuntuk setiap() bersama. Kami menggunakan mapPartitions di sini untuk mencari bilangan baris kami ada di RDD fil kami.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • kurangkanDengan ():

Kami gunaKurangkanBoleh() pada Pasangan Nilai-Utama . Kami menggunakan transformasi ini pada kamicsvfail untuk mencari pemain dengan Man of the match tertinggi .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • kesatuan ():

Nama itu menjelaskan semuanya, Kami menggunakan transformasi kesatuan adalah untuk kelab dua RDD bersama . Di sini kita membuat dua RDD iaitu fil dan fil2. fil RDD mengandungi rekod perlawanan IPL 2017 dan fil2 RDD mengandungi rekod perlawanan IPL 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Mari kita mulakan dengan Tindakan bahagian di mana kita menunjukkan output sebenar:

  • kumpulkan ():

Kumpulkan adalah tindakan yang biasa kita gunakan paparkan kandungannya dalam RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • hitung ():

Kiraadalah tindakan yang kita gunakan untuk mengira bilangan rekod hadir dalam RDDDi sinikami menggunakan operasi ini untuk mengira jumlah rekod dalam fail match.csv kami.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • ambil ():

Ambil adalah operasi Tindakan yang serupa dengan pengumpulan tetapi satu-satunya perbezaan ialah mencetaknya bilangan baris terpilih mengikut permintaan pengguna. Di sini kami menggunakan kod berikut untuk mencetak sepuluh laporan utama.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. ambil (10) .foreach (println)

  • pertama ():

Pertama () adalah operasi tindakan yang serupa dengan mengumpulkan () dan mengambil ()iadigunakan untuk mencetak laporan paling atas hasilnya Di sini kita menggunakan operasi pertama () untuk mencari jumlah maksimum perlawanan yang dimainkan di bandar tertentu dan kami menjadikan Mumbai sebagai output.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val state = CKfile.map (_. split (',') (2)) val Scount = States.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y). peta (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Untuk menjadikan proses kami belajar RDD menggunakan Spark, lebih menarik lagi, saya telah menghasilkan kes penggunaan yang menarik.

RDD menggunakan Spark: Pokemon Use Case

  • Pertama, Mari kita memuat turun fail Pokemon.csv dan memuatkannya ke percikan api seperti yang kita lakukan ke file Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemon sebenarnya terdapat dalam pelbagai jenis, Mari kita cari beberapa jenis.

  • Mengeluarkan skema dari fail Pokemon.csv

Kita mungkin tidak memerlukannya Skema fail Pokemon.csv. Oleh itu, kami membuangnya.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Mencari bilangan partition pokemon.csv kami diedarkan ke.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Pokemon Air

Mencari bilangan pokemon Air

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Pokemon Api

Mencari bilangan pokemon Api

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Kita juga dapat mengesan penduduk dari pelbagai jenis pokemon menggunakan fungsi kiraan
WaterRDD.count () FireRDD.count ()

  • Oleh kerana saya suka permainan strategi pertahanan mari kita cari pokemon dengan pertahanan maksimum.
val defenceList = NoHeader.map {x => x.split (',')}. peta {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

apa yang berlegar di css
  • Kami tahu yang maksimum nilai kekuatan pertahanan tetapi kita tidak tahu pokemon mana itu. jadi, mari kita cari yang mana pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. peta {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Memesan [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Sekarang mari kita selesaikan pokemon dengan paling tidak Pertahanan
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Sekarang mari kita lihat Pokemon dengan a strategi kurang bertahan.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWith .map {x => x.split (',')}. peta {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Memesan [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Jadi, dengan ini, kita akan mengakhiri RDD ini dengan menggunakan artikel Spark. Saya harap kami memberi sedikit pengetahuan mengenai pengetahuan anda mengenai RDD, ciri-ciri mereka dan pelbagai jenis operasi yang dapat dilakukan pada mereka.

Artikel ini berdasarkan dirancang untuk mempersiapkan anda untuk Ujian Persijilan Pembangun Cloudera Hadoop dan Spark (CCA175). Anda akan mendapat pengetahuan mendalam mengenai Apache Spark dan Spark Ecosystem, yang merangkumi Spark RDD, Spark SQL, Spark MLlib dan Spark Streaming. Anda akan mendapat pengetahuan yang komprehensif mengenai bahasa Pemrograman Scala, HDFS, Sqoop, Flume, Spark GraphX ​​dan Sistem Pemesejan seperti Kafka.