Basic Usage of DataX


Introduction

DataX is an offline synchronization tool for heterogeneous data sources, open-sourced by Alibaba Cloud. It aims to provide stable and efficient data synchronization between a wide range of heterogeneous sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.

Design Philosophy

To solve heterogeneous data source synchronization, DataX turns the complex mesh of point-to-point sync links into a star topology, with DataX itself acting as the central transport hub that connects all data sources. When a new data source needs to be supported, only an adapter between that source and DataX is required, after which it can synchronize seamlessly with every source already connected.

DataX 3.0 Framework Design

As an offline data synchronization framework, DataX is built on a Framework + plugin architecture: reading from and writing to data sources are abstracted into Reader and Writer plugins that slot into the synchronization framework.

(Figure: datax_framework_new, the DataX framework architecture: Reader -> Framework -> Writer)

Reader: the data collection module. It reads data from the source and sends it to the Framework.

Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.

Framework: connects Reader and Writer, acting as the data transport channel between them and handling the core concerns of buffering, flow control, concurrency, and data conversion.
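
To see the Reader -> Framework -> Writer pipeline in isolation, DataX ships a pair of in-memory plugins, streamreader and streamwriter. The following is a minimal sketch of a job wiring the two together (plugin names and parameters as documented in the DataX repo; the record count and values here are arbitrary):

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "sliceRecordCount": 5,
                        "column": [
                            {"type": "string", "value": "hello"},
                            {"type": "long", "value": "42"}
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}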

DataX 3.0 Plugin Ecosystem

After several years of accumulation, DataX now has a fairly complete plugin ecosystem: mainstream RDBMS databases, NoSQL stores, and big-data compute systems are all covered. DataX currently supports the following data sources (most have both a Reader and a Writer plugin; per-source documentation is in the DataX repository):

RDBMS (relational databases): MySQL, Oracle, SQLServer, PostgreSQL, DRDS, Dameng (达梦), and a generic RDBMS plugin that supports any relational database

Alibaba Cloud data warehouse storage: ODPS, ADS, OSS, OCS

NoSQL data stores: OTS, HBase 0.94, HBase 1.1, MongoDB, Hive

Unstructured data storage: TxtFile, FTP, HDFS, Elasticsearch
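
Which plugins a given installation actually contains can be checked directly on disk, since each Reader and Writer lives in its own directory under plugin/ (the path assumes the /usr/local/datax install used below):

ls /usr/local/datax/plugin/reader
ls /usr/local/datax/plugin/writer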

Installation

Download the prebuilt binary directly:

DataX download link

tar zxvf datax.tar.gz  -C /usr/local
cd /usr/local/datax
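
The unpacked package ships with a self-test job, job/job.json (a stream-to-stream job like the sketch above), which is a quick way to confirm the installation works:

python bin/datax.py job/job.json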

Or download the DataX source and build it yourself:

git clone https://github.com/alibaba/DataX.git
cd DataX/
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
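
If the build succeeds, the assembled package should land under target/datax/datax in the source tree (per the DataX build instructions) and can then be moved into place:

cp -r target/datax/datax /usr/local/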

Plugin Usage

mysql2textfile

Prepare the data

USE test;
CREATE TABLE `cxy_order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `product_id` BIGINT NOT NULL,
  `price` double NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
);
INSERT INTO cxy_order(product_id, price) VALUES(1001, 300.0);
INSERT INTO cxy_order(product_id, price) VALUES(1001, 300.0);
INSERT INTO cxy_order(product_id, price) VALUES(2003, 200.0);
INSERT INTO cxy_order(product_id, price) VALUES(2003, 200.0);
INSERT INTO cxy_order(product_id, price) VALUES(3001, 100.0);
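
The job below reads only rows with price > 200, so just the two 300.0 orders (ids 1 and 2) should be synced. This can be confirmed up front by running the same predicate (the host and the canal account are the ones used in the job config that follows):

mysql -h192.168.14.6 -ucanal -pcanal test -e 'SELECT * FROM cxy_order WHERE price > 200;'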

DataX uses a JSON file to configure each job.

Save the following as cxy_order.json:

{  
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "connection": [
                            {
                                "querySql": ["SELECT * FROM cxy_order WHERE price > 200;"
],
                                "jdbcUrl": ["jdbc:mysql://192.168.14.6:3306/test"]
                            }
                        ], 
                        "password": "canal",
                        "username": "canal"
                    }
                }, 
                "writer": {
                    "name": "txtfilewriter", 
                    "parameter": {
                        "path": "/tmp/cxy_order",
                        "fileName": "cxy_order.txt", 
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        },
        "core": {
                "transport":{
                        "channel": {
                                "speed":{
                                        "record":"-1"
                                }
                        }
                }
        }
    }
}
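
As an aside, querySql is only one of mysqlreader's two modes. The same read can also be written declaratively with table, column, and where (per the mysqlreader documentation), which additionally lets DataX split the table across channels via splitPk. A sketch of an equivalent reader block:

"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "canal",
        "password": "canal",
        "column": ["id", "product_id", "price", "modified_time"],
        "where": "price > 200",
        "connection": [
            {
                "table": ["cxy_order"],
                "jdbcUrl": ["jdbc:mysql://192.168.14.6:3306/test"]
            }
        ]
    }
}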

Run

python bin/datax.py ~/cxy_order.json

When the job completes, it prints a summary like this:

2018-05-08 21:21:50.078 [job-0] INFO  JobContainer - 
Job start time                  : 2018-05-08 21:21:39
Job end time                    : 2018-05-08 21:21:50
Total elapsed time              :                 10s
Average throughput              :                3B/s
Record write speed              :              0rec/s
Total records read              :                   2
Total read/write failures       :                   0

Take a look at the output file:

[root@nn2 datax]# ll /tmp/cxy_order/cxy_order.txt__6c391f40_c361_402d_b5da_99ca9ea7a240 
-rw-r--r-- 1 root root 44 Jun  8 21:21 /tmp/cxy_order/cxy_order.txt__6c391f40_c361_402d_b5da_99ca9ea7a240
[root@nn2 datax]# cat /tmp/cxy_order/cxy_order.txt__6c391f40_c361_402d_b5da_99ca9ea7a240 
1,1001,300,2018-05-08
2,1001,300,2018-05-08
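
Note that txtfilewriter appended a random suffix to the configured fileName; per its documentation it writes one file per channel and randomizes the names to avoid collisions. Downstream consumers should therefore match by prefix rather than hard-coding the exact name:

cat /tmp/cxy_order/cxy_order.txt__*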

file2hdfs

Create the job JSON file

Save the following as file2hdfs.json:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/cxy_order"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "INT"
                            },
                            {
                                "index": 1,
                                "type": "BIGINT"
                            },
                            {
                                "index": 2,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 3,
                                "type": "TIMESTAMP"
                            }
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                    "coreSite": "/usr/local/hadoop/etc/hadoop/core-site.xml",
                        "hdfsSite": "/usr/local/hadoop/etc/hadoop/hdfs-site.xml",
                        "fileType": "text",
                        "path": "/tmp/cxy_order",
                        "fileName": "cxy_order",
                        "writeMode": "append",
                        "dateFormat": "yyyy-MM-dd",
                        "fieldDelimiter": "\t",
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "product_id",
                                "type": "BIGINT"
                            },
                            {
                                "name": "price",
                                "type": "DOUBLE"
                            },
                            {
                                "name": "modified_time",
                                "type": "TIMESTAMP"
                            }
                        ]
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        },
        "core": {
                "transport":{
                        "channel": {
                                "speed":{
                                        "record":"-1"
                                }
                        }
                }
        }
    }
}
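
One caveat before running: hdfswriter writes into the configured HDFS path and, per its documentation, expects that directory to already exist, so create it first:

hdfs dfs -mkdir -p /tmp/cxy_order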

Start the job

bin/datax.py file2hdfs.json

After it completes:

2018-05-08 12:02:16.555 [job-0] INFO  JobContainer - 
Job start time                  : 2018-05-08 12:02:04
Job end time                    : 2018-05-08 12:02:16
Total elapsed time              :                 12s
Average throughput              :                0B/s
Record write speed              :              0rec/s
Total records read              :                   2

Check the output:

[root@nn2 datax]# hdfs dfs -ls /tmp/cxy_order
Found 1 items
-rw-r-----   3 root hadoop         70 
2018-05-08 11:34 /tmp/cxy_order/cxy_order__0cb3f446_f255_44bf_a0d8_fa8ca6f304d0
[root@nn2 datax]# hdfs dfs -text /tmp/cxy_order/cxy_order__0cb3f446_f255_44bf_a0d8_fa8ca6f304d0
18/05/08 12:02:41 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
18/05/08 12:02:41 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 41e7584f6e29186a820e286b027fd620f5449c49]
1	1001	300.0	2018-05-08 20:42:31.0
2	1001	300.0	2018-05-08 20:42:31.0

hdfs2hbase

Create the HBase table

hbase(main):008:0> create 'cxy_order','f'
hbase(main):007:0> scan 'cxy_order'
ROW                                      COLUMN+CELL                                                                                                         
0 row(s) in 0.4530 seconds

Create the job JSON file

Save the following as hdfs2hbase.json:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "coreSite": "/usr/local/hadoop/etc/hadoop/core-site.xml",
                        "hdfsSite": "/usr/local/hadoop/etc/hadoop/hdfs-site.xml",
                        "defaultFS": "hdfs://10.10.16.10:9000",
                        "fileType": "text",
                        "path": "/tmp/cxy_order",
                        "fieldDelimiter": "\t",
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "STRING"
                            },
                            {
                                "index": 1,
                                "type": "LONG"
                            },
                            {
                                "index": 2,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 3,
                                "type": "STRING"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hbase11xwriter",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "nn1.zdp.ol,nn2.zdp.ol,s1.zdp.ol,s14.zdp.ol,s15.zdp.ol,s16.zdp.ol,s17.zdp.ol"
                        },
                        "table": "cxy_order",
                        "mode": "normal",
                        "rowkeyColumn": [
                            {
                                "index": 0,
                                "type": "INT"
                            },
                            {
                                "index": -1,
                                "type": "string",
                                "value": "_"
                            }
                        ],
                        "column": [
                            {
                                "index": 1,
                                "name": "f:product_id",
                                "type": "LONG"
                            },
                            {
                                "index": 2,
                                "name": "f:price",
                                "type": "DOUBLE"
                            },
                            {
                                "index": 3,
                                "name": "f:modified_time",
                                "type": "STRING"
                            }
                        ],
                        "versionColumn": {
                            "index": -1,
                            "value": "123456789"
                        },
                        "encoding": "utf-8"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        },
        "core": {
            "transport": {
                "channel": {
                    "speed": {
                        "record": "-1"
                    }
                }
            }
        }
    }
}
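
Two details of the writer config are worth noting. rowkeyColumn assembles the rowkey by concatenating its entries in order: an index >= 0 takes a source column (here column 0, written as a 4-byte int), while index -1 inserts the literal constant given in value (here "_"). Likewise, versionColumn with index -1 stamps every cell with the fixed timestamp 123456789, which is exactly what appears in the scan output below.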

Run

/usr/local/datax/bin/datax.py hdfs2hbase.json

When the run completes:

2018-05-08 14:21:25.739 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 62 bytes | Speed 6B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2018-05-08 14:21:25.740 [job-0] INFO  JobContainer - 
Job start time                  : 2018-05-08 14:21:14
Job end time                    : 2018-05-08 14:21:25
Total elapsed time              :                 11s
Average throughput              :                6B/s
Record write speed              :              0rec/s
Total records read              :                   2
Total read/write failures       :                   0

Check HBase

hbase(main):010:0> scan 'cxy_order'
ROW                                      COLUMN+CELL                                                                                                         
 \x00\x00\x00\x01_                       column=f:modified_time, timestamp=123456789, value=2018-05-08 20:42:31.0                                            
 \x00\x00\x00\x01_                       column=f:price, timestamp=123456789, value=@r\xC0\x00\x00\x00\x00\x00                                               
 \x00\x00\x00\x01_                       column=f:product_id, timestamp=123456789, value=\x00\x00\x00\x00\x00\x00\x03\xE9                                    
 \x00\x00\x00\x02_                       column=f:modified_time, timestamp=123456789, value=2018-05-08 20:42:31.0                                            
 \x00\x00\x00\x02_                       column=f:price, timestamp=123456789, value=@r\xC0\x00\x00\x00\x00\x00                                               
 \x00\x00\x00\x02_                       column=f:product_id, timestamp=123456789, value=\x00\x00\x00\x00\x00\x00\x03\xE9                                    
2 row(s) in 0.0260 seconds
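
The values look garbled because hbase11xwriter stores each column in HBase's native byte encoding: f:product_id \x00...\x03\xE9 is the big-endian long 1001, and f:price @r\xC0... (bytes 0x4072C00000000000) is the IEEE-754 double 300.0. A quick way to verify the decoding (a plain Python snippet, independent of DataX):

python - <<'EOF'
import struct
print(struct.unpack('>q', b'\x00\x00\x00\x00\x00\x00\x03\xe9')[0])  # big-endian long -> 1001
print(struct.unpack('>d', b'\x40\x72\xc0\x00\x00\x00\x00\x00')[0])  # big-endian double -> 300.0
EOF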

