Skip to content

dst

dst

import "github.com/danmestas/libfossil/dst"

Index

func AllBlobUUIDsSorted

func AllBlobUUIDsSorted(r *repo.Repo) ([]string, error)

AllBlobUUIDsSorted returns a sorted slice of UUIDs for deterministic comparison.

func CheckBlobIntegrity

func CheckBlobIntegrity(nodeID string, r *repo.Repo) error

CheckBlobIntegrity verifies that every blob in the repo has a UUID matching the hash of its expanded content. This catches corruption from buggify (content.Expand byte-flip) or storage bugs.

func CheckConvergence

func CheckConvergence(master *repo.Repo, leaves map[NodeID]*repo.Repo) error

CheckConvergence verifies that every leaf repo contains the same set of artifact UUIDs as the master repo.

func CheckDeltaChains

func CheckDeltaChains(nodeID string, r *repo.Repo) error

CheckDeltaChains verifies that every delta’s srcid points to an existing blob (no dangling references).

func CheckNoOrphanPhantoms

func CheckNoOrphanPhantoms(nodeID string, r *repo.Repo) error

CheckNoOrphanPhantoms verifies that phantom entries reference blobs that actually exist in the blob table (they’re just missing content).

func CheckSubsetOf

func CheckSubsetOf(master *repo.Repo, leaves map[NodeID]*repo.Repo) error

CheckSubsetOf verifies that all artifacts in the master are present in every leaf. Unlike CheckConvergence, this allows leaves to have extra artifacts (useful when only pull is being tested).

func CheckTableSyncConvergence

func CheckTableSyncConvergence(repos map[string]*repo.Repo) error

CheckTableSyncConvergence verifies that all repos have identical rows for every shared synced table. Tables present in some repos but not others are skipped (schema may still be propagating).

func CheckTableSyncIntegrity

func CheckTableSyncIntegrity(nodeID string, r *repo.Repo) error

CheckTableSyncIntegrity verifies that:

1. Every row in every synced table has a valid PK hash.
2. Every row’s mtime is positive.
3. The computed catalog hash is 40 hex chars.

func CheckTagxrefIntegrity

func CheckTagxrefIntegrity(nodeID string, r *repo.Repo) error

CheckTagxrefIntegrity verifies that:

1. Every tagxref.rid references a valid blob.
2. Every tagxref.tagid references a valid tag.
3. Propagated entries (srcid=0) have tagtype=2.
4. No tagxref.rid references a phantom blob.

func CheckUVConvergence

func CheckUVConvergence(master *repo.Repo, leaves map[NodeID]*repo.Repo) error

CheckUVConvergence verifies that all repos have identical UV content hashes.

func CheckUVIntegrity

func CheckUVIntegrity(nodeID string, r *repo.Repo) error

CheckUVIntegrity verifies that every non-tombstone entry in the unversioned table has a hash matching the SHA1 of its decompressed content, and sz matches the content length.

func CountBlobs

func CountBlobs(r *repo.Repo) (int, error)

CountBlobs returns the number of non-phantom blobs in the repo.

func HasBlob

func HasBlob(r *repo.Repo, uuid string) bool

HasBlob checks if a specific artifact exists in the repo.

type Action

Action is the result of processing a single event via Tick.

type Action struct {
    Type   ActionType
    Result *sync.SyncResult // non-nil when Type == ActionSynced
    Err    error            // non-nil on sync failure
}

type ActionType

ActionType classifies the result of processing an event.

type ActionType int

const (
    ActionNone   ActionType = iota
    ActionSynced            // a sync round was executed
)

type DefaultNode

DefaultNode is a simple Node that calls sync.Sync directly. No NATS, no HTTP — just a repo + transport.

type DefaultNode struct {
    // contains filtered or unexported fields
}

func NewDefaultNode

func NewDefaultNode(r *repo.Repo, t sync.Transport, projectCode, serverCode string, opts DefaultNodeOpts) *DefaultNode

NewDefaultNode creates a Node backed by sync.Sync.

func (*DefaultNode) Repo

func (n *DefaultNode) Repo() *repo.Repo

