HBase快速入门系列 (九)| HBase 的实战案例(微博)

HBase实战之谷粒微博

需求分析

  1. 微博内容的浏览,数据库表设计

  2. 用户社交体现:关注用户,取关用户

  3. 拉取关注的人的微博内容

表的分类

1.微博内容表

表结构:

| 项目 | 值 |
| --- | --- |
| 方法名 | createTableContent |
| Table Name | weibo:content |
| RowKey | 用户 ID_时间戳 |
| ColumnFamily | info |
| ColumnLabel | 标题,内容,图片 |
| Version | 1 个版本 |

2.用户关系表

表结构:

| 项目 | 值 |
| --- | --- |
| 方法名 | createTableRelations |
| Table Name | weibo:relation |
| RowKey | 用户 ID |
| ColumnFamily | attends、fans |
| ColumnLabel | 关注用户 ID,粉丝用户 ID |
| ColumnValue | 用户 ID |
| Version | 1 个版本 |

3.微博收件箱表

表结构:

| 项目 | 值 |
| --- | --- |
| 方法名 | createTableReceiveContentEmails |
| Table Name | weibo:inbox |
| RowKey | 用户 ID |
| ColumnFamily | info |
| ColumnLabel | 被关注用户 ID |
| ColumnValue | 微博内容表的 RowKey |
| Version | 2 个版本 |

代码实现

1.代码设计总览:

  1. 创建命名空间以及表名的定义

  2. 创建微博内容表

  3. 创建用户关系表

  4. 创建用户微博收件箱表

  5. 发布微博内容

  6. 添加关注用户

  7. 移除(取关)用户

  8. 获取关注的人的微博内容

  9. 测试

2. 环境搭建

1.pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Hadoop</artifactId>
        <groupId>com.huan</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>guli-weibo</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>
</project>

2.包名规则

(原图缺失:此处为项目包结构截图,包含 com.huan.constants、com.huan.utils、com.huan.dao 和 com.huan 四个包)

3.resources 目录下的 hbase-site.xml(放入 resources 后可以直接在本地运行,无需打包成 jar)

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<configuration>
    <property>  
        <name>hbase.rootdir</name>  
        <value>hdfs://Bigdata01:9000/hbase</value>  
    </property>

    <property>  
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <!-- 0.98后的新变动,之前版本没有.port,默认端口为60000 -->
    <property>
        <name>hbase.master.port</name>
        <value>16000</value>
    </property>

    <property>  
        <name>hbase.zookeeper.quorum</name>
        <value>Bigdata01:2181,Bigdata02:2181,Bigdata03:2181</value>
    </property>

    <property>  
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/opt/module/zookeeper-3.4.10/zkData</value>
    </property>
    <property>
        <name>hbase.master.maxclockskew</name>
        <value>180000</value>
        <description>Time difference of regionserver from master</description>
    </property>
</configuration>

3.创建命名空间以及表名的定义

这里用一个常量类统一定义命名空间、表名、列族和版本数,后续使用时直接引用即可:

package com.huan.constants;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;



/**
 * Central constants for the Weibo demo: the shared HBase client configuration, the namespace and,
 * for each table, its name, column family(ies) and maximum number of cell versions.
 *
 * <p>Constants holder only; it is not meant to be instantiated.
 */
public final class Constants {

    /**
     * HBase client configuration created once via {@code HBaseConfiguration.create()} and shared by
     * every caller. {@link Configuration} is mutable, so changes made through this reference are
     * visible to all code that uses it.
     */
    public static final Configuration CONFIGURATION = HBaseConfiguration.create();

    /** Namespace that holds all Weibo tables. */
    public static final String NAMESPACE = "weibo";

    // Weibo content table (evaluates to "weibo:content").
    public static final String CONTENT_TABLE = NAMESPACE + ":content";
    public static final String CONTENT_TABLE_CF = "info";
    public static final int CONTENT_TABLE_VERSION = 1;

