Skip to content

Commit 767818b

Browse files
committed
LWLocks are used instead of regular (LOCKTAG-based) locks.
Change the deadlock test logic: an error response is normal in such cases.
1 parent ad0e674 commit 767818b

4 files changed

Lines changed: 58 additions & 60 deletions

File tree

pg_query_state.c

Lines changed: 41 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ static shm_toc *toc = NULL;
9191
pg_qs_params *params = NULL;
9292
shm_mq *mq = NULL;
9393
uint32 *mq_req_id = NULL;
94+
static LWLock *shmem_locks;
9495

9596
/*
9697
* Estimate amount of shared memory needed.
@@ -158,6 +159,7 @@ pg_qs_shmem_startup(void)
158159
mq_req_id = shm_toc_lookup(toc, num_toc++, false);
159160
#endif
160161
}
162+
shmem_locks = &(GetNamedLWLockTranche("pg_query_state"))->lock;
161163
LWLockRelease(AddinShmemInitLock);
162164

163165
if (prev_shmem_startup_hook)
@@ -185,6 +187,7 @@ _PG_init(void)
185187
shmem_request_hook = pg_qs_shmem_request;
186188
#else
187189
RequestAddinShmemSpace(pg_qs_shmem_size());
190+
RequestNamedLWLockTranche("pg_query_state", 2);
188191
#endif
189192

190193
/* Register interrupt on custom signal of polling query state */
@@ -252,6 +255,7 @@ pg_qs_shmem_request(void)
252255
prev_shmem_request_hook();
253256

254257
RequestAddinShmemSpace(pg_qs_shmem_size());
258+
RequestNamedLWLockTranche("pg_query_state", 2);
255259
}
256260
#endif
257261

@@ -407,24 +411,25 @@ search_be_status(int pid)
407411

408412

409413
void
410-
UnlockShmem(LOCKTAG *tag)
414+
UnlockShmem(uint32 key)
411415
{
412-
LockRelease(tag, ExclusiveLock, false);
416+
LWLock *lock;
417+
418+
Assert(key <= PG_QS_SND_KEY);
419+
lock = (LWLock *) ((LWLockPadded *) shmem_locks + key);
420+
Assert(LWLockHeldByMe(lock));
421+
LWLockRelease(lock);
413422
}
414423

415424
void
416-
LockShmem(LOCKTAG *tag, uint32 key)
425+
LockShmem(uint32 key)
417426
{
418-
LockAcquireResult result;
419-
tag->locktag_field1 = PG_QS_MODULE_KEY;
420-
tag->locktag_field2 = key;
421-
tag->locktag_field3 = 0;
422-
tag->locktag_field4 = 0;
423-
tag->locktag_type = LOCKTAG_USERLOCK;
424-
tag->locktag_lockmethodid = USER_LOCKMETHOD;
425-
result = LockAcquire(tag, ExclusiveLock, false, false);
426-
Assert(result == LOCKACQUIRE_OK);
427-
elog(DEBUG1, "LockAcquireResult is not OK %d", result);
427+
LWLock *lock;
428+
429+
Assert(key <= PG_QS_SND_KEY);
430+
lock = (LWLock *) ((LWLockPadded *) shmem_locks + key);
431+
Assert(!LWLockHeldByMe(lock));
432+
LWLockAcquire(lock, LW_EXCLUSIVE);
428433
}
429434

430435

@@ -507,7 +512,6 @@ pg_query_state(PG_FUNCTION_ARGS)
507512

508513
if (SRF_IS_FIRSTCALL())
509514
{
510-
LOCKTAG tag;
511515
bool verbose = PG_GETARG_BOOL(1),
512516
costs = PG_GETARG_BOOL(2),
513517
timing = PG_GETARG_BOOL(3),
@@ -556,14 +560,14 @@ pg_query_state(PG_FUNCTION_ARGS)
556560
* init and acquire lock so that any other concurrent calls of this fuction
557561
* can not occupy shared queue for transfering query state
558562
*/
559-
LockShmem(&tag, PG_QS_RCV_KEY);
563+
LockShmem(PG_QS_RCV_KEY);
560564

561565
reqid = pg_atomic_add_fetch_u32(&params->cur_reqid, 1);
562566

563567
counterpart_user_id = GetRemoteBackendUserId(proc);
564568
if (!(superuser() || GetUserId() == counterpart_user_id))
565569
{
566-
UnlockShmem(&tag);
570+
UnlockShmem(PG_QS_RCV_KEY);
567571
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
568572
errmsg("permission denied")));
569573
}
@@ -583,7 +587,7 @@ pg_query_state(PG_FUNCTION_ARGS)
583587
if (list_length(msgs) == 0)
584588
{
585589
elog(WARNING, "backend does not reply");
586-
UnlockShmem(&tag);
590+
UnlockShmem(PG_QS_RCV_KEY);
587591
SRF_RETURN_DONE(funcctx);
588592
}
589593

