Data synchronization
You probably think about services such as Firebase
when we mention data synchronization. Creating something like Firebase
is of course a complex task, but you can created your own data synchronization with XSockets as well.
You can decide if you want to use Pub/Sub
or RPC
(or even a mix of them) when building a custom data synchronization feature in XSockets. Most likely you will mix Pub/Sub
and RPC
.
Introduction
What we want to accomplish is something that let us get information about CRUD
operations on a certain entity.
When someone:
- Add a record
- Change a record
- Delete a record
we want to tell others about the change (data synchronization).
To keep things simple we will store data in memory, but replacing the in-memory storage with a persistent one should be straight forward.
Data Sync Controller
To be able to re-use the data synchronization we will create a generic base class for data synchronization. We will also need a interface so that we know that each entity being synced has a Id. To avoid strings in the code we will use some constants.
Pre-Req
// For having an ID on the entity we sync
public interface IDataSyncObject
{
Guid Id { get; set; }
}
// Operation being performed
public static class DataSyncCommand
{
public const string Add = "add";
public const string Update = "update";
public const string Delete = "delete";
public const string Init = "init";
}
Controller
This data synchronization controller is just a sample of how you can do this. Adding/removing logic should be very straight forward.
The controller is a abstract generic class that takes the "real" controller and the entity to sync as generic parameters.
using XSockets.Core.XSocket;
using XSockets.Core.XSocket.Helpers;
using System.Threading.Tasks;
using XSockets.Core.Common.Socket;
using XSockets.Plugin.Framework;
using XSockets.Core.Utility.Storage;
using System;
using System.Collections.Generic;
public abstract class DataSyncController<T, TV> : XSocketController
where T : class, IXSocketController
where TV : class, IDataSyncObject
{
private static T _controller;
private readonly string _topic = typeof(TV).Name;
static DataSyncController()
{
_controller = (T)Composable.GetExport<IXSocketController>(typeof(T));
}
/// <summary>
/// To get the correct controller type, cant use abstract class when sending data
/// </summary>
private T Controller
{
get { return _controller ?? (_controller = (T)Composable.GetExport<IXSocketController>(typeof(T))); }
}
/// <summary>
/// Will fetch and send all data in the repository for each topic sent in with the connection.
/// </summary>
public override async Task OnOpened()
{
//Send back the data in the repo for the type TV
await this.Invoke(Repository<Guid, TV>.GetAll(), string.Format("{0}:{1}", DataSyncCommand.Init, _topic));
}
/// <summary>
/// Search the repository, not exposed to clients since accessor is protected
/// </summary>
/// <param name="expression"></param>
/// <returns></returns>
protected virtual IEnumerable<TV> Find(Func<TV, bool> expression)
{
return Repository<Guid, TV>.Find(expression);
}
/// <summary>
/// Adds/Updates the data from in reposiotry and tells all (subscribing) clients about it.
///
/// Override to implement custom logic
/// </summary>
/// <param name="model"></param>
public virtual async Task Update(TV model)
{
var command = DataSyncCommand.Update;
if (model.Id == Guid.Empty)
{
model.Id = Guid.NewGuid();
command = DataSyncCommand.Add;
}
model = Repository<Guid, TV>.AddOrUpdate(model.Id, model);
await Sync(command, model);
}
/// <summary>
/// Deletes the data from the reposiotry and tells all (subscribing) clients about it
///
/// Override to implement custom logic
/// </summary>
/// <param name="model"></param>
public virtual async Task Delete(TV model)
{
model = Repository<Guid, TV>.GetById(model.Id);
if (model != null)
{
Repository<Guid, TV>.Remove(model.Id);
await Sync(DataSyncCommand.Delete, model);
}
}
/// <summary>
/// Will do a PUBLISH of changes by default, override to implement specific logic and/or RPC
/// </summary>
/// <param name="command"></param>
/// <param name="model"></param>
protected virtual async Task Sync(string command, TV model)
{
await Controller.PublishToAll(model, string.Format("{0}:{1}", command, _topic));
}
}
Implementation and Usage
First of all we need a entity to perform data synchronization on. We keep it simple and use a Person
class.
Model
public class Person : IDataSyncObject
{
public Guid Id { get; set; }
public string Name { get; set; }
}
Data Sync Controller
To use this basic class with data synchronization we just inherit the base controller for data sync and pass in the Person
class. This implementation is very basic by default since all logic is in the base class.
using System.Threading.Tasks;
using XSockets.Plugin.Framework.Attributes;
[XSocketMetadata("person")]
public class PersonController : DataSyncController<PersonController, Person>{}
To be able to demo this from Putty
we extend the implementation with a few simple methods. The reason for this is that it is hard to send complex models via Putty
.
By adding the methods Add
and Del
we can now test the data sync via Putty
.
using System.Threading.Tasks;
using XSockets.Plugin.Framework.Attributes;
[XSocketMetadata("person")]
public class PersonController : DataSyncController<PersonController, Person>
{
public async Task Add(string name)
{
await this.Update(new Person { Name = name });
}
public async Task Del(string name)
{
foreach(var p in this.Find(p => p.Name == name))
{
await this.Delete(p);
}
}
}
Test
Now we will open up 2 instances of Putty
. One of them will subscribe for the topics add:person
and delete:person
. Since the init:person
is sent with RPC
we do not need a subscription for that.
1st Client Connects
The client connects to our controller for data synchronization. The controller instantly sends the list of people being synced. The list is empty since there was no record in memory.
Then we subscribe to add:person
and delete:person
so that this client will get information about these events.
Next the client add the people steve
and ben
. Since there is a subscription for the add:person
topic the information is sent back to the client.
2nd Client Connects
Now the second client connects. When the controller is opened the information about the previously added people is sent back with RPC (init:person
)
Now the 2nd client adds a new person named sara
. The 2st client instantly gets information about the new person since there is a subscription for add:person
Finally the 2nd client delete ben
and the 1st client that has a subscription for delete:person
gets information about this instantly.
Summary
Data synchronization is complex, but as you can see in this basic sample it is easy to setup a generic base controller that handles the logic for synchronization. The mix of Pub/Sub
, RPC
and STATE
will help you achieve what you need.
There is a complete data synchronization sample using our JavaScript API with knockoutjs at https://github.com/XSockets/XVA/tree/master/XVA-02-04-DataSyncBasic2