Team XSockets.NET

Data synchronization

You probably think about services such as Firebase when we mention data synchronization. Creating something like Firebase is of course a complex task, but you can created your own data synchronization with XSockets as well.

You can decide if you want to use Pub/Sub or RPC (or even a mix of them) when building a custom data synchronization feature in XSockets. Most likely you will mix Pub/Sub and RPC.

Introduction

What we want to accomplish is something that let us get information about CRUD operations on a certain entity.

When someone:

  • Add a record
  • Change a record
  • Delete a record

we want to tell others about the change (data synchronization).

To keep things simple we will store data in memory, but replacing the in-memory storage with a persistent one should be straight forward.

Data Sync Controller

To be able to re-use the data synchronization we will create a generic base class for data synchronization. We will also need a interface so that we know that each entity being synced has a Id. To avoid strings in the code we will use some constants.

Pre-Req

// For having an ID on the entity we sync
public interface IDataSyncObject
{
    Guid Id { get; set; }
}

// Operation being performed
public static class DataSyncCommand
{
    public const string Add = "add";
    public const string Update = "update";
    public const string Delete = "delete";
    public const string Init = "init";
}

Controller

This data synchronization controller is just a sample of how you can do this. Adding/removing logic should be very straight forward.

The controller is a abstract generic class that takes the "real" controller and the entity to sync as generic parameters.

using XSockets.Core.XSocket;
using XSockets.Core.XSocket.Helpers;
using System.Threading.Tasks;
using XSockets.Core.Common.Socket;
using XSockets.Plugin.Framework;
using XSockets.Core.Utility.Storage;
using System;
using System.Collections.Generic;

public abstract class DataSyncController<T, TV> : XSocketController
    where T : class, IXSocketController
    where TV : class, IDataSyncObject
{
    private static T _controller;
    private readonly string _topic = typeof(TV).Name;
    static DataSyncController()
    {
        _controller = (T)Composable.GetExport<IXSocketController>(typeof(T));
    }

    /// <summary>
    /// To get the correct controller type, cant use abstract class when sending data
    /// </summary>
    private T Controller
    {
        get { return _controller ?? (_controller = (T)Composable.GetExport<IXSocketController>(typeof(T))); }
    }

    /// <summary>
    /// Will fetch and send all data in the repository for each topic sent in with the connection.
    /// </summary>
    public override async Task OnOpened()
    {
        //Send back the data in the repo for the type TV
        await this.Invoke(Repository<Guid, TV>.GetAll(), string.Format("{0}:{1}", DataSyncCommand.Init, _topic));
    }

    /// <summary>
    /// Search the repository, not exposed to clients since accessor is protected
    /// </summary>
    /// <param name="expression"></param>
    /// <returns></returns>
    protected virtual IEnumerable<TV> Find(Func<TV, bool> expression)
    {
        return Repository<Guid, TV>.Find(expression);
    }

    /// <summary>
    /// Adds/Updates the data from in reposiotry and tells all (subscribing) clients about it.
    /// 
    /// Override to implement custom logic
    /// </summary>
    /// <param name="model"></param>
    public virtual async Task Update(TV model)
    {
        var command = DataSyncCommand.Update;
        if (model.Id == Guid.Empty)
        {
            model.Id = Guid.NewGuid();
            command = DataSyncCommand.Add;
        }

        model = Repository<Guid, TV>.AddOrUpdate(model.Id, model);
        await Sync(command, model);
    }

    /// <summary>
    /// Deletes the data from the reposiotry and tells all (subscribing) clients about it
    /// 
    /// Override to implement custom logic
    /// </summary>
    /// <param name="model"></param>
    public virtual async Task Delete(TV model)
    {
        model = Repository<Guid, TV>.GetById(model.Id);
        if (model != null)
        {
            Repository<Guid, TV>.Remove(model.Id);
            await Sync(DataSyncCommand.Delete, model);
        }
    }

    /// <summary>
    /// Will do a PUBLISH of changes by default, override to implement specific logic and/or RPC
    /// </summary>
    /// <param name="command"></param>
    /// <param name="model"></param>
    protected virtual async Task Sync(string command, TV model)
    {
        await Controller.PublishToAll(model, string.Format("{0}:{1}", command, _topic));
    }
}

Implementation and Usage

First of all we need a entity to perform data synchronization on. We keep it simple and use a Person class.

Model

public class Person : IDataSyncObject
{
    public Guid Id { get; set; }

    public string Name { get; set; }
}

Data Sync Controller

To use this basic class with data synchronization we just inherit the base controller for data sync and pass in the Person class. This implementation is very basic by default since all logic is in the base class.

using System.Threading.Tasks;
using XSockets.Plugin.Framework.Attributes;

[XSocketMetadata("person")]
public class PersonController : DataSyncController<PersonController, Person>{}

To be able to demo this from Putty we extend the implementation with a few simple methods. The reason for this is that it is hard to send complex models via Putty.

By adding the methods Add and Del we can now test the data sync via Putty.

using System.Threading.Tasks;
using XSockets.Plugin.Framework.Attributes;

[XSocketMetadata("person")]
public class PersonController : DataSyncController<PersonController, Person>
{

    public async Task Add(string name)
    {
        await this.Update(new Person { Name = name });
    }

    public async Task Del(string name)
    {
        foreach(var p in this.Find(p => p.Name == name))
        {
            await this.Delete(p);
        }
    }
}

Test

Now we will open up 2 instances of Putty. One of them will subscribe for the topics add:person and delete:person. Since the init:person is sent with RPC we do not need a subscription for that.

1st Client Connects

The client connects to our controller for data synchronization. The controller instantly sends the list of people being synced. The list is empty since there was no record in memory.

Then we subscribe to add:person and delete:person so that this client will get information about these events.

data sync 1

Next the client add the people steve and ben. Since there is a subscription for the add:person topic the information is sent back to the client. data sync 2

2nd Client Connects

Now the second client connects. When the controller is opened the information about the previously added people is sent back with RPC (init:person)

data sync 3

Now the 2nd client adds a new person named sara. The 2st client instantly gets information about the new person since there is a subscription for add:person

data sync 4

Finally the 2nd client delete ben and the 1st client that has a subscription for delete:person gets information about this instantly.

data sync 5

Summary

Data synchronization is complex, but as you can see in this basic sample it is easy to setup a generic base controller that handles the logic for synchronization. The mix of Pub/Sub, RPC and STATE will help you achieve what you need.

There is a complete data synchronization sample using our JavaScript API with knockoutjs at https://github.com/XSockets/XVA/tree/master/XVA-02-04-DataSyncBasic2

results matching ""

    No results matching ""