vd/database/migrations/migrate_from_old.pgsql
2019-03-22 11:00:05 +08:00

230 lines
10 KiB
PL/PgSQL

-- 从旧平台迁移数据
-- 第一步:建立外部服务器 ckb_custom 与 ckb_custom_handle_log
CREATE EXTENSION IF NOT EXISTS mysql_fdw;
CREATE SERVER IF NOT EXISTS vd_old FOREIGN DATA WRAPPER mysql_fdw OPTIONS (host 'rm-bp1i7dsf6fi1bc5y2o.mysql.rds.aliyuncs.com', port '3306');
CREATE USER MAPPING IF NOT EXISTS FOR root SERVER vd_old OPTIONS (username 'vduser', password 'fxft@123');
DROP FOREIGN TABLE IF EXISTS ckb_custom;
DROP FOREIGN TABLE IF EXISTS ckb_custom_handle_log;
IMPORT FOREIGN SCHEMA "vd-new" limit to ("ckb_custom","ckb_custom_handle_log") FROM SERVER vd_old INTO vd_old;
-- 第二步:建立物化视图
CREATE MATERIALIZED VIEW logs AS
SELECT
ckb_custom_handle_log.custom_no,
min((ckb_custom.card_number)::text) AS sim,
CASE ckb_custom_handle_log.type
WHEN 13 THEN
0
WHEN 11 THEN
1
WHEN 14 THEN
2
WHEN 15 THEN
3
ELSE
NULL::integer
END AS order_type,
concat(to_char(to_timestamp((ckb_custom_handle_log.create_time)::double precision), 'YYYYMMDDHHMISS'::text), CASE ckb_custom_handle_log.type
WHEN 13 THEN
0
WHEN 11 THEN
1
WHEN 14 THEN
2
WHEN 15 THEN
3
ELSE
NULL::integer
END, CASE ckb_custom_handle_log.pay_type
WHEN 10 THEN
2
WHEN 11 THEN
3
WHEN 12 THEN
0
ELSE
NULL::integer
END, lpad((virtual_companies.id)::text, 3, '0'::text), lpad((virtual_packages.id)::text, 4, '0'::text), lpad((((ckb_custom_handle_log.order_account)::float * 100))::text, 6, '0'::text)) AS order_sn,
COALESCE(virtual_companies.id, 0) AS company_id,
COALESCE(virtual_packages.id, 0) AS package_id,
COALESCE(min(virtual_products.id), 0) AS product_id,
CASE ckb_custom_handle_log.pay_type
WHEN 10 THEN
'wx'::text
WHEN 11 THEN
'alipay'::text
WHEN 12 THEN
'bank'::text
ELSE
NULL::text
END AS pay_channel,
((ckb_custom_handle_log.order_account)::float * 100) AS unit_price,
CASE min((ckb_custom_handle_log.valid_start_time)::text)
WHEN '' THEN
NULL
WHEN '0' THEN
NULL
ELSE
date_trunc('month'::text, to_timestamp((min((ckb_custom_handle_log.valid_start_time)::text))::double precision))
END AS service_start_at,
CASE max((ckb_custom_handle_log.valid_end_time)::text)
WHEN '' THEN
NULL
WHEN '0' THEN
NULL
ELSE
((date_trunc('month'::text, to_timestamp((max((ckb_custom_handle_log.valid_end_time)::text))::double precision)) + '1 mon'::interval) - '00:00:01'::interval)
END AS service_end_at,
to_timestamp((ckb_custom_handle_log.create_time)::double precision) AS created_at,
count(*) AS counts
FROM ((((vd_old.ckb_custom_handle_log
LEFT JOIN vd_old.ckb_custom ON (((ckb_custom.custom_no)::text = (ckb_custom_handle_log.custom_no)::text)))
LEFT JOIN vd.virtual_companies ON ((((virtual_companies.sn)::TEXT = concat('No', lpad((ckb_custom_handle_log.company)::TEXT, 11, '0'::TEXT))) AND (virtual_companies.deleted_at IS NULL))))
LEFT JOIN vd.virtual_packages ON ((((virtual_packages.sn)::text = (ckb_custom_handle_log.content)::text) AND (virtual_packages.deleted_at IS NULL))))
LEFT JOIN vd.virtual_products ON (((virtual_products.sn)::text = concat(ckb_custom_handle_log.content, '_', virtual_companies.id, '_', ((ckb_custom_handle_log.order_account)::float * 100)))))
WHERE (ckb_custom_handle_log.type = ANY (ARRAY[11, 13, 14, 15]))
GROUP BY
ckb_custom_handle_log.create_time,
virtual_companies.id,
virtual_packages.id,
ckb_custom_handle_log.order_account,
ckb_custom_handle_log.custom_no,
ckb_custom_handle_log.type,
ckb_custom_handle_log.pay_type;
-- 第三步:同步卡数据
INSERT INTO vd.cards (sim, imsi, iccid, carrier_operator, "type", virtual_activated_at, cancelled_at, created_at, updated_at)
(
SELECT
card_number::BIGINT as sim,
imsi,
iccid,
CASE carrieroperator WHEN 10 THEN 0 WHEN 11 THEN 1 WHEN 12 THEN 2 ELSE 255 END AS carrier_operator,
CASE WHEN substr(card_number, 4, 1)::INT >= 5 THEN 1 ELSE 0 END AS "type",
CASE card_cycle_start::int WHEN 0 THEN NULL ELSE to_timestamp(card_cycle_start::int) END AS virtual_activated_at,
CASE custom_state WHEN 13 THEN to_timestamp(update_time) ELSE NULL END AS cancelled_at,
to_timestamp(create_time) as created_at,
to_timestamp(update_time) as updated_at
FROM
ckb_custom
) ON CONFLICT (sim) DO UPDATE SET
virtual_activated_at=excluded.virtual_activated_at,
cancelled_at=excluded.cancelled_at;
-- 第四步:同步订单数据
INSERT INTO vd.virtual_orders ("type", sn, "source", company_id, package_id, product_id, pay_channel, unit_price, counts, total_price, custom_price, order_at, order_status, transaction_status, created_at, updated_at)
(
SELECT
order_type,
order_sn,
1,
MIN(company_id),
MIN(package_id),
MIN(product_id),
MIN(pay_channel),
MIN(unit_price),
SUM(counts) AS counts,
MIN(unit_price) * SUM(counts) AS total_price,
MIN(unit_price) * SUM(counts) AS custom_price,
MIN(created_at) AS order_at,
4,
1,
MIN(created_at) AS created_at,
MIN(created_at) AS updated_at
FROM
logs
GROUP BY
order_type, order_sn
) ON CONFLICT (sn, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET counts = excluded.counts, total_price = excluded.total_price, custom_price = excluded.custom_price;
-- 第五步:同步订单详情数据
INSERT INTO vd.virtual_order_cards ("type", sim, order_id, company_id, package_id, product_id, counts, unit_price, service_start_at, service_end_at, created_at, updated_at)
(
SELECT
logs.order_type AS "type",
logs.sim::BIGINT AS sim,
virtual_orders.ID AS order_id,
logs.company_id,
logs.package_id,
logs.product_id,
logs.counts,
logs.unit_price,
logs.service_start_at,
logs.service_end_at,
logs.created_at AS created_at,
logs.created_at AS updated_at
FROM
logs
LEFT JOIN vd.virtual_orders ON virtual_orders.sn = logs.order_sn
WHERE
logs.order_type = 0
) ON CONFLICT (sim, order_id, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET counts = excluded.counts, service_start_at = excluded.service_start_at, service_end_at = excluded.service_end_at;
INSERT INTO vd.virtual_order_renewal_cards ("type", sim, order_id, company_id, package_id, product_id, counts, unit_price, service_start_at, service_end_at, created_at, updated_at)
(
SELECT
logs.order_type AS "type",
logs.sim::BIGINT AS sim,
virtual_orders.ID AS order_id,
logs.company_id,
logs.package_id,
logs.product_id,
logs.counts,
logs.unit_price,
logs.service_start_at,
logs.service_end_at,
logs.created_at AS created_at,
logs.created_at AS updated_at
FROM
logs
LEFT JOIN vd.virtual_orders ON virtual_orders.sn = logs.order_sn
WHERE
logs.order_type = 1
) ON CONFLICT (sim, order_id, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET counts = excluded.counts, service_start_at = excluded.service_start_at, service_end_at = excluded.service_end_at;
INSERT INTO vd.virtual_order_renewal_package_cards ("type", sim, order_id, company_id, package_id, product_id, counts, unit_price, service_start_at, service_end_at, created_at, updated_at)
(
SELECT
logs.order_type AS "type",
logs.sim::BIGINT AS sim,
virtual_orders.ID AS order_id,
logs.company_id,
logs.package_id,
logs.product_id,
logs.counts,
logs.unit_price,
logs.service_start_at,
logs.service_end_at,
logs.created_at AS created_at,
logs.created_at AS updated_at
FROM
logs
LEFT JOIN vd.virtual_orders ON virtual_orders.sn = logs.order_sn
WHERE
logs.order_type = 2
) ON CONFLICT (sim, order_id, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET counts = excluded.counts, service_start_at = excluded.service_start_at, service_end_at = excluded.service_end_at;
INSERT INTO vd.virtual_order_flows_package_cards ("type", sim, order_id, company_id, package_id, product_id, counts, unit_price, service_start_at, service_end_at, created_at, updated_at)
(
SELECT
logs.order_type AS "type",
logs.sim::BIGINT AS sim,
virtual_orders.ID AS order_id,
logs.company_id,
logs.package_id,
logs.product_id,
logs.counts,
logs.unit_price,
logs.service_start_at,
logs.service_end_at,
logs.created_at AS created_at,
logs.created_at AS updated_at
FROM
logs
LEFT JOIN vd.virtual_orders ON virtual_orders.sn = logs.order_sn
WHERE
logs.order_type = 3
) ON CONFLICT (sim, order_id, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET counts = excluded.counts, service_start_at = excluded.service_start_at, service_end_at = excluded.service_end_at;