"""
Commands for interacting with the JITX parts database.
This module provides functions for communicating with the JITX parts database.
"""
import asyncio
import json
import logging
from collections.abc import Sequence, Mapping
from typing import Any, TypeAlias
from jitx._websocket import PersistentWebSocketClient, Message
from jitx._instantiation import InstantiationStructureException
from jitx.design import DesignContext
from jitx.units import PlainQuantity
# Recursive alias for a JSON-compatible value: an object (string-keyed mapping),
# an array (sequence), or one of the scalar types.
JSON: TypeAlias = (
Mapping[str, "JSON"] | Sequence["JSON"] | str | int | float | bool | None
)
# PartJSON must be 'dict', not 'Mapping' in order to match the input type of `from_dict` in dataclasses_json/api.py
# so that type checking passes for the `from_dict` calls.
PartJSON = dict[str, JSON]
# A single query parameter value: a number, a string, or a sequence of those.
QueryParamValue: TypeAlias = float | int | str | Sequence[float | int | str]
# Configure logging
# NOTE(review): this rebinds the name `logging` from the stdlib module to this
# module's Logger instance; later `logging.error(...)` calls in this file are
# therefore Logger method calls. Renaming it to `logger` would be clearer but
# requires updating every use in the file at the same time.
logging = logging.getLogger(__name__)
# Module-level client slot. In the visible code it is only referenced by the
# commented-out keepalive path in get_client, so it always stays None.
client = None
[docs]
async def get_client():
    """Create and return a new ``PersistentWebSocketClient``.

    Every call awaits ``PersistentWebSocketClient.create()`` and hands back the
    resulting client; nothing is cached or reused between calls.

    Returns:
        The newly created client.
    """
    # FIXME: keepalive feature not working: `ERROR:jitxlib.parts.commands:download_model_3d failed: Socket not connected`
    # The intended shared-client implementation is kept below for reference:
    # global client
    # if client is None:
    #     client = await PersistentWebSocketClient.create()
    #     # client.__keepalive = True
    #     client.keepalive()
    # return client
    fresh_client = await PersistentWebSocketClient.create()
    return fresh_client
async def _send_message(type: str, body: Mapping[str, Any]) -> Any:
conversation = None
success = False
client = await get_client()
try:
# Create route handler and send message
route = client.root()
message = Message(type=type, body=body, ns="des")
conversation = await route.request(message)
# Iterate over responses
result_message = None
async for envelope in conversation:
message = envelope.message
if envelope.is_terminal():
# Terminal error envelopes are not delivered, they raise an error
match envelope.type:
case "ok":
if "message" in message.body:
result_message = message.body["message"]
success = True
case _:
raise RuntimeError(
f"Unhandled terminal message: {message.type}"
)
else:
match message.type:
case "stdout":
print(message.body["message"])
case _:
raise RuntimeError(
f"Unhandled intermediate response message: {message.type}"
)
finally:
if conversation is not None:
await conversation.close()
if not success:
raise RuntimeError("Did not receive a status report")
assert result_message is not None, "Did not receive a result message"
return result_message
async def _dbquery_async(
    args: Mapping[str, QueryParamValue],
    limit: int,
    skip_cache: bool = False,
) -> Sequence[PartJSON]:
    """Build a ``dbquery`` request from ``args`` and send it to the JITX server.

    Each query parameter is sorted into one of five typed buckets, checked in
    this order: ``str``, ``int``, ``float``, ``PlainQuantity`` (sent as the
    magnitude of its base-unit form), then ``Sequence`` (all strings, or all
    numbers which are converted to ``float``).

    Args:
        args: The query parameters
        limit: Maximum number of results to return
        skip_cache: Whether to skip cached results

    Returns:
        A list of results

    Raises:
        ValueError: If a value has an unsupported type, or a sequence mixes
            strings with numbers.
    """
    # Buckets named after the dbquery-schema fields they are sent in.
    ints = []
    doubles = []
    strings = []
    string_tuples = []
    double_tuples = []

    for name, param in args.items():
        # `str` is tested first because a string is itself a Sequence.
        if isinstance(param, str):
            strings.append([name, param])
            continue
        if isinstance(param, int):
            ints.append([name, param])
            continue
        if isinstance(param, float):
            doubles.append([name, param])
            continue
        if isinstance(param, PlainQuantity):
            base_magnitude = param.to_base_units().magnitude
            doubles.append([name, base_magnitude])
            continue
        if not isinstance(param, Sequence):
            raise ValueError(f"Unsupported value type for key {name}: {type(param)}")

        if all(isinstance(item, str) for item in param):
            string_tuples.append([name, list(param)])
        elif all(isinstance(item, (int, float)) for item in param):
            # Convert all numeric values to float
            double_tuples.append([name, [float(item) for item in param]])
        else:
            raise ValueError(f"Mixed type tuples not supported for key: {name}")

    # The design name is resolved only after all arguments were accepted.
    request_body = {
        "design-name": get_design_name(),
        "int-args": ints,
        "string-args": strings,
        "double-args": doubles,
        "tuple-string-args": string_tuples,
        "tuple-double-args": double_tuples,
        "limit": limit,
        "skipCache": skip_cache,
    }
    response = await _send_message(type="dbquery", body=request_body)
    return response
