transx

Published: Jan 29, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

README

transx

A Go-based data migration library that supports multiple transfer methods for moving data between databases and storage systems.

Features

  • Error-Only Approach: Structured error handling for seamless integration with logging frameworks (zerolog, logrus, etc.)
  • Multi-Protocol Transfer: Core data transfer functionality with support for rsync and object-storage-api methods
  • Optional Data Processing: Backup and restore operations as supplementary features when needed
  • Explicit Configuration: User-specified transfer methods (no auto-detection)
  • Direct & Relay Modes: Flexible migration patterns supporting both direct transfers and relay node scenarios
  • Data Integrity: Built-in validation and verification with comprehensive error context
  • Sensitive Data Encryption: Hybrid encryption (AES-256-GCM + RSA-OAEP) for secure transmission of credentials

Quick Start

import "github.com/cloud-barista/cm-beetle/transx"

// Core transfer operation
err := transx.Transfer(dataModel)
if err != nil {
    log.Error().Err(err).Msg("Transfer failed")
}

// Complete migration with optional backup/restore
err = transx.MigrateData(dataModel)
if err != nil {
    // Rich error context available for logging frameworks
    log.Error().Err(err).Msg("Migration failed")
}

Transfer Methods

Method               Description                                      Use Case
rsync                SSH-based file transfers using rsync             Remote server migrations with authentication
object-storage-api   HTTP-based transfers with Object Storage APIs    S3-compatible storage (CB-Spider, AWS S3, etc.)

Note: File operations on the same host are handled automatically when the endpoint is empty.

Architecture

The library implements a Transfer-Centric Data Migration Model with the following components:

Core Functionality:

  • Transfer: Move data between systems using specified transfer methods (rsync, object-storage-api)

Optional Operations (when backup/restore commands are provided):

  • Backup: Export data from source systems before transfer
  • Restore: Import transferred data into destination systems

The transfer operation is the primary focus, with backup and restore serving as optional pre/post-processing steps.

Transfer Modes
  • Direct Mode: Source → Destination (at least one endpoint is local)
  • Relay Mode: Source → Relay Node → Destination (both endpoints are remote)
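
The mode-selection rule above can be sketched as a small helper. This is an illustrative sketch only; transx derives the mode internally from the endpoint configuration (and the Strategy field), not from a function like this:

```go
package main

import "fmt"

// chooseMode mirrors the documented rule: relay is needed only when
// both endpoints are remote; otherwise a direct transfer is possible.
// Illustrative sketch, not the library's actual selector.
func chooseMode(sourceRemote, destinationRemote bool) string {
	if sourceRemote && destinationRemote {
		return "relay" // Source → Relay Node → Destination
	}
	return "direct" // Source → Destination
}

func main() {
	fmt.Println(chooseMode(true, false)) // one local endpoint → direct
	fmt.Println(chooseMode(true, true))  // both remote → relay
}
```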

Error Handling

The library implements an Error-Only Approach with unified error handling:

// Unified OperationError provides rich context for all operations
type OperationError struct {
    Operation   string            // "backup", "restore", "transfer"
    Method      string            // transfer method (for transfer operations)
    Source      string            // source path/endpoint
    Destination string            // destination path/endpoint
    Command     string            // executed command (for backup/restore)
    Output      string            // command output (for backup/restore)
    IsRelayMode bool              // relay mode flag (for transfer)
    Context     map[string]string // additional context information
    Err         error             // underlying error
}

// Simple interface for users - just use err.Error()
// Advanced users can access rich context via type assertion

The transfer operation always executes, while backup and restore are conditional based on configuration.

Examples

MariaDB Migration

See examples/mariadb-migration for a complete database migration example:

  • Direct mode migration (local ↔ remote)
  • Relay mode migration (remote ↔ remote)
  • Individual step execution (backup, transfer, restore)
  • Comprehensive testing and validation

Object Storage Migration

See examples/object-storage for Object Storage integration:

  • CB-Spider presigned URL integration
  • Bidirectional transfers (local ↔ object storage)
  • Cross-cloud migration scenarios
  • AWS S3 compatibility

Installation

go get github.com/cloud-barista/cm-beetle/transx

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.


Appendix

Sensitive Data Encryption

[!NOTE] transx was developed as an internal tool/package for easy data transfer between systems, so secure transmission of sensitive data was not a primary design goal. However, we provide a minimal encryption option for users who need additional security.

transx provides built-in encryption for sensitive fields like SSH private keys and S3 credentials. The encryption uses a hybrid approach combining AES-256-GCM for data encryption and RSA-OAEP for key exchange, supporting data of any size.

Sensitive Fields

The following fields are automatically encrypted when using EncryptModel:

Field Path                                       Description
*.filesystem.ssh.privateKey                      SSH private key content
*.objectStorage.minio.accessKeyId                S3 access key ID
*.objectStorage.minio.secretAccessKey            S3 secret access key
*.objectStorage.spider.auth.basic.password       Spider Basic auth password
*.objectStorage.spider.auth.jwt.token            Spider JWT token
*.objectStorage.tumblebug.auth.basic.password    Tumblebug Basic auth password
*.objectStorage.tumblebug.auth.jwt.token         Tumblebug JWT token

Encryption Algorithm

transx uses hybrid encryption combining asymmetric and symmetric cryptography:

Component         Algorithm               Purpose
Key Exchange      RSA-2048-OAEP-SHA256    Securely transmit AES key
Data Encryption   AES-256-GCM             Fast encryption for any data size

Why Hybrid?

  • RSA alone can only encrypt ~190 bytes (2048-bit key with OAEP-SHA256 padding)
  • AES-GCM handles unlimited data size efficiently
  • Each field gets a unique AES key (no key reuse)

Encryption Workflow

┌─────────────────────────────────────────────────────────────────────────┐
│                     Encryption Workflow                                 │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   CLIENT                                SERVER                          │
│   ──────                                ──────                          │
│                                                                         │
│   ① Request Key ─────────────────────► Generate RSA KeyPair            │
│                                         Store PrivateKey in KeyStore    │
│              ◄───────────────────────── Return PublicKeyBundle          │
│                                         (publicKey, keyId, expiresAt)   │
│                                                                         │
│   ② EncryptModel()                                                      │
│      For each sensitive field:                                          │
│      - Generate random AES-256 key (32 bytes)                           │
│      - Generate random nonce (12 bytes)                                 │
│      - Encrypt data with AES-256-GCM → ciphertext                       │
│      - Encrypt AES key with RSA-OAEP → encryptedKey                     │
│      - Package as Base64(JSON{ek, n, ct})                               │
│                                                                         │
│   ③ Send encrypted model ────────────► Receive                         │
│                                                                         │
│                                         ④ DecryptModel()                │
│                                            - Lookup PrivateKey by keyId │
│                                            - For each encrypted field:  │
│                                              - Decode Base64 payload    │
│                                              - Decrypt AES key with RSA │
│                                              - Decrypt data with AES    │
│                                            - Auto-delete key (one-time) │
│                                                                         │
│                                         ⑤ Execute migration             │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Encrypted Field Payload Structure

Each encrypted field is stored as a Base64-encoded JSON payload:

{
  "v": 1, // Schema version for future compatibility
  "ek": "...", // AES key encrypted with RSA-OAEP (Base64)
  "n": "...", // AES-GCM nonce, 12 bytes (Base64)
  "ct": "..." // Ciphertext encrypted with AES-GCM (Base64)
}

Key Points:

  • Each sensitive field has its own unique AES key (generated per-field)
  • RSA public key is shared once; AES keys are embedded in each field's payload
  • Server only stores RSA private key; AES keys travel with the encrypted data

Usage: Plaintext Transmission (No Encryption)

For internal/trusted environments where encryption is not required:

Server Side

package main

import (
    "encoding/json"
    "log"
    "net/http"

    "github.com/cloud-barista/cm-beetle/transx"
)

// main initializes the HTTP server for plaintext data migration.
func main() {
    http.HandleFunc("/api/v1/migration", handleMigration)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

// handleMigration processes plaintext migration requests.
// It validates the request body and executes the migration directly.
func handleMigration(w http.ResponseWriter, r *http.Request) {
    var model transx.DataMigrationModel
    if err := json.NewDecoder(r.Body).Decode(&model); err != nil {
        http.Error(w, "Invalid request", http.StatusBadRequest)
        return
    }

    // Validate the model
    if err := transx.Validate(model); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // Execute migration directly with plaintext model
    if err := transx.MigrateData(model); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{"status": "completed"})
}

Client Side

package main

import (
    "bytes"
    "encoding/json"
    "log"
    "net/http"

    "github.com/cloud-barista/cm-beetle/transx"
)

// main demonstrates sending a plaintext migration request to the server.
// Use this approach only in trusted/internal environments.
func main() {
    // Create migration model with sensitive data (plaintext)
    model := transx.DataMigrationModel{
        Source: transx.DataLocation{
            StorageType: transx.StorageTypeFilesystem,
            Path:        "/data/source",
            Filesystem: &transx.FilesystemAccess{
                AccessType: transx.AccessTypeSSH,
                SSH: &transx.SSHConfig{
                    Host:       "192.168.1.100",
                    Port:       22,
                    Username:   "ubuntu",
                    PrivateKey: "-----BEGIN RSA PRIVATE KEY-----\nMIIE...\n-----END RSA PRIVATE KEY-----",
                },
            },
        },
        Destination: transx.DataLocation{
            StorageType: transx.StorageTypeObjectStorage,
            Path:        "my-bucket/backup",
            ObjectStorage: &transx.ObjectStorageAccess{
                AccessType: transx.AccessTypeMinio,
                Minio: &transx.S3MinioConfig{
                    Endpoint:        "s3.amazonaws.com",
                    AccessKeyId:     "AKIAIOSFODNN7EXAMPLE",
                    SecretAccessKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
                    Region:          "us-east-1",
                },
            },
        },
    }

    // Send plaintext model directly
    jsonData, _ := json.Marshal(model)
    resp, err := http.Post("http://localhost:8080/api/v1/migration",
        "application/json", bytes.NewReader(jsonData))
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    log.Printf("Migration status: %d", resp.StatusCode)
}

Usage: Encrypted Transmission

For secure transmission of sensitive data over untrusted networks:

Server Side

package main

import (
    "encoding/json"
    "log"
    "net/http"
    "time"

    "github.com/cloud-barista/cm-beetle/transx"
)

// Global KeyStore for managing encryption keys
var keyStore = transx.NewKeyStore()

// main initializes the HTTP server with encryption key management.
// It starts a background routine to clean up expired keys.
func main() {
    // Start background cleanup for expired keys (every 10 minutes)
    stopCleanup := make(chan struct{})
    keyStore.StartCleanupRoutine(10*time.Minute, stopCleanup)
    defer close(stopCleanup)

    http.HandleFunc("/api/v1/encryption/key", handleGetKey)
    http.HandleFunc("/api/v1/migration/secure", handleSecureMigration)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

// handleGetKey generates a new RSA key pair and returns the public key bundle.
// The key is stored in KeyStore and valid for 30 minutes (one-time use).
func handleGetKey(w http.ResponseWriter, r *http.Request) {
    // Generate a one-time use key (30 minutes validity)
    keyPair, err := keyStore.GenerateKeyPair(30 * time.Minute)
    if err != nil {
        http.Error(w, "Key generation failed", http.StatusInternalServerError)
        return
    }

    // Export public key bundle for client
    bundle, err := keyPair.ExportPublicBundle()
    if err != nil {
        http.Error(w, "Key export failed", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(bundle)
}

// handleSecureMigration processes encrypted migration requests.
// It decrypts the model using the one-time key, then executes the migration.
// The encryption key is automatically deleted after successful decryption.
func handleSecureMigration(w http.ResponseWriter, r *http.Request) {
    var model transx.DataMigrationModel
    if err := json.NewDecoder(r.Body).Decode(&model); err != nil {
        http.Error(w, "Invalid request", http.StatusBadRequest)
        return
    }

    // Check if model is encrypted
    if !model.IsEncrypted() {
        http.Error(w, "Encrypted model required", http.StatusBadRequest)
        return
    }

    // Decrypt the model (key is auto-deleted after decryption - one-time use)
    decryptedModel, err := transx.DecryptModel(model)
    if err != nil {
        switch err {
        case transx.ErrKeyNotFound:
            http.Error(w, "Key not found or already used", http.StatusBadRequest)
        case transx.ErrKeyExpired:
            http.Error(w, "Key has expired", http.StatusBadRequest)
        default:
            http.Error(w, "Decryption failed", http.StatusInternalServerError)
        }
        return
    }

    // Now 'decryptedModel' contains plaintext sensitive data
    // Execute migration
    if err := transx.MigrateData(decryptedModel); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{"status": "completed"})
}

Client Side

package main

import (
    "bytes"
    "encoding/json"
    "log"
    "net/http"

    "github.com/cloud-barista/cm-beetle/transx"
)

// main demonstrates the complete encrypted transmission workflow:
// 1. Request public key from server
// 2. Encrypt sensitive fields in the model
// 3. Send encrypted model to server
func main() {
    // Step 1: Request public key from server
    resp, err := http.Get("http://localhost:8080/api/v1/encryption/key")
    if err != nil {
        log.Fatal("Failed to get encryption key:", err)
    }
    defer resp.Body.Close()

    var bundle transx.PublicKeyBundle
    if err := json.NewDecoder(resp.Body).Decode(&bundle); err != nil {
        log.Fatal("Failed to parse key bundle:", err)
    }

    log.Printf("Received key: %s (expires: %v)", bundle.KeyID, bundle.ExpiresAt)

    // Step 2: Parse the public key
    publicKey, err := transx.ParsePublicKeyBundle(bundle)
    if err != nil {
        log.Fatal("Failed to parse public key:", err)
    }

    // Step 3: Create migration model with sensitive data
    model := transx.DataMigrationModel{
        Source: transx.DataLocation{
            StorageType: transx.StorageTypeFilesystem,
            Path:        "/data/source",
            Filesystem: &transx.FilesystemAccess{
                AccessType: transx.AccessTypeSSH,
                SSH: &transx.SSHConfig{
                    Host:       "192.168.1.100",
                    Port:       22,
                    Username:   "ubuntu",
                    // This 3KB+ private key will be encrypted using hybrid encryption
                    PrivateKey: "-----BEGIN RSA PRIVATE KEY-----\nMIIEpQIBAAKCAQEA...(long key content)...\n-----END RSA PRIVATE KEY-----",
                },
            },
        },
        Destination: transx.DataLocation{
            StorageType: transx.StorageTypeObjectStorage,
            Path:        "my-bucket/backup",
            ObjectStorage: &transx.ObjectStorageAccess{
                AccessType: transx.AccessTypeMinio,
                Minio: &transx.S3MinioConfig{
                    Endpoint:        "s3.amazonaws.com",
                    AccessKeyId:     "AKIAIOSFODNN7EXAMPLE",
                    SecretAccessKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
                    Region:          "us-east-1",
                },
            },
        },
    }

    // Step 4: Encrypt sensitive fields
    encryptedModel, err := transx.EncryptModel(model, publicKey, bundle.KeyID)
    if err != nil {
        log.Fatal("Failed to encrypt model:", err)
    }

    log.Printf("Model encrypted with key: %s", encryptedModel.EncryptionKeyID)

    // Step 5: Send encrypted model to server
    jsonData, _ := json.Marshal(encryptedModel)
    resp, err = http.Post("http://localhost:8080/api/v1/migration/secure",
        "application/json", bytes.NewReader(jsonData))
    if err != nil {
        log.Fatal("Failed to send request:", err)
    }
    defer resp.Body.Close()

    log.Printf("Migration status: %d", resp.StatusCode)
}

Encrypted JSON Example

When transmitted, the encrypted model looks like this:

{
  "source": {
    "storageType": "filesystem",
    "path": "/data/source",
    "filesystem": {
      "accessType": "ssh",
      "ssh": {
        "host": "192.168.1.100",
        "port": 22,
        "username": "ubuntu",
        "privateKey": "eyJ2IjoxLCJlayI6IkFBRUJBd1FGQmdj..."
      }
    }
  },
  "destination": {
    "storageType": "objectstorage",
    "path": "my-bucket/backup",
    "objectStorage": {
      "accessType": "minio",
      "minio": {
        "endpoint": "s3.amazonaws.com",
        "accessKeyId": "eyJ2IjoxLCJlayI6IkhJSktMTU5P...",
        "secretAccessKey": "eyJ2IjoxLCJlayI6IlBRUlNUVVZX...",
        "region": "us-east-1"
      }
    }
  },
  "encryptionKeyId": "key-a1b2c3d4e5f6..."
}

Note: The encryptionKeyId field indicates which key was used for encryption. Sensitive fields are automatically encrypted based on the predefined list in transx-sec.go. There is no need to specify which fields are encrypted; the server knows from the internal sensitiveFields definition.

Security Considerations

Aspect               Implementation
Algorithm            Hybrid: AES-256-GCM (data) + RSA-2048-OAEP (key exchange)
Key Validity         Default 30 minutes (configurable)
One-Time Use         Keys are automatically deleted after decryption
Large Data Support   Hybrid encryption handles data of any size
Transport Security   Always use HTTPS in production
Key Storage          In-memory only (keys don't survive restart)

Documentation

Overview

Package transx provides transfer executors for data migration. Supports:

  • Rsync: Local/remote filesystem transfers with SSH support
  • S3: Object Storage transfers using presigned URLs

Package transx provides S3-compatible Object Storage providers. Supported providers:

  • Direct: AWS S3, MinIO, and S3-compatible storage (via minio-go SDK)
  • Spider: via CB-Spider Object Storage API
  • Tumblebug: via CB-Tumblebug Object Storage API

Index

Constants

const (
	StageBackup   = "backup"
	StageTransfer = "transfer"
	StageRestore  = "restore"
)

Migration stages

const (
	OperationBackup   = "backup"
	OperationRestore  = "restore"
	OperationTransfer = "transfer"
	OperationPreCmd   = "pre-command"
	OperationPostCmd  = "post-command"
)

Operation types

const (
	MethodLocal = "local" // Local filesystem transfer
	MethodSSH   = "ssh"   // Remote transfer via SSH/rsync
	MethodS3    = "s3"    // S3-compatible object storage
)

Transfer method constants

const (
	CategoryRsync         = "rsync"          // rsync-based transfers (local/ssh)
	CategoryObjectStorage = "object-storage" // Object storage transfers (S3, etc.)
)

Transfer category constants

const (
	// StorageTypeFilesystem represents local or remote filesystem storage.
	StorageTypeFilesystem = "filesystem"

	// StorageTypeObjectStorage represents S3-compatible object storage.
	StorageTypeObjectStorage = "objectstorage"
)

const (
	// AccessTypeLocal represents local filesystem access (no network).
	AccessTypeLocal = "local"

	// AccessTypeSSH represents remote filesystem access via SSH/rsync.
	AccessTypeSSH = "ssh"
)

Filesystem access types

const (
	// AccessTypeMinio represents direct S3 SDK access using minio-go.
	AccessTypeMinio = "minio"

	// AccessTypeSpider represents access via CB-Spider Object Storage API.
	AccessTypeSpider = "spider"

	// AccessTypeTumblebug represents access via CB-Tumblebug Object Storage API.
	AccessTypeTumblebug = "tumblebug"
)

Object Storage access types

const (
	// StrategyAuto automatically selects the best transfer method.
	StrategyAuto = "auto"

	// StrategyDirect forces direct transfer (e.g., SSH agent forwarding).
	StrategyDirect = "direct"

	// StrategyRelay forces relay via local machine.
	StrategyRelay = "relay"
)

const (
	PipelineFilesystemTransfer    = "filesystem-transfer"
	PipelineObjectStorageTransfer = "objectstorage-transfer"
	PipelineCrossStorageTransfer  = "cross-storage-transfer"

	StepRsyncTransfer   = "rsync-transfer"
	StepDownloadFromS3  = "download-from-s3"
	StepUploadToS3      = "upload-to-s3"
	StepRsyncFromServer = "rsync-from-server"
	StepRsyncToServer   = "rsync-to-server"
)

const (
	// AuthTypeBasic represents HTTP Basic Authentication.
	AuthTypeBasic = "basic"

	// AuthTypeJWT represents JWT (JSON Web Token) Authentication.
	AuthTypeJWT = "jwt"
)

Auth types

const (
	// DefaultStagingPath is the default local staging directory for relay transfers.
	DefaultStagingPath = "/tmp/transx-staging"
)

Variables

var (
	// ErrKeyNotFound is returned when the requested key is not in the store.
	ErrKeyNotFound = fieldsec.ErrKeyNotFound

	// ErrKeyExpired is returned when the key has expired.
	ErrKeyExpired = fieldsec.ErrKeyExpired

	// ErrKeyMismatch is returned when the key ID doesn't match.
	ErrKeyMismatch = fieldsec.ErrKeyMismatch

	// ErrDecryptionFailed is returned when decryption fails.
	ErrDecryptionFailed = fieldsec.ErrDecryptionFailed

	// ErrInvalidPublicKey is returned when public key parsing fails.
	ErrInvalidPublicKey = fieldsec.ErrInvalidPublicKey
)

var (
	NewKeyStore          = fieldsec.NewKeyStore
	ParsePublicKeyBundle = fieldsec.ParsePublicKeyBundle
)

Re-export functions from fieldsec

Functions

func Backup

func Backup(dmm DataMigrationModel) error

Backup executes the PreCmd defined in the source DataLocation. Deprecated: Use executePreCommand directly or set Source.PreCmd and call MigrateData.

func GetCategory

func GetCategory(method string) string

GetCategory returns the transfer category for the given method.

func GetKeyExpiry

func GetKeyExpiry() time.Duration

GetKeyExpiry returns the configured key expiry duration. Panics if InitKeyStore() was not called.

func InitKeyStore

func InitKeyStore(keyExpiryDuration, cleanupInterval time.Duration)

InitKeyStore initializes the singleton KeyStore and starts the cleanup routine. This should be called once from main() during server startup. Thread-safe: uses sync.Once to ensure single initialization.

Parameters:

  • keyExpiryDuration: duration after which generated keys expire (e.g., 30*time.Minute)
  • cleanupInterval: interval for background cleanup of expired keys (e.g., 10*time.Minute)

func IsObjectStorageMethod

func IsObjectStorageMethod(method string) bool

IsObjectStorageMethod returns true if the method uses object storage.

func IsRsyncMethod

func IsRsyncMethod(method string) bool

IsRsyncMethod returns true if the method uses rsync for transfer.

func MigrateData

func MigrateData(dmm DataMigrationModel) error

MigrateData manages the complete data migration workflow:

 1. If Source.PreCmd is defined, perform pre-processing (e.g., backup)
 2. Always perform Transfer
 3. If Destination.PostCmd is defined, perform post-processing (e.g., restore)

func ParseBucketAndKey

func ParseBucketAndKey(path string) (bucket, key string)

ParseBucketAndKey parses the path into bucket and key components. Path format: "bucket-name/path/to/object" or "bucket-name/"
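
The documented path format can be split like this; a hypothetical reimplementation for illustration, not the library's code:

```go
package main

import (
	"fmt"
	"strings"
)

// parseBucketAndKey splits "bucket-name/path/to/object" into bucket and
// key per the documented format; "bucket-name/" yields an empty key.
// Hypothetical sketch of the documented behavior.
func parseBucketAndKey(path string) (bucket, key string) {
	parts := strings.SplitN(path, "/", 2)
	bucket = parts[0]
	if len(parts) == 2 {
		key = parts[1]
	}
	return bucket, key
}

func main() {
	b, k := parseBucketAndKey("my-bucket/path/to/object")
	fmt.Println(b, k) // my-bucket path/to/object
}
```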

func Restore

func Restore(dmm DataMigrationModel) error

Restore executes the PostCmd defined in the destination DataLocation. Deprecated: Use executePostCommand directly or set Destination.PostCmd and call MigrateData.

func Transfer

func Transfer(dmm DataMigrationModel) error

Transfer runs the data transfer as defined by the given DataMigrationModel. It automatically selects the appropriate transfer strategy based on source/destination types.

func Validate

func Validate(dmm DataMigrationModel) error

Validate checks if DataMigrationModel satisfies requirements.

Types

type AuthConfig

type AuthConfig struct {
	AuthType string           `json:"authType" validate:"required"` // "basic", "jwt" ("apikey", "oauth" not tested yet)
	Basic    *BasicAuthConfig `json:"basic,omitempty"`              // For authType="basic"
	JWT      *JWTAuthConfig   `json:"jwt,omitempty"`                // For authType="jwt"
}

AuthConfig defines authentication configuration. Use authType to specify the authentication method.

type BasicAuthConfig

type BasicAuthConfig struct {
	Username string `json:"username" validate:"required"`
	Password string `json:"password" validate:"required"`
}

BasicAuthConfig defines HTTP Basic Authentication credentials.

type DataLocation

type DataLocation struct {
	// StorageType: What kind of storage
	// "filesystem": Local or remote filesystem
	// "objectstorage": S3-compatible object storage
	StorageType string `json:"storageType" validate:"required,oneof=filesystem objectstorage"`

	// Path to the data
	// For Filesystem: File path (e.g., "/data", "/home/user/data")
	// For ObjectStorage: Bucket/Key (e.g., "my-bucket/my-key")
	Path string `json:"path" validate:"required"`

	// Access configuration (one of the following based on StorageType)
	Filesystem    *FilesystemAccess    `json:"filesystem,omitempty"`    // For storageType="filesystem"
	ObjectStorage *ObjectStorageAccess `json:"objectStorage,omitempty"` // For storageType="objectstorage"

	// Filter defines file filtering options
	Filter *FilterOption `json:"filter,omitempty"`

	// Hooks for pre/post processing
	PreCmd  string `json:"preCmd,omitempty"`  // Command to run before transfer (source only)
	PostCmd string `json:"postCmd,omitempty"` // Command to run after transfer (destination only)
}

DataLocation defines any data location with separated storage type and access method.

func (DataLocation) IsFilesystem

func (loc DataLocation) IsFilesystem() bool

IsFilesystem returns true if the location uses filesystem storage.

func (DataLocation) IsLocal

func (loc DataLocation) IsLocal() bool

IsLocal returns true if the location is local filesystem.

func (DataLocation) IsObjectStorage

func (loc DataLocation) IsObjectStorage() bool

IsObjectStorage returns true if the location uses object storage.

func (DataLocation) IsRemote

func (loc DataLocation) IsRemote() bool

IsRemote returns true if the location requires network access.

func (DataLocation) NeedsLocalStaging

func (loc DataLocation) NeedsLocalStaging() bool

NeedsLocalStaging returns true if this location needs local staging for relay.

type DataMigrationModel

type DataMigrationModel struct {
	Source      DataLocation `json:"source" validate:"required"`
	Destination DataLocation `json:"destination" validate:"required"`

	// Strategy determines how the transfer is orchestrated.
	// "auto": Automatically select best method.
	// "direct": Force direct transfer (e.g., SSH agent forwarding).
	// "relay": Force relay via local machine.
	Strategy string `json:"strategy,omitempty" default:"auto" validate:"omitempty,oneof=auto direct relay"`

	// EncryptionKeyID indicates that sensitive fields are encrypted.
	// Empty string means plaintext, non-empty means encrypted with the specified key.
	// The key is one-time use and will be deleted after decryption.
	EncryptionKeyID string `json:"encryptionKeyId,omitempty"`
}

DataMigrationModel defines a single data migration task.

func DecryptModel

func DecryptModel(model DataMigrationModel) (DataMigrationModel, error)

DecryptModel decrypts all sensitive fields in DataMigrationModel using the singleton KeyStore. After successful decryption, the key is automatically deleted (one-time use). Panics if InitKeyStore() was not called.

Parameters:

  • model: the DataMigrationModel to decrypt

Returns a new DataMigrationModel with decrypted fields and EncryptionKeyID cleared.

func DecryptModelWith

func DecryptModelWith(model DataMigrationModel, keyPair *KeyPair) (DataMigrationModel, error)

DecryptModelWith decrypts all sensitive fields in DataMigrationModel using the provided key pair. If the model is not encrypted (EncryptionKeyID is empty), returns as-is. Use this for testing or when managing keys externally.

Parameters:

  • model: the DataMigrationModel to decrypt
  • keyPair: the key pair containing the private key

Returns a new DataMigrationModel with decrypted fields and EncryptionKeyID cleared.

func EncryptModel

func EncryptModel(model DataMigrationModel, publicKey *rsa.PublicKey, keyID string) (DataMigrationModel, error)

EncryptModel encrypts all sensitive fields in DataMigrationModel. Uses hybrid encryption (AES-256-GCM + RSA-OAEP) for fields of any size.

Parameters:

  • model: the DataMigrationModel to encrypt
  • publicKey: RSA public key for encryption
  • keyID: identifier for the key (for server-side lookup)

Returns a new DataMigrationModel with encrypted fields and EncryptionKeyID set.

func (DataMigrationModel) IsEncrypted

func (m DataMigrationModel) IsEncrypted() bool

IsEncrypted returns true if the model has encrypted sensitive fields.

type Executor

type Executor interface {
	// Execute performs the transfer from source to destination.
	// Returns an error if the transfer fails.
	Execute(source, destination DataLocation) error
}

Executor defines the interface for transfer operations.

type FilesystemAccess

type FilesystemAccess struct {
	// AccessType: How to access the filesystem
	// "local": Local filesystem (no network)
	// "ssh": Remote filesystem via SSH
	AccessType string `json:"accessType" validate:"required,oneof=local ssh"`

	// SSH configuration (required when accessType="ssh")
	SSH *SSHConfig `json:"ssh,omitempty"`
}

FilesystemAccess defines how to access filesystem storage.

type FilterOption

type FilterOption struct {
	Include []string `json:"include,omitempty"` // Patterns to include (e.g., "*.txt", "data/**")
	Exclude []string `json:"exclude,omitempty"` // Patterns to exclude (e.g., "*.log", "temp/**")
}

FilterOption defines file filtering options for transfers.

type JWTAuthConfig

type JWTAuthConfig struct {
	Token string `json:"token" validate:"required"`
}

JWTAuthConfig defines JWT authentication configuration.

type KeyPair

type KeyPair = fieldsec.KeyPair

Re-export types from fieldsec for convenience

type KeyStore

type KeyStore = fieldsec.KeyStore

Re-export types from fieldsec for convenience

func GetKeyStore

func GetKeyStore() *KeyStore

GetKeyStore returns the singleton KeyStore instance. Panics if InitKeyStore() was not called.

type MigrationError

type MigrationError struct {
	Stage string // "backup", "transfer", or "restore"
	Err   error
}

MigrationError represents an error during the migration process.

func (*MigrationError) Error

func (e *MigrationError) Error() string

func (*MigrationError) Unwrap

func (e *MigrationError) Unwrap() error

type MinioConfig

type MinioConfig struct {
	Endpoint        string `json:"endpoint" validate:"required"`
	AccessKeyId     string `json:"accessKeyId" validate:"required"`
	SecretAccessKey string `json:"secretAccessKey" validate:"required"`
	Region          string `json:"region,omitempty" default:"us-east-1"`
	UseSSL          bool   `json:"useSSL,omitempty" default:"true"`
}

MinioConfig defines configuration for S3-compatible storage access using minio-go SDK. Supports: AWS S3, MinIO, Ceph, DigitalOcean Spaces, etc.

MinioConfig is defined here as it's S3-specific. SpiderConfig and TumblebugConfig are defined in model.go as they're shared with the main transx package.

type MinioProvider

type MinioProvider struct {
	// contains filtered or unexported fields
}

MinioProvider implements Provider using minio-go SDK. Supports: AWS S3, MinIO, Ceph, DigitalOcean Spaces, and other S3-compatible services.

func NewMinioProvider

func NewMinioProvider(config *MinioConfig, bucket string) (*MinioProvider, error)

NewMinioProvider creates a new MinioProvider from MinioConfig.

func (*MinioProvider) DownloadFile

func (p *MinioProvider) DownloadFile(key, localPath string) error

DownloadFile downloads a file from S3 to local path.

func (*MinioProvider) GeneratePresignedURL

func (p *MinioProvider) GeneratePresignedURL(action, key string) (string, error)

GeneratePresignedURL generates a presigned URL for S3 operations.

func (*MinioProvider) GetBucket

func (p *MinioProvider) GetBucket() string

GetBucket returns the bucket name.

func (*MinioProvider) ListObjects

func (p *MinioProvider) ListObjects(prefix string) ([]ObjectInfo, error)

ListObjects lists objects in the bucket with the given prefix.

func (*MinioProvider) UploadFile

func (p *MinioProvider) UploadFile(localPath, key string) error

UploadFile uploads a local file to S3.

type ObjectInfo

type ObjectInfo struct {
	Key          string // Object key (path)
	Size         int64  // Size in bytes
	LastModified string // Last modified timestamp
	ETag         string // Entity tag (hash)
}

ObjectInfo represents metadata about a storage object.

type ObjectStorageAccess

type ObjectStorageAccess struct {
	// AccessType: How to access object storage
	// "minio": Direct S3 SDK access using minio-go
	// "spider": Via CB-Spider Object Storage API
	// "tumblebug": Via CB-Tumblebug Object Storage API
	AccessType string `json:"accessType" validate:"required,oneof=minio spider tumblebug"`

	// Provider-specific configurations (one required based on accessType)
	Minio     *S3MinioConfig   `json:"minio,omitempty"`     // For accessType="minio"
	Spider    *SpiderConfig    `json:"spider,omitempty"`    // For accessType="spider"
	Tumblebug *TumblebugConfig `json:"tumblebug,omitempty"` // For accessType="tumblebug"
}

ObjectStorageAccess defines how to access object storage.

type OperationError

type OperationError struct {
	Operation   string            // "backup", "restore", "transfer"
	Method      string            // transfer method (for transfer operations)
	Source      string            // source path/endpoint
	Destination string            // destination path/endpoint
	Command     string            // executed command (for backup/restore)
	Output      string            // command output (for backup/restore)
	IsRelayMode bool              // relay mode flag (for transfer)
	Context     map[string]string // additional context information
	Err         error             // underlying error
}

OperationError provides detailed context about transx operation failures. This unified error type handles backup, restore, and transfer operations.

func (*OperationError) Error

func (e *OperationError) Error() string

func (*OperationError) GetMethod

func (e *OperationError) GetMethod() string

GetMethod returns the transfer method (applicable to transfer operations)

func (*OperationError) GetOutput

func (e *OperationError) GetOutput() string

GetOutput returns the command output for debugging (applicable to backup/restore operations)

func (*OperationError) IsOperation

func (e *OperationError) IsOperation(operation string) bool

IsOperation checks if the error is for a specific operation type

func (*OperationError) Unwrap

func (e *OperationError) Unwrap() error

type Pipeline

type Pipeline struct {
	Name     string
	Strategy string
	Steps    []Step
}

Pipeline represents a planned transfer with multiple steps.

func Plan

func Plan(model DataMigrationModel) (*Pipeline, error)

Plan analyzes the DataMigrationModel and returns the optimal transfer Pipeline. The routing is based on StorageType combinations (3 cases only).

func (*Pipeline) Execute

func (p *Pipeline) Execute() error

Execute runs all steps in the pipeline sequentially.

type PublicKeyBundle

type PublicKeyBundle = fieldsec.PublicKeyBundle

Re-export types from fieldsec for convenience

type RsyncExecutor

type RsyncExecutor struct {
	Mode             TransferMode // Transfer mode (pull, push, agent-forward)
	DeleteExtraneous bool         // --delete: remove extraneous files from destination
	DryRun           bool         // --dry-run: perform trial run without changes
	Verbose          bool         // -v: increase verbosity
	AdditionalArgs   []string     // Additional rsync arguments
	// contains filtered or unexported fields
}

RsyncExecutor implements Executor using rsync for file transfers. Supports three transfer modes: Pull, Push, and Agent Forwarding.

func NewRsyncExecutor

func NewRsyncExecutor(src, dst DataLocation) (*RsyncExecutor, error)

NewRsyncExecutor creates a new RsyncExecutor with automatically determined transfer mode:

  • SSH → SSH: AgentForward
  • SSH → Local: Pull
  • Local → SSH: Push
  • Local → Local: not supported (returns error)

func (*RsyncExecutor) Execute

func (e *RsyncExecutor) Execute(source, destination DataLocation) error

Execute performs rsync transfer from source to destination.

type S3Executor

type S3Executor struct {
	Provider S3Provider // S3 provider for generating presigned URLs
}

S3Executor implements Executor for S3 object storage transfers. Uses presigned URLs for authentication-free upload/download.

func NewS3Executor

func NewS3Executor(provider S3Provider) *S3Executor

NewS3Executor creates a new S3Executor with the given provider.

func (*S3Executor) Execute

func (e *S3Executor) Execute(source, destination DataLocation) error

Execute performs S3 transfer from source to destination.

type S3MinioConfig

type S3MinioConfig struct {
	Endpoint        string `json:"endpoint" validate:"required"`
	AccessKeyId     string `json:"accessKeyId" validate:"required"`
	SecretAccessKey string `json:"secretAccessKey" validate:"required"`
	Region          string `json:"region,omitempty" default:"us-east-1"`
	UseSSL          bool   `json:"useSSL,omitempty" default:"true"`
}

S3MinioConfig defines S3 SDK configuration using minio-go.

type S3Provider

type S3Provider interface {
	// GeneratePresignedURL generates a presigned URL for upload or download.
	// action: "upload" or "download"
	// key: object key (file path within bucket)
	GeneratePresignedURL(action, key string) (string, error)

	// ListObjects lists objects with the given prefix.
	ListObjects(prefix string) ([]ObjectInfo, error)

	// GetBucket returns the bucket/container name for this provider.
	GetBucket() string
}

S3Provider defines the interface for S3-compatible object storage operations.

func NewS3Provider

func NewS3Provider(loc DataLocation) (S3Provider, error)

NewS3Provider creates an S3 provider from DataLocation.

type SSHConfig

type SSHConfig struct {
	// Connection details
	Host           string `json:"host" validate:"required"`
	Port           int    `json:"port,omitempty" default:"22"`
	Username       string `json:"username" validate:"required"`
	ConnectTimeout int    `json:"connectTimeout,omitempty" default:"30"`

	// Authentication (priority: PrivateKey > PrivateKeyPath > Agent > none)
	// At least one authentication method should be available.
	//
	// PrivateKey: PEM-encoded private key content (preferred for injected secrets).
	// In JSON, use single line with \n for newlines:
	//   "privateKey": "-----BEGIN RSA PRIVATE KEY-----\nMIIE...\n-----END RSA PRIVATE KEY-----"
	PrivateKey     string `json:"privateKey,omitempty"`
	PrivateKeyPath string `json:"privateKeyPath,omitempty"` // Path to private key file (legacy, prefer PrivateKey)
	UseAgent       bool   `json:"useAgent,omitempty"`       // Use SSH agent for authentication (supports agent forwarding)

	// Rsync options
	Archive  bool `json:"archive,omitempty" default:"true"`
	Compress bool `json:"compress,omitempty" default:"true"`
	Delete   bool `json:"delete,omitempty"`
	Verbose  bool `json:"verbose,omitempty"`
	DryRun   bool `json:"dryRun,omitempty"`
}

SSHConfig defines SSH connection details and rsync options.

type SpiderConfig

type SpiderConfig struct {
	Endpoint       string      `json:"endpoint" validate:"required"`       // CB-Spider API base URL (e.g., "http://localhost:1024/spider")
	ConnectionName string      `json:"connectionName" validate:"required"` // CB-Spider connection name (e.g., "aws-connection")
	Expires        int         `json:"expires,omitempty" default:"3600"`   // Presigned URL expiration in seconds
	Auth           *AuthConfig `json:"auth,omitempty"`                     // Optional authentication configuration
}

SpiderConfig defines CB-Spider Object Storage API configuration. Endpoint should include /spider prefix (e.g., "http://localhost:1024/spider").

type SpiderProvider

type SpiderProvider struct {
	// contains filtered or unexported fields
}

SpiderProvider implements Provider for CB-Spider S3 Object Storage API. Based on CB-Spider swagger.yaml [S3 Object Storage Management] endpoints.

func NewSpiderProvider

func NewSpiderProvider(config *SpiderConfig, bucket string) (*SpiderProvider, error)

NewSpiderProvider creates a new SpiderProvider.

func (*SpiderProvider) GeneratePresignedURL

func (p *SpiderProvider) GeneratePresignedURL(action, key string) (string, error)

GeneratePresignedURL generates a presigned URL via CB-Spider S3 API. Uses the CB-Spider special feature endpoints:

  • GET /s3/presigned/download/{BucketName}/{ObjectKey} for download
  • GET /s3/presigned/upload/{BucketName}/{ObjectKey} for upload

func (*SpiderProvider) GetBucket

func (p *SpiderProvider) GetBucket() string

GetBucket returns the bucket name.

func (*SpiderProvider) ListObjects

func (p *SpiderProvider) ListObjects(prefix string) ([]ObjectInfo, error)

ListObjects lists objects via CB-Spider S3 API. Uses GET /s3/{BucketName}?ConnectionName=xxx to list objects in bucket.

type Step

type Step struct {
	Name        string
	Source      DataLocation
	Destination DataLocation
	Executor    Executor
}

Step represents a single transfer step in the pipeline.

type TransferMode

type TransferMode string

TransferMode defines how rsync transfer is executed.

const (
	// TransferModePull pulls data from remote source to local.
	// Direction: remote-source → local
	TransferModePull TransferMode = "pull"

	// TransferModePush pushes data from local to remote destination.
	// Direction: local → remote-destination
	TransferModePush TransferMode = "push"

	// TransferModeAgentForward uses SSH Agent Forwarding to execute rsync
	// on the source server, transferring directly to destination.
	// Direction: remote-source → remote-destination (via source server)
	TransferModeAgentForward TransferMode = "agent-forward"
)

type TumblebugConfig

type TumblebugConfig struct {
	Endpoint string      `json:"endpoint" validate:"required"`     // CB-Tumblebug API base URL (e.g., "http://localhost:1323/tumblebug")
	NsId     string      `json:"nsId" validate:"required"`         // Namespace ID for multi-tenancy
	OsId     string      `json:"osId" validate:"required"`         // Object Storage ID
	Expires  int         `json:"expires,omitempty" default:"3600"` // Presigned URL expiration in seconds
	Auth     *AuthConfig `json:"auth,omitempty"`                   // Optional authentication configuration
}

TumblebugConfig defines CB-Tumblebug Object Storage API configuration. Endpoint should include /tumblebug prefix (e.g., "http://localhost:1323/tumblebug").

type TumblebugProvider

type TumblebugProvider struct {
	// contains filtered or unexported fields
}

TumblebugProvider implements Provider for CB-Tumblebug Object Storage API. Based on CB-Tumblebug swagger.yaml [Infra Resource] Object Storage Management endpoints.

func NewTumblebugProvider

func NewTumblebugProvider(config *TumblebugConfig) (*TumblebugProvider, error)

NewTumblebugProvider creates a new TumblebugProvider.

func (*TumblebugProvider) GeneratePresignedURL

func (p *TumblebugProvider) GeneratePresignedURL(action, key string) (string, error)

GeneratePresignedURL generates a presigned URL via CB-Tumblebug API. Uses the new endpoint: GET /ns/{nsId}/resources/objectStorage/{osId}/object/{objectKey} Query parameters:

  • operation: "upload" or "download"
  • expires: expiration time in seconds (default: 3600)

func (*TumblebugProvider) GetBucket

func (p *TumblebugProvider) GetBucket() string

GetBucket returns the osId as the bucket identifier.

func (*TumblebugProvider) ListObjects

func (p *TumblebugProvider) ListObjects(prefix string) ([]ObjectInfo, error)

ListObjects lists objects via CB-Tumblebug API. Uses GET /ns/{nsId}/resources/objectStorage/{osId} to list objects in bucket.

type UnsupportedTransferError

type UnsupportedTransferError struct {
	SourceMethod      string
	DestinationMethod string
}

UnsupportedTransferError indicates that no executor is available for the given transfer combination.

func (*UnsupportedTransferError) Error

func (e *UnsupportedTransferError) Error() string

Directories

Path                       Synopsis
examples
examples/object-storage    command
fieldsec                   Package fieldsec provides field-level encryption for Go structs using hybrid encryption.
