批量生成 DataX 同步 JSON 配置(MySQL 到 Doris)
1.问题描述
使用 DataX 将 MySQL 数据同步到 Doris 时,如果表的数量很多,逐个手写 DataX 配置文件非常繁琐。为此编写了一个 DataX 配置文件生成脚本,可以灵活地一键生成配置文件,提高工作效率。
脚本如下:
2.问题解决
vim gen_import_mysql_config_simple.py
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb
# Source MySQL connection settings; adjust to your environment.
mysql_host = "xxx"
# Kept as a string: it is concatenated into the reader's JDBC URL, and
# get_connection() converts it with int() for the MySQLdb driver.
mysql_port = "xxx"
mysql_user = "xxx"
mysql_passwd = "xxx"
# Target Doris connection settings; adjust to your environment.
# generate_json() decides which of the two ports goes into the Stream Load
# "loadUrl" and which into the writer's JDBC URL.
doris_host = "xxx"
doris_port = "xxx"
doris_http_port = "xxx"
doris_user = "xxx"
doris_passwd = "xxx"
# Value written to the doriswriter loadProps "strip_outer_array" option.
condition = True
# Directory that receives the generated job files; created if it is missing.
output_path = "/data/job"
def get_connection():
    """Open and return a new MySQLdb connection to the source MySQL server.

    The caller owns the returned connection and must close it.
    """
    # mysql_port is configured as a string; the driver needs an int.
    connect_kwargs = dict(
        host=mysql_host,
        port=int(mysql_port),
        user=mysql_user,
        passwd=mysql_passwd,
    )
    return MySQLdb.connect(**connect_kwargs)
def get_mysql_meta(database, table):
    """Return the column rows of ``database.table`` from INFORMATION_SCHEMA.

    :param database: source MySQL schema name.
    :param table: source MySQL table name.
    :return: the rows from ``cursor.fetchall()``. The first element of each
        row is a ``COLUMN_NAME``, and rows are ordered by
        ``ORDINAL_POSITION``. The result is empty when no such table exists.
    """
    connection = get_connection()
    try:
        cursor = connection.cursor()
        try:
            # Parameterised query: the driver escapes database/table, so
            # values taken from the command line cannot inject SQL.
            sql = "SELECT COLUMN_NAME from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
            cursor.execute(sql, [database, table])
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query raised.
        connection.close()
def get_mysql_columns(database, table):
    """Return the column names of ``database.table`` as a list, in table order.

    A real list is returned rather than ``map(...)``. On Python 3, ``map``
    is a lazy iterator that ``json.dump`` cannot serialise, and this result
    is embedded in the generated DataX job JSON.
    """
    return [row[0] for row in get_mysql_meta(database, table)]
def _build_job(source_database, source_table, sink_database, sink_table, columns):
    """Return the DataX job dict that copies one MySQL table into one Doris table.

    :param source_database: MySQL schema to read from.
    :param source_table: MySQL table to read from.
    :param sink_database: Doris database to write to.
    :param sink_table: Doris table to write to.
    :param columns: column names. The same list is used for both the reader
        and the writer, so the Doris table must expose the same column names.
    :return: a plain dict ready for ``json.dump``.
    """
    return {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": list(columns),
                        "fetchSize": 1024,
                        "where": "1 = 1",
                        "connection": [{
                            "table": [source_table],
                            # str() so an int port in the config also works.
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + str(mysql_port) + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        # Per the doriswriter docs, loadUrl is the Stream Load
                        # endpoint: FE host + FE *HTTP* port (Doris default 8030).
                        "loadUrl": [doris_host + ":" + str(doris_http_port)],
                        "column": list(columns),
                        "username": doris_user,
                        "password": doris_passwd,
                        "flushInterval": 30000,
                        "connection": [
                            {
                                # JDBC uses the FE MySQL-protocol query port
                                # (Doris default 9030), not the HTTP port.
                                "jdbcUrl": "jdbc:mysql://" + doris_host + ":" + str(doris_port) + "/" + sink_database,
                                "selectedDatabase": sink_database,
                                "table": [sink_table]
                            }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": condition
                        }
                    }
                }
            }]
        }
    }


def generate_json(source_database, source_table, sink_database, sink_table):
    """Write the DataX MySQL->Doris job for one table into ``output_path``.

    The file is named ``<source_database>.<source_table>.json``. The
    directory is created if it does not exist, and an existing file with
    that name is overwritten.

    :raises ValueError: if the source table has no columns (for example, it
        does not exist). In that case the job would be unusable, so no file
        is written.
    """
    # Query MySQL once; the reader and writer share the same column list.
    # list() also guards against a lazy iterable, which json cannot serialise.
    columns = list(get_mysql_columns(source_database, source_table))
    if not columns:
        raise ValueError(
            "no columns found for MySQL table %s.%s" % (source_database, source_table))
    job = _build_job(source_database, source_table, sink_database, sink_table, columns)
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    target = os.path.join(output_path, ".".join([source_database, source_table, "json"]))
    with open(target, "w") as f:
        # Indented output so the generated job is easy to review or hand-edit.
        json.dump(job, f, indent=4, separators=(",", ": "))
def main(args):
    """Parse command-line options and generate one DataX job file.

    All four options are required:
      -d / --sourcedb   source MySQL database
      -t / --sourcetbl  source MySQL table
      -s / --sinkdb     target Doris database
      -f / --sinktbl    target Doris table

    :param args: argument list without the program name (``sys.argv[1:]``).
    Exits with status 2 after printing usage to stderr when an option is
    unknown or a required option is missing or empty.
    """
    usage = ("usage: gen_import_mysql_config_simple.py -d <mysql_db> -t <mysql_table>"
             " -s <doris_db> -f <doris_table>\n")
    try:
        options, _ = getopt.getopt(
            args, 'd:t:s:f:', ['sourcedb=', 'sourcetbl=', 'sinkdb=', 'sinktbl='])
    except getopt.GetoptError as err:
        sys.stderr.write(str(err) + "\n" + usage)
        sys.exit(2)
    source_database = ""
    source_table = ""
    sink_database = ""
    sink_table = ""
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        elif opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
        elif opt_name in ('-s', '--sinkdb'):
            sink_database = opt_value
        elif opt_name in ('-f', '--sinktbl'):
            sink_table = opt_value
    # Empty values would produce a job with broken JDBC URLs and table names.
    missing = [flag for flag, value in (('-d', source_database), ('-t', source_table),
                                        ('-s', sink_database), ('-f', sink_table))
               if not value]
    if missing:
        sys.stderr.write("missing required option(s): " + ", ".join(missing) + "\n" + usage)
        sys.exit(2)
    generate_json(source_database, source_table, sink_database, sink_table)


if __name__ == '__main__':
    main(sys.argv[1:])
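3. 脚本使用
参数说明:-d 为 MySQL 源库,-t 为 MySQL 源表,-s 为 Doris 目标库,-f 为 Doris 目标表。生成的配置文件保存在 output_path(默认 /data/job)目录下,文件名为 <源库>.<源表>.json。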
python ./gen_import_mysql_config_simple.py -d mysql_database -t mysql_table -s doris_database -f doris_table