zoobzio January 6, 2025 Edit this page

Provider Migrations

Strategies for migrating between storage providers.

Migration Patterns

| Pattern | Downtime | Complexity | Use Case |
|---------|----------|------------|----------|
| Big Bang | Yes | Low | Small datasets, dev/staging |
| Dual Write | No | Medium | Large datasets, production |
| Shadow Read | No | High | Validation before cutover |

Big Bang Migration

Export from source, import to destination.

package migration

import (
    "context"
    "fmt"

    "github.com/zoobz-io/grub"
)

// MigrateStore copies entries from source to dest in batches of up to 100
// keys, starting the listing at prefix.
//
// Each round lists keys from source, reads them with GetBatch and writes the
// result to dest with SetBatch using a TTL of 0, so TTLs held by source are
// not carried over. Progress is printed to stdout after every batch. The
// first listing, read or write error aborts the migration and is returned
// wrapped.
//
// NOTE(review): the second argument of List is reused as a continuation
// marker (last key + "\x00"); confirm List treats it as a start position
// rather than a strict prefix filter.
func MigrateStore[T any](
    ctx context.Context,
    source *grub.Store[T],
    dest *grub.Store[T],
    prefix string,
) error {
    const batchSize = 100

    for marker := prefix; ; {
        keys, err := source.List(ctx, marker, batchSize)
        if err != nil {
            return fmt.Errorf("listing keys: %w", err)
        }

        count := len(keys)
        if count == 0 {
            return nil
        }

        // Read the whole batch from the source in one call.
        values, err := source.GetBatch(ctx, keys)
        if err != nil {
            return fmt.Errorf("reading batch: %w", err)
        }

        // Write the batch with TTL=0; preserve existing TTLs separately if needed.
        if err := dest.SetBatch(ctx, values, 0); err != nil {
            return fmt.Errorf("writing batch: %w", err)
        }

        fmt.Printf("Migrated %d keys\n", count)

        // A short batch means the listing is exhausted.
        if count < batchSize {
            return nil
        }

        // Resume just after the last key seen in this batch.
        marker = keys[count-1] + "\x00"
    }
}

Usage

// Migrate from Badger to Redis
// NOTE(review): the error returned by badgerdb.Open is discarded with `_`
// here; handle it in real code before using sourceDB.
sourceDB, _ := badgerdb.Open(opts)
sourceStore := grub.NewStore[Session](badger.New(sourceDB))

destClient := goredis.NewClient(&goredis.Options{Addr: "redis:6379"})
destStore := grub.NewStore[Session](redis.New(destClient))

// Run the migration for the "session:" prefix; err still has to be checked
// by the surrounding code.
err := MigrateStore(ctx, sourceStore, destStore, "session:")

Dual Write Pattern

Write to both stores during migration period.

package migration

import (
    "context"
    "errors"
    "log"
    "time"

    "github.com/zoobz-io/grub"
)

// DualWriteStore wraps two stores for a migration period: Set and Delete
// touch both stores, while Get, Exists and List are served from whichever
// store readFrom selects.
type DualWriteStore[T any] struct {
    // primary is written first by Set; its errors are returned to callers.
    primary   *grub.Store[T]
    // secondary is written in the background by Set; its failures are only logged.
    secondary *grub.Store[T]
    // readFrom selects the store used for reads.
    // NOTE(review): this field is read and written without synchronization;
    // confirm SwitchToSecondary is never called concurrently with reads.
    readFrom  string // "primary" or "secondary"
}

// NewDualWriteStore builds a DualWriteStore over the two given stores.
// Reads are initially served from primary.
func NewDualWriteStore[T any](primary, secondary *grub.Store[T]) *DualWriteStore[T] {
    store := new(DualWriteStore[T])
    store.primary = primary
    store.secondary = secondary
    store.readFrom = "primary"
    return store
}

// Get reads key from the store currently selected for reads: secondary when
// readFrom is "secondary", primary for any other value.
func (d *DualWriteStore[T]) Get(ctx context.Context, key string) (*T, error) {
    target := d.primary
    if d.readFrom == "secondary" {
        target = d.secondary
    }
    return target.Get(ctx, key)
}

