📶 Aggregations

Write SQL to run complex computations (SUMs, AVGs, Top N, etc.) over your on-chain data

All indexing clusters deployed on Flair come with a Managed Flink deployment already integrated and ready to run SQL-based streaming or batch jobs.

This Flink deployment has two customised "connectors" ready to be used in your queries:

  • connector: "stream" gives you access to a real-time stream of your on-chain entities (e.g. decoded events) to run aggregations over.

  • connector: "database" gives you "read" and "write" access to your existing data in your namespace.

Full docs about Flink SQL are available at flink.apache.org. Feel free to ask our engineers if you need help writing queries.
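As a minimal sketch of how these connectors are used (the MyEvent/MyTotal entity types and field names below are placeholders, not part of your schema), a "stream" source and a "database" sink are declared like this:

-- Hypothetical source: a real-time stream of an entity type you upsert in your processors.
CREATE TABLE my_events_stream (
    `entityId` STRING,
    `amountUSD` FLOAT
) WITH (
    'connector' = 'stream',
    'mode' = 'append',
    'namespace' = '{{ namespace }}',
    'entity-type' = 'MyEvent'
);

-- Hypothetical sink: writes (upserts) aggregated rows back into your namespace.
CREATE TABLE my_totals_sink (
    `entityId` STRING,
    `totalUSD` FLOAT,
    PRIMARY KEY (`entityId`) NOT ENFORCED
) PARTITIONED BY (`entityId`) WITH (
    'connector' = 'database',
    'mode' = 'write',
    'namespace' = '{{ namespace }}',
    'entity-type' = 'MyTotal'
);

The full streaming and batch walkthroughs below show these connectors in context.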

Stream Jobs

A stream SQL job is a continuous process that listens to incoming events and updates or creates entities in real time.

For example, listening to Swap events and maintaining a totalVolumeUSD per Pool for your AMM, or creating hourly/daily time-series data.

Example of a stream job to create hourly total volume buckets per Pool
  1. Add a new folder in your project, for example under src/aggregations/pool-volume-buckets

  2. Create a new SQL file named streaming.sql with content like:

-- This instructs Flink that this is a real-time continuous job
-- that must always keep running.
SET 'execution.runtime-mode' = 'STREAMING';

-- Here we're creating a virtual table against a stream of incoming Swap events.
-- Note that these fields are whatever you upsert in your processors.
-- Visit "examples" repo to see how each field like amountUSD is added.
CREATE TABLE swaps_stream (
    `entityId` STRING,
    `protocol` STRING,
    `chainId` BIGINT,
    `poolId` STRING,
    `amount0` STRING,
    `amount1` STRING,
    `amount0USD` FLOAT,
    `amount1USD` FLOAT,
    `amountUSD` FLOAT,
    `hourBucket` STRING,
    `dayBucket` STRING,
    `weekBucket` STRING,
    `monthBucket` STRING,
    `blockTimestamp` BIGINT,
    `timestamp` BIGINT,
    -- The field below tells Flink how to order incoming events based on `timestamp`,
    -- which in normal cases is derived from blockTimestamp.
    `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTES
) WITH (
    'connector' = 'stream',
    'mode' = 'append',
    -- Namespace is automatically interpolated based on your cluster manifest.yml
    'namespace' = '{{ namespace }}',
    -- The entityType is the same as what you have provided in your processor "upsert"
    'entity-type' = 'Swap'
);

-- Here we create a destination sink towards your namespace on a different entityType
CREATE TABLE pool_stats_sink (
    `entityId` STRING,
    `protocol` STRING,
    `chainId` BIGINT,
    `poolId` STRING,
    `granularity` STRING,
    `timeBucket` STRING,
    `bucketTimestamp` BIGINT,
    `volumeUSD` FLOAT,
    `volumeToken0` STRING,
    `volumeToken1` STRING,
    `volumeToken0USD` FLOAT,
    `volumeToken1USD` FLOAT,
    PRIMARY KEY (`entityId`) NOT ENFORCED
) PARTITIONED BY (`entityId`) WITH (
    'connector' = 'database',
    'mode' = 'write',
    'namespace' = '{{ namespace }}',
    'entity-type' = 'PoolStat'
);

-- Here we create a temporary in-memory view that is basically an aggregated
-- table grouped by "Pool", summing the USD values and BigNumber values.
CREATE VIEW swap_volumes_buckets AS
SELECT
    `poolId` || ':' || `hourBucket` as `entityId`,
    `protocol`,
    `chainId`,
    `poolId`,
    'hour' as `granularity`,
    `hourBucket` as `timeBucket`,
    MAX(`blockTimestamp`) as `bucketTimestamp`,
    COALESCE(SUM(`amountUSD`), 0) as `volumeUSD`,
    -- SUM_BN() is a custom function already imported in all Flink deployments,
    -- useful in a Web3 context where many values have 18 decimals,
    -- like the "wei" value of ERC20 transfers:
    COALESCE(SUM_BN(`amount0`), '0') as `volumeToken0`,
    COALESCE(SUM_BN(`amount1`), '0') as `volumeToken1`,
    COALESCE(SUM(`amount0USD`), 0) as `volumeToken0USD`,
    COALESCE(SUM(`amount1USD`), 0) as `volumeToken1USD`
FROM
    swaps_stream
WHERE
    -- This condition helps Flink ignore older data to keep active memory usage low.
    -- It is mainly needed for "streaming" jobs, which are supposed to keep running for a long duration of time.
    `ts` >= CAST(FLOOR((CURRENT_TIMESTAMP - INTERVAL '2' HOUR) TO HOUR) AS TIMESTAMP(3))
GROUP BY
    `protocol`,
    `chainId`,
    `poolId`,
    `hourBucket`
;

-- Finally we "upsert" the data into our final destination entity-type.
-- Remember that due to how Flink works, the order of fields here must be
-- exactly the same as in the table definition above.
INSERT INTO
    pool_stats_sink
SELECT
    -- Each upsert must include at least "entityId", otherwise the connector does not
    -- know what ID to use.
    -- In this case we are creating an entityId based on Pool address + hourBucket,
    -- as defined in the swap_volumes_buckets view above.
    `entityId`,
    `protocol`,
    `chainId`,
    `poolId`,
    `granularity`,
    `timeBucket`,
    `bucketTimestamp`,
    `volumeUSD`,
    `volumeToken0`,
    `volumeToken1`,
    `volumeToken0USD`,
    `volumeToken1USD`
FROM
    swap_volumes_buckets;
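The same pattern extends to other granularities. For example, a daily rollup only changes the bucket column, the granularity label and the in-memory retention window; a sketch of such a view, reusing the swaps_stream source defined above:

-- Daily variant of the hourly view above (sketch): only the bucket column,
-- the granularity label and the retention window change.
CREATE VIEW swap_volumes_daily_buckets AS
SELECT
    `poolId` || ':' || `dayBucket` as `entityId`,
    `protocol`,
    `chainId`,
    `poolId`,
    'day' as `granularity`,
    `dayBucket` as `timeBucket`,
    MAX(`blockTimestamp`) as `bucketTimestamp`,
    COALESCE(SUM(`amountUSD`), 0) as `volumeUSD`,
    COALESCE(SUM_BN(`amount0`), '0') as `volumeToken0`,
    COALESCE(SUM_BN(`amount1`), '0') as `volumeToken1`,
    COALESCE(SUM(`amount0USD`), 0) as `volumeToken0USD`,
    COALESCE(SUM(`amount1USD`), 0) as `volumeToken1USD`
FROM
    swaps_stream
WHERE
    -- Keep roughly the last two days in state instead of the last two hours.
    `ts` >= CAST(FLOOR((CURRENT_TIMESTAMP - INTERVAL '2' DAY) TO DAY) AS TIMESTAMP(3))
GROUP BY
    `protocol`,
    `chainId`,
    `poolId`,
    `dayBucket`
;

The INSERT INTO pool_stats_sink statement stays identical, only selecting from this daily view instead. Depending on how your deployment handles multiple INSERT statements in one file, it may be simpler to keep the daily job in its own SQL file with its own enricher entry.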

Now this job will upsert new entities in your namespace. Remember, to sync this data (or any other processed entity) to your own database (e.g. Postgres), the flow is exactly the same as described in the Database docs.

  3. Now we need to add this aggregation definition to the manifest.yml so that when you deploy the cluster this job starts running:

# ...

enrichers:
  - id: pool-volume-buckets-streaming
    engine: flink
    inputSql: ./src/aggregations/pool-volume-buckets/streaming.sql
  4. Deploy your cluster to get this enricher running:

flair deploy
  5. Check the logs to see if there are any issues with your enricher, such as SQL syntax errors:

flair logs -t EnricherId=pool-volume-buckets-streaming JobId=none
  6. Now you can view your running Flink job in the native Flink UI to see the stats and the number of processed entities.

Batch Jobs

When dealing with large amounts of historical data, you can run a batch job to pre-populate the aggregated entities.

For example, generating all the hourly time-series data for all past Pool activity of your AMM.

Example of a batch job to backfill historical hourly volume buckets per Pool
  1. Add a new folder in your project (if not done already), for example under src/aggregations/pool-volume-buckets

  2. Create a new SQL file named batch.sql with content like:

-- This instructs Flink that this is a batch job that must run only
-- when triggered explicitly by the user.
SET 'execution.runtime-mode' = 'BATCH';

-- Similar to the "streaming" example above, we need to create a virtual table for swaps.
-- Here we're creating a virtual table against the existing database of Swap events.
-- Note that these fields are whatever you upsert in your processors.
-- Visit "examples" repo to see how each field like amountUSD is added.
CREATE TABLE swaps_store (
    `entityId` STRING,
    `protocol` STRING,
    `chainId` BIGINT,
    `poolId` STRING,
    `amount0` STRING,
    `amount1` STRING,
    `amount0USD` FLOAT,
    `amount1USD` FLOAT,
    `amountUSD` FLOAT,
    `hourBucket` STRING,
    `dayBucket` STRING,
    `weekBucket` STRING,
    `monthBucket` STRING,
    `blockTimestamp` BIGINT,
    PRIMARY KEY (`entityId`) NOT ENFORCED
) PARTITIONED BY (`entityId`) WITH (
    -- As opposed to the stream example above, we need to read all the
    -- existing data from the database (vs the incoming stream).
    'connector' = 'database',
    'mode' = 'read',
    'namespace' = '{{ namespace }}',
    'entity-type' = 'Swap'

    -- OPTIONALLY, for large amounts of data (e.g. 100m swaps)
    -- you can instruct Flink to partition the scan into smaller batches,
    -- which means the job won't fail due to memory constraints.
    -- (If you uncomment these options, add a comma after 'entity-type' above.)
    --
    -- 'scan.partition.num' = '1000',
    -- 'scan.partition.column' = 'blockTimestamp',
    -- 'scan.partition.lower-bound' = '1577917612',
    -- 'scan.partition.upper-bound' = '1702247157'
);

-- Exactly the same as the "streaming" example above, we need to create
-- a virtual table for the destination entityType.
-- Here we create a destination sink towards your namespace on a different entityType.
CREATE TABLE pool_stats_sink (
    `entityId` STRING,
    `protocol` STRING,
    `chainId` BIGINT,
    `poolId` STRING,
    `granularity` STRING,
    `timeBucket` STRING,
    `bucketTimestamp` BIGINT,
    `volumeUSD` FLOAT,
    `volumeToken0` STRING,
    `volumeToken1` STRING,
    `volumeToken0USD` FLOAT,
    `volumeToken1USD` FLOAT,
    PRIMARY KEY (`entityId`) NOT ENFORCED
) PARTITIONED BY (`entityId`) WITH (
    'connector' = 'database',
    'mode' = 'write',
    'namespace' = '{{ namespace }}',
    'entity-type' = 'PoolStat'
);

-- Here we create a temporary in-memory view that is basically an aggregated
-- table grouped by "Pool", summing the USD values and BigNumber values.
CREATE VIEW swap_volumes_buckets AS
SELECT
    `poolId` || ':' || `hourBucket` as `entityId`,
    `protocol`,
    `chainId`,
    `poolId`,
    'hour' as `granularity`,
    `hourBucket` as `timeBucket`,
    MAX(`blockTimestamp`) as `bucketTimestamp`,
    COALESCE(SUM(`amountUSD`), 0) as `volumeUSD`,
    -- SUM_BN() is a custom function already imported in all Flink deployments,
    -- useful in a Web3 context where many values have 18 decimals,
    -- like the "wei" value of ERC20 transfers:
    COALESCE(SUM_BN(`amount0`), '0') as `volumeToken0`,
    COALESCE(SUM_BN(`amount1`), '0') as `volumeToken1`,
    COALESCE(SUM(`amount0USD`), 0) as `volumeToken0USD`,
    COALESCE(SUM(`amount1USD`), 0) as `volumeToken1USD`
FROM
    swaps_store
GROUP BY
    `protocol`,
    `chainId`,
    `poolId`,
    `hourBucket`
;

-- Finally we "upsert" the data into our final destination entity-type.
-- Remember that due to how Flink works, the order of fields here must be
-- exactly the same as in the table definition above.
INSERT INTO
    pool_stats_sink
SELECT
    -- Each upsert must include at least "entityId", otherwise the connector does not
    -- know what ID to use.
    -- In this case we are creating an entityId based on Pool address + hourBucket,
    -- as defined in the swap_volumes_buckets view above.
    `entityId`,
    `protocol`,
    `chainId`,
    `poolId`,
    `granularity`,
    `timeBucket`,
    `bucketTimestamp`,
    `volumeUSD`,
    `volumeToken0`,
    `volumeToken1`,
    `volumeToken0USD`,
    `volumeToken1USD`
FROM
    swap_volumes_buckets;
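If you only need to backfill a specific window (for example a past month after fixing a processor bug), you can bound the scan with a WHERE clause on blockTimestamp. A sketch of the same view with placeholder epoch-second bounds; everything else in the file stays unchanged:

-- Bounded variant of the view above (sketch): only rebuild buckets
-- inside a specific time range (placeholder epoch-second bounds).
CREATE VIEW swap_volumes_buckets AS
SELECT
    `poolId` || ':' || `hourBucket` as `entityId`,
    `protocol`,
    `chainId`,
    `poolId`,
    'hour' as `granularity`,
    `hourBucket` as `timeBucket`,
    MAX(`blockTimestamp`) as `bucketTimestamp`,
    COALESCE(SUM(`amountUSD`), 0) as `volumeUSD`,
    COALESCE(SUM_BN(`amount0`), '0') as `volumeToken0`,
    COALESCE(SUM_BN(`amount1`), '0') as `volumeToken1`,
    COALESCE(SUM(`amount0USD`), 0) as `volumeToken0USD`,
    COALESCE(SUM(`amount1USD`), 0) as `volumeToken1USD`
FROM
    swaps_store
WHERE
    -- Placeholder bounds; adjust to the window you want to rebuild.
    `blockTimestamp` >= 1677628800
    AND `blockTimestamp` < 1680307200
GROUP BY
    `protocol`,
    `chainId`,
    `poolId`,
    `hourBucket`
;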
  3. Now we need to add this aggregation definition to the manifest.yml so that the job is deployed with your cluster:

# ...

enrichers:
  - id: pool-volume-buckets-batch
    engine: flink
    inputSql: ./src/aggregations/pool-volume-buckets/batch.sql
  4. Deploy your cluster so that this enricher is available to trigger:

flair deploy
  5. Manually trigger the batch job; it might take a few minutes to provision the required resources and runners:

flair enricher trigger pool-volume-buckets-batch

Now this job will upsert all the historical entities into your namespace. Remember, to sync this data to your own database (e.g. Postgres), the flow is described in the Database docs.
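One way to sanity-check a backfill is a small follow-up batch job that reads the aggregated entities back through the same database connector and counts buckets per granularity. A sketch, assuming the connector accepts a column subset here and that Flink's built-in print sink is available in this deployment (its output lands in the job's task logs):

SET 'execution.runtime-mode' = 'BATCH';

-- Read the aggregated PoolStat entities back (same database connector, read mode).
CREATE TABLE pool_stats_store (
    `entityId` STRING,
    `granularity` STRING,
    `volumeUSD` FLOAT,
    PRIMARY KEY (`entityId`) NOT ENFORCED
) PARTITIONED BY (`entityId`) WITH (
    'connector' = 'database',
    'mode' = 'read',
    'namespace' = '{{ namespace }}',
    'entity-type' = 'PoolStat'
);

-- Flink's built-in print connector simply writes each row to the task logs.
CREATE TABLE sanity_check_sink (
    `granularity` STRING,
    `buckets` BIGINT,
    `totalVolumeUSD` FLOAT
) WITH (
    'connector' = 'print'
);

INSERT INTO sanity_check_sink
SELECT
    `granularity`,
    COUNT(*) as `buckets`,
    SUM(`volumeUSD`) as `totalVolumeUSD`
FROM
    pool_stats_store
GROUP BY
    `granularity`;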
