Skip to content

Commit ccd4178

Browse files
authored
add Timeplus APIs (#18)
1 parent d13f648 commit ccd4178

30 files changed

Lines changed: 1383 additions & 31 deletions

.clang-format

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ Language: Cpp
22
BasedOnStyle: Google
33

44
AccessModifierOffset: -4
5-
AlignConsecutiveAssignments: true
5+
AlignConsecutiveAssignments: false
66
AllowShortFunctionsOnASingleLine: InlineOnly
77
ColumnLimit: 140
88
DerivePointerAlignment: false
99
FixNamespaceComments: true
10+
IncludeBlocks: Preserve
1011
IndentWidth: 4
1112
PointerAlignment: Left

.github/workflows/timeplus_cpp_ci.yml

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ jobs:
2121
fail-fast: false
2222
matrix:
2323
os: [ubuntu-20.04]
24-
compiler: [clang-10, clang-18, gcc-7, gcc-8, gcc-9]
24+
compiler: [clang-10, clang-18, gcc-8, gcc-9]
2525
ssl: [ssl_ON, ssl_OFF]
2626
dependencies: [dependencies_BUILT_IN]
27-
timeplusd: [2.3.22]
27+
timeplusd: [2.3.23]
2828

2929
include:
3030
- compiler: clang-10
@@ -37,11 +37,6 @@ jobs:
3737
C_COMPILER: clang-18
3838
CXX_COMPILER: clang++-18
3939

40-
- compiler: gcc-7
41-
COMPILER_INSTALL: gcc-7 g++-7
42-
C_COMPILER: gcc-7
43-
CXX_COMPILER: g++-7
44-
4540
- compiler: gcc-8
4641
COMPILER_INSTALL: gcc-8 g++-8
4742
C_COMPILER: gcc-8
@@ -58,7 +53,7 @@ jobs:
5853
- dependencies: dependencies_SYSTEM
5954
compiler: compiler_SYSTEM
6055
os: ubuntu-22.04
61-
timeplusd: 2.3.22
56+
timeplusd: 2.3.23
6257
COMPILER_INSTALL: gcc g++
6358
C_COMPILER: gcc
6459
CXX_COMPILER: g++

CMakeLists.txt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,24 @@ OPTION (WITH_SYSTEM_ABSEIL "Use system ABSEIL" OFF)
1616
OPTION (WITH_SYSTEM_LZ4 "Use system LZ4" OFF)
1717
OPTION (WITH_SYSTEM_CITYHASH "Use system cityhash" OFF)
1818
OPTION (WITH_SYSTEM_ZSTD "Use system ZSTD" OFF)
19-
OPTION (DEBUG_DEPENDENCIES "Print debug info about dependencies duting build" ON)
19+
OPTION (DEBUG_DEPENDENCIES "Print debug info about dependencies during build" ON)
2020
OPTION (CHECK_VERSION "Check that version number corresponds to git tag, usefull in CI/CD to validate that new version published on GitHub has same version in sources" OFF)
2121
OPTION (BUILD_GTEST "Build Google Test" OFF)
2222
OPTION (ENABLE_GEOMETRIC_TEST "Enable geometric test" OFF)
23+
OPTION (ENABLE_TRACE_TIMEPLUS_CPP "Enable tracing log" OFF)
2324

2425
PROJECT (TIMEPLUS-CLIENT
2526
VERSION "${TIMEPLUS_CPP_VERSION}"
2627
DESCRIPTION "Timeplus C++ client library"
2728
)
2829

30+
set(CMAKE_EXPORT_COMPILE_COMMANDS 1)
31+
32+
IF (ENABLE_TRACE_TIMEPLUS_CPP)
33+
ADD_DEFINITIONS(-DTRACE_TIMEPLUS_CPP)
34+
ENDIF()
35+
36+
2937
USE_CXX17 ()
3038
USE_OPENSSL ()
3139

@@ -127,6 +135,8 @@ IF (BUILD_TESTS)
127135
INCLUDE_DIRECTORIES (contrib/gtest/include contrib/gtest)
128136
SUBDIRS (
129137
tests/simple
138+
tests/insert
139+
tests/insert-async
130140
)
131141
ENDIF (BUILD_TESTS)
132142

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ Optional dependencies:
3939
```sh
4040
$ mkdir build .
4141
$ cd build
42-
$ cmake -D CMAKE_BUILD_TYPE=Release ..
42+
$ cmake -D CMAKE_BUILD_TYPE=Release -D BUILD_TESTS=ON -D BUILD_EXAMPLES=ON -D BUILD_GTEST=ON ..
4343
$ make
4444
```
4545

examples/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,6 @@ add_executable(timeplus-client
33
main.cpp)
44

55
target_link_libraries(timeplus-client PRIVATE timeplus-cpp-lib)
6+
7+
add_executable(insert-examples insert_examples.cpp)
8+
target_link_libraries(insert-examples PRIVATE timeplus-cpp-lib)

examples/insert_examples.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#include <timeplus/timeplus.h>
2+
3+
#include <iostream>
4+
5+
using namespace timeplus;
6+
7+
/// Stream to insert is created with DDL:
8+
/// `CREATE STREAM insert_examples(i uint64, v string)`
9+
const std::string TABLE_NAME = "insert_examples";
10+
11+
int main() {
12+
TimeplusConfig config;
13+
config.client_options.endpoints.push_back({"localhost", 8463});
14+
config.max_connections = 3;
15+
config.max_retries = 10;
16+
config.wait_time_before_retry_ms = 1000;
17+
config.task_executors = 1;
18+
19+
Timeplus tp{std::move(config)};
20+
21+
auto block = std::make_shared<Block>();
22+
23+
auto col_i = std::make_shared<ColumnUInt64>();
24+
col_i->Append(5);
25+
col_i->Append(7);
26+
block->AppendColumn("i", col_i);
27+
28+
auto col_v = std::make_shared<ColumnString>();
29+
col_v->Append("five");
30+
col_v->Append("seven");
31+
block->AppendColumn("v", col_v);
32+
33+
/// Use synchronous insert API.
34+
auto insert_result = tp.Insert(TABLE_NAME, block, /*idempotent_id=*/"block-1");
35+
if (insert_result.ok()) {
36+
std::cout << "Synchronous insert suceeded." << std::endl;
37+
} else {
38+
std::cout << "Synchronous insert failed: code=" << insert_result.err_code << " msg=" << insert_result.err_msg << std::endl;
39+
}
40+
41+
/// Use asynchrounous insert API.
42+
std::atomic<bool> done = false;
43+
tp.InsertAsync(TABLE_NAME, block, /*idempotent_id=*/"block-2", [&done](const BaseResult& result) {
44+
const auto& async_insert_result = static_cast<const InsertResult&>(result);
45+
if (async_insert_result.ok()) {
46+
std::cout << "Asynchronous insert suceeded." << std::endl;
47+
} else {
48+
std::cout << "Asynchronous insert failed: code=" << async_insert_result.err_code << " msg=" << async_insert_result.err_msg
49+
<< std::endl;
50+
}
51+
done = true;
52+
});
53+
54+
while (!done) {
55+
}
56+
57+
return 0;
58+
}

tests/insert-async/CMakeLists.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ADD_EXECUTABLE (insert-async-test
2+
main.cpp
3+
)
4+
5+
TARGET_LINK_LIBRARIES (insert-async-test
6+
timeplus-cpp-lib
7+
)
8+
9+
IF (MSVC)
10+
TARGET_LINK_LIBRARIES (insert-async-test Crypt32)
11+
ENDIF()

tests/insert-async/main.cpp

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#include <timeplus/blocking_queue.h>
2+
#include <timeplus/timeplus.h>
3+
4+
#include <atomic>
5+
#include <chrono>
6+
#include <iomanip>
7+
#include <iostream>
8+
#include <thread>
9+
10+
using namespace timeplus;
11+
12+
const size_t INSERT_BLOCKS = 100'000;
13+
const size_t BLOCKS_PER_BATCH = 1000;
14+
15+
const std::vector<std::pair<std::string, uint16_t>> HOST_PORTS = {
16+
/// Single instance
17+
{"localhost", 8463},
18+
/// Cluster nodes
19+
{"localhost", 18463},
20+
{"localhost", 28463},
21+
{"localhost", 38463},
22+
};
23+
24+
void prepareTable() {
25+
ClientOptions options;
26+
for (const auto& [host, port] : HOST_PORTS) {
27+
options.endpoints.push_back({host, port});
28+
}
29+
30+
Client client{options};
31+
client.Execute("DROP STREAM IF EXISTS insert_async_test;");
32+
client.Execute("CREATE STREAM insert_async_test (i uint64, s string);");
33+
}
34+
35+
auto timestamp() {
36+
auto now = std::chrono::system_clock::now();
37+
std::time_t now_time = std::chrono::system_clock::to_time_t(now);
38+
std::tm* local_time = std::localtime(&now_time);
39+
return std::put_time(local_time, "%Y-%m-%d %H:%M:%S");
40+
}
41+
42+
int main() {
43+
prepareTable();
44+
45+
TimeplusConfig config;
46+
for (const auto& [host, port] : HOST_PORTS) {
47+
config.client_options.endpoints.push_back({host, port});
48+
}
49+
config.max_connections = 4;
50+
config.max_retries = 5;
51+
config.task_executors = 4;
52+
config.task_queue_capacity = BLOCKS_PER_BATCH; /// use large input queue to avoid deadlock on retry failure
53+
54+
Timeplus tp{std::move(config)};
55+
56+
auto block = std::make_shared<Block>();
57+
auto col_i = std::make_shared<ColumnUInt64>(std::vector<uint64_t>{5, 7, 4, 8});
58+
auto col_s = std::make_shared<ColumnString>(
59+
std::vector<std::string>{"Before my bed, the moon is bright,", "I think that it is frost on the ground.",
60+
"I raise my head to gaze at the bright moon,", "And lower it to think of my hometown."});
61+
block->AppendColumn("i", col_i);
62+
block->AppendColumn("s", col_s);
63+
64+
/// Queue to store failed inserts which need to be resent.
65+
BlockingQueue<std::pair<size_t, BlockPtr>> insert_failure(BLOCKS_PER_BATCH);
66+
std::atomic<size_t> insert_success_count{0};
67+
68+
auto handle_insert_result = [&insert_failure, &insert_success_count](size_t block_id, const InsertResult& result) {
69+
if (result.ok()) {
70+
insert_success_count.fetch_add(1);
71+
} else {
72+
std::cout << "[" << timestamp() << "]\t Failed to insert block: insert_id=" << block_id << " err=" << result.err_msg
73+
<< std::endl;
74+
insert_failure.emplace(block_id, result.block);
75+
}
76+
};
77+
78+
auto async_insert_block = [&tp, &handle_insert_result](size_t block_id, BlockPtr block) {
79+
tp.InsertAsync(/*table_name=*/"insert_async_test", block, [block_id, &handle_insert_result](const BaseResult& result) {
80+
const auto& insert_result = static_cast<const InsertResult&>(result);
81+
handle_insert_result(block_id, insert_result);
82+
});
83+
};
84+
85+
auto start_time = std::chrono::high_resolution_clock::now();
86+
auto last_time = start_time;
87+
for (size_t batch = 0; batch < INSERT_BLOCKS / BLOCKS_PER_BATCH; ++batch) {
88+
insert_success_count = 0;
89+
/// Insert blocks asynchronously.
90+
for (size_t i = 0; i < BLOCKS_PER_BATCH; ++i) {
91+
async_insert_block(i, block);
92+
}
93+
94+
/// Wait for all blocks of the batch are inserted.
95+
while (insert_success_count.load() != BLOCKS_PER_BATCH) {
96+
if (!insert_failure.empty()) {
97+
/// Re-insert the failed blocks
98+
auto blocks = insert_failure.drain();
99+
for (auto &[i, b] : blocks) {
100+
async_insert_block(i, b);
101+
}
102+
}
103+
104+
std::this_thread::yield();
105+
}
106+
107+
/// Print insert statistics of the batch.
108+
auto current_time = std::chrono::high_resolution_clock::now();
109+
std::chrono::duration<double> elapsed = current_time - last_time;
110+
last_time = current_time;
111+
std::cout << "[" << timestamp() << "]\t" << (batch + 1) * BLOCKS_PER_BATCH << " blocks inserted\telapsed = " << elapsed.count()
112+
<< " sec\teps = " << static_cast<double>(BLOCKS_PER_BATCH * block->GetRowCount()) / elapsed.count() << std::endl;
113+
}
114+
115+
/// Print summary.
116+
auto current_time = std::chrono::high_resolution_clock::now();
117+
std::chrono::duration<double> elapsed = current_time - start_time;
118+
std::cout << "\nInsert Done. Total Events = " << INSERT_BLOCKS * block->GetRowCount() << " Total Time = " << elapsed.count() << " sec"
119+
<< std::endl;
120+
121+
return 0;
122+
}

tests/insert/CMakeLists.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ADD_EXECUTABLE (insert-test
2+
main.cpp
3+
)
4+
5+
TARGET_LINK_LIBRARIES (insert-test
6+
timeplus-cpp-lib
7+
)
8+
9+
IF (MSVC)
10+
TARGET_LINK_LIBRARIES (insert-test Crypt32)
11+
ENDIF()

tests/insert/main.cpp

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#include <timeplus/timeplus.h>
2+
3+
#include <chrono>
4+
#include <iomanip>
5+
#include <iostream>
6+
7+
using namespace timeplus;
8+
9+
const size_t INSERT_BLOCKS = 100'000;
10+
11+
const std::vector<std::pair<std::string, uint16_t>> HOST_PORTS = {
12+
/// Single instance
13+
{"localhost", 8463},
14+
/// Cluster nodes
15+
{"localhost", 18463},
16+
{"localhost", 28463},
17+
{"localhost", 38463},
18+
};
19+
20+
void prepareTable() {
21+
ClientOptions options;
22+
for (const auto& [host, port] : HOST_PORTS) {
23+
options.endpoints.push_back({host, port});
24+
}
25+
26+
Client client{options};
27+
client.Execute("DROP STREAM IF EXISTS insert_test;");
28+
client.Execute("CREATE STREAM insert_test (i uint64, s string);");
29+
}
30+
31+
auto timestamp() {
32+
auto now = std::chrono::system_clock::now();
33+
std::time_t now_time = std::chrono::system_clock::to_time_t(now);
34+
std::tm* local_time = std::localtime(&now_time);
35+
return std::put_time(local_time, "%Y-%m-%d %H:%M:%S");
36+
}
37+
38+
int main() {
39+
prepareTable();
40+
41+
TimeplusConfig config;
42+
for (const auto& [host, port] : HOST_PORTS) {
43+
config.client_options.endpoints.push_back({host, port});
44+
}
45+
config.max_connections = 1;
46+
config.max_retries = 5;
47+
config.task_executors = 0;
48+
Timeplus tp{std::move(config)};
49+
50+
auto block = std::make_shared<Block>();
51+
auto col_i = std::make_shared<ColumnUInt64>(std::vector<uint64_t>{5, 7, 4, 8});
52+
auto col_s = std::make_shared<ColumnString>(
53+
std::vector<std::string>{"Before my bed, the moon is bright,", "I think that it is frost on the ground.",
54+
"I raise my head to gaze at the bright moon,", "And lower it to think of my hometown."});
55+
block->AppendColumn("i", col_i);
56+
block->AppendColumn("s", col_s);
57+
58+
auto start_time = std::chrono::high_resolution_clock::now();
59+
auto last_time = start_time;
60+
for (size_t i = 1; i <= INSERT_BLOCKS; ++i) {
61+
while (true) {
62+
try {
63+
tp.Insert("insert_test", block);
64+
break;
65+
} catch (const std::exception& ex) {
66+
std::cout << timestamp() << "\t Failed to insert block " << i << " : " << ex.what() << std::endl;
67+
}
68+
}
69+
70+
if (i % 1000 == 0) {
71+
auto current_time = std::chrono::high_resolution_clock::now();
72+
std::chrono::duration<double> elapsed = current_time - last_time;
73+
last_time = current_time;
74+
std::cout << "[" << timestamp() << "]\t" << i << " blocks inserted\telapsed = " << elapsed.count()
75+
<< " sec\teps = " << 1000.0 * block->GetRowCount() / elapsed.count() << std::endl;
76+
}
77+
}
78+
79+
auto current_time = std::chrono::high_resolution_clock::now();
80+
std::chrono::duration<double> elapsed = current_time - start_time;
81+
std::cout << "\nInsert Done. Total Events = " << INSERT_BLOCKS * block->GetRowCount() << " Total Time = " << elapsed.count() << " sec"
82+
<< std::endl;
83+
84+
return 0;
85+
}

0 commit comments

Comments
 (0)