flink批量 async io 写入hbase
目录
flink 采用批量 async io 方式写入hbase
一条一条数据写入hbase太慢了,故采用批量的方式,每2000条数据一个批量写入hbase,提高写入性能
设置一个三秒的翻滚窗口, 将数据聚合在一起, 然后批量的方式, Async IO 异步写入hbase
经在压测环境验证:当长期大数据量消费且开启 checkpoint 时,async I/O 批量写入 HBase 会出现 checkpoint 超时、写入 HBase 不及时、Flink 反压机制不生效的问题。这种方式在生产环境慎用!
这是坑。。。。。。。
// Source stream of run data (upstream definition not shown in this snippet).
val RunDataDS: DataStream[FdcData[RunData]] = getDatas()
// Aggregate records into 3-second tumbling windows, keyed by tool name, so the
// sink can write each window's records to HBase as one batch instead of row-by-row.
// NOTE(review): TumblingEventTimeWindows requires a watermark strategy assigned
// upstream — not visible here; confirm it exists or the windows will never fire.
val alarmRuleResultStream: DataStream[List[RunData]] = RunDataDS
.map(_.datas)
.keyBy(_.toolName)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.process(new RunDataProcessWindowFunction())
// Write each window batch to HBase via ordered async I/O:
// per-element timeout 6000 ms, at most 100 in-flight async requests.
AsyncDataStream.orderedWait(
alarmRuleResultStream,
new TestDataHbaseSink(ProjectConfig.HBASE_RUNDATA_TABLE),
6000,
TimeUnit.MILLISECONDS,
100)
.name("Hbase Sink")
/**
* 聚合在一起,批量写入
*/
/**
 * Window function that gathers every element of a window into a single
 * List, emitted as one record so the downstream sink can batch-write it.
 */
class RunDataProcessWindowFunction extends ProcessWindowFunction[RunData, List[RunData], String, TimeWindow] {
  override def process(key: String, context: Context, input: Iterable[RunData], out: Collector[List[RunData]]): Unit = {
    // Materialize the whole window into one immutable batch and emit it.
    val batch: List[RunData] = input.toList
    out.collect(batch)
  }
}
自定义sink: TestDataHbaseSink 继承 RichAsyncFunction
val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))
批量写入hbase
import com.hzw.fdc.scalabean.{RunData, RunEventData}
import com.hzw.fdc.util.{ExceptionInfo, ProjectConfig}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
/**
* 批量写入hbase
*
*/
/**
 * Async sink that writes a batch (one window's worth) of [[RunData]] rows to
 * HBase through a [[BufferedMutator]].
 *
 * NOTE(review): the HBase calls in asyncInvoke are synchronous/blocking, so
 * this function does not actually overlap I/O with other elements; with
 * ordered async waits and checkpointing enabled this can stall checkpoints
 * under sustained load (as observed in the article above).
 *
 * @param tableName fully-qualified HBase table to write into
 */
class TestDataHbaseSink(tableName: String) extends RichAsyncFunction[List[RunData], String] {

  // Shared HBase connection; created in open(), released in close().
  var connection: Connection = _
  // Fixed: logger was created for the wrong class (WindowEndRunDataHbaseSink).
  private val logger: Logger = LoggerFactory.getLogger(classOf[TestDataHbaseSink])

  // Flush a partial batch to the mutator every BatchSize puts.
  private val BatchSize = 2000

  override def open(parameters: Configuration): Unit = {
    // Renamed from `parameters` to avoid shadowing the method argument.
    val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    ProjectConfig.getConfig(globalParams)
    // Build the HBase connection from the project's ZooKeeper settings.
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", ProjectConfig.HBASE_ZOOKEEPER_QUORUM)
    conf.set("hbase.zookeeper.property.clientPort", ProjectConfig.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT)
    connection = ConnectionFactory.createConnection(conf)
  }

  /**
   * Timeout handler. Deliberately does NOT call super.timeout(): the default
   * implementation completes the future exceptionally with a TimeoutException
   * (this is exactly the "Could not complete the stream element" failure in
   * the stack trace below), and completing an already-completed future again
   * is an error. Instead, complete with a marker so the stream keeps flowing.
   */
  override def timeout(input: List[RunData], resultFuture: ResultFuture[String]): Unit = {
    logger.warn(s"HBase async write timed out for a batch of ${input.size} records")
    resultFuture.complete(List("timeout"))
  }

  /**
   * Writes the batch to HBase. Each record becomes one Put; puts are flushed
   * to the BufferedMutator every BatchSize rows and once more at the end.
   * The ResultFuture is ALWAYS completed in the finally block — Flink times
   * the element out otherwise.
   */
  override def asyncInvoke(runEventDataList: List[RunData], resultFuture: ResultFuture[String]): Unit = {
    val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))
    val puts: ListBuffer[Put] = new ListBuffer()
    try {
      for (runStartEvent <- runEventDataList) {
        try {
          val runid = s"${runStartEvent.toolName}--${runStartEvent.chamberName}--${runStartEvent.runStartTime}"
          // Salt the row key with a hash bucket (0-9) to spread writes across regions.
          val key = s"${runStartEvent.toolName}_${runStartEvent.chamberName}".hashCode % 10
          val put = new Put(Bytes.toBytes(s"${key}_${runStartEvent.toolName}_${runStartEvent.chamberName}_${runStartEvent.runStartTime}"))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_ID"), Bytes.toBytes(runid))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("TOOL_NAME"), Bytes.toBytes(runStartEvent.toolName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("CHAMBER_NAME"), Bytes.toBytes(runStartEvent.chamberName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_START_TIME"), Bytes.toBytes(runStartEvent.runStartTime))
          puts.append(put)
          // puts.size replaces the redundant manual `count` accumulator.
          if (puts.size >= BatchSize) {
            table.mutate(puts.asJava)
            puts.clear()
          }
        } catch {
          // A single bad record must not abort the whole batch; log and continue.
          case ex: Exception => logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)} data: $runStartEvent")
        }
      }
      // Flush the remainder; skip the call entirely when nothing is pending.
      if (puts.nonEmpty) {
        table.mutate(puts.asJava)
      }
    } catch {
      case ex: Exception => logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)}")
    } finally {
      // close() also flushes any buffered mutations.
      table.close()
      // Always complete, even on failure, or the async operator times out.
      resultFuture.complete(List("ok"))
    }
  }

  /**
   * Releases the HBase connection when the task shuts down.
   */
  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
    super.close()
  }

  /** Returns the string itself, or "" when it is null. */
  def hasLength(str: String): String = Option(str).getOrElse("")
}
遇到的问题: java.util.concurrent.TimeoutException: Async function call has timed out.
是因为异步方式没有返回数据;解决方案:
添加:
resultFuture.complete(List("ok"))
2021-12-20 15:55:55
java.lang.Exception: Could not complete the stream element: Record @ 1639986929999 :
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:368)
at org.apache.flink.streaming.api.scala.async.JavaResultFutureWrapper.completeExceptionally(JavaResultFutureWrapper.scala:42)
at org.apache.flink.streaming.api.scala.async.AsyncFunction$class.timeout(AsyncFunction.scala:60)
at org.apache.flink.streaming.api.scala.async.RichAsyncFunction.timeout(RichAsyncFunction.scala:37)
at org.apache.flink.streaming.api.scala.async.ScalaRichAsyncFunctionWrapper.timeout(ScalaRichAsyncFunctionWrapper.scala:40)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.timerTriggered(AsyncWaitOperator.java:391)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$registerTimeout$1(AsyncWaitOperator.java:386)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1327)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1318)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Async function call has timed out.
... 16 more