Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Common Optimization Methods #361

Merged
merged 19 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gerrychain/optimization/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .optimization import SingleMetricOptimizer
from .gingleator import Gingleator

__all__ = ['SingleMetricOptimizer', 'Gingleator']
167 changes: 167 additions & 0 deletions gerrychain/optimization/gingleator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from .optimization import SingleMetricOptimizer

from functools import partial
import numpy as np
import warnings


class Gingleator(SingleMetricOptimizer):
"""
`Gingleator` is a child class of `SingleMetricOptimizer` which can be used to search for plans
with increased numbers of Gingles' districts.
"""

def __init__(self, proposal, constraints, initial_state, tracking_funct=None,
minority_perc_col=None, threshold=0.5, score_function=None,
minority_pop_col=None, total_pop_col="TOTPOP",
min_perc_column_name="_gingleator_auxiliary_helper_updater_min_perc_col"):
"""
:param `proposal`: Function proposing the next state from the current state.
:param `constraints`: A function with signature ``Partition -> bool`` determining whether
the proposed next state is valid (passes all binary constraints). Usually this is a
:class:`~gerrychain.constraints.Validator` class instance.
:param `initial_state`: Initial :class:`gerrychain.partition.Partition` class.
:param `tracking_funct`: A function with the signiture ``Partition -> None`` to be run at
every step of the chain. If you'd like to externaly track stats beyond those reflected
in the optimation_metric here is where to implement that.
:param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of
minority popultion within that district.
:param `threshold`: Beyond which fraction to consider something a "Gingles"
(or opportunity) district.
:param `score_function`: The function to using doing optimization. Should have the
signature ``Partition * str (minority_perc_col) * float (threhold) ->
'a where 'a is Comparable``. This class implement a few potnetial choices as class
methods.
:param `minority_pop_col`: If minority_perc_col is defined, the minority population column
with which to compute percentage.
:param `total_pop_col`: If minority_perc_col is defined, the total population column with
which to compute percentage.
:param `min_perc_column_name`: If minority_perc_col is defined, the name to give the created
percentage updater.
"""
if minority_perc_col is None and minority_pop_col is None:
raise ValueError("`minority_perc_col` and `minority_pop_col` cannot both be `None`. \
Unclear how to compute gingles district.")
elif minority_perc_col is not None and minority_pop_col is not None:
warnings.warn("`minority_perc_col` and `minority_pop_col` are both specified. By \
default `minority_perc_col` will be used.")
score_function = self.num_opportunity_dists if score_function is None else score_function

if minority_perc_col is None:
perc_up = {min_perc_column_name:
lambda part: {k: part[minority_pop_col][k] / part[total_pop_col][k]
for k in part.parts.keys()}}
initial_state.updaters.update(perc_up)

score = partial(score_function, minority_perc_col=minority_perc_col, threshold=threshold)

super().__init__(proposal, constraints, initial_state, score, minmax="max",
tracking_funct=tracking_funct)

"""
Score Functions
"""

@classmethod
def num_opportunity_dists(cls, part, minority_perc_col, threshold):
"""
Given a partition, returns the number of opportunity districts.

:param `part`: Partition to score.
:param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of
minority popultion within that district.
:param `threshold`: Beyond which fraction to consider something a "Gingles"
(or opportunity) district.

:rtype int
"""
dist_percs = part[minority_perc_col].values()
return sum(list(map(lambda v: v >= threshold, dist_percs)))

@classmethod
def reward_partial_dist(cls, part, minority_perc_col, threshold):
"""
Given a partition, returns the number of opportunity districts + the percentage of the next
highest district.

:param `part`: Partition to score.
:param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of
minority popultion within that district.
:param `threshold`: Beyond which fraction to consider something a "Gingles"
(or opportunity) district.

:rtype float
"""
dist_percs = part[minority_perc_col].values()
num_opport_dists = sum(list(map(lambda v: v >= threshold, dist_percs)))
next_dist = max(i for i in dist_percs if i < threshold)
return num_opport_dists + next_dist

@classmethod
def reward_next_highest_close(cls, part, minority_perc_col, threshold):
"""
Given a partition, returns the number of opportunity districts, if no additional district
is within 10% of reaching the threshold. If one is, the distance that district is from the
threshold is scaled between 0 and 1 and added to the count of opportunity districts.

:param `part`: Partition to score.
:param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of
minority popultion within that district.
:param `threshold`: Beyond which fraction to consider something a "Gingles"
(or opportunity) district.

:rtype float
"""
dist_percs = part[minority_perc_col].values()
num_opport_dists = sum(list(map(lambda v: v >= threshold, dist_percs)))
next_dist = max(i for i in dist_percs if i < threshold)

if next_dist < threshold - 0.1:
return num_opport_dists
else:
return num_opport_dists + (next_dist - threshold + 0.1) * 10

@classmethod
def penalize_maximum_over(cls, part, minority_perc_col, threshold):
"""
Given a partition, returns the number of opportunity districts + (1 - the maximum excess)
scaled to between 0 and 1.

:param `part`: Partition to score.
:param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of
minority popultion within that district.
:param `threshold`: Beyond which fraction to consider something a "Gingles"
(or opportunity) district.

:rtype float
"""
dist_percs = part[minority_perc_col].values()
num_opportunity_dists = sum(list(map(lambda v: v >= threshold, dist_percs)))
if num_opportunity_dists == 0:
return 0
else:
max_dist = max(dist_percs)
return num_opportunity_dists + (1 - max_dist) / (1 - threshold)

@classmethod
def penalize_avg_over(cls, part, minority_perc_col, threshold):
"""
Given a partition, returns the number of opportunity districts + (1 - the average excess)
scaled to between 0 and 1.

:param `part`: Partition to score.
:param `minority_perc_col`: Which updater is a mapping of district ids to the fraction of
minority popultion within that district.
:param `threshold`: Beyond which fraction to consider something a "Gingles"
(or opportunity) district.

:rtype float
"""
dist_percs = part[minority_perc_col].values()
opport_dists = list(filter(lambda v: v >= threshold, dist_percs))
if opport_dists == []:
return 0
else:
num_opportunity_dists = len(opport_dists)
avg_opportunity_dist = np.mean(opport_dists)
return num_opportunity_dists + (1 - avg_opportunity_dist) / (1 - threshold)
178 changes: 178 additions & 0 deletions gerrychain/optimization/optimization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
from ..chain import MarkovChain
from ..accept import always_accept
from ..random import random

import numpy as np


class SingleMetricOptimizer:
"""
SingleMetricOptimizer represents the class of algorithms / chains that optimize plans with
respect to a single metric. This class implements some common methods of optimization.
"""

def __init__(self, proposal, constraints, initial_state, optimization_metric, minmax="max",
tracking_funct=None):
"""
:param `proposal`: Function proposing the next state from the current state.
:param `constraints`: A function with signature ``Partition -> bool`` determining whether
the proposed next state is valid (passes all binary constraints). Usually this is a
:class:`~gerrychain.constraints.Validator` class instance.
:param `initial_state`: Initial :class:`gerrychain.partition.Partition` class.
:param `optimization_metric`: The score function with which to optimize over. This should
have the signiture: ``Partition -> 'a where 'a is Comparable``
:param `minmax` (str): Whether to minimize or maximize the function?
:param `tracking_funct`: A function with the signiture ``Partition -> None`` to be run at
every step of the chain. If you'd like to externaly track stats beyond those reflected
in the optimation_metric here is where to implement that.
"""
self.part = initial_state
self.proposal = proposal
self.constraints = constraints
self.score = optimization_metric
self.maximize = minmax == "max" or minmax == "maximize"
self.tracking_funct = tracking_funct if tracking_funct is not None else lambda p: None

def _is_improvement(self, part_score, max_score):
if self.maximize:
return part_score >= max_score
else:
return part_score <= max_score

def short_bursts(self, burst_length, num_bursts, accept=always_accept):
"""
Preforms a short burst run using the instance's score function. Each burst starts at the
best preforming plan of the previsdous burst. If there's a tie, the later observed one is
selected.

:param `burst_length` (int): How many steps to run within each burst?
:param `num_bursts` (int): How many bursts to preform?
:param `accept`: Function accepting or rejecting the proposed state. In the most basic
use case, this always returns ``True``.

:rtype (Partition * np.array): Tuple of maximal (or minimal) observed partition and a 2D
numpy array of observed scores over each of the bursts.
"""
max_part = (self.part, self.score(self.part))
observed_scores = np.zeros((num_bursts, burst_length))

