-- Migrate data from the old platform (MySQL, reached through mysql_fdw) into
-- the vd schema. Every step is written to be re-runnable: DDL uses
-- IF [NOT] EXISTS and every INSERT is an upsert.

-- ---------------------------------------------------------------------------
-- Step 1: set up the foreign server and import ckb_custom and
--         ckb_custom_handle_log as foreign tables in schema vd_old.
-- ---------------------------------------------------------------------------
CREATE EXTENSION IF NOT EXISTS mysql_fdw;

CREATE SERVER IF NOT EXISTS vd_old
    FOREIGN DATA WRAPPER mysql_fdw
    OPTIONS (host 'rm-bp1i7dsf6fi1bc5y2o.mysql.rds.aliyuncs.com', port '3306');

-- NOTE(review): plaintext credentials are committed with this script. Rotate
-- this password and supply it at run time (e.g. a psql variable) instead.
CREATE USER MAPPING IF NOT EXISTS FOR root
    SERVER vd_old
    OPTIONS (username 'vduser', password 'fxft@123');

-- IMPORT FOREIGN SCHEMA ... INTO vd_old needs the local schema to exist.
CREATE SCHEMA IF NOT EXISTS vd_old;

-- The materialized view depends on the foreign tables, so it has to be
-- dropped first; otherwise a second run fails on the DROP FOREIGN TABLE below
-- (and again on CREATE MATERIALIZED VIEW).
DROP MATERIALIZED VIEW IF EXISTS logs;

-- Schema-qualified so the drop targets the tables that the import creates,
-- regardless of the session search_path.
DROP FOREIGN TABLE IF EXISTS vd_old.ckb_custom;
DROP FOREIGN TABLE IF EXISTS vd_old.ckb_custom_handle_log;

IMPORT FOREIGN SCHEMA "vd-new"
    LIMIT TO ("ckb_custom", "ckb_custom_handle_log")
    FROM SERVER vd_old
    INTO vd_old;

-- ---------------------------------------------------------------------------
-- Step 2: build the materialized view that steps 4 and 5 read from.
--
-- One row per (create_time, company, package, order_account, custom_no, type,
-- pay_type) group of handle-log rows whose type is 11, 13, 14 or 15.
--
-- order_sn layout: YYYYMMDDHH24MISS + order_type + pay code
--                  + company id (lpad 3) + package id (lpad 4)
--                  + price in cents (lpad 6).
-- concat() skips NULL parts, so an unmatched company/package or an unmapped
-- pay_type shortens the sn instead of nulling it.
--
-- Changes versus the previous definition (both alter generated values):
--   * HH -> HH24: the 12-hour pattern gave orders placed 12 hours apart the
--     same order_sn, which step 4 then merged into a single order.
--   * The price is computed as exact integer cents. The former
--     ::float * 100 could yield values such as 1998.9999999999998, which
--     broke both the virtual_products.sn match and the lpad'ed sn segment.
--     unit_price is therefore an integer column now.
-- NOTE(review): order_sn values produced by an earlier run of the old
-- definition may differ; confirm before re-running against migrated data.
-- ---------------------------------------------------------------------------
CREATE MATERIALIZED VIEW logs AS
WITH src AS (
    SELECT
        hl.custom_no,
        hl.create_time,
        hl.type,
        hl.pay_type,
        hl.order_account,
        hl.valid_start_time::text AS valid_start_time,
        hl.valid_end_time::text   AS valid_end_time,
        cu.card_number::text      AS card_number,
        vc.id                     AS company_id,
        vp.id                     AS package_id,
        vpr.id                    AS product_id,
        -- price in cents, exact
        round(hl.order_account::numeric * 100)::integer AS price_cents,
        -- old log type -> new order type
        CASE hl.type
            WHEN 13 THEN 0
            WHEN 11 THEN 1
            WHEN 14 THEN 2
            WHEN 15 THEN 3
        END AS order_type,
        -- old pay_type -> numeric pay code used inside order_sn
        CASE hl.pay_type
            WHEN 10 THEN 2
            WHEN 11 THEN 3
            WHEN 12 THEN 0
        END AS pay_code
    FROM vd_old.ckb_custom_handle_log AS hl
    LEFT JOIN vd_old.ckb_custom AS cu
        ON cu.custom_no::text = hl.custom_no::text
    LEFT JOIN vd.virtual_companies AS vc
        ON vc.sn::text = concat('No', lpad(hl.company::text, 11, '0'))
        AND vc.deleted_at IS NULL
    LEFT JOIN vd.virtual_packages AS vp
        ON vp.sn::text = hl.content::text
        AND vp.deleted_at IS NULL
    LEFT JOIN vd.virtual_products AS vpr
        -- product sn is <package sn>_<company id>_<price in cents>;
        -- the price expression must stay identical to price_cents above
        ON vpr.sn::text = concat(
            hl.content, '_', vc.id, '_',
            round(hl.order_account::numeric * 100)::integer
        )
    WHERE hl.type IN (11, 13, 14, 15)
)
SELECT
    src.custom_no,
    min(src.card_number) AS sim,
    src.order_type,
    concat(
        to_char(to_timestamp(src.create_time::double precision), 'YYYYMMDDHH24MISS'),
        src.order_type,
        src.pay_code,
        lpad(src.company_id::text, 3, '0'),
        lpad(src.package_id::text, 4, '0'),
        lpad(src.price_cents::text, 6, '0')
    ) AS order_sn,
    COALESCE(src.company_id, 0)      AS company_id,
    COALESCE(src.package_id, 0)      AS package_id,
    COALESCE(min(src.product_id), 0) AS product_id,
    CASE src.pay_type
        WHEN 10 THEN 'wx'
        WHEN 11 THEN 'alipay'
        WHEN 12 THEN 'bank'
    END AS pay_channel,
    src.price_cents AS unit_price,
    -- '' and '0' mean "not set" in the old data; otherwise snap to the first
    -- instant of the month
    CASE
        WHEN min(src.valid_start_time) IN ('', '0') THEN NULL
        ELSE date_trunc(
            'month',
            to_timestamp(min(src.valid_start_time)::double precision)
        )
    END AS service_start_at,
    -- same rule, snapped to the last second of the month
    CASE
        WHEN max(src.valid_end_time) IN ('', '0') THEN NULL
        ELSE date_trunc(
            'month',
            to_timestamp(max(src.valid_end_time)::double precision)
        ) + INTERVAL '1 month' - INTERVAL '1 second'
    END AS service_end_at,
    to_timestamp(src.create_time::double precision) AS created_at,
    count(*) AS counts
