Skip to content

API Reference

luxspec.telemetry.LuxFrame dataclass

A single calibrated or raw Luxnode scan frame.

The class accepts the compact V0 firmware payload {"device": "...", "kind": "raw", "pixels": [...]} and the richer protocol payload used by the Python SDK.

Source code in src/luxspec/telemetry.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
@dataclass(frozen=True)
class LuxFrame:
    """One scan frame reported by a Luxnode, either raw or calibrated.

    Instances can be built from the compact V0 firmware payload
    ``{"device": "...", "kind": "raw", "pixels": [...]}`` as well as from the
    richer protocol payload used by the Python SDK.
    """

    counts: np.ndarray
    wavelengths_nm: np.ndarray
    measurement_kind: str = "raw"
    schema_version: str = PROTOCOL_VERSION
    device_id: str = "luxnode"
    firmware_version: str | None = None
    module_id: str | None = None
    frame_id: str | None = None
    timestamp_unix_ms: int | None = None
    dark_counts: np.ndarray | None = None
    reference_counts: np.ndarray | None = None
    telemetry: LuxTelemetry = field(default_factory=LuxTelemetry)
    metadata: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Normalize the array fields and check that their lengths agree.

        Raises:
            ValueError: If ``wavelengths_nm``, ``dark_counts`` or
                ``reference_counts`` differ in size from ``counts``.
        """
        # The dataclass is frozen, so normalized arrays are written back
        # through object.__setattr__.
        pixel_counts = _array(self.counts, "counts")
        axis = _array(self.wavelengths_nm, "wavelengths_nm")
        if axis.size != pixel_counts.size:
            raise ValueError("wavelengths_nm and counts must have the same length.")
        object.__setattr__(self, "counts", pixel_counts)
        object.__setattr__(self, "wavelengths_nm", axis)

        for name in ("dark_counts", "reference_counts"):
            supplied = getattr(self, name)
            if supplied is None:
                continue
            companion = _array(supplied, name)
            if companion.size != pixel_counts.size:
                raise ValueError(f"{name} must match counts length.")
            object.__setattr__(self, name, companion)

    @classmethod
    def from_mapping(cls, payload: Mapping[str, Any]) -> "LuxFrame":
        """Build a frame from a decoded payload mapping.

        Long protocol keys win over their compact aliases (``counts`` over
        ``pixels``, ``wavelengths_nm`` over ``wavelengths``,
        ``measurement_kind`` over ``kind``, ``device_id`` over ``device``,
        ``module_id`` over ``module``). Without wavelengths a linear 340-850
        calibration sized to the counts is used. A top-level
        ``integration_time_ms`` is folded into the telemetry when the
        telemetry itself does not carry one.

        Raises:
            ValueError: If the payload has neither ``counts`` nor ``pixels``.
        """

        def either(primary: str, fallback: str, default: Any = None) -> Any:
            # Preferred key first, then the alias, then the default.
            return payload.get(primary, payload.get(fallback, default))

        def optional_array(key: str):
            # Absent and explicit-null entries both map to None.
            if payload.get(key) is None:
                return None
            return np.asarray(payload[key], dtype=float)

        raw_counts = either("counts", "pixels")
        if raw_counts is None:
            raise ValueError("Lux frame payload needs counts or pixels.")
        pixel_counts = _array(raw_counts, "counts")

        axis = either("wavelengths_nm", "wavelengths")
        if axis is None:
            calibration = WavelengthCalibration.linear(340, 850, pixels=pixel_counts.size)
            axis = calibration.wavelengths(pixel_counts.size)

        telemetry_section = payload.get("telemetry")
        if not isinstance(telemetry_section, Mapping):
            telemetry_section = None
        telemetry = LuxTelemetry.from_mapping(telemetry_section)

        top_level_integration = payload.get("integration_time_ms")
        if top_level_integration is not None and telemetry.integration_time_ms is None:
            merged = telemetry.to_dict()
            merged["integration_time_ms"] = top_level_integration
            telemetry = LuxTelemetry.from_mapping(merged)

        raw_timestamp = payload.get("timestamp_unix_ms")
        raw_metadata = payload.get("metadata")
        return cls(
            counts=pixel_counts,
            wavelengths_nm=np.asarray(axis, dtype=float),
            measurement_kind=str(either("measurement_kind", "kind", "raw")),
            schema_version=str(payload.get("schema_version", PROTOCOL_VERSION)),
            device_id=str(either("device_id", "device", "luxnode")),
            firmware_version=payload.get("firmware_version"),
            module_id=either("module_id", "module"),
            frame_id=payload.get("frame_id"),
            timestamp_unix_ms=None if raw_timestamp is None else int(raw_timestamp),
            dark_counts=optional_array("dark_counts"),
            reference_counts=optional_array("reference_counts"),
            telemetry=telemetry,
            metadata=(
                {str(key): value for key, value in raw_metadata.items()}
                if isinstance(raw_metadata, Mapping)
                else {}
            ),
        )

    @classmethod
    def from_json(cls, text: str) -> "LuxFrame":
        """Decode ``text`` as JSON and delegate to :meth:`from_mapping`."""
        decoded = json.loads(text)
        return cls.from_mapping(decoded)

    def to_spectrum(self, kind: str | None = None) -> Spectrum:
        """Wrap the counts in a ``Spectrum``.

        ``kind`` overrides the frame's ``measurement_kind`` when truthy. Frame
        metadata is layered over the device identification keys, so it wins
        on a key collision.
        """
        spectrum_metadata = {
            "device_id": self.device_id,
            "firmware_version": self.firmware_version or "",
            "module_id": self.module_id or "",
        }
        spectrum_metadata.update(dict(self.metadata))
        chosen_kind = kind or self.measurement_kind
        return Spectrum(
            self.wavelengths_nm,
            self.counts,
            kind=chosen_kind,
            metadata=spectrum_metadata,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-Python dict, omitting ``None``/empty entries."""

        def listed(values):
            # Arrays become lists; a missing array stays None so it is dropped.
            return None if values is None else values.tolist()

        candidates = (
            ("schema_version", self.schema_version),
            ("device_id", self.device_id),
            ("firmware_version", self.firmware_version),
            ("module_id", self.module_id),
            ("frame_id", self.frame_id),
            ("timestamp_unix_ms", self.timestamp_unix_ms),
            ("measurement_kind", self.measurement_kind),
            ("wavelengths_nm", self.wavelengths_nm.tolist()),
            ("counts", self.counts.tolist()),
            ("dark_counts", listed(self.dark_counts)),
            ("reference_counts", listed(self.reference_counts)),
            ("telemetry", self.telemetry.to_dict()),
            ("metadata", dict(self.metadata)),
        )
        return {key: value for key, value in candidates if value not in (None, {}, [])}

    def to_json(self, *, indent: int | None = None) -> str:
        """Serialize :meth:`to_dict` as a JSON string."""
        serializable = self.to_dict()
        return json.dumps(serializable, indent=indent)

