當前位置:編程學習大全網 - 編程語言 - 如何通過網絡訪問hdfs的數據

如何通過網絡訪問hdfs的數據

你好,實體類定義代碼:

<pre name="code" class="java">package com.qin.operadb;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapreduce.lib.db.DBWritable;

/***
 * Database entity record for one row of the person table.
 *
 * Implements both {@link Writable} (Hadoop binary serialization) and
 * {@link DBWritable} (JDBC column mapping) so it can be used as the value
 * type of a DBInputFormat-driven MapReduce job.
 *
 * Field/column order is fixed everywhere as: id, name, age.
 * **/
public class PersonRecoder implements Writable, DBWritable {

    /** Maps to the id column in the database. */
    public int id;

    /** Maps to the name column in the database. */
    public String name;

    /** Maps to the age column in the database. */
    public int age;

    /**
     * Populates this record from one JDBC result-set row
     * (columns read positionally: 1=id, 2=name, 3=age).
     */
    @Override
    public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
        this.age = result.getInt(3);
    }

    /**
     * Binds this record's fields to a prepared statement
     * (parameters set positionally: 1=id, 2=name, 3=age).
     */
    @Override
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, id);
        stmt.setString(2, name);
        stmt.setInt(3, age);
    }

    /** Deserializes this record from a Hadoop binary stream (id, name, age). */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
    }

    /** Serializes this record to a Hadoop binary stream (id, name, age). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        // NOTE(review): Text.writeString throws NPE if name is null —
        // assumes the name column is NOT NULL; confirm against the schema.
        Text.writeString(out, this.name);
        out.writeInt(this.age);
    }

    /** Human-readable form used as the map output value. */
    @Override
    public String toString() {
        return "id: "+id+" 年齡: "+age+" 名字:"+name;
    }
}

</pre>

MR類的定義代碼,注意是一個Map Only作業:

<pre name="code" class="java">package com.qin.operadb;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.lib.IdentityReducer;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReadMapDB {

    /**
     * Map-only job: reads person records from the database via DBInputFormat
     * and emits each one as (record id, record text).
     * **/
    private static class DBMap extends Mapper<LongWritable, PersonRecoder, LongWritable, Text> {

        @Override
        protected void map(LongWritable key, PersonRecoder value, Context context)
                throws IOException, InterruptedException {
            // Key the output by the record's id; value is its readable form.
            context.write(new LongWritable(value.id), new Text(value.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ReadMapDB.class);

        // IMPORTANT: the JDBC connection must be configured on the Configuration
        // BEFORE the Job is created, otherwise DBInputFormat cannot see it.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.211.36:3306/test", "root", "qin");

        /** Columns to read from the person table. **/
        String[] fields = new String[]{"id", "name", "age"};

        /** Job definition. **/
        Job job = new Job(conf, "readDB");
        System.out.println("模式: "+conf.get("mapred.job.tracker"));

        // Read table "person", no WHERE clause, ordered by id.
        DBInputFormat.setInput(job, PersonRecoder.class, "person", null, "id", fields);

        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(DBMap.class);
        // Map-only job: skip the reduce phase entirely so mapper output
        // is written directly to HDFS.
        job.setNumReduceTasks(0);

        // Delete a pre-existing output directory, otherwise the job fails on submit.
        String path = "hdfs://192.168.75.130:9000/root/outputdb";
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path(path);
        if (fs.exists(p)) {
            fs.delete(p, true);
            System.out.println("輸出路徑存在,已刪除!");
        }

        FileOutputFormat.setOutputPath(job, p);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

  • 上一篇:平面設計怎麽樣?是壹個有前途的職業嗎?
  • 下一篇:java開發錢景如何?疫情有影響麽?
  • copyright 2024編程學習大全網