Skip to content

Commit

Permalink
Replace "stokes" terminology with "polarization product"
Browse files Browse the repository at this point in the history
  • Loading branch information
mpokorny committed Mar 26, 2018
1 parent 7debae4 commit 7d08293
Showing 1 changed file with 64 additions and 47 deletions.
111 changes: 64 additions & 47 deletions vyssim/vyssim.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
#define DEFAULT_NUM_ANTENNAS 27
#define DEFAULT_NUM_SPECTRAL_WINDOWS 4
#define DEFAULT_NUM_CHANNELS 64
#define DEFAULT_NUM_STOKES 2
#define DEFAULT_NUM_POLPRODS 2
#define DEFAULT_INTEGRATION_TIME_MICROSEC 100000
#define DEFAULT_SIGNAL_MSG_NUM_SPECTRA 32
#define DEFAULT_DATA_BUFFER_LENGTH_SEC 5
Expand All @@ -94,7 +94,7 @@
#define SIGNAL_MSG_MAX_POSTED 1000
#define SIGNAL_MSG_QUEUE_LENGTH (2 * SIGNAL_MSG_MAX_POSTED)
#define SIGNAL_MSG_BLOCK_LENGTH (4 * SIGNAL_MSG_QUEUE_LENGTH)
#define MAX_STOKES_INDICES 4
#define MAX_POLPRODS_INDICES 4

#define CM_EVENT_FD 0
#define MULTICAST_EVENT_FD 1
Expand All @@ -105,8 +105,8 @@

struct spectral_window_descriptor {
unsigned index;
uint8_t num_stokes_indices;
unsigned stokes_indices[MAX_STOKES_INDICES];
uint8_t num_polprods_indices;
unsigned polprods_indices[MAX_POLPRODS_INDICES];
};

