Batch-Generating DataX Config Files and Hive DDL from MySQL
DataX jobs are configured with JSON files, and many parts of those files require a `column` list to be spelled out. If you need to ingest a database with several hundred tables, writing all of that by hand is daunting, so let's automate the generation of these config files.
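To see why this gets tedious, here is a minimal sketch of the per-table part of a mysqlreader->hdfswriter job (the connection details, table, and columns below are placeholders, not values from a real job):

```json
{
  "job": {
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "column": ["id", "name"],
          "connection": [{
            "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],
            "table": ["user"]
          }]
        }
      },
      "writer": {
        "name": "hdfswriter",
        "parameter": {
          "column": [
            {"name": "id", "type": "BIGINT"},
            {"name": "name", "type": "STRING"}
          ]
        }
      }
    }]
  }
}
```

Both the reader and the writer need a per-table `column` list, and the writer additionally needs each column's Hive type; that is exactly the repetitive part worth scripting.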
Setting up the environment
Installing MySQL-python
Check whether it is already installed
Save the following as a Python file and run it:
```python
#! /usr/bin/python
# -*- coding:utf-8 -*-
import MySQLdb
```
If it fails with "No module named MySQLdb", the MySQLdb module is not installed:
```
[root@dx3 src]# ./generateDataXConf.py
Traceback (most recent call last):
  File "./generateDataXConf.py", line 3, in <module>
    import MySQLdb
ImportError: No module named MySQLdb
```
Install MySQL-python:
```sh
pip install MySQL-python
```
Writing the tool
The code
We will use mysqlreader->hdfswriter as the example:
```python
#! /usr/bin/python
# -*- coding:utf-8 -*-
import MySQLdb
import json
import sys
import os
from optparse import OptionParser
from optparse import OptionGroup


def convertType(orignalType):
    '''
    Map a MySQL column type to the corresponding Hive column type.
    :param orignalType: the MySQL column type
    :return: the Hive column type
    '''
    destType = None
    # Numeric types
    if orignalType.startswith("tinyint"):
        destType = "TINYINT"
    elif orignalType.startswith("smallint"):
        destType = "SMALLINT"
    elif orignalType.startswith("mediumint"):
        destType = "INT"
    elif orignalType.startswith("int"):
        destType = "INT"
    elif orignalType.startswith("bigint"):
        destType = "BIGINT"
    elif orignalType.startswith("float"):
        destType = "FLOAT"
    elif orignalType.startswith("double"):
        destType = "DOUBLE"
    elif orignalType.startswith("numeric"):
        destType = "DOUBLE"
    # String types
    elif orignalType.startswith("char"):
        destType = "STRING"
    elif orignalType.startswith("varchar"):
        destType = "STRING"
    elif orignalType.startswith("tinytext"):
        destType = "STRING"
    elif orignalType.startswith("text"):
        destType = "STRING"
    elif orignalType.startswith("mediumtext"):
        destType = "STRING"
    elif orignalType.startswith("longtext"):
        destType = "STRING"
    # Date/time types. "timestamp" is matched before "time" (and "datetime"
    # before "date"), otherwise the shorter prefix would win.
    elif orignalType.startswith("timestamp"):
        destType = "TIMESTAMP"
    elif orignalType.startswith("datetime"):
        destType = "DATE"
    elif orignalType.startswith("date"):
        destType = "DATE"
    elif orignalType.startswith("time"):
        destType = "DATE"
    elif orignalType.startswith("year"):
        destType = "DATE"
    else:
        destType = "UNSUPPORTED"
        print "No matching Hive type for: %s" % orignalType
    return destType


def listTables(db):
    '''
    List all tables in the connected MySQL database.
    :param db: the database connection
    :return: a list of table names
    '''
    sql = "SHOW TABLES;"
    tables = []
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        results = cursor.fetchall()
        for row in results:
            tables.append(row[0])
    except Exception, e:
        print e
    return tables


def getColumns(db, table):
    '''
    Build the column lists for the reader and the writer.
    :param db: the database connection
    :param table: the table name
    :return: tuple (mysqlreader columns, hdfswriter columns)
    '''
    sql = "DESC " + table
    hdfsColumns = []
    mysqlColumns = []
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        results = cursor.fetchall()
        for row in results:
            field = row[0]
            fieldType = row[1]
            is_null = row[2]
            print "field:%s, type:%s, is_null:%s" % (field, fieldType, is_null)
            column = {}
            column['name'] = field
            column_type = convertType(fieldType)
            if column_type == "UNSUPPORTED":
                print "Skipping field %s of unsupported type %s" % (field, fieldType)
                continue
            column['type'] = column_type
            hdfsColumns.append(column)
            mysqlColumns.append(field)
    except Exception, e:
        print e
    return mysqlColumns, hdfsColumns


def generateJobConfig(mysqlColumns, hdfsColumns, table, fields):
    '''
    Build the DataX job configuration object.
    :param mysqlColumns: the mysqlreader columns
    :param hdfsColumns: the hdfswriter columns
    :param table: the table name
    :param fields: the option dictionary
    :return: the job configuration object
    '''
    jobTemplate = {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "column": [],
                            "connection": [
                                {
                                    "jdbcUrl": [],
                                    "table": []
                                }
                            ],
                            "password": "",
                            "username": "",
                            "where": "${where}"
                        }
                    },
                    "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "column": [],
                            "defaultFS": "",
                            "hadoopConfig": {
                                "dfs.nameservices": "ns1",
                                "dfs.ha.namenodes.ns1": "namenode326,namenode418",
                                "dfs.namenode.rpc-address.ns1.namenode326": "nn1.cxy7.com:8020",
                                "dfs.namenode.rpc-address.ns1.namenode418": "nn2.cxy7.com:8020",
                                "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                            },
                            "coreSite": "/usr/local/hadoop/etc/hadoop/core-site.xml",
                            "hdfsSite": "/usr/local/hadoop/etc/hadoop/hdfs-site.xml",
                            "compress": "",
                            "fieldDelimiter": "",
                            "fileName": "",
                            "fileType": "",
                            "path": "",
                            "writeMode": ""
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": ""
                }
            },
            "core": {
                "transport": {
                    "channel": {
                        "speed": {
                            "record": "1000"
                        }
                    }
                }
            }
        }
    }
    # mysqlreader
    readerParameter = jobTemplate['job']['content'][0]['reader']['parameter']
    readerParameter['column'] = mysqlColumns
    readerParameter['username'] = fields['user']
    readerParameter['password'] = fields['password']
    readerParameter['connection'][0]['table'].append(table)
    url = "jdbc:mysql://%s:%d/%s" % (fields['host'], int(fields['port']), fields['db'])
    readerParameter['connection'][0]['jdbcUrl'].append(url)
    # hdfswriter
    writerParameter = jobTemplate['job']['content'][0]['writer']['parameter']
    writerParameter['column'] = hdfsColumns
    writerParameter['compress'] = fields['compress']
    writerParameter['defaultFS'] = fields['defaultFS']
    writerParameter['fieldDelimiter'] = fields['fieldDelimiter']
    writerParameter['fileName'] = table
    writerParameter['fileType'] = fields['fileType']
    writerParameter['path'] = fields['path'] + '/' + table
    writerParameter['writeMode'] = fields['writeMode']
    jobTemplate['job']['setting']['speed']['channel'] = fields['channel']
    return jobTemplate


def generateHiveDDL(table, hdfsColumns, fields):
    '''
    Generate the Hive CREATE TABLE statement.
    :param table: the MySQL table name
    :param hdfsColumns: the hdfswriter columns
    :param fields: the option dictionary
    :return: the Hive DDL statement
    '''
    sql = "CREATE EXTERNAL TABLE " + table + '('
    for column in hdfsColumns:
        sql += '`%s` %s,' % (column['name'], column['type'])
    sql = sql[0:len(sql) - 1]  # drop the trailing comma
    sql += ') '
    sql += 'PARTITIONED BY ('
    sql += '`dt` string) '
    sql += 'STORED AS %s ' % fields['fileType']
    sql += "LOCATION 'hdfs://ns1%s/%s'" % (fields['path'], table)
    print sql
    return sql


def generateJson(db, table, fields):
    '''
    Generate the job configuration and Hive DDL for one table and dump them to files.
    :param db: the database connection
    :param table: the table name
    :param fields: the option dictionary
    '''
    mysqlColumns, hdfsColumns = getColumns(db, table)
    jobConf = generateJobConfig(mysqlColumns, hdfsColumns, table, fields)
    sql = generateHiveDDL(table, hdfsColumns, fields)
    if fields.get('output'):
        if not os.path.exists(fields['output']):
            os.mkdir(fields['output'])
        # Dump the job configuration
        filePath = '%s/%s.json' % (fields['output'], table)
        print "dump to %s" % filePath
        jsonFile = open(filePath, 'w')
        json.dump(jobConf, jsonFile, indent=4, sort_keys=False)
        jsonFile.close()
        # Dump the Hive DDL
        sqlPath = '%s/%s.sql' % (fields['output'], table)
        print 'export sql to %s' % sqlPath
        sqlFile = open(sqlPath, 'w')
        sqlFile.write(sql)
        sqlFile.close()


if __name__ == "__main__":
    usage = "Usage: %prog [options] mysqlreader and hdfswriter config"
    parser = OptionParser(usage=usage)

    dbGroup = OptionGroup(parser, "MySQL Config", "...")
    dbGroup.add_option('-H', '--host', help='mysql host')
    dbGroup.add_option('-P', '--port', default="3306", help='mysql port')
    dbGroup.add_option('-u', '--user', default="root", help='mysql username')
    dbGroup.add_option('-p', '--password', help='mysql password')
    dbGroup.add_option('-D', '--db', help="mysql db")
    dbGroup.add_option('-T', '--table',
                       help="mysql table; if omitted, configs are generated "
                            "for every table in the database")
    parser.add_option_group(dbGroup)

    hdfsGroup = OptionGroup(parser, "HDFS Config", '...')
    hdfsGroup.add_option('--compress', default="NONE", help="compression codec")
    hdfsGroup.add_option('--defaultFS', default="hdfs://ns1", help="HDFS defaultFS")
    hdfsGroup.add_option('--fieldDelimiter', default="\t", help="field delimiter")
    hdfsGroup.add_option('--fileType', default="TEXT", help="output file type")
    hdfsGroup.add_option('--writeMode', default="append", help="write mode")
    hdfsGroup.add_option('--path', default="/tmp", help="HDFS directory")
    hdfsGroup.add_option('--channel', default="2", help="channel count (speed limit)")
    hdfsGroup.add_option('-o', '--output', default="/tmp", help="local output directory")
    parser.add_option_group(hdfsGroup)

    options, args = parser.parse_args(sys.argv[1:])
    fields = {}
    for fieldName in dir(options):
        # Skip the Values object's own attributes and methods,
        # all of which contain an underscore
        if fieldName.find('_') == -1:
            field = getattr(options, fieldName)
            if not field:
                if fieldName in ['table', 'output']:
                    # optional fields
                    continue
                print "field:(%s) can't be None" % fieldName
                parser.print_help()
                sys.exit(-1)
            else:
                fields[fieldName] = field

    db = MySQLdb.connect(options.host, options.user, options.password,
                         options.db, int(options.port))
    if fields.get('table'):
        generateJson(db, fields['table'], fields)
    else:
        # No table specified: generate config files for every table in the database
        tables = listTables(db)
        for table in tables:
            generateJson(db, table, fields)
    db.close()
```
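To get a feel for the output, suppose a hypothetical table `user(id bigint, name varchar(32), created_at timestamp)` is processed with `--fileType orc --path /tmp` (table and options made up for illustration). The generated `user.sql` would contain a single statement along these lines, with a matching `user.json` job file next to it:

```sql
CREATE EXTERNAL TABLE user(`id` BIGINT,`name` STRING,`created_at` TIMESTAMP) PARTITIONED BY (`dt` string) STORED AS orc LOCATION 'hdfs://ns1/tmp/user'
```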
Giving it a run
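A hypothetical invocation (the host, credentials, and paths below are made up; substitute your own):

```sh
./generateDataXConf.py -H 127.0.0.1 -P 3306 -u root -p 123456 -D test \
    --fileType orc --path /user/hive/warehouse -o /tmp/datax
```

Since `-T` is omitted, this writes a `<table>.json` and a `<table>.sql` under `/tmp/datax` for every table in the `test` database.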
Usage
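Running the script with `-h` prints the optparse help, which looks roughly like this (formatting approximated from the option definitions above):

```
Usage: generateDataXConf.py [options] mysqlreader and hdfswriter config

Options:
  -h, --help            show this help message and exit

  MySQL Config:
    -H HOST, --host=HOST
                        mysql host
    -P PORT, --port=PORT
                        mysql port
    -u USER, --user=USER
                        mysql username
    -p PASSWORD, --password=PASSWORD
                        mysql password
    -D DB, --db=DB      mysql db
    -T TABLE, --table=TABLE
                        mysql table; if omitted, configs are generated for
                        every table in the database

  HDFS Config:
    --compress=COMPRESS  compression codec
    --defaultFS=DEFAULTFS
                        HDFS defaultFS
    --fieldDelimiter=FIELDDELIMITER
                        field delimiter
    --fileType=FILETYPE  output file type
    --writeMode=WRITEMODE
                        write mode
    --path=PATH          HDFS directory
    --channel=CHANNEL    channel count (speed limit)
    -o OUTPUT, --output=OUTPUT
                        local output directory
```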
Problems you may run into
mysql_config not found
```
[root@h98 src]# pip install MySQL-python
Collecting MySQL-python
  Downloading http://mirrors.aliyun.com/pypi/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip (108kB)
    100% |████████████████████████████████| 112kB 2.5MB/s
    Complete output from command python setup.py egg_info:
    sh: mysql_config: command not found
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-build-S6R7N7/MySQL-python/setup.py", line 17, in <module>
        metadata, options = get_config()
      File "setup_posix.py", line 43, in get_config
        libs = mysql_config("libs_r")
      File "setup_posix.py", line 25, in mysql_config
        raise EnvironmentError("%s not found" % (mysql_config.path,))
    EnvironmentError: mysql_config not found

    ----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-S6R7N7/MySQL-python/
```
Solution:
```sh
yum install mysql-devel
```
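With the MySQL development headers in place, the build should find `mysql_config`; re-run the install and repeat the import check from earlier to confirm:

```sh
pip install MySQL-python
python -c "import MySQLdb"    # a silent exit means the module is importable
```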
