Source code for hivemind.option

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import Any, Dict
import re
import logging
from ipfs_dict_chain.IPFSDict import IPFSDict
from .validators import valid_address, valid_bech32_address
from .issue import HivemindIssue

LOG = logging.getLogger(__name__)


class HivemindOption(IPFSDict):
    """A class representing a voting option in the Hivemind protocol.

    This class handles the creation and validation of voting options,
    supporting various types of answers including strings, booleans,
    integers, floats, and complex types.

    :ivar value: The value of the option
    :vartype value: str | bool | int | float | Dict[str, Any] | None
    :ivar text: Additional text description of the option
    :vartype text: str
    :ivar _hivemind_issue: The associated hivemind issue
    :vartype _hivemind_issue: HivemindIssue | None
    :ivar _answer_type: Type of the answer ('String', 'Bool', 'Integer', etc.)
    :vartype _answer_type: str
    :ivar hivemind_id: The IPFS hash of the associated hivemind issue
    :vartype hivemind_id: str | None
    """

    def __init__(self, cid: str | None = None) -> None:
        """Initialize a new HivemindOption.

        :param cid: The IPFS multihash of the Option
        :type cid: str | None
        :return: None
        """
        # The defaults are assigned before delegating to the base class:
        # the base initializer will call the load method, which in turn may
        # call set_issue() and overwrite some of these attributes.
        self.value: str | bool | int | float | Dict[str, Any] | None = None
        self.text: str = ''
        self._hivemind_issue: HivemindIssue | None = None
        self._answer_type: str = 'String'
        self.hivemind_id: str | None = None

        super().__init__(cid=cid)

    def cid(self) -> str | None:
        """Get the IPFS CID of this option.

        :return: The IPFS CID
        :rtype: str | None
        """
        return self._cid

    def load(self, cid: str) -> None:
        """Load the option from IPFS.

        After the base class has loaded the data, the hivemind issue
        referenced by ``hivemind_id`` (if any) is attached via
        :meth:`set_issue`.

        :param cid: The IPFS multihash to load
        :type cid: str
        :return: None
        """
        super().load(cid=cid)
        if self.hivemind_id:
            self.set_issue(hivemind_issue_cid=self.hivemind_id)

    def set_issue(self, hivemind_issue_cid: str) -> None:
        """Set the hivemind issue for this option.

        The option adopts the answer type of the issue.

        :param hivemind_issue_cid: The IPFS hash of the hivemind issue
        :type hivemind_issue_cid: str
        :return: None
        """
        self.hivemind_id = hivemind_issue_cid
        issue = HivemindIssue(cid=hivemind_issue_cid)
        self._hivemind_issue = issue
        self._answer_type = issue.answer_type

    def set(self, value: str | bool | int | float | Dict[str, Any]) -> None:
        """Set the value of this option.

        The value is assigned first and validated afterwards, so it stays
        assigned even when validation fails.

        :param value: The value to set
        :type value: str | bool | int | float | Dict[str, Any]
        :raises Exception: If the value is invalid for the answer type
        :return: None
        """
        self.value = value
        if not self.valid():
            raise Exception('Invalid value for answer type %s: %s' % (self._answer_type, value))

    def valid(self) -> bool:
        """Check if the option is valid according to its type and constraints.

        :return: True if valid, False otherwise
        :rtype: bool
        :raises Exception: If no hivemind issue is set or if the value is not
            one of the choices allowed by the issue's constraints
        """
        issue = self._hivemind_issue
        if not isinstance(issue, HivemindIssue):
            raise Exception('No hivemind question set on option yet! Must set the hivemind question first before setting the value!')

        if self._answer_type != issue.answer_type:
            LOG.error('Option value is not the correct answer type, got %s but should be %s',
                      self._answer_type, issue.answer_type)
            return False

        constraints = issue.constraints
        if constraints is not None and 'choices' in constraints:
            is_allowed = any(choice.get("value", None) == self.value for choice in constraints['choices'])
            if not is_allowed:
                message = 'Option %s is not valid because this it is not in the allowed choices of this hiveminds constraints!' % self.value
                LOG.error(message)
                raise Exception(message)

        # One validator per supported answer type; unknown types are invalid.
        validators = {
            'String': self.is_valid_string_option,
            'Bool': self.is_valid_bool_option,
            'Integer': self.is_valid_integer_option,
            'Float': self.is_valid_float_option,
            'Hivemind': self.is_valid_hivemind_option,
            'File': self.is_valid_file_option,
            'Complex': self.is_valid_complex_option,
            'Address': self.is_valid_address_option,
        }
        validator = validators.get(self._answer_type)
        if validator is None:
            return False
        return bool(validator())

    def is_valid_string_option(self) -> bool:
        """Check if the option is a valid string option.

        Honors the optional ``min_length``, ``max_length`` and ``regex``
        constraints of the hivemind issue.

        :return: True if valid, False otherwise
        :rtype: bool
        """
        if not isinstance(self.value, str):
            return False

        constraints = self._hivemind_issue.constraints
        if constraints is None:
            return True

        if 'min_length' in constraints and len(self.value) < constraints['min_length']:
            return False
        if 'max_length' in constraints and len(self.value) > constraints['max_length']:
            return False
        if 'regex' in constraints and re.match(pattern=constraints['regex'], string=self.value) is None:
            return False

        return True

    def is_valid_float_option(self) -> bool:
        """Check if the option is a valid float option.

        Honors the optional ``min_value``, ``max_value`` and ``decimals``
        constraints of the hivemind issue.

        :return: True if valid, False otherwise
        :rtype: bool
        """
        if not isinstance(self.value, float):
            LOG.error('Option value %s is not a floating number value but instead is a %s',
                      self.value, type(self.value))
            return False

        constraints = self._hivemind_issue.constraints
        if constraints is None:
            return True

        if 'min_value' in constraints and self.value < constraints['min_value']:
            LOG.error('Option value is below minimum value: %s < %s', self.value, constraints['min_value'])
            return False
        if 'max_value' in constraints and self.value > constraints['max_value']:
            LOG.error('Option value is above maximum value: %s > %s', self.value, constraints['max_value'])
            return False
        if 'decimals' in constraints:
            decimals = constraints['decimals']
            # Round-trip through a string with the required number of
            # decimals: the value only survives unchanged when it has no
            # more decimals than allowed (trailing zeros are fine).
            value_as_string = f"{self.value:.{decimals}f}"
            if float(value_as_string) != self.value:
                LOG.error('Option value does not have the correct number of decimals (%s): %s',
                          decimals, self.value)
                return False

        return True

    def is_valid_integer_option(self) -> bool:
        """Check if the option is a valid integer option.

        Booleans are rejected even though ``bool`` is a subclass of ``int``.
        Honors the optional ``min_value`` and ``max_value`` constraints of
        the hivemind issue.

        :return: True if valid, False otherwise
        :rtype: bool
        """
        # isinstance(True, int) is True in Python, so bool must be excluded
        # explicitly to keep True/False out of Integer answers.
        if isinstance(self.value, bool) or not isinstance(self.value, int):
            LOG.error('Option value %s is not a integer value but instead is a %s',
                      self.value, type(self.value))
            return False

        constraints = self._hivemind_issue.constraints
        if constraints is None:
            return True

        if 'min_value' in constraints and self.value < constraints['min_value']:
            LOG.error('Option value is below minimum value: %s < %s', self.value, constraints['min_value'])
            return False
        if 'max_value' in constraints and self.value > constraints['max_value']:
            LOG.error('Option value is above maximum value: %s > %s', self.value, constraints['max_value'])
            return False

        return True

    def is_valid_bool_option(self) -> bool:
        """Check if the option is a valid boolean option.

        When the issue defines ``true_value`` / ``false_value`` constraints,
        the option text must match the constraint for the chosen value.

        :return: True if valid, False otherwise
        :rtype: bool
        """
        if not isinstance(self.value, bool):
            LOG.error('Option value %s is not a boolean value but instead is a %s',
                      self.value, type(self.value))
            return False

        constraints = self._hivemind_issue.constraints
        if constraints is None:
            return True

        # Validate that the text matches the constraint for the chosen value
        if self.value is True and 'true_value' in constraints:
            expected_text = constraints['true_value']
            if self.text != expected_text:
                LOG.error('Bool option text for True value must match the true_value constraint: %s, got: %s',
                          expected_text, self.text)
                return False
        elif self.value is False and 'false_value' in constraints:
            expected_text = constraints['false_value']
            if self.text != expected_text:
                LOG.error('Bool option text for False value must match the false_value constraint: %s, got: %s',
                          expected_text, self.text)
                return False

        return True

    def is_valid_hivemind_option(self) -> bool:
        """Check if the option is a valid hivemind option.

        The value must be a string from which a :class:`HivemindIssue` can
        be constructed without raising.

        :return: True if valid, False otherwise
        :rtype: bool
        """
        # Without this check a None value would be passed on as cid=None
        # instead of being rejected as a missing hash.
        if not isinstance(self.value, str):
            LOG.error('Option value %s is not a string value but instead is a %s',
                      self.value, type(self.value))
            return False

        try:
            HivemindIssue(cid=self.value)
        except Exception as ex:
            # Deliberately broad: any failure while constructing the issue
            # means the hash does not refer to a usable hivemind.
            LOG.error('IPFS hash %s is not a valid hivemind: %s', self.value, ex)
            return False

        return True

    def is_valid_file_option(self) -> bool:
        """Check if the option is a valid file option.

        The value must be a string in a recognised IPFS hash format.

        :return: True if valid, False otherwise
        :rtype: bool
        """
        if not isinstance(self.value, str):
            LOG.error('Option value %s is not a string value but instead is a %s',
                      self.value, type(self.value))
            return False

        # Check if it's a valid IPFS hash format
        if not self._is_valid_ipfs_hash(self.value):
            LOG.error('Option value %s is not a valid IPFS hash', self.value)
            return False

        return True

    @staticmethod
    def _is_valid_ipfs_hash(hash_str: str) -> bool:
        """Check if a string is a valid IPFS hash.

        This is a format check only; the hash is not resolved.

        :param hash_str: The string to check
        :type hash_str: str
        :return: True if valid, False otherwise
        :rtype: bool
        """
        # IPFS CIDv0 starts with "Qm" and is 46 characters long
        if hash_str.startswith('Qm') and len(hash_str) == 46:
            # Check if it only contains valid base58 characters
            base58_chars = set('123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz')
            return all(char in base58_chars for char in hash_str)

        # IPFS CIDv1 validation (more complex, would need a full implementation)
        # For now, only check for a 'b' or 'B' prefix followed by at least
        # one character, all of them base32 characters.
        if hash_str.startswith(('b', 'B')) and len(hash_str) > 1:
            # Simplified check for base32 characters
            base32_chars = set('ABCDEFGHIJKLMNOPQRSTUVWXYZ234567abcdefghijklmnopqrstuvwxyz')
            return all(char in base32_chars for char in hash_str[1:])

        return False

    def is_valid_complex_option(self) -> bool:
        """Check if the option is a valid complex option according to the specifications in the constraints.

        The value must be a dictionary. When the issue's constraints contain
        ``specs``, the dictionary must have exactly the specified fields and
        each field must have the specified type ('String', 'Integer',
        'Float' or 'Bool'). Fields with any other spec type are not
        type-checked.

        :return: True if valid, False otherwise
        :rtype: bool
        """
        if not isinstance(self.value, dict):
            LOG.error('Option value %s is not a dictionary but instead is a %s',
                      self.value, type(self.value))
            return False

        # If there are no constraints at all, or no specs in the
        # constraints, any dictionary is valid
        constraints = self._hivemind_issue.constraints
        if constraints is None or 'specs' not in constraints:
            return True

        specs = constraints['specs']

        # Check if the option has all the fields specified in the constraints
        for spec_key in specs:
            if spec_key not in self.value:
                LOG.error('Required field %s missing from option value', spec_key)
                return False

        # Check if the option has any fields not specified in the constraints
        for value_key in self.value:
            if value_key not in specs:
                LOG.error('Unexpected field %s in option value', value_key)
                return False

        # Check if the types of the fields match the specs
        expected_types = {'String': str, 'Integer': int, 'Float': float, 'Bool': bool}
        for field_name, field_value in self.value.items():
            type_name = specs[field_name]
            expected_type = expected_types.get(type_name)
            if expected_type is None:
                continue

            type_matches = isinstance(field_value, expected_type)
            # bool is a subclass of int, so reject booleans for Integer fields
            if type_name == 'Integer' and isinstance(field_value, bool):
                type_matches = False

            if not type_matches:
                LOG.error('Field %s should be %s but is %s',
                          field_name, type_name, type(field_value).__name__)
                return False

        return True

    def is_valid_address_option(self) -> bool:
        """Check if the option is a valid address option.

        The value must be a string accepted by ``valid_address`` or
        ``valid_bech32_address``.

        :return: True if valid, False otherwise
        :rtype: bool
        """
        # Reject non-strings up front, like the other string-based validators
        if not isinstance(self.value, str):
            LOG.error('Option value %s is not a string value but instead is a %s',
                      self.value, type(self.value))
            return False

        return valid_address(self.value) or valid_bech32_address(self.value)

    def info(self) -> str:
        """Get information about the option.

        :return: A string containing formatted information about the option
        :rtype: str
        """
        # Use the stored CID: `self.cid` is a method, so interpolating it
        # would print a bound-method representation.
        lines = [
            f'Option cid: {self._cid}',
            f'Answer type: {self._answer_type}',
            f'Value: {self.value}',
        ]
        if self.text:
            lines.append(f'Text: {self.text}')

        return ''.join(f'{line}\n' for line in lines)

    def __repr__(self) -> str:
        """Return a string representation of the option.

        :return: The IPFS CID of the option without the '/ipfs/' prefix, or
            a placeholder when the option has no CID yet
        :rtype: str
        """
        if self._cid is None:
            # __repr__ must not raise; there is no CID to show yet
            return '<unsaved HivemindOption>'

        return self._cid.replace('/ipfs/', '')

    def get_answer_type(self) -> str:
        """Get the answer type of the option.

        :return: The answer type
        :rtype: str
        """
        return self._answer_type