# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 Rodolphe Quiédeville <rodolphe@quiedeville.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Read information in Postgresql system catalog
"""
from psycopg2.extensions import AsIs
class ColumnDoesNotExists(Exception):
    """A column does not exist.

    Raised when a column that was explicitly requested as a function
    parameter cannot be found.
    """
class TableDoesNotExists(Exception):
    """A table does not exist.

    Raised when a table that was explicitly requested as a function
    parameter cannot be found.
    """
class SchemaDoesNotExists(Exception):
    """A schema does not exist.

    Intended to be raised when a schema that was explicitly requested as
    a function parameter cannot be found.
    """
class PygCatalog(object):
    """Python library to read the PostgreSQL system catalog.

    Every query is run through the connection given to the constructor.
    The code relies on psycopg2 behaviour: ``cursor.mogrify`` and the
    adaptation of Python tuples for ``IN %s`` clauses.
    """

    def __init__(self, conn=None, default_schemas=None):
        """Create a catalog reader.

        :param conn: database connection used by every query
        :param default_schemas: schemas searched when a method is not
                                given an explicit ``schema`` keyword,
                                defaults to ``['public']``
        :type default_schemas: list
        """
        self.conn = conn
        # Table cache filled by get_tables(), None until the first read.
        self.tables = None
        self.indexes = {}
        # Last statement sent through _execute_sql(), as rendered by mogrify.
        self.lastquery = None
        # Build the default list here: a list literal in the signature
        # would be one object shared by every instance.
        if default_schemas is None:
            default_schemas = ['public']
        self.default_schemas = default_schemas

    def _read_db(self, schema='public'):
        """Fill the table cache if it has not been read yet.

        NOTE: the cache is filled only once. A later call with another
        schema does not reload it; call reset_cache() first.
        """
        if self.tables is None:
            self.get_tables(schema=schema)

    def set_default_schema(self, schema):
        """Define the default schema to work on

        :param schema: The schema's name to work on
        :type schema: string
        :return: the new list of default schemas (a single element)
        :rtype: list
        :raises ValueError: if `schema` is not a string
        """
        if not isinstance(schema, str):
            raise ValueError("schema must be a string, got %r" % (schema,))
        self.default_schemas = [schema]
        return self.default_schemas

    def set_default_schemas(self, schemas):
        """Define a set of schemas to work on

        Schemas given twice or more are kept once, in the order of their
        first occurrence. The whole list is validated before anything
        is stored, so a failure leaves the current defaults untouched.

        :param schemas: the schema names to work on
        :type schemas: list
        :return: the new list of default schemas
        :rtype: list
        :raises ValueError: if `schemas` is not a list or contains a
                            value that is not a string
        """
        if not isinstance(schemas, list):
            raise ValueError("schemas must be a list, got %r" % (schemas,))
        unique = []
        seen = set()
        for schema in schemas:
            if not isinstance(schema, str):
                raise ValueError("schema must be a string, got %r"
                                 % (schema,))
            if schema not in seen:
                seen.add(schema)
                unique.append(schema)
        self.default_schemas = unique
        return self.default_schemas

    def reset_cache(self):
        """Reset the cache

        Forget the cached tables and the last executed query.
        """
        self.tables = None
        self.lastquery = None

    def analyze(self, table=None):
        """Run an ANALYZE over the database or a table

        :param table: name of the table to analyze, the whole database
                      is analyzed when omitted
        :return: the value returned by ``cursor.execute``
        """
        cur = self.conn.cursor()
        if table:
            # WARNING: AsIs inserts the name verbatim in the statement, it
            # is neither quoted nor escaped. Never pass untrusted input.
            return cur.execute("ANALYZE %s", (AsIs(table), ))
        # No placeholder in the statement, so no parameters may be passed:
        # psycopg2 would fail while formatting the query.
        return cur.execute("ANALYZE")

    def pgversion(self):
        """Return the version string of the PostgreSQL server
        """
        cur = self.conn.cursor()
        cur.execute("SELECT version()")
        return cur.fetchone()[0]

    def get_tables(self, **kwargs):
        """Return tables list

        You may specify a single schema to look in by specifying the
        keyword argument `schema`, otherwise the default schemas are
        used. The result replaces the table cache.

        NOTE: the result is keyed by table name only, tables with the
        same name in two schemas overwrite each other.

        :Example:

        >>> cat.get_tables(schema='public')

        :return: for each table name, a dict with the keys `tuple`,
                 `oid`, `schema` and `columns`
        :rtype: dict
        """
        qry = """
        SELECT c.relname, c.reltuples::bigint, c.oid, n.nspname
        FROM pg_class AS c
        INNER JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        WHERE relkind = 'r'
        AND n.nspname IN %s
        """
        rows = self._execute_sql(qry, (self._which_schemas(**kwargs),))
        self.tables = {}
        for row in rows:
            self.tables[row[0]] = {'tuple': row[1],
                                   'oid': row[2],
                                   'schema': row[3],
                                   'columns': None}
        return self.tables

    def schemas(self):
        """Return schemas

        Return the list of all schemas present in the database

        :Example:

        >>> cat.schemas()
        ['pg_toast', 'pg_temp_1', 'pg_toast_temp_1', 'pg_catalog',
        'public', 'information_schema', 'alice']

        :rtype: list
        """
        qry = """
        SELECT nspname
        FROM pg_catalog.pg_namespace
        """
        return [row[0] for row in self._execute_sql(qry)]

    def _which_schemas(self, **kwargs):
        """Return the schemas to query, as a tuple usable with ``IN %s``

        The `schema` keyword wins when it is set, otherwise the default
        schemas are returned.
        """
        if kwargs.get('schema'):
            return (kwargs['schema'],)
        return tuple(self.default_schemas)

    def table_tuples(self, table, **kwargs):
        """Return the table's estimated number of tuples

        :param table: the table's name
        :return: the first matching row as
                 (table name, number of tuples, schema name)
        :raises TableDoesNotExists: if no such table is found in the
                                    schemas searched
        """
        schemas = self._which_schemas(**kwargs)
        qry = """
        SELECT c.relname, c.reltuples::bigint, n.nspname
        FROM pg_class AS c
        INNER JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        WHERE relkind = 'r'
        AND c.relname = %s
        AND n.nspname IN %s
        """
        rows = self._execute_sql(qry, (table, schemas, ))
        # fetchall() returns an empty list when nothing matches, indexing
        # it would raise IndexError instead of the documented exception.
        if not rows:
            raise TableDoesNotExists(
                "table %s does not exist in schemas %s"
                % (table, ', '.join(schemas)))
        return rows[0]

    def biggest_tables(self, max=1, **kwargs):
        """Return the biggest tables in term of total size

        The size is computed by ``pg_total_relation_size``, it includes
        data, indexes and TOAST data.

        :param max: maximum number of tables returned
        :return: rows of (table name, schema name, total size,
                 number of tuples), biggest first

        :Example:

        >>> cat.biggest_tables()
        [('foo', 'public', 163840L, 1000L)]
        """
        schemas = self._which_schemas(**kwargs)
        # The size is looked up by oid: a bare relation name would be
        # resolved through search_path and may hit another schema.
        # ORDER BY 3 sorts on the size column (2 is the schema name).
        qry = """
        SELECT c.relname::text, n.nspname::text,
        pg_total_relation_size(c.oid),
        c.reltuples::bigint
        FROM pg_class AS c
        INNER JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        WHERE c.relkind = 'r'
        AND n.nspname IN %s
        ORDER BY 3 DESC
        LIMIT %s
        """
        return self._execute_sql(qry, (schemas, max))

    def biggest_table(self, **kwargs):
        """Return the biggest table in term of total size

        The size includes data, indexes and TOAST data. Sizes are
        expressed in bytes. Keyword arguments are passed on to
        biggest_tables(), e.g. `schema`.

        :return: (table name, schema name, total size, number of tuples)
                 or None when no table is found

        :Example:

        >>> cat.biggest_table()
        ('foo', 'public', 163840L, 1000L)
        """
        rows = self.biggest_tables(**kwargs)
        if not rows:
            return None
        return rows[0]

    def get_table_columns(self, table, schema='public'):
        """Return the names of all existing columns in a table

        Won't return dropped columns

        :return: the column names, or None if the table is not in the
                 table cache
        :rtype: list
        """
        self._read_db(schema=schema)
        if self.tables.get(table):
            return self._get_columns(self.tables[table]['oid'])
        return None

    def get_table_columns_extended(self, tablename, schema='public'):
        """Return a description of all columns in a table

        :return: one dict per column, see _get_columns_extended()
        :rtype: list
        :raises TableDoesNotExists: if the table is not in the table
                                    cache
        """
        self._read_db(schema=schema)
        if not self.tables.get(tablename):
            raise TableDoesNotExists(
                "table %s does not exist in schema %s" % (tablename, schema))
        return self._get_columns_extended(tablename, schema)

    def _get_columns(self, oid):
        """Return the names of all existing columns in a table

        Won't return dropped columns nor system columns.

        :param oid: oid of the table
        :return: column names ordered by attribute number
        :rtype: list
        """
        cur = self.conn.cursor()
        # Without ORDER BY the order of the rows is not guaranteed.
        qry = """
        SELECT attname, attnum
        FROM pg_catalog.pg_attribute AS a
        WHERE attrelid = %s
        AND attnum > 0
        AND attisdropped = false
        ORDER BY attnum
        """
        cur.execute(qry, (oid, ))
        return [row[0] for row in cur.fetchall()]

    def _get_columns_extended(self, table_name, schema_name):
        """Return columns for a table

        :return: one dict per column with the keys `name`, `type`,
                 `is_nullable`, `default_value` and `ordinal_position`,
                 ordered by ordinal position
        :rtype: list
        """
        cur = self.conn.cursor()
        qry = """
        SELECT column_name, data_type, is_nullable::text, column_default,
        ordinal_position
        FROM information_schema.columns
        WHERE table_schema = %s
        AND table_name = %s
        ORDER BY ordinal_position
        """
        cur.execute(qry, (schema_name, table_name))
        return [{'name': row[0],
                 'type': row[1],
                 'is_nullable': row[2],
                 'default_value': row[3],
                 'ordinal_position': row[4]}
                for row in cur.fetchall()]

    def _execute_sql(self, qry, parms=None):
        """Execute a sql query

        The rendered statement is stored in `lastquery` before it runs.

        :return: all rows of the result
        :rtype: list
        """
        cur = self.conn.cursor()
        self.lastquery = cur.mogrify(qry, parms)
        cur.execute(qry, parms)
        return cur.fetchall()

    def get_indexes(self, schema='public', **kwargs):
        """Return all indexes in a schema

        Return all indexes defined in the schema; each index is
        associated with the table oid, its own oid, the number of
        tuples present in it, its access method, its uniqueness and
        the names of its columns.

        Column names are only resolved when the keyword argument
        `table` is given, which also restricts the result to the
        indexes of that table. Otherwise `columns` is an empty list.

        :Example:

        >>> cat.get_indexes(table='foo')
        {'foo_name_idx': {'table_oid': 121090,
                          'oid': 121093,
                          'columns': ['name'],
                          'access_method': 'btree',
                          'is_unique': False,
                          'tuple': 1000L},
         'foo_name_ratio_idx': {'table_oid': 121090,
                                'oid': 121094,
                                'columns': ['name', 'ratio'],
                                'access_method': 'btree',
                                'is_unique': False,
                                'tuple': 1000L}
        }

        :return: dict that contains all indexes
        :rtype: dict
        :raises TableDoesNotExists: if `table` is given and not found
        """
        qry = """
        SELECT c.relname, c.reltuples::bigint, c.oid, i.indrelid,
        i.indkey, i.indisunique, i.indclass, a.amname
        FROM pg_catalog.pg_class AS c
        INNER JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        INNER JOIN pg_catalog.pg_index i ON c.oid = i.indexrelid
        INNER JOIN pg_catalog.pg_am a ON a.oid = c.relam
        WHERE c.relkind = 'i'
        AND n.nspname = %s
        """
        table = kwargs.get('table')
        params = [schema]
        if table:
            qry = qry + "AND i.indrelid = %s"
            params.append(self._table_oid(table, schema))
        qry = qry + " ORDER BY c.relname"
        rows = self._execute_sql(qry, tuple(params))

        # Read the table's columns once, not once per index.
        names = {}
        if table:
            for col in self._get_columns_extended(table, schema):
                names[str(col['ordinal_position'])] = col['name']

        indexes = {}
        for row in rows:
            # indkey comes back as space separated column numbers; walk
            # it in order so the columns keep their index key order.
            # Numbers without a matching column are skipped.
            cols = [names[num] for num in row[4].split(' ') if num in names]
            indexes[row[0]] = {'tuple': row[1],
                               'oid': row[2],
                               'columns': cols,
                               'table_oid': row[3],
                               'access_method': row[7],
                               'is_unique': row[5]}
        return indexes

    def get_operator_class(self, **kwargs):
        """Return information on operator classes

        http://www.postgresql.org/docs/current/static/catalog-pg-opclass.html

        The keyword argument `oid` restricts the result to one operator
        class.

        :return: one dict per operator class with the keys `oid` and
                 `name`
        :rtype: list
        """
        qry = """
        SELECT oid, opcname
        FROM pg_catalog.pg_opclass
        """
        params = []
        if kwargs.get('oid'):
            qry = qry + "WHERE oid = %s"
            params.append(kwargs.get('oid'))
        rows = self._execute_sql(qry, tuple(params) or None)
        return [{'oid': row[0], 'name': row[1]} for row in rows]

    def get_triggers(self, tablename, **kwargs):
        """Return information on triggers

        http://www.postgresql.org/docs/current/static/catalog-pg-trigger.html

        The keyword argument `schema` restricts the search to the
        tables of that schema; without it, triggers of every table with
        that name are returned, whatever the schema.

        :Example:

        >>> cat.get_triggers('foobar')
        [{'name': 'car_insert_trigger', 'event': 'INSERT',
          'timing': 'BEFORE'},
         {'name': 'car_update_trigger', 'event': 'UPDATE',
          'timing': 'AFTER'}
        ]

        :return: all triggers on a table
        :rtype: list
        """
        qry = """
        SELECT trigger_name, event_manipulation, action_timing
        FROM information_schema.triggers
        WHERE event_object_table = %s
        """
        params = [tablename]
        if kwargs.get('schema'):
            qry = qry + "AND event_object_schema = %s"
            params.append(kwargs.get('schema'))
        rows = self._execute_sql(qry, tuple(params))
        return [{'name': row[0], 'event': row[1], 'timing': row[2]}
                for row in rows]

    def is_column_indexed(self, column_name, table_name, schema='public'):
        """Check if a column is indexed

        Check if the column is present in at least one index.

        :param column_name: The column's name to look for
        :param table_name: The table's name to look in
        :type column_name: string
        :type table_name: string
        :return: True if at least one index contains the column
        :rtype: boolean
        :raises TableDoesNotExists: if the table is not found
        :raises ColumnDoesNotExists: if the column is not found

        :Example:

        >>> cat.is_column_indexed('name', 'foobar')
        True
        """
        if not self.is_table_exists(table_name, schema):
            msg = "table %s does not exist in schema %s"
            raise TableDoesNotExists(msg % (table_name, schema))
        if not self.is_column_exists(column_name, table_name, schema):
            msg = "column %s does not exist in table %s.%s"
            raise ColumnDoesNotExists(msg % (column_name, schema, table_name))
        # The CTE expands every index of the schema to one row per
        # indexed column number, then joins back to table and column.
        qry = """
        WITH cte AS (
        SELECT c.relname as indexname, c.oid, i.indrelid,
        unnest( i.indkey) as attnum
        FROM pg_class AS c
        INNER JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        INNER JOIN pg_catalog.pg_index i ON c.oid = i.indexrelid
        WHERE c.relkind = 'i'
        AND n.nspname = %s
        )
        SELECT cte.*, a.attname, t.relname as tablename
        FROM cte
        INNER JOIN pg_class t ON cte.indrelid = t.oid
        INNER JOIN pg_catalog.pg_attribute a
        ON (cte.attnum = a.attnum AND a.attrelid = cte.indrelid)
        WHERE a.attname = %s AND t.relname = %s
        """
        rows = self._execute_sql(qry, (schema, column_name, table_name))
        return len(rows) > 0

    def is_column_exists(self, column_name, table_name, schema='public'):
        """Check if a column exists in a table

        :param column_name: the column's name to look for
        :param table_name: the table's name to look in
        :return: True if the column is found
        :rtype: boolean
        """
        qry = """
        SELECT 1
        FROM pg_catalog.pg_attribute a
        INNER JOIN pg_class t ON a.attrelid = t.oid
        INNER JOIN pg_catalog.pg_namespace n ON t.relnamespace = n.oid
        WHERE n.nspname = %s
        AND a.attname = %s
        AND t.relname = %s
        """
        rows = self._execute_sql(qry, (schema, column_name, table_name))
        return len(rows) > 0

    def is_table_exists(self, table_name, schema='public'):
        """Check if a table exists

        NOTE: the query does not filter on the relation kind, so any
        pg_class entry with that name in the schema matches.

        :param table_name: The table's name to look for
        :type table_name: string
        :return: True if the table is found
        :rtype: boolean

        :Example:

        >>> cat.is_table_exists('foobar')
        True
        """
        qry = """
        SELECT 1
        FROM pg_class t
        INNER JOIN pg_catalog.pg_namespace n ON t.relnamespace = n.oid
        WHERE n.nspname = %s
        AND t.relname = %s
        """
        rows = self._execute_sql(qry, (schema, table_name))
        return len(rows) > 0

    def _table_oid(self, table_name, schema='public'):
        """Return the table's oid

        :param table_name: The table's name to look for
        :type table_name: string
        :return: the oid of the table
        :raises TableDoesNotExists: if the table is not found

        :Example:

        >>> cat._table_oid('foobar', schema='public')
        1241
        """
        qry = """
        SELECT t.oid
        FROM pg_class t
        INNER JOIN pg_catalog.pg_namespace n ON t.relnamespace = n.oid
        WHERE n.nspname = %s
        AND t.relname = %s
        """
        rows = self._execute_sql(qry, (schema, table_name))
        if not rows:
            raise TableDoesNotExists(
                "table %s does not exist in schema %s" % (table_name, schema))
        return rows[0][0]