To show off dsrs, a recent command-line tool for sketching, let's plot the rolling 28-day average daily count of active Amazon reviewers.

The raw data here is item,user,rating,timestamp, so in SQL this would map to a sophisticated GROUP BY with a COUNT DISTINCT over 28-day windows. But since the data's only available as CSV, how can we get to the same answer? If we're just interested in an approximate solution, can we do this without using a bunch of memory or a custom (shuffle-inducing…) sliding-window implementation?
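
To pin down the target quantity, here's a minimal pandas sketch of the exact computation on a tiny in-memory sample (column names and values are illustrative, not from the real data). It labels each 28-day window by its start date, which matches the expansion trick used later.

import pandas as pd

# Tiny sample in the item,user,rating,timestamp layout (values made up).
sample = pd.DataFrame(
    [("0449819906", "A3U4E9PIZ8OWH1", 5.0, 1383696000),
     ("0449819907", "A3U4E9PIZ8OWH1", 4.0, 1383782400),
     ("0449819908", "A1AAAAAAAAAAAA", 3.0, 1384560000)],
    columns=["item", "user", "rating", "timestamp"])
sample["date"] = pd.to_datetime(sample["timestamp"], unit="s").dt.floor("D")

# For each window start d, count distinct users reviewing in [d, d + 27 days],
# then divide by 28, which is the quantity plotted at the end.
days = pd.date_range(sample["date"].min(), sample["date"].max(), freq="D")
exact = pd.Series(
    [sample.loc[sample["date"].between(d, d + pd.Timedelta("27D")), "user"].nunique() / 28
     for d in days],
    index=days)
print(exact.head())

This is exact, but it scans the whole frame once per day and needs all the data in memory, which is precisely what we want to avoid at full scale.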

All timings below were done on a machine with 16 physical CPUs (an AWS r4.8xlarge).

# https://nijianmo.github.io/amazon/index.html
# 6.7gb
# May 1996 - Oct 2018, e.g.:
# 0449819906,A3U4E9PIZ8OWH1,5.0,1383696000
# timestamp is then unix time in seconds.
prefix = 'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/'
review_data = {
  'Amazon Fashion': 'AMAZON_FASHION.csv',
  'All Beauty': 'All_Beauty.csv',
  'Appliances': 'Appliances.csv',
  'Arts, Crafts and Sewing': 'Arts_Crafts_and_Sewing.csv',
  'Automotive': 'Automotive.csv',
  'Books': 'Books.csv',
  'CDs and Vinyl': 'CDs_and_Vinyl.csv',
  'Cell Phones and Accessories': 'Cell_Phones_and_Accessories.csv',
  'Clothing, Shoes and Jewelry': 'Clothing_Shoes_and_Jewelry.csv',
  'Digital Music': 'Digital_Music.csv',
  'Electronics': 'Electronics.csv',
  'Gift Cards': 'Gift_Cards.csv',
  'Grocery and Gourmet Food': 'Grocery_and_Gourmet_Food.csv',
  'Home and Kitchen': 'Home_and_Kitchen.csv',
  'Industrial and Scientific': 'Industrial_and_Scientific.csv',
  'Kindle Store': 'Kindle_Store.csv',
  'Luxury Beauty': 'Luxury_Beauty.csv',
  'Magazine Subscriptions': 'Magazine_Subscriptions.csv',
  'Movies and TV': 'Movies_and_TV.csv',
  'Musical Instruments': 'Musical_Instruments.csv',
  'Office Products': 'Office_Products.csv',
  'Patio, Lawn and Garden': 'Patio_Lawn_and_Garden.csv',
  'Pet Supplies': 'Pet_Supplies.csv',
  'Prime Pantry': 'Prime_Pantry.csv',
  'Software': 'Software.csv',
  'Sports and Outdoors': 'Sports_and_Outdoors.csv',
  'Tools and Home Improvement': 'Tools_and_Home_Improvement.csv',
  'Toys and Games': 'Toys_and_Games.csv',
  'Video Games': 'Video_Games.csv'
}
review_data = {k: prefix + v for k, v in review_data.items()}

Even with a 28-day window that slides one day at a time, that's still quite a few data points.

import pandas as pd
(pd.Timestamp('Oct 2018') - pd.Timestamp('May 1996')) / pd.Timedelta('1d')
8188.0

Store all the URLs, shell-quoted, in a variable.

from shlex import quote
urls = ' '.join(list(map(quote, review_data.values())))
%%bash -s {urls}

echo 'will cite' | parallel --citation 1> /dev/null 2> /dev/null 

parallel curl -o "/tmp/amazon{#}.csv" -s {} ::: "$@"
%%bash

# Total data size
du -hsc /tmp/amazon*.csv | tail -1

# How many reviews?
parallel --pipepart wc -l :::: /tmp/amazon*.csv \
  | awk '{s+=$1}END{print s}'
9.0G	total
230139802
%%bash

# How many users?
parallel --pipepart 'cut -d, -f2 | dsrs --raw' :::: /tmp/amazon*.csv \
  | dsrs --merge
43404924
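
That last pipeline is the whole map/reduce pattern: each shard emits its own sketch (dsrs --raw), and the per-shard sketches are merged into one estimate (dsrs --merge). To see why sketches can be combined this way without double counting, here is a toy k-minimum-values distinct counter in Python; it's purely illustrative and says nothing about how dsrs is actually implemented.

import hashlib

class KMV:
    """Toy k-minimum-values distinct-count sketch (illustration only)."""
    def __init__(self, k=256):
        self.k = k
        self.mins = set()  # the k smallest hash values seen so far

    def _hash(self, item):
        digest = hashlib.blake2b(item.encode(), digest_size=8).digest()
        return int.from_bytes(digest, "big") / 2**64  # ~uniform in [0, 1)

    def add(self, item):
        self.mins.add(self._hash(item))
        if len(self.mins) > self.k:
            self.mins.remove(max(self.mins))

    def merge(self, other):
        # The k smallest hashes of a union are among the k smallest of each
        # part, so merging loses nothing and duplicates are never counted twice.
        out = KMV(self.k)
        out.mins = set(sorted(self.mins | other.mins)[:self.k])
        return out

    def estimate(self):
        if len(self.mins) < self.k:
            return len(self.mins)  # fewer than k distinct hashes: exact
        return int((self.k - 1) / max(self.mins))

# Two overlapping "shards" of users with duplicates, as if from two workers.
users = [f"user{i}" for i in range(50_000)]
shards = [users[:30_000] * 2, users[20_000:] * 2]
sketches = []
for shard in shards:
    s = KMV()
    for u in shard:
        s.add(u)
    sketches.append(s)
print("exact:", len(set(users)),
      "estimate:", sketches[0].merge(sketches[1]).estimate())

For a KMV-style sketch the error shrinks like 1/√k, and the merge is associative, which is what lets parallel shards be reduced in any order.
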
%%writefile /tmp/date-user-extract.awk
#!/usr/bin/awk -f

BEGIN {
    FS = "," 
}

1 {
    user = $2;
    epoch_sec = $4;
    # round down to nearest day
    rounded_epoch_sec = strftime("%Y %m %d 00 00 00", epoch_sec);
    rounded_epoch_sec = mktime(rounded_epoch_sec)
    for (i = 0; i < 28; i += 1) {
        dt = strftime("%F", rounded_epoch_sec);
        print dt " " user
        # a day can be more than this many seconds due to leaps but
        # since we only decrement 28 times the undershoot doesn't matter
        rounded_epoch_sec -= 86400
    }
}
Overwriting /tmp/date-user-extract.awk
%%bash

# test date mapper
echo 0449819906,A3U4E9PIZ8OWH1,5.0,1383696000 | awk -f /tmp/date-user-extract.awk | head -3
2013-11-06 A3U4E9PIZ8OWH1
2013-11-05 A3U4E9PIZ8OWH1
2013-11-04 A3U4E9PIZ8OWH1
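
The expansion is doing the real work here: a reviewer active on day R is emitted under every window label R, R−1, …, R−27, so the window labeled D ends up holding exactly the users who reviewed somewhere in [D, D+27], and the sliding distinct count reduces to an ordinary per-key distinct count. A quick pure-Python sanity check of that equivalence on made-up rows (a dict of sets stands in for the per-date sketches):

from collections import defaultdict
from datetime import date, timedelta

# Made-up (review date, user) pairs, just to check the window algebra.
reviews = [(date(2013, 11, 6), "A3U4E9PIZ8OWH1"),
           (date(2013, 11, 7), "A3U4E9PIZ8OWH1"),
           (date(2013, 11, 20), "A1AAAAAAAAAAAA"),
           (date(2013, 12, 30), "A2BBBBBBBBBBBB")]

# Expansion, as in the awk script: emit each user under 28 consecutive labels.
expanded = defaultdict(set)
for d, user in reviews:
    for back in range(28):
        expanded[d - timedelta(days=back)].add(user)

# Direct definition: users with a review date inside [label, label + 27 days].
def window_users(label):
    return {u for d, u in reviews if label <= d <= label + timedelta(days=27)}

assert all(users == window_users(label) for label, users in expanded.items())
print(len(expanded), "window labels check out")

At full scale the exact per-label sets blow up memory (that's what happens to the awk attempt at the end of the post); dsrs --key does the same grouping but, presumably, with a small sketch per key rather than an exact set.
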
%%bash
  
# How many 28d users?
parallel --pipepart 'awk -f /tmp/date-user-extract.awk' :::: /tmp/amazon*.csv \
  | dsrs --key >/tmp/ts
t = pd.read_csv('/tmp/ts', delimiter=' ', names=["date", "cnt"])
t.set_index("date", inplace=True, verify_integrity=True)
t.sort_index(inplace=True)
t.head()
            cnt
date
1996-04-23    1
1996-04-24    1
1996-04-25    1
1996-04-26    1
1996-04-27    1
from matplotlib import pyplot as plt
%matplotlib inline
(t/28).plot(rot=45, legend=False)
plt.xlabel("date")
plt.ylabel("users")
plt.title("28-day rolling average amazon reviewers")
plt.show()

[figure: 28-day rolling average amazon reviewers]

%%bash

start=`date +%s`
parallel --pipepart 'cut -d, -f2' :::: /tmp/amazon*.csv \
  | awk '{a[$1]=1}END{print length(a)}'
end=`date +%s`
echo "How many users? awk time" $((end-start)) "sec"
echo

start=`date +%s`
parallel --pipepart 'cut -d, -f2' :::: /tmp/amazon*.csv \
  | dsrs
end=`date +%s`
echo "How many users? serial sketching time" $((end-start)) "sec"
echo

start=`date +%s`
parallel --pipepart 'cut -d, -f2 | dsrs --raw' :::: /tmp/amazon*.csv \
  | dsrs --merge
end=`date +%s`
echo "How many users? parallel sketching time" $((end-start)) "sec"
43249276
How many users? awk time 190 sec

43206238
How many users? serial sketching time 11 sec

43404924
How many users? parallel sketching time 4 sec
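
For a sense of the accuracy trade-off, both sketch estimates land within a fraction of a percent of the exact awk count:

# Relative error of the sketch estimates against the exact awk count above.
exact = 43_249_276
for name, estimate in [("serial sketch", 43_206_238),
                       ("parallel sketch", 43_404_924)]:
    print(f"{name}: {abs(estimate - exact) / exact:.2%} relative error")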

I tried comparing the sketch-based rolling average computation to an awk one:

parallel --pipepart 'awk -f /tmp/date-user-extract.awk' :::: /tmp/amazon*.csv \
  | awk '{a[$1][$2]=1}END{for(i in a)print i " " length(a[i])}' >/tmp/ts-awk

But this got OOM-killed after 2700 seconds on a 240GB-RAM machine. Perhaps the easiest non-sketch approach here would be to ingest the CSVs into Postgres and use a window function, but at that point we're well past a few-line solution.

Try the notebook out yourself.