# IO, CREATE/INSERT, and External Data

## Setup

[1]:

import ibis
import os
hdfs_port = int(os.environ.get('IBIS_TEST_WEBHDFS_PORT', 50070))
user = os.environ.get('IBIS_TEST_WEBHDFS_USER', 'ubuntu')
hdfs = ibis.hdfs_connect(host='impala', user=user, port=hdfs_port)
con = ibis.impala.connect(host='impala', database='ibis_testing',
                          hdfs_client=hdfs)
ibis.options.interactive = True


## Creating new Impala tables from Ibis expressions

Suppose you have an Ibis expression that produces a table:

[2]:

table = con.table('functional_alltypes')

t2 = table[table, (table.bigint_col - table.int_col).name('foo')]

expr = (t2
        [t2.bigint_col > 30]
        .group_by('string_col')
        .aggregate(min_foo=lambda t: t.foo.min(),
                   max_foo=lambda t: t.foo.max(),
                   sum_foo=lambda t: t.foo.sum()))
expr

[2]:

  string_col  max_foo  min_foo  sum_foo
0          9       81       81    59130
1          6       54       54    39420
2          4       36       36    26280
3          8       72       72    52560
4          7       63       63    45990
5          5       45       45    32850


To create a table in the database from the results of this expression, use the connection’s create_table method:

[3]:

con.create_table('testing_table', expr, database='ibis_testing')


By default, this creates a table stored in Parquet format in HDFS. Support for views, external tables, configurable file formats, and so forth will come in the future; feedback on what kind of interface would be useful for that is welcome.

[4]:

con.table('testing_table')

[4]:

  string_col  max_foo  min_foo  sum_foo
0          9       81       81    59130
1          6       54       54    39420
2          4       36       36    26280
3          8       72       72    52560
4          7       63       63    45990
5          5       45       45    32850


Tables can be dropped in a similar way with drop_table:

[5]:

con.drop_table('testing_table', database='ibis_testing')
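
Re-running the cells above is easier with a helper that drops any leftover table before creating it again. The following is a minimal sketch, not part of Ibis: `recreate_table` is a hypothetical name, `con` is assumed to be a client like the one created in Setup, and `drop_table` is assumed to accept `force=True` so that a missing table is not an error.

```python
def recreate_table(con, name, expr, database=None):
    """Drop `name` if it exists, then create it from the results of `expr`.

    `con` is assumed to be an Ibis Impala client like the one from Setup.
    """
    # Assumption: force=True makes drop_table a no-op when the table is absent.
    con.drop_table(name, database=database, force=True)
    con.create_table(name, expr, database=database)
    # Return a table expression for the freshly created table.
    return con.table(name, database=database)
```

With the objects defined earlier, `recreate_table(con, 'testing_table', expr, database='ibis_testing')` would stand in for a `drop_table` / `create_table` pair.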


## Inserting data into existing Impala tables

The client’s insert method can append new data to an existing table or overwrite the data already in it.

[6]:

con.create_table('testing_table', expr)
con.table('testing_table')

[6]:

  string_col  max_foo  min_foo  sum_foo
0          9       81       81    59130
1          6       54       54    39420
2          4       36       36    26280
3          8       72       72    52560
4          7       63       63    45990
5          5       45       45    32850

[7]:

con.insert('testing_table', expr)
con.table('testing_table')

[7]:

   string_col  max_foo  min_foo  sum_foo
0           9       81       81    59130
1           6       54       54    39420
2           4       36       36    26280
3           8       72       72    52560
4           7       63       63    45990
5           5       45       45    32850
6           9       81       81    59130
7           6       54       54    39420
8           4       36       36    26280
9           8       72       72    52560
10          7       63       63    45990
11          5       45       45    32850

[8]:

con.drop_table('testing_table')
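
The insert call above appends rows, which is why the table doubled in size. The sketch below shows how the overwrite case might look; `load_table` and its `replace` flag are hypothetical names, and the code assumes that the client's insert method accepts an `overwrite` keyword.

```python
def load_table(con, table_name, expr, replace=False):
    """Append the rows of `expr` to `table_name`, or replace its contents.

    Assumption: the client's insert method accepts an `overwrite` keyword.
    """
    # replace=False appends (as in the cell above); replace=True overwrites.
    con.insert(table_name, expr, overwrite=replace)
    return con.table(table_name)
```

Calling `load_table(con, 'testing_table', expr, replace=True)` on the doubled table should leave only the rows produced by `expr`.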


If you’ve set up an HDFS connection, you can use the Ibis HDFS interface to look through your data and read and write files to and from HDFS:

[9]:

hdfs = con.hdfs
hdfs.ls('/__ibis/ibis-testing-data')

[9]:

['README.md',
'avro',
'awards_players.csv',
'batting.csv',
'csv',
'diamonds.csv',
'functional_alltypes.csv',
'functional_alltypes.parquet',
'geo.csv',
'ibis_testing.db',
'parquet',
'struct_table.avro',
'udf']

[10]:

hdfs.ls('/__ibis/ibis-testing-data/parquet')

[10]:

['functional_alltypes',
'tpch_customer',
'tpch_lineitem',
'tpch_nation',
'tpch_orders',
'tpch_part',
'tpch_partsupp',
'tpch_region',
'tpch_supplier']


Suppose we want to download /__ibis/ibis-testing-data/parquet/functional_alltypes, which is a directory. We need only call get:

[11]:

!rm -rf parquet_dir/
hdfs.get('/__ibis/ibis-testing-data/parquet/functional_alltypes', 'parquet_dir')

[11]:

'/ibis/docs/source/tutorial/parquet_dir'


Now we have that directory locally:

[12]:

!ls parquet_dir/

9a41de519352ab07-4e76bc4d9fb5a789_1624886651_data.0.parq
9a41de519352ab07-4e76bc4d9fb5a78a_778826485_data.0.parq
9a41de519352ab07-4e76bc4d9fb5a78b_1277612014_data.0.parq
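
Outside a notebook, the `!rm -rf` and `!ls` shell magics are not available. A minimal sketch of the same download step in plain Python follows; `fetch_hdfs_dir` is a hypothetical helper, and `hdfs` is assumed to be the client object used above, called with the same `get(hdfs_path, local_path)` argument order.

```python
import os
import shutil


def fetch_hdfs_dir(hdfs, hdfs_path, local_path):
    """Download an HDFS directory, replacing any stale local copy."""
    # Equivalent of `rm -rf local_path`; a missing directory is not an error.
    shutil.rmtree(local_path, ignore_errors=True)
    hdfs.get(hdfs_path, local_path)
    # Equivalent of `ls local_path`, sorted for stable output.
    return sorted(os.listdir(local_path))
```

For the example above, this would be called as `fetch_hdfs_dir(hdfs, '/__ibis/ibis-testing-data/parquet/functional_alltypes', 'parquet_dir')`.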


Files and directories can be written to HDFS just as easily using put:

[13]:

path = '/__ibis/dir-write-example'
if hdfs.exists(path):
    hdfs.rmdir(path)
hdfs.put(path, 'parquet_dir', verbose=True)

[13]:

'/__ibis/dir-write-example'

[14]:

hdfs.ls('/__ibis/dir-write-example')

[14]:

['9a41de519352ab07-4e76bc4d9fb5a789_1624886651_data.0.parq',
'9a41de519352ab07-4e76bc4d9fb5a78a_778826485_data.0.parq',
'9a41de519352ab07-4e76bc4d9fb5a78b_1277612014_data.0.parq']


Delete files with rm or directories with rmdir:

[15]:

hdfs.rmdir('/__ibis/dir-write-example')

[16]:

!rm -rf parquet_dir/


## Queries on Parquet, Avro, and Delimited files in HDFS

Ibis can easily create temporary or persistent Impala tables that reference data in the following formats:

• Parquet (parquet_file)

• Avro (avro_file)

• Delimited text formats (CSV, TSV, etc.) (delimited_file)

Parquet is the easiest because the schema can be read from the data files:

[17]:

path = '/__ibis/ibis-testing-data/parquet/tpch_lineitem'

lineitem = con.parquet_file(path)
lineitem.limit(2)

[17]:

   l_orderkey  l_partkey  l_suppkey  l_linenumber l_quantity l_extendedprice  \
0           1     155190       7706             1      17.00        21168.23
1           1      67310       7311             2      36.00        45983.16

  l_discount l_tax l_returnflag l_linestatus  l_shipdate l_commitdate  \
0       0.04  0.02            N            O  1996-03-13   1996-02-12
1       0.09  0.06            N            O  1996-04-12   1996-02-28

  l_receiptdate     l_shipinstruct l_shipmode  \
0    1996-03-22  DELIVER IN PERSON      TRUCK
1    1996-04-20   TAKE BACK RETURN       MAIL

                            l_comment
0             egular courts above the
1  ly final dependencies: slyly bold

[18]:

lineitem.l_extendedprice.sum()

[18]:

Decimal('229577310901.20')


If you want to query a Parquet file and also create a table in Impala that remains after your session, you can pass more information to parquet_file:

[19]:

table = con.parquet_file(path, name='my_parquet_table',
                         database='ibis_testing',
                         persist=True)
table.l_extendedprice.sum()

[19]:

Decimal('229577310901.20')

[20]:

con.table('my_parquet_table').l_extendedprice.sum()

[20]:

Decimal('229577310901.20')

[21]:

con.drop_table('my_parquet_table')


To query delimited files, you need to write down an Ibis schema. Eventually we’d like to build helper tools that infer the schema for you.

There are some CSV files in the test folder, so let’s use those:

[22]:

hdfs.get('/__ibis/ibis-testing-data/csv', 'csv-files')

[22]:

'/ibis/docs/source/tutorial/csv-files'

[23]:

!cat csv-files/0.csv

63IEbRheTh,0.679388707915,6
mG4hlqnjeG,2.80710565922,15
JTPdX9SZH5,-0.155126406372,55
2jcl6FypOl,1.03787834032,21
rP5J4xvinM,-0.442092712869,22
WniUylixYt,-0.863748033806,27
znsDuKOB1n,-0.566029637098,47
4SRP9jlo1M,0.331460412318,88
KsfjPyDf5e,-0.578930506363,70

[24]:

!rm -rf csv-files/


The schema here is pretty simple (see ibis.schema for more):

[25]:

schema = ibis.schema([('foo', 'string'),
                      ('bar', 'double'),
                      ('baz', 'int32')])

table = con.delimited_file('/__ibis/ibis-testing-data/csv',
                           schema)
table.limit(10)

[25]:

          foo       bar  baz
0  63IEbRheTh  0.679389    6
1  mG4hlqnjeG  2.807106   15
2  JTPdX9SZH5 -0.155126   55
3  2jcl6FypOl  1.037878   21
4  k3TbJLaadQ -1.401908   23
5  rP5J4xvinM -0.442093   22
6  WniUylixYt -0.863748   27
7  znsDuKOB1n -0.566030   47
8  4SRP9jlo1M  0.331460   88
9  KsfjPyDf5e -0.578931   70

[26]:

table.bar.summary()

[26]:

   count  nulls       min       max       sum    mean  approx_nunique
0    100      0 -1.401908  2.807106  8.479978  0.0848              10
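
Until schema inference exists in Ibis, a first guess at the schema for a delimited file can be made from a few sample lines. The sketch below is only an illustration: `infer_csv_schema` is a hypothetical helper, it knows just three type names ('int64', 'double', 'string'), and it assumes a headerless file, so the column names must be supplied.

```python
import csv
import io

# Widening order: every int parses as a double, and anything is a string.
_TYPE_ORDER = ['int64', 'double', 'string']


def _value_type(text):
    """Return the narrowest type name that can represent `text`."""
    for type_name, parse in (('int64', int), ('double', float)):
        try:
            parse(text)
            return type_name
        except ValueError:
            continue
    return 'string'


def infer_csv_schema(sample, names, delimiter=','):
    """Infer (name, type) pairs from a text sample of a headerless file."""
    types = ['int64'] * len(names)
    for row in csv.reader(io.StringIO(sample), delimiter=delimiter):
        if not row:
            continue  # skip blank lines
        if len(row) != len(names):
            raise ValueError('expected %d fields, got %d'
                             % (len(names), len(row)))
        for i, text in enumerate(row):
            # Keep the widest type seen so far in this column.
            types[i] = max(types[i], _value_type(text),
                           key=_TYPE_ORDER.index)
    return list(zip(names, types))


sample = """63IEbRheTh,0.679388707915,6
mG4hlqnjeG,2.80710565922,15
JTPdX9SZH5,-0.155126406372,55
"""
pairs = infer_csv_schema(sample, ['foo', 'bar', 'baz'])
print(pairs)
# [('foo', 'string'), ('bar', 'double'), ('baz', 'int64')]
```

The resulting pairs have the same shape as the list passed to ibis.schema above. The guess is deliberately conservative (integers become 'int64' rather than 'int32'), so review it against the full data before relying on it.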


For functions like parquet_file and delimited_file, you must pass an HDFS directory (support for S3 and other filesystems will be added later), and all files in that directory must have the same schema.

If you have Avro data, you can query it too, provided you have the full Avro schema:

[27]:

avro_schema = {
    "fields": [
        {"type": ["int", "null"], "name": "R_REGIONKEY"},
        {"type": ["string", "null"], "name": "R_NAME"},
        {"type": ["string", "null"], "name": "R_COMMENT"}],
    "type": "record",
    "name": "a"
}

path = '/__ibis/ibis-testing-data/avro/tpch.region'

hdfs.mkdir(path)
table = con.avro_file(path, avro_schema)
table

[27]:

Empty DataFrame
Columns: [r_regionkey, r_name, r_comment]
Index: []
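
Writing the Avro schema dictionary by hand gets tedious for wide tables. The sketch below builds the same kind of dictionary from a list of (name, type) pairs; `nullable_avro_record` is a hypothetical helper, and it assumes every field should be a nullable union of one Avro primitive type, as in the schema above.

```python
import json


def nullable_avro_record(name, fields):
    """Build an Avro record schema whose fields are [type, "null"] unions.

    `fields` is a list of (field_name, avro_primitive_type) pairs.
    """
    return {
        "type": "record",
        "name": name,
        "fields": [
            {"name": field_name, "type": [avro_type, "null"]}
            for field_name, avro_type in fields
        ],
    }


avro_schema = nullable_avro_record("a", [
    ("R_REGIONKEY", "int"),
    ("R_NAME", "string"),
    ("R_COMMENT", "string"),
])
# Print the schema as JSON to check it against the hand-written version.
print(json.dumps(avro_schema, indent=2))
```

The resulting dictionary is equal to the hand-written `avro_schema` above and can be passed to avro_file in the same way.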


## Other helper functions for interacting with the database

We’re adding a growing list of useful utility functions to the client object for interacting with an Impala cluster. The idea is that you should be able to do any database-admin-type work with Ibis without having to switch over to the Impala SQL shell. If there are ways we can make this more pleasant, please let us know.

Here are some of the features, each with an example below:

• Listing and searching for available databases and tables

• Creating and dropping databases

• Getting table schemas

[28]:

con.list_databases(like='ibis*')

[28]:

['ibis_testing', 'ibis_testing_tmp_db']

[29]:

con.list_tables(database='ibis_testing', like='tpch*')

[29]:

['tpch_customer',
'tpch_lineitem',
'tpch_nation',
'tpch_orders',
'tpch_part',
'tpch_partsupp',
'tpch_region',
'tpch_region_avro',
'tpch_supplier']

[30]:

schema = con.get_schema('functional_alltypes')
schema

[30]:

ibis.Schema {
  id               int32
  bool_col         boolean
  tinyint_col      int8
  smallint_col     int16
  int_col          int32
  bigint_col       int64
  float_col        float32
  double_col       float64
  date_string_col  string
  string_col       string
  timestamp_col    timestamp
  year             int32
  month            int32
}
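
The listing and schema calls combine naturally. As a minimal sketch, the hypothetical helper `schemas_matching` below collects the schema of every table whose name matches a pattern; it assumes get_schema accepts a `database` keyword in the same way list_tables does.

```python
def schemas_matching(con, pattern, database=None):
    """Map each table name matching `pattern` to its schema."""
    return {
        # Assumption: get_schema takes the same `database` keyword
        # that list_tables does.
        name: con.get_schema(name, database=database)
        for name in con.list_tables(like=pattern, database=database)
    }
```

For example, `schemas_matching(con, 'tpch*', database='ibis_testing')` would return one schema per TPC-H table listed above.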


Databases can be created too, and you can set the HDFS storage path for their data files:

[31]:

db = 'ibis_testing2'
con.create_database(db, path='/__ibis/my-test-database', force=True)

# Depending on your cluster, you may need to give the impala user
# write and execute permissions on '/__ibis/my-test-database'
hdfs.chmod('/__ibis/my-test-database', '777')

[32]:

con.create_table('example_table', con.table('functional_alltypes'),
                 database=db, force=True)


The data files should now be at the indicated location in HDFS:

[33]:

hdfs.ls('/__ibis/my-test-database')

[33]:

['example_table']


To drop a database, including all tables in it, you can use drop_database with force=True:

[34]:

con.drop_database(db, force=True)
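
For scratch work, the create and drop calls can be paired so that the database is cleaned up even when something in between fails. The following is a minimal sketch using only the calls shown above; `temporary_database` is a hypothetical name, and `con` is assumed to be the client from Setup.

```python
from contextlib import contextmanager


@contextmanager
def temporary_database(con, name, path):
    """Create database `name` at HDFS `path` and drop it on exit."""
    con.create_database(name, path=path, force=True)
    try:
        yield name
    finally:
        # force=True also drops any tables still in the database.
        con.drop_database(name, force=True)
```

Usage would look like `with temporary_database(con, 'ibis_testing2', '/__ibis/my-test-database') as db:` followed by the create_table call from the example above.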


## Dealing with Partitioned tables in Impala

Placeholder: This is not yet implemented. If you have use cases, please let us know.

## Faster queries on small data in Impala

Because Impala internally uses LLVM to compile parts of queries (aka “codegen”) to make them faster on large datasets, many kinds of queries carry a certain amount of overhead, even on small datasets. You can disable LLVM code generation when using Ibis, which may significantly speed up queries on smaller datasets:

[35]:

from numpy.random import rand

[36]:

con.disable_codegen()

[37]:

t = con.table('ibis_testing.functional_alltypes')

%timeit (t.double_col + rand()).sum().execute()

29.3 ms ± 2.23 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

[38]:

# Turn codegen back on
con.disable_codegen(False)

[39]:

%timeit (t.double_col + rand()).sum().execute()

26.3 ms ± 772 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


It’s important to remember that codegen adds a fixed overhead to each query and significantly speeds up queries on big data.
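
Since codegen should usually be on for large queries, it can help to scope the setting to just the small queries that benefit. The sketch below wraps the two disable_codegen calls from the cells above in a context manager; `codegen_disabled` is a hypothetical name, and it always re-enables codegen on exit, whatever the earlier setting was.

```python
from contextlib import contextmanager


@contextmanager
def codegen_disabled(con):
    """Disable Impala codegen for the duration of a `with` block."""
    con.disable_codegen()
    try:
        yield con
    finally:
        # Turn codegen back on, even if the query raised.
        con.disable_codegen(False)
```

The timing cell above could then run inside `with codegen_disabled(con):`, with no separate cell needed to turn codegen back on.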