// Set writes val under key to the primary store and, if that succeeds,
// mirrors the write to the secondary store in a background goroutine.
//
// Only a primary failure is returned to the caller; a secondary failure is
// logged. The background write uses context.Background() rather than ctx.
//
// NOTE(review): the goroutine shares the val pointer with the caller and is
// never awaited — confirm callers do not mutate *val after Set returns.
func (d *DualWriteStore[T]) Set(ctx context.Context, key string, val *T, ttl time.Duration) error {
    // The primary write is synchronous and decides the result.
    primaryErr := d.primary.Set(ctx, key, val, ttl)
    if primaryErr != nil {
        return primaryErr
    }

    // The secondary write is fire-and-forget; errors are only logged.
    mirror := func() {
        secondaryErr := d.secondary.Set(context.Background(), key, val, ttl)
        if secondaryErr == nil {
            return
        }
        log.Printf("secondary write failed: %v", secondaryErr)
    }
    go mirror()

    return nil
}

// Delete removes key from both stores, primary first and then secondary,
// both synchronously with the caller's ctx.
//
// The primary result is returned as-is. A secondary failure is logged unless
// it matches grub.ErrNotFound, and never affects the return value.
func (d *DualWriteStore[T]) Delete(ctx context.Context, key string) error {
    primaryErr := d.primary.Delete(ctx, key)

    if secondaryErr := d.secondary.Delete(ctx, key); secondaryErr != nil {
        // A key that is already absent from the secondary is not worth logging.
        if !errors.Is(secondaryErr, grub.ErrNotFound) {
            log.Printf("secondary delete failed: %v", secondaryErr)
        }
    }

    return primaryErr
}

// SwitchToSecondary makes Get, Exists and List read from the secondary store
// from now on. Writes are unaffected: Set and Delete keep touching both stores.
//
// NOTE(review): readFrom is assigned without a lock or atomic; confirm this
// is not called while other goroutines are reading through the store.
func (d *DualWriteStore[T]) SwitchToSecondary() {
    d.readFrom = "secondary"
}

// Exists reports whether key is present in the store currently selected for
// reads: secondary when readFrom is "secondary", primary otherwise.
func (d *DualWriteStore[T]) Exists(ctx context.Context, key string) (bool, error) {
    switch d.readFrom {
    case "secondary":
        return d.secondary.Exists(ctx, key)
    default:
        return d.primary.Exists(ctx, key)
    }
}

// List forwards prefix and limit to the List method of the store currently
// selected for reads: secondary when readFrom is "secondary", primary otherwise.
func (d *DualWriteStore[T]) List(ctx context.Context, prefix string, limit int) ([]string, error) {
    readSecondary := d.readFrom == "secondary"
    if !readSecondary {
        return d.primary.List(ctx, prefix, limit)
    }
    return d.secondary.List(ctx, prefix, limit)
}

Migration Steps

// 1. Start dual writes
// The old store is passed as primary and the new store as secondary, so
// reads stay on the old store until step 4.
oldStore := grub.NewStore[Session](badger.New(db))
newStore := grub.NewStore[Session](redis.New(client))
dualStore := NewDualWriteStore(oldStore, newStore)

// 2. Migrate existing data
// NOTE(review): the error returned by MigrateStore is ignored in this
// outline; check it before moving on.
MigrateStore(ctx, oldStore, newStore, "session:")

// 3. Verify data consistency
// NOTE(review): ValidateStores returns (total, mismatches, error), all of
// which are dropped here; inspect them before switching reads.
ValidateStores(ctx, oldStore, newStore, "session:")

// 4. Switch reads to new store
dualStore.SwitchToSecondary()

// 5. After confidence period, switch to single store
sessionStore := newStore

Shadow Read Pattern

Read from both and compare to validate before cutover.

package migration

import (
    "context"
    "errors"
    "log"
    "reflect"
    "time"

    "github.com/zoobz-io/grub"
)

// ShadowReadStore serves reads from primary while also reading the same key
// from shadow in the background and comparing the two results.
type ShadowReadStore[T any] struct {
    // primary is the authoritative store; its result is what Get returns.
    primary   *grub.Store[T]
    // shadow is read only for comparison; its result is never returned.
    shadow    *grub.Store[T]
    // compare reports whether a primary value and a shadow value are equal.
    compare   func(*T, *T) bool
    // onMismatch is called with the key and both values when compare returns false.
    onMismatch func(key string, primary, shadow *T)
}