FROM src
-- order_type, pay_code and price_cents are functions of type, pay_type and
-- order_account, so listing them does not change the grouping.
GROUP BY
    src.create_time,
    src.company_id,
    src.package_id,
    src.order_account,
    src.price_cents,
    src.custom_no,
    src.type,
    src.order_type,
    src.pay_type,
    src.pay_code;

-- ---------------------------------------------------------------------------
-- Step 3: sync card data. Existing cards (matched on sim) keep their
--         activated_at if already set.
-- ---------------------------------------------------------------------------
INSERT INTO vd.cards (
    sim,
    imsi,
    iccid,
    bloc_id,
    carrier_operator,
    "type",
    activated_at,
    virtual_activated_at,
    cancelled_at,
    created_at,
    updated_at
)
SELECT
    card_number::BIGINT AS sim,
    imsi,
    iccid,
    COALESCE(blocs.id, 0) AS bloc_id,
    CASE carrieroperator
        WHEN 10 THEN 0
        WHEN 11 THEN 1
        WHEN 12 THEN 2
        ELSE 255
    END AS carrier_operator,
    -- the 4th digit of the card number decides the card type
    CASE
        WHEN substr(card_number, 4, 1)::INT >= 5 THEN 1
        ELSE 0
    END AS "type",
    -- 0 means "no cycle start recorded"
    CASE card_cycle_start::INT
        WHEN 0 THEN NULL
        ELSE to_timestamp(card_cycle_start::INT)
    END AS activated_at,
    CASE card_cycle_start::INT
        WHEN 0 THEN NULL
        ELSE to_timestamp(card_cycle_start::INT)
    END AS virtual_activated_at,
    -- custom_state 13 is treated as cancelled, stamped with the last update
    CASE custom_state
        WHEN 13 THEN to_timestamp(update_time)
        ELSE NULL
    END AS cancelled_at,
    to_timestamp(create_time) AS created_at,
    to_timestamp(update_time) AS updated_at
-- schema-qualified: the foreign table lives in vd_old, same as in the view
FROM vd_old.ckb_custom
LEFT JOIN vd.blocs
    ON blocs.sn = ckb_custom.card_from
ON CONFLICT (sim) DO UPDATE SET
    activated_at         = COALESCE(cards.activated_at, excluded.virtual_activated_at),
    virtual_activated_at = excluded.virtual_activated_at,
    cancelled_at         = excluded.cancelled_at;

-- ---------------------------------------------------------------------------
-- Step 4: sync order data. One order per (order_type, order_sn); counts are
--         summed across the cards that share the sn.
-- ---------------------------------------------------------------------------
INSERT INTO vd.virtual_orders (
    "type",
    sn,
    "source",
    company_id,
    package_id,
    product_id,
    pay_channel,
    unit_price,
    counts,
    total_price,
    custom_price,
    order_at,
    order_status,
    transaction_status,
    created_at,
    updated_at
)
SELECT
    order_type,
    order_sn,
    1,
    MIN(company_id),
    MIN(package_id),
    MIN(product_id),
    MIN(pay_channel),
    MIN(unit_price),
    SUM(counts)                   AS counts,
    MIN(unit_price) * SUM(counts) AS total_price,
    MIN(unit_price) * SUM(counts) AS custom_price,
    MIN(created_at)               AS order_at,
    4,
    1,
    MIN(created_at)               AS created_at,
    MIN(created_at)               AS updated_at
