#!/usr/bin/python
# coding: utf-8
#
from __future__ import division
import os
import re
import sys
import subprocess
from collections import defaultdict
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, Gdk, GObject
import parted
import commands
import gettext
import time
gettext.install("live-installer", "/usr/share/locale")
# Used as a decorator to run things in the main loop, from another thread
def idle(func):
def wrapper(*args, **kwargs):
GObject.idle_add(func, *args, **kwargs)
return wrapper
def shell_exec(command):
return subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
def getoutput(command):
return shell_exec(command).stdout.read().strip()
(IDX_PART_PATH,
IDX_PART_TYPE,
IDX_PART_DESCRIPTION,
IDX_PART_FORMAT_AS,
IDX_PART_MOUNT_AS,
IDX_PART_SIZE,
IDX_PART_FREE_SPACE,
IDX_PART_OBJECT,
IDX_PART_DISK) = range(9)
def is_efi_supported():
# Are we running under with efi ?
os.system("modprobe efivars >/dev/null 2>&1")
return os.path.exists("/proc/efi") or os.path.exists("/sys/firmware/efi")
def path_exists(*args):
return os.path.exists(os.path.join(*args))
TMP_MOUNTPOINT = '/tmp/live-installer/tmpmount'
RESOURCE_DIR = '/usr/share/live-installer/'
EFI_MOUNT_POINT = '/boot/efi'
SWAP_MOUNT_POINT = 'swap'
with open(RESOURCE_DIR + 'disk-partitions.html') as f:
DISK_TEMPLATE = f.read()
# cut out the single partition (skeleton) block
PARTITION_TEMPLATE = re.search('CUT_HERE([\s\S]+?)CUT_HERE', DISK_TEMPLATE, re.MULTILINE).group(1)
# delete the skeleton from original
DISK_TEMPLATE = DISK_TEMPLATE.replace(PARTITION_TEMPLATE, '')
# duplicate all { or } in original CSS so they don't get interpreted as part of string formatting
DISK_TEMPLATE = re.sub('', lambda match: match.group().replace('{', '{{').replace('}', '}}'), DISK_TEMPLATE)
def get_disks():
disks = []
exclude_devices = ['/dev/sr0', '/dev/sr1', '/dev/cdrom', '/dev/dvd', '/dev/fd0']
live_device = commands.getoutput("findmnt -n -o source /run/live/medium").split('\n')[0]
live_device = re.sub('[0-9]+$', '', live_device) # remove partition numbers if any
if live_device is not None and live_device.startswith('/dev/'):
exclude_devices.append(live_device)
print "Excluding %s (detected as the live device)" % live_device
lsblk = shell_exec('LC_ALL=en_US.UTF-8 lsblk -rindo TYPE,NAME,RM,SIZE,MODEL | sort -k3,2')
for line in lsblk.stdout:
try:
elements = line.strip().split(" ", 4)
if len(elements) < 4:
print "Can't parse blkid output: %s" % elements
continue
elif len(elements) < 5:
print "Can't find model in blkid output: %s" % elements
type, device, removable, size, model = elements[0], elements[1], elements[2], elements[3], elements[1]
else:
type, device, removable, size, model = elements
device = "/dev/" + device
if type == "disk" and device not in exclude_devices:
# convert size to manufacturer's size for show, e.g. in GB, not GiB!
unit_index = 'BKMGTPEZY'.index(size.upper()[-1])
l10n_unit = [_('B'), _('kB'), _('MB'), _('GB'), _('TB'), 'PB', 'EB', 'ZB', 'YB'][unit_index]
size = "%s %s" % (str(int(float(size[:-1]) * (1024/1000)**unit_index)), l10n_unit)
model = model.replace("\\x20", " ")
description = '{} ({})'.format(model.strip(), size)
if int(removable):
description = _('Removable:') + ' ' + description
disks.append((device, description))
except Exception, detail:
print "Could not parse blkid output: %s (%s)" % (line, detail)
return disks
def build_partitions(_installer):
global installer
installer = _installer
installer.window.get_window().set_cursor(Gdk.Cursor.new(Gdk.CursorType.WATCH)) # "busy" cursor
installer.window.set_sensitive(False)
print "Starting PartitionSetup()"
partition_setup = PartitionSetup()
print "Finished PartitionSetup()"
if partition_setup.disks:
installer._selected_disk = partition_setup.disks[0][0]
print "Loading HTML string"
installer.partitions_browser.load_html(partition_setup.get_html(installer._selected_disk), 'file:///')
print "Showing the partition screen"
installer.builder.get_object("scrolled_partitions").show_all()
installer.builder.get_object("treeview_disks").set_model(partition_setup)
installer.builder.get_object("treeview_disks").expand_all()
installer.window.get_window().set_cursor(None)
installer.window.set_sensitive(True)
def update_html_preview(selection):
model, row = selection.get_selected()
try: disk = model[row][IDX_PART_DISK]
except TypeError, IndexError: return # no disk is selected or no disk available
if disk != installer._selected_disk:
installer._selected_disk = disk
installer.partitions_browser.load_html(model.get_html(disk), 'file:///')
def edit_partition_dialog(widget, path, viewcol):
''' assign the partition ... '''
model, iter = installer.builder.get_object("treeview_disks").get_selection().get_selected()
if not iter: return
row = model[iter]
partition = row[IDX_PART_OBJECT]
if (partition.partition.type != parted.PARTITION_EXTENDED and
partition.partition.number != -1):
dlg = PartitionDialog(row[IDX_PART_PATH],
row[IDX_PART_MOUNT_AS],
row[IDX_PART_FORMAT_AS],
row[IDX_PART_TYPE])
response_is_ok, mount_as, format_as = dlg.show()
if response_is_ok:
assign_mount_point(partition, mount_as, format_as)
def assign_mount_point(partition, mount_point, filesystem):
# Assign it in the treeview
model = installer.builder.get_object("treeview_disks").get_model()
for disk in model:
for part in disk.iterchildren():
if partition == part[IDX_PART_OBJECT]:
part[IDX_PART_MOUNT_AS] = mount_point
part[IDX_PART_FORMAT_AS] = filesystem
elif mount_point == part[IDX_PART_MOUNT_AS]:
part[IDX_PART_MOUNT_AS] = ""
part[IDX_PART_FORMAT_AS] = ""
# Assign it in our setup
for part in installer.setup.partitions:
if part == partition:
partition.mount_as, partition.format_as = mount_point, filesystem
elif part.mount_as == mount_point:
part.mount_as, part.format_as = '', ''
installer.setup.print_setup()
def partitions_popup_menu(widget, event):
if event.button != 3: return
model, iter = installer.builder.get_object("treeview_disks").get_selection().get_selected()
if not iter: return
partition = model.get_value(iter, IDX_PART_OBJECT)
if not partition: return
partition_type = model.get_value(iter, IDX_PART_TYPE)
if (partition.partition.type == parted.PARTITION_EXTENDED or
partition.partition.number == -1 or
"swap" in partition_type):
return
menu = Gtk.Menu()
menuItem = Gtk.MenuItem(_("Edit"))
menuItem.connect("activate", edit_partition_dialog, None, None)
menu.append(menuItem)
menuItem = Gtk.SeparatorMenuItem()
menu.append(menuItem)
menuItem = Gtk.MenuItem(_("Assign to /"))
menuItem.connect("activate", lambda w: assign_mount_point(partition, '/', 'ext4'))
menu.append(menuItem)
menuItem = Gtk.MenuItem(_("Assign to /home"))
menuItem.connect("activate", lambda w: assign_mount_point(partition, '/home', ''))
menu.append(menuItem)
if installer.setup.gptonefi:
menuItem = Gtk.SeparatorMenuItem()
menu.append(menuItem)
menuItem = Gtk.MenuItem(_("Assign to /boot/efi"))
menuItem.connect("activate", lambda w: assign_mount_point(partition, EFI_MOUNT_POINT, ''))
menu.append(menuItem)
menu.show_all()
menu.popup(None, None, None, None, 0, event.time)
def manually_edit_partitions(widget):
""" Edit only known disks in gparted, selected one first """
model, iter = installer.builder.get_object("treeview_disks").get_selection().get_selected()
preferred = model[iter][-1] if iter else '' # prefer disk currently selected and show it first in gparted
disks = ' '.join(sorted((disk for disk,desc in model.disks), key=lambda disk: disk != preferred))
os.system('umount ' + disks) # umount disks (if possible) so gparted works out-of-the-box
os.popen('gparted {} &'.format(disks))
def build_grub_partitions():
grub_model = Gtk.ListStore(str)
try: preferred = [p.partition.disk.device.path for p in installer.setup.partitions if p.mount_as == '/'][0]
except IndexError: preferred = ''
devices = sorted(list(d[0] for d in installer.setup.partition_setup.disks) +
list(filter(None, (p.name for p in installer.setup.partitions))),
key=lambda path: path != preferred and path)
for p in devices: grub_model.append([p])
installer.builder.get_object("combobox_grub").set_model(grub_model)
installer.builder.get_object("combobox_grub").set_active(0)
class PartitionSetup(Gtk.TreeStore):
def __init__(self):
super(PartitionSetup, self).__init__(str, # path
str, # type (fs)
str, # description (OS)
str, # format to
str, # mount point
str, # size
str, # free space
object, # partition object
str) # disk device path
installer.setup.partitions = []
installer.setup.partition_setup = self
self.html_disks, self.html_chunks = {}, defaultdict(list)
os.popen('mkdir -p ' + TMP_MOUNTPOINT)
installer.setup.gptonefi = is_efi_supported()
self.disks = get_disks()
print 'Disks: ', self.disks
already_done_full_disk_format = False
for disk_path, disk_description in self.disks:
print " Analyzing path='%s' description='%s'" % (disk_path, disk_description)
disk_device = parted.getDevice(disk_path)
print " - Found the device..."
try:
disk = parted.Disk(disk_device)
print " - Found the disk..."
except Exception, detail:
print " - Found an issue while looking for the disk: %s" % detail
from frontend.gtk_interface import QuestionDialog
dialog = QuestionDialog(_("Installation Tool"),
_("No partition table was found on the hard drive: %s. Do you want the installer to create a set of partitions for you? Note: This will ERASE ALL DATA present on this disk.") % disk_description,
None, installer.window)
if not dialog: continue # the user said No, skip this disk
try:
installer.window.get_window().set_cursor(Gdk.Cursor.new(Gdk.CursorType.WATCH))
print "Performing a full disk format"
if not already_done_full_disk_format:
assign_mount_format = full_disk_format(disk_device)
already_done_full_disk_format = True
else:
full_disk_format(disk_device) # Format but don't assign mount points
installer.window.get_window().set_cursor(None)
print "Done full disk format"
disk = parted.Disk(disk_device)
print "Got disk!"
except Exception as second_exception:
installer.window.get_window().set_cursor(None)
print " - Found another issue while looking for the disk: %s" % detail
continue # Something is wrong with this disk, skip it
disk_iter = self.append(None, (disk_description, '', '', '', '', '', '', None, disk_path))
print " - Looking at partitions..."
free_space_partition = disk.getFreeSpacePartitions()
print " -> %d free space partitions" % len(free_space_partition)
primary_partitions = disk.getPrimaryPartitions()
print " -> %d primary partitions" % len(primary_partitions)
logical_partitions = disk.getLogicalPartitions()
print " -> %d logical partitions" % len(logical_partitions)
raid_partitions = disk.getRaidPartitions()
print " -> %d raid partitions" % len(raid_partitions)
lvm_partitions = disk.getLVMPartitions()
print " -> %d LVM partitions" % len(lvm_partitions)
partition_set = set(free_space_partition + primary_partitions + logical_partitions + raid_partitions + lvm_partitions)
print " -> set of %d partitions" % len(partition_set)
partitions = []
for partition in partition_set:
part = Partition(partition)
print(partition.path, part.size, part.raw_size)
# skip ranges <5MB
if part.raw_size > 5242880:
partitions.append(part)
else:
print("skipping ", partition.path, part.raw_size)
partitions = sorted(partitions, key=lambda part: part.partition.geometry.start)
print " - Found partitions..."
try: # assign mount_as and format_as if disk was just auto-formatted
for partition, (mount_as, format_as) in zip(partitions, assign_mount_format):
partition.mount_as = mount_as
partition.format_as = format_as
del assign_mount_format
except NameError: pass
print " - Iterating partitions..."
# Needed to fix the 1% minimum Partition.size_percent
sum_size_percent = sum(p.size_percent for p in partitions) + .5 # .5 for good measure
for partition in partitions:
print " . Appending partition %s..." % partition.name
partition.size_percent = round(partition.size_percent / sum_size_percent * 100, 1)
installer.setup.partitions.append(partition)
self.append(disk_iter, (partition.name,
'{}'.format(partition.color, partition.type),
partition.description,
partition.format_as,
partition.mount_as,
partition.size,
partition.free_space,
partition,
disk_path))
print " - Loading HTML view..."
self.html_disks[disk_path] = DISK_TEMPLATE.format(PARTITIONS_HTML=''.join(PARTITION_TEMPLATE.format(p) for p in partitions))
def get_html(self, disk):
if disk in self.html_disks:
return self.html_disks[disk]
else:
return ""
@idle
def show_error(message):
from frontend.gtk_interface import ErrorDialog
ErrorDialog(_("Installer"), message)
def full_disk_format(device, create_boot=False, create_swap=True):
# Create a default partition set up
disk_label = ('gpt' if device.getLength('B') > 2**32*.9 * device.sectorSize # size of disk > ~2TB
or installer.setup.gptonefi
else 'msdos')
return_code = os.system("parted -s %s mklabel %s" % (device.path, disk_label))
if return_code != 0:
show_error(_("The partition table couldn't be written for %s. Restart the computer and try again.") % device.path)
Gtk.main_quit()
sys.exit(1)
mkpart = (
# (condition, mount_as, format_as, mkfs command, size_mb)
# EFI
(installer.setup.gptonefi, EFI_MOUNT_POINT, 'vfat', 'mkfs.vfat {} -F 32 ', 300),
# boot
(create_boot, '/boot', 'ext4', 'mkfs.ext4 -F {}', 1024),
# swap - equal to RAM for hibernate to work well (but capped at ~8GB)
(create_swap, SWAP_MOUNT_POINT, 'swap', 'mkswap {}', min(8800, int(round(1.1/1024 * int(getoutput("awk '/^MemTotal/{ print $2 }' /proc/meminfo")), -2)))),
# root
(True, '/', 'ext4', 'mkfs.ext4 -F {}', 0),
)
run_parted = lambda cmd: os.system('parted --script --align optimal {} {} ; sync'.format(device.path, cmd))
start_mb = 2
partition_number = 0
partition_prefix = ""
if device.path.startswith("/dev/nvme"):
partition_prefix = "p"
for partition in mkpart:
if partition[0]:
partition_number = partition_number + 1
mkfs = partition[3]
size_mb = partition[4]
end = '{}MB'.format(start_mb + size_mb) if size_mb else '100%'
mkpart_cmd = 'mkpart primary {}MB {}'.format(start_mb, end)
print mkpart_cmd
run_parted(mkpart_cmd)
partition_path = "%s%s%d" % (device.path, partition_prefix, partition_number)
num_tries = 0
while True:
if os.path.exists(partition_path):
break
if num_tries < 5:
num_tries += 1
print ("Could not find %s, waiting 1s..." % partition_path)
os.system("sync")
time.sleep(1)
else:
show_error(_("The partition %s could not be created. The installation will stop. Restart the computer and try again.") % partition_path)
Gtk.main_quit()
sys.exit(1)
mkfs = mkfs.format(partition_path)
print mkfs
os.system(mkfs)
start_mb += size_mb + 1
if installer.setup.gptonefi:
run_parted('set 1 boot on')
return ((i[1], i[2]) for i in mkpart if i[0])
def to_human_readable(size):
for unit in [' ', _('kB'), _('MB'), _('GB'), _('TB'), 'PB', 'EB', 'ZB', 'YB']:
if size < 1000:
return "{:.1f} {}".format(size, unit)
size /= 1000
class Partition(object):
format_as = ''
mount_as = ''
def __init__(self, partition):
assert partition.type not in (parted.PARTITION_METADATA, parted.PARTITION_EXTENDED)
self.path = str(partition.path)
print " -> Building partition object for %s" % self.path
self.partition = partition
self.length = partition.getLength()
print " . length %d" % self.length
self.size_percent = max(1, round(80*self.length/partition.disk.device.getLength(), 1))
print " . size_percent %d" % self.size_percent
self.size = to_human_readable(partition.getLength('B'))
self.raw_size = partition.getLength('B')
print " . size %s" % self.size
# if not normal partition with /dev/sdXN path, set its name to '' and discard it from model
self.name = self.path if partition.number != -1 else ''
print " . name %s" % self.name
try:
self.type = partition.fileSystem.type
for fs in ('swap', 'hfs', 'ufs'): # normalize fs variations (parted.filesystem.fileSystemType.keys())
if fs in self.type:
self.type = fs
self.style = self.type
print " . type %s" % self.type
except AttributeError: # non-formatted partitions
self.type = {
parted.PARTITION_LVM: 'LVM',
parted.PARTITION_SWAP: 'swap',
parted.PARTITION_RAID: 'RAID', # Empty space on Extended partition is recognized as this
parted.PARTITION_PALO: 'PALO',
parted.PARTITION_PREP: 'PReP',
parted.PARTITION_LOGICAL: _('Logical partition'),
parted.PARTITION_EXTENDED: _('Extended partition'),
parted.PARTITION_FREESPACE: _('Free space'),
parted.PARTITION_HPSERVICE: 'HP Service',
parted.PARTITION_MSFT_RESERVED: 'MSFT Reserved',
}.get(partition.type, _('Unknown'))
self.style = {
parted.PARTITION_SWAP: 'swap',
parted.PARTITION_FREESPACE: 'freespace',
}.get(partition.type, '')
print " . type %s" % self.type
if "swap" in self.type:
self.mount_as = SWAP_MOUNT_POINT
# identify partition's description and used space
try:
print " . About to mount it..."
os.system('mount --read-only {} {}'.format(self.path, TMP_MOUNTPOINT))
size, free, self.used_percent, mount_point = getoutput("df {0} | grep '^{0}' | awk '{{print $2,$4,$5,$6}}' | tail -1".format(self.path)).split(None, 3)
self.raw_size = int(size)*1024
print " . size %s, free %s, self.used_percent %s, mount_point %s" % (size, free, self.used_percent, mount_point)
except ValueError:
print " . value error!"
if "swap" in self.type:
self.os_fs_info, self.description, self.free_space, self.used_percent = ': '+self.type, 'swap', '', 0
else:
print 'WARNING: Partition {} or type {} failed to mount!'.format(self.path, partition.type)
self.os_fs_info, self.description, self.free_space, self.used_percent = ': '+self.type, '', '', 0
print " . self.os_fs_info %s, self.description %s, self.free_space %s, self.used_percent %s" % (self.os_fs_info, self.description, self.free_space, self.used_percent)
else:
print " . About to find more about it..."
self.size = to_human_readable(int(size)*1024) # for mountable partitions, more accurate than the getLength size above
self.free_space = to_human_readable(int(free)*1024) # df returns values in 1024B-blocks by default
self.used_percent = self.used_percent.strip('%') or 0
description = ''
if path_exists(mount_point, 'etc/linuxmint/info'):
description = getoutput("cat %s/etc/linuxmint/info | grep GRUB_TITLE" % mount_point).replace('GRUB_TITLE', '').replace('=', '').replace('"', '').strip()
elif path_exists(mount_point, 'Windows/servicing/Version'):
description = 'Windows ' + {
'6.4':'10',
'6.3':'8.1',
'6.2':'8',
'6.1':'7',
'6.0':'Vista',
'5.2':'XP Pro x64',
'5.1':'XP',
'5.0':'2000',
'4.9':'ME',
'4.1':'98',
'4.0':'95',
}.get(getoutput('ls {}/Windows/servicing/Version'.format(mount_point))[:3], '')
elif path_exists(mount_point, 'Boot/BCD'):
description = 'Windows bootloader/recovery'
elif path_exists(mount_point, 'Windows/System32'):
description = 'Windows'
elif path_exists(mount_point, 'System/Library/CoreServices/SystemVersion.plist'):
description = 'Mac OS X'
elif path_exists(mount_point, 'etc/'):
description = getoutput("su -c '{{ . {0}/etc/lsb-release && echo $DISTRIB_DESCRIPTION; }} || \
{{ . {0}/etc/os-release && echo $PRETTY_NAME; }}' mint".format(mount_point)) or 'Unix'
else:
try:
if partition.active:
for flag in partition.getFlagsAsString().split(", "):
if flag in ["boot", "esp"]:
description = 'EFI System Partition'
self.mount_as = EFI_MOUNT_POINT
break
except Exception, detail:
# best effort
print "Could not read partition flags for %s: %s" % (self.path, detail)
self.description = description
self.os_fs_info = ': {0.description} ({0.type}; {0.size}; {0.free_space})'.format(self) if description else ': ' + self.type
print " . self.description %s self.os_fs_info %s" % (self.description, self.os_fs_info)
finally:
print " . umounting it"
os.system('umount ' + TMP_MOUNTPOINT + ' 2>/dev/null')
print " . done"
self.html_name = self.name.split('/')[-1]
self.html_description = self.description
if (self.size_percent < 10 and len(self.description) > 5):
self.html_description = "%s..." % self.description[0:5]
if (self.size_percent < 5):
#Not enough space, don't write the name
self.html_name = ""
self.html_description = ""
self.color = {
# colors approximately from gparted (find matching set in usr/share/disk-partitions.html)
'btrfs': '#636363',
'exfat': '#47872a',
'ext2': '#2582a0',
'ext3': '#2582a0',
'ext4': '#21619e',
'fat16': '#47872a',
'fat32': '#47872a',
'hfs': '#636363',
'jfs': '#636363',
'swap': '#be3a37',
'ntfs': '#66a6a8',
'reiserfs': '#636363',
'ufs': '#636363',
'xfs': '#636363',
'zfs': '#636363',
parted.PARTITION_EXTENDED: '#a9a9a9',
}.get(self.type, '#a9a9a9')
def print_partition(self):
print "Device: %s, format as: %s, mount as: %s" % (self.path, self.format_as, self.mount_as)
class PartitionDialog(object):
def __init__(self, path, mount_as, format_as, type):
glade_file = RESOURCE_DIR + 'interface.ui'
self.builder = Gtk.Builder()
self.builder.add_from_file(glade_file)
self.window = self.builder.get_object("dialog")
self.window.set_title(_("Edit partition"))
self.builder.get_object("label_partition").set_markup("%s" % _("Device:"))
self.builder.get_object("label_partition_value").set_label(path)
self.builder.get_object("label_use_as").set_markup(_("Format as:"))
self.builder.get_object("label_mount_point").set_markup(_("Mount point:"))
self.builder.get_object("button_cancel").set_label(_("Cancel"))
self.builder.get_object("button_ok").set_label(_("OK"))
# Build supported filesystems list
filesystems = ['', 'swap']
for path in ["/bin", "/sbin"]:
for fs in getoutput('echo %s/mkfs.*' % path).split():
filesystems.append(fs.split("mkfs.")[1])
filesystems = sorted(filesystems)
filesystems = sorted(filesystems, key=lambda x: 0 if x in ('', 'ext4') else 1 if x == 'swap' else 2)
model = Gtk.ListStore(str)
for i in filesystems:
model.append([i])
self.builder.get_object("combobox_use_as").set_model(model)
self.builder.get_object("combobox_use_as").set_active(filesystems.index(format_as))
# Build list of pre-provided mountpoints
combobox = self.builder.get_object("comboboxentry_mount_point")
model = Gtk.ListStore(str, str)
for i in ["/", "/@", "/home", "/@home", "/boot", "/boot/efi", "/srv", "/tmp", "swap"]:
model.append(["", i])
combobox.set_model(model)
combobox.set_entry_text_column(1)
combobox.set_id_column(1)
combobox.get_child().set_text(mount_as)
def show(self):
response = self.window.run()
w = self.builder.get_object("comboboxentry_mount_point")
mount_as = w.get_child().get_text().strip()
w = self.builder.get_object("combobox_use_as")
format_as = w.get_model()[w.get_active()][0]
self.window.destroy()
if response in (Gtk.ResponseType.YES, Gtk.ResponseType.APPLY, Gtk.ResponseType.OK, Gtk.ResponseType.ACCEPT):
response_is_ok = True
else:
response_is_ok = False
return response_is_ok, mount_as, format_as