Source code for jitxlib.parts.commands

"""
Commands for interacting with the JITX parts database.

This module provides functions for communicating with the JITX parts database.
"""

import asyncio
import json
import logging
from collections.abc import Sequence, Mapping
from typing import Any, TypeAlias
from jitx._websocket import PersistentWebSocketClient, Message
from jitx._instantiation import InstantiationStructureException
from jitx.design import DesignContext
from jitx.units import PlainQuantity

JSON: TypeAlias = (
    Mapping[str, "JSON"] | Sequence["JSON"] | str | int | float | bool | None
)
# PartJSON must be 'dict', not 'Mapping' in order to match the input type of `from_dict` in dataclasses_json/api.py
# so that type checking passes for the `from_dict` calls.
PartJSON = dict[str, JSON]
QueryParamValue: TypeAlias = float | int | str | Sequence[float | int | str]

# Configure logging
logging = logging.getLogger(__name__)

client = None


[docs] async def get_client(): # FIXME: keepalive feature not working: `ERROR:jitxlib.parts.commands:download_model_3d failed: Socket not connected` # global client # if client is None: # client = await PersistentWebSocketClient.create() # # client.__keepalive = True # client.keepalive() # return client return await PersistentWebSocketClient.create()
async def _send_message(type: str, body: Mapping[str, Any]) -> Any: conversation = None success = False client = await get_client() try: # Create route handler and send message route = client.root() message = Message(type=type, body=body, ns="des") conversation = await route.request(message) # Iterate over responses result_message = None async for envelope in conversation: message = envelope.message if envelope.is_terminal(): # Terminal error envelopes are not delivered, they raise an error match envelope.type: case "ok": if "message" in message.body: result_message = message.body["message"] success = True case _: raise RuntimeError( f"Unhandled terminal message: {message.type}" ) else: match message.type: case "stdout": print(message.body["message"]) case _: raise RuntimeError( f"Unhandled intermediate response message: {message.type}" ) finally: if conversation is not None: await conversation.close() if not success: raise RuntimeError("Did not receive a status report") assert result_message is not None, "Did not receive a result message" return result_message async def _dbquery_async( args: Mapping[str, QueryParamValue], limit: int, skip_cache: bool = False, ) -> Sequence[PartJSON]: """Internal async function to send a dbquery request to the JITX server. Args: args: The query parameters limit: Maximum number of results to return skip_cache: Whether to skip cached results Returns: A list of results """ # Categorize args by type according to the dbquery-schema int_args = [] double_args = [] string_args = [] tuple_string_args = [] tuple_double_args = [] for key, value in args.items(): if isinstance(value, str): string_args.append([key, value]) elif isinstance(value, int): int_args.append([key, value]) elif isinstance(value, float): double_args.append([key, value]) elif isinstance(value, PlainQuantity): scalar = value.to_base_units().magnitude double_args.append([key, scalar]) elif isinstance(value, Sequence): if all(isinstance(x, str) for x in value): tuple_string_args.append([key, list(value)]) elif all(isinstance(x, int | float) for x in value): tuple_double_args.append( [key, [float(x) for x in value]] ) # Convert all numeric values to float else: raise ValueError(f"Mixed type tuples not supported for key: {key}") else: raise ValueError(f"Unsupported value type for key {key}: {type(value)}") body = { "design-name": get_design_name(), "int-args": int_args, "string-args": string_args, "double-args": double_args, "tuple-string-args": tuple_string_args, "tuple-double-args": tuple_double_args, "limit": limit, "skipCache": skip_cache, } return await _send_message(type="dbquery", body=body) # Used in tests, disables per-design part locking. ALLOW_NO_DESIGN_CONTEXT = False
[docs] def get_design_name() -> str | None: try: ctx = DesignContext.get() if ctx is None: if ALLOW_NO_DESIGN_CONTEXT: return None raise RuntimeError( "The Parts Database cannot be queried outside of the context of a Design." ) return ctx.design.__module__ + "." + ctx.design.__class__.__qualname__ except InstantiationStructureException as e: raise RuntimeError( "The parts database can only be queried during the instantiation of a JITX Design, typically in the __init__ method of a Circuit subclass.\n" "You may be trying to use it to define a class attribute?" ) from e
[docs] def dbquery( args: Mapping[str, QueryParamValue], limit: int = 1000, skip_cache: bool = False ) -> Sequence[PartJSON]: """Query the JITX parts database. The function automatically categorizes arguments by their type to be sent to the server: - int-args: tuple[tuple[str, int], ...] - double-args: tuple[tuple[str, float], ...] - string-args: tuple[tuple[str, str], ...] - tuple-double-args: tuple[tuple[str, tuple[float, ...]], ...] - tuple-string-args: tuple[tuple[str, tuple[str, ...]], ...] The function also passes the following parameters along to the server: - limit: int - skip_cache: bool Args: args: The query parameters limit: Maximum number of results to return skip_cache: Whether to skip cached results Returns: A list of matching components """ try: return asyncio.run(_dbquery_async(args, limit, skip_cache)) except Exception as e: logging.error(f"dbquery failed: {e}") raise
[docs] def download_model3d(filepath: str) -> None: """Download a model 3D file from the JITX parts database. Args: filepath: The path to the model 3D file """ try: asyncio.run(_download_model3d(filepath)) except Exception as e: logging.error(f"download_model_3d failed: {e}") raise
async def _download_model3d(filepath: str): """Download a model 3D file from the JITX parts database. Args: filepath: The path to the model 3D file """ return await _send_message(type="download-model3d", body={"filepath": filepath}) if __name__ == "__main__": try: # Simple test query with minimal data result = dbquery({"category": "resistor", "resistance": 10000.0}, limit=1) print("Result content:") # del result[0]["sellers"] print(json.dumps(result, indent=2)) except Exception as e: print(f"Error running query: {e}")