Skip to content

Commit a0e703d

Browse files
committed
Java: MultiDataSource 新增支持 AI 向量数据库 Milvus
1 parent 83640a9 commit a0e703d

File tree

4 files changed

+154
-29
lines changed

4 files changed

+154
-29
lines changed
Binary file not shown.

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@
7373
<artifactId>apijson-router</artifactId>
7474
<version>1.8.0</version>
7575
</dependency>
76+
<dependency>
77+
<groupId>com.github.APIJSON</groupId>
78+
<artifactId>apijson-mongodb</artifactId>
79+
<version>1.0.0</version>
80+
</dependency>
7681
<!-- 可使用 libs 目录的 apijson-orm.jar, apijson-framework.jar, apijson-column.jar 来替代,两种方式二选一 >>>>>>>>>> -->
7782

7883
<!-- 需要用的数据库 JDBC 驱动 -->
@@ -139,6 +144,17 @@
139144
<!-- Oracle, SQLServer 等其它数据库的 JDBC 驱动,可以在这里加上 Maven 依赖或 libs 目录放 Jar 包并依赖 -->
140145
<!-- 数据库 JDBC 驱动 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -->
141146

147+
<dependency>
148+
<groupId>org.datayoo.moql</groupId>
149+
<artifactId>moql-querier-milvus</artifactId>
150+
<version>1.0.0</version>
151+
<exclusions>
152+
<exclusion>
153+
<groupId>org.apache.logging.log4j</groupId>
154+
<artifactId>log4j-slf4j-impl</artifactId>
155+
</exclusion>
156+
</exclusions>
157+
</dependency>
142158

143159
<!-- APIJSONBoot 需要用的 SpringBoot 框架,1.4.0 以上 -->
144160
<dependency>

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoSQLConfig.java

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,26 +48,54 @@ public DemoSQLConfig(RequestMethod method, String table) {
4848
}
4949

5050
// 支持 NoSQL 数据库 MongoDB,APIJSON 6.4.0- 版本需要手动添加相关代码 <<<<<<<<<<<<<<<<<<<<<<<<<<<<<
51-
// public static final String DATABASE_MONGODB = "MONGODB";
52-
// @Override
53-
// public boolean isPrepared() {
54-
// return super.isPrepared() && ! isMongoDB(); // MongoDB JDBC 还不支持预编译
55-
// }
56-
//
57-
// public boolean isMongoDB() {
58-
// return DATABASE_MONGODB.equals(getDatabase());
59-
// }
51+
public static final String DATABASE_MONGODB = "MONGODB";
52+
public static final String DATABASE_MILVUS = "MILVUS";
6053

61-
// MongoDB 同时支持 `tbl` 反引号 和 "col" 双引号
62-
// @Override
63-
// public String getQuote() {
64-
// return "MONGODB".equals(getDatabase()) ? "`" : super.getQuote();
65-
// }
54+
@Override
55+
public boolean isPrepared() {
56+
return super.isPrepared() && ! isMongoDB(); // MongoDB JDBC 还不支持预编译
57+
}
6658

