diff --git a/.gitignore b/.gitignore index 2eea525..4af8698 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -.env \ No newline at end of file +.env +/logs \ No newline at end of file diff --git a/db/db.go b/db/db.go new file mode 100644 index 0000000..ee72389 --- /dev/null +++ b/db/db.go @@ -0,0 +1,54 @@ +package db + +import ( + "database/sql" + "fmt" + "log" + "os" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/joho/godotenv" +) + +// DB is the global database connection pool +var DB *sql.DB + +func InitDB() (*sql.DB, error) { + // Load environment variables from .env file + err := godotenv.Load() + if err != nil { + return nil, fmt.Errorf("error loading .env file: %v", err) + } + + // Get connection details from environment variables + connStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true", + os.Getenv("DB_USER"), + os.Getenv("DB_PASSWORD"), + os.Getenv("DB_HOST"), + os.Getenv("DB_PORT"), + os.Getenv("DB_NAME"), + ) + + // Open the database connection + DB, err = sql.Open("mysql", connStr) + if err != nil { + return nil, fmt.Errorf("error opening database: %v", err) + } + // Set connection pool parameters + DB.SetMaxOpenConns(100) // Maximum number of open connections to the database + DB.SetMaxIdleConns(100) // Maximum number of connections in the idle connection pool + DB.SetConnMaxLifetime(5 * time.Minute) // Maximum amount of time a connection may be reused + + // Check if the database connection is working + if err := DB.Ping(); err != nil { + log.Printf("Database connection lost: %v. Reconnecting...", err) + DB, err = InitDB() + if err != nil { + log.Fatalf("Failed to reconnect to database: %v", err) + } + } + + log.Print("Database connected successfully!") + return DB, nil +} diff --git a/go.mod b/go.mod index 3310282..96e9f88 100644 --- a/go.mod +++ b/go.mod @@ -6,3 +6,8 @@ require ( github.com/joho/godotenv v1.5.1 github.com/streadway/amqp v1.1.0 ) + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/go-sql-driver/mysql v1.9.3 // indirect +) diff --git a/go.sum b/go.sum index 26256dc..095a5e7 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= +github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= diff --git a/main.go b/main.go index 42c6cd2..f35c4d0 100644 --- a/main.go +++ b/main.go @@ -142,7 +142,6 @@ func consumeJobCompletions() { fmt.Println("Shutting down job completion consumer...") return default: - // Try to connect to RabbitMQ conn, err := amqp.Dial(rabbitURL) if err != nil { fmt.Println("Failed to connect to RabbitMQ:", err) @@ -162,10 +161,10 @@ func consumeJobCompletions() { // Declare the job_completions queue q, err := ch.QueueDeclare( "job_completions", - true, // durable - false, // auto-delete - false, // exclusive - false, // no-wait + true, + false, + false, + false, nil, ) if err != nil { @@ -176,9 +175,9 @@ func consumeJobCompletions() { // Set QoS to process one message at a time err = ch.Qos( - 1, // prefetch count - 0, // prefetch size - false, // global + 1, + 0, + false, ) if err != nil { fmt.Println("Failed to set QoS:", err) @@ -188,11 +187,11 @@ func consumeJobCompletions() { msgs, err := ch.Consume( q.Name, - "completion-consumer", // consumer tag - false, // manual ack - false, // not exclusive - false, // no-local - false, // no-wait + "completion-consumer", + false, + false, + false, + false, nil, ) if err != nil { @@ -201,7 +200,6 @@ func consumeJobCompletions() { continue } - // Start processing fmt.Println("Job completion consumer started...") for d := range msgs { var completionMsg JobCompletionMessage @@ -211,18 +209,15 @@ func consumeJobCompletions() { continue } - // Process the completion message if err := processCompletionMessage(completionMsg); err != nil { log.Printf("Failed to process completion message: %v", err) - d.Nack(false, true) // requeue the message + d.Nack(false, true) continue } - // Acknowledge successful processing d.Ack(false) } - // In case the channel is closed for any reason, loop will reconnect fmt.Println("Job completion consumer disconnected. Reconnecting...") time.Sleep(5 * time.Second) }