Repo returns the node’s Fossil repository.

func (*DefaultNode) Tick

func (n *DefaultNode) Tick(ctx context.Context, event EventType) Action

Tick executes a sync cycle for the given event.

type DefaultNodeOpts

DefaultNodeOpts configures a DefaultNode.

type DefaultNodeOpts struct {
    Push       bool
    Pull       bool
    UV         bool
    XTableSync bool
    Private    bool
    Buggify    sync.BuggifyChecker
}

type Event

Event is a scheduled occurrence in the simulation.

type Event struct {
    Time    time.Time
    Type    EventType
    NodeID  NodeID
    UVName  string // for EvUVWrite/EvUVDelete: file name
    UVData  []byte // for EvUVWrite: file content
    UVMTime int64  // for EvUVWrite/EvUVDelete: mtime
}

type EventQueue

EventQueue is a min-heap of events ordered by time.

type EventQueue []*Event

func (EventQueue) Len

func (q EventQueue) Len() int

func (EventQueue) Less

func (q EventQueue) Less(i, j int) bool

func (*EventQueue) Pop

func (q *EventQueue) Pop() any

func (*EventQueue) PopEvent

func (q *EventQueue) PopEvent() *Event

PopEvent removes and returns the earliest event.

func (*EventQueue) Push

func (q *EventQueue) Push(x any)

func (*EventQueue) PushEvent

func (q *EventQueue) PushEvent(e *Event)

PushEvent adds an event to the queue.

func (EventQueue) Swap

func (q EventQueue) Swap(i, j int)

type EventType

EventType classifies simulation events.

type EventType int

const (
    EvTimer    EventType = iota // leaf poll timer fired
    EvSyncNow                   // manual sync trigger for a leaf
    EvUVWrite                   // write a UV file to a node's repo
    EvUVDelete                  // delete a UV file from a node's repo
)

type InvariantError

InvariantError records which invariant failed, on which node, and why.

type InvariantError struct {
    Invariant string
    NodeID    string // "master" or leaf ID
    Detail    string
}

func (*InvariantError) Error

func (e *InvariantError) Error() string

type MockFossil

MockFossil simulates a Fossil master server using the real HandleSync handler. It manages its own repo and implements sync.Transport by dispatching xfer messages to HandleSyncWithOpts.

type MockFossil struct {
    // contains filtered or unexported fields
}

func NewMockFossil

func NewMockFossil(r *repo.Repo) *MockFossil

NewMockFossil creates a MockFossil backed by the given repo.

func (*MockFossil) Exchange

func (f *MockFossil) Exchange(ctx context.Context, req *xfer.Message) (*xfer.Message, error)

Exchange handles one xfer request/response round by delegating to the real HandleSyncWithOpts. This ensures the DST tests exercise the same code path as production servers.

func (*MockFossil) Repo

func (f *MockFossil) Repo() *repo.Repo

Repo returns the mock fossil’s repository (for seeding and invariants).

func (*MockFossil) SetBuggify

func (f *MockFossil) SetBuggify(b libsync.BuggifyChecker)

SetBuggify configures fault injection for the handler.

func (*MockFossil) StoreArtifact

func (f *MockFossil) StoreArtifact(data []byte) (string, error)

StoreArtifact adds a raw artifact to the mock fossil’s repo. Returns the UUID. Used by tests to seed the master with content.

type Node

Node represents a sync participant in the simulation. The simulator drives events by calling Tick; the Node executes a sync cycle and reports the result. EdgeSync can implement this by wrapping its agent.Agent; the DST provides a DefaultNode for in-module testing.

type Node interface {
    Tick(ctx context.Context, event EventType) Action
    Repo() *repo.Repo
}

type NodeID

NodeID identifies a node in the simulation.

type NodeID string

type PeerNetwork

PeerNetwork simulates a peer-to-peer network where each leaf’s Exchange calls HandleSync on the designated peer’s repo. No bridge, no central server — pure leaf-to-leaf sync under DST control.

type PeerNetwork struct {
    // contains filtered or unexported fields
}

func NewPeerNetwork

