Skip to content

Commit 5f73b7f

Browse files
committed
Java: MultiDataSource 依赖 apijson-cassandra 来简化代码
1 parent 0f09871 commit 5f73b7f

File tree

4 files changed

+72
-74
lines changed

4 files changed

+72
-74
lines changed
Binary file not shown.

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@
8888
<artifactId>apijson-mongodb</artifactId>
8989
<version>1.0.0</version>
9090
</dependency>
91+
<dependency>
92+
<groupId>com.github.APIJSON</groupId>
93+
<artifactId>apijson-cassandra</artifactId>
94+
<version>0.9.0</version>
95+
</dependency>
9196
<!-- 可使用 libs 目录的 apijson-orm.jar, apijson-framework.jar, apijson-column.jar 来替代,两种方式二选一 >>>>>>>>>> -->
9297

9398
<!-- 需要用的数据库 JDBC 驱动 -->

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoSQLConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public String getLimitString() {
9191
static {
9292
DATABASE_LIST.add(DATABASE_MONGODB);
9393
DATABASE_LIST.add(DATABASE_MILVUS);
94+
DATABASE_LIST.add(DATABASE_CASSANDRA);
9495

9596
SQL_FUNCTION_MAP.put("vMatch", "");
9697
SQL_FUNCTION_MAP.put("consistencyLevel", "");
@@ -201,6 +202,9 @@ public String getDBVersion() {
201202
if (isMongoDB()) {
202203
return "6.0.12"; //TODO 改成你自己的
203204
}
205+
if (isCassandra()) {
206+
return "4.0.1"; //TODO 改成你自己的
207+
}
204208

205209
return null;
206210
}
@@ -254,6 +258,9 @@ public String getDBUri() {
254258
if (isMongoDB()) {
255259
return "jdbc:mongodb://atlas-sql-6593c65c296c5865121e6ebe-xxskv.a.query.mongodb.net/myVirtualDatabase?ssl=true&authSource=admin";
256260
}
261+
if (isCassandra()) {
262+
return "http://localhost:7001";
263+
}
257264

258265
return null;
259266
}
@@ -303,6 +310,9 @@ public String getDBAccount() {
303310
if (isMongoDB()) {
304311
return "root"; //TODO 改成你自己的
305312
}
313+
if (isCassandra()) {
314+
return "root"; //TODO 改成你自己的
315+
}
306316

307317
return null;
308318
}
@@ -352,6 +362,9 @@ public String getDBPassword() {
352362
if (isMongoDB()) {
353363
return "apijson"; //TODO 改成你自己的
354364
}
365+
if (isCassandra()) {
366+
return "apijson"; //TODO 改成你自己的
367+
}
355368

356369
return null;
357370
}

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoSQLExecutor.java

Lines changed: 54 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -14,50 +14,35 @@
1414

1515
package apijson.demo;
1616

17-
import apijson.*;
17+
import apijson.JSON;
18+
import apijson.Log;
19+
import apijson.NotNull;
20+
import apijson.RequestMethod;
21+
import apijson.boot.DemoApplication;
22+
import apijson.cassandra.CassandraUtil;
23+
import apijson.framework.APIJSONSQLExecutor;
1824
import apijson.influxdb.InfluxDBUtil;
1925
import apijson.milvus.MilvusUtil;
2026
import apijson.mongodb.MongoUtil;
27+
import apijson.orm.SQLConfig;
2128
import com.alibaba.druid.pool.DruidDataSource;
2229
import com.alibaba.fastjson.JSONObject;
23-
import com.datastax.oss.driver.api.core.CqlSession;
24-
25-
import java.sql.ResultSet;
26-
import com.datastax.oss.driver.api.core.cql.Row;
27-
//import com.vesoft.nebula.jdbc.impl.NebulaDriver;
28-
//import com.zaxxer.hikari.HikariDataSource;
29-
30-
import java.io.Serializable;
31-
import java.net.URL;
32-
import java.sql.Connection;
33-
import java.sql.ResultSetMetaData;
34-
import java.sql.SQLException;
35-
import java.util.*;
36-
import java.util.concurrent.TimeUnit;
37-
38-
import javax.sql.DataSource;
39-
40-
import apijson.boot.DemoApplication;
41-
import apijson.framework.APIJSONSQLExecutor;
42-
import apijson.orm.SQLConfig;
43-
import io.milvus.client.MilvusServiceClient;
44-
import io.milvus.param.ConnectParam;
45-
import org.datayoo.moql.ColumnDefinition;
46-
import org.datayoo.moql.RecordSet;
47-
import org.datayoo.moql.RecordSetDefinition;
48-
import org.datayoo.moql.querier.DataQuerier;
49-
import org.datayoo.moql.querier.milvus.MilvusQuerier;
50-
import org.influxdb.BatchOptions;
51-
import org.influxdb.InfluxDB;
52-
import org.influxdb.InfluxDBFactory;
53-
import org.influxdb.dto.Query;
54-
import org.influxdb.dto.QueryResult;
5530
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
5631
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
5732
import org.springframework.data.redis.core.RedisTemplate;
5833
import org.springframework.data.redis.serializer.GenericToStringSerializer;
5934
import org.springframework.data.redis.serializer.StringRedisSerializer;
6035

36+
import javax.sql.DataSource;
37+
import java.io.Serializable;
38+
import java.sql.Connection;
39+
import java.sql.ResultSet;
40+
import java.sql.ResultSetMetaData;
41+
import java.util.ArrayList;
42+
import java.util.List;
43+
import java.util.Map;
44+
import java.util.concurrent.TimeUnit;
45+
6146
import static apijson.demo.DemoSQLConfig.DATABASE_MILVUS;
6247
import static apijson.framework.APIJSONConstant.PRIVACY_;
6348
import static apijson.framework.APIJSONConstant.USER_;
@@ -215,16 +200,6 @@ public JSONObject execute(@NotNull SQLConfig<Long> config, boolean unknownType)
215200
if (isMilvus || isCassandra || isInfluxDB) {
216201
// TODO 把 execute 内与缓存无关只与数据库读写逻辑相关的代码抽取到 executeSQL 函数
217202
String sql = config.getSQL(false); // config.isPrepared());
218-
List<JSONObject> cache = getCache(sql, config);
219-
int position = config.getPosition();
220-
JSONObject result = getCacheItem(cache, position, config);
221-
if (result != null) {
222-
if (position == 0 && cache != null && cache.size() > 1) {
223-
result.put(KEY_RAW_LIST, cache);
224-
}
225-
return result;
226-
}
227-
228203
if (sql != null && config.getMethod() == null) {
229204
String trimmedSQL = sql.trim();
230205
String sqlPrefix = trimmedSQL.length() < 7 ? "" : trimmedSQL.substring(0, 7).toUpperCase();
@@ -239,45 +214,41 @@ else if (sqlPrefix.startsWith("DELETE ")) {
239214
}
240215
}
241216

242-
List<JSONObject> resultList = new ArrayList<>();
217+
boolean isWrite = ! RequestMethod.isQueryMethod(config.getMethod());
243218

244-
if (isMilvus) {
245-
return MilvusUtil.execute(config, sql, unknownType);
219+
List<JSONObject> cache = isWrite ? null : getCache(sql, config);
220+
int position = config.getPosition();
221+
JSONObject result = getCacheItem(cache, position, config);
222+
if (result != null) {
223+
if (position == 0 && cache != null && cache.size() > 1) {
224+
result.put(KEY_RAW_LIST, cache);
225+
}
226+
return result;
246227
}
247228

248-
if (isCassandra) {
249-
CqlSession session = CqlSession.builder()
250-
// .withCloudSecureConnectBundle(Paths.get("/path/to/secure-connect-database_name.zip"))
251-
.withCloudSecureConnectBundle(new URL(config.getDBUri()))
252-
.withAuthCredentials(config.getDBAccount(), config.getDBPassword())
253-
.withKeyspace(config.getSchema())
254-
.build();
255-
256-
// if (config.isPrepared()) {
257-
// PreparedStatement stt = session.prepare(sql);
258-
//
259-
// List<Object> pl = config.getPreparedValueList();
260-
// if (pl != null) {
261-
// for (Object o : pl) {
262-
// stt.bind(pl.toArray());
263-
// }
264-
// }
265-
// sql = stt.getQuery();
266-
// }
267-
268-
com.datastax.oss.driver.api.core.cql.ResultSet rs = session.execute(sql);
269-
270-
List<Row> list = rs.all();
271-
if (list == null || list.isEmpty()) {
272-
return new JSONObject(true);
229+
230+
List<JSONObject> resultList = new ArrayList<>();
231+
232+
if (isMilvus) {
233+
if (isWrite) {
234+
return MilvusUtil.executeUpdate(config, sql);
273235
}
274236

275-
for (int i = 0; i < list.size(); i++) {
276-
resultList.add(JSON.parseObject(list.get(i)));
237+
resultList = MilvusUtil.executeQuery(config, sql, unknownType);
238+
}
239+
else if (isCassandra) {
240+
if (isWrite) {
241+
return CassandraUtil.executeUpdate(config, sql);
277242
}
243+
244+
resultList = CassandraUtil.executeQuery(config, sql, unknownType);
278245
}
279246
else if (isInfluxDB) {
280-
return InfluxDBUtil.execute(config, sql, unknownType);
247+
if (isWrite) {
248+
return InfluxDBUtil.executeUpdate(config, sql);
249+
}
250+
251+
resultList = InfluxDBUtil.executeQuery(config, sql, unknownType);
281252
}
282253

283254
// TODO 把 execute 内与缓存无关只与数据库读写逻辑相关的代码抽取到 executeSQL 函数
@@ -294,6 +265,15 @@ else if (isInfluxDB) {
294265
return super.execute(config, unknownType);
295266
}
296267

268+
@Override
269+
public void close() {
270+
super.close();
271+
272+
MilvusUtil.closeAllClient();
273+
CassandraUtil.closeAllSession();
274+
InfluxDBUtil.closeAllClient();
275+
}
276+
297277
// 不需要隐藏字段这个功能时,取消注释来提升性能
298278
// @Override
299279
// protected boolean isHideColumn(SQLConfig<Long> config, java.sql.ResultSet rs, ResultSetMetaData rsmd, int tablePosition,

0 commit comments

Comments
 (0)