summaryrefslogblamecommitdiff
path: root/makima/src/db/repository.rs
blob: d453f9964e1b199de4ac560ebb4a92090a8897d1 (plain) (tree)
1
2
3
4
5
6
7
8


                                                    
                       


                 
                    



                                                                                    
                                                       
                                                       
                                                  
                                                                                            
                                                                      

                                                 
  
 











                                                      

                                                                


















                                                                               
                                                                                       





                                             





                                                           











                                                                                                      

                                                                                    
                                                                                             


                              

                                                                                  
                                                                                                                                                                         

           



                           
                     







                                                                                     
                                                                                                                                                                      
                  
                     


             



                         
                                               


                                                                          
                                                                                                                                                                      
                  


                                



                    





                                                                                          



                           
                                            





                                             









                                                         




                                                                                


                                                                    
 




                                                                          

                                                                                                         
                                                                                                                                                                             


                 












                                                

                                                                                                         
                                                                                                                                                                             


                 




















                                                                    






                                                                                
                     


             





                                  
                      
                                                                     
                                                                     




                        

                                                                                










                                                                                    

                                                                        

                              


                                                                                                                                                                         







                           
                              











                                        
                                                                                                                                                                      













                                                                                                    
                                                                                                                                                                      









                                
                                      


                               










                                                        
                                     






                                                   
                                                                 

                                                                            

















                                         













                                                                             






































                                                                                                         
                                                                                                                                                                             


















                                                                                                         
                                                                                                                                                                             















































                                                                                



























































































































                                                                                                             
                             















                                                                                              












                                                                                           



                                                                                
                                                                                              

                                                                                                        
                                                     



                                                          




                                                                                            

                                                                  
                                                                                                              
                















                                                                                                       

                                                                     
                                                                                   
                                                     
         
                                                                                      
















                                     

                                     


















                                                                                     
                                         



                                                                                 
                 


                                                                                      
                                                                                      
                    
                                                                            











                                                                                                     
                 


                                                                                      
                                                                                      









                                                   
                                                            




















































                                                                                    






















































































































































                                                                                                  

                                                                                                        
                                                     



                                                                             






                                                                                                                                




                                                                                            

                                                                  
                                                                                                              
                












                                                                                                       




                                                                      






                                                                                       


                              

                                                                               
                                                                                   
                                                      
                                                                  
         
                                                                                                          

















                                     

                                     

                                 
                                 



                    


































                                                                             



















                                                                                  
                                         






                                            
                 


                                                                                      
                                                                                      
                    
                                                                                                







                                                   















































































                                                                                    











                                                                              
                 


                                                                                      
                                                                                      
                    













                                               




                                                                         


                                            
                                                                 


                                     
                 


                                                                                      
                                                                                      
                    
                             
                                 
                                      




                                                   
                 



                    








                                                    
                 


                                                                                      
                                                                                      












































                                                                              
                                                                        

                                                                                 
                                                       













                                                                                   
                                                                                      

                                                             

















                                 
                              
                     









                                                                                   
                                                                                      
                                                


















                                           
                              
                     



















































































































































































































































































                                                                                                                   



















                                                 
































































                                                                                  


















                                                                          














                                                                                
                 


                                                                                              
                                                                                              














                                                                               
                 


                                                                                              
                                                                                              











































































































































































                                                                                  
 

                                                                                                 


                                     

                                             








                                                         
                       



                    
























                                                                                                  
           



















                                                                         
         

                                               

           










































                                                                                               































                                                                               




























                                                                                 


















































                                                                   





































































































































































                                                                                                                                      
































































































                                                                                                                               


                                



                                        

                                                                                             



                   


                        




                     

















































































































































                                                                                         



























                                                                                











































































































                                                                                       
 
 




                                                                                



                                                                       




                                        


                                                   
           

                                                                                                         




                     


                              
                                                          















































                                                                                         
















                                                                      















                                                                               






















                                                                      


                                                                             






                                                 
                                                                  
                                                                   
                             
                                                  



                                                              
                         








                                                                                
                             
                              






































                                                                      




                                                                                             

                                                                              
                                                                                                      



                                                


                                                                                      
                                                     






                                       




                               

                    
                         












                                         








                                                                         










                                                                    





























































                                                                                               
                                                                                





                                                                                
                      








                                                        
                       


                                 
                          













                                                                           
                                 












                                              



















                                                                                   





                                                                         
                                                                                                                             








                                                   


















                                                                                            



































                                                                                               
                                                                                      












                                              























                                                                                    
                                                                                


                                                                                












                                                         























                                                           
                                
                                                  












                                                                             
                                                  
           




                                                                         









                           
                         
                                 
                    













                                                                                            





























































































                                                                                            


















                                                                                                 
                                                                                


                                                                                





                                                                          







                                                  
                                             



















                                                             







                                                                           







                                             







                                                                                            





















                                                                           
                             














                                                             
                                                                   



















                                                                            
                            










                                  
                         





                         




















































































                                                                         






























                                                                           


                                                                        



                                                     


                                                          










                                  























                                                                                   




                                                                           



















































                                                                                






                                                                         

















































                                                                                









                                                                        






















































                                                                                               










                                                                       

















































                                                                                              







                                                                        





                       















































































































































                                                                                   


























                                                                    



































































                                                                                

















                                                                                    





                                                                       



































                                                                                  























                                                                              



                                                                         







                                             
                             















                                        





















































































































                                                                                      






















                                                                                          
































                                                                                
                              

                                                                                               




                                       

                                                                                         

                                                                         















                                                
                          
                             


                                       
                          

                            



                                                   
                                    







                                 






































                                                                       























                                                            






                                    














































































































                                                                                 



























                                                                                    



















                                                       
 


































                                                                                 












                                                                                 
 















                                                                                

                                                                                                                                     










                           
                              
                     











                                                          
                                
                                




















                                                                             



                                                                 




                                                                                             

















                                                                  


                                    

                                       















































                                                                                         
                                                                                             
                                               





                                                                                     
                                                                  













                                       
                         
                 










































                                                                      
                                                                                      

                                                                                         



                                   














                                                                  





                                                                                                   




































                                                                                  
                                     


                     
                                                      




                                       






                  



                                                                                

                                                                              



                                                           
                       





                                      

                                                         









                                  
                       



























                                                                               
































                                                               


































                                                                      








































































































































































                                                                                              
//! Repository layer: database operations for files, file versions, tasks, and related records.

use chrono::Utc;
use serde::Deserialize;
use sqlx::PgPool;
use uuid::Uuid;

use super::models::{
    CheckpointPatch, CheckpointPatchInfo, ConversationMessage, ConversationSnapshot,
    CreateFileRequest, CreateTaskRequest,
    Daemon, DaemonTaskAssignment, DaemonWithCapacity,
    Directive, DirectiveDocument, DirectiveStep, DirectiveSummary,
    CreateDirectiveRequest, CreateDirectiveStepRequest,
    UpdateDirectiveRequest, UpdateDirectiveStepRequest,
    CreateOrderRequest, Order, UpdateOrderRequest,
    CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, UpdateDirectiveOrderGroupRequest,
    File, FileSummary, FileVersion, HistoryEvent, HistoryQueryFilters,
    Task, TaskCheckpoint, TaskEvent, TaskSummary,
    UpdateFileRequest, UpdateTaskRequest,
};

/// Repository error types.
///
/// Returned by repository functions that can fail for reasons beyond a raw
/// database error, such as optimistic-locking conflicts.
#[derive(Debug)]
pub enum RepositoryError {
    /// Database error; carries the originating `sqlx::Error`.
    Database(sqlx::Error),
    /// Version conflict (optimistic locking failure): the version supplied by
    /// the caller did not match the version stored on the row.
    VersionConflict {
        /// The version the client expected
        expected: i32,
        /// The actual current version in the database
        actual: i32,
    },
    /// Caller-facing precondition failure (wrong status, etc.).
    /// The string is the message rendered by the `Display` impl.
    Validation(String),
}

impl From<sqlx::Error> for RepositoryError {
    fn from(e: sqlx::Error) -> Self {
        RepositoryError::Database(e)
    }
}

impl std::fmt::Display for RepositoryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RepositoryError::Database(e) => write!(f, "Database error: {}", e),
            RepositoryError::VersionConflict { expected, actual } => {
                write!(
                    f,
                    "Version conflict: expected {}, actual {}",
                    expected, actual
                )
            }
            RepositoryError::Validation(msg) => write!(f, "Validation error: {}", msg),
        }
    }
}

// Marker impl: relies on the derived `Debug` and the hand-written `Display`;
// no `source()` override is provided, so the wrapped `sqlx::Error` is not
// exposed through the error chain.
impl std::error::Error for RepositoryError {}

/// Build the fallback file name `Recording - <timestamp>` from the current UTC time.
fn generate_default_name() -> String {
    Utc::now()
        .format("Recording - %b %d %Y %H:%M:%S")
        .to_string()
}

/// Internal request for creating files without contract association (e.g., audio transcription).
/// User-facing file creation should use CreateFileRequest which requires contract_id.
///
/// Consumed by `create_file`, which inserts one row into `files`.
pub struct InternalCreateFileRequest {
    /// File name; when `None`, `create_file` substitutes a generated timestamp-based name.
    pub name: Option<String>,
    /// Optional description, stored as-is in the `description` column.
    pub description: Option<String>,
    /// Transcript entries; serialized to JSON before being stored.
    pub transcript: Vec<super::models::TranscriptEntry>,
    /// Optional location, stored as-is in the `location` column.
    pub location: Option<String>,
}

/// Create a new file record (internal use, no contract required).
/// For user-facing file creation, use create_file_for_owner which requires a contract.
pub async fn create_file(pool: &PgPool, req: InternalCreateFileRequest) -> Result<File, sqlx::Error> {
    let name = req.name.unwrap_or_else(generate_default_name);
    let transcript_json = serde_json::to_value(&req.transcript).unwrap_or_default();
    let body_json = serde_json::to_value::<Vec<super::models::BodyElement>>(vec![]).unwrap();

    sqlx::query_as::<_, File>(
        r#"
        INSERT INTO files (name, description, transcript, location, summary, body)
        VALUES ($1, $2, $3, $4, NULL, $5)
        RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        "#,
    )
    .bind(&name)
    .bind(&req.description)
    .bind(&transcript_json)
    .bind(&req.location)
    .bind(&body_json)
    .fetch_one(pool)
    .await
}

/// Fetch a single file by primary key (not owner-scoped).
///
/// Returns `Ok(None)` when no row has the given id.
pub async fn get_file(pool: &PgPool, id: Uuid) -> Result<Option<File>, sqlx::Error> {
    const SELECT_BY_ID: &str = r#"
        SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        FROM files
        WHERE id = $1
        "#;

    sqlx::query_as::<_, File>(SELECT_BY_ID)
        .bind(id)
        .fetch_optional(pool)
        .await
}

/// Return every file row regardless of owner, newest first (`created_at DESC`).
pub async fn list_files(pool: &PgPool) -> Result<Vec<File>, sqlx::Error> {
    const SELECT_ALL: &str = r#"
        SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        FROM files
        ORDER BY created_at DESC
        "#;

    sqlx::query_as::<_, File>(SELECT_ALL).fetch_all(pool).await
}

/// Update a file by ID with optimistic locking.
///
/// If `req.version` is provided, the update will only succeed if the current
/// version matches. Returns `RepositoryError::VersionConflict` if there's a mismatch.
///
/// If `req.version` is None (e.g., internal system updates), version checking is skipped.
///
/// Merge rules: for `name`, `description`, `transcript`, `summary` and `body`, a
/// `Some` value in the request replaces the stored one and `None` keeps it (so
/// `description` and `summary` cannot be cleared through this function).
/// `location` and `req.repo_file_path` are not written here.
///
/// Returns `Ok(None)` when the file does not exist.
///
/// NOTE(review): this statement does not bump `version` itself; presumably a
/// database trigger does. Confirm against the schema.
pub async fn update_file(
    pool: &PgPool,
    id: Uuid,
    req: UpdateFileRequest,
) -> Result<Option<File>, RepositoryError> {
    // Load the current row; its values are the defaults for fields the request omits.
    let Some(existing) = get_file(pool, id).await? else {
        return Ok(None);
    };

    // Fast-path optimistic-locking check against the row we just read.
    if let Some(expected_version) = req.version {
        if existing.version != expected_version {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: existing.version,
            });
        }
    }

    // Merge request over the existing row.
    let name = req.name.unwrap_or(existing.name);
    let description = req.description.or(existing.description);
    let transcript = req.transcript.unwrap_or(existing.transcript);
    let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
    let summary = req.summary.or(existing.summary);
    let body = req.body.unwrap_or(existing.body);
    let body_json = serde_json::to_value(&body).unwrap_or_default();

    let result = if let Some(expected_version) = req.version {
        // Repeat the version check in the WHERE clause so a concurrent writer
        // that slipped in after our read makes this statement match no row.
        let updated = sqlx::query_as::<_, File>(
            r#"
            UPDATE files
            SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
            WHERE id = $1 AND version = $7
            RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
            "#,
        )
        .bind(id)
        .bind(&name)
        .bind(&description)
        .bind(&transcript_json)
        .bind(&summary)
        .bind(&body_json)
        .bind(expected_version)
        .fetch_optional(pool)
        .await?;

        // No row matched: either we lost a race (report the conflict with the
        // version now stored) or the file was deleted meanwhile (falls through to None).
        if updated.is_none() {
            if let Some(current) = get_file(pool, id).await? {
                return Err(RepositoryError::VersionConflict {
                    expected: expected_version,
                    actual: current.version,
                });
            }
        }

        updated
    } else {
        // No version check for internal updates
        sqlx::query_as::<_, File>(
            r#"
            UPDATE files
            SET name = $2, description = $3, transcript = $4, summary = $5, body = $6, updated_at = NOW()
            WHERE id = $1
            RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
            "#,
        )
        .bind(id)
        .bind(&name)
        .bind(&description)
        .bind(&transcript_json)
        .bind(&summary)
        .bind(&body_json)
        .fetch_optional(pool)
        .await?
    };

    Ok(result)
}

/// Remove a file row by primary key (not owner-scoped).
///
/// Returns `true` when a row was deleted, `false` when nothing matched.
pub async fn delete_file(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    const DELETE_BY_ID: &str = r#"
        DELETE FROM files
        WHERE id = $1
        "#;

    sqlx::query(DELETE_BY_ID)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Number of rows in `files`, across all owners.
pub async fn count_files(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let (total,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM files")
        .fetch_one(pool)
        .await?;

    Ok(total)
}

// =============================================================================
// Owner-Scoped File Functions
// =============================================================================

/// Insert a file row owned by `owner_id` and return the stored row.
///
/// The name falls back to a generated timestamp-based name when the request
/// has none; `summary` is always stored as NULL.
pub async fn create_file_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateFileRequest,
) -> Result<File, sqlx::Error> {
    // Serialization failures degrade to JSON null instead of aborting the insert.
    let transcript_value = serde_json::to_value(&req.transcript).unwrap_or_default();
    let body_value = serde_json::to_value(&req.body).unwrap_or_default();
    let file_name = req.name.unwrap_or_else(generate_default_name);

    let insert = sqlx::query_as::<_, File>(
        r#"
        INSERT INTO files (owner_id, name, description, transcript, location, summary, body, repo_file_path)
        VALUES ($1, $2, $3, $4, $5, NULL, $6, $7)
        RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        "#,
    );

    insert
        .bind(owner_id)
        .bind(&file_name)
        .bind(&req.description)
        .bind(&transcript_value)
        .bind(&req.location)
        .bind(&body_value)
        .bind(&req.repo_file_path)
        .fetch_one(pool)
        .await
}

/// Fetch a file by ID, but only if it belongs to `owner_id`.
///
/// Returns `Ok(None)` when the file is missing or owned by someone else.
pub async fn get_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<File>, sqlx::Error> {
    const SELECT_OWNED: &str = r#"
        SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        FROM files
        WHERE id = $1 AND owner_id = $2
        "#;

    sqlx::query_as::<_, File>(SELECT_OWNED)
        .bind(id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await
}

/// Return every file belonging to `owner_id`, newest first (`created_at DESC`).
pub async fn list_files_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<File>, sqlx::Error> {
    const SELECT_OWNED: &str = r#"
        SELECT id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
        FROM files
        WHERE owner_id = $1
        ORDER BY created_at DESC
        "#;

    sqlx::query_as::<_, File>(SELECT_OWNED)
        .bind(owner_id)
        .fetch_all(pool)
        .await
}

/// Database row type for file summary
///
/// Intermediate shape read by `list_file_summaries_for_owner`; it still carries
/// the full transcript so the entry count and duration can be derived in Rust
/// before the row is converted into a `FileSummary`.
#[derive(Debug, sqlx::FromRow)]
struct FileSummaryRow {
    id: Uuid,
    name: String,
    description: Option<String>,
    // Decoded from the JSON `transcript` column.
    #[sqlx(json)]
    transcript: Vec<crate::db::models::TranscriptEntry>,
    version: i32,
    repo_file_path: Option<String>,
    repo_sync_status: Option<String>,
    created_at: chrono::DateTime<chrono::Utc>,
    updated_at: chrono::DateTime<chrono::Utc>,
}

/// List lightweight summaries of an owner's files, newest first.
///
/// Each summary reports the number of transcript entries and a `duration`
/// equal to the largest `end` value among them (`None` when that maximum is
/// not greater than zero, which includes an empty transcript).
pub async fn list_file_summaries_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<FileSummary>, sqlx::Error> {
    let rows = sqlx::query_as::<_, FileSummaryRow>(
        r#"
        SELECT
            f.id, f.name, f.description, f.transcript, f.version,
            f.repo_file_path, f.repo_sync_status, f.created_at, f.updated_at
        FROM files f
        WHERE f.owner_id = $1
        ORDER BY f.created_at DESC
        "#,
    )
    .bind(owner_id)
    .fetch_all(pool)
    .await?;

    let mut summaries = Vec::with_capacity(rows.len());
    for row in rows {
        // Latest end timestamp across all entries, starting from 0.0.
        let latest_end = row
            .transcript
            .iter()
            .fold(0.0_f32, |acc, entry| acc.max(entry.end));

        summaries.push(FileSummary {
            id: row.id,
            name: row.name,
            description: row.description,
            transcript_count: row.transcript.len(),
            duration: (latest_end > 0.0).then_some(latest_end),
            version: row.version,
            repo_file_path: row.repo_file_path,
            repo_sync_status: row.repo_sync_status,
            created_at: row.created_at,
            updated_at: row.updated_at,
        });
    }

    Ok(summaries)
}

/// Update a file by ID with optimistic locking, scoped to owner.
///
/// Behaves like the unscoped update, except every read and write is restricted
/// to rows whose `owner_id` matches; a file owned by someone else is reported
/// as `Ok(None)`, the same as a missing file.
///
/// If `req.version` is `Some`, the update only succeeds when the stored version
/// matches; otherwise `RepositoryError::VersionConflict` is returned. If it is
/// `None`, version checking is skipped.
///
/// Merge rules: for `name`, `description`, `transcript`, `summary` and `body`, a
/// `Some` value in the request replaces the stored one and `None` keeps it.
/// `location` and `req.repo_file_path` are not written here.
pub async fn update_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateFileRequest,
) -> Result<Option<File>, RepositoryError> {
    // Load the current row (scoped to owner); it supplies defaults for omitted fields.
    let Some(existing) = get_file_for_owner(pool, id, owner_id).await? else {
        return Ok(None);
    };

    // Fast-path optimistic-locking check against the row we just read.
    if let Some(expected_version) = req.version {
        if existing.version != expected_version {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: existing.version,
            });
        }
    }

    // Merge request over the existing row.
    let name = req.name.unwrap_or(existing.name);
    let description = req.description.or(existing.description);
    let transcript = req.transcript.unwrap_or(existing.transcript);
    let transcript_json = serde_json::to_value(&transcript).unwrap_or_default();
    let summary = req.summary.or(existing.summary);
    let body = req.body.unwrap_or(existing.body);
    let body_json = serde_json::to_value(&body).unwrap_or_default();

    let result = if let Some(expected_version) = req.version {
        // Repeat the version check in the WHERE clause so a concurrent writer
        // that slipped in after our read makes this statement match no row.
        let updated = sqlx::query_as::<_, File>(
            r#"
            UPDATE files
            SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
            WHERE id = $1 AND owner_id = $2 AND version = $8
            RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
            "#,
        )
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&transcript_json)
        .bind(&summary)
        .bind(&body_json)
        .bind(expected_version)
        .fetch_optional(pool)
        .await?;

        // No row matched: either we lost a race (report the conflict with the
        // version now stored) or the file vanished meanwhile (falls through to None).
        if updated.is_none() {
            if let Some(current) = get_file_for_owner(pool, id, owner_id).await? {
                return Err(RepositoryError::VersionConflict {
                    expected: expected_version,
                    actual: current.version,
                });
            }
        }

        updated
    } else {
        // No version check for internal updates
        sqlx::query_as::<_, File>(
            r#"
            UPDATE files
            SET name = $3, description = $4, transcript = $5, summary = $6, body = $7, updated_at = NOW()
            WHERE id = $1 AND owner_id = $2
            RETURNING id, owner_id, name, description, transcript, location, summary, body, version, repo_file_path, repo_synced_at, repo_sync_status, created_at, updated_at
            "#,
        )
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&transcript_json)
        .bind(&summary)
        .bind(&body_json)
        .fetch_optional(pool)
        .await?
    };

    Ok(result)
}

/// Remove a file by ID, but only if it belongs to `owner_id`.
///
/// Returns `true` when a row was deleted, `false` when nothing matched.
pub async fn delete_file_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    const DELETE_OWNED: &str = r#"
        DELETE FROM files
        WHERE id = $1 AND owner_id = $2
        "#;

    sqlx::query(DELETE_OWNED)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

// =============================================================================
// Version History Functions
// =============================================================================

/// Set the version source for the current transaction.
/// This is used by the trigger to record who made the change.
///
/// The value is passed as a bound parameter to PostgreSQL's
/// `set_config(name, value, is_local)` with `is_local = true`, which is the
/// function form of `SET LOCAL`. Binding the value (instead of splicing it into
/// the SQL text) means quotes or other special characters in `source` cannot
/// break or alter the statement.
///
/// NOTE(review): a transaction-local setting only lasts until the enclosing
/// transaction ends, and this function executes against the pool rather than
/// an explicit transaction. Confirm the setting is still visible to the
/// statement that fires the trigger.
pub async fn set_version_source(pool: &PgPool, source: &str) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT set_config('app.version_source', $1, true)")
        .bind(source)
        .execute(pool)
        .await?;
    Ok(())
}

/// Set the change description for the current transaction.
///
/// The value is passed as a bound parameter to PostgreSQL's
/// `set_config(name, value, is_local)` with `is_local = true`, which is the
/// function form of `SET LOCAL`. Binding the value replaces manual quote
/// escaping, so arbitrary text in `description` is stored verbatim and cannot
/// alter the statement.
///
/// NOTE(review): a transaction-local setting only lasts until the enclosing
/// transaction ends, and this function executes against the pool rather than
/// an explicit transaction. Confirm the setting is still visible to the
/// statement that consumes it.
pub async fn set_change_description(pool: &PgPool, description: &str) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT set_config('app.change_description', $1, true)")
        .bind(description)
        .execute(pool)
        .await?;
    Ok(())
}

/// List a file's versions, current version first, then history by version DESC.
///
/// The live row in `files` is presented as a synthetic `FileVersion` (source
/// `"user"`, no change description, timestamped with the file's `updated_at`)
/// ahead of the rows stored in `file_versions`. If the file no longer exists,
/// only the stored history is returned.
pub async fn list_file_versions(pool: &PgPool, file_id: Uuid) -> Result<Vec<FileVersion>, sqlx::Error> {
    // Read the live row first, then the archived versions.
    let live = get_file(pool, file_id).await?;

    let history = sqlx::query_as::<_, FileVersion>(
        r#"
        SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at
        FROM file_versions
        WHERE file_id = $1
        ORDER BY version DESC
        "#,
    )
    .bind(file_id)
    .fetch_all(pool)
    .await?;

    let mut versions = Vec::with_capacity(history.len() + 1);

    // The live row, when present, leads the list.
    if let Some(file) = live {
        versions.push(FileVersion {
            id: file.id,
            file_id: file.id,
            version: file.version,
            name: file.name,
            description: file.description,
            summary: file.summary,
            body: file.body,
            source: "user".to_string(),
            change_description: None,
            created_at: file.updated_at,
        });
    }
    versions.extend(history);

    Ok(versions)
}

