0x00 Abstract
NVIDIA Megatron is a PyTorch-based distributed training framework for very large Transformer language models. It reproduces GPT-3 by combining data parallelism, tensor parallelism, and pipeline parallelism, and the machinery behind it is well worth a close look.
This series will run to roughly 6-7 articles, studying Megatron through its paper and source code. This article walks through Megatron's basic architecture.
0x01 Startup
1.1 Distributed launch
The launch script is examples/pretrain_bert_distributed.sh, which uses torch.distributed.launch to start multiple processes. The actual training entry point is pretrain_bert.py.
Since GPUS_PER_NODE is 8, nproc_per_node is also 8, so 8 processes are started on the local machine, each holding one part of the model. The rank of each process is assigned automatically by torch.distributed.launch (which delegates to the elastic launcher).
#!/bin/bash

GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NNODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))

DATA_PATH=_text_sentence
CHECKPOINT_PATH=

DISTRIBUTED_ARGS="--nproc_per_node $GPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"

python -m torch.distributed.launch $DISTRIBUTED_ARGS \
       pretrain_bert.py \
       --num-layers 24 \
       --hidden-size 1024 \
       --num-attention-heads 16 \
       --micro-batch-size 4 \
       --global-batch-size 32 \
       --seq-length 512 \
       --max-position-embeddings 512 \
       --train-iters 1000000 \
       --save $CHECKPOINT_PATH \
       --load $CHECKPOINT_PATH \
       --data-path $DATA_PATH \
       --vocab-file bert-vocab.txt \
       --data-impl mmap \
       --split 949,50,1 \
       --distributed-backend nccl \
       --lr 0.0001 \
       --lr-decay-style linear \
       --min-lr 1.0e-5 \
       --lr-decay-iters 990000 \
       --weight-decay 1e-2 \
       --clip-grad 1.0 \
       --lr-warmup-fraction .01 \
       --log-interval 100 \
       --save-interval 10000 \
       --eval-interval 1000 \
       --eval-iters 10 \
       --fp16
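For orientation, here is a minimal standalone sketch (not Megatron code) of what each of the 8 launched workers does with the environment variables that torch.distributed.launch exports (RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT); the real Megatron path is the _initialize_distributed function shown later, which follows the same env:// convention.

# Minimal sketch of a worker process started by torch.distributed.launch.
import os
import torch

rank = int(os.getenv('RANK', '0'))             # global rank, 0 .. WORLD_SIZE-1
world_size = int(os.getenv('WORLD_SIZE', '1'))
local_rank = rank % torch.cuda.device_count()  # which GPU on this node

torch.cuda.set_device(local_rank)
torch.distributed.init_process_group(          # rendezvous via MASTER_ADDR / MASTER_PORT
    backend='nccl', init_method='env://',
    world_size=world_size, rank=rank)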
1.2 Basic components
pretrain_bert.py calls pretrain to run pre-training.
if __name__ == "__main__":

    pretrain(train_valid_test_datasets_provider, model_provider,
             ModelType.encoder_or_decoder,
             forward_step, args_defaults={'tokenizer_type': 'BertWordPieceLowerCase'})
1.2.1 Getting the model
model_provider returns the vanilla version of the model. By "vanilla" we mean a plain CPU model without fp16 or DDP wrappers, but one that Megatron has already rewritten into its parallel form.
def model_provider(pre_process=True, post_process=True):
    """Build the model."""

    print_rank_0('building BERT model ...')

    args = get_args()
    num_tokentypes = 2 if args.bert_binary_head else 0
    model = BertModel(
        num_tokentypes=num_tokentypes,
        add_binary_head=args.bert_binary_head,
        parallel_output=True,
        pre_process=pre_process,
        post_process=post_process)

    return model
1.2.2 Getting the datasets
train_valid_test_datasets_provider takes the sizes of the train/valid/test datasets and returns the train, valid, and test datasets.
def train_valid_test_datasets_provider(train_val_test_num_samples):
    """Build train, valid, and test datasets."""
    args = get_args()

    print_rank_0('> building train, validation, and test datasets '
                 'for BERT ...')
    train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
        data_prefix=args.data_path,
        data_impl=args.data_impl,
        splits_string=args.split,
        train_valid_test_num_samples=train_val_test_num_samples,
        max_seq_length=args.seq_length,
        masked_lm_prob=args.mask_prob,
        short_seq_prob=args.short_seq_prob,
        seed=args.seed,
        skip_warmup=(not args.mmap_warmup),
        binary_head=args.bert_binary_head)
    print_rank_0("> finished creating BERT datasets ...")

    return train_ds, valid_ds, test_ds
1.2.3 The step function
forward_step takes a data iterator and the model, and returns a loss scalar together with a dictionary whose key:value pairs are quantities to monitor during training, e.g. 'lm loss: value'. The function is also expected to add 'batch generator' to the timers class.
def forward_step(data_iterator, model):
    """Forward step."""
    args = get_args()

    # Get the batch.
    tokens, types, sentence_order, loss_mask, lm_labels, padding_mask = get_batch(
        data_iterator)

    if not args.bert_binary_head:
        types = None

    # Forward pass through the model.
    output_tensor = model(tokens, padding_mask, tokentype_ids=types,
                          lm_labels=lm_labels)

    return output_tensor, partial(loss_func, loss_mask, sentence_order)
1.2.3.1 Broadcasting the data
forward_step calls get_batch to obtain a batch. Internally it pulls data from the iterator and then uses broadcast_data to broadcast the input from rank 0 to all other ranks in the tensor-model-parallel group.
Note: data parallelism loads different data onto different ranks, whereas every rank within a tensor model parallel group loads the same data.
def get_batch(data_iterator):
    """Build the batch."""

    # Items and their type.
    keys = ['text', 'types', 'labels', 'is_random', 'loss_mask', 'padding_mask']
    datatype = torch.int64

    # Broadcast data.
    if data_iterator is not None:
        data = next(data_iterator)  # fetch the data
    else:
        data = None
    data_b = mpu.broadcast_data(keys, data, datatype)  # broadcast the data to the other GPUs in the group

    # Unpack.
    tokens = data_b['text'].long()
    types = data_b['types'].long()
    sentence_order = data_b['is_random'].long()
    loss_mask = data_b['loss_mask'].float()
    lm_labels = data_b['labels'].long()
    padding_mask = data_b['padding_mask'].long()

    return tokens, types, sentence_order, loss_mask, lm_labels, padding_mask
broadcast_data sends the data from rank 0 of each model parallel group to the other members of the same group.
def broadcast_data(keys, data, datatype):
    """Broadcast data from rank zero of each model parallel group to the
    members of the same model parallel group.

    Arguments:
        keys: list of keys in the data dictionary to be broadcasted
        data: data dictionary of string keys and cpu tensor values.
        datatype: torch data type of all tensors in data associated
                  with keys.
    """
    # Build (key, size) and (key, number of elements) dictionaries along
    # with the total number of elements on all ranks.
    key_size, key_numel, total_numel = _build_key_size_numel_dictionaries(keys,
                                                                          data)

    # Pack on rank zero.
    if get_tensor_model_parallel_rank() == 0:  # only rank 0 packs the data
        # Check that all keys have the same data type.
        _check_data_types(keys, data, datatype)
        # Flatten the data associated with the keys
        flatten_data = torch.cat(
            [data[key].contiguous().view(-1) for key in keys], dim=0).cuda()
    else:
        flatten_data = torch.empty(total_numel,
                                   device=torch.cuda.current_device(),
                                   dtype=datatype)

    # Broadcast
    torch.distributed.broadcast(flatten_data, get_tensor_model_parallel_src_rank(),
                                group=get_tensor_model_parallel_group())

    # Unpack
    output = {}
    offset = 0
    for key in keys:
        size = key_size[key]
        numel = key_numel[key]
        output[key] = flatten_data.narrow(0, offset, numel).view(size)
        offset += numel

    return output
get_tensor_model_parallel_src_rank computes the global rank corresponding to the first local rank in the tensor model parallel group. For example, with a tensor-parallel world size of 2, global ranks 4 and 5 both map to source rank 4.
def get_tensor_model_parallel_src_rank():
    """Calculate the global rank corresponding to the first local rank
    in the tensor model parallel group."""
    global_rank = torch.distributed.get_rank()
    local_world_size = get_tensor_model_parallel_world_size()
    return (global_rank // local_world_size) * local_world_size
The logic is shown in the figure below: three separate functions feed the different inputs that pre-training needs, which keeps the pieces decoupled.
0x02 Pretrain
BERT training has two main phases:
Pre-train: pre-training is the foundation for transfer learning; it learns token-level semantic representations.
Fine-tuning: starting from the trained language model, add parameters for a specific domain (e.g. finance or healthcare) and train again. For a classification task, for instance, you can put a softmax head on top of the pre-trained model and fine-tune it with your corpus, as sketched right after this list.
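To illustrate the fine-tuning idea, here is a generic PyTorch sketch (not Megatron code; the encoder, hidden size and head are hypothetical placeholders): a classification head is just a linear layer, with the softmax folded into the cross-entropy loss.

import torch
import torch.nn as nn

class BertForClassification(nn.Module):
    # Hypothetical wrapper: `encoder` stands for a pre-trained BERT-style model
    # that returns a pooled [batch, hidden] representation.
    def __init__(self, encoder, hidden_size, num_classes):
        super().__init__()
        self.encoder = encoder
        self.classifier = nn.Linear(hidden_size, num_classes)  # the extra task head

    def forward(self, tokens, padding_mask, labels=None):
        pooled = self.encoder(tokens, padding_mask)   # [batch, hidden]
        logits = self.classifier(pooled)              # [batch, num_classes]
        if labels is not None:
            # cross_entropy applies log-softmax internally
            return nn.functional.cross_entropy(logits, labels)
        return logits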
Pre-train does the following, in order:
Initialize Megatron.
Set up the model, optimizer, and learning-rate schedule using model_provider.
Call train_val_test_data_provider to get the train/val/test datasets.
Train the model with forward_step_func.
The code is as follows:
def pretrain(train_valid_test_dataset_provider,
             model_provider,
             model_type,
             forward_step_func,
             extra_args_provider=None,
             args_defaults={}):
    """Main training program.

    This function will run the followings in the order provided:
        1) initialize Megatron.
        2) setup model, optimizer and lr schedule using the model_provider.
        3) call train_val_test_data_provider to get train/val/test datasets.
        4) train the model using the forward_step_func.
    """

    # Initalize and get arguments, timers, and Tensorboard writer.
    initialize_megatron(extra_args_provider=extra_args_provider,
                        args_defaults=args_defaults)

    # Adjust the startup time so it reflects the largest value.
    # This will be closer to what scheduler will see (outside of
    # image ... launches.
    global _TRAIN_START_TIME
    start_time_tensor = torch.cuda.DoubleTensor([_TRAIN_START_TIME])
    torch.distributed.all_reduce(start_time_tensor,
                                 op=torch.distributed.ReduceOp.MIN)
    _TRAIN_START_TIME = start_time_tensor.item()

    args = get_args()
    timers = get_timers()

    # Model, optimizer, and learning rate.
    # Use model_provider to set up the model, optimizer and lr schedule.
    model, optimizer, lr_scheduler = setup_model_and_optimizer(model_provider,
                                                               model_type)

    # Data stuff.
    # Call train_val_test_data_provider to get the train/val/test datasets.
    if args.virtual_pipeline_model_parallel_size is not None:
        all_data_iterators = [
            build_train_valid_test_data_iterators(train_valid_test_dataset_provider)
            for _ in range(len(model))
        ]
        train_data_iterator = [data_iterators[0] for data_iterators in all_data_iterators]
        valid_data_iterator = [data_iterators[1] for data_iterators in all_data_iterators]
        test_data_iterator = [data_iterators[2] for data_iterators in all_data_iterators]
    else:
        train_data_iterator, valid_data_iterator, test_data_iterator \
            = build_train_valid_test_data_iterators(
                train_valid_test_dataset_provider)

    iteration = 0
    if args.do_train and args.train_iters > 0:
        iteration = train(forward_step_func,  # train the model
                          model, optimizer, lr_scheduler,
                          train_data_iterator, valid_data_iterator)

    if args.do_valid:
        prefix = 'the end of training for val data'
        evaluate_and_print_results(prefix, forward_step_func,
                                   valid_data_iterator, model,
                                   iteration, False)

    if args.save and iteration != 0:
        save_checkpoint(iteration, model, optimizer, lr_scheduler)

    if args.do_test:
        # Run on test data.
        prefix = 'the end of training for test data'
        evaluate_and_print_results(prefix, forward_step_func,
                                   test_data_iterator, model,
                                   0, True)
For our analysis the key call is initialize_megatron, which initializes Megatron itself.
0x03 Initialization
3.1 initialize_megatron
initialize_megatron sets the global variables, initializes the distributed environment, and so on.
def initialize_megatron(extra_args_provider=None, args_defaults={},
                        ignore_unknown_args=False, allow_no_cuda=False):
    """Set global variables, initialize distributed, and
    set autoresume and random seeds.
    `allow_no_cuda` should not be set unless using megatron for cpu only
    data processing. In general this arg should not be set unless you know
    what you are doing.
    Returns a function to finalize distributed env initialization
    (optionally, only when args.lazy_mpu_init == True)
    """
    if not allow_no_cuda:
        # Make sure cuda is available.
        assert torch.cuda.is_available(), 'Megatron requires CUDA.'

    # Parse args, build tokenizer, and set adlr-autoresume,
    # tensorboard-writer, and timers.
    set_global_variables(extra_args_provider=extra_args_provider,  # set the global variables
                         args_defaults=args_defaults,
                         ignore_unknown_args=ignore_unknown_args)

    # torch.distributed initialization
    def finish_mpu_init():
        args = get_args()
        # Pytorch distributed.
        _initialize_distributed()  # set up the distributed environment

        # Random seeds for reproducibility.
        if args.rank == 0:
            print('> setting random seeds to {} ...'.format(args.seed))
        _set_random_seed(args.seed)

    # Set pytorch JIT layer fusion options.
    _set_jit_fusion_options()

    args = get_args()
    if args.lazy_mpu_init:
        args.use_cpu_initialization = True
        # delayed initialization of DDP-related stuff
        # We only set basic DDP globals
        set_tensor_model_parallel_world_size(args.tensor_model_parallel_size)
        # and return function for external DDP manager
        # to call when it has DDP initialized
        set_tensor_model_parallel_rank(args.rank)
        return finish_mpu_init
    else:
        # Megatron's MPU is the master. Complete initialization right away.
        finish_mpu_init()

        # Autoresume.
        _init_autoresume()

        # Compile dependencies.
        _compile_dependencies()

        # No continuation function
        return None
3.2 Initializing the distributed environment
_initialize_distributed lives in megatron/initialize.py and does the following:
Calls torch.distributed.init_process_group to initialize the distributed environment.
Calls mpu.initialize_model_parallel to set up the model-parallel, data-parallel and other process groups; we will discuss these in detail below.
Once the worker processes exist, the program needs to know which processes are training the same model, and torch.distributed.init_process_group provides exactly that. It creates a process group; processes in the same group train the same model and agree on a communication backend. The group assigns every process a number, its global rank. In multi-node training each machine's processes also carry a per-machine number, the local rank; with single-node multi-GPU training the local rank equals the global rank.
def _initialize_distributed():
    """Initialize torch.distributed and mpu."""
    args = get_args()

    device_count = torch.cuda.device_count()
    if torch.distributed.is_initialized():
        args.rank = torch.distributed.get_rank()
        args.world_size = torch.distributed.get_world_size()
    else:
        # Manually set the device ids.
        if device_count > 0:
            device = args.rank % device_count
            if args.local_rank is not None:
                assert args.local_rank == device, \
                    'expected local-rank to be the same as rank % device-count.'
            else:
                args.local_rank = device
            torch.cuda.set_device(device)

        # Call the init process
        torch.distributed.init_process_group(  # initialize the PyTorch distributed environment
            backend=args.distributed_backend,
            world_size=args.world_size, rank=args.rank,
            timeout=timedelta(minutes=10))

    # Set the tensor model-parallel, pipeline model-parallel, and
    # data-parallel communicators.
    if device_count > 0:
        if mpu.model_parallel_is_initialized():
            print('model parallel is already initialized')
        else:
            # Initialize model parallelism, i.e. set up the various process groups
            mpu.initialize_model_parallel(args.tensor_model_parallel_size,
                                          args.pipeline_model_parallel_size,
                                          args.virtual_pipeline_model_parallel_size,
                                          args.pipeline_model_parallel_split_rank)
3.3 Process-group global variables
Because mpu.initialize_model_parallel has set up the model-parallel, data-parallel and other process groups, we assume from here on that all groups exist, so the process on each rank carries its own set of global variables. Suppose we have 16 GPUs on two nodes: ranks 0-7 belong to the first node and ranks 8-15 to the second. Below, gi denotes the i-th GPU.
_TENSOR_MODEL_PARALLEL_GROUP: the intra-layer model parallel group the current rank belongs to, i.e. the tensor parallel process group.
If each layer is split into two tensor shards, _TENSOR_MODEL_PARALLEL_GROUP looks like: [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15].
_PIPELINE_MODEL_PARALLEL_GROUP: the inter-layer model parallel group the current rank belongs to, i.e. the pipeline process group.
With a pipeline depth of 4, the groups are [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15].
_MODEL_PARALLEL_GROUP: the model parallel group the current rank belongs to, which combines the two groups above.
In our example the complete model is replicated twice; the two replicas live on GPUs [0, 1, 4, 5, 8, 9, 12, 13] and [2, 3, 6, 7, 10, 11, 14, 15].
_EMBEDDING_GROUP: the process group for the embeddings (used later to keep the word embeddings on the first and last pipeline stages in sync).
_DATA_PARALLEL_GROUP: the data parallel group the current rank belongs to.
With a data parallel degree of 2, the groups are [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15].
# Intra-layer model parallel group that the current rank belongs to.
_TENSOR_MODEL_PARALLEL_GROUP = None
# Inter-layer model parallel group that the current rank belongs to.
_PIPELINE_MODEL_PARALLEL_GROUP = None
# Model parallel group (both intra- and pipeline) that the current rank belongs to.
_MODEL_PARALLEL_GROUP = None
# Embedding group.
_EMBEDDING_GROUP = None
# Data parallel group that the current rank belongs to.
_DATA_PARALLEL_GROUP = None
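To make the 16-GPU example concrete, the following standalone sketch reproduces the grouping arithmetic for world_size=16, tensor parallel size 2 and pipeline parallel size 4. It only prints the rank lists; the real mpu.initialize_model_parallel additionally calls torch.distributed.new_group for each list and caches the group the current rank belongs to.

world_size = 16
tp, pp = 2, 4                      # tensor / pipeline model parallel sizes
dp = world_size // (tp * pp)       # data parallel size = 2

# Tensor model parallel groups: consecutive blocks of tp ranks.
tensor_groups = [list(range(i, i + tp)) for i in range(0, world_size, tp)]

# Pipeline model parallel groups: ranks with stride world_size // pp.
num_pp_groups = world_size // pp
pipeline_groups = [list(range(i, world_size, num_pp_groups)) for i in range(num_pp_groups)]

# Data parallel groups: within each pipeline stage, ranks sharing the same
# tensor-parallel offset.
data_groups = []
for i in range(pp):
    start, end = i * num_pp_groups, (i + 1) * num_pp_groups
    for j in range(tp):
        data_groups.append(list(range(start + j, end, tp)))

print(tensor_groups)    # [[0, 1], [2, 3], ..., [14, 15]]
print(pipeline_groups)  # [[0, 4, 8, 12], [1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15]]
print(data_groups)      # [[0, 2], [1, 3], [4, 6], [5, 7], ...]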
0x04 Setting up the model
In pretrain, the model, optimizer and so on are set up with the following call.
# Model, optimizer, and learning rate.
# Use model_provider to set up the model, optimizer and lr schedule.
model, optimizer, lr_scheduler = setup_model_and_optimizer(model_provider,
                                                           model_type)
4.1 setup_model_and_optimizer
setup_model_and_optimizer sets up the model and the optimizer; the key piece is get_model.
def setup_model_and_optimizer(model_provider_func, model_type):
    """Setup model and optimizer."""
    args = get_args()

    model = get_model(model_provider_func, model_type)

    unwrapped_model = unwrap_model(model,
                                   (torchDDP, LocalDDP, Float16Module))
    optimizer = get_megatron_optimizer(unwrapped_model)
    lr_scheduler = get_learning_rate_scheduler(optimizer)

    if args.load is not None:
        timers = get_timers()
        # Extra barrier is added to make sure all ranks report the
        # max time.
        torch.distributed.barrier()
        args.iteration = load_checkpoint(model, optimizer, lr_scheduler)
        torch.distributed.barrier()
    else:
        args.iteration = 0

    # We only support local DDP with multiple micro-batches.
    if len(model) > 1 or mpu.get_pipeline_model_parallel_world_size() > 1:
        assert args.DDP_impl == 'local'

    # get model without FP16 and/or TorchDDP wrappers
    if args.iteration == 0 and len(unwrapped_model) == 1 \
            and hasattr(unwrapped_model[0], 'init_state_dict_from_bert'):
        unwrapped_model[0].init_state_dict_from_bert()
        if args.fp16:
            optimizer.reload_model_params()

    return model, optimizer, lr_scheduler
4.2 The model
4.2.1 BertModel
Let's start with BertModel's constructor and skip its other member functions. It mainly calls get_language_model.
class BertModel(MegatronModule):
    """Bert Language model."""

    def __init__(self,
                 num_tokentypes=2,
                 add_binary_head=True,
                 parallel_output=True,
                 pre_process=True,
                 post_process=True):
        super(BertModel, self).__init__()
        args = get_args()

        self.fp16_lm_cross_entropy = args.fp16_lm_cross_entropy
        self.add_binary_head = add_binary_head
        self.parallel_output = parallel_output
        self.pre_process = pre_process
        self.post_process = post_process

        init_method = init_method_normal(args.init_method_std)
        scaled_init_method = scaled_init_method_normal(args.init_method_std,
                                                       args.num_layers)

        # Get the language model
        self.language_model, self._language_model_key = get_language_model(
            num_tokentypes=num_tokentypes,
            add_pooler=self.add_binary_head,
            encoder_attn_mask_type=AttnMaskType.padding,
            init_method=init_method,
            scaled_init_method=scaled_init_method,
            pre_process=self.pre_process,
            post_process=self.post_process)

        self.initialize_word_embeddings(init_method_normal)
        if self.post_process:  # the last stage gets special handling
            self.lm_head = BertLMHead(
                self.word_embeddings_weight().size(0),
                args.hidden_size, init_method, args.layernorm_epsilon, parallel_output)
            self._lm_head_key = 'lm_head'
            self.binary_head = None
            if self.add_binary_head:
                self.binary_head = get_linear_layer(args.hidden_size, 2,
                                                    init_method)
                self._binary_head_key = 'binary_head'
4.2.2 The language model
get_language_model builds a TransformerLanguageModel.
def get_language_model(num_tokentypes, add_pooler,
                       encoder_attn_mask_type, init_method=None,
                       scaled_init_method=None, add_encoder=True,
                       add_decoder=False,
                       decoder_attn_mask_type=AttnMaskType.causal,
                       pre_process=True, post_process=True):
    """Build language model and return along with the key to save."""
    args = get_args()

    if init_method is None:
        init_method = init_method_normal(args.init_method_std)

    if scaled_init_method is None:
        scaled_init_method = scaled_init_method_normal(args.init_method_std,
                                                       args.num_layers)

    # Language model.
    language_model = TransformerLanguageModel(
        init_method,
        scaled_init_method,
        encoder_attn_mask_type,
        num_tokentypes=num_tokentypes,
        add_encoder=add_encoder,
        add_decoder=add_decoder,
        decoder_attn_mask_type=decoder_attn_mask_type,
        add_pooler=add_pooler,
        pre_process=pre_process,
        post_process=post_process
    )
    # key used for checkpoints.
    language_model_key = 'language_model'

    return language_model, language_model_key
TransformerLanguageModel is the concrete language model; its most important member is the ParallelTransformer. It is assembled according to the configuration passed in:
On the first stage (pre_process is set), an embedding layer is added.
For the stages in between, the corresponding ParallelTransformer is built as an encoder or a decoder.
On the last stage (post_process is set), a Pooler is added; the outer BertModel also does its own last-stage handling.
class TransformerLanguageModel(MegatronModule):
    """Transformer language model.

    Arguments:
        transformer_hparams: transformer hyperparameters
        vocab_size: vocabulary size
        max_sequence_length: maximum size of sequence. This
                             is used for positional embedding
        embedding_dropout_prob: dropout probability for embeddings
        num_tokentypes: size of the token-type embeddings. 0 value
                        will ignore this embedding
    """

    def __init__(self,
                 init_method,
                 output_layer_init_method,
                 encoder_attn_mask_type,
                 num_tokentypes=0,
                 add_encoder=True,
                 add_decoder=False,
                 decoder_attn_mask_type=AttnMaskType.causal,
                 add_pooler=False,
                 pre_process=True,
                 post_process=True):
        super(TransformerLanguageModel, self).__init__()
        args = get_args()

        self.pre_process = pre_process
        self.post_process = post_process
        self.hidden_size = args.hidden_size
        self.num_tokentypes = num_tokentypes
        self.init_method = init_method
        self.add_encoder = add_encoder
        self.encoder_attn_mask_type = encoder_attn_mask_type
        self.add_decoder = add_decoder
        self.decoder_attn_mask_type = decoder_attn_mask_type
        self.add_pooler = add_pooler
        self.encoder_hidden_state = None

        # Embeddings.
        if self.pre_process:
            self.embedding = Embedding(self.hidden_size,
                                       args.padded_vocab_size,
                                       args.max_position_embeddings,
                                       args.hidden_dropout,
                                       self.init_method,
                                       self.num_tokentypes)
            self._embedding_key = 'embedding'

        # Transformer.
        # Encoder (usually set to True, False if part of an encoder-decoder
        # architecture and in encoder-only stage).
        if self.add_encoder:
            self.encoder = ParallelTransformer(
                self.init_method,
                output_layer_init_method,
                self_attn_mask_type=self.encoder_attn_mask_type,
                pre_process=self.pre_process,
                post_process=self.post_process
            )
            self._encoder_key = 'encoder'
        else:
            self.encoder = None

        # Decoder (usually set to False, True if part of an encoder-decoder
        # architecture and in decoder-only stage).
        if self.add_decoder:
            # Temporary assertion until we verify correctness of pipeline parallelism
            # implementation of T5.
            self.decoder = ParallelTransformer(
                self.init_method,
                output_layer_init_method,
                layer_type=LayerType.decoder,
                self_attn_mask_type=self.decoder_attn_mask_type,
                pre_process=self.pre_process,
                post_process=self.post_process)
            self._decoder_key = 'decoder'
        else:
            self.decoder = None

        if self.post_process:
            # Pooler.
            if self.add_pooler:
                self.pooler = Pooler(self.hidden_size, self.init_method)
                self._pooler_key = 'pooler'
4.2.3 ParallelTransformer
ParallelTransformer calls ParallelTransformerLayer to build the concrete Transformer layers; we will analyze ParallelTransformerLayer in a later article.
In other words, a ParallelTransformer contains several Transformer layers, and each of those layers is a ParallelTransformerLayer.
class ParallelTransformer(MegatronModule):
    """Transformer class."""

    def __init__(self, init_method, output_layer_init_method,
                 layer_type=LayerType.encoder,
                 self_attn_mask_type=AttnMaskType.padding,
                 pre_process=True, post_process=True):
        super(ParallelTransformer, self).__init__()
        args = get_args()

        self.bf16 = args.bf16
        self.fp32_residual_connection = args.fp32_residual_connection
        self.pre_process = pre_process
        self.post_process = post_process
        self.input_tensor = None

        # Store activation checkpointing flag.
        self.activations_checkpoint_method = args.activations_checkpoint_method
        self.activations_checkpoint_num_layers = args.activations_checkpoint_num_layers
        self.distribute_checkpointed_activations = args.distribute_checkpointed_activations

        # Number of layers.
        self.num_layers = mpu.get_num_layers(  # number of layers held by this ParallelTransformer
            args, args.model_type == ModelType.encoder_and_decoder)

        # Transformer layers.
        def build_layer(layer_number):
            return ParallelTransformerLayer(  # build one Transformer layer
                init_method,
                output_layer_init_method,
                layer_number,
                layer_type=layer_type,
                self_attn_mask_type=self_attn_mask_type)

        if args.virtual_pipeline_model_parallel_size is not None:
            # Number of layers in each model chunk is the number of layers in the stage,
            # divided by the number of model chunks in a stage.
            self.num_layers = self.num_layers // args.virtual_pipeline_model_parallel_size
            # With 8 layers, 2 stages, and 4 model chunks, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0]  [2]  [4]  [6]
            # Stage 1: [1]  [3]  [5]  [7]
            # With 8 layers, 2 stages, and 2 virtual stages, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0, 1]  [4, 5]
            # Stage 1: [2, 3]  [6, 7]
            offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
                args.num_layers // args.virtual_pipeline_model_parallel_size) + \
                (mpu.get_pipeline_model_parallel_rank() * self.num_layers)
        else:
            # Each stage gets a contiguous set of layers.
            offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers

        self.layers = torch.nn.ModuleList(  # build num_layers Transformer layers
            [build_layer(i + 1 + offset) for i in range(self.num_layers)])

        if self.post_process:
            # Final layer norm before output.
            self.final_layernorm = LayerNorm(
                args.hidden_size,
                eps=args.layernorm_epsilon,
                no_persist_layer_norm=args.no_persist_layer_norm)
The logic so far is shown in the figure below, where we assume there are two Transformer layers:
4.2.3.1 Getting the number of layers
A key step is determining how many layers this sub-model should own under parallel execution. If the full model has 64 layers and the pipeline depth is 16, each pipeline stage holds 4 layers, so this sub-model owns 4 layers.
def get_num_layers(args, is_encoder_and_decoder_model):
    """Compute the number of transformer layers resident on the current rank."""
    if get_pipeline_model_parallel_world_size() > 1:
        if is_encoder_and_decoder_model:
            assert args.pipeline_model_parallel_split_rank is not None
            num_ranks_in_encoder = args.pipeline_model_parallel_split_rank
            num_ranks_in_decoder = get_pipeline_model_parallel_world_size() - num_ranks_in_encoder
            if is_pipeline_stage_before_split():
                num_layers = args.num_layers // num_ranks_in_encoder
            else:
                num_layers = args.num_layers // num_ranks_in_decoder
        else:
            num_layers = args.num_layers // get_pipeline_model_parallel_world_size()
    else:
        num_layers = args.num_layers
    return num_layers
get_pipeline_model_parallel_world_size returns the world size of the pipeline group, i.e. the pipeline depth.
def get_pipeline_model_parallel_world_size():
    """Return world size for the pipeline model parallel group."""
    global _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
    if _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE is not None:
        return _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
    return torch.distributed.get_world_size(group=get_pipeline_model_parallel_group())
_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE is the pipeline depth p, i.e. the model is cut vertically p-1 times. For example, with 12 layers and 5 vertical cuts there are 6 stages of 2 layers each.
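As a quick sanity check of this arithmetic, here is a tiny standalone sketch (not mpu code) of the non-interleaved case: each stage takes num_layers // p contiguous layers, starting at stage_rank * layers_per_stage, which is exactly the offset computed in ParallelTransformer above.

def layers_for_stage(total_layers, pipeline_depth, stage_rank):
    # Contiguous partition used when no virtual (interleaved) pipelining is set.
    per_stage = total_layers // pipeline_depth
    offset = stage_rank * per_stage
    return list(range(offset, offset + per_stage))

print(layers_for_stage(64, 16, 0))  # [0, 1, 2, 3]  -> 4 layers per stage
print(layers_for_stage(12, 6, 2))   # [4, 5]        -> 2 layers per stage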
4.2.3.2 Forward pass
Now the forward function: it mostly just calls each ParallelTransformerLayer's forward in turn, with special handling on the first and last stage.
def forward(self, hidden_states, attention_mask,
            encoder_output=None, enc_dec_attn_mask=None,
            inference_params=None):

    if self.pre_process:
        # Data format change to avoid explicit tranposes : [b s h] --> [s b h].
        # If the input flag for fp32 residual connection is set, convert for float.
        if self.fp32_residual_connection:
            hidden_states = hidden_states.transpose(0, 1).contiguous().float()
        # Otherwise, leave it as is.
        else:
            hidden_states = hidden_states.transpose(0, 1).contiguous()
    else:
        # See set_input_tensor()
        hidden_states = self.input_tensor

    if encoder_output is not None:
        encoder_output = encoder_output.transpose(0, 1).contiguous()

    if self.activations_checkpoint_method is not None:
        hidden_states = self._checkpointed_forward(hidden_states,
                                                   attention_mask,
                                                   encoder_output,
                                                   enc_dec_attn_mask)
    else:
        for index in range(self.num_layers):
            layer = self._get_layer(index)
            hidden_states = layer(  # call ParallelTransformerLayer's forward
                hidden_states,
                attention_mask,
                encoder_output=encoder_output,
                enc_dec_attn_mask=enc_dec_attn_mask,
                inference_params=inference_params)

    # Final layer norm.
    if self.post_process:
        # Reverting data format change [s b h] --> [b s h].
        hidden_states = hidden_states.transpose(0, 1).contiguous()
        output = self.final_layernorm(hidden_states)
    else:
        output = hidden_states

    return output
4.3 get_model
Let us now return to get_model and piece together how the model is built.
BERT is a stack of transformer layers, so it is split directly by layer, and every layer is an identical transformer layer. As mentioned earlier, our example launches 8 processes, each holding a sub-model, i.e. a subset of the layers of the original BERT model. How does each process know how many layers its sub-model contains? Because the process groups have already been created, get_model can decide based on the current group configuration. Within a single process the model is obtained as follows:
If virtual pipelining is configured, iterate over the virtual size and build that many models (BertModel instances).
Otherwise, if the model is encoder_and_decoder, configure it according to the split rank.
Set the tensor model parallel attributes.
Move the model onto the GPU.
If data parallelism is required, wrap the model with DDP.
The code is as follows:
def get_model(model_provider_func, model_type=ModelType.encoder_or_decoder, wrap_with_ddp=True):
    """Build the model."""
    args = get_args()
    args.model_type = model_type

    # Build model.
    if mpu.get_pipeline_model_parallel_world_size() > 1 and \
            args.virtual_pipeline_model_parallel_size is not None:  # virtual pipelining is configured, more on this later
        model = []
        for i in range(args.virtual_pipeline_model_parallel_size):  # iterate over the virtual stages
            # Set the virtual rank, mainly to decide whether this is the first / last stage
            mpu.set_virtual_pipeline_model_parallel_rank(i)
            # Set pre_process and post_process only after virtual rank is set.
            pre_process = mpu.is_pipeline_first_stage()
            post_process = mpu.is_pipeline_last_stage()
            this_model = model_provider_func(  # build the raw model, here a BertModel
                pre_process=pre_process,
                post_process=post_process
            )
            this_model.model_type = model_type
            model.append(this_model)  # append a new BertModel to the model list
    else:
        pre_process = mpu.is_pipeline_first_stage()  # is this the first stage?
        post_process = mpu.is_pipeline_last_stage()  # is this the last stage?
        add_encoder = True
        add_decoder = True
        if model_type == ModelType.encoder_and_decoder:
            if mpu.get_pipeline_model_parallel_world_size() > 1:
                rank = mpu.get_pipeline_model_parallel_rank()
                split_rank = args.pipeline_model_parallel_split_rank
                world_size = mpu.get_pipeline_model_parallel_world_size()
                pre_process = rank == 0 or rank == split_rank  # first stage?
                post_process = (rank == (split_rank - 1)) or (  # last stage?
                    rank == (world_size - 1))
                add_encoder = mpu.is_pipeline_stage_before_split()
                add_decoder = mpu.is_pipeline_stage_after_split()
            model = model_provider_func(  # build the raw model
                pre_process=pre_process,
                post_process=post_process,
                add_encoder=add_encoder,
                add_decoder=add_decoder)
        else:
            model = model_provider_func(  # build the raw model
                pre_process=pre_process,
                post_process=post_process
            )
        model.model_type = model_type

    if not isinstance(model, list):
        model = [model]

    # Set tensor model parallel attributes if not set.
    # Only parameters that are already tensor model parallel have these
    # attributes set for them. We should make sure the default attributes
    # are set for all params so the optimizer can use them.
    for model_module in model:
        for param in model_module.parameters():
            mpu.set_defaults_if_not_set_tensor_model_parallel_attributes(param)

    # GPU allocation.
    for model_module in model:  # move the model onto the GPU
        model_module.cuda(torch.cuda.current_device())

    # Fp16 conversion.
    if args.fp16 or args.bf16:
        model = [Float16Module(model_module, args) for model_module in model]

    if wrap_with_ddp:  # wrap with DDP if data parallelism is required
        if args.DDP_impl == 'torch':
            i = torch.cuda.current_device()
            model = [torchDDP(model_module, device_ids=[i], output_device=i,
                              process_group=mpu.get_data_parallel_group())
                     for model_module in model]
        elif args.DDP_impl == 'local':
            model = [LocalDDP(model_module,
                              args.accumulate_allreduce_grads_in_fp32,
                              args.use_contiguous_buffers_in_local_ddp)
                     for model_module in model]
        else:
            raise NotImplementedError('Unknown DDP implementation specified: '
                                      '{}. Exiting.'.format(args.DDP_impl))

    return model
The logic within a single process is roughly as shown below; "torchDDP" here means the module inside BertModel is wrapped with torchDDP.
0x05 Data parallelism
5.1 Setting up the data
build_train_valid_test_data_iterators prepares the data and provides three datasets: train, valid, and test.
def build_train_valid_test_data_iterators(
        build_train_valid_test_datasets_provider):
    """XXX"""
    args = get_args()

    (train_dataloader, valid_dataloader, test_dataloader) = (None, None, None)

    # Backward compatibility, assume fixed batch size.
    if args.iteration > 0 and args.consumed_train_samples == 0:
        args.consumed_train_samples = args.iteration * args.global_batch_size
    if args.iteration > 0 and args.consumed_valid_samples == 0:
        if args.train_samples is None:
            args.consumed_valid_samples = (args.iteration // args.eval_interval) * \
                args.eval_iters * args.global_batch_size

    # Data loader only on rank 0 of each model parallel group.
    if mpu.get_tensor_model_parallel_rank() == 0:

        # Number of train/valid/test samples.
        if args.train_samples:
            train_samples = args.train_samples
        else:
            train_samples = args.train_iters * args.global_batch_size
        eval_iters = (args.train_iters // args.eval_interval + 1) * \
            args.eval_iters
        test_iters = args.eval_iters
        train_val_test_num_samples = [train_samples,
                                      eval_iters * args.global_batch_size,
                                      test_iters * args.global_batch_size]

        # Build the datasets.
        train_ds, valid_ds, test_ds = build_train_valid_test_datasets_provider(
            train_val_test_num_samples)

        # Build dataloders.
        train_dataloader = build_pretraining_data_loader(
            train_ds, args.consumed_train_samples)
        valid_dataloader = build_pretraining_data_loader(
            valid_ds, args.consumed_valid_samples)
        test_dataloader = build_pretraining_data_loader(test_ds, 0)

        # Flags to know if we need to do training/validation/testing.
        do_train = train_dataloader is not None and args.train_iters > 0
        do_valid = valid_dataloader is not None and args.eval_iters > 0
        do_test = test_dataloader is not None and args.eval_iters > 0
        # Need to broadcast num_tokens and num_type_tokens.
        flags = torch.cuda.LongTensor(
            [int(do_train), int(do_valid), int(do_test)])
    else:
        flags = torch.cuda.LongTensor([0, 0, 0])

    # Broadcast num tokens.
    torch.distributed.broadcast(flags,
                                mpu.get_tensor_model_parallel_src_rank(),
                                group=mpu.get_tensor_model_parallel_group())
    args.do_train = flags[0].item()
    args.do_valid = flags[1].item()
    args.do_test = flags[2].item()

    # Build iterators.
    dl_type = args.dataloader_type

    if train_dataloader is not None:
        train_data_iterator = iter(train_dataloader) if dl_type == 'single' \
            else iter(cyclic_iter(train_dataloader))
    else:
        train_data_iterator = None

    if valid_dataloader is not None:
        valid_data_iterator = iter(valid_dataloader) if dl_type == 'single' \
            else iter(cyclic_iter(valid_dataloader))
    else:
        valid_data_iterator = None

    if test_dataloader is not None:
        test_data_iterator = iter(test_dataloader) if dl_type == 'single' \
            else iter(cyclic_iter(test_dataloader))
    else:
        test_data_iterator = None

    return train_data_iterator, valid_data_iterator, test_data_iterator
5.2 DDP
In get_model, DDP is applied as follows.
from megatron.model import DistributedDataParallel as LocalDDP
from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP

if wrap_with_ddp:
    if args.DDP_impl == 'torch':
        i = torch.cuda.current_device()
        model = [torchDDP(model_module, device_ids=[i], output_device=i,
                          process_group=mpu.get_data_parallel_group())
                 for model_module in model]

    elif args.DDP_impl == 'local':
        model = [LocalDDP(model_module,
                          args.accumulate_allreduce_grads_in_fp32,
                          args.use_contiguous_buffers_in_local_ddp)
                 for model_module in model]

    else:
        raise NotImplementedError('Unknown DDP implementation specified: '
                                  '{}. Exiting.'.format(args.DDP_impl))
So let's look at Megatron's own DDP implementation.
5.2.1 Definition
In the class definition only the docstring is worth reading: gradients are stored and accumulated in contiguous memory, with all tensors of the same dtype sharing one buffer, so each buffer can be all-reduced in a single call.
class DistributedDataParallel(DistributedDataParallelBase):
    """DDP with contiguous buffers options to store and accumulate gradients.
    This class:
        - has the potential to reduce memory fragmentation.
        - provides the option to do the gradient accumulation
          in a type other than the params type (for example fp32)

    Arguments:
        module: input model.
        accumulate_allreduce_grads_in_fp32: if true do the gradient accumulation
            and the gradient all-reduce all in float32. If this option is
            true, we require `use_contiguous_buffers` to be true too.
        use_contiguous_buffers: if true, use a contiguous buffer to store the
            gradients.
    """
5.2.2 Initialization
The goal of the constructor is to store gradients of the same dtype in one contiguous block.
def __init__(self, module,
             accumulate_allreduce_grads_in_fp32,
             use_contiguous_buffers):

    super(DistributedDataParallel, self).__init__(module)

    self.accumulate_allreduce_grads_in_fp32 \
        = accumulate_allreduce_grads_in_fp32
    self.use_contiguous_buffers = use_contiguous_buffers
    # If we are using fp32-accumulate-allreduce explicitly
    # this means we need main grads in a continous buffer.
    if self.accumulate_allreduce_grads_in_fp32:
        assert self.use_contiguous_buffers

    # ===================================
    # Rest of this part applies only to
    # the case we use continuous buffers.
    # ===================================
    self._grad_buffers = None
    if self.use_contiguous_buffers:  # only the contiguous-buffer case is considered here
        self._grad_buffers = {}  # the buffers, one per dtype

        # Simple function to define buffer type.
        def _get_buffer_type(param):  # buffer dtype for a given parameter
            return torch.float if \
                self.accumulate_allreduce_grads_in_fp32 else param.dtype

        # First calculate total number of elements per type.
        type_num_elements = {}
        for param in self.module.parameters():  # iterate over the model parameters
            if param.requires_grad:  # only those that need gradients
                dtype = _get_buffer_type(param)  # parameter dtype
                type_num_elements[dtype] = type_num_elements.get(dtype, 0) \
                    + param.data.nelement()  # accumulate the element count for this dtype
        # type_num_elements now holds the number of elements per dtype

        # Allocate the buffer.
        for dtype, num_elements in type_num_elements.items():  # for each dtype
            self._grad_buffers[dtype] = MemoryBuffer(num_elements, dtype)  # allocate the memory

        # Assume the back prop order is reverse the params order,
        # store the start index for the gradients.
        for param in self.module.parameters():  # iterate over the model parameters
            if param.requires_grad:  # only those that need gradients
                dtype = _get_buffer_type(param)  # parameter dtype
                type_num_elements[dtype] -= param.data.nelement()  # move the offset backwards
                # locate this parameter's slice inside the MemoryBuffer
                param.main_grad = self._grad_buffers[dtype].get(  # get the view for this parameter
                    param.data.shape, type_num_elements[dtype])

        # Backward hook.
        # Accumalation function for the gradients. We need
        # to store them so they don't go out of scope.
        self.grad_accs = []
        # Loop over all the parameters in the model.
        for param in self.module.parameters():  # iterate over the model parameters
            if param.requires_grad:  # only those that need gradients
                # Expand so we get access to grad_fn.
                param_tmp = param.expand_as(param)
                # Get the gradient accumulator function.
                grad_acc = param_tmp.grad_fn.next_functions[0][0]  # the parameter's gradient accumulator
                grad_acc.register_hook(self._make_param_hook(param))  # register the hook
                self.grad_accs.append(grad_acc)  # keep the references, essentially bookkeeping
5.2.3 Memory
MemoryBuffer is the memory abstraction.
class MemoryBuffer:

    def __init__(self, numel, dtype):
        self.numel = numel
        self.dtype = dtype
        self.data = torch.zeros(self.numel,  # allocate the memory
                                dtype=self.dtype,
                                device=torch.cuda.current_device(),
                                requires_grad=False)

    def zero(self):
        """Reset the buffer to zero."""
        self.data.zero_()

    def get(self, shape, start_index):
        """Return a tensor with the input `shape` as a view into the
        1-D data starting at `start_index`."""
        end_index = start_index + shape.numel()  # locate this tensor's slice inside the buffer
        assert end_index <= self.numel, \
            'requested tensor is out of the buffer range.'
        buffer_tensor = self.data[start_index:end_index]  # take the slice
        buffer_tensor = buffer_tensor.view(shape)
        return buffer_tensor
5.2.4 Helper functions
Below are two helper functions: one accumulates a parameter's gradient into the contiguous buffer, the other zeroes the buffer.
def _make_param_hook(self, param):
    """Create the all-reduce hook for backprop."""
    # Hook used for back-prop.
    def param_hook(*unused):
        # Add the gradient to the buffer.
        if param.grad.data is not None:
            param.main_grad.add_(param.grad.data)  # copy the gradient into the contiguous buffer
            # Now we can deallocate grad memory.
            param.grad = None
    return param_hook

def zero_grad_buffer(self):
    """Set the grad buffer data to zero. Needs to be called at the
    begining of each iteration."""
    assert self._grad_buffers is not None, 'buffers are not initialized.'
    for _, buffer_ in self._grad_buffers.items():
        buffer_.zero()
Suppose the model has six parameters, three fp32 and three fp16; they are packed into two contiguous MemoryBuffers, one per dtype, as in the sketch below.
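The following standalone toy (not the Megatron class; the parameter shapes are made up) illustrates that packing: each dtype gets one flat buffer, and every parameter's main_grad is just a view into it.

import torch

# Hypothetical parameters: three fp32 and three fp16 tensors.
params = [torch.nn.Parameter(torch.randn(4, 4)) for _ in range(3)] + \
         [torch.nn.Parameter(torch.randn(8).half()) for _ in range(3)]

# One flat buffer per dtype, sized to hold all gradients of that dtype.
totals = {}
for p in params:
    totals[p.dtype] = totals.get(p.dtype, 0) + p.numel()
buffers = {dt: torch.zeros(n, dtype=dt) for dt, n in totals.items()}

# Carve each buffer into per-parameter views (Megatron walks the offsets in
# reverse of the parameter order; the principle is the same).
offsets = dict(totals)
for p in params:
    offsets[p.dtype] -= p.numel()
    start = offsets[p.dtype]
    p.main_grad = buffers[p.dtype][start:start + p.numel()].view(p.shape)

# A data-parallel reduction now needs only one call per dtype, e.g.:
# torch.distributed.all_reduce(buffers[torch.float32], group=dp_group)
# torch.distributed.all_reduce(buffers[torch.float16], group=dp_group)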
5.2.5 Gradient reduction
allreduce_gradients is the API this DDP class exposes; it is called later from train_step.
def allreduce_gradients(self):
    """Reduce gradients across data parallel ranks."""
    # If we have buffers, simply reduce the data in the buffer.
    if self._grad_buffers is not None:  # contiguous buffers
        for _, buffer_ in self._grad_buffers.items():  # iterate over the per-dtype buffers
            buffer_.data /= mpu.get_data_parallel_world_size()
            torch.distributed.all_reduce(  # reduce the whole buffer at once
                buffer_.data, group=mpu.get_data_parallel_group())
    else:
        # Otherwise, bucketize and all-reduce
        buckets = {}  # fall back to bucketized reduction
        # Pack the buckets.
        for param in self.module.parameters():  # iterate over the parameters
            if param.requires_grad and param.grad is not None:
                tp = param.data.type()
                if tp not in buckets:
                    buckets[tp] = []
                buckets[tp].append(param)  # gradients of the same dtype go into the same bucket
                param.main_grad = param.grad

        # For each bucket, all-reduce and copy all-reduced grads.
        for tp in buckets:
            bucket = buckets[tp]
            grads = [param.grad.data for param in bucket]  # collect the gradients in the bucket
            coalesced = _flatten_dense_tensors(grads)  # flatten them
            coalesced /= mpu.get_data_parallel_world_size()
            torch.distributed.all_reduce(  # all-reduce
                coalesced, group=mpu.get_data_parallel_group())
            for buf, synced in zip(grads, _unflatten_dense_tensors(
                    coalesced, grads)):
                buf.copy_(synced)
At run time an AllReduce is therefore issued separately for each of the two contiguous buffers.
0x06 Training
pretrain calls train to run the training loop.
if args.do_train and args.train_iters > 0:
    iteration = train(forward_step_func,
                      model, optimizer, lr_scheduler,
                      train_data_iterator, valid_data_iterator)
6.1 The training loop
train follows the usual pattern; most of it can be understood from the names alone.
def train(forward_step_func, model, optimizer, lr_scheduler,
          train_data_iterator, valid_data_iterator):
    """Train the model function."""
    args = get_args()
    timers = get_timers()

    # Write args to tensorboard
    write_args_to_tensorboard()

    # Turn on training mode which enables dropout.
    for model_module in model:
        model_module.train()

    # Tracking loss.
    total_loss_dict = {}

    # Iterations.
    iteration = args.iteration

    report_memory_flag = True
    while iteration < args.train_iters:
        update_num_microbatches(args.consumed_train_samples)
        loss_dict, skipped_iter, grad_norm, num_zeros_in_grad = \
            train_step(forward_step_func,  # one training step
                       train_data_iterator,
                       model,
                       optimizer,
                       lr_scheduler)
        iteration += 1
        args.consumed_train_samples += mpu.get_data_parallel_world_size() * \
            args.micro_batch_size * \
            get_num_microbatches()

        # Logging.
        loss_scale = optimizer.get_loss_scale().item()
        params_norm = None
        if args.log_params_norm:
            params_norm = calc_params_l2_norm(model)
        report_memory_flag = training_log(loss_dict, total_loss_dict,
                                          optimizer.param_groups[0]['lr'],
                                          iteration, loss_scale,
                                          report_memory_flag, skipped_iter,
                                          grad_norm, params_norm, num_zeros_in_grad)

        # Autoresume
        if args.adlr_autoresume and \
                (iteration % args.adlr_autoresume_interval == 0):
            check_adlr_autoresume_termination(iteration, model, optimizer,
                                              lr_scheduler)

        # Evaluation
        if args.eval_interval and iteration % args.eval_interval == 0 and \
                args.do_valid:
            prefix = 'iteration {}'.format(iteration)
            evaluate_and_print_results(prefix, forward_step_func,
                                       valid_data_iterator, model,
                                       iteration, False)

        # Checkpointing
        saved_checkpoint = False
        if args.exit_signal_handler:
            signal_handler = get_signal_handler()
            if any(signal_handler.signals_received()):
                save_checkpoint_and_time(iteration, model, optimizer,
                                         lr_scheduler)
                sys.exit()

        if args.save and args.save_interval and \
                iteration % args.save_interval == 0:
            save_checkpoint_and_time(iteration, model, optimizer,
                                     lr_scheduler)
            saved_checkpoint = True

        # Exiting based on duration
        if args.exit_duration_in_mins:
            train_time = (time.time() - _TRAIN_START_TIME) / 60.0
            done_cuda = torch.cuda.IntTensor(
                [train_time > args.exit_duration_in_mins])
            torch.distributed.all_reduce(
                done_cuda, op=torch.distributed.ReduceOp.MAX)
            done = done_cuda.item()
            if done:
                if not saved_checkpoint:
                    save_checkpoint_and_time(iteration, model, optimizer,
                                             lr_scheduler)
                sys.exit()

        # Exiting based on iterations
        if args.exit_interval and iteration % args.exit_interval == 0:
            if not saved_checkpoint:
                save_checkpoint_and_time(iteration, model, optimizer,
                                         lr_scheduler)
            torch.distributed.barrier()
            sys.exit()

    return iteration
6.2 The training step
train_step calls get_forward_backward_func to obtain the schedule; with pipeline parallelism, a schedule is needed to decide how exactly the training step is carried out.
def train_step(forward_step_func, data_iterator,
               model, optimizer, lr_scheduler):
    """Single training step."""
    args = get_args()
    timers = get_timers()

    # Set grad to zero.
    if args.DDP_impl == 'local' and args.use_contiguous_buffers_in_local_ddp:
        for partition in model:
            partition.zero_grad_buffer()
    optimizer.zero_grad()

    # Get the training schedule
    forward_backward_func = get_forward_backward_func()
    losses_reduced = forward_backward_func(  # run forward and backward
        forward_step_func, data_iterator, model,
        optimizer, timers, forward_only=False)

    # Empty unused memory
    if args.empty_unused_memory_level >= 1:
        torch.cuda.empty_cache()

    # All-reduce if needed.
    if args.DDP_impl == 'local':
        for model_module in model:
            model_module.allreduce_gradients()

    # All-reduce word_embeddings' grad across first and last stages to ensure
    # that word_embeddings parameters stay in sync.
    # This should only run for models that support pipelined model parallelism
    # (BERT and GPT-2).
    if mpu.is_rank_in_embedding_group(ignore_virtual=True) and \
            mpu.get_pipeline_model_parallel_world_size() > 1:
        if mpu.is_pipeline_first_stage(ignore_virtual=True):
            unwrapped_model = model[0]
        elif mpu.is_pipeline_last_stage(ignore_virtual=True):
            unwrapped_model = model[-1]
        else:  # We do not support the interleaved schedule for T5 yet.
            unwrapped_model = model[0]
        unwrapped_model = unwrap_model(
            unwrapped_model, (torchDDP, LocalDDP, Float16Module))

        if unwrapped_model.share_word_embeddings:
            word_embeddings_weight = unwrapped_model.word_embeddings_weight()
            if args.DDP_impl == 'local':
                grad = word_embeddings_weight.main_grad
            else:
                grad = word_embeddings_weight.grad
            torch.distributed.all_reduce(grad, group=mpu.get_embedding_group())

    # Update parameters.
    update_successful, grad_norm, num_zeros_in_grad = optimizer.step()

    # Update learning rate.
    if update_successful:
        increment = get_num_microbatches() * \
            args.micro_batch_size * \
            args.data_parallel_size
        lr_scheduler.step(increment=increment)
        skipped_iter = 0
    else:
        skipped_iter = 1

    # Empty unused memory
    if args.empty_unused_memory_level >= 2:
        torch.cuda.empty_cache()

    if mpu.is_pipeline_last_stage(ignore_virtual=True):
        # Average loss across microbatches.
        loss_reduced = {}
        for key in losses_reduced[0]:
            losses_reduced_for_key = [x[key] for x in losses_reduced]
            loss_reduced[key] = sum(losses_reduced_for_key) / len(losses_reduced_for_key)
        return loss_reduced, skipped_iter, grad_norm, num_zeros_in_grad
    return {}, skipped_iter, grad_norm, num_zeros_in_grad
6.3 Getting the schedule
get_forward_backward_func picks the pipeline schedule. There are two pipelined variants, the flush (non-interleaved) schedule and the interleaved schedule; we will analyze both in later articles.
def get_forward_backward_func():
    args = get_args()
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        if args.virtual_pipeline_model_parallel_size is not None:
            forward_backward_func = forward_backward_pipelining_with_interleaving
        else:
            forward_backward_func = forward_backward_pipelining_without_interleaving
    else:
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func
The overall training logic expands roughly as shown in the figure below:
This completes the overview of Megatron's basic architecture. In the next article we look at how model parallelism is set up.
Original title: [源码解析] 模型并行分布式训练Megatron (2) --- 整体架构 (Source-code analysis: model-parallel distributed training with Megatron (2), overall architecture)
Source: WeChat official account GiantPandaCV