-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathJoin.h
More file actions
129 lines (117 loc) · 3.36 KB
/
Join.h
File metadata and controls
129 lines (117 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#ifndef JOIN_H
#define JOIN_H
#include "Operator.h"
#include "Record.h"
#include "FileWriter.h"
/**
 * Join operator that pairs records from a left input (input1) with records
 * from a right input (input2) and emits the concatenation of every pair for
 * which Record::attributes_match(right, Sel_attr1, Sel_attr2) holds.
 *
 * Evaluation order: for every page of the left input the whole right input is
 * read page by page; within a (left page, right page) combination every left
 * record is compared with every right record.
 *
 * When the right input is another Operator (first constructor) it cannot be
 * rewound through this class, so its pages are copied to a temporary file
 * during the first pass and all later passes read that file through fr2.
 *
 * NOTE(review): the file-based constructors store pointers to this object's
 * own fr1/fr2 members, so a copied Join would still point at the readers of
 * the object it was copied from. Avoid copying instances.
 */
class Join : public Operator {
private:
    FileWriter FW;                  // Writes the right input to tempfileName while materializing.
    Operator* input1;               // Left input; never deleted by this class.
    Operator* input2;               // Right input; re-pointed to fr2 once materialization is done.
    FileReader fr1;                 // Backing reader used by the file-name constructors.
    FileReader fr2;                 // Second backing reader; also reads the temp file after materialization.
    string Sel_attr1;               // Second argument forwarded to Record::attributes_match.
    string Sel_attr2;               // Third argument forwarded to Record::attributes_match.
    int rightrecIndex = 0;          // Index of the next right record to compare.
    int leftrecIndex = 0;           // Index of the next left record to compare.
    vector<Record> leftPage;        // Current left page; empty once the join is finished.
    vector<Record> rightPage;       // Current right page.
    bool first_time = true;         // True until the first call to next() has loaded the first pages.
    bool materializin = false;      // True while right pages are still being copied to the temp file.
    static inline int tempFileCount = 0;  // Shared counter that makes temp file names distinct per instance.
    string tempfileName;            // Name of the temp file (only set by the Operator/Operator constructor).

public:
    /**
     * Joins two arbitrary operators. The right operator's output is
     * materialized to "temporary<N>.txt" during the first pass.
     */
    Join(Operator *op1, Operator *op2, string pAttr1, string pAttr2)
    {
        input1 = op1;
        input2 = op2;
        Sel_attr1 = pAttr1;
        Sel_attr2 = pAttr2;
        materializin = true;
        tempfileName = "temporary" + to_string(++tempFileCount) + ".txt";
    }

    /**
     * Joins an operator (left) with a file (right). The file is read through
     * the internal reader fr1, so no materialization is needed.
     */
    Join(Operator *op, string fileName, string pAttr1, string pAttr2):
        fr1(fileName)
    {
        input1 = op;
        input2 = &fr1;
        Sel_attr1 = pAttr1;
        Sel_attr2 = pAttr2;
    }

    /**
     * Joins two files; fileName1 is the left input and fileName2 the right.
     */
    Join(string fileName1, string fileName2, string pAttr1, string pAttr2):
        fr1(fileName1), fr2(fileName2)
    {
        input1 = &fr1;
        input2 = &fr2;
        Sel_attr1 = pAttr1;
        Sel_attr2 = pAttr2;
    }

    /** Opens both inputs. */
    void open() {
        input1->open();
        input2->open();
    }

    /**
     * Produces the next page of joined records.
     *
     * @return up to pageSize concatenated records; an empty page once every
     *         pair has been examined (or when either input is empty).
     */
    vector<Record> next() {
        vector<Record> outPage;
        if (first_time) {
            initialize_iterators();
            if (materializin) {
                FW.open(tempfileName);
                // The flag is true only for this first page and false for all
                // later ones — presumably "start a fresh file"; confirm
                // against FileWriter::WritePage.
                FW.WritePage(rightPage, true);
            }
        }
        while (outPage.size() < pageSize) {
            // An empty left page means the join is finished; an empty right
            // page means the right input has no records at all. Either way
            // there is nothing to index into.
            if (leftPage.empty() || rightPage.empty()) {
                break;
            }
            // Compare the current pair first and advance afterwards, so the
            // very first pair (0, 0) is examined too.
            Record& leftrec = leftPage[leftrecIndex];
            Record& rightrec = rightPage[rightrecIndex];
            if (leftrec.attributes_match(rightrec, Sel_attr1, Sel_attr2)) {
                Record concRec = leftrec.concatonaterecs(rightrec);
                outPage.push_back(concRec);
            }
            advanceNextPairRec();
        }
        return outPage;
    }

    /**
     * Moves (leftrecIndex, rightrecIndex) to the next pair to compare,
     * loading new pages when the current page combination is used up.
     * On return, an empty leftPage signals that the join is finished.
     */
    void advanceNextPairRec() {
        // Nothing to advance over; also keeps the indexes from drifting when
        // called after the join has finished.
        if (leftPage.empty() || rightPage.empty()) {
            return;
        }

        // Next right record within the current right page.
        rightrecIndex++;
        if (static_cast<std::size_t>(rightrecIndex) < rightPage.size()) {
            return;
        }

        // Right page used up for this left record: next left record.
        rightrecIndex = 0;
        leftrecIndex++;
        if (static_cast<std::size_t>(leftrecIndex) < leftPage.size()) {
            return;
        }

        // Every pair of the current page combination has been visited:
        // fetch the next right page for the same left page.
        leftrecIndex = 0;
        rightPage = input2->next();
        if (!rightPage.empty()) {
            if (materializin) {
                FW.WritePage(rightPage, false);
            }
            return;
        }

        // Right input exhausted for this left page: move to the next left page.
        leftPage = input1->next();
        if (leftPage.empty()) {
            // Join finished. Close the temp-file writer if the first pass
            // was still materializing, so it is not left open.
            if (materializin) {
                materializin = false;
                FW.close();
            }
            return;
        }

        // Rewind the right input by closing and reopening it.
        input2->close();
        if (materializin) {
            // The first pass copied the whole right input to the temp file;
            // from now on read the right side from that file via fr2.
            materializin = false;
            FW.close();
            fr2.setFilename(tempfileName);
            input2 = &fr2;
        }
        input2->open();
        rightPage = input2->next();
    }

    /** Loads the first page of each input and resets both record indexes. */
    void initialize_iterators() {
        leftPage = input1->next();
        rightPage = input2->next();
        first_time = false;
        rightrecIndex = 0;
        leftrecIndex = 0;
    }

    /** Closes both inputs (input2 may by now be the temp-file reader fr2). */
    void close() {
        input1->close();
        input2->close();
    }
};
#endif