Loading Kafka data with Flink and storing it in HBase (via Phoenix)
1. Add framework dependencies (Gradle)
implementation 'org.apache.flink:flink-java:1.16.0'
implementation 'org.apache.flink:flink-streaming-java:1.16.0'
implementation 'org.apache.flink:flink-connector-kafka:1.16.0'
implementation 'org.apache.flink:flink-clients:1.16.0'
implementation 'org.apache.flink:flink-json:1.16.0'
implementation 'org.apache.flink:flink-table-api-scala-bridge_2.12:1.16.0'
implementation 'com.alibaba:fastjson:2.0.19.graal'
implementation 'org.apache.hadoop:hadoop-client:3.2.2'
implementation 'org.apache.flink:flink-connector-jdbc:1.16.0'
implementation 'org.apache.phoenix:phoenix-client-hbase-2.4:5.1.2'
2. Read the Kafka data with Flink
Format of the records in the Kafka source topic:
{"common":{"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone 8","mid":"mid_368770","os":"iOS 13.3.1","uid":"63","vc":"v2.1.132"},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","order":1,"pos_id":3}],"page":{"during_time":19393,"page_id":"home"},"ts":1605368266000}
Flink code that reads from the Kafka source:
public class ReadKafkaToFlinkFunction {

    public static StreamExecutionEnvironment getEnv() {
        // To submit to a remote cluster instead, use StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles).
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        // Checkpoint every 5 minutes with exactly-once semantics.
        executionEnvironment.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        executionEnvironment.setStateBackend(new HashMapStateBackend());
        executionEnvironment.getCheckpointConfig().setCheckpointStorage("hdfs://server115:9000/flink/ck");
        System.setProperty("HADOOP_USER_NAME", "wucf");
        return executionEnvironment;
    }

    /**
     * Target columns: ID | CH | MD | MID | OS | DURING_TIME | PAGE_ID | TS
     * Read the Kafka data into Flink and write it to Phoenix/HBase.
     */
    public static void kafkaNewFlinkStream() {
        try {
            StreamExecutionEnvironment env = getEnv();
            // Use the Kafka topic as the source.
            KafkaSource<String> kafkaConsumer = KafkaUtils.getKafkaConsumer("topic-log");
            DataStreamSource<String> dataStreamSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "KafkaSource");
            // For quick debugging you can map the raw JSON to the uid (or filter on it) and print() the stream here.
            SinkFunction<LogBean> sink = new FlinkToPhoenixHbaseFunction();
            dataStreamSource
                    .map((MapFunction<String, LogBean>) value -> StringUtils.isNoneBlank(value) ? JSONObject.parseObject(value, LogBean.class) : new LogBean())
                    .filter((FilterFunction<LogBean>) value -> !Objects.isNull(value.getPage()))
                    .keyBy((KeySelector<LogBean, String>) value -> value.getCommon().getUid())
                    .max("page.during_time")
                    .addSink(sink);
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Entity class for the JSON records:
@Data
public class LogBean {
    private CommonBean common;
    private PageBean page;
    private long ts;
    private List<DisplaysBean> displays;

    @Data
    public static class CommonBean {
        private String ar;
        private String ba;
        private String ch;
        private String is_new;
        private String md;
        private String mid;
        private String os;
        private String uid;
        private String vc;
    }

    @Data
    public static class PageBean {
        private int during_time;
        private String page_id;
    }

    @Data
    public static class DisplaysBean {
        private String display_type;
        private String item;
        private String item_type;
        private int order;
        private int pos_id;
    }
}
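Before wiring Kafka in, it can help to sanity-check the JSON-to-LogBean mapping and the keyBy/max step locally. The sketch below is illustrative only (the class name LogBeanLocalDemo and the trimmed sample record are not part of the job); it assumes the same Flink and fastjson dependencies as above:

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LogBeanLocalDemo {
    public static void main(String[] args) throws Exception {
        // A trimmed version of the sample record shown above.
        String json = "{\"common\":{\"uid\":\"63\",\"ch\":\"Appstore\",\"md\":\"iPhone 8\",\"os\":\"iOS 13.3.1\",\"mid\":\"mid_368770\"},"
                + "\"page\":{\"during_time\":19393,\"page_id\":\"home\"},\"ts\":1605368266000}";
        // fastjson maps the nested JSON objects onto the nested static classes of LogBean.
        LogBean bean = JSONObject.parseObject(json, LogBean.class);
        System.out.println(bean.getCommon().getUid() + " -> " + bean.getPage().getDuring_time());

        // Run the same keyBy/max logic as the job on two in-memory records (same uid, different during_time).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromElements(json, json.replace("19393", "27000"))
                .map(value -> JSONObject.parseObject(value, LogBean.class))
                .returns(LogBean.class)
                .keyBy((KeySelector<LogBean, String>) value -> value.getCommon().getUid())
                .max("page.during_time")
                .print();
        env.execute("log-bean-local-demo");
    }
}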
Kafka utility class:
public class KafkaUtils {

    public static KafkaSource<String> getKafkaConsumer(String topic) {
        return KafkaSource.<String>builder()
                .setBootstrapServers("server200:9092,server200:9093")
                .setTopics(Arrays.asList(topic))
                // Consumer group id (example value).
                .setGroupId("flink-log-consumer")
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                // Start reading from the latest offsets; setUnbounded() would instead define a stopping offset.
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
    }
}
3. Write the Flink sink
public class FlinkToPhoenixHbaseFunction extends RichSinkFunction<LogBean> {

    private final static Logger LOGGER = LoggerFactory.getLogger(FlinkToPhoenixHbaseFunction.class);
    private final static String PHOENIX_DRIVER_NAME = "org.apache.phoenix.jdbc.PhoenixDriver";
    private final static String PHOENIX_URL = "jdbc:phoenix:server200:2181";
    private static final DruidDataSource DATA_SOURCE = new DruidDataSource();
    private transient DruidPooledConnection connection;

    static {
        DATA_SOURCE.setDriverClassName(PHOENIX_DRIVER_NAME);
        DATA_SOURCE.setUrl(PHOENIX_URL);
        DATA_SOURCE.setTestOnBorrow(false);
        DATA_SOURCE.setTestOnReturn(false);
        DATA_SOURCE.setTestWhileIdle(true);
        DATA_SOURCE.setTimeBetweenEvictionRunsMillis(60000);
        DATA_SOURCE.setMaxActive(20);
        DATA_SOURCE.setInitialSize(5);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Borrow a connection from the pool on the task and make sure the target table exists.
        connection = DATA_SOURCE.getConnection();
        createLogTb();
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(LogBean value, Context context) throws Exception {
        upsertLogData(value);
    }

    /**
     * Create the table:
     * create table if not exists wmydb.tb_log(id varchar primary key, common.ch varchar, common.md varchar,
     * common.mid varchar, common.os varchar, page.during_time varchar, page.page_id varchar, displays varchar, ts varchar);
     */
    private void createLogTb() {
        try {
            StringBuilder sql = new StringBuilder();
            sql.append(" create table if not exists ");
            sql.append("wmydb.").append("tb_log");
            sql.append("(");
            sql.append("id varchar primary key," +
                    "common.ch varchar," +
                    "common.md varchar," +
                    "common.mid varchar," +
                    "common.os varchar," +
                    "page.during_time varchar," +
                    "page.page_id varchar," +
                    "displays varchar," +
                    "ts varchar");
            sql.append(")");
            PreparedStatement preparedStatement = connection.prepareStatement(sql.toString());
            preparedStatement.execute();
            preparedStatement.close();
        } catch (SQLException e) {
            LOGGER.error("failed to create table wmydb.tb_log", e);
        }
    }

    private void upsertLogData(LogBean logBean) {
        try {
            StringBuilder sql = new StringBuilder();
            // The table name must match the one used in createLogTb (unquoted, so Phoenix upper-cases it).
            sql.append(" upsert into wmydb.tb_log values(");
            sql.append("'").append(System.currentTimeMillis()).append("'").append(",");
            sql.append("'").append(logBean.getCommon().getCh()).append("'").append(",");
            sql.append("'").append(logBean.getCommon().getMd()).append("'").append(",");
            sql.append("'").append(logBean.getCommon().getMid()).append("'").append(",");
            sql.append("'").append(logBean.getCommon().getOs()).append("'").append(",");
            sql.append("'").append(logBean.getPage().getDuring_time()).append("'").append(",");
            sql.append("'").append(logBean.getPage().getPage_id()).append("'").append(",");
            sql.append("'").append(logBean.getDisplays()).append("'").append(",");
            sql.append("'").append(logBean.getTs()).append("'");
            sql.append(")");
            PreparedStatement preparedStatement = connection.prepareStatement(sql.toString());
            preparedStatement.execute();
            connection.commit();
            preparedStatement.close();
        } catch (SQLException e) {
            LOGGER.error("failed to upsert log record", e);
        }
    }
}
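The string-concatenated upsert above is fine for a demo, but field values containing quotes would break it. A parameterized variant of upsertLogData is sketched below (same table and column order as the create statement above; Phoenix supports standard JDBC placeholders):

    private void upsertLogDataPrepared(LogBean logBean) {
        // Values are bound in the same order as the columns of wmydb.tb_log.
        String sql = "upsert into wmydb.tb_log values (?, ?, ?, ?, ?, ?, ?, ?, ?)";
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            preparedStatement.setString(1, String.valueOf(System.currentTimeMillis()));
            preparedStatement.setString(2, logBean.getCommon().getCh());
            preparedStatement.setString(3, logBean.getCommon().getMd());
            preparedStatement.setString(4, logBean.getCommon().getMid());
            preparedStatement.setString(5, logBean.getCommon().getOs());
            preparedStatement.setString(6, String.valueOf(logBean.getPage().getDuring_time()));
            preparedStatement.setString(7, logBean.getPage().getPage_id());
            preparedStatement.setString(8, String.valueOf(logBean.getDisplays()));
            preparedStatement.setString(9, String.valueOf(logBean.getTs()));
            preparedStatement.execute();
            connection.commit();
        } catch (SQLException e) {
            LOGGER.error("failed to upsert log record", e);
        }
    }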
4. Test
public class FlinkApp {
    public static void main(String[] args) {
        System.out.println("hello world");
        ReadKafkaToFlinkFunction.kafkaNewFlinkStream();
    }
}
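To verify that records actually land in HBase, query the Phoenix table over plain JDBC (a minimal sketch; the connection URL is the same one the sink uses, and the class name PhoenixQueryCheck is just for illustration):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixQueryCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:server200:2181");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select id, common.ch, page.during_time, ts from wmydb.tb_log limit 10")) {
            while (rs.next()) {
                // Columns in select order: id, ch, during_time, ts.
                System.out.println(rs.getString(1) + " | " + rs.getString(2)
                        + " | " + rs.getString(3) + " | " + rs.getString(4));
            }
        }
    }
}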