67-
static {
68-
// DATABASE_LIST.add(DATABASE_MONGODB);
59+
public boolean isMongoDB() {
60+
return DATABASE_MONGODB.equals(getDatabase());
6961

70-
// 支持 NoSQL 数据库 MongoDB,APIJSON 6.4.0- 版本需要手动添加相关代码 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
62+
}
63+
public boolean isMilvus() {
64+
return DATABASE_MILVUS.equals(getDatabase());
65+
}
66+
67+
// MongoDB 同时支持 `tbl` 反引号 和 "col" 双引号
68+
@Override
69+
public String getQuote() {
70+
return isMilvus() ? "`" : super.getQuote();
71+
}
72+
73+
@Override
74+
public String getLimitString() {
75+
if (DATABASE_MILVUS.equals(getDatabase()) && RequestMethod.isGetMethod(getMethod(), true)) {
76+
int count = getCount();
77+
int offset = getOffset(getPage(), count);
78+
return " LIMIT " + offset + ", " + count;
79+
}
80+
return super.getLimitString();
81+
}
82+
83+
static {
84+
DATABASE_LIST.add(DATABASE_MONGODB);
85+
DATABASE_LIST.add(DATABASE_MILVUS);
86+
87+
SQL_FUNCTION_MAP.put("vMatch", "");
88+
SQL_FUNCTION_MAP.put("consistencyLevel", "");
89+
SQL_FUNCTION_MAP.put("partitionBy", "");
90+
SQL_FUNCTION_MAP.put("gracefulTime", "");
91+
SQL_FUNCTION_MAP.put("guaranteeTimestamp", "");
92+
SQL_FUNCTION_MAP.put("roundDecimal", "");
93+
SQL_FUNCTION_MAP.put("travelTimestamp", "");
94+
SQL_FUNCTION_MAP.put("nProbe", "");
95+
SQL_FUNCTION_MAP.put("ef", "");
96+
SQL_FUNCTION_MAP.put("searchK", "");
97+
98+
// 支持 NoSQL 数据库 MongoDB,APIJSON 6.4.0- 版本需要手动添加相关代码 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
7199

72100
DEFAULT_DATABASE = DATABASE_MYSQL; //TODO 默认数据库类型,改成你自己的。TiDB, MariaDB, OceanBase 这类兼容 MySQL 的可当做 MySQL 使用
73101
DEFAULT_SCHEMA = "sys"; // ""apijson"; //TODO 默认数据库名/模式,改成你自己的,默认情况是 MySQL: sys, PostgreSQL: sys, SQL Server: dbo, Oracle:
@@ -159,6 +187,9 @@ public String getDBVersion() {
159187
if (isTDengine()) {
160188
return "2.6.0.8"; //TODO 改成你自己的
161189
}
190+
if (isMilvus()) {
191+
return "2.3.4"; //TODO 改成你自己的
192+
}
162193
if (isMongoDB()) {
163194
return "6.0.12"; //TODO 改成你自己的
164195
}
@@ -206,7 +237,10 @@ public String getDBUri() {
206237
return "jdbc:TAOS-RS://localhost:6041"; //TODO 改成你自己的
207238
}
208239
if (isInfluxDB()) {
209-
return "http://localhost:8086";
240+
return "http://203.189.6.3:8086";
241+
}
242+
if (isMilvus()) {
243+
return "http://localhost:19530";
210244
}
211245
if (isMongoDB()) {
212246
return "jdbc:mongodb://atlas-sql-6593c65c296c5865121e6ebe-xxskv.a.query.mongodb.net/myVirtualDatabase?ssl=true&authSource=admin";
@@ -270,6 +304,9 @@ public String getDBAccount() {
270304
return "root"; //TODO 改成你自己的
271305
}
272306
if (isInfluxDB()) {
307+
return "iotos";
308+
}
309+
if (isMilvus()) {
273310
return "root";
274311
}
275312
if (isMongoDB()) {
@@ -318,6 +355,9 @@ public String getDBPassword() {
318355
if (isInfluxDB()) {
319356
return "apijson@123"; //TODO 改成你自己的
320357
}
358+
if (isMilvus()) {
359+
return "apijson"; //TODO 改成你自己的
360+
}
321361
if (isMongoDB()) {
322362
return "apijson"; //TODO 改成你自己的
323363
}

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoSQLExecutor.java

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,18 @@
1515
package apijson.demo;
1616

1717
import apijson.*;
18-
import apijson.orm.AbstractSQLConfig;
18+
import apijson.mongodb.MongoUtil;
1919
import com.alibaba.druid.pool.DruidDataSource;
20-
import com.alibaba.fastjson.JSONArray;
2120
import com.alibaba.fastjson.JSONObject;
2221
import com.datastax.oss.driver.api.core.CqlSession;
23-
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
22+
2423
import java.sql.ResultSet;
2524
import com.datastax.oss.driver.api.core.cql.Row;
2625
//import com.vesoft.nebula.jdbc.impl.NebulaDriver;
27-
import com.zaxxer.hikari.HikariDataSource;
26+
//import com.zaxxer.hikari.HikariDataSource;
2827

2928
import java.io.Serializable;
30-
import java.net.URI;
3129
import java.net.URL;
32-
import java.nio.file.Paths;
3330
import java.sql.Connection;
3431
import java.sql.ResultSetMetaData;
3532
import java.sql.SQLException;
@@ -41,6 +38,12 @@
4138
import apijson.boot.DemoApplication;
4239
import apijson.framework.APIJSONSQLExecutor;
4340
import apijson.orm.SQLConfig;
41+
import io.milvus.client.MilvusServiceClient;
42+
import io.milvus.param.ConnectParam;
43+
import org.datayoo.moql.ColumnDefinition;
44+
import org.datayoo.moql.RecordSet;
45+
import org.datayoo.moql.RecordSetDefinition;
46+
import org.datayoo.moql.querier.milvus.MilvusQuerier;
4447
import org.influxdb.BatchOptions;
4548
import org.influxdb.InfluxDB;
4649
import org.influxdb.InfluxDBFactory;
@@ -52,6 +55,7 @@
5255
import org.springframework.data.redis.serializer.GenericToStringSerializer;
5356
import org.springframework.data.redis.serializer.StringRedisSerializer;
5457

58+
import static apijson.demo.DemoSQLConfig.DATABASE_MILVUS;
5559
import static apijson.framework.APIJSONConstant.PRIVACY_;
5660
import static apijson.framework.APIJSONConstant.USER_;
5761

@@ -162,10 +166,10 @@ public Connection getConnection(SQLConfig<Long> config) throws Exception {
162166
try {
163167
DataSource ds;
164168
switch (datasource) {
165-
case "HIKARICP":
166-
ds = DemoApplication.getApplicationContext().getBean(HikariDataSource.class);
167-
// 另一种方式是 DemoDataSourceConfig 初始化获取到 DataSource 后给静态变量 DATA_SOURCE_HIKARICP 赋值: ds = DemoDataSourceConfig.DATA_SOURCE_HIKARICP.getConnection();
168-
break;
169+
// case "HIKARICP":
170+
// ds = DemoApplication.getApplicationContext().getBean(HikariDataSource.class);
171+
// // 另一种方式是 DemoDataSourceConfig 初始化获取到 DataSource 后给静态变量 DATA_SOURCE_HIKARICP 赋值: ds = DemoDataSourceConfig.DATA_SOURCE_HIKARICP.getConnection();
172+
// break;
169173
default:
170174
Map<String, DruidDataSource> dsMap = DemoApplication.getApplicationContext().getBeansOfType(DruidDataSource.class);
171175
// 另一种方式是 DemoDataSourceConfig 初始化获取到 DataSource 后给静态变量 DATA_SOURCE_DRUID 赋值: ds = DemoDataSourceConfig.DATA_SOURCE_DRUID.getConnection();
@@ -199,9 +203,68 @@ public Connection getConnection(SQLConfig<Long> config) throws Exception {
199203
return super.getConnection(config);
200204
}
201205

202-
203206
@Override
204207
public JSONObject execute(@NotNull SQLConfig<Long> config, boolean unknownType) throws Exception {
208+
if (DATABASE_MILVUS.equals(config.getDatabase())) { // 3.0.0 及以下要这样连接
209+
String uri = config.getDBUri();
210+
//
211+
// int start = uri.indexOf("://");
212+
// String prefix = uri.substring(0, start);
213+
//
214+
// uri = uri.substring(start + "://".length());
215+
// int end = uri.indexOf(":");
216+
// int port = Integer.parseInt(uri.substring(end + 1));
217+
// String host = uri.substring(0, end);
218+
219+
// 构建Milvus客户端
220+
MilvusServiceClient milvusClient = new MilvusServiceClient(
221+
ConnectParam.newBuilder().withUri(uri).build()
222+
);
223+
224+
// 使用Milvus客户端创建Milvus查询器
225+
MilvusQuerier milvusQuerier = new MilvusQuerier(milvusClient);
226+
227+
/*
228+
查询语句含义:从book集合中筛选数据,并返回col1,col2两个列。筛选条件为,当数据的col3列值为4,col4列值为'a','b','c'中的任意一
229+
个,且vec向量字段采用'L2'类型匹配,值为'[[1.0, 2.0, 3.0],[1.1,2.1,3.1]]'。另外,采用强一致性级别在10个单元内进行检索,取第11到第15,5条命中记录。
230+
*/
231+
String sql = config.getSQL(false); //
232+
// String sql = "select id,userId,momentId,content,date from Comment where vMatch(vec, 'L2', '[[1]]') and consistencyLevel('STRONG') limit 1,1";
233+
// 使用查询器执行sql语句,并返回查询结果
234+
RecordSet recordSet = milvusQuerier.query(sql);
235+
236+
// int count = recordSet == null ? 0 : recordSet.getRecordsCount();
237+
List<Map<String, Object>> list = recordSet == null ? null : recordSet.getRecordsAsMaps();
238+
// RecordSetDefinition def = recordSet.getRecordSetDefinition();
239+
// List<ColumnDefinition> cols = def.getColumns();
240+
241+
// List<Object[]> list = count <= 0 ? null : recordSet.getRecords();
242+
243+
if (list == null || list.isEmpty()) {
244+
return new JSONObject(true);
245+
}
246+
247+
List<JSONObject> nl = new ArrayList<>(list.size());
248+
for (int i = 0; i < list.size(); i++) {
249+
Map<String, Object> map = list.get(i);
250+
251+
JSONObject obj = new JSONObject(map == null ? new HashMap<>() : map);
252+
// obj.put(col.getValue(), os[j]);
253+
// for (int j = 0; j < os.length; j++) {
254+
// ColumnDefinition col = cols.get(j);
255+
// obj.put(col.getValue(), os[j]);
256+
// }
257+
nl.add(obj);
258+
}
259+
260+
JSONObject result = nl.get(0); // JSON.parseObject(list.get(0));
261+
if (nl.size() > 1) {
262+
result.put(KEY_RAW_LIST, nl);
263+
}
264+
265+
return result;
266+
}
267+
205268
boolean isCassandra = config.isCassandra();
206269
boolean isInfluxDB = config.isInfluxDB();
207270

@@ -383,4 +446,10 @@ public JSONObject execute(@NotNull SQLConfig<Long> config, boolean unknownType)
383446
// return StringUtil.firstCase(JSONResponse.formatUnderline(key, true), false);
384447
// }
385448

449+
450+
@Override
451+
protected Object getValue(SQLConfig<Long> config, ResultSet rs, ResultSetMetaData rsmd, int tablePosition, JSONObject table, int columnIndex, String lable, Map<String, JSONObject> childMap) throws Exception {
452+
Object v = super.getValue(config, rs, rsmd, tablePosition, table, columnIndex, lable, childMap);
453+
return MongoUtil.getValue(v);
454+
}
386455
}

0 commit comments

Comments
 (0)