批量生成datax同步JSON(mysql到doris)

img

1.问题描述

使用datax同步mysql数据到doris,表的数量过多,写datax的配置文件很麻烦。鉴于此,编写了一个datax的配置文件生成脚本,可以灵活地实现一键生成配置文件,提高生产效率。
废话不多说,脚本如下

2.问题解决

vim gen_import_mysql_config_simple.py
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# Source MySQL connection settings; adjust to match your environment.
mysql_host = "xxx"
mysql_port = "xxx"
mysql_user = "xxx"
mysql_passwd = "xxx"

# Doris FE connection settings; adjust to match your environment.
# (Original comment said "HDFS NameNode" — these are actually Doris settings.)
doris_host = "xxx"
doris_port = "xxx"        # used for the doriswriter loadUrl
doris_http_port = "xxx"   # used for the doriswriter jdbcUrl
doris_user = "xxx"
doris_passwd = "xxx"
# Passed through to doriswriter loadProps.strip_outer_array.
condition = True


# Target directory for the generated job config files; adjust as needed.
output_path = "/data/job"


def get_connection():
    """Open and return a new connection to the source MySQL server.

    Credentials and endpoint come from the module-level configuration;
    the port is stored as a string and converted here.
    """
    return MySQLdb.connect(
        host=mysql_host,
        port=int(mysql_port),
        user=mysql_user,
        passwd=mysql_passwd,
    )


def get_mysql_meta(database, table):
    """Fetch the column-name rows for ``database.table`` from MySQL.

    Queries INFORMATION_SCHEMA.COLUMNS ordered by ORDINAL_POSITION so the
    result matches the physical column order of the table.

    :param database: source MySQL schema name
    :param table: source MySQL table name
    :return: tuple of 1-tuples, each containing one column name
    """
    connection = get_connection()
    try:
        cursor = connection.cursor()
        try:
            # Parameterized query: database/table come from the command
            # line, so never interpolate them into the SQL string.
            sql = "SELECT COLUMN_NAME from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
            cursor.execute(sql, [database, table])
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Original leaked the connection if execute() raised; always close.
        connection.close()
def get_mysql_columns(database, table):
    """Return the ordered list of column names of ``database.table``.

    Uses a list comprehension instead of ``map``: under Python 3 ``map``
    returns a lazy iterator that ``json.dump`` cannot serialize, which
    would break generate_json(); a list works on both Python 2 and 3.
    """
    return [row[0] for row in get_mysql_meta(database, table)]


def generate_json(source_database, source_table, sink_database, sink_table):
    """Build a DataX job config (mysqlreader -> doriswriter) and write it
    to ``output_path`` as ``<source_database>.<source_table>.json``.

    :param source_database: MySQL schema to read from
    :param source_table: MySQL table to read from
    :param sink_database: Doris database to write into
    :param sink_table: Doris table to write into
    """
    # Fetch the column list once and force it to a JSON-serializable list.
    # The original issued the identical INFORMATION_SCHEMA query twice
    # (once for the reader, once for the writer), doubling MySQL round
    # trips for the same result.
    columns = list(get_mysql_columns(source_database, source_table))
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": columns,
                        "fetchSize": 1024,
                        "where": "1 = 1",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },

                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        # NOTE(review): doriswriter convention is loadUrl =
                        # FE HTTP port (stream load) and jdbcUrl = FE query
                        # port (9030). Here loadUrl uses doris_port and
                        # jdbcUrl uses doris_http_port — verify these two
                        # config values are not swapped.
                        "loadUrl": [doris_host + ":" + doris_port],
                        "column": columns,
                        "username": doris_user,
                        "password": doris_passwd,
                        "flushInterval": 30000,
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://" + doris_host + ":" + doris_http_port + "/" + sink_database,
                                "selectedDatabase": sink_database,
                                "table": [sink_table]
                            }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": condition
                        }
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    target = os.path.join(output_path, ".".join([source_database, source_table, "json"]))
    with open(target, "w") as f:
        json.dump(job, f)

def main(args):
    """Parse command-line options and generate one DataX job file.

    Options: -d/--sourcedb, -t/--sourcetbl (MySQL side) and
    -s/--sinkdb, -f/--sinktbl (Doris side).
    """
    source_database = ""
    source_table = ""
    sink_database = ""
    sink_table = ""

    # 'd:t:s:f:' is the correct getopt short-option spec; the original
    # '-d:-t:-s:-f:' accidentally registered '-' as an option character
    # and only worked by coincidence.
    options, _ = getopt.getopt(args, 'd:t:s:f:',
                               ['sourcedb=', 'sourcetbl=', 'sinkdb=', 'sinktbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        elif opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
        elif opt_name in ('-s', '--sinkdb'):
            sink_database = opt_value
        elif opt_name in ('-f', '--sinktbl'):
            sink_table = opt_value
    generate_json(source_database, source_table, sink_database, sink_table)


# Entry point: forward the CLI arguments (minus the script name) to main().
if __name__ == '__main__':
    main(sys.argv[1:])

3.脚本使用

python ./gen_import_mysql_config_simple.py -d mysql_database -t mysql_table -s doris_database -f doris_table