FROM logs
GROUP BY
    order_type,
    order_sn
ON CONFLICT (sn, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET
    counts       = excluded.counts,
    total_price  = excluded.total_price,
    custom_price = excluded.custom_price;

-- ---------------------------------------------------------------------------
-- Step 5: sync order detail data. The four statements differ only in the
--         target table and the order_type they select (0, 1, 2, 3).
--
-- Only live (not soft-deleted) orders are linked: matching on sn alone would
-- duplicate a card row for every soft-deleted order sharing the sn and attach
-- the copy to the deleted order.
-- ---------------------------------------------------------------------------

-- order_type 0
INSERT INTO vd.virtual_order_cards (
    "type",
    sim,
    order_id,
    company_id,
    package_id,
    product_id,
    counts,
    unit_price,
    service_start_at,
    service_end_at,
    created_at,
    updated_at
)
SELECT
    logs.order_type  AS "type",
    logs.sim::BIGINT AS sim,
    virtual_orders.id AS order_id,
    logs.company_id,
    logs.package_id,
    logs.product_id,
    logs.counts,
    logs.unit_price,
    logs.service_start_at,
    logs.service_end_at,
    logs.created_at AS created_at,
    logs.created_at AS updated_at
FROM logs
LEFT JOIN vd.virtual_orders
    ON virtual_orders.sn = logs.order_sn
    AND virtual_orders.deleted_at IS NULL
WHERE logs.order_type = 0
ON CONFLICT (sim, order_id, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET
    counts           = excluded.counts,
    service_start_at = excluded.service_start_at,
    service_end_at   = excluded.service_end_at;

-- order_type 1
INSERT INTO vd.virtual_order_renewal_cards (
    "type",
    sim,
    order_id,
    company_id,
    package_id,
    product_id,
    counts,
    unit_price,
    service_start_at,
    service_end_at,
    created_at,
    updated_at
)
SELECT
    logs.order_type  AS "type",
    logs.sim::BIGINT AS sim,
    virtual_orders.id AS order_id,
    logs.company_id,
    logs.package_id,
    logs.product_id,
    logs.counts,
    logs.unit_price,
    logs.service_start_at,
    logs.service_end_at,
    logs.created_at AS created_at,
    logs.created_at AS updated_at
FROM logs
LEFT JOIN vd.virtual_orders
    ON virtual_orders.sn = logs.order_sn
    AND virtual_orders.deleted_at IS NULL
WHERE logs.order_type = 1
ON CONFLICT (sim, order_id, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET
    counts           = excluded.counts,
    service_start_at = excluded.service_start_at,
    service_end_at   = excluded.service_end_at;

-- order_type 2
INSERT INTO vd.virtual_order_renewal_package_cards (
    "type",
    sim,
    order_id,
    company_id,
    package_id,
    product_id,
    counts,
    unit_price,
    service_start_at,
    service_end_at,
    created_at,
    updated_at
)
SELECT
    logs.order_type  AS "type",
    logs.sim::BIGINT AS sim,
    virtual_orders.id AS order_id,
    logs.company_id,
    logs.package_id,
    logs.product_id,
    logs.counts,
    logs.unit_price,
    logs.service_start_at,
    logs.service_end_at,
    logs.created_at AS created_at,
    logs.created_at AS updated_at
FROM logs
LEFT JOIN vd.virtual_orders
    ON virtual_orders.sn = logs.order_sn
    AND virtual_orders.deleted_at IS NULL
WHERE logs.order_type = 2
ON CONFLICT (sim, order_id, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET
    counts           = excluded.counts,
    service_start_at = excluded.service_start_at,
    service_end_at   = excluded.service_end_at;

-- order_type 3
INSERT INTO vd.virtual_order_flows_package_cards (
    "type",
    sim,
    order_id,
    company_id,
    package_id,
    product_id,
    counts,
    unit_price,
    service_start_at,
    service_end_at,
    created_at,
    updated_at
)
SELECT
    logs.order_type  AS "type",
    logs.sim::BIGINT AS sim,
    virtual_orders.id AS order_id,
    logs.company_id,
    logs.package_id,
    logs.product_id,
    logs.counts,
    logs.unit_price,
    logs.service_start_at,
    logs.service_end_at,
    logs.created_at AS created_at,
    logs.created_at AS updated_at
FROM logs
LEFT JOIN vd.virtual_orders
    ON virtual_orders.sn = logs.order_sn
    AND virtual_orders.deleted_at IS NULL
WHERE logs.order_type = 3
ON CONFLICT (sim, order_id, COALESCE(deleted_at::TIMESTAMP, '1970-01-01 08:00:00'::TIMESTAMP)) DO UPDATE SET
    counts           = excluded.counts,
    service_start_at = excluded.service_start_at,
    service_end_at   = excluded.service_end_at;