package main import ( "context" "crypto/sha256" "encoding/hex" "encoding/json" "fmt" "io" "log" "os" "os/signal" "syscall" "time" "github.com/joho/godotenv" "github.com/streadway/amqp" ) type JobCompletionMessage struct { JobID string `json:"job_id"` InputPath string `json:"input_path"` OutputPath string `json:"output_path"` ProcessingTime float64 `json:"processing_time_seconds"` Status string `json:"status"` Timestamp string `json:"timestamp"` FileSize int64 `json:"file_size,omitempty"` ImageDimensions string `json:"image_dimensions,omitempty"` ErrorMessage string `json:"error_message,omitempty"` } type ProcessedImageRecord struct { JobID string `json:"job_id"` InputPath string `json:"input_path"` OutputPath string `json:"output_path"` SHA256Hash string `json:"sha256_hash"` FileSize int64 `json:"file_size"` ImageDimensions string `json:"image_dimensions"` ProcessingTime float64 `json:"processing_time_seconds"` Status string `json:"status"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` ErrorMessage string `json:"error_message,omitempty"` } func calculateSHA256(filePath string) (string, error) { file, err := os.Open(filePath) if err != nil { return "", fmt.Errorf("failed to open file: %v", err) } defer file.Close() hasher := sha256.New() if _, err := io.Copy(hasher, file); err != nil { return "", fmt.Errorf("failed to calculate hash: %v", err) } return hex.EncodeToString(hasher.Sum(nil)), nil } func processCompletionMessage(msg JobCompletionMessage) error { log.Printf("Processing completion message for job: %s", msg.JobID) // Create the processed image record record := ProcessedImageRecord{ JobID: msg.JobID, InputPath: msg.InputPath, OutputPath: msg.OutputPath, FileSize: msg.FileSize, ImageDimensions: msg.ImageDimensions, ProcessingTime: msg.ProcessingTime, Status: msg.Status, CreatedAt: time.Now(), UpdatedAt: time.Now(), ErrorMessage: msg.ErrorMessage, } // If the job completed successfully, calculate SHA256 if msg.Status == "completed" && msg.OutputPath != "" { hash, err := calculateSHA256(msg.OutputPath) if err != nil { log.Printf("Failed to calculate SHA256 for %s: %v", msg.OutputPath, err) record.Status = "hash_calculation_failed" record.ErrorMessage = fmt.Sprintf("SHA256 calculation failed: %v", err) } else { record.SHA256Hash = hash log.Printf("Calculated SHA256 for %s: %s", msg.OutputPath, hash) } } // Here you would typically save to a database // For now, we'll just log and save to a JSON file as an example return saveToDatabase(record) } func saveToDatabase(record ProcessedImageRecord) error { // TODO: Replace this with actual database integration // This is just a placeholder that saves to a JSON file // Create logs directory if it doesn't exist if err := os.MkdirAll("./logs", 0755); err != nil { return fmt.Errorf("failed to create logs directory: %v", err) } // Append to a JSON lines file file, err := os.OpenFile("./logs/processed_images.jsonl", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return fmt.Errorf("failed to open log file: %v", err) } defer file.Close() // Marshal record to JSON data, err := json.Marshal(record) if err != nil { return fmt.Errorf("failed to marshal record: %v", err) } // Write to file if _, err := file.Write(append(data, '\n')); err != nil { return fmt.Errorf("failed to write record: %v", err) } log.Printf("Saved record for job %s to database", record.JobID) return nil } func consumeJobCompletions() { rabbitURL := os.Getenv("RABBITMQ_URL") if rabbitURL == "" { log.Println("RABBITMQ_URL not set, using default") rabbitURL = "amqp://guest:guest@rabbitmq:5672/" } else { log.Println("Using RabbitMQ URL:", rabbitURL) } ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() for { select { case <-ctx.Done(): fmt.Println("Shutting down job completion consumer...") return default: conn, err := amqp.Dial(rabbitURL) if err != nil { fmt.Println("Failed to connect to RabbitMQ:", err) time.Sleep(5 * time.Second) continue } defer conn.Close() ch, err := conn.Channel() if err != nil { fmt.Println("Failed to open RabbitMQ channel:", err) time.Sleep(5 * time.Second) continue } defer ch.Close() // Declare the job_completions queue q, err := ch.QueueDeclare( "job_completions", true, false, false, false, nil, ) if err != nil { fmt.Println("Queue declare failed:", err) time.Sleep(5 * time.Second) continue } // Set QoS to process one message at a time err = ch.Qos( 1, 0, false, ) if err != nil { fmt.Println("Failed to set QoS:", err) time.Sleep(5 * time.Second) continue } msgs, err := ch.Consume( q.Name, "completion-consumer", false, false, false, false, nil, ) if err != nil { fmt.Println("Queue consume failed:", err) time.Sleep(5 * time.Second) continue } fmt.Println("Job completion consumer started...") for d := range msgs { var completionMsg JobCompletionMessage if err := json.Unmarshal(d.Body, &completionMsg); err != nil { log.Printf("Invalid completion message payload: %v", err) d.Nack(false, false) // discard the message continue } if err := processCompletionMessage(completionMsg); err != nil { log.Printf("Failed to process completion message: %v", err) d.Nack(false, true) continue } d.Ack(false) } fmt.Println("Job completion consumer disconnected. Reconnecting...") time.Sleep(5 * time.Second) } } } func main() { // Load environment variables err := godotenv.Load() if err != nil { log.Println(".env file not found or could not be loaded") } fmt.Println("Starting Image Processing Completion Consumer") fmt.Println("===========================================") fmt.Printf("Consumer will process job completion messages from 'job_completions' queue\n") fmt.Printf("Processed records will be saved to: ./logs/processed_images.jsonl\n\n") // Start consuming job completion messages consumeJobCompletions() }