1 #ifndef SRC_ALGORITHMS_SKETCH_CALCULATION_H_
2 #define SRC_ALGORITHMS_SKETCH_CALCULATION_H_
5 #include "../sketch/graph_sketch.h"
6 #include "../graph/graph.h"
7 #include "../utils/thread_utils.h"
32 unsigned int num_threads = 5,
33 double increase_factor = 1.1);
// Forward declaration only; the definition is not part of this listing.
// NOTE(review): "Prunning" looks like a misspelling of "Pruning"; renaming it
// would require updating every user of the type.
47 struct PrunningAlgoStatistics;
// AssignTask: splits [start_index, end_index) into per-thread chunks, posts
// them on communication_channel with the given flags, then waits until the
// channel reports that all posted work is finished.
54 static void AssignTask(
unsigned int start_index,
55 unsigned int end_index,
56 bool insert_to_candidate_list,
57 bool clear_candidate_list,
58 thread::MessageChannel * communication_channel,
// ThreadLoop: worker body; polls communication_channel for batches until the
// channel's stop flag is set.
62 static void ThreadLoop(thread::MessageChannel * communication_channel,
65 SketchDijkstraCallBacks<T>* call_backs,
66 DijkstraParams * dijkstra_param,
// NOTE(review): the parameters below appear to belong to the
// CalculateSketchBatch declaration, whose opening lines are missing from this
// listing.
71 SketchDijkstraCallBacks<T>* call_backs,
72 DijkstraParams * dijkstra_param,
73 const std::vector<NodeDistanceIdRandomIdData> * distribution,
74 unsigned int start_index,
75 unsigned int end_index);
// CalculateNodeSketch: runs PrunedDijkstra from a single source node using the
// sketch callbacks.
78 static void CalculateNodeSketch(
typename T::TNode source,
80 SketchDijkstraCallBacks<T>* call_backs,
81 DijkstraParams * param);
// Computes one source node's contribution to the sketch by running
// PrunedDijkstra from `source`, instantiated with SketchDijkstraCallBacks<T>
// as the callback type.
// NOTE(review): the remaining PrunedDijkstra arguments (presumably graph,
// call_backs and param) and the closing lines are missing from this listing.
86 static void CalculateNodeSketch(
typename T::TNode source,
88 SketchDijkstraCallBacks<T>* call_backs,
89 DijkstraParams * param) {
90 PrunedDijkstra<T, SketchDijkstraCallBacks<T> >(source,
// Single-threaded sketch computation (presumably the body of
// CalculateGraphSketch; its signature is not shown in this listing).
// Runs CalculateNodeSketch for every entry of the sketch's node distribution
// whose id is a node of `graph`, then calls
// graph_sketch->CalculateAllDistanceNeighborhood().
100 DijkstraParams param;
101 SketchDijkstraCallBacks<T> call_backs;
102 call_backs.InitSketchDijkstraCallBacks(graph_sketch);
103 const std::vector<NodeDistanceIdRandomIdData> * distribution = graph_sketch->GetNodesDistribution();
104 for (
unsigned int i=0; i < distribution->size(); i++) {
105 typename T::TNode source((*distribution)[i].GetNId());
// Only ids that are nodes of `graph` are processed.
106 if (graph->IsNode((*distribution)[i].GetNId())) {
// The same call_backs and param objects are reused for every source node.
107 CalculateNodeSketch<T>(source, graph, &call_backs, &param);
110 graph_sketch->CalculateAllDistanceNeighborhood();
// Rounds approxSize up to a multiple of numthreads, so that AssignTask can
// divide a batch evenly among the worker threads.
// NOTE(review): the lines that handle mod == 0 are not shown in this listing.
// Presumably they return approxSize unchanged; confirm this, because the
// visible return would add a full extra numthreads in that case.
// NOTE(review): numthreads == 0 makes the modulo below undefined behavior.
113 static int GetBatchSize(
int numthreads,
int approxSize) {
114 int mod = approxSize % numthreads;
118 return approxSize + (numthreads - mod);
// Worker loop. Until communication_channel's stop flag is set, it asks the
// channel for a message for this worker (`id`). For each message it receives:
//  - if insert_to_candidate_list is set, it computes node sketches for
//    [start_index, end_index) of the node distribution via CalculateSketchBatch;
//  - if clear_candidate_list is set, it calls
//    graph_sketch->InsertCandidatesNodes on the same range;
// and then it calls Finished(id) on the channel.
// NOTE(review): the flag names look inverted relative to what each branch does
// (the "clear" flag triggers InsertCandidatesNodes); confirm the intent.
// NOTE(review): no sleep or yield is visible when GetMessage returns false, so
// an idle worker appears to spin on the CPU.
122 static void ThreadLoop(thread::MessageChannel * communication_channel,
125 SketchDijkstraCallBacks<T>* call_backs,
126 DijkstraParams * dijkstra_param,
128 while (communication_channel->get_should_stop() ==
false) {
129 thread::Message message;
130 if (communication_channel->GetMessage(&message,
id)) {
131 if (message.insert_to_candidate_list) {
132 LOG_M(DEBUG2,
"has work start_index " << message.start_index <<
" end_index " << message.end_index);
133 CalculateSketchBatch<T>(graph, call_backs, dijkstra_param, graph_sketch->GetNodesDistribution(), message.start_index, message.end_index);
135 if (message.clear_candidate_list) {
136 graph_sketch->InsertCandidatesNodes(message.start_index, message.end_index);
// Tell the dispatcher that this worker's batch is done.
138 communication_channel->Finished(
id);
139 LOG_M(DEBUG2,
"Finished work start_index " << message.start_index <<
" end_index " << message.end_index);
142 LOG_M(DEBUG2,
" Exiting Finished ");
148 unsigned int num_threads,
149 double increase_factor) {
// Multi-threaded sketch computation. This is the tail of the
// CalculateGraphSketchMultiCore definition; its opening lines are not shown in
// this listing.
// It starts num_threads ThreadLoop workers that share one MessageChannel and
// one ModuloLock. It then hands out node batches whose size starts at
// GetBatchSize(num_threads, K) and is scaled by increase_factor after each
// batch. Finally it stops the channel and calls
// CalculateAllDistanceNeighborhood().
150 const std::vector<NodeDistanceIdRandomIdData> * distribution = graph_sketch->GetNodesDistribution();
151 std::vector<std::thread> threads;
152 std::vector<DijkstraParams> params;
153 std::vector<SketchDijkstraCallBacks<T>* > call_backs;
154 call_backs.resize(num_threads);
155 params.resize(num_threads);
156 thread::ModuloLock lock;
157 lock.InitModuloLock();
158 thread::MessageChannel communication_channel;
159 communication_channel.InitMessageChannel(num_threads);
// One callback object per worker, each sharing `lock` in multi-threaded mode.
// NOTE(review): signed `int i` is compared with unsigned num_threads.
// NOTE(review): these objects are allocated with raw new and no matching
// delete is visible in this listing. Confirm they are freed; std::unique_ptr
// would free them automatically.
160 for (
int i=0; i < num_threads; ++i) {
161 call_backs[i] =
new SketchDijkstraCallBacks<T>();
162 call_backs[i]->InitSketchDijkstraCallBacks(graph_sketch);
163 call_backs[i]->set_multi_threaded_params(
true, &lock);
165 for (
unsigned int i=0; i < num_threads; i++) {
166 threads.push_back( std::thread( ThreadLoop<T>,
167 &communication_channel,
// NOTE(review): a std::thread constructed with a callable is joinable
// immediately, so this wait loop exits at once. It does not wait for the
// workers to become ready.
175 for (
unsigned int i=0; i < threads.size(); i++) {
176 while (threads[i].joinable() ==
false) {}
179 LOG_M(DEBUG2,
"Num nodes " << distribution->size() );
// The initial batch size is K rounded up to a multiple of num_threads.
180 unsigned int batch_size = GetBatchSize(num_threads, graph_sketch->GetK());
// NOTE(review): the increment of i is not visible in this listing.
181 for (
unsigned int i=0; i < distribution->size();) {
// NOTE(review): signed batchEnd is compared with the unsigned size().
182 int batchEnd = i + batch_size;
// The last batch is extended to size() + 1. CalculateSketchBatch also bounds
// its index by distribution->size(), so the overshoot does not read past the
// vector.
183 if (batchEnd > distribution->size()) {
184 batchEnd = distribution->size() + 1;
// First pass: workers compute sketches for [i, batchEnd).
// Second pass: workers call InsertCandidatesNodes over the whole distribution.
186 AssignTask<T>(i, batchEnd,
true,
false, &communication_channel, num_threads);
188 AssignTask<T>(0, distribution->size() + 1,
false,
true,
189 &communication_channel, num_threads);
// Grow the batch. The unsigned *= double truncates toward zero before the
// result is rounded up to a multiple of num_threads again.
192 batch_size *= increase_factor;
193 batch_size = GetBatchSize(num_threads, batch_size);
// Signal the workers to exit. The body of the loop that follows is not shown;
// presumably it joins each thread.
196 communication_channel.stop();
197 for (
unsigned int i=0; i < threads.size(); i++) {
200 graph_sketch->CalculateAllDistanceNeighborhood();
// Splits [start_index, end_index) into chunks of
// (end_index - start_index) / num_threads indices and posts each chunk to
// communication_channel with the given flags. It then spins until
// AllFinished() returns true.
// NOTE(review): if end_index - start_index < num_threads, workPerThread is 0
// and the loop below never advances, which is an infinite loop. Calls whose
// range ends at distribution->size() + 1 can hit this when only a few nodes
// remain.
// NOTE(review): when the range is not a multiple of workPerThread, more than
// num_threads chunks are posted and the last chunk's end exceeds end_index.
// NOTE(review): `threads` and `threadIndex` are never used. The final wait is
// a busy spin with no yield.
204 static void AssignTask(
unsigned int start_index,
205 unsigned int end_index,
206 bool insert_to_candidate_list,
207 bool clear_candidate_list,
208 thread::MessageChannel * communication_channel,
210 unsigned int workPerThread = (end_index - start_index) / num_threads;
211 std::vector<std::thread> threads;
212 LOG_M(DEBUG1,
" Allocating work " << start_index <<
" end=" << end_index);
213 for (
unsigned int i=start_index, threadIndex=0; i < end_index; i += workPerThread, threadIndex++) {
214 communication_channel->AddBatch(i, i+workPerThread, insert_to_candidate_list, clear_candidate_list);
216 while (communication_channel->AllFinished() ==
false) {}
217 LOG_M(DEBUG1,
"Finished Allocating work " << start_index <<
" end=" << end_index);
222 SketchDijkstraCallBacks<T>* call_backs,
223 DijkstraParams * dijkstra_param,
224 const std::vector<NodeDistanceIdRandomIdData> * distribution,
225 unsigned int start_index,
226 unsigned int end_index) {
// Tail of CalculateSketchBatch (its opening lines are not shown in this
// listing). It visits distribution entries in [start_index, end_index),
// clamped to the vector's size, so callers may pass an end_index past the last
// entry. Only ids that are nodes of `graph` get a CalculateNodeSketch call.
227 for (
unsigned int i=start_index; (i < distribution->size() && i < end_index ); i++) {
228 typename T::TNode source((*distribution)[i].GetNId());
229 if (graph->IsNode((*distribution)[i].GetNId())) {
230 CalculateNodeSketch<T>(source, graph, call_backs, dijkstra_param);
239 #endif // SRC_ALGORITHMS_SKETCH_CALCULATION_H_
static void CalculateGraphSketch(graph::Graph< T > *graph, GraphSketch *graph_sketch)
Calculates the bottom-K graph sketch using a single thread.
Optimized version of Dijkstra's algorithm used by the sketch calculation.
static void CalculateGraphSketchMultiCore(graph::Graph< T > *graph, GraphSketch *graph_sketch, unsigned int num_threads=5, double increase_factor=1.1)
Calculates the bottom-K graph sketch in parallel. This implementation is faster than the single-threaded...
Graph data structure: a thin wrapper over a SNAP graph.
Definition: graph.h:107
Data structure for the graph sketch.
Definition: graph_sketch.h:17