Source code for camayoc.tests.utils

# coding=utf-8
"""Utility functions."""
import contextlib
import time

from pyVmomi import vim

from camayoc import command


[docs]def get_vcenter_vms(vcenter_client): """Return a list of available vCenter vms. :param vcenter_client: A connected vCenter client which will be used to retrieve the list of all available vms. """ view = vcenter_client.content.viewManager.CreateContainerView( vcenter_client.content.rootFolder, [vim.VirtualMachine], True ) return view.view
[docs]def is_vm_powered_on(vm): """Check if a vCenter vm is powered on. A vm is considered powered on when the VMWare Tools are installed, its power state is powered on and its IP address can be fetched. """ # Be noisy if VMWare Tools is not installed. assert vm.guest.toolsStatus != "toolsNotInstalled" return all( [ vm.runtime.powerState == "poweredOn", vm.guestHeartbeatStatus == "green", vm.guest.ipAddress is not None, ] )
[docs]def power_on_vms(vms, timeout=300): """Power on the vCenter vms. For each virtual machine: ensure that the VMWare Tools are installed, they are powered on and its IP address can be fetched. """ vms_to_wait = [] for vm in vms: if not is_vm_powered_on(vm): vm.PowerOnVM_Task() vms_to_wait.append(vm) while vms_to_wait and timeout > 0: vms_to_wait = [vm for vm in vms_to_wait if not is_vm_powered_on(vm)] time.sleep(10) timeout -= 10 if vms_to_wait: assert timeout <= 0, ( f"Could not power on all {vms}, timed out waiting for " f"{vms_to_wait}" )
[docs]def power_off_vms(vms, timeout=300): """Gracefully shutdown the vCenter vms.""" vms_to_wait = [] for vm in vms: if vm.runtime.powerState != "poweredOff": vm.ShutdownGuest() vms_to_wait.append(vm) while vms_to_wait and timeout > 0: vms_to_wait = [vm for vm in vms_to_wait if vm.runtime.powerState != "poweredOff"] time.sleep(10) timeout -= 10 if vms_to_wait: assert timeout <= 0, ( f"Could not power off all {vms}, timed out waiting for " f"{vms_to_wait}" )
[docs]@contextlib.contextmanager def vcenter_vms(vms): """Ensure vCenter machines are on and will be properly off. Given a list of vCenter VM managed objects ensure all of them are on then yeild the vms list. Ensure they will be turned off before closing the context. """ power_on_vms(vms) try: yield vms finally: power_off_vms(vms)
[docs]def is_live(cmd, server, num_pings=10): """Test if server responds to ping. Returns true if server is reachable, false otherwise. """ cmd.response_handler = command.echo_handler ping = cmd.run(("ping", "-c", num_pings, server)) return ping.returncode == 0
[docs]def wait_until_live(servers, timeout=360): """Wait for servers to be live. For each server in the "servers" list, verify if it is reachable. Keep trying until a connection is made for all servers or the timeout limit is reached. If the timeout limit is reached, we exit even if there are unreached hosts. This means tests could fail with "No auths valid for this profile" if every host in the profile is unreachable. Otherwise, if there is at least one valid host, the scan will go on and only facts about reached hosts will be tested. """ system = command.System(hostname="localhost", transport="local") cmd = command.Command(system) unreached = servers while unreached and timeout > 0: unreached = [host for host in unreached if not is_live(cmd, host)] time.sleep(10) timeout -= 10