SQL at the limits of Big Data

We often face disbelief from clients when presenting how large the performance degredation can be when Redshift (or any database) queries don't align with the realities of Big Data. "How is it possible to accelerate a properly written query by 20,000 times?" This whitepaper will lay out how and why this happens and a coding example in case you need to prove it to yourself.

First a definition of Big Data is needed to set the context of this paper. My preferred definition of Big Data is "data operations that stress the limits of current data technology." There are a number of dimensions by which data operations can be stressed and the most common one for Redshift is data size. In this paper we are looking at data that is very large, where its size stresses the limits of current compute and network technologies. When working with Redshift, and most data warehousing technologies, data volume is the most common stressor.

Since we are talking about the limits of compute technology, it is important to understand how data size affects compute speeds. Current computers use a hierarchy of data storage structures, each with its own technological limits on size and speed. Very small amounts of data will fit in the CPU registers and can be operated upon at very high speed. The next level is the CPU caches which are still small (compared to today's data sizes) but can be accessed rapidly. The third layer is memory where substantial data can be located and access speeds are reasonable. This is the layer where most database operations are performed - enough data fits in memory such that queries run at "memory speed". Lastly there are disks - while all data starts on disk, the speed at which it is operated on is limited by memory speed. However, if the amount of working data is so much larger than memory the computer system has no choice but to use disk as active storage for query execution. Operating in this mode is slow but allows the computer to work on extremely large amounts of active data. In a general purpose computer this mode is called "swapping" and in a database system it is called "spilling". In either case the net effect is the same, very slow execution.

To avoid spilling and enabling execution at "memory speed", database designers put lots of memory in their computers, but there are technological limits to how much memory can be included. As Big Data grows, well, bigger, database designers have found new ways to keep utilizing more memory. This need has contributed to the push for clustered computer solutions since scaling horizontally presents fewer challenges to increasing available memory for queries to utilize. (Other challenges are created but these are not the topic of this whitepaper.) Even with all the advances in database and computer technology, data sizes are outpacing these advances and cost limitations prevent use all possible technologies. At the limits, queries still run at "disk speed".

Unnecessarily crossing from "memory speed" execution to "disk speed" execution of queries is what can be changed. It is not always easy to know when query design patterns will cross this line since patterns that are optimal when run on smaller data sets don't scale well to very large data sets. Query patterns that are tried and true in one space end up being anti-patterns in Big Data. In Big Data understanding all aspects of the involved technologies is needed because you don't know where the limitations will come from when you are near technological limits. The following a case where a "small data" best practice turned into a Big Data anti-pattern.

A fairly common problem for query writers is to turn a table of accounts into a chart of active accounts by day. Starting with an accounts table that includes start_date and end_date for every account and producing a list of active accounts vs. time. This is a report you may see generated for a bank or for a subcription service. Databases have been producing this report for decades. SQL for achieving this often look like the code below where test_dates_table contains the dates over which the accounts range and test_user_table contains the user account information.

  
select d.mdate, count(r.id) as active_accounts
from
    test_dates_table d 
left join 
    test_user_table r
on r.start_date <= d.mdate
    and  (r.end_date is null or r.end_date >= d.mdate)
group by mdate
order by mdate
;
  

If you need to scale this up to a bigger scenario such as computing active sessions for a a very popular web service. The total number of sessions over a year will be billions and you want to produce a report with an hour level granularity - this has now become a Big Data problem. This tried and true method for producing this report has a run time of hours, if it completes at all. A common reaction to this result is to buy faster (and more expensive) technology to "solve" the issue. This query has become the cost driver for your database and no one is happy with its runtime or the solution cost.

Stepping back and assessing the technological limiters reveals another solution. The issue is the amount of intermediate data produced by the classic solution. Joining tables together with inequality conditions leads to row replication and increases in data. For the "small data" world this amount of data increase is not important as the query can still run in memory and at "memory speeds". However, for the network session case where Big Data sets are being used as the source data, this explosion of data is disasterous. The query, at best, will run at "disk speed" and, at worst, will fill up all the disks attached to the database and crash, possibly taking other workloads with it. This report needs to be generated in a different way one that doesn't create such massive amounts of intermediate data.

Such a solution exists. Let me lay it out with an analogy. If I asked you to produce an active accounts by date report based off of 100 index cards that have account start and end dates, how would you do it? (Sorry no computers, just pen and paper.) Would you make new cards based off each source index card, one for each day between that account's start and end date? Sounds like a lot of writing, right? Take a minute before reading on to think about how you would do this. Thinking through this question will greatly help in understanding different approaches.

Index Cards

You likely came up with a solution that doesn't make new cards for every active day of each account, but rather kept a count of how many accounts started and ended on each day. You are making one card for each day and recording starts and ends for that day. From this you can just count the sum of all starts before any day minus the sum of all ends for the previous day and you will get the active accounts for that day. There are a number of variations on how you would make the report, but it is unlikely you made a new card for every account for every day it is active. You wouldn't do this because writing all those cards would take too long. So why ask the database to do this when writing all those new cards (records) will overwhelm memory?

Let's write the query the way we would perform the task if we had to do it manually. Just as before we need a table of the dates we are interested in and this list of tables sets the pattern for these new index cards (records) where we will record daily start and ends. To achieve this we can take the dates tables, UNION ALL each account start date, and UNION ALL each account end date. Performing a GROUP BY date and we can get the number of starts and ends by date. The last step is to calculate the rolling sums needed to calculate the active accounts by day. The SQL for this looks like:

  
select mdate,
    sum(count(start_date)) over ( order by mdate rows unbounded preceding ) -
    sum(count(end_date)) over ( order by mdate rows unbounded preceding ) as active_accounts
from
(
    select mdate, null::int as id, null::date as start_date, null::date as end_date from test_dates_table
    UNION ALL
    select start_date as mdate, id, start_date, null::date as end_date from test_user_table
    UNION ALL
    select end_date + 1 as mdate, id, null::date as start_date, end_date from test_user_table
        where end_date is not null
)
group by mdate
order by mdate
;
  

This new code uses some relatively expensive window functions to calculate the rolling sums so it is understandable why this method isn't used for smaller data sets. For small data sets, as compared to the database's memory, using large amounts of fast memory to compute the daily active accounts can be the fastest method. However, if the data set is large as compared to memory (or if database memory is at a premium for executing the entire database's workload), then using these slightly more expensive functions can greatly pay off. Near the limits of data technology (Big Data) how you write your queries matters greatly.

Can the orignal code be made to perform well? Yes, by throwing more hardware, i.e. memory, at the problem but this can get very costly. You just need enough memory to store the intermediate data set that is created by the inequality join on condition. However, in the world of Big Data this can be absurdly large. I have had clients try to apply Spark clusters that cost four times as much as their Redshift cluster and only increase execution speeds 2X where rewriting the query sped it up 20,000X with no change in cluster size or operational cost.

Weiner Advanced Development operates at this intersection of Big Data and Redshift. This scenario is only one example of how hardware technology understanding plays a critical role when working in Big Data. Plus data size in only one Big Data limitation that can be encountered. Please reach out to theredshiftwhisperer@gmail.com or @rs_whisperer to understand how Weiner Advanced Development can you break through these technological barriers.

Below is the SQL you can use to demonstrate this effect for yourself. This code shows the dramatic differences between these methods without execution taking too long. It is written for Redshfit (because that is what we do) and sized to show the "memory speed" vs. "disk speed" comparison for a single dc2.large cluster. This cluster costs $.25/hr so you can demonstrate the difference for under $1 of investment. If you want to run this on your database, you will want to scale up the date range and number of accounts in order to cross out of memory based execution. There is nothing unique to Redshift in this solution so feel free to port to your database - this is simply memory efficiency of query design and data size vs. available memory (and cost requirements).

First create a table of the numbers 0-255 for use in creating our test data tables. (Redshift doesn't support generate_series() used to create data but this method works everywhere.)

  
drop table if exists twofivesix;
create table twofivesix ( n int8 );

insert into twofivesix (
SELECT 
    p0.n 
    + p1.n*2 
    + p2.n * POWER(2,2) 
    + p3.n * POWER(2,3)
    + p4.n * POWER(2,4)
    + p5.n * POWER(2,5)
    + p6.n * POWER(2,6)
    + p7.n * POWER(2,7) 
    as number
  FROM 
    (SELECT 0 as n UNION SELECT 1) p0,
    (SELECT 0 as n UNION SELECT 1) p1,
    (SELECT 0 as n UNION SELECT 1) p2,
    (SELECT 0 as n UNION SELECT 1) p3,
    (SELECT 0 as n UNION SELECT 1) p4,
    (SELECT 0 as n UNION SELECT 1) p5,
    (SELECT 0 as n UNION SELECT 1) p6,
    (SELECT 0 as n UNION SELECT 1) p7
  Order by 1
);
commit;
  

Next create a table of user data

  
drop table if exists test_user_table;
create table test_user_table (
  start_date  date        encode zstd,
  end_date    date        encode zstd,
  id           int         encode zstd,
  value       float        encode zstd,
  acct_type   varchar(32) encode zstd  
)
DISTKEY(id)
SORTKEY(start_date, end_date);

insert into test_user_table (
select   decode(date_part('dw',start_date) in (0,6), true, start_date - 2, start_date) \
             as start_date,  -- No weekend starts
        decode(date_part('dw',end_date) in (0,6), true, end_date + 2, end_date) \
                     as end_date,  -- No weekend ends
        id, value, acct_type
from (
    select   decode(start_date >= end_date, true, end_date - (random() * 90 + 1)::int, start_date ) \
                 as start_date,  -- No starts after ends
            decode(end_date < current_date, true, end_date, NULL) \
                 as end_date,  -- No future ends
            id, value, acct_type
    from (
        select  current_date - ((random() * 1000)::int) as start_date, 
                current_date - ((random() * 1000)::int - 90) as end_date,
                n as id, n::float / 7000000 as value, 'type-' || to_hex(n % 16) as acct_type
        FROM (select (a.n + b.n + c.n + d.n) as n from twofivesix a 
            cross join (select n*256 as n from twofivesix) b 
            cross join (select n*65536 as n from twofivesix) c 
            cross join (select n*16777216 as n from ( select distinct (n/64)::int as n from twofivesix ) ) d
        )
    )
)
order by start_date, end_date
);

analyze test_user_table; 
commit;
  

We also need to create a table of dates

  
drop table if exists test_dates_table;
create table test_dates_table(
  mdate  date        encode zstd
)
DISTSTYLE ALL
SORTKEY (mdate);

insert into test_dates_table (
select current_date - n as mdate
from (select (a.n + b.n) as n from twofivesix a 
        cross join (select n*256 as n from twofivesix) b
      )
where n < 1200 -- Only go back to cover span of artificial data
order by mdate
);
insert into test_dates_table (select current_date + 1 as mdate);
insert into test_dates_table (select current_date + 2 as mdate);
insert into test_dates_table (select current_date + 3 as mdate);
insert into test_dates_table (select current_date + 4 as mdate);
commit;

analyze test_dates_table; 
commit;
  

The test input data has been created. We can test the classic SQL and put the results into a table so we can compare the output with the memory optimized SQL.

  
drop table if exists dates_join;
create table dates_join as (
    select d.mdate, count(r.id) as active_accounts
    from
        test_dates_table d
    left join
        test_user_table r
    on r.start_date <= d.mdate
        and  (r.end_date is null or r.end_date >= d.mdate)
    group by mdate
    order by mdate
);
commit;
  

Next test the memory optimized SQL.

  
drop table if exists dates_union;
create table dates_union as (
select mdate,
    sum(count(start_date)) over ( order by mdate rows unbounded preceding ) -
    sum(count(end_date)) over ( order by mdate rows unbounded preceding ) as active_accounts

    from
    (
        select mdate, null::int as id, null::date as start_date, null::date as end_date from test_dates_table
        UNION ALL
        select start_date as mdate, id, start_date, null::date as end_date from test_user_table
        UNION ALL
        select end_date + 1 as mdate, id, null::date as start_date, end_date from test_user_table
            where end_date is not null
    )
    group by mdate
    order by mdate

);
commit;
  

Finally we should check that the two methods produce the same results

  
select * from dates_join
except
select * from dates_union
union all
select * from dates_union
except
select * from dates_join;
  

On my single node dc2.large cluster the classic SQL took 7 minutes to complete while the memory optimized SQL took 6 seconds. Not quite 20,000 X difference but I didn't want to make you wait 90 minutes for the classic method to complete. With changes to the source data the classic method can be made to take an arbitrary amount of time even to the point where it cannot complete.