From 516c1253bbf4f81fb389f6791f04988871ad9976 Mon Sep 17 00:00:00 2001 From: F04C Date: Tue, 8 Jul 2025 13:25:57 +0800 Subject: [PATCH] init commit for consumer --- .gitignore | 1 + go.mod | 8 ++ go.sum | 4 + main.go | 246 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 259 insertions(+) create mode 100644 .gitignore create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2eea525 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3310282 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module consumer + +go 1.23.2 + +require ( + github.com/joho/godotenv v1.5.1 + github.com/streadway/amqp v1.1.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..26256dc --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= diff --git a/main.go b/main.go new file mode 100644 index 0000000..42c6cd2 --- /dev/null +++ b/main.go @@ -0,0 +1,246 @@ +package main + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/joho/godotenv" + "github.com/streadway/amqp" +) + +type JobCompletionMessage struct { + JobID string `json:"job_id"` + InputPath string `json:"input_path"` + OutputPath string `json:"output_path"` + ProcessingTime float64 `json:"processing_time_seconds"` + Status string `json:"status"` + Timestamp string `json:"timestamp"` + FileSize int64 `json:"file_size,omitempty"` + ImageDimensions string `json:"image_dimensions,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` +} + +type ProcessedImageRecord struct { + JobID string `json:"job_id"` + InputPath string `json:"input_path"` + OutputPath string `json:"output_path"` + SHA256Hash string `json:"sha256_hash"` + FileSize int64 `json:"file_size"` + ImageDimensions string `json:"image_dimensions"` + ProcessingTime float64 `json:"processing_time_seconds"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + ErrorMessage string `json:"error_message,omitempty"` +} + +func calculateSHA256(filePath string) (string, error) { + file, err := os.Open(filePath) + if err != nil { + return "", fmt.Errorf("failed to open file: %v", err) + } + defer file.Close() + + hasher := sha256.New() + if _, err := io.Copy(hasher, file); err != nil { + return "", fmt.Errorf("failed to calculate hash: %v", err) + } + + return hex.EncodeToString(hasher.Sum(nil)), nil +} + +func processCompletionMessage(msg JobCompletionMessage) error { + log.Printf("Processing completion message for job: %s", msg.JobID) + + // Create the processed image record + record := ProcessedImageRecord{ + JobID: msg.JobID, + InputPath: msg.InputPath, + OutputPath: msg.OutputPath, + FileSize: msg.FileSize, + ImageDimensions: msg.ImageDimensions, + ProcessingTime: msg.ProcessingTime, + Status: msg.Status, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + ErrorMessage: msg.ErrorMessage, + } + + // If the job completed successfully, calculate SHA256 + if msg.Status == "completed" && msg.OutputPath != "" { + hash, err := calculateSHA256(msg.OutputPath) + if err != nil { + log.Printf("Failed to calculate SHA256 for %s: %v", msg.OutputPath, err) + record.Status = "hash_calculation_failed" + record.ErrorMessage = fmt.Sprintf("SHA256 calculation failed: %v", err) + } else { + record.SHA256Hash = hash + log.Printf("Calculated SHA256 for %s: %s", msg.OutputPath, hash) + } + } + + // Here you would typically save to a database + // For now, we'll just log and save to a JSON file as an example + return saveToDatabase(record) +} + +func saveToDatabase(record ProcessedImageRecord) error { + // TODO: Replace this with actual database integration + // This is just a placeholder that saves to a JSON file + + // Create logs directory if it doesn't exist + if err := os.MkdirAll("./logs", 0755); err != nil { + return fmt.Errorf("failed to create logs directory: %v", err) + } + + // Append to a JSON lines file + file, err := os.OpenFile("./logs/processed_images.jsonl", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return fmt.Errorf("failed to open log file: %v", err) + } + defer file.Close() + + // Marshal record to JSON + data, err := json.Marshal(record) + if err != nil { + return fmt.Errorf("failed to marshal record: %v", err) + } + + // Write to file + if _, err := file.Write(append(data, '\n')); err != nil { + return fmt.Errorf("failed to write record: %v", err) + } + + log.Printf("Saved record for job %s to database", record.JobID) + return nil +} + +func consumeJobCompletions() { + rabbitURL := os.Getenv("RABBITMQ_URL") + if rabbitURL == "" { + log.Println("RABBITMQ_URL not set, using default") + rabbitURL = "amqp://guest:guest@rabbitmq:5672/" + } else { + log.Println("Using RabbitMQ URL:", rabbitURL) + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + for { + select { + case <-ctx.Done(): + fmt.Println("Shutting down job completion consumer...") + return + default: + // Try to connect to RabbitMQ + conn, err := amqp.Dial(rabbitURL) + if err != nil { + fmt.Println("Failed to connect to RabbitMQ:", err) + time.Sleep(5 * time.Second) + continue + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + fmt.Println("Failed to open RabbitMQ channel:", err) + time.Sleep(5 * time.Second) + continue + } + defer ch.Close() + + // Declare the job_completions queue + q, err := ch.QueueDeclare( + "job_completions", + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + nil, + ) + if err != nil { + fmt.Println("Queue declare failed:", err) + time.Sleep(5 * time.Second) + continue + } + + // Set QoS to process one message at a time + err = ch.Qos( + 1, // prefetch count + 0, // prefetch size + false, // global + ) + if err != nil { + fmt.Println("Failed to set QoS:", err) + time.Sleep(5 * time.Second) + continue + } + + msgs, err := ch.Consume( + q.Name, + "completion-consumer", // consumer tag + false, // manual ack + false, // not exclusive + false, // no-local + false, // no-wait + nil, + ) + if err != nil { + fmt.Println("Queue consume failed:", err) + time.Sleep(5 * time.Second) + continue + } + + // Start processing + fmt.Println("Job completion consumer started...") + for d := range msgs { + var completionMsg JobCompletionMessage + if err := json.Unmarshal(d.Body, &completionMsg); err != nil { + log.Printf("Invalid completion message payload: %v", err) + d.Nack(false, false) // discard the message + continue + } + + // Process the completion message + if err := processCompletionMessage(completionMsg); err != nil { + log.Printf("Failed to process completion message: %v", err) + d.Nack(false, true) // requeue the message + continue + } + + // Acknowledge successful processing + d.Ack(false) + } + + // In case the channel is closed for any reason, loop will reconnect + fmt.Println("Job completion consumer disconnected. Reconnecting...") + time.Sleep(5 * time.Second) + } + } +} + +func main() { + // Load environment variables + err := godotenv.Load() + if err != nil { + log.Println(".env file not found or could not be loaded") + } + + fmt.Println("Starting Image Processing Completion Consumer") + fmt.Println("===========================================") + fmt.Printf("Consumer will process job completion messages from 'job_completions' queue\n") + fmt.Printf("Processed records will be saved to: ./logs/processed_images.jsonl\n\n") + + // Start consuming job completion messages + consumeJobCompletions() +}