《数据同步-NIFI系列》NiFi同步中文表、中文字段名
NiFi同步中文表、中文字段名
一 目的
假设Oracle数据库中有一个中文表,其字段名为纯中文。我们希望将表数据同步到MySQL数据库中,在MySQL数据库中创建同等类型的英文表名称和英文字段名称。那么在中文字段向英文字段同步过程中会出现异常导致不能直接同步。
二 表介绍
源表
CREATE TABLE ZXY.中文表_ZXY
(
姓名 VARCHAR(20) null,
性别 VARCHAR(20) null,
年龄 VARCHAR(20) null
) ;
insert into "中文表_ZXY" values('小A','男','8');
insert into "中文表_ZXY" values('小B','女','9');
目标表
create table zxy.c_zxy
(
name varchar(20) null,
sex varchar(20) null,
age varchar(20) null
) ;
二 Nifi全量同步中文表-普通流程
最常见的全量同步,通过ExecuteSQL组件truncate目标表,使用ExecuteSQLRecord组件查询源表数据( AvroRecordSetWriter写数据),最后通过PutDataBaseRecord将数据存到目标表中( AvroReader读数据)。但是最后报错"due to None of the fields in the record map to the columns defined by the c_zxy table",中文表和英文表之间找不到映射关系,那么应该调整流程,使其具有映射关系。

三 Nifi全量同步中文表-自定义流程
整体流程使用七个组件,第一步通过ExecutSQL组件truncate目标表,第二步使用ExecuteSQLRecord组件查询源表数据( AvroRecordSetWriter写数据),第三步使用ConvertAvroToJSON组件将Avro数据转成JSON类型数据,第四步使用SplitJson将整个JSON切分成一条一条的JSON数据,第五步使用EvaluateJsonPath解析中文表的字段并将其映射为英文名,第六步使用ReplaceText获取上一步的参数并写成SQL语句insert into zxy.c_zxy(name,sex,age) values('${name}','${sex}','${age}'),第七步使用PutSQL组件直接执行上一步传来的SQL语句。

目标表成功同步到数据
mysql> select * from zxy.c_zxy;
+------+------+------+
| name | sex | age |
+------+------+------+
| 小A | 男 | 8 |
| 小B | 女 | 9 |
+------+------+------+
2 rows in set (0.00 sec)
四 自定义流程介绍
4.1 ExecutSQL
ExecuteSQL组件,Database Connection Pooling Service配置数据库,SQL select query配置SQL实现清空目标表数据。其余配置项根据实际场景调整。

4.2 ExecuteSQLRecord
ExecuteSQLRecord组件,Database Connection Pooling Service配置数据库,SQL select query配置SQL实现查询源表数据,Record Writer配置AvroRecordWriter。其余配置项根据实际场景调整。

4.3 ConvertAvroToJSON
ConvertAvroToJSON未调整配置,其余配置项根据实际场景调整。

4.4 SplitJson
SplitJson组件,未调整配置。其余配置项根据实际场景调整。

4.5 EvaluateJsonPath
EvaluateJsonPath组件,Destination配置为flowfile-attribute,便于将我们的添加的配置项分发。通过$获取数据,将其映射为英文名。其余配置项根据实际场景调整。

4.6 ReplaceText
ReplaceText组件,Replacement Value配置为insert into zxy.c_zxy(name,sex,age) values('${name}','${sex}','${age}'),通过$获取上一步中分发的数据,并组成SQL语句。其余配置项根据实际场景调整。

4.7 PutSQL
PutSQL组件,接收上一步的SQL语句,并放到数据库中执行。其余配置项根据实际场景调整。

五 数据源配置介绍
5.1 源数据库
DBCPConnectionPool组件,Database Connection URL配置jdbc:oracle:thin:@IP:PORT:服务名,Database Driver Class Name配置oracle.jdbc.driver.OracleDriver,Database Driver Location为在安装Nifi服务的服务器上的路径,Database User配置数据库连接用户名,Password配置数据库连接密码。其余配置项根据实际场景调整。

5.2 目标数据库
DBCPConnectionPool组件,Database Connection URL配置jdbc:mysql://IP:PORT/zxy?characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true,Database Driver Class Name配置com.mysql.jdbc.Driver,Database Driver Location为在安装Nifi服务的服务器上的路径,Database User配置数据库连接用户名,Password配置数据库连接密码。其余配置项根据实际场景调整。
