Source code for pabutools.election.pabulib

"""
Tools to work with PaBuLib.
"""

from natsort import natsorted
from copy import deepcopy

from pabutools.fractions import str_as_frac
from pabutools.election.instance import Instance, Project
from pabutools.election.ballot import (
    ApprovalBallot,
    CardinalBallot,
    OrdinalBallot,
    CumulativeBallot,
    AbstractCardinalBallot,
)
from pabutools.election.profile import (
    AbstractProfile,
    Profile,
    ApprovalProfile,
    AbstractApprovalProfile,
    CardinalProfile,
    AbstractCardinalProfile,
    CumulativeProfile,
    AbstractCumulativeProfile,
    OrdinalProfile,
    AbstractOrdinalProfile,
)

import urllib.request
import csv
import os


def parse_pabulib_from_string(file_content: str) -> tuple[Instance, Profile]:
    """
    Parses a PaBuLib file given as a string and returns the corresponding instance and profile. The
    returned profile will be of the correct type depending on the metadata in the file.

    Parameters
    ----------
        file_content : str
            String containing the contents of the PaBuLib file to be parsed.

    Returns
    -------
        tuple[:py:class:`~pabutools.election.instance.Instance`, :py:class:`~pabutools.election.profile.profile.Profile`]
            The instance and the profile corresponding to the file.
    """
    instance = Instance()
    ballots = []
    optional_sets = {"categories": set(), "targets": set()}

    lines = file_content.splitlines()
    section = ""
    header = []
    reader = csv.reader(lines, delimiter=";")
    for row in reader:
        if len(row) == 1 and len(row[0].strip()) == 0:
            continue
        if str(row[0]).strip().lower() in ["meta", "projects", "votes"]:
            section = str(row[0]).strip().lower()
            header = next(reader)
        elif section == "meta":
            instance.meta[row[0].strip()] = row[1].strip()
        elif section == "projects":
            p = Project()
            project_meta = dict()
            for i in range(len(row)):
                key = header[i].strip()
                p.name = row[0].strip()
                if row[i].strip().lower() != "none":
                    if key in ["category", "categories"]:
                        project_meta["categories"] = {
                            entry.strip() for entry in row[i].split(",")
                        }
                        p.categories = set(project_meta["categories"])
                        optional_sets["categories"].update(project_meta["categories"])
                    elif key in ["target", "targets"]:
                        project_meta["targets"] = {
                            entry.strip() for entry in row[i].split(",")
                        }
                        p.targets = set(project_meta["targets"])
                        optional_sets["targets"].update(project_meta["targets"])
                    else:
                        project_meta[key] = row[i].strip()
            p.cost = str_as_frac(project_meta["cost"].replace(",", "."))
            instance.add(p)
            instance.project_meta[p] = project_meta
        elif section == "votes":
            ballot_meta = dict()
            for i in range(len(row)):
                if row[i].strip().lower() != "none":
                    ballot_meta[header[i].strip()] = row[i].strip()
            vote_type = instance.meta["vote_type"]
            if vote_type == "approval":
                ballot = ApprovalBallot()
                for project_name in ballot_meta["vote"].split(","):
                    if project_name:
                        ballot.add(instance.get_project(project_name))
                ballot_meta.pop("vote")
            elif vote_type in ["scoring", "cumulative"]:
                if vote_type == "scoring":
                    ballot = CardinalBallot()
                else:
                    ballot = CumulativeBallot()
                if "points" in ballot_meta:  # if not, the ballot should be empty
                    points = ballot_meta["points"].split(",")
                    for index, project_name in enumerate(
                        ballot_meta["vote"].split(",")
                    ):
                        ballot[instance.get_project(project_name)] = str_as_frac(
                            points[index].strip()
                        )
                    ballot_meta.pop("vote")
                    ballot_meta.pop("points")
            elif vote_type == "ordinal":
                ballot = OrdinalBallot()
                for project_name in ballot_meta["vote"].split(","):
                    if project_name:
                        ballot.append(instance.get_project(project_name))
                ballot_meta.pop("vote")
            else:
                raise NotImplementedError(
                    "The PaBuLib parser cannot parse {} profiles for now.".format(
                        instance.meta["vote_type"]
                    )
                )
            ballot.meta = ballot_meta
            ballots.append(ballot)

    legal_min_length = instance.meta.get("min_length", None)
    if legal_min_length is not None:
        legal_min_length = int(legal_min_length)
        if legal_min_length == 1:
            legal_min_length = None
    legal_max_length = instance.meta.get("max_length", None)
    if legal_max_length is not None:
        legal_max_length = int(legal_max_length)
        if legal_max_length >= len(instance):
            legal_max_length = None
    legal_min_cost = instance.meta.get("min_sum_cost", None)
    if legal_min_cost is not None:
        legal_min_cost = str_as_frac(legal_min_cost)
        if legal_min_cost == 0:
            legal_min_cost = None
    legal_max_cost = instance.meta.get("max_sum_cost", None)
    if legal_max_cost is not None:
        legal_max_cost = str_as_frac(legal_max_cost)
        if legal_max_cost >= instance.budget_limit:
            legal_max_cost = None
    legal_min_total_score = instance.meta.get("min_sum_points", None)
    if legal_min_total_score is not None:
        legal_min_total_score = str_as_frac(legal_min_total_score)
        if legal_min_total_score == 0:
            legal_min_total_score = None
    legal_max_total_score = instance.meta.get("max_sum_points", None)
    if legal_max_total_score is not None:
        legal_max_total_score = str_as_frac(legal_max_total_score)
    legal_min_score = instance.meta.get("min_points", None)
    if legal_min_score is not None:
        legal_min_score = str_as_frac(legal_min_score)
        if legal_min_score == 0:
            legal_min_score = None
    legal_max_score = instance.meta.get("max_points", None)
    if legal_max_score is not None:
        legal_max_score = str_as_frac(legal_max_score)
        if legal_max_score == legal_max_total_score:
            legal_max_score = None

    profile = None
    if instance.meta["vote_type"] == "approval":
        profile = ApprovalProfile(
            deepcopy(ballots),
            legal_min_length=legal_min_length,
            legal_max_length=legal_max_length,
            legal_min_cost=legal_min_cost,
            legal_max_cost=legal_max_cost,
        )
    elif instance.meta["vote_type"] == "scoring":
        profile = CardinalProfile(
            deepcopy(ballots),
            legal_min_length=legal_min_length,
            legal_max_length=legal_max_length,
            legal_min_score=legal_min_score,
            legal_max_score=legal_max_score,
        )
    elif instance.meta["vote_type"] == "cumulative":
        profile = CumulativeProfile(
            deepcopy(ballots),
            legal_min_length=legal_min_length,
            legal_max_length=legal_max_length,
            legal_min_score=legal_min_score,
            legal_max_score=legal_max_score,
            legal_min_total_score=legal_min_total_score,
            legal_max_total_score=legal_max_total_score,
        )
    elif instance.meta["vote_type"] == "ordinal":
        profile = OrdinalProfile(
            deepcopy(ballots),
            legal_min_length=legal_min_length,
            legal_max_length=legal_max_length,
        )

    # We retrieve the budget limit from the meta information
    instance.budget_limit = str_as_frac(instance.meta["budget"].replace(",", "."))

    # We add the category and target information that we collected from the projects
    instance.categories = optional_sets["categories"]
    instance.targets = optional_sets["targets"]

    return instance, profile


[docs] def parse_pabulib(file_path: str) -> tuple[Instance, Profile]: """ Parses a PaBuLib files and returns the corresponding instance and profile. The returned profile will be of the correct type depending on the metadata in the file. Parameters ---------- file_path : str Path to the PaBuLib file to be parsed. Returns ------- tuple[:py:class:`~pabutools.election.instance.Instance`, :py:class:`~pabutools.election.profile.profile.Profile`] The instance and the profile corresponding to the file. """ with open(file_path, "r", newline="", encoding="utf-8-sig") as csvfile: instance, profile = parse_pabulib_from_string(csvfile.read()) instance.file_path = file_path instance.file_name = os.path.basename(file_path) return instance, profile
def parse_pabulib_from_url(url: str) -> tuple[Instance, Profile]: """ Parses a PaBuLib files given a URL and returns the corresponding instance and profile. The returned profile will be of the correct type depending on the metadata in the file. Parameters ---------- url : str URL to the PaBuLib file to be parsed. Returns ------- tuple[:py:class:`~pabutools.election.instance.Instance`, :py:class:`~pabutools.election.profile.profile.Profile`] The instance and the profile corresponding to the file. """ data = urllib.request.urlopen(url) lines = data.read().decode("utf-8-sig") data.close() instance, profile = parse_pabulib_from_string(lines) instance.file_path = url instance.file_name = url.split("/")[-1] return instance, profile
[docs] def election_as_pabulib_string(instance: Instance, profile: AbstractProfile) -> str: """ Creates a string representing the instance and the profile according to the Pabulib standard (as specified in https://arxiv.org/pdf/2305.11035.pdf). Parameters ---------- instance: :py:class:`~pabutools.election.instance.Instance` The instance. profile: :py:class:`~pabutools.election.profile.profile.Profile` The profile. Returns ------- str The instance and profile represented as a Pabulib string """ def update_meta_value(meta_dict, inst_meta, field, mandatory=False): if field in inst_meta: meta_dict[field] = inst_meta[field] elif mandatory: meta_dict[field] = "Auto-filled " + str(field) meta = {} update_meta_value(meta, instance.meta, "description", mandatory=True) update_meta_value(meta, instance.meta, "country", mandatory=True) update_meta_value(meta, instance.meta, "unit", mandatory=True) update_meta_value(meta, instance.meta, "subunit") update_meta_value(meta, instance.meta, "instance", mandatory=True) meta["num_projects"] = len(instance) meta["num_votes"] = profile.num_ballots() meta["budget"] = instance.budget_limit if isinstance(profile, AbstractApprovalProfile): meta["vote_type"] = "approval" elif isinstance(profile, AbstractCumulativeProfile): meta["vote_type"] = "cumulative" elif isinstance(profile, AbstractCardinalProfile): meta["vote_type"] = "scoring" elif isinstance(profile, AbstractOrdinalProfile): meta["vote_type"] = "ordinal" update_meta_value(meta, instance.meta, "rule", mandatory=True) update_meta_value(meta, instance.meta, "date_begin", mandatory=False) update_meta_value(meta, instance.meta, "date_end", mandatory=False) update_meta_value(meta, instance.meta, "date_language", mandatory=False) update_meta_value(meta, instance.meta, "date_edition", mandatory=False) update_meta_value(meta, instance.meta, "date_district", mandatory=False) update_meta_value(meta, instance.meta, "date_comment", mandatory=False) if profile.legal_min_length: meta["min_length"] = str(profile.legal_min_length) if profile.legal_max_length: meta["max_length"] = str(profile.legal_max_length) if isinstance(profile, AbstractApprovalProfile): if profile.legal_min_cost: meta["min_sum_cost"] = str(profile.legal_min_cost) if profile.legal_max_cost: meta["max_sum_cost"] = str(profile.legal_max_cost) elif isinstance(profile, AbstractCumulativeProfile): if profile.legal_min_score: meta["min_points"] = str(profile.legal_min_score) if profile.legal_max_score: meta["max_points"] = str(profile.legal_max_score) if profile.legal_min_total_score: meta["min_sum_points"] = str(profile.legal_min_total_score) if profile.legal_max_total_score: meta["max_sum_points"] = str(profile.legal_max_total_score) else: meta["max_sum_points"] = "Auto-filled max_sum_points" elif isinstance(profile, AbstractCardinalProfile): if profile.legal_min_score: meta["min_points"] = str(profile.legal_min_score) if profile.legal_max_score: meta["max_points"] = str(profile.legal_max_score) update_meta_value(meta, instance.meta, "default_score", mandatory=False) elif isinstance(profile, AbstractOrdinalProfile): update_meta_value(meta, instance.meta, "scoring_fn", mandatory=False) for key, value in instance.meta.items(): if key not in meta: meta[key] = value project_dicts = [] project_keys = ["project_id", "cost"] for project in instance: project_meta = {"project_id": project.name, "cost": project.cost} if "name" in instance.project_meta[project]: project_meta["name"] = instance.project_meta[project]["name"] if "name" not in project_keys: project_keys.append("name") if project.categories: project_meta["category"] = ",".join(project.categories) if "category" not in project_keys: project_keys.append("category") if project.targets: project_meta["target"] = ",".join(project.targets) if "target" not in project_keys: project_keys.append("target") for key, value in instance.project_meta[project].items(): if key not in project_meta and key not in ["categories", "targets"]: project_meta[key] = value if key not in project_keys: project_keys.append(key) project_dicts.append(project_meta) project_dicts = natsorted(project_dicts, key=lambda d: d["project_id"]) vote_dicts = [] vote_keys = ["voter_id"] voter_ids = set() for index, ballot in enumerate(profile): if "voter_id" in ballot.meta: voter_id = str(ballot.meta["voter_id"]) else: voter_id = index counter = 1 base_id = voter_id while voter_id in voter_ids: voter_id = base_id + "__" + str(counter) vote_meta = {"voter_id": voter_id} if "age" in ballot.meta: vote_meta["age"] = ballot.meta["age"] if "age" not in vote_keys: vote_keys.append("age") if "sex" in ballot.meta: vote_meta["sex"] = ballot.meta["sex"] if "sex" not in vote_keys: vote_keys.append("sex") if "voting_method" in ballot.meta: vote_meta["voting_method"] = ballot.meta["voting_method"] if "voting_method" not in vote_keys: vote_keys.append("voting_method") vote_meta["vote"] = ",".join([p.name for p in ballot]) if "vote" not in vote_keys: vote_keys.append("vote") if isinstance(ballot, AbstractCardinalBallot): vote_meta["points"] = ",".join([str(float(ballot[p])) for p in ballot]) if "points" not in vote_keys: vote_keys.append("points") for key, value in ballot.meta.items(): if key not in vote_meta: vote_meta[key] = value if key not in vote_keys: project_keys.append(key) for mul in range(profile.multiplicity(ballot)): vote_dicts.append(vote_meta) vote_dicts = natsorted(vote_dicts, key=lambda d: d["voter_id"]) file_str = "META\nkey;value\n" for key, value in meta.items(): file_str += f"{key};{value}\n" file_str += "PROJECTS\n" + ";".join(project_keys) + "\n" for project_dict in project_dicts: file_str += ( ";".join([str(project_dict.get(key, "None")) for key in project_keys]) + "\n" ) file_str += "VOTES\n" + ";".join(vote_keys) + "\n" for vote_dict in vote_dicts: file_str += ( ";".join([str(vote_dict.get(key, "None")) for key in vote_keys]) + "\n" ) return file_str
[docs] def write_pabulib(instance: Instance, profile: AbstractProfile, file_path: str) -> None: """Writes an instance and a profile to a file using the pabulib format (as specified in https://arxiv.org/pdf/2305.11035.pdf). Parameters ---------- instance: :py:class:`~pabutools.election.instance.Instance` The instance. profile: :py:class:`~pabutools.election.profile.profile.Profile` The profile. file_path: str The path to the output file. Defaults to :code:`None`. """ with open(file_path, "w", encoding="utf-8-sig") as f: f.write(election_as_pabulib_string(instance, profile))