This commit is contained in:
F04C
2025-07-09 07:50:14 +08:00
parent 516c1253bb
commit 3624def617
5 changed files with 78 additions and 19 deletions
+13 -18
View File
@@ -142,7 +142,6 @@ func consumeJobCompletions() {
fmt.Println("Shutting down job completion consumer...")
return
default:
// Try to connect to RabbitMQ
conn, err := amqp.Dial(rabbitURL)
if err != nil {
fmt.Println("Failed to connect to RabbitMQ:", err)
@@ -162,10 +161,10 @@ func consumeJobCompletions() {
// Declare the job_completions queue
q, err := ch.QueueDeclare(
"job_completions",
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
true,
false,
false,
false,
nil,
)
if err != nil {
@@ -176,9 +175,9 @@ func consumeJobCompletions() {
// Set QoS to process one message at a time
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
1,
0,
false,
)
if err != nil {
fmt.Println("Failed to set QoS:", err)
@@ -188,11 +187,11 @@ func consumeJobCompletions() {
msgs, err := ch.Consume(
q.Name,
"completion-consumer", // consumer tag
false, // manual ack
false, // not exclusive
false, // no-local
false, // no-wait
"completion-consumer",
false,
false,
false,
false,
nil,
)
if err != nil {
@@ -201,7 +200,6 @@ func consumeJobCompletions() {
continue
}
// Start processing
fmt.Println("Job completion consumer started...")
for d := range msgs {
var completionMsg JobCompletionMessage
@@ -211,18 +209,15 @@ func consumeJobCompletions() {
continue
}
// Process the completion message
if err := processCompletionMessage(completionMsg); err != nil {
log.Printf("Failed to process completion message: %v", err)
d.Nack(false, true) // requeue the message
d.Nack(false, true)
continue
}
// Acknowledge successful processing
d.Ack(false)
}
// In case the channel is closed for any reason, loop will reconnect
fmt.Println("Job completion consumer disconnected. Reconnecting...")
time.Sleep(5 * time.Second)
}