func NewPeerNetwork(rng *rand.Rand) *PeerNetwork

NewPeerNetwork creates a simulated peer network.

func (*PeerNetwork) AddPeer

func (n *PeerNetwork) AddPeer(id NodeID, r *repo.Repo)

AddPeer registers a leaf’s repo as a sync target for other leaves.

func (*PeerNetwork) Heal

func (n *PeerNetwork) Heal(id NodeID)

Heal removes a node from the partition set.

func (*PeerNetwork) HealAll

func (n *PeerNetwork) HealAll()

HealAll removes all partitions.

func (*PeerNetwork) Partition

func (n *PeerNetwork) Partition(id NodeID)

Partition isolates a node — all messages to/from it are dropped.

func (*PeerNetwork) SetBuggify

func (n *PeerNetwork) SetBuggify(b libsync.BuggifyChecker)

SetBuggify configures fault injection for the handler.

func (*PeerNetwork) SetDropRate

func (n *PeerNetwork) SetDropRate(rate float64)

SetDropRate sets the probability that any message is dropped.

func (*PeerNetwork) Transport

func (n *PeerNetwork) Transport(source, target NodeID) *PeerTransport

Transport returns a sync.Transport for the given source node that routes to a specific target peer’s HandleSync.

type PeerTransport

PeerTransport implements sync.Transport by routing through the PeerNetwork to a specific target peer.

type PeerTransport struct {
    // contains filtered or unexported fields
}

func (*PeerTransport) Exchange

func (t *PeerTransport) Exchange(ctx context.Context, req *xfer.Message) (*xfer.Message, error)

Exchange sends a request through the peer network to the target’s HandleSync.

type SeededBuggify

SeededBuggify implements sync.BuggifyChecker with a deterministic PRNG.

type SeededBuggify struct {
    // contains filtered or unexported fields
}

func (*SeededBuggify) Check

func (b *SeededBuggify) Check(_ string, probability float64) bool

Check returns true with the given probability, using the seeded PRNG.

type SimConfig

SimConfig configures a simulation run.

type SimConfig struct {
    Seed                int64
    NumLeaves           int
    PollInterval        time.Duration
    TmpDir              string            // directory for repo files
    Upstream            libsync.Transport // mock Fossil master
    Buggify             bool              // enable BUGGIFY fault injection
    UV                  bool              // sync unversioned files
    Private             bool              // sync private artifacts
    SafetyCheckInterval int               // run CheckSafety() every N steps; 0 = disabled
}

type SimNetwork

SimNetwork simulates the network between leaf agents and the upstream transport (mock Fossil master). It supports message dropping, response truncation, and network partitions, controlled by a seeded PRNG for deterministic behavior.

In the original EdgeSync DST, messages were routed through a Bridge that forwarded them to the upstream. Because Bridge.HandleRequest was just a call to upstream.Exchange, SimNetwork now calls the upstream directly.

type SimNetwork struct {
    // contains filtered or unexported fields
}

func NewSimNetwork

func NewSimNetwork(rng *rand.Rand, upstream libsync.Transport) *SimNetwork

NewSimNetwork creates a simulated network connected to the given upstream transport.

func (*SimNetwork) Heal

func (n *SimNetwork) Heal(id NodeID)

Heal removes a node from the partition set.

func (*SimNetwork) HealAll

func (n *SimNetwork) HealAll()

HealAll removes all partitions.

func (*SimNetwork) Partition

func (n *SimNetwork) Partition(id NodeID)

Partition isolates a node — all messages to/from it are dropped.

func (*SimNetwork) SetDropRate

func (n *SimNetwork) SetDropRate(rate float64)

SetDropRate sets the probability that any message is dropped entirely.

func (*SimNetwork) SetTruncateRate

func (n *SimNetwork) SetTruncateRate(rate float64)

SetTruncateRate sets the probability that a response is truncated (random suffix of cards dropped). Simulates partial delivery.

func (*SimNetwork) Transport

func (n *SimNetwork) Transport(nodeID NodeID) *SimTransport

Transport returns a sync.Transport for the given node that routes messages through this simulated network to the upstream.

type SimTransport