@@ -600,12 +604,12 @@ pg_query_state(PG_FUNCTION_ARGS)
600604
else
601605
elog(INFO, "backend is not running query");
602606

603-
UnlockShmem(&tag);
607+
UnlockShmem(PG_QS_RCV_KEY);
604608
SRF_RETURN_DONE(funcctx);
605609
}
606610
case STAT_DISABLED:
607611
elog(INFO, "query execution statistics disabled");
608-
UnlockShmem(&tag);
612+
UnlockShmem(PG_QS_RCV_KEY);
609613
SRF_RETURN_DONE(funcctx);
610614
case QS_RETURNED:
611615
{
@@ -667,7 +671,7 @@ pg_query_state(PG_FUNCTION_ARGS)
667671
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "leader_pid", INT4OID, -1, 0);
668672
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
669673

670-
UnlockShmem(&tag);
674+
UnlockShmem(PG_QS_RCV_KEY);
671675
MemoryContextSwitchTo(oldcontext);
672676
}
673677
break;
@@ -736,7 +740,6 @@ GetRemoteBackendUserId(PGPROC *proc)
736740
shm_mq_result mq_receive_result;
737741
shm_mq_userid_msg *msg;
738742
Size msg_len;
739-
LOCKTAG tag;
740743

741744
#if PG_VERSION_NUM >= 170000
742745
Assert(proc && proc->vxid.procNumber != INVALID_PROC_NUMBER);
@@ -747,12 +750,12 @@ GetRemoteBackendUserId(PGPROC *proc)
747750
Assert(UserIdPollReason != INVALID_PROCSIGNAL);
748751
Assert(mq);
749752

750-
LockShmem(&tag, PG_QS_SND_KEY);
753+
LockShmem(PG_QS_SND_KEY);
751754
mq = shm_mq_create(mq, QUEUE_SIZE);
752755
shm_mq_set_sender(mq, proc);
753756
shm_mq_set_receiver(mq, MyProc);
754757
*mq_req_id = pg_atomic_read_u32(&params->cur_reqid);
755-
UnlockShmem(&tag);
758+
UnlockShmem(PG_QS_SND_KEY);
756759

757760
#if PG_VERSION_NUM >= 170000
758761
sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber);
@@ -897,10 +900,9 @@ SendBgWorkerPids(void)
897900
int msg_len;
898901
int i;
899902
shm_mq_handle *mqh;
900-
LOCKTAG tag;
901903
msg_by_parts_result result;
902904

903-
LockShmem(&tag, PG_QS_SND_KEY);
905+
LockShmem(PG_QS_SND_KEY);
904906
mqh = shm_mq_attach(mq, NULL, NULL);
905907

906908
if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc)
@@ -911,7 +913,7 @@ SendBgWorkerPids(void)
911913
#else
912914
shm_mq_detach(mqh);
913915
#endif
914-
UnlockShmem(&tag);
916+
UnlockShmem(PG_QS_SND_KEY);
915917
return;
916918
}
917919

@@ -950,7 +952,7 @@ SendBgWorkerPids(void)
950952
shm_mq_detach(mqh);
951953
#endif
952954
}
953-
UnlockShmem(&tag);
955+
UnlockShmem(PG_QS_SND_KEY);
954956
}
955957