struct client_connection_context {
Expand All @@ -118,7 +118,7 @@ struct client_connection_context {
struct dataset_parameters {
unsigned num_antennas;
unsigned num_spectral_windows;
unsigned num_stokes;
unsigned num_polprods;
unsigned num_channels;
unsigned integration_time_microsec;
};
Expand Down Expand Up @@ -196,7 +196,7 @@ static int set_nonblocking(int fd);
static struct vys_signal_msg *gen_one_signal_msg(
struct vyssim_context *vyssim, guint32 *id_num,
guint64 timestamp_us, unsigned ant0, unsigned ant1,
unsigned spectral_window_index, unsigned stokes_index)
unsigned spectral_window_index, unsigned polprods_index)
__attribute__((nonnull,returns_nonnull));
static void *data_generator(struct vyssim_context *vyssim)
__attribute__((nonnull));
Expand Down Expand Up @@ -291,7 +291,7 @@ static gchar * add_param(
static bool parse_options(
int *argc, char **argv[], unsigned *num_antennas,
unsigned *num_spectral_windows, unsigned *num_channels,
unsigned *num_stokes, unsigned *integration_time_microsec,
unsigned *num_polprods, unsigned *integration_time_microsec,
unsigned *signal_msg_num_spectra, unsigned *data_buffer_length_sec,
char **vys_configuration_path, GError **error)
__attribute__((nonnull));
Expand All @@ -302,8 +302,8 @@ static char num_spectral_windows_lname[] = "num-spectral-windows";
static char num_spectral_windows_sname[] = "w";
static char num_channels_lname[] = "num-channels";
static char num_channels_sname[] = "c";
static char num_stokes_lname[] = "num-stokes";
static char num_stokes_sname[] = "k";
static char num_polprods_lname[] = "num-polprods";
static char num_polprods_sname[] = "p";
static char integration_time_microsec_lname[] = "integration-time";
static char integration_time_microsec_sname[] = "i";
static char signal_msg_num_spectra_lname[] = "signal-message-length";
Expand Down Expand Up @@ -379,7 +379,7 @@ pop_msg_from_queue(struct vyssim_context *vyssim)
static struct vys_signal_msg *
gen_one_signal_msg(struct vyssim_context *vyssim, guint32 *id_num,
guint64 timestamp_us, unsigned ant0, unsigned ant1,
unsigned spectral_window_index, unsigned stokes_index)
unsigned spectral_window_index, unsigned polprods_index)
{
static gchar *config_id = NULL;
if (G_UNLIKELY(config_id == NULL))
Expand All @@ -400,7 +400,7 @@ gen_one_signal_msg(struct vyssim_context *vyssim, guint32 *id_num,
payload->stations[1] = ant1;
payload->spectral_window_index = spectral_window_index;
payload->baseband_id = 0;
payload->polarization_product_id = stokes_index;
payload->polarization_product_id = polprods_index;
payload->num_spectra = mcast_ctx->signal_msg_num_spectra;
for (unsigned n = 0; n < mcast_ctx->signal_msg_num_spectra; ++n) {
struct vys_spectrum_info *info = &(payload->infos[n]);
Expand All @@ -421,7 +421,7 @@ gen_one_signal_msg(struct vyssim_context *vyssim, guint32 *id_num,
*fbuff++ = (float)ant0;
*fbuff++ = (float)ant1;
*fbuff++ = (float)spectral_window_index;
*fbuff++ = (float)stokes_index;
*fbuff++ = (float)polprods_index;
server_ctx->data_buffer_index =
(server_ctx->data_buffer_index + 1) % server_ctx->num_data_buffers;
}
Expand Down Expand Up @@ -450,10 +450,10 @@ data_generator(struct vyssim_context *vyssim)
&g_array_index(vyssim->spw_descriptors,
struct spectral_window_descriptor,
spw_idx);
for (unsigned sto_idx = 0;
!quit && sto_idx < spw_desc->num_stokes_indices;
++sto_idx) {
unsigned sto = spw_desc->stokes_indices[sto_idx];
for (unsigned pp_idx = 0;
!quit && pp_idx < spw_desc->num_polprods_indices;
++pp_idx) {
unsigned pp = spw_desc->polprods_indices[pp_idx];
MUTEX_LOCK(ctx->queue_mutex);
if (vyssim->mcast_ctx.signal_msg_num_spectra == 0) {
quit = true;
Expand All @@ -462,7 +462,7 @@ data_generator(struct vyssim_context *vyssim)
vyssim,
gen_one_signal_msg(
vyssim, &id_num, t_us, a0 + 1, a1 + 1,
spw_desc->index, sto));
spw_desc->index, pp));
}
MUTEX_UNLOCK(ctx->queue_mutex);
}
Expand Down Expand Up @@ -704,7 +704,7 @@ num_spectra_per_integration_per_baseline(const struct vyssim_context *vyssim)
for (unsigned i = 0; i < spw_descs->len; ++i) {
result +=
(&g_array_index(spw_descs, struct spectral_window_descriptor, i))
->num_stokes_indices;
->num_polprods_indices;
}
return result;
}
Expand Down Expand Up @@ -1411,8 +1411,8 @@ parse_param(const gchar *option_name, const gchar *value, GHashTable *params,
"Failed to parse %s value: %s", value, strerror(EINVAL));
return false;
}
if (strcmp(opt, num_stokes_lname) == 0
|| strcmp(opt, num_stokes_sname) == 0) {
if (strcmp(opt, num_polprods_lname) == 0
|| strcmp(opt, num_polprods_sname) == 0) {
if (!(l == 1 || l == 2 || l == 4)) {
g_set_error(error, VYSSIM_ARG_ERROR, VYSSIM_ARG_ERROR_RANGE,
"%s value must be 1, 2, or 4", option_name);
Expand Down Expand Up @@ -1445,7 +1445,7 @@ add_param(GHashTable *params, const gchar *lname, const gchar *sname,
static bool
parse_options(int *argc, char **argv[], unsigned *num_antennas,
unsigned *num_spectral_windows, unsigned *num_channels,
unsigned *num_stokes, unsigned *integration_time_microsec,
unsigned *num_polprods, unsigned *integration_time_microsec,
unsigned *signal_msg_num_spectra,
unsigned *data_buffer_length_sec,
char **vys_configuration_path,
Expand Down Expand Up @@ -1478,11 +1478,11 @@ parse_options(int *argc, char **argv[], unsigned *num_antennas,
params, num_channels_lname, num_channels_sname,
DEFAULT_NUM_CHANNELS, "Number of channels (per spw)",
num_channels);
gchar *num_stokes_desc =
gchar *num_polprods_desc =
add_param(
params, num_stokes_lname, num_stokes_sname,
DEFAULT_NUM_STOKES, "Number of stokes products (per spw)",
num_stokes);
params, num_polprods_lname, num_polprods_sname,
DEFAULT_NUM_POLPRODS, "Number of polarization products (per spw)",
num_polprods);
gchar *integration_time_microsec_desc =
add_param(
params, integration_time_microsec_lname,
Expand Down Expand Up @@ -1510,9 +1510,9 @@ parse_options(int *argc, char **argv[], unsigned *num_antennas,
{num_channels_lname, num_channels_sname[0], 0,
G_OPTION_ARG_CALLBACK, parse_param,
num_channels_desc, "N"},
{num_stokes_lname, num_stokes_sname[0], 0,
{num_polprods_lname, num_polprods_sname[0], 0,
G_OPTION_ARG_CALLBACK, parse_param,
num_stokes_desc, "N"},
num_polprods_desc, "N"},
{integration_time_microsec_lname, integration_time_microsec_sname[0], 0,
G_OPTION_ARG_CALLBACK, parse_param,
integration_time_microsec_desc, "N"},
Expand All @@ -1534,7 +1534,7 @@ parse_options(int *argc, char **argv[], unsigned *num_antennas,
g_free(num_antennas_desc);
g_free(num_spectral_windows_desc);
g_free(num_channels_desc);
g_free(num_stokes_desc);
g_free(num_polprods_desc);
g_free(integration_time_microsec_desc);
g_free(signal_msg_num_spectra_desc);
g_free(data_buffer_length_sec_desc);
Expand All @@ -1556,7 +1556,7 @@ main(int argc, char *argv[])
&vyssim.params.num_antennas,
&vyssim.params.num_spectral_windows,
&vyssim.params.num_channels,
&vyssim.params.num_stokes,
&vyssim.params.num_polprods,
&vyssim.params.integration_time_microsec,
&vyssim.mcast_ctx.signal_msg_num_spectra,
&vyssim.data_buffer_length_sec,
Expand Down Expand Up @@ -1593,42 +1593,59 @@ main(int argc, char *argv[])
MPI_Comm_size(vyssim.comm, &num_servers);

/* distribute products among all servers */
unsigned sto_group_size =
(vyssim.params.num_spectral_windows * vyssim.params.num_stokes)
unsigned polprod_group_size =
(vyssim.params.num_spectral_windows * vyssim.params.num_polprods)
/ num_servers;// min number of products per server
if (sto_group_size == 0) {
if (polprod_group_size == 0) {
fprintf(stderr,
"require reduced number of servers (no more than %u for "
"provided configuration)\n",
vyssim.params.num_spectral_windows *
vyssim.params.num_stokes);
vyssim.params.num_polprods);
goto cleanup_and_return;
}
/* min power of two products per server*/
sto_group_size =
1 << g_bit_nth_msf(sto_group_size, 8 * sizeof(unsigned));
/* distribute by no more than num_stokes products at a time */
sto_group_size = MIN(sto_group_size, vyssim.params.num_stokes);
/* number of groups a spectral window's stokes products are divided into */
unsigned num_sto_groups = vyssim.params.num_stokes / sto_group_size;
/* total number of groups of stokes products to distribute */
polprod_group_size =
1 << g_bit_nth_msf(polprod_group_size, 8 * sizeof(unsigned));
/* distribute by no more than num_polprods products at a time */
polprod_group_size = MIN(polprod_group_size, vyssim.params.num_polprods);
/* number of groups a spectral window's polarization products are divided
* into */
unsigned num_polprod_groups =
vyssim.params.num_polprods / polprod_group_size;
/* total number of groups of polprods products to distribute */
unsigned num_groups =
vyssim.params.num_spectral_windows * num_sto_groups;
/* distribute groups (spectral window plus a number of stokes products) in
vyssim.params.num_spectral_windows * num_polprod_groups;
/* distribute groups (spectral window plus a number of polprods products) in
* round-robin order to all processes (servers) */
vyssim.spw_descriptors = g_array_new(
FALSE, FALSE, sizeof(struct spectral_window_descriptor));
for (unsigned grp = rank; grp < num_groups; grp += num_servers) {
unsigned sto_group = grp % num_sto_groups;
unsigned polprod_group = grp % num_polprod_groups;
struct spectral_window_descriptor spw_desc = {
.index = grp / num_sto_groups,
.num_stokes_indices = sto_group_size
.index = grp / num_polprod_groups,
.num_polprods_indices = polprod_group_size
};
for (unsigned s = 0; s < sto_group_size; ++s)
spw_desc.stokes_indices[s] = sto_group * sto_group_size + s;
for (unsigned p = 0; p < polprod_group_size; ++p)
spw_desc.polprods_indices[p] = polprod_group * polprod_group_size + p;
g_array_append_val(vyssim.spw_descriptors, spw_desc);
}

if (rank == 0)
g_print("Starting vyssim using %u process%s\n"
"with %u antennas, %u spectral windows of %u channels and\n"
"%u polarization products each, distributed %u way%s,\n"
"integration time of %u microseconds,\n"
"%u spectra per signal message, spectra retained for %u seconds\n",
num_servers, ((num_servers > 1) ? "es" : ""),
vyssim.params.num_antennas, vyssim.params.num_spectral_windows,
vyssim.params.num_channels,
vyssim.params.num_polprods, num_polprod_groups,
((num_polprod_groups > 1) ? "s" : ""),
vyssim.params.integration_time_microsec,
vyssim.mcast_ctx.signal_msg_num_spectra,
vyssim.data_buffer_length_sec);

vyssim.vconfig = vys_configuration_new(vys_configuration_path);

rc = run(&vyssim);
Expand Down

0 comments on commit 7d08293

Please sign in to comment.