當前位置:編程學習大全網 - 源碼下載 - 如何使用scala+spark讀寫hbase

如何使用scala+spark讀寫hbase

公司有壹些實時數據處理的項目,存儲用的是hbase,提供實時的檢索,當然hbase裏面存儲的數據模型都是簡單的,復雜的多維檢索的結果是在es裏面存儲的,公司也正在引入Kylin作為OLAP的數據分析引擎,這塊後續有空在研究下。

接著上面說的,hbase存儲著壹些實時的數據,前兩周新需求需要對hbase裏面指定表的數據做壹次全量的update以滿足業務的發展,平時操作hbase都是單條的curd,或者插入壹個批量的list,用的都是hbase的java api比較簡單,但這次涉及全量update,所以如果再用原來那種單線程的操作api,勢必速度回慢上許多。

關於批量操作Hbase,壹般我們都會用MapReduce來操作,這樣可以大大加快處理效率,原來也寫過MR操作Hbase,過程比較繁瑣,最近壹直在用scala做spark的相關開發,所以就直接使用scala+spark來搞定這件事了,當然底層用的還是Hbase的TableOutputFormat和TableOutputFormat這個和MR是壹樣的,在spark裏面把從hbase裏面讀取的數據集轉成rdd了,然後做壹些簡單的過濾,轉化,最終在把結果寫入到hbase裏面。

整個流程如下:

(1)全量讀取hbase表的數據

(2)做壹系列的ETL

(3)把全量數據再寫回hbase

核心代碼如下:

//獲取conf

val conf=HBaseConfiguration.create() //設置讀取的表

conf.set(TableInputFormat.INPUT_TABLE,tableName) //設置寫入的表

conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)//創建sparkConf

val sparkConf=new SparkConf() //設置spark的任務名

sparkConf.setAppName("read and write for hbase ") //創建spark上下文

val sc=new SparkContext(sparkConf)

//為job指定輸出格式和輸出表名

val newAPIJobConfiguration1 = Job.getInstance(conf)

newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)

newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

//全量讀取hbase表

val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]

,classOf[ImmutableBytesWritable]

,classOf[Result]

)

//過濾空數據,然後對每壹個記錄做更新,並轉換成寫入的格式

val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)

//轉換後的結果,再次做過濾

val save_rdd=final_rdd.filter(checkNull)

//最終在寫回hbase表

save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)

sc.stop()

從上面的代碼可以看出來,使用spark+scala操作hbase是非常簡單的。下面我們看壹下,中間用到的幾個自定義函數:

第壹個:checkNotEmptyKs

作用:過濾掉空列簇的數據

def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala if(map.isEmpty) false else true

}

第二個:forDatas

作用:讀取每壹條數據,做update後,在轉化成寫入操作

def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f._2 //獲取Result

val put:Put=new Put(r.getRow) //聲明put

val ks=Bytes.toBytes("ks") //讀取指定列簇

val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala

map.foreach(kv=>{//遍歷每壹個rowkey下面的指定列簇的每壹列的數據做轉化

val kid= Bytes.toString(kv._1)//知識點id

var value=Bytes.toString(kv._2)//知識點的value值

value="修改後的value"

put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put對象

}

) if(put.isEmpty) null else (new ImmutableBytesWritable(),put)

}

第三個:checkNull 作用:過濾最終結果裏面的null數據

def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true

}

上面就是整個處理的邏輯了,需要註意的是對hbase裏面的無效數據作過濾,跳過無效數據即可,邏輯是比較簡單的,代碼量也比較少。

  • 上一篇:夢幻西遊15門派闖關幾點開始?
  • 下一篇:phpstorm激活碼2021有效激活碼收錄
  • copyright 2024編程學習大全網