Asynchronous connection API

This API combines the client of the low-level API with the asynchronous capabilities of the anyio library to create an actual, networked client.

Warning

You probably don’t want to use this class directly. Instead, use the High-level API when possible. You should still read the documentation for this API, however, especially if you wish to use pooled transactions.

Connecting

In order to get anywhere, you need to actually connect to the server.

pg_purepy.connection.open_database_connection(address_or_path, username, *, port=5432, password=None, database=None, ssl_context=None)

Opens a new connection to the PostgreSQL database server. This is an asynchronous context manager.

async with open_database_connection("localhost", username="postgres") as db:
    ...

Required parameters:

Parameters:
  • address_or_path (str | PathLike[str]) – The address of the server or the absolute path of its Unix socket.

  • username (str) – The username to authenticate with.

Return type:

AsyncGenerator[AsyncPostgresConnection, None]

Optional parameters:

Parameters:
  • port (int) – The port to connect to. Ignored for unix sockets.

  • password (Optional[str]) – The password to authenticate with.

  • database (Optional[str]) – The database to connect to. Defaults to the username.

  • ssl_context (Optional[SSLContext]) – The SSL context to use for TLS connection. Enables TLS if specified.
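As a rough illustration of how the optional parameters fit together, the sketch below collects them into keyword arguments before connecting. The helper name connection_kwargs, the use_tls flag, the host name and the credentials are hypothetical and not part of pg-purepy; only open_database_connection() and its documented parameters come from the library.

```python
import ssl
from typing import Any, Optional


def connection_kwargs(
    username: str,
    *,
    port: int = 5432,
    password: Optional[str] = None,
    database: Optional[str] = None,
    use_tls: bool = False,
) -> dict[str, Any]:
    """Collect the keyword arguments documented for open_database_connection()."""
    kwargs: dict[str, Any] = {"username": username, "port": port}
    if password is not None:
        kwargs["password"] = password
    if database is not None:
        # When omitted, the database defaults to the username.
        kwargs["database"] = database
    if use_tls:
        # Passing an SSLContext is what enables TLS, per the parameter list.
        kwargs["ssl_context"] = ssl.create_default_context()
    return kwargs


async def main() -> None:
    # Imported lazily so the helper above is usable without the library.
    from pg_purepy.connection import open_database_connection

    # Placeholder host and credentials, for illustration only.
    kwargs = connection_kwargs("postgres", password="hunter2", use_tls=True)
    async with open_database_connection("db.example.com", **kwargs) as conn:
        print("ready:", conn.ready)
```

With anyio installed, main() could be started with anyio.run(main).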

class pg_purepy.connection.AsyncPostgresConnection(address_or_path, port, ssl_context, stream, state, block_transactions=False)

Bases: object

An asynchronous connection to a PostgreSQL server. This class should not be directly instantiated; instead, use open_database_connection().

property ready: bool

Returns if this connection is ready for another query.

property in_transaction: bool

Returns if this connection is currently in a transaction.

property dead: bool

Returns if this connection is dead or otherwise unusable.

property connection_parameters: Mapping[str, str]

Returns a read-only view of the current connection parameters.

property server_timezone: tzinfo

Returns the timezone of the server.
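The state properties can be combined into a quick health check before issuing a query. The following is a minimal sketch; the function name connection_status and the returned labels are made up for illustration, and it only reads the dead, in_transaction and ready properties documented above.

```python
def connection_status(conn) -> str:
    """Summarise a connection's state using its documented properties."""
    if conn.dead:
        # A dead connection cannot be reused; open a new one instead.
        return "dead"
    if conn.in_transaction:
        return "in transaction"
    return "ready" if conn.ready else "busy"
```

The order of the checks is a design choice: dead is tested first because the other properties are not meaningful on an unusable connection.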

Querying

There are two ways to query a PostgreSQL database.

  • Eager queries, which load all data into memory at once.

  • Lazy queries, which let the client apply backpressure by iterating over every row as it arrives.

There are high-level APIs for both eager and lazy queries, which wrap a low-level API that allows finer control of the actual messages arriving.

Warning

Querying the server is protected by a lock, as only one query can be issued at a time. Allowing multiple simultaneous queries would require complex tracking logic for incoming messages, and wouldn’t help anyway because the server only processes one query at a time.

Querying, Eagerly

Whilst pg-purepy doesn’t export a DBAPI 2.0 API as such, there are three high-level functions that resemble DBAPI. These three functions are likely the most useful functions when querying, but they are all eager functions and load the entire returned dataset into memory at once.

async AsyncPostgresConnection.fetch(query, *params, max_rows=None, **kwargs)

Eagerly fetches the result of a query. This returns a list of DataRow objects.

If you wish to lazily load the results of a query, use query() instead.

Return type:

list[DataRow]

async AsyncPostgresConnection.execute(query, *params, max_rows=None, **kwargs)

Executes a query, returning its row count. This will discard all data rows.

Return type:

int

async AsyncPostgresConnection.fetch_one(query, *params, **kwargs)

Like fetch(), but only fetches one row.

Raises:

MissingRowError – If there’s no row in the result.

Return type:

DataRow

For example, to insert some data, check how many rows were inserted, and verify it with a select:

async with open_database_connection(...) as conn:
    inserted = await conn.execute("insert into some_table(...) values (...);")
    print(f"Inserted {inserted} rows")
    row = await conn.fetch_one("select count(*) from some_table;")
    assert row.data[0] == inserted
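Since fetch_one() raises MissingRowError when the result is empty, a lookup that may legitimately find nothing can be built on fetch() instead. This is a minimal sketch; fetch_optional is a hypothetical helper, not part of pg-purepy, and it relies only on the documented fetch() signature.

```python
from typing import Any, Optional


async def fetch_optional(conn, query: str, *params: Any, **kwargs: Any) -> Optional[Any]:
    """Return the first row of a query, or None if the query returned no rows."""
    # max_rows=1 asks fetch() for at most one row, so nothing extra is buffered.
    rows = await conn.fetch(query, *params, max_rows=1, **kwargs)
    return rows[0] if rows else None
```

A caller would then test the result against None rather than wrapping fetch_one() in a try block.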

Warning

Eager functions only support one query at a time, due to limitations in API design and the underlying protocol.

Querying, Lazily

If you have large data sets, or want to query lazily for other reasons, then query() can be used. This function is an asynchronous context manager, returning a QueryResult.

AsyncPostgresConnection.query(query, *params, max_rows=None, **kwargs)

Mid-level query API.

The query parameter can either be a string or a PreparedStatementInfo, as returned from AsyncPostgresConnection.create_prepared_statement(). If it is a string, and it has parameters, they must be provided as keyword arguments. If it is a pre-prepared statement, and it has parameters, they must be provided as positional arguments.

If keyword arguments are provided or a prepared statement is passed, an extended query with secure argument parsing will be used. Otherwise, a simple query will be used, which saves bandwidth over the extended query protocol.

If the server is currently in a failed transaction, then your query will be ignored. Make sure to issue a rollback beforehand, if needed.

This is an asynchronous context manager that yields a QueryResult, which can be asynchronously iterated over for the data rows of the query. Once all data rows have been iterated over, you can call row_count() to get the total row count.

If max_rows is specified, then the query will only return up to that many rows. Otherwise, an unlimited amount may potentially be returned.

Return type:

AsyncGenerator[QueryResult, None]

class pg_purepy.connection.QueryResult(iterator)

Bases: AsyncIterator[DataRow]

Wraps the execution of a query. This can be asynchronously iterated over in order to get incoming data rows.

async row_count()

Gets the row count for this query.

Return type:

int

Warning

This will discard any remaining data rows in the currently executing query.

Example usage:

async with conn.query("select * from table") as query:
    async for row in query:
        print(row.data)

    print("Total rows:", await query.row_count())

Warning

Exiting from the asynchronous generator early will require the next query issued to keep reading the data rows of the previous query until that query has completed. Use limits, or cursors, for particularly large queries.
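One way to follow the advice about limits is to pass max_rows and consume every row that does arrive, so nothing is left over for the next query to drain. The sketch below assumes only the documented query(), QueryResult iteration and row_count(); the helper name for_each_row and its handler callback are hypothetical.

```python
from typing import Any, Callable, Optional


async def for_each_row(
    conn,
    query: str,
    handler: Callable[[Any], None],
    *,
    max_rows: Optional[int] = None,
) -> int:
    """Stream rows to handler one at a time and return the final row count."""
    async with conn.query(query, max_rows=max_rows) as result:
        async for row in result:
            # Only the current row is held in memory at this point.
            handler(row.data)
        # All rows have been consumed, so row_count() discards nothing.
        return await result.row_count()
```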

Warning

The lazy function only supports one query at a time, due to limitations in API design and the underlying protocol.

Parameterised Queries

Parameterised queries are also supported, using either positional arguments or keyword arguments, in either eager loading mode or lazy loading mode. Positional argument parameters follow the PostgreSQL parameter syntax, where parameters are specified with $N, where N is the index of the parameter, starting at 1. Keyword argument parameters follow the DBAPI colon-named syntax, where parameters are specified with :name, where name is the keyword passed to the function.

Note

Internally, keyword argument parameters are converted into the positional format when creating the prepared statement. This means that only the positional format parameters are available when using explicitly created or loaded prepared statements.

selected = await conn.fetch("select * from some_table where column = :name;",
                            name=some_variable)
inserted = await conn.execute("insert into some_table(foo, bar) values ($1, $2);",
                              x, y)
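To make the conversion from colon-named to positional parameters concrete, here is an illustrative sketch of one way it could be done. It is not pg-purepy's actual implementation: the function to_positional and its regular expression are assumptions, and it deliberately ignores edge cases such as colons inside string literals.

```python
import re
from typing import Any

# A colon followed by an identifier, but not the second colon of a "::" cast.
_NAMED_PARAM = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


def to_positional(query: str, /, **params: Any) -> tuple[str, list[Any]]:
    """Rewrite :name placeholders as $N and return the values in matching order."""
    positions: dict[str, int] = {}
    values: list[Any] = []

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in positions:
            # Raises KeyError if the query names a parameter that was not passed.
            values.append(params[name])
            positions[name] = len(values)  # PostgreSQL parameters start at $1
        return f"${positions[name]}"

    return _NAMED_PARAM.sub(replace, query), values
```

A name that appears several times maps to a single $N, so each value is sent only once.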

Low-level querying

If, for some reason, you need to access the messages returned during a query cycle, you can use the method lowlevel_query().

async AsyncPostgresConnection.lowlevel_query(query, *params, max_rows=None, **kwargs)

Performs a query to the server. This is an asynchronous generator; you must iterate over values in order to get the messages returned from the server.

Return type:

AsyncGenerator[QueryResultMessage, None]

This function yields out the raw PostgresMessage objects that are received from the protocol, as well as handling any error responses.

Warning

As this is a raw asynchronous generator, this must be wrapped in an aclosing() block. See https://github.com/python-trio/trio/issues/265.

from contextlib import aclosing  # available in Python 3.10 and later

async with aclosing(conn.lowlevel_query("select * from table")) as agen:
    async for message in agen:
        if isinstance(message, RowDescription):
            print("Got row description:", message)

        elif isinstance(message, DataRow):
            print("Got data row", message.data)

For most queries, this function will yield a RowDescription message, then zero or more DataRow messages, in that order.

The last message will always be a CommandComplete instance.

Error handling

The underlying low-level client reports server-side errors as ErrorOrNoticeResponse instances, but the mid-level connection objects will turn these into proper exceptions in the query functions.

All exceptions raised from ErrorResponses inherit from BaseDatabaseError.

exception pg_purepy.messages.BaseDatabaseError(response, query=None)

Bases: PostgresqlError

An exception produced from the database, usually from an ErrorOrNoticeResponse message. This does NOT include things such as protocol parsing errors.

However, you shouldn’t catch this exception directly, as the client splits database errors into two subtypes: recoverable errors, raised as RecoverableDatabaseError, and unrecoverable errors, raised as UnrecoverableDatabaseError. As a general rule, you should only catch the recoverable variant.

exception pg_purepy.messages.RecoverableDatabaseError(response, query=None)

Bases: BaseDatabaseError

A subclass of BaseDatabaseError that the client may potentially recover from. Examples include query errors.

exception pg_purepy.messages.UnrecoverableDatabaseError(response, query=None)

Bases: BaseDatabaseError

A subclass of BaseDatabaseError that the client must not recover from. This usually implies internal errors in the server.
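The rule of catching only the recoverable variant might look like the sketch below. The helper execute_or_none is hypothetical; the exception class is passed in as an argument so the snippet stays self-contained, and in real code it would be RecoverableDatabaseError from pg_purepy.messages.

```python
from typing import Any, Optional, Type


async def execute_or_none(
    conn,
    query: str,
    *params: Any,
    recoverable: Type[BaseException],
    **kwargs: Any,
) -> Optional[int]:
    """Run execute() and return its row count, or None on a recoverable error."""
    try:
        return await conn.execute(query, *params, **kwargs)
    except recoverable as exc:
        # The connection is still usable; unrecoverable errors propagate instead.
        print("query failed:", exc)
        return None
```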

Transaction Helpers

The mid-level API does nothing with transactions by default, operating in autocommit mode. However, it does supply a transaction helper which will automatically commit at the end of the async with block, or roll back if an error happens.

AsyncPostgresConnection.with_transaction()

Asynchronous context manager that automatically opens and closes a transaction.

Return type:

AsyncGenerator[None, None]

Warning

This will NOT protect against different tasks calling query functions inside your transaction. This would require overly complicated locking logic! Instead, wrap your acquisition of this inside a different lock, and guard all other transaction helpers with it.
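The external lock suggested above could be wrapped up as follows. This is only a sketch: exclusive_transaction is a hypothetical helper, and the lock is supplied by the caller (for example an anyio.Lock shared by every task that uses the connection).

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def exclusive_transaction(conn, lock):
    """Hold the shared lock for the whole lifetime of a transaction."""
    async with lock:
        # No other task using this helper can start a transaction until we exit.
        async with conn.with_transaction():
            yield conn
```

This only helps if every task goes through the same helper and lock; a task that calls the query functions directly is still unguarded.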