/// Look up one version of a file.
///
/// When `version` equals the live file's version, the live row is returned as
/// a synthetic `FileVersion` (source `"user"`, no change description);
/// otherwise the `file_versions` table is consulted. Returns `Ok(None)` when
/// neither has it.
pub async fn get_file_version(
    pool: &PgPool,
    file_id: Uuid,
    version: i32,
) -> Result<Option<FileVersion>, sqlx::Error> {
    // The live row answers the request only if it carries the requested version.
    let live_match = get_file(pool, file_id)
        .await?
        .filter(|file| file.version == version);

    if let Some(file) = live_match {
        let current = FileVersion {
            id: file.id,
            file_id: file.id,
            version: file.version,
            name: file.name,
            description: file.description,
            summary: file.summary,
            body: file.body,
            source: "user".to_string(),
            change_description: None,
            created_at: file.updated_at,
        };
        return Ok(Some(current));
    }

    // Fall back to the archived versions.
    const SELECT_ARCHIVED: &str = r#"
        SELECT id, file_id, version, name, description, summary, body, source, change_description, created_at
        FROM file_versions
        WHERE file_id = $1 AND version = $2
        "#;

    sqlx::query_as::<_, FileVersion>(SELECT_ARCHIVED)
        .bind(file_id)
        .bind(version)
        .fetch_optional(pool)
        .await
}

/// Restore a file to the content of an earlier version.
///
/// Copies `name`, `description`, `summary` and `body` from `target_version`
/// onto the live file through `update_file`, using `current_version` for the
/// optimistic-locking check. The transcript is left untouched. Returns
/// `Ok(None)` when the target version (or the file) cannot be found, and
/// `RepositoryError::VersionConflict` when `current_version` is stale.
///
/// NOTE(review): the version source/description are set via the pool and the
/// update runs as a separate pool call; confirm these share a transaction so
/// the trigger actually sees the settings.
pub async fn restore_file_version(
    pool: &PgPool,
    file_id: Uuid,
    target_version: i32,
    current_version: i32,
) -> Result<Option<File>, RepositoryError> {
    let Some(snapshot) = get_file_version(pool, file_id, target_version).await? else {
        return Ok(None);
    };

    // Tag the upcoming change for the versioning trigger.
    set_version_source(pool, "system").await?;
    let note = format!("Restored from version {}", target_version);
    set_change_description(pool, &note).await?;

    // Writing the snapshot's content back is an ordinary versioned update;
    // the save_file_version trigger is expected to archive the current state first.
    let restore = UpdateFileRequest {
        name: Some(snapshot.name),
        description: snapshot.description,
        transcript: None,
        summary: snapshot.summary,
        body: Some(snapshot.body),
        version: Some(current_version),
        repo_file_path: None,
    };

    update_file(pool, file_id, restore).await
}

/// Number of versions of a file: archived rows in `file_versions` plus one for
/// the live version (the `+ 1` is applied unconditionally in SQL).
pub async fn count_file_versions(pool: &PgPool, file_id: Uuid) -> Result<i64, sqlx::Error> {
    let (total,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) + 1 FROM file_versions WHERE file_id = $1", // +1 for current version
    )
    .bind(file_id)
    .fetch_one(pool)
    .await?;

    Ok(total)
}

// =============================================================================
// Task Functions
// =============================================================================

