ThreadSynchronizer Class Reference

class ThreadSynchronizer acts to guard an arbitrary type of buffer between a producing and a consuming thread (note: it's all symmetric between the two thread types). More...

#include <online-nnet2-decoding-threaded.h>

Collaboration diagram for ThreadSynchronizer:

Public Types

enum  ThreadType { kProducer, kConsumer }
 

Public Member Functions

 ThreadSynchronizer ()
 
bool Lock (ThreadType t)
 
bool UnlockSuccess (ThreadType t)
 
bool UnlockFailure (ThreadType t)
 
void SetAbort ()
 
 ~ThreadSynchronizer ()
 

Private Member Functions

 KALDI_DISALLOW_COPY_AND_ASSIGN (ThreadSynchronizer)
 

Private Attributes

bool abort_
 
bool producer_waiting_
 
bool consumer_waiting_
 
std::mutex mutex_
 
ThreadType held_by_
 
Semaphore producer_semaphore_
 
Semaphore consumer_semaphore_
 
int32 num_errors_
 

Detailed Description

class ThreadSynchronizer acts to guard an arbitrary type of buffer between a producing and a consuming thread (note: it's all symmetric between the two thread types).

It has a similar interface to a mutex, except that instead of just Lock and Unlock, it has Lock, UnlockSuccess and UnlockFailure, and each function takes an argument kProducer or kConsumer to identify whether the producing or consuming thread is waiting.

The basic concept is that you lock the object; and if you discover the you're blocked because you're either trying to read an empty buffer or trying to write to a full buffer, you unlock with UnlockFailure; and this will cause your next call to Lock to block until the *other* thread has called Lock and then UnlockSuccess. However, if at that point the other thread calls Lock and then UnlockFailure, it is an error because you can't have both producing and consuming threads claiming that the buffer is full/empty. If you lock the object and were successful you call UnlockSuccess; and you call UnlockSuccess even if, for your own reasons, you ended up not changing the state of the buffer.

Definition at line 65 of file online-nnet2-decoding-threaded.h.

Member Enumeration Documentation

◆ ThreadType

Constructor & Destructor Documentation

◆ ThreadSynchronizer()

◆ ~ThreadSynchronizer()

Definition at line 99 of file online-nnet2-decoding-threaded.cc.

99  {
100 }

Member Function Documentation

◆ KALDI_DISALLOW_COPY_AND_ASSIGN()

KALDI_DISALLOW_COPY_AND_ASSIGN ( ThreadSynchronizer  )
private

◆ Lock()

bool Lock ( ThreadType  t)

Definition at line 37 of file online-nnet2-decoding-threaded.cc.

References ThreadSynchronizer::abort_, ThreadSynchronizer::consumer_semaphore_, ThreadSynchronizer::held_by_, ThreadSynchronizer::kProducer, ThreadSynchronizer::mutex_, ThreadSynchronizer::producer_semaphore_, and Semaphore::Wait().

Referenced by SingleUtteranceNnet2DecoderThreaded::AcceptWaveform(), SingleUtteranceNnet2DecoderThreaded::FeatureComputation(), SingleUtteranceNnet2DecoderThreaded::InputFinished(), SingleUtteranceNnet2DecoderThreaded::NumWaveformPiecesPending(), SingleUtteranceNnet2DecoderThreaded::RunDecoderSearchInternal(), and SingleUtteranceNnet2DecoderThreaded::RunNnetEvaluationInternal().

37  {
38  if (abort_)
39  return false;
42  } else {
44  }
45  if (abort_)
46  return false;
47  mutex_.lock();
48  held_by_ = t;
49  if (abort_) {
50  mutex_.unlock();
51  return false;
52  } else {
53  return true;
54  }
55 }
void Wait()
decrease the counter

◆ SetAbort()

void SetAbort ( )

Definition at line 91 of file online-nnet2-decoding-threaded.cc.

References ThreadSynchronizer::abort_, ThreadSynchronizer::consumer_semaphore_, ThreadSynchronizer::producer_semaphore_, and Semaphore::Signal().

Referenced by SingleUtteranceNnet2DecoderThreaded::AbortAllThreads().

91  {
92  abort_ = true;
93  // we signal the semaphores just in case someone was waiting on either of
94  // them.
97 }
void Signal()
increase the counter

◆ UnlockFailure()

bool UnlockFailure ( ThreadType  t)

Definition at line 76 of file online-nnet2-decoding-threaded.cc.

References ThreadSynchronizer::abort_, ThreadSynchronizer::consumer_waiting_, ThreadSynchronizer::held_by_, KALDI_ASSERT, ThreadSynchronizer::kProducer, ThreadSynchronizer::mutex_, and ThreadSynchronizer::producer_waiting_.

Referenced by SingleUtteranceNnet2DecoderThreaded::FeatureComputation(), SingleUtteranceNnet2DecoderThreaded::RunDecoderSearchInternal(), and SingleUtteranceNnet2DecoderThreaded::RunNnetEvaluationInternal().

76  {
77 
78  KALDI_ASSERT(held_by_ == t && "Code error: unlocking a mutex you don't hold.");
79 
81  KALDI_ASSERT(!producer_waiting_ && "code error.");
82  producer_waiting_ = true;
83  } else {
84  KALDI_ASSERT(!consumer_waiting_ && "code error.");
85  consumer_waiting_ = true;
86  }
87  mutex_.unlock();
88  return !abort_;
89 }
#define KALDI_ASSERT(cond)
Definition: kaldi-error.h:185

◆ UnlockSuccess()

bool UnlockSuccess ( ThreadType  t)

Definition at line 57 of file online-nnet2-decoding-threaded.cc.

References ThreadSynchronizer::abort_, ThreadSynchronizer::consumer_semaphore_, ThreadSynchronizer::consumer_waiting_, ThreadSynchronizer::kProducer, ThreadSynchronizer::mutex_, ThreadSynchronizer::producer_semaphore_, ThreadSynchronizer::producer_waiting_, and Semaphore::Signal().

Referenced by SingleUtteranceNnet2DecoderThreaded::AcceptWaveform(), SingleUtteranceNnet2DecoderThreaded::FeatureComputation(), SingleUtteranceNnet2DecoderThreaded::InputFinished(), SingleUtteranceNnet2DecoderThreaded::NumWaveformPiecesPending(), SingleUtteranceNnet2DecoderThreaded::RunDecoderSearchInternal(), and SingleUtteranceNnet2DecoderThreaded::RunNnetEvaluationInternal().

Member Data Documentation

◆ abort_

◆ consumer_semaphore_

◆ consumer_waiting_

bool consumer_waiting_
private

◆ held_by_

ThreadType held_by_
private

◆ mutex_

◆ num_errors_

int32 num_errors_
private

Definition at line 107 of file online-nnet2-decoding-threaded.h.

◆ producer_semaphore_

◆ producer_waiting_

bool producer_waiting_
private

The documentation for this class was generated from the following files: