同步优化

This commit is contained in:
邓皓元 2019-01-23 10:27:37 +08:00
parent 67c170ed44
commit ed04a81983
15 changed files with 136 additions and 251 deletions

View File

@ -50,10 +50,10 @@ class MongoSync extends Command
$jobs->push(new MongoSyncJob($page, $this->limit, $utcDateTime));
}
app(ConfigService::class)->set(self::CURSOR_KEY, intval($nextMicrotime));
$total && MongoSyncJob::withChain($jobs->toArray())
->dispatch(1, $this->limit, $utcDateTime)
->allOnQueue('sync');
app(ConfigService::class)->set(self::CURSOR_KEY, intval($nextMicrotime));
}
}

View File

@ -53,10 +53,10 @@ class CardSync extends Command
$jobs->push(new CardSyncJob($page, $this->limit, $maxId));
}
$nextId && app(ConfigService::class)->set(self::CURSOR_KEY, $nextId);
$total && CardSyncJob::withChain($jobs->toArray())
->dispatch(1, $this->limit, $maxId)
->allOnQueue('sync');
$nextId && app(ConfigService::class)->set(self::CURSOR_KEY, $nextId);
}
}

View File

@ -33,9 +33,10 @@ class CompanySync extends Command
$item['updated_at'] = date('Y-m-d H:i:s', $item['updated_at']);
$item['deleted_at'] = $item['del'] ? $item['updated_at'] : null;
unset($item['del']);
unset($item['id']);
}
Company::upsert($data, 'id');
Company::upsert($data, ['sn', 'deleted_at']);
app(CompanyRepository::class)->forgetCached();
}

View File

@ -59,10 +59,10 @@ class LogSync extends Command
$jobs->push(new LogSyncJob($page, $this->limit, $maxId));
}
$nextId && app(ConfigService::class)->set(self::CURSOR_KEY, $nextId);
$total && LogSyncJob::withChain($jobs->toArray())
->dispatch(1, $this->limit, $maxId)
->allOnQueue('sync');
$nextId && app(ConfigService::class)->set(self::CURSOR_KEY, $nextId);
}
}

View File

