@@ -47,15 +47,13 @@ TWeatherStation = class
4747 fname: string;
4848 weatherDictionary: TWeatherDictionaryLG;
4949 weatherStationList: TStringList;
50- procedure ReadMeasurementsInChunk ;
51- procedure ProcessChunk (const chunkData: pansichar; const dataSize: int64;
52- const chunkIndex: int64);
53- procedure ParseStationAndTempFromLine (const line: string);
50+ procedure ReadMeasurements ;
51+ procedure ParseStationAndTemp (const line: string);
5452 procedure AddCityTemperatureLG (const cityName: string; const newTemp: int64);
5553 procedure SortWeatherStationAndStats ;
5654 procedure PrintSortedWeatherStationAndStats ;
5755 public
58- constructor Create(filename: string);
56+ constructor Create(const filename: string);
5957 destructor Destroy; override;
6058 // The main algorithm to process the temp measurements from various weather stations
6159 procedure ProcessMeasurements ;
@@ -102,18 +100,18 @@ function RemoveDots(const line: string): string;
102100 end ;
103101end ;
104102
// Rounds x up to one decimal place: multiplies by 10, applies Ceil, then
// divides by 10 and returns the result as a double.
105- function RoundEx (x: currency): double; inline;
103+ function RoundEx (const x: currency): double; inline;
106104begin
107105 Result := Ceil(x * 10 ) / 10 ;
108106end ;
109107
// Returns Ceil(x * 10) as an integer, i.e. x scaled by ten and rounded up.
110- function RoundExInteger (x: currency): integer; inline;
108+ function RoundExInteger (const x: currency): integer; inline;
111109begin
112110 Result := Ceil(x * 10 );
113111end ;
114112
115113{ Neater version by @bytebites from Lazarus forum }
116- function RoundExString (x: currency): string; inline;
114+ function RoundExString (const x: currency): string; inline;
117115var
118116 V, Q, R: integer;
119117begin
@@ -138,11 +136,10 @@ function TStat.ToString: string;
138136 minR := RoundEx(self.min / 10 );
139137 maxR := RoundEx(self.max / 10 );
140138 meanR := RoundEx(self.sum / self.cnt / 10 );
141- Result := FormatFloat(' 0.0' , minR) + ' /' + FormatFloat(' 0.0' , meanR) +
142- ' /' + FormatFloat(' 0.0' , maxR);
139+ Result := FormatFloat(' 0.0' , minR) + ' /' + FormatFloat(' 0.0' , meanR) + ' /' + FormatFloat(' 0.0' , maxR);
143140end ;
144141
145- constructor TWeatherStation.Create(filename: string);
142+ constructor TWeatherStation.Create(const filename: string);
146143begin
147144 // Assign filename
148145 fname := filename;
@@ -213,10 +210,12 @@ procedure TWeatherStation.AddCityTemperatureLG(const cityName: string;
213210 stat := self.weatherDictionary[cityName];
214211
215212 // If the temp is lower than min, set the new min.
216- if newTemp < stat.min then stat.min := newTemp;
213+ if newTemp < stat.min then
214+ stat.min := newTemp;
217215
218216 // If the temp is higher than max, set the new max.
219- if newTemp > stat.max then stat.max := newTemp;
217+ if newTemp > stat.max then
218+ stat.max := newTemp;
220219
221220 // Add count for this city.
222221 stat.sum := stat.sum + newTemp;
@@ -243,7 +242,7 @@ procedure TWeatherStation.AddCityTemperatureLG(const cityName: string;
243242 end ;
244243end ;
245244
246- procedure TWeatherStation.ParseStationAndTempFromLine (const line: string);
245+ procedure TWeatherStation.ParseStationAndTemp (const line: string);
247246var
248247 delimiterPos: integer;
249248 parsedStation, strTemp: string;
@@ -268,7 +267,6 @@ procedure TWeatherStation.ParseStationAndTempFromLine(const line: string);
268267 // in each iteration. Saved approx 20-30 seconds for 1 billion rows.
269268 // Remove dots turns a float into an int.
270269 strTemp := RemoveDots(strTemp);
271- strTemp := StringReplace(strTemp, ' \n' , ' ' , [rfReplaceAll]);
272270
273271 // Add the weather station and the recorded temp (as int64) in the TDictionary
274272 Val(strTemp, parsedTemp, valCode);
@@ -279,115 +277,37 @@ procedure TWeatherStation.ParseStationAndTempFromLine(const line: string);
279277 end ;
280278end ;
281279
282- procedure TWeatherStation.ProcessChunk ( const chunkData: pansichar; const dataSize: int64; const chunkIndex: int64) ;
280+ procedure TWeatherStation.ReadMeasurements ;
// Reads the file named by self.fname line by line: a TFileStream is wrapped in
// a TStreamReader and every line returned by ReadLine is handed to
// ParseStationAndTemp. The reader and the stream are freed in finally blocks.
283281var
284- bufferStream: TMemoryStream ;
282+ fileStream: TFileStream ;
285283 streamReader: TStreamReader;
286284 line: string;
287285begin
288286
289- { $IFDEF DEBUG}
290- WriteLn(' Processing chunk: ' , inttostr(chunkIndex), ' .' );
291- { $ENDIF DEBUG}
292-
293- // Create a memory stream from the buffer
294- bufferStream := TMemoryStream.Create;
295- try
296- { Write buffer to a stream, only up to specified data size!
297- This ensures we parse the data until we reach to last `\n` character in
298- the chunk/buffer.}
299- bufferStream.Write(chunkData^, dataSize);
300- bufferStream.Position := 0 ;
301-
302- // Create a TStreamReader to read lines from the buffer
303- streamReader := TStreamReader.Create(bufferStream);
304- try
305- // Read lines until end of this buffer
306- while not streamReader.EOF do
307- begin
308- line := streamReader.ReadLine;
309- // Now, parse this line.
310- self.ParseStationAndTempFromLine(line);
311- end ;
312- finally
313- streamReader.Free;
314- end ;
315- finally
316- bufferStream.Free;
317- end ;
318- end ;
319-
320- procedure TWeatherStation.ReadMeasurementsInChunk ;
321- var
322- fileStream: TFileStream;
323- buffer: pansichar;
324- bytesRead, TotalBytesRead: int64;
325- lineBreakPos: int64;
326- chunkIndex: int64;
327- chunkSize: int64 = 1073741824 ; // 1 GB in bytes
328- begin
329-
330- // Set buffer size here, not too big.
331- chunkSize := chunkSize * 1 ;
332-
333287 // Open the file for reading
334- fileStream := TFileStream.Create(self.fname, fmOpenRead);
288+ fileStream := TFileStream.Create(self.fname, fmOpenRead or fmShareDenyNone );
335289 try
336- // Allocate memory buffer for reading chunks
337- GetMem(buffer, chunkSize);
290+ streamReader := TStreamReader.Create(fileStream);
338291 try
339- totalBytesRead := 0 ;
340- chunkIndex := 0 ;
341-
342292 // Read and parse lines until EOF -----------------------------------------
343- while totalBytesRead < fileStream.Size do
293+ while not streamReader.EOF do
344294 begin
345- bytesRead := fileStream.Read(buffer^, chunkSize);
346- Inc(TotalBytesRead, BytesRead);
347-
348- // Find the position of the last newline character in the chunk
349- LineBreakPos := BytesRead;
350- while (LineBreakPos > 0 ) and (Buffer[LineBreakPos - 1 ] <> #10 ) do
351- Dec(LineBreakPos);
352-
353- { Now, must ensure that if the last byte read in the current chunk
354- is not a newline character, the file pointer is moved back to include
355- that byte and any preceding bytes of the partial line in the next
356- chunk's read operation.
357-
358- Also, no need to update the BytesRead variable in this context because
359- it represents the actual number of bytes read from the file, including
360- any partial line that may have been included due to moving the file
361- pointer back.
362- Ref: https://www.freepascal.org/docs-html/rtl/classes/tstream.seek.html}
363- if lineBreakPos < bytesRead then
364- fileStream.Seek(-(bytesRead - lineBreakPos), soCurrent);
365-
366- // Write the chunk data to a file using the separate procedure
367- ProcessChunk(buffer, lineBreakPos, chunkIndex);
368-
369- { $IFDEF DEBUG}
370- // Display user feedback
371- WriteLn(' Chunk ' , ChunkIndex, ' , Total bytes read:' , IntToStr(totalBytesRead));
372- { $ENDIF DEBUG}
373-
374- // Increase chunk index - a counter
375- Inc(chunkIndex);
295+ line := streamReader.ReadLine;
296+ self.ParseStationAndTemp(line);
376297 end ;// End of the read-and-parse loop -----------------------------------------
377298 finally
378- // Free the memory buffer
379- FreeMem(buffer);
299+ streamReader.Free;
380300 end ;
381301 finally
382302 // Close the file
383- FileStream .Free;
303+ fileStream .Free;
384304 end ;
385305end ;
386306
387307// The main algorithm: calls ReadMeasurements, then SortWeatherStationAndStats,
// then PrintSortedWeatherStationAndStats, in that order.
388308procedure TWeatherStation.ProcessMeasurements ;
389309begin
390- self.ReadMeasurementsInChunk ;
310+ self.ReadMeasurements ;
391311 self.SortWeatherStationAndStats;
392312 self.PrintSortedWeatherStationAndStats;
393313end ;
0 commit comments