HBase快速入门系列 (九)| HBase 的实战案例(微博)
文章目录
HBase实战之谷粒微博
需求分析
-
微博内容的浏览,数据库表设计
-
用户社交体现:关注用户,取关用户
-
拉取关注的人的微博内容
表的分类
1.微博内容表
表结构:
方法名 | createTableContent |
---|---|
Table Name | weibo:content |
RowKey | 用户 ID_时间戳 |
ColumnFamily | info |
ColumnLabel | 标题,内容,图片 |
Version | 1 个版本 |
2.用户关系表
表结构:
方法名 | createTableRelations |
---|---|
Table Name | weibo:relation |
RowKey | 用户 ID |
ColumnFamily | attends、fans |
ColumnLabel | 关注用户 ID,粉丝用户 ID |
ColumnValue | 用户 ID |
Version | 1 个版本 |
3.微博收件箱表
表结构:
方法名 | createTableReceiveContentEmails |
---|---|
Table Name | weibo:inbox |
RowKey | 用户 ID |
ColumnFamily | info |
ColumnLabel | 用户 ID |
ColumnValue | 取微博内容的 RowKey |
Version | 2 个版本 |
代码实现
1.代码设计总览:
-
创建命名空间以及表名的定义
-
创建微博内容表
-
创建用户关系表
-
创建微博收件箱表(用于接收所关注用户发布的微博内容)
-
发布微博内容
-
添加关注用户
-
移除(取关)用户
-
获取关注的人的微博内容
-
测试
2. 环境搭建
1.pom文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the guli-weibo module of the HBase Weibo example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- This module inherits from the com.huan:Hadoop parent project. -->
<parent>
<artifactId>Hadoop</artifactId>
<groupId>com.huan</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>guli-weibo</artifactId>
<dependencies>
<!-- HBase server artifact; keep its version identical to hbase-client below. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<!-- HBase client API (Connection, Admin, Table, Put, Get, Scan, Delete) used by the Java code. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
</project>
2.包名规则
3.resources 文件(将 hbase-site.xml 放到 resources 目录下,程序即可直接在本地运行并连接集群,无需打包成 jar)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<!-- Root directory of HBase data; here it points at HDFS on host Bigdata01, port 9000. -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://Bigdata01:9000/hbase</value>
</property>
<!-- Distributed-mode flag, enabled for this cluster. -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- Changed after 0.98: earlier versions had no ".port" property and the default port was 60000. -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<!-- ZooKeeper quorum: three hosts, each listed with port 2181. -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>Bigdata01:2181,Bigdata02:2181,Bigdata03:2181</value>
</property>
<!-- ZooKeeper data directory on the cluster nodes. -->
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/module/zookeeper-3.4.10/zkData</value>
</property>
<!-- Allowed clock skew between region servers and the master (see the description below);
     presumably in milliseconds, i.e. 180 s - confirm against the HBase reference. -->
<property>
<name>hbase.master.maxclockskew</name>
<value>180000</value>
<description>Time difference of regionserver from master</description>
</property>
</configuration>
3.创建命名空间以及表名的定义
这里我们定义一个常量类,把配置对象、命名空间以及各张表的表名、列族和版本数统一定义好,后续使用时直接引用即可:
package com.huan.constants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
/**
 * Shared constants for the Weibo example: the HBase client configuration, the
 * namespace, and the name, column families and max-version count of each table.
 *
 * <p>This is a pure constants holder, so it is {@code final} and cannot be instantiated.
 */
public final class Constants {

    /** HBase client configuration, created once via {@link HBaseConfiguration#create()}. */
    public static final Configuration CONFIGURATION = HBaseConfiguration.create();

    /** Namespace that contains all three tables. */
    public static final String NAMESPACE = "weibo";

    // ---- Weibo content table: row key is "<uid>_<timestamp>" ----
    /** Fully qualified name of the content table. */
    public static final String CONTENT_TABLE = "weibo:content";
    /** Column family of the content table. */
    public static final String CONTENT_TABLE_CF = "info";
    /** Max versions kept per cell in the content table. */
    public static final int CONTENT_TABLE_VERSION = 1;

    // ---- User relation table: row key is the uid ----
    /** Fully qualified name of the relation table. */
    public static final String RELATION_TABLE = "weibo:relation";
    /** Column family listing the users this user follows. */
    public static final String RELATION_TABLE_CF1 = "attends";
    /** Column family listing the users that follow this user. */
    public static final String RELATION_TABLE_CF2 = "fans";
    /** Max versions kept per cell in the relation table. */
    public static final int RELATION_TABLE_VERSION = 1;

    // ---- Inbox table: row key is the uid, qualifier is the followed uid ----
    /** Fully qualified name of the inbox table. */
    public static final String INBOX_TABLE = "weibo:inbox";
    /** Column family of the inbox table. */
    public static final String INBOX_TABLE_CF = "info";
    /** Max versions kept per cell, i.e. how many recent posts per followed user stay in the inbox. */
    public static final int INBOX_TABLE_VERSION = 2;

