@@ -64,6 +64,9 @@ static int checkpoint_timeout(PGconn *backup_conn);
6464static void * StreamLog (void * arg );
6565static bool stop_streaming (XLogRecPtr xlogpos , uint32 timeline ,
6666 bool segment_finished );
67+ static void append_wal_segment (parray * filelist , uint32 timeline ,
68+ XLogRecPtr xlogpos , char * basedir ,
69+ uint32 xlog_seg_size );
6770
6871/*
6972 * Run IDENTIFY_SYSTEM through a given connection and
@@ -166,6 +169,8 @@ StreamLog(void *arg)
166169 */
167170 stream_arg -> startpos -= stream_arg -> startpos % instance_config .xlog_seg_size ;
168171
172+ xlog_files_list = parray_new ();
173+
169174 /* Initialize timeout */
170175 stream_stop_begin = 0 ;
171176
@@ -239,6 +244,18 @@ StreamLog(void *arg)
239244 elog (ERROR , "Problem in receivexlog" );
240245#endif
241246
247+ /* sort xlog_files_list */
248+ parray_qsort (xlog_files_list , pgFileCompareRelPathWithExternal );
249+
250+ append_wal_segment (xlog_files_list , stream_arg -> starttli ,
251+ stop_stream_lsn , (char * ) stream_arg -> basedir ,
252+ instance_config .xlog_seg_size );
253+
254+ /*
255+ * TODO: remove redundant WAL segments
256+ * walk pg_wal and remove files with segno greater that of stop_lsn`s segno +1
257+ */
258+
242259 elog (LOG , "finished streaming WAL at %X/%X (timeline %u)" ,
243260 (uint32 ) (stop_stream_lsn >> 32 ), (uint32 ) stop_stream_lsn , stream_arg -> starttli );
244261 stream_arg -> ret = 0 ;
@@ -275,46 +292,12 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
275292 /* we assume that we get called once at the end of each segment */
276293 if (segment_finished )
277294 {
278- XLogSegNo xlog_segno ;
279- char wal_segment_name [MAXFNAMELEN ];
280- char wal_segment_relpath [MAXPGPATH ];
281- char wal_segment_fullpath [MAXPGPATH ];
282- pgFile * file = NULL ;
283-
284- elog (VERBOSE , _ ("finished segment at %X/%X (timeline %u)" ),
295+ elog (INFO , _ ("finished segment at %X/%X (timeline %u)" ),
285296 (uint32 ) (xlogpos >> 32 ), (uint32 ) xlogpos , timeline );
286297
287- /* Add streamed xlog file into the backup's list of files */
288- if (!xlog_files_list )
289- xlog_files_list = parray_new ();
290-
291- GetXLogSegNo (xlogpos , xlog_segno , instance_config .xlog_seg_size );
292-
293- /*
294- * xlogpos points to the current segment, and we need the finished - previous one
295- * inless xlogpos points to not 0 offset in segment
296- */
297- if (WalSegmentOffset (xlogpos , instance_config .xlog_seg_size ) == 0 )
298- xlog_segno -- ;
299-
300- GetXLogFileName (wal_segment_name , timeline , xlog_segno ,
301- instance_config .xlog_seg_size );
302-
303- join_path_components (wal_segment_fullpath ,
304- stream_thread_arg .basedir , wal_segment_name );
305-
306- join_path_components (wal_segment_relpath ,
307- PG_XLOG_DIR , wal_segment_name );
308-
309- /* append file to filelist */
310- file = pgFileNew (wal_segment_fullpath , wal_segment_relpath , false, 0 , FIO_BACKUP_HOST );
311- file -> name = file -> rel_path ;
312- file -> crc = pgFileGetCRC (wal_segment_fullpath , true, false);
313-
314- /* Should we recheck it using stat? */
315- file -> write_size = instance_config .xlog_seg_size ;
316- file -> uncompressed_size = instance_config .xlog_seg_size ;
317- parray_append (xlog_files_list , file );
298+ append_wal_segment (xlog_files_list , timeline , xlogpos ,
299+ (char * ) stream_thread_arg .basedir ,
300+ instance_config .xlog_seg_size );
318301 }
319302
320303 /*
@@ -400,9 +383,60 @@ start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOption
400383int
401384wait_WAL_streaming_end (parray * backup_files_list )
402385{
403- parray_concat (backup_files_list , xlog_files_list );
404- parray_free (xlog_files_list );
386+ pthread_join (stream_thread , NULL );
405387
406- pthread_join (stream_thread , NULL );
407- return stream_thread_arg .ret ;
388+ parray_concat (backup_files_list , xlog_files_list );
389+ parray_free (xlog_files_list );
390+ return stream_thread_arg .ret ;
408391}
392+
393+ /* Append streamed WAL segment to filelist */
394+ void
395+ append_wal_segment (parray * filelist , uint32 timeline , XLogRecPtr xlogpos , char * basedir , uint32 xlog_seg_size )
396+ {
397+ XLogSegNo xlog_segno ;
398+ char wal_segment_name [MAXFNAMELEN ];
399+ char wal_segment_relpath [MAXPGPATH ];
400+ char wal_segment_fullpath [MAXPGPATH ];
401+ pgFile * file = NULL ;
402+
403+ GetXLogSegNo (xlogpos , xlog_segno , xlog_seg_size );
404+
405+ /*
406+ * xlogpos points to the current segment, and we need the finished - previous one
407+ * inless xlogpos points to not 0 offset in segment
408+ */
409+ if (WalSegmentOffset (xlogpos , xlog_seg_size ) == 0 )
410+ xlog_segno -- ;
411+
412+ GetXLogFileName (wal_segment_name , timeline , xlog_segno , xlog_seg_size );
413+
414+ join_path_components (wal_segment_fullpath , basedir , wal_segment_name );
415+ join_path_components (wal_segment_relpath , PG_XLOG_DIR , wal_segment_name );
416+
417+ file = pgFileNew (wal_segment_fullpath , wal_segment_relpath , false, 0 , FIO_BACKUP_HOST );
418+ file -> name = file -> rel_path ;
419+
420+ /*
421+ * Check if file is already in the list
422+ * stop_lsn segment can be added to this list twice, so
423+ * try not to add duplicates
424+ */
425+ if (parray_bsearch (filelist , file , pgFileCompareRelPathWithExternal ))
426+ {
427+ if (!parray_rm (filelist , file , pgFileCompareRelPathWithExternal ))
428+ elog (ERROR , "Failed to remove duplicate from array of streamed segments: %s" ,
429+ file -> rel_path );
430+ }
431+
432+ /* calculate crc */
433+ file -> crc = pgFileGetCRC (wal_segment_fullpath , true, false);
434+
435+ /* Should we recheck it using stat? */
436+ file -> write_size = xlog_seg_size ;
437+ file -> uncompressed_size = xlog_seg_size ;
438+
439+ /* append file to filelist */
440+ elog (VERBOSE , "Append WAL segment: \"%s\"" , wal_segment_relpath );
441+ parray_append (filelist , file );
442+ }
0 commit comments