DBInputFormat untuk Memindahkan Data Dari SQL ke Pangkalan Data NoSQL



Objektif blog ini adalah untuk mempelajari cara memindahkan data dari pangkalan data SQL ke HDFS, cara memindahkan data dari pangkalan data SQL ke pangkalan data NoSQL.

Di blog ini kita akan meneroka keupayaan dan kemungkinan salah satu komponen terpenting teknologi Hadoop iaitu MapReduce.

Hari ini, syarikat mengadopsi kerangka Hadoop sebagai pilihan pertama mereka untuk penyimpanan data kerana kemampuannya untuk mengendalikan data besar dengan berkesan. Tetapi kita juga tahu bahawa data itu serba boleh dan terdapat dalam pelbagai struktur dan format. Untuk mengendalikan sebilangan besar data dan formatnya yang berbeza harus ada mekanisme untuk mengakomodasi semua varietas namun menghasilkan hasil yang efektif dan konsisten.





Komponen yang paling kuat dalam kerangka Hadoop adalah MapReduce yang dapat memberikan kawalan pada data dan strukturnya lebih baik daripada rakan-rakannya yang lain. Walaupun memerlukan kurva pembelajaran yang berlebihan dan kerumitan pengaturcaraan, jika anda dapat mengatasi kerumitan ini, anda pasti dapat menangani semua jenis data dengan Hadoop.

Rangka kerja MapReduce memecahkan semua tugas pemprosesannya menjadi dua fasa: Peta dan Kurangkan.



kepada kekuatan di java

Menyiapkan data mentah anda untuk fasa ini memerlukan pemahaman mengenai beberapa kelas dan antara muka asas. Kelas super untuk pemprosesan semula ini adalah InputFormat.

The InputFormat class adalah salah satu kelas teras dalam Hadoop MapReduce API. Kelas ini bertanggungjawab untuk menentukan dua perkara utama:

  • Pembahagian data
  • Pembaca rakaman

Pembahagian data adalah konsep asas dalam kerangka Hadoop MapReduce yang menentukan ukuran tugas peta individu dan pelayan pelaksanaannya yang berpotensi. The Pembaca Rekod bertanggungjawab untuk membaca rekod sebenar dari fail input dan menyerahkannya (sebagai pasangan kunci / nilai) ke mapper.



Bilangan pemetaan ditentukan berdasarkan jumlah pembelahan. Ini adalah tugas InputFormat untuk membuat perpecahan. Sebahagian besar ukuran pemisahan masa setara dengan ukuran blok tetapi tidak selalunya perpecahan akan dibuat berdasarkan ukuran blok HDFS. Itu bergantung sepenuhnya pada bagaimana kaedah getSplits () InputFormat anda diganti.

Terdapat perbezaan mendasar antara MR split dan blok HDFS. Blok adalah sekumpulan data fizikal sementara perpecahan hanyalah sekeping logik yang dibaca oleh mapper. Perpecahan tidak mengandungi data input, hanya menyimpan rujukan atau alamat data. Perpecahan pada dasarnya mempunyai dua perkara: Panjang dalam bait dan sekumpulan lokasi penyimpanan, yang hanya rentetan.

Untuk memahami perkara ini dengan lebih baik, mari kita ambil satu contoh: Memproses data yang disimpan di MySQL anda menggunakan MR. Oleh kerana tidak ada konsep blok dalam kes ini, teori: 'pembelahan selalu dibuat berdasarkan blok HDFS',gagal. Satu kemungkinan adalah membuat perpecahan berdasarkan rentang baris dalam jadual MySQL anda (dan inilah yang dilakukan oleh DBInputFormat, format input untuk membaca data dari pangkalan data hubungan). Kita mungkin mempunyai bilangan k yang terdiri daripada n baris.

Hanya untuk InputFormats berdasarkan FileInputFormat (sebuah InputFormat untuk menangani data yang disimpan dalam fail) pembelahan dibuat berdasarkan ukuran total, dalam bait, fail input. Walau bagaimanapun, FileSystem menyekat fail input dianggap sebagai batas atas untuk pemisahan input. Sekiranya anda mempunyai fail yang lebih kecil daripada ukuran blok HDFS, anda hanya akan mendapat 1 mapper untuk fail tersebut. Sekiranya anda ingin mempunyai tingkah laku yang berbeza, anda boleh menggunakan ukuran mapred.min.split.size. Tetapi ia bergantung sepenuhnya pada getSplits () InputFormat anda.

