diff --git a/src/cudadecoder/Makefile b/src/cudadecoder/Makefile index ede7cfddbe7..166f72e060f 100644 --- a/src/cudadecoder/Makefile +++ b/src/cudadecoder/Makefile @@ -23,7 +23,8 @@ LIBNAME = kaldi-cudadecoder ADDLIBS = ../cudamatrix/kaldi-cudamatrix.a ../base/kaldi-base.a ../matrix/kaldi-matrix.a \ ../lat/kaldi-lat.a ../util/kaldi-util.a ../matrix/kaldi-matrix.a ../gmm/kaldi-gmm.a \ ../fstext/kaldi-fstext.a ../hmm/kaldi-hmm.a ../gmm/kaldi-gmm.a ../transform/kaldi-transform.a \ - ../tree/kaldi-tree.a ../online2/kaldi-online2.a ../nnet3/kaldi-nnet3.a + ../tree/kaldi-tree.a ../online2/kaldi-online2.a ../nnet3/kaldi-nnet3.a \ + ../cudafeat/kaldi-cudafeat.a # Implicit rule for kernel compilation %.o : %.cu diff --git a/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.cc b/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.cc index 33264bd7a9f..a25116a3b61 100644 --- a/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.cc +++ b/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.cc @@ -248,9 +248,15 @@ void BatchedThreadedNnet3CudaPipeline::OpenDecodeHandle( task->callback = std::move(callback); task->Init(key, wave_data); - work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY, - &BatchedThreadedNnet3CudaPipeline::ComputeOneFeature, - this, task); + if (config_.gpu_feature_extract) { + //Feature extraction done on device + AddTaskToPendingTaskQueue(task); + } else { + //Feature extraction done on host thread + work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY, + &BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU, + this, task); + } } void BatchedThreadedNnet3CudaPipeline::OpenDecodeHandle( @@ -262,9 +268,15 @@ void BatchedThreadedNnet3CudaPipeline::OpenDecodeHandle( task->callback = std::move(callback); task->Init(key, wave_data, sample_rate); - work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY, - &BatchedThreadedNnet3CudaPipeline::ComputeOneFeature, - this, task); + if (config_.gpu_feature_extract) { + //Feature extraction done on device + AddTaskToPendingTaskQueue(task); + } else { + //Feature extraction done on host thread + work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY, + &BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU, + this, task); + } } bool BatchedThreadedNnet3CudaPipeline::GetRawLattice(const std::string &key, @@ -410,6 +422,8 @@ void BatchedThreadedNnet3CudaPipeline::AquireAdditionalTasks( void BatchedThreadedNnet3CudaPipeline::ComputeBatchNnet( nnet3::NnetBatchComputer &computer, int32 first, std::vector &tasks) { + nvtxRangePushA("ComputeBatchNnet"); + bool output_to_cpu = false; int32 online_ivector_period = 0; int max_pending_minibatches = @@ -421,17 +435,32 @@ void BatchedThreadedNnet3CudaPipeline::ComputeBatchNnet( // for all new batches enqueue up nnet work. for (int i = first; i < tasks.size(); i++) { TaskState &task = *tasks[i]; - Vector &ivector_features = task.task_data->ivector_features; - Matrix &input_features = task.task_data->input_features; + std::shared_ptr &task_data = task.task_data; std::vector &ntasks = nnet_tasks[i]; + + if (config_.gpu_feature_extract) { + CuVector &ivector_features = task_data->ivector_features; + CuMatrix &input_features = task_data->input_features; + + CuVector *ifeat = NULL; + if (ivector_features.Dim() > 0) { + ifeat = &ivector_features; + } + // create task list + computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat, + NULL, online_ivector_period, &ntasks); + } else { + Vector &ivector_features = task_data->ivector_features_cpu; + Matrix &input_features = task_data->input_features_cpu; - Vector *ifeat = NULL; - if (ivector_features.Dim() > 0) { - ifeat = &ivector_features; + Vector *ifeat = NULL; + if (ivector_features.Dim() > 0) { + ifeat = &ivector_features; + } + // create task list + computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat, + NULL, online_ivector_period, &ntasks); } - // create task list - computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat, NULL, - online_ivector_period, &ntasks); // Add tasks to computer for (size_t j = 0; j < ntasks.size(); j++) { @@ -448,33 +477,37 @@ void BatchedThreadedNnet3CudaPipeline::ComputeBatchNnet( // Extract Posteriors for (int i = first; i < tasks.size(); i++) { TaskState &task = *tasks[i]; - CuMatrix &posteriors = task.task_data->posteriors; + std::shared_ptr &task_data = task.task_data; + CuMatrix &posteriors = task_data->posteriors; MergeTaskOutput(nnet_tasks[i], &posteriors); // nnet output is no longer necessary as we have copied the output out nnet_tasks[i].resize(0); // featurs are no longer needed so free memory - task.task_data->ivector_features.Resize(0); - task.task_data->input_features.Resize(0, 0); + task_data->ivector_features.Resize(0); + task_data->input_features.Resize(0, 0); } + + nvtxRangePop(); } // Computes Features for a single decode instance. -void BatchedThreadedNnet3CudaPipeline::ComputeOneFeature(TaskState *task_) { - nvtxRangePushA("ComputeOneFeature"); +void BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU(TaskState *task_) { + nvtxRangePushA("ComputeOneFeatureCPU"); TaskState &task = *task_; - Vector &ivector_features = task.task_data->ivector_features; - Matrix &input_features = task.task_data->input_features; + std::shared_ptr &task_data = task.task_data; + Vector &ivector_features = task_data->ivector_features_cpu; + Matrix &input_features = task_data->input_features_cpu; // create decoding state OnlineNnet2FeaturePipeline feature(*feature_info_); // Accept waveforms feature.AcceptWaveform( - task.task_data->sample_frequency, - SubVector(*task.task_data->wave_samples, 0, - task.task_data->wave_samples->Dim())); + task_data->sample_frequency, + SubVector(*task_data->wave_samples, 0, + task_data->wave_samples->Dim())); feature.InputFinished(); // All frames should be ready here int32 numFrames = feature.NumFramesReady(); @@ -487,7 +520,8 @@ void BatchedThreadedNnet3CudaPipeline::ComputeOneFeature(TaskState *task_) { std::vector frames(numFrames); // create list of frames - for (int j = 0; j < numFrames; j++) frames[j] = j; + for (int j = 0; j < numFrames; j++) + frames[j] = j; // Copy Features input_features.Resize(numFrames, input_dim); @@ -501,18 +535,106 @@ void BatchedThreadedNnet3CudaPipeline::ComputeOneFeature(TaskState *task_) { // Copy Features feature.IvectorFeature()->GetFrame(numFrames - 1, &ivector_features); } - nvtxRangePop(); AddTaskToPendingTaskQueue(task_); + + nvtxRangePop(); } +// Computes features across the tasks[first,tasks.size() +void BatchedThreadedNnet3CudaPipeline::ComputeBatchFeatures( + int32 first, std::vector &tasks, + OnlineCudaFeaturePipeline &feature_pipeline) { + KALDI_ASSERT(config_.gpu_feature_extract==true); + nvtxRangePushA("CopyBatchWaves"); + // below we will pack waves into a single buffer for efficient transfer across device + + // first count the total number of elements and create a single large vector + int count=0; + for (int i = first; i < tasks.size(); i++) { + count+=tasks[i]->task_data->wave_samples->Dim(); + } + + // creating a thread local vector of pinned memory. + // wave data will be stagged through this memory to get + // more efficient non-blocking transfers to the device. + thread_local Vector pinned_vector; + + if (pinned_vector.Dim() < count ) { + if ( pinned_vector.Dim()!=0) { + cudaHostUnregister(pinned_vector.Data()); + } + // allocated array 2x size + pinned_vector.Resize(count*2,kUndefined); + cudaHostRegister(pinned_vector.Data(), pinned_vector.Dim()*sizeof(BaseFloat),0); + } + + // We will launch a thread for each task in order to get better host memory bandwidth + std::vector > futures; //for syncing + + //vector copy function for threading below. + auto copy_vec = [](SubVector &dst, const SubVector &src) { + nvtxRangePushA("CopyVec"); + dst.CopyFromVec(src); + nvtxRangePop(); + }; + + // next launch threads to copy all waves for each task in parallel + count=0; + for (int i = first; i < tasks.size(); i++) { + std::shared_ptr &task_data = tasks[i]->task_data; + SubVector wave(pinned_vector,count,task_data->wave_samples->Dim()); + count+=task_data->wave_samples->Dim(); + futures.push_back( + work_pool_->enqueue(copy_vec, wave, *(task_data->wave_samples)) + ); + } + + // wait for waves to be copied into place + for (int i = 0; i < futures.size(); i++) { + futures[i].get(); + } + + CuVector cu_waves(count, kUndefined); + // copy memory down asynchronously. Vector copy functions are synchronous so we do it manually. + // It is important for this to happen asynchrously to help hide launch latency of smaller kernels + // that come in the future. + cudaMemcpyAsync(cu_waves.Data(), pinned_vector.Data(), cu_waves.Dim()*sizeof(BaseFloat), + cudaMemcpyHostToDevice, cudaStreamPerThread); + nvtxRangePop(); + + nvtxRangePushA("ComputeBatchFeatures"); + // extract features for each wave + count=0; + for (int i = first; i < tasks.size(); i++) { + TaskState &task = *tasks[i]; + std::shared_ptr &task_data = task.task_data; + + CuSubVector cu_wave(cu_waves,count,task_data->wave_samples->Dim()); + count+=task_data->wave_samples->Dim(); + feature_pipeline.ComputeFeatures(cu_wave, task_data->sample_frequency, + &task_data->input_features, &task_data->ivector_features); + + int32 numFrames = task_data->input_features.NumRows(); + + if (numFrames == 0) { + //Make this a warning for now. Need to check how this is handled + KALDI_WARN << "Warning empty audio file"; + } + } + nvtxRangePop(); +} + + + // Allocates decodables for tasks in the range of tasks[first,tasks.size()) void BatchedThreadedNnet3CudaPipeline::AllocateDecodables( int32 first, std::vector &tasks, std::vector &decodables) { // Create mapped decodable here for (int i = first; i < tasks.size(); i++) { - CuMatrix &posteriors = tasks[i]->task_data->posteriors; + std::shared_ptr &task_data = tasks[i]->task_data; + CuMatrix &posteriors = task_data->posteriors; decodables.push_back( new DecodableCuMatrixMapped(*trans_model_, posteriors, 0)); } @@ -666,6 +788,8 @@ void BatchedThreadedNnet3CudaPipeline::ExecuteWorker(int threadId) { nnet3::NnetBatchComputer computer(config_.compute_opts, am_nnet_->GetNnet(), am_nnet_->Priors()); + OnlineCudaFeaturePipeline feature_pipeline(config_.feature_opts); + ChannelState channel_state; std::vector tasks; // The state for each decode @@ -713,10 +837,12 @@ void BatchedThreadedNnet3CudaPipeline::ExecuteWorker(int threadId) { // New tasks are now in the in tasks[start,tasks.size()) if (start != tasks.size()) { // if there are new tasks + if (config_.gpu_feature_extract) + ComputeBatchFeatures(start, tasks, feature_pipeline); ComputeBatchNnet(computer, start, tasks); AllocateDecodables(start, tasks, decodables); } - } // end if(tasks_front_!=tasks_back_) + } // end if (tasks_front_!=tasks_back_) // check if there is no active work on this thread. // This can happen if another thread was assigned the work. diff --git a/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.h b/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.h index 57c396be9eb..6401b24b7db 100644 --- a/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.h +++ b/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline.h @@ -27,6 +27,7 @@ #include "lat/determinize-lattice-pruned.h" #include "nnet3/nnet-batch-compute.h" #include "online2/online-nnet2-feature-pipeline.h" +#include "cudafeat/online-cuda-feature-pipeline.h" #include "thread-pool.h" // If num_channels sets to automatic, @@ -53,7 +54,8 @@ struct BatchedThreadedNnet3CudaPipelineConfig { num_worker_threads(20), determinize_lattice(true), max_pending_tasks(4000), - num_decoder_copy_threads(2){}; + num_decoder_copy_threads(2), + gpu_feature_extract(true) {}; void Register(OptionsItf *po) { po->Register("max-batch-size", &max_batch_size, "The maximum batch size to be used by the decoder. " @@ -88,6 +90,11 @@ struct BatchedThreadedNnet3CudaPipelineConfig { po->Register("cuda-decoder-copy-threads", &num_decoder_copy_threads, "Advanced - Number of worker threads used in the decoder for " "the host to host copies."); + po->Register("gpu-feature-extract", &gpu_feature_extract, + "Extract features on the GPU. This reduces CPU overhead " + "leading to better scalability but may reduce overall " + "performance for a single GPU."); + feature_opts.Register(po); decoder_opts.Register(po); det_opts.Register(po); @@ -101,6 +108,7 @@ struct BatchedThreadedNnet3CudaPipelineConfig { bool determinize_lattice; int max_pending_tasks; int num_decoder_copy_threads; + bool gpu_feature_extract; void ComputeConfig() { if (num_channels == -1) @@ -203,8 +211,10 @@ class BatchedThreadedNnet3CudaPipeline { wave_samples; // Used as a pointer to either the raw // data or the samples passed float sample_frequency; - Vector ivector_features; - Matrix input_features; + Vector ivector_features_cpu; + Matrix input_features_cpu; + CuVector ivector_features; + CuMatrix input_features; CuMatrix posteriors; TaskData(const WaveData &wave_data_in) @@ -289,7 +299,12 @@ class BatchedThreadedNnet3CudaPipeline { std::vector &tasks); // Computes Features for a single decode instance. - void ComputeOneFeature(TaskState *task); + void ComputeOneFeatureCPU(TaskState *task); + + // Computes features across the tasks[first,tasks.size() + void ComputeBatchFeatures(int32 first, + std::vector &tasks, + OnlineCudaFeaturePipeline &feature_pipeline); // Computes Nnet across the current decode batch void ComputeBatchNnet(nnet3::NnetBatchComputer &computer, int32 first,