    /** Not instantiable: this class only holds constants. */
    private Constants() {
        throw new AssertionError("Constants is not instantiable");
    }
}
4.设置一个工具类,在工具类里面将命名空间和表的参数设置好方便后续调用
package com.huan.utils;
import com.huan.constants.Constants;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
/**
* 1.创建命名空间
* 2.判断表是否存在
* 3.创建表 (三张表)
*
*/
/**
 * Administrative helpers for the Weibo example: create the namespace, check whether
 * a table exists, and create a table.
 *
 * <p>Every method opens its own {@link Connection} and {@link Admin} and closes them
 * with try-with-resources, so no method depends on state left behind by another one.
 */
public class HBaseUtils {

    /**
     * Legacy shared handle. The methods of this class no longer assign or read it
     * (sharing a handle that each method closes caused use-after-close failures);
     * the field is kept only so that existing references to it still compile.
     */
    public static Connection connection = null;

    /** Legacy shared handle; see {@link #connection}. */
    public static Admin admin = null;

    /**
     * Creates the given namespace.
     *
     * @param nameSpace name of the namespace to create
     * @throws IOException if the cluster cannot be reached or the namespace cannot be created
     */
    public static void createNameSpace(String nameSpace) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Admin adm = conn.getAdmin()) {
            NamespaceDescriptor descriptor = NamespaceDescriptor.create(nameSpace).build();
            adm.createNamespace(descriptor);
        }
    }

    /**
     * Checks whether a table exists.
     *
     * @param tableName table name, optionally qualified as {@code namespace:table}
     * @return {@code true} if the table exists
     * @throws IOException if the cluster cannot be reached
     */
    public static boolean isTableExist(String tableName) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Admin adm = conn.getAdmin()) {
            return adm.tableExists(TableName.valueOf(tableName));
        }
    }

    /**
     * Creates a table with the given column families, unless it already exists.
     * Prints a message and returns without creating anything when no column family
     * is supplied or when the table is already present.
     *
     * @param tableName table name, optionally qualified as {@code namespace:table}
     * @param version   max number of versions to keep, applied to every column family
     * @param cfs       column family names; at least one is required
     * @throws IOException if the cluster cannot be reached or the table cannot be created
     */
    public static void createTable(String tableName, int version, String... cfs) throws IOException {
        // An array length is never negative, so the check must be "is empty".
        if (cfs == null || cfs.length == 0) {
            System.out.println("请设置列族信息");
            return;
        }
        try (Connection conn = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Admin adm = conn.getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            // Reuse the Admin opened here for the existence check instead of a shared one.
            if (adm.tableExists(name)) {
                System.out.println(tableName + "表已存在");
                return;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(name);
            for (String cf : cfs) {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
                columnDescriptor.setMaxVersions(version);
                tableDescriptor.addFamily(columnDescriptor);
            }
            adm.createTable(tableDescriptor);
        }
    }
}
5.写好相应的业务需求方便测试调用
package com.huan.dao;
import com.huan.constants.Constants;
import com.huan.utils.HBaseUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
/**
* 1.发布微博
* 2.删除微博
* 3.关注用户
* 4.取关用户
* 5.获取用户初始化界面
* 6.获取微博详情
*/
/**
 * Data-access operations of the Weibo example:
 * <ol>
 *   <li>{@link #publishWeiBo} - publish a post and push it to the followers' inboxes</li>
 *   <li>{@link #addAttends} - follow users</li>
 *   <li>{@link #deleteAttends} - unfollow users</li>
 *   <li>{@link #getInt} - print the posts referenced by a user's inbox</li>
 *   <li>{@link #getWeiBo} - print all posts of one user</li>
 * </ol>
 *
 * <p>Every method opens its own connection and releases it, together with all tables
 * and scanners, through try-with-resources, so nothing leaks when an operation fails.
 */
public class HBaseDao {

