# Job System
This guide will walk you through creating jobs in Spacedrive's job system. The job system is designed to handle long-running tasks with features like progress tracking, pause/resume capabilities, and error handling.
## Job System Overview
The job system consists of several key components:

- **Job**: The high-level operation (e.g., file deletion, copying)
- **Tasks**: Individual units of work within a job
- **Behaviors**: Implementations of specific operations (e.g., different deletion strategies)
- **Progress Tracking**: System for monitoring and reporting job progress
- **State Management**: Handles job persistence and recovery
- **Undo Support**: Optional capability for jobs to define reversible operations
- **Location Handling**: Support for both location-bound and location-independent operations
## The `Job` Trait

The `Job` trait is the core abstraction in Spacedrive's job system. It defines the interface that all jobs must implement:

```rust
pub trait Job: Send + Sync + Hash + 'static {
    const TYPE_NAME: JobName;

    fn name(&self) -> JobName;

    fn resume_tasks<OuterCtx: OuterContext>(...) -> impl Future<...>;

    fn run<OuterCtx: OuterContext>(...) -> impl Future<...>;
}
```
### Key Features

- **Asynchronous Execution**: Jobs are async by design, suitable for IO-intensive operations

- **Progress Reporting**: Jobs can report their progress through `ProgressUpdate`:

    ```rust
    pub enum ProgressUpdate {
        TaskCount(u64),          // Total number of tasks
        CompletedTaskCount(u64), // Number of completed tasks
        Message(String),         // Informative message
        Phase(String),           // Current phase of the job
    }
    ```

- **State Management**: Jobs can be serialized and deserialized for persistence

- **Task Dispatching**: Jobs use `JobTaskDispatcher` to manage individual tasks

- **Context Access**: Jobs have access to:
    - Database client
    - Sync manager
    - Query invalidation
    - Event reporting
    - Data directory
### Return Status

Jobs can complete with different statuses:

```rust
pub enum ReturnStatus {
    Completed(JobReturn),
    Shutdown(Result<Option<Vec<u8>>, EncodeError>),
    Canceled(JobReturn),
}
```
## Built-in Job Types

The system includes several built-in job types:

```rust
pub enum JobName {
    Indexer,        // File system indexing
    FileIdentifier, // File type identification
    MediaProcessor, // Media processing
    Copy,           // File copying
    Move,           // File moving
    Delete,         // File deletion
    Erase,          // Secure erasure
    FileValidator,  // File validation
}
```
## Creating a New Job

Let's walk through creating a job using the file deletion system as an example. The complete implementation can be found in `core/crates/file-actions/src/deleter/`.
### 0. Register the Job

Before implementing the job itself, you need to register it in two places:

- Add it to the `JobName` enum in `job_system/job.rs`:

    ```rust
    pub enum JobName {
        Indexer,
        FileIdentifier,
        MediaProcessor,
        Copy,
        Move,
        Delete, // <-- Your new job name here
        // ...
    }
    ```
- Add your job's metadata type to `ReportInputMetadata` in `job_system/report.rs`:

    ```rust
    pub enum ReportInputMetadata {
        // ... other jobs ...
        Deleter {
            location_id: location::id::Type,
            file_path_ids: Vec<file_path::id::Type>,
        },
        // Add your job's metadata here with the fields needed to track its progress
    }
    ```
This registration is crucial for:
- Job identification in the system
- Progress tracking and reporting
- Job persistence and recovery
- UI display and management
### 1. Define Job Structure

First, create your job structure that will hold the necessary state:

```rust
#[derive(Debug)]
pub struct DeleterJob<B> {
    // Required parameters
    location_id: Option<location::id::Type>,
    file_path_ids: Vec<file_path::id::Type>,

    // Task management state
    pending_tasks: Option<Vec<TaskHandle<Error>>>,
    shutdown_tasks: Option<Vec<Box<dyn Task<Error>>>>,
    accumulative_errors: Option<Vec<Error>>,

    // Generic behavior type
    behavior: PhantomData<fn(B) -> B>,
}
```
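The `PhantomData<fn(B) -> B>` field deserves a note: it ties the struct to the behavior type `B` without actually storing a `B`, so the job stays `Send + Sync` (and zero-overhead) regardless of what `B` is. A minimal std-only sketch of the idiom, where `JobShell` and `NotThreadSafe` are illustrative stand-ins rather than real Spacedrive types:

```rust
use std::marker::PhantomData;

// A deliberately !Send + !Sync type (raw pointers are neither).
struct NotThreadSafe(*const u8);

struct JobShell<B> {
    file_count: usize,
    // `fn(B) -> B` is a function-pointer type, which is always Send + Sync,
    // so the marker never "infects" JobShell with B's thread-safety.
    behavior: PhantomData<fn(B) -> B>,
}

fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    // Compiles even though NotThreadSafe itself is neither Send nor Sync.
    assert_send_sync::<JobShell<NotThreadSafe>>();

    let shell = JobShell::<NotThreadSafe> {
        file_count: 3,
        behavior: PhantomData,
    };
    assert_eq!(shell.file_count, 3);

    // The marker occupies no space at all.
    assert_eq!(
        std::mem::size_of::<PhantomData<fn(NotThreadSafe) -> NotThreadSafe>>(),
        0
    );
}
```

Had the field been `PhantomData<B>` instead, the job would only be `Send + Sync` when `B` is, which is why the function-pointer form is used.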
### 2. Define Job State

Create a serializable state structure for job persistence:

```rust
#[derive(Debug, Serialize, Deserialize)]
pub struct DeleterState {
    location_id: Option<location::id::Type>,
    file_path_ids: Vec<file_path::id::Type>,
    shutdown_tasks: Option<SerializedTasks>,
    accumulative_errors: Option<Vec<NonCriticalError>>,
}
```
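The split between the job struct and its state is the persistence boundary: only the state survives a restart, while volatile fields (task handles, channels) are rebuilt on resume. A simplified std-only sketch of that round trip, with serde derives omitted and the field types narrowed so it compiles standalone:

```rust
// Simplified stand-ins for the real DeleterJob/DeleterState pair.
#[derive(Debug, Clone, PartialEq)]
struct DeleterState {
    file_path_ids: Vec<i32>,
}

struct DeleterJob {
    file_path_ids: Vec<i32>,
    // Volatile: task handles are never persisted.
    pending_tasks: Option<Vec<&'static str>>,
}

impl DeleterJob {
    // On shutdown, capture only the serializable subset.
    fn to_state(&self) -> DeleterState {
        DeleterState {
            file_path_ids: self.file_path_ids.clone(),
        }
    }

    // On resume, rebuild the job; volatile fields start empty.
    fn from_state(state: DeleterState) -> Self {
        Self {
            file_path_ids: state.file_path_ids,
            pending_tasks: None,
        }
    }
}

fn main() {
    let job = DeleterJob {
        file_path_ids: vec![1, 2, 3],
        pending_tasks: Some(vec!["t1"]),
    };

    let resumed = DeleterJob::from_state(job.to_state());
    assert_eq!(resumed.file_path_ids, vec![1, 2, 3]);
    assert!(resumed.pending_tasks.is_none());
}
```

This is also why the guide recommends `Option` for non-persistent fields: resumption can leave them `None` until tasks are re-dispatched.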
### 3. Implement the Job Trait

Implement the `Job` trait for your job structure (note the impl is generic over the behavior type `B`):

```rust
impl<B: DeleteBehavior + Send + Sync + 'static> Job for DeleterJob<B> {
    const TYPE_NAME: JobName = JobName::Delete;

    fn name(&self) -> JobName {
        Self::TYPE_NAME
    }

    async fn run<OuterCtx: OuterContext>(
        self,
        dispatcher: JobTaskDispatcher,
        ctx: impl JobContext<OuterCtx>,
    ) -> Result<ReturnStatus, Error> {
        // Your job implementation here
    }

    async fn resume_tasks<OuterCtx: OuterContext>(
        &mut self,
        dispatcher: &JobTaskDispatcher,
        ctx: &impl JobContext<OuterCtx>,
        serialized_tasks: SerializedTasks,
    ) -> Result<(), Error> {
        // Task resumption logic here
    }
}
```
### 4. Define Behavior Trait

If your job supports multiple implementations (like delete vs. move to trash), create a behavior trait:

```rust
pub trait DeleteBehavior {
    fn delete(file: FileData) -> impl Future<Output = Result<ExecStatus, ()>> + Send;

    fn delete_all<I>(
        files: I,
        interrupter: Option<&Interrupter>,
    ) -> impl Future<Output = Result<ExecStatus, ()>> + Send
    where
        I: IntoIterator<Item = FileData> + Send + 'static,
        I::IntoIter: Send;
}
```
### 5. Implement Behaviors

Create concrete implementations of your behavior. Note that the filesystem result is propagated rather than silently discarded:

```rust
// Permanent deletion behavior
pub struct RemoveBehavior;

impl DeleteBehavior for RemoveBehavior {
    async fn delete(file_data: FileData) -> Result<ExecStatus, ()> {
        if file_data.full_path.is_dir() {
            tokio::fs::remove_dir_all(&file_data.full_path).await
        } else {
            tokio::fs::remove_file(&file_data.full_path).await
        }
        .map_err(|_| ())?;

        Ok(ExecStatus::Done(TaskOutput::Empty))
    }
}
```
```rust
// Move to trash behavior
pub struct MoveToTrashBehavior;

impl DeleteBehavior for MoveToTrashBehavior {
    async fn delete_all<I>(files: I, interrupter: Option<&Interrupter>) -> Result<ExecStatus, ()>
    where
        I: IntoIterator<Item = FileData> + Send + 'static,
        I::IntoIter: Send + 'static,
    {
        if let Some(interrupter) = interrupter {
            check_interruption!(interrupter);
        }

        task::spawn_blocking(move || trash::delete_all(files.into_iter().map(|x| x.full_path)))
            .await
            .map_err(|_| ())? // the blocking task panicked or was aborted
            .map_err(|_| ())?; // the trash operation itself failed

        Ok(ExecStatus::Done(().into()))
    }
}
```
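Stripped of the async machinery, this is the strategy pattern: one trait, several interchangeable implementations, and a job that is generic over which one it uses. A synchronous, std-only sketch (the trait and type names mirror the ones above, but the "filesystem" is just an in-memory log):

```rust
trait DeleteBehavior {
    fn delete_all(files: Vec<String>, log: &mut Vec<String>);
}

struct RemoveBehavior;
impl DeleteBehavior for RemoveBehavior {
    fn delete_all(files: Vec<String>, log: &mut Vec<String>) {
        for f in files {
            log.push(format!("removed {f}"));
        }
    }
}

struct MoveToTrashBehavior;
impl DeleteBehavior for MoveToTrashBehavior {
    fn delete_all(files: Vec<String>, log: &mut Vec<String>) {
        for f in files {
            log.push(format!("trashed {f}"));
        }
    }
}

// Generic over the behavior, like `DeleterJob<B>`: the strategy is chosen
// at the type level, with no runtime dispatch.
fn run_delete<B: DeleteBehavior>(files: Vec<String>, log: &mut Vec<String>) {
    B::delete_all(files, log);
}

fn main() {
    let mut log = Vec::new();
    run_delete::<RemoveBehavior>(vec!["a.txt".into()], &mut log);
    run_delete::<MoveToTrashBehavior>(vec!["b.txt".into()], &mut log);
    assert_eq!(log, vec!["removed a.txt", "trashed b.txt"]);
}
```

Because the behavior is a type parameter rather than a trait object, monomorphization lets each deletion strategy compile to direct calls.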
### 6. Create Task Implementation

Define the task that will perform the actual work:

```rust
pub struct RemoveTask<B> {
    id: TaskId,
    files: Vec<FileData>,
    counter: Arc<AtomicU64>,
    behavior: PhantomData<fn(B) -> B>,
}

#[async_trait::async_trait]
impl<B: DeleteBehavior + Send + 'static> Task<Error> for RemoveTask<B> {
    fn id(&self) -> TaskId {
        self.id
    }

    async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, Error> {
        let size = self.files.len();

        match B::delete_all(self.files.clone(), Some(interrupter)).await {
            Ok(ExecStatus::Done(_)) => {
                self.counter.fetch_add(size as _, Ordering::AcqRel);
                Ok(ExecStatus::Done(().into()))
            }
            Ok(status) => Ok(status),
            Err(_) => Err(Error::Deleter("Task failed".into())),
        }
    }
}
```
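The shared `Arc<AtomicU64>` counter is what makes progress aggregation safe across concurrently running tasks. The pattern can be demonstrated with plain threads standing in for dispatched tasks (the batch size of 25 is arbitrary):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;

fn main() {
    // One counter shared by every "task", as in RemoveTask above.
    let counter = Arc::new(AtomicU64::new(0));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                // Each worker reports its whole batch at once.
                counter.fetch_add(25, Ordering::AcqRel);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    assert_eq!(counter.load(Ordering::Acquire), 100);
    println!("completed: {}", counter.load(Ordering::Acquire));
}
```

`fetch_add` is a single atomic read-modify-write, so no updates are lost even when tasks finish simultaneously; the job can then read the counter periodically to emit `ProgressUpdate::CompletedTaskCount`.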
## Best Practices

- **Error Handling**
    - Use accumulative errors for non-critical failures
    - Include context in error messages
    - Handle cleanup on errors

- **Progress Reporting**
    - Report progress regularly using `ctx.progress()`
    - Include meaningful progress messages
    - Use atomic counters for thread-safe progress tracking

- **Resource Management**
    - Clean up resources on cancellation
    - Use `Arc` for shared resources
    - Implement proper task cleanup

- **State Management**
    - Keep serialized state minimal
    - Handle state restoration properly
    - Use `Option` for non-persistent state
## Best Practices for Progress Reporting

### Use Structured Data

Always send structured data instead of formatted strings. This allows the frontend to handle localization and formatting.

❌ Don't do this:

```rust
ctx.progress(ProgressUpdate::Message(format!(
    "Processed {} of {} items",
    completed,
    total
))).await;
```

✅ Do this instead:

```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobProgress {
    Processing {
        completed: u64,
        total: u64,
        bytes_processed: u64,
        current_item: Option<String>,
    },
    Complete {
        total_processed: u64,
        total_bytes: u64,
        duration: Duration,
    },
}

// In your task:
ctx.progress(JobProgress::Processing {
    completed: files_processed,
    total: total_files,
    bytes_processed,
    current_item: Some(current_file.name.clone()),
}).await;
```
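To see why the structured form pays off, here is a sketch of how a consumer might render such an update. The enum is trimmed and the serde derives dropped so it compiles with std alone; `render` is an illustrative stand-in for frontend formatting:

```rust
#[derive(Debug)]
enum JobProgress {
    Processing { completed: u64, total: u64 },
    Complete { total_processed: u64 },
}

// The consumer decides the wording and locale; the backend only sends numbers.
fn render(p: &JobProgress) -> String {
    match p {
        JobProgress::Processing { completed, total } => {
            let pct = (*completed as f64 / *total as f64 * 100.0).round();
            format!("{completed}/{total} ({pct}%)")
        }
        JobProgress::Complete { total_processed } => {
            format!("done, {total_processed} items")
        }
    }
}

fn main() {
    let update = JobProgress::Processing { completed: 50, total: 200 };
    assert_eq!(render(&update), "50/200 (25%)");
}
```

Because formatting lives entirely on the consumer side, a German or Japanese frontend can render the same `JobProgress` value without any backend change.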
### Include All Necessary Data
Progress updates should include enough information for the UI to show:
- Overall progress (e.g., files processed / total files)
- Current operation details (e.g., current file name, size)
- Performance metrics (e.g., speed, estimated time remaining)
- Operation-specific details (e.g., copy source/target)
### Progress Granularity
Report progress at appropriate intervals:
- Major state changes (Started, Complete)
- Per-item updates for user feedback
- Periodic updates for long-running operations
Example of a well-structured progress enum:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CopyProgress {
    Started {
        total_files: u64,
        total_bytes: u64,
    },
    File {
        name: String,
        current_file: u64,
        total_files: u64,
        bytes: u64,
        source: PathBuf,
        target: PathBuf,
    },
    BytesWritten {
        bytes: u64,
        speed: f64,
        eta_seconds: u32,
    },
    Complete {
        files_copied: u64,
        total_bytes: u64,
        duration: Duration,
    },
}
```
This approach:
- Enables proper i18n/l10n in the UI
- Provides flexibility in how data is displayed
- Makes it easier to modify the UI without backend changes
- Ensures consistent progress reporting across different locales
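The "periodic updates for long-running operations" guideline can be implemented with a small throttle: always emit major state changes, but rate-limit the per-item stream. A std-only sketch, where `ready` stands in for the decision to call `ctx.progress(...)`:

```rust
use std::time::{Duration, Instant};

struct Throttle {
    last: Option<Instant>,
    interval: Duration,
}

impl Throttle {
    fn new(interval: Duration) -> Self {
        Self { last: None, interval }
    }

    // Returns true when an update should actually be sent.
    fn ready(&mut self, now: Instant) -> bool {
        match self.last {
            // Too soon since the last emitted update: suppress.
            Some(last) if now.duration_since(last) < self.interval => false,
            // First update, or the interval has elapsed: emit.
            _ => {
                self.last = Some(now);
                true
            }
        }
    }
}

fn main() {
    let mut throttle = Throttle::new(Duration::from_millis(100));
    let start = Instant::now();

    assert!(throttle.ready(start)); // first update always fires
    assert!(!throttle.ready(start + Duration::from_millis(10))); // suppressed
    assert!(throttle.ready(start + Duration::from_millis(150))); // fires again
}
```

This keeps the UI responsive without flooding the event channel when a job churns through thousands of small files per second.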
## Common Patterns

- **Batch Processing**

    ```rust
    let batch_size = 100;

    for chunk in items.chunks(batch_size) {
        let task = MyTask::new(chunk.to_vec(), progress_counter.clone());
        tasks.push(dispatcher.dispatch(task)?);
    }
    ```

- **Error Accumulation**

    ```rust
    if let Err(e) = result {
        self.accumulative_errors.get_or_insert_with(Vec::new).push(e);
    }
    ```
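Both patterns combine naturally: process items in batches and record per-item failures instead of aborting the whole job. A self-contained sketch, where the failing `process` function and the numbers are invented for illustration:

```rust
// Hypothetical per-item operation: every multiple of 7 fails.
fn process(item: u32) -> Result<u32, String> {
    if item % 7 == 0 {
        Err(format!("item {item} failed"))
    } else {
        Ok(item)
    }
}

fn main() {
    let items: Vec<u32> = (1..=10).collect();
    let batch_size = 4;

    let mut accumulative_errors: Option<Vec<String>> = None;
    let mut processed = 0u32;

    for chunk in items.chunks(batch_size) {
        for &item in chunk {
            match process(item) {
                Ok(_) => processed += 1,
                // Non-critical failure: record it and keep going.
                Err(e) => accumulative_errors.get_or_insert_with(Vec::new).push(e),
            }
        }
    }

    assert_eq!(processed, 9); // only item 7 fails out of 1..=10
    assert_eq!(
        accumulative_errors.as_deref(),
        Some(&["item 7 failed".to_string()][..])
    );
}
```

The job can then surface the accumulated errors in its `JobReturn` so the UI reports "9 of 10 deleted, 1 failed" rather than failing the entire operation.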
## Debugging Tips

- Use `tracing` for better debugging:

    ```rust
    tracing::debug!(task_id = %self.id, "Starting task execution");
    ```

- Monitor task states:

    ```rust
    while let Some(status) = task_handle.status().await {
        match status {
            TaskStatus::Running => continue,
            TaskStatus::Paused => handle_pause(),
            TaskStatus::Completed => break,
            TaskStatus::Failed(e) => handle_error(e),
        }
    }
    ```

- Add proper error context:

    ```rust
    Error::TaskFailed(format!("Failed to process item {}: {}", item_id, e))
    ```
These patterns and practices help ensure your job:
- Handles large operations efficiently through batching
- Resolves conflicts reliably
- Adapts to different scenarios
- Provides detailed progress feedback
## Directory Structure

When implementing a new job, follow this recommended directory structure, based on the file-actions crate:

```
src/
├── your_job/
│   ├── mod.rs            # Public exports and job registration
│   ├── job.rs            # Job implementation and state management
│   ├── progress.rs       # Progress tracking types
│   └── tasks/
│       ├── mod.rs        # Task exports and shared types
│       ├── behaviors/    # Different implementation strategies
│       │   ├── mod.rs    # Behavior trait and utilities
│       │   ├── fast.rs   # Fast implementation (e.g., for small files)
│       │   └── stream.rs # Streaming implementation (e.g., for large files)
│       ├── batch.rs      # Batch processing logic
│       ├── conflict.rs   # Conflict resolution utilities
│       └── task_name.rs  # Individual task implementations
```
### Key Components

- `mod.rs`
    - Public exports for your job module
    - Re-exports of commonly used types

    ```rust
    mod job;
    mod progress;
    mod tasks;

    pub use job::*;
    pub use progress::*;
    pub use tasks::*;
    ```

- `job.rs`
    - Job struct and implementation
    - State management
    - Task coordination

    ```rust
    pub struct YourJob<C> {
        // Job parameters
        params: JobParams,
        // Task management
        pending_tasks: Option<Vec<TaskHandle>>,
        // Progress tracking
        progress: Arc<AtomicU64>,
    }
    ```

- `progress.rs`
    - Progress tracking enums/structs
    - Progress calculation utilities

    ```rust
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub enum JobProgress {
        Started { total: u64 },
        Progress { current: u64, total: u64 },
    }
    ```

- `tasks/behaviors/`
    - Trait defining the behavior interface
    - Different implementation strategies

    ```rust
    #[async_trait]
    pub trait TaskBehavior {
        async fn execute(&self, params: TaskParams) -> Result<(), Error>;
    }
    ```

- `tasks/batch.rs`
    - Batch processing logic
    - Performance optimizations

    ```rust
    pub struct BatchedOperation {
        items: Vec<Item>,
        batch_size: usize,
    }

    impl BatchedOperation {
        pub fn process(
            &self,
            dispatcher: &JobTaskDispatcher,
            progress_counter: &Arc<AtomicU64>,
        ) -> Result<Vec<TaskHandle>, Error> {
            let mut tasks = Vec::new();

            for chunk in self.items.chunks(self.batch_size) {
                let task = MyTask::new(chunk.to_vec(), progress_counter.clone());
                tasks.push(dispatcher.dispatch(task)?);
            }

            Ok(tasks)
        }
    }
    ```
This structure provides:
- Clear separation of concerns
- Easy navigation of code
- Consistent organization across jobs
- Reusable components
- Maintainable implementations