<pre name="code" class="java">package com.qin.operadb;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
/**
 * One row of the {@code person} table with the columns {@code id}, {@code name} and {@code age}.
 *
 * <p>As a {@link DBWritable} the record is filled from a JDBC {@link ResultSet} and bound to a
 * {@link PreparedStatement}; as a {@link Writable} it can be serialized by Hadoop. Columns and
 * statement parameters are addressed by position: 1 = id, 2 = name, 3 = age.
 */
public class PersonRecoder implements Writable, DBWritable {

    /** Value of the {@code id} column. */
    public int id;

    /** Value of the {@code name} column (may be null if the column is NULL). */
    public String name;

    /** Value of the {@code age} column. */
    public int age;

    // 1-based JDBC column / parameter positions used by both DBWritable methods.
    private static final int ID_COLUMN = 1;
    private static final int NAME_COLUMN = 2;
    private static final int AGE_COLUMN = 3;

    /** Populates this record from the current row of {@code rs}. */
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        id = rs.getInt(ID_COLUMN);
        name = rs.getString(NAME_COLUMN);
        age = rs.getInt(AGE_COLUMN);
    }

    /** Binds this record's fields to parameters 1..3 of {@code ps}. */
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(ID_COLUMN, id);
        ps.setString(NAME_COLUMN, name);
        ps.setInt(AGE_COLUMN, age);
    }

    /** Reads the fields back in the order written by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = Text.readString(in);
        age = in.readInt();
    }

    /**
     * Serializes the record as: int id, Text-encoded name, int age.
     *
     * <p>NOTE(review): a NULL {@code name} column leaves {@code name == null}; confirm that
     * {@code Text.writeString} accepts null before relying on this path.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        Text.writeString(out, name);
        out.writeInt(age);
    }

    /** Human-readable form used as the job's output value. */
    @Override
    public String toString() {
        return new StringBuilder("id: ")
                .append(id)
                .append(" 年齡: ")
                .append(age)
                .append(" 名字:")
                .append(name)
                .toString();
    }
}
</pre>
MR類的定義代碼,注意這是一個Map Only作業:
<pre name="code" class="java">package com.qin.operadb;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReadMapDB {

    /**
     * Emits each database row as (id, textual form of the record).
     *
     * <p>The output Writables are reused across calls; each call overwrites them before
     * {@code context.write}.
     */
    private static class DBMap extends Mapper<LongWritable, PersonRecoder, LongWritable, Text> {
        private final LongWritable outKey = new LongWritable();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, PersonRecoder value, Context context)
                throws IOException, InterruptedException {
            outKey.set(value.id);
            outValue.set(value.toString());
            context.write(outKey, outValue);
        }
    }

    /**
     * Configures and runs a map-only job that reads the {@code person} table over JDBC and
     * writes one text line per row to HDFS.
     *
     * @param args unused
     * @throws Exception if job setup, output-path cleanup, or job submission fails
     */
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ReadMapDB.class);
        // Optional settings for submitting to a remote cluster:
        // conf.set("mapred.job.tracker", "192.168.75.130:9001");
        // conf.setJar("tt.jar");

        // Register the JDBC settings first, before the Job is created from conf; the original
        // note warns that an error is reported otherwise (presumably because Job works on its
        // own copy of the configuration).
        // NOTE(review): database credentials are hard-coded; consider supplying them externally.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.211.36:3306/test", "root", "qin");

        // Columns to read, in the positional order PersonRecoder expects (id, name, age).
        String[] fields = new String[] {"id", "name", "age"};

        Job job = new Job(conf, "readDB");
        System.out.println("模式: " + conf.get("mapred.job.tracker"));

        // Read table "person" with no filter condition, ordered by id.
        DBInputFormat.setInput(job, PersonRecoder.class, "person", null, "id", fields);
        job.setInputFormatClass(DBInputFormat.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(DBMap.class);
        // Map-only job: without this Hadoop still schedules a (default) reduce phase.
        job.setNumReduceTasks(0);

        String path = "hdfs://192.168.75.130:9000/root/outputdb";
        Path p = new Path(path);
        // Resolve the FileSystem from the path itself; FileSystem.get(conf) returns the default
        // filesystem, which may not be the hdfs:// cluster named in the path.
        FileSystem fs = p.getFileSystem(conf);
        if (fs.exists(p)) {
            fs.delete(p, true);
            System.out.println("輸出路徑存在,已刪除!");
        }
        FileOutputFormat.setOutputPath(job, p);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}