diff --git a/magicblock-committor-service/README.md b/magicblock-committor-service/README.md index 714dfcb0e..891d7ad5a 100644 --- a/magicblock-committor-service/README.md +++ b/magicblock-committor-service/README.md @@ -23,18 +23,18 @@ IntentExecutor - responsible for execution of Intent. Calls **TransactionPrepar TransactionPreparator - is an entity that handles all of the above "Transaction preparation" calling **TaskBuilderV1**, **TaskStrategist**, **DeliveryPreparator** and then assempling it all and passing to **MessageExecutor** ## DeliveryPreparator -After our **BaseTask**s are ready we need to prepare eveything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers +After our **Task**s are ready we need to prepare eveything for their successful execution. **DeliveryPreparator** - handles ALTs and commit buffers ## TaskBuilder First, lets build atomic tasks from scheduled message/intent. -High level: TaskBuilder responsible for creating BaseTasks(to be renamed...) from ScheduledBaseIntent(to be renamed...). +High level: TaskBuilder responsible for creating Tasks(to be renamed...) from ScheduledBaseIntent(to be renamed...). Details: To do that is requires additional information from DelegationMetadata, it is provided **CommitIdFetcher** -### BaseTask -High level: BaseTask - is an atomic operation that is to be performed on the Base layer, like: Commit, Undelegate, Finalize, Action. +### Task +High level: Task - is an atomic operation that is to be performed on the Base layer, like: Commit, Undelegate, Finalize, Action. -Details: There's to implementation of BaseTask: ArgsTask, BufferTask. ArgsTask - gives instruction using args. BufferTask - gives instruction using buffer. BufferTask at the moment supports only commits +Details: There's to implementation of Task: ArgsTask, BufferTask. ArgsTask - gives instruction using args. BufferTask - gives instruction using buffer. BufferTask at the moment supports only commits ### TaskInfoFetcher High level: for account to be accepted by `dlp` it needs to have incremental commit ids. TaskInfoFetcher provides a user with the correct ids/nonces for set of committees diff --git a/magicblock-committor-service/src/intent_executor/error.rs b/magicblock-committor-service/src/intent_executor/error.rs index 9484be711..1978a6423 100644 --- a/magicblock-committor-service/src/intent_executor/error.rs +++ b/magicblock-committor-service/src/intent_executor/error.rs @@ -11,7 +11,7 @@ use solana_transaction_error::TransactionError; use crate::{ tasks::{ task_builder::TaskBuilderError, task_strategist::TaskStrategistError, - BaseTask, TaskType, + Task, TaskType, }, transaction_preparator::error::TransactionPreparatorError, }; @@ -174,7 +174,7 @@ impl TransactionStrategyExecutionError { pub fn try_from_transaction_error( err: TransactionError, signature: Option, - tasks: &[Box], + tasks: &[Task], ) -> Result { // There's always 2 budget instructions in front const OFFSET: u8 = 2; @@ -256,7 +256,7 @@ impl metrics::LabelValue for TransactionStrategyExecutionError { } pub(crate) struct IntentTransactionErrorMapper<'a> { - pub tasks: &'a [Box], + pub tasks: &'a [Task], } impl TransactionErrorMapper for IntentTransactionErrorMapper<'_> { type ExecutionError = TransactionStrategyExecutionError; diff --git a/magicblock-committor-service/src/intent_executor/mod.rs b/magicblock-committor-service/src/intent_executor/mod.rs index 4c26ac8e0..fd53ee73e 100644 --- a/magicblock-committor-service/src/intent_executor/mod.rs +++ b/magicblock-committor-service/src/intent_executor/mod.rs @@ -48,7 +48,7 @@ use crate::{ StrategyExecutionMode, TaskStrategist, TransactionStrategy, }, task_visitors::utility_visitor::TaskVisitorUtils, - BaseTask, TaskType, + Task, TaskType, }, transaction_preparator::{ delivery_preparator::BufferExecutionError, @@ -161,7 +161,7 @@ where Some(value) => value, None => { // Build tasks for commit stage - let commit_tasks = TaskBuilderImpl::commit_tasks( + let commit_tasks = TaskBuilderImpl::create_commit_tasks( &self.task_info_fetcher, &base_intent, persister, @@ -186,7 +186,7 @@ where // Build tasks for commit & finalize stages let (commit_tasks, finalize_tasks) = { - let commit_tasks_fut = TaskBuilderImpl::commit_tasks( + let commit_tasks_fut = TaskBuilderImpl::create_commit_tasks( &self.task_info_fetcher, &base_intent, persister, @@ -633,7 +633,7 @@ where async fn execute_message_with_retries( &self, prepared_message: VersionedMessage, - tasks: &[Box], + tasks: &[Task], ) -> IntentExecutorResult { struct IntentErrorMapper { diff --git a/magicblock-committor-service/src/tasks/args_task.rs b/magicblock-committor-service/src/tasks/args_task.rs deleted file mode 100644 index 1aec8326e..000000000 --- a/magicblock-committor-service/src/tasks/args_task.rs +++ /dev/null @@ -1,217 +0,0 @@ -use dlp::{ - args::{CallHandlerArgs, CommitDiffArgs, CommitStateArgs}, - compute_diff, -}; -use magicblock_metrics::metrics::LabelValue; -use solana_account::ReadableAccount; -use solana_instruction::{AccountMeta, Instruction}; -use solana_pubkey::Pubkey; - -#[cfg(test)] -use crate::tasks::TaskStrategy; -use crate::tasks::{ - buffer_task::{BufferTask, BufferTaskType}, - visitor::Visitor, - BaseActionTask, BaseTask, BaseTaskError, BaseTaskResult, CommitDiffTask, - CommitTask, FinalizeTask, PreparationState, TaskType, UndelegateTask, -}; - -/// Task that will be executed on Base layer via arguments -#[derive(Clone)] -pub enum ArgsTaskType { - Commit(CommitTask), - CommitDiff(CommitDiffTask), - Finalize(FinalizeTask), - Undelegate(UndelegateTask), // Special action really - BaseAction(BaseActionTask), -} - -#[derive(Clone)] -pub struct ArgsTask { - preparation_state: PreparationState, - pub task_type: ArgsTaskType, -} - -impl From for ArgsTask { - fn from(value: ArgsTaskType) -> Self { - Self::new(value) - } -} - -impl ArgsTask { - pub fn new(task_type: ArgsTaskType) -> Self { - Self { - preparation_state: PreparationState::NotNeeded, - task_type, - } - } -} - -impl BaseTask for ArgsTask { - fn instruction(&self, validator: &Pubkey) -> Instruction { - match &self.task_type { - ArgsTaskType::Commit(value) => { - let args = CommitStateArgs { - nonce: value.commit_id, - lamports: value.committed_account.account.lamports, - data: value.committed_account.account.data.clone(), - allow_undelegation: value.allow_undelegation, - }; - dlp::instruction_builder::commit_state( - *validator, - value.committed_account.pubkey, - value.committed_account.account.owner, - args, - ) - } - ArgsTaskType::CommitDiff(value) => { - let args = CommitDiffArgs { - nonce: value.commit_id, - lamports: value.committed_account.account.lamports, - diff: compute_diff( - value.base_account.data(), - value.committed_account.account.data(), - ) - .to_vec(), - allow_undelegation: value.allow_undelegation, - }; - - dlp::instruction_builder::commit_diff( - *validator, - value.committed_account.pubkey, - value.committed_account.account.owner, - args, - ) - } - ArgsTaskType::Finalize(value) => { - dlp::instruction_builder::finalize( - *validator, - value.delegated_account, - ) - } - ArgsTaskType::Undelegate(value) => { - dlp::instruction_builder::undelegate( - *validator, - value.delegated_account, - value.owner_program, - value.rent_reimbursement, - ) - } - ArgsTaskType::BaseAction(value) => { - let action = &value.action; - let account_metas = action - .account_metas_per_program - .iter() - .map(|short_meta| AccountMeta { - pubkey: short_meta.pubkey, - is_writable: short_meta.is_writable, - is_signer: false, - }) - .collect(); - dlp::instruction_builder::call_handler( - *validator, - action.destination_program, - action.escrow_authority, - account_metas, - CallHandlerArgs { - data: action.data_per_program.data.clone(), - escrow_index: action.data_per_program.escrow_index, - }, - ) - } - } - } - - fn try_optimize_tx_size( - self: Box, - ) -> Result, Box> { - match self.task_type { - ArgsTaskType::Commit(value) => { - Ok(Box::new(BufferTask::new_preparation_required( - BufferTaskType::Commit(value), - ))) - } - ArgsTaskType::CommitDiff(value) => { - Ok(Box::new(BufferTask::new_preparation_required( - BufferTaskType::CommitDiff(value), - ))) - } - ArgsTaskType::BaseAction(_) - | ArgsTaskType::Finalize(_) - | ArgsTaskType::Undelegate(_) => Err(self), - } - } - - /// Nothing to prepare for [`ArgsTaskType`] type - fn preparation_state(&self) -> &PreparationState { - &self.preparation_state - } - - fn switch_preparation_state( - &mut self, - new_state: PreparationState, - ) -> BaseTaskResult<()> { - if !matches!(new_state, PreparationState::NotNeeded) { - Err(BaseTaskError::PreparationStateTransitionError) - } else { - // Do nothing - Ok(()) - } - } - - fn compute_units(&self) -> u32 { - match &self.task_type { - ArgsTaskType::Commit(_) => 70_000, - ArgsTaskType::CommitDiff(_) => 70_000, - ArgsTaskType::BaseAction(task) => task.action.compute_units, - ArgsTaskType::Undelegate(_) => 70_000, - ArgsTaskType::Finalize(_) => 70_000, - } - } - - #[cfg(test)] - fn strategy(&self) -> TaskStrategy { - TaskStrategy::Args - } - - fn task_type(&self) -> TaskType { - match &self.task_type { - ArgsTaskType::Commit(_) => TaskType::Commit, - ArgsTaskType::CommitDiff(_) => TaskType::Commit, - ArgsTaskType::BaseAction(_) => TaskType::Action, - ArgsTaskType::Undelegate(_) => TaskType::Undelegate, - ArgsTaskType::Finalize(_) => TaskType::Finalize, - } - } - - /// For tasks using Args strategy call corresponding `Visitor` method - fn visit(&self, visitor: &mut dyn Visitor) { - visitor.visit_args_task(self); - } - - fn reset_commit_id(&mut self, commit_id: u64) { - match &mut self.task_type { - ArgsTaskType::Commit(task) => { - task.commit_id = commit_id; - } - ArgsTaskType::CommitDiff(task) => { - task.commit_id = commit_id; - } - ArgsTaskType::BaseAction(_) - | ArgsTaskType::Finalize(_) - | ArgsTaskType::Undelegate(_) => {} - }; - } -} - -impl LabelValue for ArgsTask { - fn value(&self) -> &str { - match self.task_type { - ArgsTaskType::Commit(_) => "args_commit", - ArgsTaskType::CommitDiff(_) => "args_commit_diff", - ArgsTaskType::BaseAction(_) => "args_action", - ArgsTaskType::Finalize(_) => "args_finalize", - ArgsTaskType::Undelegate(_) => "args_undelegate", - } - } -} diff --git a/magicblock-committor-service/src/tasks/buffer_lifecycle.rs b/magicblock-committor-service/src/tasks/buffer_lifecycle.rs new file mode 100644 index 000000000..9ed6a0e89 --- /dev/null +++ b/magicblock-committor-service/src/tasks/buffer_lifecycle.rs @@ -0,0 +1,212 @@ +use magicblock_committor_program::{ + instruction_builder::{ + close_buffer::{create_close_ix, CreateCloseIxArgs}, + init_buffer::{create_init_ix, CreateInitIxArgs}, + realloc_buffer::{ + create_realloc_buffer_ixs, CreateReallocBufferIxArgs, + }, + write_buffer::{create_write_ix, CreateWriteIxArgs}, + }, + pdas, ChangesetChunks, Chunks, +}; +use magicblock_program::magic_scheduled_base_intent::CommittedAccount; +use solana_account::Account; +use solana_instruction::Instruction; +use solana_pubkey::Pubkey; + +use crate::consts::MAX_WRITE_CHUNK_SIZE; + +#[derive(Debug, Clone)] +pub struct BufferLifecycle { + pub preparation: CreateBufferTask, + pub cleanup: DestroyTask, +} + +impl BufferLifecycle { + pub fn new( + commit_id: u64, + account: &CommittedAccount, + base_account: Option<&Account>, + ) -> BufferLifecycle { + let data = if let Some(base_account) = base_account { + dlp::compute_diff(&base_account.data, &account.account.data) + .to_vec() + } else { + account.account.data.clone() + }; + + BufferLifecycle { + preparation: CreateBufferTask { + commit_id, + pubkey: account.pubkey, + chunks: Chunks::from_data_length( + data.len(), + MAX_WRITE_CHUNK_SIZE, + ), + state_or_diff: data, + }, + cleanup: DestroyTask { + pubkey: account.pubkey, + commit_id, + }, + } + } +} + +#[derive(Clone, Debug)] +pub struct CreateBufferTask { + pub commit_id: u64, + pub pubkey: Pubkey, + pub chunks: Chunks, + pub state_or_diff: Vec, +} + +impl CreateBufferTask { + /// Returns initialization [`Instruction`] + pub fn instruction(&self, authority: &Pubkey) -> Instruction { + // // SAFETY: as object_length internally uses only already allocated or static buffers, + // // and we don't use any fs writers, so the only error that may occur here is of kind + // // OutOfMemory or WriteZero. This is impossible due to: + // // Chunks::new panics if its size exceeds MAX_ACCOUNT_ALLOC_PER_INSTRUCTION_SIZE or 10_240 + // // https://github.com/near/borsh-rs/blob/f1b75a6b50740bfb6231b7d0b1bd93ea58ca5452/borsh/src/ser/helpers.rs#L59 + let chunks_account_size = + borsh::object_length(&self.chunks).unwrap() as u64; + let buffer_account_size = self.state_or_diff.len() as u64; + + let (instruction, _, _) = create_init_ix(CreateInitIxArgs { + authority: *authority, + pubkey: self.pubkey, + chunks_account_size, + buffer_account_size, + commit_id: self.commit_id, + chunk_count: self.chunks.count(), + chunk_size: self.chunks.chunk_size(), + }); + + instruction + } + + /// Returns compute units required for realloc instruction + pub fn init_compute_units(&self) -> u32 { + 12_000 + } + + /// Returns realloc instruction required for Buffer preparation + #[allow(clippy::let_and_return)] + pub fn realloc_instructions(&self, authority: &Pubkey) -> Vec { + let buffer_account_size = self.state_or_diff.len() as u64; + let realloc_instructions = + create_realloc_buffer_ixs(CreateReallocBufferIxArgs { + authority: *authority, + pubkey: self.pubkey, + buffer_account_size, + commit_id: self.commit_id, + }); + + realloc_instructions + } + + /// Returns compute units required for realloc instruction + pub fn realloc_compute_units(&self) -> u32 { + 6_000 + } + + /// Returns realloc instruction required for Buffer preparation + #[allow(clippy::let_and_return)] + pub fn write_instructions(&self, authority: &Pubkey) -> Vec { + let chunks_iter = + ChangesetChunks::new(&self.chunks, self.chunks.chunk_size()) + .iter(&self.state_or_diff); + let write_instructions = chunks_iter + .map(|chunk| { + create_write_ix(CreateWriteIxArgs { + authority: *authority, + pubkey: self.pubkey, + offset: chunk.offset, + data_chunk: chunk.data_chunk, + commit_id: self.commit_id, + }) + }) + .collect::>(); + + write_instructions + } + + pub fn write_compute_units(&self, bytes_count: usize) -> u32 { + const PER_BYTE: u32 = 3; + + u32::try_from(bytes_count) + .ok() + .and_then(|bytes_count| bytes_count.checked_mul(PER_BYTE)) + .unwrap_or(u32::MAX) + } + + pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey { + pdas::chunks_pda( + authority, + &self.pubkey, + self.commit_id.to_le_bytes().as_slice(), + ) + .0 + } + + pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey { + pdas::buffer_pda( + authority, + &self.pubkey, + self.commit_id.to_le_bytes().as_slice(), + ) + .0 + } + + pub fn cleanup_task(&self) -> DestroyTask { + DestroyTask { + pubkey: self.pubkey, + commit_id: self.commit_id, + } + } +} + +#[derive(Clone, Debug)] +pub struct DestroyTask { + pub pubkey: Pubkey, + pub commit_id: u64, +} + +impl DestroyTask { + pub fn instruction(&self, authority: &Pubkey) -> Instruction { + create_close_ix(CreateCloseIxArgs { + authority: *authority, + pubkey: self.pubkey, + commit_id: self.commit_id, + }) + } + + /// Returns compute units required to execute [`CleanupTask`] + pub fn compute_units(&self) -> u32 { + 30_000 + } + + /// Returns a number of [`CleanupTask`]s that is possible to fit in single + pub const fn max_tx_fit_count_with_budget() -> usize { + 8 + } + + pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey { + pdas::chunks_pda( + authority, + &self.pubkey, + self.commit_id.to_le_bytes().as_slice(), + ) + .0 + } + + pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey { + pdas::buffer_pda( + authority, + &self.pubkey, + self.commit_id.to_le_bytes().as_slice(), + ) + .0 + } +} diff --git a/magicblock-committor-service/src/tasks/buffer_task.rs b/magicblock-committor-service/src/tasks/buffer_task.rs deleted file mode 100644 index 19c59b7ac..000000000 --- a/magicblock-committor-service/src/tasks/buffer_task.rs +++ /dev/null @@ -1,216 +0,0 @@ -use dlp::{args::CommitStateFromBufferArgs, compute_diff}; -use magicblock_committor_program::Chunks; -use magicblock_metrics::metrics::LabelValue; -use solana_instruction::Instruction; -use solana_pubkey::Pubkey; - -#[cfg(any(test, feature = "dev-context-only-utils"))] -use super::args_task::ArgsTaskType; -#[cfg(test)] -use crate::tasks::TaskStrategy; -use crate::{ - consts::MAX_WRITE_CHUNK_SIZE, - tasks::{ - visitor::Visitor, BaseTask, BaseTaskError, BaseTaskResult, - CommitDiffTask, CommitTask, PreparationState, PreparationTask, - TaskType, - }, -}; - -/// Tasks that could be executed using buffers -#[derive(Clone)] -pub enum BufferTaskType { - Commit(CommitTask), - CommitDiff(CommitDiffTask), - // Action in the future -} - -#[derive(Clone)] -pub struct BufferTask { - preparation_state: PreparationState, - pub task_type: BufferTaskType, -} - -impl BufferTask { - pub fn new_preparation_required(task_type: BufferTaskType) -> Self { - Self { - preparation_state: Self::preparation_required(&task_type), - task_type, - } - } - - pub fn new( - preparation_state: PreparationState, - task_type: BufferTaskType, - ) -> Self { - Self { - preparation_state, - task_type, - } - } - - fn preparation_required(task_type: &BufferTaskType) -> PreparationState { - match task_type { - BufferTaskType::Commit(task) => { - let data = task.committed_account.account.data.clone(); - let chunks = - Chunks::from_data_length(data.len(), MAX_WRITE_CHUNK_SIZE); - - PreparationState::Required(PreparationTask { - commit_id: task.commit_id, - pubkey: task.committed_account.pubkey, - committed_data: data, - chunks, - }) - } - - BufferTaskType::CommitDiff(task) => { - let diff = compute_diff( - &task.base_account.data, - &task.committed_account.account.data, - ) - .to_vec(); - let chunks = - Chunks::from_data_length(diff.len(), MAX_WRITE_CHUNK_SIZE); - - PreparationState::Required(PreparationTask { - commit_id: task.commit_id, - pubkey: task.committed_account.pubkey, - committed_data: diff, - chunks, - }) - } - } - } -} - -#[cfg(any(test, feature = "dev-context-only-utils"))] -impl From for BufferTaskType { - fn from(value: ArgsTaskType) -> Self { - match value { - ArgsTaskType::Commit(task) => BufferTaskType::Commit(task), - ArgsTaskType::CommitDiff(task) => BufferTaskType::CommitDiff(task), - _ => unimplemented!( - "Only commit task can be BufferTask currently. Fix your tests" - ), - } - } -} - -impl BaseTask for BufferTask { - fn instruction(&self, validator: &Pubkey) -> Instruction { - match &self.task_type { - BufferTaskType::Commit(task) => { - let commit_id_slice = task.commit_id.to_le_bytes(); - let (commit_buffer_pubkey, _) = - magicblock_committor_program::pdas::buffer_pda( - validator, - &task.committed_account.pubkey, - &commit_id_slice, - ); - - dlp::instruction_builder::commit_state_from_buffer( - *validator, - task.committed_account.pubkey, - task.committed_account.account.owner, - commit_buffer_pubkey, - CommitStateFromBufferArgs { - nonce: task.commit_id, - lamports: task.committed_account.account.lamports, - allow_undelegation: task.allow_undelegation, - }, - ) - } - BufferTaskType::CommitDiff(task) => { - let commit_id_slice = task.commit_id.to_le_bytes(); - let (commit_buffer_pubkey, _) = - magicblock_committor_program::pdas::buffer_pda( - validator, - &task.committed_account.pubkey, - &commit_id_slice, - ); - - dlp::instruction_builder::commit_diff_from_buffer( - *validator, - task.committed_account.pubkey, - task.committed_account.account.owner, - commit_buffer_pubkey, - CommitStateFromBufferArgs { - nonce: task.commit_id, - lamports: task.committed_account.account.lamports, - allow_undelegation: task.allow_undelegation, - }, - ) - } - } - } - - /// No further optimizations - fn try_optimize_tx_size( - self: Box, - ) -> Result, Box> { - Err(self) - } - - fn preparation_state(&self) -> &PreparationState { - &self.preparation_state - } - - fn switch_preparation_state( - &mut self, - new_state: PreparationState, - ) -> BaseTaskResult<()> { - if matches!(new_state, PreparationState::NotNeeded) { - Err(BaseTaskError::PreparationStateTransitionError) - } else { - self.preparation_state = new_state; - Ok(()) - } - } - - fn compute_units(&self) -> u32 { - match self.task_type { - BufferTaskType::Commit(_) => 70_000, - BufferTaskType::CommitDiff(_) => 70_000, - } - } - - #[cfg(test)] - fn strategy(&self) -> TaskStrategy { - TaskStrategy::Buffer - } - - fn task_type(&self) -> TaskType { - match self.task_type { - BufferTaskType::Commit(_) => TaskType::Commit, - BufferTaskType::CommitDiff(_) => TaskType::Commit, - } - } - - /// For tasks using Args strategy call corresponding `Visitor` method - fn visit(&self, visitor: &mut dyn Visitor) { - visitor.visit_buffer_task(self); - } - - fn reset_commit_id(&mut self, commit_id: u64) { - match &mut self.task_type { - BufferTaskType::Commit(task) => { - task.commit_id = commit_id; - } - BufferTaskType::CommitDiff(task) => { - task.commit_id = commit_id; - } - }; - - self.preparation_state = Self::preparation_required(&self.task_type) - } -} - -impl LabelValue for BufferTask { - fn value(&self) -> &str { - match self.task_type { - BufferTaskType::Commit(_) => "buffer_commit", - BufferTaskType::CommitDiff(_) => "buffer_commit_diff", - } - } -} diff --git a/magicblock-committor-service/src/tasks/commit_task.rs b/magicblock-committor-service/src/tasks/commit_task.rs new file mode 100644 index 000000000..4548c0dcc --- /dev/null +++ b/magicblock-committor-service/src/tasks/commit_task.rs @@ -0,0 +1,305 @@ +use dlp::{ + args::{CommitDiffArgs, CommitStateArgs, CommitStateFromBufferArgs}, + compute_diff, +}; +use magicblock_program::magic_scheduled_base_intent::CommittedAccount; +use solana_account::{Account, ReadableAccount}; +use solana_instruction::Instruction; +use solana_pubkey::Pubkey; + +use super::{BufferLifecycle, DataDelivery, DeliveryStrategy, TaskInstruction}; + +#[derive(Debug, Clone)] +pub enum DataDeliveryStrategy { + StateInArgs, + StateInBuffer { + lifecycle: BufferLifecycle, + }, + DiffInArgs { + base_account: Account, + }, + DiffInBuffer { + base_account: Account, + lifecycle: BufferLifecycle, + }, +} + +// CommitTask owns both "what to commit" (committed_account) and "how to commit" (strategy). +#[derive(Debug, Clone)] +pub struct CommitTask { + pub commit_id: u64, + pub allow_undelegation: bool, + pub committed_account: CommittedAccount, + pub delivery: DataDeliveryStrategy, +} + +impl TaskInstruction for CommitTask { + fn instruction(&self, validator: &Pubkey) -> Instruction { + match &self.delivery { + DataDeliveryStrategy::StateInArgs => { + self.create_commit_state_ix(validator) + } + DataDeliveryStrategy::StateInBuffer { lifecycle: _ } => { + self.create_commit_state_from_buffer_ix(validator) + } + DataDeliveryStrategy::DiffInArgs { base_account } => { + self.create_commit_diff_ix(validator, base_account) + } + DataDeliveryStrategy::DiffInBuffer { + base_account: _, + lifecycle: _, + } => self.create_commit_diff_from_buffer_ix(validator), + } + } +} + +impl CommitTask { + // Accounts larger than COMMIT_STATE_SIZE_THRESHOLD, use CommitDiff to + // reduce instruction size. Below this, commit is sent as CommitState. + // Chose 256 as thresold seems good enough as it could hold 8 u32 fields + // or 4 u64 fields! + pub const COMMIT_STATE_SIZE_THRESHOLD: usize = 256; + + // Max Solana transaction size (binary, not base64). + // See: runtime transaction packet limit (~1232 bytes). + pub const MAX_TX_SIZE: usize = 1232; + + pub fn new( + commit_id: u64, + allow_undelegation: bool, + committed_account: CommittedAccount, + base_account: Option, + ) -> CommitTask { + let base_account = if committed_account.account.data.len() + > Self::COMMIT_STATE_SIZE_THRESHOLD + { + base_account + } else { + None + }; + + let delivery = match base_account { + Some(base_account) => { + let diff = compute_diff( + &base_account.data, + &committed_account.account.data, + ) + .len(); + + if diff < Self::MAX_TX_SIZE { + DataDeliveryStrategy::DiffInArgs { base_account } + } else { + let lifecycle = BufferLifecycle::new( + commit_id, + &committed_account, + Some(&base_account), + ); + DataDeliveryStrategy::DiffInBuffer { + base_account, + lifecycle, + } + } + } + None => DataDeliveryStrategy::StateInArgs, + }; + + CommitTask { + commit_id, + allow_undelegation, + committed_account, + delivery, + } + } + + pub fn lifecycle(&self) -> Option<&BufferLifecycle> { + match &self.delivery { + DataDeliveryStrategy::StateInArgs => None, + DataDeliveryStrategy::StateInBuffer { lifecycle } => { + Some(lifecycle) + } + DataDeliveryStrategy::DiffInArgs { base_account: _ } => None, + DataDeliveryStrategy::DiffInBuffer { + lifecycle, + base_account: _, + } => Some(lifecycle), + } + } + + pub fn task_strategy(&self) -> DeliveryStrategy { + match &self.delivery { + DataDeliveryStrategy::StateInArgs => DeliveryStrategy::Args, + DataDeliveryStrategy::StateInBuffer { .. } => { + DeliveryStrategy::Buffer + } + DataDeliveryStrategy::DiffInArgs { base_account: _ } => { + DeliveryStrategy::Args + } + DataDeliveryStrategy::DiffInBuffer { .. } => { + DeliveryStrategy::Buffer + } + } + } + + pub fn data_delivery(&self) -> DataDelivery { + match &self.delivery { + DataDeliveryStrategy::StateInArgs => DataDelivery::StateInArgs, + DataDeliveryStrategy::StateInBuffer { .. } => { + DataDelivery::StateInBuffer + } + DataDeliveryStrategy::DiffInArgs { base_account: _ } => { + DataDelivery::DiffInArgs + } + DataDeliveryStrategy::DiffInBuffer { .. } => { + DataDelivery::DiffInBuffer + } + } + } + + pub fn reset_commit_id(&mut self, commit_id: u64) { + if self.commit_id == commit_id { + return; + } + + self.commit_id = commit_id; + let lifecycle = match &mut self.delivery { + DataDeliveryStrategy::StateInArgs => None, + DataDeliveryStrategy::StateInBuffer { lifecycle } => { + Some(lifecycle) + } + DataDeliveryStrategy::DiffInArgs { base_account: _ } => None, + DataDeliveryStrategy::DiffInBuffer { + base_account: _, + lifecycle, + } => Some(lifecycle), + }; + + if let Some(lifecycle) = lifecycle { + lifecycle.preparation.commit_id = commit_id; + lifecycle.cleanup.commit_id = commit_id; + } + } + + fn create_commit_state_ix(&self, validator: &Pubkey) -> Instruction { + let args = CommitStateArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + data: self.committed_account.account.data.clone(), + allow_undelegation: self.allow_undelegation, + }; + dlp::instruction_builder::commit_state( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + args, + ) + } + + fn create_commit_diff_ix( + &self, + validator: &Pubkey, + base_account: &Account, + ) -> Instruction { + let args = CommitDiffArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + diff: compute_diff( + base_account.data(), + self.committed_account.account.data(), + ) + .to_vec(), + allow_undelegation: self.allow_undelegation, + }; + + dlp::instruction_builder::commit_diff( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + args, + ) + } + + fn create_commit_state_from_buffer_ix( + &self, + validator: &Pubkey, + ) -> Instruction { + let commit_id_slice = self.commit_id.to_le_bytes(); + let (commit_buffer_pubkey, _) = + magicblock_committor_program::pdas::buffer_pda( + validator, + &self.committed_account.pubkey, + &commit_id_slice, + ); + + dlp::instruction_builder::commit_state_from_buffer( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + commit_buffer_pubkey, + CommitStateFromBufferArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + allow_undelegation: self.allow_undelegation, + }, + ) + } + + fn create_commit_diff_from_buffer_ix( + &self, + validator: &Pubkey, + ) -> Instruction { + let commit_id_slice = self.commit_id.to_le_bytes(); + let (commit_buffer_pubkey, _) = + magicblock_committor_program::pdas::buffer_pda( + validator, + &self.committed_account.pubkey, + &commit_id_slice, + ); + + dlp::instruction_builder::commit_diff_from_buffer( + *validator, + self.committed_account.pubkey, + self.committed_account.account.owner, + commit_buffer_pubkey, + CommitStateFromBufferArgs { + nonce: self.commit_id, + lamports: self.committed_account.account.lamports, + allow_undelegation: self.allow_undelegation, + }, + ) + } + + /// + /// In order to reduce the transition size, this function + /// flips *_InArgs to *_InBuffer and attach a LifecycleTask. + /// + #[allow(clippy::result_large_err)] + pub fn try_optimize_tx_size(mut self) -> Result { + // The only way to optimize for tx size is to use buffer strategy. + // If the task is already using buffer strategy, then it cannot optimize further. + match self.delivery { + DataDeliveryStrategy::StateInArgs => { + self.delivery = DataDeliveryStrategy::StateInBuffer { + lifecycle: BufferLifecycle::new( + self.commit_id, + &self.committed_account, + None, + ), + }; + Ok(self) + } + DataDeliveryStrategy::StateInBuffer { .. } => Err(self), + DataDeliveryStrategy::DiffInArgs { base_account } => { + self.delivery = DataDeliveryStrategy::DiffInBuffer { + lifecycle: BufferLifecycle::new( + self.commit_id, + &self.committed_account, + Some(&base_account), + ), + base_account, + }; + Ok(self) + } + DataDeliveryStrategy::DiffInBuffer { .. } => Err(self), + } + } +} diff --git a/magicblock-committor-service/src/tasks/mod.rs b/magicblock-committor-service/src/tasks/mod.rs index d8b4a178e..68599bd4a 100644 --- a/magicblock-committor-service/src/tasks/mod.rs +++ b/magicblock-committor-service/src/tasks/mod.rs @@ -1,35 +1,35 @@ -use dyn_clone::DynClone; -use magicblock_committor_program::{ - instruction_builder::{ - close_buffer::{create_close_ix, CreateCloseIxArgs}, - init_buffer::{create_init_ix, CreateInitIxArgs}, - realloc_buffer::{ - create_realloc_buffer_ixs, CreateReallocBufferIxArgs, - }, - write_buffer::{create_write_ix, CreateWriteIxArgs}, - }, - pdas, ChangesetChunks, Chunks, -}; -use magicblock_metrics::metrics::LabelValue; -use magicblock_program::magic_scheduled_base_intent::{ - BaseAction, CommittedAccount, -}; -use solana_account::Account; -use solana_instruction::Instruction; +use std::fmt::Debug; + +use dlp::args::CallHandlerArgs; +use magicblock_program::magic_scheduled_base_intent::BaseAction; +use solana_instruction::{AccountMeta, Instruction}; use solana_pubkey::Pubkey; use thiserror::Error; -use crate::tasks::visitor::Visitor; - -pub mod args_task; -pub mod buffer_task; pub mod task_builder; pub mod task_strategist; pub(crate) mod task_visitors; pub mod utils; pub mod visitor; -pub use task_builder::TaskBuilderImpl; +mod buffer_lifecycle; +mod commit_task; +mod task; + +pub use buffer_lifecycle::*; +pub use commit_task::*; +pub use task::*; +// +// TODO (snawaz): Ideally, TaskType should not exist. +// Instead we should have Task, an enum with all its variants. +// +// Also, instead of TaskStrategy, we can have requires_buffer() -> bool? +// + +/// The only requirement for a type to become a task is to implement TaskInstruction. +pub trait TaskInstruction { + fn instruction(&self, validator: &Pubkey) -> Instruction; +} #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum TaskType { @@ -42,278 +42,101 @@ pub enum TaskType { #[derive(Clone, Debug)] pub enum PreparationState { NotNeeded, - Required(PreparationTask), - Cleanup(CleanupTask), + Required(CreateBufferTask), + Cleanup(DestroyTask), } #[derive(Copy, Clone, PartialEq, Eq, Debug)] -pub enum TaskStrategy { +pub enum DeliveryStrategy { Args, Buffer, } -/// A trait representing a task that can be executed on Base layer -pub trait BaseTask: Send + Sync + DynClone + LabelValue { - /// Gets all pubkeys that involved in Task's instruction - fn involved_accounts(&self, validator: &Pubkey) -> Vec { - self.instruction(validator) - .accounts - .iter() - .map(|meta| meta.pubkey) - .collect() - } - - /// Gets instruction for task execution - fn instruction(&self, validator: &Pubkey) -> Instruction; - - /// Optimize for transaction size so that more instructions can be buddled together in a single - /// transaction. Return Ok(new_tx_optimized_task), else Err(self) if task cannot be optimized. - fn try_optimize_tx_size( - self: Box, - ) -> Result, Box>; - - /// Returns [`PreparationTask`] if task needs to be prepared before executing, - /// otherwise returns None - fn preparation_state(&self) -> &PreparationState; - - /// Switched [`PreparationTask`] to a new one - fn switch_preparation_state( - &mut self, - new_state: PreparationState, - ) -> BaseTaskResult<()>; - - /// Returns [`Task`] budget - fn compute_units(&self) -> u32; - - /// Returns current [`TaskStrategy`] - #[cfg(test)] - fn strategy(&self) -> TaskStrategy; - - /// Returns [`TaskType`] - fn task_type(&self) -> TaskType; - - /// Calls [`Visitor`] with specific task type - fn visit(&self, visitor: &mut dyn Visitor); - - /// Resets commit id - fn reset_commit_id(&mut self, commit_id: u64); -} - -dyn_clone::clone_trait_object!(BaseTask); - -#[derive(Clone)] -pub struct CommitTask { - pub commit_id: u64, - pub allow_undelegation: bool, - pub committed_account: CommittedAccount, -} - -#[derive(Clone, Debug)] -pub struct CommitDiffTask { - pub commit_id: u64, - pub allow_undelegation: bool, - pub committed_account: CommittedAccount, - pub base_account: Account, +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +pub enum DataDelivery { + StateInArgs, + StateInBuffer, + DiffInArgs, + DiffInBuffer, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct UndelegateTask { pub delegated_account: Pubkey, pub owner_program: Pubkey, pub rent_reimbursement: Pubkey, } -#[derive(Clone)] -pub struct FinalizeTask { - pub delegated_account: Pubkey, -} - -#[derive(Clone)] -pub struct BaseActionTask { - pub action: BaseAction, -} - -#[derive(Clone, Debug)] -pub struct PreparationTask { - pub commit_id: u64, - pub pubkey: Pubkey, - pub chunks: Chunks, - - // TODO(edwin): replace with reference once done - pub committed_data: Vec, -} - -impl PreparationTask { - /// Returns initialization [`Instruction`] - pub fn init_instruction(&self, authority: &Pubkey) -> Instruction { - // // SAFETY: as object_length internally uses only already allocated or static buffers, - // // and we don't use any fs writers, so the only error that may occur here is of kind - // // OutOfMemory or WriteZero. This is impossible due to: - // // Chunks::new panics if its size exceeds MAX_ACCOUNT_ALLOC_PER_INSTRUCTION_SIZE or 10_240 - // // https://github.com/near/borsh-rs/blob/f1b75a6b50740bfb6231b7d0b1bd93ea58ca5452/borsh/src/ser/helpers.rs#L59 - let chunks_account_size = - borsh::object_length(&self.chunks).unwrap() as u64; - let buffer_account_size = self.committed_data.len() as u64; - - let (instruction, _, _) = create_init_ix(CreateInitIxArgs { - authority: *authority, - pubkey: self.pubkey, - chunks_account_size, - buffer_account_size, - commit_id: self.commit_id, - chunk_count: self.chunks.count(), - chunk_size: self.chunks.chunk_size(), - }); - - instruction - } - - /// Returns compute units required for realloc instruction - pub fn init_compute_units(&self) -> u32 { - 12_000 - } - - /// Returns realloc instruction required for Buffer preparation - #[allow(clippy::let_and_return)] - pub fn realloc_instructions(&self, authority: &Pubkey) -> Vec { - let buffer_account_size = self.committed_data.len() as u64; - let realloc_instructions = - create_realloc_buffer_ixs(CreateReallocBufferIxArgs { - authority: *authority, - pubkey: self.pubkey, - buffer_account_size, - commit_id: self.commit_id, - }); - - realloc_instructions - } - - /// Returns compute units required for realloc instruction - pub fn realloc_compute_units(&self) -> u32 { - 6_000 - } - - /// Returns realloc instruction required for Buffer preparation - #[allow(clippy::let_and_return)] - pub fn write_instructions(&self, authority: &Pubkey) -> Vec { - let chunks_iter = - ChangesetChunks::new(&self.chunks, self.chunks.chunk_size()) - .iter(&self.committed_data); - let write_instructions = chunks_iter - .map(|chunk| { - create_write_ix(CreateWriteIxArgs { - authority: *authority, - pubkey: self.pubkey, - offset: chunk.offset, - data_chunk: chunk.data_chunk, - commit_id: self.commit_id, - }) - }) - .collect::>(); - - write_instructions - } - - pub fn write_compute_units(&self, bytes_count: usize) -> u32 { - const PER_BYTE: u32 = 3; - - u32::try_from(bytes_count) - .ok() - .and_then(|bytes_count| bytes_count.checked_mul(PER_BYTE)) - .unwrap_or(u32::MAX) - } - - pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey { - pdas::chunks_pda( - authority, - &self.pubkey, - self.commit_id.to_le_bytes().as_slice(), +impl TaskInstruction for UndelegateTask { + fn instruction(&self, validator: &Pubkey) -> Instruction { + dlp::instruction_builder::undelegate( + *validator, + self.delegated_account, + self.owner_program, + self.rent_reimbursement, ) - .0 - } - - pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey { - pdas::buffer_pda( - authority, - &self.pubkey, - self.commit_id.to_le_bytes().as_slice(), - ) - .0 - } - - pub fn cleanup_task(&self) -> CleanupTask { - CleanupTask { - pubkey: self.pubkey, - commit_id: self.commit_id, - } } } -#[derive(Clone, Debug)] -pub struct CleanupTask { - pub pubkey: Pubkey, - pub commit_id: u64, +#[derive(Debug, Clone)] +pub struct FinalizeTask { + pub delegated_account: Pubkey, } -impl CleanupTask { - pub fn instruction(&self, authority: &Pubkey) -> Instruction { - create_close_ix(CreateCloseIxArgs { - authority: *authority, - pubkey: self.pubkey, - commit_id: self.commit_id, - }) - } - - /// Returns compute units required to execute [`CleanupTask`] - pub fn compute_units(&self) -> u32 { - 30_000 - } - - /// Returns a number of [`CleanupTask`]s that is possible to fit in single - pub const fn max_tx_fit_count_with_budget() -> usize { - 8 +impl TaskInstruction for FinalizeTask { + fn instruction(&self, validator: &Pubkey) -> Instruction { + dlp::instruction_builder::finalize(*validator, self.delegated_account) } +} - pub fn chunks_pda(&self, authority: &Pubkey) -> Pubkey { - pdas::chunks_pda( - authority, - &self.pubkey, - self.commit_id.to_le_bytes().as_slice(), - ) - .0 - } +#[derive(Debug, Clone)] +pub struct BaseActionTask { + pub action: BaseAction, +} - pub fn buffer_pda(&self, authority: &Pubkey) -> Pubkey { - pdas::buffer_pda( - authority, - &self.pubkey, - self.commit_id.to_le_bytes().as_slice(), +impl TaskInstruction for BaseActionTask { + fn instruction(&self, validator: &Pubkey) -> Instruction { + let action = &self.action; + let account_metas = action + .account_metas_per_program + .iter() + .map(|short_meta| AccountMeta { + pubkey: short_meta.pubkey, + is_writable: short_meta.is_writable, + is_signer: false, + }) + .collect(); + dlp::instruction_builder::call_handler( + *validator, + action.destination_program, + action.escrow_authority, + account_metas, + CallHandlerArgs { + data: action.data_per_program.data.clone(), + escrow_index: action.data_per_program.escrow_index, + }, ) - .0 } } #[derive(Error, Debug)] -pub enum BaseTaskError { +pub enum TaskError { #[error("Invalid preparation state transition")] PreparationStateTransitionError, } -pub type BaseTaskResult = Result; +pub type TaskResult = Result; #[cfg(test)] mod serialization_safety_test { use magicblock_program::{ - args::ShortAccountMeta, magic_scheduled_base_intent::ProgramArgs, + args::ShortAccountMeta, + magic_scheduled_base_intent::{CommittedAccount, ProgramArgs}, }; use solana_account::Account; - use crate::tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, - *, - }; + use crate::tasks::{Task, *}; // Test all ArgsTask variants #[test] @@ -321,10 +144,10 @@ mod serialization_safety_test { let validator = Pubkey::new_unique(); // Test Commit variant - let commit_task: ArgsTask = ArgsTaskType::Commit(CommitTask { - commit_id: 123, - allow_undelegation: true, - committed_account: CommittedAccount { + let commit_task = Task::Commit(CommitTask::new( + 123, + true, + CommittedAccount { pubkey: Pubkey::new_unique(), account: Account { lamports: 1000, @@ -334,29 +157,26 @@ mod serialization_safety_test { rent_epoch: 0, }, }, - }) - .into(); + None, + )); assert_serializable(&commit_task.instruction(&validator)); // Test Finalize variant - let finalize_task = - ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { - delegated_account: Pubkey::new_unique(), - })); + let finalize_task = Task::Finalize(FinalizeTask { + delegated_account: Pubkey::new_unique(), + }); assert_serializable(&finalize_task.instruction(&validator)); // Test Undelegate variant - let undelegate_task: ArgsTask = - ArgsTaskType::Undelegate(UndelegateTask { - delegated_account: Pubkey::new_unique(), - owner_program: Pubkey::new_unique(), - rent_reimbursement: Pubkey::new_unique(), - }) - .into(); + let undelegate_task = Task::Undelegate(UndelegateTask { + delegated_account: Pubkey::new_unique(), + owner_program: Pubkey::new_unique(), + rent_reimbursement: Pubkey::new_unique(), + }); assert_serializable(&undelegate_task.instruction(&validator)); // Test BaseAction variant - let base_action: ArgsTask = ArgsTaskType::BaseAction(BaseActionTask { + let base_action = Task::BaseAction(BaseActionTask { action: BaseAction { destination_program: Pubkey::new_unique(), escrow_authority: Pubkey::new_unique(), @@ -370,8 +190,7 @@ mod serialization_safety_test { }, compute_units: 10_000, }, - }) - .into(); + }); assert_serializable(&base_action.instruction(&validator)); } @@ -380,23 +199,22 @@ mod serialization_safety_test { fn test_buffer_task_instruction_serialization() { let validator = Pubkey::new_unique(); - let buffer_task = BufferTask::new_preparation_required( - BufferTaskType::Commit(CommitTask { - commit_id: 456, - allow_undelegation: false, - committed_account: CommittedAccount { - pubkey: Pubkey::new_unique(), - account: Account { - lamports: 2000, - data: vec![7, 8, 9], - owner: Pubkey::new_unique(), - executable: false, - rent_epoch: 0, - }, + let task = Task::Commit(CommitTask::new( + 456, + false, + CommittedAccount { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: 2000, + data: vec![7, 8, 9], + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: 0, }, - }), - ); - assert_serializable(&buffer_task.instruction(&validator)); + }, + None, + )); + assert_serializable(&task.instruction(&validator)); } // Test preparation instructions @@ -404,30 +222,33 @@ mod serialization_safety_test { fn test_preparation_instructions_serialization() { let authority = Pubkey::new_unique(); - // Test BufferTask preparation - let buffer_task = BufferTask::new_preparation_required( - BufferTaskType::Commit(CommitTask { - commit_id: 789, - allow_undelegation: true, - committed_account: CommittedAccount { - pubkey: Pubkey::new_unique(), - account: Account { - lamports: 3000, - data: vec![0; 1024], // Larger data to test chunking - owner: Pubkey::new_unique(), - executable: false, - rent_epoch: 0, - }, + // Test buffer strategy preparation + let task = Task::Commit(CommitTask::new( + 789, + true, + CommittedAccount { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: 3000, + data: vec![0; 1024], // Larger data to test chunking + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: 0, }, - }), - ); + }, + None, + )); - let PreparationState::Required(preparation_task) = - buffer_task.preparation_state() - else { - panic!("invalid preparation state on creation!"); - }; - assert_serializable(&preparation_task.init_instruction(&authority)); + assert_eq!(task.strategy(), DeliveryStrategy::Args); + + let task = task.try_optimize_tx_size().unwrap(); + + assert_eq!(task.strategy(), DeliveryStrategy::Buffer); + + let lifecycle = task.lifecycle().unwrap(); + let preparation_task = &lifecycle.preparation; + + assert_serializable(&preparation_task.instruction(&authority)); for ix in preparation_task.realloc_instructions(&authority) { assert_serializable(&ix); } @@ -465,8 +286,8 @@ fn test_close_buffer_limit() { // Each task unique: commit_id increments; pubkey is new_unique each time let base_commit_id = 101u64; - let ixs_iter = (0..CleanupTask::max_tx_fit_count_with_budget()).map(|i| { - let task = CleanupTask { + let ixs_iter = (0..DestroyTask::max_tx_fit_count_with_budget()).map(|i| { + let task = DestroyTask { commit_id: base_commit_id + i as u64, pubkey: Pubkey::new_unique(), }; @@ -485,9 +306,9 @@ fn test_close_buffer_limit() { ); // One more unique task should overflow - let overflow_task = CleanupTask { + let overflow_task = DestroyTask { commit_id: base_commit_id - + CleanupTask::max_tx_fit_count_with_budget() as u64, + + DestroyTask::max_tx_fit_count_with_budget() as u64, pubkey: Pubkey::new_unique(), }; ixs.push(overflow_task.instruction(&authority.pubkey())); diff --git a/magicblock-committor-service/src/tasks/task.rs b/magicblock-committor-service/src/tasks/task.rs new file mode 100644 index 000000000..ada7a8ab8 --- /dev/null +++ b/magicblock-committor-service/src/tasks/task.rs @@ -0,0 +1,138 @@ +use magicblock_metrics::metrics::LabelValue; +use solana_instruction::Instruction; +use solana_pubkey::Pubkey; + +use super::{BufferLifecycle, TaskInstruction, TaskResult}; +use crate::tasks::{ + visitor::Visitor, BaseActionTask, CommitTask, DeliveryStrategy, + FinalizeTask, PreparationState, TaskError, TaskType, UndelegateTask, +}; + +/// Task to be executed on the Base layer. +#[derive(Debug, Clone)] +#[allow(clippy::large_enum_variant)] +pub enum Task { + Commit(CommitTask), + Finalize(FinalizeTask), + Undelegate(UndelegateTask), // Special action really + BaseAction(BaseActionTask), +} + +impl TaskInstruction for Task { + fn instruction(&self, validator: &Pubkey) -> Instruction { + match &self { + Task::Commit(value) => value.instruction(validator), + Task::Finalize(value) => value.instruction(validator), + Task::Undelegate(value) => value.instruction(validator), + Task::BaseAction(value) => value.instruction(validator), + } + } +} +impl Task { + pub fn involved_accounts(&self, validator: &Pubkey) -> Vec { + // TODO (snawaz): rewrite it. + // currently it is slow as it discards heavy computations and memory allocations. + self.instruction(validator) + .accounts + .iter() + .map(|meta| meta.pubkey) + .collect() + } + + #[allow(clippy::result_large_err)] + pub fn try_optimize_tx_size(self) -> Result { + match self { + Task::Commit(value) => value + .try_optimize_tx_size() + .map(Task::Commit) + .map_err(Task::Commit), + Task::BaseAction(_) | Task::Finalize(_) | Task::Undelegate(_) => { + Err(self) + } + } + } + + /// Nothing to prepare for [`ArgsTaskType`] type + pub fn preparation_state(&self) -> &PreparationState { + todo!() + } + + pub fn switch_preparation_state( + &mut self, + new_state: PreparationState, + ) -> TaskResult<()> { + if !matches!(new_state, PreparationState::NotNeeded) { + Err(TaskError::PreparationStateTransitionError) + } else { + // Do nothing + Ok(()) + } + } + + pub fn lifecycle(&self) -> Option<&BufferLifecycle> { + match &self { + Task::Commit(commit) => commit.lifecycle(), + Task::BaseAction(_) => None, + Task::Undelegate(_) => None, + Task::Finalize(_) => None, + } + } + + pub fn compute_units(&self) -> u32 { + match &self { + Task::Commit(_) => 70_000, + Task::BaseAction(task) => task.action.compute_units, + Task::Undelegate(_) => 70_000, + Task::Finalize(_) => 70_000, + } + } + + pub fn strategy(&self) -> DeliveryStrategy { + match &self { + Task::Commit(commit) => commit.task_strategy(), + Task::BaseAction(_) => DeliveryStrategy::Args, + Task::Undelegate(_) => DeliveryStrategy::Args, + Task::Finalize(_) => DeliveryStrategy::Args, + } + } + + pub fn task_type(&self) -> TaskType { + match &self { + Task::Commit(_) => TaskType::Commit, + Task::BaseAction(_) => TaskType::Action, + Task::Undelegate(_) => TaskType::Undelegate, + Task::Finalize(_) => TaskType::Finalize, + } + } + + /// For tasks using Args strategy call corresponding `Visitor` method + pub fn visit(&self, visitor: &mut dyn Visitor) { + visitor.visit_task(self); + } + + pub fn reset_commit_id(&mut self, commit_id: u64) { + let Task::Commit(commit_task) = self else { + return; + }; + commit_task.reset_commit_id(commit_id); + } +} + +impl LabelValue for Task { + fn value(&self) -> &str { + use Task::*; + + use super::DataDeliveryStrategy::*; + match self { + Commit(commit) => match commit.delivery { + StateInArgs => "state_args_commit", + StateInBuffer { .. } => "state_buffer_commit", + DiffInArgs { .. } => "diff_args_commit", + DiffInBuffer { .. } => "diff_buffer_commit", + }, + Finalize(_) => "finalize", + Undelegate(_) => "undelegate", + BaseAction(_) => "base_action", + } + } +} diff --git a/magicblock-committor-service/src/tasks/task_builder.rs b/magicblock-committor-service/src/tasks/task_builder.rs index 7a9787fe2..20eb950d2 100644 --- a/magicblock-committor-service/src/tasks/task_builder.rs +++ b/magicblock-committor-service/src/tasks/task_builder.rs @@ -6,99 +6,52 @@ use magicblock_program::magic_scheduled_base_intent::{ CommitType, CommittedAccount, MagicBaseIntent, ScheduledBaseIntent, UndelegateType, }; -use solana_account::Account; use solana_pubkey::Pubkey; use solana_signature::Signature; -use super::{CommitDiffTask, CommitTask}; +use super::CommitTask; use crate::{ intent_executor::task_info_fetcher::{ TaskInfoFetcher, TaskInfoFetcherError, }, persist::IntentPersister, - tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - BaseActionTask, BaseTask, FinalizeTask, UndelegateTask, - }, + tasks::{BaseActionTask, FinalizeTask, Task, UndelegateTask}, }; #[async_trait] pub trait TasksBuilder { // Creates tasks for commit stage - async fn commit_tasks( + async fn create_commit_tasks( commit_id_fetcher: &Arc, base_intent: &ScheduledBaseIntent, persister: &Option

