out_signature='o',
async_callbacks=('cb', 'cbe'))
def Rename(self, name, tmo, rename_options, cb, cbe):
+ utils.validate_lv_name(LV_INTERFACE, self.vg_name_lookup(), name)
+
r = RequestEntry(
tmo, Lv._rename,
(self.Uuid, self.lvm_id, name, rename_options),
async_callbacks=('cb', 'cbe'))
def Snapshot(self, name, optional_size, tmo,
snapshot_options, cb, cbe):
+
+ utils.validate_lv_name(LV_INTERFACE, self.vg_name_lookup(), name)
+
r = RequestEntry(
tmo, Lv._snap_shot,
(self.Uuid, self.lvm_id, name,
out_signature='o',
async_callbacks=('cb', 'cbe'))
def TagsAdd(self, tags, tmo, tag_options, cb, cbe):
+
+ for t in tags:
+ utils.validate_tag(LV_INTERFACE, t)
+
r = RequestEntry(
tmo, Lv._add_rm_tags,
(self.state.Uuid, self.state.lvm_id,
out_signature='o',
async_callbacks=('cb', 'cbe'))
def TagsDel(self, tags, tmo, tag_options, cb, cbe):
+
+ for t in tags:
+ utils.validate_tag(LV_INTERFACE, t)
+
r = RequestEntry(
tmo, Lv._add_rm_tags,
(self.state.Uuid, self.state.lvm_id,
out_signature='(oo)',
async_callbacks=('cb', 'cbe'))
def LvCreate(self, name, size_bytes, tmo, create_options, cb, cbe):
+ utils.validate_lv_name(THIN_POOL_INTERFACE, self.vg_name_lookup(), name)
+
r = RequestEntry(
tmo, LvThinPool._lv_create,
(self.Uuid, self.lvm_id, name,
out_signature='(oo)',
async_callbacks=('cb', 'cbe'))
def PvCreate(self, device, tmo, create_options, cb, cbe):
+ utils.validate_device_path(MANAGER_INTERFACE, device)
r = RequestEntry(
tmo, Manager._pv_create,
(device, create_options), cb, cbe)
out_signature='(oo)',
async_callbacks=('cb', 'cbe'))
def VgCreate(self, name, pv_object_paths, tmo, create_options, cb, cbe):
+ utils.validate_vg_name(MANAGER_INTERFACE, name)
r = RequestEntry(
tmo, Manager._create_vg,
(name, pv_object_paths, create_options,),
:param cbe: Not visible in API (used for async. error callback)
:return: '/' if operation done, else job path
"""
+ for d in device_paths:
+ utils.validate_device_path(MANAGER_INTERFACE, d)
+
r = RequestEntry(
tmo, Manager._pv_scan,
(activate, cache, device_paths, major_minors,
self._rc_error = None
self._return_tuple = return_tuple
- if self.tmo == -1:
+ if self.tmo < 0:
# Client is willing to block forever
pass
elif tmo == 0:
import inspect
import ctypes
import os
+import string
import dbus
import dbus.service
if not remainder:
return size_bytes
return size_bytes + bs - remainder
+
+
+_ALLOWABLE_CH = string.ascii_letters + string.digits + '#+.:=@_\/%'
+_ALLOWABLE_CH_SET = set(_ALLOWABLE_CH)
+
+_ALLOWABLE_VG_LV_CH = string.ascii_letters + string.digits + '.-_+'
+_ALLOWABLE_VG_LV_CH_SET = set(_ALLOWABLE_VG_LV_CH)
+_LV_NAME_RESERVED = ("_cdata", "_cmeta", "_corig", "_mimage", "_mlog",
+ "_pmspare", "_rimage", "_rmeta", "_tdata", "_tmeta", "_vorigin")
+
+# Tags can have the characters, based on the code
+# a-zA-Z0-9._-+/=!:&#
+_ALLOWABLE_TAG_CH = string.ascii_letters + string.digits + "._-+/=!:&#"
+_ALLOWABLE_TAG_CH_SET = set(_ALLOWABLE_TAG_CH)
+
+
+def _allowable_tag(tag_name):
+ # LVM should impose a length restriction
+ return set(tag_name) <= _ALLOWABLE_TAG_CH_SET
+
+
+def _allowable_vg_name(vg_name):
+ if vg_name is None:
+ raise ValueError("VG name is None or empty")
+
+ vg_len = len(vg_name)
+ if vg_len == 0 or vg_len > 127:
+ raise ValueError("VG name (%s) length (%d) not in the domain 1..127" %
+ (vg_name, vg_len))
+
+ if not set(vg_name) <= _ALLOWABLE_VG_LV_CH_SET:
+ raise ValueError("VG name (%s) contains invalid character, "
+ "allowable set(%s)" % (vg_name, _ALLOWABLE_VG_LV_CH))
+
+ if vg_name == "." or vg_name == "..":
+ raise ValueError('VG name (%s) cannot be "." or ".."' % (vg_name))
+
+
+def _allowable_lv_name(vg_name, lv_name):
+
+ if lv_name is None:
+ raise ValueError("LV name is None or empty")
+
+ lv_len = len(lv_name)
+
+ # This length is derived from empirical testing
+ if lv_len == 0 or (len(vg_name) + lv_len) > 125:
+ raise ValueError("LV name (%s) length (%d) + VG name length "
+ "not in the domain 1..125" % (lv_name, lv_len))
+
+ if not set(lv_name) <= _ALLOWABLE_VG_LV_CH_SET:
+ raise ValueError("LV name (%s) contains invalid character, "
+ "allowable (%s)" % (lv_name, _ALLOWABLE_VG_LV_CH))
+
+ if any(x in lv_name for x in _LV_NAME_RESERVED):
+ raise ValueError("LV name (%s) contains a reserved word, "
+ "reserved set(%s)" % (lv_name, str(_LV_NAME_RESERVED)))
+
+ if lv_name.startswith("snapshot") or lv_name.startswith("pvmove"):
+ raise ValueError("LV name (%s) starts with a reserved word, "
+ "reserved set(%s)" % (lv_name, str(["snapshot", "pvmove"])))
+
+ if lv_name[0] == '-':
+ raise ValueError("LV name (%s) cannot start with a '-' "
+ "character" % lv_name)
+
+
+def validate_device_path(interface, device):
+ if not set(device) <= _ALLOWABLE_CH_SET:
+ raise dbus.exceptions.DBusException(
+ interface, 'Device path (%s) has invalid characters, '
+ 'allowable (%s)' % (device, _ALLOWABLE_CH))
+
+
+def validate_vg_name(interface, vg_name):
+ try:
+ _allowable_vg_name(vg_name)
+ except ValueError as ve:
+ raise dbus.exceptions.DBusException(
+ interface, str(ve))
+
+
+def validate_lv_name(interface, vg_name, lv_name):
+ try:
+ _allowable_lv_name(vg_name, lv_name)
+ except ValueError as ve:
+ raise dbus.exceptions.DBusException(
+ interface, str(ve))
+
+
+def validate_tag(interface, tag):
+ if not _allowable_tag(tag):
+ raise dbus.exceptions.DBusException(
+ interface, 'tag (%s) contains invalid character, allowable set(%s)'
+ % (tag, _ALLOWABLE_TAG_CH))
in_signature='sia{sv}', out_signature='o',
async_callbacks=('cb', 'cbe'))
def Rename(self, name, tmo, rename_options, cb, cbe):
+ utils.validate_vg_name(VG_INTERFACE, name)
r = RequestEntry(tmo, Vg._rename,
(self.state.Uuid, self.state.lvm_id, name,
rename_options), cb, cbe, False)
job object path if created. Each == '/' when it doesn't
apply.
"""
+ utils.validate_lv_name(VG_INTERFACE, self.Name, name)
r = RequestEntry(tmo, Vg._lv_create,
(self.state.Uuid, self.state.lvm_id,
name, round_size(size_bytes), pv_dests_and_ranges,
async_callbacks=('cb', 'cbe'))
def LvCreateLinear(self, name, size_bytes,
thin_pool, tmo, create_options, cb, cbe):
+ utils.validate_lv_name(VG_INTERFACE, self.Name, name)
r = RequestEntry(tmo, Vg._lv_create_linear,
(self.state.Uuid, self.state.lvm_id,
name, round_size(size_bytes), thin_pool,
def LvCreateStriped(self, name, size_bytes, num_stripes,
stripe_size_kb, thin_pool, tmo, create_options,
cb, cbe):
+ utils.validate_lv_name(VG_INTERFACE, self.Name, name)
r = RequestEntry(
tmo, Vg._lv_create_striped,
(self.state.Uuid, self.state.lvm_id, name,
async_callbacks=('cb', 'cbe'))
def LvCreateMirror(self, name, size_bytes, num_copies,
tmo, create_options, cb, cbe):
+ utils.validate_lv_name(VG_INTERFACE, self.Name, name)
r = RequestEntry(
tmo, Vg._lv_create_mirror,
(self.state.Uuid, self.state.lvm_id, name,
def LvCreateRaid(self, name, raid_type, size_bytes,
num_stripes, stripe_size_kb, tmo,
create_options, cb, cbe):
+ utils.validate_lv_name(VG_INTERFACE, self.Name, name)
r = RequestEntry(tmo, Vg._lv_create_raid,
(self.state.Uuid, self.state.lvm_id, name,
raid_type, round_size(size_bytes), num_stripes,
out_signature='o',
async_callbacks=('cb', 'cbe'))
def PvTagsAdd(self, pvs, tags, tmo, tag_options, cb, cbe):
+
+ for t in tags:
+ utils.validate_tag(VG_INTERFACE, t)
+
r = RequestEntry(tmo, Vg._pv_add_rm_tags,
(self.state.Uuid, self.state.lvm_id,
pvs, tags, None, tag_options),
out_signature='o',
async_callbacks=('cb', 'cbe'))
def PvTagsDel(self, pvs, tags, tmo, tag_options, cb, cbe):
+
+ for t in tags:
+ utils.validate_tag(VG_INTERFACE, t)
+
r = RequestEntry(
tmo, Vg._pv_add_rm_tags,
(self.state.Uuid, self.state.lvm_id,
out_signature='o',
async_callbacks=('cb', 'cbe'))
def TagsAdd(self, tags, tmo, tag_options, cb, cbe):
+
+ for t in tags:
+ utils.validate_tag(VG_INTERFACE, t)
+
r = RequestEntry(tmo, Vg._vg_add_rm_tags,
(self.state.Uuid, self.state.lvm_id,
tags, None, tag_options),
out_signature='o',
async_callbacks=('cb', 'cbe'))
def TagsDel(self, tags, tmo, tag_options, cb, cbe):
+
+ for t in tags:
+ utils.validate_tag(VG_INTERFACE, t)
+
r = RequestEntry(tmo, Vg._vg_add_rm_tags,
(self.state.Uuid, self.state.lvm_id,
None, tags, tag_options),