同步优化

This commit is contained in:
邓皓元 2019-04-26 02:06:45 +08:00
parent f1aea73335
commit fe98879597
4 changed files with 67 additions and 86 deletions

View File

@ -27,7 +27,7 @@ class ActivatedSync extends Command
$endMicrotime = new UTCDateTime($day->copy()->endOfDay());
if ($this->option('month')) {
$startMicrotime = new UTCDateTime($day->copy()->subMonth()->startOfMonth());
$startMicrotime = new UTCDateTime($day->copy()->startOfMonth());
}
$query = TblCard::where('pNo', 'No00000000768')
@ -57,26 +57,39 @@ class ActivatedSync extends Command
];
})->all();
$table = app(Card::class)->getTable();
$as = 'as_table';
$reference = 'sim';
$exists = Card::select('sim')->whereIn('sim', array_pluck($array, 'sim'))
->whereNotNull('virtual_activated_at')->get()->pluck('sim')->toArray();
$updates = "activated_at={$as}.activated_at::timestamp,
virtual_activated_at=COALESCE({$table}.activated_at, {$as}.activated_at::timestamp)::timestamp";
foreach ($array as $key => $value) {
if(in_array($value['sim'], $exists)){
unset($array);
}
}
$parameters = collect($array)->map(function ($item) {
return "({$item['sim']}, '{$item['activated_at']}')";
})->implode(', ');
if (!empty($array)) {
$array = array_values($array);
$from = "FROM (VALUES $parameters) AS {$as}(sim, activated_at)";
$table = app(Card::class)->getTable();
$as = 'as_table';
$reference = 'sim';
$where = "WHERE {$table}.{$reference} = {$as}.{$reference}::int8";
$updates = "activated_at={$as}.activated_at::timestamp,
virtual_activated_at=COALESCE({$table}.activated_at, {$as}.activated_at::timestamp)::timestamp";
$sql = trim("UPDATE {$table} SET {$updates} {$from} {$where}");
$simArray = implode(',', array_pluck($array, 'sim'));
$parameters = collect($array)->map(function ($item) {
return "({$item['sim']}, '{$item['activated_at']}')";
})->implode(', ');
DB::statement($sql);
DB::statement("select fix_timelines('{{$simArray}}'::INT8[]);");
$from = "FROM (VALUES $parameters) AS {$as}(sim, activated_at)";
$where = "WHERE {$table}.{$reference} = {$as}.{$reference}::int8";
$sql = trim("UPDATE {$table} SET {$updates} {$from} {$where}");
$simArray = implode(',', array_pluck($array, 'sim'));
DB::statement($sql);
DB::statement("select fix_timelines('{{$simArray}}'::INT8[]);");
}
$page++;
}

View File