, - ) -> TaskBuilderResult>>; + ) -> TaskBuilderResult>; // Create tasks for finalize stage async fn finalize_tasks( info_fetcher: &Arc, base_intent: &ScheduledBaseIntent, - ) -> TaskBuilderResult>>; + ) -> TaskBuilderResult>; } /// V1 Task builder /// V1: Actions are part of finalize tx pub struct TaskBuilderImpl; -// Accounts larger than COMMIT_STATE_SIZE_THRESHOLD use CommitDiff to -// reduce instruction size. Below this threshold, the commit is sent -// as CommitState. The value (256) is chosen because it is sufficient -// for small accounts, which typically could hold up to 8 u32 fields or -// 4 u64 fields. These integers are expected to be on the hot path -// and updated continuously. -pub const COMMIT_STATE_SIZE_THRESHOLD: usize = 256; - -impl TaskBuilderImpl { - pub fn create_commit_task( - commit_id: u64, - allow_undelegation: bool, - account: CommittedAccount, - base_account: Option, - ) -> ArgsTask { - let base_account = - if account.account.data.len() > COMMIT_STATE_SIZE_THRESHOLD { - base_account - } else { - None - }; - - if let Some(base_account) = base_account { - ArgsTaskType::CommitDiff(CommitDiffTask { - commit_id, - allow_undelegation, - committed_account: account, - base_account, - }) - } else { - ArgsTaskType::Commit(CommitTask { - commit_id, - allow_undelegation, - committed_account: account, - }) - } - .into() - } -} - #[async_trait] impl TasksBuilder for TaskBuilderImpl { /// Returns [`Task`]s for Commit stage - async fn commit_tasks( - commit_id_fetcher: &Arc, + async fn create_commit_tasks( + task_info_fetcher: &Arc, base_intent: &ScheduledBaseIntent, persister: &Option

