High-level API
New in version 0.7.1.
The high-level API is a wrapper over the mid-level connection API, combined with a connection pool.
Pooling
The high-level API provides a connection pool that automatically maintains a certain number of idle connections to the PostgreSQL server. As only one query can be issued on a connection at any one time, this provides an effective way of performing multiple queries concurrently.
Warning
These connections are PERSISTENT connections. The pool does not reap idle connections. If you use something like pgbouncer that automatically closes idle connections, and your application has relatively low activity, failures will cascade: broken connections are checked out of the pool and aren’t reconnected until an obvious error happens when a query is attempted on a disconnected connection.
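The pooling idea described above can be sketched with nothing but the standard library. This is a toy illustration, not the library's implementation: `ToyConnection` and `ToyPool` are hypothetical names, and the "query" is just a short sleep. It shows how a fixed set of connections, each handling one query at a time, can still serve several queries concurrently.

```python
import asyncio


class ToyConnection:
    """Hypothetical stand-in for a server connection; runs one query at a time."""

    def __init__(self, name):
        self.name = name

    async def fetch(self, query):
        await asyncio.sleep(0.01)  # pretend to wait on the server
        return f"{self.name}: {query}"


class ToyPool:
    """Hands out idle connections and takes them back when a query finishes."""

    def __init__(self, size):
        self._idle = asyncio.Queue()
        for i in range(size):
            self._idle.put_nowait(ToyConnection(f"conn-{i}"))

    async def fetch(self, query):
        conn = await self._idle.get()  # waits if every connection is busy
        try:
            return await conn.fetch(query)
        finally:
            self._idle.put_nowait(conn)  # always hand the connection back


async def run_queries():
    pool = ToyPool(size=3)
    # Six queries share three connections; gather keeps the results in order.
    return await asyncio.gather(*(pool.fetch(f"select {n}") for n in range(6)))


results = asyncio.run(run_queries())
```

Note that the toy pool, like the real one, never checks whether an idle connection is still alive before handing it out, which is the failure mode the warning above describes.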
Connecting
To create a connection pool, use open_pool().
- pool.open_pool(username, *, connection_count=None, port=5432, password=None, database=None, ssl_context=None)
Opens a new connection pool to a PostgreSQL server. This is an asynchronous context manager.
This takes the same arguments and keyword arguments as open_database_connection(), except for the optional connection_count parameter.
- Parameters:
connection_count (Optional[int]) – The ideal number of connections to keep open at any one time. The pool may shrink slightly as connections are closed due to network errors and aren’t immediately re-opened.
By default, the connection count is (CPU_COUNT * 2) + 1.
Warning
Your connection count should be relatively low. The default is a very good idea for nearly all applications. Don’t change it unless you have the benchmarks to prove it’s a good idea. See: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing
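As a rough illustration of the default above, the formula can be written out as follows. This sketch assumes that CPU_COUNT corresponds to `os.cpu_count()`; the library may determine it differently, and `default_connection_count` is a hypothetical helper name.

```python
import os


def default_connection_count():
    """Compute (CPU_COUNT * 2) + 1, assuming CPU_COUNT is os.cpu_count()."""
    cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    return (cpu_count * 2) + 1


count = default_connection_count()
```

On a four-core machine this gives nine connections, which stays well within the "relatively low" range the warning recommends.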
Querying
The connection pool object has a similar high-level query API to the mid-level API.
- async PooledDatabaseInterface.execute(query, *params, **kwargs)
Executes a query on the next available connection. See AsyncPostgresConnection.execute() for more information.
- async PooledDatabaseInterface.fetch(query, *params, **kwargs)
Fetches the result of a query on the next available connection. See AsyncPostgresConnection.fetch() for more information.
- async PooledDatabaseInterface.fetch_one(query, *params, **kwargs)
Like fetch(), but only returns one row. See AsyncPostgresConnection.fetch_one() for more information.
Transactions
As two subsequent queries may not run on the same connection, transactions get tricky. To that end, the pool has a special PooledDatabaseInterface.checkout_in_transaction() method which checks out a connection for exclusive use in a transaction block.
- PooledDatabaseInterface.checkout_in_transaction()
Checks out a new connection that automatically runs a transaction. This method MUST be used if you wish to execute something in a transaction.
async with pool.checkout_in_transaction() as conn:
    await conn.fetch("insert into ...")
The transaction will be automatically committed or rolled back as appropriate at the end of the async with block, and the connection will not be reused until the checkout is done.
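The commit-or-rollback behaviour described above can be sketched as an asynchronous context manager. This is a toy model under stated assumptions, not the library's code: `ToyConnection` is hypothetical, it merely records the statements it receives, and the pool is reduced to an `asyncio.Queue` of idle connections.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyConnection:
    """Hypothetical connection that records the statements it is given."""

    def __init__(self):
        self.log = []

    async def execute(self, query):
        self.log.append(query)


@asynccontextmanager
async def checkout_in_transaction(idle):
    conn = await idle.get()  # exclusive: nobody else can take it now
    await conn.execute("begin")
    try:
        yield conn
    except BaseException:
        await conn.execute("rollback")  # the block raised (or was cancelled)
        raise
    else:
        await conn.execute("commit")  # the block finished normally
    finally:
        idle.put_nowait(conn)  # only now may the connection be reused


async def demo():
    idle = asyncio.Queue()
    conn = ToyConnection()
    idle.put_nowait(conn)

    async with checkout_in_transaction(idle) as tx:
        await tx.execute("insert into ...")

    try:
        async with checkout_in_transaction(idle) as tx:
            await tx.execute("insert into ...")
            raise ValueError("something went wrong")
    except ValueError:
        pass

    return conn.log, idle.qsize()


log, idle_count = asyncio.run(demo())
```

The first block ends in a commit and the second in a rollback, and in both cases the connection returns to the idle queue only after the block has exited.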
Converters
You can add converters, as with the other two APIs, using PooledDatabaseInterface.add_converter(). This adds the converter to all open connections, as well as any future connections that may be opened.
- PooledDatabaseInterface.add_converter(converter)
Registers a converter for all the connections on this pool.
If you also wish to automatically add the array converter, so that PostgreSQL arrays of the converted custom type are converted as well, use PooledDatabaseInterface.add_converter_with_array().
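One way to picture how a converter can reach both open and future connections is to have the pool remember every registration and replay it on each new connection. The sketch below is a toy model under that assumption. `ToyConnection`, `ToyPool` and `open_connection` are hypothetical names, and a plain string stands in for a real converter object.

```python
class ToyConnection:
    """Hypothetical connection that just keeps a list of its converters."""

    def __init__(self):
        self.converters = []


class ToyPool:
    def __init__(self):
        self.connections = []
        self._converters = []  # remembered for connections opened later

    def open_connection(self):
        conn = ToyConnection()
        conn.converters.extend(self._converters)  # replay earlier registrations
        self.connections.append(conn)
        return conn

    def add_converter(self, converter):
        self._converters.append(converter)
        for conn in self.connections:  # update connections that already exist
            conn.converters.append(converter)


pool = ToyPool()
first = pool.open_connection()
pool.add_converter("my-type-converter")
second = pool.open_connection()  # opened after the converter was added
```

Both `first` and `second` end up with the converter, regardless of whether they were opened before or after the call.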
Cancellation
Application-level cancellation is supported automatically. If a query is cancelled via a cancel scope, a cancellation request will be issued to the server to avoid having to drain more events from the server if possible.
# timeout block will automatically cancel the query after a while
# and it will be returned to the pool for use (hopefully) immediately
with anyio.move_on_after(timeout):
    async for result in pool.fetch(really_long_query):
        await do_long_running_operation(result)
This also works automatically in transactions - the insertion will be cancelled and the transaction will be rolled back.
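The cancellation flow can be illustrated with a self-contained sketch. To avoid extra dependencies it uses `asyncio.wait_for` instead of an anyio cancel scope, so it is an analogy rather than the library's mechanism. `ToyConnection` is hypothetical, and incrementing a counter stands in for sending a cancellation request to the server.

```python
import asyncio


class ToyConnection:
    """Hypothetical connection whose queries take far too long."""

    def __init__(self):
        self.cancel_requests = 0

    async def fetch(self, query):
        try:
            await asyncio.sleep(10)  # a query that will not finish in time
            return ["row"]
        except asyncio.CancelledError:
            # Stand-in for asking the server to cancel the running query.
            self.cancel_requests += 1
            raise  # never swallow the cancellation itself


async def demo():
    conn = ToyConnection()
    timed_out = False
    try:
        # wait_for cancels the query once the timeout expires.
        await asyncio.wait_for(conn.fetch("select pg_sleep(10)"), timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    return timed_out, conn.cancel_requests


timed_out, cancel_requests = asyncio.run(demo())
```

The important detail is that the handler re-raises after recording the cancel request, so the caller's timeout still takes effect.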