956958
/*
@@ -966,7 +968,6 @@ GetRemoteBackendWorkers(PGPROC *proc)
966968
Size msg_len;
967969
int i;
968970
List *result = NIL;
969-
LOCKTAG tag;
970971
bool mqh_attached = false;
971972

972973
#if PG_VERSION_NUM >= 170000
@@ -978,12 +979,12 @@ GetRemoteBackendWorkers(PGPROC *proc)
978979
Assert(WorkerPollReason != INVALID_PROCSIGNAL);
979980
Assert(mq);
980981

981-
LockShmem(&tag, PG_QS_SND_KEY);
982+
LockShmem(PG_QS_SND_KEY);
982983
mq = shm_mq_create(mq, QUEUE_SIZE);
983984
shm_mq_set_sender(mq, proc);
984985
shm_mq_set_receiver(mq, MyProc);
985986
*mq_req_id = pg_atomic_read_u32(&params->cur_reqid);
986-
UnlockShmem(&tag);
987+
UnlockShmem(PG_QS_SND_KEY);
987988

988989
#if PG_VERSION_NUM >= 170000
989990
sig_result = SendProcSignal(proc->pid, WorkerPollReason, proc->vxid.procNumber);
@@ -1138,7 +1139,6 @@ GetRemoteBackendQueryStates(PGPROC *leader,
11381139
shm_mq_result mq_receive_result;
11391140
shm_mq_msg *msg;
11401141
Size len;
1141-
LOCKTAG tag;
11421142

11431143
Assert(QueryStatePollReason != INVALID_PROCSIGNAL);
11441144
Assert(mq);
@@ -1153,12 +1153,12 @@ GetRemoteBackendQueryStates(PGPROC *leader,
11531153
pg_write_barrier();
11541154

11551155
/* initialize message queue that will transfer query states */
1156-
LockShmem(&tag, PG_QS_SND_KEY);
1156+
LockShmem(PG_QS_SND_KEY);
11571157
mq = shm_mq_create(mq, QUEUE_SIZE);
11581158
shm_mq_set_sender(mq, leader);
11591159
shm_mq_set_receiver(mq, MyProc);
11601160
*mq_req_id = pg_atomic_read_u32(&params->cur_reqid);
1161-
UnlockShmem(&tag);
1161+
UnlockShmem(PG_QS_SND_KEY);
11621162

11631163
/*
11641164
* send signal `QueryStatePollReason` to all processes and define all alive
@@ -1207,14 +1207,14 @@ GetRemoteBackendQueryStates(PGPROC *leader,
12071207

12081208
/* prepare message queue to transfer data */
12091209
elog(DEBUG1, "Wait response from worker %d", proc->pid);
1210-
LockShmem(&tag, PG_QS_SND_KEY);
1210+
LockShmem(PG_QS_SND_KEY);
12111211
mq = shm_mq_create(mq, QUEUE_SIZE);
12121212
shm_mq_set_sender(mq, proc);
12131213
shm_mq_set_receiver(mq, MyProc); /* this function notifies the
12141214
counterpart to come into data
12151215
transfer */
12161216
*mq_req_id = pg_atomic_read_u32(&params->cur_reqid);
1217-
UnlockShmem(&tag);
1217+
UnlockShmem(PG_QS_SND_KEY);
12181218

12191219
#if PG_VERSION_NUM >= 170000
12201220
sig_result = SendProcSignal(proc->pid,
@@ -1438,7 +1438,6 @@ pg_progress_bar(PG_FUNCTION_ARGS)
14381438
List *msgs;
14391439
double progress;
14401440
double old_progress;
1441-
LOCKTAG tag;
14421441

14431442
if (PG_NARGS() == 2)
14441443
{
@@ -1472,7 +1471,7 @@ pg_progress_bar(PG_FUNCTION_ARGS)
14721471
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
14731472
errmsg("backend with pid=%d not found", pid)));
14741473

1475-
LockShmem(&tag, PG_QS_RCV_KEY);
1474+
LockShmem(PG_QS_RCV_KEY);
14761475

14771476
if (SRF_IS_FIRSTCALL())
14781477
{
@@ -1500,19 +1499,19 @@ pg_progress_bar(PG_FUNCTION_ARGS)
15001499
{
15011500
case QUERY_NOT_RUNNING:
15021501
elog(INFO, "query not runing");
1503-
UnlockShmem(&tag);
1502+
UnlockShmem(PG_QS_RCV_KEY);
15041503
PG_RETURN_FLOAT8((float8) -1);
15051504
break;
15061505
case STAT_DISABLED:
15071506
elog(INFO, "query execution statistics disabled");
1508-
UnlockShmem(&tag);
1507+
UnlockShmem(PG_QS_RCV_KEY);
15091508
PG_RETURN_FLOAT8((float8) -1);
15101509
default:
15111510
break;
15121511
}
15131512
if (msg->result_code == QS_RETURNED && delay == 0)
15141513
{
1515-
UnlockShmem(&tag);
1514+
UnlockShmem(PG_QS_RCV_KEY);
15161515
progress = GetCurrentNumericState(msg);
15171516
if (progress < 0)
15181517
{
@@ -1555,9 +1554,9 @@ pg_progress_bar(PG_FUNCTION_ARGS)
15551554
}
15561555
if (progress > -1)
15571556
elog(INFO, "\rProgress = 1.000000");
1558-
UnlockShmem(&tag);
1557+
UnlockShmem(PG_QS_RCV_KEY);
15591558
PG_RETURN_FLOAT8((float8) 1);
15601559
}
1561-
UnlockShmem(&tag);
1560+
UnlockShmem(PG_QS_RCV_KEY);
15621561
PG_RETURN_FLOAT8((float8) -1);
15631562
}

