import asyncio
import json
import aioipfs
from multiaddr import Multiaddr
from typing import Dict
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 5001
multi_address = Multiaddr(f'/ip4/{DEFAULT_HOST}/tcp/{DEFAULT_PORT}')
[docs]
def connect(host: str, port: int) -> None:
"""Connect to an IPFS daemon.
:param host: The host of the IPFS daemon.
:type host: str
:param port: The port of the IPFS daemon.
:type port: int
:raises IPFSError: If the connection to the IPFS daemon fails.
"""
global multi_address
multi_address = Multiaddr(f'/ip4/{host}/tcp/{port}')
try:
_ = add_json(data={'key': 'value'})
except Exception as e:
raise IPFSError(f'Failed to connect to IPFS daemon at {multi_address}: {e}')
[docs]
class IPFSError(Exception):
"""Custom exception for IPFS-related errors."""
pass
[docs]
class IPFSCache:
"""A simple cache for IPFS data."""
def __init__(self):
self._cache = {}
[docs]
def get(self, cid: str) -> Dict:
"""Retrieve data from the cache by its Content Identifier (CID).
:param cid: The Content Identifier (CID) of the data in the cache.
:type cid: str
:return: The data retrieved from the cache.
:rtype: Dict
"""
return self._cache.get(cid)
[docs]
def set(self, cid: str, data: Dict) -> None:
"""Store data in the cache with its Content Identifier (CID).
:param cid: The Content Identifier (CID) of the data.
:type cid: str
:param data: The data to be stored in the cache.
:type data: Dict
"""
self._cache[cid] = data
ipfs_cache = IPFSCache()
[docs]
async def get_file_content(cid: str) -> str:
"""Retrieve the content of a file from IPFS by its Content Identifier (CID).
:param cid: The Content Identifier (CID) of the file in IPFS.
:type cid: str
:return: The content of the file.
:rtype: str
"""
client = aioipfs.AsyncIPFS(maddr=multi_address)
content = await client.cat(cid)
await client.close()
return content.decode()
[docs]
async def _add_json(data: Dict) -> str:
"""Add JSON data to IPFS and return its Content Identifier (CID).
:param data: The JSON data to be added to IPFS.
:type data: Dict
:return: The Content Identifier (CID) of the added JSON data.
:rtype: str
"""
client = aioipfs.AsyncIPFS(maddr=multi_address)
try:
response = await client.add_json(data=data)
except Exception as e:
raise IPFSError(f'Failed to add JSON data to IPFS: {e}')
finally:
await client.close()
return response.get('Hash', None)
[docs]
async def _get_json(cid: str) -> Dict:
"""Retrieve JSON data from IPFS by its Content Identifier (CID) and cache the result.
:param cid: The Content Identifier (CID) of the JSON data in IPFS.
:type cid: str
:return: The JSON data retrieved from IPFS.
:rtype: Dict
"""
cached_data = ipfs_cache.get(cid)
if cached_data:
return cached_data
try:
data = await get_file_content(cid=cid)
except Exception as e:
raise IPFSError(f'Failed to retrieve json data from IPFS hash {cid}: {e}')
try:
json_data = json.loads(data)
except Exception as e:
raise IPFSError(f'Failed to parse json data from IPFS hash {cid}: {e}')
ipfs_cache.set(cid, json_data)
return json_data
[docs]
def add_json(data: Dict) -> str:
"""Add JSON data to IPFS and return its Content Identifier (CID) using a synchronous wrapper.
:param data: The JSON data to be added to IPFS.
:type data: Dict
:return: The Content Identifier (CID) of the added JSON data.
:rtype: str
"""
event_loop = asyncio.new_event_loop()
cid = event_loop.run_until_complete(_add_json(data=data))
event_loop.close()
return cid
[docs]
def get_json(cid: str) -> Dict:
"""Retrieve JSON data from IPFS by its Content Identifier (CID) using a synchronous wrapper.
:param cid: The Content Identifier (CID) of the JSON data in IPFS.
:type cid: str
:return: The JSON data retrieved from IPFS.
:rtype: Dict
"""
cached_data = ipfs_cache.get(cid)
if cached_data:
return cached_data
event_loop = asyncio.new_event_loop()
json_data = event_loop.run_until_complete(_get_json(cid=cid))
event_loop.close()
return json_data