    // User relation table (evaluates to "weibo:relation"):
    // CF1 "attends" = users this row's user follows, CF2 "fans" = users following this row's user.
    public static final String RELATION_TABLE = NAMESPACE + ":relation";
    public static final String RELATION_TABLE_CF1 = "attends";
    public static final String RELATION_TABLE_CF2 = "fans";
    public static final int RELATION_TABLE_VERSION = 1;

    // Inbox table (evaluates to "weibo:inbox"); INBOX_TABLE_VERSION is the number of cell
    // versions kept per column.
    public static final String INBOX_TABLE = NAMESPACE + ":inbox";
    public static final String INBOX_TABLE_CF = "info";
    public static final int INBOX_TABLE_VERSION = 2;

    private Constants() {
        // Constants holder; no instances.
    }
}

4.编写一个工具类,封装创建命名空间、判断表是否存在以及创建表的操作,方便后续调用

package com.huan.utils;

import com.huan.constants.Constants;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

/**
 * Admin helpers for setting up the Weibo schema:
 * <ol>
 *   <li>create a namespace (skipped if it already exists),</li>
 *   <li>check whether a table exists,</li>
 *   <li>create a table with the given column families (skipped if it already exists).</li>
 * </ol>
 *
 * <p>Every method opens its own {@link Connection} and {@link Admin} from
 * {@link Constants#CONFIGURATION} and closes both before returning, also when an exception is
 * thrown. The methods therefore do not depend on each other's state and can be called in any order.
 */
public class HBaseUtils {

    /**
     * Kept for source compatibility with existing callers. The methods of this class no longer
     * store their connection here, so it stays {@code null} unless assigned by other code.
     */
    public static Connection connection = null;

    /** Kept for source compatibility with existing callers; not used by this class. */
    public static Admin admin = null;


    /**
     * Creates the namespace {@code nameSpace}, or prints a hint if it already exists.
     *
     * @param nameSpace namespace to create
     * @throws IOException if connecting to HBase or creating the namespace fails
     */
    public static void createNameSpace(String nameSpace) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Admin adm = conn.getAdmin()) {
            // Creating an existing namespace throws, which would abort the caller's remaining
            // setup (e.g. table creation) on every re-run; check first instead.
            if (namespaceExists(adm, nameSpace)) {
                System.out.println(nameSpace + "命名空间已存在");
                return;
            }
            adm.createNamespace(NamespaceDescriptor.create(nameSpace).build());
        }
    }

    /**
     * Returns whether a table named {@code tableName} exists.
     *
     * @param tableName table name, optionally namespace-qualified (e.g. "weibo:content")
     * @return {@code true} if the table exists
     * @throws IOException if connecting to HBase fails
     */
    public static boolean isTableExist(String tableName) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Admin adm = conn.getAdmin()) {
            return adm.tableExists(TableName.valueOf(tableName));
        }
    }

    /**
     * Creates table {@code tableName} with one column family per entry of {@code cfs}. Each family
     * keeps at most {@code version} cell versions. Prints a hint and returns if no column family
     * is given or the table already exists.
     *
     * @param tableName table name, optionally namespace-qualified
     * @param version   maximum number of versions per column family
     * @param cfs       column family names
     * @throws IOException if connecting to HBase or creating the table fails
     */
    public static void createTable(String tableName, int version, String... cfs) throws IOException {
        // The original check "cfs.length < 0" could never be true.
        if (cfs == null || cfs.length == 0) {
            System.out.println("请设置列族信息");
            return;
        }

        try (Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Admin adm = conn.getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            if (adm.tableExists(name)) {
                System.out.println(tableName + "表已存在");
                return;
            }

            HTableDescriptor tableDescriptor = new HTableDescriptor(name);
            for (String cf : cfs) {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
                columnDescriptor.setMaxVersions(version);
                tableDescriptor.addFamily(columnDescriptor);
            }
            adm.createTable(tableDescriptor);
        }
    }

    /** Returns whether {@code nameSpace} is among the namespaces known to {@code adm}. */
    private static boolean namespaceExists(Admin adm, String nameSpace) throws IOException {
        for (NamespaceDescriptor descriptor : adm.listNamespaceDescriptors()) {
            if (descriptor.getName().equals(nameSpace)) {
                return true;
            }
        }
        return false;
    }

}