Kami mempunyai begitu banyak format input yang ada di bawah pakej org.apache.hadoop.mapreduce.lib.input.

GabungkanFileInputFormat.html

GabungkanFileRecordReader.html

GabungkanFileRecordReaderWrapper.html

GabungkanFileSplit.html

GabungkanSequenceFileInputFormat.html

GabungkanTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FailSplit.html

FixedLengthInputFormat.html

Tidak SahInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Lalai adalah TextInputFormat.

Begitu juga, kami mempunyai begitu banyak format output yang membaca data dari pengurang dan menyimpannya ke dalam HDFS:

FileOutputCommitter.html

FileOutputFormat.html

membalikkan nombor dalam python

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Lalai adalah TextOutputFormat.

Pada saat anda selesai membaca blog ini, anda pasti akan belajar:

  • Cara menulis program mengurangkan peta
  • Mengenai pelbagai jenis InputFormats yang terdapat di Mapreduce
  • Apa keperluan InputFormats
  • Cara menulis InputFormats tersuai
  • Cara memindahkan data dari pangkalan data SQL ke HDFS
  • Cara memindahkan data dari pangkalan data SQL (di sini MySQL) ke pangkalan data NoSQL (di sini Hbase)
  • Cara memindahkan data dari satu pangkalan data SQL ke jadual lain dalam pangkalan data SQL (Mungkin ini tidak begitu penting jika kita melakukan ini dalam pangkalan data SQL yang sama. Walau bagaimanapun, tidak ada salahnya memiliki pengetahuan yang sama. Anda tidak pernah tahu bagaimana ia boleh digunakan)

Prasyarat:

  • Hadoop dipasang sebelumnya
  • SQL telah dipasang sebelumnya
  • Hbase sudah terpasang
  • Pemahaman asas Java
  • MapReduce pengetahuan
  • Pengetahuan asas kerangka Hadoop

Mari kita fahami pernyataan masalah yang akan kita selesaikan di sini:

Kami mempunyai jadual pekerja di MySQL DB dalam pangkalan data hubungan kami, Edureka. Sekarang mengikut keperluan perniagaan, kita harus mengalihkan semua data yang ada di DB hubungan ke sistem fail Hadoop, yaitu HDFS, NoSQL DB yang dikenal sebagai Hbase.

Kami mempunyai banyak pilihan untuk melakukan tugas ini:

  • Sqoop
  • Flume
  • Pengurangan Peta

Sekarang, anda tidak mahu memasang dan mengkonfigurasi alat lain untuk operasi ini. Anda hanya tinggal satu pilihan iaitu kerangka pemprosesan Hadoop MapReduce. Kerangka MapReduce akan memberi anda kawalan penuh terhadap data semasa memindahkan. Anda boleh memanipulasi lajur dan meletakkan terus di salah satu daripada dua lokasi sasaran.

Catatan:

  • Kita perlu memuat turun dan meletakkan penyambung MySQL di classpath Hadoop untuk mengambil jadual dari jadual MySQL. Untuk melakukan ini, muat turun penyambung com.mysql.jdbc_5.1.5.jar dan simpan di bawah direktori Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Muat turun / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Juga, letakkan semua balang Hbase di bawah Hadpop classpath untuk menjadikan program MR anda mengakses Hbase. Untuk melakukan ini, jalankan arahan berikut :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / kongsi / hadoop / mapreduce / lib /

Versi perisian yang telah saya gunakan dalam pelaksanaan tugas ini adalah:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Gerhana Bulan

Untuk mengelakkan program dalam masalah keserasian, saya menetapkan pembaca saya untuk menjalankan perintah dengan persekitaran yang serupa.

DBInputWritable tersuai:

pakej com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implements Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) melemparkan IOException {} awam kosong readFields (ResultSet rs) melemparkan SQLException // Objek hasil menunjukkan representasi data yang dikembalikan dari pernyataan SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) membuang IOException { } public void write (PreparedStatement ps) membuang SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

