Commit 1f46814f authored by Matteo S

Upload code

parent c37a8cd8

CMakeLists.txt
cmake_minimum_required(VERSION 3.1) # CMAKE_C_STANDARD requires CMake >= 3.1
project(energy-plugin C)
# Avoid 'lib' prepend
set(CMAKE_SHARED_LIBRARY_PREFIX "")
set(CMAKE_C_STANDARD 99)
# Set the output folders where the binaries will be created.
# Use plain variables instead of overriding CMake's reserved
# CMAKE_SOURCE_DIR / CMAKE_BINARY_DIR.
set(SRC_DIR src)
set(BIN_DIR bin)
set(EXECUTABLE_OUTPUT_PATH ${BIN_DIR})
set(LIBRARY_OUTPUT_PATH ${BIN_DIR})
set(SLURM_ROOT /vagrant/tests/vagrant/rpmbuild/BUILD/slurm-17.11.12/)
set(SLURM_SRC ${SLURM_ROOT}/src)
set(HDF_ROOT /shared/hdf-latest/)
include_directories(${SRC_DIR}/include)
include_directories(${SLURM_ROOT})
include_directories(${HDF_ROOT}/include)
link_directories(${HDF_ROOT}/lib)
add_library(energy_reporting SHARED
${SRC_DIR}/energy_reporting.c
${SRC_DIR}/include/energy_reporting.h
${SLURM_SRC}/db_api/connection_functions.c
${SLURM_SRC}/db_api/job_functions.c
${SLURM_SRC}/common/list.c)
target_link_libraries(energy_reporting PRIVATE hdf5_hl hdf5 z m rt)

src/energy_reporting.c
#include <stdio.h>
#include <slurm/spank.h>
#include <slurm/slurm.h>
#include <slurm/slurmdb.h>
#include <sys/mman.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <unistd.h>
#include <inttypes.h>
#include <hdf5.h>
#include "nvml.h"
#include "include/energy_reporting.h"
SPANK_PLUGIN(energy-reporting, 1)
int sampling_frequency = DEFAULT_SAMPLING_FREQUENCY;
char output_folder[MAX_NAME_LEN] = OUTPUT_FOLDER;
int flag = 0; // Set by _energy_reporting_set when --energy-reporting is passed
struct spank_option energy_plugin =
{
"energy-reporting", NULL,
"Produce energy report. Correctly works if the node is exclusively allocated.",
0, 0, (spank_opt_cb_f) _energy_reporting_set
};
struct spank_option frequency =
{
"sampling-frequency", NULL,
"Sampling frequency for GPU power consumption.",
1, 0, _sampling_freq_set
};
struct spank_option output =
{
"output-reporting", NULL,
"Output folder for the energy reporting files.",
1, 0, _output_folder_set
};
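/* Called when the plugin is loaded: register the command-line options with Slurm. */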
int slurm_spank_init(spank_t sp, int ac, char **av)
{
if (spank_option_register (sp, &energy_plugin) != ESPANK_SUCCESS) {
slurm_error("Failed to register option %s.", energy_plugin.name);
}
if (spank_option_register (sp, &frequency) != ESPANK_SUCCESS) {
slurm_error("Failed to register option %s.", frequency.name);
}
if (spank_option_register (sp, &output) != ESPANK_SUCCESS) {
slurm_error("Failed to register option %s.", output.name);
}
return (0);
}
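/* Called on the compute node after user initialization: step 0 creates the
 * per-host HDF5 file, then a forked child samples GPU power until task exit. */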
int slurm_spank_user_init(spank_t sp, int ac, char **av)
{
if (flag == 0)
return (0);
_redirect_output();
uint32_t jobStepID;
char hdf_file_name[MAX_NAME_LEN];
_get_step_id(sp, &jobStepID);
if (jobStepID == SLURM_BATCH_SCRIPT) { // Batch-script wrapper step: nothing to sample
return (0);
} else if (jobStepID == 0) {
_get_hdf_file_path(hdf_file_name);
// Create the file
hid_t file = H5Fcreate(hdf_file_name, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
H5Fclose(file);
}
pid_t child = fork();
if (child < 0) {
slurm_error("Unable to fork the power sampling process");
return (0);
}
if (child == 0) {
nvmlDevice_t * device_list = NULL;
unsigned int device_count;
_start_retrieval(sp, getpid());
if (_initialize_gpus()) {
slurm_error("Unable to initialize the GPUs");
}
if (_count_gpus(&device_count)) {
slurm_error("Unable to count the GPUs");
}
device_list = (nvmlDevice_t *) malloc(device_count * sizeof(nvmlDevice_t));
if (device_list == NULL) {
slurm_error("Unable to allocate the GPU handle list");
exit(EXIT_FAILURE);
}
if (_get_handles(device_count, device_list)) {
slurm_error("Unable to get the GPU handles");
}
// Start the collection
_power_collection(sp, device_count, device_list);
if (_shutdown_gpus()) {
slurm_error("Unable to stop the GPUs");
}
exit(EXIT_SUCCESS);
}
return (0);
}
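/* Called when a task exits: the batch step queries slurmdb for the accounted
 * energy, while regular steps stop the sampler and integrate the power trace. */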
int slurm_spank_task_exit(spank_t sp, int ac, char **av)
{
if (flag == 0)
return (0);
_redirect_output();
uint32_t jobStepID;
pid_t pid;
int status;
_get_step_id(sp, &jobStepID);
if (jobStepID == SLURM_BATCH_SCRIPT) {
_slurmdb_retrieve(sp);
return (0);
}
// Turn the retrieval flag off
_stop_retrieval(sp, &pid);
waitpid(pid, &status, 0);
_calculate_gpu_consumption(sp);
return (0);
}
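/* Send the plugin's stdout/stderr to log files inside the output folder. */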
void _redirect_output(void)
{
char output_spank[MAX_NAME_LEN],
error_spank[MAX_NAME_LEN];
snprintf(output_spank, MAX_NAME_LEN, "%s/output_spank.txt", output_folder);
snprintf(error_spank, MAX_NAME_LEN, "%s/error_spank.txt", output_folder);
freopen(output_spank, "a+", stdout);
freopen(error_spank, "a+", stderr);
}
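/* Sampling loop run by the forked child: every second it reads each GPU's
 * power draw via NVML and appends the samples to a chunked, extendible HDF5
 * dataset (/<host>/step<N>/gpu<K>/consumption), until the shared-memory flag
 * is cleared by _stop_retrieval(). */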
int _power_collection(spank_t sp, unsigned int device_count, nvmlDevice_t * device_list)
{
unsigned int i, k;
char hostname[MAX_NAME_LEN],
group_names[device_count][MAX_NAME_LEN],
dataset_names[device_count][MAX_NAME_LEN],
hdf_file_name[MAX_NAME_LEN],
step_group_name[MAX_NAME_LEN];
struct timeval currentTime;
int fd;
synchro_retrieval_t * retrieve = NULL;
hid_t file, file_space, node_group, group_ids[device_count],
datasets[device_count], plist, step_group;
herr_t status;
double buffer[device_count][sampling_frequency][N_COLS];
// Sizes
hsize_t dims[N_DIMS] = {sampling_frequency, N_COLS};
hsize_t max_dims[N_DIMS] = {H5S_UNLIMITED, N_COLS};
hsize_t chunk_dims[N_DIMS] = {sampling_frequency, N_COLS};
// Dataspace for chunking
hid_t mem_space = H5Screate_simple(N_DIMS, dims, NULL);
hsize_t count[N_DIMS] = {sampling_frequency, N_COLS};
hsize_t start[N_DIMS] = {0, 0};
// Variables for the retrieval loop
int cnt = 0;
int period_milliseconds = 1000 / sampling_frequency;
unsigned int power;
// Open shared memory
if (_open_reader(sp, &fd) != 0)
return (0);
gethostname(hostname, MAX_NAME_LEN);
_get_hdf_file_path(hdf_file_name);
file = H5Fopen(hdf_file_name, H5F_ACC_RDWR, H5P_DEFAULT);
retrieve = (synchro_retrieval_t *) mmap(0, SIZE_SHARED_MEMORY, PROT_READ, MAP_SHARED, fd, 0);
// Create the (infinite) data space for the dataset
file_space = H5Screate_simple(N_DIMS, dims, max_dims);
// Enable chunking
plist = H5Pcreate(H5P_DATASET_CREATE);
status = H5Pset_layout(plist, H5D_CHUNKED);
status = H5Pset_chunk(plist, N_DIMS, chunk_dims);
_get_step_group_name(sp, step_group_name, hostname);
// Enable auto-creation of intermediate groups
hid_t gcpl = H5Pcreate(H5P_LINK_CREATE);
status = H5Pset_create_intermediate_group(gcpl, 1);
// Create one group per GPU
for (i = 0; i < device_count; i++) {
snprintf(group_names[i], MAX_NAME_LEN, "%s/gpu%u", step_group_name, i);
group_ids[i] = H5Gcreate(file, group_names[i], gcpl, H5P_DEFAULT, H5P_DEFAULT);
}
// Create one dataset per GPU on the node
for (i = 0; i < device_count; i++) {
snprintf(dataset_names[i], MAX_NAME_LEN, "%s/consumption", group_names[i]);
datasets[i] = H5Dcreate2(file, dataset_names[i], H5T_NATIVE_DOUBLE, file_space,
H5P_DEFAULT, plist, H5P_DEFAULT);
}
H5Sclose(file_space); // The creation-time dataspace is no longer needed
// Record the start time; it is subtracted from every sample's timestamp
gettimeofday(&currentTime, NULL);
double start_time = currentTime.tv_sec + (currentTime.tv_usec / 1e6);
while (retrieve->retrieve) {
// Extend dataspace
dims[0] = sampling_frequency * (cnt + 1);
start[0] = cnt * sampling_frequency;
// Collect one second of measurements, saving them in a buffer
for (i = 0; i < sampling_frequency; i++) {
for (k = 0; k < device_count; k++) {
gettimeofday(&currentTime, NULL);
buffer[k][i][0] = currentTime.tv_sec + (currentTime.tv_usec / 1e6) - start_time;
nvmlDeviceGetPowerUsage(device_list[k], &power);
buffer[k][i][1] = power / 1e3;
}
usleep(period_milliseconds * 1000);
}
// Append this second of samples to each dataset
for (k = 0; k < device_count; k++) {
H5Dset_extent(datasets[k], dims);
file_space = H5Dget_space(datasets[k]);
H5Sselect_hyperslab(file_space, H5S_SELECT_SET, start, NULL, count, NULL);
H5Dwrite(datasets[k], H5T_NATIVE_DOUBLE, mem_space, file_space, H5P_DEFAULT, buffer[k]);
H5Sclose(file_space); // Avoid leaking one dataspace handle per iteration
}
cnt++;
}
// Close shared memory
munmap(retrieve, SIZE_SHARED_MEMORY);
_close_shared_memory(sp, fd);
// Close HDF entities
for (i = 0; i < device_count; i++) {
status = H5Gclose(group_ids[i]);
status = H5Dclose(datasets[i]);
}
status = H5Sclose(mem_space);
status = H5Pclose(plist);
status = H5Pclose(gcpl); // Also release the link-creation property list
status = H5Fclose(file);
return (0);
}
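/* Open the per-host HDF5 file and report the energy used during this step. */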
int _calculate_gpu_consumption(spank_t sp)
{
hid_t file, group;
char hostname[MAX_NAME_LEN];
char hdf_file_name[MAX_NAME_LEN];
char step_group_name[MAX_NAME_LEN];
// Open the output file
_get_hdf_file_path(hdf_file_name);
file = H5Fopen(hdf_file_name, H5F_ACC_RDONLY, H5P_DEFAULT);
// Get hostname and open the group
gethostname(hostname, MAX_NAME_LEN);
_get_step_group_name(sp, step_group_name, hostname);
group = H5Gopen(file, step_group_name, H5P_DEFAULT);
_read_data(group);
H5Fclose(file);
return 0;
}
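/* Iterate over the per-GPU groups under the step group, read each power
 * trace, and print the integrated energy in joules. */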
void _read_data(hid_t gid)
{
unsigned long long i, k;
hsize_t n_obj,
dims[N_DIMS];
herr_t err;
hid_t dataset_id, space;
int obj_type;
char hostname[MAX_NAME_LEN],
gpu_name[MAX_NAME_LEN];
int n_gpus = 0;
double consumption = 0;
gethostname(hostname, MAX_NAME_LEN);
// Count the number of sub-groups (i.e. GPUs)
err = H5Gget_num_objs(gid, &n_obj);
for (i = 0; i < n_obj; i++) {
obj_type = H5Gget_objtype_by_idx(gid, (size_t) i);
if (obj_type == H5G_GROUP) {
n_gpus++;
}
}
for (i = 0; i < n_obj; i++) {
obj_type = H5Gget_objtype_by_idx(gid, (size_t) i);
if (obj_type == H5G_GROUP) {
// Search for the group name
H5Gget_objname_by_idx(gid, (hsize_t) i, gpu_name, (size_t) MAX_NAME_LEN);
// Open the dataset
hid_t gpu_group = H5Gopen2(gid, gpu_name, H5P_DEFAULT);
dataset_id = H5Dopen2(gpu_group, "consumption", H5P_DEFAULT);
// Get the dimension of the dataspace
space = H5Dget_space (dataset_id);
H5Sget_simple_extent_dims (space, dims, NULL);
// Allocate memory
double ** data = (double **) malloc (dims[0] * sizeof (double *));
data[0] = (double *) malloc (dims[0] * dims[1] * sizeof (double));
for (k = 1; k < dims[0]; k++)
data[k] = data[0] + k * dims[1];
// Read the whole dataset and save on data
err = H5Dread (dataset_id, H5T_NATIVE_DOUBLE, H5S_ALL, H5S_ALL, H5P_DEFAULT,
data[0]);
// Integrate the power consumption over time
_integrate(data, dims, &consumption);
printf("Power consumption on %s, %s = %fJ\n", hostname, gpu_name, consumption);
fflush(stdout);
H5Dclose(dataset_id);
H5Sclose(space);
H5Gclose(gpu_group); // Close the per-GPU handles before moving on
free(data[0]);
free(data);
consumption = 0;
}
}
}
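/* Query the Slurm accounting database for the job's total consumed energy. */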
int _slurmdb_retrieve(spank_t sp)
{
uint32_t jobID;
slurmdb_job_rec_t * job = NULL;
ListIterator itr = NULL;
_get_job_id(sp, &jobID);
// Initialize variables to query slurmdb
slurmdb_job_cond_t conditions = {0};
List selected_jobs = slurm_list_create(slurmdb_destroy_selected_step);
// Select Job
slurmdb_selected_step_t current_job = {NO_VAL, jobID, NO_VAL, NO_VAL};
slurm_list_append(selected_jobs, &current_job);
conditions.step_list = selected_jobs;
// Open connection to the database
void * connection = slurmdb_connection_get();
List jobs = slurmdb_jobs_get(connection, &conditions);
// Iterate over the results
itr = slurm_list_iterator_create(jobs);
while ((job = slurm_list_next(itr))) {
printf("Jobid: %d, consumed energy: %lx\n", job->jobid, job->stats.consumed_energy);
}
fflush(stdout);
slurm_list_iterator_destroy(itr); // Destroy the iterator before the list it points into
slurm_list_destroy(jobs);
slurmdb_connection_close(&connection);
return (0);
}
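/* The sampler and the exit hook synchronize through a small POSIX shared
 * memory segment holding the sampler PID and a keep-running flag. */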
int _start_retrieval(spank_t sp, pid_t pid)
{
int fd;
_open_writer(&fd, sp);
synchro_retrieval_t * retrieve = (synchro_retrieval_t *) mmap(0, SIZE_SHARED_MEMORY, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
retrieve->retrieve = 1;
retrieve->pid = pid;
munmap(retrieve, SIZE_SHARED_MEMORY);
close(fd);
return (0);
}
int _stop_retrieval(spank_t sp, pid_t * pid)
{
int fd;
_open_writer(&fd, sp);
synchro_retrieval_t * retrieve = (synchro_retrieval_t *) mmap(0, SIZE_SHARED_MEMORY,
PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
* pid = retrieve->pid;
retrieve->retrieve = 0;
munmap(retrieve, SIZE_SHARED_MEMORY);
close(fd);
return (0);
}
int _open_writer(int * fd, spank_t sp)
{
char memory_name[MAX_NAME_LEN];
_get_memory_name(sp, memory_name);
* fd = shm_open(memory_name, O_CREAT | O_RDWR, 0600);
if (* fd < 0) {
perror("Error opening the shared memory (writer)");
return EXIT_FAILURE;
}
ftruncate(* fd, SIZE_SHARED_MEMORY);
return (EXIT_SUCCESS);
}
int _open_reader(spank_t sp, int * fd)
{
char memory_name[MAX_NAME_LEN];
_get_memory_name(sp, memory_name);
* fd = shm_open(memory_name, O_RDONLY, 0666);
if (* fd < 0) {
return (EXIT_FAILURE);
}
return (EXIT_SUCCESS);
}
void _close_shared_memory(spank_t sp, int fd)
{
char memory_name[MAX_NAME_LEN];
_get_memory_name(sp, memory_name);
close(fd);
shm_unlink(memory_name);
}
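/* Integrate power (W) over time (s) to obtain energy in joules.
 * data[m][0] is a timestamp, data[m][1] a power sample. */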
void _integrate(double ** data, const hsize_t * dims, double * consumption)
{
unsigned long long m;
for (m = 1; m < dims[0]; m++) // Trapezoidal rule: time delta times the average of adjacent power samples
* consumption += (data[m][0] - data[m - 1][0]) * (data[m][1] + data[m - 1][1]) / 2;
}
int _get_memory_name(spank_t sp, char * memory_name)
{
uint32_t jobID;
uint32_t jobStepID;
_get_job_id(sp, &jobID);
_get_step_id(sp, &jobStepID);
snprintf(memory_name, MAX_NAME_LEN, "%s_%u_%u", NAME, jobID, jobStepID);
return (0);
}
void _get_hdf_file_path(char * file)
{
char hostname[MAX_NAME_LEN];
gethostname(hostname, MAX_NAME_LEN);
snprintf(file, MAX_NAME_LEN, "%s/energy_consumption_%s.h5", output_folder, hostname);
}
void _get_step_group_name(spank_t sp, char * name, char * hostname)
{
uint32_t jobStepID;
_get_step_id(sp, &jobStepID);
snprintf(name, MAX_NAME_LEN, "/%s/step%u", hostname, jobStepID);
}
int _get_job_id(spank_t sp, uint32_t * jobID)
{
if (spank_get_item (sp, S_JOB_ID, jobID) != ESPANK_SUCCESS) {
slurm_error("Error retrieving the job ID.");
return (ESPANK_ERROR);
}
return (ESPANK_SUCCESS);
}
int _get_step_id(spank_t sp, uint32_t * jobStepID)
{
if (spank_get_item (sp, S_JOB_STEPID, jobStepID) != ESPANK_SUCCESS ) {
slurm_error("Error retrieving the job ID.");
return (ESPANK_ERROR);
}
return (ESPANK_SUCCESS);
}
static int _energy_reporting_set(int val,
const char *optarg,
int remote)
{
flag = 1;
return (0);
}
static int _sampling_freq_set(int val, const char *optarg, int remote)
{
sampling_frequency = atoi(optarg);
if (sampling_frequency <= 0) // Guard: 1000 / sampling_frequency is computed later
sampling_frequency = DEFAULT_SAMPLING_FREQUENCY;
return (0);
}
static int _output_folder_set(int val, const char *optarg, int remote)
{
snprintf(output_folder, MAX_NAME_LEN, "%s", optarg); // Bounded copy into the fixed-size buffer
return (0);
}
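/* Thin wrappers around the NVML API used by the sampler. */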
int _initialize_gpus(void)
{
// First initialize NVML library
nvmlReturn_t result = nvmlInit();
if (result != NVML_SUCCESS) {
printf("Failed to initialize NVML: %s\n", nvmlErrorString(result));
return (1);
}
return (0);
}
int _count_gpus(unsigned int *device_count)
{
// Count the number of devices on the node
nvmlReturn_t result = nvmlDeviceGetCount(device_count);
if (result != NVML_SUCCESS) {
printf("Failed to query device count: %s\n", nvmlErrorString(result));
return (1);
}
return (0);
}
int _get_handles(unsigned int device_count, nvmlDevice_t * device_list)
{
nvmlReturn_t result;
for (unsigned int i = 0; i < device_count; i++) {
result = nvmlDeviceGetHandleByIndex(i, &device_list[i]);
if (result != NVML_SUCCESS) {
printf("Failed to get handle for device %u: %s\n", i, nvmlErrorString(result));
return (1);
}
}
return (0);
}
int _shutdown_gpus(void)
{
nvmlReturn_t result = nvmlShutdown();
if (result != NVML_SUCCESS) {
printf("Failed to shutdown NVML: %s\n", nvmlErrorString(result));
return (1);
}
return (0);
}

src/include/energy_reporting.h

#ifndef ENERGY_REPORTING_H
#define ENERGY_REPORTING_H
#define MAX_NAME_LEN 1000
#define DEFAULT_SAMPLING_FREQUENCY 30
// Default output folder
#define OUTPUT_FOLDER "/tmp"
// Shared memory
#define NAME "/retrieve"
#define SIZE_SHARED_MEMORY (sizeof(synchro_retrieval_t))
// HDF
#define N_DIMS 2
#define N_COLS 2
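// Control block shared (via POSIX shared memory) between the sampler child and the SPANK hooks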
typedef struct {
int retrieve;
pid_t pid;
} synchro_retrieval_t;
static int _energy_reporting_set(int val,
const char *optarg,
int remote);
int _power_collection (spank_t sp, unsigned int device_count, nvmlDevice_t * device_list);
int _get_memory_name(spank_t sp, char * memory_name);
int _open_writer(int * fd, spank_t sp);
int _open_reader(spank_t sp, int * fd);
void _close_shared_memory(spank_t sp, int fd);
int _start_retrieval(spank_t sp, pid_t pid);
int _stop_retrieval(spank_t sp, pid_t * pid);
void _integrate(double ** data, const hsize_t * dims, double * consumption);
void _read_data(hid_t gid);
int _calculate_gpu_consumption(spank_t sp);
void _redirect_output(void);
void _get_hdf_file_path(char * file);
int _slurmdb_retrieve(spank_t sp);
static int _sampling_freq_set(int val, const char *optarg, int remote);
void _get_step_group_name(spank_t sp, char * name, char * hostname);
int _get_step_id(spank_t sp, uint32_t * jobStepID);
int _get_job_id(spank_t sp, uint32_t * jobID);
static int _output_folder_set(int val, const char *optarg, int remote);
int _initialize_gpus(void);
int _count_gpus(unsigned int *device_count);
int _get_handles(unsigned int device_count, nvmlDevice_t * device_list);
int _shutdown_gpus(void);
#endif /* ENERGY_REPORTING_H */