luxspec.telemetry.LuxTelemetry dataclass

Device-side telemetry that travels with a Luxnode scan frame.

Source code in src/luxspec/telemetry.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@dataclass(frozen=True)
class LuxTelemetry:
    """Device-side telemetry that travels with a Luxnode scan frame."""

    board_temperature_c: float | None = None
    detector_temperature_c: float | None = None
    battery_mv: float | None = None
    usb_mv: float | None = None
    rssi_dbm: float | None = None
    integration_time_ms: float | None = None
    gain: float | None = None
    led_current_ma: float | None = None
    exposure_count: int | None = None
    warnings: tuple[str, ...] = ()
    extra: Mapping[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, payload: Mapping[str, Any] | None) -> "LuxTelemetry":
        if not payload:
            return cls()
        warnings = payload.get("warnings", ())
        if isinstance(warnings, str):
            warnings = (warnings,)
        return cls(
            board_temperature_c=_float_or_none(payload.get("board_temperature_c")),
            detector_temperature_c=_float_or_none(payload.get("detector_temperature_c")),
            battery_mv=_float_or_none(payload.get("battery_mv")),
            usb_mv=_float_or_none(payload.get("usb_mv")),
            rssi_dbm=_float_or_none(payload.get("rssi_dbm")),
            integration_time_ms=_float_or_none(payload.get("integration_time_ms")),
            gain=_float_or_none(payload.get("gain")),
            led_current_ma=_float_or_none(payload.get("led_current_ma")),
            exposure_count=int(payload["exposure_count"]) if payload.get("exposure_count") is not None else None,
            warnings=tuple(str(item) for item in warnings),
            extra={str(k): v for k, v in payload.items() if k not in _TELEMETRY_FIELDS},
        )

    def to_dict(self) -> dict[str, Any]:
        data: dict[str, Any] = {
            "board_temperature_c": self.board_temperature_c,
            "detector_temperature_c": self.detector_temperature_c,
            "battery_mv": self.battery_mv,
            "usb_mv": self.usb_mv,
            "rssi_dbm": self.rssi_dbm,
            "integration_time_ms": self.integration_time_ms,
            "gain": self.gain,
            "led_current_ma": self.led_current_ma,
            "exposure_count": self.exposure_count,
            "warnings": list(self.warnings),
        }
        data.update(dict(self.extra))
        return {key: value for key, value in data.items() if value not in (None, [], {})}

luxspec.client.SerialLuxNode

Bases: BaseLuxNodeClient

Read JSON-line frames from Luxnode firmware over a serial port.

Source code in src/luxspec/client.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class SerialLuxNode(BaseLuxNodeClient):
    """Read JSON-line frames from Luxnode firmware over a serial port."""

    # Capture kinds accepted by ``capture`` mapped to the command line sent.
    COMMANDS = {
        "raw": "SCAN",
        "scan": "SCAN",
        "dark": "DARK",
        "white": "WHITE",
        "reference": "WHITE",
    }

    def __init__(
        self,
        port: str,
        *,
        baud: int = 115200,
        timeout: float = 5.0,
        serial_factory: Any | None = None,
    ) -> None:
        """Store connection settings; the port is opened lazily.

        Args:
            port: Serial port name handed to the factory.
            baud: Baud rate handed to the factory.
            timeout: Read timeout handed to the factory as ``timeout=``.
            serial_factory: Optional callable used instead of
                ``serial.Serial`` (called as ``factory(port, baud, timeout=...)``).
        """
        self.port = port
        self.baud = baud
        self.timeout = timeout
        self._serial_factory = serial_factory
        self._serial: Any | None = None

    def __enter__(self) -> "SerialLuxNode":
        self.open()
        return self

    def __exit__(self, *_exc: object) -> None:
        self.close()

    def open(self) -> None:
        """Open the port if it is not already open.

        Raises:
            LuxNodeError: If no factory was supplied and ``serial`` cannot be
                imported.
        """
        if self._serial is not None:
            return
        factory = self._serial_factory
        if factory is None:
            try:
                import serial  # type: ignore
            except ImportError as exc:
                raise LuxNodeError("Install serial support with: pip install 'luxspec[serial]'") from exc
            factory = serial.Serial
        self._serial = factory(self.port, self.baud, timeout=self.timeout)
        # Drop stale buffered input when the port object supports it.
        if hasattr(self._serial, "reset_input_buffer"):
            self._serial.reset_input_buffer()

    def close(self) -> None:
        """Close the port if open; safe to call repeatedly."""
        if self._serial is not None and hasattr(self._serial, "close"):
            self._serial.close()
        self._serial = None

    def write_command(self, command: str) -> None:
        """Send ``command`` stripped, upper-cased and newline-terminated as ASCII."""
        self.open()
        assert self._serial is not None
        self._serial.write((command.strip().upper() + "\n").encode("ascii"))

    def read_frame(self) -> LuxFrame:
        """Block until the next frame line arrives and return it.

        Lines that are blank, do not start with ``{``, are not valid JSON, or
        lack both ``counts`` and ``pixels`` are skipped.

        Raises:
            LuxNodeError: If a read returns no data at all, which is treated
                as the read timeout expiring.
        """
        self.open()
        assert self._serial is not None
        while True:
            raw = self._serial.readline()
            if not raw:
                # Only a completely empty read means the timeout expired; a
                # blank line still carries its line terminator.
                raise LuxNodeError("Timed out waiting for a Luxnode JSON frame.")
            if isinstance(raw, bytes):
                raw = raw.decode("utf-8", errors="replace")
            line = str(raw).strip()
            if not line.startswith("{"):
                # Blank lines and non-JSON chatter are not frames.
                continue
            try:
                payload = json.loads(line)
            except json.JSONDecodeError:
                # A truncated or corrupted line is skipped like any other
                # non-frame line instead of aborting the read loop.
                continue
            if "counts" in payload or "pixels" in payload:
                return LuxFrame.from_mapping(payload)

    def capture(self, kind: str = "raw") -> LuxFrame:
        """Send the command for ``kind`` and return the next frame read.

        Raises:
            ValueError: If ``kind`` (case-insensitive) is not in ``COMMANDS``.
        """
        try:
            command = self.COMMANDS[kind.lower()]
        except KeyError as exc:
            raise ValueError(f"unknown capture kind: {kind}") from exc
        self.write_command(command)
        return self.read_frame()

    def iter_frames(self) -> Iterator[LuxFrame]:
        """Yield frames indefinitely as they are read from the port."""
        while True:
            yield self.read_frame()

luxspec.client.HttpLuxNode

Bases: BaseLuxNodeClient

HTTP client for Wi-Fi Luxnode firmware or a Lux Cloud edge bridge.

Source code in src/luxspec/client.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class HttpLuxNode(BaseLuxNodeClient):
    """Talk to Wi-Fi Luxnode firmware or a Lux Cloud edge bridge over HTTP."""

    def __init__(self, base_url: str, *, timeout: float = 15.0) -> None:
        """Remember the endpoint root (trailing slashes removed) and timeout."""
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def _json(self, path: str, *, method: str = "GET", body: dict[str, Any] | None = None) -> dict[str, Any]:
        """Issue one request and decode the UTF-8 JSON response body.

        A JSON request body and ``content-type`` header are only sent when
        ``body`` is given.

        Raises:
            LuxNodeError: On an HTTP error status or when the endpoint cannot
                be reached.
        """
        headers = {"accept": "application/json"}
        encoded = None
        if body is not None:
            encoded = json.dumps(body).encode("utf-8")
            headers["content-type"] = "application/json"
        outgoing = request.Request(f"{self.base_url}{path}", data=encoded, headers=headers, method=method)
        try:
            with request.urlopen(outgoing, timeout=self.timeout) as response:
                raw = response.read()
        except HTTPError as exc:
            # HTTPError is handled before URLError so status failures keep
            # their code and reason in the message.
            raise LuxNodeError(f"Luxnode HTTP request failed with {exc.code}: {exc.reason}") from exc
        except URLError as exc:
            raise LuxNodeError(f"Could not reach Luxnode HTTP endpoint: {exc.reason}") from exc
        return json.loads(raw.decode("utf-8"))

    def capture(self, kind: str = "raw") -> LuxFrame:
        """POST a capture request for ``kind`` and parse the returned frame."""
        reply = self._json("/api/v1/capture", method="POST", body={"kind": kind})
        return LuxFrame.from_mapping(reply)

    def telemetry(self) -> dict[str, Any]:
        """GET the telemetry document as decoded JSON."""
        return self._json("/api/v1/telemetry")

    def iter_frames(self) -> Iterator[LuxFrame]:
        """Yield frames forever, one default ``capture()`` per iteration."""
        while True:
            yield self.capture()

luxspec.processing.analyze_frame(frame, *, dark=None, reference=None, mode='reflectance', module=None)

Process a raw frame into reflectance or absorbance and optional metrics.

Source code in src/luxspec/processing.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def analyze_frame(
    frame: LuxFrame,
    *,
    dark: LuxFrame | None = None,
    reference: LuxFrame | None = None,
    mode: str = "reflectance",
    module: str | None = None,
) -> FrameAnalysis:
    """Turn a raw frame into reflectance or absorbance, plus optional metrics.

    Dark and reference counts embedded in ``frame`` take precedence; the
    separate ``dark`` / ``reference`` frames are only consulted when the frame
    carries none. Module metrics are computed only for a truthy ``module``.

    Raises:
        ValueError: If dark or reference counts are unavailable, or ``mode``
            is neither ``"reflectance"`` nor ``"absorbance"``.
    """

    dark_counts = frame.dark_counts
    if dark_counts is None and dark is not None:
        dark_counts = dark.counts

    reference_counts = frame.reference_counts
    if reference_counts is None and reference is not None:
        reference_counts = reference.counts

    if dark_counts is None:
        raise ValueError("analyze_frame requires dark counts or a dark LuxFrame.")
    if reference_counts is None:
        raise ValueError("analyze_frame requires reference counts or a reference LuxFrame.")

    # Both processors take the same argument list, so pick one and call once.
    if mode == "reflectance":
        processor = process_reflectance
    elif mode == "absorbance":
        processor = process_absorbance
    else:
        raise ValueError("mode must be 'reflectance' or 'absorbance'.")
    processed = processor(frame.wavelengths_nm, frame.counts, reference_counts, dark_counts)

    metrics = None
    if module:
        metrics = module_metrics(processed.calibrated, module)
    return FrameAnalysis(processed=processed, metrics=metrics)

luxspec.processing.module_metrics(spectrum, module)

Dispatch a calibrated spectrum to a Luxnode module metrics function.

Source code in src/luxspec/processing.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def module_metrics(spectrum, module: str | None):
    """Dispatch a calibrated spectrum to a Luxnode module metrics function."""

    if module is None:
        return None
    key = module.lower().replace("-", "_")
    if key in {"plant", "plants", "leaf"}:
        return plant_metrics(spectrum)
    if key in {"mineral", "minerals", "geology"}:
        return identify_mineral(spectrum)
    if key in {"beer", "brewing"}:
        return beer_metrics(spectrum)
    if key in {"kombucha", "fermentation"}:
        return kombucha_metrics(spectrum)
    raise ValueError(f"unknown Lux module: {module}")

luxspec.versioning.VersionInfo dataclass

Compatibility envelope reported by Luxnode firmware and Lux Cloud.

Source code in src/luxspec/versioning.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@dataclass(frozen=True)
class VersionInfo:
    """Version compatibility envelope reported by Luxnode firmware and Lux Cloud."""

    firmware: SemVer
    protocol: SemVer
    hardware: str
    sdk_min: SemVer = SemVer(0, 2, 0)

    @classmethod
    def from_mapping(cls, payload: Mapping[str, Any]) -> "VersionInfo":
        """Parse a version payload.

        ``firmware`` is required; ``protocol``, ``hardware`` and ``sdk_min``
        fall back to ``"0.1.0"``, ``"luxnode-v0"`` and ``"0.2.0"``.
        """
        firmware = SemVer.parse(str(payload["firmware"]))
        protocol = SemVer.parse(str(payload.get("protocol", "0.1.0")))
        hardware = str(payload.get("hardware", "luxnode-v0"))
        sdk_min = SemVer.parse(str(payload.get("sdk_min", "0.2.0")))
        return cls(firmware=firmware, protocol=protocol, hardware=hardware, sdk_min=sdk_min)

    def supports_sdk(self, sdk_version: str) -> bool:
        """Return whether ``sdk_version`` shares ``sdk_min``'s major and is not older."""
        candidate = SemVer.parse(sdk_version)
        if candidate.major != self.sdk_min.major:
            return False
        return candidate >= self.sdk_min

    def supports_protocol(self, protocol_version: str) -> bool:
        """Return whether ``protocol_version`` shares this protocol's major and is not newer."""
        candidate = SemVer.parse(protocol_version)
        if candidate.major != self.protocol.major:
            return False
        return candidate <= self.protocol

luxspec.versioning.UpgradeManifest dataclass

Signed-release metadata for a future Luxnode firmware updater.

Source code in src/luxspec/versioning.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@dataclass(frozen=True)
class UpgradeManifest:
    """Signed-release metadata for a future Luxnode firmware updater."""

    firmware: SemVer
    protocol: SemVer
    hardware: str
    artifact_url: str
    artifact_sha256: str
    release_notes_url: str | None = None
    min_battery_mv: int = 3700

    @classmethod
    def from_mapping(cls, payload: Mapping[str, Any]) -> "UpgradeManifest":
        """Parse a manifest payload.

        ``firmware``, ``artifact_url`` and ``artifact_sha256`` are required
        (a missing key raises ``KeyError``); the remaining fields fall back to
        their defaults. The SHA-256 string is stored lower-cased.
        """
        return cls(
            firmware=SemVer.parse(str(payload["firmware"])),
            protocol=SemVer.parse(str(payload.get("protocol", "0.1.0"))),
            hardware=str(payload.get("hardware", "luxnode-v0")),
            artifact_url=str(payload["artifact_url"]),
            artifact_sha256=str(payload["artifact_sha256"]).lower(),
            release_notes_url=payload.get("release_notes_url"),
            min_battery_mv=int(payload.get("min_battery_mv", 3700)),
        )

    def verify_file(self, path: str | Path) -> bool:
        """Return whether the file at ``path`` hashes to ``artifact_sha256``.

        The comparison ignores the letter case of the stored digest, so a
        manifest constructed directly with an upper-case hex digest verifies
        the same way as one built through :meth:`from_mapping`.
        """
        digest = hashlib.sha256(Path(path).read_bytes()).hexdigest()
        # hexdigest() is always lower-case; only from_mapping normalizes the
        # stored value, so normalize here as well.
        return digest == self.artifact_sha256.lower()

lux.Spectrum dataclass

A one-dimensional spectrum with wavelengths in nanometers.

Source code in src/lux/spectrum.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@dataclass(frozen=True)
class Spectrum:
    """A one-dimensional spectrum with wavelengths in nanometers."""

    wavelengths_nm: np.ndarray
    values: np.ndarray
    kind: str = "intensity"
    metadata: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        wavelengths = np.asarray(self.wavelengths_nm, dtype=float)
        values = np.asarray(self.values, dtype=float)
        if wavelengths.ndim != 1 or values.ndim != 1:
            raise ValueError("wavelengths and values must be one-dimensional.")
        if wavelengths.size != values.size:
            raise ValueError("wavelength and value arrays must have the same length.")
        if wavelengths.size < 3:
            raise ValueError("a spectrum needs at least three channels.")
        object.__setattr__(self, "wavelengths_nm", wavelengths)
        object.__setattr__(self, "values", values)

    def interpolate_to(self, target_wavelengths_nm: np.ndarray) -> "Spectrum":
        target = np.asarray(target_wavelengths_nm, dtype=float)
        values = np.interp(target, self.wavelengths_nm, self.values)
        return Spectrum(target, values, kind=self.kind, metadata=dict(self.metadata))

    def window(self, min_nm: float, max_nm: float) -> "Spectrum":
        mask = (self.wavelengths_nm >= min_nm) & (self.wavelengths_nm <= max_nm)
        if mask.sum() < 3:
            raise ValueError("window leaves fewer than three channels.")
        return Spectrum(
            self.wavelengths_nm[mask],
            self.values[mask],
            kind=self.kind,
            metadata=dict(self.metadata),
        )

    def normalized(self, method: str = "l2") -> "Spectrum":
        y = self.values.astype(float).copy()
        if method == "l2":
            scale = np.linalg.norm(y)
            y = y / scale if scale > 0 else y
        elif method == "area":
            area = float(np.trapezoid(np.abs(y), self.wavelengths_nm))
            y = y / area if area > 0 else y
        elif method == "minmax":
            span = float(np.nanmax(y) - np.nanmin(y))
            y = (y - np.nanmin(y)) / span if span > 0 else y * 0
        elif method == "zscore":
            std = float(np.nanstd(y))
            y = (y - np.nanmean(y)) / std if std > 0 else y * 0
        else:
            raise ValueError(f"unknown normalization method: {method}")
        return Spectrum(self.wavelengths_nm, y, kind=self.kind, metadata=dict(self.metadata))

    def to_csv_rows(self) -> list[tuple[float, float]]:
        return list(zip(self.wavelengths_nm.tolist(), self.values.tolist()))

    @classmethod
    def from_pairs(
        cls,
        pairs: list[tuple[float, float]],
        kind: str = "intensity",
        metadata: Mapping[str, Any] | None = None,
    ) -> "Spectrum":
        arr = np.asarray(pairs, dtype=float)
        return cls(arr[:, 0], arr[:, 1], kind=kind, metadata=metadata or {})

lux.peaks.detect_peaks(wavelengths_nm, values, *, smooth_window=9, smooth_degree=3, min_prominence=None, min_distance_nm=10.0, baseline_correct=True, max_peaks=None)

Detect resolved peaks with local prominence and FWHM.

The default parameters are tuned for C12880MA/C16767MA spectra whose optical resolution is roughly 5-15 nm. Use smaller min_distance_nm only for the UV head or high-resolution external spectra.

Source code in src/lux/peaks.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def detect_peaks(
    wavelengths_nm: np.ndarray,
    values: np.ndarray,
    *,
    smooth_window: int = 9,
    smooth_degree: int = 3,
    min_prominence: float | None = None,
    min_distance_nm: float = 10.0,
    baseline_correct: bool = True,
    max_peaks: int | None = None,
) -> list[Peak]:
    """Detect resolved peaks with local prominence and FWHM.

    The default parameters are tuned for C12880MA/C16767MA spectra whose optical
    resolution is roughly 5-15 nm. Use smaller `min_distance_nm` only for the UV
    head or high-resolution external spectra.

    Args:
        wavelengths_nm: Wavelength axis; must match ``values`` in length.
        values: Measured values along that axis.
        smooth_window: Savitzky-Golay window, reduced to an odd size no larger
            than the input when the input is shorter than the window.
        smooth_degree: Polynomial degree, capped at ``smooth_window - 1``.
        min_prominence: Absolute prominence threshold in signal units. When
            ``None`` a threshold on the ``robust_scale``-d signal is used.
        min_distance_nm: Minimum wavelength separation between kept peaks;
            higher-valued peaks win.
        baseline_correct: Subtract an asymmetric-least-squares baseline from
            the smoothed signal before searching.
        max_peaks: Upper bound on the number of peaks returned; ``0`` returns
            no peaks.

    Returns:
        Peaks ordered by ascending sample index. ``value`` and ``prominence``
        refer to the smoothed (and, if enabled, baseline-corrected) signal.

    Raises:
        ValueError: If the two input arrays differ in length.
    """

    wavelengths = np.asarray(wavelengths_nm, dtype=float)
    y = np.asarray(values, dtype=float)
    if wavelengths.size != y.size:
        raise ValueError("wavelength and value arrays must have the same length.")
    if wavelengths.size < smooth_window:
        # Shrink to the largest odd window that fits the data.
        smooth_window = wavelengths.size if wavelengths.size % 2 else wavelengths.size - 1
    smoothed = savitzky_golay(y, window=smooth_window, degree=min(smooth_degree, smooth_window - 1))
    if baseline_correct:
        baseline = asymmetric_least_squares_baseline(smoothed)
        signal = smoothed - baseline
    else:
        baseline = np.zeros_like(smoothed)
        signal = smoothed

    scaled = robust_scale(signal)
    # Default threshold: at least 2.5 scaled units, or 5% of the scaled range.
    robust_threshold = max(2.5, 0.05 * float(np.nanmax(scaled) - np.nanmin(scaled)))
    absolute_threshold = float(min_prominence) if min_prominence is not None else None

    candidate_indices: list[int] = []
    for i in range(1, signal.size - 1):
        # Local maximum; ">=" on the left accepts the right edge of a plateau.
        if signal[i] >= signal[i - 1] and signal[i] > signal[i + 1]:
            # Prominence is measured against the higher of the two minima
            # found within 20 samples on either side.
            left_min = float(np.min(signal[max(0, i - 20) : i + 1]))
            right_min = float(np.min(signal[i : min(signal.size, i + 21)]))
            prominence = signal[i] - max(left_min, right_min)
            robust_prominence = scaled[i] - max(np.min(scaled[max(0, i - 20) : i + 1]), np.min(scaled[i : min(signal.size, i + 21)]))
            passes_threshold = (
                prominence >= absolute_threshold
                if absolute_threshold is not None
                else robust_prominence >= robust_threshold
            )
            if prominence > 0 and passes_threshold:
                candidate_indices.append(i)

    # Greedy selection: strongest first, rejecting anything too close to a
    # peak that has already been kept.
    candidate_indices.sort(key=lambda idx: signal[idx], reverse=True)
    selected: list[int] = []
    for idx in candidate_indices:
        # Check the limit before admitting a candidate so max_peaks=0 yields
        # no peaks instead of one.
        if max_peaks is not None and len(selected) >= max_peaks:
            break
        if all(abs(wavelengths[idx] - wavelengths[other]) >= min_distance_nm for other in selected):
            selected.append(idx)
    selected.sort()

    peaks: list[Peak] = []
    for idx in selected:
        left_slice = signal[max(0, idx - 20) : idx + 1]
        right_slice = signal[idx : min(signal.size, idx + 21)]
        # argmin is relative to the slice start, so shift back to absolute indices.
        left_base_idx = max(0, idx - 20) + int(np.argmin(left_slice))
        right_base_idx = idx + int(np.argmin(right_slice))
        local_base = max(signal[left_base_idx], signal[right_base_idx])
        peaks.append(
            Peak(
                wavelength_nm=float(wavelengths[idx]),
                value=float(signal[idx]),
                prominence=float(signal[idx] - local_base),
                left_base_nm=float(wavelengths[left_base_idx]),
                right_base_nm=float(wavelengths[right_base_idx]),
                fwhm_nm=_half_width(wavelengths, signal, idx, local_base),
                index=int(idx),
            )
        )
    return peaks

