接著上面說的,hbase存儲著壹些實時的數據,前兩周新需求需要對hbase裏面指定表的數據做壹次全量的update以滿足業務的發展,平時操作hbase都是單條的curd,或者插入壹個批量的list,用的都是hbase的java api比較簡單,但這次涉及全量update,所以如果再用原來那種單線程的操作api,勢必速度回慢上許多。
關於批量操作Hbase,壹般我們都會用MapReduce來操作,這樣可以大大加快處理效率,原來也寫過MR操作Hbase,過程比較繁瑣,最近壹直在用scala做spark的相關開發,所以就直接使用scala+spark來搞定這件事了,當然底層用的還是Hbase的TableOutputFormat和TableOutputFormat這個和MR是壹樣的,在spark裏面把從hbase裏面讀取的數據集轉成rdd了,然後做壹些簡單的過濾,轉化,最終在把結果寫入到hbase裏面。
整個流程如下:
(1)全量讀取hbase表的數據
(2)做壹系列的ETL
(3)把全量數據再寫回hbase
核心代碼如下:
//獲取conf
val conf=HBaseConfiguration.create() //設置讀取的表
conf.set(TableInputFormat.INPUT_TABLE,tableName) //設置寫入的表
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)//創建sparkConf
val sparkConf=new SparkConf() //設置spark的任務名
sparkConf.setAppName("read and write for hbase ") //創建spark上下文
val sc=new SparkContext(sparkConf)
//為job指定輸出格式和輸出表名
val newAPIJobConfiguration1 = Job.getInstance(conf)
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)
newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
//全量讀取hbase表
val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
,classOf[ImmutableBytesWritable]
,classOf[Result]
)
//過濾空數據,然後對每壹個記錄做更新,並轉換成寫入的格式
val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)
//轉換後的結果,再次做過濾
val save_rdd=final_rdd.filter(checkNull)
//最終在寫回hbase表
save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
sc.stop()
從上面的代碼可以看出來,使用spark+scala操作hbase是非常簡單的。下面我們看壹下,中間用到的幾個自定義函數:
第壹個:checkNotEmptyKs
作用:過濾掉空列簇的數據
def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala if(map.isEmpty) false else true
}
第二個:forDatas
作用:讀取每壹條數據,做update後,在轉化成寫入操作
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f._2 //獲取Result
val put:Put=new Put(r.getRow) //聲明put
val ks=Bytes.toBytes("ks") //讀取指定列簇
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
map.foreach(kv=>{//遍歷每壹個rowkey下面的指定列簇的每壹列的數據做轉化
val kid= Bytes.toString(kv._1)//知識點id
var value=Bytes.toString(kv._2)//知識點的value值
value="修改後的value"
put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put對象
}
) if(put.isEmpty) null else (new ImmutableBytesWritable(),put)
}
第三個:checkNull 作用:過濾最終結果裏面的null數據
def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true
}
上面就是整個處理的邏輯了,需要註意的是對hbase裏面的無效數據作過濾,跳過無效數據即可,邏輯是比較簡單的,代碼量也比較少。