To show off a recent command-line tool for sketching, dsrs, let’s plot the rolling 28-day average daily count of active reviewers on Amazon.

The raw data here is item,user,rating,timestamp, so in SQL this would map to a GROUP BY with a COUNT DISTINCT over 28-day windows. But since the data’s only available as CSV, how can we get to the same answer? If we’re happy with an approximate solution, can we do it without using a bunch of memory or a custom (shuffle-inducing…) sliding-window implementation?
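For concreteness, here’s what the exact computation we’re approximating looks like on toy data. This is a minimal pure-Python sketch of the semantics only, not part of the pipeline below (the names and toy events are made up):

```python
from datetime import date, timedelta

# Toy review log: (user, review_date) pairs.
events = [
    ("alice", date(2018, 1, 1)),
    ("bob",   date(2018, 1, 2)),
    ("alice", date(2018, 1, 3)),
    ("carol", date(2018, 2, 10)),
]

def active_reviewers(events, day, window=28):
    # Distinct users with at least one review in the `window` days ending at `day`.
    lo = day - timedelta(days=window - 1)
    return len({u for u, d in events if lo <= d <= day})

print(active_reviewers(events, date(2018, 1, 3)))   # alice and bob -> 2
print(active_reviewers(events, date(2018, 2, 10)))  # only carol in range -> 1
```

Doing this naively for every day means re-scanning the window per day, which is exactly the cost we want to avoid at 230M-review scale.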

All timings below were done on a machine with 16 physical CPUs (an AWS r4.8xlarge).

# 6.7gb
# May 1996 - Oct 2018, e.g.:
# 0449819906,A3U4E9PIZ8OWH1,5.0,1383696000
# timestamp is then unix time in seconds.
prefix = ''
review_data = {
  'Amazon Fashion': 'AMAZON_FASHION.csv',
  'All Beauty': 'All_Beauty.csv',
  'Appliances': 'Appliances.csv',
  'Arts, Crafts and Sewing': 'Arts_Crafts_and_Sewing.csv',
  'Automotive': 'Automotive.csv',
  'Books': 'Books.csv',
  'CDs and Vinyl': 'CDs_and_Vinyl.csv',
  'Cell Phones and Accessories': 'Cell_Phones_and_Accessories.csv',
  'Clothing, Shoes and Jewelry': 'Clothing_Shoes_and_Jewelry.csv',
  'Digital Music': 'Digital_Music.csv',
  'Electronics': 'Electronics.csv',
  'Gift Cards': 'Gift_Cards.csv',
  'Grocery and Gourmet Food': 'Grocery_and_Gourmet_Food.csv',
  'Home and Kitchen': 'Home_and_Kitchen.csv',
  'Industrial and Scientific': 'Industrial_and_Scientific.csv',
  'Kindle Store': 'Kindle_Store.csv',
  'Luxury Beauty': 'Luxury_Beauty.csv',
  'Magazine Subscriptions': 'Magazine_Subscriptions.csv',
  'Movies and TV': 'Movies_and_TV.csv',
  'Musical Instruments': 'Musical_Instruments.csv',
  'Office Products': 'Office_Products.csv',
  'Patio, Lawn and Garden': 'Patio_Lawn_and_Garden.csv',
  'Pet Supplies': 'Pet_Supplies.csv',
  'Prime Pantry': 'Prime_Pantry.csv',
  'Software': 'Software.csv',
  'Sports and Outdoors': 'Sports_and_Outdoors.csv',
  'Tools and Home Improvement': 'Tools_and_Home_Improvement.csv',
  'Toys and Games': 'Toys_and_Games.csv',
  'Video Games': 'Video_Games.csv',
}
review_data = {k: prefix + v for k, v in review_data.items()}

Even with a 28-day sliding window, if we slide by a day, that’s still quite a few data points: over 8,000 daily values between May 1996 and Oct 2018.

import pandas as pd
(pd.Timestamp('Oct 2018') - pd.Timestamp('May 1996')) / pd.Timedelta('1d')

Store all the URLs in a variable:

from shlex import quote
urls = ' '.join(list(map(quote, review_data.values())))
%%bash -s {urls}

echo 'will cite' | parallel --citation 1> /dev/null 2> /dev/null 

parallel curl -o "/tmp/amazon{#}.csv" -s {} ::: "$@"

# Total data size
du -hsc /tmp/amazon*.csv | tail -1
9.0G	total

# How many reviews?
parallel --pipepart wc -l :::: /tmp/amazon*.csv \
  | awk '{s+=$1}END{print s}'

# How many users?
parallel --pipepart 'cut -d, -f2 | dsrs --raw' :::: /tmp/amazon*.csv \
  | dsrs --merge
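The reason per-file sketches can simply be merged at the end is that distinct-count sketches are commutative summaries of hashed inputs. I don’t know dsrs’s internals, so as an illustration here’s a toy k-minimum-values sketch in Python with the same merge property (an assumption-laden stand-in, not dsrs’s actual algorithm):

```python
import hashlib

class KMV:
    # Toy k-minimum-values distinct-count sketch, for illustration only;
    # dsrs's actual implementation is assumed, not shown, and may differ.
    def __init__(self, k=1024):
        self.k = k
        self.mins = set()  # at most k smallest normalized hashes seen

    def _hash(self, item):
        # Deterministic 64-bit hash mapped into [0, 1).
        d = hashlib.blake2b(item.encode(), digest_size=8).digest()
        return int.from_bytes(d, "big") / 2**64

    def add(self, item):
        self.mins.add(self._hash(item))
        if len(self.mins) > self.k:
            self.mins.remove(max(self.mins))

    def merge(self, other):
        # Merging is just a union of min-sets, re-truncated to k. This is
        # what lets per-file sketches combine without any shuffle.
        self.mins |= other.mins
        while len(self.mins) > self.k:
            self.mins.remove(max(self.mins))

    def estimate(self):
        if len(self.mins) < self.k:
            return float(len(self.mins))  # exact while under capacity
        return (self.k - 1) / max(self.mins)

# Sketch two overlapping "files" independently, then merge.
a, b = KMV(), KMV()
for i in range(30_000):
    a.add(f"user{i}")
for i in range(20_000, 50_000):
    b.add(f"user{i}")
a.merge(b)
print(round(a.estimate()))  # close to the true distinct count of 50,000
```

With k = 1024 the relative error is typically a few percent, which is plenty for a plot.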
%%writefile /tmp/date-user-extract.awk

BEGIN {
    FS = ","
}

1 {
    user = $2;
    epoch_sec = $4;
    # round down to nearest day
    rounded_epoch_sec = strftime("%Y %m %d 00 00 00", epoch_sec);
    rounded_epoch_sec = mktime(rounded_epoch_sec);
    for (i = 0; i < 28; i += 1) {
        dt = strftime("%F", rounded_epoch_sec);
        print dt " " user;
        # a day can be more than this many seconds due to leaps but
        # since we only decrement 28 times the undershoot doesn't matter
        rounded_epoch_sec -= 86400;
    }
}
Overwriting /tmp/date-user-extract.awk

# test date mapper
echo 0449819906,A3U4E9PIZ8OWH1,5.0,1383696000 | awk -f /tmp/date-user-extract.awk | head -3
2013-11-06 A3U4E9PIZ8OWH1
2013-11-05 A3U4E9PIZ8OWH1
2013-11-04 A3U4E9PIZ8OWH1
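As a cross-check, here’s the same date expansion in Python. One assumption worth flagging: this rounds in UTC, while awk’s strftime/mktime use the machine’s local timezone, so they agree only on a UTC-configured box:

```python
from datetime import datetime, timedelta, timezone

def expand_dates(user, epoch_sec, window=28):
    # Round the timestamp down to its (UTC) calendar day, then emit that
    # date and the 27 preceding ones, newest first, paired with the user.
    day = datetime.fromtimestamp(epoch_sec, tz=timezone.utc).date()
    return [((day - timedelta(days=i)).isoformat(), user)
            for i in range(window)]

pairs = expand_dates("A3U4E9PIZ8OWH1", 1383696000)
for dt, user in pairs[:3]:
    print(dt, user)  # first three (date, user) pairs, as in the awk test
```

Each review thus contributes its user to 28 daily buckets, which is what turns a per-day distinct count into a 28-day windowed distinct count.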
# How many 28d users?
parallel --pipepart 'awk -f /tmp/date-user-extract.awk' :::: /tmp/amazon*.csv \
  | dsrs --key >/tmp/ts
t = pd.read_csv('/tmp/ts', delimiter=' ', names=["date", "cnt"])
t.set_index("date", inplace=True, verify_integrity=True)
1996-04-23 1
1996-04-24 1
1996-04-25 1
1996-04-26 1
1996-04-27 1
from matplotlib import pyplot as plt
%matplotlib inline
(t/28).plot(rot=45, legend=False)
plt.title("28-day rolling average amazon reviewers")

How much does sketching actually buy us? Let’s time the distinct-user count three ways: exact counting in awk, serial sketching, and parallel sketching.

start=`date +%s`
parallel --pipepart 'cut -d, -f2' :::: /tmp/amazon*.csv \
  | awk '{a[$1]=1}END{print length(a)}'
end=`date +%s`
echo "How many users? awk time" $((end-start)) "sec"

start=`date +%s`
parallel --pipepart 'cut -d, -f2' :::: /tmp/amazon*.csv \
  | dsrs
end=`date +%s`
echo "How many users? serial sketching time" $((end-start)) "sec"

start=`date +%s`
parallel --pipepart 'cut -d, -f2 | dsrs --raw' :::: /tmp/amazon*.csv \
  | dsrs --merge
end=`date +%s`
echo "How many users? parallel sketching time" $((end-start)) "sec"
How many users? awk time 190 sec

How many users? serial sketching time 11 sec

How many users? parallel sketching time 4 sec

I tried comparing the sketch-based rolling average computation to an awk one:

parallel --pipepart 'awk -f /tmp/date-user-extract.awk' :::: /tmp/amazon*.csv \
  | awk '{a[$1][$2]=1}END{for(i in a)print i " " length(a[i])}' >/tmp/ts-awk

But this got OOM-killed after 2,700 seconds on a machine with 240GB of RAM. Perhaps the easiest non-sketch approach would be to ingest the CSVs into Postgres and use a window function, but at that point we’re well past a few-line solution.
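The OOM isn’t surprising: exact distinct counting has to remember every key it has seen, so the (date, user) hash grows linearly with the data, while a sketch stays fixed-size. A quick illustration of that growth in Python (measuring only the set’s own table, not even the key strings it points to):

```python
import sys

# Exact distinct counting keeps every key in a hash table, so memory
# grows linearly with the number of distinct keys; a sketch does not.
seen = set()
for i in range(1_000_000):
    seen.add(f"user{i}")
print(sys.getsizeof(seen) / 2**20, "MiB for the set's table alone")
```

Scale that to tens of millions of users times thousands of dates and the 240GB box runs out of room, while the sketch pipeline never holds more than a small fixed summary per key.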

Try the notebook out yourself.