Batch-Generating DataX Config Files and Hive DDL from MySQL


DataX jobs are configured with JSON files, and in many places you have to spell out the columns attribute by hand. If you need to ingest a database with a few hundred tables, writing all of that manually is a headache, so let's automate the generation of the config files.
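For context, mysqlreader takes its column attribute as a plain list of field names, while hdfswriter wants name/type pairs. A shortened sketch of the relevant parts (the field names here are made up; the full template appears in the script below) looks like this:

"reader": {
    "name": "mysqlreader",
    "parameter": {"column": ["id", "name", "created_at"], ...}
},
"writer": {
    "name": "hdfswriter",
    "parameter": {"column": [
        {"name": "id", "type": "BIGINT"},
        {"name": "name", "type": "STRING"},
        {"name": "created_at", "type": "TIMESTAMP"}
    ], ...}
}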

Prepare the environment

Install MySQL-python

Check whether it is already installed

Save the following as a Python file and run it:

#! /usr/bin/python
# -*- coding:utf-8  -*-
import MySQLdb

If it reports "No module named MySQLdb", the MySQLdb module is not installed:

[root@dx3 src]# ./generateDataXConf.py 
Traceback (most recent call last):
  File "./generateDataXConf.py", line 3, in <module>
    import MySQLdb
ImportError: No module named MySQLdb

Install MySQL-python

pip install MySQL-python

Write the tool

Code

We'll take mysqlreader -> hdfswriter as the example.
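The script below connects to MySQL, lists the tables in the database (or just the one passed with -T), maps each MySQL column type to a Hive type, and then writes out one DataX job JSON file plus one Hive CREATE EXTERNAL TABLE statement per table under the output directory.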

#! /usr/bin/python
# -*- coding:utf-8  -*-
import MySQLdb
import json
import sys
import os
from optparse import OptionParser
from optparse import OptionGroup

def convertType(originalType):
    '''
    Convert a MySQL column type to the corresponding Hive column type
    :param originalType: MySQL column type
    :return: Hive column type
    '''
    destType = None
    # numeric types
    if originalType.startswith("tinyint"):
        destType = "TINYINT"
    elif originalType.startswith("smallint"):
        destType = "SMALLINT"
    elif originalType.startswith("mediumint"):
        destType = "INT"
    elif originalType.startswith("int"):
        destType = "INT"
    elif originalType.startswith("bigint"):
        destType = "BIGINT"
    elif originalType.startswith("float"):
        destType = "FLOAT"
    elif originalType.startswith("double"):
        destType = "DOUBLE"
    elif originalType.startswith("numeric"):
        destType = "DOUBLE"
    # string types
    elif originalType.startswith("char"):
        destType = "STRING"
    elif originalType.startswith("varchar"):
        destType = "STRING"
    elif originalType.startswith("tinytext"):
        destType = "STRING"
    elif originalType.startswith("text"):
        destType = "STRING"
    elif originalType.startswith("mediumtext"):
        destType = "STRING"
    elif originalType.startswith("longtext"):
        destType = "STRING"
    # date/time types; check "datetime" and "timestamp" before "date" and "time",
    # otherwise the shorter prefixes would match first
    elif originalType.startswith("datetime"):
        destType = "DATE"
    elif originalType.startswith("timestamp"):
        destType = "TIMESTAMP"
    elif originalType.startswith("date"):
        destType = "DATE"
    elif originalType.startswith("year"):
        destType = "DATE"
    elif originalType.startswith("time"):
        destType = "DATE"
    else:
        destType = "UNSUPPORTED"
        print "no matching Hive type for: %s" % originalType
    return destType


def listTables(db):
    '''
    List all tables in the given MySQL database
    :param db: database connection
    :return: list of table names
    '''
    sql = "SHOW TABLES;"
    tables = []
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        results = cursor.fetchall()
        for row in results:
            table = row[0]
            tables.append(table)
    except Exception, e:
        print e
    return tables

def getColumns(db, table):
    '''
    Generate the columns attributes for the reader and the writer
    :param db: database connection
    :param table: table name
    :return: tuple (mysqlreader columns, hdfswriter columns)
    '''
    sql = "DESC " + table
    hdfsColumns = []
    mysqlColumns = []
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        results = cursor.fetchall()
        for row in results:
            field = row[0]
            type = row[1]
            is_null = row[2]
            print "field:%s, type:%s, is_null:%s" %(field, type, is_null)
            column = {}
            column['name'] = field
            column_type = convertType(type)
            if column_type == "UNSUPPORTED":
                print "不支持字段:%s的类型:%s" % (field, type)
                continue
            else:
                column['type'] = column_type
                hdfsColumns.append(column)
                mysqlColumns.append(field)
    except Exception, e:
        print e
    return mysqlColumns,hdfsColumns


def generateJobConfig(mysqlColumns, hdfsColumns, table, fields):
    '''
    Build the job config object
    :param mysqlColumns: mysqlreader columns
    :param hdfsColumns: hdfswriter columns
    :param table: table name
    :param fields: option dictionary
    :return: job config object
    '''
    jobTemplate={
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "column": [],
                            "connection": [
                                {
                                    "jdbcUrl": [],
                                    "table": []
                                }
                            ],
                            "password": "",
                            "username": "",
                            "where": "${where}"
                        }
                    },
                    "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "column": [],
                            "defaultFS": "hdfs://ns1",
                            "hadoopConfig":{
                                "dfs.nameservices": "ns1",
                                "dfs.ha.namenodes.ns1": "namenode326,namenode418",
                                "dfs.namenode.rpc-address.ns1.namenode326": "nn1.cxy7.com:8020",
                                "dfs.namenode.rpc-address.ns1.namenode418": "nn2.cxy7.com:8020",
                                "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                            },
                            "coreSite": "/usr/local/hadoop/etc/hadoop/core-site.xml",
                            "hdfsSite": "/usr/local/hadoop/etc/hadoop/hdfs-site.xml",
                            "compress": "",
                            "defaultFS": "",
                            "fieldDelimiter": "",
                            "fileName": "",
                            "fileType": "",
                            "path": "",
                            "writeMode": ""
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": ""
                }
            },
            "core": {
                "transport":{
                    "channel": {
                        "speed":{
                            "record":"1000"
                        }
                    }
                }
            }
        }
    }
    # mysqlreader
    readerParameter = jobTemplate['job']['content'][0]['reader']['parameter']
    readerParameter['column'] = mysqlColumns
    readerParameter['username'] = fields['user']
    readerParameter['password'] = fields['password']
    readerParameter['connection'][0]['table'].append(table)
    url = "jdbc:mysql://%s:%d/%s" % (fields['host'], int(fields['port']), fields['db'])
    readerParameter['connection'][0]['jdbcUrl'].append(url)
    # hdfswriter
    writerParameter = jobTemplate['job']['content'][0]['writer']['parameter']
    writerParameter['column'] = hdfsColumns
    writerParameter['compress'] = fields['compress']
    writerParameter['defaultFS'] = fields['defaultFS']
    writerParameter['fieldDelimiter'] = fields['fieldDelimiter']
    writerParameter['fileName'] = table
    writerParameter['fileType'] = fields['fileType']
    writerParameter['path'] = fields['path'] + '/' + table
    writerParameter['writeMode'] = fields['writeMode']

    jobTemplate['job']['setting']['speed']['channel'] = fields['channel']
    return jobTemplate


def generateHiveDDL(table, hdfsColumns, fields):
    '''
    Generate the Hive CREATE TABLE statement
    :param table: MySQL table name
    :param hdfsColumns: hdfswriter columns
    :param fields: option dictionary
    :return: Hive DDL statement
    '''
    sql = "CREATE EXTERNAL TABLE " + table + '('
    for column in hdfsColumns:
        sql += '`%s` %s,' % (column['name'], column['type'])
    sql = sql[0:len(sql) - 1]
    sql += ') '
    sql += 'PARTITIONED BY ('
    sql += '`dt` string) '
    sql += 'STORED AS %s ' % fields['fileType']
    sql += "LOCATION 'hdfs://ns1%s/%s'" % (fields['path'], table)
    print sql
    return sql


def generateJson(db, table, fields):
    '''
    Generate the job config object and Hive DDL for one table and export them to files
    :param db: database connection
    :param table: table name
    :param fields: option dictionary
    :return:
    '''
    mysqlColumns, hdfsColumns = getColumns(db, table)
    jobConf = generateJobConfig(mysqlColumns, hdfsColumns, table, fields)
    sql = generateHiveDDL(table, hdfsColumns, fields)
    if fields.get('output'):
        if (not os.path.exists(fields['output'])):
            os.mkdir(fields['output'])
        # export the DataX job config file
        filePath = '%s/%s.json' % (fields['output'], table)
        print "dump to %s" % filePath
        file = open(filePath, 'w')
        json.dump(jobConf, file, indent=4, sort_keys=False)
        file.close()
        # print json.dumps(jobConf, indent=4, sort_keys=False)
        # export the Hive DDL
        sqlPath = '%s/%s.sql' % (fields['output'], table)
        file = open(sqlPath, 'w')
        print 'export sql to %s' % sqlPath
        file.write(sql)
        file.close()


if __name__ == "__main__":
    usage = "Usage: %prog [options] mysqlreader and hdfswriter config"
    parser = OptionParser(usage=usage)
    dbGroup = OptionGroup(parser, "MySQL Config", "...")
    dbGroup.add_option('-H', '--host', help='mysql host')
    dbGroup.add_option('-P', '--port', default="3306", help='mysql port')
    dbGroup.add_option('-u', '--user', default="root", help='mysql username')
    dbGroup.add_option('-p', '--password', help='mysql password')
    dbGroup.add_option('-D', '--db', help="mysql db")
    dbGroup.add_option('-T', '--table', help="mysql table; if omitted, config files are generated for every table in the database")
    parser.add_option_group(dbGroup)

    hdfsGroup = OptionGroup(parser, "HDFS Config", '...')
    hdfsGroup.add_option('--compress', default="NONE", help="compression codec")
    hdfsGroup.add_option('--defaultFS', default="hdfs://ns1", help="HDFS defaultFS, e.g. hdfs://ns1")
    hdfsGroup.add_option('--fieldDelimiter', default="\t", help="field delimiter")
    hdfsGroup.add_option('--fileType', default="TEXT", help="output file type")
    hdfsGroup.add_option('--writeMode', default="append", help="write mode")
    hdfsGroup.add_option('--path', default="/tmp", help="HDFS directory")
    hdfsGroup.add_option('--channel', default="2", help="number of channels (speed limit)")
    hdfsGroup.add_option('-o', '--output', default="/tmp", help="output directory for the generated files")
    parser.add_option_group(hdfsGroup)
    options, args = parser.parse_args(sys.argv[1:])
    fields = {}
    for fieldName in dir(options):
        if fieldName.find('_') == -1:
            field = getattr(options, fieldName)
            if not field:
                if fieldName in ['table', 'output']:
                    # optional fields
                    continue
                print "field:(%s) can't be None" % fieldName
                parser.print_help()
                sys.exit(-1)
            else:
                fields[fieldName] = field

    db = MySQLdb.connect(options.host, options.user, options.password, options.db, int(options.port))

    if fields.get('table'):
        generateJson(db, fields['table'], fields)
    else:
        # if no table was specified, generate config files for every table in the database
        tables = listTables(db)
        for table in tables:
            generateJson(db, table, fields)
    db.close()

Run it
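As a rough illustration of what a run produces (assuming a hypothetical table user(id bigint, name varchar(64)), --fileType ORC and the default --path /tmp), the script writes a user.json DataX job with the reader and writer columns filled in, and a user.sql containing roughly:

CREATE EXTERNAL TABLE user(`id` BIGINT,`name` STRING) PARTITIONED BY (`dt` string) STORED AS ORC LOCATION 'hdfs://ns1/tmp/user'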

Usage

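Based on the options defined above, only the host, password and database have no usable defaults, so a minimal invocation looks like this (the host, password and database names below are placeholders):

./generateDataXConf.py -H 192.168.1.10 -p yourpassword -D testdb

A fuller example that targets a single table and overrides the HDFS settings (again, paths and credentials are placeholders):

./generateDataXConf.py -H 192.168.1.10 -P 3306 -u root -p yourpassword -D testdb -T user \
    --defaultFS hdfs://ns1 --fileType ORC --path /user/hive/warehouse/testdb \
    --writeMode append --channel 2 -o /data/datax/jobs

If -T is omitted, config files and DDL are generated for every table in the database. Note that the generated job keeps "where": "${where}" as a variable, so the filter condition is supplied when the DataX job is actually run.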

Problems you may run into

mysql_config not found

[root@h98 src]# pip install MySQL-python
Collecting MySQL-python
  Downloading http://mirrors.aliyun.com/pypi/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip (108kB)
    100% |████████████████████████████████| 112kB 2.5MB/s 
    Complete output from command python setup.py egg_info:
    sh: mysql_config: command not found
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-build-S6R7N7/MySQL-python/setup.py", line 17, in <module>
        metadata, options = get_config()
      File "setup_posix.py", line 43, in get_config
        libs = mysql_config("libs_r")
      File "setup_posix.py", line 25, in mysql_config
        raise EnvironmentError("%s not found" % (mysql_config.path,))
    EnvironmentError: mysql_config not found
    
    ----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-S6R7N7/MySQL-python/

Solution

yum install mysql-devel
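After installing mysql-devel, re-run pip install MySQL-python; the import check at the top of this post should then pass.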