Skip to main content
The job system powers long-running operations in Spacedrive. It provides automatic persistence, progress tracking, and graceful interruption handling for tasks like indexing, file processing, and sync operations. Jobs execute asynchronously with minimal boilerplate. They persist their state to survive crashes and resume where they left off. The system integrates with Spacedrive’s task executor for efficient resource usage.

Core Concepts

A job represents a resumable unit of work. Jobs report progress, handle interruptions, and maintain state across executions. The system manages job lifecycles automatically.
Jobs are library-scoped. Each library maintains its own job database and execution queue.

Job Lifecycle

Jobs transition through defined states during execution:
1

Queued

Job created and waiting for execution. Initial state after dispatch.
2

Running

Job actively executing. Progress updates flow to subscribers.
3

Paused

Job interrupted but resumable. State persisted to database.
4

Completed

Job finished successfully. Moved to history table.
Failed or cancelled jobs cannot resume. The system distinguishes between recoverable interruptions and permanent failures.

Key Components

The job system consists of several interconnected parts: Job Manager coordinates all job operations. It maintains the job database, tracks running jobs, and handles lifecycle transitions. Located at core/src/infra/job/manager.rs. Job Registry enables automatic job discovery. Jobs register themselves at compile time using the derive macro. The registry creates jobs dynamically from saved state. See core/src/infra/job/registry.rs. Job Context provides execution environment. Jobs access the database, report progress, and interact with services through context. Implementation in core/src/infra/job/context.rs. Job Executor bridges jobs with the task system. It manages interruption signals and checkpoint operations. Found at core/src/infra/job/executor.rs.

Defining Jobs

Jobs implement two traits: Job for metadata and JobHandler for execution logic.
use sd_task_system::TaskId;
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Debug)]
struct ProcessFilesJob {
    location_id: i32,
    files: Vec<PathBuf>,
    processed: usize,
}

#[typetag::serde(name = "process_files")]
impl Job for ProcessFilesJob {
    const NAME: &'static str = "process_files";
    const VERSION: u32 = 1;
    const IS_RESUMABLE: bool = true;
}

#[async_trait::async_trait]
impl JobHandler for ProcessFilesJob {
    async fn run(&mut self, ctx: &JobContext) -> Result<Vec<i32>, JobError> {
        // Skip already processed files when resuming
        let files_to_process = &self.files[self.processed..];
        
        for (idx, file) in files_to_process.iter().enumerate() {
            // Check for interruption
            if ctx.check_interrupted().await? {
                return Err(JobError::Interrupted);
            }
            
            // Process file
            process_file(file, ctx).await?;
            
            // Update progress
            self.processed += 1;
            ctx.report_count_progress(self.processed, self.files.len()).await;
            
            // Save checkpoint periodically
            if idx % 100 == 0 {
                ctx.checkpoint().await?;
            }
        }
        
        Ok(vec![])
    }
}
The #[typetag::serde] attribute enables polymorphic serialization. Jobs must be serializable to support resumption.

Progress Reporting

Jobs communicate progress through the context. The system supports multiple progress types:
// Count-based progress
ctx.report_count_progress(current, total).await;

// Percentage progress
ctx.report_percentage_progress(0.75).await;

// Bytes processed
ctx.report_bytes_progress(processed_bytes, total_bytes).await;

// Custom structured data
ctx.report_structured_progress("phase", json!({
    "stage": "analyzing",
    "files_found": 1500
})).await;
Progress updates throttle automatically. The system batches updates to prevent database overhead.

Error Handling

Jobs distinguish between recoverable and permanent errors:
// Recoverable - job can resume
if network_unavailable() {
    return Err(JobError::Interrupted);
}

// Permanent failure - job cannot resume
if corrupt_data() {
    return Err(JobError::Critical(
        "Data corruption detected".into()
    ));
}

// Non-critical errors accumulate
ctx.report_non_critical_error("Skipped locked file").await;
Always check ctx.check_interrupted() in loops. This enables graceful shutdown and pause operations.

Dispatching Jobs

The job manager provides typed and dynamic dispatch methods:
// Typed dispatch
let handle = manager.dispatch(ProcessFilesJob {
    location_id: 1,
    files: vec![path1, path2],
    processed: 0,
}).await?;