    /**
     * Publishes a post for {@code uid} and writes a reference to it into the inbox of
     * every follower of {@code uid}.
     *
     * @param uid     id of the publishing user
     * @param content text of the post
     * @throws IOException if any HBase operation fails
     */
    public static void publishWeiBo(String uid, String content) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table contTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
             Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE))) {

            // Part 1: store the post under row key "<uid>_<timestamp>".
            long ts = System.currentTimeMillis();
            String rowKey = uid + "_" + ts;
            Put contPut = new Put(Bytes.toBytes(rowKey));
            contPut.addColumn(Bytes.toBytes(Constants.CONTENT_TABLE_CF),
                    Bytes.toBytes("content"), Bytes.toBytes(content));
            contTable.put(contPut);

            // Part 2: look up the followers. Only the "fans" family must be read;
            // without this restriction the row also yields the "attends" cells and the
            // post would be delivered to the users that uid follows.
            Get fansGet = new Get(Bytes.toBytes(uid));
            fansGet.addFamily(Bytes.toBytes(Constants.RELATION_TABLE_CF2));
            Result fans = relaTable.get(fansGet);

            // One inbox Put per follower: row = follower id, qualifier = publisher id,
            // value = row key of the new post.
            ArrayList<Put> inboxPuts = new ArrayList<>();
            for (Cell cell : fans.rawCells()) {
                Put inboxPut = new Put(CellUtil.cloneQualifier(cell));
                inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF),
                        Bytes.toBytes(uid), Bytes.toBytes(rowKey));
                inboxPuts.add(inboxPut);
            }

            // The inbox table is only opened when there is at least one follower.
            if (!inboxPuts.isEmpty()) {
                try (Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE))) {
                    inboxTable.put(inboxPuts);
                }
            }
        }
    }

    /**
     * Makes {@code uid} follow the given users and copies references to their existing
     * posts into the inbox of {@code uid}. Prints a message and returns when no user
     * to follow is supplied.
     *
     * @param uid     id of the acting user
     * @param attends ids of the users to follow
     * @throws IOException if any HBase operation fails
     */
    public static void addAttends(String uid, String... attends) throws IOException {
        if (attends == null || attends.length == 0) {
            System.out.println( "请选择待关注的人。。。" );
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
             Table contTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE))) {

            // Part 1: relation table. The acting user gains one "attends" column per
            // followed user; each followed user gains a "fans" column for the acting user.
            ArrayList<Put> relaPuts = new ArrayList<>();
            Put uidPut = new Put(Bytes.toBytes(uid));
            for (String attend : attends) {
                uidPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF1),
                        Bytes.toBytes(attend), Bytes.toBytes(attend));
                Put attendPut = new Put(Bytes.toBytes(attend));
                attendPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF2),
                        Bytes.toBytes(uid), Bytes.toBytes(uid));
                relaPuts.add(attendPut);
            }
            relaPuts.add(uidPut);
            relaTable.put(relaPuts);

            // Part 2: inbox table. All posts of one followed user go into the same
            // column (qualifier = followed uid) as separate versions.
            Put inboxPut = new Put(Bytes.toBytes(uid));
            for (String attend : attends) {
                // Each post needs its own timestamp, otherwise the cells written in
                // this single Put would overwrite each other.
                long ts = System.currentTimeMillis();
                try (ResultScanner posts = contTable.getScanner(userPostsScan(attend))) {
                    for (Result post : posts) {
                        inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF),
                                Bytes.toBytes(attend), ts++, post.getRow());
                    }
                }
            }

            // Nothing to write when none of the followed users has posted yet.
            if (!inboxPut.isEmpty()) {
                try (Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE))) {
                    inboxTable.put(inboxPut);
                }
            }
        }
    }

    /**
     * Makes {@code uid} unfollow the given users and removes their posts from the
     * inbox of {@code uid}. Prints a message and returns when no user is supplied.
     *
     * @param uid  id of the acting user
     * @param dels ids of the users to unfollow
     * @throws IOException if any HBase operation fails
     */
    public static void deleteAttends(String uid,String... dels) throws IOException {
        if (dels == null || dels.length == 0) {
            System.out.println("请添加取关的用户");
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));
             Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE))) {

            // Part 1: relation table. addColumns (plural) removes every version of the
            // column; addColumn would remove only the newest one and could let an
            // older cell become visible again.
            ArrayList<Delete> relaDeletes = new ArrayList<>();
            Delete uidDelete = new Delete(Bytes.toBytes(uid));
            for (String del : dels) {
                uidDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF1), Bytes.toBytes(del));
                Delete delDelete = new Delete(Bytes.toBytes(del));
                delDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF2), Bytes.toBytes(uid));
                relaDeletes.add(delDelete);
            }
            relaDeletes.add(uidDelete);
            relaTable.delete(relaDeletes);

            // Part 2: inbox table. The inbox keeps several versions per followed user,
            // so all of them must be deleted, again via addColumns.
            Delete inboxDelete = new Delete(Bytes.toBytes(uid));
            for (String del : dels) {
                inboxDelete.addColumns(Bytes.toBytes(Constants.INBOX_TABLE_CF), Bytes.toBytes(del));
            }
            inboxTable.delete(inboxDelete);
        }
    }

    /**
     * Prints the posts referenced by the inbox of {@code uid}, i.e. the recent posts
     * of the users that {@code uid} follows.
     *
     * @param uid id of the user whose inbox is read
     * @throws IOException if any HBase operation fails
     */
    public static void getInt(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));
             Table contTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE))) {

            // Ask for all stored versions: each version of an inbox cell points to a
            // different post of the same followed user.
            Get inboxGet = new Get(Bytes.toBytes(uid));
            inboxGet.setMaxVersions();
            Result inbox = inboxTable.get(inboxGet);

            // The value of every inbox cell is the row key of a post in the content table.
            for (Cell cell : inbox.rawCells()) {
                Get contGet = new Get(CellUtil.cloneValue(cell));
                printCells(contTable.get(contGet).rawCells());
            }
        }
    }

    /**
     * Prints all posts published by {@code uid}.
     *
     * @param uid id of the user whose posts are listed
     * @throws IOException if any HBase operation fails
     */
    public static void getWeiBo(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
             Table table = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));
             // A prefix scan matches only row keys that start with "<uid>_". A substring
             // match on "<uid>_" would also return the posts of any uid ending in uid.
             ResultScanner posts = table.getScanner(userPostsScan(uid))) {
            for (Result post : posts) {
                printCells(post.rawCells());
            }
        }
    }

    /** Builds a scan over the content-table rows whose key starts with {@code "<uid>_"}. */
    private static Scan userPostsScan(String uid) {
        Scan scan = new Scan();
        scan.setRowPrefixFilter(Bytes.toBytes(uid + "_"));
        return scan;
    }

    /** Prints row key, column family, qualifier and value of every given cell. */
    private static void printCells(Cell[] cells) {
        for (Cell cell : cells) {
            System.out.println("PK:"+Bytes.toString( CellUtil.cloneRow( cell ))
                    +",CF:"+Bytes.toString( CellUtil.cloneFamily( cell ) )
                    +",CN:"+Bytes.toString( CellUtil.cloneQualifier( cell ) )
                    +",Value:"+Bytes.toString( CellUtil.cloneValue( cell ) ));
        }
    }
}
6.最后进入测试方法,展示需求
package com.huan;
import com.huan.constants.Constants;
import com.huan.dao.HBaseDao;
import com.huan.utils.HBaseUtils;
import java.io.IOException;
/**
 * End-to-end walkthrough of the Weibo example: creates the namespace and tables,
 * then publishes, follows, unfollows and prints inbox contents in a fixed order.
 */
