
Load Data Into Dash


Loading data into Dash is somewhat different from loading data into a normal relational database. The underlying architecture is based on data lake technology and is optimised for large data sets. Because of this, it is important to understand and follow some basic principles from the start.

Once data has been ingested, it cannot be updated or deleted, so this needs to be taken into account when deciding how to structure the data on the way in. Also, a table can only be created by uploading data to it; you cannot create an empty table first and load data into it afterwards. When data is uploaded, our process inspects it and infers what the schema should be. The drawback is that if the schema is inferred incorrectly, manual intervention is required (which currently means a support request to the ComoDash team). The benefit, however, is that for large data sets the efficiency gains are significant.

Match the Source

The best approach is to load data in a form that matches the source as closely as possible. This helps to:

  • Simplify the transformation and loading process.
  • Ensure that no information is lost during importing.

No information left behind

Keep in mind that summarising or transforming data before loading it could discard key information. Part of the power of the lake model is that all data is imported and remains available to the analyst.


All Data Loaded as CSV

Data must be converted to UTF-8 encoded CSV files before it can be uploaded via the API. If the table already exists, take care that the schema does not change; otherwise, a temporary table will be created for that day's partition. Large data sets must also be broken up into multiple files of no more than 2MB each (gzipped) before being pushed via the API. The intention is that source systems (such as administration systems) push data to Dash regularly, so that each import is a relatively small chunk.
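A minimal Python sketch of preparing an upload. It makes two assumptions that the API description does not confirm: every file carries its own header row, and "2MB" is treated conservatively as 2,000,000 bytes. The sketch writes rows as UTF-8 CSV and gzips them. Any group of rows that compresses to more than the limit is halved and tried again. The function names are hypothetical.

```python
import csv
import gzip
import io
from typing import List, Sequence

# Assumption: treat "2MB" conservatively as 2,000,000 bytes of gzipped payload.
MAX_PAYLOAD_BYTES = 2_000_000


def _gzipped_csv(header: Sequence[str], rows: Sequence[Sequence[object]]) -> bytes:
    """Serialise a header plus rows as a UTF-8 CSV file and gzip it."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(header)
    writer.writerows(rows)
    return gzip.compress(buffer.getvalue().encode("utf-8"))


def split_into_gzipped_csv(
    header: Sequence[str],
    rows: Sequence[Sequence[object]],
    max_bytes: int = MAX_PAYLOAD_BYTES,
) -> List[bytes]:
    """Return gzipped CSV files, each with the header row, all <= max_bytes.

    Hypothetical helper: if a group of rows compresses to more than
    max_bytes, it is split in half and each half is tried again.
    """
    payload = _gzipped_csv(header, rows)
    if len(payload) <= max_bytes:
        return [payload]
    if len(rows) <= 1:
        raise ValueError("a single row is larger than the payload limit")
    middle = len(rows) // 2
    return (split_into_gzipped_csv(header, rows[:middle], max_bytes)
            + split_into_gzipped_csv(header, rows[middle:], max_bytes))
```

Halving compresses some rows more than once. That keeps the sketch simple but is not optimal. For very large extracts, you might cut the rows into fixed-size groups first and only halve the groups that are still too big.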


Data is Partitioned Per Day and by Service Client

Partitions divide a table into separate parts and appear as an extra column when querying the data. If you limit a query to certain partitions in the "where" clause, the whole table does not have to be scanned, which helps to optimise queries.

The data lake is partitioned based on the date that the data is loaded, as well as service_client. Find out more about service client here.

  • A new partition is created in the lake for each day on which data is loaded; it can be accessed through the data_import_batch column.

  • Partitions for insights tables can be defined explicitly.


Deleting and Updating of Data

Once data has been loaded into a table, it cannot be updated or deleted. However, there are a few strategies you can employ when structuring your data to allow for changes and updates.

Strategy           Description
Row-level delta    Only rows that have been changed are pushed to Dash
Snapshot load      A full snapshot is pushed every time

Row-Level Delta

In the row-level delta approach, a source system periodically (say, hourly or daily) pushes only the "deltas", i.e. the changed rows of a table, to Comotion Dash. Every row that has been created, changed or deleted since the last push is sent to Comotion Dash.

  • The row-level delta approach requires a unique identifier in the source data for each row.
  • Each time the row is uploaded, it is treated as an update of that row.

This approach therefore also requires an extra column to indicate whether that row has been deleted in the source, as well as a changedDate column to identify the latest version of that row.

A row that has been updated will appear multiple times in the lake

Therefore, when querying, care should be taken to select only the latest version of each row. This can be achieved using the row_number() window function that is available in Presto SQL.

Example of Row-Level Delta

For example, say we have a table called "People" in a source database, which has a primary identifier of "id". The table below shows what we would expect to see in the lake after a row is inserted and then has one subsequent update and one delete. When querying this table in the data lake, care should be taken to retrieve only the latest information.

id   name      surname   changedDate           deleted
1    Bob       Builder   2020-01-02 13:00:00   false
1    William   Builder   2020-02-03 13:30:00   false
1    William   Builder   2020-02-03 14:00:00   true

To query this table in the lake, the following SQL will ensure that only the latest row for each id is retrieved:

select * from (
    select *, row_number() over (partition by id order by changedDate desc) as rn
    from People
) as temp
where rn = 1
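The same "latest version per id" rule can be expressed in plain Python, which can be handy for checking an extract before or after it is loaded. This is a sketch over hypothetical in-memory rows modelled on the People example. It also adds a step that the SQL above does not include: dropping ids whose latest version is flagged as deleted. The function names are hypothetical.

```python
from datetime import datetime
from typing import Dict, Hashable, List


def latest_versions(rows: List[dict], key: str = "id",
                    order_by: str = "changedDate") -> List[dict]:
    """Keep the newest version of each row, like row_number() ... where rn = 1."""
    latest: Dict[Hashable, dict] = {}
    for row in rows:
        current = latest.get(row[key])
        # On a tie, the first row seen is kept.
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())


def live_rows(rows: List[dict], key: str = "id", order_by: str = "changedDate",
              deleted: str = "deleted") -> List[dict]:
    """Latest versions only, excluding ids whose latest version is marked deleted."""
    return [row for row in latest_versions(rows, key, order_by) if not row[deleted]]


# Hypothetical rows: id 1 is created, updated and then deleted; id 2 is untouched.
people = [
    {"id": 1, "name": "Bob", "changedDate": datetime(2020, 1, 2, 13, 0), "deleted": False},
    {"id": 1, "name": "William", "changedDate": datetime(2020, 2, 3, 13, 30), "deleted": False},
    {"id": 1, "name": "William", "changedDate": datetime(2020, 2, 3, 14, 0), "deleted": True},
    {"id": 2, "name": "Tim", "changedDate": datetime(2020, 1, 2, 13, 0), "deleted": False},
]
print([row["name"] for row in live_rows(people)])  # ['Tim']
```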

Strategies for adding a "deleted" column

Adding a deleted column, if one does not exist in the source, can be a challenge. The options include:

  1. Refactor your source system to add a deleted column to the table in question and refactor any logic that deletes rows to simply set the deleted column to true.
  2. Add a database trigger in your source that adds the IDs and changedDate of deleted rows to another table, and then import that as a separate table into Comotion Dash.
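Option 2 can be sketched with SQLite through Python's built-in sqlite3 module, purely so that the example is runnable. A Postgres source would express the same idea with a trigger function. The table names people and people_deleted and the trigger name are hypothetical. Each delete records the row's id and the time of deletion. That timestamp then serves as the changedDate of the deletion when people_deleted is imported into Dash as a separate table.

```python
import sqlite3


def create_people_with_delete_log(conn: sqlite3.Connection) -> None:
    """Create a hypothetical source table plus a trigger that logs deletions."""
    conn.executescript(
        """
        CREATE TABLE people (
            id          INTEGER PRIMARY KEY,
            name        TEXT,
            surname     TEXT,
            changedDate TEXT NOT NULL
        );

        -- Side table that is imported into Dash as a separate table.
        CREATE TABLE people_deleted (
            id          INTEGER NOT NULL,
            changedDate TEXT NOT NULL
        );

        -- Record the id and the deletion time (UTC) of every deleted row.
        CREATE TRIGGER people_log_delete
        AFTER DELETE ON people
        BEGIN
            INSERT INTO people_deleted (id, changedDate)
            VALUES (OLD.id, datetime('now'));
        END;
        """
    )


conn = sqlite3.connect(":memory:")
create_people_with_delete_log(conn)
conn.execute("INSERT INTO people VALUES (1, 'William', 'Builder', '2020-02-03 13:30:00')")
conn.execute("DELETE FROM people WHERE id = 1")
print(conn.execute("SELECT id FROM people_deleted").fetchall())  # [(1,)]
```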

Snapshot Data

In the snapshot approach, a full snapshot of a table is pushed every time. If a row is not in the latest snapshot, it is assumed to have been deleted, so you do not need a "deleted" indicator.

  • For every snapshot, generate a snapshot identifier and include it in every row of that snapshot.
  • The daily partition (data_import_batch) could be used for this, but doing so is risky because you may load two snapshots in one day.

Selecting a snapshot identifier

The snapshot identifier can simply be a timestamp; however, it is important to use the same timestamp for every row of the snapshot, even when the snapshot is split across several files.
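A small Python sketch of this rule, assuming rows are held as dictionaries before being written to CSV. The function and the snapshot_id column name are hypothetical. The identifier is computed once and copied onto every row, so all the files that make up one snapshot share it. A fixed-width format such as YYYY-MM-DD HH:MM:SS also sorts correctly as text, so max() picks the latest snapshot whether the column ends up typed as a timestamp or as a string.

```python
from datetime import datetime, timezone
from typing import List, Optional


def tag_snapshot(rows: List[dict], column: str = "snapshot_id",
                 taken_at: Optional[datetime] = None) -> List[dict]:
    """Return copies of the rows that all carry the same snapshot identifier."""
    taken_at = taken_at or datetime.now(timezone.utc)
    # Computed once per snapshot, never per row or per file.
    snapshot_id = taken_at.strftime("%Y-%m-%d %H:%M:%S")
    return [{**row, column: snapshot_id} for row in rows]


rows = [{"id": 1, "name": "William"}, {"id": 2, "name": "Tim"}]
tagged = tag_snapshot(rows, taken_at=datetime(2020, 3, 1, 1, 2, 3, tzinfo=timezone.utc))
print({row["snapshot_id"] for row in tagged})  # {'2020-03-01 01:02:03'}
```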

When querying this type of table, you simply need to limit the query to the latest snapshot.

Example of Snapshot

Say we use the snapshot approach for our "People" table in a source database, which has a primary identifier of "id" and a snapshot identifier called "batch". Each time we load the data into Dash, we increment the batch identifier by 1. Note that, unlike the row-level delta approach, there is no deleted column.

id   name      surname   changedDate           batch
1    Bob       Builder   2020-01-02 13:00:00   1
1    William   Builder   2020-02-03 13:30:00   2
1    William   Builder   2020-02-03 13:30:00   3
2    Tim       Tailor    2020-01-02 13:00:00   1
2    Tim       Tailor    2020-02-03 13:30:00   2

In this example, only record 1 (William Builder) is present in the latest batch (batch 3), so record 2 (Tim Tailor) is treated as deleted.

To query this table in the lake, the following SQL ensures that only rows from the latest snapshot are retrieved:

select *
from People
where batch = (select max(batch) from People)

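Bespoke partition schemes are not currently supported, so the batch column is not a partition, and finding the latest batch means scanning the whole table. Adding a filter on the date-loaded partition (data_import_batch) as a backstop limits how much data is scanned and can speed up the query.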

If, for instance, you know that the latest batch was loaded after 1 January 2020, then you could filter on the partition in both the outer query and the subquery:

select *
from People
where data_import_batch > '2020-01-01'
  and batch = (select max(batch) from People where data_import_batch > '2020-01-01')
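Outside Dash, for example when checking an extract before it is pushed, the latest-snapshot selection can be sketched in Python over in-memory rows. The function names are hypothetical. The second function applies the snapshot rule described above: an id present in the previous snapshot but missing from the latest one is treated as deleted. The sample rows reproduce the People snapshot example.

```python
from typing import List, Set


def latest_snapshot(rows: List[dict], batch: str = "batch") -> List[dict]:
    """Rows in the highest batch, like: where batch = (select max(batch) ...)."""
    if not rows:
        return []
    newest = max(row[batch] for row in rows)
    return [row for row in rows if row[batch] == newest]


def deleted_in_latest(rows: List[dict], key: str = "id", batch: str = "batch") -> Set:
    """Ids in the previous snapshot that no longer appear in the latest one."""
    batches = sorted({row[batch] for row in rows})
    if len(batches) < 2:
        return set()
    previous, newest = batches[-2], batches[-1]
    before = {row[key] for row in rows if row[batch] == previous}
    after = {row[key] for row in rows if row[batch] == newest}
    return before - after


people = [
    {"id": 1, "name": "Bob", "batch": 1},
    {"id": 1, "name": "William", "batch": 2},
    {"id": 1, "name": "William", "batch": 3},
    {"id": 2, "name": "Tim", "batch": 1},
    {"id": 2, "name": "Tim", "batch": 2},
]
print([row["name"] for row in latest_snapshot(people)])  # ['William']
print(deleted_in_latest(people))  # {2}
```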

Data Ingestion API

Data should be pushed to the Comotion Dash API in parts using the following endpoint:

Endpoint                     https://api.comodash.io/v1/data-input-file
Request type                 POST
Max payload size             2MB
File type                    Gzipped CSV (UTF-8 encoded)
Header: Content-Type         application/gzip
Header: service_client_id    0
Header: x-api-key            The API key provided
Header: org-name             The org name provided
Header: table-name           The name of the table that you are adding to
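A minimal sketch of a single upload using only Python's standard library (urllib.request). It assumes the header values listed in the table above and a payload that has already been gzipped and kept under the size limit. It is not the Python SDK, and the function names are hypothetical. As in the table, service_client_id defaults to 0. urllib normalises the capitalisation of header names, which is harmless because HTTP header names are case-insensitive. urlopen raises urllib.error.HTTPError for error responses, so a caller can catch that and retry.

```python
import urllib.request

API_URL = "https://api.comodash.io/v1/data-input-file"
# Assumption: treat "2MB" conservatively as 2,000,000 bytes.
MAX_PAYLOAD_BYTES = 2_000_000


def build_upload_request(payload: bytes, api_key: str, org_name: str,
                         table_name: str,
                         service_client_id: str = "0") -> urllib.request.Request:
    """Build the POST request for one gzipped CSV file."""
    if len(payload) > MAX_PAYLOAD_BYTES:
        raise ValueError("payload exceeds the maximum size; split it first")
    return urllib.request.Request(
        API_URL,
        data=payload,
        method="POST",
        headers={
            "Content-Type": "application/gzip",
            "service_client_id": service_client_id,
            "x-api-key": api_key,
            "org-name": org_name,
            "table-name": table_name,
        },
    )


def upload(payload: bytes, api_key: str, org_name: str, table_name: str,
           timeout: float = 60.0) -> int:
    """Send one file and return the HTTP status code.

    urlopen raises urllib.error.HTTPError for error responses.
    """
    request = build_upload_request(payload, api_key, org_name, table_name)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```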

Python SDK

There is a Python SDK available that can be used to push data to Dash easily. You can find the full details here.

Strategies for pushing data from a source system

Generally, when pushing data from a source system, the row-level delta strategy is preferred.

One approach to integrating with the API is to have a scheduled job in the source system that regularly (say, every 30 minutes) pushes the changes in the relevant tables to Comotion Dash.

To achieve this, a few prerequisites must be met for each table that you would like to synchronise to Dash (called "Source Tables"):

  • Ensure that all processes that create database transactions complete within some timeframe X (say, 30 minutes).
  • All changes update a changedDate column on the table with the current date-time.
  • All manual data updates made to the table also respect the changedDate and update it.
  • All lines (including any legacy data) have non-null changedDates.
  • ChangedDates are not overly "clustered". Too many identical changedDates could result in batch sizes that are too big (more on this below).
  • No rows are ever deleted from the tables (see the commentary on row-level deltas above).
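Before switching a table over to scheduled pushes, it can help to check the non-null and clustering prerequisites directly. This Python sketch uses SQLite only so that it runs anywhere; against a real source, you would run the equivalent queries on that database. The function name is hypothetical. Table and column names are inserted into the SQL text because they cannot be bound as parameters, so only pass trusted names. What counts as "overly clustered" depends on your row sizes and the 2MB limit.

```python
import sqlite3
from typing import Tuple


def check_change_tracking(conn: sqlite3.Connection, table: str,
                          column: str = "changedDate") -> Tuple[int, int]:
    """Return (rows with a null changedDate, size of the largest group of identical changedDates)."""
    # Identifiers are formatted in directly, so only trusted names may be passed.
    nulls = conn.execute(
        f"SELECT count(*) FROM {table} WHERE {column} IS NULL"
    ).fetchone()[0]
    largest = conn.execute(
        f"SELECT count(*) AS n FROM {table} WHERE {column} IS NOT NULL "
        f"GROUP BY {column} ORDER BY n DESC LIMIT 1"
    ).fetchone()
    return nulls, (largest[0] if largest else 0)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_table_1 (id INTEGER, changedDate TEXT)")
conn.executemany(
    "INSERT INTO source_table_1 VALUES (?, ?)",
    [(1, "2020-03-01 01:02:03"), (2, "2020-03-01 01:02:03"),
     (3, "2020-03-02 10:00:00"), (4, None)],
)
print(check_change_tracking(conn, "source_table_1"))  # (1, 2)
```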

Once your prerequisites are in place, create a table that will keep track of your bookmarks for each source table. The bookmark for each table represents the changedDate that was last sent to Dash. Only lines changed since this changed date need to be sent on the next round.

The table would look something like this:

table_name        bookmarkdate
source_table_1    2020-03-01 01:02:03
source_table_2    2020-03-15 16:03:03
source_table_3    2020-03-14 02:04:03

This means that the last line of source_table_1 that was sent had a changedDate of 2020-03-01 01:02:03 and so on.

To determine the next batch of rows to send, the following query could be used (Postgres in this instance for illustration purposes):

    BEGIN;

    /*  bookmarktable is the table used to store your bookmarks for each
        source table. This assumes it has been initialised with a row for
        each source table. */

    with bookmark as (
        select bookmarkdate from bookmarktable
        where table_name = 'source_table_1'
    )

    /* take the oldest changes first, so that the bookmark moves forward
       without skipping rows; rows changed in the last 30 minutes are
       left for the next run */

    , initial_batch as (
        select changedDate
        from source_table_1
        where
            changedDate > (select bookmarkdate from bookmark)
            and changedDate < current_timestamp - (30 * interval '1 minute')
        order by changedDate asc
        limit 10000
    )

    /* this step handles the situation where there could be multiple
    identical changed dates that could be cut off by the limit and
    then be missed due to the bookmark moving on */

    , max_date as (
        select max(changedDate) as max_date from initial_batch
    )

    , batch as (
        select *
        from source_table_1
        where
            changedDate > (select bookmarkdate from bookmark)
            and changedDate <= (select max_date from max_date)
        order by changedDate asc
    )

    /* move the bookmark on; if there are no new rows, keep the old value */

    , updateresult as (
        update bookmarktable
        set bookmarkdate = coalesce((select max_date from max_date), bookmarkdate)
        where table_name = 'source_table_1'
        returning *
    )

    select * from batch;

    /* this will produce the data to send to Dash via the API. Only COMMIT
       once it has been sent successfully, otherwise ROLLBACK */

    COMMIT;

Note the following:

  • The delay used in this example is 30 minutes; it should be longer than the timeframe X mentioned above.
  • When handling errors while sending to the API, err on the side of sending data twice (as mentioned above, duplicates can be removed when querying in Dash) rather than failing to send data at all.
  • Appropriate indexes should be added to the changedDate column to ensure the query performs well at scale.
  • When many rows share the same changedDate, the batch to send to the API may exceed the maximum payload size. The application needs to handle this by breaking the batch up into multiple files and sending them in successive calls.
  • It is a good idea to visualise the bookmarks in the front end of the application so that the synchronisation process can be monitored.
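The batching logic of the Postgres query above can also be driven from application code. The following is one possible shape for the scheduled job, written in Python against SQLite so that it runs as-is. sync_once and send are hypothetical names; send stands in for whatever splits the batch into files and calls the API. The bookmark only moves after send returns, so a failed upload leaves it unchanged and the same rows are picked up again on the next run. This follows the advice to prefer duplicates over gaps. The sketch assumes changedDate is stored as 'YYYY-MM-DD HH:MM:SS' text so that comparisons with SQLite's datetime() work. Unlike the Postgres version, the reads are not wrapped in a single transaction; given the prerequisites above, that should be acceptable because the bookmark update is the only write.

```python
import sqlite3
from typing import Callable, List


def sync_once(conn: sqlite3.Connection, table: str,
              send: Callable[[List[tuple]], None],
              limit: int = 10000, delay_minutes: int = 30) -> int:
    """Send one batch of changed rows for `table`, then advance its bookmark.

    Returns the number of rows sent (0 when there is nothing new).
    """
    (bookmark,) = conn.execute(
        "SELECT bookmarkdate FROM bookmarktable WHERE table_name = ?", (table,)
    ).fetchone()
    # Oldest rows first, so the bookmark never jumps past rows not yet sent.
    (max_date,) = conn.execute(
        f"SELECT max(changedDate) FROM ("
        f" SELECT changedDate FROM {table}"
        f" WHERE changedDate > ? AND changedDate < datetime('now', ?)"
        f" ORDER BY changedDate ASC LIMIT ?) AS recent",
        (bookmark, f"-{int(delay_minutes)} minutes", limit),
    ).fetchone()
    if max_date is None:
        return 0
    # Include every row sharing max_date, so ties are never split across runs.
    batch = conn.execute(
        f"SELECT * FROM {table} WHERE changedDate > ? AND changedDate <= ?"
        f" ORDER BY changedDate ASC",
        (bookmark, max_date),
    ).fetchall()
    send(batch)  # if this raises, the bookmark below is never updated
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "UPDATE bookmarktable SET bookmarkdate = ? WHERE table_name = ?",
            (max_date, table),
        )
    return len(batch)
```

Inside send, the batch would still need to be split into files under the 2MB limit, as noted above, before each file is posted.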

Last update: June 21, 2021