#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import List, Dict
from ipfs_dict_chain.IPFSDict import IPFSDict
[docs]
class HivemindIssue(IPFSDict):
    """A class representing a voting issue in the Hivemind protocol.

    This class handles the creation and management of voting issues, including
    questions, constraints, and restrictions on who can vote.

    :ivar questions: List of questions associated with this issue
    :vartype questions: List[str]
    :ivar name: Name of the issue
    :vartype name: str | None
    :ivar description: Description of the issue
    :vartype description: str
    :ivar tags: List of tags associated with this issue
    :vartype tags: List[str]
    :ivar answer_type: Type of answer expected ('String', 'Integer', 'Float', 'Bool', 'Hivemind', 'File', 'Complex', 'Address')
    :vartype answer_type: str
    :ivar constraints: Constraints on voting
    :vartype constraints: Dict[str, str | int | float | list] | None
    :ivar restrictions: Restrictions on who can vote
    :vartype restrictions: Dict[str, List[str] | int] | None
    :ivar on_selection: Action to take when an option is selected (None, 'Finalize', 'Exclude' or 'Reset')
    :vartype on_selection: str | None
    :ivar author: Bitcoin address of the author who can finalize the hivemind
    :vartype author: str | None
    """
[docs]
def __init__(self, cid: str | None = None) -> None:
    """Initialize a new HivemindIssue.

    Every attribute is first given its default value; only afterwards is
    ``cid`` passed on to the parent ``IPFSDict`` constructor.

    :param cid: The IPFS multihash of the hivemind issue
    :type cid: str | None
    :return: None
    """
    # Defaults are assigned before the parent constructor runs.
    # NOTE(review): presumably IPFSDict.__init__ overwrites these defaults
    # with the stored values when a cid is given -- confirm against IPFSDict.
    self.questions: List[str] = []
    self.name: str | None = None
    self.description: str = ''
    self.tags: List[str] = []
    self.answer_type: str = 'String'
    self.constraints: Dict[str, str | int | float | list] | None = None
    self.restrictions: Dict[str, List[str] | int] | None = None
    # What happens when an option is selected: valid values are None, Finalize, Exclude, Reset
    # None : nothing happens
    # Finalize : Hivemind is finalized, no new options or opinions can be added anymore
    # Exclude : The selected option is excluded from the results
    # Reset : All opinions are reset
    self.on_selection: str | None = None
    # Bitcoin address of the author who can finalize the hivemind
    self.author: str | None = None
    super().__init__(cid=cid)
[docs]
def add_question(self, question: str) -> None:
    """Append a question to the hivemind issue.

    A value that is not a string, or a question that is already in
    ``self.questions``, is silently ignored: nothing is appended and no
    exception is raised.

    :param question: The question text to add
    :type question: str
    :return: None
    """
    # Guard clauses: anything that is not a new string is a no-op.
    if not isinstance(question, str):
        return
    if question in self.questions:
        return
    self.questions.append(question)
[docs]
def set_constraints(self, constraints: Dict[str, str | int | float | list] | None) -> None:
"""Set constraints for the hivemind issue.
Constraints can include various limitations on the answers, such as:
- min_length/max_length: For string answers
- min_value/max_value: For numeric answers
- decimals: For float answers
- regex: For string pattern validation
- true_value/false_value: For boolean answers
- specs: For complex answer types
- choices: For predefined answer options
- block_height: For blockchain-related constraints
- filetype: For file answer types
:param constraints: Dictionary of constraints
:type constraints: Dict[str, str | int | float | list] | None
:return: None
:raises Exception: If constraints are invalid
"""
if constraints is None:
self.constraints = None
return
if not isinstance(constraints, dict):
raise Exception('constraints must be a dict, got %s' % type(constraints))
if 'specs' in constraints:
specs = constraints['specs']
if not isinstance(constraints['specs'], dict):
raise Exception('constraint "specs" must be a dict, got %s' % type(specs))
for key in specs:
if specs[key] not in ['String', 'Integer', 'Float', 'Bool']:
raise Exception('Spec type must be String, Integer, Float, or Bool, got %s' % specs[key])
for constraint_type in ['min_length', 'max_length', 'min_value', 'max_value', 'decimals']:
if constraint_type in constraints and not isinstance(constraints[constraint_type], (int, float)):
raise Exception('Value of constraint %s must be a number' % constraint_type)
for constraint_type in ['regex', 'true_value', 'false_value', 'filetype']:
if constraint_type in constraints and not isinstance(constraints[constraint_type], str):
raise Exception('Value of constraint %s must be a string' % constraint_type)
for constraint_type in ['choices']:
if constraint_type in constraints and not isinstance(constraints[constraint_type], list):
raise Exception('Value of constraint %s must be a list' % constraint_type)
for constraint_type in ['block_height']:
if constraint_type in constraints and not isinstance(constraints[constraint_type], int):
raise Exception('Value of constraint %s must be a integer' % constraint_type)
# Updated list of valid constraint keys
valid_constraints = [
'min_length', 'max_length', 'min_value', 'max_value', 'decimals',
'regex', 'true_value', 'false_value', 'specs', 'choices', 'block_height',
'filetype'
]
if all([key in valid_constraints for key in constraints.keys()]):
self.constraints = constraints
else:
raise Exception('constraints contain an invalid key: %s' % constraints)
[docs]
def set_restrictions(self, restrictions: Dict[str, List[str] | int] | None) -> None:
"""Set voting restrictions for the hivemind issue.
Restrictions can include:
- addresses: List of Bitcoin addresses allowed to vote
- options_per_address: Maximum number of options each address can submit
:param restrictions: Dictionary of restrictions
:type restrictions: Dict[str, List[str] | int] | None
:return: None
:raises Exception: If restrictions are invalid
"""
if restrictions is None:
self.restrictions = None
return
if not isinstance(restrictions, dict):
raise Exception('Restrictions is not a dict or None, got %s instead' % type(restrictions))
for key in restrictions.keys():
if key not in ['addresses', 'options_per_address']:
raise Exception('Invalid key in restrictions: %s' % key)
if 'addresses' in restrictions:
if not isinstance(restrictions['addresses'], list):
raise Exception('addresses in restrictions must be a list, got %s instead' % type(restrictions['addresses']))
for address in restrictions['addresses']:
if not isinstance(address, str):
raise Exception('Address %s in restrictions is not a string!' % address)
if 'options_per_address' in restrictions:
if not isinstance(restrictions['options_per_address'], int) or restrictions['options_per_address'] < 1:
raise Exception('options_per_address in restrictions must be a positive integer')
self.restrictions = restrictions
[docs]
def save(self) -> str:
    """Save the hivemind issue to IPFS.

    Runs ``self.valid()`` first; the parent class ``save()`` is only called
    when validation succeeds.

    :return: The value returned by the parent class ``save()`` (the IPFS hash of the saved issue)
    :rtype: str
    :raises Exception: If the issue is invalid; the message is the original
        validation message prefixed with ``'Error: '`` and the original
        exception is attached as ``__cause__``
    """
    try:
        self.valid()
    except Exception as ex:
        # Chain explicitly so the validation failure stays visible as the cause.
        raise Exception('Error: %s' % ex) from ex

    return super(HivemindIssue, self).save()
