implement usage of native cgroups instead of systemd-run

* allow server to be started w\o daemon
* handle process childs
This commit is contained in:
Evgenii Alekseev 2017-11-10 01:33:23 +03:00
parent 2bd72344ba
commit 3717d1426e
12 changed files with 228 additions and 212 deletions

View File

@ -18,6 +18,8 @@
#include <queued/Queued.h>
#include <QCoreApplication>
#include "QueuedTcpServer.h"
@ -43,6 +45,15 @@ QueuedServer::~QueuedServer()
void QueuedServer::init()
{
while (QueuedCoreAdaptor::getStatus().type() != Result::Content::Value) {
qCWarning(LOG_SERV)
<< "Daemon seems to be unavailable wait" << WAIT_FOR_DAEMON;
QTime timer = QTime::currentTime().addMSecs(WAIT_FOR_DAEMON);
while (QTime::currentTime() < timer)
QCoreApplication::processEvents(QEventLoop::AllEvents, 100);
}
m_server->init(QueuedCoreAdaptor::getOption(
QueuedConfig::QueuedSettings::ServerTimeout)
.get()

View File

@ -28,6 +28,8 @@ class QueuedServer : public QObject
Q_OBJECT
public:
static const long WAIT_FOR_DAEMON = 30000;
explicit QueuedServer(QObject *parent, const QVariantHash &args);
virtual ~QueuedServer();
void init();

View File

@ -44,10 +44,26 @@ class QueuedControlGroupsAdaptor : public QObject
public:
// constants
/**
* @brief name of file contains cpu limit
*/
const char *CG_CPU_LIMIT = "cpu.cfs_quota_us";
/**
* @brief name of file contains memory limit
*/
const char *CG_MEMORY_LIMIT = "memory.limit_in_bytes";
/**
* @brief name of file contains notify status
*/
const char *CG_NOTIFY_ON_RELEASE_FILE = "notify_on_release";
/**
* @brief name of file contains processes list
*/
const char *CG_PROC_FILE = "cgroup.procs";
/**
* @brief name of file contains release command
*/
const char *CG_RELEASE_FILE = "release_agent";
/**
* @brief QueuedControlGroupsAdaptor class constructor
@ -56,11 +72,18 @@ public:
* @param _name
* control group name
*/
explicit QueuedControlGroupsAdaptor(QObject *_parent, QString &_name);
explicit QueuedControlGroupsAdaptor(QObject *_parent, QString _name);
/**
* @brief QueuedControlGroupsAdaptor class destructor
*/
virtual ~QueuedControlGroupsAdaptor();
/**
* @brief build group path for specific base directory
* @param _base
* full path to base directory
* @return full path to group properties
*/
QString groupPath(const QString &_base) const;
// static properties
/**
* @brief paths to all control directories

View File

@ -30,25 +30,24 @@
#include "QueuedLimits.h"
class QueuedControlGroupsAdaptor;
/**
* @brief implementation over QProcess to run processes
*/
class QueuedProcess : public QProcess
{
Q_OBJECT
Q_PROPERTY(QList<Q_PID> childrenPids READ childrenPids)
Q_PROPERTY(long long index READ index)
Q_PROPERTY(QString name READ name)
// mutable properties
Q_PROPERTY(QString command READ command WRITE setCommand)
Q_PROPERTY(QStringList commandArguments READ commandArguments WRITE
setCommandArguments)
Q_PROPERTY(QDateTime endTime READ endTime WRITE setEndTime)
Q_PROPERTY(uint gid READ uid WRITE setGid)
Q_PROPERTY(QString limits READ limits WRITE setLimits)
Q_PROPERTY(QString logError READ logError WRITE setLogError)
Q_PROPERTY(QString logOutput READ logOutput WRITE setLogOutput)
Q_PROPERTY(uint nice READ nice WRITE setNice)
Q_PROPERTY(QString processLine READ processLine WRITE setProcessLine)
Q_PROPERTY(QDateTime startTime READ startTime WRITE setStartTime)
Q_PROPERTY(uint uid READ uid WRITE setUid)
Q_PROPERTY(long long user READ user WRITE setUser)
@ -109,10 +108,19 @@ public:
*/
virtual ~QueuedProcess();
/**
* @brief update command arguments
* @brief force kill ald children
*/
void updateArguments();
void killChildren();
/**
* @brief update cgroup limits
*/
void updateLimits();
// properties
/**
* @brief children processes
* @return list of pids of children processes
*/
QList<Q_PID> childrenPids() const;
/**
* @brief index of process
* @return assigned index of process
@ -124,16 +132,6 @@ public:
*/
QString name() const;
// mutable properties
/**
* @brief command line
* @return process command line
*/
QString command() const;
/**
* @brief command line arguments
* @return process command line arguments
*/
QStringList commandArguments() const;
/**
* @brief process end time
* @return process end time
@ -169,11 +167,6 @@ public:
* @return process nice
*/
uint nice() const;
/**
* @brief process line
* @return process line as is in configuration
*/
QString processLine() const;
/**
* @brief process start time
* @return process start time
@ -194,17 +187,6 @@ public:
* @return process working directory
*/
QString workDirectory() const;
/**
* @brief set command line
* @param _command new command line
*/
void setCommand(const QString &_command);
/**
* @brief set command line arguments
* @param _commandArguments
* new command line arguments
*/
void setCommandArguments(const QStringList &_commandArguments);
/**
* @brief set end time
* @param _time
@ -237,17 +219,6 @@ public:
* new process nice
*/
void setNice(const uint _nice);
/**
* @brief set process line
* @param _processLine
* original process line
* @remark values in {} will be replaced
* 1. Property names, like {name}, {uid}, etc
* 2. {cpu} will be replaced to QueuedSystemInfo::cpuWeight(limit) in %
* 3. {memory} will be replaced to limit
* 4. {application} will be replaced to application line and arguments
*/
void setProcessLine(const QString &_processLine);
/**
* @brief set start time
* @param _time
@ -280,7 +251,20 @@ public:
*/
bool operator==(const QueuedProcess &_other);
public slots:
/**
* @brief method which will be called to apply cgroup after start
*/
void applyCGroup();
protected:
/**
* @brief apply child process properties
*/
void setupChildProcess();
private:
QueuedControlGroupsAdaptor *m_cgroup = nullptr;
/**
* @brief process definitions
*/
@ -289,10 +273,6 @@ private:
* @brief index of process
*/
long long m_index = -1;
/**
* @brief process line to launch
*/
QString m_processLine;
};

View File

@ -41,7 +41,6 @@ class QueuedProcessManager : public QObject
{
Q_OBJECT
Q_PROPERTY(QueuedEnums::ExitAction onExit READ onExit WRITE setExitAction)
Q_PROPERTY(QString processLine READ processLine WRITE setProcessLine)
public:
/**
@ -132,23 +131,12 @@ public:
* @return default action from possible ones
*/
QueuedEnums::ExitAction onExit() const;
/**
* @brief process command line
* @return current command line
*/
QString processLine() const;
/**
* @brief set on exit action
* @param _action
* new on exit action
*/
void setExitAction(const QueuedEnums::ExitAction _action);
/**
* @brief set command line
* @param _processLine
* new command line
*/
void setProcessLine(const QString _processLine);
/**
* @brief get used limits
* @return used system limits
@ -200,10 +188,6 @@ private:
* @brief processes list
*/
QueuedProcessMap m_processes;
/**
* @brief command line
*/
QString m_processLine;
};

View File

@ -95,8 +95,6 @@ typedef struct {
* on queued exit action enum
* @var QueuedSettings::Plugins
* plugin list
* @var QueuedSettings::ProcessCommandLine
* control process command line
* @var QueuedSettings::ServerAddress
* queued server bind address
* @var QueuedSettings::ServerMaxConnections
@ -117,7 +115,6 @@ enum class QueuedSettings {
KeepUsers,
OnExitAction,
Plugins,
ProcessCommandLine,
ServerAddress,
ServerMaxConnections,
ServerPort,
@ -154,10 +151,6 @@ static const QueuedSettingsDefaultMap QueuedSettingsDefaults = {
{"KeepUsers", {QueuedSettings::KeepUsers, 0}},
{"OnExitAction", {QueuedSettings::OnExitAction, 2}},
{"Plugins", {QueuedSettings::Plugins, ""}},
{"ProcessCommandLine",
{QueuedSettings::ProcessCommandLine,
"systemd-run\n--scope\n--unit={name}\n--uid={uid}\n--gid={gid}"
"\n-p\nCPUQuota={cpu}%\n-p\nMemoryHigh={memory}\n{application}"}},
{"ServerAddress", {QueuedSettings::ServerAddress, ""}},
{"ServerMaxConnections", {QueuedSettings::ServerMaxConnections, 30}},
{"ServerPort", {QueuedSettings::ServerPort, 8080}},

View File

@ -33,12 +33,12 @@
* @fn QueuedControlGroupsAdaptor
*/
QueuedControlGroupsAdaptor::QueuedControlGroupsAdaptor(QObject *_parent,
QString &_name)
QString _name)
: QObject(_parent)
, m_name(_name)
{
qCDebug(LOG_LIB) << __PRETTY_FUNCTION__;
m_name = std::move(_name);
createGroup();
}
@ -54,6 +54,17 @@ QueuedControlGroupsAdaptor::~QueuedControlGroupsAdaptor()
}
/**
* @fn groupPath
*/
QString QueuedControlGroupsAdaptor::groupPath(const QString &_base) const
{
qCDebug(LOG_LIB) << "Get group path for base" << _base;
return QDir(_base).filePath(name());
}
/**
* @fn controlPaths
*/
@ -86,7 +97,7 @@ QString QueuedControlGroupsAdaptor::memoryPath()
*/
long long QueuedControlGroupsAdaptor::cpuLimit() const
{
QFile file(QDir(cpuPath()).filePath("cpu.cfs_quota_us"));
QFile file(QDir(groupPath(cpuPath())).filePath(CG_CPU_LIMIT));
long long limit = 0;
if (file.open(QIODevice::ReadOnly | QFile::Text)) {
@ -107,7 +118,7 @@ long long QueuedControlGroupsAdaptor::cpuLimit() const
*/
long long QueuedControlGroupsAdaptor::memoryLimit() const
{
QFile file(QDir(memoryPath()).filePath("memory.limit_in_bytes"));
QFile file(QDir(groupPath(memoryPath())).filePath(CG_MEMORY_LIMIT));
long long limit = 0;
if (file.open(QIODevice::ReadOnly | QFile::Text)) {
@ -139,11 +150,12 @@ void QueuedControlGroupsAdaptor::setCpuLimit(const long long _value)
{
qCDebug(LOG_LIB) << "Set new CPU limit to" << _value;
QFile file(QDir(cpuPath()).filePath("cpu.cfs_quota_us"));
QFile file(QDir(groupPath(cpuPath())).filePath(CG_CPU_LIMIT));
if (file.open(QIODevice::ReadWrite)) {
if (file.open(QIODevice::WriteOnly)) {
QTextStream stream(&file);
stream << _value;
stream.flush();
} else {
qCCritical(LOG_LIB)
<< "Could not set CPU limit" << name() << "to" << _value;
@ -160,17 +172,20 @@ void QueuedControlGroupsAdaptor::setMemoryLimit(const long long _value)
{
qCDebug(LOG_LIB) << "Set new memory limit to" << _value;
QFile file(QDir(cpuPath()).filePath("memory.limit_in_bytes"));
QFile file(QDir(groupPath(memoryPath())).filePath(CG_MEMORY_LIMIT));
if (file.open(QIODevice::ReadWrite)) {
if (file.open(QIODevice::WriteOnly)) {
QTextStream stream(&file);
stream << _value;
stream.flush();
} else {
qCCritical(LOG_LIB)
<< "Could not set memory limit" << name() << "to" << _value;
return;
}
file.close();
Q_ASSERT(_value == memoryLimit());
}
@ -182,7 +197,7 @@ bool QueuedControlGroupsAdaptor::addProcess(const uint _pid)
qCDebug(LOG_LIB) << "Assign add process" << _pid;
for (auto &path : controlPaths()) {
auto proc = QDir(QDir(path).filePath(name())).filePath(CG_PROC_FILE);
auto proc = QDir(groupPath(path)).filePath(CG_PROC_FILE);
QFile file(proc);
if (file.open(QIODevice::ReadWrite)) {
@ -210,9 +225,44 @@ bool QueuedControlGroupsAdaptor::createGroup()
auto paths = controlPaths();
return std::all_of(
// create cgroups
bool status = std::all_of(
paths.cbegin(), paths.cend(),
[this](const QString &path) { return QDir(path).mkpath(name()); });
// apply settings
status &= std::all_of(
paths.cbegin(), paths.cend(), [this](const QString &path) {
auto notify
= QDir(groupPath(path)).filePath(CG_NOTIFY_ON_RELEASE_FILE);
QFile file(notify);
if (file.open(QIODevice::WriteOnly)) {
QTextStream stream(&file);
stream << 1;
} else {
qCCritical(LOG_LIB)
<< "Could not apply rules to" << CG_NOTIFY_ON_RELEASE_FILE;
return false;
}
return true;
});
status &= std::all_of(
paths.cbegin(), paths.cend(), [this](const QString &path) {
auto agent = QDir(groupPath(path)).filePath(CG_RELEASE_FILE);
QFile file(agent);
if (file.open(QIODevice::WriteOnly)) {
QTextStream stream(&file);
stream
<< QString("rmdir \"%1\"").arg(QDir(path).filePath(name()));
} else {
qCCritical(LOG_LIB)
<< "Could not apply rules to" << CG_RELEASE_FILE;
return false;
}
return true;
});
return status;
}
@ -226,7 +276,6 @@ bool QueuedControlGroupsAdaptor::removeGroup()
auto paths = controlPaths();
return std::all_of(
paths.cbegin(), paths.cend(), [this](const QString &path) {
return QDir(QDir(path).filePath(name())).removeRecursively();
});
paths.cbegin(), paths.cend(),
[this](const QString &path) { return QDir(path).rmdir(name()); });
}

View File

@ -123,13 +123,8 @@ void QueuedCorePrivate::initProcesses()
auto onExitAction = static_cast<QueuedEnums::ExitAction>(
m_advancedSettings->get(QueuedConfig::QueuedSettings::OnExitAction)
.toInt());
auto processLine
= m_advancedSettings
->get(QueuedConfig::QueuedSettings::ProcessCommandLine)
.toString();
m_processes = m_helper->initObject(m_processes);
m_processes->setProcessLine(processLine);
m_processes->setExitAction(onExitAction);
auto dbProcesses
= m_database->get(QueuedDB::TASKS_TABLE, "WHERE endTime IS NULL");

View File

@ -70,9 +70,6 @@ void QueuedCorePrivate::updateSettings(const QueuedConfig::QueuedSettings _id,
case QueuedConfig::QueuedSettings::Plugins:
// do nothing here
break;
case QueuedConfig::QueuedSettings::ProcessCommandLine:
m_processes->setProcessLine(_value.toString());
break;
case QueuedConfig::QueuedSettings::ServerAddress:
case QueuedConfig::QueuedSettings::ServerMaxConnections:
case QueuedConfig::QueuedSettings::ServerPort:

View File

@ -23,9 +23,17 @@
#include <queued/Queued.h>
#include <QDir>
#include <QMetaProperty>
#include <QStandardPaths>
#include <cmath>
#include <csignal>
extern "C" {
#include <unistd.h>
}
/**
* @class QueuedProcess
@ -44,10 +52,16 @@ QueuedProcess::QueuedProcess(QObject *_parent,
qRegisterMetaType<QueuedLimits::Limits>("QueuedLimits::Limits");
m_cgroup = new QueuedControlGroupsAdaptor(this, name());
// update QProcess related values as well
setCommand(m_definitions.command);
setCommandArguments(m_definitions.arguments);
setProgram(m_definitions.command);
setArguments(m_definitions.arguments);
setWorkDirectory(m_definitions.workingDirectory);
updateLimits();
connect(this, SIGNAL(started()), this, SLOT(applyCGroup()));
}
@ -61,43 +75,65 @@ QueuedProcess::~QueuedProcess()
/**
* @fn updateArguments
* @fn killChildren
*/
void QueuedProcess::updateArguments()
void QueuedProcess::killChildren()
{
QString application = processLine();
auto pids = childrenPids();
qCInfo(LOG_LIB) << "Found children pids" << pids;
// replace generic properties first
auto meta = metaObject();
int count = meta->propertyCount();
for (int i = 0; i < count; i++) {
QMetaProperty prop = meta->property(i);
auto name = prop.name();
// replace string now
application.replace(QString("{%1}").arg(name),
property(name).toString());
for (auto pid : pids) {
if (::kill(pid, SIGTERM) != 0) {
qCWarning(LOG_LIB) << "SIGTERM failed, trying to kill";
::kill(pid, SIGKILL);
}
}
}
// replace limits now
application.replace(
"{cpu}", QString("%1").arg(
QueuedSystemInfo::cpuWeight(nativeLimits().cpu) * 100.0, 0,
'f', 0));
application.replace(
"{memory}",
QString("%1").arg(QueuedSystemInfo::memoryWeight(nativeLimits().memory)
* QueuedSystemInfo::memoryCount(),
0, 'f', 0));
// command line
QString commandLine = command() + "\n" + commandArguments().join('\n');
application.replace("{application}", commandLine);
/**
* @fn updateLimits
*/
void QueuedProcess::updateLimits()
{
auto nl = nativeLimits();
QStringList arguments = application.split('\n');
m_cgroup->setCpuLimit(
std::llround(QueuedSystemInfo::cpuWeight(nl.cpu) * 100.0));
m_cgroup->setMemoryLimit(
std::llround(QueuedSystemInfo::memoryWeight(nl.memory)
* QueuedSystemInfo::memoryCount()));
}
// set QProcess properties
setProgram(arguments.takeFirst());
setArguments(arguments);
/**
* @fn childrenPids
*/
QList<Q_PID> QueuedProcess::childrenPids() const
{
QStringList allDirectories = QDir("/proc").entryList(
QDir::Dirs | QDir::NoDotAndDotDot, QDir::Name);
QStringList directories = allDirectories.filter(QRegExp("(\\d+)"));
QList<Q_PID> pids = std::accumulate(
directories.cbegin(), directories.cend(), QList<Q_PID>(),
[this](QList<Q_PID> &list, const QString &dir) {
QFile statFile(QString("/proc/%1/stat").arg(dir));
if (!statFile.open(QIODevice::ReadOnly | QIODevice::Text))
return list;
QString output = statFile.readAll();
output.remove(QRegExp("\\d+ \\(.*\\) . "));
Q_PID ppid = output.split(' ').first().toLongLong();
if (ppid == pid())
list.append(dir.toLongLong());
statFile.close();
return list;
});
return pids;
}
@ -119,24 +155,6 @@ QString QueuedProcess::name() const
}
/**
* @fn command
*/
QString QueuedProcess::command() const
{
return m_definitions.command;
}
/**
* @fn commandArguments
*/
QStringList QueuedProcess::commandArguments() const
{
return m_definitions.arguments;
}
/**
* @fn endTime
*/
@ -200,15 +218,6 @@ uint QueuedProcess::nice() const
}
/**
* @fn processLine
*/
QString QueuedProcess::processLine() const
{
return m_processLine;
}
/**
* @fn startTime
*/
@ -245,30 +254,6 @@ QString QueuedProcess::workDirectory() const
}
/**
* @fn setCommand
*/
void QueuedProcess::setCommand(const QString &_command)
{
qCDebug(LOG_LIB) << "Set command to" << _command;
m_definitions.command = _command;
updateArguments();
}
/**
* @fn setCommandArguments
*/
void QueuedProcess::setCommandArguments(const QStringList &_commandArguments)
{
qCDebug(LOG_LIB) << "Set command line arguments to" << _commandArguments;
m_definitions.arguments = _commandArguments;
updateArguments();
}
/**
* @fn setEndTime
*/
@ -288,7 +273,6 @@ void QueuedProcess::setGid(const uint _gid)
qCDebug(LOG_LIB) << "Set process GID to" << _gid;
m_definitions.gid = _gid;
updateArguments();
}
@ -300,7 +284,7 @@ void QueuedProcess::setLimits(const QString &_limits)
qCDebug(LOG_LIB) << "Set process limits" << _limits;
m_definitions.limits = _limits;
updateArguments();
updateLimits();
}
@ -333,18 +317,6 @@ void QueuedProcess::setNice(const uint _nice)
}
/**
* @fn setProcessLine
*/
void QueuedProcess::setProcessLine(const QString &_processLine)
{
qCDebug(LOG_LIB) << "Set process line to" << _processLine;
m_processLine = _processLine;
updateArguments();
}
/**
* @fn setStartTime
*/
@ -364,7 +336,6 @@ void QueuedProcess::setUid(const uint _uid)
qCDebug(LOG_LIB) << "Set process UID to" << _uid;
m_definitions.uid = _uid;
updateArguments();
}
@ -404,3 +375,23 @@ bool QueuedProcess::operator==(const QueuedProcess &_other)
{
return name() == _other.name();
}
/**
* applyCGroup
*/
void QueuedProcess::applyCGroup()
{
m_cgroup->addProcess(pid());
}
/**
* @fn setupChildProcess
*/
void QueuedProcess::setupChildProcess()
{
::setuid(m_definitions.uid);
::setgid(m_definitions.gid);
return QProcess::setupChildProcess();
}

View File

@ -23,7 +23,10 @@
#include <queued/Queued.h>
#include <csignal>
extern "C" {
#include <sys/prctl.h>
#include <unistd.h>
}
@ -97,7 +100,6 @@ QueuedProcess *QueuedProcessManager::add(
return process(_index);
auto *process = new QueuedProcess(this, _definitions, _index);
process->setProcessLine(processLine());
m_processes[_index] = process;
// connect to signal
m_connections[_index] = connect(
@ -249,6 +251,7 @@ void QueuedProcessManager::stop(const long long _index)
return;
}
pr->killChildren();
switch (onExit()) {
case QueuedEnums::ExitAction::Kill:
pr->kill();
@ -269,15 +272,6 @@ QueuedEnums::ExitAction QueuedProcessManager::onExit() const
}
/**
* @fn processLine
*/
QString QueuedProcessManager::processLine() const
{
return m_processLine;
}
/**
* @fn setExitAction
*/
@ -286,19 +280,16 @@ void QueuedProcessManager::setExitAction(const QueuedEnums::ExitAction _action)
qCDebug(LOG_LIB) << "New action on exit" << static_cast<int>(_action);
m_onExit = _action;
}
/**
* @fn setProcessLine
*/
void QueuedProcessManager::setProcessLine(const QString _processLine)
{
qCDebug(LOG_LIB) << "Set process line to" << _processLine;
m_processLine = _processLine;
for (auto process : processes().values())
process->setProcessLine(processLine());
// update child signal handler
switch (onExit()) {
case QueuedEnums::ExitAction::Kill:
prctl(PR_SET_PDEATHSIG, SIGKILL);
break;
case QueuedEnums::ExitAction::Terminate:
prctl(PR_SET_PDEATHSIG, SIGTERM);
break;
}
}

View File

@ -340,7 +340,7 @@ QueuedctlCommon::QueuedctlResult QueuedctlTask::stopTask(const long long _id,
{
qCDebug(LOG_APP) << "Stop task" << _id;
auto res = QueuedCoreAdaptor::sendTaskStart(_id, _token);
auto res = QueuedCoreAdaptor::sendTaskStop(_id, _token);
QueuedctlCommon::QueuedctlResult output;
res.match([&output](const bool val) { output.status = val; },