5.编写 DAO 类,实现具体的业务功能,供测试类调用

package com.huan.dao;

import com.huan.constants.Constants;
import com.huan.utils.HBaseUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;

/**
 * Data-access operations of the Weibo demo:
 * <ol>
 *   <li>publish a weibo and push it to the publisher's fans ({@link #publishWeiBo}),</li>
 *   <li>follow users ({@link #addAttends}),</li>
 *   <li>unfollow users ({@link #deleteAttends}),</li>
 *   <li>print a user's feed built from the inbox table ({@link #getInt}),</li>
 *   <li>print every weibo published by one user ({@link #getWeiBo}).</li>
 * </ol>
 *
 * <p>Every method opens its own {@link Connection} from {@link Constants#CONFIGURATION}. Before
 * returning, it closes that connection and every {@link Table} and {@link ResultScanner} it
 * opened, also when an exception is thrown.
 */
public class HBaseDao {

    /**
     * Publishes a weibo. The text is stored in the content table under row key
     * {@code <uid>_<currentTimeMillis>}. That row key is then written into the inbox row of every
     * fan of {@code uid}, in column {@code info:<uid>}.
     *
     * @param uid     publisher's user id
     * @param content weibo text, stored in column {@code info:content}
     * @throws IOException if any HBase operation fails
     */
    public static void publishWeiBo(String uid, String content) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table contTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
             Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE))) {

            // Part 1: store the weibo in the content table.
            String rowKey = uid + "_" + System.currentTimeMillis();
            Put contPut = new Put(Bytes.toBytes(rowKey));
            contPut.addColumn(Bytes.toBytes(Constants.CONTENT_TABLE_CF), Bytes.toBytes("content"),
                    Bytes.toBytes(content));
            contTable.put(contPut);

            // Part 2: fan the new row key out to each fan's inbox.
            // Restrict the Get to the "fans" family. Without it the "attends" family is returned
            // too, and the weibo would also land in the inboxes of users the publisher follows.
            Get fansGet = new Get(Bytes.toBytes(uid));
            fansGet.addFamily(Bytes.toBytes(Constants.RELATION_TABLE_CF2));
            Result fans = relaTable.get(fansGet);

            ArrayList<Put> inboxPuts = new ArrayList<>();
            for (Cell cell : cellsOf(fans)) {
                // In the "fans" family the qualifier is the fan's uid, i.e. the fan's inbox row key.
                Put inboxPut = new Put(CellUtil.cloneQualifier(cell));
                inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(uid),
                        Bytes.toBytes(rowKey));
                inboxPuts.add(inboxPut);
            }

            if (!inboxPuts.isEmpty()) {
                try (Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE))) {
                    inboxTable.put(inboxPuts);
                }
            }
        }
    }

    /**
     * Makes {@code uid} follow every user in {@code attends}.
     *
     * <p>The relation is recorded in both directions in the relation table. Then {@code uid}'s
     * inbox is back-filled with the row keys of the followed users' existing weibos. The inbox
     * family's max-versions setting decides how many of them are retained.
     *
     * @param uid     id of the user who follows
     * @param attends ids of the users to follow; a hint is printed and nothing happens when empty
     * @throws IOException if any HBase operation fails
     */
    public static void addAttends(String uid, String... attends) throws IOException {
        if (attends == null || attends.length == 0) {
            System.out.println("请选择待关注的人。。。");
            return;
        }

        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
             Table contTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE))) {

            // Part 1: relation table. "attends:<attend>" goes on the follower's row and
            // "fans:<uid>" goes on each followed user's row.
            ArrayList<Put> relaPuts = new ArrayList<>();
            Put uidPut = new Put(Bytes.toBytes(uid));
            for (String attend : attends) {
                uidPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF1), Bytes.toBytes(attend),
                        Bytes.toBytes(attend));

                Put attendPut = new Put(Bytes.toBytes(attend));
                attendPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF2), Bytes.toBytes(uid),
                        Bytes.toBytes(uid));
                relaPuts.add(attendPut);
            }
            relaPuts.add(uidPut);
            relaTable.put(relaPuts);

            // Part 2: back-fill the follower's inbox from the content table.
            Put inboxPut = new Put(Bytes.toBytes(uid));
            for (String attend : attends) {
                // Content row keys are "<attend>_<millis>". A row-prefix scan matches exactly those
                // rows; the former [attend_, attend|) range also caught rows such as "<attend>x...".
                Scan scan = new Scan();
                scan.setRowPrefixFilter(Bytes.toBytes(attend + "_"));

                // Rows arrive in row-key order, so the oldest weibo comes first. Strictly increasing
                // cell timestamps make the newest weibos the newest versions of column "info:<attend>".
                long ts = System.currentTimeMillis();
                try (ResultScanner resultScanner = contTable.getScanner(scan)) {
                    for (Result result : resultScanner) {
                        inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(attend),
                                ts++, result.getRow());
                    }
                }
            }

            // Skip the write when none of the followed users has published anything yet.
            if (!inboxPut.isEmpty()) {
                try (Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE))) {
                    inboxTable.put(inboxPut);
                }
            }
        }
    }

    /**
     * Makes {@code uid} unfollow every user in {@code dels}. Both directions of the relation are
     * removed, and the inbox columns holding the unfollowed users' weibos are deleted.
     *
     * @param uid  id of the user who unfollows
     * @param dels ids of the users to unfollow; a hint is printed and nothing happens when empty
     * @throws IOException if any HBase operation fails
     */
    public static void deleteAttends(String uid, String... dels) throws IOException {
        if (dels == null || dels.length == 0) {
            System.out.println("请添加取关的用户");
            return;
        }

        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
             Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE))) {

            // addColumns (plural) deletes ALL versions of a column. addColumn would delete only
            // the newest one, leaving older inbox entries (the inbox keeps several versions) visible.
            ArrayList<Delete> relaDeletes = new ArrayList<>();
            Delete uidDelete = new Delete(Bytes.toBytes(uid));
            Delete inboxDelete = new Delete(Bytes.toBytes(uid));
            for (String del : dels) {
                uidDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF1), Bytes.toBytes(del));

                Delete delDelete = new Delete(Bytes.toBytes(del));
                delDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF2), Bytes.toBytes(uid));
                relaDeletes.add(delDelete);

                inboxDelete.addColumns(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(del));
            }
            relaDeletes.add(uidDelete);

            // Part 1: relation table, then part 2: inbox table.
            relaTable.delete(relaDeletes);
            inboxTable.delete(inboxDelete);
        }
    }

    /**
     * Prints the feed ("initial page") of {@code uid}. For every version of every column in the
     * user's inbox row, the referenced weibo is fetched from the content table and its cells are
     * printed. The method name is kept as-is for existing callers.
     *
     * @param uid user whose feed is printed
     * @throws IOException if any HBase operation fails
     */
    public static void getInt(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
             Table contTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE))) {

            // Request all stored versions: each version of a column is one weibo row key.
            Get inboxGet = new Get(Bytes.toBytes(uid));
            inboxGet.setMaxVersions();
            Result inbox = inboxTable.get(inboxGet);

            for (Cell cell : cellsOf(inbox)) {
                Result contResult = contTable.get(new Get(CellUtil.cloneValue(cell)));
                printCells(contResult);
            }
        }
    }

    /**
     * Prints every weibo published by {@code uid}, i.e. all content rows whose key starts with
     * {@code <uid>_}.
     *
     * @param uid publisher whose weibos are printed
     * @throws IOException if any HBase operation fails
     */
    public static void getWeiBo(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table table = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE))) {

            // A row-prefix scan reads only this user's rows. The former full-table substring
            // filter on "<uid>_" also matched other users, e.g. "11_..." for uid "1".
            Scan scan = new Scan();
            scan.setRowPrefixFilter(Bytes.toBytes(uid + "_"));

            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    printCells(result);
                }
            }
        }
    }

    /** Prints each cell of {@code result} as {@code PK:...,CF:...,CN:...,Value:...}. */
    private static void printCells(Result result) {
        for (Cell cell : cellsOf(result)) {
            System.out.println("PK:" + Bytes.toString(CellUtil.cloneRow(cell))
                    + ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell))
                    + ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                    + ",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }

    /** Returns the cells of {@code result}, or an empty array for an empty result (guards against a null cell array). */
    private static Cell[] cellsOf(Result result) {
        return result.isEmpty() ? new Cell[0] : result.rawCells();
    }

}

6.最后编写测试类,演示上述各项需求

package com.huan;

import com.huan.constants.Constants;
import com.huan.dao.HBaseDao;
import com.huan.utils.HBaseUtils;

import java.io.IOException;

public class WeiBoTest {

    /**
     * Sets up the demo schema: the namespace first, then the content, relation and inbox tables.
     * An IOException is printed and swallowed, so the caller continues either way.
     */
    public static void init(){
        try {
            HBaseUtils.createNameSpace(Constants.NAMESPACE);
            HBaseUtils.createTable(Constants.CONTENT_TABLE, Constants.CONTENT_TABLE_VERSION,
                    Constants.CONTENT_TABLE_CF);
            HBaseUtils.createTable(Constants.RELATION_TABLE, Constants.RELATION_TABLE_VERSION,
                    Constants.RELATION_TABLE_CF1, Constants.RELATION_TABLE_CF2);
            HBaseUtils.createTable(Constants.INBOX_TABLE, Constants.INBOX_TABLE_VERSION,
                    Constants.INBOX_TABLE_CF);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Demo scenario. Users: 1001 (昊哥), 1002 (不明人物), 1003 (大欢欢).
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        init();

        // 1001 posts once; 1002 follows 1001 and 1003, then views its feed.
        HBaseDao.publishWeiBo("1001", "昊哥爱欢欢!!!");
        HBaseDao.addAttends("1002", "1001", "1003");
        showFeed("1002", "-------------1111----------------");

        // 1003 and 1001 take turns posting five weibos, with a 10 ms pause between consecutive posts.
        String[][] posts = {
                {"1003", "欢欢也爱昊哥"},
                {"1001", "爱你爱你,满脑子都是你"},
                {"1003", "真的丫,欢欢也是!!"},
                {"1001", "亲亲我的猪"},
                {"1003", "欢欢害羞!!"},
        };
        publishInTurn(posts);
        showFeed("1002", "-------------2222-------------");

        // 1002 unfollows 1003 and views the feed again.
        HBaseDao.deleteAttends("1002", "1003");
        showFeed("1002", "-------------3333-------------");

        // 1002 follows 1003 again and views the feed again.
        HBaseDao.addAttends("1002", "1003");
        showFeed("1002", "-------------4444--------------");

        // Finally list everything 1001 has published.
        HBaseDao.getWeiBo("1001");
    }

    /** Publishes each {uid, content} pair in order, sleeping 10 ms between consecutive posts. */
    private static void publishInTurn(String[][] posts) throws IOException, InterruptedException {
        for (int i = 0; i < posts.length; i++) {
            if (i > 0) {
                Thread.sleep(10);
            }
            HBaseDao.publishWeiBo(posts[i][0], posts[i][1]);
        }
    }

    /** Prints the feed of {@code uid}, followed by the given separator line. */
    private static void showFeed(String uid, String separator) throws IOException {
        HBaseDao.getInt(uid);
        System.out.println(separator);
    }

}