Nomos Framework

Nomos is a custom event-sourcing framework purpose-built for this application. It combines deterministic replay, optimistic local-first collaboration, semantic conflict resolution, and full compile-time type safety without reflection.

Nomos is a local-first, intent-driven event-sourcing system that:

  • Enables offline-first collaboration with deterministic server-side validation
  • Uses a single-ledger model per timeline with append-only intents
  • Supports branching timelines for drafts, proposals, and conflict resolution
  • Maintains full audit trails with reversible operations (undo/redo)
  • Achieves zero reflection with compile-time registry systems
  • Ensures deterministic replay for testing, validation, and auditability

The framework decomposes user actions into a three-tier pipeline:

Intent (what user wants) → Directives (how to apply changes) → Events (immutable facts)

An Intent is a user-facing command that expresses what the user wants to do. It’s the unit of rollback and unit of audit.

Key characteristics:

  • User-initiated action (can be domain-specific)
  • Atomic boundary - all or nothing execution
  • Assigned a unique ID automatically (UUIDv7)
  • Includes context: workspace, timeline, actor, timestamps
  • Serializable for persistence and replay

Example:

class CreateTodoIntent extends Intent {
  final String title;

  CreateTodoIntent({required this.title});

  @override
  Map<String, dynamic> toJson() => {
        'id': id,
        'title': title,
        'serialisationTargetClassName': 'CreateTodoIntent',
      };

  static CreateTodoIntent fromJson(Map<String, dynamic> json) {
    return CreateTodoIntent(title: json['title'] as String);
  }

  @override
  NomosIntentDescription describe(NomosIntentDescribeCtx ctx) =>
      NomosIntentDescription(
        template: '{actor} created todo "{title}"',
        variables: {
          'actor': ctx.scope.actorId.value,
          'title': title,
        },
      );
}

Dispatching an Intent:

await app.dispatch(
  CreateTodoIntent(title: 'Buy milk'),
  ctx: IntentContext(
    workspaceId: NomosWorkspaceId('workspace-123'),
    timelineId: NomosTimelineId('main'),
    actorId: NomosActorId('user-456'),
  ),
);

A Directive is an atomic mutation applied to a single aggregate. One intent may produce multiple directives targeting different aggregates.

Key characteristics:

  • Execution is pure and deterministic; reads and other impure work are confined to the plan phase
  • Typed generically: Directive<TAgg, TPayload> where TAgg is the aggregate type and TPayload is the input
  • Two-phase execution:
    • Plan phase (impure): Read snapshots, permissions, features, dependencies
    • Execute phase (pure): Generate immutable events
  • Versioned for safe schema evolution

Structure:

class CreateTodoDirective extends Directive<TodoAggregate, CreateTodoPayload> {
  final CreateTodoPayload payload;

  CreateTodoDirective({required this.payload});

  @override
  int get contextVersion => 1;

  /// Impure phase: determine what to execute and load existing state
  @override
  Future<List<ExecutionTarget<TodoAggregate, CreateTodoPayload>>> plan(
    DirectiveCtx ctx,
    IntentBase intent,
    SnapshotReader snapshots,
    Map<AggregateId, AggregateBase> stagedUpdates,
  ) async {
    // A new todo has no prior state, so start from an empty aggregate
    final aggregate = TodoAggregate.empty();
    return [
      ExecutionTarget<TodoAggregate, CreateTodoPayload>(
        aggregateId: payload.entityId,
        payload: payload,
        snapshot: aggregate,
      ),
    ];
  }

  /// Pure phase: transform snapshot + payload into events
  // Caution: DateTime.now() is used here for brevity, but it makes execute()
  // non-deterministic. For replayable output, take the timestamp from the
  // payload or the intent context instead.
  @override
  List<Event> execute(TodoAggregate snapshot, CreateTodoPayload payload) =>
      [TodoCreatedEvent(title: payload.title, createdAt: DateTime.now())];
}

The Two-Phase Model:

| Phase | Characteristic | Purpose |
| --- | --- | --- |
| Plan | Impure, async | Resolve dependencies, check permissions, load state |
| Execute | Pure, deterministic | Generate events from snapshot + payload |

This separation ensures that:

  • Execution is deterministic and replayable
  • Side effects happen in controlled, auditable places
  • The same intent can be replayed to verify server-side validity
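
One way to picture the replay guarantee: because the execute phase depends only on its inputs, a validator can re-run it and compare the output with the recorded events. The sketch below is a minimal illustration with stand-in types. `Snapshot`, `Payload`, `executeCreate`, and `verifyReplay` are hypothetical names rather than Nomos APIs, and events are modelled as plain strings.

```dart
// Hypothetical stand-ins for illustration; none of these are Nomos types.
class Snapshot {
  final int todoCount;
  const Snapshot(this.todoCount);
}

class Payload {
  final String title;
  final DateTime createdAt; // the timestamp travels in the payload
  const Payload(this.title, this.createdAt);
}

/// Pure execute step: the output depends only on its arguments.
List<String> executeCreate(Snapshot snapshot, Payload payload) => [
      'TodoCreated(#${snapshot.todoCount + 1}, ${payload.title}, '
          '${payload.createdAt.toIso8601String()})',
    ];

/// Re-runs the pure step and compares it with what was recorded.
bool verifyReplay(Snapshot snapshot, Payload payload, List<String> recorded) {
  final replayed = executeCreate(snapshot, payload);
  if (replayed.length != recorded.length) return false;
  for (var i = 0; i < replayed.length; i++) {
    if (replayed[i] != recorded[i]) return false;
  }
  return true;
}
```

The comparison only works if nothing inside the pure step reads the clock, random numbers, or external state, which is why the timestamp is passed in through the payload in this sketch.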

An Event is an immutable fact about what happened when a directive was applied. Events form the audit trail and are used to reconstruct aggregate state.

Key characteristics:

  • Immutable value objects
  • Carry only the data needed to reconstruct state
  • Include sequence numbers for ordering
  • Typed with factory registrations for deserialization

Example:

class TodoCreatedEvent extends Event {
  final String title;
  final DateTime createdAt;

  // Null until a sequence number is set through copyWith()
  @override
  final int? globalSequenceNumber;

  TodoCreatedEvent({
    required this.title,
    required this.createdAt,
    this.globalSequenceNumber,
  });

  @override
  Object get payload => {'title': title, 'createdAt': createdAt};

  // Returns a copy that carries the given sequence number
  @override
  Event copyWith({int? globalSequenceNumber}) => TodoCreatedEvent(
        title: title,
        createdAt: createdAt,
        globalSequenceNumber: globalSequenceNumber ?? this.globalSequenceNumber,
      );

  @override
  Map<String, dynamic> toJson() => {
        'title': title,
        'createdAt': createdAt.toIso8601String(),
        'serialisationTargetClassName': 'TodoCreatedEvent',
      };

  static TodoCreatedEvent fromJson(Map<String, dynamic> json) {
    return TodoCreatedEvent(
      title: json['title'] as String,
      createdAt: DateTime.parse(json['createdAt'] as String),
    );
  }
}

An Aggregate is the root entity that holds state and can apply events. Aggregates are:

  • Typed: class MyAggregate extends Aggregate<MyAggregate>
  • Immutable in style (create new instances via apply())
  • Snapshotted for performance
  • Reconstructed by replaying events from directives

Example:

class TodoAggregate extends Aggregate<TodoAggregate> {
  final String title;
  final bool isCompleted;
  final DateTime createdAt;

  // For creating new aggregates from directives
  TodoAggregate({
    required this.title,
    required this.isCompleted,
    required this.createdAt,
  }) : super();

  // For snapshots during directive plan()
  factory TodoAggregate.empty() => TodoAggregate._forSnapshot(
        title: '',
        isCompleted: false,
        createdAt: DateTime.now(),
      );

  TodoAggregate._forSnapshot({
    required this.title,
    required this.isCompleted,
    required this.createdAt,
  }) : super.forSnapshot();

  // Note: `_withId` (used below) is a private constructor that keeps an
  // existing AggregateId; its definition is omitted from this example.

  /// Apply an event to produce a new aggregate state
  @override
  TodoAggregate apply(Event event) {
    if (event is TodoCreatedEvent) {
      return TodoAggregate._withId(
        id: id,
        title: event.title,
        isCompleted: false,
        createdAt: event.createdAt,
      );
    } else if (event is TodoCompletedEvent) {
      return TodoAggregate._withId(
        id: id,
        title: title,
        isCompleted: true,
        createdAt: createdAt,
      );
    }
    // Unknown events leave the state unchanged
    return this;
  }

  @override
  void validate() {
    // Business rule validation
  }

  @override
  Map<String, dynamic> toJson() => {
        'id': id.value,
        'title': title,
        'isCompleted': isCompleted,
        'createdAt': createdAt.toIso8601String(),
        'serialisationTargetClassName': 'TodoAggregate',
      };

  static TodoAggregate fromJson(Map<String, dynamic> json, AggregateId id) {
    return TodoAggregate._withId(
      id: id,
      title: json['title'] as String,
      isCompleted: json['isCompleted'] as bool,
      createdAt: DateTime.parse(json['createdAt'] as String),
    );
  }
}
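
Rebuilding an aggregate by replay amounts to folding `apply` over its events in order. The following is a minimal, self-contained sketch of that idea. `Todo`, `TodoEvent`, `Created`, `Completed`, and `rebuild` are hypothetical stand-ins, not the Nomos base classes.

```dart
// Hypothetical stand-ins; the real Aggregate and Event base classes differ.
abstract class TodoEvent {}

class Created extends TodoEvent {
  final String title;
  Created(this.title);
}

class Completed extends TodoEvent {}

class Todo {
  final String title;
  final bool isCompleted;
  const Todo({this.title = '', this.isCompleted = false});

  /// Returns a new state; the receiver is never mutated.
  Todo apply(TodoEvent event) {
    if (event is Created) return Todo(title: event.title);
    if (event is Completed) return Todo(title: title, isCompleted: true);
    return this; // unknown events leave the state unchanged
  }
}

/// Replays events in order, starting from an empty aggregate.
Todo rebuild(Iterable<TodoEvent> events) =>
    events.fold(const Todo(), (state, event) => state.apply(event));
```

Calling `rebuild([Created('Buy milk'), Completed()])` yields a completed todo titled "Buy milk". Because `apply` never mutates, any intermediate state can be kept as a snapshot without copying.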

Nomos uses a workspace/timeline model for isolation and branching:

A workspace is a multi-tenant boundary. Each workspace has its own isolated ledger and snapshots.

  • Enables complete data isolation between tenants
  • Each workspace has independent timelines
  • Can represent: organizations, teams, projects, or any isolation boundary

A timeline is a branch of the ledger within a workspace, along which state can diverge from other timelines. Think of it as a version of the ledger.

  • Main timeline: The canonical, always-available timeline
  • Alternate timelines: Created during conflicts or for drafts/proposals
  • Siblings: Timelines can be merged or discarded
  • Snapshots: Cached state at each intent sequence for fast reads

In the ledger:

workspaces/{workspaceId}/
  timelines/{timelineId}/
    intents/{sequence}        # append-only command history
    aggregates/{id}/
      snapshots/{sequence}    # cached state for fast reads

Every intent requires a context that specifies where and who:

final ctx = IntentContext(
  workspaceId: NomosWorkspaceId('my-workspace'),
  timelineId: NomosTimelineId('main'),
  actorId: NomosActorId('user-123'),
  // Optional: trace request chains
  correlationId: NomosCorrelationId('request-abc'),
  causationId: NomosCausationId('intent-xyz'),
  // Defaults to now if not specified
  nomosClientTimestamp: DateTime.now(),
);

// Set a default context to avoid repeating it
app.setDefaultIntentContext(ctx);

// Then dispatch without ctx parameter
await app.dispatch(CreateTodoIntent(title: 'Task'));

The normal flow: dispatch an intent, wait for it to be written to the ledger, then return the result.

final entry = await app.dispatch(
  CreateTodoIntent(title: 'Buy milk'),
  ctx: IntentContext(
    workspaceId: NomosWorkspaceId('ws-1'),
    timelineId: NomosTimelineId('main'),
    actorId: NomosActorId('user-1'),
  ),
);

// entry.success tells you if the intent succeeded
// entry.intent.id is the intent ID for tracing
// entry.workspaceIntentSequence is the sequence number
print('Sequence: ${entry.workspaceIntentSequence}');

Retry Logic:

  • If a dispatch fails due to a concurrency conflict (LedgerWriteConflict), Nomos automatically retries it up to 10 times
  • Conflicts are typically short-lived: another client wrote first, so each retry runs after a 1ms delay
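
The retry behaviour described above can be sketched as a small helper. This is an illustration only: `WriteConflict` and `withConflictRetry` are hypothetical names, and the sketch assumes the limit of 10 counts total attempts, which the text does not specify.

```dart
// Hypothetical sketch: `WriteConflict` and `withConflictRetry` are
// illustrative names, not part of the Nomos API.
class WriteConflict implements Exception {}

/// Runs [write], retrying on [WriteConflict] until [maxAttempts] attempts
/// have been made, with a fixed [delay] between attempts.
Future<T> withConflictRetry<T>(
  Future<T> Function() write, {
  int maxAttempts = 10,
  Duration delay = const Duration(milliseconds: 1),
}) async {
  for (var attempt = 1;; attempt++) {
    try {
      return await write();
    } on WriteConflict {
      if (attempt >= maxAttempts) rethrow; // give up after the last attempt
      await Future<void>.delayed(delay);
    }
  }
}
```

Only conflict errors are retried here; any other exception propagates immediately, so genuine failures are not masked by the loop.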

For responsive UIs, dispatch optimistically while the actual write happens in the background:

final optimistic = await app.dispatchOptimistic(
  CreateTodoIntent(title: 'Buy milk'),
  ctx: IntentContext(...),
);

// The optimistic result contains what WOULD happen
// plus the new aggregate state ready to display
print('Optimistic aggregates: ${optimistic.aggregates}');

// The framework automatically buffers this state for queries
// until the real ledger write confirms it

Use case: Mobile and offline clients that need instant UI feedback.
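
To make the buffering idea concrete, here is a minimal sketch of an overlay that sits on top of confirmed state until a write is confirmed or rejected. `OptimisticBuffer` and its methods are hypothetical, and aggregates are modelled as plain strings keyed by ID; the framework's actual buffer may work differently.

```dart
// Hypothetical sketch: `OptimisticBuffer` is an illustrative name, and
// aggregates are modelled as plain strings keyed by ID.
class OptimisticBuffer {
  final Map<String, String> _confirmed = {};
  final Map<String, Map<String, String>> _pending = {}; // intentId -> overlay

  /// Records the predicted aggregate states for an in-flight intent.
  void stage(String intentId, Map<String, String> predicted) =>
      _pending[intentId] = Map.of(predicted);

  /// Called once the ledger write lands: adopt the confirmed state and
  /// drop the overlay for that intent.
  void confirm(String intentId, Map<String, String> actual) {
    _pending.remove(intentId);
    _confirmed.addAll(actual);
  }

  /// Called when the write fails: discard the prediction.
  void reject(String intentId) => _pending.remove(intentId);

  /// What queries see: confirmed state with pending overlays on top,
  /// applied in staging order.
  Map<String, String> view() {
    final merged = Map.of(_confirmed);
    for (final overlay in _pending.values) {
      merged.addAll(overlay);
    }
    return merged;
  }
}
```

Keeping one overlay per intent means a rejected write can be rolled back without disturbing predictions from other in-flight intents.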

If you’ve already computed the optimistic result, pass it to avoid re-computing:

final optimisticAgg = TodoAggregate(
  title: 'Buy milk',
  isCompleted: false,
  createdAt: DateTime.now(),
);
await app.dispatch(
  CreateTodoIntent(title: 'Buy milk'),
  ctx: context,
  optimisticAggregates: {
    todoId: optimisticAgg,
  },
);

Read the latest state of a single aggregate:

final todo = await app.readLatest<TodoAggregate>(
  NomosWorkspaceId('ws-1'),
  NomosTimelineId('main'),
  AggregateId('todo-123'),
);
if (todo != null) {
  print('Title: ${todo.title}');
}

Reconstruct the entire state as of a specific intent sequence number:

final state = await app.stateAtSequence(
  NomosWorkspaceId('ws-1'),
  NomosTimelineId('main'),
  inclusiveSequence: 42,
  useSnapshots: true, // prefer cached snapshots
);
for (final agg in state.values) {
  print('Aggregate: ${agg.id}');
}
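
As a rough illustration of why snapshots speed this up: state at a sequence can be rebuilt from the nearest earlier snapshot plus the entries after it, instead of replaying from the start. The sketch below models state as a single integer. `stateAt` and its parameters are hypothetical and only mirror the `inclusiveSequence` / `useSnapshots` idea.

```dart
// Hypothetical sketch: state is a simple counter and each ledger entry adds
// a delta; `stateAt` and its parameters are illustrative, not Nomos APIs.

/// Rebuilds state as of [sequence] (inclusive). [deltas] maps sequence
/// numbers (1-based, contiguous) to changes; [snapshots] maps a sequence to
/// the cached state after that entry was applied.
int stateAt(
  int sequence,
  Map<int, int> deltas,
  Map<int, int> snapshots, {
  bool useSnapshots = true,
}) {
  var start = 0; // replay begins after this sequence
  var state = 0; // empty state before the first entry
  if (useSnapshots) {
    for (final entry in snapshots.entries) {
      // Pick the newest snapshot that is not past the target
      if (entry.key <= sequence && entry.key > start) {
        start = entry.key;
        state = entry.value;
      }
    }
  }
  for (var seq = start + 1; seq <= sequence; seq++) {
    state += deltas[seq] ?? 0;
  }
  return state;
}
```

With or without snapshots the result is the same; the snapshot only shortens the replay, which is a useful property to assert in tests.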

Watch a collection of aggregates with filtering and ordering:

final stream = app.watchQuery<TodoAggregate>(
  NomosWorkspaceId('ws-1'),
  NomosTimelineId('main'),
  BasicQuery(
    limit: 50,
    descending: true,
    orderByField: 'createdAt',
  ),
);
stream.listen((todos) {
  print('Got ${todos.length} todos');
  for (final todo in todos) {
    print('- ${todo.title} (${todo.isCompleted ? 'done' : 'pending'})');
  }
});

Features:

  • Debounced emissions (16ms) to prevent UI thrashing
  • Merges optimistic changes with confirmed state
  • Seeded with latest snapshot for fast first paint
  • Live updates via watchers if configured

Watch a timeline's ledger entries as they are appended, starting from a given sequence:

app.watchTimeline(
  workspaceId: NomosWorkspaceId('ws-1'),
  timelineId: NomosTimelineId('main'),
  startingWorkspaceIntentSequence: 0,
).listen((entry) {
  print('Intent: ${entry.intent.runtimeType} by ${entry.actorId}');
  print('Success: ${entry.success}');
  print('Sequence: ${entry.workspaceIntentSequence}');
});

Nomos supports reversible operations through system intents:

final undoEntry = await app.undoLatest(
  workspaceId: NomosWorkspaceId('ws-1'),
  timelineId: NomosTimelineId('main'),
  actorId: NomosActorId('user-1'), // optional: undo only user's intents
);
if (undoEntry != null) {
  print('Undone intent: ${undoEntry.strikeOutIntentIds}');
}

How it works:

  1. Creates a system UndoIntent that “strikes out” the target
  2. Struck-out intents are skipped during replay (no effect)
  3. Snapshots are recomputed to reflect the new state
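
The strike-out mechanism in the steps above can be sketched as a replay that skips struck intents while leaving the ledger untouched. This is a simplified model: `LedgerEntry` and `replay` are hypothetical, state is a single counter, and redo is not modelled.

```dart
// Hypothetical sketch: `LedgerEntry` and `replay` are illustrative names.
class LedgerEntry {
  final String intentId;
  final int delta; // effect of a normal intent on a counter state
  final Set<String> strikeOutIntentIds; // non-empty for undo entries
  const LedgerEntry(this.intentId,
      {this.delta = 0, this.strikeOutIntentIds = const {}});
}

/// Replays the ledger, skipping every intent that an undo entry struck out.
/// The ledger itself is never rewritten.
int replay(List<LedgerEntry> ledger) {
  final struck = <String>{
    for (final entry in ledger) ...entry.strikeOutIntentIds,
  };
  var state = 0;
  for (final entry in ledger) {
    if (struck.contains(entry.intentId)) continue; // no effect on replay
    state += entry.delta;
  }
  return state;
}
```

Because the undo is itself an appended entry, the audit trail still shows both the original intent and the decision to reverse it.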

Check whether an undo is available before offering it in the UI:

if (await app.canUndoLatest(
  workspaceId: NomosWorkspaceId('ws-1'),
  timelineId: NomosTimelineId('main'),
)) {
  // Show undo button
}

Redo works the same way:

final redoEntry = await app.redoLatest(
  workspaceId: NomosWorkspaceId('ws-1'),
  timelineId: NomosTimelineId('main'),
);

Configure a NomosApp by wiring together persistence, the policy router, and the type modules:

final app = NomosApp(
  // Persistence backend
  persistence: MemoryPersistence(), // or FirestorePersistence, etc.
  // Policy router (intent → directives resolution)
  policy: MyPolicyV1(),
  // Domain, contract, and intent modules
  domains: [MyDomainV1()],
  contracts: [MyContractsV1()],
  intents: [MyIntentsV1()],
  // Optional features
  options: NomosOptions(
    enableBlobOffload: true, // offload large payloads
    blobOffloadThresholdBytes: 65536, // threshold in bytes
    enableResilientSnapshots: true, // retry on snapshot read failure
    isServerRuntime: false, // set true for server validators
  ),
  blobStorage: MyBlobStorage(), // for large file support
);

Each module registers its types:

class MyIntentsV1 extends IntentsModule {
  @override
  void registerIntents() {
    IntentFactoryRegistry.register<CreateTodoIntent>(
      (json) => CreateTodoIntent.fromJson(json),
    );
    TypeRegistry.register<CreateTodoIntent>();
  }
}

class MyDomainV1 extends DomainModule {
  @override
  void registerDomainTypes() {
    // Aggregates
    AggregateFactoryRegistry.registerFromJson<TodoAggregate>(
      (json, id) => TodoAggregate.fromJson(json, id),
    );
    // Directives
    DirectiveFactoryRegistry.register<CreateTodoDirective>(
      (json) => CreateTodoDirective.fromJson(json),
    );
    // Events
    EventFactoryRegistry.register<TodoCreatedEvent>(
      (json) => TodoCreatedEvent.fromJson(json),
    );
    TypeRegistry.register<TodoAggregate>();
    TypeRegistry.register<TodoCreatedEvent>();
    TypeRegistry.register<CreateTodoDirective>();
  }
}

class MyContractsV1 extends ContractsModule {
  @override
  void registerContracts() {
    PayloadFactoryRegistry.register<CreateTodoPayload>(
      (json) => CreateTodoPayload.fromJson(json),
    );
    TypeRegistry.register<CreateTodoPayload>();
  }
}

Here’s the complete journey of an intent:

1. User Action
2. Create Intent (assigned UUIDv7 ID)
   └─ e.g., CreateTodoIntent(title: "Buy milk")
3. Dispatch to NomosApp
   └─ ctx: IntentContext(workspace, timeline, actor)
4. Policy Router Resolves Intent → Directives
   └─ Determines what aggregate changes are needed
5. For Each Directive:
   ├─ PLAN PHASE (Impure)
   │  ├─ Load snapshots from SnapshotReader
   │  ├─ Check permissions, features, dependencies
   │  └─ Return ExecutionTargets (aggregate ID + payload + snapshot)
   └─ EXECUTE PHASE (Pure)
      ├─ Apply directive.execute(snapshot, payload)
      ├─ Receive list of Events
      └─ No side effects, deterministic
6. Persist to Ledger (Append-Only)
   ├─ Write IntentLedgerEntry to ledger
   ├─ Contains: intent, directives, events, metadata
   └─ Assigned workspaceIntentSequence
7. Generate Snapshots
   ├─ Write via SnapshotWriter.saveAtIntent()
   ├─ Cache state for fast reads
   └─ Mark with sequence number
8. Return IntentLedgerEntry to Caller
   └─ Success, sequence, audit trail ready

Intents can be marked for server-only execution:

class ValidatePaymentIntent extends Intent {
  final String orderId;

  @override
  bool get requiresServerExecution => true;

  // ... rest of intent implementation
}

On the client:

  • If no ServerIntentTransport is configured, throws an error
  • If configured, sends intent to server for execution

On the server:

  • Set NomosOptions(isServerRuntime: true) to execute locally
  • Can re-validate deterministically using same domain models
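
The client and server rules above boil down to a small routing decision. The sketch below spells it out with hypothetical names (`Route`, `route`, and the three flags); it mirrors the lists above and is not the framework's dispatch code.

```dart
// Hypothetical sketch: `route`, `Route`, and the parameters below are
// illustrative; they mirror the rules in the lists above, not the Nomos API.
enum Route { executeLocally, sendToServer }

/// Decides where an intent runs. Throws if a server-only intent is
/// dispatched on a client that has no transport configured.
Route route({
  required bool requiresServerExecution,
  required bool isServerRuntime,
  required bool hasServerTransport,
}) {
  if (!requiresServerExecution || isServerRuntime) {
    return Route.executeLocally;
  }
  if (!hasServerTransport) {
    throw StateError('Intent requires server execution, '
        'but no server transport is configured');
  }
  return Route.sendToServer;
}
```

Failing loudly when no transport is configured is preferable to silently running a server-only intent on the client, where its result could not be trusted.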

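For payloads larger than a threshold, upload the data as a blob first and reference it from the intent:

// Upload a blob first
final blobRef = await app.blob?.uploadBlob(
  bytes: pdfData,
  contentType: 'application/pdf',
);
// app.blob may be null (for example, when no blob storage is configured)
if (blobRef == null) {
  throw StateError('Blob upload is unavailable');
}

// Reference it in your intent
await app.dispatch(
  AttachmentIntent(
    blobRef: blobRef,
    fileName: 'contract.pdf',
  ),
);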

How it works:

  • Blobs stored separately (GCS, S3, etc.)
  • Ledger entry keeps content hash/URI
  • Lazy deserialization when needed
  • Keeps ledger lightweight
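
A minimal sketch of the inline-versus-offload decision is shown below. `encodeAttachment`, `putBlob`, and the map keys are hypothetical and do not reflect the Nomos wire format; the default threshold simply reuses the 65536-byte value from the configuration example, and treating the threshold as inclusive is an assumption.

```dart
import 'dart:convert';
import 'dart:typed_data';

// Hypothetical sketch: `encodeAttachment`, `putBlob`, and the map keys are
// illustrative, not the Nomos wire format.

/// Returns the ledger representation of [bytes]: inline (base64) when small,
/// otherwise a reference produced by [putBlob], which stores the bytes
/// elsewhere and returns a URI.
Map<String, Object> encodeAttachment(
  Uint8List bytes,
  String Function(Uint8List) putBlob, {
  int thresholdBytes = 65536,
}) {
  if (bytes.length <= thresholdBytes) {
    return {'inline': base64Encode(bytes)};
  }
  return {'blobUri': putBlob(bytes), 'sizeBytes': bytes.length};
}
```

Either way the ledger entry stays small and self-describing, and readers only fetch the blob when they actually need its contents.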

When two clients write to the same timeline simultaneously:

  1. First writer wins: the first write gets the sequence number; the second receives a LedgerWriteConflict
  2. Automatic retry: Nomos retries up to 10 times with a 1ms backoff
  3. Manual merge: if conflicts persist, consider creating an alternate timeline:
    // Client A and B both tried to update same aggregate
    // Create a draft timeline to merge changes
    final draftTimeline = NomosTimelineId('draft-merge-${DateTime.now().millisecondsSinceEpoch}');
    // Apply B's changes to draft
    await app.dispatch(BIntent(...), ctx: context.copyWith(timelineId: draftTimeline));
    // Later, manually merge or promote draft to main
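
The "first writer wins" rule is a form of optimistic concurrency: a write succeeds only if the ledger tail is still where the writer last saw it. The sketch below illustrates this with an in-memory list. `InMemoryLedger` and `ConflictError` are hypothetical and stand in for whatever the persistence backend does.

```dart
// Hypothetical sketch: `InMemoryLedger` and `ConflictError` are illustrative.
class ConflictError implements Exception {
  final int expected;
  final int actual;
  ConflictError(this.expected, this.actual);
}

class InMemoryLedger {
  final List<String> _entries = [];

  /// Sequence number of the most recent entry (0 when empty).
  int get lastSequence => _entries.length;

  /// Appends [entry] only if the caller saw the current tail of the ledger.
  /// Returns the sequence number assigned to the new entry.
  int append(String entry, {required int expectedLastSequence}) {
    if (expectedLastSequence != lastSequence) {
      throw ConflictError(expectedLastSequence, lastSequence);
    }
    _entries.add(entry);
    return lastSequence;
  }
}
```

If two clients both read `lastSequence == 0` and then append, the first call returns 1 and the second throws; after re-reading the tail, the second client's retry succeeds with sequence 2.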

One intent can modify multiple aggregates:

class AssignSiteToUserIntent extends Intent {
  final AggregateId siteId;
  final AggregateId userId;

  @override
  Map<String, dynamic> toJson() => {
        'id': id,
        'siteId': siteId.value,
        'userId': userId.value,
        'serialisationTargetClassName': 'AssignSiteToUserIntent',
      };

  // ... fromJson, describe
}

// The policy router produces multiple directives:
class MyPolicyV1 extends PolicyRouter {
  @override
  Future<List<DirectiveBase>> resolve(
    IntentBase intent,
    DirectiveCtx ctx,
  ) async {
    if (intent is AssignSiteToUserIntent) {
      return [
        UpdateSiteDirective(payload: ...),
        UpdateUserDirective(payload: ...),
      ];
    }
    // ...
  }
}

Both directives are planned and executed, updating both aggregates atomically within the intent.
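
One way to get all-or-nothing behaviour across several aggregates is to apply every directive to a staged copy and commit only when all of them succeed. The sketch below shows that pattern in isolation. `applyAtomically` and `StagedDirective` are hypothetical, and aggregates are reduced to integers keyed by ID.

```dart
// Hypothetical sketch: aggregates are integers keyed by ID and a "directive"
// is any function that edits the staged map; none of this is Nomos API.
typedef StagedDirective = void Function(Map<String, int> staged);

/// Applies every directive to a staged copy and commits only if all succeed.
/// Returns true on commit; on any failure [store] is left untouched.
bool applyAtomically(
  Map<String, int> store,
  List<StagedDirective> directives,
) {
  final staged = Map.of(store);
  try {
    for (final directive in directives) {
      directive(staged); // later directives see earlier staged updates
    }
  } catch (_) {
    return false; // discard the staged copy
  }
  store
    ..clear()
    ..addAll(staged);
  return true;
}
```

If the second directive throws, the first directive's change is discarded along with the staged copy, so readers never observe a half-applied intent.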

Reconstruct state at any point in the past:

// What was the state 50 intents ago?
final historicalState = await app.stateAtSequence(
  workspace,
  timeline,
  inclusiveSequence: currentSequence - 50,
);

// What intents touched this aggregate?
final history = await app.aggregateIntentHistory(
  workspaceId: workspace,
  timelineId: timeline,
  aggregateId: todoId,
  startingWorkspaceIntentSequence: 0,
  endingWorkspaceIntentSequence: 100,
);
for (final entry in history) {
  print('${entry.workspaceIntentSequence}: ${entry.intent.runtimeType}');
}

Test directives deterministically without the framework:

final payload = CreateTodoPayload(title: 'Test', entityId: todoId);
final directive = CreateTodoDirective(payload: payload);
// execute() is pure, so it can be called directly with a snapshot and payload
final events = directive.execute(TodoAggregate.empty(), payload);
expect(events.length, equals(1));
expect(events[0], isA<TodoCreatedEvent>());

For an end-to-end test, run the whole pipeline against in-memory persistence:

test('creates todo and updates list', () async {
  final app = NomosApp(
    persistence: MemoryPersistence(),
    policy: MyPolicyV1(),
    domains: [MyDomainV1()],
    contracts: [MyContractsV1()],
    intents: [MyIntentsV1()],
  );
  final ctx = IntentContext.fromStrings(
    workspaceId: 'test-ws',
    timelineId: 'main',
    actorId: 'test-user',
  );
  await app.dispatch(CreateTodoIntent(title: 'Buy milk'), ctx: ctx);
  final todos = await app.watchQuery<TodoAggregate>(
    ctx.workspaceId,
    ctx.timelineId,
    null,
  ).first;
  expect(todos.length, equals(1));
  expect(todos[0].title, equals('Buy milk'));
});

Deterministic Testing with Custom ID Generator

final ctx = IntentContext(
  workspaceId: NomosWorkspaceId('test-ws'),
  timelineId: NomosTimelineId('main'),
  actorId: NomosActorId('test-user'),
  idGenerator: () => 'fixed-id-for-test', // always generates same ID
);
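
A constant generator returns the same ID on every call, which may be a problem if a test needs more than one ID. A counter-based generator keeps IDs predictable but distinct. The helper below is a hypothetical sketch; only the `String Function()` shape is taken from the `idGenerator` parameter shown above.

```dart
// Hypothetical helper; only the `String Function()` shape is taken from the
// `idGenerator` parameter shown above.

/// Returns a generator that yields '<prefix>-1', '<prefix>-2', and so on.
String Function() sequentialIds([String prefix = 'test-id']) {
  var next = 0;
  return () => '$prefix-${++next}';
}
```

Under that assumption it could be passed as `idGenerator: sequentialIds()`, giving each generated ID a stable, order-dependent value across test runs.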

Nomos provides:

| Feature | Benefit |
| --- | --- |
| Intent-driven | User intent is the unit of audit and rollback |
| Event-sourced | Complete immutable history of all changes |
| Zero reflection | Compile-time safety with registry patterns |
| Deterministic | Same intent + snapshot = same events (replayable) |
| Offline-first | Dispatch intents locally, sync later |
| Branchable | Workspaces and timelines for isolation and drafts |
| Reversible | Undo/redo via system intents |
| Multi-tenant | Workspace-based isolation |
| Type-safe | Full Dart generic type safety end-to-end |

Related resources:

  • CO2 Client SDK - Client-side SDK
  • Contracts - Shared type definitions
  • Intents Reference - All available intents
  • Nomos Core Package: /dart_packages/nomos/packages/nomos_core
  • Todo Example: /dart_packages/nomos/packages/todo_example