优化同步规则

This commit is contained in:
邓皓元 2019-01-15 18:39:11 +08:00
parent a38934985e
commit d4727afda4
8 changed files with 334 additions and 224 deletions

View File

@ -33,7 +33,7 @@ class Kernel extends ConsoleKernel
/**
* Queue the given console command.
*
* @param string $command
* @param array $parameters

View File

@ -17,8 +17,6 @@ class MongoSync extends Command
protected $description = '同步卡基础信息数据';
protected static $carrierOperators = [1, 0, 2];
protected $limit = 1000;
const CURSOR_KEY = 'sync_mongo_cursor';
@ -34,13 +32,13 @@ class MongoSync extends Command
$this->line('待同步条数:'.$total);
if ($total) {
Artisan::call('real:sync-bloc');
}
$page = 1;
while ($total) {
if ($page === 1) {
Artisan::call('real:sync-bloc');
}
dispatch(new MongoSyncJob($page, $this->limit, $utcDateTime));
if ($page * $this->limit >= $total) {

View File

@ -22,6 +22,8 @@ class MongoSyncJob implements ShouldQueue
public $limit;
public $utcDateTime;
protected static $carrierOperators = [1, 0, 2];
/**
* Undocumented function
*

View File

@ -6,6 +6,8 @@ use Carbon\Carbon;
use App\Models\Card\Card;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Artisan;
use App\Domains\Virtual\Jobs\CardSyncJob;
use App\Domains\Config\Services\ConfigService;
use App\Domains\Card\Repositories\BlocRepository;
use App\Domains\Card\Repositories\CardRepository;
@ -18,79 +20,28 @@ class CardSync extends Command
const CURSOR_KEY = 'sync_card_cursor';
protected static $carrierOperators = [10 => 0, 11 => 1, 12 => 2];
protected $blocs;
protected $limit = 1000;
public function handle()
{
$nextId = $maxId = app(ConfigService::class)->get(self::CURSOR_KEY) ?: 0;
$maxId = app(ConfigService::class)->get(self::CURSOR_KEY) ?: 0;
$query = DB::connection('vd_old')->table('ckb_custom')
->select(['id', 'custom_no', 'imsi', 'carrieroperator', 'iccid', 'card_number', 'card_from', 'iccid', 'company', 'custom_state', 'create_time' ,'update_time', 'card_cycle_start'])
->where('id', '>', $maxId)
->orderBy('id');
$query = DB::connection('vd_old')->table('ckb_custom')->where('id', '>', $maxId)->orderBy('id');
$nextId = $query->max('id');
$total = $query->count();
$this->line('待同步条数:' . $total);
if ($total) {
$this->blocs = app(BlocRepository::class)->get()->pluck('id', 'shorthand')->toArray();
Artisan::call('real:sync-bloc');
}
$page = 1;
while ($total) {
echo 'sync_card_cursor page #: ' . $page . ' nextId #: ' . $nextId . PHP_EOL;
$res = $query->forPage($page, $this->limit)->get();
if (empty($res)) {
break;
}
$array = [];
foreach ($res as $key => $value) {
$value = (array)$value;
$virtual_activated_at = date('Y-m-d H:i:s', $value['card_cycle_start']);
if (Carbon::parse($virtual_activated_at) < Carbon::parse('2000-01-01 00:00:00')) {
$virtual_activated_at = null;
}
$array[] = [
'sim' => $value['card_number'],
'imsi' => $value['imsi'],
'iccid' => $value['iccid'],
'bloc_id' => $this->blocs[$value['card_from']] ?? 0,
'carrier_operator' => self::$carrierOperators[$value['carrieroperator']] ?? 255,
'type' => ($value['card_number'][3] >= 5) ? 1 : 0,
'activated_at' => $virtual_activated_at,
'virtual_activated_at' => $virtual_activated_at,
'cancelled_at' => ($value['custom_state'] === 13) ? date('Y-m-d H:i:s', $value['update_time']) : null,
'created_at' => date('Y-m-d H:i:s', $value['create_time']),
'updated_at' => date('Y-m-d H:i:s', $value['update_time']),
];
$nextId = $value['id'];
}
$builder = Card::query()->toBase();
$sql = $builder->getGrammar()->compileInsert($builder, $array);
$sql .= 'on conflict (sim) do update set
activated_at=COALESCE(cards.activated_at, excluded.virtual_activated_at),
virtual_activated_at=excluded.virtual_activated_at,
cancelled_at=excluded.cancelled_at';
$builder->connection->insert($sql, Arr::flatten($array, 1));
app(ConfigService::class)->set(self::CURSOR_KEY, $nextId);
dispatch(new CardSyncJob($page, $this->limit, $maxId));
if ($page * $this->limit >= $total) {
break;
@ -99,6 +50,6 @@ class CardSync extends Command
$page++;
}
app(CardRepository::class)->forgetCached();
app(ConfigService::class)->set(self::CURSOR_KEY, $nextId);
}
}

View File

@ -7,6 +7,7 @@ use App\Models\Card\Card;
use App\Models\Virtual\Order;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Artisan;
use App\Domains\Virtual\Jobs\LogSyncJob;
use App\Domains\Config\Services\ConfigService;
use App\Domains\Virtual\Services\ProductService;
use App\Domains\Card\Repositories\BlocRepository;
@ -22,129 +23,31 @@ class LogSync extends Command
const CURSOR_KEY = 'sync_log_cursor';
protected static $types = [11 => 1, 13 => 0, 14 => 2, 15 => 3];
protected static $carrierOperators = [10 => 0, 11 => 1, 12 => 2];
protected static $payChannels = [10 => 'wx', 11 => 'alipay', 12 => 'bank'];
protected static $orderClasses = [
\App\Models\Virtual\OrderCard::class,
\App\Models\Virtual\OrderRenewalCard::class,
\App\Models\Virtual\OrderRenewalPackageCard::class,
\App\Models\Virtual\OrderFlowPackageCards::class,
];
protected $limit = 1000;
protected $blocs;
protected $packages;
protected $products;
public function handle()
{
$nextId = $maxId = app(ConfigService::class)->get(self::CURSOR_KEY) ?: 0;
$maxId = app(ConfigService::class)->get(self::CURSOR_KEY) ?: 0;
$this->blocs = app(BlocRepository::class)->get()->pluck('id', 'shorthand')->toArray();
$this->packages = app(PackageRepository::class)->get()->keyBy('sn');
$this->products = app(ProductRepository::class)->get()->keyBy('sn');
$query = DB::connection('vd_old')->table('logs')->whereIn('type', array_keys(LogSyncJob::$types))
->where('id', '>', $maxId)->orderBy('id');
$query = DB::connection('vd_old')->table('logs')->whereIn('type', array_keys(self::$types))
->where('id', '>', $nextId)->orderBy('id');
$nextId = $query->max('id');
$total = $query->count();
$this->line('待同步条数:'.$total);
if ($total) {
Artisan::call('virtual:sync-company');
Artisan::call('virtual:sync-package');
Artisan::call('virtual:sync-product');
}
$page = 1;
while ($total) {
if ($page === 1) {
Artisan::call('virtual:sync-company');
Artisan::call('virtual:sync-package');
Artisan::call('virtual:sync-product');
}
echo 'sync_log_cursor page #: ' . $page . ' nextId #: ' . $nextId . PHP_EOL;
$res = $query->forPage($page, $this->limit)->get();
DB::beginTransaction();
foreach ($res as $key => $value) {
$value = (array)$value;
$package = $this->getPackage($value['content']);
$unit_price = intval($value['order_account'] * 100);
$custom_price = intval($value['order_account'] * 100);
$product = $this->getProduct($package, $value['company'], $unit_price);
$type = self::$types[$value['type']];
$pay_channel = self::$payChannels[$value['pay_type']];
// 按规则生成订单编号 (月6+类型1+公司3+套餐4+价格6)
$sn = date('Ym', $value['create_time']) . $type . sprintf('%03d', $value['company']) . sprintf('%04d', $package['id']) . sprintf('%06d', $custom_price);
$data = [
'sn' => $sn,
'source' => 1,
'type' => $type,
'company_id' => $value['company'],
'package_id' => $package['id'],
'product_id' => $product['id'],
'pay_channel' => $pay_channel,
'unit_price' => $unit_price,
'counts' => 1,
'total_price' => $unit_price,
'custom_price' => $custom_price,
'order_at' => date('Y-m-d H:i:s', $value['create_time']),
'order_status' => 4,
'transaction_status' => 1,
'created_at' => date('Y-m-d H:i:s', $value['create_time']),
];
try {
if ($order = Order::where('sn', $data['sn'])->first()) {
$order->counts = $order->counts + 1;
$order->total_price = $order->total_price + $unit_price;
$order->custom_price = $order->custom_price + $custom_price;
$order->save();
} else {
$order = Order::create($data);
}
$relationData = [
'type' => $type,
'sim' => $value['sim'],
'order_id' => $order->id,
'company_id' => $order->company_id,
'package_id' => $order->package_id,
'counts' => 1,
'service_start_at' => date('Y-m-d H:i:s', intval($value['valid_start_time'])),
'service_end_at' => date('Y-m-d H:i:s', intval($value['valid_end_time'])),
'created_at' => date('Y-m-d H:i:s', $value['create_time']),
'updated_at' => date('Y-m-d H:i:s'),
];
$class = (new self::$orderClasses[$type])->query();
if ($type) {
if ($relation = $class->where('sim', $value['sim'])->where('order_id', $order->id)->first()) {
$relation->counts = $relation->counts + 1;
$relation->save();
} else {
$relation = $class->create($relationData);
}
} else {
$relation = $class->upsert($relationData, ['sim', 'order_id', 'deleted_at']);
}
} catch (\Exception $e) {
DB::rollback();
throw $e;
}
$nextId = $value['id'];
}
DB::commit();
app(ConfigService::class)->set(self::CURSOR_KEY, $nextId);
dispatch(new LogSyncJob($page, $this->limit, $maxId));
if ($page * $this->limit >= $total) {
break;
@ -152,44 +55,7 @@ class LogSync extends Command
$page++;
}
}
/**
* 获取套餐
*
* @param string $sn
* @return void
*/
protected function getPackage($sn)
{
if (!$package = $this->packages[$sn]) {
throw new \Exception('套餐不存在');
}
return $package;
}
/**
* 获取定价
*
* @param string $sn
* @return void
*/
protected function getProduct($package, $companyId, $price)
{
$sn = strtoupper($package['sn'] . '_' . $companyId . '_' . $price);
if (!$product = $this->products[$sn]) {
$product = app(ProductService::class)->store([
'name' => $package['name'] . '' . $price,
'company_id' => $companyId,
'package_id' => $package['id'],
'base_price' => $price,
'renewal_price' => $price,
]);
$this->products[$sn] = $product;
}
return $product;
app(ConfigService::class)->set(self::CURSOR_KEY, $nextId);
}
}

