1515package apijson .demo ;
1616
1717import apijson .*;
18+ import apijson .milvus .MilvusUtil ;
1819import apijson .mongodb .MongoUtil ;
1920import com .alibaba .druid .pool .DruidDataSource ;
2021import com .alibaba .fastjson .JSONObject ;
4344import org .datayoo .moql .ColumnDefinition ;
4445import org .datayoo .moql .RecordSet ;
4546import org .datayoo .moql .RecordSetDefinition ;
47+ import org .datayoo .moql .querier .DataQuerier ;
4648import org .datayoo .moql .querier .milvus .MilvusQuerier ;
4749import org .influxdb .BatchOptions ;
4850import org .influxdb .InfluxDB ;
@@ -205,70 +207,12 @@ public Connection getConnection(SQLConfig<Long> config) throws Exception {
205207
206208 @ Override
207209 public JSONObject execute (@ NotNull SQLConfig <Long > config , boolean unknownType ) throws Exception {
208- if (DATABASE_MILVUS .equals (config .getDatabase ())) { // 3.0.0 及以下要这样连接
209- String uri = config .getDBUri ();
210- //
211- // int start = uri.indexOf("://");
212- // String prefix = uri.substring(0, start);
213- //
214- // uri = uri.substring(start + "://".length());
215- // int end = uri.indexOf(":");
216- // int port = Integer.parseInt(uri.substring(end + 1));
217- // String host = uri.substring(0, end);
218-
219- // 构建Milvus客户端
220- MilvusServiceClient milvusClient = new MilvusServiceClient (
221- ConnectParam .newBuilder ().withUri (uri ).build ()
222- );
223-
224- // 使用Milvus客户端创建Milvus查询器
225- MilvusQuerier milvusQuerier = new MilvusQuerier (milvusClient );
226-
227- /*
228- 查询语句含义:从book集合中筛选数据,并返回col1,col2两个列。筛选条件为,当数据的col3列值为4,col4列值为'a','b','c'中的任意一
229- 个,且vec向量字段采用'L2'类型匹配,值为'[[1.0, 2.0, 3.0],[1.1,2.1,3.1]]'。另外,采用强一致性级别在10个单元内进行检索,取第11到第15,5条命中记录。
230- */
231- String sql = config .getSQL (false ); //
232- // String sql = "select id,userId,momentId,content,date from Comment where vMatch(vec, 'L2', '[[1]]') and consistencyLevel('STRONG') limit 1,1";
233- // 使用查询器执行sql语句,并返回查询结果
234- RecordSet recordSet = milvusQuerier .query (sql );
235-
236- // int count = recordSet == null ? 0 : recordSet.getRecordsCount();
237- List <Map <String , Object >> list = recordSet == null ? null : recordSet .getRecordsAsMaps ();
238- // RecordSetDefinition def = recordSet.getRecordSetDefinition();
239- // List<ColumnDefinition> cols = def.getColumns();
240-
241- // List<Object[]> list = count <= 0 ? null : recordSet.getRecords();
242-
243- if (list == null || list .isEmpty ()) {
244- return new JSONObject (true );
245- }
246-
247- List <JSONObject > nl = new ArrayList <>(list .size ());
248- for (int i = 0 ; i < list .size (); i ++) {
249- Map <String , Object > map = list .get (i );
250-
251- JSONObject obj = new JSONObject (map == null ? new HashMap <>() : map );
252- // obj.put(col.getValue(), os[j]);
253- // for (int j = 0; j < os.length; j++) {
254- // ColumnDefinition col = cols.get(j);
255- // obj.put(col.getValue(), os[j]);
256- // }
257- nl .add (obj );
258- }
259-
260- JSONObject result = nl .get (0 ); // JSON.parseObject(list.get(0));
261- if (nl .size () > 1 ) {
262- result .put (KEY_RAW_LIST , nl );
263- }
264-
265- return result ;
266- }
267-
210+ boolean isMilvus = DATABASE_MILVUS .equals (config .getDatabase ()); // APIJSON 6.4.0+ 可用 config.isMilvus();
268211 boolean isCassandra = config .isCassandra ();
269212 boolean isInfluxDB = config .isInfluxDB ();
270213
271- if (isCassandra || isInfluxDB ) {
214+ if (isMilvus || isCassandra || isInfluxDB ) {
215+ // TODO 把 execute 内与缓存无关只与数据库读写逻辑相关的代码抽取到 executeSQL 函数
272216 String sql = config .getSQL (false ); // config.isPrepared());
273217 List <JSONObject > cache = getCache (sql , config );
274218 int position = config .getPosition ();
@@ -288,13 +232,18 @@ public JSONObject execute(@NotNull SQLConfig<Long> config, boolean unknownType)
288232 isWrite = sqlPrefix .startsWith ("INSERT " ) || sqlPrefix .startsWith ("UPDATE " ) || sqlPrefix .startsWith ("DELETE " );
289233 }
290234
235+ List <JSONObject > resultList = new ArrayList <>();
236+
237+ if (isMilvus ) {
238+ return MilvusUtil .execute (config , unknownType );
239+ }
291240
292241 if (isCassandra ) {
293242 CqlSession session = CqlSession .builder ()
294243// .withCloudSecureConnectBundle(Paths.get("/path/to/secure-connect-database_name.zip"))
295244 .withCloudSecureConnectBundle (new URL (config .getDBUri ()))
296245 .withAuthCredentials (config .getDBAccount (), config .getDBPassword ())
297- .withKeyspace (config .getSchema ())
246+ .withKeyspace (config .getSQLSchema ())
298247 .build ();
299248
300249 // if (config.isPrepared()) {
@@ -316,16 +265,11 @@ public JSONObject execute(@NotNull SQLConfig<Long> config, boolean unknownType)
316265 return new JSONObject (true );
317266 }
318267
319- result = JSON .parseObject (list .get (0 ));
320- if (list .size () > 1 ) {
321- result .put (KEY_RAW_LIST , list );
268+ for (int i = 0 ; i < list .size (); i ++) {
269+ resultList .add (JSON .parseObject (list .get (i )));
322270 }
323-
324- return result ;
325271 }
326-
327-
328- if (isInfluxDB ) {
272+ else if (isInfluxDB ) {
329273 InfluxDB influxDB = InfluxDBFactory .connect (config .getDBUri (), config .getDBAccount (), config .getDBPassword ());
330274 influxDB .setDatabase (config .getSchema ());
331275
@@ -382,8 +326,6 @@ public JSONObject execute(@NotNull SQLConfig<Long> config, boolean unknownType)
382326 return new JSONObject (true );
383327 }
384328
385- List <JSONObject > resultList = new ArrayList <>();
386-
387329 for (int i = 0 ; i < list .size (); i ++) {
388330 QueryResult .Result qyrt = list .get (i );
389331 List <QueryResult .Series > seriesList = qyrt .getSeries ();
@@ -413,16 +355,17 @@ public JSONObject execute(@NotNull SQLConfig<Long> config, boolean unknownType)
413355 }
414356 }
415357
416- result = resultList .isEmpty () ? new JSONObject () : resultList .get (0 );
417- if (resultList .size () > 1 ) {
418- result .put (KEY_RAW_LIST , resultList );
419- }
420-
421- putCache (sql , resultList , config );
358+ }
422359
423- return result ;
360+ // TODO 把 execute 内与缓存无关只与数据库读写逻辑相关的代码抽取到 executeSQL 函数
361+ result = resultList .isEmpty () ? new JSONObject () : resultList .get (0 );
362+ if (resultList .size () > 1 ) {
363+ result .put (KEY_RAW_LIST , resultList );
424364 }
425365
366+ putCache (sql , resultList , config );
367+
368+ return result ;
426369 }
427370
428371 return super .execute (config , unknownType );
0 commit comments