代码拉取完成,页面将自动刷新
# Copyright 2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Train nanodetplus and get checkpoint files."""
import os
import ast
import time
import moxing as mox
import mindspore.nn as nn
import mindspore as ms
from mindspore import context, Tensor
from mindspore.communication.management import init, get_rank
from mindspore.train.callback import CheckpointConfig, ModelCheckpoint, LossMonitor, TimeMonitor, Callback, SummaryCollector
from mindspore.train import Model
from mindspore.context import ParallelMode
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from mindspore.common import set_seed
from src.NanodetPlus import nanodetplusWithLossCell, TrainingWrapper,nanodetplus, shufflenet
from src.dataset import create_nanodetplus_dataset
from src.lr_schedule import get_lr, multi_step_lr
from src.init_params import init_net_param, filter_checkpoint_parameter
from src.model_utils.config import config
from src.model_utils.moxing_adapter import moxing_wrapper
from src.model_utils.device_adapter import get_device_id, get_device_num
set_seed(1)
class UploadOutput(Callback):
def __init__(self, train_dir, obs_train_url):
self.train_dir = train_dir
self.obs_train_url = obs_train_url
def epoch_end(self, run_context):
try:
mox.file.copy_parallel(self.train_dir, self.obs_train_url)
print("Successfully Upload {} to {}".format(self.train_dir, self.obs_train_url))
except Exception as e:
print('moxing upload {} to {} failed: '.format(self.train_dir, self.obs_train_url) + str(e))
return
def EnvToObs(train_dir, obs_train_url):
try:
mox.file.copy_parallel(train_dir, obs_train_url)
print("Successfully Upload {} to {}".format(train_dir, obs_train_url))
except Exception as e:
print('moxing upload {} to {} failed: '.format(train_dir, obs_train_url) + str(e))
return
def UploadToQizhi(train_dir, obs_train_url):
device_num = int(os.getenv('RANK_SIZE'))
local_rank = int(os.getenv('RANK_ID'))
if device_num == 1:
EnvToObs(train_dir, obs_train_url)
if device_num > 1:
if local_rank % 8 == 0:
EnvToObs(train_dir, obs_train_url)
return
class Monitor(Callback):
"""
Monitor loss and time.
Args:
lr_init (numpy array): train lr
Returns:
None
Examples:
>>> Monitor(100,lr_init=Tensor([0.05]*100).asnumpy())
"""
def __init__(self, lr_init=None):
super(Monitor, self).__init__()
self.lr_init = lr_init
self.lr_init_len = len(lr_init)
def step_end(self, run_context):
cb_params = run_context.original_args()
print("lr:[{:8.6f}]".format(self.lr_init[cb_params.cur_step_num - 1]), flush=True)
def modelarts_pre_process():
'''modelarts pre process function.'''
def unzip(zip_file, save_dir):
import zipfile
s_time = time.time()
if not os.path.exists(os.path.join(save_dir, config.modelarts_dataset_unzip_name)):
zip_isexist = zipfile.is_zipfile(zip_file)
if zip_isexist:
fz = zipfile.ZipFile(zip_file, 'r')
data_num = len(fz.namelist())
print("Extract Start...")
print("unzip file num: {}".format(data_num))
data_print = int(data_num / 100) if data_num > 100 else 1
i = 0
for file in fz.namelist():
if i % data_print == 0:
print("unzip percent: {}%".format(int(i * 100 / data_num)), flush=True)
i += 1
fz.extract(file, save_dir)
print("cost time: {}min:{}s.".format(int((time.time() - s_time) / 60),
int(int(time.time() - s_time) % 60)))
print("Extract Done.")
else:
print("This is not zip.")
else:
print("Zip has been extracted.")
if config.need_modelarts_dataset_unzip:
zip_file_1 = os.path.join(config.data_path, config.modelarts_dataset_unzip_name + ".zip")
save_dir_1 = os.path.join(config.data_path)
sync_lock = "/tmp/unzip_sync.lock"
# Each server contains 8 devices as most.
if get_device_id() % min(get_device_num(), 8) == 0 and not os.path.exists(sync_lock):
print("Zip file path: ", zip_file_1)
print("Unzip file save dir: ", save_dir_1)
unzip(zip_file_1, save_dir_1)
print("===Finish extract data synchronization===")
try:
os.mknod(sync_lock)
except IOError:
pass
while True:
if os.path.exists(sync_lock):
break
time.sleep(1)
print("Device: {}, Finish sync unzip data from {} to {}.".format(get_device_id(), zip_file_1, save_dir_1))
def set_graph_kernel_context(device_target):
if device_target == "GPU":
# Enable graph kernel for default model ssd300 on GPU back-end.
context.set_context(enable_graph_kernel=True,
graph_kernel_flags="--enable_parallel_fusion --enable_expand_ops=Conv2D")
@moxing_wrapper(pre_process=modelarts_pre_process)
def main():
config.lr_init = ast.literal_eval(config.lr_init)
config.lr_end_rate = ast.literal_eval(config.lr_end_rate)
device_id = get_device_id()
context.set_context(mode=context.GRAPH_MODE, device_target=config.device_target)
if config.device_target == "Ascend":
if context.get_context("mode") == context.PYNATIVE_MODE:
context.set_context(mempool_block_size="31GB")
elif config.device_target == "GPU":
set_graph_kernel_context(config.device_target)
elif config.device_target == "CPU":
device_id = 0
config.distribute = False
else:
raise ValueError(f"device_target support ['Ascend', 'GPU', 'CPU'], but get {config.device_target}")
if config.distribute:
init()
device_num = get_device_num()
rank = get_rank()
context.reset_auto_parallel_context()
context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True)
else:
rank = 0
device_num = 1
context.set_context(device_id=device_id)
mindrecord_file = os.path.join(config.mindrecord_dir, "nanodet.mindrecord0")
# mindrecord_file = "/home/work/user-job-dir/data/yolof.mindrecord0"
loss_scale = float(config.loss_scale)
# When create MindDataset, using the fitst mindrecord file, such as nanodetplus.mindrecord0.
dataset = create_nanodetplus_dataset(mindrecord_file, repeat_num=1,
num_parallel_workers=config.workers,
batch_size=config.batch_size, device_num=device_num, rank=rank)
dataset_size = dataset.get_dataset_size()
print("Create dataset done!")
backbone = shufflenet(config.num_classes)
# nanodetplus = nanodetplus(backbone, config)
nanodetplus_train = nanodetplus(backbone, config)
net = nanodetplusWithLossCell(nanodetplus_train, config)
# init_net_param(net)
# if hasattr(config, "finetune") and config.finetune:
# init_net_param(net, initialize_mode='XavierUniform')
# else:
# init_net_param(net)
if config.pre_trained:
if config.pre_trained_epoch_size <= 0:
raise KeyError("pre_trained_epoch_size must be greater than 0.")
param_dict = load_checkpoint(config.pre_trained)
if config.filter_weight:
filter_checkpoint_parameter(param_dict)
load_param_into_net(net, param_dict)
lr = Tensor(get_lr(global_step=0,
lr_init=config.lr_init, lr_end=config.lr_end_rate * config.lr, lr_max=config.lr,
warmup_epochs1=config.warmup_epochs1, warmup_epochs2=config.warmup_epochs2,
warmup_epochs3=config.warmup_epochs3, warmup_epochs4=config.warmup_epochs4,
warmup_epochs5=config.warmup_epochs5, total_epochs=config.epoch_size,
steps_per_epoch=dataset_size))
opt = nn.Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), lr,
config.momentum, config.weight_decay, loss_scale)
net = TrainingWrapper(net, opt, loss_scale)
# lr = Tensor(multi_step_lr(0.14, [240,260,275], dataset_size, 300, config.epoch_size, gamma=0.1))
# opt = nn.SGD(filter(lambda x: x.requires_grad, net.get_parameters()),
# lr, 0.9, 0.0001)
# opt = nn.Adam(filter(lambda x: x.requires_grad, net.get_parameters()))
# manager = nn.DynamicLossScaleUpdateCell(loss_scale_value=2**12, scale_factor=4, scale_window=1000)
# net = nn.TrainOneStepWithLossScaleCell(net, opt, manager)
# net = TrainingWrapper(net, opt)
model = Model(net)
print("Start train nanodetplus, the first epoch will be slower because of the graph compilation.")
train_dir = '/cache/output'
uploadOutput = UploadOutput(train_dir, config.train_url)
cb = [TimeMonitor(), LossMonitor(), uploadOutput]
cb += [Monitor(lr_init=lr.asnumpy())]
config_ck = CheckpointConfig(save_checkpoint_steps=dataset_size * config.save_checkpoint_epochs,
keep_checkpoint_max=config.keep_checkpoint_max)
ckpt_cb = ModelCheckpoint(prefix="nanodetplus", directory=config.save_checkpoint_path, config=config_ck)
if config.distribute:
if rank == 0:
cb += [ckpt_cb]
model.train(config.epoch_size, dataset, callbacks=cb, dataset_sink_mode=True)
else:
cb += [ckpt_cb]
model.train(config.epoch_size, dataset, callbacks=cb, dataset_sink_mode=True)
if __name__ == '__main__':
local_data_url = config.data_url
print(os.path.exists(local_data_url))
config.data_url = '/home/work/user-job-dir/data/'
try:
mox.file.copy_parallel(local_data_url, config.data_url)
print("Successfully Download {} to {}".format(local_data_url, config.data_url))
except Exception as e:
print('moxing download {} to {} failed: '.format(local_data_url, config.data_url) + str(e))
print("*****************flag************************")
main()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。