View File

@ -0,0 +1,96 @@
<?php
namespace App\Domains\Virtual\Jobs;
use App\Models\Card\Card;
use Illuminate\Support\Arr;
use Illuminate\Bus\Queueable;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\DB;
use Dipper\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Contracts\Queue\ShouldQueue;
use App\Domains\Card\Repositories\BlocRepository;
use App\Domains\Card\Repositories\CardRepository;
use App\Domains\Config\Services\ConfigService;
class CardSyncJob implements ShouldQueue
{
use Queueable, Dispatchable;
public $page;
public $limit;
public $maxId;
protected static $carrierOperators = [10 => 0, 11 => 1, 12 => 2];
protected $blocs;
/**
* Undocumented function
*
* @param int $page
* @param int $limit
* @param int $maxId
*/
public function __construct($page, $limit, $maxId)
{
$this->page = $page;
$this->limit = $limit;
$this->maxId = $maxId;
}
/**
*
*/
public function handle()
{
$blocs = app(BlocRepository::class)->get()->pluck('id', 'shorthand')->toArray();
$query = DB::connection('vd_old')->table('ckb_custom')
->select(['id', 'custom_no', 'imsi', 'carrieroperator', 'iccid', 'card_number', 'card_from', 'iccid', 'company', 'custom_state', 'create_time' ,'update_time', 'card_cycle_start'])
->where('id', '>', $this->maxId)
->orderBy('id');
$res = $query->forPage($this->page, $this->limit)->get();
$array = [];
foreach ($res as $key => $value) {
$value = (array)$value;
$virtual_activated_at = date('Y-m-d H:i:s', $value['card_cycle_start']);
if (Carbon::parse($virtual_activated_at) < Carbon::parse('2000-01-01 00:00:00')) {
$virtual_activated_at = null;
}
$array[] = [
'sim' => $value['card_number'],
'imsi' => $value['imsi'],
'iccid' => $value['iccid'],
'bloc_id' => $this->blocs[$value['card_from']] ?? 0,
'carrier_operator' => self::$carrierOperators[$value['carrieroperator']] ?? 255,
'type' => ($value['card_number'][3] >= 5) ? 1 : 0,
'activated_at' => $virtual_activated_at,
'virtual_activated_at' => $virtual_activated_at,
'cancelled_at' => ($value['custom_state'] === 13) ? date('Y-m-d H:i:s', $value['update_time']) : null,
'created_at' => date('Y-m-d H:i:s', $value['create_time']),
'updated_at' => date('Y-m-d H:i:s', $value['update_time']),
];
}
$builder = Card::query()->toBase();
$sql = $builder->getGrammar()->compileInsert($builder, $array);
$sql .= 'on conflict (sim) do update set
activated_at=COALESCE(cards.activated_at, excluded.virtual_activated_at),
virtual_activated_at=excluded.virtual_activated_at,
cancelled_at=excluded.cancelled_at';
$builder->connection->insert($sql, Arr::flatten($array, 1));
app(CardRepository::class)->forgetCached();
}
}