// Get returns the primary store's value and error for key and, in a
// background goroutine, reads the same key from the shadow store to compare
// the two outcomes.
//
// The caller always receives the primary result unchanged; the shadow read
// never influences it. The shadow read uses context.Background() rather than
// ctx and is not awaited.
//
// Comparison rules:
//   - The two errors are considered equivalent when either one matches the
//     other under errors.Is (this includes both being nil). Otherwise the
//     mismatch is logged and no value comparison takes place.
//   - When both reads succeeded, compare decides equality and onMismatch is
//     invoked on a difference. A nil compare or onMismatch callback is
//     skipped instead of panicking inside the goroutine.
func (s *ShadowReadStore[T]) Get(ctx context.Context, key string) (*T, error) {
    // Read from primary (authoritative)
    primaryVal, err := s.primary.Get(ctx, key)

    // Shadow read (async, for comparison)
    go func() {
        shadowVal, shadowErr := s.shadow.Get(context.Background(), key)

        // errors.Is tolerates wrapped errors and never panics on
        // non-comparable error types, unlike a plain != on interfaces.
        if !errors.Is(err, shadowErr) && !errors.Is(shadowErr, err) {
            log.Printf("shadow: error mismatch for %s: primary=%v shadow=%v", key, err, shadowErr)
            return
        }

        // Values are only meaningful when the reads succeeded.
        if err != nil || s.compare == nil {
            return
        }

        if !s.compare(primaryVal, shadowVal) && s.onMismatch != nil {
            s.onMismatch(key, primaryVal, shadowVal)
        }
    }()

    return primaryVal, err
}

Usage

// The old store stays authoritative while the new store is read in the
// background for comparison.
shadowStore := &ShadowReadStore[User]{
    primary: oldStore,
    shadow:  newStore,
    // Values are compared structurally through their pointers.
    compare: func(a, b *User) bool {
        return reflect.DeepEqual(a, b)
    },
    // Each mismatch is logged and counted.
    // NOTE(review): metrics is not defined in this snippet; presumably an
    // application-level metrics client.
    onMismatch: func(key string, primary, shadow *User) {
        log.Printf("MISMATCH %s: primary=%+v shadow=%+v", key, primary, shadow)
        metrics.Increment("migration.mismatch")
    },
}

Validation Utilities

Compare Stores

// ValidateStores compares the entries of source against dest in batches of
// up to 100 keys, starting the listing at prefix.
//
// It returns the number of source entries that were compared and a list of
// mismatch descriptions of the form "<key> (missing)" when dest has no entry
// for the key, or "<key> (different)" when the two values are not deeply
// equal. Mismatches are reported in listing order.
//
// Any error from listing keys or from reading a batch from either store
// aborts the validation and is returned with a zero count and a nil mismatch
// list, so a failed read is never mistaken for missing or matching data.
//
// NOTE(review): the second argument of List is reused as a continuation
// marker (last key + "\x00"); confirm List treats it as a start position
// rather than a strict prefix filter.
func ValidateStores[T any](
    ctx context.Context,
    source *grub.Store[T],
    dest *grub.Store[T],
    prefix string,
) (int, []string, error) {
    const batchSize = 100
    var mismatches []string
    total := 0
    marker := prefix

    for {
        keys, err := source.List(ctx, marker, batchSize)
        if err != nil {
            return 0, nil, err
        }

        if len(keys) == 0 {
            break
        }

        sourceVals, err := source.GetBatch(ctx, keys)
        if err != nil {
            return 0, nil, err
        }
        destVals, err := dest.GetBatch(ctx, keys)
        if err != nil {
            return 0, nil, err
        }

        // Walk the listed keys rather than the map so the report order is
        // deterministic; keys absent from the source batch are skipped.
        for _, key := range keys {
            sourceVal, ok := sourceVals[key]
            if !ok {
                continue
            }
            total++
            destVal, exists := destVals[key]
            if !exists {
                mismatches = append(mismatches, key+" (missing)")
                continue
            }
            if !reflect.DeepEqual(sourceVal, destVal) {
                mismatches = append(mismatches, key+" (different)")
            }
        }

        // A short batch means the listing is exhausted.
        if len(keys) < batchSize {
            break
        }
        marker = keys[len(keys)-1] + "\x00"
    }

    return total, mismatches, nil
}

Sync Missing Keys

// SyncMissing copies the given keys from source to dest one at a time,
// writing each value with a TTL of 0.
//
// A key whose read from source fails for any reason is skipped silently.
// The first write failure on dest stops the sync and is returned.
func SyncMissing[T any](
    ctx context.Context,
    source *grub.Store[T],
    dest *grub.Store[T],
    keys []string,
) error {
    for i := range keys {
        k := keys[i]

        val, readErr := source.Get(ctx, k)
        if readErr == nil {
            if writeErr := dest.Set(ctx, k, val, 0); writeErr != nil {
                return writeErr
            }
        }
    }
    return nil
}

