Syncing data using advanced SQL

Like most developers in the last few weeks, I had to implement some GDPR-related features. This was one of the more interesting ones.

Once or twice per month we send a subset of our users spam, er, "information" about third-party products. As this is a German project, data protection laws are very strict (and were so long before GDPR), and we made very sure to only "inform" users who opted in.

But during all the GDPR-related discussions, management decided that we need to fine-tune our terms and allow users to opt out of being "informed" about third-party products, while still keeping their account to use the core services. Fair enough!

But there was a small implementation detail: We have one central OAuth2 server that also manages the so-called term-agreements (i.e. which user agreed to which terms during a given time frame). We have a different, slightly legacy system to manage newsletters and send them to users. So we now need to sync the detailed term-agreement info from the Auth system to the Newsletter system. [0]

Terms Agreement

We store the term-agreements in a table looking like this:

 CREATE TABLE usr_to_term (
    id          SERIAL,
    usr         UUID NOT NULL,
    term        UUID NOT NULL,
    created     TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
    valid_from  TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
    valid_until TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'infinity'
  );

usr and term are UUIDs referencing the user and the term, created is the timestamp at which the record was created. valid_from denotes the timestamp at which the user agreed to the term, while valid_until denotes the timestamp at which the user revoked the term. Note that valid_until defaults to infinity, so we can get all currently agreed-upon terms of a user using this query:

SELECT term FROM usr_to_term WHERE usr = '08866638-917c-477c-925e-83b8444f8965'
 AND now() BETWEEN valid_from AND valid_until;

When a user revokes a term, we just set valid_until to the current timestamp. If a user later decides to agree again to the term, we insert a new row. This way we have a very good data trail showing when a user agreed to which term, and for how long.
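The agree / revoke / re-agree life cycle can be modelled in a few lines of plain Perl. This is only an in-memory sketch without any database: the row layout mirrors usr_to_term, but the helpers agree, revoke_term and active_terms, the sample user and term names, and the use of epoch seconds instead of real timestamps are all assumptions made for illustration.

```perl
use strict;
use warnings;

# Hypothetical in-memory stand-in for the usr_to_term table.
# Timestamps are plain epoch seconds; 9**9**9 overflows to +Inf in Perl
# and plays the role of Postgres' 'infinity'.
my $INFINITY = 9**9**9;
my @usr_to_term;

# Agreeing always inserts a new row that is valid "forever".
sub agree {
    my ( $usr, $term, $now ) = @_;
    push @usr_to_term,
        { usr => $usr, term => $term, valid_from => $now, valid_until => $INFINITY };
    return;
}

# Revoking closes the currently open row instead of deleting it.
sub revoke_term {
    my ( $usr, $term, $now ) = @_;
    for my $row (@usr_to_term) {
        next unless $row->{usr} eq $usr && $row->{term} eq $term;
        $row->{valid_until} = $now if $row->{valid_until} == $INFINITY;
    }
    return;
}

# Equivalent of: WHERE usr = ? AND now() BETWEEN valid_from AND valid_until
sub active_terms {
    my ( $usr, $now ) = @_;
    my @terms = map { $_->{term} }
        grep { $_->{usr} eq $usr && $_->{valid_from} <= $now && $now <= $_->{valid_until} }
        @usr_to_term;
    return sort @terms;
}

agree( 'alice', 'newsletter', 100 );
agree( 'alice', 'core', 100 );
revoke_term( 'alice', 'newsletter', 200 );
agree( 'alice', 'newsletter', 300 );    # re-agreeing adds a second row

for my $t ( 150, 250, 350 ) {
    printf "t=%d: %s\n", $t, join( ',', active_terms( 'alice', $t ) );
}
printf "%d rows in the data trail\n", scalar @usr_to_term;
```

The revoked row is never deleted, so the table keeps growing with every change of mind, which is exactly what gives us the data trail.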

Storing term-agreements in Newsletter app

In our newsletter app we do not care about all that detail. We just need to know which terms a user currently has agreed to. Therefore, we added this table:

 CREATE TABLE term_agreement (
    usr      UUID PRIMARY KEY NOT NULL,
    terms    UUID[],
    modified TIMESTAMP WITH TIME ZONE
  );

usr again stores the UUID referencing the user. terms is an array of UUIDs (notice the []!). Here we will store all currently agreed terms in a list. modified contains the most recent timestamp at which any of the terms was modified.

We can now get all users who agreed to a given term using (for example) this query:

 SELECT usr FROM term_agreement WHERE '5ff173aa-247c-49ba-9531-673c215047db' = any (terms);

.. and of course use this query when figuring out which users we can "inform about third party products".

Syncing term-agreements to the Newsletter App

Now we need to actually sync the term-agreement data! Therefore we call an API endpoint in the Auth server that returns the current agreements (more on that endpoint in a bit..). But we do not want to transfer all agreements all the time, as this is not very performant. As we already store the agreements we only need the diff from the last import (i.e. new agreements and revoked terms). An easy way to do this (at least on the client side) is to get the last modified date, and tell the API to hand out all data touched since that time. If the importer dies, or cannot start due to some cron-misconfiguration, we won't have any problems, as we're basically using a self-healing algorithm:

 my $prev_last_mod='2010-01-01';
 while (1) {
     # q{} instead of "..." so the embedded "T" does not end the Perl string
     my $last_mod = $dbh->selectrow_array(
         q{SELECT TO_CHAR(MAX(modified),'YYYY-MM-DD"T"HH24:MI:SS') FROM term_agreement}
     );
     if ( $prev_last_mod eq $last_mod ) {
         $log->warnf( "last_mod not changed %s vs %s, endless loop or done",
                      $last_mod, $prev_last_mod );
         exit;
     }
     $log->infof( "Fetching term agreements since %s", $last_mod );

     my $agrees = $client->get_active_agreements( $last_mod );
     if ( @$agrees == 0 ) {
         $log->infof( "No data, we're done" );
         exit;
     }
     foreach my $row ( @$agrees ) {
         $dbh->do(
             "INSERT INTO term_agreement (usr, modified, terms) VALUES (?, ?, ?)
              ON CONFLICT (usr) DO UPDATE SET modified=?,terms=?", undef,
              @$row, $row->[1], $row->[2] );
     }
     $prev_last_mod = $last_mod;
 }

Let's break this down a bit... We're running in an endless loop (while (1)), and first get the most recent modified date via:

 my $last_mod = $dbh->selectrow_array(
      q{SELECT TO_CHAR(MAX(modified),'YYYY-MM-DD"T"HH24:MI:SS') FROM term_agreement}
  );

max makes sure we get the latest timestamp, and to_char formats it the way the API will need it.

if ( $prev_last_mod eq $last_mod ) {
     $log->warnf( "last_mod not changed %s vs %s, endless loop or done",
                  $last_mod, $prev_last_mod );
     exit;
 }

If $last_mod has not changed since the last run of the loop, we could have either fallen into an annoying corner case [1] or reached the present time. In any case, we exit the syncing script.

my $agrees = $client->get_active_agreements( $last_mod );
     if ( @$agrees == 0 ) {
         $log->infof( "No data, we're done" );
         exit;
     }

$client is an API client that abstracts away all the hassles of calling a JSON API. It will return the decoded payload as a Perl data structure, in this case an array. If the array is empty, there is no data to fetch, so we stop.

Now we loop through the data we got and store it in our local table.

foreach my $row ( @$agrees ) {
     $dbh->do(
         "INSERT INTO term_agreement (usr, modified, terms) VALUES (?, ?, ?)
          ON CONFLICT (usr) DO UPDATE SET modified=?,terms=?", undef,
          @$row, $row->[1], $row->[2] );
 }

As we can either get a totally new entry, or an update on an existing entry, we're using something called UPSERT (UPdate or inSERT, cf. update_or_create in DBIx::Class). Upsert is a recent feature of Postgres (introduced in 9.5), and very handy in this case. The syntax is a bit strange though: After a regular INSERT we can define what should happen if there is a CONFLICT on a given field (usr in this case). Here we tell Postgres to do an UPDATE instead, setting modified and terms.

What I don't particularly like is that I have to pass some values twice (once in the @$row array, and then again explicitly as $row->[1], $row->[2]). But well, a small price to pay for an otherwise awesome feature!

Update: Greg just told me (in an email) that we can use EXCLUDED so we don't have to pass the same values multiple times:

 $dbh->do(
    "INSERT INTO term_agreement (usr, modified, terms) VALUES (?, ?, ?)
     ON CONFLICT (usr) DO UPDATE
     SET modified = EXCLUDED.modified, terms=EXCLUDED.terms", undef,
   @$row);

Much nicer! Thanks Greg!

The last thing to do is to store the current $last_mod timestamp for the next loop iteration:

$prev_last_mod = $last_mod;

We can now start this script as often as we want, and it will always fetch all new terms, either from the beginning of the project time frame, or from whenever the script last ran. Nice!
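To see why the loop is safe to run at any time, here is a database-free simulation of it. Everything in it is made up for illustration: @remote plays the Auth server, %term_agreement plays the local table, get_active_agreements and run_sync are stand-ins for the real client and script, timestamps are small integers, and $PAGE_SIZE stands in for the limit the API applies.

```perl
use strict;
use warnings;
use List::Util qw(max);

# Hypothetical remote data: [ usr, modified, terms ]
my @remote = (
    [ 'u1', 10, ['t1'] ],
    [ 'u2', 20, [ 't1', 't2' ] ],
    [ 'u3', 30, [] ],
    [ 'u4', 40, ['t2'] ],
    [ 'u5', 50, ['t1'] ],
);
my $PAGE_SIZE = 2;    # assumption: the API hands out at most this many rows
my $api_calls = 0;

# Stand-in for $client->get_active_agreements: everything touched at or
# after $since, oldest first, at most $PAGE_SIZE rows.
sub get_active_agreements {
    my ($since) = @_;
    $api_calls++;
    my @hits = sort { $a->[1] <=> $b->[1] } grep { $_->[1] >= $since } @remote;
    splice @hits, $PAGE_SIZE if @hits > $PAGE_SIZE;
    return \@hits;
}

my %term_agreement;    # stand-in for the local table, keyed by usr

sub run_sync {
    my $prev_last_mod = -1;
    while (1) {
        # SELECT MAX(modified) FROM term_agreement (0 for an empty table)
        my $last_mod = max( 0, map { $_->{modified} } values %term_agreement );
        return if $prev_last_mod == $last_mod;    # no progress: we are done

        my $agrees = get_active_agreements($last_mod);
        return if @$agrees == 0;

        for my $row (@$agrees) {
            my ( $usr, $modified, $terms ) = @$row;
            # "Upsert": a hash assignment inserts or overwrites by key.
            $term_agreement{$usr} = { modified => $modified, terms => $terms };
        }
        $prev_last_mod = $last_mod;
    }
}

run_sync();
printf "%d users after the first run, %d API calls\n",
    scalar( keys %term_agreement ), $api_calls;

# A later revocation on the remote side is picked up by the next run.
$remote[0] = [ 'u1', 60, [] ];
run_sync();
printf "u1 now has %d terms\n", scalar @{ $term_agreement{u1}{terms} };
```

Because the API compares with "at or after", each call re-sends the newest row we already have. That is harmless thanks to the upsert, and it is what eventually makes $last_mod stop moving. If more than $PAGE_SIZE rows shared one timestamp, the no-progress check would end the loop before the remaining rows were fetched.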

The API

Now let's take a look at the other side, i.e. the API exporting the data.

The endpoint

Here's the boring endpoint, as it should be:

sub active_agreements_GET {
     my ( $self, $req ) = @_;
 
     $req->requires_jwt_role_terms;

     my $last_mod = $req->param('last_mod');
 
     my $data = $self->term_model->aggregate_active_agreements( $last_mod );
 
     return $req->json_response($data);
 }

First we check if the JWT grants the right to access the terms API (requires_jwt_role_terms). We get the last_mod param from the request. And then call aggregate_active_agreements to get the data, which we return as JSON.

You will notice that this follows the best practice for API/endpoint/controller code: unpack stuff from the request, call a Fat Model to do the actual work, and pack the data into whatever format the API shall return.

The model

The actual, interesting stuff happens in the model, or - in fact - in the database:

sub aggregate_active_agreements {
     my ($self, $last_mod) = @_;
 
     my $sth = $self->dbh->prepare(q{
 WITH touched AS (
     SELECT  usr,
             MAX(GREATEST(valid_from, CASE valid_until WHEN 'infinity'
                 THEN '1970-01-01' ELSE valid_until END)) as last_mod
     FROM    usr_to_term
     WHERE   valid_from >= ?
         OR  (valid_until != 'infinity' AND valid_until >= ?)
     GROUP BY 1
 ),
 active_terms AS (
     SELECT  touched.usr, term
     FROM    usr_to_term, touched
     WHERE   touched.usr = usr_to_term.usr
         AND now() BETWEEN valid_from AND valid_until
 )
 SELECT  touched.usr,
         MAX(touched.last_mod) AS last_mod,
         ARRAY_AGG(term ORDER BY term) FILTER (WHERE term IS NOT NULL)
 FROM    touched
     LEFT JOIN active_terms
         ON touched.usr = active_terms.usr
 GROUP BY 1
 ORDER BY last_mod
 LIMIT 100
     });
 
     $sth->execute($last_mod, $last_mod);
     my @data;
     while (my @r = $sth->fetchrow_array) {
         push(@data,\@r);
     }
     return \@data;
 }

Let's skip the SQL for a bit, and look at the Perl code:

sub aggregate_active_agreements {
     my ($self, $last_mod) = @_;
 
     my $sth = $self->dbh->prepare($SQL);
 
     $sth->execute($last_mod, $last_mod);
     my @data;
     while (my @r = $sth->fetchrow_array) {
         push(@data,\@r);
     }
     return \@data;
 }

As you can see, this is very plain and un-sexy Perl: Prepare a DBI $sth, execute it using the $last_mod timestamp, fetch the rows and return them. [2]

Greg again pointed out a small annoyance / code smell: $sth->execute($last_mod, $last_mod); needs to pass $last_mod twice. A better solution is to switch from positional DBI placeholders to so-called dollar sign numbers (a DBD::Pg feature). The SQL now looks like this:

    WHERE   valid_from >= $1
         OR  (valid_until != 'infinity' AND valid_until >= $1)

Just make sure you use single quotes when defining the SQL string, or Perl will try to interpolate $1! The calling code can now be simplified to $sth->execute($last_mod), which is more DRY and thus a definite improvement. Thanks again, Greg!

Anyway, let's look at some SQL

What we have here are two common table expressions (CTEs) followed by a SELECT using a LEFT JOIN on one of the CTEs. Let's start with the first CTE:

WITH touched AS (
     SELECT  usr,
             MAX(GREATEST(
                 valid_from,
                 CASE valid_until
                   WHEN 'infinity'
                     THEN '1970-01-01'
                     ELSE valid_until
                 END
             )) as last_mod
     FROM    usr_to_term
     WHERE   valid_from >= ?
         OR  (valid_until != 'infinity' AND valid_until >= ?)
     GROUP BY 1
 ),

Here we want to get all users who have touched their term agreements by either agreeing to a new one, or by revoking a term. Remember that valid_until stays at infinity for as long as a user agrees. So we want users where either valid_from is at or after our last_mod timestamp, or where valid_until is not infinity but a timestamp at or after last_mod. Or, in SQL:

WHERE   valid_from >= ?
    OR  (valid_until != 'infinity' AND valid_until >= ?)

One problem with infinity is that it's rather big. So to get the last_mod timestamp of a given row, we have to do a bit of extra work:

MAX(GREATEST(
     valid_from,
     CASE valid_until
       WHEN 'infinity'
         THEN '1970-01-01'
         ELSE valid_until
     END
 )) as last_mod

GREATEST returns the biggest of the passed values. To work around the infinity-problem (which will always win GREATEST), we return a rather old timestamp if valid_until is in fact set to infinity, or whatever valid_until is set to if it's not infinity (i.e. if the user revoked this term).
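The same trick can be shown in plain Perl. This is just an illustration of the logic, not code from the project: row_last_mod is a made-up helper, timestamps are epoch seconds, and 9**9**9 (which overflows to +Inf) stands in for Postgres' infinity.

```perl
use strict;
use warnings;
use List::Util qw(max);

my $INFINITY = 9**9**9;    # stands in for Postgres' 'infinity'

# Per-row equivalent of
#   GREATEST(valid_from,
#            CASE valid_until WHEN 'infinity' THEN '1970-01-01' ELSE valid_until END)
sub row_last_mod {
    my ($row) = @_;
    # Epoch 0 is 1970-01-01, i.e. "older than anything we care about".
    my $until = $row->{valid_until} == $INFINITY ? 0 : $row->{valid_until};
    return max( $row->{valid_from}, $until );
}

# Hypothetical rows of a single user
my @rows = (
    { valid_from => 100, valid_until => $INFINITY },    # still agreed
    { valid_from => 100, valid_until => 250 },          # revoked at 250
);

# Without the CASE, infinity always wins and hides the real timestamp.
my $naive = max( map { max( $_->{valid_from}, $_->{valid_until} ) } @rows );
my $fixed = max( map { row_last_mod($_) } @rows );

print "naive: $naive, fixed: $fixed\n";
```

The naive variant is useless as a sync marker, because every user with at least one active term would report infinity as their last modification.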

Together with MAX and GROUP BY 1 this query returns a list of all users that have either agreed to or revoked a term after a given timestamp, and the timestamp at which their most recent action happened:

            usr                       |           last_mod            
--------------------------------------+-------------------------------
 10ab58dc-8daf-45a2-8cc5-36785941c5c5 | 2018-05-27 11:00:19.469262+00
 a2b3dc39-2379-4150-bdf6-fafd580597c4 | 2018-05-27 11:03:30.874196+00
 34226bf6-0bd0-4130-bb86-a81171407a1c | 2018-05-27 11:04:55.930807+00
 ...

But what terms has the user agreed to? Or has the user revoked a term? The next CTE answers that:

active_terms AS (
     SELECT  touched.usr, term
     FROM    usr_to_term, touched
     WHERE   touched.usr = usr_to_term.usr
         AND now() BETWEEN valid_from AND valid_until
 )

This is some very simple SQL that just selects all the currently agreed terms of the touched users using now() BETWEEN valid_from AND valid_until. It does not, though, include users who currently have no active term at all (i.e. who have revoked everything).

                      usr             |                 term                 
--------------------------------------+--------------------------------------
 10ab58dc-8daf-45a2-8cc5-36785941c5c5 | 8dabed54-1de3-443b-8348-10e9add2dc1b
 10ab58dc-8daf-45a2-8cc5-36785941c5c5 | 106f67fb-19de-4cc3-96ed-3cde35af4e76
 10ab58dc-8daf-45a2-8cc5-36785941c5c5 | e3310525-a462-4933-bc17-0bb77fdb774d
 10ab58dc-8daf-45a2-8cc5-36785941c5c5 | 5227d0ff-513a-4097-a00a-90228ecca232
 a2b3dc39-2379-4150-bdf6-fafd580597c4 | 8dabed54-1de3-443b-8348-10e9add2dc1b
 a2b3dc39-2379-4150-bdf6-fafd580597c4 | 5227d0ff-513a-4097-a00a-90228ecca232

So now let's stitch it all together:

 SELECT  touched.usr,
         MAX(touched.last_mod) AS last_mod,
         ARRAY_AGG(term ORDER BY term) FILTER (WHERE term IS NOT NULL)
 FROM    touched
     LEFT JOIN active_terms
         ON touched.usr = active_terms.usr
 GROUP BY 1
 ORDER BY last_mod
 LIMIT 100

We select the usr and MAX(last_mod), plus all the agreed terms as an array using ARRAY_AGG. A nice feature I learned while working on this query is that you can FILTER the data aggregated by ARRAY_AGG. We're using it here so that a user without active terms yields a plain NULL instead of an array containing a NULL. This is more or less cosmetic, but it did make it easier to import the data without having to weed out NULLs (or undefs in Perl space) from inside the array.

The LEFT JOIN makes sure we get both the users that have agreed to a term (via active_terms) and the users who currently do not have an active term (and thus revoked their terms) and are therefore not present in active_terms.

ORDER BY and LIMIT make sure that we get the right amount of data in the correct order (oldest change first), so that the client consuming this data can be implemented as shown above.

The final result looks like this, which is exactly [3] what we need in the Newsletter app:

                 usr                  |           last_mod            |    array_agg
--------------------------------------+-------------------------------+---------------------
 10ab58dc-8daf-45a2-8cc5-36785941c5c5 | 2018-05-27 11:00:19.469262+00 | {8dabed54-1de3-443b-8348-10e9add2dc1b,106f67fb-19de-4cc3-96ed-3cde35af4e76,e3310525-a462-4933-bc17-0bb77fdb774d,5227d0ff-513a-4097-a00a-90228ecca232}
 a2b3dc39-2379-4150-bdf6-fafd580597c4 | 2018-05-27 11:03:30.874196+00 | {8dabed54-1de3-443b-8348-10e9add2dc1b,5227d0ff-513a-4097-a00a-90228ecca232}
 34226bf6-0bd0-4130-bb86-a81171407a1c | 2018-05-27 11:04:55.930807+00 | 
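For readers who think better in Perl than in SQL, here is a rough in-memory equivalent of the whole query. It is a sketch under loud assumptions: the sample rows, the fixed $NOW, the epoch-second timestamps and the $limit parameter are invented, and ties in last_mod are broken by usr only to keep the output deterministic (the SQL leaves that order undefined).

```perl
use strict;
use warnings;
use List::Util qw(max);

my $INFINITY = 9**9**9;    # stands in for Postgres' 'infinity'
my $NOW      = 1000;       # fixed "now()" so the example is deterministic

# Hypothetical usr_to_term rows
my @usr_to_term = (
    { usr => 'u1', term => 'tB', valid_from => 500, valid_until => $INFINITY },
    { usr => 'u1', term => 'tA', valid_from => 600, valid_until => $INFINITY },
    { usr => 'u2', term => 'tA', valid_from => 100, valid_until => 700 },
    { usr => 'u3', term => 'tA', valid_from => 100, valid_until => $INFINITY },
);

sub aggregate_active_agreements {
    my ( $last_mod, $limit ) = @_;

    # CTE "touched": users who agreed or revoked at or after $last_mod
    my %touched;
    for my $r (@usr_to_term) {
        my $revoked = $r->{valid_until} != $INFINITY;
        next
            unless $r->{valid_from} >= $last_mod
            || ( $revoked && $r->{valid_until} >= $last_mod );
        my $row_mod = max( $r->{valid_from}, $revoked ? $r->{valid_until} : 0 );
        $touched{ $r->{usr} } = max( $touched{ $r->{usr} } // 0, $row_mod );
    }

    # CTE "active_terms" plus the LEFT JOIN: every touched user gets a
    # row, even if no term is active right now.
    my @result;
    for my $usr ( keys %touched ) {
        my @terms = map { $_->{term} }
            grep { $_->{usr} eq $usr && $_->{valid_from} <= $NOW && $NOW <= $_->{valid_until} }
            @usr_to_term;
        @terms = sort @terms;    # ARRAY_AGG(term ORDER BY term)
        # Like the FILTER clause: undef instead of an empty list
        push @result, [ $usr, $touched{$usr}, @terms ? \@terms : undef ];
    }

    # ORDER BY last_mod LIMIT n
    @result = sort { $a->[1] <=> $b->[1] || $a->[0] cmp $b->[0] } @result;
    splice @result, $limit if @result > $limit;
    return \@result;
}

for my $row ( @{ aggregate_active_agreements( 400, 100 ) } ) {
    my ( $usr, $mod, $terms ) = @$row;
    printf "%s | %d | %s\n", $usr, $mod, $terms ? join( ',', @$terms ) : '(none)';
}
```

With these rows, u3 is left out entirely (nothing happened since 400), while u2 shows up without any terms because the only agreement was revoked.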

Summary

I hope this once again shows the power of raw SQL, which combined with a bit of Perl makes for really nice, clean and performant code that's easy to use!

Thanks

  • to koki for suggesting the (client) architecture.
  • to the European Parliament and Council of the European Union for coming up with the GDPR.
  • to any readers pointing out SQL errors or potential improvements.
  • Postgres and Perl for being fun and interesting tools to work with.
  • Greg for showing me EXCLUDED, reminding me of $1 and some more "bugfixes"

Updates

  • 2018-05-28, 23:00: Mention EXCLUDED in the UPSERT section (pointed out by Greg)
  • 2018-05-28, 23:00: Use $1 to simplify calling the big query (again pointed out by Greg)

Footnotes

[0] The more obvious solution, having the newsletter system ask Auth via an API call whether a given user has agreed, does not scale too well when you need to send 100.000 mails in a reasonable time frame.

[1] The API only returns n entries per call. If more than n entries happened in the same second, we would get the same set back forever. Luckily, I know the data and can verify that this does not happen (at least not now, but maybe we'll get a lot more users per day in the future...)

[2] This could probably be wrapped into a call to selectall_arrayref, but I always have to look up the exact syntax, and I may have to do some per-row data munging, so I prefer to just walk the result set myself.

[3] Not by accident, but by design!