View File

@ -0,0 +1,197 @@
<?php
namespace App\Domains\Virtual\Jobs;
use App\Models\Card\Card;
use Illuminate\Support\Arr;
use App\Models\Virtual\Order;
use Illuminate\Bus\Queueable;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\DB;
use Dipper\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Contracts\Queue\ShouldQueue;
use App\Domains\Config\Services\ConfigService;
use App\Domains\Virtual\Services\ProductService;
use App\Domains\Card\Repositories\BlocRepository;
use App\Domains\Card\Repositories\CardRepository;
use App\Domains\Virtual\Repositories\OrderRepository;
use App\Domains\Virtual\Repositories\PackageRepository;
use App\Domains\Virtual\Repositories\ProductRepository;
use App\Domains\Virtual\Repositories\OrderCardRepository;
use App\Domains\Virtual\Repositories\OrderRenewalCardRepository;
use App\Domains\Virtual\Repositories\OrderFlowPackageCardsRepository;
use App\Domains\Virtual\Repositories\OrderRenewalPackageCardRepository;
class LogSyncJob implements ShouldQueue
{
use Queueable, Dispatchable;
public $page;
public $limit;
public $maxId;
protected $blocs;
protected $packages;
protected $products;
public static $types = [11 => 1, 13 => 0, 14 => 2, 15 => 3];
protected static $carrierOperators = [10 => 0, 11 => 1, 12 => 2];
protected static $payChannels = [10 => 'wx', 11 => 'alipay', 12 => 'bank'];
protected static $orderClasses = [
\App\Models\Virtual\OrderCard::class,
\App\Models\Virtual\OrderRenewalCard::class,
\App\Models\Virtual\OrderRenewalPackageCard::class,
\App\Models\Virtual\OrderFlowPackageCards::class,
];
/**
* Undocumented function
*
* @param int $page
* @param int $limit
* @param int $maxId
*/
public function __construct($page, $limit, $maxId)
{
$this->page = $page;
$this->limit = $limit;
$this->maxId = $maxId;
}
/**
*
*/
public function handle()
{
$this->packages = app(PackageRepository::class)->get()->keyBy('sn');
$this->products = app(ProductRepository::class)->get()->keyBy('sn');
$query = DB::connection('vd_old')->table('logs')->whereIn('type', array_keys(self::$types))
->where('id', '>', $this->maxId)->orderBy('id');
$res = $query->forPage($this->page, $this->limit)->get();
DB::beginTransaction();
foreach ($res as $key => $value) {
$value = (array)$value;
$package = $this->getPackage($value['content']);
$unit_price = intval($value['order_account'] * 100);
$custom_price = intval($value['order_account'] * 100);
$product = $this->getProduct($package, $value['company'], $unit_price);
$type = self::$types[$value['type']];
$pay_channel = self::$payChannels[$value['pay_type']];
// 按规则生成订单编号 (月6+类型1+公司3+套餐4+价格6)
$sn = date('Ym', $value['create_time']) . $type . sprintf('%03d', $value['company']) . sprintf('%04d', $package['id']) . sprintf('%06d', $custom_price);
$data = [
'sn' => $sn,
'source' => 1,
'type' => $type,
'company_id' => $value['company'],
'package_id' => $package['id'],
'product_id' => $product['id'],
'pay_channel' => $pay_channel,
'unit_price' => $unit_price,
'counts' => 1,
'total_price' => $unit_price,
'custom_price' => $custom_price,
'order_at' => date('Y-m-d H:i:s', $value['create_time']),
'order_status' => 4,
'transaction_status' => 1,
'created_at' => date('Y-m-d H:i:s', $value['create_time']),
];
try {
if ($order = Order::where('sn', $data['sn'])->first()) {
$order->counts = $order->counts + 1;
$order->total_price = $order->total_price + $unit_price;
$order->custom_price = $order->custom_price + $custom_price;
$order->save();
} else {
$order = Order::create($data);
}
$relationData = [
'type' => $type,
'sim' => $value['sim'],
'order_id' => $order->id,
'company_id' => $order->company_id,
'package_id' => $order->package_id,
'counts' => 1,
'service_start_at' => date('Y-m-d H:i:s', intval($value['valid_start_time'])),
'service_end_at' => date('Y-m-d H:i:s', intval($value['valid_end_time'])),
'created_at' => date('Y-m-d H:i:s', $value['create_time']),
'updated_at' => date('Y-m-d H:i:s'),
];
$class = (new self::$orderClasses[$type])->query();
if ($type) {
if ($relation = $class->where('sim', $value['sim'])->where('order_id', $order->id)->first()) {
$relation->counts = $relation->counts + 1;
$relation->save();
} else {
$relation = $class->create($relationData);
}
} else {
$relation = $class->upsert($relationData, ['sim', 'order_id', 'deleted_at']);
}
} catch (\Exception $e) {
DB::rollback();
throw $e;
}
}
DB::commit();
app(OrderRepository::class)->forgetCached();
app(OrderCardRepository::class)->forgetCached();
app(OrderRenewalCardRepository::class)->forgetCached();
app(OrderRenewalPackageCardRepository::class)->forgetCached();
app(OrderFlowPackageCardsRepository::class)->forgetCached();
}
/**
* 获取套餐
*
* @param string $sn
* @return void
*/
protected function getPackage($sn)
{
if (!$package = $this->packages[$sn]) {
throw new \Exception('套餐不存在');
}
return $package;
}
/**
* 获取定价
*
* @param string $sn
* @return void
*/
protected function getProduct($package, $companyId, $price)
{
$sn = strtoupper($package['sn'] . '_' . $companyId . '_' . $price);
if (!$product = $this->products[$sn]) {
$product = app(ProductService::class)->store([
'name' => $package['name'] . '' . $price,
'company_id' => $companyId,
'package_id' => $package['id'],
'base_price' => $price,
'renewal_price' => $price,
]);
$this->products[$sn] = $product;
}
return $product;
}
}

