Provider Migrations
Strategies for migrating between storage providers.
Migration Patterns
| Pattern | Downtime | Complexity | Use Case |
|---|---|---|---|
| Big Bang | Yes | Low | Small datasets, dev/staging |
| Dual Write | No | Medium | Large datasets, production |
| Shadow Read | No | High | Validation before cutover |
Big Bang Migration
Export from source, import to destination.
package migration
import (
"context"
"fmt"
"github.com/zoobz-io/grub"
)
// MigrateStore copies every key under prefix from source to dest in
// batches of batchSize. Pagination advances a marker strictly past the
// last key returned ("\x00" is the smallest possible suffix); this
// assumes List returns keys in ascending order starting at the marker —
// confirm against the grub List contract. Values are written with
// TTL=0 (no expiry); preserve per-key TTLs separately if the source
// provider uses them.
func MigrateStore[T any](
	ctx context.Context,
	source *grub.Store[T],
	dest *grub.Store[T],
	prefix string,
) error {
	const batchSize = 100
	marker := prefix
	for {
		// Honor cancellation between batches: migrations can run for a
		// long time and the original loop could not be interrupted.
		if err := ctx.Err(); err != nil {
			return err
		}
		keys, err := source.List(ctx, marker, batchSize)
		if err != nil {
			return fmt.Errorf("listing keys: %w", err)
		}
		if len(keys) == 0 {
			break
		}
		// Batch read
		values, err := source.GetBatch(ctx, keys)
		if err != nil {
			return fmt.Errorf("reading batch: %w", err)
		}
		// Batch write (TTL=0, preserve existing TTL separately if needed)
		if err := dest.SetBatch(ctx, values, 0); err != nil {
			return fmt.Errorf("writing batch: %w", err)
		}
		fmt.Printf("Migrated %d keys\n", len(keys))
		// A short page means we have reached the end of the keyspace.
		if len(keys) < batchSize {
			break
		}
		marker = keys[len(keys)-1] + "\x00"
	}
	return nil
}
Usage
// Migrate from Badger to Redis
sourceDB, _ := badgerdb.Open(opts)
sourceStore := grub.NewStore[Session](badger.New(sourceDB))
destClient := goredis.NewClient(&goredis.Options{Addr: "redis:6379"})
destStore := grub.NewStore[Session](redis.New(destClient))
err := MigrateStore(ctx, sourceStore, destStore, "session:")
Dual Write Pattern
Write to both stores during migration period.
package migration
import (
	"context"
	"errors"
	"log"
	"sync/atomic"
	"time"

	"github.com/zoobz-io/grub"
)
// DualWriteStore writes every mutation to both stores while serving
// reads from whichever side is currently active, enabling a
// zero-downtime cutover between providers.
type DualWriteStore[T any] struct {
	primary   *grub.Store[T]
	secondary *grub.Store[T]
	// readSecondary selects the read path. SwitchToSecondary flips it
	// while Get/Exists/List may be running on other goroutines, so it
	// must be accessed atomically (the original plain string field was
	// a data race under -race).
	readSecondary atomic.Bool
}

// NewDualWriteStore returns a store that reads from primary until
// SwitchToSecondary is called. All writes always go to both stores.
func NewDualWriteStore[T any](primary, secondary *grub.Store[T]) *DualWriteStore[T] {
	return &DualWriteStore[T]{
		primary:   primary,
		secondary: secondary,
		// readSecondary zero value (false) means "read from primary".
	}
}

// Get reads from the currently active store.
func (d *DualWriteStore[T]) Get(ctx context.Context, key string) (*T, error) {
	if d.readSecondary.Load() {
		return d.secondary.Get(ctx, key)
	}
	return d.primary.Get(ctx, key)
}

// Set writes to the primary synchronously; only the primary's result is
// reported to the caller. The secondary write runs asynchronously and
// is logged on failure — the stores may briefly diverge, which the
// validation pass is expected to catch.
func (d *DualWriteStore[T]) Set(ctx context.Context, key string, val *T, ttl time.Duration) error {
	// Write to primary first
	if err := d.primary.Set(ctx, key, val, ttl); err != nil {
		return err
	}
	// Background mirror write: context.Background so a cancelled request
	// context does not abandon the write mid-flight.
	go func() {
		if err := d.secondary.Set(context.Background(), key, val, ttl); err != nil {
			log.Printf("secondary write failed: %v", err)
		}
	}()
	return nil
}

// Delete removes the key from both stores. The primary's error is
// authoritative; a secondary failure other than not-found is only logged.
func (d *DualWriteStore[T]) Delete(ctx context.Context, key string) error {
	err1 := d.primary.Delete(ctx, key)
	err2 := d.secondary.Delete(ctx, key)
	// Return primary error, log secondary
	if err2 != nil && !errors.Is(err2, grub.ErrNotFound) {
		log.Printf("secondary delete failed: %v", err2)
	}
	return err1
}

