Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions bigtable/google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@
from google.cloud.bigtable.row_data import PartialRowsData


# Maximum number of mutations in bulk
_MAX_BULK_MUTATIONS = 100000

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.



class TableMismatchError(ValueError):
"""Row from another table."""


class TooManyMutationsError(ValueError):
"""The number of mutations for bulk request is too big."""

This comment was marked as spam.

This comment was marked as spam.



class Table(object):
"""Representation of a Google Cloud Bigtable Table.

Expand Down Expand Up @@ -276,6 +288,34 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
return PartialRowsData(response_iterator)

def mutate_rows(self, rows):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"""Mutates multiple rows in bulk.

The method tries to update all specified rows.
If some of the rows weren't updated, it would not remove mutations.
They can be applied to the row separately.
If row mutations finished successfully, they would be cleaned up.

:type rows: list
:param rows: List or other iterable of :class:`.DirectRow` instances.

:rtype: list
:returns: A list of tuples (``MutateRowsResponse.Entry`` protobuf
corresponding to the errors, :class:`.DirectRow`)
"""
mutate_rows_request = _mutate_rows_request(self.name, rows)

This comment was marked as spam.

unsuccessfully_mutated_rows = []
client = self._instance._client
responses = client._data_stub.MutateRows(mutate_rows_request)
for response in responses:
for entry in response.entries:
if entry.status.code == 0:
rows[entry.index].clear()

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

else:
unsuccessfully_mutated_rows.append(
(entry, rows[entry.index]))
return unsuccessfully_mutated_rows

def sample_row_keys(self):
"""Read a sample of row keys in the table.

Expand Down Expand Up @@ -373,3 +413,65 @@ def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
message.rows.row_ranges.add(**range_kwargs)

return message


def _mutate_rows_request(table_name, rows):
"""Creates a request to mutate rows in a table.

:type table_name: str
:param table_name: The name of the table to write to.

:type rows: list
:param rows: List or other iterable of :class:`.DirectRow` instances.

:rtype: :class:`data_messages_v2_pb2.MutateRowsRequest`
:returns: The ``MutateRowsRequest`` protobuf corresponding to the inputs.
:raises: :exc:`~.table.TooManyMutationsError` if the number of mutations is
grater than 100,000

This comment was marked as spam.

This comment was marked as spam.

"""
request_pb = data_messages_v2_pb2.MutateRowsRequest(table_name=table_name)
mutations_count = 0
for row in rows:
_check_row_table_name(table_name, row)
_check_row_type(row)
entry = request_pb.entries.add()
entry.row_key = row.row_key
for mutation in row._get_mutations(None):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

mutations_count += 1
entry.mutations.add().CopyFrom(mutation)
if mutations_count > _MAX_BULK_MUTATIONS:
raise TooManyMutationsError('Maximum number of mutations is %s' %
_MAX_BULK_MUTATIONS)

This comment was marked as spam.

return request_pb


def _check_row_table_name(table_name, row):
"""Checks that a row belong to the table.

This comment was marked as spam.


:type table_name: str
:param table_name: The name of the table.

:type row: :class:`.Row`
:param row: An instance of :class:`.Row` subclasses.

:raises: :exc:`~.table.TableMismatchError` if the row does not belong to
the table.
"""

This comment was marked as spam.

if row.table.name != table_name:
raise TableMismatchError(
'Row %s is a part of %s table. Current table: %s' %
(row.row_key, row.table.name, table_name))


def _check_row_type(row):
"""Checks that a row is an instance of :class:`.DirectRow`.

:type row: :class:`.Row`
:param row: An instance of :class:`.Row` subclasses.

:raises: :class:`TypeError <exceptions.TypeError>` if the row is not an
instance of DirectRow.
"""
if not isinstance(row, DirectRow):
raise TypeError('Bulk processing can not be applied for '
'conditional or append mutations.')

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

24 changes: 24 additions & 0 deletions bigtable/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,30 @@ def _write_to_row(self, row1=None, row2=None, row3=None, row4=None):
cell4 = Cell(CELL_VAL4, timestamp4)
return cell1, cell2, cell3, cell4

def test_mutate_rows(self):
row_1 = self._table.row(ROW_KEY)
row_1.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
row_1.commit()
row_2 = self._table.row(ROW_KEY_ALT)
row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2)
row_2.commit()
rows = [row_1, row_2]
self.rows_to_delete.extend(rows)

This comment was marked as spam.


# Change the contents
row_1.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL3)
row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL4)
result = self._table.mutate_rows(rows)
self.assertFalse(result)

# Check the contents
row_1_data = self._table.read_row(ROW_KEY)
self.assertEqual(
row_1_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, CELL_VAL3)

This comment was marked as spam.

This comment was marked as spam.

row_2_data = self._table.read_row(ROW_KEY_ALT)
self.assertEqual(
row_2_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, CELL_VAL4)