SimTransport implements sync.Transport by routing through the SimNetwork.

type SimTransport struct {
    // contains filtered or unexported fields
}

func (*SimTransport) Exchange

func (t *SimTransport) Exchange(ctx context.Context, req *xfer.Message) (*xfer.Message, error)

Exchange sends a request through the simulated network to the upstream.

type Simulator

Simulator drives a deterministic simulation of multiple leaf nodes syncing through a simulated network to an upstream transport (mock Fossil master).

type Simulator struct {
    Seed int64

    // Stats
    Steps         int
    TotalSyncs    int
    TotalErrors   int
    TotalUVSent   int
    TotalUVRecvd  int
    TotalUVGimmes int
    // contains filtered or unexported fields
}

func New

func New(cfg SimConfig) (*Simulator, error)

New creates a Simulator with the given configuration. It creates leaf repos, nodes, and the simulated network. All I/O is local SQLite — no NATS or HTTP connections are made.

func (*Simulator) CheckAllConverged

func (s *Simulator) CheckAllConverged(master *repo.Repo) error

CheckAllConverged checks convergence between master and all leaves. Only meaningful after a fault-free sync period.

func (*Simulator) CheckAllTableSyncConverged

func (s *Simulator) CheckAllTableSyncConverged() error

CheckAllTableSyncConverged checks table sync convergence across all nodes.

func (*Simulator) CheckAllUVConverged

func (s *Simulator) CheckAllUVConverged(master *repo.Repo) error

CheckAllUVConverged checks UV convergence between master and all leaves.

func (*Simulator) CheckSafety

func (s *Simulator) CheckSafety() error

CheckSafety runs all safety invariants on every node in the simulation.

func (*Simulator) CheckTombstoneConvergence

func (s *Simulator) CheckTombstoneConvergence(t *testing.T, masterRepo *repo.Repo, tableName string, def repo.TableDef)

CheckTombstoneConvergence verifies that all nodes agree on which rows are tombstones for each synced table. Two nodes may have different catalog hashes during convergence, but once converged, their tombstone sets must match.

func (*Simulator) Clock

func (s *Simulator) Clock() *simio.SimClock

Clock returns the simulator’s virtual clock.

func (*Simulator) Close

func (s *Simulator) Close() error

Close cleans up all leaf node repos and disables buggify. It iterates over leafIDs (rather than the map) so that error reporting is deterministic.

func (*Simulator) Leaf

func (s *Simulator) Leaf(id NodeID) Node

Leaf returns the node for the given node ID.

func (*Simulator) LeafIDs

func (s *Simulator) LeafIDs() []NodeID

LeafIDs returns the ordered list of leaf node IDs.

func (*Simulator) Network

func (s *Simulator) Network() *SimNetwork

Network returns the simulated network for fault injection.

func (*Simulator) Run

func (s *Simulator) Run(maxSteps int) error

Run processes up to maxSteps events. Returns nil on success or the first invariant/error encountered.

func (*Simulator) RunUntil

func (s *Simulator) RunUntil(deadline time.Time) error

RunUntil processes events until the clock reaches the given time.

func (*Simulator) ScheduleSyncNow

func (s *Simulator) ScheduleSyncNow(id NodeID)

ScheduleSyncNow injects a SyncNow event for the given leaf at the current time.

func (*Simulator) ScheduleUVDelete

func (s *Simulator) ScheduleUVDelete(id NodeID, at time.Time, name string, mtime int64)

ScheduleUVDelete injects a UV delete event for the given node at the specified time.

func (*Simulator) ScheduleUVWrite

func (s *Simulator) ScheduleUVWrite(id NodeID, at time.Time, name string, data []byte, mtime int64)

ScheduleUVWrite injects a UV write event for the given node at the specified time.

func (*Simulator) SetMasterRepo

func (s *Simulator) SetMasterRepo(r *repo.Repo)

SetMasterRepo registers the master repo for UV events targeting “master”.

func (*Simulator) Step

func (s *Simulator) Step() (bool, error)

Step processes the next event in the queue. Returns false if the queue is empty.

Generated by gomarkdoc