// SwitchToSecondary flips all subsequent reads to the secondary store.
// Safe to call while reads are in flight on other goroutines.
func (d *DualWriteStore[T]) SwitchToSecondary() {
	d.readSecondary.Store(true)
}

// Exists checks the currently active store.
func (d *DualWriteStore[T]) Exists(ctx context.Context, key string) (bool, error) {
	if d.readSecondary.Load() {
		return d.secondary.Exists(ctx, key)
	}
	return d.primary.Exists(ctx, key)
}

// List lists keys with the given prefix from the currently active store.
func (d *DualWriteStore[T]) List(ctx context.Context, prefix string, limit int) ([]string, error) {
	if d.readSecondary.Load() {
		return d.secondary.List(ctx, prefix, limit)
	}
	return d.primary.List(ctx, prefix, limit)
}
Migration Steps
// 1. Start dual writes
oldStore := grub.NewStore[Session](badger.New(db))
newStore := grub.NewStore[Session](redis.New(client))
dualStore := NewDualWriteStore(oldStore, newStore)
// 2. Migrate existing data
MigrateStore(ctx, oldStore, newStore, "session:")
// 3. Verify data consistency
ValidateStores(ctx, oldStore, newStore, "session:")
// 4. Switch reads to new store
dualStore.SwitchToSecondary()
// 5. After confidence period, switch to single store
sessionStore := newStore
Shadow Read Pattern
Read from both and compare to validate before cutover.
package migration
import (
"context"
"log"
"reflect"
"time"
"github.com/zoobz-io/grub"
)
// ShadowReadStore serves reads from the authoritative primary while
// asynchronously issuing the same read against a shadow store and
// reporting divergence, so a new provider can be validated before cutover.
type ShadowReadStore[T any] struct {
	primary *grub.Store[T]
	shadow  *grub.Store[T]
	// compare reports whether two values are equivalent; nil disables
	// value comparison (only error-outcome mismatches are logged).
	compare func(*T, *T) bool
	// onMismatch is invoked when compare reports a difference; may be nil.
	onMismatch func(key string, primary, shadow *T)
}