DBOutputWritable tersuai:

pakej com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable menerapkan Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = beri nama ini.id = id this.dept = dept} public void readFields (DataInput in) membuang IOException {} public void readFields (ResultSet rs) membuang SQLException {} public void write (DataOutput out) membuang IOException {} public void write (PreparedStatement ps) membuang SQLException {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, dept)}}

Jadual Input:

buat edureka pangkalan data
buat emp emping (empid int not null, name varchar (30), dept varchar (20), key primer (empid))
masukkan ke dalam nilai emp (1, 'abhay', 'developmentement'), (2, 'brundesh', 'test')
pilih * dari emp

Kes 1: Pindah dari MySQL ke HDFS

pakej com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Konfigurasi import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBC Konfigurasi import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Teks import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) membuang Pengecualian {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // kelas pemandu' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // nama pengguna' root ') // kata laluan Job job = pekerjaan (conf) baru .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) FileOut, Fileput Path baru (args [0])) DBInputFormat.setInput (pekerjaan, DBInputWritable.class, 'emp', // nama jadual input null, null, String baru [] {'empid', 'name', 'dept'} / / lajur jadual) Laluan p = Laluan baru (args [0]) FileSystem fs = FileSystem.get (URI baru (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Bahagian kod ini membolehkan kita menyiapkan atau mengkonfigurasi format input untuk mengakses sumber SQL DB kami. Parameternya merangkumi kelas pemacu, URL mempunyai alamat pangkalan data SQL, nama pengguna dan kata laluannya.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // class driver 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // nama pengguna 'root') // kata laluan

Bahagian kod ini membolehkan kita menyampaikan butir-butir jadual dalam pangkalan data dan menetapkannya dalam objek kerja. Parameternya termasuk tentu saja contoh pekerjaan, kelas yang dapat ditulis khusus yang mesti menerapkan antara muka DBWritable, nama jadual sumber, keadaan jika ada yang lain, parameter penyortiran yang lain tidak ada, senarai lajur jadual masing-masing.

DBInputFormat.setInput (tugas, DBInputWritable.class, 'emp', // nama jadual input null, null, String baru [] {'empid', 'name', 'dept'} // lajur jadual)

Pemetaan

pakej com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Teks import org.apache.hadoop.io Peta kelas awam .IntWritable meluaskan Mapper {
peta kekosongan terlindung (Kekunci LongWritable, DBInputWritable value, Context ctx) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (Teks baru (nama + '+ id +' + dept), id)
} tangkapan (IOException e) {e.printStackTrace ()} tangkapan (InterruptException e) {e.printStackTrace ()}}}

Reducer: Identity Reducer Digunakan

Perintah untuk dijalankan:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Keluaran: Jadual MySQL Dipindahkan ke HDFS

hadoop dfs -ls / dbtohdfs / *

Kes 2: Pindahkan Dari Satu Jadual di MySQL ke Yang Lain di MySQL

membuat jadual output di MySQL

buat pekerja jadual1 (nama varchar (20), id int, dept varchar (20))

