Commit d1806520 authored by msrba's avatar msrba
Browse files

Merge branch 'feature/python3' into 'master'

Feature/python3

See merge request !15
parents a14365d0 d7b07af0
# -*- coding: utf-8 -*-
import pika
......@@ -36,3 +35,13 @@ class ConnectionParameters(object):
self.port,
virtual_host=self.vhost)
return pika_params
def to_args(self):
args = ['--port', self.port,
'-v', self.vhost]
if self.username is not None:
args += ['-u', self.username]
if self.password is not None:
args += ['-p', self.password]
args = [str(arg) for arg in args]
return args
# -*- coding: utf-8 -*-
class ActuatorException(Exception):
pass
# -*- coding: utf-8 -*-
import logging
def configure_logger(logger):
logger.setLevel(logging.DEBUG)
def configure_logger(logger, level=logging.INFO):
logger.setLevel(level)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# -*- coding: utf-8 -*-
import logging
import importlib
import os
......
# -*- coding: utf-8 -*-
import yaml
......
# -*- coding: utf-8 -*-
import json
import logging
import uuid
......@@ -102,7 +101,7 @@ class Controller(object):
@property
def agents(self):
live_agents = []
for agent_name, timestamp in self._agents.iteritems():
for agent_name, timestamp in self._agents.items():
timed_out = self._is_agent_timed_out(agent_name, timestamp)
if not timed_out:
live_agents.append(agent_name)
......
# -*- coding: utf-8 -*-
import logging
import subprocess32
import subprocess
import os.path
import actuator.core.settings as settings
......@@ -26,13 +25,13 @@ class IOcm(AbstractPlugin):
iocm_start_script_name = '{}_io_manager.py'.format(script_name)
iocm_start_path = os.path.join(iocm_base_path, iocm_start_script_name)
iocm_command = [iocm_start_path]
subprocess32.check_output(iocm_command)
subprocess.check_output(iocm_command)
def _turn_iocm_off(self):
self._call_iocm_script('stop')
def get_state(self, **kwargs):
raise ActuatorException(u'IOcm does not offer a way to read it\'s state.')
raise ActuatorException('IOcm does not offer a way to read it\'s state.')
def _get_package_name(self):
return 'iocm'
......
# -*- coding: utf-8 -*-
import logging
import re
import actuator.core.settings as settings
from actuator.core.plugins import AbstractPlugin
......@@ -12,9 +10,9 @@ logger = logging.getLogger(__name__)
class IOScheduler(AbstractPlugin):
def activate_config(self, config):
self.verify_is_root()
block_devices = settings.settings['plugins']['ioscheduler']['devices']
for block_device in block_devices:
self._activate_config_on_device(config, block_device)
device = config['device']
scheduler = config['scheduler']
self._activate_config_on_device(scheduler, device)
def _activate_config_on_device(self, config, device):
log_message = 'Changing the IO scheduler on device {} to {} '.format(device, config)
......@@ -28,11 +26,13 @@ class IOScheduler(AbstractPlugin):
scheduler_file.write(value)
def get_state(self, **kwargs):
active_scheduler = self._get_active_scheduler()
device = kwargs['device']
active_scheduler = self._get_active_scheduler(device)
return active_scheduler
def _get_active_scheduler(self):
with open('/sys/block/sda/queue/scheduler', 'r') as scheduler_file:
def _get_active_scheduler(self, device):
scheduler_path = '/sys/block/{}/queue/scheduler'.format(device)
with open(scheduler_path, 'r') as scheduler_file:
scheduler_string = scheduler_file.read()
active_scheduler_regex = r'\[([\w\d]+)\]'
result = re.search(active_scheduler_regex, scheduler_string)
......
# -*- coding: utf-8 -*-
import psutil
import subprocess32
import subprocess
import logging
from actuator.core.plugins import AbstractPlugin
......@@ -18,7 +17,7 @@ class StressngPlugin(AbstractPlugin):
logger.info(info_message)
command = [self.executable_name] + config.split()
self._add_default_timeout(command)
subprocess32.Popen(command)
subprocess.Popen(command)
def _add_default_timeout(self, command):
if '--timeout' not in command:
......
# -*- coding: utf-8 -*-
import subprocess32
import subprocess
import os
import re
......@@ -11,7 +10,7 @@ class TasksetPlugin(AbstractPlugin):
return __name__
def activate_config(self, config):
for process, affinity in config.iteritems():
for process, affinity in config.items():
pids = self._get_pids_for_name(process)
self._set_affinity_for_pids(affinity, pids)
......@@ -23,7 +22,7 @@ class TasksetPlugin(AbstractPlugin):
pid_str = str(pid)
affinity_str = str(affinity)
command = ['taskset', '-p', affinity_str, pid_str]
subprocess32.check_call(command)
subprocess.check_call(command)
def get_state(self, **kwargs):
processes = kwargs['processes']
......@@ -38,13 +37,14 @@ class TasksetPlugin(AbstractPlugin):
affinities = {}
for pid in pids:
affinity = self._get_pid_affinity(pid)
pid_str = u'{}'.format(pid)
pid_str = '{}'.format(pid)
affinities[pid_str] = affinity
return affinities
def _get_pids_for_name(self, process):
command = ['pgrep', process]
output = subprocess32.check_output(command)
output = subprocess.check_output(command)
output = output.decode('utf8')
pid_strings = output.split('\n')
pid_strings = filter(lambda entry: entry != '', pid_strings)
pids = map(lambda pid_string: int(float(pid_string)), pid_strings)
......@@ -61,7 +61,8 @@ class TasksetPlugin(AbstractPlugin):
def _get_pid_affinity(self, pid):
command = self._get_state_command(pid)
output = subprocess32.check_output(command)
output = subprocess.check_output(command)
output = output.decode('utf8')
affinity = self._parse_affinity_output(output)
return affinity
......
# -*- coding: utf-8 -*-
import json
import logging
import os
......@@ -21,6 +20,8 @@ class SlaveAgent(object):
self.config_path = config_path
self.connection_params = connection_params
self._message_dispatcher = MessageDispatcher(settings)
self._post_ack_hook = None
self.port = None
def __enter__(self):
connection_params = self.connection_params.to_pika_params()
......@@ -84,7 +85,7 @@ class SlaveAgent(object):
handler_method = getattr(self, handler_method_name)
response = handler_method()
if method not in allowed_methods:
error_message = u'Method {} is not allowed'.format(method)
error_message = 'Method {} is not allowed'.format(method)
raise ActuatorException(error_message)
return response
......@@ -101,7 +102,7 @@ class SlaveAgent(object):
sha = git.Repo('.').head.object.hexsha
response = {'type': 'get_version_response', 'sha': sha}
return response
def handle_autoupdate(self):
self._autoupdate()
response = {'type': 'autoupdate_response'}
......@@ -117,11 +118,16 @@ class SlaveAgent(object):
origin.pull()
def _restart(self):
port = self.port
if port is None:
port = 5672
config_path = self.config_path
os.execl('startslave.py', self.host, port, config_path, self.name)
args = self._create_args()
info_message = 'Restarting slave agent with followings args: {}'.format(args)
logger.info(info_message)
os.execl('startslave.py', 'startslave.py', *args)
def _create_args(self):
mandatory_args = [self.connection_params.host, self.config_path, self.name]
optional_args = self.connection_params.to_args()
args = optional_args + mandatory_args
return args
def consume(self):
self._channel.start_consuming()
......
# -*- coding: utf-8 -*-
from actuator.core.plugins import load_plugins_from_settings
from actuator.core.exceptions import ActuatorException
......
# -*- coding: utf-8 -*-
from actuator.core.plugins import AbstractPlugin
......
......@@ -3,5 +3,5 @@ import sys
if __name__ == '__main__':
print 'pid 4045\'s current affinity mask: 3'
print('pid 4045\'s current affinity mask: 3')
sys.exit(1)
# -*- coding: utf-8 -*-
import unittest
from actuator.core.connection import ConnectionParameters
......@@ -71,4 +70,37 @@ class ConnectionParametersTest(unittest.TestCase):
password = credentials.password
self.assertEqual(username, 'guest')
self.assertEqual(password, 'guest')
def test_to_args(self):
connection_params = ConnectionParameters(
username=self.username,
password=self.password,
host=self.host,
port=self.port,
vhost=self.vhost)
args = connection_params.to_args()
expected_args = ['--port', str(self.port),
'-u', self.username, '-p', self.password,
'-v', self.vhost]
self.assertSetEqual(set(args), set(expected_args))
def test_to_args_with_default_settings(self):
connection_params = ConnectionParameters(
username=None,
password=None,
host=self.host,
port=self.port)
args = connection_params.to_args()
expected_args = ['--port', str(self.port),
'-v', '/']
self.assertEqual(args, expected_args)
def test_all_args_are_strings(self):
connection_params = ConnectionParameters(
username=self.username,
password=self.password,
host=self.host,
port=self.port,
vhost=self.vhost)
connection_args = connection_params.to_args()
self.assertTrue(all(type(arg) == str for arg in connection_args))
# -*- coding: utf-8 -*-
import unittest
from unittest import mock
import json
import mock
import datetime
import time
import pika
from actuator.master.controller import Controller
from actuator.plugins import ioscheduler
from actuator.core.connection import ConnectionParameters
class ControllerTest(unittest.TestCase):
test_device = 'sda'
@mock.patch('pika.BlockingConnection')
def test_context_manager(self, connection_type_mock):
self._connection_type_mock = connection_type_mock
......@@ -58,7 +57,8 @@ class ControllerTest(unittest.TestCase):
message = {'foo': 'foo'}
response_message = json.dumps(message)
props_mock = mock.MagicMock()
props_mock.correlation_id = controller._received_messages.keys()[0]
keys = list(controller._received_messages.keys())
props_mock.correlation_id = keys[0]
controller._on_response(None, None, props_mock, response_message)
def _mock_process_data_events(self,
......@@ -72,7 +72,7 @@ class ControllerTest(unittest.TestCase):
def _test_config_response(self, config):
test_value = config['foo']
self.assertEquals(test_value, 'foo')
self.assertEqual(test_value, 'foo')
@mock.patch('pika.BlockingConnection')
def test_create_getstate_message(self, connection_type_mock):
......@@ -83,7 +83,7 @@ class ControllerTest(unittest.TestCase):
message = controller._create_getstate_message(
plugin_name, expected_kwargs)
method_name = message['method']
self.assertEquals(method_name, 'get_state')
self.assertEqual(method_name, 'get_state')
args = message['args']
self.assertListEqual(args, [plugin_name])
kwargs = message['kwargs']
......@@ -100,7 +100,7 @@ class ControllerTest(unittest.TestCase):
self._test_config_response(response)
def _choose_new_config(self):
current_config = ioscheduler.IOScheduler().get_state()
current_config = ioscheduler.IOScheduler().get_state(device=self.test_device)
config_space = ['noop', 'deadline', 'cfp']
remaining_options = set(config_space) - set([current_config])
new_config = remaining_options.pop()
......@@ -161,7 +161,8 @@ class ControllerTest(unittest.TestCase):
response = {'type': 'agent_discovery_response', 'name': agent}
response_message = json.dumps(response)
props_mock = mock.MagicMock()
props_mock.correlation_id = controller._received_messages.keys()[0]
keys = list(controller._received_messages.keys())
props_mock.correlation_id = keys[0]
controller._on_response(None, None, props_mock, response_message)
@property
......@@ -195,7 +196,7 @@ class ControllerTest(unittest.TestCase):
with Controller(self.connection_params) as controller:
body = {'plugins': test_plugins}
self._mock_process_data_events(connection_mock, controller, body)
active_plugins = controller.list_active_plugins(u'mock-agent')
active_plugins = controller.list_active_plugins('mock-agent')
connection_mock.process_data_events.assert_called()
self.assertSetEqual(set(active_plugins), set(test_plugins))
......@@ -210,9 +211,9 @@ class ControllerTest(unittest.TestCase):
}
self._mock_process_data_events(connection_mock, controller,
response_body)
version = controller.get_version(u'mock-agent')
version = controller.get_version('mock-agent')
connection_mock.process_data_events.assert_called()
self.assertEquals(version, expected_sha)
self.assertEqual(version, expected_sha)
@mock.patch('pika.BlockingConnection')
def test_autoupdate(self, connection_type_mock):
......@@ -221,6 +222,6 @@ class ControllerTest(unittest.TestCase):
response_body = {'type': 'autoupdate_response'}
self._mock_process_data_events(connection_mock, controller,
response_body)
response = controller.autoupdate(u'mock-agent')
response = controller.autoupdate('mock-agent')
connection_mock.process_data_events.assert_called()
self.assertEquals(response, response_body)
self.assertEqual(response, response_body)
# -*- coding: utf-8 -*-
import unittest
import mock
from unittest import mock
import logging
import actuator.plugins.iocm as iocm
......@@ -36,13 +35,13 @@ class IOcmTest(unittest.TestCase):
with self.assertRaises(ActuatorException):
iocm.plugin.get_state()
@mock.patch('subprocess32.check_output')
@mock.patch('subprocess.check_output')
def test_start_iocm(self, output_mock):
state = 'on'
iocm.plugin.activate_config(state)
output_mock.assert_called_with(['/var/lib/iocm/start_io_manager.py'])
@mock.patch('subprocess32.check_output')
@mock.patch('subprocess.check_output')
def test_stop_iocm(self, output_mock):
state = 'off'
iocm.plugin.activate_config(state)
......
# -*- coding: utf-8 -*-
import unittest
from unittest import mock
import re
import mock
import actuator.plugins.ioscheduler as ioscheduler
from actuator.core.exceptions import ActuatorException
......@@ -9,8 +8,11 @@ import actuator.core.plugins
class IOSchedulerTestCase(unittest.TestCase):
test_device = 'sda'
def _get_active_scheduler(self):
with open('/sys/block/sda/queue/scheduler', 'r') as scheduler_file:
scheduler_path = '/sys/block/{}/queue/scheduler'.format(self.test_device)
with open(scheduler_path, 'r') as scheduler_file:
scheduler_string = scheduler_file.read()
active_scheduler_regex = r'\[([\w\d]+)\]'
result = re.search(active_scheduler_regex, scheduler_string)
......@@ -20,16 +22,20 @@ class IOSchedulerTestCase(unittest.TestCase):
def test_is_plugin(self):
self.assertIsInstance(ioscheduler.plugin, actuator.core.plugins.AbstractPlugin)
def _create_config(self, scheduler):
scheduler_config = {'device': self.test_device,
'scheduler': scheduler}
return scheduler_config
class ActivateConfigTestCase(IOSchedulerTestCase):
settings_path = 'actuator/tests/resources/slave-settings.yaml'
class ActivateConfigTestCase(IOSchedulerTestCase):
def test_change_io_scheduler(self):
active_scheduler = self._get_active_scheduler()
target_scheduler = self._get_target_scheduler(active_scheduler)
ioscheduler.plugin.activate_config(target_scheduler)
scheduler_config = self._create_config(target_scheduler)
ioscheduler.plugin.activate_config(scheduler_config)
active_scheduler = self._get_active_scheduler()
self.assertEquals(active_scheduler, target_scheduler)
self.assertEqual(active_scheduler, target_scheduler)
def _get_target_scheduler(self, active_scheduler):
if active_scheduler == 'noop':
......@@ -50,16 +56,16 @@ class ActivateConfigTestCase(IOSchedulerTestCase):
def test_uses_devices_from_settings(self, write_mock):
active_scheduler = self._get_active_scheduler()
target_scheduler = self._get_target_scheduler(active_scheduler)
ioscheduler.plugin.activate_config(target_scheduler)
config = self._create_config(target_scheduler)
ioscheduler.plugin.activate_config(config)
active_scheduler = self._get_active_scheduler()
write_mock.assert_called()
settings_device = actuator.core.settings.settings['plugins']['ioscheduler']['devices'][0]
device_path = '/sys/block/{}/queue/scheduler'.format(settings_device)
device_path = '/sys/block/{}/queue/scheduler'.format(self.test_device)
write_mock.assert_called_once_with(device_path, target_scheduler)
class GetConfigTestCase(IOSchedulerTestCase):
def test_get_state(self):
state = ioscheduler.plugin.get_state()
state = ioscheduler.plugin.get_state(device='sda')
active_scheduler = self._get_active_scheduler()
self.assertEquals(state, active_scheduler)
self.assertEqual(state, active_scheduler)
# -*- coding: utf-8 -*-
import unittest
import mock
from unittest import mock
from contextlib import contextmanager
import actuator.core.plugins as plugins
......@@ -16,7 +15,7 @@ class PluginTest(unittest.TestCase):
plugin = plugins.load_plugin(package_name)
self.assertIsInstance(plugin, plugins.AbstractPlugin)
response = plugin.get_state()
self.assertEquals(response, 42)
self.assertEqual(response, 42)
def test_load_plugins_from_settings(self):
slave_settings = settings.load_settings(self.settings_path)
......@@ -29,13 +28,13 @@ class PluginTest(unittest.TestCase):
def test_plugin_name_to_package(self):
module_name = plugins.plugin_name_to_package('foo')
self.assertEquals(module_name, 'actuator.plugins.foo')
self.assertEqual(module_name, 'actuator.plugins.foo')
class AbstractPluginTest(unittest.TestCase):
def test_name(self):
plugin = plugins.load_plugin('actuator.tests.resources.mockplugin')
self.assertEquals(plugin.name, 'mockplugin')
self.assertEqual(plugin.name, 'mockplugin')
@contextmanager
def _test_method(self):
......
# -*- coding: utf-8 -*-
import unittest
import mock
from unittest import mock
import json
import os
import pika
import git
from actuator.slave.agent import SlaveAgent
......@@ -33,7 +31,7 @@ class SlaveAgentTest(unittest.TestCase):
def test_init_settings(self, connection_type_mock):
with SlaveAgent(self.name, self.settings, self.settings_path,
self.connection_params) as slave:
self.assertEquals(slave.name, self.name)
self.assertEqual(slave.name, self.name)
self.assertEqual(slave.settings, self.settings)
self.assertEqual(slave.config_path, self.settings_path)
self.assertEqual(slave.connection_params, self.connection_params)
......@@ -109,7 +107,7 @@ class SlaveAgentTest(unittest.TestCase):
def test_handle_message_with_ioscheduler_get_state(self,
connection_type_mock):
plugin = ioscheduler.IOScheduler()
self._test_handle_message_with_plugin_get_state(plugin)
self._test_handle_message_with_plugin_get_state(plugin, device='sda')
def _test_handle_message_with_plugin_get_state(self, plugin, **kwargs):
body = {'method': 'get_state', 'args': [plugin.name], 'kwargs': kwargs}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment