[Boxuegu Study Notes] A Thorough Summary, Shared with Care | Momo Case Study: Offline and Real-Time Queries


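1. Case Requirements

  1. Choose a suitable storage engine for the data, one that supports both ad hoc queries and offline analysis
  2. Count the total number of messages in real time
  3. Count the messages sent and received in each region in real time
  4. Count the messages sent and received by each user in real time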

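2. Architecture and Workflow

2.1 Data collection: collecting message data with Flume

Flume watches the source data files. As soon as new content appears in a file, the corresponding records are written to Kafka. The source must be able to watch both individual files and whole directories.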

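Steps

1. Choose the three components

Source: Taildir Source

Sink: Kafka Sink

Channel: Memory Channel

2. Adjust the collection settings

3. Write the collection configuration file

4. Create the MOMO_MSG topic in Kafka

5. Start the Flume agent so it is ready to collect data

6. Verify that data is being collected correctly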
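
To make the "pick up only what was appended" behaviour from 2.1 concrete, here is a small stdlib-only Java sketch of offset-based tailing. It only illustrates the idea and is not how Flume is implemented; the class `TailSketch` and all of its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of offset-based tailing; not Flume's implementation.
class TailSketch {
    private final Path file;
    private long offset = 0L; // byte position just past the last complete line already returned

    TailSketch(Path file) {
        this.file = file;
    }

    /** Returns the complete lines appended since the previous call. */
    List<String> poll() {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long length = raf.length();
            if (length <= offset) {
                return lines; // nothing new (truncation and rotation are not handled here)
            }
            byte[] buf = new byte[(int) (length - offset)];
            raf.seek(offset);
            raf.readFully(buf);
            int lastNewline = -1;
            for (int i = buf.length - 1; i >= 0; i--) {
                if (buf[i] == '\n') {
                    lastNewline = i;
                    break;
                }
            }
            if (lastNewline < 0) {
                return lines; // only a partial line so far; wait for the rest
            }
            String chunk = new String(buf, 0, lastNewline, StandardCharsets.UTF_8);
            for (String line : chunk.split("\n", -1)) {
                lines.add(line);
            }
            offset += lastNewline + 1; // remember progress, as a position file would
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    private static void append(Path file, String text) throws IOException {
        Files.write(file, text.getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    }

    /** Appends to a temp file three times and polls after each append. */
    static List<List<String>> demo() {
        try {
            Path file = Files.createTempFile("momo_msg", ".log");
            TailSketch tail = new TailSketch(file);
            List<List<String>> polls = new ArrayList<>();
            append(file, "m1\n");
            polls.add(tail.poll());
            append(file, "m2\nm3"); // m3 is still incomplete
            polls.add(tail.poll());
            append(file, "\n");     // m3 is now complete
            polls.add(tail.poll());
            Files.delete(file);
            return polls;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[m1], [m2], [m3]]
    }
}
```

The stored offset is what lets a restarted collector resume without re-sending old lines. In the real pipeline the Taildir Source handles this, and the Kafka Sink forwards each line to the MOMO_MSG topic.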

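2.2 Writing to HBase

1. Create a namespace in HBase: MOMO_CHAT

2. Create the table in HBase

  1. Column family: C1
  2. Compression: GZ
  3. Pre-splitting: hash-based, 6 regions
  4. Versions: keep only 1
  5. TTL: not needed, since the data is kept permanently

3. Write a consumer that reads from Kafka and writes the data to HBase

4. Test

  1. Start the required services: zookeeper, hadoop, hbase, kafka
  2. Start the consumer program
  3. Start the Momo data source
  4. Check that the HBase table contains data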
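
The steps above fix the column family and the hash-based pre-split but do not spell out the rowkey or how a message becomes columns. The sketch below shows one possible approach using only the JDK. Two things are assumptions rather than facts from these notes: the rowkey layout (first 8 hex characters of an MD5 hash, then sender account, receiver account, and message time) and the field order (taken to match the column order of the Phoenix view in 2.3). `MomoRowSketch` is a hypothetical name; a real consumer would pass the resulting values to the HBase client.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns one raw message into column values plus a rowkey.
class MomoRowSketch {
    // Assumed field order: the 20 columns in the order listed in the Phoenix view.
    static final String[] COLUMNS = {
        "msg_time", "sender_nickyname", "sender_account", "sender_sex", "sender_ip",
        "sender_os", "sender_phone_type", "sender_network", "sender_gps",
        "receiver_nickyname", "receiver_ip", "receiver_account", "receiver_os",
        "receiver_phone_type", "receiver_network", "receiver_gps", "receiver_sex",
        "msg_type", "distance", "message"
    };

    /** Splits a \001-delimited message into a column-name to value map. */
    static Map<String, String> parse(String msg) {
        String[] fields = msg.split("\001", -1); // -1 keeps trailing empty fields
        if (fields.length != COLUMNS.length) {
            throw new IllegalArgumentException("expected 20 fields, got " + fields.length);
        }
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < COLUMNS.length; i++) {
            row.put(COLUMNS[i], fields[i]);
        }
        return row;
    }

    /** Lower-case hex MD5 digest of the given text. */
    static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(text.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is available on every standard JDK
        }
    }

    /** Assumed layout: hashPrefix_sender_receiver_msgTime. */
    static String rowKey(Map<String, String> row) {
        String pair = row.get("sender_account") + "_" + row.get("receiver_account");
        return md5Hex(pair).substring(0, 8) + "_" + pair + "_" + row.get("msg_time");
    }

    public static void main(String[] args) {
        String[] fields = new String[COLUMNS.length];
        Arrays.fill(fields, "-");
        fields[0] = "2021-08-23 10:00:00"; // msg_time
        fields[2] = "13800000000";         // sender_account
        fields[11] = "13900000000";        // receiver_account
        Map<String, String> row = parse(String.join("\001", fields));
        System.out.println(rowKey(row));
    }
}
```

The hash prefix spreads writes across the pre-split regions, while keeping both accounts in the key means all messages between the same pair share a prefix. A hex prefix like this also pairs naturally with a hex-based pre-split of the table.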

2.3 Integrating with Phoenix

Create a view over the HBase table

create view MOMO_CHAT.MOMO_MSG(
    "id" varchar primary key,
    C1."msg_time" varchar,
    C1."sender_nickyname" varchar,
    C1."sender_account" varchar,
    C1."sender_sex" varchar,
    C1."sender_ip" varchar,
    C1."sender_os" varchar,
    C1."sender_phone_type" varchar,
    C1."sender_network" varchar,
    C1."sender_gps" varchar,
    C1."receiver_nickyname" varchar,
    C1."receiver_ip" varchar,
    C1."receiver_account" varchar,
    C1."receiver_os" varchar,
    C1."receiver_phone_type" varchar,
    C1."receiver_network" varchar,
    C1."receiver_gps" varchar,
    C1."receiver_sex" varchar,
    C1."msg_type" varchar,
    C1."distance" varchar,
    C1."message" varchar
);
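
Once the view exists, ad hoc queries can go through JDBC. The sketch below is a minimal example under stated assumptions: `PhoenixQuerySketch` and `findMessages` are hypothetical names, the caller supplies an open `java.sql.Connection` (typically obtained from a `jdbc:phoenix:<zookeeper quorum>` URL with the Phoenix client on the classpath), and the filter columns are chosen only for illustration. The point worth noting is that the view declares its lower-case columns in double quotes, so queries have to quote them as well.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical query helper for the MOMO_CHAT.MOMO_MSG view.
class PhoenixQuerySketch {
    // Lower-case column names were declared with double quotes, so they are quoted here too.
    static final String SQL =
            "SELECT \"msg_time\", \"message\" FROM MOMO_CHAT.MOMO_MSG "
          + "WHERE \"sender_account\" = ? AND \"receiver_account\" = ? LIMIT 100";

    /** Returns up to 100 "time message" strings exchanged between the two accounts. */
    static List<String> findMessages(Connection conn, String sender, String receiver)
            throws SQLException {
        List<String> result = new ArrayList<>();
        try (PreparedStatement ps = conn.prepareStatement(SQL)) {
            ps.setString(1, sender);
            ps.setString(2, receiver);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    result.add(rs.getString(1) + " " + rs.getString(2));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Running the query needs a live Phoenix cluster; here we only show the statement.
        System.out.println(SQL);
    }
}
```

Whether such a query is fast depends on the rowkey design and on any secondary indexes, neither of which these notes cover.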

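2.4 Integrating with Hive

Map the HBase table into Hive as an external table. Keep the hbase.columns.mapping value free of whitespace between entries, since Hive may read it as part of a column name.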

create external table MOMO_CHAT.MOMO_MSG(
    id string,
    msg_time string,
    sender_nickyname string,
    sender_account string,
    sender_sex string,
    sender_ip string,
    sender_os string,
    sender_phone_type string,
    sender_network string,
    sender_gps string,
    receiver_nickyname string,
    receiver_ip string,
    receiver_account string,
    receiver_os string,
    receiver_phone_type string,
    receiver_network string,
    receiver_gps string,
    receiver_sex string,
    msg_type string,
    distance string,
    message string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname,C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,C1:msg_type,C1:distance,C1:message')
tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');

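2.5 Real-time statistics with Flink

  • Real-time requirement
    • Count messages per region in real time (the code groups by the province derived from a GPS field)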
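import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

// streamSource: the DataStream<String> of raw messages (its creation is not shown)
// Filter: drop records that are empty or do not have exactly 20 fields
SingleOutputStreamOperator<String> filterOperator = streamSource.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String msg) throws Exception {
        // Return true to keep the record, false to drop it
        return (msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20);
    }
});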

// Transform: look up the province for the GPS coordinates in each message and emit (province, 1)
SingleOutputStreamOperator<Tuple2<String, Long>> mapOperator = filterOperator.map(new MapFunction<String, Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> map(String msg) throws Exception {
        // Split the message into its fields
        String[] fields = msg.split("\001");
        // GPS field in "lng,lat" form. Index 15 is receiver_gps if the fields follow
        // the column order of the view in 2.3; sender_gps would be index 8
        String[] latAndLng = fields[15].split(",");

        String lng = latAndLng[0].trim();
        String lat = latAndLng[1].trim();

        // Look up the province for these coordinates
        String province = HttpClientUtils.findByLatAndLng(lat, lng);

        // Emit (province, 1)
        return new Tuple2<>(province, 1L);
    }
});
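// Group by province and keep a running sum of the counts
// (a key selector replaces the index-based keyBy(0), which newer Flink releases deprecate)
SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = mapOperator.keyBy(tuple -> tuple.f0).sum(1);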

// Convert each Tuple2 into a MoMoCountBean
SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator.map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
    @Override
    public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
        String province = tuple2.f0;
        Long msgCount = tuple2.f1;

        MoMoCountBean moMoCountBean = new MoMoCountBean();
        moMoCountBean.setMoMoProvince(province);
        moMoCountBean.setMoMo_MsgCount(msgCount);

        return moMoCountBean;
    }
});

// Sink: write the results to MySQL
operator.addSink(new MysqlSink("3"));
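
One detail of the pipeline is easy to miss: in a streaming job, a keyed `sum` emits an updated total for every incoming record, not one final number per key. The stdlib-only sketch below imitates that behaviour so the shape of the sink's input is clear. `KeyedRunningSumSketch` is a hypothetical name and no Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the keyed running sum; plain Java, no Flink involved.
class KeyedRunningSumSketch {
    /** For each incoming province, emits "province=runningTotal", one output per input. */
    static List<String> runningTotals(List<String> provinces) {
        Map<String, Long> state = new HashMap<>(); // per-key state, like Flink's keyed state
        List<String> emitted = new ArrayList<>();
        for (String province : provinces) {
            long total = state.merge(province, 1L, Long::sum);
            emitted.add(province + "=" + total);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("Beijing", "Shanghai", "Beijing");
        System.out.println(runningTotals(input)); // prints [Beijing=1, Shanghai=1, Beijing=2]
    }
}
```

Because the same province shows up repeatedly with a growing total, the MySQL sink presumably needs to overwrite the existing row for that province instead of inserting a new one each time. The notes do not show `MysqlSink`, so that is an inference, not a description of the actual code.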