实时同步MySQL表数据到ClickHouse

  |   0 评论   |   2,992 浏览

背景

为了实时地对业务数据库中的数据进行分析,需要从MySQL将数据实时地同步至ClickHouse,主要有以下方案:

  • ClickHouse中直接建立MySQL引擎的表,缺点是查询在MySQL端执行,失去了ClickHouse的高效性;

  • 利用canal之类的工具中先转到Kafka中,再导入ClickHouse,有点绕;

  • 利用clickhouse-mysql实时地同步

clickhouse-mysql简介

clickhouse-mysql是用于从MySQL到Click House中同步数据的工具,不但可以一次性迁移现有数据,还可以监听新的Insert事件,从而可以实时地同步。

当前只支持insert事件,社区中已经有人支持了UPDATE和DELETE事件,应该很快就会发布。

GitHub地址:https://github.com/Altinity/clickhouse-mysql-data-reader

安装

要求

Python3.6+

安装clickhouse-mysql

安装MySQL repo

sudo yum install -y https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm

安装epel

sudo yum install -y epel-release

安装直接依赖

sudo yum install -y mysql-community-devel
sudo yum install -y mariadb-devel
sudo yum install -y gcc
sudo yum install -y python34-devel python34-pip

安装clickhouse-mysql

sudo pip3 install clickhouse-mysql

准备数据

授予权限

CREATE USER 'reader'@'%' IDENTIFIED BY 'qwerty';
CREATE USER 'reader'@'127.0.0.1' IDENTIFIED BY 'qwerty';
CREATE USER 'reader'@'localhost' IDENTIFIED BY 'qwerty';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE, SUPER ON *.* TO 'reader'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE, SUPER ON *.* TO 'reader'@'127.0.0.1';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE, SUPER ON *.* TO 'reader'@'localhost';
FLUSH PRIVILEGES;

MySQL启用binlog

[mysqld]# mandatory
server-id        = 1
log_bin          = /var/lib/mysql/bin.log
binlog-format    = row # very important if you want to receive write, update and delete row events
# optional
expire_logs_days = 30
max_binlog_size  = 768M
# setup listen address
bind-address     = 0.0.0.0

创建MySQL表

