Transformasi Keadaan Kumulatif Dalam Streaming Spark Apache



Catatan blog ini membincangkan transformasi bernegara dalam Spark Streaming. Ketahui semua tentang penjejakan kumulatif dan kemahiran tinggi untuk karier Hadoop Spark.

Disumbangkan oleh Prithviraj Bose

apa applet dalam java dengan contoh

Dalam blog sebelumnya saya telah membincangkan transformasi bernegara menggunakan konsep Window Apache Spark Streaming. Anda boleh membacanya di sini .





Dalam posting ini saya akan membincangkan operasi kumulatif stateful dalam Apache Spark Streaming. Sekiranya anda baru menggunakan Spark Streaming, saya sangat mengesyorkan anda membaca blog saya yang lalu untuk memahami cara kerja Window.

Jenis Transformasi Negara dalam Streaming Spark (Bersambung…)

> Penjejakan kumulatif

Kami telah menggunakan kurangkanByKeyAndWindow (…) API untuk mengesan keadaan kunci, namun Window menimbulkan batasan untuk kes penggunaan tertentu. Bagaimana jika kita ingin mengumpulkan keadaan kekunci di seluruh dan bukannya membataskannya ke jangka masa? Sekiranya kita perlu menggunakan kemas kiniStateByKey (…) KEBAKARAN.



API ini diperkenalkan di Spark 1.3.0 dan telah sangat popular. Walau bagaimanapun API ini mempunyai beberapa prestasi overhead, prestasinya merosot apabila ukuran keadaan meningkat dari masa ke masa. Saya telah menulis contoh untuk menunjukkan penggunaan API ini. Anda boleh mendapatkan kodnya di sini .

Spark 1.6.0 memperkenalkan API baru mapWithState (…) yang menyelesaikan overhed prestasi yang ditimbulkan oleh kemas kiniStateByKey (…) . Dalam blog ini saya akan membincangkan API khusus ini menggunakan contoh program yang telah saya tulis. Anda boleh mendapatkan kodnya di sini .

Sebelum saya menyelidiki kod, mari kita luangkan beberapa perkataan pada pemeriksaan. Untuk sebarang transformasi bernegara, pemeriksaan mesti dilakukan. Checkpointing adalah mekanisme untuk mengembalikan keadaan kunci sekiranya program pemacu gagal. Semasa pemacu dimulakan semula, keadaan kunci dipulihkan dari fail pemeriksaan. Lokasi pusat pemeriksaan biasanya HDFS atau Amazon S3 atau storan yang boleh dipercayai. Semasa menguji kod, seseorang juga dapat menyimpan di sistem fail tempatan.



Dalam program contoh, kita mendengarkan aliran teks soket pada host = localhost dan port = 9999. Ini menandakan aliran masuk ke (kata, jumlah kejadian) dan melacak jumlah kata menggunakan API 1.6.0 mapWithState (…) . Selain itu, kunci tanpa kemas kini dikeluarkan menggunakan StateSpec.timeout API. Kami memeriksa di HDFS dan frekuensi pemeriksaan setiap 20 saat.

Mari buat pertama kali sesi Spark Streaming,

Spark-streaming-session

Kami mencipta a pusat pemeriksaanDir dalam HDFS dan kemudian memanggil kaedah objek getOrCreate (…) . The dapatkanOrCreate API memeriksa pusat pemeriksaanDir untuk melihat apakah ada keadaan sebelumnya untuk dipulihkan, jika ada, maka ia akan membuat sesi Spark Streaming dan mengemas kini keadaan kunci dari data yang disimpan di dalam fail sebelum meneruskan dengan data baru. Jika tidak, ia mewujudkan sesi Spark Streaming baru.

The dapatkanOrCreate mengambil nama direktori checkpoint dan fungsi (yang telah kami namakan buatFunc ) tandatangan yang sepatutnya () => StreamingContext .

apa itu fungsi maya java

Mari kita periksa kod di dalamnya buatFunc .

Baris # 2: Kami membuat konteks streaming dengan nama pekerjaan ke 'TestMapWithStateJob' dan selang kumpulan = 5 saat.

Baris # 5: Tetapkan direktori pusat pemeriksaan.

Baris # 8: Tetapkan spesifikasi keadaan menggunakan kelas org.apache.streaming.StateSpec objek. Kami mula-mula menetapkan fungsi yang akan mengesan keadaan, kemudian kami menetapkan jumlah partisi untuk DStream yang dihasilkan yang akan dihasilkan semasa transformasi berikutnya. Akhirnya kami menetapkan masa tamat (hingga 30 saat) di mana jika sebarang kemas kini untuk kunci tidak diterima dalam 30 saat maka keadaan kunci akan dikeluarkan.

Baris 12 #: Siapkan aliran soket, ratakan data kumpulan masuk, buat pasangan kunci-nilai, panggil petaWithState , tetapkan selang pemeriksaan ke 20-an dan akhirnya mencetak hasilnya.

Kerangka Spark memanggil th e buatFunc untuk setiap kunci dengan nilai sebelumnya dan keadaan semasa. Kami menghitung jumlahnya dan mengemas kini keadaan dengan jumlah kumulatif dan akhirnya kami mengembalikan jumlah untuk kunci.

pengendalian pengecualian di pl sql

Sumber Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Ada soalan untuk kami? Sila sebutkan di bahagian komen dan kami akan menghubungi anda.

Catatan berkaitan:

Mulakan dengan Apache Spark & ​​Scala

Transformasi Negeri dengan Window dalam Spark Streaming