flink批量 async io 写入hbase

flink 采用批量 async io 方式写入hbase

    一条一条数据写入hbase太慢了,故采用批量的方式,每2000条数据一个批量写入hbase,提高写入性能

    设置一个三秒的翻滚窗口, 将数据聚合在一起, 然后批量的方式, Async IO 异步写入hbase

    经在压测环境验证,当长期大数据量消费, 设置checkpoint, async I/O 批量写入hbase 会出现checkpoint超时,写入hbase不及时,flink反压机制不生效。。 这种方式在生产环境慎用!!!!!!

这是坑。。。。。。。

// Source stream of RunData records; getDatas() is defined elsewhere in this job.
val RunDataDS: DataStream[FdcData[RunData]] = getDatas()

    // Tumbling 3-second event-time window keyed by tool name: each window's
    // records are collected into one List so the sink can write them to HBase
    // in bulk (see RunDataProcessWindowFunction below).
    // NOTE(review): event-time windows require a watermark strategy assigned
    // upstream — confirm one exists, otherwise these windows never fire.
    val alarmRuleResultStream: DataStream[List[RunData]] = RunDataDS
      .map(_.datas)
      .keyBy(_.toolName)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .process(new RunDataProcessWindowFunction())

    // Async I/O write to HBase: ordered emission, 6000 ms timeout per batch,
    // at most 100 batches in flight at once.
    AsyncDataStream.orderedWait(
      alarmRuleResultStream,
      new TestDataHbaseSink(ProjectConfig.HBASE_RUNDATA_TABLE),
      6000,
      TimeUnit.MILLISECONDS,
      100)
      .name("Hbase Sink")
/**
   * Collects every element of one keyed window into a single List, so the
   * downstream async sink receives whole batches instead of single records.
   */
  class RunDataProcessWindowFunction extends ProcessWindowFunction[RunData, List[RunData], String, TimeWindow] {

    def process(key: String, context: Context, input: Iterable[RunData], out: Collector[List[RunData]]): Unit = {
      // Materialize the window's elements once and emit them as one batch.
      val batch: List[RunData] = input.toList
      out.collect(batch)
    }
  }

自定义sink: TestDataHbaseSink 继承 RichAsyncFunction

val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))
批量写入hbase


import com.hzw.fdc.scalabean.{RunData, RunEventData}
import com.hzw.fdc.util.{ExceptionInfo, ProjectConfig}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

/**
 *   批量写入hbase
 *
 */
/**
 * Async-function sink that writes one window's batch of RunData to HBase
 * through a BufferedMutator, flushing every 2000 Puts.
 *
 * NOTE(review): the HBase calls in asyncInvoke are blocking, so this "async"
 * function still occupies one of the operator's capacity slots for the whole
 * write — consistent with the checkpoint-timeout caveat described at the top
 * of this article. Confirm this is acceptable under sustained load.
 *
 * @param tableName fully qualified HBase table name to write into
 */
class TestDataHbaseSink(tableName: String) extends RichAsyncFunction[List[RunData], String] {
  var connection: Connection = _
  // Fix: was classOf[WindowEndRunDataHbaseSink], which attributed every log
  // line to the wrong class. Use this class so warnings are traceable.
  private val logger: Logger = LoggerFactory.getLogger(classOf[TestDataHbaseSink])

  override def open(parameters: Configuration): Unit = {
    // Load the job-global configuration. Renamed the local from `parameters`
    // to avoid shadowing the method argument of the same name.
    val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    ProjectConfig.getConfig(globalParams)

    // Create one HBase connection per task instance; it is shared by every
    // asyncInvoke call and released in close().
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", ProjectConfig.HBASE_ZOOKEEPER_QUORUM)
    conf.set("hbase.zookeeper.property.clientPort", ProjectConfig.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT)
    connection = ConnectionFactory.createConnection(conf)
  }