, - ) -> TaskBuilderResult>> { + ) -> TaskBuilderResult> { let (accounts, allow_undelegation) = match &base_intent.base_intent { MagicBaseIntent::BaseActions(actions) => { let tasks = actions .iter() .map(|el| { - let task = BaseActionTask { action: el.clone() }; - let task = - ArgsTask::new(ArgsTaskType::BaseAction(task)); - Box::new(task) as Box + Task::BaseAction(BaseActionTask { action: el.clone() }) }) .collect(); @@ -119,14 +72,15 @@ impl TasksBuilder for TaskBuilderImpl { let diffable_pubkeys = accounts .iter() .filter(|account| { - account.account.data.len() > COMMIT_STATE_SIZE_THRESHOLD + account.account.data.len() + > CommitTask::COMMIT_STATE_SIZE_THRESHOLD }) .map(|account| account.pubkey) .collect::>(); tokio::join!( - commit_id_fetcher.fetch_next_commit_ids(&committed_pubkeys), - commit_id_fetcher + task_info_fetcher.fetch_next_commit_ids(&committed_pubkeys), + task_info_fetcher .get_base_accounts(diffable_pubkeys.as_slice()) ) }; @@ -155,12 +109,14 @@ impl TasksBuilder for TaskBuilderImpl { .iter() .map(|account| { let commit_id = *commit_ids.get(&account.pubkey).expect("CommitIdFetcher provide commit ids for all listed pubkeys, or errors!"); - // TODO (snawaz): if accounts do not have duplicate, then we can use remove - // instead: - // let base_account = base_accounts.remove(&account.pubkey); - let base_account = base_accounts.get(&account.pubkey).cloned(); - let task = Self::create_commit_task(commit_id, allow_undelegation, account.clone(), base_account); - Box::new(task) as Box + let base_account = base_accounts.get(&account.pubkey).cloned(); + + Task::Commit(CommitTask::new( + commit_id, + allow_undelegation, + account.clone(), + base_account, + )) }).collect(); Ok(tasks) @@ -170,30 +126,28 @@ impl TasksBuilder for TaskBuilderImpl { async fn finalize_tasks( info_fetcher: &Arc, base_intent: &ScheduledBaseIntent, - ) -> TaskBuilderResult>> { + ) -> TaskBuilderResult> { // Helper to create a finalize task - fn finalize_task(account: &CommittedAccount) -> Box { - let task_type = ArgsTaskType::Finalize(FinalizeTask { + fn finalize_task(account: &CommittedAccount) -> Task { + Task::Finalize(FinalizeTask { delegated_account: account.pubkey, - }); - Box::new(ArgsTask::new(task_type)) + }) } // Helper to create an undelegate task fn undelegate_task( account: &CommittedAccount, rent_reimbursement: &Pubkey, - ) -> Box { - let task_type = ArgsTaskType::Undelegate(UndelegateTask { + ) -> Task { + Task::Undelegate(UndelegateTask { delegated_account: account.pubkey, owner_program: account.account.owner, rent_reimbursement: *rent_reimbursement, - }); - Box::new(ArgsTask::new(task_type)) + }) } // Helper to process commit types - fn process_commit(commit: &CommitType) -> Vec> { + fn process_commit(commit: &CommitType) -> Vec { match commit { CommitType::Standalone(accounts) => { accounts.iter().map(finalize_task).collect() @@ -207,12 +161,9 @@ impl TasksBuilder for TaskBuilderImpl { .map(finalize_task) .collect::>(); tasks.extend(base_actions.iter().map(|action| { - let task = BaseActionTask { + Task::BaseAction(BaseActionTask { action: action.clone(), - }; - let task = - ArgsTask::new(ArgsTaskType::BaseAction(task)); - Box::new(task) as Box + }) })); tasks } @@ -246,12 +197,9 @@ impl TasksBuilder for TaskBuilderImpl { UndelegateType::Standalone => Ok(tasks), UndelegateType::WithBaseActions(actions) => { tasks.extend(actions.iter().map(|action| { - let task = BaseActionTask { + Task::BaseAction(BaseActionTask { action: action.clone(), - }; - let task = - ArgsTask::new(ArgsTaskType::BaseAction(task)); - Box::new(task) as Box + }) })); Ok(tasks) diff --git a/magicblock-committor-service/src/tasks/task_strategist.rs b/magicblock-committor-service/src/tasks/task_strategist.rs index 33bbd29e4..297738f1c 100644 --- a/magicblock-committor-service/src/tasks/task_strategist.rs +++ b/magicblock-committor-service/src/tasks/task_strategist.rs @@ -4,21 +4,21 @@ use solana_keypair::Keypair; use solana_pubkey::Pubkey; use solana_signer::{Signer, SignerError}; +use super::TaskInstruction; use crate::{ persist::IntentPersister, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, task_visitors::persistor_visitor::{ PersistorContext, PersistorVisitor, }, utils::TransactionUtils, - BaseTask, FinalizeTask, + FinalizeTask, Task, }, transactions::{serialize_and_encode_base64, MAX_ENCODED_TRANSACTION_SIZE}, }; pub struct TransactionStrategy { - pub optimized_tasks: Vec>, + pub optimized_tasks: Vec, pub lookup_tables_keys: Vec, } @@ -65,16 +65,16 @@ impl StrategyExecutionMode { } } -/// Takes [`BaseTask`]s and chooses the best way to fit them in TX +/// Takes [`Task`]s and chooses the best way to fit them in TX /// It may change Task execution strategy so all task would fit in tx pub struct TaskStrategist; impl TaskStrategist { - /// Builds execution strategy from [`BaseTask`]s + /// Builds execution strategy from [`Task`]s /// 1. Optimizes tasks to fit in TX /// 2. Chooses the fastest execution mode for Tasks pub fn build_execution_strategy( - commit_tasks: Vec>, - finalize_tasks: Vec>, + commit_tasks: Vec, + finalize_tasks: Vec, authority: &Pubkey, persister: &Option

, ) -> TaskStrategistResult { @@ -141,8 +141,8 @@ impl TaskStrategist { } fn build_two_stage( - commit_tasks: Vec>, - finalize_tasks: Vec>, + commit_tasks: Vec, + finalize_tasks: Vec, authority: &Pubkey, persister: &Option

, ) -> TaskStrategistResult { @@ -166,7 +166,7 @@ impl TaskStrategist { /// Returns [`TransactionStrategy`] for tasks /// Returns Error if all optimizations weren't enough pub fn build_strategy( - mut tasks: Vec>, + mut tasks: Vec, validator: &Pubkey, persistor: &Option

, ) -> TaskStrategistResult { @@ -221,7 +221,7 @@ impl TaskStrategist { /// Attempt to use ALTs for ALL keys in tx /// Returns `true` if ALTs make tx fit, otherwise `false` /// TODO(edwin): optimize to use only necessary amount of pubkeys - pub fn attempt_lookup_tables(tasks: &[Box]) -> bool { + pub fn attempt_lookup_tables(tasks: &[Task]) -> bool { let placeholder = Keypair::new(); // Gather all involved keys in tx let budgets = TransactionUtils::tasks_compute_units(tasks); @@ -256,7 +256,7 @@ impl TaskStrategist { pub fn collect_lookup_table_keys( authority: &Pubkey, - tasks: &[Box], + tasks: &[Task], ) -> Vec { let budgets = TransactionUtils::tasks_compute_units(tasks); let budget_instructions = @@ -274,10 +274,10 @@ impl TaskStrategist { /// Note that the returned size, though possibly optimized one, may still not be under /// the limit MAX_ENCODED_TRANSACTION_SIZE. The caller needs to check and make decision accordingly. fn try_optimize_tx_size_if_needed( - tasks: &mut [Box], + tasks: &mut [Task], ) -> Result { // Get initial transaction size - let calculate_tx_length = |tasks: &[Box]| { + let calculate_tx_length = |tasks: &[Task]| { match TransactionUtils::assemble_tasks_tx( &Keypair::new(), // placeholder tasks, @@ -321,11 +321,9 @@ impl TaskStrategist { let task = { // This is tmp task that will be replaced by old or optimized one - let tmp_task = - ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { - delegated_account: Pubkey::new_unique(), - })); - let tmp_task = Box::new(tmp_task) as Box; + let tmp_task = Task::Finalize(FinalizeTask { + delegated_account: Pubkey::new_unique(), + }); std::mem::replace(&mut tasks[index], tmp_task) }; match task.try_optimize_tx_size() { @@ -378,7 +376,6 @@ mod tests { }; use solana_account::Account; use solana_program::system_program; - use solana_pubkey::Pubkey; use solana_system_program::id as system_program_id; use super::*; @@ -389,10 +386,8 @@ mod tests { }, persist::IntentPersisterImpl, tasks::{ - task_builder::{ - TaskBuilderImpl, TasksBuilder, COMMIT_STATE_SIZE_THRESHOLD, - }, - BaseActionTask, TaskStrategy, UndelegateTask, + task_builder::{TaskBuilderImpl, TasksBuilder}, + BaseActionTask, CommitTask, DeliveryStrategy, Task, UndelegateTask, }, }; @@ -433,7 +428,7 @@ mod tests { commit_id: u64, data_size: usize, diff_len: usize, - ) -> ArgsTask { + ) -> Task { let committed_account = CommittedAccount { pubkey: Pubkey::new_unique(), account: Account { @@ -446,12 +441,12 @@ mod tests { }; if diff_len == 0 { - TaskBuilderImpl::create_commit_task( + Task::Commit(CommitTask::new( commit_id, false, committed_account, None, - ) + )) } else { let base_account = { let mut acc = committed_account.account.clone(); @@ -461,18 +456,18 @@ mod tests { } acc }; - TaskBuilderImpl::create_commit_task( + Task::Commit(CommitTask::new( commit_id, false, committed_account, Some(base_account), - ) + )) } } // Helper to create a Base action task - fn create_test_base_action_task(len: usize) -> ArgsTask { - ArgsTask::new(ArgsTaskType::BaseAction(BaseActionTask { + fn create_test_base_action_task(len: usize) -> Task { + Task::BaseAction(BaseActionTask { action: BaseAction { destination_program: Pubkey::new_unique(), escrow_authority: Pubkey::new_unique(), @@ -483,30 +478,29 @@ mod tests { }, compute_units: 30_000, }, - })) + }) } // Helper to create a finalize task - fn create_test_finalize_task() -> ArgsTask { - ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + fn create_test_finalize_task() -> Task { + Task::Finalize(FinalizeTask { delegated_account: Pubkey::new_unique(), - })) + }) } // Helper to create an undelegate task - fn create_test_undelegate_task() -> ArgsTask { - ArgsTask::new(ArgsTaskType::Undelegate(UndelegateTask { + fn create_test_undelegate_task() -> Task { + Task::Undelegate(UndelegateTask { delegated_account: Pubkey::new_unique(), owner_program: system_program_id(), rent_reimbursement: Pubkey::new_unique(), - })) + }) } #[test] fn test_build_strategy_with_single_small_task() { let validator = Pubkey::new_unique(); - let task = create_test_commit_task(1, 100, 0); - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![create_test_commit_task(1, 100, 0)]; let strategy = TaskStrategist::build_strategy( tasks, @@ -523,8 +517,7 @@ mod tests { fn test_build_strategy_optimizes_to_buffer_when_needed() { let validator = Pubkey::new_unique(); - let task = create_test_commit_task(1, 1000, 0); // Large task - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![create_test_commit_task(1, 1000, 0)]; // Large task let strategy = TaskStrategist::build_strategy( tasks, @@ -536,7 +529,7 @@ mod tests { assert_eq!(strategy.optimized_tasks.len(), 1); assert!(matches!( strategy.optimized_tasks[0].strategy(), - TaskStrategy::Buffer + DeliveryStrategy::Buffer )); } @@ -544,8 +537,7 @@ mod tests { fn test_build_strategy_optimizes_to_buffer_u16_exceeded() { let validator = Pubkey::new_unique(); - let task = create_test_commit_task(1, 66_000, 0); // Large task - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![create_test_commit_task(1, 66_000, 0)]; // Large task let strategy = TaskStrategist::build_strategy( tasks, @@ -557,7 +549,7 @@ mod tests { assert_eq!(strategy.optimized_tasks.len(), 1); assert!(matches!( strategy.optimized_tasks[0].strategy(), - TaskStrategy::Buffer + DeliveryStrategy::Buffer )); } @@ -565,9 +557,11 @@ mod tests { fn test_build_strategy_does_not_optimize_large_account_but_small_diff() { let validator = Pubkey::new_unique(); - let task = - create_test_commit_task(1, 66_000, COMMIT_STATE_SIZE_THRESHOLD); // large account but small diff - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![create_test_commit_task( + 1, + 66_000, + CommitTask::COMMIT_STATE_SIZE_THRESHOLD, + )]; // large account but small diff let strategy = TaskStrategist::build_strategy( tasks, @@ -577,7 +571,10 @@ mod tests { .expect("Should build strategy with buffer optimization"); assert_eq!(strategy.optimized_tasks.len(), 1); - assert_eq!(strategy.optimized_tasks[0].strategy(), TaskStrategy::Args); + assert_eq!( + strategy.optimized_tasks[0].strategy(), + DeliveryStrategy::Args + ); } #[test] @@ -585,9 +582,11 @@ mod tests { ) { let validator = Pubkey::new_unique(); - let task = - create_test_commit_task(1, 66_000, COMMIT_STATE_SIZE_THRESHOLD + 1); // large account but small diff - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![create_test_commit_task( + 1, + 66_000, + CommitTask::COMMIT_STATE_SIZE_THRESHOLD + 1, + )]; // large account but small diff let strategy = TaskStrategist::build_strategy( tasks, @@ -597,16 +596,21 @@ mod tests { .expect("Should build strategy with buffer optimization"); assert_eq!(strategy.optimized_tasks.len(), 1); - assert_eq!(strategy.optimized_tasks[0].strategy(), TaskStrategy::Args); + assert_eq!( + strategy.optimized_tasks[0].strategy(), + DeliveryStrategy::Args + ); } #[test] fn test_build_strategy_does_optimize_large_account_and_large_diff() { let validator = Pubkey::new_unique(); - let task = - create_test_commit_task(1, 66_000, COMMIT_STATE_SIZE_THRESHOLD * 4); // large account but small diff - let tasks = vec![Box::new(task) as Box]; + let tasks = vec![create_test_commit_task( + 1, + 66_000, + CommitTask::COMMIT_STATE_SIZE_THRESHOLD * 4, + )]; // large account but small diff let strategy = TaskStrategist::build_strategy( tasks, @@ -618,7 +622,7 @@ mod tests { assert_eq!(strategy.optimized_tasks.len(), 1); assert_eq!( strategy.optimized_tasks[0].strategy(), - TaskStrategy::Buffer + DeliveryStrategy::Buffer ); } @@ -631,8 +635,7 @@ mod tests { let tasks = (0..NUM_COMMITS) .map(|i| { - let task = create_test_commit_task(i, 500, 0); // Large task - Box::new(task) as Box + create_test_commit_task(i, 500, 0) // Large task }) .collect(); @@ -644,7 +647,10 @@ mod tests { .expect("Should build strategy with buffer optimization"); for optimized_task in strategy.optimized_tasks { - assert!(matches!(optimized_task.strategy(), TaskStrategy::Buffer)); + assert!(matches!( + optimized_task.strategy(), + DeliveryStrategy::Buffer + )); } assert!(strategy.lookup_tables_keys.is_empty()); } @@ -659,8 +665,7 @@ mod tests { let tasks = (0..NUM_COMMITS) .map(|i| { // Large task - let task = create_test_commit_task(i, 10000, 0); - Box::new(task) as Box + create_test_commit_task(i, 10000, 0) }) .collect(); @@ -672,7 +677,10 @@ mod tests { .expect("Should build strategy with buffer optimization"); for optimized_task in strategy.optimized_tasks { - assert!(matches!(optimized_task.strategy(), TaskStrategy::Buffer)); + assert!(matches!( + optimized_task.strategy(), + DeliveryStrategy::Buffer + )); } assert!(!strategy.lookup_tables_keys.is_empty()); } @@ -686,8 +694,7 @@ mod tests { let tasks = (0..NUM_COMMITS) .map(|i| { // Large task - let task = create_test_commit_task(i, 1000, 0); - Box::new(task) as Box + create_test_commit_task(i, 1000, 0) }) .collect(); @@ -702,25 +709,25 @@ mod tests { #[test] fn test_optimize_strategy_prioritizes_largest_tasks() { let mut tasks = [ - Box::new(create_test_commit_task(1, 100, 0)) as Box, - Box::new(create_test_commit_task(2, 1000, 0)) as Box, // Larger task - Box::new(create_test_commit_task(3, 1000, 0)) as Box, // Larger task + create_test_commit_task(1, 100, 0), + create_test_commit_task(2, 1000, 0), // Larger task + create_test_commit_task(3, 1000, 0), // Larger task ]; let _ = TaskStrategist::try_optimize_tx_size_if_needed(&mut tasks); // The larger task should have been optimized first - assert!(matches!(tasks[0].strategy(), TaskStrategy::Args)); - assert!(matches!(tasks[1].strategy(), TaskStrategy::Buffer)); + assert!(matches!(tasks[0].strategy(), DeliveryStrategy::Args)); + assert!(matches!(tasks[1].strategy(), DeliveryStrategy::Buffer)); } #[test] fn test_mixed_task_types_with_optimization() { let validator = Pubkey::new_unique(); let tasks = vec![ - Box::new(create_test_commit_task(1, 1000, 0)) as Box, - Box::new(create_test_finalize_task()) as Box, - Box::new(create_test_base_action_task(500)) as Box, - Box::new(create_test_undelegate_task()) as Box, + create_test_commit_task(1, 1000, 0), + create_test_finalize_task(), + create_test_base_action_task(500), + create_test_undelegate_task(), ]; let strategy = TaskStrategist::build_strategy( @@ -732,7 +739,7 @@ mod tests { assert_eq!(strategy.optimized_tasks.len(), 4); - let strategies: Vec = strategy + let strategies: Vec = strategy .optimized_tasks .iter() .map(|t| t.strategy()) @@ -741,10 +748,10 @@ mod tests { assert_eq!( strategies, vec![ - TaskStrategy::Buffer, // Commit task optimized - TaskStrategy::Args, // Finalize stays - TaskStrategy::Args, // BaseAction stays - TaskStrategy::Args, // Undelegate stays + DeliveryStrategy::Buffer, // Commit task optimized + DeliveryStrategy::Args, // Finalize stays + DeliveryStrategy::Args, // BaseAction stays + DeliveryStrategy::Args, // Undelegate stays ] ); // This means that couldn't squeeze task optimization @@ -759,7 +766,7 @@ mod tests { let intent = create_test_intent(0, &pubkey, false); let info_fetcher = Arc::new(MockInfoFetcher); - let commit_task = TaskBuilderImpl::commit_tasks( + let commit_task = TaskBuilderImpl::create_commit_tasks( &info_fetcher, &intent, &None::, @@ -791,7 +798,7 @@ mod tests { let intent = create_test_intent(0, &pubkeys, true); let info_fetcher = Arc::new(MockInfoFetcher); - let commit_task = TaskBuilderImpl::commit_tasks( + let commit_task = TaskBuilderImpl::create_commit_tasks( &info_fetcher, &intent, &None::, @@ -828,7 +835,7 @@ mod tests { let intent = create_test_intent(0, &pubkeys, false); let info_fetcher = Arc::new(MockInfoFetcher); - let commit_task = TaskBuilderImpl::commit_tasks( + let commit_task = TaskBuilderImpl::create_commit_tasks( &info_fetcher, &intent, &None::, diff --git a/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs b/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs index 2f216c869..4417d3d1b 100644 --- a/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs +++ b/magicblock-committor-service/src/tasks/task_visitors/persistor_visitor.rs @@ -2,11 +2,7 @@ use log::error; use crate::{ persist::{CommitStrategy, IntentPersister}, - tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, - visitor::Visitor, - }, + tasks::{visitor::Visitor, DataDelivery, Task}, }; pub enum PersistorContext { @@ -23,87 +19,46 @@ impl

Visitor for PersistorVisitor<'_, P> where P: IntentPersister, { - fn visit_args_task(&mut self, task: &ArgsTask) { - match self.context { - PersistorContext::PersistStrategy { uses_lookup_tables } => { - let commit_strategy = |is_diff: bool| { - if is_diff { + fn visit_task(&mut self, task: &Task) { + let PersistorContext::PersistStrategy { uses_lookup_tables } = + self.context; + + match &task { + Task::Commit(task) => { + let commit_strategy = match task.data_delivery() { + DataDelivery::StateInArgs => { + if uses_lookup_tables { + CommitStrategy::StateArgsWithLookupTable + } else { + CommitStrategy::StateArgs + } + } + DataDelivery::StateInBuffer => { + if uses_lookup_tables { + CommitStrategy::StateBufferWithLookupTable + } else { + CommitStrategy::StateBuffer + } + } + DataDelivery::DiffInArgs => { if uses_lookup_tables { CommitStrategy::DiffArgsWithLookupTable } else { CommitStrategy::DiffArgs } - } else if uses_lookup_tables { - CommitStrategy::StateArgsWithLookupTable - } else { - CommitStrategy::StateArgs } - }; - - let (commit_id, pubkey, commit_strategy) = match &task.task_type - { - ArgsTaskType::Commit(task) => ( - task.commit_id, - &task.committed_account.pubkey, - commit_strategy(false), - ), - ArgsTaskType::CommitDiff(task) => ( - task.commit_id, - &task.committed_account.pubkey, - commit_strategy(true), - ), - _ => return, - }; - - if let Err(err) = self.persistor.set_commit_strategy( - commit_id, - pubkey, - commit_strategy, - ) { - error!( - "Failed to persist commit strategy {}: {}", - commit_strategy.as_str(), - err - ); - } - } - } - } - - fn visit_buffer_task(&mut self, task: &BufferTask) { - match self.context { - PersistorContext::PersistStrategy { uses_lookup_tables } => { - let commit_strategy = |is_diff: bool| { - if is_diff { + DataDelivery::DiffInBuffer => { if uses_lookup_tables { CommitStrategy::DiffBufferWithLookupTable } else { CommitStrategy::DiffBuffer } - } else if uses_lookup_tables { - CommitStrategy::StateBufferWithLookupTable - } else { - CommitStrategy::StateBuffer } }; - let (commit_id, pubkey, commit_strategy) = match &task.task_type - { - BufferTaskType::Commit(task) => ( - task.commit_id, - &task.committed_account.pubkey, - commit_strategy(false), - ), - BufferTaskType::CommitDiff(task) => ( - task.commit_id, - &task.committed_account.pubkey, - commit_strategy(true), - ), - }; - if let Err(err) = self.persistor.set_commit_strategy( - commit_id, - pubkey, + task.commit_id, + &task.committed_account.pubkey, commit_strategy, ) { error!( @@ -113,6 +68,7 @@ where ); } } - } + Task::Finalize(_) | Task::Undelegate(_) | Task::BaseAction(_) => {} + }; } } diff --git a/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs b/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs index 470e25341..f4d1c31cc 100644 --- a/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs +++ b/magicblock-committor-service/src/tasks/task_visitors/utility_visitor.rs @@ -1,10 +1,6 @@ use solana_pubkey::Pubkey; -use crate::tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, - visitor::Visitor, -}; +use crate::tasks::{visitor::Visitor, Task}; pub struct CommitMeta { pub committed_pubkey: Pubkey, @@ -16,42 +12,17 @@ pub enum TaskVisitorUtils { } impl Visitor for TaskVisitorUtils { - fn visit_args_task(&mut self, task: &ArgsTask) { + fn visit_task(&mut self, task: &Task) { let Self::GetCommitMeta(commit_meta) = self; - match &task.task_type { - ArgsTaskType::Commit(task) => { - *commit_meta = Some(CommitMeta { - committed_pubkey: task.committed_account.pubkey, - commit_id: task.commit_id, - }) - } - ArgsTaskType::CommitDiff(task) => { - *commit_meta = Some(CommitMeta { - committed_pubkey: task.committed_account.pubkey, - commit_id: task.commit_id, - }) - } - _ => *commit_meta = None, - } - } - - fn visit_buffer_task(&mut self, task: &BufferTask) { - let Self::GetCommitMeta(commit_meta) = self; + if let Task::Commit(ref commit_task) = task { + *commit_meta = Some(CommitMeta { + committed_pubkey: commit_task.committed_account.pubkey, - match &task.task_type { - BufferTaskType::Commit(task) => { - *commit_meta = Some(CommitMeta { - committed_pubkey: task.committed_account.pubkey, - commit_id: task.commit_id, - }) - } - BufferTaskType::CommitDiff(task) => { - *commit_meta = Some(CommitMeta { - committed_pubkey: task.committed_account.pubkey, - commit_id: task.commit_id, - }) - } + commit_id: commit_task.commit_id, + }) + } else { + *commit_meta = None } } } diff --git a/magicblock-committor-service/src/tasks/utils.rs b/magicblock-committor-service/src/tasks/utils.rs index 0846a08fc..4a861a0c6 100644 --- a/magicblock-committor-service/src/tasks/utils.rs +++ b/magicblock-committor-service/src/tasks/utils.rs @@ -11,7 +11,8 @@ use solana_pubkey::Pubkey; use solana_signer::Signer; use solana_transaction::versioned::VersionedTransaction; -use crate::tasks::{task_strategist::TaskStrategistResult, BaseTask}; +use super::TaskInstruction; +use crate::tasks::{task_strategist::TaskStrategistResult, Task}; pub struct TransactionUtils; impl TransactionUtils { @@ -28,7 +29,7 @@ impl TransactionUtils { } pub fn unique_involved_pubkeys( - tasks: &[Box], + tasks: &[Task], validator: &Pubkey, budget_instructions: &[Instruction], ) -> Vec { @@ -49,7 +50,7 @@ impl TransactionUtils { pub fn tasks_instructions( validator: &Pubkey, - tasks: &[Box], + tasks: &[Task], ) -> Vec { tasks .iter() @@ -59,7 +60,7 @@ impl TransactionUtils { pub fn assemble_tasks_tx( authority: &Keypair, - tasks: &[Box], + tasks: &[Task], compute_unit_price: u64, lookup_tables: &[AddressLookupTableAccount], ) -> TaskStrategistResult { @@ -123,8 +124,8 @@ impl TransactionUtils { Ok(tx) } - pub fn tasks_compute_units(tasks: &[impl AsRef]) -> u32 { - tasks.iter().map(|task| task.as_ref().compute_units()).sum() + pub fn tasks_compute_units(tasks: &[Task]) -> u32 { + tasks.iter().map(|task| task.compute_units()).sum() } pub fn budget_instructions( diff --git a/magicblock-committor-service/src/tasks/visitor.rs b/magicblock-committor-service/src/tasks/visitor.rs index 1b9940a09..171b984f0 100644 --- a/magicblock-committor-service/src/tasks/visitor.rs +++ b/magicblock-committor-service/src/tasks/visitor.rs @@ -1,6 +1,5 @@ -use crate::tasks::{args_task::ArgsTask, buffer_task::BufferTask}; +use crate::tasks::Task; pub trait Visitor { - fn visit_args_task(&mut self, task: &ArgsTask); - fn visit_buffer_task(&mut self, task: &BufferTask); + fn visit_task(&mut self, task: &Task); } diff --git a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs index 4b927af36..08c7aaeb1 100644 --- a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs +++ b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs @@ -30,8 +30,12 @@ use solana_transaction_error::TransactionError; use crate::{ persist::{CommitStatus, IntentPersister}, tasks::{ - task_strategist::TransactionStrategy, BaseTask, BaseTaskError, - CleanupTask, PreparationState, PreparationTask, + task_strategist::TransactionStrategy, + CreateBufferTask, + DestroyTask, + // PreparationState, + Task, + TaskError, }, utils::persist_status_update, ComputeBudgetConfig, @@ -67,7 +71,7 @@ impl DeliveryPreparator { strategy.optimized_tasks.iter_mut().map(|task| async move { let _timer = metrics::observe_committor_intent_task_preparation_time( - task.as_ref(), + task, ); self.prepare_task_handling_errors(authority, task, persister) .await @@ -95,15 +99,20 @@ impl DeliveryPreparator { pub async fn prepare_task( &self, authority: &Keypair, - task: &mut dyn BaseTask, + task: &mut Task, persister: &Option

, ) -> DeliveryPreparatorResult<(), InternalError> { - let PreparationState::Required(preparation_task) = - task.preparation_state() - else { + // let PreparationState::Required(preparation_task) = + // task.preparation_state() + // else { + // return Ok(()); + // }; + let Some(lifecycle) = task.lifecycle() else { return Ok(()); }; + let preparation_task = &lifecycle.preparation; + // Persist as failed until rewritten let update_status = CommitStatus::BufferAndChunkPartiallyInitialized; persist_status_update( @@ -138,8 +147,8 @@ impl DeliveryPreparator { update_status, ); - let cleanup_task = preparation_task.cleanup_task(); - task.switch_preparation_state(PreparationState::Cleanup(cleanup_task))?; + //let cleanup_task = preparation_task.cleanup_task(); + //task.switch_preparation_state(PreparationState::Cleanup(cleanup_task))?; Ok(()) } @@ -148,10 +157,10 @@ impl DeliveryPreparator { pub async fn prepare_task_handling_errors( &self, authority: &Keypair, - task: &mut Box, + task: &mut Task, persister: &Option

, ) -> Result<(), InternalError> { - let res = self.prepare_task(authority, task.as_mut(), persister).await; + let res = self.prepare_task(authority, task, persister).await; match res { Err(InternalError::BufferExecutionError( BufferExecutionError::AccountAlreadyInitializedError( @@ -169,21 +178,30 @@ impl DeliveryPreparator { } // Prepare cleanup task - let PreparationState::Required(preparation_task) = - task.preparation_state().clone() - else { + // let PreparationState::Required(preparation_task) = + // task.preparation_state().clone() + // else { + // return Ok(()); + // }; + // task.switch_preparation_state(PreparationState::Cleanup( + // preparation_task.cleanup_task(), + // ))?; + // self.cleanup(authority, std::slice::from_ref(task), &[]) + // .await?; + // task.switch_preparation_state(PreparationState::Required( + // preparation_task, + // ))?; + + // self.prepare_task(authority, task, persister).await + + let Some(_lifecycle) = task.lifecycle() else { return Ok(()); }; - task.switch_preparation_state(PreparationState::Cleanup( - preparation_task.cleanup_task(), - ))?; + self.cleanup(authority, std::slice::from_ref(task), &[]) .await?; - task.switch_preparation_state(PreparationState::Required( - preparation_task, - ))?; - self.prepare_task(authority, task.as_mut(), persister).await + self.prepare_task(authority, task, persister).await } /// Initializes buffer account for future writes @@ -191,11 +209,10 @@ impl DeliveryPreparator { async fn initialize_buffer_account( &self, authority: &Keypair, - preparation_task: &PreparationTask, + preparation_task: &CreateBufferTask, ) -> DeliveryPreparatorResult<(), BufferExecutionError> { let authority_pubkey = authority.pubkey(); - let init_instruction = - preparation_task.init_instruction(&authority_pubkey); + let init_instruction = preparation_task.instruction(&authority_pubkey); let realloc_instructions = preparation_task.realloc_instructions(&authority_pubkey); @@ -241,7 +258,7 @@ impl DeliveryPreparator { async fn write_buffer_with_retries( &self, authority: &Keypair, - preparation_task: &PreparationTask, + preparation_task: &CreateBufferTask, ) -> DeliveryPreparatorResult<(), InternalError> { let authority_pubkey = authority.pubkey(); let write_instructions = @@ -423,7 +440,7 @@ impl DeliveryPreparator { pub async fn cleanup( &self, authority: &Keypair, - tasks: &[Box], + tasks: &[Task], lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult<(), BufferExecutionError> { self.table_mania @@ -435,13 +452,14 @@ impl DeliveryPreparator { let cleanup_tasks: Vec<_> = tasks .iter() .filter_map(|task| { - if let PreparationState::Cleanup(cleanup_task) = - task.preparation_state() - { - Some(cleanup_task) - } else { - None - } + task.lifecycle().map(|lc| &lc.cleanup) + //if let PreparationState::Cleanup(cleanup_task) = + // task.preparation_state() + //{ + // Some(cleanup_task) + //} else { + // None + //} }) .collect(); @@ -450,7 +468,7 @@ impl DeliveryPreparator { } let close_futs = cleanup_tasks - .chunks(CleanupTask::max_tx_fit_count_with_budget()) + .chunks(DestroyTask::max_tx_fit_count_with_budget()) .map(|cleanup_tasks| { let compute_units = cleanup_tasks[0].compute_units() * cleanup_tasks.len() as u32; @@ -552,8 +570,8 @@ pub enum InternalError { MagicBlockRpcClientError(Box), #[error("BufferExecutionError: {0}")] BufferExecutionError(#[from] BufferExecutionError), - #[error("BaseTaskError: {0}")] - BaseTaskError(#[from] BaseTaskError), + #[error("TaskError: {0}")] + TaskError(#[from] TaskError), } impl From for InternalError { diff --git a/magicblock-committor-service/src/transaction_preparator/mod.rs b/magicblock-committor-service/src/transaction_preparator/mod.rs index 4eef17995..c5a603f7d 100644 --- a/magicblock-committor-service/src/transaction_preparator/mod.rs +++ b/magicblock-committor-service/src/transaction_preparator/mod.rs @@ -8,7 +8,7 @@ use solana_pubkey::Pubkey; use crate::{ persist::IntentPersister, tasks::{ - task_strategist::TransactionStrategy, utils::TransactionUtils, BaseTask, + task_strategist::TransactionStrategy, utils::TransactionUtils, Task, }, transaction_preparator::{ delivery_preparator::{ @@ -25,7 +25,7 @@ pub mod error; #[async_trait] pub trait TransactionPreparator: Send + Sync + 'static { /// Return [`VersionedMessage`] corresponding to [`TransactionStrategy`] - /// Handles all necessary preparation needed for successful [`BaseTask`] execution + /// Handles all necessary preparation needed for successful [`Task`] execution async fn prepare_for_strategy( &self, authority: &Keypair, @@ -37,7 +37,7 @@ pub trait TransactionPreparator: Send + Sync + 'static { async fn cleanup_for_strategy( &self, authority: &Keypair, - tasks: &[Box], + tasks: &[Task], lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult<(), BufferExecutionError>; } @@ -112,7 +112,7 @@ impl TransactionPreparator for TransactionPreparatorImpl { async fn cleanup_for_strategy( &self, authority: &Keypair, - tasks: &[Box], + tasks: &[Task], lookup_table_keys: &[Pubkey], ) -> DeliveryPreparatorResult<(), BufferExecutionError> { self.delivery_preparator diff --git a/test-integration/test-committor-service/tests/common.rs b/test-integration/test-committor-service/tests/common.rs index 2c3e1b0e9..b21451868 100644 --- a/test-integration/test-committor-service/tests/common.rs +++ b/test-integration/test-committor-service/tests/common.rs @@ -174,10 +174,10 @@ pub fn generate_random_bytes(length: usize) -> Vec { #[allow(dead_code)] pub fn create_commit_task(data: &[u8]) -> CommitTask { static COMMIT_ID: AtomicU64 = AtomicU64::new(0); - CommitTask { - commit_id: COMMIT_ID.fetch_add(1, Ordering::Relaxed), - allow_undelegation: false, - committed_account: CommittedAccount { + CommitTask::new( + COMMIT_ID.fetch_add(1, Ordering::Relaxed), + false, + CommittedAccount { pubkey: Pubkey::new_unique(), account: Account { lamports: 1000, @@ -187,7 +187,8 @@ pub fn create_commit_task(data: &[u8]) -> CommitTask { rent_epoch: 0, }, }, - } + None, + ) } #[allow(dead_code)] diff --git a/test-integration/test-committor-service/tests/test_delivery_preparator.rs b/test-integration/test-committor-service/tests/test_delivery_preparator.rs index dded722ee..e7e60e0b7 100644 --- a/test-integration/test-committor-service/tests/test_delivery_preparator.rs +++ b/test-integration/test-committor-service/tests/test_delivery_preparator.rs @@ -1,13 +1,10 @@ use borsh::BorshDeserialize; -use futures::future::join_all; use magicblock_committor_program::Chunks; use magicblock_committor_service::{ persist::IntentPersisterImpl, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::{BufferTask, BufferTaskType}, task_strategist::{TaskStrategist, TransactionStrategy}, - BaseTask, PreparationState, + PreparationState, Task, }, }; use solana_sdk::signer::Signer; @@ -22,11 +19,9 @@ async fn test_prepare_10kb_buffer() { let preparator = fixture.create_delivery_preparator(); let data = generate_random_bytes(10 * 1024); - let buffer_task = BufferTaskType::Commit(create_commit_task(&data)); + let buffer_task = Task::Commit(create_commit_task(&data)); let mut strategy = TransactionStrategy { - optimized_tasks: vec![Box::new(BufferTask::new_preparation_required( - buffer_task, - ))], + optimized_tasks: vec![buffer_task], lookup_tables_keys: vec![], }; @@ -84,15 +79,14 @@ async fn test_prepare_multiple_buffers() { let datas = [ generate_random_bytes(10 * 1024), - generate_random_bytes(10), + generate_random_bytes(100 * 1024), generate_random_bytes(500 * 1024), ]; - let buffer_tasks = join_all(datas.iter().map(|data| async { - let task = BufferTaskType::Commit(create_commit_task(data.as_slice())); - Box::new(BufferTask::new_preparation_required(task)) - as Box - })) - .await; + let buffer_tasks = datas + .iter() + .map(|data| Task::Commit(create_commit_task(data.as_slice()))) + .collect(); + let mut strategy = TransactionStrategy { optimized_tasks: buffer_tasks, lookup_tables_keys: vec![], @@ -163,11 +157,10 @@ async fn test_lookup_tables() { generate_random_bytes(20), generate_random_bytes(30), ]; - let tasks = join_all(datas.iter().map(|data| async { - let task = ArgsTaskType::Commit(create_commit_task(data.as_slice())); - Box::::new(task.into()) as Box - })) - .await; + let tasks: Vec<_> = datas + .into_iter() + .map(|data| Task::Commit(create_commit_task(data.as_slice()))) + .collect(); let lookup_tables_keys = TaskStrategist::collect_lookup_table_keys( &fixture.authority.pubkey(), @@ -208,11 +201,9 @@ async fn test_already_initialized_error_handled() { let data = generate_random_bytes(10 * 1024); let mut task = create_commit_task(&data); - let buffer_task = BufferTaskType::Commit(task.clone()); + let buffer_task = Task::Commit(task.clone()); let mut strategy = TransactionStrategy { - optimized_tasks: vec![Box::new(BufferTask::new_preparation_required( - buffer_task, - ))], + optimized_tasks: vec![buffer_task], lookup_tables_keys: vec![], }; @@ -247,11 +238,9 @@ async fn test_already_initialized_error_handled() { let data = generate_random_bytes(task.committed_account.account.data.len() - 2); task.committed_account.account.data = data.clone(); - let buffer_task = BufferTaskType::Commit(task); + let buffer_task = Task::Commit(task); let mut strategy = TransactionStrategy { - optimized_tasks: vec![Box::new(BufferTask::new_preparation_required( - buffer_task, - ))], + optimized_tasks: vec![buffer_task], lookup_tables_keys: vec![], }; @@ -303,21 +292,10 @@ async fn test_prepare_cleanup_and_reprepare_mixed_tasks() { let mut strategy = TransactionStrategy { optimized_tasks: vec![ // Args task — shouldn't need buffers - { - let t = ArgsTaskType::Commit(commit_args.clone()); - Box::::new(t.into()) as Box - }, + { Task::Commit(commit_args.clone()) }, // Two buffer tasks - { - let t = BufferTaskType::Commit(commit_a.clone()); - Box::new(BufferTask::new_preparation_required(t)) - as Box - }, - { - let t = BufferTaskType::Commit(commit_b.clone()); - Box::new(BufferTask::new_preparation_required(t)) - as Box - }, + { Task::Commit(commit_a.clone()) }, + { Task::Commit(commit_b.clone()) }, ], lookup_tables_keys: vec![], }; @@ -404,20 +382,9 @@ async fn test_prepare_cleanup_and_reprepare_mixed_tasks() { // --- Step 4: re-prepare with the same logical tasks (same commit IDs, mutated data) --- let mut strategy2 = TransactionStrategy { optimized_tasks: vec![ - { - let t = ArgsTaskType::Commit(commit_args.clone()); - Box::::new(t.into()) as Box - }, - { - let t = BufferTaskType::Commit(commit_a.clone()); - Box::new(BufferTask::new_preparation_required(t)) - as Box - }, - { - let t = BufferTaskType::Commit(commit_b.clone()); - Box::new(BufferTask::new_preparation_required(t)) - as Box - }, + { Task::Commit(commit_args.clone()) }, + { Task::Commit(commit_a.clone()) }, + { Task::Commit(commit_b.clone()) }, ], lookup_tables_keys: vec![], }; diff --git a/test-integration/test-committor-service/tests/test_intent_executor.rs b/test-integration/test-committor-service/tests/test_intent_executor.rs index 912668019..88b75b778 100644 --- a/test-integration/test-committor-service/tests/test_intent_executor.rs +++ b/test-integration/test-committor-service/tests/test_intent_executor.rs @@ -963,7 +963,7 @@ async fn single_flow_transaction_strategy( task_info_fetcher: &Arc, intent: &ScheduledBaseIntent, ) -> TransactionStrategy { - let mut tasks = TaskBuilderImpl::commit_tasks( + let mut tasks = TaskBuilderImpl::create_commit_tasks( task_info_fetcher, intent, &None::, diff --git a/test-integration/test-committor-service/tests/test_transaction_preparator.rs b/test-integration/test-committor-service/tests/test_transaction_preparator.rs index b82aa3aa5..770586794 100644 --- a/test-integration/test-committor-service/tests/test_transaction_preparator.rs +++ b/test-integration/test-committor-service/tests/test_transaction_preparator.rs @@ -3,12 +3,10 @@ use magicblock_committor_program::Chunks; use magicblock_committor_service::{ persist::IntentPersisterImpl, tasks::{ - args_task::{ArgsTask, ArgsTaskType}, - buffer_task::BufferTask, task_strategist::{TaskStrategist, TransactionStrategy}, utils::TransactionUtils, - BaseActionTask, BaseTask, FinalizeTask, PreparationState, - TaskBuilderImpl, UndelegateTask, + BaseActionTask, CommitTask, FinalizeTask, PreparationState, Task, + UndelegateTask, }, transaction_preparator::TransactionPreparator, }; @@ -35,15 +33,10 @@ async fn test_prepare_commit_tx_with_single_account() { let committed_account = create_committed_account(&account_data); let tasks = vec![ - Box::new(TaskBuilderImpl::create_commit_task( - 1, - true, - committed_account.clone(), - None, - )) as Box, - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + Task::Commit(CommitTask::new(1, true, committed_account.clone(), None)), + Task::Finalize(FinalizeTask { delegated_account: committed_account.pubkey, - }))), + }), ]; let mut tx_strategy = TransactionStrategy { optimized_tasks: tasks, @@ -90,35 +83,31 @@ async fn test_prepare_commit_tx_with_multiple_accounts() { let account2_data = generate_random_bytes(12); let committed_account2 = create_committed_account(&account2_data); - let buffer_commit_task = BufferTask::new_preparation_required( - TaskBuilderImpl::create_commit_task( - 1, - true, - committed_account2.clone(), - None, - ) - .task_type - .into(), - ); + let buffer_commit_task = Task::Commit(CommitTask::new( + 1, + true, + committed_account2.clone(), + None, + )); // Create test data let tasks = vec![ // account 1 - Box::new(TaskBuilderImpl::create_commit_task( + Task::Commit(CommitTask::new( 1, true, committed_account1.clone(), None, - )) as Box, + )), // account 2 - Box::new(buffer_commit_task), + buffer_commit_task, // finalize account 1 - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + Task::Finalize(FinalizeTask { delegated_account: committed_account1.pubkey, - }))), + }), // finalize account 2 - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + Task::Finalize(FinalizeTask { delegated_account: committed_account2.pubkey, - }))), + }), ]; let mut tx_strategy = TransactionStrategy { optimized_tasks: tasks, @@ -191,27 +180,19 @@ async fn test_prepare_commit_tx_with_base_actions() { }], }; - let buffer_commit_task = BufferTask::new_preparation_required( - TaskBuilderImpl::create_commit_task( - 1, - true, - committed_account.clone(), - None, - ) - .task_type - .into(), - ); + let buffer_commit_task = + Task::Commit(CommitTask::new(1, true, committed_account.clone(), None)); let tasks = vec![ // commit account - Box::new(buffer_commit_task.clone()) as Box, + buffer_commit_task, // finalize account - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + Task::Finalize(FinalizeTask { delegated_account: committed_account.pubkey, - }))), + }), // BaseAction - Box::new(ArgsTask::new(ArgsTaskType::BaseAction(BaseActionTask { + Task::BaseAction(BaseActionTask { action: base_action, - }))), + }), ]; // Test preparation @@ -272,17 +253,17 @@ async fn test_prepare_finalize_tx_with_undelegate_with_atls() { // Create test data let committed_account = create_committed_account(&[1, 2, 3]); - let tasks: Vec> = vec![ + let tasks: Vec = vec![ // finalize account - Box::new(ArgsTask::new(ArgsTaskType::Finalize(FinalizeTask { + Task::Finalize(FinalizeTask { delegated_account: committed_account.pubkey, - }))), + }), // BaseAction - Box::new(ArgsTask::new(ArgsTaskType::Undelegate(UndelegateTask { + Task::Undelegate(UndelegateTask { delegated_account: committed_account.pubkey, owner_program: Pubkey::new_unique(), rent_reimbursement: Pubkey::new_unique(), - }))), + }), ]; let lookup_tables_keys = TaskStrategist::collect_lookup_table_keys(