init commit for consumer
This commit is contained in:
@@ -0,0 +1,246 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type JobCompletionMessage struct {
|
||||
JobID string `json:"job_id"`
|
||||
InputPath string `json:"input_path"`
|
||||
OutputPath string `json:"output_path"`
|
||||
ProcessingTime float64 `json:"processing_time_seconds"`
|
||||
Status string `json:"status"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
FileSize int64 `json:"file_size,omitempty"`
|
||||
ImageDimensions string `json:"image_dimensions,omitempty"`
|
||||
ErrorMessage string `json:"error_message,omitempty"`
|
||||
}
|
||||
|
||||
type ProcessedImageRecord struct {
|
||||
JobID string `json:"job_id"`
|
||||
InputPath string `json:"input_path"`
|
||||
OutputPath string `json:"output_path"`
|
||||
SHA256Hash string `json:"sha256_hash"`
|
||||
FileSize int64 `json:"file_size"`
|
||||
ImageDimensions string `json:"image_dimensions"`
|
||||
ProcessingTime float64 `json:"processing_time_seconds"`
|
||||
Status string `json:"status"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
ErrorMessage string `json:"error_message,omitempty"`
|
||||
}
|
||||
|
||||
func calculateSHA256(filePath string) (string, error) {
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to open file: %v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
hasher := sha256.New()
|
||||
if _, err := io.Copy(hasher, file); err != nil {
|
||||
return "", fmt.Errorf("failed to calculate hash: %v", err)
|
||||
}
|
||||
|
||||
return hex.EncodeToString(hasher.Sum(nil)), nil
|
||||
}
|
||||
|
||||
func processCompletionMessage(msg JobCompletionMessage) error {
|
||||
log.Printf("Processing completion message for job: %s", msg.JobID)
|
||||
|
||||
// Create the processed image record
|
||||
record := ProcessedImageRecord{
|
||||
JobID: msg.JobID,
|
||||
InputPath: msg.InputPath,
|
||||
OutputPath: msg.OutputPath,
|
||||
FileSize: msg.FileSize,
|
||||
ImageDimensions: msg.ImageDimensions,
|
||||
ProcessingTime: msg.ProcessingTime,
|
||||
Status: msg.Status,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
ErrorMessage: msg.ErrorMessage,
|
||||
}
|
||||
|
||||
// If the job completed successfully, calculate SHA256
|
||||
if msg.Status == "completed" && msg.OutputPath != "" {
|
||||
hash, err := calculateSHA256(msg.OutputPath)
|
||||
if err != nil {
|
||||
log.Printf("Failed to calculate SHA256 for %s: %v", msg.OutputPath, err)
|
||||
record.Status = "hash_calculation_failed"
|
||||
record.ErrorMessage = fmt.Sprintf("SHA256 calculation failed: %v", err)
|
||||
} else {
|
||||
record.SHA256Hash = hash
|
||||
log.Printf("Calculated SHA256 for %s: %s", msg.OutputPath, hash)
|
||||
}
|
||||
}
|
||||
|
||||
// Here you would typically save to a database
|
||||
// For now, we'll just log and save to a JSON file as an example
|
||||
return saveToDatabase(record)
|
||||
}
|
||||
|
||||
func saveToDatabase(record ProcessedImageRecord) error {
|
||||
// TODO: Replace this with actual database integration
|
||||
// This is just a placeholder that saves to a JSON file
|
||||
|
||||
// Create logs directory if it doesn't exist
|
||||
if err := os.MkdirAll("./logs", 0755); err != nil {
|
||||
return fmt.Errorf("failed to create logs directory: %v", err)
|
||||
}
|
||||
|
||||
// Append to a JSON lines file
|
||||
file, err := os.OpenFile("./logs/processed_images.jsonl", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open log file: %v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Marshal record to JSON
|
||||
data, err := json.Marshal(record)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal record: %v", err)
|
||||
}
|
||||
|
||||
// Write to file
|
||||
if _, err := file.Write(append(data, '\n')); err != nil {
|
||||
return fmt.Errorf("failed to write record: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("Saved record for job %s to database", record.JobID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func consumeJobCompletions() {
|
||||
rabbitURL := os.Getenv("RABBITMQ_URL")
|
||||
if rabbitURL == "" {
|
||||
log.Println("RABBITMQ_URL not set, using default")
|
||||
rabbitURL = "amqp://guest:guest@rabbitmq:5672/"
|
||||
} else {
|
||||
log.Println("Using RabbitMQ URL:", rabbitURL)
|
||||
}
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("Shutting down job completion consumer...")
|
||||
return
|
||||
default:
|
||||
// Try to connect to RabbitMQ
|
||||
conn, err := amqp.Dial(rabbitURL)
|
||||
if err != nil {
|
||||
fmt.Println("Failed to connect to RabbitMQ:", err)
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
fmt.Println("Failed to open RabbitMQ channel:", err)
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
// Declare the job_completions queue
|
||||
q, err := ch.QueueDeclare(
|
||||
"job_completions",
|
||||
true, // durable
|
||||
false, // auto-delete
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Println("Queue declare failed:", err)
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
// Set QoS to process one message at a time
|
||||
err = ch.Qos(
|
||||
1, // prefetch count
|
||||
0, // prefetch size
|
||||
false, // global
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Println("Failed to set QoS:", err)
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
q.Name,
|
||||
"completion-consumer", // consumer tag
|
||||
false, // manual ack
|
||||
false, // not exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Println("Queue consume failed:", err)
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
// Start processing
|
||||
fmt.Println("Job completion consumer started...")
|
||||
for d := range msgs {
|
||||
var completionMsg JobCompletionMessage
|
||||
if err := json.Unmarshal(d.Body, &completionMsg); err != nil {
|
||||
log.Printf("Invalid completion message payload: %v", err)
|
||||
d.Nack(false, false) // discard the message
|
||||
continue
|
||||
}
|
||||
|
||||
// Process the completion message
|
||||
if err := processCompletionMessage(completionMsg); err != nil {
|
||||
log.Printf("Failed to process completion message: %v", err)
|
||||
d.Nack(false, true) // requeue the message
|
||||
continue
|
||||
}
|
||||
|
||||
// Acknowledge successful processing
|
||||
d.Ack(false)
|
||||
}
|
||||
|
||||
// In case the channel is closed for any reason, loop will reconnect
|
||||
fmt.Println("Job completion consumer disconnected. Reconnecting...")
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Load environment variables
|
||||
err := godotenv.Load()
|
||||
if err != nil {
|
||||
log.Println(".env file not found or could not be loaded")
|
||||
}
|
||||
|
||||
fmt.Println("Starting Image Processing Completion Consumer")
|
||||
fmt.Println("===========================================")
|
||||
fmt.Printf("Consumer will process job completion messages from 'job_completions' queue\n")
|
||||
fmt.Printf("Processed records will be saved to: ./logs/processed_images.jsonl\n\n")
|
||||
|
||||
// Start consuming job completion messages
|
||||
consumeJobCompletions()
|
||||
}
|
||||
Reference in New Issue
Block a user