【博学谷学习记录】超强总结,用心分享 | 陌陌案例-离线与实时查询
#博学谷IT技术支持#
一、案例需求
- 选择合理的存储容器进行数据存储,支持即席查询与离线分析
- 实时统计消息总量
- 实时统计各个地区收发消息的总量
- 实时统计每一位客户发送和接收消息数量
二、架构流程
2.1 数据采集操作:基于flume实现消息数据采集
监听数据源文件,一旦文件中出现新的内容,就将对应数据写入Kafka;该方式既能监听单个文件,也能监听整个目录
操作步骤
1. 确定三大组件
Source组件:Taildir Source
Sink组件:Kafka Sink
Channel组件:Memory Channel
2. 调整采集配置信息
3. 配置采集文件
4. 在Kafka中创建MOMO_MSG的Topic
5. 启动Flume组件,准备进行数据采集工作
6. 测试是否正常采集
2.2 写入到HBase
1. 在HBase中创建一个名称空间:MOMO_CHAT
2. 在HBase中创建表
- 列族设计:C1
- 是否需要压缩:GZ
- 预分区:基于Hash预分区,6个
- 版本号:仅需要保留1个
- TTL:数据需要永久保存,因此无需设置TTL
3. 创建一个消费者完成数据写入到HBase
4. 测试操作
- 启动相关的软件:zookeeper、hadoop、hbase、kafka
- 启动消费者代码
- 启动陌陌数据源
- 检测hbase表中是否有数据
2.3 对接Phoenix
创建视图
-- Phoenix view over the HBase table MOMO_CHAT:MOMO_MSG so that it can be
-- queried ad hoc with SQL.
--   * "id" is declared as the primary key of the view.
--   * Every other column lives in column family C1. Column names are
--     double-quoted so Phoenix keeps them lower-case instead of folding them
--     to upper case.
--   * "sender_nickyname" / "receiver_nickyname" are spelled exactly as in the
--     Hive mapping of the same table; do not "correct" them here.
-- NOTE(review): mapping a Phoenix schema onto an HBase namespace presumably
-- requires namespace mapping to be enabled in Phoenix - confirm on the cluster.
-- "if not exists" makes the script safe to re-run; the trailing ";" terminates
-- the statement so that it can be followed by other statements.
create view if not exists MOMO_CHAT.MOMO_MSG(
    "id" varchar primary key,
    C1."msg_time" varchar,
    C1."sender_nickyname" varchar,
    C1."sender_account" varchar,
    C1."sender_sex" varchar,
    C1."sender_ip" varchar,
    C1."sender_os" varchar,
    C1."sender_phone_type" varchar,
    C1."sender_network" varchar,
    C1."sender_gps" varchar,
    C1."receiver_nickyname" varchar,
    C1."receiver_ip" varchar,
    C1."receiver_account" varchar,
    C1."receiver_os" varchar,
    C1."receiver_phone_type" varchar,
    C1."receiver_network" varchar,
    C1."receiver_gps" varchar,
    C1."receiver_sex" varchar,
    C1."msg_type" varchar,
    C1."distance" varchar,
    C1."message" varchar
);
2.4 对接Hive
与HBase集成
-- Hive external table backed by the HBase table MOMO_CHAT:MOMO_MSG, used for
-- offline analysis. The table is external, so dropping it in Hive leaves the
-- HBase data untouched.
--   * Hive columns are matched to the entries of 'hbase.columns.mapping' by
--     position: `id` maps to the HBase row key (:key), every other column maps
--     to the qualifier of the same name in column family C1.
--   * The mapping is written as ONE string without any whitespace or line
--     breaks: whitespace between entries is interpreted as part of the HBase
--     column name, which would make the affected columns read as NULL.
--   * "if not exists" makes the script safe to re-run.
create external table if not exists MOMO_CHAT.MOMO_MSG(
    id string,
    msg_time string,
    sender_nickyname string,
    sender_account string,
    sender_sex string,
    sender_ip string,
    sender_os string,
    sender_phone_type string,
    sender_network string,
    sender_gps string,
    receiver_nickyname string,
    receiver_ip string,
    receiver_account string,
    receiver_os string,
    receiver_phone_type string,
    receiver_network string,
    receiver_gps string,
    receiver_sex string,
    msg_type string,
    distance string,
    message string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname,C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,C1:msg_type,C1:distance,C1:message')
tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
2.5 基于Flink进行实时统计计算
- 实时需求
- 实时统计各个地区的消息总量(以下代码根据消息中的GPS信息解析出省份,并按省份分组累加)
// Step 1 - filter: keep only records the later steps can process.
// A record is kept when it is non-blank, splits into exactly 20 fields on the
// \001 separator, and its GPS field (index 15) has at least two comma-separated
// parts. The GPS check prevents an ArrayIndexOutOfBoundsException in the map
// step below, which would otherwise fail the job on one malformed record.
SingleOutputStreamOperator<String> filterOperator = streamSource.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String msg) throws Exception {
        // Return true to keep the record, false to drop it.
        if (msg == null || "".equals(msg.trim())) {
            return false;
        }
        String[] fields = msg.split("\001");
        if (fields.length != 20) {
            return false;
        }
        return fields[15].split(",").length >= 2;
    }
});

// Step 2 - map: resolve the GPS coordinates of each message to a province and
// emit (province, 1) so the records can be counted per province.
SingleOutputStreamOperator<Tuple2<String, Long>> mapOperator = filterOperator.map(new MapFunction<String, Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> map(String msg) throws Exception {
        // Split the raw message into its fields.
        String[] fields = msg.split("\001");

        // NOTE(review): the original comment called this the sender's GPS. If the
        // record follows the column order of the MOMO_MSG table (without the row
        // key), index 15 is receiver_gps and sender_gps would be index 8 -
        // confirm which side this job is meant to count.
        // The field is read as "longitude,latitude".
        String[] latAndLng = fields[15].split(",");
        String lng = latAndLng[0].trim();
        String lat = latAndLng[1].trim();

        // Look up the province for the coordinates via the project helper.
        String province = HttpClientUtils.findByLatAndLng(lat, lng);

        return new Tuple2<>(province, 1L);
    }
});

// Step 3 - group by province (tuple field 0) and sum the counts (tuple field 1).
SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = mapOperator.keyBy(0).sum(1);

// Step 4 - convert each (province, count) tuple into a MoMoCountBean.
SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator.map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
    @Override
    public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
        String province = tuple2.f0;
        Long msgCount = tuple2.f1;

        MoMoCountBean moMoCountBean = new MoMoCountBean();
        moMoCountBean.setMoMoProvince(province);
        moMoCountBean.setMoMo_MsgCount(msgCount);
        return moMoCountBean;
    }
});

// Step 5 - sink: write the beans to MySQL through the project's MysqlSink.
// The meaning of the "3" argument is defined by MysqlSink, which is not shown here.
operator.addSink(new MysqlSink("3"));