All Distance Sketch  0.1
All distance sketch based algorithms
utils.h
1 #ifndef ALL_DISTANCE_SKETCH_SRC_SKETCH_UTILS_H_
2 #define ALL_DISTANCE_SKETCH_SRC_SKETCH_UTILS_H_
3 
4 #include <iostream>
5 #include <fstream>
6 #include <sys/stat.h>
7 #include <sys/types.h>
8 #include <fcntl.h>
9 
10 #include <glob.h>
11 #include <google/protobuf/io/coded_stream.h>
12 #include <google/protobuf/io/zero_copy_stream_impl.h>
13 #include "../common.h"
14 
15 using namespace all_distance_sketch;
16 using namespace std;
17 using namespace boost;
18 using namespace google::protobuf::io;
19 
// Per-file payload budget in bytes (256,000,000). DumpGraphSketchToFile starts
// a new output file once at least this many message bytes have been written to
// the current one, and LoadGraphSketchFromFiles sets the input stream's total
// byte limit to twice this value.
static const int kMaxFileSize = 256 * 1000 * 1000;
21 
// Expands a shell-style path pattern into the list of matching paths.
//
// Thin wrapper over the C library ::glob(), called with the GLOB_TILDE flag.
// The matches are copied into std::string objects in the order the C library
// reports them, and the C-side result buffer is released before returning.
// The return code of ::glob() is not inspected; whatever the result structure
// holds afterwards is what gets returned.
inline std::vector<std::string> glob(const std::string& pat) {
  glob_t matches;
  ::glob(pat.c_str(), GLOB_TILDE, NULL, &matches);

  // Copy out every reported path before the buffer is freed.
  std::vector<std::string> paths;
  char** const first = matches.gl_pathv;
  for (char** it = first; it != first + matches.gl_pathc; ++it) {
    paths.push_back(std::string(*it));
  }

  globfree(&matches);
  return paths;
}
33 
34 bool WriteMessage(const google::protobuf::MessageLite& message,
35  CodedOutputStream* coded_output,
36  int* size_written) {
37  const int size = message.ByteSize();
38  coded_output->WriteVarint32(size);
39 
40  uint8_t* buffer = coded_output->GetDirectBufferForNBytesAndAdvance(size);
41  if (buffer != NULL) {
42  // Optimization: The message fits in one buffer, so use the faster
43  // direct-to-array serialization path.
44  message.SerializeWithCachedSizesToArray(buffer);
45  } else {
46  // Slightly-slower path when the message is multiple buffers.
47  message.SerializeWithCachedSizes(coded_output);
48  if (coded_output->HadError()) return false;
49  }
50  (*size_written) += size;
51  return true;
52 }
53 
54 bool ReadMessage(google::protobuf::MessageLite* message,
55  google::protobuf::io::CodedInputStream* coded_input) {
56  // Read the size.
57  uint32_t size;
58  if (!coded_input->ReadVarint32(&size)) return false;
59 
60  // Tell the stream not to read beyond that size.
61  google::protobuf::io::CodedInputStream::Limit limit =
62  coded_input->PushLimit(size);
63 
64  // Parse the message.
65  if (!message->MergeFromCodedStream(coded_input)) return false;
66  if (!coded_input->ConsumedEntireMessage()) return false;
67 
68  // Release the limit.
69  coded_input->PopLimit(limit);
70 
71  return true;
72 }
73 
74 void OpenFileWrite(const std::string& file_name,
75  int* fd,
76  ZeroCopyOutputStream** raw_output,
77  CodedOutputStream** coded_output) {
78  (*fd) = open(file_name.c_str(), O_CREAT | O_RDWR);
79  chmod(file_name.c_str(), 777);
80  (*raw_output) = new FileOutputStream(*fd);
81  (*coded_output) = new CodedOutputStream(*raw_output);
82 }
83 
84 
85 void CloseFile(int* fd,
86  ZeroCopyOutputStream* raw_output,
87  CodedOutputStream* coded_output) {
88  delete coded_output;
89  delete raw_output;
90  close(*fd);
91 }
92 
93 void DumpGraphSketchToFile(const AllDistanceSketchGpb& graph_sketch,
94  std::string output_file) {
95  int fd;
96  ZeroCopyOutputStream* raw_output = NULL;
97  CodedOutputStream* coded_output = NULL;
98  output_file += "_";
99  std::string file_name = output_file + std::to_string(0);
100  OpenFileWrite(file_name, &fd, &raw_output, &coded_output);
101  int size_written = 0;
102  int num_files = 1;
103  coded_output->WriteLittleEndian32(1);
104  WriteMessage(graph_sketch.configuration(), coded_output, &size_written);
105  for (int i=0; i < graph_sketch.nodes_sketches_size(); i++) {
106  coded_output->WriteLittleEndian32(2);
107  WriteMessage(graph_sketch.nodes_sketches(i), coded_output, &size_written);
108  if (size_written >= kMaxFileSize) {
109  CloseFile(&fd, raw_output, coded_output);
110  file_name = output_file + std::to_string(num_files);
111  num_files += 1;
112  size_written = 0;
113  OpenFileWrite(file_name, &fd, &raw_output, &coded_output);
114  }
115  }
116  for (int i=0; i < graph_sketch.node_thresholds_size(); i++) {
117  coded_output->WriteLittleEndian32(3);
118  WriteMessage(graph_sketch.node_thresholds(i), coded_output, &size_written);
119  if (size_written >= kMaxFileSize) {
120  CloseFile(&fd, raw_output, coded_output);
121  file_name = output_file + std::to_string(num_files);
122  num_files += 1;
123  size_written = 0;
124  OpenFileWrite(output_file, &fd, &raw_output, &coded_output);
125  }
126  }
127  CloseFile(&fd, raw_output, coded_output);
128 }
129 
130 void LoadGraphSketchFromFiles(AllDistanceSketchGpb* graph_sketch,
131  std::string file) {
132  std::string pattern = file + "_[0-9]*";
133  auto files = glob(pattern);
134  for (const auto file_name : files) {
135  std::cout << file_name << std::endl;
136  int fd = open(file_name.c_str(), O_RDONLY);
137  ZeroCopyInputStream* raw_input = new FileInputStream(fd);
138  CodedInputStream* coded_input = new CodedInputStream(raw_input);
139  coded_input->SetTotalBytesLimit(kMaxFileSize*2, kMaxFileSize*2);
140  while (coded_input->ExpectAtEnd() == false) {
141  uint32_t message_number;
142  if (coded_input->ReadLittleEndian32(&message_number) == false) {
143  std::cout << "Unable to read! num=" << message_number << std::endl;
144  break;
145  }
146  bool ableToRead = true;
147  if (message_number == 1) {
148  ableToRead = ReadMessage(graph_sketch->mutable_configuration(), coded_input);
149  std::cout << graph_sketch->configuration().DebugString() << std::endl;
150  }
151  if (message_number == 2) {
152  ableToRead = ReadMessage(graph_sketch->add_nodes_sketches(), coded_input);
153  }
154  if (message_number == 3) {
155  ableToRead = ReadMessage(graph_sketch->add_node_thresholds(), coded_input);
156  }
157  if (ableToRead == false) {
158  break;
159  }
160  }
161  }
162 }
163 
164 #endif // ALL_DISTANCE_SKETCH_SRC_SKETCH_UTILS_H_
Definition: common.h:53