@ -118,15 +118,13 @@ class AddedOrderSync extends Command
$value['virtual_order_id'] = $orders[$value['sim']][$i]['order_id'] ?? 0;
}
if (Carbon::now()->diffInMonths($this->datetime) < 2) {
$exists = Card::select('sim')->whereIn('sim', array_pluck($data, 'sim'))
->whereNotNull('virtual_activated_at')->get()->pluck('sim')->toArray();
$exists = Card::select('sim')->whereIn('sim', array_pluck($data, 'sim'))
->whereNotNull('virtual_activated_at')->get()->pluck('sim')->toArray();
$simArray = array_diff(array_pluck($data, 'sim'), $exists);
$simArray = array_diff(array_pluck($data, 'sim'), $exists);
if (!empty($simArray)) {
MongoCardJob::dispatch($simArray)->onQueue('sync');
}
if (!empty($simArray)) {
MongoCardJob::dispatch($simArray)->onQueue('sync');
}
$only = ['company_id', 'package_id', 'counts', 'unit_price'];

View File

@ -57,15 +57,13 @@ class OrderBaseSync extends Command
$value['virtual_order_id'] = $orders[$value['sim']] ?? 0;
}
if (Carbon::now()->diffInMonths($datetime) < 2) {
$exists = Card::select('sim')->whereIn('sim', array_pluck($data, 'sim'))
->whereNotNull('virtual_activated_at')->get()->pluck('sim')->toArray();
$exists = Card::select('sim')->whereIn('sim', array_pluck($data, 'sim'))
->whereNotNull('virtual_activated_at')->get()->pluck('sim')->toArray();
$simArray = array_diff(array_pluck($data, 'sim'), $exists);
$simArray = array_diff(array_pluck($data, 'sim'), $exists);
if (!empty($simArray)) {
MongoCardJob::dispatch($simArray)->onQueue('sync');
}
if (!empty($simArray)) {
MongoCardJob::dispatch($simArray)->onQueue('sync');
}
$only = [ 'order_id', 'company_id', 'package_id', 'counts', 'unit_price'];

View File

@ -24,81 +24,53 @@ class FlowPoolMonthSync extends Command
$flowPoolsKeyByName = $flowPools->keyBy('name');
$packages = app(PackageRepository::class)->withTrashed()->get()->keyBy('sn');
$old_stats = DB::connection('vd_old')->table('ckb_data_pool_statis')->orderBy('dpmb_id')->get()->collect()->toArray();
$array = [];
foreach ($old_stats as $stat) {
$this->line($stat['dpmb_id'] . '-' . $stat['year_month'] . '-' . $stat['td_pool_sn']);
DB::connection('vd_old')->table('ckb_data_pool_statis_item')->select(['td_pool_sn', 'sim', 'vd_package_sn', 'flows_used'])
->chunk(10000, function ($chunk) use (&$array, $flowPoolsKeyByName, $realFlowPools, $flowPools, $packages) {
foreach ($chunk as $value) {
$pool_id = 0;
$pool_id = 0;
if (strpos($value['td_pool_sn'], 'vd-') === 0) {
$pool_id = $flowPoolsKeyByName[str_replace('vd-', '', $value['td_pool_sn'])]['id'];
} else {
$real_pool_id = $realFlowPools[$value['td_pool_sn']]['id'];
if (strpos($stat['td_pool_sn'], 'vd-') === 0) {
$pool_id = $flowPoolsKeyByName[str_replace('vd-', '', $stat['td_pool_sn'])]['id'];
} else {
$real_pool_id = $realFlowPools[$stat['td_pool_sn']]['id'];
if (!$real_pool_id) {
throw new NotExistException('未找到RD流量池 #:' . $value['td_pool_sn']);
}
if (!$real_pool_id) {
throw new NotExistException('未找到RD流量池 #:' . $stat['td_pool_sn']);
}
foreach ($flowPools as $key => $value) {
if (in_array($real_pool_id, $value['real_pool_ids'])) {
$pool_id = $value['id'];
break;
foreach ($flowPools as $key => $value) {
if (in_array($real_pool_id, $value['real_pool_ids'])) {
$pool_id = $value['id'];
break;
}
}
}
}
if (!$pool_id) {
throw new NotExistException('未找到对应VD流量池 #:' . $stat['td_pool_sn']);
}
if (!$pool_id) {
throw new NotExistException('未找到对应VD流量池 #:' . $value['td_pool_sn']);
}
$items = DB::connection('vd_old')->table('ckb_data_pool_statis_item')
->select(['sim', 'vd_package_sn', 'flows_used'])
->where('pool_statis_id', $stat['dpmb_id'])->get()->collect()->toArray();
$sim = intval($value['sim']);
$key = $sim . '-' . $value['year_month'];
$items = array_map(function ($item) use ($packages, $stat, $pool_id) {
return [
'month' => $stat['year_month'],
'sim' => intval($item['sim']),
'package_id' => $packages[$item['vd_package_sn']]['id'],
$array[$key] = [
'month' => $value['year_month'],
'sim' => $sim,
'package_id' => $packages[$value['vd_package_sn']]['id'],
'pool_id' => $pool_id,
'mebibyte' => floatval($item['flows_used']),
'mebibyte' => isset($array[$key]) ? $array[$key]['mebibyte'] + floatval($value['flows_used']) : floatval($value['flows_used']),
];
}, $items);
}
});
$array = array_merge($array, $items);
}
$array = array_values($array);
$this->line('插入数据,条数 #:' . count($array));
$array = array_groupBy($array, 'month');
foreach ($array as $month => $group) {
$group = array_groupBy($group, 'sim');
$news = [];
foreach ($group as $sim => $item) {
if (count($item) > 1) {
$temp = $item[0];
$mebibyte = 0;
foreach ($item as $key => $value) {
$mebibyte += $value['mebibyte'];
}
$temp['mebibyte'] = $mebibyte;
$news[$sim] = $temp;
}else {
$news[$sim] = $item[0];
}
}
$array[$month] = array_values($news);
}
foreach ($array as $month => $items) {
$table = FlowPoolService::checkTable($month);
foreach (array_chunk($items, 1000) as $values) {