pg_query_state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ extern uint32 *mq_req_id;
112112
/* signal_handler.c */
113113
extern void SendQueryState(void);
114114
extern void SendCurrentUserId(void);
115-
extern void UnlockShmem(LOCKTAG *tag);
116-
extern void LockShmem(LOCKTAG *tag, uint32 key);
115+
extern void UnlockShmem(uint32 key);
116+
extern void LockShmem(uint32 key);
117117
extern msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data);
118118

119119
#endif

signal_handler.c

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,17 +218,16 @@ void
218218
SendQueryState(void)
219219
{
220220
shm_mq_handle *mqh;
221-
LOCKTAG tag;
222221

223-
LockShmem(&tag, PG_QS_SND_KEY);
222+
LockShmem(PG_QS_SND_KEY);
224223

225224
elog(DEBUG1, "Worker %d receives pg_query_state request from %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid);
226225
mqh = shm_mq_attach(mq, NULL, NULL);
227226

228227
if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc)
229228
{
230229
elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now.");
231-
UnlockShmem(&tag);
230+
UnlockShmem(PG_QS_SND_KEY);
232231
return;
233232
}
234233
/* check if module is enabled */
@@ -277,7 +276,7 @@ SendQueryState(void)
277276
}
278277
}
279278
elog(DEBUG1, "Worker %d sends response for pg_query_state to %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid);
280-
UnlockShmem(&tag);
279+
UnlockShmem(PG_QS_SND_KEY);
281280

282281
return;
283282

@@ -287,18 +286,17 @@ SendQueryState(void)
287286
#else
288287
shm_mq_detach(mqh);
289288
#endif
290-
UnlockShmem(&tag);
289+
UnlockShmem(PG_QS_SND_KEY);
291290
}
292291

293292
void
294293
SendCurrentUserId(void)
295294
{
296295
shm_mq_handle *mqh;
297296
shm_mq_userid_msg msg;
298-
LOCKTAG tag;
299297

300298
msg.userid = GetUserId();
301-
LockShmem(&tag, PG_QS_SND_KEY);
299+
LockShmem(PG_QS_SND_KEY);
302300

303301
mqh = shm_mq_attach(mq, NULL, NULL);
304302
msg.reqid = *mq_req_id;
@@ -312,5 +310,5 @@ SendCurrentUserId(void)
312310
#else
313311
shm_mq_detach(mqh);
314312
#endif
315-
UnlockShmem(&tag);
313+
UnlockShmem(PG_QS_SND_KEY);
316314
}

tests/test_cases.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,23 @@ def test_deadlock(config):
2020
acon1, acon2 = common.n_async_connect(config, 2)
2121
acurs1 = acon1.cursor()
2222
acurs2 = acon2.cursor()
23+
checkFurther = True
2324

24-
while True:
25+
while checkFurther:
2526
acurs1.callproc('pg_query_state', (acon2.get_backend_pid(),))
2627
acurs2.callproc('pg_query_state', (acon1.get_backend_pid(),))
2728

2829
# listen acon1, acon2 with timeout = 10 sec to determine deadlock
2930
r, w, x = select.select([acon1.fileno(), acon2.fileno()], [], [], 10)
3031
assert (r or w or x), "Deadlock is happened under cross reading of query states"
3132

32-
common.wait(acon1)
33-
common.wait(acon2)
34-
35-
# exit from loop if one backend could read state of execution 'pg_query_state'
36-
# from other backend
37-
if acurs1.fetchone() or acurs2.fetchone():
38-
break
33+
for acon in [acon1, acon2]:
34+
try:
35+
common.wait(acon)
36+
except psycopg2.errors.InternalError as e:
37+
assert(checkFurther), "Failure should occur once"
38+
assert(e.pgcode == "XX000")
39+
checkFurther = False
3940

4041
common.n_close((acon1, acon2))
4142

0 commit comments

Comments
 (0)