批量将ClickHouse表的MergeTree引擎转ReplicatedMergeTree引擎

  |   0 评论   |   616 浏览

背景

在上一篇《利用复制表ReplicatedMergeTree实现ClickHouse高可用》中我们提到,集群配置了多个副本之后,还需要将表启用复制功能,才可以实现表级别的高可用,本文以MergeTree引擎转ReplicatedMergeTree引擎为例来说明过程

详细步骤

重命名原表

CREATE DATABASE backup ON CLUSTER cxy7_cluster
RENAME TABLE cxy7_db.cxy7_order TO backup.cxy7_order_bak;

创建ReplicatedMergeTree引擎的新表

CREATE TABLE cxy7_order ON CLUSTER cxy7_cluster
(
    `time`  DateTime,
    `uid`   String,
    `dt`    Date,
    `event` String
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/cxy7_order',
           '{replica}') PARTITION BY dt ORDER BY time SETTINGS index_granularity = 8192

移动文件到新表的detached目录

mv /data1/clickhouse/data/backup/cxy7_order_bak/* /data1/clickhouse/data/cxy7_db/cxy7_order/detached/

添加分区

可以查询system.parts表来确定原表的分区列表,然后使用ATTACH PARTITION子句将分区添加至新的表中

ALTER TABLE cxy7_db.cxy7_order ATTACH PARTITION '2019-08-07'
ALTER TABLE cxy7_db.cxy7_order ATTACH PARTITION '2019-08-08'

验证数据

cxy7.com :) select count(*) from backup.cxy7_order_bak;
SELECT count(*)
FROM backup.cxy7_order_bak
┌─count()─┐
│ 446748 │
└─────────┘
cxy7.com :) select count(*) from cxy7_db.cxy7_order;
SELECT count(*)
FROM cxy7_db.cxy7_order
┌─count()─┐
│ 446748 │
└─────────┘

自动化脚本

我将该过程写了一个脚本,可以自动完成以上步骤

# 转换单个表
python convert_replicated_engine.py -d cxy7_db -t cxy7_order
# 转换所有表
python convert_replicated_engine.py

脚本

# -*- coding: utf-8 -*-
import sys
import requests
import re
import json
from urllib import quote
import os
from optparse import OptionParser
from optparse import OptionGroup
def run_cmd(host, query, port = 8123, database='cxy7_db'):
    url = 'http://%s:%d?user=%s&password=%s&database=%s&max_execution_time=300&query=' % (
    host, port, user, password, database)
    print query
    url += quote(query.encode('utf8'))
    resp = requests.post(url)
    print 'host------>%s' % host
    if resp.status_code == 200:
        return resp.content
    else:
        print 'err:', resp.content
if __name__ == "__main__":
    usage = "Usage: %prog [options] cxy7.com clickhouse python client"
    parser = OptionParser(usage=usage)
    ch_group = OptionGroup(parser, "clickhouse Config", "...")
    ch_group.add_option('-d', '--database', help='database')
    ch_group.add_option('-t', '--table', help='table')
    ch_group.add_option('-c', '--cluster', default = 'cxy7_cluster', help='cluster')
    ch_group.add_option('-u', '--user', default = 'cxy7', help='user')
    ch_group.add_option('-p', '--password', default = 'cxy7_pwd', help='password')
    parser.add_option_group(ch_group)
    options, args = parser.parse_args(sys.argv[1:])
    database = options.database
    table = options.table
    user = options.user
    password = options.password
    cluster = options.cluster
    nodes = ['ch-1','ch-2','ch-3','ch-4']
    localhost = nodes[0]
    sql = "SELECT database, name, data_path, create_table_query FROM system.tables WHERE database != 'system' AND engine = 'MergeTree' AND name not like '%bak%'"
    if database and table:
        sql += " AND database = '%s' AND name = '%s'" % (database, table)
    sql += 'FORMAT JSON'
    resp = run_cmd(localhost, sql)
    resp = json.loads(resp)
    tables = resp['data']
    for table in tables:
        database = table['database']
        table_name = table['name']
        data_path = table['data_path']
        create_table_query = table['create_table_query']
        t = '%s.%s' % (database, table_name)
        sql = 'RENAME TABLE %s TO %s' % (t, t + '_bak')
        for node in nodes:
            print run_cmd(node, sql)
        sql = create_table_query.replace(t, t + ' ON CLUSTER %s' % cluster)
        sql = sql.replace('MergeTree()', "ReplicatedMergeTree('/clickhouse/tables/{shard}/%s/%s', '{replica}')" % (database, table_name))
        print sql
        run_cmd(localhost, sql)
        count_sql = 'SELECT count() FROM ' + t + '_cluster';
        print run_cmd(localhost, count_sql)
        for node in nodes:
            bak_data_path = data_path.replace(database + '/' + table_name, database + '/' + table_name + '_bak')
            cmd = 'ssh %s "mv %s* %sdetached"' % (node, bak_data_path, data_path)
            print cmd
            os.system(cmd)
            sql = "SELECT partition FROM system.parts WHERE database = '%s' AND table = '%s' AND active GROUP BY partition ORDER BY partition DESC FORMAT JSON;" % (database, table_name + '_bak')
            resp = run_cmd(node, sql)
            resp = json.loads(resp)
            data = resp['data']
            for item in data:
                partition = item['partition']
                if partition[0:1] == '(':
                    sql = 'ALTER TABLE %s ATTACH PARTITION %s' % (t, partition)
                else:
                    sql = "ALTER TABLE %s ATTACH PARTITION '%s'" % (t, partition)
                run_cmd(node, sql)
        print run_cmd(localhost, count_sql)


读后有收获可以支付宝请作者喝咖啡