All Distance Sketch  0.1
All distance sketch based algorithms
sketch_calculation.h
Go to the documentation of this file.
1 #ifndef SRC_ALGORITHMS_SKETCH_CALCULATION_H_
2 #define SRC_ALGORITHMS_SKETCH_CALCULATION_H_
3 
4 #include "../common.h"
5 #include "../sketch/graph_sketch.h"
6 #include "../graph/graph.h"
7 #include "../utils/thread_utils.h"
8 
10 
15 namespace all_distance_sketch {
16 
29 template <class T>
31  GraphSketch * graph_sketch,
32  unsigned int num_threads = 5,
33  double increase_factor = 1.1);
34 
41 template <class T>
42 static void CalculateGraphSketch(graph::Graph<T> *graph,
43  GraphSketch * graph_sketch);
44 
47 struct PrunningAlgoStatistics;
48 
49 /*
50 * Forward declaration
51 */
52 
53 template <class T>
54 static void AssignTask(unsigned int start_index,
55  unsigned int end_index,
56  bool insert_to_candidate_list,
57  bool clear_candidate_list,
58  thread::MessageChannel * communication_channel,
59  int num_threads);
60 
61 template <class T>
62 static void ThreadLoop(thread::MessageChannel * communication_channel,
63  graph::Graph<T> *graph,
64  GraphSketch * graph_sketch,
65  SketchDijkstraCallBacks<T>* call_backs,
66  DijkstraParams * dijkstra_param,
67  unsigned int id);
68 
69 template <class T>
70 static void CalculateSketchBatch(graph::Graph<T> *graph,
71  SketchDijkstraCallBacks<T>* call_backs,
72  DijkstraParams * dijkstra_param,
73  const std::vector<NodeDistanceIdRandomIdData> * distribution,
74  unsigned int start_index,
75  unsigned int end_index);
76 
77 template <class T>
78 static void CalculateNodeSketch(typename T::TNode source,
79  graph::Graph<T> *graph,
80  SketchDijkstraCallBacks<T>* call_backs,
81  DijkstraParams * param);
82 
83 
84 
85 template <class T>
86 static void CalculateNodeSketch(typename T::TNode source,
87  graph::Graph<T> *graph,
88  SketchDijkstraCallBacks<T>* call_backs,
89  DijkstraParams * param) {
90  PrunedDijkstra<T, SketchDijkstraCallBacks<T> >(source,
91  graph,
92  call_backs,
93  param);
94 }
95 
96 template <class T>
97 static void CalculateGraphSketch(graph::Graph<T> *graph,
98  GraphSketch * graph_sketch) {
99  // The vector is sorted
100  DijkstraParams param;
101  SketchDijkstraCallBacks<T> call_backs;
102  call_backs.InitSketchDijkstraCallBacks(graph_sketch);
103  const std::vector<NodeDistanceIdRandomIdData> * distribution = graph_sketch->GetNodesDistribution();
104  for (unsigned int i=0; i < distribution->size(); i++) {
105  typename T::TNode source((*distribution)[i].GetNId());
106  if (graph->IsNode((*distribution)[i].GetNId())) {
107  CalculateNodeSketch<T>(source, graph, &call_backs, &param);
108  }
109  }
110  graph_sketch->CalculateAllDistanceNeighborhood();
111 }
112 
/**
 * \brief Rounds \p approxSize up to the next multiple of \p numthreads.
 *
 * \param numthreads  number of worker threads the batch is split between
 * \param approxSize  desired batch size
 * \return \p approxSize if it is already a multiple of \p numthreads,
 *         otherwise the next larger multiple. When \p numthreads is not
 *         positive there is nothing to align to, so \p approxSize is returned
 *         unchanged.
 */