/// Create a new task.
///
/// If creating a subtask (parent_task_id is set) and repository settings are not provided,
/// the subtask will inherit repository_url, base_branch, target_branch, merge_mode,
/// and target_repo_path from the parent task. Depth is calculated from parent and limited
/// to max 1 (2 levels: orchestrator at depth 0, subtasks at depth 1).
///
/// NOTE: completion_action is NOT inherited - subtasks should not auto-merge unless
/// explicitly configured. The supervisor controls when completion steps happen.
///
/// Task spawning is now controlled by supervisors at the application level.
/// Depth is no longer constrained in the database.
pub async fn create_task(pool: &PgPool, req: CreateTaskRequest) -> Result<Task, sqlx::Error> {
    // Calculate depth + inherit settings from parent if applicable.
    let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
        if let Some(parent_id) = req.parent_task_id {
            let parent = get_task(pool, parent_id).await?
                .ok_or_else(|| sqlx::Error::RowNotFound)?;

            let new_depth = parent.depth + 1;
            let repo_url = req.repository_url.clone().or(parent.repository_url);
            let base_branch = req.base_branch.clone().or(parent.base_branch);
            let target_branch = req.target_branch.clone().or(parent.target_branch);
            let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
            let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
            let completion_action = req.completion_action.clone();

            (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
        } else {
            (
                0,
                req.repository_url.clone(),
                req.base_branch.clone(),
                req.target_branch.clone(),
                req.merge_mode.clone(),
                req.target_repo_path.clone(),
                req.completion_action.clone(),
            )
        };

    let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());

    sqlx::query_as::<_, Task>(
        r#"
        INSERT INTO tasks (
            parent_task_id, depth, name, description, plan, priority,
            repository_url, base_branch, target_branch, merge_mode,
            target_repo_path, completion_action, continue_from_task_id, copy_files,
            branched_from_task_id, conversation_state
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
        RETURNING *
        "#,
    )
    .bind(req.parent_task_id)
    .bind(depth)
    .bind(&req.name)
    .bind(&req.description)
    .bind(&req.plan)
    .bind(req.priority)
    .bind(&repo_url)
    .bind(&base_branch)
    .bind(&target_branch)
    .bind(&merge_mode)
    .bind(&target_repo_path)
    .bind(&completion_action)
    .bind(&req.continue_from_task_id)
    .bind(&copy_files_json)
    .bind(&req.branched_from_task_id)
    .bind(&req.conversation_history)
    .fetch_one(pool)
    .await
}

/// Fetch a single task by primary key (not owner-scoped).
///
/// Returns `Ok(None)` when no row has the given id.
pub async fn get_task(pool: &PgPool, id: Uuid) -> Result<Option<Task>, sqlx::Error> {
    const SELECT_BY_ID: &str = r#"
        SELECT *
        FROM tasks
        WHERE id = $1
        "#;

    sqlx::query_as::<_, Task>(SELECT_BY_ID)
        .bind(id)
        .fetch_optional(pool)
        .await
}

/// List summaries of all top-level tasks (no parent), excluding hidden ones.
///
/// Results are ordered by priority DESC, then created_at DESC. Each summary
/// includes the number of direct subtasks.
pub async fn list_tasks(pool: &PgPool) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SELECT_TOP_LEVEL: &str = r#"
        SELECT
            t.id,
            t.parent_task_id, t.depth, t.name, t.status, t.priority,
            t.progress_summary,
            (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
            t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
        FROM tasks t
        WHERE t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
        ORDER BY t.priority DESC, t.created_at DESC
        "#;

    sqlx::query_as::<_, TaskSummary>(SELECT_TOP_LEVEL)
        .fetch_all(pool)
        .await
}

/// List summaries of the direct children of `parent_id`.
///
/// Results are ordered by priority DESC, then created_at DESC. Hidden subtasks
/// are included (no `hidden` filter is applied here).
pub async fn list_subtasks(pool: &PgPool, parent_id: Uuid) -> Result<Vec<TaskSummary>, sqlx::Error> {
    const SELECT_CHILDREN: &str = r#"
        SELECT
            t.id,
            t.parent_task_id, t.depth, t.name, t.status, t.priority,
            t.progress_summary,
            (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
            t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
        FROM tasks t
        WHERE t.parent_task_id = $1
        ORDER BY t.priority DESC, t.created_at DESC
        "#;

    sqlx::query_as::<_, TaskSummary>(SELECT_CHILDREN)
        .bind(parent_id)
        .fetch_all(pool)
        .await
}

/// Put a task back into `pending` so it can be retried after its daemon disconnected.
///
/// In one statement this clears `daemon_id`, increments `retry_count`, appends
/// `failed_daemon_id` to `failed_daemon_ids`, records it as
/// `last_active_daemon_id`, stamps `interrupted_at`, and sets the error message.
/// The update only applies while `retry_count < max_retries`; when the limit is
/// reached (or the task does not exist) no row changes and `Ok(None)` is returned.
pub async fn mark_task_for_retry(
    pool: &PgPool,
    task_id: Uuid,
    failed_daemon_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    const REQUEUE: &str = r#"
        UPDATE tasks
        SET status = 'pending',
            daemon_id = NULL,
            retry_count = retry_count + 1,
            failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
            last_active_daemon_id = $2,
            interrupted_at = NOW(),
            error_message = 'Daemon disconnected, awaiting retry',
            updated_at = NOW()
        WHERE id = $1
          AND retry_count < max_retries
        RETURNING *
        "#;

    sqlx::query_as::<_, Task>(REQUEUE)
        .bind(task_id)
        .bind(failed_daemon_id)
        .fetch_optional(pool)
        .await
}

/// Mark a task as `failed` for good (retry limit exceeded).
///
/// Clears `daemon_id`, increments `retry_count`, appends `failed_daemon_id` to
/// `failed_daemon_ids`, records it as `last_active_daemon_id`, and sets the
/// error message. Succeeds even if no task has the given id.
pub async fn mark_task_permanently_failed(
    pool: &PgPool,
    task_id: Uuid,
    failed_daemon_id: Uuid,
) -> Result<(), sqlx::Error> {
    const FAIL_TASK: &str = r#"
        UPDATE tasks
        SET status = 'failed',
            daemon_id = NULL,
            retry_count = retry_count + 1,
            failed_daemon_ids = array_append(COALESCE(failed_daemon_ids, '{}'), $2),
            last_active_daemon_id = $2,
            error_message = 'Task failed: exceeded maximum retry attempts',
            updated_at = NOW()
        WHERE id = $1
        "#;

    sqlx::query(FAIL_TASK)
        .bind(task_id)
        .bind(failed_daemon_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Update a task by ID with optimistic locking.
///
/// If `req.version` is `Some`, the update only succeeds when the stored version
/// matches; otherwise `RepositoryError::VersionConflict` is returned. If it is
/// `None`, version checking is skipped.
///
/// Merge rules: a `Some` value in the request replaces the stored one and
/// `None` keeps it. `daemon_id` is special-cased: `req.clear_daemon_id = true`
/// forces it to NULL regardless of `req.daemon_id`.
///
/// Returns `Ok(None)` when the task does not exist.
pub async fn update_task(
    pool: &PgPool,
    id: Uuid,
    req: UpdateTaskRequest,
) -> Result<Option<Task>, RepositoryError> {
    // Load the current row; its values are the defaults for fields the request omits.
    let Some(existing) = get_task(pool, id).await? else {
        return Ok(None);
    };

    // Fast-path optimistic-locking check against the row we just read.
    if let Some(expected_version) = req.version {
        if existing.version != expected_version {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: existing.version,
            });
        }
    }

    // Merge request over the existing row.
    let name = req.name.unwrap_or(existing.name);
    let description = req.description.or(existing.description);
    let plan = req.plan.unwrap_or(existing.plan);
    let status = req.status.unwrap_or(existing.status);
    let priority = req.priority.unwrap_or(existing.priority);
    let progress_summary = req.progress_summary.or(existing.progress_summary);
    let last_output = req.last_output.or(existing.last_output);
    let error_message = req.error_message.or(existing.error_message);
    let merge_mode = req.merge_mode.or(existing.merge_mode);
    let pr_url = req.pr_url.or(existing.pr_url);
    let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
    let completion_action = req.completion_action.or(existing.completion_action);
    // Handle clear_daemon_id: if true, set to NULL; otherwise use provided value or keep existing
    let daemon_id = if req.clear_daemon_id {
        None
    } else {
        req.daemon_id.or(existing.daemon_id)
    };

    let result = if let Some(expected_version) = req.version {
        // Repeat the version check in the WHERE clause so a concurrent writer
        // that slipped in after our read makes this statement match no row.
        let updated = sqlx::query_as::<_, Task>(
            r#"
            UPDATE tasks
            SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
                progress_summary = $7, last_output = $8, error_message = $9,
                merge_mode = $10, pr_url = $11, daemon_id = $12,
                target_repo_path = $13, completion_action = $14, updated_at = NOW()
            WHERE id = $1 AND version = $15
            RETURNING *
            "#,
        )
        .bind(id)
        .bind(&name)
        .bind(&description)
        .bind(&plan)
        .bind(&status)
        .bind(priority)
        .bind(&progress_summary)
        .bind(&last_output)
        .bind(&error_message)
        .bind(&merge_mode)
        .bind(&pr_url)
        .bind(daemon_id)
        .bind(&target_repo_path)
        .bind(&completion_action)
        .bind(expected_version)
        .fetch_optional(pool)
        .await?;

        // No row matched: either we lost a race (report the conflict with the
        // version now stored) or the task was deleted meanwhile (falls through to None).
        if updated.is_none() {
            if let Some(current) = get_task(pool, id).await? {
                return Err(RepositoryError::VersionConflict {
                    expected: expected_version,
                    actual: current.version,
                });
            }
        }

        updated
    } else {
        // Unversioned update: last writer wins.
        sqlx::query_as::<_, Task>(
            r#"
            UPDATE tasks
            SET name = $2, description = $3, plan = $4, status = $5, priority = $6,
                progress_summary = $7, last_output = $8, error_message = $9,
                merge_mode = $10, pr_url = $11, daemon_id = $12,
                target_repo_path = $13, completion_action = $14, updated_at = NOW()
            WHERE id = $1
            RETURNING *
            "#,
        )
        .bind(id)
        .bind(&name)
        .bind(&description)
        .bind(&plan)
        .bind(&status)
        .bind(priority)
        .bind(&progress_summary)
        .bind(&last_output)
        .bind(&error_message)
        .bind(&merge_mode)
        .bind(&pr_url)
        .bind(daemon_id)
        .bind(&target_repo_path)
        .bind(&completion_action)
        .fetch_optional(pool)
        .await?
    };

    Ok(result)
}

/// Remove a task row by primary key (not owner-scoped).
///
/// Returns `true` when a row was deleted, `false` when nothing matched.
pub async fn delete_task(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    const DELETE_BY_ID: &str = r#"
        DELETE FROM tasks
        WHERE id = $1
        "#;

    sqlx::query(DELETE_BY_ID)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Number of top-level tasks (rows with no parent); subtasks are not counted.
pub async fn count_tasks(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let (total,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM tasks WHERE parent_task_id IS NULL",
    )
    .fetch_one(pool)
    .await?;

    Ok(total)
}

// =============================================================================
// Owner-Scoped Task Functions
// =============================================================================

/// Create a new task for a specific owner.
pub async fn create_task_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateTaskRequest,
) -> Result<Task, sqlx::Error> {
    // Calculate depth + inherit settings from parent if applicable.
    let (depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action) =
        if let Some(parent_id) = req.parent_task_id {
            let parent = get_task_for_owner(pool, parent_id, owner_id).await?
                .ok_or_else(|| sqlx::Error::RowNotFound)?;

            let new_depth = parent.depth + 1;
            if new_depth >= 2 {
                return Err(sqlx::Error::Protocol(format!(
                    "Maximum task depth exceeded. Cannot create subtask at depth {} (max is 1). Subtasks cannot have children.",
                    new_depth
                )));
            }

            let repo_url = req.repository_url.clone().or(parent.repository_url);
            let base_branch = req.base_branch.clone().or(parent.base_branch);
            let target_branch = req.target_branch.clone().or(parent.target_branch);
            let merge_mode = req.merge_mode.clone().or(parent.merge_mode);
            let target_repo_path = req.target_repo_path.clone().or(parent.target_repo_path);
            let completion_action = req.completion_action.clone();

            (new_depth, repo_url, base_branch, target_branch, merge_mode, target_repo_path, completion_action)
        } else {
            (
                0,
                req.repository_url.clone(),
                req.base_branch.clone(),
                req.target_branch.clone(),
                req.merge_mode.clone(),
                req.target_repo_path.clone(),
                req.completion_action.clone(),
            )
        };

    let copy_files_json = req.copy_files.as_ref().map(|f| serde_json::to_value(f).unwrap_or_default());

    // Resolve directive_document_id from the directive's currently-
    // active contract row (directive_documents table) so the task
    // lands under the right tasks/ subfolder in the sidebar. Failures
    // are non-fatal — the task is created with NULL document_id and
    // the sidebar tolerates that.
    let directive_document_id = match req.directive_id {
        Some(directive_id) => resolve_active_document_for_directive(pool, directive_id)
            .await
            .unwrap_or(None),
        None => None,
    };

    sqlx::query_as::<_, Task>(
        r#"
        INSERT INTO tasks (
            owner_id, parent_task_id, depth, name, description, plan, priority,
            repository_url, base_branch, target_branch, merge_mode,
            target_repo_path, completion_action, continue_from_task_id, copy_files,
            branched_from_task_id, conversation_state,
            directive_id, directive_step_id, directive_document_id
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
        RETURNING *
        "#,
    )
    .bind(owner_id)
    .bind(req.parent_task_id)
    .bind(depth)
    .bind(&req.name)
    .bind(&req.description)
    .bind(&req.plan)
    .bind(req.priority)
    .bind(&repo_url)
    .bind(&base_branch)
    .bind(&target_branch)
    .bind(&merge_mode)
    .bind(&target_repo_path)
    .bind(&completion_action)
    .bind(&req.continue_from_task_id)
    .bind(&copy_files_json)
    .bind(&req.branched_from_task_id)
    .bind(&req.conversation_history)
    .bind(&req.directive_id)
    .bind(&req.directive_step_id)
    .bind(&directive_document_id)
    .fetch_one(pool)
    .await
}

/// Choose the document that new tasks/steps of a directive should attach to.
///
/// Candidates are the directive's documents with status `'active'` or
/// `'draft'`. They are ranked as follows and the first one wins:
///   1. `'active'` documents before `'draft'` documents.
///   2. Within the same status, the most recent `updated_at` first.
///
/// The draft fallback covers a fresh draft that was auto-created post-ship
/// and is already receiving tasks before the user has touched it.
///
/// `Ok(None)` means the directive has no active or draft document. That is
/// an expected outcome; the caller then stores
/// `directive_document_id = NULL`.
async fn resolve_active_document_for_directive(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Option<Uuid>, sqlx::Error> {
    let sql = r#"
        SELECT id FROM directive_documents
        WHERE directive_id = $1
          AND status IN ('active', 'draft')
        ORDER BY
            CASE status WHEN 'active' THEN 0 WHEN 'draft' THEN 1 ELSE 2 END,
            updated_at DESC
        LIMIT 1
        "#;

    let found = sqlx::query_as::<_, (Uuid,)>(sql)
        .bind(directive_id)
        .fetch_optional(pool)
        .await?;

    Ok(found.map(|(document_id,)| document_id))
}

/// Fetch a single task by `id`, but only if it belongs to `owner_id`.
///
/// Returns `Ok(None)` when no row matches both the ID and the owner.
pub async fn get_task_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<Task>, sqlx::Error> {
    let sql = r#"
        SELECT *
        FROM tasks
        WHERE id = $1 AND owner_id = $2
        "#;

    sqlx::query_as::<_, Task>(sql)
        .bind(id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await
}

/// Summarize an owner's visible top-level tasks.
///
/// Only tasks without a parent and with `hidden` unset/false are returned.
/// Rows are sorted by priority (highest first), then by creation time
/// (newest first). Each summary carries the number of direct subtasks.
pub async fn list_tasks_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    let sql = r#"
        SELECT
            t.id,
            t.parent_task_id, t.depth, t.name, t.status, t.priority,
            t.progress_summary,
            (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
            t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
        FROM tasks t
        WHERE t.owner_id = $1 AND t.parent_task_id IS NULL AND COALESCE(t.hidden, false) = false
        ORDER BY t.priority DESC, t.created_at DESC
        "#;

    sqlx::query_as::<_, TaskSummary>(sql)
        .bind(owner_id)
        .fetch_all(pool)
        .await
}

// =============================================================================
// Tmp directive — per-owner scratchpad
// =============================================================================

/// Return the owner's tmp (scratchpad) directive, creating it when missing.
///
/// The insert uses `ON CONFLICT DO NOTHING`, relying on the partial unique
/// index on (owner_id) WHERE is_tmp to make the operation idempotent. When
/// the insert writes a row, that row is returned directly. When it writes
/// nothing (a tmp directive already existed, or a concurrent request just
/// created one), the existing row is selected instead.
pub async fn get_or_create_tmp_directive(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Directive, sqlx::Error> {
    let insert_sql = r#"
        INSERT INTO directives
            (owner_id, title, goal, status, reconcile_mode, is_tmp)
        VALUES
            ($1, 'tmp', '', 'idle', 'auto', true)
        ON CONFLICT DO NOTHING
        RETURNING *
        "#;

    // RETURNING only yields a row when the insert actually happened.
    let created = sqlx::query_as::<_, Directive>(insert_sql)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;

    match created {
        Some(directive) => Ok(directive),
        // Nothing inserted: the row already exists, so read it back.
        None => {
            sqlx::query_as::<_, Directive>(
                r#"SELECT * FROM directives WHERE owner_id = $1 AND is_tmp = true LIMIT 1"#,
            )
            .bind(owner_id)
            .fetch_one(pool)
            .await
        }
    }
}

/// Load every directive flagged `is_tmp`, across all owners.
///
/// The 30-day expiry sweep uses this to learn which directives are
/// scratchpads, and therefore which tasks should be aged out.
pub async fn list_all_tmp_directives(
    pool: &PgPool,
) -> Result<Vec<Directive>, sqlx::Error> {
    let sql = r#"SELECT * FROM directives WHERE is_tmp = true"#;

    sqlx::query_as::<_, Directive>(sql).fetch_all(pool).await
}

/// Purge top-level tasks of the given directive that were created more than
/// 30 days ago.
///
/// Only rows with `parent_task_id IS NULL` are targeted; subtasks are
/// expected to be removed with their parent via the FK cascade. The return
/// value is the number of rows the DELETE reported as affected and is
/// informational (callers log it).
pub async fn delete_expired_tmp_tasks(
    pool: &PgPool,
    tmp_directive_id: Uuid,
) -> Result<u64, sqlx::Error> {
    let sql = r#"
        DELETE FROM tasks
         WHERE directive_id = $1
           AND parent_task_id IS NULL
           AND created_at < NOW() - INTERVAL '30 days'
        "#;

    sqlx::query(sql)
        .bind(tmp_directive_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
}

/// Summarize the "spinoff" tasks of a directive for one owner.
///
/// A task qualifies when it has `directive_id` set to the given directive,
/// has no `directive_step_id` (which separates it from step-spawned
/// execution tasks), is top-level, and is not hidden. These are the tasks
/// created from the directive folder context menu. Results are ordered
/// newest first.
pub async fn list_ephemeral_directive_tasks_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    let sql = r#"
        SELECT
            t.id,
            t.parent_task_id, t.depth, t.name, t.status, t.priority,
            t.progress_summary,
            (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
            t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
        FROM tasks t
        WHERE t.owner_id = $1
          AND t.directive_id = $2
          AND t.directive_step_id IS NULL
          AND t.parent_task_id IS NULL
          AND COALESCE(t.hidden, false) = false
        ORDER BY t.created_at DESC
        "#;

    sqlx::query_as::<_, TaskSummary>(sql)
        .bind(owner_id)
        .bind(directive_id)
        .fetch_all(pool)
        .await
}

/// Summarize the visible top-level tasks under the owner's tmp directive.
///
/// These are the scratchpad / orphan tasks shown in the sidebar's `tmp/`
/// folder. The tmp directive is fetched with `get_or_create_tmp_directive`,
/// so it is created on demand and callers never see a "no tmp directive"
/// case. Results are ordered by priority (highest first), then newest first.
pub async fn list_tmp_tasks_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    let tmp_directive = get_or_create_tmp_directive(pool, owner_id).await?;

    let sql = r#"
        SELECT
            t.id,
            t.parent_task_id, t.depth, t.name, t.status, t.priority,
            t.progress_summary,
            (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
            t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
        FROM tasks t
        WHERE t.owner_id = $1
          AND t.directive_id = $2
          AND t.parent_task_id IS NULL
          AND COALESCE(t.hidden, false) = false
        ORDER BY t.priority DESC, t.created_at DESC
        "#;

    sqlx::query_as::<_, TaskSummary>(sql)
        .bind(owner_id)
        .bind(tmp_directive.id)
        .fetch_all(pool)
        .await
}

/// Summarize the direct children of `parent_id` that belong to `owner_id`.
///
/// Hidden subtasks are included (no `hidden` filter is applied). Results are
/// ordered by priority (highest first), then by creation time (newest first).
pub async fn list_subtasks_for_owner(
    pool: &PgPool,
    parent_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    let sql = r#"
        SELECT
            t.id,
            t.parent_task_id, t.depth, t.name, t.status, t.priority,
            t.progress_summary,
            (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
            t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
        FROM tasks t
        WHERE t.owner_id = $1 AND t.parent_task_id = $2
        ORDER BY t.priority DESC, t.created_at DESC
        "#;

    // Note the bind order: $1 is the owner, $2 is the parent.
    sqlx::query_as::<_, TaskSummary>(sql)
        .bind(owner_id)
        .bind(parent_id)
        .fetch_all(pool)
        .await
}

/// Update a task by ID with optimistic locking, scoped to owner.
///
/// The current row is loaded first; `Ok(None)` is returned when no task with
/// this `id` exists for `owner_id`. Every field left as `None` in `req`
/// keeps its stored value. Because optional columns are merged with
/// `Option::or`, this function cannot reset them to NULL; the one exception
/// is `daemon_id`, which is cleared when `req.clear_daemon_id` is set.
///
/// When `req.version` is provided it is checked twice: once against the row
/// just loaded, and again in the UPDATE's WHERE clause so that a concurrent
/// writer between the read and the write is detected.
///
/// # Errors
///
/// - `RepositoryError::VersionConflict` when `req.version` does not match the
///   stored version (either at read time or at write time).
/// - Any database error, converted into `RepositoryError`.
pub async fn update_task_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateTaskRequest,
) -> Result<Option<Task>, RepositoryError> {
    // Load the existing task first (scoped to owner).
    let Some(existing) = get_task_for_owner(pool, id, owner_id).await? else {
        return Ok(None);
    };

    // Fast-path version check before doing any work (optimistic locking).
    if let Some(expected_version) = req.version {
        if existing.version != expected_version {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: existing.version,
            });
        }
    }

    // Merge the request over the stored values.
    let name = req.name.unwrap_or(existing.name);
    let description = req.description.or(existing.description);
    let plan = req.plan.unwrap_or(existing.plan);
    let status = req.status.unwrap_or(existing.status);
    let priority = req.priority.unwrap_or(existing.priority);
    let progress_summary = req.progress_summary.or(existing.progress_summary);
    let last_output = req.last_output.or(existing.last_output);
    let error_message = req.error_message.or(existing.error_message);
    let merge_mode = req.merge_mode.or(existing.merge_mode);
    let pr_url = req.pr_url.or(existing.pr_url);
    let repository_url = req.repository_url.or(existing.repository_url);
    let target_repo_path = req.target_repo_path.or(existing.target_repo_path);
    let completion_action = req.completion_action.or(existing.completion_action);
    let hidden = req.hidden.unwrap_or(existing.hidden);
    // `clear_daemon_id` wins over both the requested and the stored daemon.
    let daemon_id = if req.clear_daemon_id {
        None
    } else {
        req.daemon_id.or(existing.daemon_id)
    };

    let result = if let Some(expected_version) = req.version {
        // Versioned write: the WHERE clause re-checks the version so a
        // concurrent update between our read and this write matches no row.
        let updated = sqlx::query_as::<_, Task>(
            r#"
            UPDATE tasks
            SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
                progress_summary = $8, last_output = $9, error_message = $10,
                merge_mode = $11, pr_url = $12, daemon_id = $13,
                target_repo_path = $14, completion_action = $15, repository_url = $16,
                hidden = $17, updated_at = NOW()
            WHERE id = $1 AND owner_id = $2 AND version = $18
            RETURNING *
            "#,
        )
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&plan)
        .bind(&status)
        .bind(priority)
        .bind(&progress_summary)
        .bind(&last_output)
        .bind(&error_message)
        .bind(&merge_mode)
        .bind(&pr_url)
        .bind(daemon_id)
        .bind(&target_repo_path)
        .bind(&completion_action)
        .bind(&repository_url)
        .bind(hidden)
        .bind(expected_version)
        .fetch_optional(pool)
        .await?;

        // No row updated: if the task still exists, someone else changed it
        // in the meantime, so report the conflict. If it is gone, fall
        // through and return None.
        if updated.is_none() {
            if let Some(current) = get_task_for_owner(pool, id, owner_id).await? {
                return Err(RepositoryError::VersionConflict {
                    expected: expected_version,
                    actual: current.version,
                });
            }
        }

        updated
    } else {
        // Unversioned write: last writer wins.
        sqlx::query_as::<_, Task>(
            r#"
            UPDATE tasks
            SET name = $3, description = $4, plan = $5, status = $6, priority = $7,
                progress_summary = $8, last_output = $9, error_message = $10,
                merge_mode = $11, pr_url = $12, daemon_id = $13,
                target_repo_path = $14, completion_action = $15, repository_url = $16,
                hidden = $17, updated_at = NOW()
            WHERE id = $1 AND owner_id = $2
            RETURNING *
            "#,
        )
        .bind(id)
        .bind(owner_id)
        .bind(&name)
        .bind(&description)
        .bind(&plan)
        .bind(&status)
        .bind(priority)
        .bind(&progress_summary)
        .bind(&last_output)
        .bind(&error_message)
        .bind(&merge_mode)
        .bind(&pr_url)
        .bind(daemon_id)
        .bind(&target_repo_path)
        .bind(&completion_action)
        .bind(&repository_url)
        .bind(hidden)
        .fetch_optional(pool)
        .await?
    };

    Ok(result)
}

/// Remove the task `id`, but only when it belongs to `owner_id`.
///
/// Returns `Ok(true)` when a row was deleted, `Ok(false)` when nothing
/// matched both the ID and the owner.
pub async fn delete_task_for_owner(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
        DELETE FROM tasks
        WHERE id = $1 AND owner_id = $2
        "#;

    sqlx::query(sql)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Set a task's status and log a `status_change` event for it.
///
/// The task is read first to capture its previous status; `Ok(None)` is
/// returned if it does not exist. The UPDATE also maintains timestamps:
/// `started_at` is stamped the first time the status becomes `'running'`,
/// and `completed_at` is stamped whenever the status becomes `'done'`,
/// `'failed'` or `'merged'`.
///
/// Event recording is best-effort: a failure to insert the event is ignored
/// and does not affect the returned task.
pub async fn update_task_status(
    pool: &PgPool,
    id: Uuid,
    new_status: &str,
    event_data: Option<serde_json::Value>,
) -> Result<Option<Task>, sqlx::Error> {
    // Remember the status the task had before this change.
    let previous_status = match get_task(pool, id).await? {
        Some(current) => current.status,
        None => return Ok(None),
    };

    let sql = r#"
        UPDATE tasks
        SET status = $2, updated_at = NOW(),
            started_at = CASE WHEN $2 = 'running' AND started_at IS NULL THEN NOW() ELSE started_at END,
            completed_at = CASE WHEN $2 IN ('done', 'failed', 'merged') THEN NOW() ELSE completed_at END
        WHERE id = $1
        RETURNING *
        "#;

    let updated = sqlx::query_as::<_, Task>(sql)
        .bind(id)
        .bind(new_status)
        .fetch_optional(pool)
        .await?;

    // Only log the transition when a row was actually updated.
    if updated.is_some() {
        let _ = create_task_event(
            pool,
            id,
            "status_change",
            Some(&previous_status),
            Some(new_status),
            event_data,
        )
        .await;
    }

    Ok(updated)
}

// =============================================================================
// Task Event Functions
// =============================================================================

/// Insert one row into `task_events` and return it.
///
/// `previous_status`, `new_status` and `event_data` are optional and are
/// stored as NULL when absent.
pub async fn create_task_event(
    pool: &PgPool,
    task_id: Uuid,
    event_type: &str,
    previous_status: Option<&str>,
    new_status: Option<&str>,
    event_data: Option<serde_json::Value>,
) -> Result<TaskEvent, sqlx::Error> {
    let sql = r#"
        INSERT INTO task_events (task_id, event_type, previous_status, new_status, event_data)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING *
        "#;

    sqlx::query_as::<_, TaskEvent>(sql)
        .bind(task_id)
        .bind(event_type)
        .bind(previous_status)
        .bind(new_status)
        .bind(event_data)
        .fetch_one(pool)
        .await
}

/// Fetch the most recent events of a task, newest first.
///
/// At most `limit` rows are returned; when `limit` is `None` the cap is 100.
pub async fn list_task_events(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<TaskEvent>, sqlx::Error> {
    let sql = r#"
        SELECT *
        FROM task_events
        WHERE task_id = $1
        ORDER BY created_at DESC
        LIMIT $2
        "#;
    let max_rows = limit.unwrap_or(100);

    sqlx::query_as::<_, TaskEvent>(sql)
        .bind(task_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await
}

// =============================================================================
// Daemon Functions
// =============================================================================

/// Insert a `daemons` row for a newly connected daemon and return it.
///
/// `hostname` and `machine_id` are optional and stored as NULL when absent.
pub async fn register_daemon(
    pool: &PgPool,
    owner_id: Uuid,
    connection_id: &str,
    hostname: Option<&str>,
    machine_id: Option<&str>,
    max_concurrent_tasks: i32,
) -> Result<Daemon, sqlx::Error> {
    let sql = r#"
        INSERT INTO daemons (owner_id, connection_id, hostname, machine_id, max_concurrent_tasks)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING *
        "#;

    sqlx::query_as::<_, Daemon>(sql)
        .bind(owner_id)
        .bind(connection_id)
        .bind(hostname)
        .bind(machine_id)
        .bind(max_concurrent_tasks)
        .fetch_one(pool)
        .await
}

/// Look up a daemon by primary key (no owner filter is applied).
///
/// Returns `Ok(None)` when no daemon has that ID.
pub async fn get_daemon(pool: &PgPool, id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
    let sql = r#"
        SELECT *
        FROM daemons
        WHERE id = $1
        "#;

    sqlx::query_as::<_, Daemon>(sql)
        .bind(id)
        .fetch_optional(pool)
        .await
}

/// Look up a daemon by its `connection_id`.
///
/// Returns `Ok(None)` when no daemon row carries that connection ID.
pub async fn get_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<Option<Daemon>, sqlx::Error> {
    let sql = r#"
        SELECT *
        FROM daemons
        WHERE connection_id = $1
        "#;

    sqlx::query_as::<_, Daemon>(sql)
        .bind(connection_id)
        .fetch_optional(pool)
        .await
}

/// Return every daemon row, most recently connected first.
pub async fn list_daemons(pool: &PgPool) -> Result<Vec<Daemon>, sqlx::Error> {
    let sql = r#"
        SELECT *
        FROM daemons
        ORDER BY connected_at DESC
        "#;

    sqlx::query_as::<_, Daemon>(sql).fetch_all(pool).await
}

/// Return the daemons that belong to `owner_id`, most recently connected first.
pub async fn list_daemons_for_owner(pool: &PgPool, owner_id: Uuid) -> Result<Vec<Daemon>, sqlx::Error> {
    let sql = r#"
        SELECT *
        FROM daemons
        WHERE owner_id = $1
        ORDER BY connected_at DESC
        "#;

    sqlx::query_as::<_, Daemon>(sql)
        .bind(owner_id)
        .fetch_all(pool)
        .await
}

/// Look up a daemon by `id`, but only if it belongs to `owner_id`.
///
/// Returns `Ok(None)` when no row matches both the ID and the owner.
pub async fn get_daemon_for_owner(pool: &PgPool, id: Uuid, owner_id: Uuid) -> Result<Option<Daemon>, sqlx::Error> {
    let sql = r#"
        SELECT *
        FROM daemons
        WHERE id = $1 AND owner_id = $2
        "#;

    sqlx::query_as::<_, Daemon>(sql)
        .bind(id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await
}

/// Record a heartbeat for a daemon.
///
/// Stamps `last_heartbeat_at` with the current time and sets the status to
/// `'connected'`. Returns `Ok(false)` when no daemon has that ID.
pub async fn update_daemon_heartbeat(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    let sql = r#"
        UPDATE daemons
        SET last_heartbeat_at = NOW(), status = 'connected'
        WHERE id = $1
        "#;

    sqlx::query(sql)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Set a daemon's status.
///
/// When the new status is `'disconnected'`, `disconnected_at` is stamped
/// with the current time; for any other status it is left untouched.
/// Returns `Ok(false)` when no daemon has that ID.
pub async fn update_daemon_status(
    pool: &PgPool,
    id: Uuid,
    status: &str,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
        UPDATE daemons
        SET status = $2,
            disconnected_at = CASE WHEN $2 = 'disconnected' THEN NOW() ELSE disconnected_at END
        WHERE id = $1
        "#;

    sqlx::query(sql)
        .bind(id)
        .bind(status)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Flag the daemon with the given `connection_id` as disconnected.
///
/// Sets the status to `'disconnected'` and stamps `disconnected_at` with the
/// current time. Returns `Ok(false)` when no daemon matched.
pub async fn disconnect_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
        UPDATE daemons
        SET status = 'disconnected',
            disconnected_at = NOW()
        WHERE connection_id = $1
        "#;

    sqlx::query(sql)
        .bind(connection_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Adjust a daemon's `current_task_count` by `delta` (which may be negative).
///
/// The stored count is clamped so it never drops below zero. Returns
/// `Ok(false)` when no daemon has that ID.
pub async fn update_daemon_task_count(
    pool: &PgPool,
    id: Uuid,
    delta: i32,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
        UPDATE daemons
        SET current_task_count = GREATEST(0, current_task_count + $2)
        WHERE id = $1
        "#;

    sqlx::query(sql)
        .bind(id)
        .bind(delta)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Remove the daemon row with the given primary key.
///
/// Returns `Ok(true)` when a row was deleted, `Ok(false)` otherwise.
pub async fn delete_daemon(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
    let sql = r#"
        DELETE FROM daemons
        WHERE id = $1
        "#;

    sqlx::query(sql)
        .bind(id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Remove the daemon row(s) carrying the given `connection_id`.
///
/// Returns `Ok(true)` when at least one row was deleted, `Ok(false)` otherwise.
pub async fn delete_daemon_by_connection(
    pool: &PgPool,
    connection_id: &str,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
        DELETE FROM daemons
        WHERE connection_id = $1
        "#;

    sqlx::query(sql)
        .bind(connection_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Count the daemons whose status is `'connected'`.
pub async fn count_daemons(pool: &PgPool) -> Result<i64, sqlx::Error> {
    let (connected,) = sqlx::query_as::<_, (i64,)>(
        "SELECT COUNT(*) FROM daemons WHERE status = 'connected'",
    )
    .fetch_one(pool)
    .await?;

    Ok(connected)
}

/// Remove daemons whose last heartbeat is older than `timeout_seconds`.
///
/// A daemon is stale when `last_heartbeat_at` lies more than
/// `timeout_seconds` seconds before the database's current time. Returns the
/// number of rows deleted.
pub async fn delete_stale_daemons(
    pool: &PgPool,
    timeout_seconds: i64,
) -> Result<u64, sqlx::Error> {
    let sql = r#"
        DELETE FROM daemons
        WHERE last_heartbeat_at < NOW() - INTERVAL '1 second' * $1
        "#;

    sqlx::query(sql)
        .bind(timeout_seconds)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
}

// =============================================================================
// Sibling Awareness Functions
// =============================================================================

/// List sibling tasks (tasks with the same parent, excluding the given task).
///
/// - With `Some(parent)`, siblings are the other children of that parent.
/// - With `None`, siblings are the other top-level tasks that have the same
///   owner as `task_id`. The owner is resolved inside the query, so the
///   result never includes top-level tasks of unrelated owners. The
///   comparison uses `IS NOT DISTINCT FROM` so that tasks with a NULL
///   `owner_id` are still treated as siblings of each other.
///
/// Results are ordered by priority (highest first), then by creation time
/// (newest first). Hidden tasks are not filtered out.
pub async fn list_sibling_tasks(
    pool: &PgPool,
    task_id: Uuid,
    parent_id: Option<Uuid>,
) -> Result<Vec<TaskSummary>, sqlx::Error> {
    match parent_id {
        Some(parent) => {
            sqlx::query_as::<_, TaskSummary>(
                r#"
                SELECT
                    t.id,
                    t.parent_task_id, t.depth, t.name, t.status, t.priority,
                    t.progress_summary,
                    (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
                    t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
                FROM tasks t
                WHERE t.parent_task_id = $1 AND t.id != $2
                ORDER BY t.priority DESC, t.created_at DESC
                "#,
            )
            .bind(parent)
            .bind(task_id)
            .fetch_all(pool)
            .await
        }
        None => {
            // Top-level tasks have no parent to group them, so without an
            // owner filter this would return every top-level task in the
            // table. Restrict to the owner of the reference task.
            sqlx::query_as::<_, TaskSummary>(
                r#"
                SELECT
                    t.id,
                    t.parent_task_id, t.depth, t.name, t.status, t.priority,
                    t.progress_summary,
                    (SELECT COUNT(*) FROM tasks WHERE parent_task_id = t.id) as subtask_count,
                    t.version, COALESCE(t.hidden, false) as hidden, t.created_at, t.updated_at
                FROM tasks t
                WHERE t.parent_task_id IS NULL
                  AND t.id != $1
                  AND t.owner_id IS NOT DISTINCT FROM (
                      SELECT owner_id FROM tasks WHERE id = $1
                  )
                ORDER BY t.priority DESC, t.created_at DESC
                "#,
            )
            .bind(task_id)
            .fetch_all(pool)
            .await
        }
    }
}

/// Fetch the owner's sibling tasks that are currently `'running'`
/// (used for context injection).
///
/// Siblings share the same parent as `task_id`: the children of `parent`
/// when `parent_id` is `Some`, or the owner's top-level tasks when it is
/// `None`. The task itself is excluded. Results are ordered by priority,
/// highest first.
pub async fn get_running_siblings(
    pool: &PgPool,
    owner_id: Uuid,
    task_id: Uuid,
    parent_id: Option<Uuid>,
) -> Result<Vec<Task>, sqlx::Error> {
    const UNDER_PARENT_SQL: &str = r#"
                SELECT *
                FROM tasks t
                WHERE t.owner_id = $1
                    AND t.parent_task_id = $2
                    AND t.id != $3
                    AND t.status = 'running'
                ORDER BY t.priority DESC
                "#;
    const TOP_LEVEL_SQL: &str = r#"
                SELECT *
                FROM tasks t
                WHERE t.owner_id = $1
                    AND t.parent_task_id IS NULL
                    AND t.id != $2
                    AND t.status = 'running'
                ORDER BY t.priority DESC
                "#;

    match parent_id {
        None => {
            sqlx::query_as::<_, Task>(TOP_LEVEL_SQL)
                .bind(owner_id)
                .bind(task_id)
                .fetch_all(pool)
                .await
        }
        Some(parent) => {
            sqlx::query_as::<_, Task>(UNDER_PARENT_SQL)
                .bind(owner_id)
                .bind(parent)
                .bind(task_id)
                .fetch_all(pool)
                .await
        }
    }
}

/// Load a task together with summaries of its siblings (for context awareness).
///
/// Returns `Ok(None)` when the task does not exist. Otherwise the siblings
/// are resolved with `list_sibling_tasks` using the task's own
/// `parent_task_id`.
pub async fn get_task_with_siblings(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<(Task, Vec<TaskSummary>)>, sqlx::Error> {
    match get_task(pool, id).await? {
        None => Ok(None),
        Some(task) => {
            let siblings = list_sibling_tasks(pool, id, task.parent_task_id).await?;
            Ok(Some((task, siblings)))
        }
    }
}

// =============================================================================
// Task Output Persistence Functions
// =============================================================================

/// Persist one piece of task output as a `task_events` row.
///
/// The output is stored with `event_type = "output"` and no status fields.
/// All arguments are packed into the event's JSON payload under camelCase
/// keys (`messageType`, `content`, `toolName`, `toolInput`, `isError`,
/// `costUsd`, `durationMs`); absent optional values are serialized as JSON
/// `null`.
pub async fn save_task_output(
    pool: &PgPool,
    task_id: Uuid,
    message_type: &str,
    content: &str,
    tool_name: Option<&str>,
    tool_input: Option<serde_json::Value>,
    is_error: Option<bool>,
    cost_usd: Option<f64>,
    duration_ms: Option<u64>,
) -> Result<TaskEvent, sqlx::Error> {
    let payload = serde_json::json!({
        "messageType": message_type,
        "content": content,
        "toolName": tool_name,
        "toolInput": tool_input,
        "isError": is_error,
        "costUsd": cost_usd,
        "durationMs": duration_ms,
    });

    let recorded = create_task_event(pool, task_id, "output", None, None, Some(payload)).await?;
    Ok(recorded)
}

/// Read back the stored output events of a task, oldest first.
///
/// Only `task_events` rows with `event_type = 'output'` are returned. At
/// most `limit` rows are fetched (1000 when `limit` is `None`); because the
/// ordering is ascending, a cap keeps the earliest events.
pub async fn get_task_output(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<TaskEvent>, sqlx::Error> {
    let sql = r#"
        SELECT *
        FROM task_events
        WHERE task_id = $1 AND event_type = 'output'
        ORDER BY created_at ASC
        LIMIT $2
        "#;
    let max_rows = limit.unwrap_or(1000);

    sqlx::query_as::<_, TaskEvent>(sql)
        .bind(task_id)
        .bind(max_rows)
        .fetch_all(pool)
        .await
}

/// Mark a task as finished and log a `complete` event.
///
/// The status becomes `"done"` when `success` is true and `"failed"`
/// otherwise; `completed_at` and `updated_at` are stamped with the current
/// time. A provided `error_message` replaces the stored one; `None` leaves
/// the stored message untouched. Returns `Ok(None)` when the task does not
/// exist.
///
/// The event is recorded best-effort (insert failures are ignored) and its
/// previous status is always written as `"running"`, not read from the row.
pub async fn complete_task(
    pool: &PgPool,
    task_id: Uuid,
    success: bool,
    error_message: Option<&str>,
) -> Result<Option<Task>, sqlx::Error> {
    let status = match success {
        true => "done",
        false => "failed",
    };

    let sql = r#"
        UPDATE tasks
        SET status = $2,
            error_message = COALESCE($3, error_message),
            completed_at = NOW(),
            updated_at = NOW()
        WHERE id = $1
        RETURNING *
        "#;

    let finished = sqlx::query_as::<_, Task>(sql)
        .bind(task_id)
        .bind(status)
        .bind(error_message)
        .fetch_optional(pool)
        .await?;

    // Log the completion only when a task row was actually updated.
    if finished.is_some() {
        let payload = serde_json::json!({
            "success": success,
            "errorMessage": error_message,
        });
        let _ = create_task_event(
            pool,
            task_id,
            "complete",
            Some("running"),
            Some(status),
            Some(payload),
        )
        .await;
    }

    Ok(finished)
}


/// Load a task and all of its descendants.
///
/// A recursive CTE starts at `root_task_id` and repeatedly follows
/// `parent_task_id` links downward. The rows are returned ordered by `depth`
/// and then `created_at`, so the root (if it exists) comes first. An unknown
/// root yields an empty vector.
pub async fn get_task_tree(pool: &PgPool, root_task_id: Uuid) -> Result<Vec<Task>, sqlx::Error> {
    let sql = r#"
        WITH RECURSIVE task_tree AS (
            -- Base case: the root task
            SELECT * FROM tasks WHERE id = $1
            UNION ALL
            -- Recursive case: children of current level
            SELECT t.* FROM tasks t
            JOIN task_tree tt ON t.parent_task_id = tt.id
        )
        SELECT * FROM task_tree
        ORDER BY depth, created_at
        "#;

    sqlx::query_as::<_, Task>(sql)
        .bind(root_task_id)
        .fetch_all(pool)
        .await
}

// ============================================================================
// Task Checkpoints
// ============================================================================

/// Create a checkpoint for a task.
///
/// Allocates the next per-task `checkpoint_number` (current maximum + 1, starting
/// at 1), mirrors the checkpoint's SHA, number and message onto the task row, and
/// inserts the checkpoint record, which is returned.
///
/// All statements run inside a single transaction so a failure part-way through
/// cannot leave the task row referring to a checkpoint that was never stored. The
/// task row is locked first, which serializes concurrent callers for the same task
/// and prevents two of them from allocating the same checkpoint number.
///
/// # Errors
///
/// Returns the underlying `sqlx::Error` if any statement or the commit fails; in
/// that case the transaction is rolled back when it is dropped.
pub async fn create_task_checkpoint(
    pool: &PgPool,
    task_id: Uuid,
    commit_sha: &str,
    branch_name: &str,
    message: &str,
    files_changed: Option<serde_json::Value>,
    lines_added: Option<i32>,
    lines_removed: Option<i32>,
) -> Result<TaskCheckpoint, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Serialize checkpoint creation per task. Locks nothing if the task is missing.
    sqlx::query("SELECT id FROM tasks WHERE id = $1 FOR UPDATE")
        .bind(task_id)
        .execute(&mut *tx)
        .await?;

    // Get current checkpoint count and increment
    let checkpoint_number: i32 = sqlx::query_scalar(
        "SELECT COALESCE(MAX(checkpoint_number), 0) + 1 FROM task_checkpoints WHERE task_id = $1",
    )
    .bind(task_id)
    .fetch_one(&mut *tx)
    .await?;

    // Update task's checkpoint tracking
    sqlx::query(
        r#"
        UPDATE tasks
        SET last_checkpoint_sha = $1,
            checkpoint_count = $2,
            checkpoint_message = $3,
            updated_at = NOW()
        WHERE id = $4
        "#,
    )
    .bind(commit_sha)
    .bind(checkpoint_number)
    .bind(message)
    .bind(task_id)
    .execute(&mut *tx)
    .await?;

    let checkpoint = sqlx::query_as::<_, TaskCheckpoint>(
        r#"
        INSERT INTO task_checkpoints (
            task_id, checkpoint_number, commit_sha, branch_name, message,
            files_changed, lines_added, lines_removed
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        RETURNING *
        "#,
    )
    .bind(task_id)
    .bind(checkpoint_number)
    .bind(commit_sha)
    .bind(branch_name)
    .bind(message)
    .bind(files_changed)
    .bind(lines_added)
    .bind(lines_removed)
    .fetch_one(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(checkpoint)
}

/// Look up a single checkpoint by its primary key.
///
/// Returns `Ok(None)` when no checkpoint has the given `id`.
pub async fn get_task_checkpoint(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<TaskCheckpoint>, sqlx::Error> {
    let sql = "SELECT * FROM task_checkpoints WHERE id = $1";
    let lookup = sqlx::query_as::<_, TaskCheckpoint>(sql).bind(id);
    lookup.fetch_optional(pool).await
}

/// Look up a checkpoint by the commit SHA it recorded.
///
/// Returns `Ok(None)` when no checkpoint matches. The query applies no ordering, so
/// if several checkpoints share the SHA the row returned is whichever the database
/// yields first.
pub async fn get_task_checkpoint_by_sha(
    pool: &PgPool,
    commit_sha: &str,
) -> Result<Option<TaskCheckpoint>, sqlx::Error> {
    let sql = "SELECT * FROM task_checkpoints WHERE commit_sha = $1";
    let lookup = sqlx::query_as::<_, TaskCheckpoint>(sql).bind(commit_sha);
    lookup.fetch_optional(pool).await
}

/// Return every checkpoint recorded for `task_id`, highest `checkpoint_number` first.
pub async fn list_task_checkpoints(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Vec<TaskCheckpoint>, sqlx::Error> {
    let sql = "SELECT * FROM task_checkpoints WHERE task_id = $1 ORDER BY checkpoint_number DESC";
    let listing = sqlx::query_as::<_, TaskCheckpoint>(sql).bind(task_id);
    listing.fetch_all(pool).await
}

// ============================================================================
// Daemon Selection
// ============================================================================

/// List an owner's connected daemons, best candidate first.
///
/// Only daemons with status `connected` are returned. They are ranked by capacity
/// score (a missing score counts as 100), then by free task slots
/// (`max_concurrent_tasks - current_task_count`), then by shortest queue (a missing
/// queue length counts as 0).
pub async fn get_available_daemons(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> {
    let sql = r#"
        SELECT id, owner_id, connection_id, hostname, machine_id,
               max_concurrent_tasks, current_task_count,
               capacity_score, task_queue_length, supports_migration,
               status, last_heartbeat_at, connected_at
        FROM daemons
        WHERE owner_id = $1 AND status = 'connected'
        ORDER BY
            COALESCE(capacity_score, 100) DESC,
            (max_concurrent_tasks - current_task_count) DESC,
            COALESCE(task_queue_length, 0) ASC
        "#;

    let candidates = sqlx::query_as::<_, DaemonWithCapacity>(sql).bind(owner_id);
    candidates.fetch_all(pool).await
}

/// List an owner's connected daemons, best candidate first, skipping the given IDs.
///
/// Same filter and ranking as the unfiltered daemon selection query, but any daemon
/// whose `id` appears in `exclude_daemon_ids` is left out. Used for task retry so a
/// task is not handed back to a daemon that already failed it.
pub async fn get_available_daemons_excluding(
    pool: &PgPool,
    owner_id: Uuid,
    exclude_daemon_ids: &[Uuid],
) -> Result<Vec<DaemonWithCapacity>, sqlx::Error> {
    let sql = r#"
        SELECT id, owner_id, connection_id, hostname, machine_id,
               max_concurrent_tasks, current_task_count,
               capacity_score, task_queue_length, supports_migration,
               status, last_heartbeat_at, connected_at
        FROM daemons
        WHERE owner_id = $1
          AND status = 'connected'
          AND id != ALL($2)
        ORDER BY
            COALESCE(capacity_score, 100) DESC,
            (max_concurrent_tasks - current_task_count) DESC,
            COALESCE(task_queue_length, 0) ASC
        "#;

    let candidates = sqlx::query_as::<_, DaemonWithCapacity>(sql)
        .bind(owner_id)
        .bind(exclude_daemon_ids);
    candidates.fetch_all(pool).await
}

/// Record that `task_id` has been assigned to `daemon_id` and return the new row.
pub async fn create_daemon_task_assignment(
    pool: &PgPool,
    daemon_id: Uuid,
    task_id: Uuid,
) -> Result<DaemonTaskAssignment, sqlx::Error> {
    let sql = r#"
        INSERT INTO daemon_task_assignments (daemon_id, task_id)
        VALUES ($1, $2)
        RETURNING *
        "#;

    let insert = sqlx::query_as::<_, DaemonTaskAssignment>(sql)
        .bind(daemon_id)
        .bind(task_id);
    insert.fetch_one(pool).await
}

/// Set the status of the assignment belonging to `task_id` and return the updated row.
///
/// Fails with `sqlx::Error::RowNotFound` when the task has no assignment, because
/// exactly one returned row is required.
pub async fn update_daemon_task_assignment_status(
    pool: &PgPool,
    task_id: Uuid,
    status: &str,
) -> Result<DaemonTaskAssignment, sqlx::Error> {
    let sql = r#"
        UPDATE daemon_task_assignments
        SET status = $1
        WHERE task_id = $2
        RETURNING *
        "#;

    let update = sqlx::query_as::<_, DaemonTaskAssignment>(sql)
        .bind(status)
        .bind(task_id);
    update.fetch_one(pool).await
}

/// Fetch the daemon assignment for `task_id`, or `None` if the task is unassigned.
pub async fn get_daemon_task_assignment(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<DaemonTaskAssignment>, sqlx::Error> {
    let sql = "SELECT * FROM daemon_task_assignments WHERE task_id = $1";
    let lookup = sqlx::query_as::<_, DaemonTaskAssignment>(sql).bind(task_id);
    lookup.fetch_optional(pool).await
}

// ============================================================================
// Repository History Functions
// ============================================================================

use super::models::RepositoryHistoryEntry;

/// Return every repository history entry owned by `owner_id`.
///
/// Entries are sorted most-used first (`use_count DESC`), with ties broken by the
/// most recent `last_used_at`.
pub async fn list_repository_history_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<RepositoryHistoryEntry>, sqlx::Error> {
    let sql = r#"
        SELECT id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at
        FROM repository_history
        WHERE owner_id = $1
        ORDER BY use_count DESC, last_used_at DESC
        "#;

    let listing = sqlx::query_as::<_, RepositoryHistoryEntry>(sql).bind(owner_id);
    listing.fetch_all(pool).await
}

/// Get repository suggestions for an owner, optionally filtered by source type and query.
///
/// * `source_type` - when set, only entries with exactly this source type are returned.
/// * `query` - when set, a case-insensitive substring that must occur in the entry's
///   name, repository URL, or local path. The text is matched literally: the LIKE
///   wildcards `%` and `_` (and the escape character `\`) are escaped before use, so
///   they do not act as wildcards.
/// * `limit` - maximum number of rows returned.
///
/// Results are ordered by `use_count DESC, last_used_at DESC`.
///
/// # Errors
///
/// Returns the underlying `sqlx::Error` if the query fails.
pub async fn get_repository_suggestions(
    pool: &PgPool,
    owner_id: Uuid,
    source_type: Option<&str>,
    query: Option<&str>,
    limit: i32,
) -> Result<Vec<RepositoryHistoryEntry>, sqlx::Error> {
    /// Escape LIKE metacharacters with a backslash (PostgreSQL's default LIKE
    /// escape character) so user input is matched as literal text.
    fn escape_like(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            if matches!(ch, '\\' | '%' | '_') {
                escaped.push('\\');
            }
            escaped.push(ch);
        }
        escaped
    }

    // Build query dynamically based on filters. Only placeholder numbers are
    // interpolated into the SQL text; all values are passed as bind parameters.
    let mut sql = String::from(
        r#"
        SELECT id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at
        FROM repository_history
        WHERE owner_id = $1
        "#,
    );

    let mut param_idx = 2;

    if source_type.is_some() {
        sql.push_str(&format!(" AND source_type = ${}", param_idx));
        param_idx += 1;
    }

    if query.is_some() {
        // The same placeholder is reused for all three columns.
        sql.push_str(&format!(
            " AND (LOWER(name) LIKE ${} OR LOWER(COALESCE(repository_url, '')) LIKE ${} OR LOWER(COALESCE(local_path, '')) LIKE ${})",
            param_idx, param_idx, param_idx
        ));
        param_idx += 1;
    }

    sql.push_str(&format!(
        " ORDER BY use_count DESC, last_used_at DESC LIMIT ${}",
        param_idx
    ));

    // Bind values in the same order the placeholders were numbered above.
    let mut query_builder = sqlx::query_as::<_, RepositoryHistoryEntry>(&sql).bind(owner_id);

    if let Some(st) = source_type {
        query_builder = query_builder.bind(st);
    }

    if let Some(q) = query {
        let search_pattern = format!("%{}%", escape_like(&q.to_lowercase()));
        query_builder = query_builder.bind(search_pattern);
    }

    query_builder = query_builder.bind(limit);

    query_builder.fetch_all(pool).await
}

/// Add or update a repository history entry.
/// If an entry with the same URL (for remote) or path (for local) already exists,
/// increment use_count and update last_used_at and name.
/// Otherwise, create a new entry.
pub async fn add_or_update_repository_history(
    pool: &PgPool,
    owner_id: Uuid,
    name: &str,
    repository_url: Option<&str>,
    local_path: Option<&str>,
    source_type: &str,
) -> Result<RepositoryHistoryEntry, sqlx::Error> {
    // Use UPSERT (INSERT ... ON CONFLICT)
    if source_type == "remote" {
        let url = repository_url.ok_or_else(|| {
            sqlx::Error::Protocol("repository_url required for remote type".to_string())
        })?;

        sqlx::query_as::<_, RepositoryHistoryEntry>(
            r#"
            INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at)
            VALUES ($1, $2, $3, NULL, $4, 1, NOW())
            ON CONFLICT (owner_id, repository_url) WHERE source_type = 'remote' AND repository_url IS NOT NULL
            DO UPDATE SET
                name = EXCLUDED.name,
                use_count = repository_history.use_count + 1,
                last_used_at = NOW()
            RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at
            "#,
        )
        .bind(owner_id)
        .bind(name)
        .bind(url)
        .bind(source_type)
        .fetch_one(pool)
        .await
    } else if source_type == "local" {
        let path = local_path.ok_or_else(|| {
            sqlx::Error::Protocol("local_path required for local type".to_string())
        })?;

        sqlx::query_as::<_, RepositoryHistoryEntry>(
            r#"
            INSERT INTO repository_history (owner_id, name, repository_url, local_path, source_type, use_count, last_used_at)
            VALUES ($1, $2, NULL, $3, $4, 1, NOW())
            ON CONFLICT (owner_id, local_path) WHERE source_type = 'local' AND local_path IS NOT NULL
            DO UPDATE SET
                name = EXCLUDED.name,
                use_count = repository_history.use_count + 1,
                last_used_at = NOW()
            RETURNING id, owner_id, name, repository_url, local_path, source_type, use_count, last_used_at, created_at
            "#,
        )
        .bind(owner_id)
        .bind(name)
        .bind(path)
        .bind(source_type)
        .fetch_one(pool)
        .await
    } else {
        Err(sqlx::Error::Protocol(format!(
            "Invalid source_type: {}",
            source_type
        )))
    }
}

/// Remove one repository history entry, scoped to its owner.
///
/// Returns `true` when a row was deleted and `false` when no entry with this `id`
/// belongs to `owner_id`.
pub async fn delete_repository_history(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"
        DELETE FROM repository_history
        WHERE id = $1 AND owner_id = $2
        "#;

    sqlx::query(sql)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|done| done.rows_affected() > 0)
}

// ============================================================================
// Conversation Snapshots
// ============================================================================

/// Store a conversation snapshot for a task and return the inserted row.
///
/// `checkpoint_id` and `metadata` are optional and are stored as NULL when absent.
pub async fn create_conversation_snapshot(
    pool: &PgPool,
    task_id: Uuid,
    checkpoint_id: Option<Uuid>,
    snapshot_type: &str,
    message_count: i32,
    conversation_state: serde_json::Value,
    metadata: Option<serde_json::Value>,
) -> Result<ConversationSnapshot, sqlx::Error> {
    let sql = r#"
        INSERT INTO conversation_snapshots (task_id, checkpoint_id, snapshot_type, message_count, conversation_state, metadata)
        VALUES ($1, $2, $3, $4, $5, $6)
        RETURNING *
        "#;

    let insert = sqlx::query_as::<_, ConversationSnapshot>(sql)
        .bind(task_id)
        .bind(checkpoint_id)
        .bind(snapshot_type)
        .bind(message_count)
        .bind(conversation_state)
        .bind(metadata);

    insert.fetch_one(pool).await
}

/// Look up a conversation snapshot by its primary key; `None` if it does not exist.
pub async fn get_conversation_snapshot(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<ConversationSnapshot>, sqlx::Error> {
    let sql = "SELECT * FROM conversation_snapshots WHERE id = $1";
    let lookup = sqlx::query_as::<_, ConversationSnapshot>(sql).bind(id);
    lookup.fetch_optional(pool).await
}

/// Fetch the most recently created conversation snapshot linked to `checkpoint_id`.
///
/// Returns `None` when the checkpoint has no snapshots.
pub async fn get_conversation_at_checkpoint(
    pool: &PgPool,
    checkpoint_id: Uuid,
) -> Result<Option<ConversationSnapshot>, sqlx::Error> {
    let sql = "SELECT * FROM conversation_snapshots WHERE checkpoint_id = $1 ORDER BY created_at DESC LIMIT 1";
    let lookup = sqlx::query_as::<_, ConversationSnapshot>(sql).bind(checkpoint_id);
    lookup.fetch_optional(pool).await
}

/// List a task's conversation snapshots, newest first.
///
/// At most `limit` rows are returned; the limit defaults to 100 when `None`.
pub async fn list_conversation_snapshots(
    pool: &PgPool,
    task_id: Uuid,
    limit: Option<i32>,
) -> Result<Vec<ConversationSnapshot>, sqlx::Error> {
    let max_rows = limit.unwrap_or(100);
    let sql = "SELECT * FROM conversation_snapshots WHERE task_id = $1 ORDER BY created_at DESC LIMIT $2";

    let listing = sqlx::query_as::<_, ConversationSnapshot>(sql)
        .bind(task_id)
        .bind(max_rows);
    listing.fetch_all(pool).await
}

/// Delete conversation snapshots created more than `retention_days` days ago.
///
/// Returns the number of rows removed.
pub async fn cleanup_old_snapshots(
    pool: &PgPool,
    retention_days: i32,
) -> Result<u64, sqlx::Error> {
    let sql = "DELETE FROM conversation_snapshots WHERE created_at < NOW() - INTERVAL '1 day' * $1";

    sqlx::query(sql)
        .bind(retention_days)
        .execute(pool)
        .await
        .map(|done| done.rows_affected())
}

// ============================================================================
// History Events
// ============================================================================

/// Insert a history event for an owner and return the stored row.
///
/// `task_id` and `event_subtype` are optional and are stored as NULL when absent.
#[allow(clippy::too_many_arguments)]
pub async fn record_history_event(
    pool: &PgPool,
    owner_id: Uuid,
    task_id: Option<Uuid>,
    event_type: &str,
    event_subtype: Option<&str>,
    event_data: serde_json::Value,
) -> Result<HistoryEvent, sqlx::Error> {
    let sql = r#"
        INSERT INTO history_events (owner_id, task_id, event_type, event_subtype, event_data)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING *
        "#;

    let insert = sqlx::query_as::<_, HistoryEvent>(sql)
        .bind(owner_id)
        .bind(task_id)
        .bind(event_type)
        .bind(event_subtype)
        .bind(event_data);

    insert.fetch_one(pool).await
}

/// Fetch the history events of one task, scoped to its owner.
///
/// Returns a page of events (newest first, at most `filters.limit` rows, default 100)
/// together with the total number of events for that task and owner. Only the
/// `limit` field of `filters` is consulted here.
pub async fn get_task_history(
    pool: &PgPool,
    task_id: Uuid,
    owner_id: Uuid,
    filters: &HistoryQueryFilters,
) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
    let page_size = filters.limit.unwrap_or(100);

    let page_sql = r#"
        SELECT * FROM history_events
        WHERE task_id = $1 AND owner_id = $2
        ORDER BY created_at DESC
        LIMIT $3
        "#;
    let page = sqlx::query_as::<_, HistoryEvent>(page_sql)
        .bind(task_id)
        .bind(owner_id)
        .bind(page_size)
        .fetch_all(pool)
        .await?;

    // The total ignores the page limit so callers can tell how many events exist overall.
    let total_sql = "SELECT COUNT(*) FROM history_events WHERE task_id = $1 AND owner_id = $2";
    let total: i64 = sqlx::query_scalar(total_sql)
        .bind(task_id)
        .bind(owner_id)
        .fetch_one(pool)
        .await?;

    Ok((page, total))
}

/// Fetch the combined history timeline of an owner across all tasks.
///
/// Returns a page of events (newest first, at most `filters.limit` rows, default 100)
/// together with the total number of events for the owner. Only the `limit` field of
/// `filters` is consulted here.
pub async fn get_timeline(
    pool: &PgPool,
    owner_id: Uuid,
    filters: &HistoryQueryFilters,
) -> Result<(Vec<HistoryEvent>, i64), sqlx::Error> {
    let page_size = filters.limit.unwrap_or(100);

    let page_sql = r#"
        SELECT * FROM history_events
        WHERE owner_id = $1
        ORDER BY created_at DESC
        LIMIT $2
        "#;
    let page = sqlx::query_as::<_, HistoryEvent>(page_sql)
        .bind(owner_id)
        .bind(page_size)
        .fetch_all(pool)
        .await?;

    // The total ignores the page limit so callers can tell how many events exist overall.
    let total_sql = "SELECT COUNT(*) FROM history_events WHERE owner_id = $1";
    let total: i64 = sqlx::query_scalar(total_sql)
        .bind(owner_id)
        .fetch_one(pool)
        .await?;

    Ok((page, total))
}

// ============================================================================
// Task Conversation Retrieval
// ============================================================================

// Helper struct for parsing task output events.
//
// `get_task_conversation` deserializes the `event_data` JSON of each `output`
// task event into this shape. Because of `rename_all = "camelCase"`, the JSON keys
// are the camelCase forms of the field names (e.g. `messageType`, `toolName`).
// Every field except `message_type` is optional and becomes `None` when the key
// is missing or null.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskOutputEvent {
    // Kind of output; `get_task_conversation` distinguishes "assistant",
    // "tool_use", "tool_result", "system" and "error", and treats anything
    // else as a user message.
    message_type: String,
    // Message text; an absent value is treated as an empty string by the reader.
    content: Option<String>,
    // Tool name, passed through unchanged to the conversation message.
    tool_name: Option<String>,
    // Tool input payload, passed through unchanged to the conversation message.
    tool_input: Option<serde_json::Value>,
    // Error flag, passed through unchanged to the conversation message.
    is_error: Option<bool>,
    // Cost figure; parsed as f32 and widened to f64 by the reader.
    // NOTE(review): presumably US dollars given the name — confirm with the producer.
    cost_usd: Option<f32>,
}

/// Get task conversation messages (reconstructed from task_events).
///
/// Reads the task's `output` events in chronological order (at most `limit`, default
/// 1000) and converts each one whose `event_data` parses as a `TaskOutputEvent` into
/// a `ConversationMessage`. Events without data, or with data that does not parse,
/// are skipped silently.
///
/// * `include_tool_calls` - keep events whose message type is `tool_use`.
/// * `include_tool_results` - keep events whose message type is `tool_result`.
///
/// Role mapping: `assistant` and `tool_use` become `assistant`, `tool_result` becomes
/// `tool`, `system` and `error` become `system`, and anything else becomes `user`.
///
/// # Errors
///
/// Returns the underlying `sqlx::Error` if the event query fails.
pub async fn get_task_conversation(
    pool: &PgPool,
    task_id: Uuid,
    include_tool_calls: bool,
    include_tool_results: bool,
    limit: Option<i32>,
) -> Result<Vec<ConversationMessage>, sqlx::Error> {
    let limit = limit.unwrap_or(1000);

    // Get output events that represent conversation turns
    let events = sqlx::query_as::<_, TaskEvent>(
        r#"
        SELECT * FROM task_events
        WHERE task_id = $1 AND event_type = 'output'
        ORDER BY created_at ASC
        LIMIT $2
        "#
    )
    .bind(task_id)
    .bind(limit)
    .fetch_all(pool)
    .await?;

    // Convert task events to conversation messages
    let messages: Vec<ConversationMessage> = events
        .into_iter()
        .filter_map(|event| {
            // The payload is owned, so it is moved into the parser rather than cloned.
            let output: TaskOutputEvent = serde_json::from_value(event.event_data?).ok()?;

            // Decide inclusion and role in one pass over the message type.
            let (include, role) = match output.message_type.as_str() {
                "tool_use" => (include_tool_calls, "assistant"),
                "tool_result" => (include_tool_results, "tool"),
                "assistant" => (true, "assistant"),
                "system" | "error" => (true, "system"),
                _ => (true, "user"),
            };
            if !include {
                return None;
            }

            Some(ConversationMessage {
                id: event.id.to_string(),
                role: role.to_string(),
                content: output.content.unwrap_or_default(),
                timestamp: event.created_at,
                tool_calls: None,
                tool_name: output.tool_name,
                tool_input: output.tool_input,
                tool_result: None,
                is_error: output.is_error,
                token_count: None,
                cost_usd: output.cost_usd.map(f64::from),
            })
        })
        .collect();

    Ok(messages)
}


// =============================================================================
// Anonymous Task Cleanup Functions
// =============================================================================

/// Purge old anonymous tasks.
///
/// A task is removed when all of the following hold:
/// - it has no contract (`contract_id IS NULL`),
/// - its status is terminal (`done`, `failed` or `merged`),
/// - it was created more than `max_age_days` days ago.
///
/// Returns how many tasks were deleted.
pub async fn cleanup_stale_anonymous_tasks(
    pool: &PgPool,
    max_age_days: i32,
) -> Result<i64, sqlx::Error> {
    let sql = r#"
        DELETE FROM tasks
        WHERE contract_id IS NULL
          AND status IN ('done', 'failed', 'merged')
          AND created_at < NOW() - INTERVAL '1 day' * $1
        "#;

    sqlx::query(sql)
        .bind(max_age_days)
        .execute(pool)
        .await
        .map(|done| done.rows_affected() as i64)
}

// ============================================================================
// Checkpoint Patches (for task recovery)
// ============================================================================

/// Create a checkpoint patch for task recovery.
pub async fn create_checkpoint_patch(
    pool: &PgPool,
    task_id: Uuid,
    checkpoint_id: Option<Uuid>,
    base_commit_sha: &str,
    patch_data: &[u8],
    files_count: i32,
    ttl_hours: i64,
) -> Result<CheckpointPatch, sqlx::Error> {
    sqlx::query_as::<_, CheckpointPatch>(
        r#"
        INSERT INTO checkpoint_patches (
            task_id, checkpoint_id, base_commit_sha, patch_data,
            patch_size_bytes, files_count, expires_at
        )
        VALUES ($1, $2, $3, $4, $5, $6, NOW() + INTERVAL '1 hour' * $7)
        RETURNING *
        "#,
    )
    .bind(task_id)
    .bind(checkpoint_id)
    .bind(base_commit_sha)
    .bind(patch_data)
    .bind(patch_data.len() as i32)
    .bind(files_count)
    .bind(ttl_hours)
    .fetch_one(pool)
    .await
}

/// Fetch the newest checkpoint patch of a task that has not yet expired.
///
/// Returns `None` when the task has no patch whose `expires_at` is still in the future.
pub async fn get_latest_checkpoint_patch(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Option<CheckpointPatch>, sqlx::Error> {
    let sql = r#"
        SELECT * FROM checkpoint_patches
        WHERE task_id = $1 AND expires_at > NOW()
        ORDER BY created_at DESC
        LIMIT 1
        "#;

    let lookup = sqlx::query_as::<_, CheckpointPatch>(sql).bind(task_id);
    lookup.fetch_optional(pool).await
}

/// Look up a checkpoint patch by its primary key, regardless of expiry.
///
/// Returns `None` when no patch has the given `id`.
pub async fn get_checkpoint_patch(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<CheckpointPatch>, sqlx::Error> {
    let sql = "SELECT * FROM checkpoint_patches WHERE id = $1";
    let lookup = sqlx::query_as::<_, CheckpointPatch>(sql).bind(id);
    lookup.fetch_optional(pool).await
}

/// List metadata for every checkpoint patch of a task, newest first.
///
/// The `patch_data` column is deliberately not selected, so the potentially large
/// patch payloads are not loaded.
pub async fn list_checkpoint_patches(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<Vec<CheckpointPatchInfo>, sqlx::Error> {
    let sql = r#"
        SELECT id, task_id, checkpoint_id, base_commit_sha,
               patch_size_bytes, files_count, created_at, expires_at
        FROM checkpoint_patches
        WHERE task_id = $1
        ORDER BY created_at DESC
        "#;

    let listing = sqlx::query_as::<_, CheckpointPatchInfo>(sql).bind(task_id);
    listing.fetch_all(pool).await
}

/// Remove every checkpoint patch whose `expires_at` lies in the past.
///
/// Returns how many patches were deleted.
pub async fn cleanup_expired_checkpoint_patches(
    pool: &PgPool,
) -> Result<i64, sqlx::Error> {
    let sql = "DELETE FROM checkpoint_patches WHERE expires_at < NOW()";

    sqlx::query(sql)
        .execute(pool)
        .await
        .map(|done| done.rows_affected() as i64)
}

/// Remove all checkpoint patches belonging to `task_id`.
///
/// Returns how many patches were deleted.
pub async fn delete_checkpoint_patches_for_task(
    pool: &PgPool,
    task_id: Uuid,
) -> Result<i64, sqlx::Error> {
    let sql = "DELETE FROM checkpoint_patches WHERE task_id = $1";

    sqlx::query(sql)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|done| done.rows_affected() as i64)
}


// =============================================================================
// Directive CRUD
// =============================================================================

/// Insert a directive owned by `owner_id` and return it.
///
/// `reconcile_mode` falls back to `"auto"` when the request does not specify one.
/// When `req.contract_body` is present, a first `draft` document (empty title,
/// position 0) carrying that body is inserted as well. Both inserts share one
/// transaction, so either both rows exist afterwards or neither does.
pub async fn create_directive_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateDirectiveRequest,
) -> Result<Directive, sqlx::Error> {
    let reconcile_mode = req.reconcile_mode.as_deref().unwrap_or("auto");

    let insert_directive = r#"
        INSERT INTO directives (owner_id, title, repository_url, local_path, base_branch, reconcile_mode)
        VALUES ($1, $2, $3, $4, $5, $6)
        RETURNING *
        "#;

    let mut tx = pool.begin().await?;

    let created = sqlx::query_as::<_, Directive>(insert_directive)
        .bind(owner_id)
        .bind(&req.title)
        .bind(&req.repository_url)
        .bind(&req.local_path)
        .bind(&req.base_branch)
        .bind(reconcile_mode)
        .fetch_one(&mut *tx)
        .await?;

    // Seed the initial contract document only when a body was supplied.
    if let Some(body) = req.contract_body.as_ref() {
        let insert_document = r#"
            INSERT INTO directive_documents (directive_id, title, body, status, position)
            VALUES ($1, '', $2, 'draft', 0)
            "#;

        sqlx::query(insert_document)
            .bind(created.id)
            .bind(body)
            .execute(&mut *tx)
            .await?;
    }

    tx.commit().await?;
    Ok(created)
}

/// Resolve the body of the directive's "current spec".
///
/// Picks one document of the directive by status priority — `active` first, then
/// `queued`, then `draft` — and, within the same status, the most recently updated
/// one. Documents in any other status are ignored.
///
/// Returns an empty string when the directive has no usable document (the
/// orchestrator should refuse to spawn in that case).
pub async fn get_active_contract_body(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<String, sqlx::Error> {
    let sql = r#"
        SELECT body FROM directive_documents
         WHERE directive_id = $1
           AND status IN ('active', 'queued', 'draft')
         ORDER BY
             CASE status
                 WHEN 'active' THEN 0
                 WHEN 'queued' THEN 1
                 WHEN 'draft'  THEN 2
                 ELSE 3
             END,
             updated_at DESC
         LIMIT 1
        "#;

    let best: Option<(String,)> = sqlx::query_as(sql)
        .bind(directive_id)
        .fetch_optional(pool)
        .await?;

    let body = match best {
        Some((body,)) => body,
        None => String::new(),
    };
    Ok(body)
}

/// Fetch a directive by `id`, but only if it belongs to `owner_id`.
///
/// Returns `None` when the directive does not exist or is owned by someone else.
pub async fn get_directive_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    id: Uuid,
) -> Result<Option<Directive>, sqlx::Error> {
    let sql = r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#;

    let lookup = sqlx::query_as::<_, Directive>(sql)
        .bind(id)
        .bind(owner_id);
    lookup.fetch_optional(pool).await
}

/// Fetch a directive by `id` with no ownership check.
///
/// Intended for background orchestration code that has already established the
/// directive's identity through an authorized path (for example, it obtained the
/// `directive_id` from a query that was itself owner-scoped). HTTP handlers must keep
/// using `get_directive_for_owner` so tenant isolation is enforced.
pub async fn get_directive(
    pool: &PgPool,
    id: Uuid,
) -> Result<Option<Directive>, sqlx::Error> {
    let sql = r#"SELECT * FROM directives WHERE id = $1"#;
    let lookup = sqlx::query_as::<_, Directive>(sql).bind(id);
    lookup.fetch_optional(pool).await
}

/// Fetch an owner's directive together with its steps.
///
/// Returns `None` when the directive does not exist or is not owned by `owner_id`;
/// in that case the steps are never queried.
pub async fn get_directive_with_steps_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    id: Uuid,
) -> Result<Option<(Directive, Vec<DirectiveStep>)>, sqlx::Error> {
    let sql = r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#;

    let found = sqlx::query_as::<_, Directive>(sql)
        .bind(id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;

    let directive = match found {
        Some(directive) => directive,
        None => return Ok(None),
    };

    let steps = list_directive_steps(pool, directive.id).await?;
    Ok(Some((directive, steps)))
}

/// List an owner's directives, newest first, each with aggregated step counts.
///
/// For every directive the total number of steps and the number of `completed`,
/// `running` and `failed` steps are reported (0 when it has no steps). The per-owner
/// tmp directive (the scratchpad surface, shown via the sidebar's dedicated `tmp/`
/// folder) is excluded from this list.
pub async fn list_directives_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
) -> Result<Vec<DirectiveSummary>, sqlx::Error> {
    let sql = r#"
        SELECT
            d.id, d.owner_id, d.title, d.status, d.repository_url,
            d.orchestrator_task_id, d.pr_url, d.completion_task_id,
            d.reconcile_mode,
            d.version, d.created_at, d.updated_at,
            COALESCE(s.total_steps, 0) as total_steps,
            COALESCE(s.completed_steps, 0) as completed_steps,
            COALESCE(s.running_steps, 0) as running_steps,
            COALESCE(s.failed_steps, 0) as failed_steps
        FROM directives d
        LEFT JOIN LATERAL (
            SELECT
                COUNT(*) as total_steps,
                COUNT(*) FILTER (WHERE status = 'completed') as completed_steps,
                COUNT(*) FILTER (WHERE status = 'running') as running_steps,
                COUNT(*) FILTER (WHERE status = 'failed') as failed_steps
            FROM directive_steps
            WHERE directive_id = d.id
        ) s ON true
        WHERE d.owner_id = $1
          AND d.is_tmp = false
        ORDER BY d.created_at DESC
        "#;

    let listing = sqlx::query_as::<_, DirectiveSummary>(sql).bind(owner_id);
    listing.fetch_all(pool).await
}

/// Update a directive with optimistic locking.
///
/// Fields left as `None` in `req` keep their current value; the directive's `version`
/// is incremented on every successful update. When `req.version` is set it must equal
/// the stored version, otherwise the update is rejected.
///
/// The read, the version check and the write run in one transaction, and the row is
/// locked with `SELECT ... FOR UPDATE`. This closes the window in which two
/// concurrent callers holding the same expected version could both pass the check,
/// and keeps the merge of unspecified fields from writing back stale values.
///
/// Returns `Ok(None)` when the directive does not exist or is not owned by `owner_id`.
///
/// # Errors
///
/// * `RepositoryError::VersionConflict` - `req.version` differs from the stored version.
/// * `RepositoryError::Database` - any underlying database failure.
pub async fn update_directive_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    id: Uuid,
    req: UpdateDirectiveRequest,
) -> Result<Option<Directive>, RepositoryError> {
    let mut tx = pool.begin().await.map_err(RepositoryError::Database)?;

    // Lock the row so the version check below stays valid until the UPDATE commits.
    let current = sqlx::query_as::<_, Directive>(
        r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2 FOR UPDATE"#,
    )
    .bind(id)
    .bind(owner_id)
    .fetch_optional(&mut *tx)
    .await
    .map_err(RepositoryError::Database)?;

    // Early returns drop `tx`, which rolls the transaction back and releases the lock.
    let current = match current {
        Some(c) => c,
        None => return Ok(None),
    };

    if let Some(expected_version) = req.version {
        if expected_version != current.version {
            return Err(RepositoryError::VersionConflict {
                expected: expected_version,
                actual: current.version,
            });
        }
    }

    // Merge: requested values win, anything unspecified keeps the stored value.
    let title = req.title.as_deref().unwrap_or(&current.title);
    let status = req.status.as_deref().unwrap_or(&current.status);
    let repository_url = req.repository_url.as_deref().or(current.repository_url.as_deref());
    let local_path = req.local_path.as_deref().or(current.local_path.as_deref());
    let base_branch = req.base_branch.as_deref().or(current.base_branch.as_deref());
    let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id);
    let pr_url = req.pr_url.as_deref().or(current.pr_url.as_deref());
    let pr_branch = req.pr_branch.as_deref().or(current.pr_branch.as_deref());
    let reconcile_mode = req.reconcile_mode.clone().unwrap_or_else(|| current.reconcile_mode.clone());

    let result = sqlx::query_as::<_, Directive>(
        r#"
        UPDATE directives
        SET title = $3, status = $4, repository_url = $5, local_path = $6,
            base_branch = $7, orchestrator_task_id = $8, pr_url = $9, pr_branch = $10,
            reconcile_mode = $11,
            version = version + 1, updated_at = NOW()
        WHERE id = $1 AND owner_id = $2
        RETURNING *
        "#,
    )
    .bind(id)
    .bind(owner_id)
    .bind(title)
    .bind(status)
    .bind(repository_url)
    .bind(local_path)
    .bind(base_branch)
    .bind(orchestrator_task_id)
    .bind(pr_url)
    .bind(pr_branch)
    .bind(reconcile_mode)
    .fetch_optional(&mut *tx)
    .await
    .map_err(RepositoryError::Database)?;

    tx.commit().await.map_err(RepositoryError::Database)?;

    Ok(result)
}

/// Delete a directive for an owner, along with the tasks attached to it.
///
/// Both deletes are scoped to `owner_id` and run in a single transaction, so a
/// failure while deleting the directive does not leave it behind with its tasks
/// already removed.
///
/// Returns `true` when the directive row was deleted and `false` when no directive
/// with this `id` belongs to `owner_id`.
///
/// # Errors
///
/// Returns the underlying `sqlx::Error` if either delete or the commit fails; the
/// transaction is rolled back in that case.
pub async fn delete_directive_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    id: Uuid,
) -> Result<bool, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Delete all tasks associated with this directive
    sqlx::query(
        r#"DELETE FROM tasks WHERE directive_id = $1 AND owner_id = $2"#,
    )
    .bind(id)
    .bind(owner_id)
    .execute(&mut *tx)
    .await?;

    let result = sqlx::query(
        r#"DELETE FROM directives WHERE id = $1 AND owner_id = $2"#,
    )
    .bind(id)
    .bind(owner_id)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;

    Ok(result.rows_affected() > 0)
}

/// Clean up terminal tasks associated with a directive.
///
/// Deletes tasks in terminal states (completed, failed, merged, done, interrupted)
/// that belong to this directive and owner, excluding tasks currently referenced by
/// `completion_task_id` or `orchestrator_task_id` on the directive. Before deleting,
/// `task_id` is set to NULL on directive_steps that point at those tasks.
///
/// Both statements run in one transaction so that a failed delete does not leave
/// steps stripped of their task references.
///
/// Returns the number of deleted tasks.
///
/// # Errors
///
/// Returns the underlying `sqlx::Error` if either statement or the commit fails; the
/// transaction is rolled back in that case.
pub async fn cleanup_directive_tasks(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // NULL out task_id on steps that reference terminal tasks we're about to delete.
    // The COALESCE to the nil UUID keeps `NOT IN` from evaluating to NULL when the
    // directive has no completion/orchestrator task set.
    sqlx::query(
        r#"
        UPDATE directive_steps
        SET task_id = NULL
        WHERE directive_id = $1
          AND task_id IS NOT NULL
          AND task_id IN (
            SELECT t.id FROM tasks t
            WHERE t.directive_id = $1
              AND t.owner_id = $2
              AND t.status IN ('completed', 'failed', 'merged', 'done', 'interrupted')
              AND t.id NOT IN (
                SELECT COALESCE(d.completion_task_id, '00000000-0000-0000-0000-000000000000')
                FROM directives d WHERE d.id = $1
                UNION
                SELECT COALESCE(d.orchestrator_task_id, '00000000-0000-0000-0000-000000000000')
                FROM directives d WHERE d.id = $1
              )
          )
        "#,
    )
    .bind(directive_id)
    .bind(owner_id)
    .execute(&mut *tx)
    .await?;

    // Delete terminal tasks not currently referenced by the directive
    let result = sqlx::query(
        r#"
        DELETE FROM tasks
        WHERE directive_id = $1
          AND owner_id = $2
          AND status IN ('completed', 'failed', 'merged', 'done', 'interrupted')
          AND id NOT IN (
            SELECT COALESCE(d.completion_task_id, '00000000-0000-0000-0000-000000000000')
            FROM directives d WHERE d.id = $1
            UNION
            SELECT COALESCE(d.orchestrator_task_id, '00000000-0000-0000-0000-000000000000')
            FROM directives d WHERE d.id = $1
          )
        "#,
    )
    .bind(directive_id)
    .bind(owner_id)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;

    Ok(result.rows_affected() as i64)
}

// =============================================================================
// Directive Completion Helpers
// =============================================================================

/// Row type for completed step tasks.
///
/// Produced by `get_completed_step_tasks`, which joins `directive_steps`
/// with `tasks`; columns are matched to fields by name via `sqlx::FromRow`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct CompletedStepTask {
    /// `directive_steps.id` of the completed step.
    pub step_id: Uuid,
    /// `directive_steps.name` of the completed step.
    pub step_name: String,
    /// `directive_steps.task_id`; the producing query filters out NULLs.
    pub task_id: Uuid,
    /// `tasks.name` of the task joined on `task_id`.
    pub task_name: String,
}

/// Row type for directive completion task status check.
///
/// Produced by `get_completion_tasks_to_check`, which joins `directives`
/// with `tasks` on `completion_task_id`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct DirectiveCompletionCheck {
    /// `directives.id`.
    pub directive_id: Uuid,
    /// `directives.owner_id`.
    pub owner_id: Uuid,
    /// `directives.completion_task_id`; the producing query filters out NULLs.
    pub completion_task_id: Uuid,
    /// `tasks.status` of the completion task.
    pub task_status: String,
    /// `directives.pr_url`, if one has been recorded.
    pub pr_url: Option<String>,
    /// `tasks.name` of the completion task.
    pub task_name: String,
}

/// Fetch idle directives for which a completion task should be spawned.
///
/// A directive qualifies when all of the following hold:
/// - `status = 'idle'`
/// - `completion_task_id` is NULL
/// - `pr_branch` is NULL
/// - `repository_url` is set
/// - at least one of its steps is `completed` and has a `task_id`
pub async fn get_idle_directives_needing_completion(
    pool: &PgPool,
) -> Result<Vec<Directive>, sqlx::Error> {
    let sql = r#"
        SELECT d.*
        FROM directives d
        WHERE d.status = 'idle'
          AND d.completion_task_id IS NULL
          AND d.pr_branch IS NULL
          AND d.repository_url IS NOT NULL
          AND EXISTS (
              SELECT 1 FROM directive_steps ds
              WHERE ds.directive_id = d.id
                AND ds.status = 'completed'
                AND ds.task_id IS NOT NULL
          )
        "#;

    let directives: Vec<Directive> = sqlx::query_as(sql).fetch_all(pool).await?;
    Ok(directives)
}

/// Fetch idle directives that have a `pr_branch` but no `pr_url` yet and no
/// completion task assigned. These need a verification task spawned.
///
/// Only directives with `status = 'idle'` and a non-NULL `repository_url`
/// are returned.
pub async fn get_directives_needing_verification(
    pool: &PgPool,
) -> Result<Vec<Directive>, sqlx::Error> {
    let sql = r#"
        SELECT d.*
        FROM directives d
        WHERE d.status = 'idle'
          AND d.pr_branch IS NOT NULL
          AND d.pr_url IS NULL
          AND d.completion_task_id IS NULL
          AND d.repository_url IS NOT NULL
        "#;

    let directives: Vec<Directive> = sqlx::query_as(sql).fetch_all(pool).await?;
    Ok(directives)
}

/// Fetch every directive that has a completion task assigned, together with
/// that task's current status and name.
///
/// The join on `tasks` is an inner join, so a directive whose
/// `completion_task_id` does not match any task row is not returned.
pub async fn get_completion_tasks_to_check(
    pool: &PgPool,
) -> Result<Vec<DirectiveCompletionCheck>, sqlx::Error> {
    let sql = r#"
        SELECT d.id as directive_id, d.owner_id, d.completion_task_id, t.status as task_status, d.pr_url, t.name as task_name
        FROM directives d
        JOIN tasks t ON t.id = d.completion_task_id
        WHERE d.completion_task_id IS NOT NULL
        "#;

    let checks: Vec<DirectiveCompletionCheck> =
        sqlx::query_as(sql).fetch_all(pool).await?;
    Ok(checks)
}

/// Atomically claim a directive for completion.
///
/// Writes `task_id` into `completion_task_id`, but only if that column is
/// currently NULL. The check and the write happen in one UPDATE statement.
///
/// Returns `true` when a row was updated (the claim succeeded) and `false`
/// when the directive does not exist or already has a completion task.
pub async fn claim_directive_for_completion(
    pool: &PgPool,
    directive_id: Uuid,
    task_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let sql = r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW()
           WHERE id = $1 AND completion_task_id IS NULL"#;

    let outcome = sqlx::query(sql)
        .bind(directive_id)
        .bind(task_id)
        .execute(pool)
        .await?;

    let claimed_rows = outcome.rows_affected();
    Ok(claimed_rows != 0)
}

/// Set `completion_task_id` on a directive unconditionally, overwriting any
/// previous value, and refresh `updated_at`.
///
/// Succeeds even when no directive matches `directive_id`; the number of
/// affected rows is not reported.
pub async fn assign_completion_task(
    pool: &PgPool,
    directive_id: Uuid,
    task_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql = r#"UPDATE directives SET completion_task_id = $2, updated_at = NOW() WHERE id = $1"#;

    sqlx::query(sql)
        .bind(directive_id)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Reset `completion_task_id` to NULL on a directive and refresh `updated_at`.
///
/// Succeeds even when no directive matches `directive_id`.
pub async fn clear_completion_task(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    let sql = r#"UPDATE directives SET completion_task_id = NULL, updated_at = NOW() WHERE id = $1"#;

    sqlx::query(sql)
        .bind(directive_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// List the (step, task) pairs for every completed step of a directive that
/// has a task assigned.
///
/// Rows are ordered by the step's `order_index`, then `created_at`. Steps
/// whose `task_id` has no matching task row are dropped by the inner join.
pub async fn get_completed_step_tasks(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Vec<CompletedStepTask>, sqlx::Error> {
    let sql = r#"
        SELECT ds.id as step_id, ds.name as step_name, ds.task_id, t.name as task_name
        FROM directive_steps ds
        JOIN tasks t ON t.id = ds.task_id
        WHERE ds.directive_id = $1
          AND ds.status = 'completed'
          AND ds.task_id IS NOT NULL
        ORDER BY ds.order_index, ds.created_at
        "#;

    let pairs: Vec<CompletedStepTask> = sqlx::query_as(sql)
        .bind(directive_id)
        .fetch_all(pool)
        .await?;
    Ok(pairs)
}

/// Return the `task_id` of the completed step with the newest `updated_at`
/// for a directive, or `None` when no completed step has a task.
///
/// Used as a fallback `continue_from_task_id` when dispatching new-generation steps
/// that have no explicit dependencies and no PR branch to continue from.
pub async fn get_last_completed_step_task_id(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Option<Uuid>, sqlx::Error> {
    let latest = sqlx::query_as::<_, (Uuid,)>(
        r#"
        SELECT ds.task_id
        FROM directive_steps ds
        WHERE ds.directive_id = $1
          AND ds.status = 'completed'
          AND ds.task_id IS NOT NULL
        ORDER BY ds.updated_at DESC
        LIMIT 1
        "#,
    )
    .bind(directive_id)
    .fetch_optional(pool)
    .await?;

    // Unwrap the single-column tuple.
    Ok(latest.map(|(task_id,)| task_id))
}

// =============================================================================
// Directive Step CRUD
// =============================================================================

/// Look up one directive step by primary key.
///
/// Returns `Ok(None)` when no step has this id.
pub async fn get_directive_step(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    let found: Option<DirectiveStep> =
        sqlx::query_as(r#"SELECT * FROM directive_steps WHERE id = $1"#)
            .bind(step_id)
            .fetch_optional(pool)
            .await?;
    Ok(found)
}

/// Return every step of a directive, sorted by `order_index` and then by
/// `created_at`.
pub async fn list_directive_steps(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Vec<DirectiveStep>, sqlx::Error> {
    let sql = r#"
        SELECT * FROM directive_steps
        WHERE directive_id = $1
        ORDER BY order_index, created_at
        "#;

    let steps: Vec<DirectiveStep> = sqlx::query_as(sql)
        .bind(directive_id)
        .fetch_all(pool)
        .await?;
    Ok(steps)
}

/// Create a single directive step.
///
/// - `generation` defaults to `1` when the request leaves it unset.
/// - `directive_document_id` is taken from the request if present; otherwise
///   it is looked up with `resolve_active_document_for_directive`. A lookup
///   error is deliberately treated like "no document" and stores NULL.
/// - When the request carries an `order_id`, that order's `directive_step_id`
///   is pointed at the new step.
///
/// The step INSERT and the optional order UPDATE run in one transaction, so
/// a failure while linking the order does not leave an unlinked step behind
/// while the caller sees an error.
///
/// # Errors
///
/// Propagates any `sqlx::Error` from the insert, the order update, or the
/// commit; on error the transaction is rolled back when dropped.
pub async fn create_directive_step(
    pool: &PgPool,
    directive_id: Uuid,
    req: CreateDirectiveStepRequest,
) -> Result<DirectiveStep, sqlx::Error> {
    let generation = req.generation.unwrap_or(1);

    // Resolve the document this step belongs to. If the caller supplied one,
    // honour it; otherwise ask the resolver. Steps that can't be matched to
    // any document (including when the lookup itself fails) fall through
    // with NULL.
    let directive_document_id = match req.directive_document_id {
        Some(id) => Some(id),
        None => resolve_active_document_for_directive(pool, directive_id)
            .await
            .unwrap_or(None),
    };

    let mut tx = pool.begin().await?;

    let step = sqlx::query_as::<_, DirectiveStep>(
        r#"
        INSERT INTO directive_steps (
            directive_id, name, description, task_plan, depends_on,
            order_index, generation, contract_type, directive_document_id
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
        RETURNING *
        "#,
    )
    .bind(directive_id)
    .bind(&req.name)
    .bind(&req.description)
    .bind(&req.task_plan)
    .bind(&req.depends_on)
    .bind(req.order_index)
    .bind(generation)
    .bind(&req.contract_type)
    .bind(&directive_document_id)
    .fetch_one(&mut *tx)
    .await?;

    // If an order_id was provided, auto-link the order to this step
    if let Some(oid) = req.order_id {
        sqlx::query(
            r#"UPDATE orders SET directive_step_id = $1, updated_at = NOW() WHERE id = $2"#,
        )
        .bind(step.id)
        .bind(oid)
        .execute(&mut *tx)
        .await?;
    }

    tx.commit().await?;

    Ok(step)
}

/// Create several directive steps, one after another, in the order given.
///
/// Each request is handed to `create_directive_step`. The first failure
/// aborts the loop and is returned; steps created before it are not undone
/// by this function.
pub async fn batch_create_directive_steps(
    pool: &PgPool,
    directive_id: Uuid,
    steps: Vec<CreateDirectiveStepRequest>,
) -> Result<Vec<DirectiveStep>, sqlx::Error> {
    let mut created = Vec::with_capacity(steps.len());

    for step_req in steps {
        created.push(create_directive_step(pool, directive_id, step_req).await?);
    }

    Ok(created)
}

/// Apply a partial update to a directive step.
///
/// The current row is loaded first; every field the request leaves as `None`
/// keeps its stored value (so an optional column such as `task_id` cannot be
/// reset to NULL through this function). Returns `Ok(None)` when the step
/// does not exist.
///
/// Timestamps are derived from the status transition:
/// - `started_at` is stamped when the status changes to `running`;
/// - `completed_at` is stamped when the status changes from a non-terminal
///   value to `completed`, `failed` or `skipped`.
pub async fn update_directive_step(
    pool: &PgPool,
    step_id: Uuid,
    req: UpdateDirectiveStepRequest,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    let existing: Option<DirectiveStep> =
        sqlx::query_as(r#"SELECT * FROM directive_steps WHERE id = $1"#)
            .bind(step_id)
            .fetch_optional(pool)
            .await?;
    let Some(step) = existing else {
        return Ok(None);
    };

    // Merge request over stored values.
    let name = req.name.as_deref().unwrap_or(&step.name);
    let description = req.description.as_deref().or(step.description.as_deref());
    let task_plan = req.task_plan.as_deref().or(step.task_plan.as_deref());
    let depends_on = req.depends_on.as_deref().unwrap_or(&step.depends_on);
    let status = req.status.as_deref().unwrap_or(&step.status);
    let task_id = req.task_id.or(step.task_id);
    let order_index = req.order_index.unwrap_or(step.order_index);

    let is_terminal = |s: &str| matches!(s, "completed" | "failed" | "skipped");

    // Stamp started_at only on the transition into `running`.
    let entering_running = status == "running" && step.status != "running";
    let started_at = if entering_running {
        Some(Utc::now())
    } else {
        step.started_at
    };

    // Stamp completed_at only on the transition into a terminal status.
    let entering_terminal = is_terminal(status) && !is_terminal(step.status.as_str());
    let completed_at = if entering_terminal {
        Some(Utc::now())
    } else {
        step.completed_at
    };

    let sql = r#"
        UPDATE directive_steps
        SET name = $2, description = $3, task_plan = $4, depends_on = $5,
            status = $6, task_id = $7, order_index = $8, started_at = $9, completed_at = $10
        WHERE id = $1
        RETURNING *
        "#;

    let updated: Option<DirectiveStep> = sqlx::query_as(sql)
        .bind(step_id)
        .bind(name)
        .bind(description)
        .bind(task_plan)
        .bind(depends_on)
        .bind(status)
        .bind(task_id)
        .bind(order_index)
        .bind(started_at)
        .bind(completed_at)
        .fetch_optional(pool)
        .await?;
    Ok(updated)
}

/// Remove one directive step by id.
///
/// Returns `true` if a row was deleted, `false` if no step had this id.
pub async fn delete_directive_step(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let outcome = sqlx::query(r#"DELETE FROM directive_steps WHERE id = $1"#)
        .bind(step_id)
        .execute(pool)
        .await?;

    let removed = outcome.rows_affected();
    Ok(removed != 0)
}

/// Delete every step of a directive whose status is `pending`, `ready`,
/// `failed` or `skipped`.
///
/// Steps in any other status (for example completed or running ones) are
/// left untouched. Returns the number of deleted steps.
pub async fn clear_pending_directive_steps(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<u64, sqlx::Error> {
    let sql = r#"DELETE FROM directive_steps
           WHERE directive_id = $1
             AND status IN ('pending', 'ready', 'failed', 'skipped')"#;

    let outcome = sqlx::query(sql)
        .bind(directive_id)
        .execute(pool)
        .await?;

    let removed = outcome.rows_affected();
    Ok(removed)
}

// =============================================================================
// Directive Document CRUD
// =============================================================================

/// Return every contract (directive document) of a directive in queue order.
///
/// Rows are sorted by `position` ascending (lower = earlier in the queue),
/// with `created_at` ascending as a stable tie-break.
pub async fn list_directive_documents(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Vec<DirectiveDocument>, sqlx::Error> {
    let sql = r#"
        SELECT * FROM directive_documents
        WHERE directive_id = $1
        ORDER BY position ASC, created_at ASC
        "#;

    let documents: Vec<DirectiveDocument> = sqlx::query_as(sql)
        .bind(directive_id)
        .fetch_all(pool)
        .await?;
    Ok(documents)
}

/// Look up one directive document by primary key.
///
/// Returns `Ok(None)` when no document has this id.
pub async fn get_directive_document(
    pool: &PgPool,
    document_id: Uuid,
) -> Result<Option<DirectiveDocument>, sqlx::Error> {
    let found: Option<DirectiveDocument> =
        sqlx::query_as(r#"SELECT * FROM directive_documents WHERE id = $1"#)
            .bind(document_id)
            .fetch_optional(pool)
            .await?;
    Ok(found)
}

/// Insert a new directive document (contract) with status `'draft'`.
///
/// `position` is computed inside the INSERT as `MAX(position) + 1` over the
/// directive's existing documents, or `0` when there are none, so the new
/// row lands at the end of the queue. To place it elsewhere, call
/// `reorder_directive_document_position` afterwards.
///
/// `merge_mode` is not set here and therefore takes the column default
/// (presumably 'shared' — confirm against the schema); change it later via
/// `update_directive_document`.
pub async fn create_directive_document(
    pool: &PgPool,
    directive_id: Uuid,
    title: &str,
    body: &str,
) -> Result<DirectiveDocument, sqlx::Error> {
    let sql = r#"
        INSERT INTO directive_documents (directive_id, title, body, status, position)
        VALUES (
            $1, $2, $3, 'draft',
            COALESCE(
                (SELECT MAX(position) + 1 FROM directive_documents WHERE directive_id = $1),
                0
            )
        )
        RETURNING *
        "#;

    let document: DirectiveDocument = sqlx::query_as(sql)
        .bind(directive_id)
        .bind(title)
        .bind(body)
        .fetch_one(pool)
        .await?;
    Ok(document)
}

/// Update a directive document's title, body and/or merge mode.
///
/// Arguments passed as `None` keep the stored value. Every call that finds
/// the document bumps `version` and `updated_at`, even if nothing changed.
/// Returns `Ok(None)` when the document does not exist.
///
/// Reactivation rule: when the document is currently `shipped` and the new
/// body differs from the stored one, the status flips to `active` and
/// `shipped_at` is cleared. No other status transition happens here; those
/// belong to the dedicated lifecycle helpers.
pub async fn update_directive_document(
    pool: &PgPool,
    document_id: Uuid,
    title: Option<&str>,
    body: Option<&str>,
    merge_mode: Option<&str>,
) -> Result<Option<DirectiveDocument>, sqlx::Error> {
    let existing: Option<DirectiveDocument> =
        sqlx::query_as(r#"SELECT * FROM directive_documents WHERE id = $1"#)
            .bind(document_id)
            .fetch_optional(pool)
            .await?;
    let Some(doc) = existing else {
        return Ok(None);
    };

    // Merge the supplied values over the stored ones.
    let next_title = title.unwrap_or(&doc.title);
    let next_body = body.unwrap_or(&doc.body);
    let next_merge_mode = merge_mode.unwrap_or(&doc.merge_mode);

    // Editing the body of a shipped document reopens it.
    let reopen = doc.status == "shipped" && next_body != doc.body;
    let next_status = if reopen { "active" } else { doc.status.as_str() };

    let sql = r#"
        UPDATE directive_documents
        SET title = $2,
            body = $3,
            status = $4,
            shipped_at = CASE WHEN $5 THEN NULL ELSE shipped_at END,
            merge_mode = $6,
            version = version + 1,
            updated_at = NOW()
        WHERE id = $1
        RETURNING *
        "#;

    let updated: Option<DirectiveDocument> = sqlx::query_as(sql)
        .bind(document_id)
        .bind(next_title)
        .bind(next_body)
        .bind(next_status)
        .bind(reopen)
        .bind(next_merge_mode)
        .fetch_optional(pool)
        .await?;

    Ok(updated)
}

/// Move a contract to a new queue position within its directive.
///
/// Runs in one transaction with three statements: load the contract, shift
/// the siblings that sit between the old and the new position by one slot
/// (towards the front when moving later, towards the back when moving
/// earlier), then write `new_position` onto the contract itself.
///
/// Returns the updated row, the unchanged row when it already sits at
/// `new_position`, or `Ok(None)` when the contract does not exist.
/// `new_position` is used as given; it is not clamped to the existing range.
pub async fn reorder_directive_document_position(
    pool: &PgPool,
    document_id: Uuid,
    new_position: i32,
) -> Result<Option<DirectiveDocument>, sqlx::Error> {
    let mut tx = pool.begin().await?;

    let existing: Option<DirectiveDocument> =
        sqlx::query_as(r#"SELECT * FROM directive_documents WHERE id = $1"#)
            .bind(document_id)
            .fetch_optional(&mut *tx)
            .await?;
    let Some(doc) = existing else {
        return Ok(None);
    };

    // Nothing to move.
    if doc.position == new_position {
        tx.commit().await?;
        return Ok(Some(doc));
    }

    // Pick the sibling-shift statement and its ($3, $4) bounds. Moving later
    // pulls the rows in (old, new] one slot forward; moving earlier pushes
    // the rows in [new, old) one slot back.
    let (shift_sql, bound_three, bound_four) = if new_position > doc.position {
        (
            r#"
            UPDATE directive_documents
               SET position = position - 1
             WHERE directive_id = $1
               AND id <> $2
               AND position > $3
               AND position <= $4
            "#,
            doc.position,
            new_position,
        )
    } else {
        (
            r#"
            UPDATE directive_documents
               SET position = position + 1
             WHERE directive_id = $1
               AND id <> $2
               AND position >= $3
               AND position < $4
            "#,
            new_position,
            doc.position,
        )
    };

    sqlx::query(shift_sql)
        .bind(doc.directive_id)
        .bind(document_id)
        .bind(bound_three)
        .bind(bound_four)
        .execute(&mut *tx)
        .await?;

    let moved: Option<DirectiveDocument> = sqlx::query_as(
        r#"
        UPDATE directive_documents
           SET position = $2,
               updated_at = NOW()
         WHERE id = $1
         RETURNING *
        "#,
    )
    .bind(document_id)
    .bind(new_position)
    .fetch_optional(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(moved)
}

/// Record that a PR was raised for a directive document.
///
/// Sets `status = 'shipped'`, stores `pr_url`, stamps `shipped_at` with the
/// current time and bumps `version` / `updated_at`. `pr_branch` is only
/// overwritten when a value is supplied; `None` keeps the stored branch.
/// No check is made on the document's previous status.
///
/// Returns the updated row, or `Ok(None)` when the document does not exist.
pub async fn mark_directive_document_shipped(
    pool: &PgPool,
    document_id: Uuid,
    pr_url: &str,
    pr_branch: Option<&str>,
) -> Result<Option<DirectiveDocument>, sqlx::Error> {
    let sql = r#"
        UPDATE directive_documents
        SET status = 'shipped',
            pr_url = $2,
            pr_branch = COALESCE($3, pr_branch),
            shipped_at = NOW(),
            version = version + 1,
            updated_at = NOW()
        WHERE id = $1
        RETURNING *
        "#;

    let shipped: Option<DirectiveDocument> = sqlx::query_as(sql)
        .bind(document_id)
        .bind(pr_url)
        .bind(pr_branch)
        .fetch_optional(pool)
        .await?;
    Ok(shipped)
}

/// Archive a directive document.
///
/// Sets `status = 'archived'`, stamps `archived_at` and bumps `version` /
/// `updated_at`. Archiving an already-archived document re-stamps
/// `archived_at` and bumps `version` again.
///
/// Whenever a row was archived, `promote_next_queued_contract` is invoked
/// for its directive inside the same transaction so the queue can advance.
/// Returns `Ok(None)` when the document does not exist.
pub async fn archive_directive_document(
    pool: &PgPool,
    document_id: Uuid,
) -> Result<Option<DirectiveDocument>, sqlx::Error> {
    let mut tx = pool.begin().await?;

    let archived: Option<DirectiveDocument> = sqlx::query_as(
        r#"
        UPDATE directive_documents
        SET status = 'archived',
            archived_at = NOW(),
            version = version + 1,
            updated_at = NOW()
        WHERE id = $1
        RETURNING *
        "#,
    )
    .bind(document_id)
    .fetch_optional(&mut *tx)
    .await?;

    if let Some(doc) = archived.as_ref() {
        promote_next_queued_contract(&mut tx, doc.directive_id).await?;
    }

    tx.commit().await?;
    Ok(archived)
}

// ============================================================================
// Lifecycle transitions: start / pause / complete / unlock / reopen
//
// The lifecycle is `draft → queued → active → shipped → archived`. At most
// one contract per directive sits in `active` at a time — the queue is
// serialised because a directive owns a single shared worktree. Helpers
// below enforce that invariant in SQL transactions.
// ============================================================================

/// Start a draft contract: it becomes `active` when no sibling under the same
/// directive is active, otherwise `queued`.
///
/// Returns the updated row, or `Ok(None)` if the contract doesn't exist.
/// Errors with `RepositoryError::Validation` if the contract is in any state
/// other than `draft`.
///
/// Side effect: when the contract enters `active`, `activate_parent_directive`
/// flips the parent directive to `active` (from `draft|paused|idle|inactive`)
/// within the same transaction.
///
/// NOTE(review): the active-sibling count and the status update are separate
/// statements with no row lock, so two concurrent starts under the same
/// directive could both observe zero active siblings. Confirm that a
/// constraint or caller-side serialisation guards the one-active invariant.
pub async fn start_contract(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<DirectiveDocument>, RepositoryError> {
    let mut tx = pool.begin().await?;

    let existing: Option<DirectiveDocument> =
        sqlx::query_as(r#"SELECT * FROM directive_documents WHERE id = $1"#)
            .bind(contract_id)
            .fetch_optional(&mut *tx)
            .await?;
    let Some(contract) = existing else {
        return Ok(None);
    };

    if contract.status != "draft" {
        return Err(RepositoryError::Validation(format!(
            "contract is in status '{}'; only 'draft' contracts can be started",
            contract.status
        )));
    }

    // Count active siblings: any at all means this contract has to queue.
    let (active_siblings,): (i64,) = sqlx::query_as(
        r#"SELECT COUNT(*)::BIGINT FROM directive_documents
           WHERE directive_id = $1 AND status = 'active'"#,
    )
    .bind(contract.directive_id)
    .fetch_one(&mut *tx)
    .await?;

    let takes_active_slot = active_siblings <= 0;
    let new_status = if takes_active_slot { "active" } else { "queued" };

    let updated: Option<DirectiveDocument> = sqlx::query_as(
        r#"
        UPDATE directive_documents
           SET status = $2,
               version = version + 1,
               updated_at = NOW()
         WHERE id = $1
        RETURNING *
        "#,
    )
    .bind(contract_id)
    .bind(new_status)
    .fetch_optional(&mut *tx)
    .await?;

    // Only a contract that actually took the active slot activates the
    // directive; a queued contract leaves the directive status alone.
    if takes_active_slot {
        activate_parent_directive(&mut tx, contract.directive_id).await?;
    }

    tx.commit().await?;
    Ok(updated)
}

/// Pause an active contract — moves it back to `queued` so the next
/// queued sibling can pick up the active slot. The orchestrator-daemon
/// stop is the caller's responsibility.
///
/// Returns the updated row, or `Ok(None)` if the contract doesn't exist.
/// Errors with `RepositoryError::Validation` if the contract is in any
/// status other than `active`. All statements run in one transaction; the
/// early returns drop it without committing.
pub async fn pause_contract(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<DirectiveDocument>, RepositoryError> {
    let mut tx = pool.begin().await?;

    let current = sqlx::query_as::<_, DirectiveDocument>(
        r#"SELECT * FROM directive_documents WHERE id = $1"#,
    )
    .bind(contract_id)
    .fetch_optional(&mut *tx)
    .await?;

    let current = match current {
        Some(c) => c,
        None => return Ok(None),
    };

    if current.status != "active" {
        return Err(RepositoryError::Validation(format!(
            "contract is in status '{}'; only 'active' contracts can be paused",
            current.status
        )));
    }

    let updated = sqlx::query_as::<_, DirectiveDocument>(
        r#"
        UPDATE directive_documents
           SET status = 'queued',
               version = version + 1,
               updated_at = NOW()
         WHERE id = $1
        RETURNING *
        "#,
    )
    .bind(contract_id)
    .fetch_optional(&mut *tx)
    .await?;

    // The slot is free — promote the next queued contract.
    // NOTE(review): the intent is to exclude the contract we just paused,
    // but the helper only receives the directive id and the paused contract
    // is itself `queued` at this point. Confirm the helper cannot pick it
    // straight back up (which would turn the pause into a no-op).
    promote_next_queued_contract(&mut tx, current.directive_id).await?;

    // If no contract is active after the pause+promote, pause the
    // directive too — stops the reconciler from spawning new planners
    // on what is now an idle directive.
    deactivate_parent_directive_if_no_active(
        &mut tx,
        current.directive_id,
        "paused",
    )
    .await?;

    tx.commit().await?;
    Ok(updated)
}

/// Mark a contract as `shipped` — the work is done.
///
/// Accepts contracts in status `active` or `queued`; any other status yields
/// `RepositoryError::Validation`. `pr_url` / `pr_branch` overwrite the stored
/// values only when supplied. `shipped_at` is stamped and `version` bumped.
///
/// Afterwards, in the same transaction, `promote_next_queued_contract` is
/// called for the directive, followed by
/// `deactivate_parent_directive_if_no_active` with target status `inactive`.
///
/// Returns the updated row, or `Ok(None)` if the contract doesn't exist.
pub async fn complete_contract(
    pool: &PgPool,
    contract_id: Uuid,
    pr_url: Option<&str>,
    pr_branch: Option<&str>,
) -> Result<Option<DirectiveDocument>, RepositoryError> {
    let mut tx = pool.begin().await?;

    let existing: Option<DirectiveDocument> =
        sqlx::query_as(r#"SELECT * FROM directive_documents WHERE id = $1"#)
            .bind(contract_id)
            .fetch_optional(&mut *tx)
            .await?;
    let Some(contract) = existing else {
        return Ok(None);
    };

    let completable = matches!(contract.status.as_str(), "active" | "queued");
    if !completable {
        return Err(RepositoryError::Validation(format!(
            "contract is in status '{}'; only 'active' or 'queued' contracts can be completed",
            contract.status
        )));
    }

    let shipped: Option<DirectiveDocument> = sqlx::query_as(
        r#"
        UPDATE directive_documents
           SET status = 'shipped',
               pr_url = COALESCE($2, pr_url),
               pr_branch = COALESCE($3, pr_branch),
               shipped_at = NOW(),
               version = version + 1,
               updated_at = NOW()
         WHERE id = $1
        RETURNING *
        "#,
    )
    .bind(contract_id)
    .bind(pr_url)
    .bind(pr_branch)
    .fetch_optional(&mut *tx)
    .await?;

    let directive_id = contract.directive_id;

    // Let the queue advance, then — if nothing ended up active — park the
    // directive as `inactive`: its iteration is shipped and the next cycle
    // starts via reopen or a new contract.
    promote_next_queued_contract(&mut tx, directive_id).await?;
    deactivate_parent_directive_if_no_active(&mut tx, directive_id, "inactive").await?;

    tx.commit().await?;
    Ok(shipped)
}

/// Send a `queued` or `active` contract back to `draft` so its spec can be
/// edited again.
///
/// Any other status yields `RepositoryError::Validation`. When the contract
/// was `active`, `promote_next_queued_contract` is called for the directive
/// and then `deactivate_parent_directive_if_no_active` with target status
/// `paused`. A contract that was only `queued` triggers neither.
///
/// Returns the updated row, or `Ok(None)` if the contract doesn't exist.
pub async fn unlock_contract(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<DirectiveDocument>, RepositoryError> {
    let mut tx = pool.begin().await?;

    let existing: Option<DirectiveDocument> =
        sqlx::query_as(r#"SELECT * FROM directive_documents WHERE id = $1"#)
            .bind(contract_id)
            .fetch_optional(&mut *tx)
            .await?;
    let Some(contract) = existing else {
        return Ok(None);
    };

    let held_active_slot = match contract.status.as_str() {
        "active" => true,
        "queued" => false,
        _ => {
            return Err(RepositoryError::Validation(format!(
                "contract is in status '{}'; only 'queued' or 'active' contracts can be unlocked",
                contract.status
            )));
        }
    };

    let unlocked: Option<DirectiveDocument> = sqlx::query_as(
        r#"
        UPDATE directive_documents
           SET status = 'draft',
               version = version + 1,
               updated_at = NOW()
         WHERE id = $1
        RETURNING *
        "#,
    )
    .bind(contract_id)
    .fetch_optional(&mut *tx)
    .await?;

    if held_active_slot {
        let directive_id = contract.directive_id;
        // The active slot is free: advance the queue, and pause the
        // directive if nothing ended up active.
        promote_next_queued_contract(&mut tx, directive_id).await?;
        deactivate_parent_directive_if_no_active(&mut tx, directive_id, "paused").await?;
    }

    tx.commit().await?;
    Ok(unlocked)
}

/// Reopen a shipped contract for amendment. Flips the contract back to
/// `active`, re-activates the parent directive, and clears the
/// directive's PR linkage + orchestrator task so the reconciler spawns a
/// fresh planner. The planner uses `get_latest_merged_revision` to
/// detect the previously-shipped PR and frame the new plan as a delta.
///
/// Returns the updated row, or `Ok(None)` if the contract doesn't exist.
/// Errors with `RepositoryError::Validation` if the contract is in any
/// status other than `shipped`.
///
/// NOTE(review): unlike `start_contract`, no check is made for an already
/// active sibling, so reopening while another contract is active would leave
/// two `active` contracts under one directive — confirm callers prevent this.
/// NOTE(review): the contract's `shipped_at`, `pr_url` and `pr_branch` are
/// left as they were; `update_directive_document` clears `shipped_at` when it
/// reactivates a shipped document — confirm the difference is intended.
pub async fn reopen_contract(
    pool: &PgPool,
    contract_id: Uuid,
) -> Result<Option<DirectiveDocument>, RepositoryError> {
    let mut tx = pool.begin().await?;

    let current = sqlx::query_as::<_, DirectiveDocument>(
        r#"SELECT * FROM directive_documents WHERE id = $1"#,
    )
    .bind(contract_id)
    .fetch_optional(&mut *tx)
    .await?;

    let current = match current {
        Some(c) => c,
        None => return Ok(None),
    };

    if current.status != "shipped" {
        return Err(RepositoryError::Validation(format!(
            "contract is in status '{}'; only 'shipped' contracts can be reopened",
            current.status
        )));
    }

    let updated = sqlx::query_as::<_, DirectiveDocument>(
        r#"
        UPDATE directive_documents
           SET status = 'active',
               version = version + 1,
               updated_at = NOW()
         WHERE id = $1
        RETURNING *
        "#,
    )
    .bind(contract_id)
    .fetch_optional(&mut *tx)
    .await?;

    // Re-activate the directive and clear the prior PR + orchestrator
    // linkage. Status is forced to `active` regardless of prior value,
    // except that an archived directive is left untouched by the WHERE
    // clause below.
    // NOTE(review): when the directive is archived this UPDATE matches no
    // row, but the contract above has already been flipped to `active` and
    // is still committed — confirm that outcome is acceptable.
    sqlx::query(
        r#"
        UPDATE directives
           SET status = 'active',
               orchestrator_task_id = NULL,
               pr_url = NULL,
               pr_branch = NULL,
               updated_at = NOW(),
               version = version + 1
         WHERE id = $1 AND status <> 'archived'
        "#,
    )
    .bind(current.directive_id)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(updated)
}

/// Look up the id of the directive's currently-active contract.
///
/// When several contracts are `active`, the one with the lowest `position`
/// (ties broken by oldest `created_at`) wins. Yields `Ok(None)` when the
/// directive has no active contract. Used by the auto-complete-on-PR path
/// so the contract row can be shipped at the same moment the directive
/// registers its PR url.
pub async fn get_active_contract_id_for_directive(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Option<Uuid>, sqlx::Error> {
    let active: Option<(Uuid,)> = sqlx::query_as(
        r#"
        SELECT id FROM directive_documents
         WHERE directive_id = $1 AND status = 'active'
         ORDER BY position ASC, created_at ASC
         LIMIT 1
        "#,
    )
    .bind(directive_id)
    .fetch_optional(pool)
    .await?;

    Ok(active.map(|(contract_id,)| contract_id))
}

/// Promote the parent directive to `active` after one of its contracts
/// became active. The update only applies while the directive is in
/// `draft`, `paused`, `idle` or `inactive`; a directive in any other
/// status (e.g. `archived`) is left untouched.
async fn activate_parent_directive(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    let promote_sql = r#"
        UPDATE directives
           SET status = 'active',
               updated_at = NOW(),
               version = version + 1
         WHERE id = $1
           AND status IN ('draft', 'paused', 'idle', 'inactive')
        "#;

    sqlx::query(promote_sql)
        .bind(directive_id)
        .execute(&mut **tx)
        .await
        .map(|_| ())
}

/// Move the directive out of `active` once none of its contracts is
/// active any more.
///
/// Intended to run after a contract lifecycle change; `new_status` is the
/// destination state (typically `'paused'` for unlock/pause flows,
/// `'inactive'` for ship). The statement matches nothing — and is thus a
/// no-op — when the directive is not currently `active` or still owns an
/// `active` contract.
async fn deactivate_parent_directive_if_no_active(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    directive_id: Uuid,
    new_status: &str,
) -> Result<(), sqlx::Error> {
    let demote_sql = r#"
        UPDATE directives
           SET status = $2,
               updated_at = NOW(),
               version = version + 1
         WHERE id = $1
           AND status = 'active'
           AND NOT EXISTS (
               SELECT 1 FROM directive_documents
                WHERE directive_id = $1 AND status = 'active'
           )
        "#;

    sqlx::query(demote_sql)
        .bind(directive_id)
        .bind(new_status)
        .execute(&mut **tx)
        .await
        .map(|_| ())
}

/// Activate the next `queued` contract of a directive.
///
/// The contract with the lowest `position` (oldest `created_at` on ties)
/// is flipped to `active` and its version bumped. Nothing happens when the
/// directive has no queued contract.
///
/// Caller must hold the parent transaction so the count → promote
/// sequence stays atomic w.r.t. other lifecycle transitions.
async fn promote_next_queued_contract(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    let promote_sql = r#"
        UPDATE directive_documents
           SET status = 'active',
               version = version + 1,
               updated_at = NOW()
         WHERE id = (
             SELECT id FROM directive_documents
              WHERE directive_id = $1 AND status = 'queued'
              ORDER BY position ASC, created_at ASC
              LIMIT 1
         )
        "#;

    sqlx::query(promote_sql)
        .bind(directive_id)
        .execute(&mut **tx)
        .await
        .map(|_| ())
}

/// Return how many documents under the directive currently have
/// status = 'active' (draft, shipped and archived documents are not
/// counted).
pub async fn count_active_directive_documents(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    let (active_count,): (i64,) = sqlx::query_as(
        r#"
        SELECT COUNT(*)::BIGINT
        FROM directive_documents
        WHERE directive_id = $1 AND status = 'active'
        "#,
    )
    .bind(directive_id)
    .fetch_one(pool)
    .await?;

    Ok(active_count)
}

/// Fetch every visible task attached to one directive document, newest
/// first.
///
/// This powers the per-document `tasks/` subfolder in the sidebar — when a
/// document ships, its tasks visually move with it under shipped/. The
/// query filters only on `directive_document_id`, so both step-execution
/// tasks and step-less ("ephemeral" / orchestrator-style) tasks are
/// returned.
///
/// Tasks whose `hidden` flag is true are excluded (a NULL flag counts as
/// not hidden) so dismissed tasks don't reappear in the document's list.
pub async fn list_directive_document_tasks(
    pool: &PgPool,
    document_id: Uuid,
) -> Result<Vec<Task>, sqlx::Error> {
    let visible_tasks = sqlx::query_as::<_, Task>(
        r#"
        SELECT *
        FROM tasks
        WHERE directive_document_id = $1
          AND COALESCE(hidden, false) = false
        ORDER BY created_at DESC
        "#,
    )
    .bind(document_id)
    .fetch_all(pool)
    .await?;

    Ok(visible_tasks)
}

/// Fetch the directive_steps attached to one document, sorted by
/// `order_index` and then `created_at` — the same ordering the directive
/// page uses.
pub async fn list_directive_document_steps(
    pool: &PgPool,
    document_id: Uuid,
) -> Result<Vec<DirectiveStep>, sqlx::Error> {
    let steps = sqlx::query_as::<_, DirectiveStep>(
        r#"
        SELECT *
        FROM directive_steps
        WHERE directive_document_id = $1
        ORDER BY order_index, created_at
        "#,
    )
    .bind(document_id)
    .fetch_all(pool)
    .await?;

    Ok(steps)
}

// =============================================================================
// Directive DAG Progression
// =============================================================================

/// Promote `pending` steps of a directive to `ready` and return the rows
/// that were promoted.
///
/// A pending step is promoted only when both hold:
/// * none of the steps listed in its `depends_on` array has a status other
///   than `completed` or `skipped` (ids in `depends_on` that match no step
///   row are ignored by the join), and
/// * no step of the same directive with a lower `order_index` has a status
///   other than `completed`, `skipped` or `failed`.
pub async fn advance_directive_ready_steps(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Vec<DirectiveStep>, sqlx::Error> {
    let newly_ready = sqlx::query_as::<_, DirectiveStep>(
        r#"
        UPDATE directive_steps SET status = 'ready'
        WHERE directive_id = $1 AND status = 'pending'
          AND NOT EXISTS (
            SELECT 1 FROM unnest(depends_on) AS dep_id
            JOIN directive_steps ds ON ds.id = dep_id
            WHERE ds.status NOT IN ('completed', 'skipped')
          )
          AND NOT EXISTS (
            SELECT 1 FROM directive_steps prev
            WHERE prev.directive_id = $1
              AND prev.order_index < directive_steps.order_index
              AND prev.status NOT IN ('completed', 'skipped', 'failed')
          )
        RETURNING *
        "#,
    )
    .bind(directive_id)
    .fetch_all(pool)
    .await?;

    Ok(newly_ready)
}

/// Flip an `active` directive to `idle` once every one of its steps is in
/// a terminal state (`completed`, `failed` or `skipped`).
///
/// Directives are ongoing, hence `idle` rather than a completed status. A
/// directive without any steps is never set to idle. Returns `true` when
/// the directive row was actually updated.
pub async fn check_directive_idle(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let idle_sql = r#"
        UPDATE directives SET status = 'idle', updated_at = NOW()
        WHERE id = $1 AND status = 'active'
          AND NOT EXISTS (
            SELECT 1 FROM directive_steps
            WHERE directive_id = $1
              AND status NOT IN ('completed', 'failed', 'skipped')
          )
          AND EXISTS (
            SELECT 1 FROM directive_steps WHERE directive_id = $1
          )
        "#;

    let outcome = sqlx::query(idle_sql)
        .bind(directive_id)
        .execute(pool)
        .await?;

    let went_idle = outcome.rows_affected() > 0;
    Ok(went_idle)
}

/// Transition a directive to 'inactive'. Called at the moment a PR is
/// raised — at that point the contract's current iteration is "shipped"
/// and editing the goal (Stage 4) starts an amendment cycle.
///
/// Only directives currently in `active`, `idle` or `paused` are touched,
/// which makes the call idempotent: a directive that is already inactive
/// or in any other status (e.g. archived) is left unchanged.
pub async fn set_directive_inactive(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    let deactivate_sql = r#"
        UPDATE directives
        SET status = 'inactive',
            updated_at = NOW(),
            version = version + 1
        WHERE id = $1
          AND status IN ('active', 'idle', 'paused')
        "#;

    sqlx::query(deactivate_sql)
        .bind(directive_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Start a "new draft" cycle on a directive owned by `owner_id`.
///
/// Sets the status to 'draft' and nulls out `pr_url`, `pr_branch`,
/// `orchestrator_task_id` and `completion_task_id` so the next contract
/// activation starts fresh. Prior revisions remain in
/// `directive_revisions` as the historical record. Returns the updated
/// directive, or `None` when no directive matches the id/owner pair.
pub async fn reset_directive_for_new_draft(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<Option<Directive>, sqlx::Error> {
    let reset = sqlx::query_as::<_, Directive>(
        r#"
        UPDATE directives
        SET status = 'draft',
            pr_url = NULL,
            pr_branch = NULL,
            orchestrator_task_id = NULL,
            completion_task_id = NULL,
            updated_at = NOW(),
            version = version + 1
        WHERE id = $1 AND owner_id = $2
        RETURNING *
        "#,
    )
    .bind(directive_id)
    .bind(owner_id)
    .fetch_optional(pool)
    .await?;

    Ok(reset)
}

// =============================================================================
// Directive Revisions — per-PR snapshots of the contract content.
// =============================================================================

/// Record `content` as a revision of the directive, attached to `pr_url`.
///
/// The new row starts with `pr_state = 'open'` and receives version
/// MAX(existing versions for the directive) + 1 (1 for the first one).
/// The call is idempotent on (directive_id, pr_url): when a revision for
/// that pair already exists, the most recently frozen one is returned and
/// nothing is inserted.
pub async fn create_directive_revision(
    pool: &PgPool,
    directive_id: Uuid,
    content: &str,
    pr_url: &str,
    pr_branch: Option<&str>,
) -> Result<crate::db::models::DirectiveRevision, sqlx::Error> {
    use crate::db::models::DirectiveRevision;

    // Idempotency: the orchestrator's completion task may re-run and re-set
    // the same pr_url; don't snapshot twice in that case.
    let already_snapshotted = sqlx::query_as::<_, DirectiveRevision>(
        r#"
        SELECT * FROM directive_revisions
        WHERE directive_id = $1 AND pr_url = $2
        ORDER BY frozen_at DESC LIMIT 1
        "#,
    )
    .bind(directive_id)
    .bind(pr_url)
    .fetch_optional(pool)
    .await?;

    match already_snapshotted {
        Some(existing) => Ok(existing),
        None => {
            sqlx::query_as::<_, DirectiveRevision>(
                r#"
        INSERT INTO directive_revisions
            (directive_id, content, pr_url, pr_branch, pr_state, version, frozen_at)
        SELECT
            $1,
            $2,
            $3,
            $4,
            'open',
            COALESCE(MAX(version), 0) + 1,
            NOW()
        FROM directive_revisions
        WHERE directive_id = $1
        RETURNING *
        "#,
            )
            .bind(directive_id)
            .bind(content)
            .bind(pr_url)
            .bind(pr_branch)
            .fetch_one(pool)
            .await
        }
    }
}

/// Return every revision of a directive, most recently frozen first.
///
/// The query joins through `directives` and requires `owner_id` to match,
/// so callers don't accidentally surface other users' history.
pub async fn list_directive_revisions_for_owner(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<Vec<crate::db::models::DirectiveRevision>, sqlx::Error> {
    use crate::db::models::DirectiveRevision;

    let revisions = sqlx::query_as::<_, DirectiveRevision>(
        r#"
        SELECT r.*
        FROM directive_revisions r
        JOIN directives d ON d.id = r.directive_id
        WHERE r.directive_id = $1 AND d.owner_id = $2
        ORDER BY r.frozen_at DESC
        "#,
    )
    .bind(directive_id)
    .bind(owner_id)
    .fetch_all(pool)
    .await?;

    Ok(revisions)
}

/// Overwrite the `pr_state` of one revision (called by the reconciler when
/// it detects a PR transitioned to merged/closed). `new_state` must be one
/// of 'open' | 'merged' | 'closed' to satisfy the table's CHECK constraint.
pub async fn update_directive_revision_pr_state(
    pool: &PgPool,
    revision_id: Uuid,
    new_state: &str,
) -> Result<(), sqlx::Error> {
    let update_sql = r#"UPDATE directive_revisions SET pr_state = $2 WHERE id = $1"#;

    sqlx::query(update_sql)
        .bind(revision_id)
        .bind(new_state)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Return the most recently frozen revision of the directive whose
/// `pr_state` is 'merged', if any — used when planning an amendment to
/// know what the previous "frozen" content was so the diff can be passed
/// to the orchestrator.
pub async fn get_latest_merged_revision(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<Option<crate::db::models::DirectiveRevision>, sqlx::Error> {
    use crate::db::models::DirectiveRevision;

    let latest_merged = sqlx::query_as::<_, DirectiveRevision>(
        r#"
        SELECT * FROM directive_revisions
        WHERE directive_id = $1 AND pr_state = 'merged'
        ORDER BY frozen_at DESC
        LIMIT 1
        "#,
    )
    .bind(directive_id)
    .fetch_optional(pool)
    .await?;

    Ok(latest_merged)
}

/// Write a new status onto a directive owned by `owner_id` (used for
/// start/pause/archive transitions) and bump its version.
///
/// When the new status is `active`, `started_at` is additionally filled in
/// if it is still NULL. Returns the updated directive, or `None` when no
/// directive matches the id/owner pair.
pub async fn set_directive_status(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
    status: &str,
) -> Result<Option<Directive>, sqlx::Error> {
    // Only activation stamps the start time; every other transition gets
    // no extra SET clause.
    let started_at_clause = if status == "active" {
        ", started_at = COALESCE(started_at, NOW())"
    } else {
        ""
    };

    let sql = [
        r#"UPDATE directives SET status = $3, updated_at = NOW(), version = version + 1"#,
        started_at_clause,
        " WHERE id = $1 AND owner_id = $2 RETURNING *",
    ]
    .concat();

    let updated = sqlx::query_as::<_, Directive>(&sql)
        .bind(directive_id)
        .bind(owner_id)
        .bind(status)
        .fetch_optional(pool)
        .await?;

    Ok(updated)
}

// =============================================================================
// Directive Orchestrator Queries
// =============================================================================

/// Return the directives that still need an initial plan: status
/// `active`, no orchestrator task assigned and no steps at all.
pub async fn get_directives_needing_planning(
    pool: &PgPool,
) -> Result<Vec<Directive>, sqlx::Error> {
    let unplanned = sqlx::query_as::<_, Directive>(
        r#"
        SELECT d.* FROM directives d
        WHERE d.status = 'active'
          AND d.orchestrator_task_id IS NULL
          AND NOT EXISTS (
            SELECT 1 FROM directive_steps WHERE directive_id = d.id
          )
        "#,
    )
    .fetch_all(pool)
    .await?;

    Ok(unplanned)
}

/// A step joined with minimal directive info for dispatch.
///
/// Rows of this shape are produced by `get_ready_steps_for_dispatch`; the
/// field names match the column aliases selected there.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct StepForDispatch {
    // Step fields (columns of `directive_steps`)
    /// Id of the step (`directive_steps.id`).
    pub step_id: Uuid,
    /// Id of the directive the step belongs to.
    pub directive_id: Uuid,
    /// The step's `name` column.
    pub step_name: String,
    /// The step's `description` column, if set.
    pub step_description: Option<String>,
    /// The step's `task_plan` column, if set.
    pub task_plan: Option<String>,
    /// The step's `order_index` column.
    pub order_index: i32,
    /// The step's `generation` column.
    pub generation: i32,
    /// Ids listed in the step's `depends_on` column.
    pub depends_on: Vec<Uuid>,
    /// Optional contract type — when set, orchestrator creates a contract instead of a task.
    pub contract_type: Option<String>,
    // Directive fields (columns of `directives`)
    /// Owner of the directive.
    pub owner_id: Uuid,
    /// The directive's `title` column.
    pub directive_title: String,
    /// The directive's `repository_url` column, if set.
    pub repository_url: Option<String>,
    /// The directive's `base_branch` column, if set.
    pub base_branch: Option<String>,
    /// The directive's PR branch (if a PR has already been created from previous steps).
    pub pr_branch: Option<String>,
    /// The directive's reconcile mode: "auto", "semi-auto", or "manual".
    pub reconcile_mode: String,
}

/// Return the steps that are waiting for a task to be dispatched.
///
/// A step qualifies when it is `ready`, has neither a task nor a contract
/// linked yet, and its directive is `active`. Results are sorted by the
/// step's `order_index`.
pub async fn get_ready_steps_for_dispatch(
    pool: &PgPool,
) -> Result<Vec<StepForDispatch>, sqlx::Error> {
    let dispatchable = sqlx::query_as::<_, StepForDispatch>(
        r#"
        SELECT
            ds.id AS step_id,
            ds.directive_id,
            ds.name AS step_name,
            ds.description AS step_description,
            ds.task_plan,
            ds.order_index,
            ds.generation,
            ds.depends_on,
            ds.contract_type,
            d.owner_id,
            d.title AS directive_title,
            d.repository_url,
            d.base_branch,
            d.pr_branch,
            d.reconcile_mode
        FROM directive_steps ds
        JOIN directives d ON d.id = ds.directive_id
        WHERE ds.status = 'ready'
          AND ds.task_id IS NULL
          AND ds.contract_id IS NULL
          AND d.status = 'active'
        ORDER BY ds.order_index
        "#,
    )
    .fetch_all(pool)
    .await?;

    Ok(dispatchable)
}

/// Task info for a dependency step (step → linked task).
///
/// Produced by `get_step_dependency_tasks`, which joins each dependency
/// step to the task referenced by its `task_id`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct DependencyTaskInfo {
    /// Id of the dependency step (`directive_steps.id`).
    pub step_id: Uuid,
    /// Id of the task linked to that step (`tasks.id`).
    pub task_id: Uuid,
    /// The linked task's `name` column.
    pub task_name: String,
}

/// Map dependency step ids to the id and name of the task linked to each
/// step.
///
/// The result follows the order of `depends_on`. Steps that have no linked
/// task (or that do not exist) are simply absent from the result, and an
/// empty input short-circuits without touching the database.
pub async fn get_step_dependency_tasks(
    pool: &PgPool,
    depends_on: &[Uuid],
) -> Result<Vec<DependencyTaskInfo>, sqlx::Error> {
    if depends_on.is_empty() {
        return Ok(Vec::new());
    }

    let linked = sqlx::query_as::<_, DependencyTaskInfo>(
        r#"
        SELECT ds.id AS step_id, t.id AS task_id, t.name AS task_name
        FROM directive_steps ds
        JOIN tasks t ON t.id = ds.task_id
        WHERE ds.id = ANY($1)
        "#,
    )
    .bind(depends_on)
    .fetch_all(pool)
    .await?;

    // The database returns rows in arbitrary order; walk the input slice so
    // the output mirrors the caller's ordering.
    let in_input_order = depends_on
        .iter()
        .filter_map(|wanted| linked.iter().find(|row| row.step_id == *wanted).cloned())
        .collect();

    Ok(in_input_order)
}

/// A running step joined with its task's current status.
///
/// Produced by `get_running_steps_with_tasks`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct RunningStepWithTask {
    /// Id of the running step (`directive_steps.id`).
    pub step_id: Uuid,
    /// Id of the directive the step belongs to.
    pub directive_id: Uuid,
    /// Id of the task linked to the step.
    pub task_id: Uuid,
    /// The linked task's `status` column.
    pub task_status: String,
}

/// Get running steps with their task status for monitoring.
///
/// Returns one row per step that is `running`, has a task linked and no
/// contract linked, together with the current status of that task.
pub async fn get_running_steps_with_tasks(
    pool: &PgPool,
) -> Result<Vec<RunningStepWithTask>, sqlx::Error> {
    // The column is selected under its plain name. The `AS "task_id!"`
    // override syntax is only understood by sqlx's compile-time macros; with
    // the runtime `query_as` API it would literally name the column
    // `task_id!`, which the derived `FromRow` impl (looking up `task_id`)
    // cannot find. Non-nullness is already guaranteed by the inner join and
    // the `IS NOT NULL` filter.
    sqlx::query_as::<_, RunningStepWithTask>(
        r#"
        SELECT
            ds.id AS step_id,
            ds.directive_id,
            ds.task_id,
            t.status AS task_status
        FROM directive_steps ds
        JOIN tasks t ON t.id = ds.task_id
        WHERE ds.status = 'running'
          AND ds.task_id IS NOT NULL
          AND ds.contract_id IS NULL
        "#,
    )
    .fetch_all(pool)
    .await
}


/// An orchestrator task to check (directive with pending planning task).
///
/// Produced by `get_orchestrator_tasks_to_check`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct OrchestratorTaskCheck {
    /// Id of the directive (`directives.id`).
    pub directive_id: Uuid,
    /// Id of the task referenced by the directive's `orchestrator_task_id`.
    pub orchestrator_task_id: Uuid,
    /// The orchestrator task's `status` column.
    pub task_status: String,
    /// Owner of the directive.
    pub owner_id: Uuid,
}

/// Get directives with orchestrator tasks to check completion.
///
/// Returns one row per `active` directive that has an orchestrator task
/// assigned, together with that task's current status.
pub async fn get_orchestrator_tasks_to_check(
    pool: &PgPool,
) -> Result<Vec<OrchestratorTaskCheck>, sqlx::Error> {
    // The column is selected under its plain name. The
    // `AS "orchestrator_task_id!"` override syntax is only understood by
    // sqlx's compile-time macros; with the runtime `query_as` API it would
    // literally name the column `orchestrator_task_id!`, which the derived
    // `FromRow` impl cannot find. Non-nullness is already guaranteed by the
    // inner join and the `IS NOT NULL` filter.
    sqlx::query_as::<_, OrchestratorTaskCheck>(
        r#"
        SELECT
            d.id AS directive_id,
            d.orchestrator_task_id,
            t.status AS task_status,
            d.owner_id
        FROM directives d
        JOIN tasks t ON t.id = d.orchestrator_task_id
        WHERE d.orchestrator_task_id IS NOT NULL
          AND d.status = 'active'
        "#,
    )
    .fetch_all(pool)
    .await
}

/// Return the directives that need to be re-planned: status `active`, no
/// orchestrator task assigned, at least one existing step, and a
/// `goal_updated_at` later than the creation time of the newest step.
pub async fn get_directives_needing_replanning(
    pool: &PgPool,
) -> Result<Vec<Directive>, sqlx::Error> {
    let stale_plans = sqlx::query_as::<_, Directive>(
        r#"
        SELECT d.* FROM directives d
        WHERE d.status = 'active'
          AND d.orchestrator_task_id IS NULL
          AND EXISTS (
            SELECT 1 FROM directive_steps WHERE directive_id = d.id
          )
          AND d.goal_updated_at > (
            SELECT COALESCE(MAX(ds.created_at), '1970-01-01'::timestamptz)
            FROM directive_steps ds WHERE ds.directive_id = d.id
          )
        "#,
    )
    .fetch_all(pool)
    .await?;

    Ok(stale_plans)
}

/// Link `task_id` to a step, mark the step `running` and stamp its
/// `started_at` with the current time. Returns the updated step, or `None`
/// when the step does not exist.
pub async fn assign_task_to_step(
    pool: &PgPool,
    step_id: Uuid,
    task_id: Uuid,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    let started_step = sqlx::query_as::<_, DirectiveStep>(
        r#"
        UPDATE directive_steps
        SET task_id = $2, status = 'running', started_at = NOW()
        WHERE id = $1
        RETURNING *
        "#,
    )
    .bind(step_id)
    .bind(task_id)
    .fetch_optional(pool)
    .await?;

    Ok(started_step)
}

/// Store `task_id` as the directive's `orchestrator_task_id` and refresh
/// `updated_at`.
pub async fn assign_orchestrator_task(
    pool: &PgPool,
    directive_id: Uuid,
    task_id: Uuid,
) -> Result<(), sqlx::Error> {
    let assign_sql = r#"
        UPDATE directives
        SET orchestrator_task_id = $2, updated_at = NOW()
        WHERE id = $1
        "#;

    sqlx::query(assign_sql)
        .bind(directive_id)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Reset the directive's `orchestrator_task_id` to NULL and refresh
/// `updated_at`.
pub async fn clear_orchestrator_task(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<(), sqlx::Error> {
    let clear_sql = r#"
        UPDATE directives
        SET orchestrator_task_id = NULL, updated_at = NOW()
        WHERE id = $1
        "#;

    sqlx::query(clear_sql)
        .bind(directive_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Interrupt superseded planning tasks of a directive.
///
/// Every planning/re-planning task of the directive that is not yet in a
/// terminal status is marked `interrupted`, except `exclude_task_id` (the
/// new task). Planning tasks are recognised by their name prefix
/// ("Plan: " or "Re-plan: ") to avoid cancelling completion/verification
/// tasks. Returns the number of tasks that were interrupted.
pub async fn cancel_old_planning_tasks(
    pool: &PgPool,
    directive_id: Uuid,
    exclude_task_id: Uuid,
) -> Result<u64, sqlx::Error> {
    let interrupt_sql = r#"
        UPDATE tasks
        SET status = 'interrupted',
            completed_at = NOW(),
            updated_at = NOW()
        WHERE directive_id = $1
          AND id != $2
          AND (name LIKE 'Plan: %' OR name LIKE 'Re-plan: %')
          AND status NOT IN ('completed', 'failed', 'merged', 'done', 'interrupted')
        "#;

    sqlx::query(interrupt_sql)
        .bind(directive_id)
        .bind(exclude_task_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
}

/// Record `task_id` on a step; the step's status is left as it is.
pub async fn link_task_to_step(
    pool: &PgPool,
    step_id: Uuid,
    task_id: Uuid,
) -> Result<(), sqlx::Error> {
    let link_sql = r#"
        UPDATE directive_steps
        SET task_id = $2
        WHERE id = $1
        "#;

    sqlx::query(link_sql)
        .bind(step_id)
        .bind(task_id)
        .execute(pool)
        .await
        .map(|_| ())
}


/// Mark a step as 'running' (after its task has been dispatched). An
/// existing `started_at` is preserved; it is only stamped with the current
/// time when still NULL.
pub async fn set_step_running(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<(), sqlx::Error> {
    let running_sql = r#"
        UPDATE directive_steps
        SET status = 'running', started_at = COALESCE(started_at, NOW())
        WHERE id = $1
        "#;

    sqlx::query(running_sql)
        .bind(step_id)
        .execute(pool)
        .await
        .map(|_| ())
}

/// Return the directive-owned tasks that are still waiting to be picked
/// up: `directive_id` set, status `pending` and no daemon assigned yet.
/// Oldest tasks come first.
pub async fn get_pending_directive_tasks(
    pool: &PgPool,
) -> Result<Vec<Task>, sqlx::Error> {
    let pending = sqlx::query_as::<_, Task>(
        r#"
        SELECT * FROM tasks
        WHERE directive_id IS NOT NULL
          AND status = 'pending'
          AND daemon_id IS NULL
        ORDER BY created_at
        "#,
    )
    .fetch_all(pool)
    .await?;

    Ok(pending)
}

/// Return the highest `generation` among the directive's steps, or 0 when
/// the directive has no steps.
pub async fn get_directive_max_generation(
    pool: &PgPool,
    directive_id: Uuid,
) -> Result<i32, sqlx::Error> {
    let (max_generation,): (Option<i32>,) = sqlx::query_as(
        r#"SELECT MAX(generation) FROM directive_steps WHERE directive_id = $1"#,
    )
    .bind(directive_id)
    .fetch_one(pool)
    .await?;

    // MAX() over zero rows is NULL; treat that as generation 0.
    match max_generation {
        Some(generation) => Ok(generation),
        None => Ok(0),
    }
}

// =============================================================================
// Order CRUD
// =============================================================================

/// Insert a new order owned by `owner_id` and return the stored row.
///
/// Fields missing from the request fall back to defaults: priority
/// "medium", status "open" and order type "feature".
pub async fn create_order(
    pool: &PgPool,
    owner_id: Uuid,
    req: CreateOrderRequest,
) -> Result<Order, sqlx::Error> {
    let effective_priority = req.priority.as_deref().unwrap_or("medium");
    let effective_status = req.status.as_deref().unwrap_or("open");
    let effective_type = req.order_type.as_deref().unwrap_or("feature");

    let insert_sql = r#"
        INSERT INTO orders (owner_id, title, description, priority, status, order_type, labels, directive_id, repository_url, dog_id)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
        RETURNING *
        "#;

    let created = sqlx::query_as::<_, Order>(insert_sql)
        .bind(owner_id)
        .bind(&req.title)
        .bind(&req.description)
        .bind(effective_priority)
        .bind(effective_status)
        .bind(effective_type)
        .bind(&req.labels)
        .bind(req.directive_id)
        .bind(&req.repository_url)
        .bind(req.dog_id)
        .fetch_one(pool)
        .await?;

    Ok(created)
}

/// List the orders of `owner_id`, newest first, narrowed by whichever
/// optional filters are supplied.
///
/// The equality filters (status, type, priority, directive, dog) are
/// combined with AND. `search_filter` is matched case-insensitively as a
/// substring against `title`, `description` and `directive_name`.
pub async fn list_orders(
    pool: &PgPool,
    owner_id: Uuid,
    status_filter: Option<&str>,
    type_filter: Option<&str>,
    priority_filter: Option<&str>,
    directive_id_filter: Option<Uuid>,
    dog_id_filter: Option<Uuid>,
    search_filter: Option<&str>,
) -> Result<Vec<Order>, sqlx::Error> {
    // Phase 1: assemble the SQL text. Placeholders are numbered in the
    // order the filters appear below; $1 is always the owner.
    let mut sql = String::from("SELECT * FROM orders WHERE owner_id = $1");
    let mut next_placeholder = 2u32;

    let equality_filters = [
        ("status", status_filter.is_some()),
        ("order_type", type_filter.is_some()),
        ("priority", priority_filter.is_some()),
        ("directive_id", directive_id_filter.is_some()),
        ("dog_id", dog_id_filter.is_some()),
    ];
    for (column, supplied) in equality_filters {
        if supplied {
            sql.push_str(&format!(" AND {} = ${}", column, next_placeholder));
            next_placeholder += 1;
        }
    }

    // The search term is bound once and referenced three times.
    if search_filter.is_some() {
        sql.push_str(&format!(
            " AND (title ILIKE ${p} OR description ILIKE ${p} OR directive_name ILIKE ${p})",
            p = next_placeholder
        ));
    }
    sql.push_str(" ORDER BY created_at DESC");

    // Phase 2: bind the values in exactly the same order the placeholders
    // were handed out above.
    let mut stmt = sqlx::query_as::<_, Order>(&sql).bind(owner_id);

    if let Some(status) = status_filter {
        stmt = stmt.bind(status);
    }
    if let Some(order_type) = type_filter {
        stmt = stmt.bind(order_type);
    }
    if let Some(priority) = priority_filter {
        stmt = stmt.bind(priority);
    }
    if let Some(directive_id) = directive_id_filter {
        stmt = stmt.bind(directive_id);
    }
    if let Some(dog_id) = dog_id_filter {
        stmt = stmt.bind(dog_id);
    }
    if let Some(term) = search_filter {
        stmt = stmt.bind(format!("%{}%", term));
    }

    stmt.fetch_all(pool).await
}

/// Fetch one order by id, restricted to rows owned by `owner_id`. Returns
/// `None` when no such order exists for that owner.
pub async fn get_order(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
) -> Result<Option<Order>, sqlx::Error> {
    let select_sql = r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#;

    let found = sqlx::query_as::<_, Order>(select_sql)
        .bind(order_id)
        .bind(owner_id)
        .fetch_optional(pool)
        .await?;

    Ok(found)
}

/// Update an order (owner-scoped). Uses the COALESCE pattern so only the
/// fields provided in `req` are changed; every field left as `None` keeps
/// its stored value. `updated_at` is always refreshed.
///
/// The merge happens inside a single `UPDATE` statement, so the row is
/// read and written atomically: two concurrent partial updates touching
/// different fields can no longer overwrite each other with stale values
/// (which a separate SELECT followed by UPDATE would allow).
///
/// Because `None` means "keep", a field cannot be reset to NULL through
/// this function.
///
/// Returns the updated order, or `None` when no order with `order_id`
/// exists for `owner_id`.
pub async fn update_order(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
    req: UpdateOrderRequest,
) -> Result<Option<Order>, sqlx::Error> {
    sqlx::query_as::<_, Order>(
        r#"
        UPDATE orders
        SET title = COALESCE($3, title),
            description = COALESCE($4, description),
            priority = COALESCE($5, priority),
            status = COALESCE($6, status),
            order_type = COALESCE($7, order_type),
            labels = COALESCE($8, labels),
            directive_id = COALESCE($9, directive_id),
            directive_step_id = COALESCE($10, directive_step_id),
            repository_url = COALESCE($11, repository_url),
            dog_id = COALESCE($12, dog_id),
            updated_at = NOW()
        WHERE id = $1 AND owner_id = $2
        RETURNING *
        "#,
    )
    .bind(order_id)
    .bind(owner_id)
    // A `None` binds as SQL NULL, which makes COALESCE fall back to the
    // current column value.
    .bind(req.title.as_deref())
    .bind(req.description.as_deref())
    .bind(req.priority.as_deref())
    .bind(req.status.as_deref())
    .bind(req.order_type.as_deref())
    .bind(req.labels.as_ref())
    .bind(req.directive_id)
    .bind(req.directive_step_id)
    .bind(req.repository_url.as_deref())
    .bind(req.dog_id)
    .fetch_optional(pool)
    .await
}

/// Remove an order owned by `owner_id`. The result is `true` when a row
/// was actually deleted and `false` when nothing matched.
pub async fn delete_order(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let delete_sql = r#"DELETE FROM orders WHERE id = $1 AND owner_id = $2"#;

    sqlx::query(delete_sql)
        .bind(order_id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Point an order (owned by `owner_id`) at `directive_id` and refresh its
/// `updated_at`. Returns the updated order, or `None` when the order does
/// not exist for that owner.
pub async fn link_order_to_directive(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
    directive_id: Uuid,
) -> Result<Option<Order>, sqlx::Error> {
    let linked = sqlx::query_as::<_, Order>(
        r#"
        UPDATE orders
        SET directive_id = $3, updated_at = NOW()
        WHERE id = $1 AND owner_id = $2
        RETURNING *
        "#,
    )
    .bind(order_id)
    .bind(owner_id)
    .bind(directive_id)
    .fetch_optional(pool)
    .await?;

    Ok(linked)
}

/// Convert an order to a directive step. Creates a new DirectiveStep from the order's
/// title and description, links the order to the new step, and returns the created step.
/// Uses the order's existing directive_id (which is required for new orders).
///
/// Returns `Ok(None)` when the order does not exist for `owner_id`, when the
/// order has no `directive_id`, or when the referenced directive does not
/// exist for `owner_id`.
///
/// All statements run inside one transaction, so the step insert and the
/// order link are committed together or not at all — a failure while
/// linking can no longer leave an orphan step behind.
pub async fn convert_order_to_step(
    pool: &PgPool,
    owner_id: Uuid,
    order_id: Uuid,
) -> Result<Option<DirectiveStep>, sqlx::Error> {
    // Every early return below drops `tx`, which rolls the transaction back.
    let mut tx = pool.begin().await?;

    // Verify the order exists and belongs to this owner
    let order = sqlx::query_as::<_, Order>(
        r#"SELECT * FROM orders WHERE id = $1 AND owner_id = $2"#,
    )
    .bind(order_id)
    .bind(owner_id)
    .fetch_optional(&mut *tx)
    .await?;

    let order = match order {
        Some(o) => o,
        None => return Ok(None),
    };

    // Get the directive_id from the order (required for new orders, but legacy data may have NULL)
    let directive_id = match order.directive_id {
        Some(id) => id,
        None => return Ok(None),
    };

    // Verify the directive exists and belongs to this owner
    let directive = sqlx::query_as::<_, Directive>(
        r#"SELECT * FROM directives WHERE id = $1 AND owner_id = $2"#,
    )
    .bind(directive_id)
    .bind(owner_id)
    .fetch_optional(&mut *tx)
    .await?;

    if directive.is_none() {
        return Ok(None);
    }

    // Get the next order_index for this directive (0 when it has no steps yet)
    let max_index: (Option<i32>,) = sqlx::query_as(
        r#"SELECT MAX(order_index) FROM directive_steps WHERE directive_id = $1"#,
    )
    .bind(directive_id)
    .fetch_one(&mut *tx)
    .await?;
    let next_index = max_index.0.unwrap_or(-1) + 1;

    // Create the directive step from order data
    let step = sqlx::query_as::<_, DirectiveStep>(
        r#"
        INSERT INTO directive_steps (directive_id, name, description, order_index)
        VALUES ($1, $2, $3, $4)
        RETURNING *
        "#,
    )
    .bind(directive_id)
    .bind(&order.title)
    .bind(&order.description)
    .bind(next_index)
    .fetch_one(&mut *tx)
    .await?;

    // Link the order to the new step
    sqlx::query(
        r#"
        UPDATE orders
        SET directive_step_id = $3, updated_at = NOW()
        WHERE id = $1 AND owner_id = $2
        "#,
    )
    .bind(order_id)
    .bind(owner_id)
    .bind(step.id)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(Some(step))
}

// =============================================================================
// Order Pickup
// =============================================================================

/// Fetch the orders that a directive may pick up on behalf of `owner_id`.
///
/// An order qualifies when its status is `open` or `in_progress` and it is
/// either unassigned (`directive_id IS NULL`) or already attached to
/// `directive_id`. Results are ordered by priority (`critical`, `high`,
/// `medium`, `low`, then anything else) and, within a priority, by oldest
/// `created_at` first.
pub async fn get_available_orders_for_pickup(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    // $1 = owner, $2 = directive; the CASE maps textual priority to a sort rank.
    const PICKUP_SQL: &str = r#"
        SELECT *
        FROM orders
        WHERE owner_id = $1
          AND status IN ('open', 'in_progress')
          AND (directive_id IS NULL OR directive_id = $2)
        ORDER BY CASE priority
            WHEN 'critical' THEN 0
            WHEN 'high' THEN 1
            WHEN 'medium' THEN 2
            WHEN 'low' THEN 3
            ELSE 4
        END ASC, created_at ASC
        "#;

    let candidates = sqlx::query_as::<_, Order>(PICKUP_SQL)
        .bind(owner_id)
        .bind(directive_id);

    candidates.fetch_all(pool).await
}

/// Bulk-link orders to a directive by setting `directive_id` on matching orders.
///
/// Only orders whose id is in `order_ids` and whose `owner_id` matches are
/// touched; `updated_at` is refreshed on every updated row.
///
/// Returns the count of updated rows. An empty `order_ids` slice returns
/// `Ok(0)` without contacting the database.
///
/// # Errors
/// Returns any `sqlx::Error` raised while executing the update.
pub async fn bulk_link_orders_to_directive(
    pool: &PgPool,
    owner_id: Uuid,
    order_ids: &[Uuid],
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    // `id = ANY('{}')` can never match, so skip the round trip entirely.
    if order_ids.is_empty() {
        return Ok(0);
    }

    // NOTE(review): ownership of `directive_id` is not checked here — presumably
    // the caller has already validated it; confirm against call sites.
    let result = sqlx::query(
        r#"
        UPDATE orders
        SET directive_id = $1, updated_at = NOW()
        WHERE id = ANY($2)
          AND owner_id = $3
        "#,
    )
    .bind(directive_id)
    .bind(order_ids)
    .bind(owner_id)
    .execute(pool)
    .await?;

    Ok(result.rows_affected() as i64)
}

/// Bulk update order status for a set of order IDs.
///
/// Sets `status` (and refreshes `updated_at`) on every order whose id is in
/// `order_ids` and whose `owner_id` matches.
///
/// Returns the count of updated rows. An empty `order_ids` slice returns
/// `Ok(0)` without contacting the database.
///
/// # Errors
/// Returns any `sqlx::Error` raised while executing the update.
pub async fn bulk_update_order_status(
    pool: &PgPool,
    owner_id: Uuid,
    order_ids: &[Uuid],
    status: &str,
) -> Result<i64, sqlx::Error> {
    // `id = ANY('{}')` can never match, so skip the round trip entirely.
    if order_ids.is_empty() {
        return Ok(0);
    }

    let result = sqlx::query(
        r#"UPDATE orders SET status = $1, updated_at = NOW()
           WHERE id = ANY($2) AND owner_id = $3"#,
    )
    .bind(status)
    .bind(order_ids)
    .bind(owner_id)
    .execute(pool)
    .await?;

    Ok(result.rows_affected() as i64)
}

/// Return every order whose `directive_step_id` equals `step_id`.
///
/// The lookup filters on the step id only; no owner filter is applied.
pub async fn get_orders_by_step_id(
    pool: &PgPool,
    step_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    let by_step =
        sqlx::query_as::<_, Order>(r#"SELECT * FROM orders WHERE directive_step_id = $1"#)
            .bind(step_id);

    by_step.fetch_all(pool).await
}

/// Reconcile a directive's orders with the status of their linked steps.
///
/// Considers orders owned by `owner_id`, attached to `directive_id`, and not
/// already `done` or `archived`:
/// - Orders linked to `completed` steps are marked "done"
/// - Orders linked to `running`/`ready` steps are marked "under_review"
///
/// Orders that are already `under_review` while their step is still
/// `running`/`ready` are left untouched, so repeated reconciliation neither
/// bumps their `updated_at` nor inflates the returned count.
///
/// Returns the count of orders updated.
///
/// # Errors
/// Returns any `sqlx::Error` raised while executing the update.
pub async fn reconcile_directive_orders(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
) -> Result<i64, sqlx::Error> {
    let result = sqlx::query(
        r#"
        UPDATE orders o
        SET status = CASE
            WHEN ds.status = 'completed' THEN 'done'
            WHEN ds.status IN ('running', 'ready') THEN 'under_review'
            ELSE o.status
          END,
          updated_at = NOW()
        FROM directive_steps ds
        WHERE o.directive_step_id = ds.id
          AND o.directive_id = $1
          AND o.owner_id = $2
          AND o.status NOT IN ('done', 'archived')
          AND (
            ds.status = 'completed'
            OR (ds.status IN ('running', 'ready') AND o.status <> 'under_review')
          )
        "#,
    )
    .bind(directive_id)
    .bind(owner_id)
    .execute(pool)
    .await?;

    // Only the count is needed, so rely on the command's affected-row count
    // instead of fetching the updated ids.
    Ok(result.rows_affected() as i64)
}

// =============================================================================
// Directive Order Group (DOG) CRUD
// =============================================================================

/// Insert a Directive Order Group (DOG) row and return it as stored.
///
/// The new row is attached to `directive_id` and `owner_id`, and takes its
/// `name` and `description` from `req`.
pub async fn create_directive_order_group(
    pool: &PgPool,
    directive_id: Uuid,
    owner_id: Uuid,
    req: CreateDirectiveOrderGroupRequest,
) -> Result<DirectiveOrderGroup, sqlx::Error> {
    const INSERT_SQL: &str = r#"
        INSERT INTO directive_order_groups (directive_id, owner_id, name, description)
        VALUES ($1, $2, $3, $4)
        RETURNING *
        "#;

    let insert = sqlx::query_as::<_, DirectiveOrderGroup>(INSERT_SQL)
        .bind(directive_id)
        .bind(owner_id)
        .bind(&req.name)
        .bind(&req.description);

    insert.fetch_one(pool).await
}

/// Return the DOGs attached to `directive_id` that belong to `owner_id`,
/// newest first (`created_at DESC`).
pub async fn list_directive_order_groups(
    pool: &PgPool,
    directive_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<DirectiveOrderGroup>, sqlx::Error> {
    const LIST_SQL: &str = r#"
        SELECT * FROM directive_order_groups
        WHERE directive_id = $1 AND owner_id = $2
        ORDER BY created_at DESC
        "#;

    let listing = sqlx::query_as::<_, DirectiveOrderGroup>(LIST_SQL)
        .bind(directive_id)
        .bind(owner_id);

    listing.fetch_all(pool).await
}

/// Look up one DOG by `id`, restricted to rows owned by `owner_id`.
///
/// Yields `Ok(None)` when no row matches both the id and the owner.
pub async fn get_directive_order_group(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<Option<DirectiveOrderGroup>, sqlx::Error> {
    let lookup = sqlx::query_as::<_, DirectiveOrderGroup>(
        r#"SELECT * FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#,
    )
    .bind(id)
    .bind(owner_id);

    lookup.fetch_optional(pool).await
}

/// Update a DOG (owner-scoped), changing only the fields present in `req`.
///
/// `name`, `description`, and `status` are each replaced when the request
/// carries a value and left as stored otherwise; `updated_at` is always
/// refreshed. The partial update is performed by a single `UPDATE` using
/// `COALESCE`, so it is atomic and cannot overwrite a concurrent change with
/// values read earlier.
///
/// Returns `Ok(None)` when no DOG matches both `id` and `owner_id`, otherwise
/// the row as it stands after the update.
///
/// # Errors
/// Returns any `sqlx::Error` raised while executing the update.
pub async fn update_directive_order_group(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
    req: UpdateDirectiveOrderGroupRequest,
) -> Result<Option<DirectiveOrderGroup>, sqlx::Error> {
    sqlx::query_as::<_, DirectiveOrderGroup>(
        r#"
        UPDATE directive_order_groups
        SET name = COALESCE($3, name),
            description = COALESCE($4, description),
            status = COALESCE($5, status),
            updated_at = NOW()
        WHERE id = $1 AND owner_id = $2
        RETURNING *
        "#,
    )
    .bind(id)
    .bind(owner_id)
    // A `None` binds as SQL NULL, which makes COALESCE keep the stored value.
    .bind(req.name.as_deref())
    .bind(req.description.as_deref())
    .bind(req.status.as_deref())
    .fetch_optional(pool)
    .await
}

/// Remove the DOG identified by `id`, provided it is owned by `owner_id`.
///
/// Resolves to `true` when at least one row was deleted and `false` when
/// nothing matched.
pub async fn delete_directive_order_group(
    pool: &PgPool,
    id: Uuid,
    owner_id: Uuid,
) -> Result<bool, sqlx::Error> {
    sqlx::query(r#"DELETE FROM directive_order_groups WHERE id = $1 AND owner_id = $2"#)
        .bind(id)
        .bind(owner_id)
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
}

/// Return the orders assigned to DOG `dog_id` that belong to `owner_id`,
/// newest first (`created_at DESC`).
pub async fn list_orders_by_dog(
    pool: &PgPool,
    dog_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    const BY_DOG_SQL: &str = r#"
        SELECT * FROM orders
        WHERE dog_id = $1 AND owner_id = $2
        ORDER BY created_at DESC
        "#;

    let members = sqlx::query_as::<_, Order>(BY_DOG_SQL)
        .bind(dog_id)
        .bind(owner_id);

    members.fetch_all(pool).await
}

/// Fetch the orders within DOG `dog_id` that a directive may pick up.
///
/// Applies the same rules as `get_available_orders_for_pickup` — owned by
/// `owner_id`, status `open` or `in_progress`, and either unassigned or already
/// attached to `directive_id` — with the extra requirement that the order
/// belongs to `dog_id`. Results are ordered by priority (`critical`, `high`,
/// `medium`, `low`, then anything else) and then by oldest `created_at` first.
pub async fn get_available_orders_for_dog_pickup(
    pool: &PgPool,
    owner_id: Uuid,
    directive_id: Uuid,
    dog_id: Uuid,
) -> Result<Vec<Order>, sqlx::Error> {
    // $1 = owner, $2 = directive, $3 = DOG.
    const DOG_PICKUP_SQL: &str = r#"
        SELECT *
        FROM orders
        WHERE owner_id = $1
          AND dog_id = $3
          AND status IN ('open', 'in_progress')
          AND (directive_id IS NULL OR directive_id = $2)
        ORDER BY CASE priority
            WHEN 'critical' THEN 0
            WHEN 'high' THEN 1
            WHEN 'medium' THEN 2
            WHEN 'low' THEN 3
            ELSE 4
        END ASC, created_at ASC
        "#;

    let candidates = sqlx::query_as::<_, Order>(DOG_PICKUP_SQL)
        .bind(owner_id)
        .bind(directive_id)
        .bind(dog_id);

    candidates.fetch_all(pool).await
}