// Dynamic dispatch by name
let handle = manager.dispatch_by_name(
    "process_files",
    json!({
        "location_id": 1,
        "files": ["path1", "path2"],
        "processed": 0
    })
).await?;

// Wait for completion
let result = handle.wait().await?;
Job handles provide status monitoring and progress streaming:
// Subscribe to status changes
let mut status_rx = handle.status();
while let Ok(status) = status_rx.recv().await {
    match status {
        JobStatus::Running => println!("Job started"),
        JobStatus::Completed => break,
        _ => {}
    }
}

// Stream progress updates
let mut progress_rx = handle.progress();
while let Ok(progress) = progress_rx.recv().await {
    if let Some(count) = progress.count {
        println!("Processed {}/{}", count.current, count.total);
    }
}

Database Schema

Jobs persist to a dedicated SQLite database (jobs.db) with three tables:
jobs
table
Active job records containing:
job_history
table
Completed jobs moved here for audit trails
job_checkpoints
table
Resumption checkpoints for long-running jobs

Advanced Features

Job Versioning

Jobs specify versions for schema evolution:
impl Job for DataMigrationJob {
    const VERSION: u32 = 2; // Increment when schema changes
}
The registry validates versions during resumption. Incompatible versions fail to load.

Extension Jobs

The system supports WASM-based extension jobs:
manager.dispatch_extension_job(
    extension_id,
    job_name,
    job_data
).await?;
Extensions run in isolated contexts with limited capabilities.

Performance Considerations

The job system optimizes for throughput and resumability:
  • Progress updates batch at 2-second intervals
  • Checkpoints save incrementally
  • Database operations use prepared statements
  • Channels use bounded capacity to prevent memory growth
For high-frequency operations, batch work into larger chunks. This reduces checkpoint overhead and improves performance.

Integration Points

Jobs integrate with core Spacedrive systems: Task System: Jobs execute as tasks with configurable priority. The executor handles work distribution across threads. Event System: State changes emit events for UI updates. Subscribe to JOB_MANAGER_EVENTS for notifications. Action System: User actions spawn jobs with audit context. The system tracks who initiated operations. Library System: Each library maintains independent job state. Jobs cannot access cross-library data.

Common Patterns

Batch Processing

Process items in chunks for efficiency:
const BATCH_SIZE: usize = 1000;

for (batch_idx, chunk) in items.chunks(BATCH_SIZE).enumerate() {
    if ctx.check_interrupted().await? {
        self.batch_idx = batch_idx;
        return Err(JobError::Interrupted);
    }
    
    process_batch(chunk).await?;
    
    ctx.report_count_progress(
        batch_idx * BATCH_SIZE + chunk.len(),
        items.len()
    ).await;
    
    ctx.checkpoint().await?;
}

Phased Execution

Split complex jobs into phases:
match self.phase {
    Phase::Discovery => {
        let items = discover_items(ctx).await?;
        self.items = items;
        self.phase = Phase::Processing;
        ctx.checkpoint().await?;
    }
    Phase::Processing => {
        process_items(&mut self.items, ctx).await?;
        self.phase = Phase::Cleanup;
        ctx.checkpoint().await?;
    }
    Phase::Cleanup => {
        cleanup_resources(ctx).await?;
    }
}

Child Jobs

Spawn dependent jobs (feature in development):
let child_ids = ctx.spawn_children(vec![
    AnalyzeFileJob { path: file1 },
    AnalyzeFileJob { path: file2 },
]).await?;

ctx.wait_for_children(child_ids).await?;

Debugging

Enable file-based logging for troubleshooting:
std::env::set_var("SD_JOBS_FILE_LOG", "1");
Logs write to .spacedrive/jobs/{job_id}.log with detailed execution traces. Monitor job metrics through the context:
let metrics = ctx.get_metrics();
println!("Execution time: {}s", metrics.elapsed_seconds);
println!("Memory used: {}MB", metrics.memory_mb);
Never block the job executor thread. Use tokio::task::spawn_blocking for CPU-intensive work.
The job system provides the foundation for reliable background processing in Spacedrive. Its resumable design ensures operations complete despite interruptions, while the progress system keeps users informed of ongoing work.