Welcome to riopg’s documentation!

riopg is a Python 3.6+ library for interfacing with PostgreSQL databases using the curio or trio libraries.

Getting Started

You can install the latest release of riopg from PyPI:

$ pipenv install riopg

Or for the latest development version:

$ pipenv install git+https://github.com/Fuyukai/riopg.git#egg=riopg

riopg depends on multio (to provide a compatibility layer between curio and trio) and psycopg2 (to drive the connection to the database server).

Picking a Library

Before using riopg, you must pick the async library that runs on the backend by initialising the multio module.

import multio
# enable curio for this thread
multio.init('curio')
# or, enable trio for this thread
multio.init('trio')

Note

The choice of library is per-thread; you must call multio.init in every thread you wish to use async code in.
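
To see why the call is needed in every thread, here is a minimal sketch of per-thread state built on the standard library's threading.local. The names init_backend and current_backend are hypothetical stand-ins chosen for illustration; this is not multio's actual implementation.

```python
import threading

# Hypothetical stand-in for per-thread library selection; not multio's code.
_state = threading.local()


def init_backend(name):
    """Record the chosen async library for the calling thread only."""
    _state.backend = name


def current_backend():
    """Return this thread's choice, or None if init_backend was never called here."""
    return getattr(_state, "backend", None)


def demo():
    init_backend("curio")  # the main thread picks a library
    seen = {}

    def worker():
        seen["before"] = current_backend()  # nothing has been set in this thread
        init_backend("trio")
        seen["after"] = current_backend()

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    seen["main"] = current_backend()  # the main thread's choice is untouched
    return seen
```

In this sketch the worker thread starts with no library selected, and its own selection does not affect the main thread.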

Basic Usage

The riopg API is intentionally similar to the psycopg2 API. To create a connection, use Connection.open():

from riopg.connection import Connection
# must be called from an async context
conn = await Connection.open("postgresql://127.0.0.1/postgres")

It is recommended you use this connection as an async context manager:

async with conn:
    ...

# or open the connection and enter the context in one step
async with (await Connection.open(...)) as connection:
    ...

This will automatically close the connection when you are done with it.
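
The closing behaviour comes from the async context manager protocol (__aenter__ and __aexit__). The sketch below illustrates it with a hypothetical FakeConnection class that performs no I/O, and uses asyncio only as a convenient standard-library event loop. It is an illustration of the protocol, not riopg's own code.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a connection; it does no I/O."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and also when the body raises.
        await self.close()
        return False  # do not swallow the exception


async def demo():
    conn = FakeConnection()
    try:
        async with conn:
            raise RuntimeError("query failed")
    except RuntimeError:
        pass
    return conn.closed
```

Because __aexit__ runs even when the body raises, the stand-in connection is closed on both the success and the failure path.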

The connection object is intentionally similar to a psycopg2 connection object. For example, to open a cursor and perform a query:

cur = await conn.cursor()
await cur.execute("SELECT 1;")
result = await cur.fetchone()

As above, it is recommended to use the cursor as an async context manager:

async with (await conn.cursor()) as cursor:
    ...

Most methods on a Connection or a Cursor are async wrappers: each one performs its task, then reads from or writes to the socket as appropriate. No threads are used in the operation of a connection. For example, Connection.commit() and Connection.rollback() handle the socket I/O for you.
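
psycopg2's asynchronous mode reports progress through a poll() method that returns POLL_OK, POLL_READ or POLL_WRITE, and a thread-free wrapper can be built by waiting on the socket between polls. The sketch below shows that loop in isolation. Whether riopg is implemented exactly this way is an assumption; ScriptedConnection, wait_until_ready and the local POLL_* constants are hypothetical stand-ins, and asyncio is used only as a standard-library event loop.

```python
import asyncio

# Local stand-ins for psycopg2.extensions.POLL_OK / POLL_READ / POLL_WRITE.
POLL_OK, POLL_READ, POLL_WRITE = "ok", "read", "write"


class ScriptedConnection:
    """Hypothetical connection whose poll() replays a fixed list of states."""

    def __init__(self, states):
        self._states = list(states)

    def poll(self):
        return self._states.pop(0)


async def wait_until_ready(conn, wait_readable, wait_writable):
    """Poll until the operation completes, awaiting socket readiness in between."""
    while True:
        state = conn.poll()
        if state == POLL_OK:
            return
        elif state == POLL_READ:
            await wait_readable()
        elif state == POLL_WRITE:
            await wait_writable()
        else:
            raise RuntimeError("unexpected poll state: %r" % (state,))


async def demo():
    events = []

    async def wait_readable():
        events.append("read")  # a real wrapper would wait on the socket here

    async def wait_writable():
        events.append("write")

    conn = ScriptedConnection([POLL_WRITE, POLL_READ, POLL_READ, POLL_OK])
    await wait_until_ready(conn, wait_readable, wait_writable)
    return events
```

The two wait callbacks are where an async library would suspend the task until the socket is readable or writable, which is why no worker thread is needed.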

Cursors also support the async iterator protocol; you can iterate over a cursor with async for:

await cur.execute("SELECT * FROM users;")
async for item in cur:
    ...
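
The async iterator protocol consists of __aiter__ and __anext__. As a minimal sketch, the hypothetical FakeCursor below iterates over in-memory rows by calling its own fetchone() until no rows remain. It is a stand-in for illustration (run here on asyncio), not riopg's Cursor.

```python
import asyncio


class FakeCursor:
    """Hypothetical cursor over in-memory rows; it does no I/O."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0

    async def fetchone(self):
        # Return the next row, or None once the rows are exhausted.
        if self._pos >= len(self._rows):
            return None
        row = self._rows[self._pos]
        self._pos += 1
        return row

    def __aiter__(self):
        return self

    async def __anext__(self):
        row = await self.fetchone()
        if row is None:
            raise StopAsyncIteration
        return row


async def demo():
    cur = FakeCursor([(1, "alice"), (2, "bob")])
    return [row async for row in cur]
```

Raising StopAsyncIteration is what ends the async for loop.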

Connection Pooling

riopg also supports connection pooling with a Pool. To use one, create a Pool with pool.create_pool():

from riopg.pool import create_pool

pool = await create_pool("postgresql://127.0.0.1/postgres")
async with pool.acquire() as connection:
    ...
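
The acquire-and-release pattern can be sketched with a queue of idle connections and a small helper object that returns the connection when the async with block exits. FakePool and _Acquirer below are hypothetical stand-ins that hold plain strings instead of real connections and run on asyncio; riopg's Pool may be implemented differently.

```python
import asyncio


class FakePool:
    """Hypothetical fixed-size pool; plain strings stand in for connections."""

    def __init__(self, pool_size):
        self._idle = asyncio.Queue()
        for n in range(pool_size):
            self._idle.put_nowait("conn-%d" % n)

    def acquire(self):
        # Synchronous: returns an object meant to be used with "async with".
        return _Acquirer(self)

    async def release(self, conn):
        await self._idle.put(conn)

    def idle_count(self):
        return self._idle.qsize()


class _Acquirer:
    def __init__(self, pool):
        self._pool = pool
        self._conn = None

    async def __aenter__(self):
        # Waits here if every connection is currently in use.
        self._conn = await self._pool._idle.get()
        return self._conn

    async def __aexit__(self, exc_type, exc, tb):
        await self._pool.release(self._conn)
        return False


async def demo():
    pool = FakePool(pool_size=2)
    async with pool.acquire() as conn:
        during = pool.idle_count()
    after = pool.idle_count()
    return conn, during, after
```

In this sketch one connection leaves the idle queue for the duration of the block and is put back afterwards, even if the block raises.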

API Reference

class riopg.connection.Connection

Wraps a psycopg2.Connection object, making it work with an async library.

Do not construct this object manually; use Connection.open() or Pool.acquire().

await close()

Closes this connection.

await cursor(**kwargs)

Gets a new cursor object.

Return type: Cursor
Returns: A cursor.Cursor object attached to this connection.

classmethod await open(*args, **kwargs)

Opens a new connection.

Return type: Connection

class riopg.cursor.Cursor(connection, kwargs)

Wraps a psycopg2.cursor object.

await execute(sql, params=None)

Executes some SQL in this cursor.

Parameters:
  • sql (str) – The SQL to execute.
  • params (Union[Tuple[Any], Dict[str, Any], None]) – The parameters to pass to the SQL query.
Return type: None

await fetchall()

Fetches all the rows from this cursor.

Return type: List[Sequence[Any]]
Returns: A list of tuples with the results of the current query.

await fetchmany(size=None)

Fetches many rows from this cursor.

Parameters: size (Optional[int]) – The number of rows to fetch.
Return type: List[Sequence[Any]]
Returns: A list of tuples with the results of the current query.

await fetchone()

Fetches one result from this cursor.

Return type: Sequence[Any]
Returns: A tuple with the results of the previous query.

await open()

Opens this cursor.

This is usually called automatically by Connection.cursor().

await scroll(value, mode='relative')

Scrolls this cursor.

Parameters:
  • value (int) – The number of rows to scroll.
  • mode (str) – The scroll mode to perform.

await riopg.pool.create_pool(dsn, pool_size=12, *, connection_factory=None)

Creates a new Pool.

Parameters:
  • dsn (str) – The DSN to connect to the database with.
  • pool_size (int) – The number of connections to hold at any time.
  • connection_factory (Optional[Callable[[], Connection]]) – The pool factory callable to use to
Return type: Pool
Returns: A new Pool.

class riopg.pool.Pool(dsn, pool_size=12, *, connection_factory=None)

Represents a pool of connections.

acquire()

Acquires a connection from the pool. This returns an object that can be used with async with to automatically release it when done.

Return type: _PoolConnectionAcquirer

await close()

Closes this pool.

await release(conn)

Releases a connection.

Parameters: conn (Connection) – The Connection to release back to the connection pool.