@@ -54,6 +54,12 @@ public class SubjectScopedSelect implements TaskRefTask
5454 protected final Map <String , String > _settings = new CaseInsensitiveHashMap <>();
5555 protected ContainerUser _containerUser ;
5656
57+ private enum MODE
58+ {
59+ UPDATE_ONLY ,
60+ TRUNCATE ;
61+ }
62+
5763 private enum Settings
5864 {
5965 subjectRemoteSource (false ),
@@ -90,7 +96,18 @@ public boolean isRequired()
9096 }
9197 }
9298
93- final int BATCH_SIZE = 500 ;
99+ final int BATCH_SIZE = 100 ;
100+
101+ private MODE getMode ()
102+ {
103+ String rawVal = StringUtils .trimToNull (_settings .get ("mode" ));
104+ if (rawVal == null )
105+ {
106+ return MODE .TRUNCATE ;
107+ }
108+
109+ return MODE .valueOf (rawVal );
110+ }
94111
95112 @ Override
96113 public RecordedActionSet run (@ NotNull PipelineJob job ) throws PipelineJobException
@@ -113,51 +130,90 @@ private void processBatch(List<String> subjects, Logger log)
113130
114131 try
115132 {
116- // Find / Delete existing values:
117- Set <ColumnInfo > keyFields = destinationTable .getColumns ().stream ().filter (ColumnInfo ::isKeyField ).collect (Collectors .toSet ());
118- final SimpleFilter subjectFilter = new SimpleFilter (FieldKey .fromString (_settings .get (Settings .targetSubjectColumn .name ())), subjects , CompareType .IN );
119- if (_settings .get (Settings .targetAdditionalFilters .name ()) != null )
133+ if (getMode () == MODE .TRUNCATE )
120134 {
121- List <CompareType .CompareClause > additionalFilters = parseAdditionalFilters (_settings .get (Settings .targetAdditionalFilters .name ()));
122- additionalFilters .forEach (subjectFilter ::addCondition );
123- }
135+ // Find / Delete existing values:
136+ Set <ColumnInfo > keyFields = destinationTable .getColumns ().stream ().filter (ColumnInfo ::isKeyField ).collect (Collectors .toSet ());
137+ final SimpleFilter subjectFilter = new SimpleFilter (FieldKey .fromString (_settings .get (Settings .targetSubjectColumn .name ())), subjects , CompareType .IN );
138+ if (_settings .get (Settings .targetAdditionalFilters .name ()) != null )
139+ {
140+ List <CompareType .CompareClause > additionalFilters = parseAdditionalFilters (_settings .get (Settings .targetAdditionalFilters .name ()));
141+ additionalFilters .forEach (subjectFilter ::addCondition );
142+ }
124143
125- if (destinationTable .getColumn (FieldKey .fromString (_settings .get (Settings .targetSubjectColumn .name ()))) == null )
126- {
127- throw new IllegalStateException ("Unknown column on table " + destinationTable .getName () + ": " + _settings .get (Settings .targetSubjectColumn .name ()));
128- }
144+ if (destinationTable .getColumn (FieldKey .fromString (_settings .get (Settings .targetSubjectColumn .name ()))) == null )
145+ {
146+ throw new IllegalStateException ("Unknown column on table " + destinationTable .getName () + ": " + _settings .get (Settings .targetSubjectColumn .name ()));
147+ }
129148
130- Collection <Map <String , Object >> existingRows = new TableSelector (destinationTable , keyFields , subjectFilter , null ).getMapCollection ();
131- if (!existingRows .isEmpty ())
132- {
133- log .info ("deleting " + existingRows .size () + " rows" );
134- qus .deleteRows (_containerUser .getUser (), _containerUser .getContainer (), new ArrayList <>(existingRows ), null , null );
149+ Collection <Map <String , Object >> existingRows = new TableSelector (destinationTable , keyFields , subjectFilter , null ).getMapCollection ();
150+ if (!existingRows .isEmpty ())
151+ {
152+ log .info ("deleting " + existingRows .size () + " rows" );
153+ qus .deleteRows (_containerUser .getUser (), _containerUser .getContainer (), new ArrayList <>(existingRows ), null , null );
154+ }
155+ else
156+ {
157+ log .info ("No rows to delete for this subject batch" );
158+ }
135159 }
136160 else
137161 {
138- log .info ("No rows to delete for this subject batch " );
162+ log .info ("Using " + getMode (). name () + " mode, source records will not be deleted " );
139163 }
140164
141165 // Query data and import
142- List <Map <String , Object >> toImport = getRowsToImport (subjects , log );
143- if (!toImport .isEmpty ())
166+ List <Map <String , Object >> toImportOrUpdate = getRowsToImport (subjects , log );
167+ if (!toImportOrUpdate .isEmpty ())
144168 {
145- log .info ("inserting " + toImport .size () + " rows" );
146- BatchValidationException bve = new BatchValidationException ();
147- qus .insertRows (_containerUser .getUser (), _containerUser .getContainer (), toImport , bve , null , null );
148- if (bve .hasErrors ())
169+ if (getMode () == MODE .TRUNCATE )
170+ {
171+ log .info ("inserting " + toImportOrUpdate .size () + " rows" );
172+ BatchValidationException bve = new BatchValidationException ();
173+ qus .insertRows (_containerUser .getUser (), _containerUser .getContainer (), toImportOrUpdate , bve , null , null );
174+ if (bve .hasErrors ())
175+ {
176+ throw bve ;
177+ }
178+ }
179+ else if (getMode () == MODE .UPDATE_ONLY )
180+ {
181+ log .info ("updating " + toImportOrUpdate .size () + " rows" );
182+ BatchValidationException bve = new BatchValidationException ();
183+
184+ Collection <String > keyFields = destinationTable .getPkColumnNames ();
185+ List <Map <String , Object >> keys = toImportOrUpdate .stream ().map (x -> {
186+ Map <String , Object > map = new HashMap <>();
187+ for (String keyField : keyFields )
188+ {
189+ if (x .get (keyField ) != null )
190+ {
191+ map .put (keyField , x .get (keyField ));
192+ }
193+ }
194+
195+ return map ;
196+ }).toList ();
197+
198+ qus .updateRows (_containerUser .getUser (), _containerUser .getContainer (), toImportOrUpdate , keys , bve , null , null );
199+ if (bve .hasErrors ())
200+ {
201+ throw bve ;
202+ }
203+ }
204+ else
149205 {
150- throw bve ;
206+ throw new IllegalStateException ( "Unknown mode: " + getMode ()) ;
151207 }
152208 }
153209 else
154210 {
155- log .info ("No rows to import for this subject batch" );
211+ log .info ("No rows to import/update for this subject batch" );
156212 }
157213 }
158214 catch (SQLException | InvalidKeyException | BatchValidationException | QueryUpdateServiceException | DuplicateKeyException e )
159215 {
160- throw new IllegalStateException ("Error Importing Rows" , e );
216+ throw new IllegalStateException ("Error Importing/Updating Rows" , e );
161217 }
162218 }
163219
@@ -350,7 +406,7 @@ else if (f.getParamVals().length == 1)
350406 throw new IllegalStateException ("Table is missing column: " + _settings .get (Settings .dataSourceSubjectColumn .name ()));
351407 }
352408
353- final SimpleFilter filter = new SimpleFilter (_settings .get (Settings .dataSourceSubjectColumn .name ()), subjects , CompareType .IN );
409+ final SimpleFilter filter = new SimpleFilter (FieldKey . fromString ( _settings .get (Settings .dataSourceSubjectColumn .name () )), subjects , CompareType .IN );
354410 if (_settings .get (Settings .dataSourceAdditionalFilters .name ()) != null )
355411 {
356412 List <CompareType .CompareClause > additionalFilters = parseAdditionalFilters (_settings .get (Settings .dataSourceAdditionalFilters .name ()));
0 commit comments