// Get returns the primary result immediately; the shadow read runs in a
// background goroutine so it never adds latency or errors to the caller.
func (s *ShadowReadStore[T]) Get(ctx context.Context, key string) (*T, error) {
	// Read from primary (authoritative)
	primaryVal, err := s.primary.Get(ctx, key)
	// Shadow read (async, for comparison)
	go func() {
		shadowVal, shadowErr := s.shadow.Get(context.Background(), key)
		// Comparing errors with != tests interface identity, so two
		// distinct error instances (or differently wrapped errors) would
		// always "mismatch". Only the success/failure outcome is comparable.
		if (err == nil) != (shadowErr == nil) {
			log.Printf("shadow: error mismatch for %s: primary=%v shadow=%v", key, err, shadowErr)
			return
		}
		if err != nil || s.compare == nil {
			return
		}
		if !s.compare(primaryVal, shadowVal) && s.onMismatch != nil {
			s.onMismatch(key, primaryVal, shadowVal)
		}
	}()
	return primaryVal, err
}
Usage
shadowStore := &ShadowReadStore[User]{
primary: oldStore,
shadow: newStore,
compare: func(a, b *User) bool {
return reflect.DeepEqual(a, b)
},
onMismatch: func(key string, primary, shadow *User) {
log.Printf("MISMATCH %s: primary=%+v shadow=%+v", key, primary, shadow)
metrics.Increment("migration.mismatch")
},
}
Validation Utilities
Compare Stores
// ValidateStores pages through every key under prefix in source and
// compares each value against dest. It returns the number of keys
// checked so far, the mismatched keys (suffixed " (missing)" when the
// key is absent from dest, " (different)" when values differ), and the
// first error encountered. The comparison is one-directional: keys
// present only in dest are not reported.
func ValidateStores[T any](
	ctx context.Context,
	source *grub.Store[T],
	dest *grub.Store[T],
	prefix string,
) (int, []string, error) {
	const batchSize = 100
	var mismatches []string
	total := 0
	marker := prefix
	for {
		keys, err := source.List(ctx, marker, batchSize)
		if err != nil {
			// Return progress so far instead of zeroing it out.
			return total, mismatches, err
		}
		if len(keys) == 0 {
			break
		}
		// Batch-read errors were previously discarded with `_`, which
		// made a failing provider look like a clean validation run.
		sourceVals, err := source.GetBatch(ctx, keys)
		if err != nil {
			return total, mismatches, err
		}
		destVals, err := dest.GetBatch(ctx, keys)
		if err != nil {
			return total, mismatches, err
		}
		for key, sourceVal := range sourceVals {
			total++
			destVal, exists := destVals[key]
			if !exists {
				mismatches = append(mismatches, key+" (missing)")
				continue
			}
			if !reflect.DeepEqual(sourceVal, destVal) {
				mismatches = append(mismatches, key+" (different)")
			}
		}
		if len(keys) < batchSize {
			break
		}
		// Advance strictly past the last key; "\x00" is the smallest
		// possible suffix, so the next page starts after it.
		marker = keys[len(keys)-1] + "\x00"
	}
	return total, mismatches, nil
}
Sync Missing Keys
// SyncMissing copies the listed keys from source into dest, e.g. to
// repair the misses reported by a validation pass. Reads are best
// effort: a key that cannot be read from source is skipped. A failed
// write to dest aborts the sync. Values are written with TTL=0.
func SyncMissing[T any](
	ctx context.Context,
	source *grub.Store[T],
	dest *grub.Store[T],
	keys []string,
) error {
	for _, k := range keys {
		v, readErr := source.Get(ctx, k)
		if readErr != nil {
			// Deliberately best-effort: an unreadable key should not
			// fail the whole repair run.
			continue
		}
		if writeErr := dest.Set(ctx, k, v, 0); writeErr != nil {
			return writeErr
		}
	}
	return nil
}
Bucket Migration
Migrate blob storage between providers.
// MigrateBucket copies every object under prefix from the source bucket
// to the destination, paging batchSize object infos at a time. Objects
// are transferred one by one, so memory use is bounded by the largest
// single object.
func MigrateBucket[T any](
	ctx context.Context,
	source *grub.Bucket[T],
	dest *grub.Bucket[T],
	prefix string,
) error {
	const batchSize = 50
	marker := prefix
	for {
		infos, err := source.List(ctx, marker, batchSize)
		if err != nil {
			return err
		}
		if len(infos) == 0 {
			break
		}
		for _, info := range infos {
			obj, err := source.Get(ctx, info.Key)
			if err != nil {
				return fmt.Errorf("reading %s: %w", info.Key, err)
			}
			if err := dest.Put(ctx, obj); err != nil {
				return fmt.Errorf("writing %s: %w", info.Key, err)
			}
			fmt.Printf("Migrated %s (%d bytes)\n", info.Key, info.Size)
		}
		if len(infos) < batchSize {
			break
		}
		// Advance strictly past the last key, matching MigrateStore.
		// Without the "\x00" suffix, a List that includes the marker key
		// would re-return the last object on every page — duplicating
		// work and never terminating.
		marker = infos[len(infos)-1].Key + "\x00"
	}
	return nil
}
Database Migration
For SQL databases, migrations typically involve schema changes rather than provider changes.
Provider-Agnostic Schema
Use grub's Database wrapper with different renderers:
// Development: SQLite
devDB := grub.NewDatabase[User](sqliteConn, "users", sqlite.New())
// Production: PostgreSQL
prodDB := grub.NewDatabase[User](pgConn, "users", postgres.New())
Data Export/Import
// ExportTable loads every row of the table into memory via QueryAll.
// Suitable only for datasets that fit in RAM; stream row-by-row for
// larger tables.
func ExportTable[T any](ctx context.Context, db *grub.Database[T]) ([]*T, error) {
return db.ExecQuery(ctx, grub.QueryAll, nil)
}
// ImportTable writes records one at a time, deriving each record's key
// with keyFn. The first failed write aborts the import and leaves the
// earlier records in place — the import is not transactional.
func ImportTable[T any](ctx context.Context, db *grub.Database[T], records []*T, keyFn func(*T) string) error {
for _, record := range records {
if err := db.Set(ctx, keyFn(record), record); err != nil {
return err
}
}
return nil
}
Rollback Strategy
Always plan for rollback.
// MigrationState tracks progress through the migration phases so an
// operator can tell whether abandoning the migration is still safe.
type MigrationState struct {
	Phase         string // "dual_write", "shadow_read", "cutover", "complete"
	StartedAt     time.Time
	CutoverAt     *time.Time
	RollbackAt    *time.Time
	KeysMigrated  int
	KeysValidated int
	MismatchCount int
}

// CanRollback reports whether the migration can still be abandoned:
// only while both stores are kept in sync (pre-cutover phases).
func (m *MigrationState) CanRollback() bool {
	switch m.Phase {
	case "dual_write", "shadow_read":
		return true
	default:
		return false
	}
}

// Rollback marks the migration as abandoned and records when.
func (m *MigrationState) Rollback() {
	m.Phase = "rolled_back"
	ts := time.Now()
	m.RollbackAt = &ts
}
Checklist
Before Migration
- Estimate data volume and migration time
- Plan maintenance window (if big bang)
- Set up monitoring for dual write errors
- Test migration on staging
- Document rollback procedure
During Migration
- Monitor error rates
- Check replication lag (if applicable)
- Validate sample records
- Watch resource utilization
After Migration
- Run full validation
- Monitor for mismatches
- Keep old store available for rollback period
- Clean up old infrastructure after confidence period