modified redis for horizontal scaling

This commit is contained in:
2025-12-16 14:41:32 +08:00
parent 5966901eb5
commit d385044237
2 changed files with 671 additions and 21 deletions
+165 -21
View File
@@ -2,15 +2,41 @@ package services
import (
"authorization/models"
"authorization/redisclient"
"authorization/repository"
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
)
// getCachedUserAttributes retrieves user attributes with caching
const (
permissionCachePrefix = "authz:perm:"
policyCachePrefix = "authz:policy:"
userAttrCachePrefix = "authz:userattr:"
cacheTTL = 30 * time.Second
)
// getCachedUserAttributes retrieves user attributes from Redis or DB
func getCachedUserAttributes(s *models.CachedAuthorizationService, userID string) (map[string]string, error) {
// Check cache first
// Try Redis first if available
if redisclient.RDB != nil {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
key := userAttrCachePrefix + userID
val, err := redisclient.RDB.Get(ctx, key).Result()
if err == nil {
var attrs map[string]string
if json.Unmarshal([]byte(val), &attrs) == nil {
return attrs, nil
}
}
}
// Fallback to local cache for backward compatibility
userAttrMutex := s.UserAttrMutex.(*sync.RWMutex)
userAttrMutex.RLock()
attrs, exists := s.UserAttrCache[userID]
@@ -26,7 +52,18 @@ func getCachedUserAttributes(s *models.CachedAuthorizationService, userID string
return nil, err
}
// Store in cache
// Store in both Redis and local cache
if redisclient.RDB != nil {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
attrsJSON, _ := json.Marshal(attrs)
key := userAttrCachePrefix + userID
redisclient.RDB.Set(ctx, key, attrsJSON, cacheTTL)
}()
}
userAttrMutex.Lock()
s.UserAttrCache[userID] = attrs
userAttrMutex.Unlock()
@@ -34,21 +71,75 @@ func getCachedUserAttributes(s *models.CachedAuthorizationService, userID string
return attrs, nil
}
// refreshCache reloads permissions and policies from database
// getPermissionFromCache retrieves permission from Redis or local cache
func getPermissionFromCache(s *models.CachedAuthorizationService, cacheKey string) (*models.Permission, bool) {
// Try Redis first if available
if redisclient.RDB != nil {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
key := permissionCachePrefix + cacheKey
val, err := redisclient.RDB.Get(ctx, key).Result()
if err == nil {
var perm models.Permission
if json.Unmarshal([]byte(val), &perm) == nil {
return &perm, true
}
}
}
// Fallback to local cache
cacheMutex := s.CacheMutex.(*sync.RWMutex)
cacheMutex.RLock()
permission, exists := s.PermissionCache[cacheKey]
cacheMutex.RUnlock()
return permission, exists
}
// getPoliciesFromCache retrieves policies from Redis or local cache
func getPoliciesFromCache(s *models.CachedAuthorizationService, permissionID int) []models.PolicyAttribute {
// Try Redis first if available
if redisclient.RDB != nil {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
key := fmt.Sprintf("%s%d", policyCachePrefix, permissionID)
val, err := redisclient.RDB.Get(ctx, key).Result()
if err == nil {
var policies []models.PolicyAttribute
if json.Unmarshal([]byte(val), &policies) == nil {
return policies
}
}
}
// Fallback to local cache
cacheMutex := s.CacheMutex.(*sync.RWMutex)
cacheMutex.RLock()
policies := s.PolicyCache[permissionID]
cacheMutex.RUnlock()
return policies
}
// refreshCache reloads permissions and policies from database and stores in Redis
func refreshCache(s *models.CachedAuthorizationService) {
// Load all permissions
permissions, err := repository.GetAllPermissions()
if err != nil {
log.Printf("ERROR: Failed to refresh permissions cache: %v", err)
return
}
// Load all policies
policies, err := repository.GetAllPolicyAttributes()
if err != nil {
log.Printf("ERROR: Failed to refresh policies cache: %v", err)
return
}
// Update cache atomically
// Update local cache atomically
newPermCache := make(map[string]*models.Permission)
for i := range permissions {
perm := &permissions[i]
@@ -62,6 +153,31 @@ func refreshCache(s *models.CachedAuthorizationService) {
s.PolicyCache = policies
s.LastCacheRefresh = time.Now()
cacheMutex.Unlock()
// Store in Redis for distributed access (non-blocking)
if redisclient.RDB != nil {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// Store permissions in Redis
for key, perm := range newPermCache {
permJSON, _ := json.Marshal(perm)
redisKey := permissionCachePrefix + key
redisclient.RDB.Set(ctx, redisKey, permJSON, cacheTTL)
}
// Store policies in Redis
for permID, policyList := range policies {
policiesJSON, _ := json.Marshal(policyList)
redisKey := fmt.Sprintf("%s%d", policyCachePrefix, permID)
redisclient.RDB.Set(ctx, redisKey, policiesJSON, cacheTTL)
}
log.Printf("INFO: Cache refreshed and synced to Redis - %d permissions, %d policy groups",
len(newPermCache), len(policies))
}()
}
}
// cleanUserAttributeCache removes old user attribute cache entries
@@ -70,8 +186,7 @@ func cleanUserAttributeCache(s *models.CachedAuthorizationService) {
userAttrMutex.Lock()
defer userAttrMutex.Unlock()
// Clear all user attributes to prevent stale data
// In production, you might want a more sophisticated TTL approach
// Clear local cache if too large (Redis handles its own TTL)
if len(s.UserAttrCache) > 10000 {
s.UserAttrCache = make(map[string]map[string]string)
}
@@ -108,16 +223,13 @@ func NewCachedAuthorizationService() *models.CachedAuthorizationService {
return service
}
// AuthorizeWithCache performs cached RBAC + ABAC authorization
// AuthorizeWithCache performs cached RBAC + ABAC authorization with distributed caching
func AuthorizeWithCache(s *models.CachedAuthorizationService, ctx *models.AuthorizationContext) (*models.AuthorizationResult, error) {
startTime := time.Now()
// Step 1: Get permission from cache
// Step 1: Get permission from distributed cache
cacheKey := ctx.Resource + ":" + ctx.Action
cacheMutex := s.CacheMutex.(*sync.RWMutex)
cacheMutex.RLock()
permission, exists := s.PermissionCache[cacheKey]
cacheMutex.RUnlock()
permission, exists := getPermissionFromCache(s, cacheKey)
log.Print("Cached authorization lookup for user=", ctx.UserID, ", resource=", ctx.Resource, ", action=", ctx.Action)
if !exists {
@@ -127,7 +239,7 @@ func AuthorizeWithCache(s *models.CachedAuthorizationService, ctx *models.Author
}, nil
}
// Step 2: Get user attributes (with cache)
// Step 2: Get user attributes (with distributed cache)
userAttrs, err := getCachedUserAttributes(s, ctx.UserID)
if err != nil {
return &models.AuthorizationResult{
@@ -137,10 +249,8 @@ func AuthorizeWithCache(s *models.CachedAuthorizationService, ctx *models.Author
}
ctx.UserAttributes = userAttrs
// Step 3: Get policies from cache
cacheMutex.RLock()
policies := s.PolicyCache[permission.ID]
cacheMutex.RUnlock()
// Step 3: Get policies from distributed cache
policies := getPoliciesFromCache(s, permission.ID)
// Step 4: Evaluate policies
allowed, reason := EvaluatePolicies(policies, ctx)
@@ -171,15 +281,32 @@ func AuthorizeWithCache(s *models.CachedAuthorizationService, ctx *models.Author
return result, nil
}
// InvalidateUserCache clears cache for a specific user
// InvalidateUserCache clears cache for a specific user from both Redis and local cache
func InvalidateUserCache(s *models.CachedAuthorizationService, userID string) {
// Clear from Redis
if redisclient.RDB != nil {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
key := userAttrCachePrefix + userID
redisclient.RDB.Del(ctx, key)
}()
}
// Clear from local cache
userAttrMutex := s.UserAttrMutex.(*sync.RWMutex)
userAttrMutex.Lock()
delete(s.UserAttrCache, userID)
userAttrMutex.Unlock()
}
// GetCacheStats returns cache statistics
// RefreshCacheNow forces an immediate cache refresh (useful for admin endpoints)
func RefreshCacheNow(s *models.CachedAuthorizationService) {
refreshCache(s)
}
// GetCacheStats returns cache statistics including Redis availability
func GetCacheStats(s *models.CachedAuthorizationService) map[string]interface{} {
cacheMutex := s.CacheMutex.(*sync.RWMutex)
userAttrMutex := s.UserAttrMutex.(*sync.RWMutex)
@@ -188,11 +315,28 @@ func GetCacheStats(s *models.CachedAuthorizationService) map[string]interface{}
defer cacheMutex.RUnlock()
defer userAttrMutex.RUnlock()
return map[string]interface{}{
stats := map[string]interface{}{
"permissions_cached": len(s.PermissionCache),
"policies_cached": len(s.PolicyCache),
"user_attributes_cached": len(s.UserAttrCache),
"last_refresh": s.LastCacheRefresh,
"cache_age_seconds": time.Since(s.LastCacheRefresh).Seconds(),
"distributed_cache": redisclient.RDB != nil,
}
// Add Redis cache stats if available
if redisclient.RDB != nil {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
// Try to get Redis info
if info, err := redisclient.RDB.Info(ctx, "stats").Result(); err == nil {
stats["redis_available"] = true
stats["redis_info"] = info
} else {
stats["redis_available"] = false
}
}
return stats
}