High-level API

New in version 0.7.1.

The high-level API is a wrapper over the mid-level connection API, combined with a connection pool.

Pooling

The high-level API provides a connection pool that automatically maintains a certain number of idle connections to the PostgreSQL server. As only one query can be issued on a single connection at any one time, this provides an effective way of performing multiple queries concurrently.
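The idea can be sketched with a toy pool built on asyncio.Queue. This is an illustrative stand-in only, not how pg_purepy implements its pool; ToyPool, run_query and the "conn-N" strings are hypothetical names invented for this sketch.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyPool:
    """Illustrative stand-in; NOT pg_purepy's implementation."""

    def __init__(self, size):
        # Each string stands in for one open server connection.
        self._idle = asyncio.Queue()
        for n in range(size):
            self._idle.put_nowait(f"conn-{n}")

    @asynccontextmanager
    async def checkout(self):
        conn = await self._idle.get()  # waits while every connection is busy
        try:
            yield conn
        finally:
            self._idle.put_nowait(conn)  # always hand the connection back


async def run_query(pool, label, log):
    async with pool.checkout() as conn:
        log.append((label, conn))
        await asyncio.sleep(0.01)  # pretend the server is working


async def demo():
    pool = ToyPool(size=2)
    log = []
    # Four "queries" share two connections; two run at once, two wait.
    await asyncio.gather(*(run_query(pool, f"q{i}", log) for i in range(4)))
    return log
```

Each connection still serves one query at a time; the concurrency comes from having several connections to hand out.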

Warning

These connections are PERSISTENT connections. The pool does not reap idle connections. If you use something like pgbouncer that automatically closes idle connections, and your application has relatively low activity, failures will cascade: broken connections are checked out of the pool and are not reconnected until a query on a disconnected connection fails with an obvious error.

Connecting

To create a connection pool, use open_pool().

pool.open_pool(username, *, connection_count=None, port=5432, password=None, database=None, ssl_context=None)

Opens a new connection pool to a PostgreSQL server. This is an asynchronous context manager.

This takes the same arguments and keyword arguments as open_database_connection(), except for the optional connection_count parameter.

Parameters:

connection_count (Optional[int]) – The ideal number of connections to keep open at any one time. The pool may shrink slightly as connections are closed due to network errors and aren’t immediately re-opened.

Return type:

AsyncGenerator[PooledDatabaseInterface, None]

By default, the connection count is (CPU_COUNT * 2) + 1.
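As a worked example of the formula above, here is a minimal sketch. It assumes CPU_COUNT corresponds to os.cpu_count(), which the text does not confirm; default_connection_count is a hypothetical helper, not part of the library.

```python
import os


def default_connection_count(cpu_count=None):
    """Evaluate (CPU_COUNT * 2) + 1 for a given or detected CPU count."""
    # Assumption: CPU_COUNT is what os.cpu_count() reports; fall back to 1
    # when the platform cannot determine it.
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    return cpus * 2 + 1
```

On a 4-core machine this gives 9 connections, and on a 16-core machine 33.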

Warning

Your connection count should be relatively low. The default is a very good idea for nearly all applications. Don’t change it unless you have the benchmarks to prove it’s a good idea. See: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing

class pg_purepy.pool.PooledDatabaseInterface(count, nursery, conn_fn)

Bases: object

Connection pool based PostgreSQL interface.

property max_connections: int

The maximum number of connections this pool may have idle.

property idle_connections: int

The number of connections that are not currently checked out.

property waiting_tasks: int

The number of tasks that are currently waiting for a connection to become available.
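These three properties are enough for simple monitoring. The sketch below reads only the documented properties; pool_snapshot and its "saturated" heuristic are assumptions made for illustration, not library features.

```python
def pool_snapshot(pool):
    """Summarise a pool using only its three documented properties."""
    idle = pool.idle_connections
    waiting = pool.waiting_tasks
    return {
        "max_connections": pool.max_connections,
        "idle_connections": idle,
        "waiting_tasks": waiting,
        # Heuristic (an assumption, not a library feature): tasks are queued
        # and nothing is idle, so the pool is the bottleneck right now.
        "saturated": idle == 0 and waiting > 0,
    }
```

Logging such a snapshot periodically is one way to gather the evidence needed before changing connection_count.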

Querying

The connection pool object has a similar high-level query API to the mid-level API.

async PooledDatabaseInterface.execute(query, *params, **kwargs)

Executes a query on the next available connection. See AsyncPostgresConnection.execute() for more information.

Return type:

int

async PooledDatabaseInterface.fetch(query, *params, **kwargs)

Fetches the result of a query on the next available connection. See AsyncPostgresConnection.fetch() for more information.

Return type:

list[DataRow]

async PooledDatabaseInterface.fetch_one(query, *params, **kwargs)

Like fetch(), but only returns one row. See AsyncPostgresConnection.fetch_one() for more information.

Return type:

DataRow
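A minimal sketch of the three query methods used together. The sessions table and its columns are hypothetical, and pool is assumed to be a PooledDatabaseInterface obtained from open_pool().

```python
async def purge_expired_sessions(pool):
    """Run three independent queries through the pool."""
    # execute() returns an int, per the return type documented above.
    deleted = await pool.execute("delete from sessions where expires_at < now()")

    # fetch() returns a list of rows.
    remaining = await pool.fetch("select id from sessions")

    # fetch_one() returns a single row.
    newest = await pool.fetch_one(
        "select id from sessions order by created_at desc limit 1"
    )
    return deleted, remaining, newest
```

Each call may run on a different connection, so statements that depend on session state or on each other's uncommitted changes do not belong in this pattern.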

Transactions

As two consecutive queries may not run on the same connection, transactions get tricky. To that end, the pool has a special PooledDatabaseInterface.checkout_in_transaction() method, which checks out a connection for exclusive use in a transaction block.

PooledDatabaseInterface.checkout_in_transaction()

Checks out a new connection that automatically runs a transaction. This method MUST be used if you wish to execute something in a transaction.

Return type:

AsyncGenerator[AsyncPostgresConnection, None]

async with pool.checkout_in_transaction() as conn:
    await conn.fetch("insert into ...")

The transaction will be automatically committed or rolled back as appropriate at the end of the async with block, and the connection will not be reused until the checkout is done.
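A slightly longer sketch of a multi-statement transaction. The accounts table and the abort flag are hypothetical, and it assumes that "rolled back as appropriate" means an exception leaving the block triggers the rollback.

```python
async def move_credit(pool, *, abort=False):
    """Run two dependent statements on one checked-out connection."""
    async with pool.checkout_in_transaction() as conn:
        await conn.execute("update accounts set balance = balance - 10 where id = 1")
        if abort:
            # Assumption: raising inside the block takes the rollback path,
            # so the debit above is undone.
            raise RuntimeError("aborting transfer")
        await conn.execute("update accounts set balance = balance + 10 where id = 2")
    # Leaving the block normally is where the commit happens.
```

Both statements use conn rather than pool, so they are guaranteed to share a connection and therefore a transaction.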

Converters

As with the other two APIs, you can add converters using PooledDatabaseInterface.add_converter(). This adds the converter to all open connections, as well as to any connections opened in the future.

PooledDatabaseInterface.add_converter(converter)

Registers a converter for all the connections on this pool.

Return type:

None

If you also want PostgreSQL arrays of the converted custom type to be converted automatically, use PooledDatabaseInterface.add_converter_with_array(), which registers the matching array converter as well.

async PooledDatabaseInterface.add_converter_with_array(converter, **kwargs)

Registers a converter, and adds the array type converter to it too.

Return type:

None

Cancellation

Application-level cancellation is supported automatically. If a query is cancelled via a cancel scope, a cancellation request will be issued to the server to avoid having to drain more events from the server if possible.

# The timeout block automatically cancels the query after a while, and the
# connection is returned to the pool for use (hopefully) immediately.
with anyio.move_on_after(timeout):
    # fetch() is a coroutine that returns a list, so await it first
    for result in await pool.fetch(really_long_query):
        await do_long_running_operation(result)

This also works automatically in transactions: the running query will be cancelled and the transaction will be rolled back.