All Distance Sketch  0.1
All distance sketch based algorithms
thread_utils.h
1 #ifndef ALL_DISTANCE_SKETCH_ALL_DISTANCE_SKETCH_UTILS_THREAD_UTILS_H_
2 #define ALL_DISTANCE_SKETCH_ALL_DISTANCE_SKETCH_UTILS_THREAD_UTILS_H_
3 
4 #include "../common.h"
5 
6 namespace all_distance_sketch {
7 namespace thread {
8 
/// A pool of mutexes addressed by object id modulo the pool size.
///
/// Object ids that are congruent modulo the pool size share one mutex and
/// therefore serialize against each other. InitModuloLock() must be called
/// before Lock()/UnLock(); until then the pool is empty.
class ModuloLock {
 public:
  /// Creates an empty pool, so destroying an object that was never
  /// initialized is well defined.
  ModuloLock() : locks_(nullptr), num_locks_(0) {}

  // The pool is owned through a raw pointer: a member-wise copy would make
  // two objects delete[] the same array, so copying is disabled.
  ModuloLock(const ModuloLock&) = delete;
  ModuloLock& operator=(const ModuloLock&) = delete;

  /// Takes over the pool of `other`, leaving `other` empty.
  ModuloLock(ModuloLock&& other) noexcept
      : locks_(other.locks_), num_locks_(other.num_locks_) {
    other.locks_ = nullptr;
    other.num_locks_ = 0;
  }

  /// Releases the current pool and takes over the pool of `other`.
  ModuloLock& operator=(ModuloLock&& other) noexcept {
    if (this != &other) {
      delete[] locks_;
      locks_ = other.locks_;
      num_locks_ = other.num_locks_;
      other.locks_ = nullptr;
      other.num_locks_ = 0;
    }
    return *this;
  }

  /// Allocates the mutex pool, replacing any pool from an earlier call.
  ///
  /// @param anum_locks_ Number of mutexes in the pool. A value of 0 is
  ///        treated as 1 so that the modulo in Lock()/UnLock() is defined.
  ///
  /// Must not be called while any mutex of the current pool is held.
  void InitModuloLock(unsigned int anum_locks_ = 1048576) {
    const unsigned int count = (anum_locks_ == 0) ? 1u : anum_locks_;
    // Allocate first: if this throws, the existing pool stays intact.
    std::mutex* fresh = new std::mutex[count];
    delete[] locks_;
    locks_ = fresh;
    num_locks_ = count;
  }

  /// Blocks until the mutex assigned to `aObjectId` has been acquired.
  void Lock(unsigned int aObjectId) {
    locks_[aObjectId % num_locks_].lock();
  }

  /// Releases the mutex assigned to `aObjectId`; the calling thread must
  /// currently hold it.
  void UnLock(unsigned int aObjectId) {
    locks_[aObjectId % num_locks_].unlock();
  }

  /// Collision counting is disabled; this always returns 0.
  int GetNumCollitions() {
    return 0;
  }

  ~ModuloLock() {
    delete[] locks_;
  }

 private:
  std::mutex* locks_;       // Owned array of num_locks_ mutexes, or nullptr.
  unsigned int num_locks_;  // Pool size; 0 until InitModuloLock() is called.
};
34 
/// Plain value type describing one batch: an index range plus two flags.
///
/// A default-constructed Message has both indices set to 0 and both flags
/// cleared. Whether end_index is inclusive is not determined by this struct.
struct Message {
  unsigned int start_index = 0;           // First index of the batch.
  unsigned int end_index = 0;             // End index of the batch.
  bool insert_to_candidate_list = false;  // Flag carried with the batch.
  bool clear_candidate_list = false;      // Flag carried with the batch.

  Message() {}
};
42 
43 class MessageChannel {
44 public:
45  void InitMessageChannel(unsigned int aNumThreads) {
46  shouldStop = false;
47  threadStatus.resize(aNumThreads, false);
48  }
49 
50  void stop() {
51  shouldStop = true;
52  }
53 
54  bool get_should_stop() {
55  return shouldStop;
56  }
57 
58  void AddBatch(unsigned int aStartIndex, unsigned int aEndIndex, bool aExec, bool aClean) {
59  locks_.lock();
60  Message m;
61  m.start_index = aStartIndex;
62  m.end_index = aEndIndex;
63  m.insert_to_candidate_list = aExec;
64  m.clear_candidate_list = aClean;
65  myMessages.push_back(m);
66  locks_.unlock();
67  }
68  bool GetMessage(Message * m, unsigned int id) {
69  bool ret;
70  locks_.lock();
71  if (myMessages.size() > 0) {
72  (*m) = myMessages.front();
73  myMessages.pop_front();
74  threadStatus[id] = true;
75  ret = true;
76  }else {
77  ret = false;
78  }
79  locks_.unlock();
80  return ret;
81  }
82 
83  bool AllFinished() {
84  bool ret = true;
85  locks_.lock();
86  if (myMessages.size() != 0) {
87  locks_.unlock();
88  return false;
89  }
90  for (unsigned int i=0; i < threadStatus.size(); i++) {
91  if (threadStatus[i] == true) {
92  ret = false;
93  }
94  }
95  locks_.unlock();
96  return ret;
97  }
98 
99  void Finished(unsigned int id) {
100  locks_.lock();
101  threadStatus[id] = false;
102  locks_.unlock();
103  }
104 
105 private:
106  std::list< Message > myMessages;
107  unsigned int end_index;
108  std::mutex locks_;
109  bool shouldStop;
110  std::vector<bool> threadStatus;
111 };
112 
113 } // namespace thread
114 } // namespace all_distance_sketch
115 
116 #endif
Definition: common.h:53