def test_read_large_cell_limit(self):
row = self._table.row(ROW_KEY)
self.rows_to_delete.append(row)
Expand Down
1 change: 1 addition & 0 deletions bigtable/tests/unit/test_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class TestRow(unittest.TestCase):
@staticmethod
def _get_target_class():
from google.cloud.bigtable.row import Row

return Row

def _make_one(self, *args, **kwargs):
Expand Down
96 changes: 96 additions & 0 deletions bigtable/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,65 @@
import unittest


class Test___mutate_rows_request(unittest.TestCase):

This comment was marked as spam.

This comment was marked as spam.


def _call_fut(self, table_name, rows):
from google.cloud.bigtable.table import _mutate_rows_request

return _mutate_rows_request(table_name, rows)

def test__mutate_rows_too_many_mutations(self):

This comment was marked as spam.

from collections import namedtuple
from google.cloud.bigtable._generated.data_pb2 import Mutation

This comment was marked as spam.

from google.cloud.bigtable.row import DirectRow
import google.cloud.bigtable.table as table_module
from google.cloud.bigtable.table import TooManyMutationsError

table = namedtuple('Table', ['name'])

This comment was marked as spam.

table.name = 'table'
table_module._MAX_BULK_MUTATIONS = 3

This comment was marked as spam.

mutation = Mutation()
rows = [DirectRow(row_key=b'row_key', table=table),
DirectRow(row_key=b'row_key_2', table=table)]
rows[0]._pb_mutations = [mutation, mutation]
rows[1]._pb_mutations = [mutation, mutation]

This comment was marked as spam.

with self.assertRaises(TooManyMutationsError):
self._call_fut('table', rows)

This comment was marked as spam.



class Test__check_row_table_name(unittest.TestCase):

def _call_fut(self, table_name, row):
from google.cloud.bigtable.table import _check_row_table_name

return _check_row_table_name(table_name, row)

def test__check_rows_wrong_table_name(self):

This comment was marked as spam.

from collections import namedtuple
from google.cloud.bigtable.table import TableMismatchError
from google.cloud.bigtable.row import DirectRow

table = namedtuple('Table', ['name'])

This comment was marked as spam.

table.name = 'table'
row = DirectRow(row_key=b'row_key', table=table)
with self.assertRaises(TableMismatchError):
self._call_fut('other_table', row)


class Test__check_row_type(unittest.TestCase):
def _call_fut(self, table_name, row):
from google.cloud.bigtable.table import _check_row_type

return _check_row_type(table_name, row)

def test__check_rows_wrong_row_type(self):

This comment was marked as spam.

from google.cloud.bigtable.row import ConditionalRow

row = ConditionalRow(row_key=b'row_key', table='table', filter_=None)
with self.assertRaises(TypeError):
self._call_fut('table', row)

This comment was marked as spam.



class TestTable(unittest.TestCase):

PROJECT_ID = 'project-id'
Expand Down Expand Up @@ -348,6 +407,37 @@ def test_read_row_still_partial(self):
with self.assertRaises(ValueError):
self._read_row_helper(chunks, None)

def test_mutate_rows(self):
from google.cloud.bigtable._generated.bigtable_pb2 import (
MutateRowsResponse)
from google.cloud.bigtable.row import DirectRow
from tests.unit._testing import _FakeStub

client = _Client()
instance = _Instance(self.INSTANCE_NAME, client=client)
table = self._make_one(self.TABLE_ID, instance)

row_1 = DirectRow(row_key=b'row_key', table=table)
row_1.set_cell('cf', b'col', b'value1')
row_2 = DirectRow(row_key=b'row_key_2', table=table)
row_2.set_cell('cf', b'col', b'value2')

response = MutateRowsResponse()

This comment was marked as spam.

entry_1 = response.entries.add()
entry_1.status.code = 0
entry_2 = response.entries.add()
entry_2.status.code = 1

# Patch the stub used by the API method.
client._data_stub = _FakeStub([response])
result = table.mutate_rows([row_1, row_2])

self.assertIs(result[0][1], row_1)
self.assertTrue(len(result))
self.assertFalse(row_1._get_mutations(None))
self.assertTrue(row_2._get_mutations(None))


def test_read_rows(self):
from google.cloud._testing import _Monkey
from tests.unit._testing import _FakeStub
Expand Down Expand Up @@ -570,6 +660,12 @@ def _SampleRowKeysRequestPB(*args, **kw):
return messages_v2_pb2.SampleRowKeysRequest(*args, **kw)


def _mutate_rows_request_pb(*args, **kw):
from google.cloud.bigtable._generated import (
bigtable_pb2 as data_messages_v2_pb2)

return data_messages_v2_pb2.MutateRowsRequest(*args, **kw)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

def _TablePB(*args, **kw):
from google.cloud.bigtable._generated import (
table_pb2 as table_v2_pb2)
Expand Down