  /**
   * Timeout callback for a batch that did not complete within the operator's
   * timeout. Fix: do NOT call super.timeout(...) here — the default
   * implementation completes the result future *exceptionally*
   * ("Could not complete the stream element ... TimeoutException", see the
   * stack trace below), after which our own complete() is ignored and the job
   * fails. Completing with a marker value lets the stream keep running.
   */
  override def timeout(input: List[RunData], resultFuture: ResultFuture[String]): Unit = {
    logger.warn(s"async write to table $tableName timed out for a batch of ${input.size} records")
    resultFuture.complete(List("timeout"))
  }

  /**
   * Writes one batch to HBase. Puts are pushed through the BufferedMutator in
   * chunks of 2000, with a final flush for the remainder. The result future
   * is completed in `finally` so the async operator never waits forever on
   * this element, even if the write throws.
   */
  override def asyncInvoke(runEventDataList: List[RunData], resultFuture: ResultFuture[String]): Unit = {

    val table: BufferedMutator = connection.getBufferedMutator(TableName.valueOf(tableName))

    val puts: ListBuffer[Put] = new ListBuffer()
    var count = 0

    try {

      for (runStartEvent <- runEventDataList) {
        try {

          val runid = s"${runStartEvent.toolName}--${runStartEvent.chamberName}--${runStartEvent.runStartTime}"

          // Salt the row key (hashCode % 10) to spread writes across regions.
          // NOTE(review): hashCode can be negative, so the salt prefix may be
          // negative too (e.g. "-3_..."); confirm readers expect that format.
          val key = s"${runStartEvent.toolName}_${runStartEvent.chamberName}".hashCode % 10
          val put = new Put(Bytes.toBytes(s"${key}_${runStartEvent.toolName}_${runStartEvent.chamberName}_${runStartEvent.runStartTime}"))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_ID"), Bytes.toBytes(runid))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("TOOL_NAME"), Bytes.toBytes(runStartEvent.toolName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("CHAMBER_NAME"), Bytes.toBytes(runStartEvent.chamberName))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("RUN_START_TIME"), Bytes.toBytes(runStartEvent.runStartTime))

          puts.append(put)
          count = count + 1

          // Flush every 2000 records to bound memory held by the buffer.
          if (count % 2000 == 0) {

            table.mutate(puts.asJava)
            puts.clear()
            count = 0
          }

        } catch {
          // A bad record is logged and skipped; the rest of the batch proceeds.
          case ex: Exception => logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)} data: $runStartEvent")
        }
      }
      // Flush whatever remains (mutating an empty list is a no-op).
      table.mutate(puts.asJava)
    } catch {
      case ex: Exception => logger.warn(s" Exception:${ExceptionInfo.getExceptionInfo(ex)}")
    } finally {
      table.close()
      // Always complete, or the async operator times out on this element.
      resultFuture.complete(List("ok"))
    }

  }


  /**
   * Releases the HBase connection. Guarded against open() having failed
   * before the connection was created.
   */
  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
    super.close()
  }


  /** Returns the string itself, or "" when it is null. */
  def hasLength(str: String): String = Option(str).getOrElse("")

}

遇到的问题: java.util.concurrent.TimeoutException: Async function call has timed out.

是因为异步方式没有返回数据;解决方案:
添加:
resultFuture.complete(List("ok"))
（此处原文为运行截图，未随正文保留。）

2021-12-20 15:55:55
java.lang.Exception: Could not complete the stream element: Record @ 1639986929999 : 
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:368)
    at org.apache.flink.streaming.api.scala.async.JavaResultFutureWrapper.completeExceptionally(JavaResultFutureWrapper.scala:42)
    at org.apache.flink.streaming.api.scala.async.AsyncFunction$class.timeout(AsyncFunction.scala:60)
    at org.apache.flink.streaming.api.scala.async.RichAsyncFunction.timeout(RichAsyncFunction.scala:37)
    at org.apache.flink.streaming.api.scala.async.ScalaRichAsyncFunctionWrapper.timeout(ScalaRichAsyncFunctionWrapper.scala:40)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.timerTriggered(AsyncWaitOperator.java:391)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$registerTimeout$1(AsyncWaitOperator.java:386)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1327)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1318)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Async function call has timed out.
    ... 16 more