static int GetBatchSize(int numthreads, int approxSize) {
  // The modulo below is undefined for a zero divisor.
  if (numthreads <= 0) {
    return approxSize;
  }
  const int remainder = approxSize % numthreads;
  return remainder == 0 ? approxSize : approxSize + (numthreads - remainder);
}
120 
121 template <class T>
122 static void ThreadLoop(thread::MessageChannel * communication_channel,
123  graph::Graph<T> *graph,
124  GraphSketch * graph_sketch,
125  SketchDijkstraCallBacks<T>* call_backs,
126  DijkstraParams * dijkstra_param,
127  unsigned int id) {
128  while (communication_channel->get_should_stop() == false) {
129  thread::Message message;
130  if (communication_channel->GetMessage(&message, id)) {
131  if (message.insert_to_candidate_list) {
132  LOG_M(DEBUG2, "has work start_index " << message.start_index << " end_index " << message.end_index);
133  CalculateSketchBatch<T>(graph, call_backs, dijkstra_param, graph_sketch->GetNodesDistribution(), message.start_index, message.end_index);
134  }
135  if (message.clear_candidate_list) {
136  graph_sketch->InsertCandidatesNodes(message.start_index, message.end_index);
137  }
138  communication_channel->Finished(id);
139  LOG_M(DEBUG2, "Finished work start_index " << message.start_index << " end_index " << message.end_index);
140  }
141  }
142  LOG_M(DEBUG2, " Exiting Finished ");
143 }
144 
145 template <class T>
147  GraphSketch * graph_sketch,
148  unsigned int num_threads,
149  double increase_factor) {
150  const std::vector<NodeDistanceIdRandomIdData> * distribution = graph_sketch->GetNodesDistribution();
151  std::vector<std::thread> threads;
152  std::vector<DijkstraParams> params;
153  std::vector<SketchDijkstraCallBacks<T>* > call_backs;
154  call_backs.resize(num_threads);
155  params.resize(num_threads);
156  thread::ModuloLock lock;
157  lock.InitModuloLock();
158  thread::MessageChannel communication_channel;
159  communication_channel.InitMessageChannel(num_threads);
160  for (int i=0; i < num_threads; ++i) {
161  call_backs[i] = new SketchDijkstraCallBacks<T>();
162  call_backs[i]->InitSketchDijkstraCallBacks(graph_sketch);
163  call_backs[i]->set_multi_threaded_params(true, &lock);
164  }
165  for (unsigned int i=0; i < num_threads; i++) {
166  threads.push_back( std::thread( ThreadLoop<T>,
167  &communication_channel,
168  graph,
169  graph_sketch,
170  (call_backs[i]),
171  &(params[i]),
172  i) );
173 
174  }
175  for (unsigned int i=0; i < threads.size(); i++) {
176  while (threads[i].joinable() == false) {}
177  }
178 
179  LOG_M(DEBUG2, "Num nodes " << distribution->size() );
180  unsigned int batch_size = GetBatchSize(num_threads, graph_sketch->GetK());
181  for (unsigned int i=0; i < distribution->size();) {
182  int batchEnd = i + batch_size;
183  if (batchEnd > distribution->size()) {
184  batchEnd = distribution->size() + 1;
185  }
186  AssignTask<T>(i, batchEnd, true, false, &communication_channel, num_threads);
187 
188  AssignTask<T>(0, distribution->size() + 1, false, true,
189  &communication_channel, num_threads);
190 
191  i = i + batch_size;
192  batch_size *= increase_factor;
193  batch_size = GetBatchSize(num_threads, batch_size);
194  }
195  // Ask all to stop
196  communication_channel.stop();
197  for (unsigned int i=0; i < threads.size(); i++) {
198  threads[i].join();
199  }
200  graph_sketch->CalculateAllDistanceNeighborhood();
201 }
202 
203 template <class T>
204 static void AssignTask(unsigned int start_index,
205  unsigned int end_index,
206  bool insert_to_candidate_list,
207  bool clear_candidate_list,
208  thread::MessageChannel * communication_channel,
209  int num_threads) {
210  unsigned int workPerThread = (end_index - start_index) / num_threads;
211  std::vector<std::thread> threads;
212  LOG_M(DEBUG1, " Allocating work " << start_index << " end=" << end_index);
213  for (unsigned int i=start_index, threadIndex=0; i < end_index; i += workPerThread, threadIndex++) {
214  communication_channel->AddBatch(i, i+workPerThread, insert_to_candidate_list, clear_candidate_list);
215  }
216  while (communication_channel->AllFinished() == false) {}
217  LOG_M(DEBUG1, "Finished Allocating work " << start_index << " end=" << end_index);
218 }
219 
220 template <class T>
221 static void CalculateSketchBatch(graph::Graph<T> *graph,
222  SketchDijkstraCallBacks<T>* call_backs,
223  DijkstraParams * dijkstra_param,
224  const std::vector<NodeDistanceIdRandomIdData> * distribution,
225  unsigned int start_index,
226  unsigned int end_index) {
227  for (unsigned int i=start_index; (i < distribution->size() && i < end_index ); i++) {
228  typename T::TNode source((*distribution)[i].GetNId());
229  if (graph->IsNode((*distribution)[i].GetNId())) {
230  CalculateNodeSketch<T>(source, graph, call_backs, dijkstra_param);
231  }
232  }
233 }
234 
238 } // namespace all_distance_sketch
239 #endif // SRC_ALGORITHMS_SKETCH_CALCULATION_H_
Definition: common.h:53
static void CalculateGraphSketch(graph::Graph< T > *graph, GraphSketch *graph_sketch)
Calculate the bottom-K sketch single-threaded.
Optimized version of Dijkstra algorithm for the sketch calculation algorithm.
static void CalculateGraphSketchMultiCore(graph::Graph< T > *graph, GraphSketch *graph_sketch, unsigned int num_threads=5, double increase_factor=1.1)
Calculate the bottom-K graph sketch in parallel. This implementation is faster than the single-threaded version.
Graph data structure. Thin wrapper over SNAP graph.
Definition: graph.h:107
Data structure for the graph sketch.
Definition: graph_sketch.h:17