Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

performance/multi-node: use fi_inject_write. #59

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 98 additions & 3 deletions performance/multi-node/rdm_one_sided.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ void print_usage(void)
ct_print_opts_usage("-l <loops>", "number of loops to measure");
ct_print_opts_usage("-s <skip>", "number of loops to skip");
ct_print_opts_usage("-i <iterations>", "iterations per loop");
ct_print_opts_usage("-n", "use fi_inject_write");
ct_print_std_usage();
}
}
Expand Down Expand Up @@ -534,6 +535,85 @@ void *thread_fn(void *data)
return NULL;
}

/*
 * Worker thread for the fi_inject_write variant of the one-sided test.
 *
 * data is the packed per-iteration key: it is stored into a
 * struct per_iteration_data and read back as thread_id / message_size.
 *
 * Rank 0 (myid == 0) issues (loop + skip) batches of window_size inject
 * writes to rank 1, then performs a 4-byte send/recv handshake with the
 * peer before taking the end timestamp.  Rank 1 only takes part in the
 * handshake (recv, then send).  Other ranks just pass the two thread
 * barriers.
 *
 * Results are stored in the thread's per_thread_data entry: bytes_sent,
 * latency (usec per write, over the measured loops only), time_start and
 * time_end.
 *
 * Returns NULL on completion, or (void *)-EINVAL if the decoded thread id
 * is not below tunables.threads.
 */
void *thread_fn_inject(void *data)
{
	int i, j, peer;
	int size;
	/* only consulted by assert() in places; may be unused under NDEBUG */
	ssize_t __attribute__((unused)) fi_rc;
	struct per_thread_data *ptd;
	struct per_iteration_data it;
	uint64_t t_start = 0, t_end = 0;

	it.data = data;
	size = it.message_size;

	if (it.thread_id >= tunables.threads)
		return (void *)-EINVAL;

	ptd = &thread_data[it.thread_id];
	ptd->bytes_sent = 0;

	ct_tbarrier(&ptd->tbar);

	if (myid == 0) {
		peer = 1;

		for (i = 0; i < loop + skip; i++) {
			if (i == skip) {
				/*
				 * The first 'skip' loops are warm-up: start
				 * the clock and discard their byte count.
				 */
				t_start = get_time_usec();
				ptd->bytes_sent = 0;
			}

			for (j = 0; j < window_size; j++) {
				/*
				 * Writes are posted back-to-back without
				 * waiting for completions, so the provider
				 * may temporarily run out of resources and
				 * return -FI_EAGAIN (FI_EAGAIN is defined as
				 * EAGAIN).  That is not a failure: retry the
				 * same write so every counted write is
				 * actually issued, even when assert() is
				 * compiled out.
				 *
				 * NOTE(review): with a manual-progress
				 * provider this retry may also need to poll
				 * the endpoint's CQ to make forward
				 * progress -- confirm for the providers this
				 * test is run against.
				 */
				do {
					fi_rc = fi_inject_write(ptd->ep,
							ptd->s_buf,
							size,
							ptd->fi_addrs[peer],
							ptd->rbuf_descs[peer].addr,
							ptd->rbuf_descs[peer].key);
				} while (fi_rc == -EAGAIN);
				assert(fi_rc == FI_SUCCESS);
				ptd->bytes_sent += size;
			}
		}

		/* 4-byte handshake with the peer before stopping the clock */
		fi_rc = fi_send(ptd->ep, ptd->s_buf, 4, NULL,
				ptd->fi_addrs[peer],
				NULL);
		assert(!fi_rc);
		wait_for_comp(ptd->scq, 1);

		fi_rc = fi_recv(ptd->ep, ptd->s_buf, 4, NULL,
				ptd->fi_addrs[peer],
				NULL);
		assert(!fi_rc);
		wait_for_comp(ptd->rcq, 1);

		t_end = get_time_usec();
	} else if (myid == 1) {
		peer = 0;

		/* mirror of rank 0's handshake: receive first, then reply */
		fi_rc = fi_recv(ptd->ep, ptd->s_buf, 4, NULL,
				ptd->fi_addrs[peer],
				NULL);
		assert(!fi_rc);
		wait_for_comp(ptd->rcq, 1);

		fi_rc = fi_send(ptd->ep, ptd->s_buf, 4, NULL,
				ptd->fi_addrs[peer],
				NULL);
		assert(!fi_rc);
		wait_for_comp(ptd->scq, 1);
	}

	ct_tbarrier(&ptd->tbar);

	/*
	 * Widen before multiplying so a large loop * window_size cannot
	 * overflow integer arithmetic.  On ranks other than 0 both
	 * timestamps are still 0, so the elapsed time is 0.
	 */
	ptd->latency = (t_end - t_start) /
		((double)loop * (double)window_size);
	ptd->time_start = t_start;
	ptd->time_end = t_end;

	return NULL;
}

int main(int argc, char *argv[])
{
int op, ret;
Expand All @@ -544,6 +624,7 @@ int main(int argc, char *argv[])
uint64_t time_start, time_end;
uint64_t bytes_sent;
double mbps;
int use_fi_inject_write = 0;

pthread_mutex_init(&mutex, NULL);
tunables.threads = 1;
Expand All @@ -558,7 +639,7 @@ int main(int argc, char *argv[])
return -1;
}

while ((op = getopt(argc, argv, "hmt:i:l:s:" CT_STD_OPTS)) != -1) {
while ((op = getopt(argc, argv, "hmt:i:l:s:n" CT_STD_OPTS)) != -1) {
switch (op) {
default:
ct_parse_std_opts(op, optarg, hints);
Expand Down Expand Up @@ -603,6 +684,9 @@ int main(int argc, char *argv[])
}
window_size_large = window_size;
break;
case 'n':
use_fi_inject_write = ~0;
break;
case '?':
case 'h':
print_usage();
Expand Down Expand Up @@ -682,8 +766,19 @@ int main(int argc, char *argv[])
/* threaded section */
for (i = 0; i < tunables.threads; i++) {
iter_key.thread_id = i;
ret = pthread_create(&thread_data[i].thread, NULL,
thread_fn, iter_key.data);
if (use_fi_inject_write) {
ret = pthread_create(&thread_data[i].thread,
NULL,
size <=
fi->tx_attr->inject_size ?
thread_fn_inject : thread_fn,
iter_key.data);
} else {
ret = pthread_create(&thread_data[i].thread,
NULL, thread_fn,
iter_key.data);
}

if (ret != 0) {
printf("couldn't create thread %i\n", i);
pthread_exit(NULL); /* a more robust exit would be nice here */
Expand Down