# Used in tests, disables per-design part locking.
# When True, get_design_name() returns None instead of raising RuntimeError
# if DesignContext.get() yields no active context.
ALLOW_NO_DESIGN_CONTEXT = False
[docs]
def get_design_name() -> str | None:
    """Return the qualified name of the design in the active ``DesignContext``.

    The name is the design object's ``__module__`` joined with the
    ``__qualname__`` of its class, separated by a dot.

    Returns:
        The qualified design name, or ``None`` when there is no active context
        and ``ALLOW_NO_DESIGN_CONTEXT`` is set.

    Raises:
        RuntimeError: If there is no active context and
            ``ALLOW_NO_DESIGN_CONTEXT`` is not set, or if an
            ``InstantiationStructureException`` is raised while resolving the
            context (chained as the cause).
    """
    try:
        ctx = DesignContext.get()
        if ctx is not None:
            design = ctx.design
            module_name = design.__module__
            class_name = design.__class__.__qualname__
            return module_name + "." + class_name
        if not ALLOW_NO_DESIGN_CONTEXT:
            raise RuntimeError(
                "The Parts Database cannot be queried outside of the context of a Design."
            )
        return None
    except InstantiationStructureException as e:
        raise RuntimeError(
            "The parts database can only be queried during the instantiation of a JITX Design, typically in the __init__ method of a Circuit subclass.\n"
            "You may be trying to use it to define a class attribute?"
        ) from e
[docs]
def dbquery(
    args: Mapping[str, QueryParamValue], limit: int = 1000, skip_cache: bool = False
) -> Sequence[PartJSON]:
    """Query the JITX parts database.

    This is the blocking entry point: it runs ``_dbquery_async`` to completion
    with ``asyncio.run``.

    Arguments are grouped by their Python type before being sent to the server:

    - int-args: tuple[tuple[str, int], ...]
    - double-args: tuple[tuple[str, float], ...]
    - string-args: tuple[tuple[str, str], ...]
    - tuple-double-args: tuple[tuple[str, tuple[float, ...]], ...]
    - tuple-string-args: tuple[tuple[str, tuple[str, ...]], ...]

    The following parameters are also passed along to the server:

    - limit: int
    - skip_cache: bool

    Args:
        args: The query parameters
        limit: Maximum number of results to return
        skip_cache: Whether to skip cached results

    Returns:
        A list of matching components

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        pending_query = _dbquery_async(args, limit, skip_cache)
        matches = asyncio.run(pending_query)
    except Exception as err:
        logging.error(f"dbquery failed: {err}")
        raise
    else:
        return matches
[docs]
def download_model3d(filepath: str) -> None:
    """Download a model 3D file from the JITX parts database.

    Blocking wrapper that runs ``_download_model3d`` to completion with
    ``asyncio.run`` and discards its result.

    Args:
        filepath: The path to the model 3D file

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        pending_download = _download_model3d(filepath)
        asyncio.run(pending_download)
    except Exception as err:
        # NOTE: the logged name differs from the function name; it is kept
        # as-is because it is a runtime string.
        logging.error(f"download_model_3d failed: {err}")
        raise
async def _download_model3d(filepath: str):
    """Send a ``download-model3d`` request for ``filepath``.

    Args:
        filepath: The path to the model 3D file

    Returns:
        Whatever ``_send_message`` returns for the request.
    """
    request_body = {"filepath": filepath}
    reply = await _send_message(type="download-model3d", body=request_body)
    return reply
if __name__ == "__main__":
    # Manual smoke test: run one small query and pretty-print the response.
    try:
        # Simple test query with minimal data
        smoke_query = {"category": "resistor", "resistance": 10000.0}
        parts = dbquery(smoke_query, limit=1)
        print("Result content:")
        # del parts[0]["sellers"]
        rendered = json.dumps(parts, indent=2)
        print(rendered)
    except Exception as err:
        print(f"Error running query: {err}")