Source code for yardstick.benchmark.scenarios.compute.cyclictest

##############################################################################
# Copyright (c) 2015 Huawei Technologies Co.,Ltd and other.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
import pkg_resources
import logging
import json
import re
import time
import os

import yardstick.ssh as ssh
from yardstick.benchmark.scenarios import base

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)


[docs]class Cyclictest(base.Scenario): """Execute cyclictest benchmark on guest vm Parameters affinity - run thread #N on processor #N, if possible type: int unit: na default: 1 interval - base interval of thread type: int unit: us default: 1000 loops - number of loops, 0 for endless type: int unit: na default: 1000 priority - priority of highest prio thread type: int unit: na default: 99 threads - number of threads type: int unit: na default: 1 histogram - dump a latency histogram to stdout after the run here set the max time to be tracked type: int unit: ms default: 90 Read link below for more fio args description: https://rt.wiki.kernel.org/index.php/Cyclictest """ __scenario_type__ = "Cyclictest" TARGET_SCRIPT = "cyclictest_benchmark.bash" WORKSPACE = "/root/workspace/" REBOOT_CMD_PATTERN = r";\s*reboot\b" def __init__(self, scenario_cfg, context_cfg): self.scenario_cfg = scenario_cfg self.context_cfg = context_cfg self.setup_done = False def _put_files(self, client): setup_options = self.scenario_cfg["setup_options"] rpm_dir = setup_options["rpm_dir"] script_dir = setup_options["script_dir"] image_dir = setup_options["image_dir"] LOG.debug("Send RPMs from %s to workspace %s" % (rpm_dir, self.WORKSPACE)) client.put(rpm_dir, self.WORKSPACE, recursive=True) LOG.debug("Send scripts from %s to workspace %s" % (script_dir, self.WORKSPACE)) client.put(script_dir, self.WORKSPACE, recursive=True) LOG.debug("Send guest image from %s to workspace %s" % (image_dir, self.WORKSPACE)) client.put(image_dir, self.WORKSPACE, recursive=True) def _connect_host(self): host = self.context_cfg["host"] user = host.get("user", "root") ip = host.get("ip", None) key_filename = host.get("key_filename", "~/.ssh/id_rsa") LOG.debug("user:%s, host:%s", user, ip) self.host = ssh.SSH(user, ip, key_filename=key_filename) self.host.wait(timeout=600) def _connect_guest(self): host = self.context_cfg["host"] user = host.get("user", "root") ip = host.get("ip", None) key_filename = host.get("key_filename", "~/.ssh/id_rsa") LOG.debug("user:%s, host:%s", user, ip) self.guest = ssh.SSH(user, ip, port=5555, key_filename=key_filename) self.guest.wait(timeout=600) def _run_setup_cmd(self, client, cmd): LOG.debug("Run cmd: %s" % cmd) status, stdout, stderr = client.execute(cmd) if status: if re.search(self.REBOOT_CMD_PATTERN, cmd): LOG.debug("Error on reboot") else: raise RuntimeError(stderr) def _run_host_setup_scripts(self, scripts): setup_options = self.scenario_cfg["setup_options"] script_dir = os.path.basename(setup_options["script_dir"]) for script in scripts: cmd = "cd %s/%s; export PATH=./:$PATH; %s" %\ (self.WORKSPACE, script_dir, script) self._run_setup_cmd(self.host, cmd) if re.search(self.REBOOT_CMD_PATTERN, cmd): time.sleep(3) self._connect_host() def _run_guest_setup_scripts(self, scripts): setup_options = self.scenario_cfg["setup_options"] script_dir = os.path.basename(setup_options["script_dir"]) for script in scripts: cmd = "cd %s/%s; export PATH=./:$PATH; %s" %\ (self.WORKSPACE, script_dir, script) self._run_setup_cmd(self.guest, cmd) if re.search(self.REBOOT_CMD_PATTERN, cmd): time.sleep(3) self._connect_guest()
[docs] def setup(self): '''scenario setup''' setup_options = self.scenario_cfg["setup_options"] host_setup_seqs = setup_options["host_setup_seqs"] guest_setup_seqs = setup_options["guest_setup_seqs"] self._connect_host() self._put_files(self.host) self._run_host_setup_scripts(host_setup_seqs) self._connect_guest() self._put_files(self.guest) self._run_guest_setup_scripts(guest_setup_seqs) # copy script to host self.target_script = pkg_resources.resource_filename( "yardstick.benchmark.scenarios.compute", Cyclictest.TARGET_SCRIPT) self.guest.run("cat > ~/cyclictest_benchmark.sh", stdin=open(self.target_script, "rb")) self.setup_done = True
[docs] def run(self, result): """execute the benchmark""" default_args = "-m -n -q" if not self.setup_done: self.setup() options = self.scenario_cfg["options"] affinity = options.get("affinity", 1) interval = options.get("interval", 1000) priority = options.get("priority", 99) loops = options.get("loops", 1000) threads = options.get("threads", 1) histogram = options.get("histogram", 90) cmd_args = "-a %s -i %s -p %s -l %s -t %s -h %s %s" \ % (affinity, interval, priority, loops, threads, histogram, default_args) cmd = "bash cyclictest_benchmark.sh %s" % (cmd_args) LOG.debug("Executing command: %s", cmd) status, stdout, stderr = self.guest.execute(cmd) if status: raise RuntimeError(stderr) result.update(json.loads(stdout)) if "sla" in self.scenario_cfg: sla_error = "" for t, latency in result.items(): if 'max_%s_latency' % t not in self.scenario_cfg['sla']: continue sla_latency = int(self.scenario_cfg['sla'][ 'max_%s_latency' % t]) latency = int(latency) if latency > sla_latency: sla_error += "%s latency %d > sla:max_%s_latency(%d); " % \ (t, latency, t, sla_latency) assert sla_error == "", sla_error
def _test(): # pragma: no cover '''internal test function''' key_filename = pkg_resources.resource_filename("yardstick.resources", "files/yardstick_key") ctx = { "host": { "ip": "10.229.47.137", "user": "root", "key_filename": key_filename } } logger = logging.getLogger("yardstick") logger.setLevel(logging.DEBUG) options = { "affinity": 2, "interval": 100, "priority": 88, "loops": 10000, "threads": 2, "histogram": 80 } sla = { "max_min_latency": 100, "max_avg_latency": 500, "max_max_latency": 1000, } args = { "options": options, "sla": sla } result = {} cyclictest = Cyclictest(args, ctx) cyclictest.run(result) print result if __name__ == '__main__': # pragma: no cover _test()