[docs]
def valid(self) -> bool:
    """Check if the hivemind issue is valid.

    Validates, in this order:

    - name: Must be a non-empty string ≤ 50 characters
    - description: Must be a string ≤ 5000 characters
    - tags: Must be a list of unique strings without spaces, each non-empty and ≤ 20 characters
    - questions: Must be a non-empty list of unique strings, each non-empty and ≤ 255 characters
    - answer_type: Must be one of the allowed types
    - on_selection: Must be one of the allowed values

    :return: True if valid, raises exception otherwise
    :rtype: bool
    :raises Exception: If any validation fails
    """
    # NOTE: every offending value is formatted through a 1-tuple so that a
    # tuple value is printed as-is instead of being unpacked by the % operator.

    # Name must be a string, not empty and not longer than 50 characters
    name = self.name
    if not isinstance(name, str) or not (0 < len(name) <= 50):
        raise Exception('Invalid name for Hivemind Issue: %s' % (name,))

    # Description must be a string, not longer than 5000 characters
    description = self.description
    if not (isinstance(description, str) and len(description) <= 5000):
        raise Exception('Invalid description for Hivemind Issue: %s' % (description,))

    # Tags must be a list of unique strings; each tag can not contain spaces
    # and can not be empty or longer than 20 characters.
    # The set is only built once all items are known to be strings (hashable).
    tags = self.tags
    tags_ok = (
        isinstance(tags, list)
        and all(isinstance(tag, str) and ' ' not in tag and 0 < len(tag) <= 20 for tag in tags)
        and len(set(tags)) == len(tags)
    )
    if not tags_ok:
        raise Exception('Invalid tags for Hivemind Issue: %s' % (tags,))

    # Questions must be a list of unique strings; each question can not be
    # empty or longer than 255 characters.
    questions = self.questions
    questions_ok = (
        isinstance(questions, list)
        and all(isinstance(question, str) and 0 < len(question) <= 255 for question in questions)
        and len(set(questions)) == len(questions)
    )
    if not questions_ok:
        raise Exception('Invalid questions for Hivemind Issue: %s' % (questions,))

    if len(questions) == 0:
        raise Exception('There must be at least 1 question in the Hivemind Issue.')

    # Answer_type must be in allowed values
    if self.answer_type not in ('String', 'Bool', 'Integer', 'Float', 'Hivemind', 'File', 'Complex', 'Address'):
        raise Exception('Invalid answer_type for Hivemind Issue: %s' % (self.answer_type,))

    # On_selection must be in allowed values
    if self.on_selection not in (None, 'Finalize', 'Exclude', 'Reset'):
        raise Exception('Invalid on_selection for Hivemind Issue: %s' % (self.on_selection,))

    return True
[docs]
def get_identification_cid(self, name: str) -> str:
    """Get the identification CID so that a participant can self-identify for this issue.

    Builds a fresh ``IPFSDict`` holding two entries -- ``hivemind_id`` (this
    issue's own CID with any ``/ipfs/`` prefix removed) and ``name`` -- and
    returns whatever its ``save()`` call returns.

    :param name: The name of the participant
    :type name: str
    :return: The identification CID
    :rtype: str
    """
    identification = IPFSDict()
    # Store the bare hash rather than the '/ipfs/<hash>' path form.
    hivemind_id = self.cid().replace('/ipfs/', '')
    identification['hivemind_id'] = hivemind_id
    identification['name'] = name
    return identification.save()