{
"cells": [
{

"cell_type": "markdown", "metadata": {}, "source": [

"# Advanced Topics: Top-K and Self Joins"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"## Setup"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"import ibis\n", "import os\n", "hdfs_port = os.environ.get('IBIS_WEBHDFS_PORT', 50070)\n", "hdfs = ibis.impala.hdfs_connect(host='impala', port=hdfs_port)\n", "con = ibis.impala.connect(host='impala', database='ibis_testing',\n", "                          hdfs_client=hdfs)\n", "ibis.options.interactive = True"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"## \"Top-K\" Filtering\n", "\n", "\n", "A common analytical pattern involves subsetting based on some method of ranking. For example, \"the 5 most frequently occurring widgets in a dataset\". By choosing the right metric, you can obtain the most important or least important items from some dimension, for some definition of important.\n", "\n", "Carrying out this pattern by hand involves the following steps:\n", "\n", "- Choose a ranking metric\n", "- Aggregate, computing the ranking metric, by the target dimension\n", "- Order by the ranking metric and take the highest K values\n", "- Use those values as a set filter (either with semi_join or isin) in your next query\n", "\n", "For example, let's look at the TPC-H tables and find the 5 customers who placed the most orders over their lifetimes:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"orders = con.table('tpch_orders')\n", "\n", "top_orders = (orders\n", "              .group_by('o_custkey')\n", "              .size()\n", "              .sort_by(('count', False))\n", "              .limit(5))\n", "top_orders"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"Now, we could use these customer keys as a filter in some other analysis:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"# Among the top 5 most frequent customers, what's the histogram of their order statuses?\n", "analysis = (orders[orders.o_custkey.isin(top_orders.o_custkey)]\n", "            .group_by('o_orderstatus')\n", "            .size())\n", "analysis"

]

}, {
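
"cell_type": "markdown", "metadata": {}, "source": [

"The step list above also mentions semi_join as an alternative to isin. As a rough sketch (assuming the join-then-project idiom used later in this notebook, with an explicit projection back to the orders columns), the same filter could look like this:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"# Sketch only: the semi_join variant of the same set filter. The projection\n", "# back to the orders columns mirrors the join examples further down.\n", "filtered = (orders.semi_join(top_orders,\n", "                             [orders.o_custkey == top_orders.o_custkey])\n", "            [orders])\n", "filtered.group_by('o_orderstatus').size()"

]

}, {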

"cell_type": "markdown", "metadata": {}, "source": [

"This is such a common pattern that Ibis supports a high-level topk operation, which can be used immediately as a filter:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"top_orders = orders.o_custkey.topk(5)\n", "orders[top_orders].group_by('o_orderstatus').size()"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"This goes a little further. Suppose now we want to rank customers by their total spending instead of the number of orders, perhaps a more meaningful metric:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"total_spend = orders.o_totalprice.sum().name('total')\n", "top_spenders = (orders\n", "                .group_by('o_custkey')\n", "                .aggregate(total_spend)\n", "                .sort_by(('total', False))\n", "                .limit(5))\n", "top_spenders"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"To use another metric, just pass it to the by argument in topk:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"top_spenders = orders.o_custkey.topk(5, by=total_spend)\n", "orders[top_spenders].group_by('o_orderstatus').size()"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"## Self joins\n", "\n", "\n", "If you're a relational data guru, you may have wondered how it's possible to join tables with themselves, because join clauses involve column references back to the original table.\n", "\n", "Consider this SQL:\n", "\n", "```sql\n", "    SELECT t1.key, sum(t1.value - t2.value) AS metric\n", "    FROM my_table t1\n", "      JOIN my_table t2\n", "        ON t1.key = t2.subkey\n", "    GROUP BY 1\n", "```\n", "\n", "Here, we have an unambiguous way to refer to each of the tables through aliasing."

]

}, {
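
"cell_type": "markdown", "metadata": {}, "source": [

"In Ibis, the same aliasing idea is expressed with view(), which the year-over-year example below uses on a real table. Purely as a sketch (my_table and its key, subkey, and value columns are hypothetical and not part of the ibis_testing database), the SQL above might translate to something like:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"# Hypothetical sketch: my_table (columns key, subkey, value) is not part of\n", "# the ibis_testing database; this just mirrors the aliased SQL above.\n", "t1 = con.table('my_table')\n", "t2 = t1.view()   # a second, distinct reference to the same table\n", "\n", "joined = (t1.join(t2, t1.key == t2.subkey)\n", "          [t1.key, (t1.value - t2.value).name('value_diff')])\n", "\n", "joined.group_by('key').aggregate(joined.value_diff.sum().name('metric'))"

]

}, {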

"cell_type": "markdown", "metadata": {}, "source": [

"Let's consider the TPC-H database, and suppose we want to compute the year-over-year change in total order amounts by region using joins."

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"region = con.table('tpch_region')\n", "nation = con.table('tpch_nation')\n", "customer = con.table('tpch_customer')\n", "orders = con.table('tpch_orders')\n", "\n", "orders.limit(5)"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"First, let's join all the things and select the fields we care about:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"fields_of_interest = [region.r_name.name('region'),\n", "                      nation.n_name.name('nation'),\n", "                      orders.o_totalprice.name('amount'),\n", "                      orders.o_orderdate.cast('timestamp').name('odate')  # these are strings\n", "                      ]\n", "\n", "joined_all = (region.join(nation, region.r_regionkey == nation.n_regionkey)\n", "              .join(customer, customer.c_nationkey == nation.n_nationkey)\n", "              .join(orders, orders.o_custkey == customer.c_custkey)\n", "              [fields_of_interest])"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"Okay, great, let's have a look:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"joined_all.limit(5)"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"Sweet, now let's aggregate by year and region:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"year = joined_all.odate.year().name('year')\n", "\n", "total = joined_all.amount.sum().cast('double').name('total')\n", "\n", "annual_amounts = (joined_all\n", "                  .group_by(['region', year])\n", "                  .aggregate(total))\n", "annual_amounts"

]

}, {
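
"cell_type": "markdown", "metadata": {}, "source": [

"Since the self join below matches rows within each region across consecutive years, it can help to eyeball the annual totals ordered by region and year first (an optional inspection step, using the same sort_by and limit calls as above):"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"# Optional: preview the aggregate ordered by region and year\n", "annual_amounts.sort_by(['region', 'year']).limit(10)"

]

}, {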

"cell_type": "markdown", "metadata": {}, "source": [

"Looking good so far. Now, we need to join this table on itself, by subtracting 1 from one of the year columns.\n", "\n", "We do this by creating a \"joinable\" view of a table that is considered a distinct object within Ibis. To do this, use the view function:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"current = annual_amounts\n", "prior = annual_amounts.view()\n", "\n", "yoy_change = (current.total - prior.total).name('yoy_change')\n", "\n", "results = (current.join(prior, ((current.region == prior.region) &\n", "                                (current.year == (prior.year - 1))))\n", "           [current.region, current.year, yoy_change])\n", "df = results.execute()"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"df['yoy_pretty'] = df.yoy_change.map(lambda x: '$%.2fmm' % (x / 1000000.))\n", "df"

]

}, {

"cell_type": "markdown", "metadata": {}, "source": [

"If you're being fastidious and want to consider the first year occurring in the dataset for each region to have 0 for the prior year, you will instead need to do an outer join and treat nulls in the prior side of the join as zero:"

]

}, {

"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [

"yoy_change = (current.total - prior.total.zeroifnull()).name('yoy_change')\n", "results = (current.outer_join(prior, ((current.region == prior.region) &\n", "                                      (current.year == (prior.year - 1))))\n", "           [current.region, current.year, current.total,\n", "            prior.total.zeroifnull().name('prior_total'),\n", "            yoy_change])\n", "\n", "results.limit(10)"

]

}

], "metadata": {

"kernelspec": {

"display_name": "Python 3", "language": "python", "name": "python3"

}, "language_info": {

"codemirror_mode": {

"name": "ipython", "version": 3

}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.3"

}

}, "nbformat": 4, "nbformat_minor": 1

}