"""
Module provides functions to evaluate an expression given in
a WHERE statement.
Grammar of expression is described on MySQL website
(https://dev.mysql.com/doc/refman/5.7/en/expressions.html).
"""
from etcdb import OperationalError, __version__
from etcdb.resultset import Column
from etcdb.sqlparser.parser import SQLParserError
[docs]class EtcdbFunction(object):
"""EtcdbFunction represents an SQL function.
:param function_name: python function that implements SQL function.
:type function_name: callable
:param group: True if the functions is aggregate function
:type group: bool
:param args: Arguments to pass to function_name
:param kwargs: Keyword arguments"""
def __init__(self, *args, **kwargs):
self._function = args[0]
self._group = kwargs.get('group', False)
self._args = args[1:]
@property
def function(self):
"""Return function name"""
return self._function
@property
def group(self):
"""Return whether the function is aggregate"""
return self._group
def __call__(self, *args, **kwargs):
return self._function(*args, **kwargs)
def __eq__(self, other):
return all(
(
isinstance(other, EtcdbFunction),
self.function == other.function,
self.group == other.group
)
)
def __ne__(self, other):
return not self.__eq__(other)
[docs]def eval_identifier(row, identifier):
"""
Get value of identifier for a given row
:param row: row
:type row: tuple(ColumnSet, Row)
:param identifier: Identifier
:type identifier: str
:return: value of identifier
"""
try:
identifier_strip = identifier.split('.')[1]
except IndexError:
identifier_strip = identifier
# If row isn't given return column name only
if row:
columns = row[0]
data = row[1]
else:
return identifier_strip, None
try:
pos = columns.index(Column(identifier_strip))
return identifier_strip, data[pos]
except ValueError:
raise OperationalError('Unknown identifier %s' % identifier_strip)
[docs]def eval_string(value):
"""Evaluate string token"""
return '%s' % value, value
[docs]def etcdb_version():
"""Get etcdb version"""
return __version__
[docs]def etcdb_count(result_set):
"""
Count rows in result set
:param result_set: ResultSet instance
:type result_set: ResultSet
:return: number of rows in ResultSet
:rtype: int
"""
return len(result_set.rows)
[docs]def eval_function_call(row, tree): # pylint: disable=unused-argument
"""Evaluate function call
:return: tuple with field name and EtcdbFunction instance"""
if tree == 'VERSION':
# func = EtcdbFunction(version, group=False)
return "VERSION()", EtcdbFunction(etcdb_version, group=False)
if tree == 'COUNT':
# func = EtcdbFunction(version, group=False)
return "COUNT(*)", EtcdbFunction(etcdb_count, group=True)
raise NotImplementedError('Unknown function %s' % tree)
[docs]def eval_simple_expr(row, tree):
"""Evaluate simple_expr"""
if tree[0] == 'IDENTIFIER':
return eval_identifier(row, tree[1])
elif tree[0] == 'STRING' or tree[0] == 'literal':
return eval_string(tree[1])
elif tree[0] == 'function_call':
return eval_function_call(row, tree[1])
elif tree[0] == 'expr':
return eval_expr(row, tree[1])
else:
raise SQLParserError('%s is not implemented' % tree[0])
[docs]def eval_bit_expr(row, tree):
"""Evaluate bit_expr"""
if tree[0] == 'simple_expr':
return eval_simple_expr(row, tree[1])
else:
raise SQLParserError('%s is not implemented' % tree[0])
[docs]def eval_predicate(row, tree):
"""Evaluate predicate"""
if tree[0] == 'bit_expr':
return eval_bit_expr(row, tree[1])
if tree[0] == 'IN':
simple_expr = eval_simple_expr(row, tree[1][1])
for expr_tree in tree[2]:
expr_value = eval_expr(row, expr_tree)
if expr_value[1] == simple_expr[1]:
return '%s IN (%s)' % (simple_expr[0], expr_value[0]), True
return '%s IN (expr)' % (simple_expr[0], ), False
else:
raise SQLParserError('%s is not implemented' % tree[0])
[docs]def eval_bool_primary(row, tree): # pylint: disable=too-many-return-statements
"""Evaluate bool_primary"""
if tree[0] == 'IS NULL':
bool_primary1 = eval_bool_primary(row, tree[1])
return "%s IS NULL" % bool_primary1[0], \
bool_primary1[1] is None
elif tree[0] == 'IS NOT NULL':
bool_primary1 = eval_bool_primary(row, tree[1])
return "%s IS NOT NULL" % bool_primary1[0], \
bool_primary1[1] is not None
elif tree[0] == '=':
bool_primary1 = eval_bool_primary(row, tree[1])
bool_primary2 = eval_bool_primary(row, tree[2])
return (
"%s = %s" % (bool_primary1[0], bool_primary2[0]),
bool_primary1[1] == bool_primary2[1]
)
elif tree[0] == '>=':
bool_primary1 = eval_bool_primary(row, tree[1])
bool_primary2 = eval_bool_primary(row, tree[2])
return (
"%s >= %s" % (bool_primary1[0], bool_primary2[0]),
bool_primary1[1] >= bool_primary2[1]
)
elif tree[0] == '>':
bool_primary1 = eval_bool_primary(row, tree[1])
bool_primary2 = eval_bool_primary(row, tree[2])
return (
"%s > %s" % (bool_primary1[0], bool_primary2[0]),
bool_primary1[1] > bool_primary2[1]
)
elif tree[0] == '<=':
bool_primary1 = eval_bool_primary(row, tree[1])
bool_primary2 = eval_bool_primary(row, tree[2])
return (
"%s <= %s" % (bool_primary1[0], bool_primary2[0]),
bool_primary1[1] <= bool_primary2[1]
)
elif tree[0] == '<':
bool_primary1 = eval_bool_primary(row, tree[1])
bool_primary2 = eval_bool_primary(row, tree[2])
return (
"%s < %s" % (bool_primary1[0], bool_primary2[0]),
bool_primary1[1] < bool_primary2[1]
)
elif tree[0] in ['<>', '!=']:
bool_primary1 = eval_bool_primary(row, tree[1])
bool_primary2 = eval_bool_primary(row, tree[2])
return (
"%s %s %s" % (bool_primary1[0], tree[0], bool_primary2[0]),
bool_primary1[1] != bool_primary2[1]
)
elif tree[0] == 'predicate':
return eval_predicate(row, tree[1])
elif tree[0] == 'bit_expr':
return eval_bit_expr(row, tree[1])
else:
raise SQLParserError('%s is not implemented' % tree[0])
[docs]def eval_expr(row, tree):
"""Evaluate expression
:return: Tuple with string representation and value.
For example, ('id', 5).
:rtype: tuple
"""
if tree[0] == 'OR':
expr1 = eval_expr(row, tree[1])
expr2 = eval_expr(row, tree[2])
return (
'%s OR %s' % (expr1[0], expr2[0]),
expr1[1] or expr2[1]
)
elif tree[0] == 'AND':
expr1 = eval_expr(row, tree[1])
expr2 = eval_expr(row, tree[2])
return (
'%s AND %s' % (expr1[0], expr2[0]),
expr1[1] and expr2[1]
)
elif tree[0] == 'NOT':
expr1 = eval_expr(row, tree[1])
return (
'NOT %s' % expr1[0],
not expr1[1]
)
elif tree[0] == 'bool_primary':
return eval_bool_primary(row, tree[1])
else:
raise SQLParserError('%r is not implemented' % tree[0])