|
| 1 | +// Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +// or more contributor license agreements. See the NOTICE file |
| 3 | +// distributed with this work for additional information |
| 4 | +// regarding copyright ownership. The ASF licenses this file |
| 5 | +// to you under the Apache License, Version 2.0 (the |
| 6 | +// "License"); you may not use this file except in compliance |
| 7 | +// with the License. You may obtain a copy of the License at |
| 8 | +// |
| 9 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +// |
| 11 | +// Unless required by applicable law or agreed to in writing, |
| 12 | +// software distributed under the License is distributed on an |
| 13 | +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +// KIND, either express or implied. See the License for the |
| 15 | +// specific language governing permissions and limitations |
| 16 | +// under the License. |
| 17 | + |
| 18 | +#include <gtest/gtest.h> |
| 19 | +#include <bthread/bthread.h> |
| 20 | +#include <bthread/unstable.h> |
| 21 | +#include <butil/logging.h> |
| 22 | +#include <butil/time.h> |
| 23 | +#include <set> |
| 24 | +#include <mutex> |
| 25 | + |
| 26 | +namespace { |
| 27 | + |
| 28 | +// Mock context to simulate per-thread state (e.g., io_uring ring) |
| 29 | +struct MockWorkerContext { |
| 30 | + int worker_id; |
| 31 | + int poll_count; |
| 32 | + |
| 33 | + MockWorkerContext() : worker_id(-1), poll_count(0) {} |
| 34 | +}; |
| 35 | + |
| 36 | +// Thread-local storage to simulate "shared-nothing" architecture |
| 37 | +// In a real scenario, this would hold something like an io_uring instance. |
| 38 | +static __thread MockWorkerContext* tls_context = nullptr; |
| 39 | + |
| 40 | +// Set to collect all unique worker IDs we've seen |
| 41 | +static std::set<int> observed_worker_ids; |
| 42 | +static std::mutex stats_mutex; |
| 43 | + |
| 44 | +// The idle callback function |
| 45 | +// Access per-worker resources via TLS (simulating io_uring per worker) |
| 46 | +bool MockIdlePoller() { |
| 47 | + if (!tls_context) { |
| 48 | + tls_context = new MockWorkerContext(); |
| 49 | + // Use pthread_self or a counter to assign a unique ID |
| 50 | + static std::atomic<int> global_worker_counter(0); |
| 51 | + tls_context->worker_id = global_worker_counter.fetch_add(1); |
| 52 | + |
| 53 | + std::lock_guard<std::mutex> lock(stats_mutex); |
| 54 | + observed_worker_ids.insert(tls_context->worker_id); |
| 55 | + LOG(INFO) << "Worker thread " << pthread_self() << " initialized with ID " << tls_context->worker_id; |
| 56 | + } |
| 57 | + |
| 58 | + tls_context->poll_count++; |
| 59 | + |
| 60 | + // Simulate some work occasionally to wake up the worker immediately |
| 61 | + // For this test, we mostly want to verify it runs and has correct context |
| 62 | + if (tls_context->poll_count % 100 == 0) { |
| 63 | + return true; // Pretend we found work |
| 64 | + } |
| 65 | + |
| 66 | + return false; // Sleep with timeout |
| 67 | +} |
| 68 | + |
| 69 | +class IdleCallbackTest : public ::testing::Test { |
| 70 | +protected: |
| 71 | + void SetUp() override { |
| 72 | + // Reset state |
| 73 | + observed_worker_ids.clear(); |
| 74 | + } |
| 75 | + |
| 76 | + void TearDown() override { |
| 77 | + // Clean up global callback to avoid affecting other tests |
| 78 | + bthread_set_worker_idle_callback(nullptr, 0); |
| 79 | + } |
| 80 | +}; |
| 81 | + |
| 82 | +void* dummy_task(void* arg) { |
| 83 | + bthread_usleep(1000); // Sleep 1ms to allow workers to go idle |
| 84 | + return nullptr; |
| 85 | +} |
| 86 | + |
| 87 | +TEST_F(IdleCallbackTest, WorkerIsolationAndExecution) { |
| 88 | + // 1. Set the idle callback with a short timeout (e.g., 1ms) |
| 89 | + ASSERT_EQ(0, bthread_set_worker_idle_callback(MockIdlePoller, 1000)); |
| 90 | + |
| 91 | + // 2. Determine number of workers (concurrency) |
| 92 | + int concurrency = bthread_getconcurrency(); |
| 93 | + LOG(INFO) << "Current concurrency: " << concurrency; |
| 94 | + |
| 95 | + // 3. Create enough bthreads to ensure all workers are activated at least once |
| 96 | + // but also give them time to become idle. |
| 97 | + std::vector<bthread_t> tids; |
| 98 | + for (int i = 0; i < concurrency * 2; ++i) { |
| 99 | + bthread_t tid; |
| 100 | + bthread_start_background(&tid, nullptr, dummy_task, nullptr); |
| 101 | + tids.push_back(tid); |
| 102 | + } |
| 103 | + |
| 104 | + // 4. Wait for all tasks to complete |
| 105 | + for (bthread_t tid : tids) { |
| 106 | + bthread_join(tid, nullptr); |
| 107 | + } |
| 108 | + |
| 109 | + // 5. Sleep a bit to ensure all workers have had a chance to hit the idle loop |
| 110 | + usleep(50 * 1000); // 50ms |
| 111 | + |
| 112 | + // 6. Verify results |
| 113 | + std::lock_guard<std::mutex> lock(stats_mutex); |
| 114 | + LOG(INFO) << "Observed " << observed_worker_ids.size() << " unique worker contexts."; |
| 115 | + |
| 116 | + // We expect at least one worker to have initialized its context. |
| 117 | + // In a highly concurrent test environment, usually most workers will initialize. |
| 118 | + ASSERT_GT(observed_worker_ids.size(), 0); |
| 119 | + |
| 120 | + // Check that we saw different IDs if concurrency > 1 (though not strictly guaranteed |
| 121 | + // that ALL workers will run if the OS scheduler is quirky, but >1 is highly likely) |
| 122 | + if (concurrency > 1) { |
| 123 | + EXPECT_GT(observed_worker_ids.size(), 1); |
| 124 | + } |
| 125 | +} |
| 126 | + |
| 127 | +} // namespace |
0 commit comments