Source code for ipfs_dict_chain.IPFS

"""IPFS client utilities for adding and retrieving JSON data."""

import asyncio
import atexit
import json
import time
import aioipfs
from multiaddr import Multiaddr
from typing import Dict, Optional, Tuple

_loop = None

[docs] def _get_loop() -> asyncio.AbstractEventLoop: """Return the global asyncio event loop, creating it if necessary. If the global event loop is ``None`` or has been closed, a new event loop is created and set as the current event loop for the thread. :returns: The global asyncio event loop. :rtype: asyncio.AbstractEventLoop """ global _loop if _loop is None or _loop.is_closed(): _loop = asyncio.new_event_loop() asyncio.set_event_loop(_loop) return _loop
[docs] @atexit.register def _close_loop() -> None: """Close the global asyncio event loop at interpreter shutdown. This function is registered with :func:`atexit.register` so that the event loop is properly cleaned up when the Python interpreter exits. If the loop is already closed or was never created, no action is taken. """ global _loop if _loop is not None and not _loop.is_closed(): # pragma: no cover _loop.close() # pragma: no cover _loop = None # pragma: no cover
DEFAULT_HOST = '127.0.0.1' DEFAULT_PORT = 5001 multi_address = Multiaddr(f'/ip4/{DEFAULT_HOST}/tcp/{DEFAULT_PORT}')
[docs] def connect(host: str, port: int) -> None: """Connect to an IPFS daemon. :param host: The host of the IPFS daemon. :type host: str :param port: The port of the IPFS daemon. :type port: int :raises IPFSError: If the connection to the IPFS daemon fails. """ global multi_address multi_address = Multiaddr(f'/ip4/{host}/tcp/{port}') try: _ = _get_loop().run_until_complete(_test_connection()) except Exception as e: raise IPFSError(f'Failed to connect to IPFS daemon at {multi_address}: {e}')
[docs] class IPFSError(Exception): """Custom exception for IPFS-related errors.""" pass
[docs] class IPFSCache: """An in-memory cache for IPFS data with TTL-based expiration. Entries are stored with an expiry timestamp. When an entry is retrieved, it is checked for expiration; expired entries are automatically removed. The TTL can be configured via the ``ttl`` parameter (default 300 seconds). """
[docs] def __init__(self, ttl: int = 300) -> None: """Initialize an empty IPFS cache. :param ttl: Time-to-live for cache entries in seconds. Defaults to 300. :type ttl: int """ self._cache: Dict[str, Tuple[Dict, float]] = {} self._ttl: int = ttl
[docs] def get(self, cid: str) -> Optional[Dict]: """Retrieve data from the cache by its Content Identifier (CID). Expired entries are removed and ``None`` is returned. :param cid: The Content Identifier (CID) of the data in the cache. :type cid: str :return: The data retrieved from the cache, or ``None`` if not found or expired. :rtype: Optional[Dict] """ entry = self._cache.get(cid) if entry is None: return None data, expiry = entry if time.time() > expiry: del self._cache[cid] return None return data
[docs] def set(self, cid: str, data: Dict) -> None: """Store data in the cache with its Content Identifier (CID). The entry will expire after the configured TTL. :param cid: The Content Identifier (CID) of the data. :type cid: str :param data: The data to be stored in the cache. :type data: Dict """ self._cache[cid] = (data, time.time() + self._ttl)
[docs] def clear(self) -> None: """Remove all entries from the cache.""" self._cache.clear()
[docs] def cleanup(self) -> None: """Remove all expired entries from the cache.""" now = time.time() expired = [cid for cid, (_, expiry) in self._cache.items() if now > expiry] for cid in expired: del self._cache[cid]
ipfs_cache = IPFSCache()
[docs] async def get_file_content(cid: str) -> str: """Retrieve the content of a file from IPFS by its Content Identifier (CID). :param cid: The Content Identifier (CID) of the file in IPFS. :type cid: str :return: The content of the file. :rtype: str """ client = aioipfs.AsyncIPFS(maddr=multi_address) try: content = await client.cat(cid) return content.decode() finally: await client.close()
[docs] async def _add_json(data: Dict) -> str: """Add JSON data to IPFS and return its Content Identifier (CID). :param data: The JSON data to be added to IPFS. :type data: Dict :return: The Content Identifier (CID) of the added JSON data. :rtype: str """ client = aioipfs.AsyncIPFS(maddr=multi_address) try: response = await client.add_json(data=data) except Exception as e: raise IPFSError(f'Failed to add JSON data to IPFS: {e}') finally: await client.close() cid = response.get('Hash') if cid is None: raise IPFSError('IPFS response did not contain a Hash/CID') return cid
[docs] async def _get_json(cid: str) -> Dict: """Retrieve JSON data from IPFS by its Content Identifier (CID) and cache the result. :param cid: The Content Identifier (CID) of the JSON data in IPFS. :type cid: str :return: The JSON data retrieved from IPFS. :rtype: Dict """ cached_data = ipfs_cache.get(cid) if cached_data: return cached_data try: data = await get_file_content(cid=cid) except Exception as e: raise IPFSError(f'Failed to retrieve json data from IPFS hash {cid}: {e}') try: json_data = json.loads(data) except Exception as e: raise IPFSError(f'Failed to parse json data from IPFS hash {cid}: {e}') ipfs_cache.set(cid, json_data) return json_data
[docs] async def _test_connection() -> bool: """Test the connection to the IPFS daemon using a read-only operation. :return: True if the connection is successful. :rtype: bool :raises IPFSError: If the connection test fails. """ client = aioipfs.AsyncIPFS(maddr=multi_address) try: await client.id() except Exception as e: raise IPFSError(f'Failed to connect to IPFS daemon at {multi_address}: {e}') finally: await client.close() return True
[docs] def add_json(data: Dict) -> str: """Add JSON data to IPFS and return its Content Identifier (CID) using a synchronous wrapper. :param data: The JSON data to be added to IPFS. :type data: Dict :return: The Content Identifier (CID) of the added JSON data. :rtype: str """ return _get_loop().run_until_complete(_add_json(data=data))
[docs] def get_json(cid: str) -> Dict: """Retrieve JSON data from IPFS by its Content Identifier (CID) using a synchronous wrapper. :param cid: The Content Identifier (CID) of the JSON data in IPFS. :type cid: str :return: The JSON data retrieved from IPFS. :rtype: Dict """ return _get_loop().run_until_complete(_get_json(cid=cid))