@@ -16,10 +16,12 @@ import (
1616 "github.com/fly-apps/postgres-flex/internal/privnet"
1717 "github.com/fly-apps/postgres-flex/internal/utils"
1818 "github.com/jackc/pgx/v5"
19+ "golang.org/x/exp/slices"
1920)
2021
2122type Node struct {
2223 AppName string
24+ MachineID string
2325 PrivateIP string
2426 PrimaryRegion string
2527 DataDir string
@@ -52,6 +54,8 @@ func NewNode() (*Node, error) {
5254
5355 node .PrivateIP = ipv6 .String ()
5456
57+ node .MachineID = os .Getenv ("FLY_MACHINE_ID" )
58+
5559 node .PrimaryRegion = os .Getenv ("PRIMARY_REGION" )
5660 if node .PrimaryRegion == "" {
5761 return nil , fmt .Errorf ("PRIMARY_REGION environment variable must be set" )
@@ -88,7 +92,9 @@ func NewNode() (*Node, error) {
8892 UserConfigPath : "/data/repmgr.user.conf" ,
8993 PasswordConfigPath : "/data/.pgpass" ,
9094 DataDir : node .DataDir ,
95+ HostName : node .Hostname (),
9196 PrivateIP : node .PrivateIP ,
97+ MachineID : node .MachineID ,
9298 Port : 5433 ,
9399 DatabaseName : "repmgr" ,
94100 Credentials : node .ReplCredentials ,
@@ -182,7 +188,7 @@ func (n *Node) Init(ctx context.Context) error {
182188 }
183189 } else {
184190 log .Println ("Provisioning standby" )
185- cloneTarget , err := n .RepMgr .ResolveMemberOverDNS (ctx )
191+ cloneTarget , err := n .RepMgr .ResolvePrimaryOverDNS (ctx )
186192 if err != nil {
187193 return fmt .Errorf ("failed to resolve member over dns: %s" , err )
188194 }
@@ -225,14 +231,6 @@ func (n *Node) Init(ctx context.Context) error {
225231
226232// PostInit are operations that need to be executed against a running Postgres on boot.
227233func (n * Node ) PostInit (ctx context.Context ) error {
228- if ZombieLockExists () {
229- log .Println ("[ERROR] Manual intervention required." )
230- log .Println ("[ERROR] If a new primary has been established, consider adding a new replica with `fly machines clone <primary-machine-id>` and then remove this member." )
231- log .Println ("[ERROR] Sleeping for 5 minutes." )
232- time .Sleep (5 * time .Minute )
233- return fmt .Errorf ("unrecoverable zombie" )
234- }
235-
236234 // Use the Postgres user on boot, since our internal user may not have been created yet.
237235 conn , err := n .NewLocalConnection (ctx , "postgres" , n .OperatorCredentials )
238236 if err != nil {
@@ -265,7 +263,7 @@ func (n *Node) PostInit(ctx context.Context) error {
265263 return fmt .Errorf ("failed to resolve member role: %s" , err )
266264 }
267265
268- // Restart repmgrd in the event the IP changes for an already registered node.
266+ // Restart repmgrd in the event the machine ID changes for an already registered node.
269267 // This can happen if the underlying volume is moved to a different node.
270268 daemonRestartRequired := n .RepMgr .daemonRestartRequired (member )
271269
@@ -291,14 +289,22 @@ func (n *Node) PostInit(ctx context.Context) error {
291289 return fmt .Errorf ("failed to run zombie diagnosis: %s" , err )
292290 }
293291
294- // This should never happen
295- if primary != n .PrivateIP {
292+ // This should never happen, but check anyways for correctness
293+ if primary != n .Hostname () {
296294 return fmt .Errorf ("resolved primary '%s' does not match ourself '%s'. this should not happen" ,
297295 primary ,
298- n .PrivateIP ,
296+ n .Hostname () ,
299297 )
300298 }
301299
300+ // Clear the zombie lock if it exists.
301+ if ZombieLockExists () {
302+ log .Println ("[INFO] Clearing zombie lock and re-enabling read/write" )
303+ if err := RemoveZombieLock (); err != nil {
304+ return fmt .Errorf ("failed to remove zombie lock: %s" , err )
305+ }
306+ }
307+
302308 // Re-register primary to apply any configuration changes.
303309 if err := n .RepMgr .registerPrimary (daemonRestartRequired ); err != nil {
304310 return fmt .Errorf ("failed to re-register existing primary: %s" , err )
@@ -311,6 +317,10 @@ func (n *Node) PostInit(ctx context.Context) error {
311317 }
312318 }
313319 case StandbyRoleName :
320+ if err := n .migrateNodeNameIfNeeded (ctx , repConn ); err != nil {
321+ return fmt .Errorf ("failed to migrate node name: %s" , err )
322+ }
323+
314324 // Register existing standby to apply any configuration changes.
315325 if err := n .RepMgr .registerStandby (daemonRestartRequired ); err != nil {
316326 return fmt .Errorf ("failed to register existing standby: %s" , err )
@@ -399,7 +409,7 @@ func (n *Node) PostInit(ctx context.Context) error {
399409 return fmt .Errorf ("failed to enable repmgr: %s" , err )
400410 }
401411
402- primary , err := n .RepMgr .ResolveMemberOverDNS (ctx )
412+ primary , err := n .RepMgr .ResolvePrimaryOverDNS (ctx )
403413 if err != nil {
404414 return fmt .Errorf ("failed to resolve primary member: %s" , err )
405415 }
@@ -527,3 +537,55 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro
527537
528538 return nil
529539}
540+
541+ // migrate node name from 6pn to machine ID if needed
542+ func (n * Node ) migrateNodeNameIfNeeded (ctx context.Context , repConn * pgx.Conn ) error {
543+ primary , err := n .RepMgr .PrimaryMember (ctx , repConn )
544+ if err != nil {
545+ return fmt .Errorf ("failed to resolve primary member when updating standby: %s" , err )
546+ }
547+
548+ primaryConn , err := n .RepMgr .NewRemoteConnection (ctx , primary .Hostname )
549+ if err != nil {
550+ return fmt .Errorf ("failed to establish connection to primary: %s" , err )
551+ }
552+ defer func () { _ = primaryConn .Close (ctx ) }()
553+
554+ rows , err := primaryConn .Query (ctx , "select application_name from pg_stat_replication" )
555+ if err != nil {
556+ return fmt .Errorf ("failed to query pg_stat_replication: %s" , err )
557+ }
558+ defer rows .Close ()
559+
560+ var applicationNames []string
561+ for rows .Next () {
562+ var applicationName string
563+ if err := rows .Scan (& applicationName ); err != nil {
564+ return fmt .Errorf ("failed to scan application_name: %s" , err )
565+ }
566+ applicationNames = append (applicationNames , applicationName )
567+ }
568+ if err := rows .Err (); err != nil {
569+ return fmt .Errorf ("failed to iterate over rows: %s" , err )
570+ }
571+
572+ // if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
573+ if slices .Contains (applicationNames , n .PrivateIP ) {
574+ log .Printf ("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID..." )
575+
576+ if err := n .RepMgr .regenReplicationConf (ctx ); err != nil {
577+ return fmt .Errorf ("failed to clone standby: %s" , err )
578+ }
579+
580+ if err := admin .ReloadPostgresConfig (ctx , repConn ); err != nil {
581+ return fmt .Errorf ("failed to reload postgresql: %s" , err )
582+ }
583+ }
584+
585+ return nil
586+ }
587+
588+ // Hostname returns the hostname of the node.
589+ func (n * Node ) Hostname () string {
590+ return fmt .Sprintf ("%s.vm.%s.internal" , n .MachineID , n .AppName )
591+ }
0 commit comments