CREATE TABLE `dict_date` (
  `day_key` varchar(10) NOT NULL,
  `day_of_year` int(11) DEFAULT NULL,
  `weekday` int(11) DEFAULT NULL,
  `week_of_year` int(11) DEFAULT NULL,
  `week_of_month` int(11) DEFAULT NULL,
  `report_week_of_year` int(11) DEFAULT NULL,
  `year` int(11) DEFAULT NULL,
  `month` int(11) DEFAULT NULL,
  `day` int(11) DEFAULT NULL,
  `reportweekday` int(11) DEFAULT NULL,
  `report_week_day_range` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`day_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPRESSED

插入100行数据

MySQL [test]> select count(*) from dict_date;
+----------+
| count(*) |
+----------+
| 100 |
+----------+
1 row in set (0.00 sec)

同步已经存在的数据

image.png

启动clickhouse-mysql

clickhouse-mysql \
--src-host=127.0.0.1 \
--src-user=reader \
--src-password=qwerty \
--migrate-table \
--src-tables=test.dict_date \
--dst-host=127.0.0.1 \
--dst-port=9123 \
--dst-user=xxx \
--dst-password=xxx \
--dst-table=dict_date \
--dst-create-table

查看ClickHouse记录数

cxy7.com :) select count(*) from test.dict_date;

SELECT count(*)
FROM test.dict_date

┌─count()─┐
│ 100 │
└─────────┘

1 rows in set. Elapsed: 0.001 sec.

实时同步新数据

image.png

准备记录读取binlog的position文件

默认情况下,该工具不保存position,如果进程重启,则会从头开始消费,导致数据重复,因此,需要将position保存下来

查看position

MySQL [test]> show master status;
+------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------+----------+--------------+------------------+-------------------+
| bin.000002 | 76090 | | | |
+------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

创建position文件

echo "bin.000002:76090" > /data/clickhouse-mysql/pos/dict_date.pos

启动clickhouse-mysql

clickhouse-mysql \
--src-server-id=1 \
--src-resume \
--binlog-position-file=/data/clickhouse-mysql/pos/dict_date.pos \
--src-wait \
--nice-pause=1 \
--log-level=info \
--src-host=127.0.0.1 \
--src-user=reader \
--src-password=qwerty \
--src-tables=test.dict_date \
--dst-host=127.0.0.1 \
--dst-port=9123 \
--dst-user=xxx \
--dst-password=xxx \
--dst-table=dict_date \
--csvpool \
--pump-data

往MySQL中再插入50行

MySQL [test]> select count(*) from dict_date;
+----------+
| count(*) |
+----------+
| 150 |
+----------+
1 row in set (0.00 sec)

对比clickhouse

cxy7.com :) select count(*) from test.dict_date;

SELECT count(*)
FROM test.dict_date

┌─count()─┐
│ 150 │
└─────────┘

1 rows in set. Elapsed: 0.001 sec.

查看position文件

[root@cxy7.com ~]# cat /data/clickhouse-mysql/pos/dict_date.pos
bin.000002:89909

重启恢复

image.png

重启clickhouse-mysql进程

查询clickhouse

cxy7.com :) select count(*) from test.dict_date;

SELECT count(*)
FROM test.dict_date

┌─count()─┐
│ 150 │
└─────────┘

1 rows in set. Elapsed: 0.001 sec.

再往表里插入50行

MySQL [test]> select count(*) from dict_date;
+----------+
| count(*) |
+----------+
| 200 |
+----------+
1 row in set (0.00 sec)

对比clickhouse

cxy7.com :) select count(*) from test.dict_date;

SELECT count(*)
FROM test.dict_date

┌─count()─┐
│ 200 │
└─────────┘

1 rows in set. Elapsed: 0.001 sec.

Congratulations!

两边一致!

可能遇到的问题

每次重启后都会消费全量数据,造成重复

解决办法

只有第一次启动的时候需要--migrate-table参数,后面都不需要了

ex='NoneType' object has no attribute 'tzinfo'

异常信息

2019-10-18 15:49:02,955/1571384942.955640:CRITICAL:QUERY FAILED
2019-10-18 15:49:02,955/1571384942.955796:CRITICAL:ex='NoneType' object has no attribute 'tzinfo'
2019-10-18 15:49:02,955/1571384942.955857:CRITICAL:sql=INSERT INTO `test`.`dict_date` (`day_key`, `day_of_year`, `weekday`, `week_of_year`, `week_of_month`, `report_week_of_year`, `year`, `month`, `day`, `reportweekday`, `report_week_day_range`) VALUES

clichouse日志报错

2019.10.18 15:49:02.955702 [ 116 ] {c6aaf8fa-7ee6-4688-8bab-e7bbe72e3944} <Error> executeQuery: Code: 33, e.displayText() = DB::Exception: Cannot read all data in NativeBlockInputStream. Rows read: 0. Rows expected: 100000. (version 19.13.1.11) (from 127.0.0.1:39733) (in query: INSERT INTO `test`.`dict_date` (`day_key`, `day_of_year`, `weekday`, `week_of_year`, `week_of_month`, `report_week_of_year`, `year`, `month`, `day`, `reportweekday`, `report_week_day_range`) VALUES), Stack trace:
0. clickhouse-server(StackTrace::StackTrace()+0x30) [0x6f18ee0]
1. clickhouse-server(DB::Exception::Exception(std::string const&, int)+0x1f) [0x315f3ef]
2. clickhouse-server(DB::NativeBlockInputStream::readData(DB::IDataType const&, DB::IColumn&, DB::ReadBuffer&, unsigned long, double)+0x31d) [0x5c4ee8d]
3. clickhouse-server(DB::NativeBlockInputStream::readImpl()+0xbd7) [0x5c4fc27]
4. clickhouse-server(DB::IBlockInputStream::read()+0x238) [0x5c44288]
5. clickhouse-server(DB::TCPHandler::receiveData()+0x4f) [0x319ddcf]
6. clickhouse-server(DB::TCPHandler::receivePacket()+0x85) [0x319ee55]
7. clickhouse-server(DB::TCPHandler::readData(DB::Settings const&)+0x19b) [0x319f30b]
8. clickhouse-server(DB::TCPHandler::processInsertQuery(DB::Settings const&)+0x20b) [0x319f7fb]
9. clickhouse-server(DB::TCPHandler::runImpl()+0xc67) [0x31a06a7]
10. clickhouse-server(DB::TCPHandler::run()+0x1c) [0x31a176c]
11. clickhouse-server(Poco::Net::TCPServerConnection::start()+0xf) [0x6a5e54f]
12. clickhouse-server(Poco::Net::TCPServerDispatcher::run()+0x166) [0x6a5e916]
13. clickhouse-server(Poco::PooledThread::run()+0x77) [0x70ede07]
14. clickhouse-server(Poco::ThreadImpl::runnableEntry(void*)+0x38) [0x70e9fc8]
15. clickhouse-server() [0x76bbfff]
16. /lib64/libpthread.so.0(+0x7dd5) [0x7f38c9b5cdd5]
17. /lib64/libc.so.6(clone+0x6d) [0x7f38c9583ead]
2019.10.18 15:49:02.955861 [ 116 ] {c6aaf8fa-7ee6-4688-8bab-e7bbe72e3944} <Information> TCPHandler: Processed in 0.683 sec.
2019.10.18 15:49:02.955891 [ 116 ] {} <Information> TCPHandler: Done processing connection.

原因分析

时间字段里有不合法数据“0000-00-00 00:00:00”

解决办法

改为正常的时间格式

ex=utf_8_encode() argument 1 must be str, not int

异常信息

2019-10-18 16:27:37,195/1571387257.195470:CRITICAL:QUERY FAILED
2019-10-18 16:27:37,195/1571387257.195639:CRITICAL:ex=utf_8_encode() argument 1 must be str, not int
2019-10-18 16:27:37,195/1571387257.195707:CRITICAL:sql=INSERT INTO `test`.`dict_date` (`day_key`, `day_of_year`, `weekday`, `week_of_year`, `week_of_month`, `report_week_of_year`, `year`, `month`, `day`, `reportweekday`, `report_week_day_range`) VALUES

原因分析

mysql和clickhouse的字段类型不一致,mysql里是int型,clickhouse里是String

解决办法

clickhouse字段改为Int8就好了


读后有收获可以支付宝请作者喝咖啡