Core Services API

Nancy Brain's core services provide the foundation for RAG (Retrieval-Augmented Generation) functionality, knowledge base management, and search capabilities.

RAG Service

The main service class that orchestrates retrieval operations.

rag_core.service.RAGService

RAG service for retrieving relevant context from the knowledge base.

Source code in rag_core/service.py
class RAGService:
    """RAG service for retrieving relevant context from the knowledge base."""

    def __init__(
        self,
        embeddings_path: Path,
        config_path: Path,
        weights_path: Path,
        use_dual_embedding: Optional[bool] = None,
        search_instance: Optional[object] = None,
    ):
        """
        Initialize the RAG service.

        Args:
            embeddings_path: Path to the txtai embeddings index
            config_path: Path to the repositories configuration file
            weights_path: Path to the model weights file
            use_dual_embedding: Whether to use dual embedding models (general + code).
                               If None, reads from USE_DUAL_EMBEDDING environment variable.
        """
        # Read dual embedding setting from environment if not explicitly set
        if use_dual_embedding is None:
            use_dual_embedding = os.environ.get("USE_DUAL_EMBEDDING", "true").lower() == "true"

        # Initialize core components
        self.registry = Registry(config_path, use_dual_embedding=use_dual_embedding)
        self.store = Store(embeddings_path.parent)
        # Defer loading of Search (and heavy txtai/torch imports) until actually needed.
        # Store parameters for lazy initialization. Allow an injected search instance
        # (used in tests or by DI) to bypass lazy loading.
        self._search_args = {
            "embeddings_path": embeddings_path,
            "dual": use_dual_embedding,
            "code_model": os.environ.get("CODE_EMBEDDING_MODEL", "microsoft/codebert-base"),
        }
        # Allow injection for testing / DI. If an explicit search_instance is
        # provided, use it. Otherwise prefer lazy-loading. However, many
        # existing tests assume a lightweight placeholder is present when an
        # embeddings index is not initialized. To keep both behaviours:
        # - If the embeddings index directory exists, start with `self.search`
        #   as None (true lazy init).
        # - If the index does NOT exist, provide a lightweight Placeholder
        #   so tests can set `service.search.search = Mock(...)` without
        #   pulling heavy dependencies.
        if search_instance is not None:
            self.search = search_instance
        else:
            # If rag_core.search was explicitly removed (e.g. tests set
            # sys.modules['rag_core.search'] = None) treat as missing and
            # keep search as None to surface that txtai isn't available.
            module_entry = None
            try:
                module_entry = __import__("sys").modules.get("rag_core.search", None)
            except Exception:
                module_entry = None

            index_dir = Path(embeddings_path) / "index"
            index_exists = index_dir.exists()

            if module_entry is None and "rag_core.search" in __import__("sys").modules:
                # Explicitly set to None in sys.modules -> behave as missing
                self.search = None
            elif index_exists:
                # If an index is present prefer true lazy-loading (None)
                self.search = None
            else:
                # Lightweight placeholder Search that defers to general_embeddings if present.
                class PlaceholderSearch:
                    def __init__(self):
                        self.general_embeddings = None
                        self.model_weights = {}
                        self.extension_weights = {}

                    def search(self, query, limit):
                        if self.general_embeddings is not None and hasattr(self.general_embeddings, "search"):
                            return self.general_embeddings.search(query, limit)
                        return []

                self.search = PlaceholderSearch()

        # Mirror legacy attribute for tests that check service.general_embeddings
        # Keep synced with the placeholder if present
        self.general_embeddings = getattr(self.search, "general_embeddings", None)
        self.weights = ModelWeights(weights_path)

        # Store paths
        self.embeddings_path = embeddings_path
        self.config_path = config_path
        self.weights_path = weights_path
        self.use_dual_embedding = use_dual_embedding

        self._weights = {}  # runtime weights set via API

    def _ensure_search_loaded(self):
        """Lazily initialize the Search instance to avoid importing heavy
        dependencies (txtai/torch) at module import or during CLI help/tests.
        """
        if getattr(self, "search", None) is not None:
            return

        try:
            # Import here to keep heavy imports local
            Search = None
            try:
                from .search import Search as _Search

                Search = _Search
            except Exception:
                Search = None

            if Search is None:
                self.search = None
                return

            args = self._search_args or {}
            try:
                self.search = Search(
                    args.get("embeddings_path"),
                    dual=args.get("dual", False),
                    code_model=args.get("code_model"),
                )
            except Exception:
                # If Search initialization fails (e.g., txtai import error), leave as None
                self.search = None
        except Exception:
            self.search = None
        # Sync general_embeddings mirror if search instance provided
        try:
            self.general_embeddings = getattr(self.search, "general_embeddings", None)
        except Exception:
            self.general_embeddings = None

    """
    def search(self, query, limit):
        hits = self.search.run(query, limit)
        # apply weights via self.weights then return
    def retrieve(self, doc_id, start, end):
        meta = self.registry.get_meta(doc_id)
        text = self.store.read_lines(doc_id, start, end)
        return Passage(doc_id, text, meta.github_url, meta.content_sha256)
    # list_tree, set_weight, version() similarly thin"""

    def get_context_for_query(self, query: str, max_chars: int = 4000) -> str:
        """
        Get formatted context for a query, suitable for LLM prompts.

        Args:
            query: Search query
            max_chars: Maximum characters to include in context

        Returns:
            Formatted context string
        """
        # Lazily initialize search if needed
        self._ensure_search_loaded()
        if not self.search:
            return "No relevant information found."

        results = self.search.search(query, limit=5)

        if not results:
            return "No relevant information found."

        context_parts = []
        current_length = 0

        for result in results:
            # Get GitHub URL for this document
            github_url = self._get_github_url(result["id"])

            # Use the full chunk text (no per-document truncation is applied here)
            text = result["text"]

            # Add document info with both source path and GitHub link
            if github_url:
                # Extract filename for link text
                filename = result["id"].split("/")[-1]
                doc_info = f"Source: {result['id']}\nGitHub URL: <{github_url}|{filename}>\n"
            else:
                doc_info = f"Source: {result['id']}\n"

            content = f"{text}\n\n"

            # Check if adding this would exceed max_chars
            if current_length + len(doc_info) + len(content) > max_chars:
                break

            context_parts.append(doc_info + content)
            current_length += len(doc_info) + len(content)

        if not context_parts:
            return "No relevant information found."

        return "".join(context_parts).strip()

    def get_raw_results_for_ai(self, query: str, limit: int = 5) -> List[Dict[str, str]]:
        """
        Get raw RAG results with GitHub URLs for AI processing.

        Args:
            query: Search query
            limit: Maximum number of results to return

        Returns:
            List of dictionaries with 'id', 'text', 'score', 'github_url',
            'model_score', 'extension_weight', and 'adjusted_score' keys
        """
        # Lazily initialize search if needed
        self._ensure_search_loaded()
        if not self.search:
            return []

        results = self.search.search(query, limit)

        enhanced_results = []
        for result in results:
            github_url = self._get_github_url(result["id"])
            enhanced_results.append(
                {
                    "id": result["id"],
                    "text": result["text"],
                    "score": result["score"],
                    "github_url": github_url,
                    "model_score": result.get("model_score", 1.0),
                    "extension_weight": result.get("extension_weight", 1.0),
                    "adjusted_score": result.get("adjusted_score", result["score"]),
                }
            )

        return enhanced_results

    def get_detailed_context(self, query: str, max_chars: int = 6000) -> str:
        """
        Get detailed context with more content per document.

        Args:
            query: Search query
            max_chars: Maximum characters to include in context

        Returns:
            Formatted context string with more detailed content
        """
        # Lazily initialize search if needed
        self._ensure_search_loaded()
        if not self.search:
            return "No relevant information found."

        results = self.search.search(query, limit=2)  # Fewer results, more content each

        if not results:
            return "No relevant information found."

        context_parts = []
        current_length = 0

        for result in results:
            # Get GitHub URL for this document
            github_url = self._get_github_url(result["id"])

            # Keep the full chunk text; with fewer results, each document gets more room
            text = result["text"]

            # Add document info with both source path and GitHub link
            if github_url:
                # Extract filename for link text
                filename = result["id"].split("/")[-1]
                doc_info = f"Source: {result['id']}\nGitHub URL: <{github_url}|{filename}>\n"
            else:
                doc_info = f"Source: {result['id']}\n"

            content = f"{text}\n\n"

            # Check if adding this would exceed max_chars
            if current_length + len(doc_info) + len(content) > max_chars:
                break

            context_parts.append(doc_info + content)
            current_length += len(doc_info) + len(content)

        if not context_parts:
            return "No relevant information found."

        return "".join(context_parts).strip()

    def is_available(self) -> bool:
        """Check if the RAG service is available and ready."""
        # Ensure search is loaded and check its embeddings availability
        self._ensure_search_loaded()
        try:
            return bool(
                (self.general_embeddings is not None)
                or (self.search and getattr(self.search, "general_embeddings", None))
            )
        except Exception:
            return False

    async def search_docs(
        self,
        query: str,
        limit: int = 6,
        toolkit: str = None,
        doctype: str = None,
        threshold: float = 0.0,
    ) -> List[Dict]:
        """Search for documents with optional filtering."""
        # Ensure search is loaded before proceeding
        self._ensure_search_loaded()
        if not self.search:
            return []

        # Before searching, push runtime weights into search.model_weights (backwards compatibility)
        # Reload file-based weights so each search uses the latest on-disk configuration
        try:
            self.weights.reload()
        except Exception:
            # If reload fails, proceed with previously loaded values
            logger.debug("Failed to reload weights from disk; using cached values")

        # Inject file-based weights and any runtime overrides into the Search instance
        try:
            self.search.extension_weights = self.weights.extension_weights or {}
            # Start from the file-based model weights, then apply runtime overrides
            self.search.model_weights = dict(self.weights.model_weights or {})
        except Exception:
            # Defensive fallbacks
            self.search.extension_weights = {}
            self.search.model_weights = {}

        if self._weights:
            # Merge runtime weights, overriding file-based values
            self.search.model_weights.update(self._weights)

        # Get initial search results
        results = self.search.search(query, limit * 2)  # Get more to allow for filtering

        # Ensure results have expected scoring fields so downstream code can rely on them.
        for r in results:
            # If the search backend already provided model_score/extension_weight/
            # adjusted_score, preserve those values. Otherwise compute sensible
            # defaults based on current runtime weights so tests and callers can
            # rely on deterministic fields being present.
            try:
                # Determine the effective model_score: prefer explicit runtime
                # overrides found in the Search instance (self.search.model_weights).
                runtime_mw = getattr(self.search, "model_weights", {}) or {}
                if r.get("id") in runtime_mw:
                    r_model_score = float(runtime_mw.get(r.get("id"), 1.0))
                else:
                    # Fall back to provided value or default
                    r_model_score = float(r.get("model_score", 1.0))
                r["model_score"] = r_model_score
            except Exception:
                r["model_score"] = float(r.get("model_score", 1.0))

            try:
                # Determine effective extension weight: prefer search.extension_weights
                runtime_ext = getattr(self.search, "extension_weights", {}) or {}
                ext = Path(r.get("id", "")).suffix
                if ext in runtime_ext:
                    r_ext = float(runtime_ext.get(ext, 1.0))
                else:
                    r_ext = float(r.get("extension_weight", 1.0))
                r["extension_weight"] = r_ext
            except Exception:
                r["extension_weight"] = float(r.get("extension_weight", 1.0))

            try:
                # Recompute adjusted score from effective components unless the
                # search backend provided an explicit adjusted_score AND there
                # are no runtime overrides for this document or its extension.
                base = float(r.get("score", 0.0))
                has_runtime_model_override = r.get("id") in (getattr(self.search, "model_weights", {}) or {})
                ext_key = Path(r.get("id", "")).suffix
                has_runtime_ext_override = ext_key in (getattr(self.search, "extension_weights", {}) or {})

                if "adjusted_score" in r and not has_runtime_model_override and not has_runtime_ext_override:
                    # Preserve provided adjusted_score
                    r["adjusted_score"] = float(r["adjusted_score"])
                else:
                    r["adjusted_score"] = r.get("extension_weight", 1.0) * r.get("model_score", 1.0) * base
            except Exception:
                r["adjusted_score"] = float(r.get("adjusted_score", r.get("score", 0.0)))

        # Apply filters if specified
        if toolkit or doctype:
            filtered_results = []
            for result in results:
                doc_id = result["id"]
                meta = self.registry.get_meta(doc_id)

                # Check toolkit filter
                if toolkit and meta.toolkit != toolkit:
                    continue

                # Check doctype filter
                if doctype and meta.doctype != doctype:
                    continue

                filtered_results.append(result)
            results = filtered_results

        # Apply threshold filter
        if threshold > 0.0:
            results = [r for r in results if r["score"] >= threshold]

        # Return top results up to limit
        return results[:limit]

    async def retrieve(self, doc_id: str, start: int = None, end: int = None) -> Dict:
        """Retrieve a span of text from a document."""
        # Default full document if no range provided
        if start is None or end is None:
            text = self.store.read_lines(doc_id)
        else:
            text = self.store.read_lines(doc_id, start, end)
        meta = self.registry.get_meta(doc_id)
        return {
            "doc_id": doc_id,
            "text": text,
            "github_url": meta.github_url,
            "content_sha256": meta.content_sha256,
        }

    async def retrieve_batch(self, items: List[Dict]) -> List[Dict]:
        """Retrieve multiple text spans in batch."""
        results = []
        for item in items:
            doc_id = item["doc_id"]
            start = item.get("start")
            end = item.get("end")
            try:
                result = await self.retrieve(doc_id, start, end)
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to retrieve {doc_id}: {e}")
                # Add placeholder with error info
                results.append(
                    {
                        "doc_id": doc_id,
                        "text": f"Error retrieving document: {str(e)}",
                        "github_url": "",
                        "content_sha256": "",
                        "error": str(e),
                    }
                )
        return results

    async def list_tree(self, prefix: str = "", depth: int = 2, max_entries: int = 500) -> List[Dict]:
        """List document IDs under a prefix as a tree structure."""
        # Lazily initialize search and prefer registry fallback if embeddings unavailable
        self._ensure_search_loaded()
        if not self.search or not getattr(self.search, "general_embeddings", None):
            # Fallback to registry-based approach if search/embeddings not available
            doc_ids = self.registry.list_ids(prefix)
        else:
            try:
                # Get all document IDs from the search index by doing a broad search
                # txtai doesn't have a direct "list all IDs" method, so we search for common terms
                all_results = []

                # Try several broad searches to get as many document IDs as possible
                search_terms = ["the", "and", "a", "import", "def", "class", "README", "docs"]
                seen_ids = set()

                for term in search_terms:
                    try:
                        results = self.search.general_embeddings.search(term, limit=2000)
                        for result in results:
                            doc_id = result.get("id", "")
                            if doc_id and doc_id not in seen_ids:
                                if not prefix or doc_id.startswith(prefix):
                                    all_results.append(doc_id)
                                    seen_ids.add(doc_id)
                    except Exception:
                        # Skip this term if its search fails
                        continue

                    # Stop if we have enough diverse results
                    if len(seen_ids) > 1000:
                        break

                # Filter by prefix
                if prefix:
                    doc_ids = [doc_id for doc_id in all_results if doc_id.startswith(prefix)]
                else:
                    doc_ids = all_results

            except Exception:
                # Fallback to registry-based approach if search fails
                doc_ids = self.registry.list_ids(prefix)

        # Convert flat list to tree structure
        tree_entries = []
        seen_paths = set()

        # First pass: collect ALL paths (not limited by max_entries) to properly detect directories
        all_paths = set()
        for doc_id in doc_ids:
            parts = doc_id.split("/")
            for i in range(1, min(len(parts), depth + 1) + 1):  # Go one level deeper to detect directories
                path = "/".join(parts[:i])
                all_paths.add(path)

        # Second pass: build tree entries (limited by max_entries for display)
        entries_added = 0
        for doc_id in doc_ids:
            if entries_added >= max_entries:
                break

            parts = doc_id.split("/")
            for i in range(1, min(len(parts), depth) + 1):
                path = "/".join(parts[:i])
                if path not in seen_paths:
                    seen_paths.add(path)

                    # Check if this path has any children (making it a directory)
                    is_directory = any(other_path.startswith(path + "/") for other_path in all_paths)

                    tree_entries.append(
                        {
                            "path": path,
                            "type": "directory" if is_directory else "file",
                            "doc_id": doc_id if i == len(parts) else None,
                        }
                    )
                    entries_added += 1

                    if entries_added >= max_entries:
                        break

        return tree_entries

    async def set_weight(
        self,
        doc_id: str,
        multiplier: float,
        namespace: str = "global",
        ttl_days: int = None,
    ) -> None:
        """Set model weight for a document (runtime only). Extra parameters ignored for backward compatibility."""
        if not hasattr(self, "_weights"):
            self._weights = {}
        try:
            m = float(multiplier)
        except Exception:
            m = 1.0
        # Clamp similar to search logic expectations
        m = max(0.1, min(m, 10.0))
        self._weights[doc_id] = m
        # Reflect in search if it is already loaded (so next call sees it)
        try:
            if self.search is not None:
                self.search.model_weights[doc_id] = m
        except Exception:
            # Ignore failures when search/embeddings aren't available
            pass
        logger.info(f"Runtime weight set for {doc_id}: {m}")

    async def version(self) -> Dict:
        """Return index and build version info, with robust error handling for build info and extra environment details."""
        import platform
        from nancy_brain import __version__

        __build_sha__ = "unknown"
        __built_at__ = "unknown"
        try:
            from nancy_brain import _build_info

            __build_sha__ = getattr(_build_info, "__build_sha__", "unknown")
            __built_at__ = getattr(_build_info, "__built_at__", "unknown")
        except Exception:
            pass

        # Gather environment info
        python_version = platform.python_version()
        implementation = platform.python_implementation()
        environment = os.environ.get("CONDA_DEFAULT_ENV") or os.environ.get("VIRTUAL_ENV") or "unknown"

        # Try to get key dependency versions
        def get_version(pkg):
            try:
                return __import__(pkg).__version__
            except Exception:
                return "unknown"

        dependencies = {
            "fastapi": get_version("fastapi"),
            "pydantic": get_version("pydantic"),
            "txtai": get_version("txtai"),
            "faiss": get_version("faiss") if get_version("faiss") != "unknown" else get_version("faiss_cpu"),
            "torch": get_version("torch"),
            "transformers": get_version("transformers"),
        }

        return {
            "index_version": __version__,
            "build_sha": __build_sha__,
            "built_at": __built_at__,
            "python_version": python_version,
            "python_implementation": implementation,
            "environment": environment,
            "dependencies": dependencies,
        }

    async def health(self) -> Dict:
        """Return service health status."""
        try:
            # Basic health checks
            is_ready = self.registry is not None and self.store is not None and self.search is not None

            status = "ok" if is_ready else "degraded"

            return {
                "status": status,
                "registry_loaded": self.registry is not None,
                "store_loaded": self.store is not None,
                "search_loaded": self.search is not None,
            }
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return {"status": "error", "error": str(e)}

__init__(embeddings_path, config_path, weights_path, use_dual_embedding=None, search_instance=None)

Initialize the RAG service.

Parameters:

    embeddings_path (Path, required): Path to the txtai embeddings index.
    config_path (Path, required): Path to the repositories configuration file.
    weights_path (Path, required): Path to the model weights file.
    use_dual_embedding (Optional[bool], default None): Whether to use dual embedding models (general + code). If None, reads from the USE_DUAL_EMBEDDING environment variable.
    search_instance (Optional[object], default None): Pre-built search instance injected for testing or dependency injection; bypasses lazy loading.
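
A minimal construction sketch; the three paths below are hypothetical placeholders, so point them at your actual index, configuration, and weights files:

from pathlib import Path
from rag_core.service import RAGService

# Hypothetical locations -- adjust to your deployment.
service = RAGService(
    embeddings_path=Path("knowledge_base/embeddings"),
    config_path=Path("config/repositories.yml"),
    weights_path=Path("config/model_weights.yaml"),
    use_dual_embedding=None,  # None -> fall back to the USE_DUAL_EMBEDDING env var
)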

get_context_for_query(query, max_chars=4000)

Get formatted context for a query, suitable for LLM prompts.

Parameters:

    query (str, required): Search query.
    max_chars (int, default 4000): Maximum characters to include in the context.

Returns:

    str: Formatted context string.


get_detailed_context(query, max_chars=6000)

Get detailed context with more content per document.

Parameters:

    query (str, required): Search query.
    max_chars (int, default 6000): Maximum characters to include in the context.

Returns:

    str: Formatted context string with more detailed content.


get_raw_results_for_ai(query, limit=5)

Get raw RAG results with GitHub URLs for AI processing.

Parameters:

    query (str, required): Search query.
    limit (int, default 5): Maximum number of results to return.

Returns:

    List[Dict[str, str]]: List of dictionaries with 'id', 'text', 'score', and 'github_url' keys, plus 'model_score', 'extension_weight', and 'adjusted_score' weighting fields.

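
A sketch of consuming the enriched results; the weighting fields shown are attached by the service when available:

for hit in service.get_raw_results_for_ai("embedding index layout", limit=3):
    print(f"{hit['adjusted_score']:.3f}  {hit['id']}  {hit['github_url']}")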

health() async

Return service health status.


is_available()

Check if the RAG service is available and ready.


list_tree(prefix='', depth=2, max_entries=500) async

List document IDs under a prefix as a tree structure.

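
A usage sketch; the prefix below is a hypothetical repository path, and trailing slashes mark directories in the printed output:

import asyncio

entries = asyncio.run(service.list_tree(prefix="nancy-brain/docs", depth=2))
for entry in entries:
    marker = "/" if entry["type"] == "directory" else ""
    print(entry["path"] + marker)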

retrieve(doc_id, start=None, end=None) async

Retrieve a span of text from a document.

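
A usage sketch with a hypothetical doc_id; omit start and end to fetch the full document:

import asyncio

passage = asyncio.run(service.retrieve("nancy-brain/README.md", start=0, end=40))
print(passage["github_url"])
print(passage["text"])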

retrieve_batch(items) async

Retrieve multiple text spans in batch.

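
A batch sketch with hypothetical doc_ids; failed items come back with an "error" key instead of raising:

import asyncio

items = [
    {"doc_id": "nancy-brain/README.md"},  # full document
    {"doc_id": "nancy-brain/rag_core/service.py", "start": 10, "end": 30},
]
for result in asyncio.run(service.retrieve_batch(items)):
    if "error" in result:
        print("failed:", result["doc_id"], result["error"])
    else:
        print("ok:", result["doc_id"], len(result["text"]), "chars")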

search_docs(query, limit=6, toolkit=None, doctype=None, threshold=0.0) async

Search for documents with optional filtering.

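
A filtered-search sketch; the doctype value is a hypothetical label that must match your registry metadata. Unless the backend supplies one, adjusted_score is computed as extension_weight * model_score * score:

import asyncio

hits = asyncio.run(
    service.search_docs("lazy loading of txtai", limit=4, doctype="documentation", threshold=0.2)
)
for hit in hits:
    print(f"{hit['adjusted_score']:.3f}  {hit['id']}")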

set_weight(doc_id, multiplier, namespace='global', ttl_days=None) async

Set model weight for a document (runtime only). Extra parameters ignored for backward compatibility.

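
A runtime-boost sketch with a hypothetical doc_id; the multiplier is clamped to [0.1, 10.0] and affects subsequent searches only:

import asyncio

asyncio.run(service.set_weight("nancy-brain/docs/faq.md", 2.0))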

version() async

Return index and build version info, with robust error handling for build info and extra environment details.

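
A version-inspection sketch:

import asyncio

info = asyncio.run(service.version())
print(info["index_version"], info["build_sha"])
print("txtai:", info["dependencies"]["txtai"])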

Search Service

Handles text search and embedding-based retrieval.

rag_core.search.Search

Search for relevant documents using embeddings.
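
A direct-usage sketch, assuming txtai is installed and an index has already been built under the given path (a hypothetical placeholder):

from pathlib import Path
from rag_core.search import Search

search = Search(Path("knowledge_base/embeddings"), dual=False)
for hit in search.search("model weights", limit=3):
    print(f"{hit['score']:.3f}  {hit['id']}")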

Source code in rag_core/search.py
class Search:
    """Search for relevant documents using embeddings."""

    def __init__(
        self,
        embeddings_path: Path,
        dual: bool = False,
        code_model: str = "microsoft/codebert-base",
        extension_weights: Dict = None,
        model_weights: Dict = None,
    ):
        """
        Initialize the Search with embeddings.
        """
        self.embeddings_path = embeddings_path
        self.use_dual_embedding = dual
        self.code_model = code_model
        self.extension_weights = extension_weights or {}
        self.model_weights = model_weights or {}
        self.general_embeddings = None
        self.code_embeddings = None
        # Load embedding indexes
        self._load_embeddings()

    def _load_embeddings(self):
        """Load txtai embeddings for general and code indexes."""
        try:
            from txtai.embeddings import Embeddings

            # Load general embeddings (index is in 'index' subdirectory)
            general_index = self.embeddings_path / "index"
            logger.info(f"Loading general embeddings from {general_index}")
            self.general_embeddings = Embeddings()
            self.general_embeddings.load(str(general_index))
            # Load code embeddings if dual embedding enabled
            if self.use_dual_embedding:
                code_index = self.embeddings_path / "code_index"
                if code_index.exists():
                    logger.info(f"Loading code embeddings from {code_index}")
                    self.code_embeddings = Embeddings()
                    self.code_embeddings.load(str(code_index))
                else:
                    logger.warning(f"Code embeddings not found at {code_index}")
                    self.code_embeddings = None
            else:
                self.code_embeddings = None
        except ImportError:
            logger.error("txtai not installed. Please install via `pip install txtai`")
            self.general_embeddings = None
            self.code_embeddings = None
        except Exception as e:
            logger.error(f"Failed to load embeddings: {e}")
            self.general_embeddings = None
            self.code_embeddings = None

    def search(self, query: str, limit: int = 5) -> List[Dict[str, str]]:
        """
        Search for relevant documents using dual embedding if available.

        Args:
            query: Search query
            limit: Maximum number of results to return

        Returns:
            List of dictionaries with 'id', 'text', and 'score' keys
        """
        if not self.general_embeddings:
            logger.warning("Embeddings not loaded, cannot perform search")
            return []

        try:
            # Get results from both models if dual embedding is active
            if self.use_dual_embedding and self.code_embeddings:
                return self._dual_embedding_search(query, limit)
            else:
                return self._single_embedding_search(query, limit)

        except Exception as e:
            logger.error(f"Search failed: {e}")
            return []

    def _single_embedding_search(self, query: str, limit: int) -> List[Dict[str, str]]:
        """Perform search with single embedding model (backward compatibility)."""
        results = self.general_embeddings.search(query, limit * 50)
        # Attach highlights computed from query
        for r in results:
            r.setdefault("highlights", [])
            # Simple approach: compute lightweight inline highlights for each query token
            try:
                tokens = [t for t in re.split(r"\s+", query.strip()) if len(t) > 0]
            except Exception:
                tokens = []
            r["highlights"] = []
            if tokens:
                lower_text = r.get("text", "").lower()
                for tok in tokens:
                    if not tok:
                        continue
                    start = 0
                    lower_tok = tok.lower()
                    while True:
                        idx = lower_text.find(lower_tok, start)
                        if idx == -1:
                            break
                        r["highlights"].append({"start": idx, "end": idx + len(tok), "type": "stem"})
                        start = idx + len(tok)
        return self._process_and_rank_results(results, limit, dual_scores=None)

    def _dual_embedding_search(self, query: str, limit: int) -> List[Dict[str, str]]:
        """Perform search with dual embedding models and merge results."""
        # Search both models with larger candidate pools for reweighting
        general_results = self.general_embeddings.search(query, limit * 50)
        code_results = self.code_embeddings.search(query, limit * 50)

        # Create dictionaries for quick lookup
        general_scores = {r["id"]: r for r in general_results}
        code_scores = {r["id"]: r for r in code_results}

        # Union of the unique document IDs from both candidate pools
        all_doc_ids = set(general_scores.keys()) | set(code_scores.keys())

        # Merge results with dual scoring
        merged_results = []
        for doc_id in all_doc_ids:
            general_result = general_scores.get(doc_id)
            code_result = code_scores.get(doc_id)

            # Use the result with content (prefer general model if both have it)
            if general_result:
                base_result = general_result
            elif code_result:
                base_result = code_result
            else:
                continue

            # Calculate dual scores
            general_score = general_result["score"] if general_result else 0.0
            code_score = code_result["score"] if code_result else 0.0

            # Gather metadata and derive base document id for weighting
            metadata = {}
            try:
                if isinstance(base_result.get("data"), dict):
                    metadata = base_result.get("data", {}) or {}
                elif isinstance(base_result.get("metadata"), dict):
                    metadata = base_result.get("metadata", {}) or {}
            except Exception:
                metadata = {}
            base_doc_id = metadata.get("source_document") or strip_chunk_suffix(doc_id)

            # Apply file-type-aware weighting
            file_type = get_file_type_category(base_doc_id)
            if file_type == "code":
                # Code files: reduce code model influence to avoid too many low-level files
                dual_score = 0.6 * general_score + 0.4 * code_score
            elif file_type == "mixed":
                # Mixed content: equal weighting
                dual_score = 0.5 * general_score + 0.5 * code_score
            else:
                # Documentation: favor general model
                dual_score = 0.8 * general_score + 0.2 * code_score

            # Create merged result
            merged_result = {
                "id": doc_id,
                "text": base_result.get("text", ""),
                "score": dual_score,  # Use dual score as primary score
                "general_score": general_score,
                "code_score": code_score,
                "file_type": file_type,
                "data": metadata,
                "source_document": base_doc_id,
            }
            merged_results.append(merged_result)

        # Sort by dual score and attach simple highlights from query
        merged_results.sort(key=lambda r: r["score"], reverse=True)
        for r in merged_results:
            r.setdefault("highlights", [])
            try:
                tokens = [t for t in re.split(r"\s+", query.strip()) if len(t) > 0]
            except Exception:
                tokens = []
            if tokens:
                lower_text = r.get("text", "").lower()
                for tok in tokens:
                    if not tok:
                        continue
                    start = 0
                    lower_tok = tok.lower()
                    while True:
                        idx = lower_text.find(lower_tok, start)
                        if idx == -1:
                            break
                        r["highlights"].append({"start": idx, "end": idx + len(tok), "type": "stem"})
                        start = idx + len(tok)

        # Send all merged results - let _process_and_rank_results do the reweighting and limiting
        return self._process_and_rank_results(merged_results, limit, dual_scores=True)

    def _process_and_rank_results(
        self, results: List[Dict], limit: int, dual_scores: bool = False
    ) -> List[Dict[str, str]]:
        """Apply extension weights, model weights, and final ranking."""
        formatted_results = []

        # Load weights config
        weights_cfg = self.extension_weights or {}
        ext_weights = weights_cfg.get("extensions", {})
        path_includes = weights_cfg.get("path_includes", {})

        for result in results:
            doc_id = result["id"]
            metadata = {}
            if isinstance(result.get("data"), dict):
                metadata = result.get("data") or {}
            elif isinstance(result.get("metadata"), dict):
                metadata = result.get("metadata") or {}
            base_doc_id = metadata.get("source_document") or strip_chunk_suffix(doc_id)

            ext = Path(base_doc_id).suffix
            weight = ext_weights.get(ext, 1.0)
            doc_id_lower = base_doc_id.lower()

            # Apply path-based multipliers
            for keyword, mult in path_includes.items():
                if keyword.lower() in doc_id_lower:
                    weight *= mult

            # Apply model weight
            model_score = self.model_weights.get(base_doc_id, self.model_weights.get(doc_id, 1.0))
            try:
                model_score = float(model_score)
            except Exception:
                model_score = 1.0
            model_score = max(0.5, min(model_score, 2.0))

            # Calculate final adjusted score
            base_score = result.get("score", 0.0)
            adjusted_score = weight * model_score * base_score

            # Build result dictionary
            result_dict = {
                "id": doc_id,
                "source_document": base_doc_id,
                "text": result.get("text", ""),
                "score": base_score,
                "extension_weight": weight,
                "model_score": model_score,
                "adjusted_score": adjusted_score,
                "data": metadata,
            }
            if result.get("highlights") is not None:
                result_dict["highlights"] = result.get("highlights")

            # Add dual embedding info if available
            if dual_scores:
                result_dict.update(
                    {
                        "general_score": result.get("general_score", 0.0),
                        "code_score": result.get("code_score", 0.0),
                        "file_type": result.get("file_type", "unknown"),
                    }
                )

            formatted_results.append(result_dict)

        # Sort by adjusted_score, descending
        formatted_results.sort(key=lambda r: r["adjusted_score"], reverse=True)

        # Log search results
        dual_info = " (dual embedding)" if dual_scores else ""
        logger.info(f"Found {len(formatted_results)} results{dual_info} (sorted by adjusted_score)")

        # Helper for computing lightweight highlight offsets in a text for a query.
        # Note: the query string is not threaded into this method, so the helper
        # is defined for reuse but not invoked in this scope.
        def compute_highlights(text: str, query: str) -> List[Dict]:
            highlights = []
            if not query or not text:
                return highlights

            tokens = [t for t in re.split(r"\s+", query.strip()) if len(t) > 0]
            if not tokens:
                return highlights

            lower_text = text.lower()

            # Exact (word-boundary) matches
            for tok in tokens:
                try:
                    pattern = r"\b" + re.escape(tok) + r"\b"
                    for m in re.finditer(pattern, text, flags=re.IGNORECASE):
                        highlights.append({"start": m.start(), "end": m.end(), "type": "exact"})
                except re.error:
                    continue

            # Stem-like matches: token as substring (not already covered)
            for tok in tokens:
                lower_tok = tok.lower()
                start = 0
                while True:
                    idx = lower_text.find(lower_tok, start)
                    if idx == -1:
                        break
                    end = idx + len(lower_tok)
                    # skip if overlapping an exact
                    if not any(
                        h["start"] <= idx < h["end"] or h["start"] < end <= h["end"]
                        for h in highlights
                        if h["type"] == "exact"
                    ):
                        highlights.append({"start": idx, "end": end, "type": "stem"})
                    start = end

            # Fuzzy matches: compare token to words in text using difflib
            words = list(re.finditer(r"\w+", text))
            for tok in tokens:
                for w in words:
                    word_text = w.group(0)
                    # skip if already covered
                    if any(h["start"] <= w.start() < h["end"] or h["start"] < w.end() <= h["end"] for h in highlights):
                        continue
                    try:
                        ratio = difflib.SequenceMatcher(None, tok.lower(), word_text.lower()).ratio()
                    except Exception:
                        ratio = 0.0
                    if ratio >= 0.7:
                        highlights.append({"start": w.start(), "end": w.end(), "type": "fuzzy"})

            # Merge and sort non-overlapping, preferring exact > stem > fuzzy
            type_priority = {"exact": 3, "stem": 2, "fuzzy": 1}
            # Sort by start, then by -priority
            highlights.sort(key=lambda h: (h["start"], -type_priority.get(h["type"], 0)))

            # Remove overlaps by keeping higher priority spans
            merged = []
            for h in highlights:
                if not merged:
                    merged.append(h)
                else:
                    last = merged[-1]
                    if h["start"] < last["end"]:
                        # overlap, keep the one with higher priority
                        if type_priority.get(h["type"], 0) > type_priority.get(last["type"], 0):
                            merged[-1] = h
                    else:
                        merged.append(h)

            return merged

        # The query is not available in this scope, so compute_highlights is not
        # applied here; the search methods attach highlights before ranking.
        # Ensure the key exists so downstream consumers can rely on it.
        for r in formatted_results:
            r.setdefault("highlights", [])

        return formatted_results[:limit]
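
To make the ranking arithmetic concrete, here is a minimal worked sketch of how one hit's final score is derived, mirroring the dual-score weighting and the extension/model weighting above (all numbers are illustrative, not taken from a real index):

# Illustrative values only; they do not come from a real index.
general_score = 0.82  # similarity from the general embedding model
code_score = 0.64     # similarity from the code embedding model

# A "docs" file favors the general model: 0.8 / 0.2 split
dual_score = 0.8 * general_score + 0.2 * code_score      # 0.784

ext_weight = 1.2   # e.g. extensions: {".md": 1.2} in the weights config
model_score = 1.1  # per-document weight, clamped to [0.5, 2.0]
adjusted_score = ext_weight * model_score * dual_score   # ~1.035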

__init__(embeddings_path, dual=False, code_model='microsoft/codebert-base', extension_weights=None, model_weights=None)

Initialize the Search with embeddings.

Source code in rag_core/search.py
def __init__(
    self,
    embeddings_path: Path,
    dual: bool = False,
    code_model: str = "microsoft/codebert-base",
    extension_weights: Optional[Dict] = None,
    model_weights: Optional[Dict] = None,
):
    """
    Initialize the Search with embeddings.
    """
    self.embeddings_path = embeddings_path
    self.use_dual_embedding = dual
    self.code_model = code_model
    self.extension_weights = extension_weights or {}
    self.model_weights = model_weights or {}
    self.general_embeddings = None
    self.code_embeddings = None
    # Load embedding indexes
    self._load_embeddings()
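
A minimal construction sketch; the index path and weight values are hypothetical, and the import path is assumed from the "Source code in rag_core/search.py" reference above:

from pathlib import Path

from rag_core.search import Search  # assumed import path

search_service = Search(
    embeddings_path=Path("indexes/my_kb"),  # hypothetical index directory
    dual=True,  # also load code_index from the same directory if present
    extension_weights={"extensions": {".md": 1.2}, "path_includes": {"docs": 1.1}},
    model_weights={"docs/my_repo/README.md": 1.5},
)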

search(query, limit=5)

Search for relevant documents using dual embedding if available.

Parameters:

Name     Type    Description                             Default
query    str     Search query                            required
limit    int     Maximum number of results to return     5

Returns:

Type                    Description
List[Dict[str, str]]    List of dictionaries with 'id', 'text', and 'score' keys

Source code in rag_core/search.py
def search(self, query: str, limit: int = 5) -> List[Dict[str, str]]:
    """
    Search for relevant documents using dual embedding if available.

    Args:
        query: Search query
        limit: Maximum number of results to return

    Returns:
        List of dictionaries with 'id', 'text', and 'score' keys
    """
    if not self.general_embeddings:
        logger.warning("Embeddings not loaded, cannot perform search")
        return []

    try:
        # Get results from both models if dual embedding is active
        if self.use_dual_embedding and self.code_embeddings:
            return self._dual_embedding_search(query, limit)
        else:
            return self._single_embedding_search(query, limit)

    except Exception as e:
        logger.error(f"Search failed: {e}")
        return []
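
Calling search then returns ranked hits; the field names below are those attached by _process_and_rank_results, and search_service is the instance from the construction sketch above:

hits = search_service.search("how do I configure dual embedding?", limit=3)
for hit in hits:
    print(hit["id"], round(hit["adjusted_score"], 3))
    # Dual-embedding searches also include general_score, code_score, and file_type.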

Store Service

Manages on-disk knowledge base documents and provides line-range access to their text.

rag_core.store.Store

Store for reading document text by line ranges.

Source code in rag_core/store.py
class Store:
    """Store for reading document text by line ranges."""

    def __init__(self, base_path: Path):
        """Initialize store with base directory for text files."""
        self.base_path = base_path

    def read_lines(self, doc_id: str, start: Optional[int] = None, end: Optional[int] = None) -> str:
        """Read lines from a document. If start and end are None, return full content."""
        # Try the doc_id as-is first, then with .txt extension
        doc_path = self.base_path / doc_id
        if not doc_path.exists():
            doc_path = self.base_path / f"{doc_id}.txt"
        if not doc_path.exists():
            raise FileNotFoundError(f"Document not found: {doc_id}")

        # Read all lines including newline characters
        with open(doc_path, "r") as f:
            lines = f.readlines()
        # Default to full range
        s = start if start is not None else 0
        e = end if end is not None else len(lines)
        return "".join(lines[s:e])

__init__(base_path)

Initialize store with base directory for text files.

Source code in rag_core/store.py
def __init__(self, base_path: Path):
    """Initialize store with base directory for text files."""
    self.base_path = base_path

read_lines(doc_id, start=None, end=None)

Read lines from a document. If start and end are None, return full content.

Source code in rag_core/store.py
def read_lines(self, doc_id: str, start: Optional[int] = None, end: Optional[int] = None) -> str:
    """Read lines from a document. If start and end are None, return full content."""
    # Try the doc_id as-is first, then with .txt extension
    doc_path = self.base_path / doc_id
    if not doc_path.exists():
        doc_path = self.base_path / f"{doc_id}.txt"
    if not doc_path.exists():
        raise FileNotFoundError(f"Document not found: {doc_id}")

    # Read all lines including newline characters
    with open(doc_path, "r") as f:
        lines = f.readlines()
    # Default to full range
    s = start if start is not None else 0
    e = end if end is not None else len(lines)
    return "".join(lines[s:e])

Registry Service

Manages repository configurations and model weights.

rag_core.registry.Registry

Registry for document repositories.

Source code in rag_core/registry.py
class Registry:
    """Registry for document repositories."""

    def __init__(self, config_path: Path, use_dual_embedding: Optional[bool] = None):
        """Initialize registry and load repository configuration."""
        self.config_path = config_path
        self.use_dual_embedding = use_dual_embedding
        self.repo_config: Dict = {}
        # Load repository configuration on init
        self._load_config()

    def _load_config(self):
        """Load the repositories configuration."""
        try:
            with open(self.config_path, "r") as f:
                self.repo_config = yaml.safe_load(f)
            logger.info(f"Loaded repository configuration from {self.config_path}")
        except Exception as e:
            logger.error(f"Failed to load repository configuration: {e}")
            self.repo_config = {}

    def _get_github_url(self, doc_id: str) -> Optional[str]:
        """
        Convert a document ID to a GitHub URL.

        Args:
            doc_id: Document ID in format "category/repo_name/path/to/file"

        Returns:
            GitHub URL or None if not found
        """
        if not self.repo_config:
            return None

        parts = doc_id.split("/", 2)  # Split into category, repo_name, file_path
        if len(parts) < 3:
            return None

        category, repo_name, file_path = parts

        # Find the repository in config
        if category in self.repo_config:
            for repo in self.repo_config[category]:
                if repo["name"] == repo_name:
                    # Convert GitHub URL to blob URL
                    github_url = repo["url"]
                    if github_url.endswith(".git"):
                        github_url = github_url[:-4]
                    return f"{github_url}/blob/master/{file_path}"

        return None

    def get_github_url(self, doc_id: str) -> Optional[str]:
        """Public method to retrieve GitHub URL for a document id."""
        return self._get_github_url(doc_id)

    def get_meta(self, doc_id: str) -> DocMeta:
        """Get metadata for a document id."""
        github_url = self.get_github_url(doc_id)
        default_branch = "master"
        toolkit = None
        doctype = get_file_type_category(doc_id)
        content_sha256 = ""
        line_index: list[int] = []
        return DocMeta(
            doc_id=doc_id,
            github_url=github_url or "",
            default_branch=default_branch,
            toolkit=toolkit,
            doctype=doctype,
            content_sha256=content_sha256,
            line_index=line_index,
        )

    def list_ids(self, prefix: str = "") -> List[str]:
        """List document IDs that start with the given prefix."""
        ids: List[str] = []
        # Iterate through categories and repos to build doc IDs
        for category, repos in self.repo_config.items():
            for repo in repos or []:
                name = repo.get("name")
                if not name:
                    continue
                doc_id = f"{category}/{name}"
                # If prefix is empty or doc_id matches prefix, include
                if not prefix or doc_id.startswith(prefix):
                    ids.append(doc_id)
        return ids

__init__(config_path, use_dual_embedding=None)

Initialize registry and load repository configuration.

Source code in rag_core/registry.py
def __init__(self, config_path: Path, use_dual_embedding: Optional[bool] = None):
    """Initialize registry and load repository configuration."""
    self.config_path = config_path
    self.use_dual_embedding = use_dual_embedding
    self.repo_config: Dict = {}
    # Load repository configuration on init
    self._load_config()

get_github_url(doc_id)

Public method to retrieve GitHub URL for a document id.

Source code in rag_core/registry.py
def get_github_url(self, doc_id: str) -> Optional[str]:
    """Public method to retrieve GitHub URL for a document id."""
    return self._get_github_url(doc_id)

get_meta(doc_id)

Get metadata for a document id.

Source code in rag_core/registry.py
def get_meta(self, doc_id: str) -> DocMeta:
    """Get metadata for a document id."""
    github_url = self.get_github_url(doc_id)
    default_branch = "master"
    toolkit = None
    doctype = get_file_type_category(doc_id)
    content_sha256 = ""
    line_index: list[int] = []
    return DocMeta(
        doc_id=doc_id,
        github_url=github_url or "",
        default_branch=default_branch,
        toolkit=toolkit,
        doctype=doctype,
        content_sha256=content_sha256,
        line_index=line_index,
    )

list_ids(prefix='')

List document IDs that start with the given prefix.

Source code in rag_core/registry.py
def list_ids(self, prefix: str = "") -> List[str]:
    """List document IDs that start with the given prefix."""
    ids: List[str] = []
    # Iterate through categories and repos to build doc IDs
    for category, repos in self.repo_config.items():
        for repo in repos or []:
            name = repo.get("name")
            if not name:
                continue
            doc_id = f"{category}/{name}"
            # If prefix is empty or doc_id matches prefix, include
            if not prefix or doc_id.startswith(prefix):
                ids.append(doc_id)
    return ids
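
A sketch of how the registry resolves IDs and URLs, assuming a configuration file shaped the way _get_github_url expects (categories mapping to lists of name/url entries); the file contents and repo details below are hypothetical:

from pathlib import Path

from rag_core.registry import Registry  # assumed import path

# config/repositories.yml (hypothetical contents):
# docs:
#   - name: my_repo
#     url: https://github.com/example/my_repo.git

registry = Registry(config_path=Path("config/repositories.yml"))
print(registry.list_ids())  # ['docs/my_repo']
print(registry.get_github_url("docs/my_repo/README.md"))
# https://github.com/example/my_repo/blob/master/README.md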

rag_core.registry.ModelWeights

Manage model weights for different file types.

Source code in rag_core/registry.py
class ModelWeights:
    """Manage model weights for different file types."""

    def __init__(self, model_weights_path: Path):
        self.model_weights_path = model_weights_path
        self.model_weights = self._load_model_weights()
        self.extension_weights = self._load_extension_weights()

    def _load_model_weights(self):
        if self.model_weights_path.exists():
            try:
                with open(self.model_weights_path, "r") as f:
                    data = yaml.safe_load(f) or {}
                    # If the file looks like an extension-weight file (contains extensions/path_includes),
                    # treat it as not providing per-doc model weights.
                    if isinstance(data, dict) and ("extensions" in data or "path_includes" in data):
                        return {}
                    return data
            except Exception as e:
                logger.warning(f"Failed to load model weights: {e}")
        return {}

    def _load_extension_weights(self):
        # Prefer an index-specific weights file if present in the same directory as the
        # provided model_weights_path. Fall back to package-level config/weights.yaml.
        if hasattr(self, "model_weights_path") and self.model_weights_path:
            base = self.model_weights_path.parent
        else:
            base = Path(__file__).parent.parent

        index_weights = base / "index_weights.yaml"
        weights_path = base / "weights.yaml"
        target = index_weights if index_weights.exists() else weights_path
        if target.exists():
            try:
                with open(target, "r") as f:
                    return yaml.safe_load(f) or {}
            except Exception as e:
                logger.warning(f"Failed to load extension weights from {target}: {e}")
        return {}

    def reload(self):
        """Reload model weights and extension weights from disk.

        Call this before searches to ensure the latest file-based weights are used.
        """
        self.model_weights = self._load_model_weights()
        self.extension_weights = self._load_extension_weights()

reload()

Reload model weights and extension weights from disk.

Call this before searches to ensure the latest file-based weights are used.

Source code in rag_core/registry.py
def reload(self):
    """Reload model weights and extension weights from disk.

    Call this before searches to ensure the latest file-based weights are used.
    """
    self.model_weights = self._load_model_weights()
    self.extension_weights = self._load_extension_weights()
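
A minimal sketch of reloading weights before a search; the file path is hypothetical, and handing the reloaded dictionaries to a search instance is an assumption about how the pieces are wired together:

from pathlib import Path

from rag_core.registry import ModelWeights  # assumed import path

weights = ModelWeights(Path("config/model_weights.yaml"))  # hypothetical path
weights.reload()  # pick up any on-disk edits before the next search
search_service.extension_weights = weights.extension_weights  # assumed wiring
search_service.model_weights = weights.model_weights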

Type Definitions

Core data types and models used throughout the system.

rag_core.types

DocMeta dataclass

Metadata for a document in the knowledge base.

Source code in rag_core/types.py
@dataclass
class DocMeta:
    """Metadata for a document in the knowledge base."""

    doc_id: str
    github_url: str
    default_branch: str
    toolkit: Optional[str]
    doctype: str
    content_sha256: str
    line_index: List[int]

Passage dataclass

Passage retrieved from a document.

Source code in rag_core/types.py
@dataclass
class Passage:
    """Passage retrieved from a document."""

    doc_id: str
    text: str
    github_url: str
    content_sha256: str
    index_version: str = ""

SearchHit dataclass

Search result hit.

Source code in rag_core/types.py
@dataclass
class SearchHit:
    """Search result hit."""

    id: str
    text: str
    score: float

get_file_type_category(doc_id)

Determine if a document should be treated as code, mixed content, or docs. Returns 'code', 'mixed', or 'docs'.

Source code in rag_core/types.py
def get_file_type_category(doc_id: str) -> str:
    """
    Determine if a document should be treated as code, mixed content, or docs.
    Returns 'code', 'mixed', or 'docs'.
    """
    base_id = strip_chunk_suffix(doc_id)
    path = Path(base_id)

    # Direct code files
    code_extensions = {
        ".py",
        ".js",
        ".ts",
        ".cpp",
        ".java",
        ".go",
        ".rs",
        ".c",
        ".h",
        ".css",
        ".scss",
        ".jsx",
        ".tsx",
    }
    if path.suffix in code_extensions:
        return "code"

    # Converted notebooks (mixed code + documentation)
    if ".nb" in path.suffixes or ".nb.txt" in str(path):
        return "mixed"
    if path.suffix in {".json", ".yaml", ".yml", ".toml", ".ini", ".md", ".rst"}:
        return "mixed"
    return "docs"