Bucket Migration

Migrate blob storage between providers.

// MigrateBucket copies objects from source to dest, listing them in batches
// of up to 50 starting at prefix and transferring each object individually
// with Get followed by Put.
//
// Progress is printed to stdout per object. The first listing, read or write
// error aborts the migration and is returned wrapped with context.
//
// NOTE(review): the second argument of List is reused as a continuation
// marker; confirm List treats it as a start position rather than a strict
// prefix filter.
func MigrateBucket[T any](
    ctx context.Context,
    source *grub.Bucket[T],
    dest *grub.Bucket[T],
    prefix string,
) error {
    const batchSize = 50
    marker := prefix

    for {
        infos, err := source.List(ctx, marker, batchSize)
        if err != nil {
            return fmt.Errorf("listing objects: %w", err)
        }

        if len(infos) == 0 {
            break
        }

        for _, info := range infos {
            obj, err := source.Get(ctx, info.Key)
            if err != nil {
                return fmt.Errorf("reading %s: %w", info.Key, err)
            }

            if err := dest.Put(ctx, obj); err != nil {
                return fmt.Errorf("writing %s: %w", info.Key, err)
            }

            fmt.Printf("Migrated %s (%d bytes)\n", info.Key, info.Size)
        }

        // A short batch means the listing is exhausted.
        if len(infos) < batchSize {
            break
        }

        // Advance strictly past the last key, the same way MigrateStore and
        // ValidateStores do, so an inclusive start position does not list
        // the last object again.
        marker = infos[len(infos)-1].Key + "\x00"
    }

    return nil
}

Database Migration

For SQL databases, migrations typically involve schema changes rather than provider changes.

Provider-Agnostic Schema

Use grub's Database wrapper with different renderers:

// Both wrappers use the same record type and the "users" table name; only
// the connection and the renderer differ.

// Development: SQLite
devDB := grub.NewDatabase[User](sqliteConn, "users", sqlite.New())

// Production: PostgreSQL
prodDB := grub.NewDatabase[User](pgConn, "users", postgres.New())

Data Export/Import

// ExportTable runs grub.QueryAll against db with no parameters and returns
// the resulting records together with any error from the query.
func ExportTable[T any](ctx context.Context, db *grub.Database[T]) ([]*T, error) {
    records, err := db.ExecQuery(ctx, grub.QueryAll, nil)
    return records, err
}

// ImportTable writes each record to db in order, using keyFn to derive the
// key for a record. It stops at the first failed write and returns that
// error; records written before the failure stay in place.
func ImportTable[T any](ctx context.Context, db *grub.Database[T], records []*T, keyFn func(*T) string) error {
    for i := range records {
        rec := records[i]
        key := keyFn(rec)
        if err := db.Set(ctx, key, rec); err != nil {
            return err
        }
    }
    return nil
}

Rollback Strategy

Always plan for rollback.

// MigrationState records the progress of a migration so that rollback
// decisions can be made from it.
type MigrationState struct {
    // Phase is one of "dual_write", "shadow_read", "cutover", "complete";
    // Rollback additionally sets it to "rolled_back".
    Phase         string
    StartedAt     time.Time
    CutoverAt     *time.Time
    RollbackAt    *time.Time // set by Rollback
    KeysMigrated  int
    KeysValidated int
    MismatchCount int
}

// CanRollback reports whether the migration is still in a phase where dual
// writes are active, i.e. "dual_write" or "shadow_read".
func (m *MigrationState) CanRollback() bool {
    switch m.Phase {
    case "dual_write", "shadow_read":
        return true
    default:
        return false
    }
}

// Rollback marks the migration as rolled back and stamps RollbackAt with the
// current time. It does not consult CanRollback.
func (m *MigrationState) Rollback() {
    rolledBackAt := time.Now()
    m.RollbackAt = &rolledBackAt
    m.Phase = "rolled_back"
}

Checklist

Before Migration

  • Estimate data volume and migration time
  • Plan maintenance window (if big bang)
  • Set up monitoring for dual write errors
  • Test migration on staging
  • Document rollback procedure

During Migration

  • Monitor error rates
  • Check replication lag (if applicable)
  • Validate sample records
  • Watch resource utilization

After Migration

  • Run full validation
  • Monitor for mismatches
  • Keep old store available for rollback period
  • Clean up old infrastructure after confidence period