diff --git a/backend/apps/db/db.py b/backend/apps/db/db.py index 74d2e6b1..51084236 100644 --- a/backend/apps/db/db.py +++ b/backend/apps/db/db.py @@ -24,7 +24,7 @@ from apps.datasource.utils.utils import aes_decrypt from apps.db.constant import DB, ConnectType from apps.db.engine import get_engine_config -from apps.system.crud.assistant import get_ds_engine +from apps.system.crud.assistant import get_out_ds_conf from apps.system.schemas.system_schema import AssistantOutDsSchema from common.core.deps import Trans from common.utils.utils import SQLBotLogUtil, equals_ignore_case @@ -146,92 +146,25 @@ def get_engine(ds: CoreDatasource, timeout: int = 0) -> Engine: def get_session(ds: CoreDatasource | AssistantOutDsSchema): - engine = get_engine(ds) if isinstance(ds, CoreDatasource) else get_ds_engine(ds) + # engine = get_engine(ds) if isinstance(ds, CoreDatasource) else get_ds_engine(ds) + if isinstance(ds, AssistantOutDsSchema): + out_conf = get_out_ds_conf(ds, 30) + ds.configuration = out_conf + + engine = get_engine(ds) session_maker = sessionmaker(bind=engine) session = session_maker() return session def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDsSchema, is_raise: bool = False): - if isinstance(ds, CoreDatasource): - db = DB.get_db(ds.type) - if db.connect_type == ConnectType.sqlalchemy: - conn = get_engine(ds, 10) - try: - with conn.connect() as connection: - SQLBotLogUtil.info("success") - return True - except Exception as e: - SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") - if is_raise: - raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') - return False - else: - conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) - extra_config_dict = get_extra_config(conf) - if equals_ignore_case(ds.type, 'dm'): - with dmPython.connect(user=conf.username, password=conf.password, server=conf.host, - port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor: - try: - cursor.execute('select 1', timeout=10).fetchall() - SQLBotLogUtil.info("success") - return True - except Exception as e: - SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") - if is_raise: - raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') - return False - elif equals_ignore_case(ds.type, 'doris', 'starrocks'): - with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host, - port=conf.port, db=conf.database, connect_timeout=10, - read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor: - try: - cursor.execute('select 1') - SQLBotLogUtil.info("success") - return True - except Exception as e: - SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") - if is_raise: - raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') - return False - elif equals_ignore_case(ds.type, 'redshift'): - with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, - user=conf.username, - password=conf.password, - timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor: - try: - cursor.execute('select 1') - SQLBotLogUtil.info("success") - return True - except Exception as e: - SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") - if is_raise: - raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') - return False - elif equals_ignore_case(ds.type, 'kingbase'): - with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database, - user=conf.username, - password=conf.password, - connect_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor: - try: - cursor.execute('select 1') - SQLBotLogUtil.info("success") - return True - except Exception as e: - SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") - if is_raise: - raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') - return False - elif equals_ignore_case(ds.type, 'es'): - es_conn = get_es_connect(conf) - if es_conn.ping(): - SQLBotLogUtil.info("success") - return True - else: - SQLBotLogUtil.info("failed") - return False - else: - conn = get_ds_engine(ds) + if isinstance(ds, AssistantOutDsSchema): + out_conf = get_out_ds_conf(ds, 10) + ds.configuration = out_conf + + db = DB.get_db(ds.type) + if db.connect_type == ConnectType.sqlalchemy: + conn = get_engine(ds, 10) try: with conn.connect() as connection: SQLBotLogUtil.info("success") @@ -241,26 +174,102 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs if is_raise: raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') return False + else: + conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration))) + extra_config_dict = get_extra_config(conf) + if equals_ignore_case(ds.type, 'dm'): + with dmPython.connect(user=conf.username, password=conf.password, server=conf.host, + port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor: + try: + cursor.execute('select 1', timeout=10).fetchall() + SQLBotLogUtil.info("success") + return True + except Exception as e: + SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") + if is_raise: + raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') + return False + elif equals_ignore_case(ds.type, 'doris', 'starrocks'): + with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host, + port=conf.port, db=conf.database, connect_timeout=10, + read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor: + try: + cursor.execute('select 1') + SQLBotLogUtil.info("success") + return True + except Exception as e: + SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") + if is_raise: + raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') + return False + elif equals_ignore_case(ds.type, 'redshift'): + with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, + user=conf.username, + password=conf.password, + timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor: + try: + cursor.execute('select 1') + SQLBotLogUtil.info("success") + return True + except Exception as e: + SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") + if is_raise: + raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') + return False + elif equals_ignore_case(ds.type, 'kingbase'): + with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database, + user=conf.username, + password=conf.password, + connect_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor: + try: + cursor.execute('select 1') + SQLBotLogUtil.info("success") + return True + except Exception as e: + SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") + if is_raise: + raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') + return False + elif equals_ignore_case(ds.type, 'es'): + es_conn = get_es_connect(conf) + if es_conn.ping(): + SQLBotLogUtil.info("success") + return True + else: + SQLBotLogUtil.info("failed") + return False + # else: + # conn = get_ds_engine(ds) + # try: + # with conn.connect() as connection: + # SQLBotLogUtil.info("success") + # return True + # except Exception as e: + # SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}") + # if is_raise: + # raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}') + # return False return False def get_version(ds: CoreDatasource | AssistantOutDsSchema): version = '' - conf = None if isinstance(ds, CoreDatasource): conf = DatasourceConf( **json.loads(aes_decrypt(ds.configuration))) if not equals_ignore_case(ds.type, "excel") else get_engine_config() - if isinstance(ds, AssistantOutDsSchema): - conf = DatasourceConf() - conf.host = ds.host - conf.port = ds.port - conf.username = ds.user - conf.password = ds.password - conf.database = ds.dataBase - conf.dbSchema = ds.db_schema - conf.timeout = 10 + else: + conf = DatasourceConf(**json.loads(aes_decrypt(get_out_ds_conf(ds, 10)))) + # if isinstance(ds, AssistantOutDsSchema): + # conf = DatasourceConf() + # conf.host = ds.host + # conf.port = ds.port + # conf.username = ds.user + # conf.password = ds.password + # conf.database = ds.dataBase + # conf.dbSchema = ds.db_schema + # conf.timeout = 10 db = DB.get_db(ds.type) sql = get_version_sql(ds, conf) try: diff --git a/backend/apps/system/crud/assistant.py b/backend/apps/system/crud/assistant.py index 06b77abf..da9ec232 100644 --- a/backend/apps/system/crud/assistant.py +++ b/backend/apps/system/crud/assistant.py @@ -10,6 +10,7 @@ # from apps.datasource.embedding.table_embedding import get_table_embedding from apps.datasource.models.datasource import CoreDatasource, DatasourceConf +from apps.datasource.utils.utils import aes_encrypt from apps.system.models.system_model import AssistantModel from apps.system.schemas.auth import CacheName, CacheNamespace from apps.system.schemas.system_schema import AssistantHeader, AssistantOutDsSchema, UserInfoDTO @@ -266,3 +267,19 @@ def get_ds_engine(ds: AssistantOutDsSchema) -> Engine: else: engine = create_engine(uri, connect_args={"connect_timeout": timeout}, pool_timeout=timeout) return engine + + +def get_out_ds_conf(ds: AssistantOutDsSchema, timeout:int=30) -> str: + conf = { + "host":ds.host, + "port":ds.port, + "username":ds.user, + "password":ds.password, + "database":ds.dataBase, + "driver":'', + "extraJdbc":ds.extraParams or '', + "dbSchema":ds.db_schema or '', + "timeout":timeout + } + conf.extraJdbc = '' + return aes_encrypt(json.dumps(conf)) diff --git a/backend/apps/system/schemas/system_schema.py b/backend/apps/system/schemas/system_schema.py index e37bacab..f16f6dd3 100644 --- a/backend/apps/system/schemas/system_schema.py +++ b/backend/apps/system/schemas/system_schema.py @@ -178,6 +178,7 @@ class AssistantOutDsBase(BaseModel): type_name: Optional[str] = None comment: Optional[str] = None description: Optional[str] = None + configuration: Optional[str] = None class AssistantOutDsSchema(AssistantOutDsBase):