lux.matching.match_spectrum(measured, library, *, top_k=5, smooth_window=9, metric='composite')

Rank spectra using complementary whole-shape metrics.

metric can be composite (the default), sam, correlation, derivative_correlation, or continuum_correlation.

Source code in src/lux/matching.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def match_spectrum(
    measured: Spectrum,
    library: Iterable[LibraryEntry],
    *,
    top_k: int = 5,
    smooth_window: int = 9,
    metric: str = "composite",
) -> list[MatchResult]:
    """Rank library spectra against ``measured`` by whole-shape similarity.

    ``metric`` can be ``composite`` (the default), ``sam``, ``correlation``,
    ``derivative_correlation``, or ``continuum_correlation``. Each library
    entry is interpolated onto the measured wavelength grid, and both sides
    are smoothed when ``smooth_window`` is truthy and the spectrum is at least
    that long. The ``top_k`` best-scoring results are returned, best first.

    Raises:
        ValueError: If ``metric`` is unknown (checked while scoring an entry).
    """

    grid = measured.wavelengths_nm

    def smooth(series):
        # Smoothing is skipped for a falsy window or a too-short series.
        if smooth_window and series.size >= smooth_window:
            return savitzky_golay(series, smooth_window, min(3, smooth_window - 1))
        return series

    target = smooth(measured.values.astype(float))
    target_derivative = first_derivative(grid, target)
    target_continuum = continuum_removed(target)

    scored: list[MatchResult] = []
    for entry in library:
        candidate = smooth(entry.spectrum.interpolate_to(grid).values)

        angle = spectral_angle(target, candidate)
        corr = correlation(target, candidate)
        dcorr = correlation(target_derivative, first_derivative(grid, candidate))
        ccorr = correlation(target_continuum, continuum_removed(candidate))
        # Map the spectral angle onto [0, 1], where 1 means identical direction.
        angle_score = 1.0 - min(angle / np.pi, 1.0)

        if metric == "composite":
            # NOTE(review): max(x, -1) only matters if correlation() can
            # return less than -1 - confirm the intended clamp.
            score = 0.40 * angle_score + 0.30 * max(corr, -1) + 0.20 * max(dcorr, -1) + 0.10 * max(ccorr, -1)
        elif metric == "sam":
            score = angle_score
        elif metric == "correlation":
            score = 0.5 * (corr + 1.0)
        elif metric == "derivative_correlation":
            score = 0.5 * (dcorr + 1.0)
        elif metric in {"continuum", "continuum_removed", "continuum_correlation"}:
            score = 0.5 * (ccorr + 1.0)
        else:
            raise ValueError(f"unknown matching metric: {metric}")

        scored.append(
            MatchResult(
                name=entry.name,
                group=entry.group,
                score=float(score),
                spectral_angle=float(angle),
                correlation=float(corr),
                derivative_correlation=float(dcorr),
                continuum_correlation=float(ccorr),
                metadata=dict(entry.metadata),
            )
        )

    ranked = sorted(scored, key=lambda result: result.score, reverse=True)
    return ranked[:top_k]