# Source code for ztcli_api

"""A Python async client wrapper for the zerotier-cli node API."""

import asyncio
import json
import logging

import aiohttp
import async_timeout

from ._version import __version__

# NOTE(review): the original entries of this list were lost when the file was
# extracted, leaving an unterminated literal. The list below is rebuilt from
# the public names defined or imported in this module. The ZeroTier class
# docstring says writable items for each endpoint are exported as well; if
# such constants exist elsewhere in the file, add them here -- TODO confirm.
__all__ = [
    '__version__',
    'ZeroTier',
    'ZeroTierConnectionError',
    'ZeroTierError',
    'ZeroTierNoDataAvailable',
]


class ZeroTierError(Exception):
    """General ZeroTierError exception occurred.

    Base class of the ZeroTier-specific exceptions in this module, so
    callers can catch this one type to handle all of them.
    """


class ZeroTierConnectionError(ZeroTierError):
    """Raise when a connection error is encountered.

    The request coroutines of ``ZeroTier`` raise this when a request times
    out or the HTTP client reports an error; the underlying exception is
    chained as ``__cause__``.
    """


class ZeroTierNoDataAvailable(ZeroTierError):
    """Raise when no data is available."""


class ZeroTier:
    """
    Async class to create a ZeroTier-cli connection object to get, set,
    and delete endpoint-specific data. Writable items for each endpoint
    are exported.

    Every request is sent to ``http://localhost:<port>/<endpoint>`` with the
    API token in the ``X-ZT1-Auth`` header. The decoded JSON body of the most
    recent successful request is stored on the instance (see ``data``).
    """

    # Seconds allowed for one request, including reading the response body.
    _TIMEOUT = 5

    def __init__(self, api_token, loop, session, port=9993):
        """Initialize the connection.

        :param api_token: token sent as the ``X-ZT1-Auth`` header.
        :param loop: event loop; stored on the instance but not otherwise
            used by the methods of this class.
        :param session: HTTP client session providing awaitable ``get``,
            ``post`` and ``delete`` methods (presumably an
            ``aiohttp.ClientSession`` -- confirm against callers).
        :param port: port of the local node API.
        """
        self._loop = loop
        self._session = session
        self.headers = {'X-ZT1-Auth': api_token}
        # NOTE(review): the attribute name was missing from the extracted
        # source ("= None"); ``data`` is the presumed original -- confirm.
        self.data = None
        self.url = f'localhost:{port}'

    async def _request(self, method, endpoint, failure_msg, **kwargs):
        """Run one HTTP call and store its decoded JSON body in ``data``.

        :param method: bound session coroutine (``get``/``post``/``delete``).
        :param endpoint: path appended to the node base URL.
        :param failure_msg: debug message logged when the call fails.
        :param kwargs: extra keyword arguments forwarded to ``method``.
        :raises ZeroTierConnectionError: on timeout or HTTP client error.
        """
        try:
            # ``async with`` is the supported form; the synchronous
            # ``with timeout()`` variant is deprecated in async_timeout.
            async with async_timeout.timeout(self._TIMEOUT):
                response = await method(
                    f'http://{self.url}/{endpoint}',
                    headers=self.headers,
                    **kwargs
                )
                logging.debug("Response status: %s", response.status)
                # Decoding happens inside the timeout so that a stalled
                # body read cannot block the caller indefinitely.
                self.data = await response.json()
        except (asyncio.TimeoutError, aiohttp.ClientError) as exc:
            logging.debug(failure_msg)
            raise ZeroTierConnectionError(
                'Cannot connect to ZeroTier API') from exc

    async def get_data(self, endpoint):
        """Send a GET request to JSON API ``endpoint``.

        The decoded response is stored in ``data``.

        :raises ZeroTierConnectionError: on timeout or HTTP client error.
        """
        await self._request(
            self._session.get,
            endpoint,
            "Cannot load data from ZeroTier node",
        )

    async def set_value(self, cfg_dict, endpoint):
        """Send a POST request to JSON API ``endpoint``.

        ``cfg_dict`` is serialized to compact JSON and sent as the request
        body. The decoded response is stored in ``data``.

        :raises ZeroTierConnectionError: on timeout or HTTP client error.
        """
        payload = json.dumps(cfg_dict, separators=(',', ':'))
        logging.debug("Using payload: %s", payload)
        await self._request(
            self._session.post,
            endpoint,
            "Cannot update entry of ZeroTier node",
            data=payload,
        )

    async def delete_thing(self, endpoint):
        """Send a DELETE request to JSON API ``endpoint``.

        The decoded response is stored in ``data``.

        :raises ZeroTierConnectionError: on timeout or HTTP client error.
        """
        await self._request(
            self._session.delete,
            endpoint,
            "Cannot delete entry from ZeroTier node",
        )