public class WeiBoTest {

    /**
     * Creates the namespace and the three tables. An {@link IOException} is printed
     * and not rethrown, so the caller continues even when setup fails part-way.
     */
    public static void init(){
        try {
            // Namespace first, then content, relation and inbox tables.
            HBaseUtils.createNameSpace( Constants.NAMESPACE );
            HBaseUtils.createTable(
                    Constants.CONTENT_TABLE,
                    Constants.CONTENT_TABLE_VERSION,
                    Constants.CONTENT_TABLE_CF );
            HBaseUtils.createTable(
                    Constants.RELATION_TABLE,
                    Constants.RELATION_TABLE_VERSION,
                    Constants.RELATION_TABLE_CF1,
                    Constants.RELATION_TABLE_CF2 );
            HBaseUtils.createTable(
                    Constants.INBOX_TABLE,
                    Constants.INBOX_TABLE_VERSION,
                    Constants.INBOX_TABLE_CF );
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the demo scenario. Users: 1001 = Hao Ge, 1002 = unknown person,
     * 1003 = Da Huanhuan.
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        init();

        // 1001 publishes a post, then 1002 follows 1001 and 1003.
        HBaseDao.publishWeiBo( "1001","昊哥爱欢欢!!!" );
        HBaseDao.addAttends( "1002","1001","1003" );
        showInbox( "1002", "-------------1111----------------" );

        // 1003 publishes three posts and 1001 two, interleaved, with a 10 ms pause
        // between consecutive posts.
        String[][] posts = {
                { "1003", "欢欢也爱昊哥" },
                { "1001", "爱你爱你,满脑子都是你" },
                { "1003", "真的丫,欢欢也是!!" },
                { "1001", "亲亲我的猪" },
                { "1003", "欢欢害羞!!" },
        };
        for (int i = 0; i < posts.length; i++) {
            if (i > 0) {
                Thread.sleep( 10 );
            }
            HBaseDao.publishWeiBo( posts[i][0], posts[i][1] );
        }
        showInbox( "1002", "-------------2222-------------" );

        // 1002 unfollows 1003.
        HBaseDao.deleteAttends( "1002","1003" );
        showInbox( "1002", "-------------3333-------------" );

        // 1002 follows 1003 again.
        HBaseDao.addAttends( "1002","1003" );
        showInbox( "1002", "-------------4444--------------" );

        // Finally list every post of 1001.
        HBaseDao.getWeiBo( "1001" );
    }

    /** Prints the inbox of {@code uid} followed by the given separator line. */
    private static void showInbox(String uid, String separator) throws IOException {
        HBaseDao.getInt( uid );
        System.out.println( separator );
    }
}