1 #ifndef ALL_DISTANCE_SKETCH_SRC_SKETCH_UTILS_H_
2 #define ALL_DISTANCE_SKETCH_SRC_SKETCH_UTILS_H_
11 #include <google/protobuf/io/coded_stream.h>
12 #include <google/protobuf/io/zero_copy_stream_impl.h>
13 #include "../common.h"
17 using namespace boost;
// Size budget, in serialized message bytes, for a single sketch file.
// DumpGraphSketchToFile starts a new output file once its running
// size_written counter reaches this value, and LoadGraphSketchFromFiles
// passes twice this value to CodedInputStream::SetTotalBytesLimit.
20 static const int kMaxFileSize = 256000000;
// Expands a shell-style wildcard pattern into the list of matching paths.
//
// Thin wrapper around POSIX glob(3) called with GLOB_TILDE, so a leading '~'
// in the pattern is expanded to a home directory.
//
// @param pat  Wildcard pattern to expand.
// @return     The matched paths in the order glob(3) reports them. The result
//             is empty when nothing matches or when the expansion fails.
inline std::vector<std::string> glob(const std::string& pat) {
  // Value-initialise so globfree() below is well-defined even if ::glob()
  // bails out before filling the structure in.
  glob_t glob_result = glob_t();
  std::vector<std::string> ret;

  // ::glob() returns non-zero for "no match" as well as for real errors; in
  // both cases there is nothing trustworthy to copy out.
  if (::glob(pat.c_str(), GLOB_TILDE, NULL, &glob_result) == 0) {
    ret.reserve(glob_result.gl_pathc);
    for (size_t i = 0; i < glob_result.gl_pathc; ++i) {
      ret.push_back(std::string(glob_result.gl_pathv[i]));
    }
  }

  globfree(&glob_result);
  return ret;
}
34 bool WriteMessage(
const google::protobuf::MessageLite& message,
35 CodedOutputStream* coded_output,
37 const int size = message.ByteSize();
38 coded_output->WriteVarint32(size);
40 uint8_t* buffer = coded_output->GetDirectBufferForNBytesAndAdvance(size);
44 message.SerializeWithCachedSizesToArray(buffer);
47 message.SerializeWithCachedSizes(coded_output);
48 if (coded_output->HadError())
return false;
50 (*size_written) += size;
54 bool ReadMessage(google::protobuf::MessageLite* message,
55 google::protobuf::io::CodedInputStream* coded_input) {
58 if (!coded_input->ReadVarint32(&size))
return false;
61 google::protobuf::io::CodedInputStream::Limit limit =
62 coded_input->PushLimit(size);
65 if (!message->MergeFromCodedStream(coded_input))
return false;
66 if (!coded_input->ConsumedEntireMessage())
return false;
69 coded_input->PopLimit(limit);
74 void OpenFileWrite(
const std::string& file_name,
76 ZeroCopyOutputStream** raw_output,
77 CodedOutputStream** coded_output) {
78 (*fd) = open(file_name.c_str(), O_CREAT | O_RDWR);
79 chmod(file_name.c_str(), 777);
80 (*raw_output) =
new FileOutputStream(*fd);
81 (*coded_output) =
new CodedOutputStream(*raw_output);
// Takes the descriptor and the two output streams that OpenFileWrite hands
// back. DumpGraphSketchToFile calls it before switching to a new output file
// and once more after the last record has been written.
// NOTE(review): the body is not visible here; presumably it destroys both
// streams and closes *fd -- confirm, including that the coded stream is
// destroyed before the raw stream it wraps.
85 void CloseFile(
int* fd,
86 ZeroCopyOutputStream* raw_output,
87 CodedOutputStream* coded_output) {
93 void DumpGraphSketchToFile(
const AllDistanceSketchGpb& graph_sketch,
94 std::string output_file) {
96 ZeroCopyOutputStream* raw_output = NULL;
97 CodedOutputStream* coded_output = NULL;
99 std::string file_name = output_file + std::to_string(0);
100 OpenFileWrite(file_name, &fd, &raw_output, &coded_output);
101 int size_written = 0;
103 coded_output->WriteLittleEndian32(1);
104 WriteMessage(graph_sketch.configuration(), coded_output, &size_written);
105 for (
int i=0; i < graph_sketch.nodes_sketches_size(); i++) {
106 coded_output->WriteLittleEndian32(2);
107 WriteMessage(graph_sketch.nodes_sketches(i), coded_output, &size_written);
108 if (size_written >= kMaxFileSize) {
109 CloseFile(&fd, raw_output, coded_output);
110 file_name = output_file + std::to_string(num_files);
113 OpenFileWrite(file_name, &fd, &raw_output, &coded_output);
116 for (
int i=0; i < graph_sketch.node_thresholds_size(); i++) {
117 coded_output->WriteLittleEndian32(3);
118 WriteMessage(graph_sketch.node_thresholds(i), coded_output, &size_written);
119 if (size_written >= kMaxFileSize) {
120 CloseFile(&fd, raw_output, coded_output);
121 file_name = output_file + std::to_string(num_files);
124 OpenFileWrite(output_file, &fd, &raw_output, &coded_output);
127 CloseFile(&fd, raw_output, coded_output);
130 void LoadGraphSketchFromFiles(AllDistanceSketchGpb* graph_sketch,
132 std::string pattern = file +
"_[0-9]*";
133 auto files = glob(pattern);
134 for (
const auto file_name : files) {
135 std::cout << file_name << std::endl;
136 int fd = open(file_name.c_str(), O_RDONLY);
137 ZeroCopyInputStream* raw_input =
new FileInputStream(fd);
138 CodedInputStream* coded_input =
new CodedInputStream(raw_input);
139 coded_input->SetTotalBytesLimit(kMaxFileSize*2, kMaxFileSize*2);
140 while (coded_input->ExpectAtEnd() ==
false) {
141 uint32_t message_number;
142 if (coded_input->ReadLittleEndian32(&message_number) ==
false) {
143 std::cout <<
"Unable to read! num=" << message_number << std::endl;
146 bool ableToRead =
true;
147 if (message_number == 1) {
148 ableToRead = ReadMessage(graph_sketch->mutable_configuration(), coded_input);
149 std::cout << graph_sketch->configuration().DebugString() << std::endl;
151 if (message_number == 2) {
152 ableToRead = ReadMessage(graph_sketch->add_nodes_sketches(), coded_input);
154 if (message_number == 3) {
155 ableToRead = ReadMessage(graph_sketch->add_node_thresholds(), coded_input);
157 if (ableToRead ==
false) {
164 #endif // ALL_DISTANCE_SKETCH_SRC_SKETCH_UTILS_H_