def DataLv(self):
return dbus.ObjectPath(self._data_lv)
+ @staticmethod
+ def _enable_disable_compression(pool_uuid, pool_name, enable, comp_options):
+ # Make sure we have a dbus object representing it
+ LvCommon.validate_dbus_object(pool_uuid, pool_name)
+ # Rename the logical volume
+ LvCommon.handle_execute(*cmdhandler.lv_vdo_compression(
+ pool_name, enable, comp_options))
+ return '/'
+
+ @dbus.service.method(
+ dbus_interface=VDO_POOL_INTERFACE,
+ in_signature='ia{sv}',
+ out_signature='o',
+ async_callbacks=('cb', 'cbe'))
+ def EnableCompression(self, tmo, comp_options, cb, cbe):
+ r = RequestEntry(
+ tmo, LvVdoPool._enable_disable_compression,
+ (self.Uuid, self.lvm_id, True, comp_options),
+ cb, cbe, False)
+ cfg.worker_q.put(r)
+
+ @dbus.service.method(
+ dbus_interface=VDO_POOL_INTERFACE,
+ in_signature='ia{sv}',
+ out_signature='o',
+ async_callbacks=('cb', 'cbe'))
+ def DisableCompression(self, tmo, comp_options, cb, cbe):
+ r = RequestEntry(
+ tmo, LvVdoPool._enable_disable_compression,
+ (self.Uuid, self.lvm_id, False, comp_options),
+ cb, cbe, False)
+ cfg.worker_q.put(r)
+
+ @staticmethod
+ def _enable_disable_deduplication(pool_uuid, pool_name, enable, dedup_options):
+ # Make sure we have a dbus object representing it
+ LvCommon.validate_dbus_object(pool_uuid, pool_name)
+ # Rename the logical volume
+ LvCommon.handle_execute(*cmdhandler.lv_vdo_deduplication(
+ pool_name, enable, dedup_options))
+ return '/'
+
+ @dbus.service.method(
+ dbus_interface=VDO_POOL_INTERFACE,
+ in_signature='ia{sv}',
+ out_signature='o',
+ async_callbacks=('cb', 'cbe'))
+ def EnableDeduplication(self, tmo, dedup_options, cb, cbe):
+ r = RequestEntry(
+ tmo, LvVdoPool._enable_disable_deduplication,
+ (self.Uuid, self.lvm_id, True, dedup_options),
+ cb, cbe, False)
+ cfg.worker_q.put(r)
+
+ @dbus.service.method(
+ dbus_interface=VDO_POOL_INTERFACE,
+ in_signature='ia{sv}',
+ out_signature='o',
+ async_callbacks=('cb', 'cbe'))
+ def DisableDeduplication(self, tmo, dedup_options, cb, cbe):
+ r = RequestEntry(
+ tmo, LvVdoPool._enable_disable_deduplication,
+ (self.Uuid, self.lvm_id, False, dedup_options),
+ cb, cbe, False)
+ cfg.worker_q.put(r)
+
# noinspection PyPep8Naming
class LvThinPool(Lv):
self.assertEqual(pv_object_path, self._lookup(symlink))
self.assertEqual(pv_object_path, self._lookup(pv_device_path))
- def _create_vdo_pool_and_lv(self):
+ def _create_vdo_pool_and_lv(self, vg_prefix="vdo_"):
pool_name = lv_n("_vdo_pool")
lv_name = lv_n()
- vg_proxy = self._vg_create(vg_prefix="vdo_")
+ vg_proxy = self._vg_create(vg_prefix=vg_prefix)
vdo_pool_object_path = self.handle_return(
vg_proxy.VgVdo.CreateVdoPoolandLv(
pool_name, lv_name,
vg, _, _ = self._create_vdo_pool_and_lv()
self.handle_return(vg.Vg.Remove(dbus.Int32(g_tmo), EOD))
+ def test_vdo_pool_compression_deduplication(self):
+ if not self.vdo:
+ raise unittest.SkipTest('vdo not supported')
+
+ vg, pool, _lv = self._create_vdo_pool_and_lv(vg_prefix="vdo2_")
+
+ # compression and deduplication should be enabled by default
+ self.assertEqual(pool.VdoPool.Compression, "enabled")
+ self.assertEqual(pool.VdoPool.Deduplication, "enabled")
+
+ self.handle_return(
+ pool.VdoPool.DisableCompression(dbus.Int32(g_tmo), EOD))
+ self.handle_return(
+ pool.VdoPool.DisableDeduplication(dbus.Int32(g_tmo), EOD))
+ pool.update()
+ self.assertEqual(pool.VdoPool.Compression, "")
+ self.assertEqual(pool.VdoPool.Deduplication, "")
+
+ self.handle_return(
+ pool.VdoPool.EnableCompression(dbus.Int32(g_tmo), EOD))
+ self.handle_return(
+ pool.VdoPool.EnableDeduplication(dbus.Int32(g_tmo), EOD))
+ pool.update()
+ self.assertEqual(pool.VdoPool.Compression, "enabled")
+ self.assertEqual(pool.VdoPool.Deduplication, "enabled")
+
+ self.handle_return(vg.Vg.Remove(dbus.Int32(g_tmo), EOD))
+
def _test_lv_method_interface(self, lv):
self._rename_lv_test(lv)
self._test_activate_deactivate(lv)