View File

@ -61,58 +61,58 @@ class OrderRepository extends Repository
$conditions['id'] = array_wrap($conditions['id']);
$query->whereIn('id', $conditions['id']);
}
if (isset($conditions['source'])) {
$query->where('source', $conditions['source']);
}
if (isset($conditions['type'])) {
$conditions['type'] = array_wrap($conditions['type']);
$query->whereIn('type', $conditions['type']);
}
if (isset($conditions['company_id'])) {
$query->where('company_id', $conditions['company_id']);
}
if (isset($conditions['sn'])) {
$query->where('sn', $conditions['sn']);
}
if (isset($conditions['order_status'])) {
$query->where('order_status', $conditions['order_status']);
}
if (isset($conditions['transaction_status'])) {
$query->where('transaction_status', $conditions['transaction_status']);
}
if (isset($conditions['carrier_operator'])) {
$query->whereHas('package', function ($relation) use ($conditions) {
$relation->withTrashed()->where('carrier_operator', $conditions['carrier_operator']);
});
}
if (isset($conditions['pay_channel'])) {
$query->where('pay_channel', $conditions['pay_channel']);
}
if (isset($conditions['company_name'])) {
$query->whereHas('company', function ($relation) use ($conditions) {
$relation->withTrashed()->where('name', $conditions['company_name']);
});
}
if (isset($conditions['package_name'])) {
$query->whereHas('package', function ($relation) use ($conditions) {
$relation->withTrashed()->where('name', $conditions['package_name']);
});
}
if (isset($conditions['starttime'])) {
$query->where('order_at', '>=', Carbon::parse($conditions['starttime']));
}
if (isset($conditions['endtime'])) {
$query->where('order_at', '<=', Carbon::parse($conditions['endtime']));
}