1010
1111from process_tracker .models .model_base import Base
1212from process_tracker .models .actor import Actor
13+ from process_tracker .models .capacity import Cluster , ClusterProcess
1314from process_tracker .models .extract import ExtractStatus
1415from process_tracker .models .process import (
1516 ErrorType ,
@@ -182,7 +183,18 @@ def initialize_data_store(self, overwrite=False):
182183
183184 self .logger .debug ("Finished the initialization check." )
184185
185- def topic_creator (self , topic , name , parent = None , child = None ):
186+ def topic_creator (
187+ self ,
188+ topic ,
189+ name ,
190+ parent = None ,
191+ child = None ,
192+ max_processing = None ,
193+ processing_unit = None ,
194+ max_memory = None ,
195+ memory_unit = None ,
196+ cluster = None ,
197+ ):
186198 """
187199 For the command line tool, validate the topic and create the new instance.
188200 :param topic: The name of the topic.
@@ -191,8 +203,19 @@ def topic_creator(self, topic, name, parent=None, child=None):
191203 :type name: string
192204 :param parent: The parent process' name, if creating a process dependency
193205 :type parent: string
194- :param child: The child process' name, if creating a process dependency
206+ :param child: The child process' name, if creating a process dependency.For cluster/process relationships, the
207+ name of the process.
195208 :type child: string
209+ :param max_processing: For performance clusters, the maximum processing ability allocated to the cluster
210+ :type max_processing: string
211+ :param max_memory: For performance clusters, the maximum memory allocated to the cluster
212+ :type max_memory: string
213+ :param processing_unit: For performance clusters, the unit of processing ability allocated to the cluster
214+ :type processing_unit: string
215+ :param memory_unit: For performance clusters, the unit of allocated memory to the cluster
216+ :type memory_unit: string
217+ :param cluster: For cluster/process relationships, the name of the cluster.
218+ :type cluster: string
196219 :return:
197220 """
198221 self .logger .info ("Attempting to create %s item: %s" % (topic , name ))
@@ -203,6 +226,33 @@ def topic_creator(self, topic, name, parent=None, child=None):
203226 item = self .get_or_create_item (model = Actor , actor_name = name )
204227 self .logger .info ("Actor created: %s" % item .__repr__ )
205228
229+ elif topic == "cluster" :
230+ item = self .get_or_create_item (
231+ model = Cluster ,
232+ cluster_name = name ,
233+ cluster_max_memory = max_memory ,
234+ cluster_max_memory_unit = memory_unit ,
235+ cluster_max_processing = max_processing ,
236+ cluster_max_processing_unit = processing_unit ,
237+ )
238+ self .logger .info ("Cluster created: %s" % item .__repr__ )
239+
240+ elif topic == "cluster process" :
241+ cluster = self .get_or_create_item (
242+ model = Cluster , create = False , cluster_name = cluster
243+ )
244+ process = self .get_or_create_item (
245+ model = Process , create = False , process_name = child
246+ )
247+
248+ item = self .get_or_create_item (
249+ model = ClusterProcess ,
250+ cluster_id = cluster .cluster_id ,
251+ process_id = process .process_id ,
252+ )
253+
254+ self .logger .info ("Cluster Process created: %s" % item .__repr__ )
255+
206256 elif topic == "extract status" :
207257 item = self .get_or_create_item (
208258 model = ExtractStatus , extract_status_name = name
@@ -260,7 +310,7 @@ def topic_creator(self, topic, name, parent=None, child=None):
260310
261311 return item
262312
263- def topic_deleter (self , topic , name , parent = None , child = None ):
313+ def topic_deleter (self , topic , name , parent = None , child = None , cluster = None ):
264314 """
265315 For the command line tool, validate that the topic name is not a default value and if not, delete it.
266316 :param topic: The SQLAlchemy object type
@@ -269,8 +319,9 @@ def topic_deleter(self, topic, name, parent=None, child=None):
269319 :type name: string
270320 :param parent: The parent process' name, if deleting a process dependency
271321 :type parent: string
272- :param child: The child process' name, if deleting a process dependency
322+ :param child: The child process' name, if deleting a process dependency. For cluster/process relationship, the name of the process.
273323 :type child: string
324+ :param cluster: For cluster/process relationship, the name of the cluster.
274325 :return:
275326 """
276327 item_delete = False
@@ -284,6 +335,33 @@ def topic_deleter(self, topic, name, parent=None, child=None):
284335 self .session .query (Actor ).filter (Actor .actor_name == name ).delete ()
285336 self .logger .info ("%s %s deleted." % (topic , name ))
286337
338+ elif topic == "cluster" :
339+ item_delete = True
340+ self .session .query (Cluster ).filter (
341+ Cluster .cluster_name == name
342+ ).delete ()
343+ self .logger .info ("%s %s deleted." % (topic , name ))
344+
345+ elif topic == "cluster process" :
346+ item_delete = True
347+ cluster = self .get_or_create_item (
348+ model = Cluster , create = False , cluster_name = cluster
349+ )
350+ process = self .get_or_create_item (
351+ model = Process , create = False , process_name = child
352+ )
353+
354+ item = self .get_or_create_item (
355+ model = ClusterProcess ,
356+ create = False ,
357+ cluster_id = cluster .cluster_id ,
358+ process_id = process .process_id ,
359+ )
360+
361+ self .session .delete (item )
362+
363+ self .logger .info ("%s %s - %s deleted." % (topic , cluster , child ))
364+
287365 elif topic == "extract status" and name not in preload_extract_status_types :
288366 item_delete = True
289367 self .session .query (ExtractStatus ).filter (
@@ -359,7 +437,16 @@ def topic_deleter(self, topic, name, parent=None, child=None):
359437
360438 return "blarg"
361439
362- def topic_updater (self , topic , initial_name , name ):
440+ def topic_updater (
441+ self ,
442+ topic ,
443+ initial_name ,
444+ name ,
445+ max_processing = None ,
446+ processing_unit = None ,
447+ max_memory = None ,
448+ memory_unit = None ,
449+ ):
363450 """
364451 For the command line tool, validate that the topic name is not a default value and if not, update it.
365452 :param topic: name of the SQLAlchemy object
@@ -368,6 +455,14 @@ def topic_updater(self, topic, initial_name, name):
368455 :type initial_name: string
369456 :param name: The updated name of the object to be updated.
370457 :type name: string
458+ :param max_processing: For performance clusters, the maximum processing ability allocated to the cluster
459+ :type max_processing: string
460+ :param max_memory: For performance clusters, the maximum memory allocated to the cluster
461+ :type max_memory: string
462+ :param processing_unit: For performance clusters, the unit of processing ability allocated to the cluster
463+ :type processing_unit: string
464+ :param memory_unit: For performance clusters, the unit of allocated memory to the cluster
465+ :type memory_unit: string
371466 :return:
372467 """
373468 if self .topic_validator (topic = topic ):
@@ -378,6 +473,26 @@ def topic_updater(self, topic, initial_name, name):
378473 item .actor_name = name
379474 self .logger .info ("%s %s updated." % (topic , name ))
380475
476+ elif topic == "cluster" :
477+ item = self .get_or_create_item (
478+ model = Cluster , create = False , cluster_name = initial_name
479+ )
480+
481+ item .cluster_name = name
482+ if max_memory is not None :
483+ item .cluster_max_memory = max_memory
484+
485+ if memory_unit is not None :
486+ item .cluster_max_memory_unit = memory_unit
487+
488+ if max_processing is not None :
489+ item .cluster_max_processing = max_processing
490+
491+ if processing_unit is not None :
492+ item .cluster_max_processing_unit = processing_unit
493+
494+ self .logger .info ("%s %s updated." % (topic , name ))
495+
381496 elif (
382497 topic == "extract status"
383498 and initial_name not in preload_extract_status_types
@@ -455,6 +570,7 @@ def topic_validator(self, topic):
455570 # Only data store topics that should be allowed to be created from the command line tool.
456571 valid_topics = [
457572 "actor" ,
573+ "cluster" ,
458574 "error type" ,
459575 "extract status" ,
460576 "process dependency" ,
0 commit comments