pakej com.inputFormat.copy import org.apache.hadoop.conf.Konfigurasi import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBC konfigurasi import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Teks import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable kelas awam Mainonetable_to_other_table {public static void main (String [] args) membuang Pengecualian {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // kelas pemandu 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // nama pengguna' root ') // kata laluan Job job = Job baru (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) pekerjaan .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) pekerjaan.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nama jadual input null, null, String baru [] {'emp ',' name ',' dept '} // lajur jadual) DBOutputFormat.setOutput (pekerjaan,' pegawai1 ', // output jadual nama String baru [] {' name ',' id ',' dept '} // jadual lajur) System.exit (job.waitForCompletion (benar)? 0: 1)}}

Bahagian kod ini membolehkan kita mengkonfigurasi nama jadual output di SQL DB. Parameternya adalah contoh pekerjaan, nama jadual output dan nama lajur output masing-masing.

apakah perbezaan antara hashmap dan hashtable
DBOutputFormat.setOutput (kerja, 'pekerja1', // nama jadual output String baru [] {'name', 'id', 'dept'} // lajur jadual)

Mapper: Sama seperti Kes 1

Pengurang:

pakej com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Teks import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io Kelas awam NullWritable Reduce memanjangkan Reducer {dilindungi void less (Kekunci teks, nilai Iterable, Konteks ctx) {int sum = 0 String line [] = key.toString (). Split ('') cuba {ctx.write (DBOutputWritable baru (baris [0] .toString (), Integer.parseInt (baris [1] .toString ()), baris [2] .toString ()), NullWritable.get ())} tangkapan (IOException e) {e.printStackTrace ()} tangkapan (InterruptException e) {e.printStackTrace ()}}}

Perintah untuk Dijalankan:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Keluaran: Data yang Dipindahkan Dari Jadual EMP di MySQL ke Kakitangan Meja Lain1 di MySQL

Kes 3: Pindahkan Dari Jadual di MySQL ke Jadual NoSQL (Hbase)

Membuat jadual Hbase untuk menampung output dari jadual SQL:

buat 'pekerja', 'official_info'

Kelas Pemandu:

pakej Dbtohbase import org.apache.hadoop.conf.Konfigurasi import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBC konfigurasi import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Htable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) melontarkan Pengecualian {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // class driver 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // nama pengguna 'root') // kata laluan Job job = Job baru (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('pekerja', Reduce.class, job) job.setInputFormat (NotFormat) .Format kelas) DBInputFormat.setInput (pekerjaan, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid', 'name', 'dept'} // table columns) System.exit (job.waitForCompletion (benar)? 0: 1)}}

Potongan kod ini membolehkan anda mengkonfigurasi kelas kunci output yang sekiranya hbase adalah ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Di sini kita memberikan nama jadual hbase dan pengurang untuk bertindak di atas meja.

TableMapReduceUtil.initTableReducerJob ('pekerja', Reduce.class, pekerjaan)

Pemeta:

pakej Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map meluaskan Mapper {private IntWritable one = new IntWritable (1) map void dilindungi (LongWritable id, DBInputWritable value, konteks konteks) {cuba {String line = value.getName () String cd = value.getId () + 'String dept = value.getDept () context.write (baru ImmutableBytesWritable (Bytes.toBytes (cd)), Teks baru (baris + ') '+ dept))} tangkapan (IOException e) {e.printStackTrace ()} tangkapan (InterruptException e) {e.printStackTrace ()}}}

Dalam kod ini, kita mengambil nilai dari para penerima kelas DBinputwritable dan kemudian memasukkannya
ImmutableBytesWritable sehingga mencapai pengurang dalam bentuk bytewriatble yang difahami oleh Hbase.

String line = value.getName () String cd = value.getId () + 'String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), Teks baru (baris +' + dept ))

Pengurang:

pakej Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce memanjangkan TableReducer {public void less (ImmutableBytesWritable key, Iterable, Context konteks) melemparkan IOException, InterruptException {String [] penyebab = null // Nilai gelung untuk (Teks val: nilai) {menyebabkan = val.toString (). split ('')} // Put ke HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('name'), Bytes.toBytes (penyebab [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (penyebab [1 ])) konteks.write (kunci, letakkan)}}

Bahagian kod ini membolehkan kita menentukan baris dan lajur yang tepat di mana kita akan menyimpan nilai dari pengurang. Di sini kita menyimpan setiap empid dalam baris yang terpisah kerana kita menjadikan empid sebagai kunci baris yang akan menjadi unik. Di setiap baris kami menyimpan maklumat rasmi pekerja di bawah keluarga kolum 'official_info' di bawah lajur 'nama' dan 'jabatan' masing-masing.

Put put = Put ​​baru (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (sebab [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (penyebab [1])) konteks.write (kunci, letakkan)

Data yang dipindahkan dalam Hbase:

pegawai imbasan

Seperti yang kita lihat, kita dapat menyelesaikan tugas memindahkan data perniagaan kita dari SQL DB relasional ke DB NoSQL dengan jayanya.

Di blog seterusnya, kita akan belajar bagaimana menulis dan melaksanakan kod untuk format input dan output lain.

Terus menghantar komen, soalan atau maklum balas anda. Saya ingin mendengar daripada anda.

Ada soalan untuk kami? Sila sebutkan di bahagian komen dan kami akan menghubungi anda.

Catatan berkaitan: