當前位置:編程學習大全網 - 編程語言 - 有沒有Python寫的spark連接Hbase的例子

有沒有Python寫的spark連接Hbase的例子

博主項目實踐中,經常需要用Spark從Hbase中讀取數據。其中,spark的版本為1.6,hbase的版本為0.98。現在記錄壹下如何在spark中操作讀取hbase中的數據。

對於這種操作型的需求,沒有什麽比直接上代碼更簡單明了的了。so,show me the code!

object Demo extends Logging{

val CF_FOR_FAMILY_USER = Bytes.toBytes("U");

val CF_FOR_FAMILY_DEVICE = Bytes.toBytes("D")

val QF_FOR_MODEL = Bytes.toBytes("model")

val HBASE_CLUSTER = "hbase://xxx/"

val TABLE_NAME = "xxx";

val HBASE_TABLE = HBASE_CLUSTER + TABLE_NAME

def genData(sc:SparkContext) = {

//20161229的數據,rowkey的設計為9999-yyyyMMdd

val filter_of_1229 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("79838770"))

//得到qf為w:00-23的數據

val filter_of_qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator("w"))

val all_filters = new util.ArrayList[Filter]()

all_filters.add(filter_of_1229)

all_filters.add(filter_of_qf)

//hbase多個過濾器

val filterList = new FilterList(all_filters)

val scan = new Scan().addFamily(CF_FOR_FAMILY_USER)

scan.setFilter(filterList)

scan.setCaching(1000)

scan.setCacheBlocks(false)

val conf = HBaseConfiguration.create()

conf.set(TableInputFormat.INPUT_TABLE,HBASE_TABLE )

conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))

sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

//後面是針對hbase查詢結果的具體業務邏輯

.map()

...

def main(args: Array[String]): Unit = {

val Array(output_path) = args

val sparkConf = new SparkConf().setAppName("demo")

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val sc = new SparkContext(sparkConf)

genUuidWifi(sc).saveAsTextFile(output_path)

sc.stop()

}

}1234567891011121314151617181920212223242526272829303132333435363738394041424344454612345678910111213141516171819202122232425262728293031323334353637383940414243444546

需要註意的壹個小點就是如果hbase裏有多個過濾器,註意需要使用FilterList。

  • 上一篇:"比爾蓋茨"的發展史?
  • 下一篇:LaTex 和cTex關系
  • copyright 2024編程學習大全網