for i in range(num_bursts):
chain = MarkovChain(self.proposal, self.constraints, accept, max_part[0], burst_length)

for j, part in enumerate(chain):
part_score = self.score(part)
observed_scores[i][j] = part_score

if self._is_improvement(part_score, max_part[1]):
max_part = (part, part_score)

self.tracking_fun(part)

return (max_part[0], observed_scores)

def _titled_acceptance_function(self, p):
"""
Function factory that binds and returns a tilted acceptance function.

:rtype (``Partition -> Bool``)
"""
def tilted_acceptance_function(part):
if part.parent is None:
return True

part_score = self.score(part)
prev_score = self.score(part.parent)

if self.maximize and part_score >= prev_score:
return True
elif not self.maximize and part_score <= prev_score:
return True
else:
return random.random() < p

return tilted_acceptance_function

def tilted_short_bursts(self, burst_length, num_bursts, p):
"""
Preforms a short burst run using the instance's score function. Each burst starts at the
best preforming plan of the previsdous burst. If there's a tie, the later observed one is
selected. Within each burst a tilted acceptance function is used where better scoring plans
are always accepted and worse scoring plans are accepted with probability `p`.

:param `burst_length` (int): How many steps to run within each burst?
:param `num_bursts` (int): How many bursts to preform?
:param `p` (float): The probability with which to accept worse scoring plans.

:rtype (Partition * np.array): Tuple of maximal (or minimal) observed partition and a 2D
numpy array of observed scores over each of the bursts.
"""
return self.short_bursts(burst_length, num_bursts,
accept=self._titled_acceptance_function(p))

def variable_lenght_short_bursts(self, num_steps, stuck_buffer, accept=always_accept):
"""
Preforms a short burst where the burst length is alowed to increase as it gets harder to
find high scoring plans. The initial burst length is set to 2, and it is doubled each time
there is no improvement over the passed number (`stuck_buffer`) of runs.

:param `num_steps` (int): Total number of steps to take overall.
:param `stuck_buffer` (int): How many bursts of a given length with no improvement to allow
before imcreasing the burst length.
:param accept: Function accepting or rejecting the proposed state. In the most basic
use case, this always returns ``True``.

:rtype (Partition * np.array): Tuple of maximal (or minimal) observed partition and a 1D
numpy array of observed scores over each of the bursts.
"""
max_part = (self.part, self.score(self.part))
observed_scores = np.zeros(num_steps)
time_stuck = 0
burst_length = 2
i = 0

while(i < num_steps):
chain = MarkovChain(self.proposal, self.constraints, accept, max_part[0], burst_length)
for part in chain:
part_score = self.score(part)

if self._is_improvement(part_score, max_part[1]):
max_part = (part, part_score)
time_stuck = 0
else:
time_stuck += 1

i += 1
if i >= num_steps:
break

if time_stuck >= stuck_buffer * burst_length:
burst_length *= 2

return (max_part[0], observed_scores)

def tilted_run(self, num_steps, p):
"""
Preforms a tilted run. A chain where the acceptance function always accepts better plans
and accepts worse plans with some probabilty.

:param `num_steps` (int): Total number of steps to take overall.
:param `p` (float): The probability with which to accept worse scoring plans.

:rtype (Partition * np.array): Tuple of maximal (or minimal) observed partition and a 1D
numpy array of observed scores over each of the bursts.
"""
chain = MarkovChain(self.proposal, self.constraints, self._titled_acceptance_function(p),
self.part, num_steps)
max_part = (self.part, self.score(self.part))
observed_scores = np.zeros(num_steps)

for i, part in enumerate(chain):
part_score = self.score(part)
observed_scores[i] = part_score

if self._is_improvement(part_score, max_part[1]):
max_part = (part, part_score)

self.tracking_fun(part)

return (max_part[0], observed_scores)