Kaydet (Commit) 69eb52fe authored tarafından Suleyman Poyraz's avatar Suleyman Poyraz

add system literals for cross-platform package managing

üst 75b4291d
## Module 'strutils'
----------------------------------------
# string/list/functional utility functions
* any (pred,seq)
* concat (l):
Concatenate a list of lists.
* every (pred,seq)
* human_readable_rate (size)
* human_readable_size (size)
* multisplit (str,chars):
Split str with any of the chars.
* prefix (a,b):
Check if sequence a is a prefix of sequence b.
* remove_prefix (a,b):
Remove prefix a from sequence b.
* same (l):
Check if all elements of a sequence are equal.
* strlist (l):
Concatenate string reps of l's elements.
* unzip (seq)
## Module 'dirutils'
----------------------------------------
# dirutils module provides basic directory functions.
* dir_size (dir)
Calculate the size of files under a directory.
* remove_dir (path)
Remove all content of a directory.
## Module 'sysutils'
----------------------------------------
# sysutils module provides basic system utilities.
* FileLock class:
Create a file lock for operations
* find_executable (exec_name)
* find the given executable in PATH
* getKernelOption (option):
Get a dictionary of args for the given kernel command line option
* touch (filename):
Update file modification date, create file if necessary
* capture ()
Capture output of the command without running a shell
* run ()
Run a command without running a shell, only output errors
* run_full ()
Run a command without running a shell, with full output
* run_quiet ()
Run the command without running a shell and no output
## Module 'csapi'
----------------------------------------
# csapi module provides basic system configuration utility function.
* atoi():
Convert a string into an integer.
* settimeofday():
Set system date.
* changeroute():
Change the route table.
## Module 'iniutils'
----------------------------------------
# initutils module provides ini style configuration file utils.
## Module 'diskutils'
----------------------------------------
# diskutils module provides EDD class to query device boot order and device information utilities.
## Module 'localedata'
----------------------------------------
# localedata module provides locale information.
## Module 'localedata'
----------------------------------------
# network utility functions
#-*- coding: utf-8 -*-
#
# Copyright (C) 2006-2010 TUBITAK/UEKAE
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
_all__ = ["csapi",
"diskutils",
"fileutils",
"fstabutils",
"grubutils",
"iniutils",
"localedata",
"netutils",
"shellutils",
"strutils",
"sysutils"]
/*
** Copyright (c) 2005, TUBITAK/UEKAE
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 2 of the License, or (at your
** option) any later version. Please read the COPYING file.
*/
#include <Python.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <arpa/inet.h>
#include <net/route.h>
#include <unistd.h>
#include <string.h>
static PyObject *
csapi_atoi(PyObject *self, PyObject *args)
{
char *str;
int i;
if (!PyArg_ParseTuple(args, "s", &str))
return NULL;
i = atoi(str);
return Py_BuildValue("i", i);
}
static PyObject *
csapi_settimeofday(PyObject *self, PyObject *args)
{
struct timeval tv;
double t;
if (!PyArg_ParseTuple(args, "d", &t))
return NULL;
tv.tv_sec = t;
tv.tv_usec = 0;
if (0 != settimeofday(&tv, NULL))
return NULL;
Py_INCREF(Py_None);
return Py_None;
}
static PyObject *
csapi_changeroute(PyObject *self, PyObject *args)
{
struct rtentry route;
struct sockaddr_in gw, dst, mask;
int skfd, func;
char *gw_ip, *dst_ip, *mask_ip;
if (!PyArg_ParseTuple(args, "isss", &func, &gw_ip, &dst_ip, &mask_ip))
return NULL;
skfd = socket(AF_INET, SOCK_DGRAM, 0);
if (skfd < 0)
return NULL;
memset(&gw, 0, sizeof(struct sockaddr));
memset(&dst, 0, sizeof(struct sockaddr));
memset(&mask, 0, sizeof(struct sockaddr));
gw.sin_family = AF_INET;
dst.sin_family = AF_INET;
mask.sin_family = AF_INET;
gw.sin_addr.s_addr = inet_addr(gw_ip);
dst.sin_addr.s_addr = inet_addr(dst_ip);
mask.sin_addr.s_addr = inet_addr(mask_ip);
memset(&route, 0, sizeof(struct rtentry));
route.rt_dst = *(struct sockaddr *)&dst;
route.rt_gateway = *(struct sockaddr *)&gw;
route.rt_genmask = *(struct sockaddr *)&mask;
route.rt_flags = RTF_UP | RTF_GATEWAY;
if(ioctl(skfd, func, &route) < 0) {
return NULL;
}
close(skfd);
Py_INCREF(Py_None);
return Py_None;
}
static PyMethodDef methods[] = {
{ "atoi", csapi_atoi, METH_VARARGS,
"Convert a string into an integer." },
{ "settimeofday", csapi_settimeofday, METH_VARARGS,
"Set system date." },
{ "changeroute", csapi_changeroute, METH_VARARGS,
"Change the route table."},
{ NULL, NULL, 0, NULL }
};
static struct PyModuleDef csapimodule ={
PyModuleDef_HEAD_INIT,
"csapi",
NULL,
-1,
methods
};
PyMODINIT_FUNC
PyInit_csapi(void)
{
return PyModule_Create(&csapimodule);
}
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, Suleyman POYRAZ (Zaryob)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
"""diskutils module provides EDD class to query device boot order."""
import os
import struct
import binascii
import subprocess
sysfs_path = "/sys"
def sysValue(*paths):
path = os.path.join(sysfs_path, *paths)
f = open(path)
data = f.read().rstrip("\n")
f.close()
return data
def idsQuery(name, vendor, device):
f = file(name)
flag = 0
company = ""
for line in f.readlines():
if flag == 0:
if line.startswith(vendor):
flag = 1
company = line[5:].strip()
else:
if line.startswith("\t"):
if line.startswith("\t" + device):
return "%s - %s" % (line[6:].strip(), company)
elif not line.startswith("#"):
flag = 0
if company != "":
return "%s (%s)" % (company, device)
else:
return "Unknown (%s:%s)" % (vendor, device)
class EDD:
def __init__(self):
self.edd_dir = "/sys/firmware/edd"
self.edd_offset = 440
self.edd_len = 4
def blockDevices(self):
devices = []
for sysfs_dev in [dev for dev in os.listdir("/sys/block") \
if not dev.startswith(("fd", "loop", "ram", "sr"))]:
dev_name = os.path.basename(sysfs_dev).replace("!", "/")
devices.append("/dev/" + dev_name)
devices.sort()
return devices
def match_sys(self, _a):
b = struct.unpack("2s2s2s2s", _a)
return "0x"+b[3]+b[2]+b[1]+b[0]
def get_edd_sig(self, _n):
sigfile = "%s/int13_dev%s/mbr_signature" % (self.edd_dir, _n)
if os.path.exists(sigfile):
sig = open(sigfile).read().strip("\n")
else:
sig = None
return sig
def get_mbr_sig(self, _f):
f = open(_f)
f.seek(self.edd_offset)
a = f.read(self.edd_len)
f.close()
sig = self.match_sys(binascii.b2a_hex(a))
return sig
def list_edd_signatures(self):
sigs = {}
if os.path.exists(self.edd_dir):
for d in os.listdir(self.edd_dir):
bios_num = d[9:]
sig = self.get_edd_sig(bios_num)
if sig:
sigs[bios_num] = sig
else:
print("please insert edd module")
return sigs
def list_mbr_signatures(self):
sigs = {}
for d in self.blockDevices():
try:
sigs[self.get_mbr_sig(d)] = d
except IOError:
pass
return sigs
def getDeviceMap():
"""
Returns list of devices and their GRUB reprensentations.
Returns:
List of devices in ("hd0", "/dev/sda") format.
"""
# edd module is required
subprocess.call(["/sbin/modprobe", "edd"])
# get signatures
edd = EDD()
mbr_list = edd.list_mbr_signatures()
edd_list = edd.list_edd_signatures()
# sort keys
edd_keys = list(edd_list.keys())
edd_keys.sort()
devices = []
# build device map
i = 0
for bios_num in edd_keys:
edd_sig = edd_list[bios_num]
if edd_sig in mbr_list:
devices.append(("hd%s" % i, mbr_list[edd_sig]))
i += 1
return devices
def parseLinuxDevice(device):
"""
Parses Linux device address and returns disk, partition and their GRUB representations.
Arguments:
device: Linux device address (e.g. "/dev/sda1")
Returns:
None on error, (LinuxDisk, PartNo, GrubDev, GrubPartNo) on success
"""
for grub_disk, linux_disk in getDeviceMap():
if device.startswith(linux_disk):
part = device.replace(linux_disk, "", 1)
if part:
# If device address ends with a number,
# "p" is used before partition number
if part.startswith("p"):
grub_part = int(part[1:]) - 1
else:
grub_part = int(part) - 1
return linux_disk, part, grub_disk, grub_part
return None
def parseGrubDevice(device):
"""
Parses GRUB device address and returns disk, partition and their Linux representations.
Arguments:
device: GRUB device address (e.g. "(hd0,0)")
Returns:
None on error, (GrubDev, GrubPartNo, LinuxDisk, PartNo) on success
"""
try:
disk, part = device.split(",")
except ValueError:
return None
disk = disk[1:]
part = part[:-1]
if not part.isdigit():
return None
for grub_disk, linux_disk in getDeviceMap():
if disk == grub_disk:
linux_part = int(part) + 1
# If device address ends with a number,
# "p" is used before partition number
if linux_disk[-1].isdigit():
linux_part = "p%s" % linux_part
return grub_disk, part, linux_disk, linux_part
return None
def grubAddress(device):
"""
Translates Linux device address to GRUB address.
Arguments:
device: Linux device address (e.g. "/dev/sda1")
Returns:
None on error, GRUB device on success
"""
try:
linux_disk, linux_part, grub_disk, grub_part = parseLinuxDevice(device)
except (ValueError, TypeError):
return None
return "(%s,%s)" % (grub_disk, grub_part)
def linuxAddress(device):
"""
Translates GRUB device address to Linux address.
Arguments:
device: GRUB device address (e.g. "(hd0,0)")
Returns:
None on error, Linux device on success
"""
try:
grub_disk, grub_part, linux_disk, linux_part = parseGrubDevice(device)
except (ValueError, TypeError):
return None
return "%s%s" % (linux_disk, linux_part)
def getDeviceByLabel(label):
"""
Find Linux device address from it's label.
Arguments:
label: Device label
Returns:
None on error, Linux device on success
"""
fn = os.path.join("/dev/disk/by-label/%s" % label)
if os.path.islink(fn):
return "/dev/%s" % os.readlink(fn)[6:]
else:
return None
def getDeviceByUUID(uuid):
"""
Find Linux device address from it's UUID.
Arguments:
uuid: Device UUID
Returns:
None on error, Linux device on success
"""
fn = os.path.join("/dev/disk/by-uuid/%s" % uuid)
if os.path.islink(fn):
return "/dev/%s" % os.readlink(fn)[6:]
else:
return None
def getDevice(path):
"""
Gives device address of a path.
Arguments:
path: Directory path
Returns:
Device address (e.g. "/dev/sda1")
"""
for mount in os.popen("/bin/mount").readlines():
mount_items = mount.split()
if mount_items[2] == path:
if mount_items[0].startswith("/dev"):
return mount_items[0]
elif mount_items[0].startswith("LABEL="):
return getDeviceByLabel(mount_items[0].split('=', 1)[1])
elif mount_items[0].startswith("UUID="):
return getDeviceByUUID(mount_items[0].split('=', 1)[1])
def getPartitions():
"""
Returns list of all partitions.
Returns:
List of partitions which includes metadata of partition
or None (if blkid not found) e.g.:
{'/dev/sda1': {label :'PARDUS_ROOT', # (if exists)
uuid :'b3cf94b9-ed79-43e2-8b22-b9054a529f01',
fstype :'ext4'}, ... }
"""
if not os.path.exists('/sbin/blkid'):
return None
cmd = os.popen('/sbin/blkid')
result = {}
try:
for line in cmd.readlines():
partition = line.split(':')[0]
if partition not in result:
result[partition] = {}
for info in line.split():
if info.startswith('LABEL='):
result[partition]['label'] = info[6:].strip('"')
if info.startswith('UUID='):
result[partition]['uuid'] = info[5:].strip('"')
if info.startswith('TYPE='):
result[partition]['fstype'] = info[5:].strip('"')
except:
return None
else:
return result
def getRoot():
"""
Gives current root device address.
Returns:
Device address (e.g. "/dev/sda1")
"""
return getDevice("/")
def getBoot():
"""
Gives current boot device address.
Returns:
Device address (e.g. "/dev/sda1")
"""
if os.path.ismount("/boot"):
return getDevice("/boot")
else:
return getRoot()
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, Suleyman POYRAZ (Zaryob)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
"""sysutils module provides basic file I/0 utility functions."""
import os
import time
import fcntl
class FileLock:
def __init__(self, filename):
self.filename = filename
self.fd = None
def lock(self, shared=False, timeout=-1):
_type = fcntl.LOCK_EX
if shared:
_type = fcntl.LOCK_SH
if timeout != -1:
_type |= fcntl.LOCK_NB
self.fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT, 0o600)
if self.fd == -1:
raise IOError("Cannot create lock file")
while True:
try:
fcntl.flock(self.fd, _type)
return
except IOError:
if timeout > 0:
time.sleep(0.2)
timeout -= 0.2
else:
raise
def unlock(self):
fcntl.flock(self.fd, fcntl.LOCK_UN)
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, Suleyman POYRAZ (Zaryob)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
"""/etc/fstab parser facility."""
import os
import subprocess
REMOTE_FS_LIST = [
"nfs",
"nfs4",
"cifs",
"ncpfs",
]
def get_device_by_label(label):
"""Returns the devpath associated with the given label."""
devpath = os.path.join("/dev/disk/by-label", label)
device = None
try:
device = os.path.basename(os.readlink(devpath))
except OSError:
pass
else:
return os.path.join("/dev", device)
def get_device_by_uuid(uuid):
"""Returns the devpath associated with the given UUID."""
devpath = os.path.join("/dev/disk/by-uuid", uuid)
device = None
try:
device = os.path.basename(os.readlink(devpath))
except OSError:
pass
else:
return os.path.join("/dev", device)
class FstabEntry(object):
"""Class representing an fstab entry."""
def __init__(self, entry):
"""
fs: First field in fstab file which determines either the device or
special filesystems like proc, sysfs, debugfs, etc.
mountpoint: The mountpoint to which the fs will be mounted.
type: Filesystem type. Can be none, ignore or VFSTYPE.
opts: Extra options to pass to the mount helper.
dump: Defines whether the filesystem will be dumped, optional field.
fsck: Defines whether the filesystem will be fsck'ed regularly.
"""
fields = entry.strip().split()
# If number of fields is < 6, either fs_freq or fs_passno is
# not given. So we omit them first
self.__fs_freq = None
self.__fs_passno = None
self.__fs_spec = fields[0]
self.__fs_file = fields[1]
self.__fs_vfstype = fields[2]
self.__fs_mntopts = fields[3]
if len(fields) == 6:
self.__fs_freq = fields[4]
self.__fs_passno = fields[5]
self.__volume_label = None
self.__volume_uuid = None
self.__device = None
# Entry properties
self.__is_swap = self.__fs_vfstype == "swap"
self.__entry_ignored = self.__fs_vfstype == "ignore"
self.__bind_move_mount = self.__fs_vfstype == "none"
if self.__fs_spec.startswith("UUID="):
self.__volume_uuid = self.__fs_spec.split("=")[-1]
self.__device = get_device_by_uuid(self.__volume_uuid)
if self.__fs_spec.startswith("LABEL="):
self.__volume_label = self.__fs_spec.split("=")[-1]
self.__device = get_device_by_label(self.__volume_label)
def __str__(self):
return """\
fs_spec: %s
fs_file: %s
fs_vfstype: %s
fs_mntopts: %s
fs_freq: %s
fs_passno: %s
""" % (self.__fs_spec,
self.__fs_file,
self.__fs_vfstype,
self.__fs_mntopts,
self.__fs_freq,
self.__fs_passno)
def get_mount_command(self):
"""Returns the UNIX command line for mounting this entry."""
cmd = ["/bin/mount"]
# Append vfs type
#cmd.append("-t %s" % self.get_fs_vfstype())
# Append mount options
#cmd.append("-o %s" % self.get_fs_mntopts())
#cmd.append(self.get_fs_spec())
cmd.append(self.get_fs_file())
return cmd
def get_umount_command(self):
"""Returns the UNIX command line for unmounting this entry."""
cmd = ["/bin/umount"]
cmd.append(self.get_fs_file())
return cmd
def mount(self):
"""Mounts the given entry if not mounted."""
if not self.is_mounted():
return subprocess.call(self.get_mount_command())
def unmount(self):
"""Unmounts the given entry if mounted."""
if self.is_mounted():
return subprocess.call(self.get_umount_command())
def get_volume_label(self):
"""Returns the volume label."""
return self.__volume_label
def get_volume_uuid(self):
"""Returns the volume UUID."""
return self.__volume_uuid
def get_device_path(self):
"""Returns /dev/path path for the given entry."""
return self.__device
def get_fs_spec(self):
"""Returns the first field (fs_spec) of the entry."""
return self.__fs_spec
def get_fs_file(self):
"""Returns the second field (fs_file) of the entry."""
return self.__fs_file
def get_fs_vfstype(self):
"""Returns the third field (fs_vfstype) of the entry."""
return self.__fs_vfstype
def get_fs_mntopts(self, split=False):
"""Returns the fourth field (fs_mntopts) of the entry."""
opts = self.__fs_mntopts
if split:
opts = opts.split(",")
return opts
def get_fs_freq(self):
"""Returns the fifth field (fs_freq) of the entry."""
return self.__fs_freq
def get_fs_passno(self):
"""Returns the sixth field (fs_passno) of the entry."""
return self.__fs_passno
def has_mount_option(self, opt):
"""Checks whether the given option exists in fs_mntops."""
return opt in self.get_fs_mntopts(split=True)
def is_swap_entry(self):
"""Returns True if the entry corresponds to a swap area."""
return self.__is_swap
def is_rootfs(self):
"""Returns True if the entry corresponds to /."""
return self.__fs_file == "/"
def is_ignored(self):
"""Returns True if the entry should be ignored."""
return self.__entry_ignored
def is_remote_mount(self):
"""Returns True if the entry corresponds to a remote mount."""
return self.get_fs_vfstype() in REMOTE_FS_LIST
def is_nfs(self):
"""Returns True if the entry corresponds to NFS or NFS4."""
return self.get_fs_vfstype() in ("nfs", "nfs4")
def is_mounted(self):
"""Returns True if the entry is currently mounted."""
# Always parse /proc/mounts for maximum atomicity
for mount in open("/proc/mounts", "r").read().strip().split("\n"):
if mount.split()[1] == self.__fs_file:
return True
return False
class Fstab(object):
"""Class representing an fstab file."""
def __init__(self, _fstab="/etc/fstab"):
"""Parses fstab file given as the first parameter."""
self.fstab_file = _fstab
self.__entries = []
with open(self.fstab_file, "r") as fstab_entries:
for entry in fstab_entries:
if entry.strip() and not entry.startswith("#"):
self.__entries.append(FstabEntry(entry))
def get_entries(self):
"""Returns fstab entries in a list."""
return self.__entries
def contains_remote_mounts(self):
"""Returns True if the fstab file contains remote mounts."""
for entry in self.get_entries():
if entry.is_remote_mount():
return True
return False
def mount_file_systems_with_type(self, vfs_types):
"""Mounts all file systems having a
vfstype in [vfs_types] if not mounted."""
for entry in self.get_entries():
if not entry.is_mounted() and entry.get_fs_vfstype() in vfs_types:
entry.mount()
def unmount_file_systems_with_type(self, vfs_types):
"""Unmounts all file systems having a
vfstype in [vfs_types] if mounted."""
for entry in self.get_entries():
if entry.is_mounted() and entry.get_fs_vfstype() in vfs_types:
entry.unmount()
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, Suleyman POYRAZ (Zaryob)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
"""grubutils module provides classes for parsing grub.conf"""
import os.path
class grubCommand:
"""Grub menu command"""
def __init__(self, key, options=[], value=""):
self.key = key
self.options = options
self.value = value
def __str__(self):
if self.options:
return "%s %s %s" % (self.key, " ".join(self.options), self.value)
else:
return "%s %s" % (self.key, self.value)
class grubEntry:
"""Grub menu entry"""
def __init__(self, title):
self.title = title
self.commands = []
def listCommands(self):
"""Returns list of commands used in entry"""
return [x.key for x in self.commands]
def setCommand(self, key, value, opts=[], append=False):
"""Adds a new command to entry. Optional append argument allows addition of multiple commands like 'map'."""
if not append:
self.unsetCommand(key)
self.commands.append(grubCommand(key, opts, value))
def getCommand(self, key, only_last=True):
"""Returns command object. If only_last is False, returns a list of commands named 'key'."""
commands = [x for x in self.commands if x.key == key]
if only_last:
try:
return commands[-1]
except IndexError:
return None
return commands
def unsetCommand(self, key):
"""Removes 'key' from commands."""
self.commands = [x for x in self.commands if x.key != key]
def __str__(self):
conf = []
conf.append("title %s" % self.title)
for command in self.commands:
conf.append(str(command))
return "\n".join(conf)
class grubConf:
"""Grub configuration class."""
def __init__(self):
self.options = {}
self.entries = []
self.header = []
self.index = 0
def setHeader(self, header):
"""Sets grub.conf header"""
self.header = header.split("\n")
def __parseLine(self, line):
"""Parses single grub.conf line and returns a tupple of key, value and options."""
line = line.replace("\t"," ")
line = line.strip()
try:
key, data = line.split(" ", 1)
except ValueError:
key = line
data = ""
key = key.strip(" =")
data = data.strip(" =")
options = []
values = []
option = True
for x in data.split():
if option and x.startswith("--"):
options.append(x)
else:
values.append(x)
option = False
return key, " ".join(values), options
def parseConf(self, filename):
"""Parses a grub.conf file"""
self.options = {}
self.entries = []
option = True
entry = None
for line in file(filename):
if not line.strip() or line.startswith("#"):
continue
key, value, opts = self.__parseLine(line)
if key == "title":
option = False
if entry:
self.entries.append(entry)
entry = None
if option:
self.options[key] = value
else:
if key == "title":
entry = grubEntry(value)
else:
entry.setCommand(key, value, opts, append=True)
if entry:
self.entries.append(entry)
default = os.path.join(os.path.dirname(filename), "default")
if os.path.exists(default):
try:
self.index = int(file(default).read().split("\0")[0])
except ValueError:
self.index = 0
def getSavedIndex(self):
"""Return last booted entry index."""
return self.index
def __str__(self):
conf = []
if self.header:
for h in self.header:
conf.append("# %s" % h)
conf.append("")
if self.options:
for key, value in list(self.options.items()):
line = "%s %s" % (key, value)
conf.append(line)
conf.append("")
for index, entry in enumerate(self.entries):
if entry.getCommand("savedefault"):
entry.setCommand("savedefault", str(index))
conf.append(str(entry))
conf.append("")
return "\n".join(conf)
def write(self, filename):
"""Writes grub configuration to file."""
open(filename, "w").write(str(self))
def listOptions(self):
"""Returns list of options."""
return list(self.options.keys())
def setOption(self, key, value):
"""Sets an option."""
self.options[key] = value
def unsetOption(self, key):
"""Unsets an option."""
del self.options[key]
def getOption(self, key, default=""):
"""Returns value of an option."""
return self.options.get(key, default)
def getAllOptions(self):
"""Returns all options."""
return ["%s %s" % (key, value) for key, value in list(self.options.items())]
def listEntries(self):
"""Returns a list of entries."""
return [x.title for x in self.entries]
def addEntry(self, entry, index=-1):
"""Adds an entry object."""
if index == -1:
self.entries.append(entry)
else:
self.entries.insert(index, entry)
def getEntry(self, index):
"""Returns an entry object."""
return self.entries[index]
def indexOf(self, entry):
"""Returns index of an entry object."""
return self.entries.index(entry)
def removeEntry(self, entry):
"""Removes an entry object."""
self.entries.remove(entry)
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, Suleyman POYRAZ (Zaryob)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
"""initutils module provides ini style configuration file utils."""
import os
import configparser
from .fileutils import FileLock
class iniDB:
def __init__(self, db_file, db_mode=0o600):
try:
os.makedirs(os.path.dirname(db_file))
except OSError:
pass
self.db_file = db_file
if not os.path.exists(db_file):
self.__writelock()
open(db_file, "w").close()
os.chmod(db_file, db_mode)
self.__unlock()
self.__readlock()
self.cp = configparser.ConfigParser()
try:
self.cp.read(db_file)
except:
print(("Network configuration file %s is corrupt" % db_file))
self.__unlock()
def __writelock(self):
self.fl = FileLock(self.db_file)
self.fl.lock(shared=False)
def __readlock(self):
self.fl = FileLock(self.db_file)
self.fl.lock(shared=True)
def __unlock(self):
self.fl.unlock()
def listDB(self):
profiles = self.cp.sections()
if "general" in profiles:
profiles.remove("general")
return profiles
def getDB(self, name):
dct = {}
if name in self.cp.sections():
dct = dict(self.cp.items(name))
return dct
def setDB(self, name, dct):
for key, value in list(dct.items()):
if value:
if name not in self.cp.sections():
self.cp.add_section(name)
self.cp.set(name, key, value)
elif name in self.cp.sections():
self.cp.remove_option(name, key)
# FIXME: This is an ugly hack...
db = iniDB(self.db_file)
for nm in db.listDB():
if nm == name:
continue
for key, value in list(db.getDB(nm).items()):
self.cp.set(nm, key, value)
self.__writelock()
fp = open(self.db_file, "w")
self.cp.write(fp)
fp.close()
self.__unlock()
def remDB(self, name):
self.cp.remove_section(name)
self.__writelock()
fp = open(self.db_file, "w")
self.cp.write(fp)
fp.close()
self.__unlock()
class iniParserError(Exception):
"""
Base exception for iniParser errors.
"""
pass
class iniParser:
"""
INI file parsing and manipulation class.
ip = iniParser("my.ini", [chmod=0600, [quiet=False]])
ip.listSections() => ["section1", "section2", ...]
ip.getSection("section1") => {"field1": "value1", "field2": "value2"}
ip.setSection("section1",{"field1": "value1", "field2": "value2"})
ip.removeSection("section2")
"""
def __init__(self, inifile, chmod=0o600, quiet=False):
"""
Constuctor. Creates INI file if it doesn't exist and sets file mode.
"""
self.inifile = inifile
self.chmod = chmod
self.quiet = quiet
try:
os.makedirs(os.path.dirname(inifile))
except OSError:
pass
if not os.path.exists(inifile):
self.__writeLock()
open(inifile, "w").close()
self.__unlock()
os.chmod(inifile, chmod)
def __writeLock(self):
"""
Puts a write lock to file.
"""
self.fl = FileLock(self.inifile)
self.fl.lock(shared=False)
def __readLock(self):
"""
Puts a read lock to file.
"""
self.fl = FileLock(self.inifile)
self.fl.lock(shared=True)
def __unlock(self):
"""
Removes lock from file.
"""
self.fl.unlock()
def __readIni(self):
"""
Gets content of the INI.
"""
ini = configparser.ConfigParser()
try:
ini.read(self.inifile)
except configparser.Error:
ini = None
return ini
def __writeIni(self, ini):
"""
Writes INI to file.
"""
fp = open(self.inifile, "w")
ini.write(fp)
fp.close()
def listSections(self):
"""
Lists sections of INI file.
"""
self.__readLock()
ini = self.__readIni()
self.__unlock()
if not ini:
if self.quiet:
self.__fixIniFile()
return []
else:
raise iniParserError("File is corrupt: %s" % self.inifile)
return ini.sections()
def getSection(self, section):
"""
Returns a section of INI file.
"""
self.__readLock()
ini = self.__readIni()
self.__unlock()
if not ini:
if self.quiet:
self.__fixIniFile()
return {}
else:
raise iniParserError("File is corrupt: %s" % self.inifile)
if section not in ini.sections():
return {}
dct = {}
if section in ini.sections():
dct = dict(ini.items(section))
return dct
def setSection(self, section, dct):
"""
Sets a section of INI file.
"""
self.__writeLock()
ini = self.__readIni()
if not ini:
self.__unlock()
if self.quiet:
self.__fixIniFile()
self.setSection(section, dct)
return
else:
raise iniParserError("File is corrupt: %s" % self.inifile)
if section not in ini.sections():
ini.add_section(section)
for key, value in list(dct.items()):
if value:
ini.set(section, key, value)
elif section in ini.sections():
ini.remove_option(section, key)
self.__writeIni(ini)
self.__unlock()
def removeSection(self, section):
"""
Removes a section from INI file.
"""
self.__writeLock()
ini = self.__readIni()
if not ini:
self.__unlock()
if self.quiet:
self.__fixIniFile()
return
else:
raise iniParserError("File is corrupt: %s" % self.inifile)
ini.remove_section(section)
self.__writeIni(ini)
self.__unlock()
def __fixIniFile(self):
"""
Cleans bogus ini file.
"""
self.__writeLock()
open(self.inifile, "w").write("")
self.__unlock()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, Suleyman POYRAZ (Zaryob)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
"""localedata module provides locale information."""
import gettext
__trans = gettext.translation('inary', fallback=True)
_ = __trans.gettext
class Keymap:
def __init__(self, console_layout, xkb_layout=None, xkb_variant="", name=None):
self.console_layout = console_layout
self.xkb_layout = xkb_layout or console_layout
self.xkb_variant = xkb_variant
self.name = name
class Language:
def __init__(self,
name,
locale,
console_font="iso01.16",
console_translation="8859-1",
keymaps = [Keymap("us")]):
self.name = name
self.locale = locale
self.console_font = console_font
self.console_translation = console_translation
self.keymaps = keymaps
for keymap in self.keymaps:
if keymap.name is None:
keymap.name = self.name
languages = {
"tr": Language(
name = _("Turkish"),
locale = "tr_TR.UTF-8",
console_font = "lat5u-16",
console_translation = "8859-9",
keymaps = [
Keymap("trq", "tr", name = _("Turkish Q")),
Keymap("trf", "tr", "f", _("Turkish F"))
]
),
"en": Language(
name = _("English"),
locale = "en_US.UTF-8"
),
"en_GB": Language(
name = _("English GB"),
locale = "en_GB.UTF-8",
keymaps = [Keymap("uk", "gb")]
),
"af": Language(
name = _("Afrikaans"),
locale = "af_ZA.UTF-8",
keymaps = [Keymap("us")]
),
"ar": Language(
name = _("Arabic"),
locale = "ar_SA.UTF-8",
keymaps = [Keymap("us", "ara")]
),
"be": Language(
name = _("Belgium"),
locale = "be_BY.UTF-8",
keymaps = [Keymap("be-latin1", "be")]
),
"bg": Language(
name = _("Bulgarian"),
locale = "bg_BG.UTF-8",
keymaps = [Keymap("bg")]
),
"ca": Language(
name = _("Catalan"),
locale = "ca_ES.UTF-8",
keymaps = [Keymap("es")]
),
"cy": Language(
name = _("Welsh"),
locale = "cy_GB.UTF-8",
keymaps = [Keymap("uk", "gb")]
),
"cz": Language(
name = _("Czech"),
locale = "cs_CZ.UTF-8",
keymaps = [Keymap("cz-lat2", "cz")]
),
"da": Language(
name = _("Danish"),
locale = "da_DK.UTF-8",
keymaps = [Keymap("dk")]
),
"de": Language(
name = _("German"),
locale = "de_DE.UTF-8",
keymaps = [Keymap("de-latin1-nodeadkeys", "de")]
),
"es": Language(
name = _("Spanish"),
locale = "es_ES.UTF-8",
keymaps = [Keymap("es")]
),
"et": Language(
name = _("Estonian"),
locale = "et_EE.UTF-8",
keymaps = [Keymap("et", "ee")]
),
"fi": Language(
name = _("Finnish"),
locale = "fi_FI.UTF-8",
keymaps = [Keymap("fi")]
),
"fr": Language(
name = _("French"),
locale = "fr_FR.UTF-8",
keymaps = [Keymap("fr-latin1", "fr")]
),
"gr": Language(
name = _("Greek"),
locale = "el_GR.UTF-8",
keymaps = [Keymap("gr")]
),
"hr": Language(
name = _("Croatian"),
locale = "hr_HR.UTF-8",
keymaps = [Keymap("croat", "hr")]
),
"hu": Language(
name = _("Hungarian"),
locale = "hu_HU.UTF-8",
console_font = "lat2a-16",
console_translation = "8859-2",
keymaps = [Keymap("hu")]
),
"is": Language(
name = _("Icelandic"),
locale = "is_IS.UTF-8",
keymaps = [Keymap("is-latin1", "is")]
),
"it": Language(
name = _("Italian"),
locale = "it_IT.UTF-8",
keymaps = [Keymap("it")]
),
"ja": Language(
name = _("Japanese"),
locale = "ja_JP.UTF-8",
keymaps = [Keymap("jp106", "jp")]
),
"mk": Language(
name = _("Macedonian"),
locale = "mk_MK.UTF-8",
keymaps = [Keymap("mk")]
),
"ml": Language(
name = _("Malayalam"),
locale = "ml_IN.UTF-8",
keymaps = [Keymap("us")]
),
"nb": Language(
name = _("Norwegian"),
locale = "nb_NO.UTF-8",
keymaps = [Keymap("no")]
),
"nl": Language(
name = _("Dutch"),
locale = "nl_NL.UTF-8",
keymaps = [Keymap("us")]
),
"pl": Language(
name = _("Polish"),
locale = "pl_PL.UTF-8",
keymaps = [Keymap("pl2", "pl")]
),
"pt": Language(
name = _("Portuguese"),
locale = "pt_PT.UTF-8",
keymaps = [Keymap("pt-latin1", "pt")]
),
"pt_BR": Language(
name = _("Brazilian"),
locale = "pt_BR.UTF-8",
keymaps = [Keymap("br-abnt2", "br")]
),
"ru": Language(
name = _("Russian"),
locale = "ru_RU.UTF-8",
console_font = "Cyr_a8x16",
console_translation = "8859-5",
keymaps = [Keymap("ru")]
),
"sk": Language(
name = _("Slovak"),
locale = "sk_SK.UTF-8",
keymaps = [Keymap("sk-qwerty", "sk")]
),
"sl": Language(
name = _("Slovenian"),
locale = "sl_SI.UTF-8",
keymaps = [Keymap("slovene", "si")]
),
"sr": Language(
name = _("Serbian"),
locale = "sr_CS.UTF-8",
keymaps = [Keymap("sr-cy", "rs")]
),
"sv": Language(
name = _("Swedish"),
locale = "sv_SE.UTF-8",
console_font = "lat0-16",
keymaps = [Keymap("sv-latin1", "se")]
),
"uk": Language(
name = _("Ukrainian"),
locale = "uk_UA.UTF-8",
keymaps = [Keymap("ua-utf", "ua")]
),
"vi": Language(
name = _("Vietnamese"),
locale = "vi_VN.UTF-8",
keymaps = [Keymap("us", "vn")]
)
}
This diff is collapsed.
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, Suleyman POYRAZ (Zaryob)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
"""Basic shell utilities working on files and directories."""
import os
import shutil
import subprocess
###############################
# Directory related functions #
###############################
def remove_dir(path):
"""Remove all content of a directory."""
if os.path.exists(path):
shutil.rmtree(path)
def dir_size(dir):
"""Calculate the size of files under a directory."""
from os.path import getsize, islink, isdir, exists, join
if exists(dir) and (not isdir(dir) and not islink(dir)):
#so, this is not a directory but file..
return getsize(dir)
if islink(dir):
return int(len(os.readlink(dir)))
def sizes():
for root, dirs, files in os.walk(dir):
yield sum([getsize(join(root, name)) for name in files if not islink(join(root,name))])
yield sum([int(len(os.readlink((join(root, name))))) for name in files if islink(join(root,name))])
return sum( sizes() )
#########################
# File related funtions #
#########################
def touch(filename):
"""Update file modification date, create file if necessary"""
try:
if os.path.exists(filename):
os.utime(filename, None)
else:
open(filename, "w").close()
except IOError as e:
if e.errno != 13:
raise
else:
return False
except OSError as e:
if e.errno != 13:
raise
else:
return False
return True
########################
# Process tools #
########################
def capture(*cmd):
"""Capture output of the command without running a shell"""
a = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return a.communicate()
def run(*cmd):
"""Run a command without running a shell, only output errors"""
f = open("/dev/null", "w")
return subprocess.call(cmd, stdout=f)
def run_full(*cmd):
"""Run a command without running a shell, with full output"""
return subprocess.call(cmd)
def run_quiet(*cmd):
"""Run the command without running a shell and no output"""
f = file("/dev/null", "w")
return subprocess.call(cmd, stdout=f, stderr=f)
# -*- coding: utf-8 -*-
#
# Copyright (C) 2006-2010 TUBITAK/UEKAE
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
"""string/list/functional utility functions"""
import operator
from functools import reduce
def every(pred, seq):
return reduce(operator.and_, list(map(pred, seq)), True)
def any(pred, seq):
return reduce(operator.or_, list(map(pred, seq)), False)
def unzip(seq):
return list(zip(*seq))
def concat(l):
"""Concatenate a list of lists."""
return reduce( operator.concat, l )
def strlist(l):
"""Concatenate string reps of l's elements."""
return "".join([str(x) + ' ' for x in l])
def multisplit(str, chars):
"""Split str with any of the chars."""
l = [str]
for c in chars:
l = concat([x.split(c) for x in l])
return l
def same(l):
"""Check if all elements of a sequence are equal."""
if len(l)==0:
return True
else:
last = l.pop()
for x in l:
if x!=last:
return False
return True
def prefix(a, b):
"""Check if sequence a is a prefix of sequence b."""
if len(a)>len(b):
return False
for i in range(0,len(a)):
if a[i]!=b[i]:
return False
return True
def remove_prefix(a,b):
"""Remove prefix a from sequence b."""
assert prefix(a,b)
return b[len(a):]
def human_readable_size(size = 0):
symbols, depth = [' B', 'KB', 'MB', 'GB'], 0
while size > 1000 and depth < 3:
size = float(size / 1024)
depth += 1
return size, symbols[depth]
def human_readable_rate(size = 0):
x = human_readable_size(size)
return x[0], x[1] + '/s'
def ascii_lower(str):
"""Ascii only version of string.lower()"""
trans_table = str.maketrans(str.ascii_uppercase, str.ascii_lowercase)
return str.translate(trans_table)
def ascii_upper(str):
"""Ascii only version of string.upper()"""
trans_table = str.maketrans(str.ascii_lowercase, str.ascii_uppercase)
return str.translate(trans_table)
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, Suleyman POYRAZ (Zaryob)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
#
"""sysutils module provides basic system utilities."""
import os
import time
import fcntl
class FileLock:
def __init__(self, filename):
self.filename = filename
self.fd = None
def lock(self, shared=False, timeout=-1):
_type = fcntl.LOCK_EX
if shared:
_type = fcntl.LOCK_SH
if timeout != -1:
_type |= fcntl.LOCK_NB
self.fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT, 0o600)
if self.fd == -1:
raise IOError("Cannot create lock file")
while True:
try:
fcntl.flock(self.fd, _type)
return
except IOError:
if timeout > 0:
time.sleep(0.2)
timeout -= 0.2
else:
raise
def unlock(self):
fcntl.flock(self.fd, fcntl.LOCK_UN)
def find_executable(exec_name):
"""find the given executable in PATH"""
# preppend /bin, /sbin explicitly to handle system configuration
# errors
paths = ["/bin", "/sbin"]
paths.extend(os.getenv("PATH").split(':'))
for p in paths:
exec_path = os.path.join(p, exec_name)
if os.path.exists(exec_path):
return exec_path
return None
def get_kernel_option(option):
"""Get a dictionary of args for the given kernel command line option"""
args = {}
try:
cmdline = open("/proc/cmdline").read().split()
except IOError:
return args
for cmd in cmdline:
if "=" in cmd:
optName, optArgs = cmd.split("=", 1)
else:
optName = cmd
optArgs = ""
if optName == option:
for arg in optArgs.split(","):
if ":" in arg:
k, v = arg.split(":", 1)
args[k] = v
else:
args[arg] = ""
return args
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment