1+ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+ # SPDX-License-Identifier: Apache-2.0
3+
4+ import boto3
5+ import json
6+ import datetime
7+ import time
8+
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that renders datetime objects as ISO-8601 strings."""

    def default(self, obj):
        # Anything that is not a datetime falls through to the base encoder,
        # which raises TypeError for unserializable types.
        if not isinstance(obj, datetime.datetime):
            return super().default(obj)
        return obj.isoformat()
14+
def get_knowledge_base_id(knowledge_base_name, region_name, bedrock_agent):
    """Return the ID of the knowledge base with the given name.

    Follows ``nextToken`` pagination of ``list_knowledge_bases`` so a
    knowledge base beyond the first page of results is still found
    (previously only the first page was inspected).

    Args:
        knowledge_base_name: Exact name of the knowledge base to look up.
        region_name: AWS region (unused here; kept for signature compatibility).
        bedrock_agent: boto3 'bedrock-agent' client.

    Returns:
        The matching knowledge base ID.

    Raises:
        ValueError: If no knowledge base with that name exists.
    """
    kwargs = {}
    while True:
        response = bedrock_agent.list_knowledge_bases(**kwargs)
        for kb in response['knowledgeBaseSummaries']:
            if kb['name'] == knowledge_base_name:
                return kb['knowledgeBaseId']
        next_token = response.get('nextToken')
        if not next_token:
            raise ValueError(f"Knowledge base '{knowledge_base_name}' not found")
        kwargs['nextToken'] = next_token
21+
def get_or_create_data_source(knowledge_base_id, language, region_name, bedrock_agent):
    """Return (data_source_id, name, is_new) for the data source backing *language*.

    Looks through all pages of existing data sources for one whose name
    contains *language*; if none is found, creates a new S3-backed data
    source with hierarchical chunking.

    Args:
        knowledge_base_id: ID of the knowledge base that owns the data source.
        language: SDK language / document-set identifier (e.g. "python").
        region_name: AWS region (unused here; kept for signature compatibility).
        bedrock_agent: boto3 'bedrock-agent' client.

    Returns:
        Tuple of (dataSourceId, name, is_new); is_new is True when a new
        data source was created on this call.
    """
    # Scan every page of existing data sources (previously only the first
    # page was inspected, so a match on a later page caused a duplicate).
    kwargs = {'knowledgeBaseId': knowledge_base_id}
    while True:
        response = bedrock_agent.list_data_sources(**kwargs)
        for ds in response['dataSourceSummaries']:
            # Substring match so both "<lang>-data-source" and
            # "<lang>-premium-data-source" are recognized as existing.
            if language in ds['name'] and ds['name'] != "default":
                return ds['dataSourceId'], ds['name'], False
        next_token = response.get('nextToken')
        if not next_token:
            break
        kwargs['nextToken'] = next_token

    # Non-premium document sets mirror the KB naming used in lambda_handler.
    # "coding-standards" was previously missing from this list, which created
    # a "coding-standards-premium-*" data source/bucket for a non-premium KB.
    if language in ["steering-docs", "final-specs", "coding-standards"]:
        ds_name = f"{language}-data-source"
        bucket_name = f"{language}-bucket"
    else:
        ds_name = f"{language}-premium-data-source"
        bucket_name = f"{language}-premium-bucket"

    # No existing data source matched: create one backed by the S3 bucket.
    response = bedrock_agent.create_data_source(
        knowledgeBaseId=knowledge_base_id,
        name=ds_name,
        dataSourceConfiguration={
            "type": "S3",
            "s3Configuration": {
                "bucketArn": f"arn:aws:s3:::{bucket_name}"
            }
        },
        vectorIngestionConfiguration={
            "chunkingConfiguration": {
                "chunkingStrategy": "HIERARCHICAL",
                "hierarchicalChunkingConfiguration": {
                    "levelConfigurations": [
                        {"maxTokens": 1500},  # parent chunks
                        {"maxTokens": 300}    # child chunks
                    ],
                    "overlapTokens": 75
                }
            }
        }
    )
    return response['dataSource']['dataSourceId'], response['dataSource']['name'], True
65+
def sync_data_source(knowledge_base_id, data_source_id, region_name, bedrock_agent):
    """Start an ingestion job for the given data source and return the raw response.

    Args:
        knowledge_base_id: ID of the owning knowledge base.
        data_source_id: ID of the data source to ingest.
        region_name: AWS region (unused here; kept for signature compatibility).
        bedrock_agent: boto3 'bedrock-agent' client.
    """
    return bedrock_agent.start_ingestion_job(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id,
    )
72+
def monitor_ingestion_job(knowledge_base_id, data_source_id, ingestion_job_id, region_name, bedrock_agent):
    """Poll an ingestion job until it reaches a terminal state or we give up.

    Polls every 5 seconds for at most 100 attempts (~8.3 minutes total).

    Args:
        knowledge_base_id: ID of the owning knowledge base.
        data_source_id: ID of the data source being ingested.
        ingestion_job_id: ID of the job to monitor.
        region_name: AWS region (unused here; kept for signature compatibility).
        bedrock_agent: boto3 'bedrock-agent' client.

    Returns:
        The final get_ingestion_job response when the job reaches
        COMPLETE/FAILED/STOPPED, or a {"status": "TIMEOUT", ...} dict if it
        never finishes within the polling window.
    """
    poll_interval_seconds = 5
    max_attempts = 100

    for _ in range(max_attempts):
        job_status = bedrock_agent.get_ingestion_job(
            knowledgeBaseId=knowledge_base_id,
            dataSourceId=data_source_id,
            ingestionJobId=ingestion_job_id
        )

        status = job_status['ingestionJob']['status']
        print(f"Current status: {status} - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        if status in ['COMPLETE', 'FAILED', 'STOPPED']:
            return job_status

        time.sleep(poll_interval_seconds)

    # The original message claimed "5 minutes", but 100 polls x 5 s is ~8 min.
    return {"status": "TIMEOUT", "message": "Job monitoring timed out after 100 polls (~8 minutes)"}
94+
def lambda_handler(event, context):
    """Lambda entry point: ensure a data source exists for the requested
    language, start an ingestion (sync) job, wait for it, and report the
    outcome as a JSON API-Gateway-style response.
    """
    language = event.get('language', 'python')
    region_name = event.get('region_name', 'us-west-2')

    # Shared (non-premium) document sets use the plain "<name>-KB" convention.
    if language in ("steering-docs", "final-specs", "coding-standards"):
        knowledge_base_name = f"{language}-KB"
    else:
        knowledge_base_name = f"{language}-premium-KB"

    bedrock_agent = boto3.client('bedrock-agent', region_name=region_name)
    knowledge_base_id = get_knowledge_base_id(knowledge_base_name, region_name, bedrock_agent)

    # Reuse an existing data source when possible; otherwise create one.
    data_source_id, data_source_name, is_new = get_or_create_data_source(
        knowledge_base_id, language, region_name, bedrock_agent
    )

    results = {
        "data_source": {
            "id": data_source_id,
            "name": data_source_name,
            "is_new": is_new,
        },
        "ingestion_job": {},
        "statistics": None,
    }

    # Kick off ingestion and record the job id before blocking on it.
    print(f"Syncing data source {data_source_name}...")
    sync_result = sync_data_source(knowledge_base_id, data_source_id, region_name, bedrock_agent)
    ingestion_job_id = sync_result['ingestionJob']['ingestionJobId']
    results["ingestion_job"] = {"id": ingestion_job_id, "status": "STARTED"}

    final_status = monitor_ingestion_job(
        knowledge_base_id, data_source_id, ingestion_job_id, region_name, bedrock_agent
    )
    job_info = final_status.get('ingestionJob', {})
    results["ingestion_job"]["status"] = job_info.get('status', 'UNKNOWN')

    # Surface document counts when the service reported them.
    if 'statistics' in job_info:
        stats = job_info['statistics']
        results["statistics"] = {
            "documents_processed": stats.get('numberOfDocumentsScanned', 0),
            "documents_failed": stats.get('numberOfDocumentsFailed', 0),
            "documents_indexed": stats.get('numberOfNewDocumentsIndexed', 0),
            "documents_modified_indexed": stats.get('numberOfModifiedDocumentsIndexed', 0),
        }

    return {
        'statusCode': 200,
        'body': json.dumps(results, cls=DateTimeEncoder),
    }