pycassa.pool – Connection Pooling

Connection pooling for Cassandra connections.

class pycassa.pool.ConnectionPool(keyspace, server_list=['localhost:9160'], credentials=None, timeout=0.5, use_threadlocal=True, pool_size=5, prefill=True, socket_factory=default_socket_factory, transport_factory=default_transport_factory, **kwargs)

A pool that maintains a queue of open connections.

All connections in the pool will be opened to keyspace.

server_list is a sequence of servers in the form "host:port" that the pool will connect to. The port defaults to 9160 if excluded. The list will be randomly shuffled before being drawn from sequentially. server_list may also be a function that returns the sequence of servers.
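To illustrate the "host:port" convention and the shuffle-then-draw-sequentially order described above, here is a minimal sketch in plain Python. The helper names `parse_server` and `server_cycle` are hypothetical, not part of pycassa's API, and the parser does not handle IPv6 literals.

```python
import itertools
import random

DEFAULT_PORT = 9160  # port assumed when a server entry omits it


def parse_server(server):
    """Split "host:port" into (host, port); fall back to 9160 if no port is given."""
    host, sep, port = server.partition(":")
    return host, (int(port) if sep else DEFAULT_PORT)


def server_cycle(server_list, rng=random):
    """Shuffle the servers once, then hand them out in order, wrapping around.

    server_list may be a sequence or a zero-argument function returning one.
    """
    servers = list(server_list() if callable(server_list) else server_list)
    rng.shuffle(servers)
    return itertools.cycle(servers)
```

Passing a seeded `random.Random(0)` as `rng` makes the drawing order reproducible, which is handy in tests.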

If authentication or authorization is required, credentials must be supplied. This should be a dictionary containing ‘username’ and ‘password’ keys with appropriate string values.

timeout specifies in seconds how long individual connections will block before timing out. If set to None, connections will never time out.

If use_threadlocal is set to True, repeated calls to get() within the same application thread will return the same ConnectionWrapper object if one is already checked out from the pool. Be careful when setting use_threadlocal to False in a multithreaded application, especially with retries enabled. Synchronization may be required to prevent the connection from changing while another thread is using it.
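The thread-local behaviour can be pictured with `threading.local`: each thread remembers the object it already has checked out and gets that same object back on repeated `get()` calls. This is a hypothetical sketch (`ThreadLocalCheckout` is not a pycassa class) that ignores pool limits and uses a plain factory in place of real connections.

```python
import threading


class ThreadLocalCheckout:
    """Hypothetical model of use_threadlocal=True: one checked-out object per thread."""

    def __init__(self, factory):
        self._factory = factory          # creates a new "connection" on demand
        self._local = threading.local()  # storage that is private to each thread

    def get(self):
        conn = getattr(self._local, "conn", None)
        if conn is None:                 # nothing checked out in this thread yet
            conn = self._factory()
            self._local.conn = conn
        return conn

    def put(self, conn):
        # Forget this thread's checkout so its next get() obtains a fresh object.
        if getattr(self._local, "conn", None) is conn:
            self._local.conn = None
```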

The pool will keep up to pool_size open connections in the pool at any time. When a connection is returned to the pool, the connection will be discarded if the pool already contains pool_size connections. The total number of simultaneous connections the pool will allow is pool_size + max_overflow, and the number of “sleeping” connections the pool will allow is pool_size.
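The limits in this paragraph reduce to simple arithmetic. A sketch with the hypothetical helper `connection_limits`; the -1 "no limit" value comes from the max_overflow description.

```python
def connection_limits(pool_size, max_overflow=0):
    """Return (max simultaneous connections, max idle connections).

    None stands for "no upper bound" (max_overflow=-1).
    """
    total = None if max_overflow == -1 else pool_size + max_overflow
    return total, pool_size


print(connection_limits(5))     # (5, 5): default pool_size, overflow disabled
print(connection_limits(5, 3))  # (8, 5): up to 3 extra connections under load
```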

A good choice for pool_size is a multiple of the number of servers passed to the Pool constructor. If a size less than this is chosen, the last (len(server_list) - pool_size) servers may not be used until either overflow occurs, a connection is recycled, or a connection fails. Similarly, if a multiple of len(server_list) is not chosen, those same servers would have a decreased load. By default, overflow is disabled.
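To see why a multiple of len(server_list) balances the load, count where pool_size connections land when servers are drawn round-robin from the shuffled list. The helper below is a hypothetical sketch that assumes strict round-robin with no failures, overflow, or recycling.

```python
from collections import Counter
from itertools import cycle, islice


def connections_per_server(server_list, pool_size):
    """Count connections per server when pool_size are opened round-robin."""
    counts = Counter(islice(cycle(server_list), pool_size))
    return {server: counts[server] for server in server_list}


servers = ["a", "b", "c"]
print(connections_per_server(servers, 2))  # {'a': 1, 'b': 1, 'c': 0}
print(connections_per_server(servers, 4))  # {'a': 2, 'b': 1, 'c': 1}
print(connections_per_server(servers, 6))  # {'a': 2, 'b': 2, 'c': 2}
```

With pool_size=2, server c gets no connection; with pool_size=4, server a carries twice the load of the others.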

If prefill is set to True, pool_size connections will be opened when the pool is created.

Example Usage:

>>> import pycassa
>>> pool = pycassa.ConnectionPool(keyspace='Keyspace1', server_list=['10.0.0.4:9160', '10.0.0.5:9160'], prefill=False)
>>> cf = pycassa.ColumnFamily(pool, 'Standard1')
>>> cf.insert('key', {'col': 'val'})
1287785685530679
max_overflow = 0

Whether or not a new connection may be opened when the pool is empty is controlled by max_overflow. This specifies how many additional connections may be opened after the pool has reached pool_size; keep in mind that these extra connections will be discarded upon checkin until the pool is below pool_size. This may be set to -1 to indicate no overflow limit. The default value is 0, which does not allow for overflow.
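The checkout and checkin rules for overflow connections can be modelled with two counters. This is a hypothetical bookkeeping sketch, not pycassa's implementation: it opens no sockets, assumes prefill=True, and raises a built-in RuntimeError where the real pool would instead wait up to pool_timeout and then give up.

```python
class OverflowModel:
    """Hypothetical model of pool_size / max_overflow bookkeeping."""

    def __init__(self, pool_size, max_overflow=0):
        self.pool_size = pool_size
        self.max_overflow = max_overflow
        self.idle = pool_size      # prefill=True: pool_size connections waiting
        self.checked_out = 0

    def overflow(self):
        """Number of open connections beyond pool_size."""
        return max(0, self.idle + self.checked_out - self.pool_size)

    def checkout(self):
        if self.idle:
            self.idle -= 1                         # reuse a sleeping connection
        elif self.max_overflow == -1 or self.overflow() < self.max_overflow:
            pass                                   # open an extra (overflow) connection
        else:
            raise RuntimeError("pool_size + max_overflow connections in use")
        self.checked_out += 1

    def checkin(self):
        self.checked_out -= 1
        if self.idle < self.pool_size:
            self.idle += 1                         # keep it for reuse
        # otherwise the returned connection is closed and discarded
```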

pool_timeout = 30

If pool_size + max_overflow connections have already been checked out, an attempt to retrieve a new connection from the pool will wait up to pool_timeout seconds for a connection to be returned to the pool before giving up. Note that this setting is only meaningful when you are accessing the pool concurrently, such as with multiple threads. This may be set to 0 to fail immediately or -1 to wait forever. The default value is 30.
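The three pool_timeout modes map naturally onto `queue.Queue.get`. In this sketch the idle connections live in a standard-library queue (an assumption for illustration), and the built-in `TimeoutError` stands in for the pool's own exception (presumably NoConnectionAvailable). `wait_for_connection` is a hypothetical name.

```python
import queue


def wait_for_connection(idle, pool_timeout):
    """Take a connection from idle (a queue.Queue), honoring pool_timeout.

    0 fails immediately, -1 waits forever, any other value waits that many seconds.
    """
    try:
        if pool_timeout == 0:
            return idle.get(block=False)
        if pool_timeout == -1:
            return idle.get(block=True)
        return idle.get(block=True, timeout=pool_timeout)
    except queue.Empty:
        raise TimeoutError("no connection was returned within pool_timeout") from None
```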

recycle = 10000

After performing recycle number of operations, connections will be replaced when checked back in to the pool. This may be set to -1 to disable connection recycling. The default value is 10,000.
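Recycling amounts to a per-connection operation counter that is checked when the connection is returned. A hypothetical sketch; the class and method names are illustrative, not pycassa's.

```python
class RecycleTracker:
    """Counts operations on one connection and flags it for replacement."""

    def __init__(self, recycle=10000):
        self.recycle = recycle   # -1 disables recycling
        self.operations = 0

    def record_operation(self):
        self.operations += 1

    def replace_on_checkin(self):
        """True once at least `recycle` operations have been performed."""
        return self.recycle != -1 and self.operations >= self.recycle
```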

max_retries = 5

When an operation on a connection fails due to a TimedOutException or UnavailableException, which tend to indicate single or multiple node failure, the operation will be retried on different nodes up to max_retries times before a MaximumRetryException is raised. Setting this to 0 disables retries and setting it to -1 allows unlimited retries. The default value is 5.
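The retry policy can be sketched as a loop that moves to the next server after each transient failure. `TransientError` and `TooManyRetries` are hypothetical stand-ins for TimedOutException/UnavailableException and MaximumRetryException. Unlike the real ConnectionWrapper, which counts retries across operations (see MaximumRetryException), this sketch counts them per call.

```python
class TransientError(Exception):
    """Stand-in for TimedOutException / UnavailableException."""


class TooManyRetries(Exception):
    """Stand-in for MaximumRetryException."""


def call_with_retries(operation, servers, max_retries=5):
    """Run operation(server), moving to the next server after each transient failure.

    max_retries=0 disables retries; -1 retries without limit.
    """
    attempt = 0
    while True:
        server = servers[attempt % len(servers)]  # a different node each time
        try:
            return operation(server)
        except TransientError:
            if max_retries != -1 and attempt >= max_retries:
                raise TooManyRetries(f"gave up after {attempt} retries") from None
            attempt += 1
```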

logging_name = None

By default, each pool identifies itself in the logs using id(self). If multiple pools are in use for different purposes, setting logging_name will help individual pools to be identified in the logs.

get()

Gets a connection from the pool.

put(conn)

Returns a connection to the pool.

execute(f, *args, **kwargs)

Get a connection from the pool, execute f on it with *args and **kwargs, return the connection to the pool, and return the result of f.
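The checkout, call, checkin sequence is the classic try/finally pattern, which guarantees the connection goes back to the pool even if f raises. This sketch treats f as a callable that receives the connection as its first argument; that is an assumption, so check pycassa's source for exactly how execute() applies f. `ListPool` and `pooled_execute` are hypothetical.

```python
class ListPool:
    """Hypothetical stand-in pool exposing get() and put()."""

    def __init__(self, conns):
        self.idle = list(conns)

    def get(self):
        return self.idle.pop()

    def put(self, conn):
        self.idle.append(conn)


def pooled_execute(pool, f, *args, **kwargs):
    """Check out a connection, run f on it, and always return it to the pool."""
    conn = pool.get()
    try:
        return f(conn, *args, **kwargs)
    finally:
        pool.put(conn)  # runs whether f returned normally or raised
```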

fill()

Adds connections to the pool until at least pool_size connections exist, whether they are currently checked out from the pool or not.

New in version 1.2.0.

dispose()

Closes all checked-in connections in the pool.

set_server_list(server_list)

Sets the server list that the pool will make connections to.

server_list should be a sequence of servers in the form "host:port" that the pool will connect to. The list will be randomly permuted before being used. server_list may also be a function that returns the sequence of servers.

size()

Returns the capacity of the pool.

overflow()

Returns the number of overflow connections that are currently open.

checkedin()

Returns the number of connections currently in the pool.

checkedout()

Returns the number of connections currently checked out from the pool.

add_listener(listener)

Add a PoolListener-like object to this pool.

listener may be an object that implements some or all of PoolListener, or a dictionary of callables containing implementations of some or all of the named methods in PoolListener.

exception pycassa.pool.AllServersUnavailable

Raised when none of the servers given to a pool can be connected to.

exception pycassa.pool.NoConnectionAvailable

Raised when there are no connections left in a pool.

exception pycassa.pool.MaximumRetryException

Raised when a ConnectionWrapper has retried the maximum allowed times before being returned to the pool; note that all of the retries do not have to be on the same operation.

exception pycassa.pool.InvalidRequestError

Pycassa was asked to do something it can’t do.

This error generally corresponds to runtime state errors.

class pycassa.pool.ConnectionWrapper(pool, max_retries, *args, **kwargs)

Creates a wrapper for a Connection object, adding pooling related functionality while still allowing access to the thrift API calls.

These should not be created directly; obtain them only through the pool’s get() method.

get_keyspace_description(keyspace=None, use_dict_for_col_metadata=False)

Describes the given keyspace.

If use_dict_for_col_metadata is True, the column metadata will be stored as a dictionary instead of a list.

A dictionary of the form {column_family_name: CfDef} is returned.

return_to_pool()

Returns this connection wrapper to the pool.

This has the same effect as calling ConnectionPool.put() on the wrapper.

class pycassa.pool.PoolListener

Hooks into the lifecycle of connections in a ConnectionPool.

Usage:

from pycassa.pool import ConnectionPool, PoolListener

class MyListener(PoolListener):
    def connection_created(self, dic):
        '''perform connect operations'''
    # etc.

# create a new pool with a listener
p = ConnectionPool(..., listeners=[MyListener()])

# or add a listener after the fact
p.add_listener(MyListener())

Listeners receive a dictionary that contains event information and is indexed by a string describing that piece of information. For example, all event dictionaries include 'level', so dic['level'] will return the prescribed logging level.

There is no need to subclass PoolListener to handle events. Any class that implements one or more of these methods can be used as a pool listener. The ConnectionPool will inspect the methods provided by a listener object and add the listener to one or more internal event queues based on its capabilities. In terms of efficiency and function call overhead, you’re much better off only providing implementations for the hooks you’ll be using.
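The capability-based registration described above can be sketched by probing each listener for the hook names documented for PoolListener, accepting either an object with methods or a dict of callables (as add_listener() allows). `ListenerRegistry` is hypothetical; it only illustrates why hooks a listener does not implement cost nothing at dispatch time.

```python
EVENTS = (
    "connection_checked_in", "connection_checked_out", "connection_created",
    "connection_disposed", "connection_failed", "connection_recycled",
    "pool_at_max", "pool_disposed", "server_list_obtained",
)


class ListenerRegistry:
    """Hypothetical dispatcher: listeners are queued only for hooks they provide."""

    def __init__(self):
        self.queues = {name: [] for name in EVENTS}

    def add_listener(self, listener):
        for name in EVENTS:
            if isinstance(listener, dict):
                hook = listener.get(name)           # dict of callables
            else:
                hook = getattr(listener, name, None)  # object with methods
            if callable(hook):
                self.queues[name].append(hook)

    def notify(self, event, **fields):
        for hook in self.queues[event]:
            hook(dict(fields))  # each hook receives its own copy of the event dict
```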

Each of the PoolListener methods will be called with a dict as the single parameter. This dict may contain the following fields:

  • connection: The ConnectionWrapper object that persistently manages the connection
  • message: The reason this event happened
  • error: The Exception that caused this event
  • pool_id: The id of the ConnectionPool that this event came from
  • level: The prescribed logging level for this event. Can be ‘debug’, ‘info’, ‘warn’, ‘error’, or ‘critical’

Entries in the dict that are specific to only one event type are detailed with each method.

connection_checked_in(dic)

Called when a connection returns to the pool.

Fields: pool_id, level, and connection.

connection_checked_out(dic)

Called when a connection is retrieved from the Pool.

Fields: pool_id, level, and connection.

connection_created(dic)

Called once for each new Cassandra connection.

Fields: pool_id, level, and connection.

connection_disposed(dic)

Called when a connection is closed.

dic['message']: A reason for closing the connection, if any.

Fields: pool_id, level, connection, and message.

connection_failed(dic)

Called when a connection to a single server fails.

dic['server']: The server the connection was made to.

Fields: pool_id, level, error, server, and connection.

connection_recycled(dic)

Called when a connection is recycled.

dic['old_conn']: The ConnectionWrapper that is being recycled.

dic['new_conn']: The ConnectionWrapper that is replacing it.

Fields: pool_id, level, old_conn, and new_conn.

pool_at_max(dic)

Called when an attempt is made to get a new connection from the pool, but the pool is already at its max size.

dic['pool_max']: The max number of connections the pool will keep open at one time.

Fields: pool_id, pool_max, and level.

pool_disposed(dic)

Called when a pool is disposed.

Fields: pool_id and level.

server_list_obtained(dic)

Called when the pool finalizes its server list.

dic['server_list']: The randomly permuted list of servers that the pool will choose from.

Fields: pool_id, level, and server_list.