@ -47,7 +47,7 @@ class CardSyncJob implements ShouldQueue
*/
public function handle()
{
$blocs = app(BlocRepository::class)->get()->pluck('id', 'shorthand')->toArray();
$blocs = app(BlocRepository::class)->withTrashed()->get()->pluck('id', 'shorthand')->toArray();
$query = DB::connection('vd_old')->table('ckb_custom')
->select(['id', 'custom_no', 'imsi', 'carrieroperator', 'iccid', 'card_number', 'card_from', 'iccid', 'company', 'custom_state', 'create_time' ,'update_time', 'card_cycle_start'])

View File

@ -11,13 +11,16 @@ use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Dipper\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\Artisan;
use App\Models\Virtual\OrderCardPartition;
use Illuminate\Contracts\Queue\ShouldQueue;
use App\Domains\Config\Services\ConfigService;
use App\Domains\Virtual\Commands\Sync\LogSync;
use App\Domains\Virtual\Services\CommonService;
use App\Domains\Virtual\Services\ProductService;
use App\Domains\Card\Repositories\BlocRepository;
use App\Domains\Card\Repositories\CardRepository;
use App\Domains\Virtual\Repositories\OrderRepository;
use App\Domains\Virtual\Repositories\CompanyRepository;
use App\Domains\Virtual\Repositories\PackageRepository;
use App\Domains\Virtual\Repositories\ProductRepository;
use App\Domains\Virtual\Repositories\OrderCardRepository;
@ -34,21 +37,14 @@ class LogSyncJob implements ShouldQueue
public $limit;
public $maxId;
protected $blocs;
protected $packages;
protected $products;
protected $companies;
public static $types = [11 => 1, 13 => 0, 14 => 2, 15 => 3];
protected static $carrierOperators = [10 => 0, 11 => 1, 12 => 2];
protected static $payChannels = [10 => 'wx', 11 => 'alipay', 12 => 'bank'];
protected static $orderClasses = [
\App\Models\Virtual\OrderCard::class,
\App\Models\Virtual\OrderRenewalCard::class,
\App\Models\Virtual\OrderRenewalPackageCard::class,
\App\Models\Virtual\OrderFlowPackageCards::class,
];
/**
* Undocumented function
*
@ -68,8 +64,9 @@ class LogSyncJob implements ShouldQueue
*/
public function handle()
{
$this->packages = app(PackageRepository::class)->get()->keyBy('sn');
$this->products = app(ProductRepository::class)->get()->keyBy('sn');
$this->packages = app(PackageRepository::class)->withTrashed()->get()->keyBy('sn');
$this->products = app(ProductRepository::class)->withTrashed()->get()->keyBy('sn');
$this->companies = app(CompanyRepository::class)->withTrashed()->get()->keyBy('sn');
$query = DB::connection('vd_old')->table('logs')->whereIn('type', array_keys(self::$types))
->where('id', '>', $this->maxId)->orderBy('id');
@ -82,11 +79,14 @@ class LogSyncJob implements ShouldQueue
}
DB::beginTransaction();
$orderArray = [];
$relationArray = [];
foreach ($res as $key => $value) {
$value = (array)$value;
$package = $this->getPackage($value['content']);
$company = $this->getCompany(CommonService::stringifyCompanyId($value['company']));
$unit_price = intval($value['order_account'] * 100);
$custom_price = intval($value['order_account'] * 100);
$product = $this->getProduct($package, $value['company'], $unit_price);
@ -94,13 +94,13 @@ class LogSyncJob implements ShouldQueue
$pay_channel = self::$payChannels[$value['pay_type']];
// 按规则生成订单编号 (月6+类型1+公司3+套餐4+价格6)
$sn = date('Ym', $value['create_time']) . $type . sprintf('%03d', $value['company']) . sprintf('%04d', $package['id']) . sprintf('%06d', $custom_price);
$sn = date('Ym', $value['create_time']) . $type . sprintf('%03d', $company['id']) . sprintf('%04d', $package['id']) . sprintf('%06d', $custom_price);
$data = [
$orderArray[$sn] = [
'sn' => $sn,
'source' => 1,
'type' => $type,
'company_id' => $value['company'],
'company_id' => $company['id'],
'package_id' => $package['id'],
'product_id' => $product['id'],
'pay_channel' => $pay_channel,
@ -114,44 +114,49 @@ class LogSyncJob implements ShouldQueue
'created_at' => date('Y-m-d H:i:s', $value['create_time']),
];
try {
if ($order = Order::where('sn', $data['sn'])->first()) {
$order->counts = $order->counts + 1;
$order->total_price = $order->total_price + $unit_price;
$order->custom_price = $order->custom_price + $custom_price;
$order->save();
} else {
$order = Order::create($data);
}
$relationData = [
$relationArray[] = [
'order_sn' => $sn,
'type' => $type,
'sim' => $value['sim'],
'order_id' => $order->id,
'company_id' => $order->company_id,
'package_id' => $order->package_id,
'company_id' => $company['id'],
'package_id' => $package['id'],
'counts' => 1,
'created_at' => date('Y-m-d H:i:s', $value['create_time']),
'updated_at' => date('Y-m-d H:i:s'),
];
$class = (new self::$orderClasses[$type])->query();
if ($type) {
if ($relation = $class->where('sim', $value['sim'])->where('order_id', $order->id)->first()) {
$relation->counts = $relation->counts + 1;
$relation->save();
} else {
$relation = $class->create($relationData);
}
} else {
$relation = $class->upsert($relationData, ['sim', 'order_id', 'deleted_at']);
try {
$builder = Order::query()->toBase();
$sql = $builder->getGrammar()->compileInsert($builder, $orderArray);
$sql .= " on conflict (sn, COALESCE(deleted_at, '1970-01-01 08:00:00'::timestamp)) do update set
counts=virtual_orders.counts + 1,
total_price=virtual_orders.unit_price * (virtual_orders.counts + 1),
custom_price=virtual_orders.unit_price * (virtual_orders.counts + 1)";
$builder->connection->insert($sql, Arr::flatten($orderArray, 1));
$orders = Order::withTrashed()->select(['id', 'sn'])->whereIn('sn', array_pluck($orderArray, 'sn'))->get()->pluck('id', 'sn')->toArray();
foreach ($relationArray as $key => $value) {
$relationArray[$key]['order_id'] = $orders[$value['order_sn']];
unset($relationArray[$key]['order_sn']);
}
$builder = OrderCardPartition::query()->toBase();
$sql = $builder->getGrammar()->compileInsert($builder, $relationArray);
$sql .= " on conflict (type, order_id, sim, COALESCE(deleted_at, '1970-01-01 08:00:00'::timestamp)) do update set
counts=virtual_order_cards_partition.counts + 1";
$builder->connection->insert($sql, Arr::flatten($relationArray, 1));
} catch (\Exception $e) {
DB::rollback();
throw $e;
}
}
DB::commit();
@ -163,6 +168,21 @@ class LogSyncJob implements ShouldQueue
app(OrderCardPartitionRepository::class)->forgetCached();
}
/**
* 获取套餐
*
* @param string $sn
* @return void
*/
protected function getCompany($sn)
{
if (!$company = $this->companies[$sn]) {
throw new \Exception('企业不存在');
}
return $company;
}
/**
* 获取套餐
*

View File

@ -73,10 +73,6 @@ trait OrderCardConcern
}
if (isset($conditions['month'])) {
// $query->where(function ($subQuery) use ($conditions) {
// $subQuery->where('service_start_at', '<=', Carbon::parse($conditions['month']))
// ->where('service_end_at', '>=', Carbon::parse($conditions['month']));
// });
$conditions['month'] = (int)Carbon::parse($conditions['month'])->format('Ym');
$query->whereRaw("timelines_array(sim) @> '{{$conditions['month']}}'");
}

View File

@ -3,6 +3,7 @@
namespace App\Models\Virtual;
use App\Core\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
/**
* App\Models\Virtual\Product
@ -38,6 +39,8 @@ use App\Core\Model;
*/
class Product extends Model
{
use SoftDeletes;
protected $table = 'virtual_products';
public function company()

View File

@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION TIMELINES_JSON (sim INT8)
CREATE OR REPLACE FUNCTION TIMELINES_JSON (sim INT8, activated_at TIMESTAMP)
RETURNS JSONB
AS $$
DECLARE
@ -6,14 +6,11 @@ DECLARE
query TEXT;
order_row RECORD;
temp_text TEXT;
temp_activated_at TIMESTAMP;
temp_service_start_at TIMESTAMP;
temp_service_end_at TIMESTAMP;
next_timestamp TIMESTAMP;
BEGIN
SELECT virtual_activated_at INTO temp_activated_at FROM vd.cards WHERE cards.sim = TIMELINES_JSON.sim;
IF temp_activated_at IS NULL THEN
IF activated_at IS NULL THEN
RETURN timelines;
END IF;
@ -37,7 +34,7 @@ BEGIN
-- 服务时间
CASE (order_row. "type")
WHEN 0 THEN
temp_service_start_at := TO_CHAR(temp_activated_at, 'YYYY-MM-01 00:00:00');
temp_service_start_at := TO_CHAR(activated_at, 'YYYY-MM-01 00:00:00');
temp_service_end_at := temp_service_start_at + (order_row.service_months || ' month')::INTERVAL + (order_row.delay_months || ' month')::INTERVAL - '1 second'::INTERVAL;
WHEN 1, 2 THEN
IF (next_timestamp > order_row.created_at) THEN
@ -72,6 +69,18 @@ END;
$$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION TIMELINES_JSON (sim INT8)
RETURNS JSONB
AS $$
DECLARE
activated_at TIMESTAMP;
BEGIN
SELECT virtual_activated_at INTO activated_at FROM vd.cards WHERE cards.sim = TIMELINES_JSON.sim;
RETURN vd.TIMELINES_JSON(sim, activated_at);
END;
$$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION TIMELINES_RANGE (sim INT8)
RETURNS TSRANGE[]
AS $$
@ -90,6 +99,39 @@ END;
$$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION TIMELINES_ARRAY (sim INT8, activated_at TIMESTAMP)
RETURNS INT[]
AS $$
DECLARE
timelines INT[] := '{}';
timelines_json JSONB;
item JSONB;
temp_months INT;
i INT;
BEGIN
IF activated_at IS NULL THEN
RETURN timelines;
END IF;
timelines_json := vd.TIMELINES_JSON(sim, activated_at);
FOR item IN SELECT * FROM json_array_elements(timelines_json::JSON) LOOP
temp_months = (TO_CHAR((item->>'service_end_at')::TIMESTAMP, 'YYYY')::INT - TO_CHAR((item->>'service_start_at')::TIMESTAMP, 'YYYY')::INT) * 12
+ (TO_CHAR((item->>'service_end_at')::TIMESTAMP, 'MM')::INT - TO_CHAR((item->>'service_start_at')::TIMESTAMP, 'MM')::INT);
i := 0;
RAISE NOTICE 'TIMELINES_ARRAY: % - % - %', item->>'sim', item->>'package_id', temp_months;
WHILE i <= temp_months LOOP
timelines := timelines || TO_CHAR((item->>'service_start_at')::TIMESTAMP + (i || ' month')::INTERVAL, 'YYYYMM')::INT;
i := i + 1;
END LOOP;
END LOOP;
RETURN timelines;
END;
$$
LANGUAGE plpgsql IMMUTABLE;
CREATE OR REPLACE FUNCTION TIMELINES_ARRAY (sim INT8)
RETURNS INT[]
AS $$

View File

@ -36,10 +36,7 @@ CREATE TABLE virtual_order_renewal_cards PARTITION OF virtual_order_cards_partit
CREATE TABLE virtual_order_renewal_package_cards PARTITION OF virtual_order_cards_partition FOR VALUES IN (2);
CREATE TABLE virtual_order_flows_package_cards PARTITION OF virtual_order_cards_partition FOR VALUES IN (3);
CREATE UNIQUE INDEX "virtual_order_cards_order_id_sim_deleted_at_unique" ON "virtual_order_cards" (order_id, sim, COALESCE(deleted_at, '1970-01-01 08:00:00'::timestamp));
CREATE UNIQUE INDEX "virtual_order_renewal_cards_order_id_sim_deleted_at_unique" ON "virtual_order_cards" (order_id, sim, COALESCE(deleted_at, '1970-01-01 08:00:00'::timestamp));
CREATE UNIQUE INDEX "virtual_order_renewal_package_cards_order_id_sim_deleted_at_unique" ON "virtual_order_cards" (order_id, sim, COALESCE(deleted_at, '1970-01-01 08:00:00'::timestamp));
CREATE UNIQUE INDEX "virtual_order_flows_package_cards_order_id_sim_deleted_at_unique" ON "virtual_order_cards" (order_id, sim, COALESCE(deleted_at, '1970-01-01 08:00:00'::timestamp));
CREATE UNIQUE INDEX "virtual_order_cards_partition_order_id_sim_deleted_at_unique" ON "virtual_order_cards_partition" ("type", order_id, sim, COALESCE(deleted_at, '1970-01-01 08:00:00'::timestamp));
CREATE INDEX "virtual_order_cards_timelines_index" ON "virtual_order_cards" USING GIN (timelines_array(sim));
CREATE INDEX "virtual_order_renewal_cards_timelines_index" ON "virtual_order_renewal_cards" USING GIN (timelines_array(sim));

View File

@ -1,174 +0,0 @@
CREATE OR REPLACE FUNCTION FIX_TIMELINES (sim INT8, activated_at TIMESTAMP without TIME zone)
RETURNS BOOLEAN
AS $$
DECLARE
query TEXT;
order_row RECORD;
temp_service_start_at TIMESTAMP;
temp_service_end_at TIMESTAMP;
next_timestamp TIMESTAMP;
BEGIN
RAISE NOTICE '% - %', sim, activated_at;
IF activated_at IS NULL THEN
UPDATE
vd.virtual_order_cards_partition
SET
service_start_at = NULL,
service_end_at = NULL
WHERE
virtual_order_cards_partition.sim = FIX_TIMELINES.sim;
ELSE
query := 'SELECT
virtual_order_cards_partition.*,
virtual_packages.service_months,
virtual_packages.effect_months,
virtual_packages.delay_months
FROM
vd.virtual_order_cards_partition
JOIN vd.virtual_packages ON virtual_order_cards_partition.package_id = virtual_packages."id"
WHERE
virtual_order_cards_partition.sim = $1
ORDER BY
"type" ASC,
created_at ASC';
FOR order_row IN EXECUTE query
USING sim LOOP
-- 服务时间
CASE (order_row. "type")
WHEN 0 THEN
temp_service_start_at := TO_CHAR(activated_at, 'YYYY-MM-01 00:00:00');
temp_service_end_at := temp_service_start_at + (order_row.service_months || ' month')::INTERVAL + (order_row.delay_months || ' month')::INTERVAL - '1 second'::INTERVAL;
WHEN 1,
2 THEN
IF (next_timestamp > order_row.created_at) THEN
temp_service_start_at := TO_CHAR(next_timestamp, 'YYYY-MM-01 00:00:00');
ELSE
temp_service_start_at := TO_CHAR(order_row.created_at, 'YYYY-MM-01 00:00:00');
END IF;
temp_service_end_at := temp_service_start_at + order_row.counts * (order_row.service_months || ' month')::INTERVAL + (order_row.delay_months || ' month')::INTERVAL - '1 second'::INTERVAL;
ELSE
-- 先购买了加油包后再激活的
IF (order_row.created_at < activated_at) THEN
IF (order_row.created_at < TO_CHAR(activated_at - '1 month'::INTERVAL, 'YYYY-MM-01 00:00:00')::TIMESTAMP) THEN
-- 购买时间小于一个月的,直接生效
temp_service_start_at := TO_CHAR(activated_at, 'YYYY-MM-01 00:00:00');
ELSE
-- 延时生效
temp_service_start_at := TO_CHAR(activated_at + (order_row.effect_months || ' month')::INTERVAL,
'YYYY-MM-01 00:00:00');
END IF;
ELSE
-- 延时生效
temp_service_start_at := TO_CHAR(order_row.created_at + (order_row.effect_months || ' month')::INTERVAL,
'YYYY-MM-01 00:00:00');
END IF;
temp_service_end_at := temp_service_start_at + (order_row.service_months || ' month')::INTERVAL + (order_row.delay_months || ' month')::INTERVAL - '1 second'::INTERVAL;
END CASE;
next_timestamp := temp_service_end_at + '1 second'::INTERVAL;
-- 更新数据库
UPDATE
vd.virtual_order_cards_partition
SET
service_start_at = temp_service_start_at,
service_end_at = temp_service_end_at
WHERE
virtual_order_cards_partition. "id" = order_row. "id";
END LOOP;
END IF;
RETURN TRUE;
END;
$$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION CARD_TIMELINES ()
RETURNS TRIGGER
AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
-- 插入数据
IF (NEW.virtual_activated_at IS NOT NULL) THEN
PERFORM
vd.FIX_TIMELINES (NEW.sim,
NEW.virtual_activated_at);
END IF;
ELSIF (TG_OP = 'UPDATE') THEN
-- 更新数据
IF (OLD.virtual_activated_at IS NOT NULL) OR (NEW.virtual_activated_at IS NOT NULL) THEN
PERFORM
vd.FIX_TIMELINES (NEW.sim,
NEW.virtual_activated_at);
END IF;
ELSIF (TG_OP = 'DELETE') THEN
-- 删除数据
PERFORM
vd.FIX_TIMELINES (NEW.sim,
NULL);
END IF;
RETURN NULL;
END;
$$
LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS CARD_TIMELINES ON cards;
CREATE TRIGGER CARD_TIMELINES AFTER INSERT
OR
UPDATE
OR DELETE ON cards FOR EACH ROW EXECUTE PROCEDURE CARD_TIMELINES ();
CREATE OR REPLACE FUNCTION ORDER_TIMELINES ()
RETURNS TRIGGER
AS $$
DECLARE
order_row RECORD;
BEGIN
IF (TG_OP = 'INSERT') THEN
-- 插入数据
SELECT
NEW.sim,
virtual_activated_at
FROM
vd.cards INTO order_row
WHERE
cards.sim = NEW.sim;
PERFORM
vd.FIX_TIMELINES (order_row.sim,
order_row.virtual_activated_at);
ELSIF (TG_OP = 'DELETE') THEN
SELECT
NEW.sim,
virtual_activated_at
FROM
vd.cards INTO order_row
WHERE
cards.sim = NEW.sim;
PERFORM
vd.FIX_TIMELINES (order_row.sim,
order_row.virtual_activated_at);
END IF;
RETURN NULL;
END;
$$
LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS ORDER_TIMELINES ON virtual_order_cards;
CREATE TRIGGER ORDER_TIMELINES AFTER INSERT
OR DELETE ON virtual_order_cards FOR EACH ROW EXECUTE PROCEDURE ORDER_TIMELINES ();
DROP TRIGGER IF EXISTS ORDER_TIMELINES ON virtual_order_renewal_cards;
CREATE TRIGGER ORDER_TIMELINES AFTER INSERT
OR DELETE ON virtual_order_renewal_cards FOR EACH ROW EXECUTE PROCEDURE ORDER_TIMELINES ();
DROP TRIGGER IF EXISTS ORDER_TIMELINES ON virtual_order_renewal_package_cards;
CREATE TRIGGER ORDER_TIMELINES AFTER INSERT
OR DELETE ON virtual_order_renewal_package_cards FOR EACH ROW EXECUTE PROCEDURE ORDER_TIMELINES ();
DROP TRIGGER IF EXISTS ORDER_TIMELINES ON virtual_order_flows_package_cards;
CREATE TRIGGER ORDER_TIMELINES AFTER INSERT
OR DELETE ON virtual_order_flows_package_cards FOR EACH ROW EXECUTE PROCEDURE ORDER_TIMELINES ();