Source code for hivemind.state

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import List, Dict, Any
from ipfs_dict_chain.IPFSDictChain import IPFSDictChain
from itertools import combinations
import logging

from .issue import HivemindIssue
from .option import HivemindOption
from .opinion import HivemindOpinion
from .utils import verify_message

LOG = logging.getLogger(__name__)


[docs] class HivemindState(IPFSDictChain): """A class representing the current state of a Hivemind voting issue. This class manages the state of a voting issue, including options, opinions, and voting results. It handles the addition of new options and opinions, calculates voting results, and manages restrictions on who can vote. :ivar hivemind_id: The IPFS hash of the associated hivemind issue :vartype hivemind_id: str | None :ivar _issue: The associated hivemind issue object :vartype _hivemind_issue: HivemindIssue | None :ivar option_cids: List of option CIDs :vartype option_cids: List[str] :ivar opinion_cids: List of dictionaries containing opinions for each question :vartype opinion_cids: List[Dict[str, Any]] :ivar signatures: Dictionary mapping addresses to their signatures :vartype signatures: Dict[str, Dict[str, Dict[str, int]]] :ivar participants: Dictionary mapping addresses to their participation data :vartype participants: Dict[str, Any] :ivar selected: List of options that have been selected :vartype selected: List[str] :ivar final: Whether the hivemind is finalized :vartype final: bool """
[docs] def __init__(self, cid: str = None) -> None: """Initialize a new HivemindState. :param cid: The IPFS multihash of the state :type cid: str """ self.hivemind_id: str | None = None self._issue: HivemindIssue | None = None self.option_cids: List[str] = [] self.opinion_cids: List[Dict[str, Any]] = [{}] self.signatures: Dict[str, Dict[str, Dict[str, int]]] = {} self.participants: Dict[str, Any] = {} self.selected: List[str] = [] self.final: bool = False super(HivemindState, self).__init__(cid=cid) self._options: List[HivemindOption] = [HivemindOption(cid=option_cid) for option_cid in self.option_cids] self._opinions: List = [] self._rankings: List = [] for question_index in range(len(self.opinion_cids)): opinions = [] rankings = {} for participant, opinion_data in self.opinion_cids[question_index].items(): opinion = HivemindOpinion(cid=opinion_data['opinion_cid']) opinions.append(opinion) rankings[opinion_data['opinion_cid']] = opinion.ranking.get(options=self._options) self._opinions.append(opinions) self._rankings.append(rankings) self._results = None
[docs] def hivemind_issue(self) -> HivemindIssue: """Get the associated hivemind issue. :return: The associated hivemind issue object :rtype: HivemindIssue """ return self._issue
[docs] def get_options(self) -> List[HivemindOption]: """Get list of hivemind options. :return: List of HivemindOption objects :rtype: List[HivemindOption] """ return self._options
[docs] def set_hivemind_issue(self, issue_cid: str) -> None: """Set the associated hivemind issue. :param issue_cid: IPFS hash of the hivemind issue :type issue_cid: str :return: None """ self.hivemind_id = issue_cid self._issue = HivemindIssue(cid=self.hivemind_id) self.opinion_cids = [{} for _ in range(len(self._issue.questions))]
[docs] def add_predefined_options(self) -> Dict[str, Dict[str, Any]]: """Add predefined options to the hivemind state. :return: Dictionary mapping option CIDs to their data :rtype: Dict[str, Dict[str, Any]] """ options = {} if self._issue.answer_type == 'Bool': true_option = HivemindOption() true_option.set_issue(self.hivemind_id) true_option.text = self._issue.constraints['true_value'] true_option.set(value=True) true_option_hash = true_option.save() if isinstance(true_option, HivemindOption) and true_option.valid(): if true_option_hash not in self.option_cids: self.option_cids.append(true_option_hash) options[true_option_hash] = {'value': true_option.value, 'text': true_option.text} false_option = HivemindOption() false_option.set_issue(self.hivemind_id) false_option.text = self._issue.constraints['false_value'] false_option.set(value=False) false_option_hash = false_option.save() if isinstance(false_option, HivemindOption) and false_option.valid(): if false_option_hash not in self.option_cids: self.option_cids.append(false_option_hash) options[false_option_hash] = {'value': false_option.value, 'text': false_option.text} elif 'choices' in self._issue.constraints: for choice in self._issue.constraints['choices']: if isinstance(choice, dict): option = HivemindOption() option.set_issue(self.hivemind_id) option.text = choice['text'] option.set(value=choice['value']) option_hash = option.save() if isinstance(option, HivemindOption) and option.valid(): if option_hash not in self.option_cids: self.option_cids.append(option_hash) options[option_hash] = {'value': option.value, 'text': option.text} return options
[docs] def load(self, cid: str) -> None: """Load the hivemind state from IPFS. :param cid: The IPFS multihash of the state :type cid: str :return: None """ super(HivemindState, self).load(cid=cid) self._issue = HivemindIssue(cid=self.hivemind_id) # Only initialize opinions if they don't exist if not hasattr(self, 'opinion_cids') or self.opinion_cids is None: self.opinion_cids = [{} for _ in range(len(self._issue.questions))]
[docs] def add_option(self, timestamp: int, option_hash: str, address: str = None, signature: str = None) -> None: """Add an option to the hivemind state. :param timestamp: Unix timestamp :type timestamp: int :param option_hash: The IPFS multihash of the option :type option_hash: str :param address: The address that supports the option (optional) :type address: str :param signature: The signature of the message (optional) :type signature: str :raises Exception: If the option is invalid or restrictions are not met """ if self.final is True: raise Exception('Can not add option: hivemind state is finalized') if not isinstance(self._issue, HivemindIssue): return # If we have address restrictions, require address and signature if self._issue.restrictions is not None and 'addresses' in self._issue.restrictions: if address is None or signature is None: raise Exception('Can not add option: no address or signature given') elif address not in self._issue.restrictions['addresses']: raise Exception('Can not add option: there are address restrictions on this hivemind issue and address %s is not allowed to add options' % address) # If address and signature are provided, verify the signature regardless of restrictions if address is not None and signature is not None: if not verify_message(message='%s%s' % (timestamp, option_hash), address=address, signature=signature): raise Exception('Can not add option: Signature is not valid') if self._issue.restrictions is not None and 'options_per_address' in self._issue.restrictions: number_of_options = len(self.options_by_participant(address=address)) if number_of_options >= self._issue.restrictions['options_per_address']: raise Exception('Can not add option: address %s already added too many options: %s' % (address, number_of_options)) option = self.get_option(cid=option_hash) if isinstance(option, HivemindOption) and option.valid(): if option_hash in self.option_cids: raise Exception("Option already exists") # Check if an option with the same value already exists for existing_option_cid in self.option_cids: existing_option = self.get_option(cid=existing_option_cid) if existing_option.value == option.value: raise Exception(f"Option with value '{option.value}' already exists with different text") # Only add signature if both address and signature are provided if address is not None and signature is not None: self.add_signature(address=address, timestamp=timestamp, message=option_hash, signature=signature) self.option_cids.append(option_hash) self._results = None # Invalidate cached results
[docs] def options_by_participant(self, address: str) -> List[str]: """Get the options added by a participant. :param address: The participant's address :type address: str :return: List of option CIDs :rtype: List[str] """ # Track which options were added by this address by checking signatures participant_options = [] if address in self.signatures: for option_hash in self.option_cids: # Check if this address has signed this option if option_hash in self.signatures[address]: participant_options.append(option_hash) return participant_options
[docs] def add_opinion(self, timestamp: int, opinion_hash: str, address: str, signature: str) -> None: """Add an opinion to the hivemind state. :param timestamp: Unix timestamp :type timestamp: int :param opinion_hash: The IPFS multihash of the opinion :type opinion_hash: str :param address: The address of the opinionator :type address: str :param signature: The signature of the message :type signature: str :raises Exception: If the opinion is invalid or restrictions are not met """ if self.final is True: raise Exception('Can not add opinion: hivemind state is finalized') opinion = self.get_opinion(cid=opinion_hash) if not verify_message(address=address, message='%s%s' % (timestamp, opinion_hash), signature=signature): raise Exception('Signature is invalid') # Check address restrictions if self._issue.restrictions is not None and 'addresses' in self._issue.restrictions: if address not in self._issue.restrictions['addresses']: raise Exception('Can not add opinion: there are address restrictions on this hivemind issue and address %s is not allowed to add opinions' % address) # Get the ranking as a list of options try: # For auto rankings, we need to calculate the ranking based on the options LOG.info(f"Getting ranking options with {len(self._options)} available options") ranking_options = opinion.ranking.get(options=self._options) LOG.info(f"Ranking options: {ranking_options}") except Exception as e: LOG.error(f"Error getting ranking options: {str(e)}") raise Exception(f"Error validating opinion: {str(e)}") # Check if all options in the ranking exist in the state # Strip '/ipfs/' prefix from option hashes if present for comparison normalized_ranking_options = [option_hash.replace('/ipfs/', '') for option_hash in ranking_options] normalized_state_options = [option_hash.replace('/ipfs/', '') for option_hash in self.option_cids] invalid_options = [option_hash for option_hash in normalized_ranking_options if option_hash not in normalized_state_options] if invalid_options: LOG.error(f"Invalid options found: {invalid_options}") LOG.error(f"Available options: {normalized_state_options}") raise Exception(f"Opinion is invalid: contains options that do not exist in the hivemind state: {invalid_options}") if not invalid_options: try: self.add_signature(address=address, timestamp=timestamp, message=opinion_hash, signature=signature) except Exception as ex: raise Exception('Invalid signature: %s' % ex) # Ensure we have enough dictionaries in the opinions list while len(self.opinion_cids) <= opinion.question_index: self.opinion_cids.append({}) self.opinion_cids[opinion.question_index][address] = {'opinion_cid': opinion_hash, 'timestamp': timestamp} while len(self._rankings) <= opinion.question_index: self._rankings.append({}) self._rankings[opinion.question_index][opinion_hash] = ranking_options self._results = None # Invalidate cached results
[docs] def get_weight(self, opinionator: str) -> float: """Get the weight of an opinion. :param opinionator: The participant's address :type opinionator: str :return: The weight of the opinion :rtype: float """ weight = 1.0 if self._issue.restrictions is not None and 'addresses' in self._issue.restrictions: weight = 0.0 # Default weight if there are addresses in restrictions for address in self._issue.restrictions['addresses']: if address.startswith(opinionator): weight = 1.0 # Check if the address has a weight specification (e.g., "address@2") if '@' in address: parts = address.split('@', 1) try: specified_weight = float(parts[1].strip()) if specified_weight >= 0: weight = specified_weight except ValueError: # If weight parsing fails, use default weight pass break return weight
[docs] def info(self) -> str: """Get the information of the hivemind. :return: A string containing the information of the hivemind :rtype: str """ ret = "=================================================================================" ret += '\nHivemind id: ' + self.hivemind_id ret += '\nHivemind main question: ' + self._issue.questions[0] ret += '\nHivemind description: ' + self._issue.description if self._issue.tags is not None: ret += '\nHivemind tags: ' + ' '.join(self._issue.tags) ret += '\nAnswer type: ' + self._issue.answer_type if self._issue.constraints is not None: ret += '\nOption constraints: ' + str(self._issue.constraints) ret += '\n' + "=================================================================================" ret += '\n' + self.options_info() for i, question in enumerate(self._issue.questions): ret += '\nHivemind question %s: %s' % (i, self._issue.questions[i]) ret += '\n' + self.opinions_info(question_index=i) results = self.results()[i] ret += '\n' + self.results_info(results=results, question_index=i) return ret
[docs] def options_info(self) -> str: """Get the information of the options. :return: A string containing the information of the options :rtype: str """ ret = "Options" ret += "\n=======" for i, option_hash in enumerate(self.option_cids): ret += '\nOption %s:' % (i + 1) option = self.get_option(cid=option_hash) ret += '\n' + option.info() ret += '\n' return ret
[docs] def opinions_info(self, question_index: int = 0) -> str: """Get the information of the opinions. :param question_index: The index of the question (default=0) :type question_index: int :return: A string containing the information of the opinions :rtype: str """ ret = "Opinions" ret += "\n========" # opinion_data is a list containing [opinion_hash, signature of '/ipfs/opinion_hash', timestamp] for opinionator, opinion_data in self.opinion_cids[question_index].items(): ret += '\nTimestamp: %s' % opinion_data['timestamp'] opinion = self.get_opinion(cid=opinion_data['opinion_cid']) ret += '\n' + opinion.info() ret += '\n' return ret
[docs] def results(self) -> List[Dict[str, Dict[str, float]]]: """Get the results of the hivemind. :return: The results of the hivemind :rtype: Any """ if self._results is None: self._results = [self.calculate_results(question_index=i) for i in range(len(self._issue.questions))] return self._results
[docs] def calculate_results(self, question_index: int = 0) -> Dict[str, Dict[str, float]]: """Calculate the results of the hivemind. :param question_index: Index of the question to calculate results for :type question_index: int :return: Dictionary mapping option CIDs to their scores :rtype: Dict[str, Dict[str, float]] :raises Exception: If question_index is invalid """ # if selection mode is 'Exclude', we must exclude previously selected options from the results if self._issue.on_selection == 'Exclude': available_options = [option_hash for option_hash in self.option_cids if option_hash not in self.selected] else: available_options = self.option_cids results = {option: {'win': 0, 'loss': 0, 'unknown': 0, 'score': 0} for option in available_options} for a, b in combinations(available_options, 2): for opinionator in self.opinion_cids[question_index]: winner = self.compare(a, b, self.opinion_cids[question_index][opinionator]['opinion_cid']) weight = self.get_weight(opinionator=opinionator) if winner == a: results[a]['win'] += weight results[b]['loss'] += weight elif winner == b: results[b]['win'] += weight results[a]['loss'] += weight elif winner is None: results[a]['unknown'] += weight results[b]['unknown'] += weight # Calculate scores for each option for option_id in results: if results[option_id]['win'] + results[option_id]['loss'] + results[option_id]['unknown'] > 0: results[option_id]['score'] = results[option_id]['win'] / float(results[option_id]['win'] + results[option_id]['loss'] + results[option_id]['unknown']) results_info = self.results_info(results=results, question_index=question_index) for line in results_info.split('\n'): LOG.info(line) return results
[docs] def get_score(self, option_hash: str, question_index: int = 0) -> float: """Get the score of an option. :param option_hash: The IPFS multihash of the option :type option_hash: str :param question_index: The index of the question (default=0) :type question_index: int :return: The score of the option :rtype: float """ results = self.results()[question_index] return results[option_hash.replace('/ipfs/', '')]['score']
[docs] def get_sorted_options(self, question_index: int = 0) -> List[HivemindOption]: """Get the sorted list of options. :param question_index: The index of the question (default=0) :type question_index: int :return: List of HivemindOption objects sorted by highest score :rtype: List[HivemindOption] """ results = self.results()[question_index] return [self.get_option(cid=option[0]) for option in sorted(results.items(), key=lambda x: x[1]['score'], reverse=True)]
[docs] def consensus(self, question_index: int = 0) -> Any: """Get the consensus of the hivemind. :param question_index: The index of the question (default=0) :type question_index: int :return: The consensus value :rtype: Any """ results = self.results()[question_index] sorted_options = self.get_sorted_options(question_index=question_index) if len(sorted_options) == 0: return None elif len(sorted_options) == 1: return sorted_options[0].value # Make sure the consensus is not tied between the first two options elif len(sorted_options) >= 2 and results[sorted_options[0].cid().replace('/ipfs/', '')]['score'] > results[sorted_options[1].cid().replace('/ipfs/', '')]['score']: return sorted_options[0].value else: return None
[docs] def ranked_consensus(self, question_index: int = 0) -> List[Any]: """Get the ranked consensus of the hivemind. :param question_index: The index of the question (default=0) :type question_index: int :return: List of consensus values :rtype: List[Any] """ return [option.value for option in self.get_sorted_options(question_index=question_index)]
[docs] def results_info(self, results: Dict[str, Dict[str, float]], question_index: int = 0) -> str: """Get the results information of the hivemind. :param results: Dictionary mapping option CIDs to their scores :type results: Dict[str, Dict[str, float]] :param question_index: The index of the question (default=0) :type question_index: int :return: A string containing the results information :rtype: str """ ret = 'Hivemind id: ' + self.hivemind_id + '\n' ret += self._issue.questions[question_index] ret += '\nResults:\n========' i = 0 # if selection mode is 'Exclude', we must exclude previously selected options from the results if self._issue.on_selection == 'Exclude': available_options = [option_hash for option_hash in self.option_cids if option_hash not in self.selected] else: available_options = self.option_cids for option_hash, option_result in sorted(results.items(), key=lambda x: x[1]['score'], reverse=True): if option_hash not in available_options: continue i += 1 option = self.get_option(cid=option_hash) ret += '\n%s: (%g%%) : %s' % (i, round(option_result['score'] * 100, 2), option.value) ret += '\nContributions:' ret += '\n================' for opinionator, contribution in self.contributions(results=results, question_index=question_index).items(): ret += '\n%s: %s' % (opinionator, contribution) ret += '\n================' return ret
[docs] def contributions(self, results: Dict[str, Dict[str, float]], question_index: int = 0) -> Dict[str, float]: """Get the contributions of the participants. :param results: Dictionary mapping option CIDs to their scores :type results: Dict[str, Dict[str, float]] :param question_index: The index of the question (default=0) :type question_index: int :return: Dictionary mapping participant addresses to their contributions :rtype: Dict[str, float] """ deviances = {} total_deviance = 0 multipliers = {} # sort the option hashes by highest score option_hashes_by_score = [option[0] for option in sorted(results.items(), key=lambda x: x[1]['score'], reverse=True)] # sort the opinionators by the timestamp of their opinion opinionators_by_timestamp = [opinionator for opinionator, opinion_data in sorted(self.opinion_cids[question_index].items(), key=lambda x: x[1]['timestamp'])] for i, opinionator in enumerate(opinionators_by_timestamp): deviance = 0 # Calculate the 'early bird' multiplier (whoever gives their opinion first gets the highest multiplier, value is between 0 and 1), if opinion is an empty list, then multiplier is 0 ranking = self._rankings[question_index][self.opinion_cids[question_index][opinionator]['opinion_cid']] multipliers[opinionator] = 1 - (i / float(len(opinionators_by_timestamp))) if len(ranking) > 0 else 0 # Calculate the deviance of the opinion, the closer the opinion is to the final result, the lower the deviance for j, option_hash in enumerate(option_hashes_by_score): if option_hash in ranking: deviance += abs(j - ranking.index(option_hash)) else: deviance += len(option_hashes_by_score) - j total_deviance += deviance deviances[opinionator] = deviance if total_deviance != 0: # to avoid divide by zero contributions = {opinionator: (1 - (deviances[opinionator] / float(total_deviance))) * multipliers[opinionator] for opinionator in deviances} else: # everyone has perfect opinion, but contributions should still be multiplied by the 'early bird' multiplier contributions = {opinionator: 1 * multipliers[opinionator] for opinionator in deviances} return contributions
[docs] def select_consensus(self, timestamp: int = None, address: str = None, signature: str = None) -> List[str]: """Select the consensus of the hivemind. This method selects the option with the highest consensus for each question and sets it as the selected option. If the on_selection property of the hivemind issue is set, it will perform the specified action. :param timestamp: Timestamp of the signature :type timestamp: int :param address: Bitcoin address of the signer :type address: str :param signature: Signature of the message :type signature: str :return: List of selected option CIDs :rtype: List[str] :raises ValueError: If the hivemind is already finalized :raises ValueError: If the address is not the author of the hivemind """ if self.final: raise Exception('Can not add option: hivemind issue is finalized') author = self._issue.author if author is not None: # If author is specified, verify that the address matches if not address or address != author: raise ValueError(f"Only the author ({author}) can select consensus") # Verify signature if provided if address and timestamp and signature: # Use the provided message if available, otherwise generate it message_to_verify = f"{timestamp}:select_consensus:{self.hivemind_id}" if not verify_message(message=message_to_verify, address=author, signature=signature): raise ValueError("Invalid signature") # Add the signature to the state self.add_signature( address=author, timestamp=timestamp, message=message_to_verify, signature=signature ) else: LOG.debug("Hivemind issue has no author specified") # Get the option hash with highest consensus for each question selection = [self.get_sorted_options(question_index=question_index)[0].cid().replace('/ipfs/', '') for question_index in range(len(self._issue.questions))] if self._issue.on_selection is None: return selection elif self._issue.on_selection == 'Finalize': # The hivemind is final, no more options or opinions can be added self.final = True elif self._issue.on_selection == 'Exclude': # Only add the winner of the first question to self.selected if len(selection) > 0: winner = selection[0] # Make sure we normalize the CID by removing the '/ipfs/' prefix if present winner = winner.replace('/ipfs/', '') if winner not in self.selected: self.selected.append(winner) elif self._issue.on_selection == 'Reset': # All opinions are reset self.opinion_cids = [{} for _ in range(len(self._issue.questions))] else: raise NotImplementedError('Unknown selection mode: %s' % self._issue.on_selection) self._results = None # Invalidate cached results return selection
[docs] def add_signature(self, address: str, timestamp: int, message: str, signature: str) -> None: """Add a signature to the hivemind state. :param address: The address of the participant :type address: str :param timestamp: Unix timestamp :type timestamp: int :param message: The message that was signed :type message: str :param signature: The signature of the message :type signature: str :raises Exception: If the signature is invalid :return: None """ if address in self.signatures and message in self.signatures[address]: if signature in self.signatures[address][message] and timestamp == self.signatures[address][message][signature]: return if address not in self.signatures: self.signatures[address] = {message: {signature: timestamp}} elif message not in self.signatures[address]: self.signatures[address].update({message: {signature: timestamp}}) else: timestamps = [int(key) for key in self.signatures[address][message].values()] if timestamp > max(timestamps): self.signatures[address][message][signature] = timestamp else: raise Exception('Invalid timestamp: must be more recent than any previous signature timestamp')
[docs] def update_participant_name(self, timestamp: int, name: str, address: str, signature: str, message: str) -> None: """Update the name of a participant. :param timestamp: Unix timestamp :type timestamp: int :param name: The new name of the participant :type name: str :param address: The address of the participant :type address: str :param signature: The signature of the message :type signature: str :param message: The message that was signed :type message: str :raises Exception: If the signature is invalid or name exceeds maximum length :return: None """ if self.final is True: raise Exception('Can not update participant name: hivemind state is finalized') # Check if name exceeds maximum length max_name_length = 50 if len(name) > max_name_length: raise Exception(f'Name exceeds maximum length of {max_name_length} characters') # Only need to update name if it is not known yet or if it has changed if address not in self.participants or name != self.participants[address]['name']: if verify_message(address=address, message=message, signature=signature) is True: # First try to add the signature, if the timestamp is not the most recent it will throw an exception # This is to prevent a reused signature attack try: self.add_signature(address=address, timestamp=timestamp, message=name, signature=signature) except Exception as ex: raise Exception('%s' % ex) self.participants[address] = {'name': name} else: raise Exception('Invalid signature')
[docs] def compare(self, a: str, b: str, opinion_hash: str) -> str | None: """Helper function to compare 2 Option objects against each other based on a given Opinion. :param a: The first Option object CID :type a: str :param b: The second Option object CID :type b: str :param opinion_hash: The Opinion object CID :type opinion_hash: str :return: The Option CID that is considered better by the Opinion, or None if both options are not in the Opinion :rtype: str | None """ opinion = self.get_opinion(cid=opinion_hash) ranked_choice = self._rankings[opinion.question_index][opinion_hash] if a in ranked_choice and b in ranked_choice: if ranked_choice.index(a) < ranked_choice.index(b): return a elif ranked_choice.index(a) > ranked_choice.index(b): return b elif a in ranked_choice: return a elif b in ranked_choice: return b else: return None
[docs] def get_option(self, cid: str) -> HivemindOption: """Get an option by its CID. :param cid: The IPFS multihash of the option :type cid: str :return: The option object :rtype: HivemindOption """ # Check if the option is already in the state for option in self._options: if cid.replace('/ipfs/', '') in option.cid(): return option return HivemindOption(cid=cid)
[docs] def get_opinion(self, cid: str) -> HivemindOpinion: """Get an opinion by its CID. :param cid: The IPFS multihash of the opinion :type cid: str :return: The opinion object :rtype: HivemindOpinion """ # Check if the opinion is already in the state for question_index in range(len(self._opinions)): for opinion in self._opinions[question_index]: if cid.replace('/ipfs/', '') in opinion.cid(): return opinion return HivemindOpinion(cid=cid)