diff --git a/omnilink/concurrentqueue/ConcurrentQueueAPI.tla b/omnilink/concurrentqueue/ConcurrentQueueAPI.tla index 560fb45b..d2457eab 100644 --- a/omnilink/concurrentqueue/ConcurrentQueueAPI.tla +++ b/omnilink/concurrentqueue/ConcurrentQueueAPI.tla @@ -202,8 +202,8 @@ Next == QueueTryEnqueueBulk(elements, count, success, thread) \/ \E thread \in Threads, element \in Elements, success \in BOOLEAN : QueueTryDequeue(element, success, thread) - \/ \E elements \in BulkBufferSet, max \in 1..MaxBulkSize, count \in 0..MaxBulkSize, producers \in Seq(Threads) : - QueueTryDequeueBulk(elements, max, count, producers) + \* \/ \E elements \in BulkBufferSet, max \in 1..MaxBulkSize, count \in 0..MaxBulkSize, producers \in Seq(Threads) : + \* QueueTryDequeueBulk(elements, max, count, producers) \* \/ \E thread \in Threads, element \in Elements, success \in BOOLEAN : \* QueueTryDequeueFromProducer(thread, element, success, thread) \* \/ \E thread \in Threads, elements \in BulkBufferSet, max \in 1..MaxBulkSize, count \in 0..MaxBulkSize, producers \in Seq(Threads) : diff --git a/omnilink/concurrentqueue/MCConcurrentQueueAPIValidate.cfg b/omnilink/concurrentqueue/MCConcurrentQueueAPIValidate.cfg index 1694c68f..46fb2c3e 100644 --- a/omnilink/concurrentqueue/MCConcurrentQueueAPIValidate.cfg +++ b/omnilink/concurrentqueue/MCConcurrentQueueAPIValidate.cfg @@ -4,9 +4,10 @@ NEXT Next CONSTANTS Elements <- ElementsImpl Threads <- ThreadsImpl - - MaxElements = 5 - MaxBulkSize = 3 + \* MaxElements <- MaxElementsImpl + \* MaxBulkSize <- MaxBulkSizeImpl + MaxElements = 1000 + MaxBulkSize = 5 INVARIANTS TypeOK @@ -17,5 +18,5 @@ ALIAS DebugAlias POSTCONDITION PostCondition CHECK_DEADLOCK FALSE -VIEW PragmaticView +\* VIEW PragmaticView diff --git a/omnilink/concurrentqueue/MCConcurrentQueueAPIValidate.tla b/omnilink/concurrentqueue/MCConcurrentQueueAPIValidate.tla index e98af5e2..ce7d1e08 100644 --- a/omnilink/concurrentqueue/MCConcurrentQueueAPIValidate.tla +++ b/omnilink/concurrentqueue/MCConcurrentQueueAPIValidate.tla @@ -33,7 +33,38 @@ ThreadsImpl == TLCCache( : i \in DOMAIN __traces[t] } : t \in DOMAIN __traces }, {"ThreadsImpl"}) +\* MaxOrZero(S) == +\* IF S = {} THEN 0 +\* ELSE CHOOSE m \in S : \A x \in S : x <= m + +\* MaxElementsImpl == TLCCache( +\* LET diffs == UNION { +\* UNION { +\* IF "state" \in DOMAIN __traces[t][i] +\* /\ "enqueued" \in DOMAIN __traces[t][i].state +\* /\ "dequeued" \in DOMAIN __traces[t][i].state +\* THEN { __traces[t][i].state.enqueued - __traces[t][i].state.dequeued } +\* ELSE {} +\* : i \in DOMAIN __traces[t] } +\* : t \in DOMAIN __traces } +\* IN MaxOrZero(diffs), +\* {"MaxElementsImpl"} +\* ) +\* MaxBulkSizeImpl == TLCCache( +\* LET Ops == +\* UNION { { __traces[t][i].operation : +\* i \in DOMAIN __traces[t] } : +\* t \in DOMAIN __traces } +\* OpsWithEls == +\* { op \in Ops : "elements" \in DOMAIN op } +\* Counts == +\* { Len(op.elements) : op \in OpsWithEls } +\* IN MaxOrZero(Counts), +\* {"MaxBulkSizeImpl"} +\* ) + + DebugAlias == __TraceOps!DebugAlias PostCondition == __TraceOps!PostCondition diff --git a/omnilink/concurrentqueue/package.mill b/omnilink/concurrentqueue/package.mill index c6ee933b..75afdd1e 100644 --- a/omnilink/concurrentqueue/package.mill +++ b/omnilink/concurrentqueue/package.mill @@ -13,8 +13,8 @@ object `package` extends Module: object v1_0_4 extends CQTracingConfigModule: def nixName = "omnilink.concurrentqueue.workload" object defaultConfig extends ConfigModule: - def threadCount = 2 - def operationCount = 1 + def threadCount = 5 + def operationCount = 10 end defaultConfig end v1_0_4 diff --git a/omnilink/concurrentqueue/workload/main.cpp b/omnilink/concurrentqueue/workload/main.cpp index 1fecb523..fedb9382 100644 --- a/omnilink/concurrentqueue/workload/main.cpp +++ b/omnilink/concurrentqueue/workload/main.cpp @@ -215,7 +215,6 @@ struct QueueWorkloadContext: public omnilink::WorkloadContext& ctx) { - int32_t max = rand_bulk_size(); - std::vector elements(max); - size_t dequeued = workload_context.queue.try_dequeue_bulk(elements.data(), max); - if (dequeued < static_cast(elements.size())) { - elements.resize(dequeued); - } - std::vector producer_threads; - if (dequeued > 0) { - if (auto resolved = workload_context.resolve_producers_for_bulk(elements.data(), dequeued)) { - producer_threads = std::move(*resolved); - } - } - ctx.op = ConcurrentQueueAPI::QueueTryDequeueBulk{elements, max, static_cast(dequeued)}; - std::map meta{ - {"consumer_thread", static_cast(thread_idx)} - }; - if (!producer_threads.empty()) { - meta["producer_threads"] = producer_threads; - ctx.op.producers = producer_threads; - } else { - ctx.op.producers = std::vector{}; - } - ctx.op._meta = std::move(meta); - } + // void perform_operation(Ctx& ctx) { + // int32_t max = rand_bulk_size(); + // std::vector elements(max); + // size_t dequeued = workload_context.queue.try_dequeue_bulk(elements.data(), max); + // if (dequeued < static_cast(elements.size())) { + // elements.resize(dequeued); + // } + // std::vector producer_threads; + // if (dequeued > 0) { + // if (auto resolved = workload_context.resolve_producers_for_bulk(elements.data(), dequeued)) { + // producer_threads = std::move(*resolved); + // } + // } + // ctx.op = ConcurrentQueueAPI::QueueTryDequeueBulk{elements, max, static_cast(dequeued)}; + // std::map meta{ + // {"consumer_thread", static_cast(thread_idx)} + // }; + // if (!producer_threads.empty()) { + // meta["producer_threads"] = producer_threads; + // ctx.op.producers = producer_threads; + // } else { + // ctx.op.producers = std::vector{}; + // } + // ctx.op._meta = std::move(meta); + // } // void perform_operation(Ctx& ctx) { // int32_t max = rand_bulk_size();