"""Implement SELECT query."""
import json
import time
from pyetcd import EtcdEmptyResponse
from etcdb import OperationalError, InternalError
from etcdb.eval_expr import eval_expr, EtcdbFunction
from etcdb.execute.dml.insert import get_table_columns
from etcdb.log import LOG
from etcdb.resultset import ResultSet, ColumnSet, Column, Row
[docs]def list_table(etcd_client, db, tbl):
"""
Read primary key values in table db.tbl.
:param etcd_client: etcd client.
:type etcd_client: pyetcd.client.Client
:param db: database name.
:param tbl: table name.
:return: list of primary keys.
:rtype: list
"""
pks = []
table_key = "/{db}/{tbl}".format(db=db, tbl=tbl)
etcd_result = etcd_client.read(table_key)
if 'nodes' in etcd_result.node:
nodes = etcd_result.node['nodes']
for node in nodes:
pk_key = node['key']
primary_key = pk_key.replace(table_key + '/', '', 1)
pks.append(primary_key)
pks = sorted(pks)
return pks
[docs]def prepare_columns(tree):
"""
Generate ColumnSet for query result. ColumnsSet doesn't include
a grouping function.
:return: Columns of the query result.
:rtype: ColumnSet
"""
columns = ColumnSet()
for select_item in tree.expressions:
expr, alias = select_item
colname, _ = eval_expr(None, tree=expr)
if alias:
colname = alias
columns.add(Column(colname))
return columns
[docs]def get_row_by_primary_key(etcd_client, db, table, primary_key, **kwargs):
"""
Read row from etcd by its primary key value.
:param etcd_client:
:type etcd_client: Client
:param db:
:param table:
:param primary_key: Primary key value.
:param kwargs: See below.
:return: Row
:rtype: Row
:Keyword Arguments:
* **wait** (bool) - If True it will wait for a change in the key.
* **wait_index** (int) - When waiting you can specify index to
wait for.
"""
key = "/{db}/{tbl}/{pk}".format(
db=db,
tbl=table,
pk=primary_key
)
client_kwargs = {}
if 'wait' in kwargs:
client_kwargs['wait'] = kwargs.get('wait')
if 'wait_index' in kwargs:
client_kwargs['waitIndex'] = kwargs.get('wait_index')
etcd_response = None
for i in xrange(30):
try:
etcd_response = etcd_client.read(key, **client_kwargs)
break
except EtcdEmptyResponse as err:
LOG.warning("Retry #%d after error: %s", i, err)
time.sleep(1)
if not etcd_response:
raise InternalError('Failed to get response from etcd')
row = ()
field_values = json.loads(etcd_response.node['value'])
table_columns = get_table_columns(etcd_client, db, table)
for col in table_columns:
row += (field_values[str(col)], )
try:
etcd_index = etcd_response.x_etcd_index
except AttributeError:
etcd_index = 0
return Row(row, etcd_index=etcd_index,
modified_index=etcd_response.node['modifiedIndex'])
[docs]def group_function(table_columns, table_row, tree):
"""True if resultset should be grouped
:return: Grouping function or None and its position.
:rtype: tuple(EtcdbFunction, int)"""
try:
group_position = 0
for select_item in tree.expressions:
expr = select_item[0]
expr_value = eval_expr((table_columns, table_row),
tree=expr)[1]
if isinstance(expr_value, EtcdbFunction):
if expr_value.group:
return expr_value, group_position
group_position += 1
return None, None
except TypeError:
return None, None
[docs]def eval_row(table_columns, table_row, tree):
"""Find values of a row. table_columns are fields in the table.
The result columns is taken from tree.expressions.
:param table_columns: Columns in the table row.
:type table_columns: ColumnSet
:param table_row: Input row.
:type table_row: Row
:param tree: Parsing tree.
:type tree: SQLTree
"""
result_row = ()
for select_item in tree.expressions:
expr = select_item[0]
expr_value = eval_expr((table_columns, table_row),
tree=expr)[1]
if isinstance(expr_value, EtcdbFunction) and not expr_value.group:
expr_value = expr_value()
result_row += (expr_value, )
return result_row
[docs]def group_result_set(func, result_set, table_row, tree, pos):
"""Apply a group function to result set and return an aggregated row.
:param func: Aggregation function.
:type func: callable
:param result_set: Result set to aggregate.
:type result_set: ResultSet
:param table_row: Table row to base aggregated row on.
:type table_row: Row
:param tree: Parsing tree.
:type tree: SQLTree
:param pos: Aggregate function position in the resulting row.
:type pos: int
:return: Result set with aggregated row.
:rtype: ResultSet"""
group_value = func(result_set)
values = list(eval_row(result_set.columns, table_row, tree))
values[pos] = group_value
row = Row(tuple(values))
return ResultSet(prepare_columns(tree), [row])
[docs]def execute_select_plain(etcd_client, tree, db):
"""Execute SELECT that reads rows from table."""
result_columns = prepare_columns(tree)
result_set = ResultSet(result_columns)
table_columns = get_table_columns(etcd_client, db, tree.table)
last_row = None
for primary_key in list_table(etcd_client, db, tree.table):
table_row = get_row_by_primary_key(etcd_client, db, tree.table,
primary_key)
if tree.where:
expr = tree.where
if eval_expr((table_columns, table_row), expr)[1]:
row = Row(eval_row(table_columns, table_row, tree),
etcd_index=table_row.etcd_index,
modified_index=table_row.modified_index)
result_set.add_row(row)
last_row = table_row
else:
row = Row(eval_row(table_columns, table_row, tree),
etcd_index=table_row.etcd_index,
modified_index=table_row.modified_index)
result_set.add_row(row)
last_row = table_row
g_function, pos = group_function(table_columns, last_row, tree)
if g_function:
result_set = group_result_set(g_function, result_set, last_row,
tree, pos)
return result_set
[docs]def execute_select_no_table(tree):
"""Execute SELECT that doesn't read from a table.
SELECT VERSION() or similar."""
result_columns = prepare_columns(tree)
result_set = ResultSet(result_columns)
result_row = Row(eval_row(result_columns, Row(()), tree))
result_set.add_row(result_row)
return result_set
[docs]def fix_tree_star(tree, etcd_client, db, tbl):
"""If parsing tree contains [["*", null], null] expression it means
the query was SELECT * . So, the expressions needs to be replaced
with actual field names.
"""
if tree.expressions == [(("*", None), None)]:
tree.expressions = []
for field in get_table_columns(etcd_client, db, tbl):
tree.expressions.append(
(
(
'bool_primary', (
'predicate', (
'bit_expr', (
'simple_expr', (
'IDENTIFIER',
str(field)
)
)
)
)
),
None
)
)
return tree
[docs]def execute_select(etcd_client, tree, db):
"""
Execute SELECT query.
:param etcd_client: etcd client.
:type etcd_client: pyetcd.client.Client
:param db: Current database.
:type db: str
:param tree: Parse tree.
:type tree: SQLTree
:return: ResultSet instance.
:rtype: ResultSet
"""
if not db:
raise OperationalError('No database selected')
if tree.table:
tree = fix_tree_star(tree, etcd_client, db, tree.table)
result_set = execute_select_plain(etcd_client, tree, db)
LOG.debug(result_set)
else:
result_set = execute_select_no_table(tree)
if tree.limit is not None:
result_set = ResultSet(result_set.columns, result_set[:tree.limit])
return result_set