CO2 Client SDK

The CO2 Nomos Client SDK (co2_nomos_client) is a reusable, type-safe client library for accessing CO2 domain operations through the Nomos event-sourcing framework. It provides unified access across multiple consumers: admin app, server, CLI, and other integrations.

The SDK is built on top of Nomos, inheriting its event-sourcing, deterministic replay, and offline-first capabilities while adding CO2-specific domain knowledge and query helpers.

The CO2 Client SDK provides:

  • Type-safe queries: Query estates, sites, and attachments with full type safety
  • Intent dispatch: Send domain commands with auditable context and automatic conflict resolution
  • Workspace isolation: Multi-tenant support with estate-based workspaces
  • Domain modules: Pre-registered access to all 13 CO2 domains (estate structures, assets, licensing, permissions, etc.)
  • Real-time subscriptions: Watch aggregates and react to changes via streams
  • Aggregate lifecycle: Revival of soft-deleted items and intent history tracking
  • Server transport: REST-based transport for executing server-only intents
  • Logging: Pluggable logger for SDK diagnostics

Add to pubspec.yaml:

dependencies:
  co2_nomos_client:
    path: path/to/dart_packages/co2/client/co2_nomos_client

import 'package:co2_nomos_client/co2_nomos_client.dart';
import 'package:nomos_persistence_firestore/nomos_persistence_firestore.dart';

// Create SDK instance with Firestore persistence
final sdk = await Co2NomosApp.create(
  persistence: FirestorePersistence(),
  blobStorage: FirebaseBlobStorage(),
  enableDebugLogging: true,
);

// Optional: write a message through the SDK logger
sdk.log('info', 'SDK initialized');

// Create an estate if it doesn't exist (idempotent)
await sdk.estates.ensure(
  estateId: EstateId('estate-123'),
  name: 'Headquarters Campus',
  description: 'Main office building',
  actorId: 'user-456',
);

// Stream updates to an estate
sdk.estates.watch(EstateId('estate-123')).listen((estate) {
  print('Estate: ${estate.name}');
  print('Description: ${estate.description}');
});

// Get all sites in an estate as a stream
sdk.sites.watchAll(EstateId('estate-123')).listen((sites) {
  print('Found ${sites.length} sites');
  for (final site in sites) {
    print('- ${site.siteId}: ${site.displayName}');
  }
});

import 'package:intents_v1/intents_v1.dart';
import 'package:contracts_v1/contracts_v1.dart';
import 'package:nomos_core/nomos_core.dart';

// Create and dispatch an intent
final entry = await sdk.dispatch(
  CreateEstateIntent(
    estateId: EstateId('new-estate'),
    name: EstateName('New Property'),
    description: EstateDescription('A new estate'),
    address: null,
  ),
  ctx: IntentContext(
    workspaceId: EstateId('new-estate').toWorkspace(),
    timelineId: Co2Workspace.defaultTimeline,
    actorId: NomosConversionUtils.userIdToActorId(UserId('user-123')),
  ),
);
print('Intent sequence: ${entry.workspaceIntentSequence}');
print('Success: ${entry.success}');

Co2NomosApp is the main entry point for SDK functionality. Create an instance with the static create factory:

static Future<Co2NomosApp> create({
  required NomosPersistence persistence,
  required BlobStorage blobStorage,
  ServerIntentTransport? serverTransport,
  Co2Logger? logger,
  bool enableDebugLogging = false,
  bool isServerRuntime = false,
}) async

Parameters:

  • persistence - The Nomos persistence layer (Firestore, Memory, etc.)
  • blobStorage - Implementation for storing large files and binary data
  • serverTransport - Optional transport for server-only intents (e.g., Co2RestIntentTransport)
  • logger - Custom logger function for SDK diagnostics
  • enableDebugLogging - Enable verbose Nomos framework logging
  • isServerRuntime - Set to true when running on server (enables optimizations)

These are the main entry points for accessing CO2 data:

/// Type-safe estate queries
final EstateQueries estates = sdk.estates;
/// Type-safe site queries
final SiteQueries sites = sdk.sites;
/// Type-safe attachment queries
final AttachmentQueries attachments = sdk.attachments;
/// Aggregate lifecycle commands (revive, history)
final AggregateCommands aggregates = sdk.aggregates;

Dispatch an intent:

Future<IntentLedgerEntry> dispatch(
  IntentBase intent, {
  required IntentContext ctx,
})

Dispatch without waiting (fire-and-forget):

void dispatchAsync(
  IntentBase intent, {
  required IntentContext ctx,
})

Watch a generic query:

Stream<List<T>> watchQuery<T extends Aggregate<T>>({
  required NomosWorkspaceId workspaceId,
  required NomosTimelineId timelineId,
  NomosQuery? query,
})

Read latest state of an aggregate:

Future<T?> readLatest<T extends AggregateBase>({
  required NomosWorkspaceId workspaceId,
  required NomosTimelineId timelineId,
  required AggregateId aggregateId,
})

Get entire workspace state:

Future<Map<AggregateId, AggregateBase>> state(
  NomosWorkspaceId workspaceId,
  NomosTimelineId timelineId, {
  bool useSnapshots = true,
})

Ensure an estate exists (idempotent):

Future<void> ensureEstate({
  required EstateId estateId,
  required String name,
  String? description,
  required String actorId,
})

Clean up resources:

void dispose()

Clear test state (in-memory persistence only):

Future<void> clearTestState()

EstateQueries provides type-safe query operations for estate aggregates.

class EstateQueries {
  /// Watch an estate by ID - emits on every change
  Stream<EstateAggregate> watch(EstateId estateId)

  /// Read current state of an estate
  Future<EstateAggregate?> read(EstateId estateId)

  /// Ensure an estate exists, creating if necessary (idempotent)
  Future<void> ensure({
    required EstateId estateId,
    required String name,
    String? description,
    required String actorId,
  })
}

Example:

// Watch a single estate
sdk.estates.watch(EstateId('estate-1')).listen((estate) {
  print('Estate name: ${estate.name}');
});

// Read estate once
final estate = await sdk.estates.read(EstateId('estate-1'));
if (estate != null) {
  print('Estate found: ${estate.name}');
}
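
The idempotent contract of ensure can be pictured with a small in-memory model. This is only a sketch of the idea, not the SDK's implementation: SketchEstateStore and its fields are hypothetical, and it assumes that a repeated call leaves an existing estate untouched, which the SDK may or may not do for changed names.

```dart
/// Hypothetical in-memory stand-in for an estate store (not the SDK's code).
class SketchEstateStore {
  final Map<String, String> _namesById = {};

  /// Number of times an estate was actually created.
  int createCount = 0;

  /// Creates the estate only when [estateId] is unknown.
  /// Assumption: repeat calls are no-ops and keep the original name.
  Future<void> ensure({required String estateId, required String name}) async {
    if (_namesById.containsKey(estateId)) return;
    _namesById[estateId] = name;
    createCount++;
  }

  /// Returns the stored name, or null when the estate does not exist.
  Future<String?> read(String estateId) async => _namesById[estateId];
}
```

Calling ensure twice with the same ID on this model performs one creation, which is the property that makes it safe to call at every application start.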

SiteQueries provides type-safe query operations for site (location) aggregates.

class SiteQueries {
  /// Watch all sites in an estate
  Stream<List<SiteRootAggregate>> watchAll(EstateId estateId)

  /// Watch a specific site - emits on change or null if not found
  Stream<SiteRootAggregate?> watch(EstateId estateId, SiteId siteId)

  /// Read current state of a site
  Future<SiteRootAggregate?> read(EstateId estateId, SiteId siteId)
}

Example:

// Watch all sites in an estate
sdk.sites.watchAll(EstateId('estate-1')).listen((sites) {
  for (final site in sites) {
    print('Site ${site.siteId}: ${site.displayName}');
  }
});

// Watch a specific site
sdk.sites.watch(
  EstateId('estate-1'),
  SiteId('site-1'),
).listen((site) {
  if (site != null) {
    print('Site updated: ${site.displayName}');
  }
});

// Read once
final site = await sdk.sites.read(
  EstateId('estate-1'),
  SiteId('site-1'),
);

AttachmentQueries provides type-safe query operations for files (attachments) and folders.

class AttachmentQueries {
  /// Watch all files for an estate
  Stream<List<AttachmentAggregate>> watchFiles(EstateId estateId)

  /// Watch all folders for an estate
  Stream<List<FolderAggregate>> watchFolders(EstateId estateId)

  /// Read a specific file by ID
  Future<AttachmentAggregate?> readFile(
    EstateId estateId,
    AttachmentId attachmentId,
  )

  /// Read a specific folder by ID
  Future<FolderAggregate?> readFolder(
    EstateId estateId,
    FolderId folderId,
  )
}

Example:

// Watch all files
sdk.attachments.watchFiles(EstateId('estate-1')).listen((files) {
  print('Total files: ${files.length}');
  for (final file in files) {
    print('- ${file.id}: ${file.metadata}');
  }
});

// Watch all folders
sdk.attachments.watchFolders(EstateId('estate-1')).listen((folders) {
  print('Folders: ${folders.length}');
});

// Read a specific file
final file = await sdk.attachments.readFile(
  EstateId('estate-1'),
  AttachmentId('file-123'),
);

AggregateCommands provides commands for managing aggregate lifecycle (reviving soft-deleted items, querying history).

class AggregateCommands {
  /// Revive a soft-deleted aggregate
  Future<IntentLedgerEntry> revive({
    required EstateId estateId,
    required NomosTimelineId timelineId,
    required AggregateId aggregateId,
    required Type aggregateType,
  })

  /// Revive a site specifically
  Future<IntentLedgerEntry> reviveSite({
    required EstateId estateId,
    required SiteId siteId,
  })

  /// Get intent history for an aggregate
  Future<List<IntentLedgerEntry>> history({
    required NomosWorkspaceId workspaceId,
    required NomosTimelineId timelineId,
    required AggregateId aggregateId,
  })

  /// Get intent history for an estate aggregate (convenience method)
  Future<List<IntentLedgerEntry>> estateHistory({
    required EstateId estateId,
    required AggregateId aggregateId,
  })
}

Example:

// Revive a deleted site
await sdk.aggregates.reviveSite(
  estateId: EstateId('estate-1'),
  siteId: SiteId('site-1'),
);

// Get audit trail for a site
final history = await sdk.aggregates.estateHistory(
  estateId: EstateId('estate-1'),
  aggregateId: SiteId('site-1').toAggregateId(),
);
for (final entry in history) {
  print('Sequence ${entry.workspaceIntentSequence}: '
      '${entry.intent.runtimeType} by ${entry.actorId}');
}

The SDK uses Nomos’ workspace/timeline model for data organization:

A workspace is an isolation boundary. In CO2, each estate gets its own workspace.

// Convert estate ID to workspace ID
final workspaceId = EstateId('estate-1').toWorkspace();

A timeline is a branch within a workspace. CO2 uses the default timeline for normal operations:

// The standard CO2 timeline
final timelineId = Co2Workspace.defaultTimeline;

Every intent dispatch requires a context specifying where the intent applies (workspace and timeline) and who issued it (actor):

final ctx = IntentContext(
  workspaceId: estateId.toWorkspace(),
  timelineId: Co2Workspace.defaultTimeline,
  actorId: NomosConversionUtils.userIdToActorId(UserId('user-123')),
  // Optional: tracing IDs
  correlationId: NomosCorrelationId('request-abc'),
  causationId: NomosCausationId('intent-xyz'),
);

The SDK is built on Nomos and provides type-safe access to its core features:

All 13 CO2 domains are pre-registered:

// Exported types from all domains:
import 'package:co2_nomos_client/co2_nomos_client.dart';

// Estate Structures: EstateAggregate, SiteRootAggregate
// Trackable Assets:  TrackableAssetAggregate
// Attachments:       AttachmentAggregate, FolderAggregate
// Identity:          UserAggregate
// Permissions:       PermissionAggregate
// And more...

See domain_modules.dart for the complete registration list.

Send any CO2 intent through the SDK:

import 'package:intents_v1/intents_v1.dart';

// Example: Create an estate
// (`context` is an IntentContext built as in the intent context example above)
await sdk.dispatch(
  CreateEstateIntent(
    estateId: EstateId('new-estate'),
    name: EstateName('My Estate'),
    description: null,
    address: null,
  ),
  ctx: context,
);

// Other available intents from intents_v1 package
// - CreateSiteIntent
// - UpdateSiteIntent
// - DeleteSiteIntent
// - And many more domain-specific intents

The SDK re-exports contract types for payload definitions:

import 'package:co2_nomos_client/co2_nomos_client.dart';
// All contract types from contracts_v1 are available

For intents that require server validation, use Co2RestIntentTransport:

import 'package:co2_nomos_client/co2_nomos_client.dart';
import 'package:firebase_auth/firebase_auth.dart';

final transport = Co2RestIntentTransport(
  baseUrl: Uri.parse('https://nomos-server.example.com'),
  getToken: () async {
    final user = FirebaseAuth.instance.currentUser;
    return await user?.getIdToken() ?? '';
  },
);

final sdk = await Co2NomosApp.create(
  persistence: FirestorePersistence(),
  blobStorage: FirebaseBlobStorage(),
  serverTransport: transport, // Enable server intents
);

Server intents are processed as follows:

  1. The intent is marked with requiresServerExecution = true
  2. The SDK detects this and sends the intent to the server via Co2RestIntentTransport
  3. The server executes it with full domain context (database queries, external APIs, etc.)
  4. The result is returned as an IntentLedgerEntry
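
The routing decision in these steps can be sketched as follows. This is a minimal illustration under stated assumptions rather than the SDK's dispatch code: SketchIntent, SketchLedgerEntry, SketchExecutor and routeIntent are hypothetical names, and throwing a StateError when no transport is configured is an assumed behavior.

```dart
/// Hypothetical stand-in for an intent; only the flag from step 1 is modeled.
abstract class SketchIntent {
  bool get requiresServerExecution => false;
}

class RenameSiteSketch extends SketchIntent {}

class ValidatePaymentSketch extends SketchIntent {
  @override
  bool get requiresServerExecution => true;
}

/// Hypothetical result type standing in for IntentLedgerEntry.
class SketchLedgerEntry {
  SketchLedgerEntry({required this.executedOn, required this.success});

  final String executedOn; // 'local' or 'server'
  final bool success;
}

typedef SketchExecutor = Future<SketchLedgerEntry> Function(
    SketchIntent intent);

/// Sends flagged intents to [server] and everything else to [local].
/// Assumption: a flagged intent without a transport is reported as an error.
Future<SketchLedgerEntry> routeIntent(
  SketchIntent intent, {
  required SketchExecutor local,
  SketchExecutor? server,
}) async {
  if (!intent.requiresServerExecution) return local(intent);
  if (server == null) {
    throw StateError(
        'Intent requires server execution but no transport is configured');
  }
  return server(intent);
}
```

In this model the caller receives the same result type regardless of where the intent ran, mirroring step 4.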

Example server intent:

class ValidatePaymentIntent extends Intent {
  ValidatePaymentIntent({required this.orderId});

  final String orderId;

  // Marks the intent for server-only execution
  @override
  bool get requiresServerExecution => true;
}

The SDK supports custom logging via the Co2Logger callback:

final sdk = await Co2NomosApp.create(
  persistence: persistence,
  blobStorage: blobStorage,
  logger: (level, message, {error, stackTrace, data}) {
    print('[$level] $message');
    if (data != null) print(' Data: $data');
    if (error != null) print(' Error: $error');
  },
  enableDebugLogging: true,
);

Log levels:

  • trace - Very detailed internal events
  • debug - Query execution, dispatch flow
  • info - Important operations
  • warn - Recoverable issues
  • error - Failure conditions
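
Because the logger is a plain callback, level filtering can be layered on top of it. The sketch below assumes levels arrive as the lowercase strings listed above and that the callback has the shape used in the logger example; SketchLogger and filterByLevel are hypothetical names, and the SDK's actual Co2Logger typedef may differ.

```dart
/// Severity order taken from the level list above, lowest first.
const levelOrder = ['trace', 'debug', 'info', 'warn', 'error'];

/// Hypothetical callback shape mirroring the logger example in this document.
typedef SketchLogger = void Function(
  String level,
  String message, {
  Object? error,
  StackTrace? stackTrace,
  Map<String, Object?>? data,
});

/// Wraps [inner] so that entries below [minLevel] are dropped.
SketchLogger filterByLevel(String minLevel, SketchLogger inner) {
  final threshold = levelOrder.indexOf(minLevel);
  if (threshold < 0) {
    throw ArgumentError.value(minLevel, 'minLevel', 'unknown log level');
  }
  return (level, message, {error, stackTrace, data}) {
    // Unknown levels have index -1 and are therefore dropped as well.
    if (levelOrder.indexOf(level) < threshold) return;
    inner(level, message, error: error, stackTrace: stackTrace, data: data);
  };
}
```

A wrapper built with filterByLevel('info', ...) forwards info, warn and error entries and silently discards trace and debug ones.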

The SDK provides extension methods for converting between CO2 and Nomos IDs:

// Estate ID → Workspace ID
final workspaceId = estateId.toWorkspace();
// User ID → Workspace ID
final userWorkspaceId = userId.toWorkspace();
// Site ID → Aggregate ID
final siteAggregateId = siteId.toAggregateId();
// Estate ID → Aggregate ID
final estateAggregateId = estateId.toAggregateId();

These helpers are defined in workspace_helpers.dart and automatically imported.

final sdk = await Co2NomosApp.create(
  persistence: FirestorePersistence(),
  blobStorage: FirebaseBlobStorage(),
);
final estateId = EstateId('my-estate');

// Ensure estate exists
await sdk.estates.ensure(
  estateId: estateId,
  name: 'My Estate',
  actorId: 'user-123',
);

// Watch estate
sdk.estates.watch(estateId).listen((estate) {
  print('Estate: ${estate.name}');
});

// Watch sites
sdk.sites.watchAll(estateId).listen((sites) {
  print('Sites: ${sites.length}');
});

// Watch files
sdk.attachments.watchFiles(estateId).listen((files) {
  print('Files: ${files.length}');
});

// Get entire workspace state at once
final state = await sdk.state(
  estateId.toWorkspace(),
  Co2Workspace.defaultTimeline,
);

// Access specific aggregates (map entries are destructured with an object pattern)
for (final MapEntry(key: id, value: aggregate) in state.entries) {
  print('Aggregate ${id.value}: ${aggregate.runtimeType}');
}

// Get all changes to a site
final siteId = SiteId('site-1');
final history = await sdk.aggregates.estateHistory(
  estateId: estateId,
  aggregateId: siteId.toAggregateId(),
);

// Print timeline
for (final entry in history) {
  print('Intent #${entry.workspaceIntentSequence}');
  print(' Type: ${entry.intent.runtimeType}');
  print(' Actor: ${entry.actorId}');
  print(' Success: ${entry.success}');
  print(' Time: ${entry.createdAt}');
}

// Check if site still exists (not soft-deleted)
final site = await sdk.sites.read(estateId, siteId);
if (site == null) {
  print('Site is deleted');
  // Revive it
  await sdk.aggregates.reviveSite(
    estateId: estateId,
    siteId: siteId,
  );
  print('Site revived');
}

import 'package:co2_nomos_client/co2_nomos_client.dart';
import 'package:nomos_persistence_memory/nomos_persistence_memory.dart';
import 'package:test/test.dart';

void main() {
  test('estates workflow', () async {
    final sdk = await Co2NomosApp.create(
      persistence: MemoryPersistence(),
      blobStorage: MemoryBlobStorage(),
    );
    final estateId = EstateId('test-estate');
    await sdk.estates.ensure(
      estateId: estateId,
      name: 'Test Estate',
      actorId: 'test-user',
    );
    final estate = await sdk.estates.read(estateId);
    expect(estate, isNotNull);
    expect(estate!.name, equals('Test Estate'));
    // Clean up
    await sdk.clearTestState();
    sdk.dispose();
  });

  test('watch estate updates', () async {
    final sdk = await Co2NomosApp.create(
      persistence: MemoryPersistence(),
      blobStorage: MemoryBlobStorage(),
    );
    final estateId = EstateId('test-estate');
    // Capture emissions
    final estates = <EstateAggregate>[];
    final sub = sdk.estates.watch(estateId).listen((estate) {
      estates.add(estate);
    });
    // Initially ensure estate
    await sdk.estates.ensure(
      estateId: estateId,
      name: 'Test Estate',
      actorId: 'test-user',
    );
    // Wait for emission
    await Future<void>.delayed(const Duration(milliseconds: 100));
    expect(estates.length, greaterThan(0));
    expect(estates.first.name, equals('Test Estate'));
    await sub.cancel();
    sdk.dispose();
  });
}

The SDK requires:

  • Dart SDK: >= 3.0.0
  • Nomos Core: For event-sourcing framework
  • CO2 Packages:
    • contracts_v1 - Shared types
    • intents_v1 - All intents
    • policy_v1 - Intent resolution policy
    • All 13 domain packages (estate structures, assets, etc.)
  • Utilities: collection, http

See pubspec.yaml for complete dependency list.

A watch can fail when the estate hasn’t been created yet. Ensure the estate exists before watching it:

// Solution: ensure estate exists first
await sdk.estates.ensure(
  estateId: estateId,
  name: 'Name',
  actorId: 'user-id',
);

// Then watch
sdk.estates.watch(estateId).listen(...);

Check that the intent context is correct, in particular that the workspace matches the estate:

// Workspace MUST match estate
final ctx = IntentContext(
  workspaceId: estateId.toWorkspace(), // Use estateId.toWorkspace()!
  timelineId: Co2Workspace.defaultTimeline,
  actorId: NomosConversionUtils.userIdToActorId(UserId('...')),
);

Ensure the token provider returns valid tokens:

final transport = Co2RestIntentTransport(
  baseUrl: Uri.parse('https://server.example.com'),
  getToken: () async {
    // Must return non-empty token
    final token = await getAuthToken();
    if (token.isEmpty) throw Exception('No token available');
    return token;
  },
);

Streams are debounced (16ms) to prevent thrashing, so rapid successive changes may be batched into a single emission:

// Expected: single emission with latest state
sdk.estates.watch(estateId).listen((estate) {
  // May receive batched updates
  print('Estate updated: ${estate.id}');
});
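
To make the batching effect concrete, here is a minimal trailing-edge debounce written with dart:async only. It is a sketch of the general technique, not the SDK's internal code: debounceLatest is a hypothetical helper, and it assumes the SDK emits the latest value once a quiet window has passed.

```dart
import 'dart:async';

/// Forwards the latest event from [source] only after [window] has elapsed
/// without a newer event (trailing-edge debounce).
Stream<T> debounceLatest<T>(Stream<T> source, Duration window) {
  late final StreamController<T> controller;
  StreamSubscription<T>? subscription;
  Timer? timer;
  T? pending;
  var hasPending = false;

  // Emits the buffered value, if any.
  void flush() {
    if (hasPending) {
      hasPending = false;
      controller.add(pending as T);
    }
  }

  controller = StreamController<T>(
    onListen: () {
      subscription = source.listen(
        (event) {
          // Each new event replaces the buffered one and restarts the timer.
          pending = event;
          hasPending = true;
          timer?.cancel();
          timer = Timer(window, flush);
        },
        onError: controller.addError,
        onDone: () {
          timer?.cancel();
          flush(); // Emit the last buffered value before closing.
          controller.close();
        },
      );
    },
    onCancel: () async {
      timer?.cancel();
      await subscription?.cancel();
    },
  );
  return controller.stream;
}
```

With a 16 ms window, three events added back to back produce one emission carrying the third value, which is why a listener may see fewer updates than the number of dispatched intents.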

Package layout:

co2_nomos_client/
├── lib/
│   ├── co2_nomos_client.dart              # Main export file
│   └── src/
│       ├── co2_nomos_app.dart             # SDK entry point
│       ├── domain_modules.dart            # Domain registration
│       ├── workspace_helpers.dart         # ID conversion helpers
│       ├── queries/
│       │   ├── estate_queries.dart        # Estate operations
│       │   ├── site_queries.dart          # Site operations
│       │   └── attachment_queries.dart    # Attachment operations
│       ├── commands/
│       │   └── aggregate_commands.dart    # Lifecycle commands
│       └── transport/
│           └── rest_intent_transport.dart # Server transport
└── pubspec.yaml