Asynchronous event sourcing.
The library provides a logical layer for storing and querying events as a stream.
Heavily inspired by Greg Young’s Event Store and Streamstone solutions.
Overview
Designed to be easily extended with custom storage backends.
Although the component implements a logical layer for storing and querying events as a stream, it does not provide the functionality of a domain aggregate, such as state mutation or conflict resolution. It serves as the persistence layer for the aggregate.
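To illustrate this split of responsibilities, here is a minimal sketch of an aggregate that lives in application code and rebuilds its state from events read from a stream. The `Account` class and the `Deposited` and `Withdrawn` events are hypothetical names invented for this illustration; none of them are part of StreamStore.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical domain events; not part of StreamStore.
public abstract record AccountEvent;
public sealed record Deposited(decimal Amount) : AccountEvent;
public sealed record Withdrawn(decimal Amount) : AccountEvent;

// Hypothetical aggregate: state mutation lives here, not in the event store.
public sealed class Account
{
    public decimal Balance { get; private set; }
    public int Version { get; private set; }

    // Rebuild state by applying events in the order they were stored.
    public static Account Replay(IEnumerable<AccountEvent> history)
    {
        var account = new Account();
        foreach (var e in history) account.Apply(e);
        return account;
    }

    private void Apply(AccountEvent e)
    {
        switch (e)
        {
            case Deposited d: Balance += d.Amount; break;
            case Withdrawn w: Balance -= w.Amount; break;
            default: throw new ArgumentException($"Unknown event type: {e.GetType().Name}");
        }
        Version++;
    }
}
```

In such a design the store only persists and returns the ordered events; deciding what they mean for the aggregate's state stays in the application.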
Packages
Name | Description | Package |
---|---|---|
StreamStore | Asynchronous event streaming | |
StreamStore.Storage | Base abstract implementation of persistence layer of StreamStore component. | |
StreamStore.Storage.Contracts | Persistence layer interfaces of StreamStore component. | |
StreamStore.Serialization.Protobuf | Protobuf event serializer/deserializer. | |
StreamStore.Storage.EventFlow | Adapter that lets StreamStore’s storage backends be used as the event store in EventFlow. | |
Storage packages
Name | Description | Concurrency Control | Multitenancy | Event Duplication Detection | Package |
---|---|---|---|---|---|
StreamStore.NoSql.Cassandra | Apache Cassandra and Azure Cosmos DB for Apache Cassandra storage | Optimistic | :white_check_mark: | :x: | |
StreamStore.Sql.PostgreSql | PostgreSQL storage | Optimistic | :white_check_mark: | :white_check_mark: | |
StreamStore.Sql.Sqlite | SQLite storage | Optimistic | :white_check_mark: | :white_check_mark: | |
StreamStore.InMemory | In-memory storage, provided for testing and educational purposes only | Optimistic | :white_check_mark: | :white_check_mark: | |
StreamStore.S3.AWS | Amazon S3 storage | Pessimistic | :x: | :x: | |
StreamStore.S3.B2 | Backblaze B2 storage (not working at the moment!) | Pessimistic | :x: | :x: | |
Concepts
The basic concepts are described in CONCEPTS.md.
Features
The library aims to cover the characteristics and features common to event sourcing storage:
- Asynchronous read and write operations.
- Multitenancy support.
- Automatic provisioning of storage schema.
- Event ordering.
- Serialization/deserialization of events.
- Optimistic concurrency control.
- Event duplication detection based on event ID.
- Storage agnostic test framework.
- Binary serialization support.
- Custom event properties.
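Two of the features above, optimistic concurrency control and event duplication detection based on event ID, can be sketched with a tiny in-memory stream. This is only an illustration of the general technique, not StreamStore's implementation or API: `TinyStream`, `StoredEvent`, `RevisionConflictException` and `DuplicateEventException` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; not StreamStore's API.
public sealed record StoredEvent(string Id, object Payload);

public sealed class RevisionConflictException : Exception
{
    public RevisionConflictException(int expected, int actual)
        : base($"Expected revision {expected}, but the stream is at {actual}.") { }
}

public sealed class DuplicateEventException : Exception
{
    public DuplicateEventException(string id)
        : base($"Event '{id}' is already in the stream.") { }
}

public sealed class TinyStream
{
    private readonly List<StoredEvent> events = new();
    private readonly HashSet<string> ids = new();
    private readonly object gate = new();

    // In this sketch the revision equals the number of events appended so far.
    public int Revision { get { lock (gate) return events.Count; } }

    public int Append(int expectedRevision, IReadOnlyCollection<StoredEvent> batch)
    {
        lock (gate)
        {
            // Optimistic concurrency: reject the write if another writer appended first.
            if (expectedRevision != events.Count)
                throw new RevisionConflictException(expectedRevision, events.Count);

            // Duplicate detection: validate the whole batch before changing anything.
            var seen = new HashSet<string>();
            foreach (var e in batch)
                if (ids.Contains(e.Id) || !seen.Add(e.Id))
                    throw new DuplicateEventException(e.Id);

            events.AddRange(batch);
            ids.UnionWith(seen);
            return events.Count;
        }
    }
}
```

A writer that reads the stream at revision 1 and appends with `expectedRevision: 1` succeeds only if nobody else has appended in between; otherwise it gets a conflict and must re-read before retrying.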
Storages
Implementations are also provided for particular storages, such as:
- In-Memory - for testing purposes.
- Binary object storages:
  - Backblaze B2.
  - Amazon S3.
- SQL-based DBMS:
  - SQLite.
  - PostgreSQL.
- NoSQL-based DBMS:
  - Apache Cassandra and Azure Cosmos DB for Apache Cassandra.
Roadmap
- Composite stream identifier
- External transaction support (?).
- Transactional outbox pattern implementation (?).
Installation
To install the package, you can use the following command from the command line:
# Install StreamStore package
dotnet add package StreamStore
# Install package of particular storage implementation, for instance InMemory
dotnet add package StreamStore.InMemory
or from NuGet Package Manager Console:
# Install StreamStore package
Install-Package StreamStore
# Install package of particular storage implementation, for instance SQLite storage backend
Install-Package StreamStore.Sql.Sqlite
Usage
- Register the store in the DI container
// Register StreamStore
services.AddStreamStore(x =>
{
    // Optional. Enable automatic provisioning of the storage schema, default: false.
    x.EnableAutomaticProvisioning()
        // Optional. Enable multitenancy,
        // either by explicitly passing tenant identifiers
        .EnableMultitenancy("tenant-1", "tenant-2")
        // or by providing your own ITenantProvider implementation:
        // .EnableMultitenancy<MyTenantProvider>()
        // Optional. Define the batch size for reading events, default: 1000.
        .WithReadingPageSize(1000)
        // Optional. Define the event reading algorithm, default: Queue.
        .WithReadingMode(StreamReadingMode.Queue)
        // Optional. Configure custom serialization/deserialization of events, default: Newtonsoft.Json.
        .ConfigureSerialization(...);
    // Configure the event persistence layer using one of the provided storage implementations
    x.ConfigurePersistence(c =>
        // For instance, the SQLite storage backend in single-tenant mode
        c.UseSqlite(s =>
            s.WithConnectionString(connectionString))
        // Or SQLite in multi-tenant mode:
        // c.UseSqliteWithMultitenancy(s =>
        //     s.WithConnectionString(connectionString))
    );
});
- Use the store in your application
// Inject IStreamStore into your service or controller for a single storage implementation
public class MyService
{
    private readonly IStreamStore store;

    public MyService(IStreamStore store)
    {
        this.store = store;
    }
}
// Or IStreamStoreFactory for multitenancy
public class MyService
{
    private readonly IStreamStoreFactory storeFactory;

    public MyService(IStreamStoreFactory storeFactory)
    {
        this.storeFactory = storeFactory;
    }
}
// Append events to a stream, or create a new stream if it does not exist.
// The EventObject property is where you store your event.
var events = new Event[] {
    new Event { Id = "event-1", Timestamp = DateTime.Now, EventObject = eventObject }
    ...
};
try {
    await store
        .BeginAppendAsync("stream-1") // Open the stream as new, since no revision is provided
        .AppendAsync(x => // Append events one by one using the fluent API
            x.WithId("event-3") // Specify event ID
             .Dated(DateTime.Now) // Specify event timestamp
             .WithEvent(eventObject) // Specify event object
             // Optional. Add a custom property to the event
             .WithCustomProperty("property-name", "property-value")
             // Or use WithCustomProperties to add multiple custom properties
             .WithCustomProperties(new Dictionary<string, object> { { "property-name", "property-value" } })
        )
        ...
        .AppendRangeAsync(events) // Or append a range of events by passing IEnumerable<Event>
        .SaveChangesAsync(token);
} catch (ConcurrencyException ex) {
    // Optional. Read from the stream and implement your own logic for handling the optimistic concurrency exception
    await foreach (var @event in await store.BeginReadAsync("stream-1", token)) {
        ...
    }
    // Get the stream metadata to obtain the actual revision
    var metadata = await store.GetMetadataAsync("stream-1", token);
    // Push the result to the end of the stream
    await store
        // Open the stream with a revision to handle concurrency
        .BeginAppendAsync("stream-1", metadata.Revision)
        // Append events one by one using the fluent API
        .AppendAsync(x =>
            x.WithId("event-4")
             .Dated(DateTime.Now)
             .WithEvent(yourEventObject)
        )
        ...
        .SaveChangesAsync(token); // Pass the cancellation token, as in the first append
}
More examples of reading and writing events can be found in the test scenarios of the StreamStore.Testing project.
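The conflict-handling flow shown in the usage example (catch the exception, re-read, then append again with the current revision) is often wrapped in a small retry helper. The sketch below is generic and does not depend on StreamStore: `Retry.OnAsync` is a hypothetical helper name, and the exception type to retry on is supplied by the caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of StreamStore.
public static class Retry
{
    // Runs the attempt, retrying when it throws TException, up to maxAttempts times in total.
    // The attempt receives the 1-based attempt number, so it can re-read state on retries.
    public static async Task OnAsync<TException>(
        Func<int, CancellationToken, Task> attempt,
        int maxAttempts,
        CancellationToken token = default)
        where TException : Exception
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var n = 1; ; n++)
        {
            token.ThrowIfCancellationRequested();
            try
            {
                await attempt(n, token);
                return;
            }
            catch (TException) when (n < maxAttempts)
            {
                // Conflict: loop again so the attempt can reload the latest revision.
            }
        }
    }
}
```

With StreamStore, the attempt would presumably fetch the current revision via `GetMetadataAsync` and append with it, with `ConcurrencyException` as the exception type. On the last attempt the exception is not swallowed, so the caller still sees the failure.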
Examples
Examples of using StreamStore with different storage implementations can be found in the separate streamstore-examples repository.
Customization
Creating your own persistence layer implementation is described in STORAGE.md.
Creating your own serialization/deserialization implementation is described in SERIALIZATION.md.
Error handling
Handling errors and exceptions in your application is described in ERRORHANDLING.md.
Contributing
If you experience any issues, have a question or a suggestion, or if you wish to contribute, feel free to open an issue or start a discussion.