Skip to content

Web Interface API

Nancy Brain provides both a web admin interface and an HTTP API for programmatic access.

Admin UI

Streamlit-based web interface for interactive knowledge base management.

nancy_brain.admin_ui

Nancy Brain Web UI - Simple admin interface for knowledge base management.

load_articles_config(config_path='config/articles.yml')

Load articles configuration.

Source code in nancy_brain/admin_ui.py
def load_articles_config(config_path: str = "config/articles.yml"):
    """Load the articles configuration from a YAML file.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        The parsed YAML content, or an empty dict when the file does not
        exist or contains no document (empty / comment-only file).
    """
    try:
        with open(config_path, "r") as f:
            # safe_load yields None for an empty document; normalise that to
            # {} so the result matches the missing-file case.
            return yaml.safe_load(f) or {}
    except FileNotFoundError:
        return {}

load_config(config_path='config/repositories.yml')

Load repository configuration.

Source code in nancy_brain/admin_ui.py
def load_config(config_path: str = "config/repositories.yml"):
    """Load the repository configuration from a YAML file.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        The parsed YAML content, or an empty dict when the file does not
        exist or contains no document (empty / comment-only file).
    """
    try:
        with open(config_path, "r") as f:
            # safe_load yields None for an empty document; callers use
            # membership tests and item assignment on the result, so an empty
            # file must produce {} rather than None.
            return yaml.safe_load(f) or {}
    except FileNotFoundError:
        return {}

run_build_command(force_update=False, articles=False)

Run the knowledge base build command.

Source code in nancy_brain/admin_ui.py
def run_build_command(force_update: bool = False, articles: bool = False):
    """Launch the knowledge base build script as a child process.

    Args:
        force_update: When true, ``--force-update`` is appended to the
            command line.
        articles: When true and ``config/articles.yml`` exists,
            ``--articles-config config/articles.yml`` is added.

    Returns:
        The ``subprocess.CompletedProcess`` from ``subprocess.run``, with
        stdout/stderr captured as text.
    """
    build_script = package_root / "scripts" / "build_knowledge_base.py"
    articles_config = "config/articles.yml"

    argv = [sys.executable, str(build_script)]
    argv += ["--config", "config/repositories.yml"]
    argv += ["--embeddings-path", "knowledge_base/embeddings"]

    # NOTE(review): this existence check resolves against the current working
    # directory, whereas the child process runs with cwd=package_root --
    # confirm the two always coincide.
    if articles and Path(articles_config).exists():
        argv += ["--articles-config", articles_config]

    if force_update:
        argv.append("--force-update")

    return subprocess.run(argv, capture_output=True, text=True, cwd=package_root)

run_ui()

Render the Streamlit admin UI. Call this from a script entrypoint.

Keeping UI rendering inside a function avoids executing Streamlit code at import time (which breaks tests that import this module).

Source code in nancy_brain/admin_ui.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
def run_ui():
    """Render the Streamlit admin UI. Call this from a script entrypoint.

    Keeping UI rendering inside a function avoids executing Streamlit code at
    import time (which breaks tests that import this module).
    """
    st.set_page_config(page_title="Nancy Brain Admin", page_icon="🧠", layout="wide")

    _init_session_state_safe()

    # Main UI
    st.title("🧠 Nancy Brain Admin")
    st.markdown("*Turn GitHub repos into AI-searchable knowledge bases*")

    # Sidebar navigation + auth
    st.sidebar.title("Navigation")
    allow_insecure = os.environ.get("NB_ALLOW_INSECURE", "false").lower() in ("1", "true", "yes")

    with st.sidebar.expander("🔒 Authentication", expanded=True):
        if st.session_state.nb_token:
            st.write("**Logged in**")
            if st.button("Logout"):
                st.session_state.nb_token = None
                st.session_state.nb_refresh = None
                safe_rerun()
        else:
            st.write("Login to access admin features")
            with st.form("sidebar_login"):
                su = st.text_input("Username", key="_login_user")
                sp = st.text_input("Password", type="password", key="_login_pass")
                if st.form_submit_button("Login"):
                    try:
                        data = streamlit_auth.login(su, sp)
                        st.session_state.nb_token = data.get("access_token")
                        st.session_state.nb_refresh = data.get("refresh_token")
                        st.success("Logged in")
                        safe_rerun()
                    except Exception as e:
                        st.error(f"Login failed: {e}")

        if allow_insecure:
            st.info("NB_ALLOW_INSECURE is set: auth bypass enabled")

    page = st.sidebar.selectbox(
        "Choose a page:",
        ["🔍 Search", "⚖️ Weights", "📚 Repository Management", "🏗️ Build Knowledge Base", "📊 Status"],
    )

    is_authenticated = bool(st.session_state.nb_token) or allow_insecure

    if not is_authenticated:
        st.warning("You must be logged in to use the admin UI. Use the sidebar to login.")
        return

    if page == "🔍 Search":
        st.header("🔍 Search Knowledge Base")

        # Search interface
        col1, col2 = st.columns([3, 1])
        with col1:
            query = st.text_input("Search query:", placeholder="Enter your search query...")
        with col2:
            limit = st.number_input("Results:", min_value=1, max_value=20, value=5)

        # (Reweighting configuration moved below search results)

        # Ensure a single RAGService is created and reused (embeddings are heavy to load)
        if "rag_service" not in st.session_state or st.session_state.rag_service is None:
            try:
                # Use a dedicated model_weights file for thumbs persistence
                st.session_state.rag_service = RAGService(
                    embeddings_path=Path("knowledge_base/embeddings"),
                    config_path=Path("config/repositories.yml"),
                    weights_path=Path("config/model_weights.yaml"),
                )
            except Exception as e:
                # Don't crash the UI; show an error and leave rag_service as None
                st.session_state.rag_service = None
                show_error("Failed to initialize RAGService", e)

        if st.button("🔍 Search") and query:
            with st.spinner("Searching..."):
                try:
                    service = st.session_state.rag_service
                    if service is None:
                        st.error("Search service is not available")
                    else:
                        results = asyncio.run(service.search_docs(query, limit=limit))
                        st.session_state.search_results = results
                except Exception as e:
                    st.error(f"Search failed: {e}")

        # Display results
        if st.session_state.search_results:
            st.subheader("Search Results")

            def _highlight_snippet(text: str, query: str, snippet_len: int = 400, highlights: list = None) -> str:
                """Return an HTML highlighted snippet for the query tokens.

                Uses <mark> tags around matches and returns an HTML string (escaped).
                """
                if not text:
                    return ""

                escaped = html.escape(text)

                # If highlights provided by the service, use their offsets (preferred)
                if highlights:
                    # Build HTML by slicing original escaped text using offsets
                    parts = []
                    last = 0
                    for h in highlights:
                        s = max(0, h.get("start", 0))
                        e = min(len(text), h.get("end", s))
                        # Escape bounds in case
                        parts.append(html.escape(text[last:s]))
                        span = html.escape(text[s:e])
                        typ = h.get("type", "fuzzy")
                        color = "#ffd54f" if typ == "exact" else ("#90caf9" if typ == "stem" else "#e1bee7")
                        parts.append(f"<mark style='background:{color}; padding:0;'>{span}</mark>")
                        last = e
                    parts.append(html.escape(text[last:]))
                    composed = "".join(parts)

                    # Focus snippet around first highlight
                    m = re.search(r"<mark", composed)
                    if m:
                        idx = max(0, m.start() - snippet_len // 2)
                        snippet = composed[idx : idx + snippet_len]
                        if idx > 0:
                            snippet = "..." + snippet
                        if idx + snippet_len < len(composed):
                            snippet = snippet + "..."
                        return snippet
                    else:
                        return composed[:snippet_len] + ("..." if len(composed) > snippet_len else "")

                # Tokenize query into words, ignore very short tokens
                tokens = [t for t in re.split(r"\s+", query.strip()) if len(t) > 0]
                if not tokens:
                    return html.escape(text[:snippet_len]) + ("..." if len(text) > snippet_len else "")

                # Escape text for HTML then perform case-insensitive replacement
                escaped = html.escape(text)

                # Build a regex that matches any token (word-boundary aware)
                pattern = r"(" + r"|".join(re.escape(t) for t in tokens) + r")"

                def _repl(m):
                    return f"<mark>{m.group(0)}</mark>"

                try:
                    highlighted = re.sub(pattern, _repl, escaped, flags=re.IGNORECASE)
                except re.error:
                    # Fallback if regex fails
                    highlighted = escaped

                # Find first highlighted occurrence to build a focused snippet
                first_match = re.search(r"<mark>", highlighted)
                if first_match:
                    idx = first_match.start()
                    # Map back to original escaped text positions roughly
                    start = max(0, idx - snippet_len // 2)
                    end = start + snippet_len
                    snippet = highlighted[start:end]
                    if start > 0:
                        snippet = "..." + snippet
                    if end < len(highlighted):
                        snippet = snippet + "..."
                    return snippet
                else:
                    # No match highlighted (rare), return escaped leading chunk
                    return escaped[:snippet_len] + ("..." if len(escaped) > snippet_len else "")

            for i, result in enumerate(st.session_state.search_results, 1):
                title = f"{i}. {result.get('id', 'unknown')} (score: {result.get('score', 0):.3f})"
                with st.expander(title):
                    snippet_html = _highlight_snippet(
                        result.get("text", ""), query or "", highlights=result.get("highlights", [])
                    )
                    # Show snippet as HTML
                    st.markdown(snippet_html, unsafe_allow_html=True)
                    # Full highlighted content in an expander
                    with st.expander("Show full result"):
                        full_html = _highlight_snippet(
                            result.get("text", ""),
                            query or "",
                            snippet_len=len(result.get("text", "")),
                            highlights=result.get("highlights", []),
                        )
                        st.markdown(full_html, unsafe_allow_html=True)

                    # Thumbs up / Thumbs down controls to persist model_weights (left aligned)
                    col_left, col_right, _ = st.columns([0.12, 0.12, 1])
                    model_weights_path = Path("config/model_weights.yaml")
                    service = st.session_state.get("rag_service")

                    def _persist_model_weight(doc_id: str, new_value: float):
                        try:
                            # Load previous value and persist using helper
                            prev = set_model_weight(model_weights_path, doc_id, float(new_value))
                            # record undo
                            try:
                                st.session_state.weights_undo_stack.append((doc_id, prev))
                            except Exception:
                                pass
                            # Update in-memory search weights as well
                            if service is not None and hasattr(service, "search"):
                                try:
                                    service.search.model_weights[doc_id] = float(new_value)
                                except Exception:
                                    pass
                            st.success(f"Updated weight for {doc_id} -> {new_value}")
                        except Exception as e:
                            show_error("Failed to persist model weight", e)

                    # Upvote (increase multiplier by 20%, cap at 2.0)
                    with col_left:
                        if st.button("👍", key=f"thumbs_up_{i}"):
                            doc_id = result.get("id")
                            # Determine current value from service or disk
                            cur = 1.0
                            if service and hasattr(service, "search"):
                                cur = float(service.search.model_weights.get(doc_id, cur))
                            # Compute new value
                            new = min(2.0, cur * 1.2)
                            _persist_model_weight(doc_id, new)
                    # Downvote (decrease multiplier by 20%, floor at 0.5)
                    with col_right:
                        if st.button("👎", key=f"thumbs_down_{i}"):
                            doc_id = result.get("id")
                            cur = 1.0
                            if service and hasattr(service, "search"):
                                cur = float(service.search.model_weights.get(doc_id, cur))
                            new = max(0.5, cur * 0.8)
                            _persist_model_weight(doc_id, new)

    # New dedicated Weights page: always-visible editors for index_weights and model_weights
    elif page == "⚖️ Weights":
        st.header("⚖️ Reweighting / Model Weights")

        # --- Reweighting configuration editor
        st.markdown("---")
        st.markdown("#### Reweighting Configuration (index_weights.yaml)")
        weights_path = Path("config/index_weights.yaml")
        try:
            if weights_path.exists():
                with open(weights_path, "r") as wf:
                    weights_cfg = yaml.safe_load(wf) or {}
            else:
                weights_cfg = {}
        except Exception as e:
            weights_cfg = {}
            show_error("Failed to load index_weights.yaml", e, hint="Check config directory and YAML syntax")

        ext_weights = weights_cfg.get("extensions", {})
        path_includes = weights_cfg.get("path_includes", {})

        with st.form("weights_form"):
            st.markdown("**Extension weights (file extension -> multiplier)**")
            ext_text = st.text_area(
                "Extensions YAML (e.g. .py: 1.0)", value=yaml.dump(ext_weights) if ext_weights else "", height=120
            )
            st.markdown("**Path includes (keyword -> multiplier)**")
            path_text = st.text_area(
                "Path includes YAML (e.g. tests: 1.1)",
                value=yaml.dump(path_includes) if path_includes else "",
                height=120,
            )
            if st.form_submit_button("Save weights"):
                try:
                    new_ext = yaml.safe_load(ext_text) or {}
                    new_path = yaml.safe_load(path_text) or {}
                    new_cfg = {"extensions": new_ext, "path_includes": new_path}
                    os.makedirs(weights_path.parent, exist_ok=True)
                    # Validate before saving
                    ok, errs = validate_weights_config(new_cfg)
                    if not ok:
                        for e in errs:
                            st.error(e)
                    else:
                        with open(weights_path, "w") as wf:
                            yaml.dump(new_cfg, wf, default_flow_style=False, sort_keys=False)
                        st.success("Saved index_weights.yaml")
                        safe_rerun()
                except Exception as e:
                    show_error("Failed to save index_weights.yaml", e, hint="Ensure YAML is valid and file is writable")

        # Export / Import reweighting configuration (outside the form)
        try:
            export_weights_yaml = yaml.dump(
                weights_cfg if weights_cfg else {}, default_flow_style=False, sort_keys=False
            )
        except Exception:
            export_weights_yaml = ""

        col_exp, col_imp = st.columns(2)
        with col_exp:
            st.download_button(
                "⬇️ Export Reweighting Config",
                data=export_weights_yaml,
                file_name="index_weights_export.yml",
                mime="text/yaml",
                key="export_index_weights",
            )
        with col_imp:
            upload_weights = st.file_uploader(
                "⬆️ Import Reweighting Config", type=["yml", "yaml"], key="upload_index_weights_top"
            )
            if upload_weights is not None:
                try:
                    raw = upload_weights.read()
                    txt = raw.decode("utf-8")
                    parsed = yaml.safe_load(txt)
                    if not isinstance(parsed, dict):
                        st.error("Imported weights must be a YAML mapping")
                    else:
                        if st.button("Import and overwrite weights"):
                            # Validate parsed config
                            ok, errs = validate_weights_config(parsed)
                            if not ok:
                                for e in errs:
                                    st.error(e)
                            else:
                                os.makedirs(weights_path.parent, exist_ok=True)
                                with open(weights_path, "w") as wf:
                                    yaml.dump(parsed, wf, default_flow_style=False, sort_keys=False)
                                st.success("Imported index_weights.yaml")
                                safe_rerun()
                except Exception as e:
                    show_error("Failed to parse uploaded weights YAML.", e, hint="Ensure file is valid YAML")

        # --- Model weights editor (per-document multipliers)
        st.markdown("---")
        st.markdown("#### Model Weights (per-document) - config/model_weights.yaml")
        model_weights_path = Path("config/model_weights.yaml")
        try:
            if model_weights_path.exists():
                with open(model_weights_path, "r") as mf:
                    model_weights_cfg = yaml.safe_load(mf) or {}
            else:
                model_weights_cfg = {}
        except Exception as e:
            model_weights_cfg = {}
            show_error("Failed to load model_weights.yaml", e, hint="Check YAML syntax")

        # Show editable mapping
        model_text = yaml.dump(model_weights_cfg) if isinstance(model_weights_cfg, dict) else "{}"
        with st.form("model_weights_form"):
            st.markdown("**Per-document model weights (doc_id -> multiplier)**")
            model_text_area = st.text_area(
                "Model weights YAML (e.g. cat1/repo/path: 1.2)", value=model_text, height=200
            )
            col_save = st.columns([1])[0]
            with col_save:
                if st.form_submit_button("Save model weights"):
                    try:
                        parsed = yaml.safe_load(model_text_area) or {}
                        if not isinstance(parsed, dict):
                            st.error("Model weights must be a YAML mapping of doc_id -> numeric multiplier")
                        else:
                            # Coerce values to floats and clamp to safe range
                            fixed = {}
                            for k, v in parsed.items():
                                try:
                                    f = float(v)
                                except Exception:
                                    st.error(f"Invalid numeric value for {k}: {v}")
                                    raise
                                # Clamp to reasonable bounds [0.5, 2.0]
                                f = max(0.5, min(2.0, f))
                                fixed[k] = f
                            # Save using helper
                            save_model_weights(fixed, model_weights_path)
                            # Update in-memory service if available
                            svc = st.session_state.get("rag_service")
                            if svc is not None and hasattr(svc, "search"):
                                try:
                                    svc.search.model_weights = dict(fixed)
                                except Exception:
                                    pass
                            st.success("Saved model_weights.yaml")
                    except Exception as e:
                        show_error(
                            "Failed to save model_weights.yaml", e, hint="Ensure YAML is valid and values are numeric"
                        )

        # Undo / Export / Import actions (must be outside the form)
        if st.button("Undo last weight change"):
            stack = st.session_state.get("weights_undo_stack", [])
            if not stack:
                st.info("Nothing to undo")
            else:
                doc_id, prev = stack.pop()
                # prev may be None (meaning the doc_id didn't exist before)
                try:
                    set_model_weight(model_weights_path, doc_id, prev)
                    # update in-memory service
                    svc = st.session_state.get("rag_service")
                    if svc is not None and hasattr(svc, "search"):
                        try:
                            if prev is None:
                                svc.search.model_weights.pop(doc_id, None)
                            else:
                                svc.search.model_weights[doc_id] = float(prev)
                        except Exception:
                            pass
                    st.success(f"Reverted {doc_id} to {prev}")
                except Exception as e:
                    show_error("Failed to undo model weight change", e)

        # Export / Import reweighting configuration (index_weights export only)
        try:
            export_weights_yaml = yaml.dump(
                weights_cfg if weights_cfg else {}, default_flow_style=False, sort_keys=False
            )
        except Exception:
            export_weights_yaml = ""

        col_exp, col_imp = st.columns(2)
        with col_exp:
            st.download_button(
                "⬇️ Export Reweighting Config",
                data=export_weights_yaml,
                file_name="index_weights_export.yml",
                mime="text/yaml",
                key="export_index_weights_bottom",
            )
        with col_imp:
            upload_weights = st.file_uploader(
                "⬆️ Import Reweighting Config", type=["yml", "yaml"], key="upload_index_weights_bottom"
            )
            if upload_weights is not None:
                try:
                    raw = upload_weights.read()
                    txt = raw.decode("utf-8")
                    parsed = yaml.safe_load(txt)
                    if not isinstance(parsed, dict):
                        st.error("Imported weights must be a YAML mapping")
                    else:
                        if st.button("Import and overwrite weights"):
                            # Validate parsed config
                            ok, errs = validate_weights_config(parsed)
                            if not ok:
                                for e in errs:
                                    st.error(e)
                            else:
                                os.makedirs(weights_path.parent, exist_ok=True)
                                with open(weights_path, "w") as wf:
                                    yaml.dump(parsed, wf, default_flow_style=False, sort_keys=False)
                                st.success("Imported index_weights.yaml")
                                safe_rerun()
                except Exception as e:
                    show_error("Failed to parse uploaded weights YAML.", e, hint="Ensure file is valid YAML")

    elif page == "📚 Repository Management":
        st.header("📚 Repository Management")

        # Create tabs for repositories and articles
        tab1, tab2 = st.tabs(["📁 Repositories", "📄 Articles"])

        with tab1:
            st.subheader("GitHub Repositories")

            # (Reweighting configuration moved to the Search page)

            # Load current config
            config = load_config()

            # Add new repository
            st.markdown("#### Add New Repository")
            with st.form("add_repo"):
                col1, col2 = st.columns(2)
                with col1:
                    category = st.text_input("Category:", placeholder="e.g., microlensing_tools")
                    repo_name = st.text_input("Repository Name:", placeholder="e.g., MulensModel")
                with col2:
                    repo_url = st.text_input("Repository URL:", placeholder="https://github.com/user/repo.git")
                    description = st.text_input("Description (optional):", placeholder="Brief description")

                if st.form_submit_button("➕ Add Repository"):
                    if category and repo_name and repo_url:
                        try:
                            if category not in config:
                                config[category] = []

                            new_repo = {"name": repo_name, "url": repo_url}
                            if description:
                                new_repo["description"] = description

                            config[category].append(new_repo)
                            save_config(config)
                            st.success(f"Added {repo_name} to {category}")
                            safe_rerun()
                        except Exception as e:
                            show_error(
                                "Failed to add repository.",
                                e,
                                hint="Check file permissions and YAML validity for config/repositories.yml",
                            )
                    else:
                        st.error("Please fill in category, name, and URL")

            # Export / Import configuration
            st.markdown("#### Export / Import Configuration")
            try:
                export_yaml = yaml.dump(config if config else {}, default_flow_style=False, sort_keys=False)
            except Exception:
                export_yaml = ""

            st.download_button(
                label="⬇️ Export Repositories Config",
                data=export_yaml,
                file_name="repositories_export.yml",
                mime="text/yaml",
                key="export_repositories",
            )

            uploaded = st.file_uploader("⬆️ Import Repositories Config (YAML)", type=["yml", "yaml"], key="upload_repos")
            if uploaded is not None:
                try:
                    raw = uploaded.read()
                    text = raw.decode("utf-8")
                    parsed = yaml.safe_load(text)

                    if not isinstance(parsed, dict):
                        st.error("Imported file is not a valid repositories mapping (expected YAML mapping).")
                    else:
                        st.info("Preview of imported config (first 1000 chars):")
                        st.code(text[:1000])
                        if st.button("Import and overwrite repositories config"):
                            save_config(parsed)
                            st.success("Repositories configuration imported successfully.")
                            safe_rerun()
                except Exception as e:
                    show_error(
                        "Failed to parse uploaded repositories YAML.",
                        e,
                        hint="Ensure the file is valid YAML and not too large",
                    )

            # Display current repositories
            st.markdown("#### Current Repositories")
            if config:
                for category, repos in config.items():
                    st.write(f"**{category}**")
                    for repo in repos:
                        col1, col2, col3 = st.columns([3, 2, 1])
                        with col1:
                            st.write(f"• {repo['name']}")
                        with col2:
                            st.write(repo.get("description", ""))
                        with col3:
                            if st.button("🗑️", key=f"delete_repo_{category}_{repo['name']}"):
                                try:
                                    config[category] = [r for r in config[category] if r["name"] != repo["name"]]
                                    if not config[category]:
                                        del config[category]
                                    save_config(config)
                                    safe_rerun()
                                except Exception as e:
                                    show_error(
                                        "Failed to delete repository.",
                                        e,
                                        hint="Ensure the config file is writable and valid YAML",
                                    )
            else:
                st.info("No repositories configured yet.")

        with tab2:
            st.subheader("PDF Articles")

            # Load current articles config
            articles_config = load_articles_config()

            # Add new article
            st.markdown("#### Add New Article")
            with st.form("add_article"):
                col1, col2 = st.columns(2)
                with col1:
                    article_category = st.text_input(
                        "Category:",
                        placeholder="e.g., foundational_papers",
                        key="article_category",
                    )
                    article_name = st.text_input(
                        "Article Name:",
                        placeholder="e.g., Paczynski_1986_microlensing",
                        key="article_name",
                    )
                with col2:
                    article_url = st.text_input(
                        "Article URL:",
                        placeholder="https://arxiv.org/pdf/paper.pdf",
                        key="article_url",
                    )
                    article_description = st.text_input(
                        "Description:",
                        placeholder="Brief description of the article",
                        key="article_description",
                    )

                if st.form_submit_button("➕ Add Article"):
                    if article_category and article_name and article_url:
                        try:
                            if article_category not in articles_config:
                                articles_config[article_category] = []

                            # Check if article already exists
                            existing = [a for a in articles_config[article_category] if a.get("name") == article_name]
                            if existing:
                                st.error(f"Article '{article_name}' already exists in category '{article_category}'")
                            else:
                                new_article = {"name": article_name, "url": article_url}
                                if article_description:
                                    new_article["description"] = article_description

                                articles_config[article_category].append(new_article)
                                save_articles_config(articles_config)
                                st.success(f"Added article '{article_name}' to category '{article_category}'")
                                safe_rerun()
                        except Exception as e:
                            show_error(
                                "Failed to add article.",
                                e,
                                hint="Check file permissions and YAML validity for config/articles.yml",
                            )
                    else:
                        st.error("Please fill in category, name, and URL")

            # Display current articles
            st.markdown("#### Current Articles")
            if articles_config:
                for category, articles in articles_config.items():
                    st.write(f"**{category}**")
                    for article in articles:
                        col1, col2, col3 = st.columns([3, 2, 1])
                        with col1:
                            st.write(f"• {article['name']}")
                            if "url" in article:
                                st.markdown(f"  [{article['url']}]({article['url']})")
                        with col2:
                            st.write(article.get("description", ""))
                        with col3:
                            if st.button("🗑️", key=f"delete_article_{category}_{article['name']}"):
                                try:
                                    articles_config[category] = [
                                        a for a in articles_config[category] if a["name"] != article["name"]
                                    ]
                                    if not articles_config[category]:
                                        del articles_config[category]
                                    save_articles_config(articles_config)
                                    safe_rerun()
                                except Exception as e:
                                    show_error(
                                        "Failed to delete article.",
                                        e,
                                        hint="Ensure config/articles.yml is writable and valid YAML",
                                    )
            else:
                st.info("No articles configured yet.")

            # Export / Import articles configuration
            st.markdown("#### Export / Import Articles Configuration")
            try:
                export_articles_yaml = yaml.dump(
                    articles_config if articles_config else {}, default_flow_style=False, sort_keys=False
                )
            except Exception:
                export_articles_yaml = ""

            st.download_button(
                label="⬇️ Export Articles Config",
                data=export_articles_yaml,
                file_name="articles_export.yml",
                mime="text/yaml",
                key="export_articles",
            )

            uploaded_articles = st.file_uploader(
                "⬆️ Import Articles Config (YAML)", type=["yml", "yaml"], key="upload_articles"
            )
            if uploaded_articles is not None:
                try:
                    raw = uploaded_articles.read()
                    text = raw.decode("utf-8")
                    parsed = yaml.safe_load(text)

                    if not isinstance(parsed, dict):
                        st.error("Imported file is not a valid articles mapping (expected YAML mapping).")
                    else:
                        st.info("Preview of imported articles config (first 1000 chars):")
                        st.code(text[:1000])
                        if st.button("Import and overwrite articles config"):
                            save_articles_config(parsed)
                            st.success("Articles configuration imported successfully.")
                            safe_rerun()
                except Exception as e:
                    show_error(
                        "Failed to parse uploaded articles YAML.",
                        e,
                        hint="Ensure the file is valid YAML and not too large",
                    )

    elif page == "🏗️ Build Knowledge Base":
        st.header("🏗️ Build Knowledge Base")

        col1, col2 = st.columns(2)
        with col1:
            force_update = st.checkbox("Force update existing repositories")
            include_articles = st.checkbox("Include PDF articles (if configured)")

        with col2:
            st.info(
                "**Build Options:**\n- Force update: Re-downloads all repositories\n- Include articles: Downloads PDFs from articles.yml"
            )

        if st.button("🚀 Start Build"):
            st.info("Starting build — streaming output below. This may take several minutes.")

            # Build command (same as run_build_command)
            cmd = [
                sys.executable,
                str(package_root / "scripts" / "build_knowledge_base.py"),
                "--config",
                "config/repositories.yml",
                "--embeddings-path",
                "knowledge_base/embeddings",
            ]
            if include_articles and Path("config/articles.yml").exists():
                cmd.extend(["--articles-config", "config/articles.yml"])
            if force_update:
                cmd.append("--force-update")

            # Run subprocess and stream stdout to the UI with a progress bar
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                cwd=package_root,
            )

            log_box = st.empty()
            progress = st.progress(0)

            lines = []
            progress_val = 0

            # Read lines as they become available and update UI
            try:
                while True:
                    line = process.stdout.readline()
                    if line == "" and process.poll() is not None:
                        break
                    if line:
                        # Append and keep recent output trimmed
                        lines.append(line)
                        if len(lines) > 500:
                            lines = lines[-500:]

                        # Detect structured progress markers emitted by the build script
                        stripped = line.strip()
                        if stripped.startswith("PROGRESS_JSON:"):
                            try:
                                payload = stripped.split("PROGRESS_JSON:", 1)[1].strip()
                                obj = __import__("json").loads(payload)
                                pct = int(obj.get("percent", 0))
                                progress.progress(max(progress_val, min(100, pct)))
                                progress_val = max(progress_val, min(100, pct))
                                # Optionally include stage detail in log
                                lines.append(f"[progress] {obj.get('stage', '')}: {obj.get('detail', '')}\n")
                            except Exception:
                                pass
                        else:
                            # Heuristic fallback increment if no structured progress seen
                            progress_val = min(100, progress_val + 1)
                            progress.progress(progress_val)

                        # Update log area
                        log_box.text("".join(lines[-200:]))

                returncode = process.poll()
            except Exception as e:
                process.kill()
                st.error(f"Build process failed: {e}")
                return

            # Finalize progress and show results
            progress.progress(100)
            if returncode == 0:
                st.success("✅ Knowledge base built successfully!")
                if lines:
                    with st.expander("Build Output"):
                        st.text("".join(lines))
            else:
                st.error(f"❌ Build failed (exit code {returncode})")
                if lines:
                    with st.expander("Build Output"):
                        st.text("".join(lines))

    elif page == "📊 Status":
        st.header("📊 System Status")

        # Check if embeddings exist
        embeddings_path = Path("knowledge_base/embeddings")
        config_path = Path("config/repositories.yml")
        articles_path = Path("config/articles.yml")
        weights_path = Path("config/weights.yaml")

        col1, col2, col3 = st.columns(3)

        with col1:
            st.subheader("Files")
            st.write("📁 Embeddings:", "✅" if embeddings_path.exists() else "❌")
            st.write("⚙️ Repositories Config:", "✅" if config_path.exists() else "❌")
            st.write("📄 Articles Config:", "✅" if articles_path.exists() else "❌")
            st.write("⚖️ Weights:", "✅" if weights_path.exists() else "❌")

        with col2:
            st.subheader("Knowledge Base")
            if embeddings_path.exists():
                try:
                    # Try to count files in embeddings
                    index_files = list(embeddings_path.glob("**/*"))
                    st.write(f"📄 Index files: {len(index_files)}")
                except Exception:
                    st.write("📄 Index files: Unknown")
            else:
                st.write("📄 Index files: No embeddings found")

        with col3:
            st.subheader("Configuration")
            config = load_config()
            articles_config = load_articles_config()

            total_repos = sum(len(repos) for repos in config.values()) if config else 0
            total_articles = sum(len(articles) for articles in articles_config.values()) if articles_config else 0

            st.write(f"📚 Total repositories: {total_repos}")
            st.write(f"📄 Total articles: {total_articles}")
            st.write(f"📁 Repository categories: {len(config) if config else 0}")
            st.write(f"📁 Article categories: {len(articles_config) if articles_config else 0}")

    # Footer
    st.markdown("---")
    st.markdown("*Nancy Brain - AI-powered knowledge base for research*")

safe_rerun()

Try to call Streamlit's experimental rerun; fall back to an instruction message if unavailable.

Source code in nancy_brain/admin_ui.py
def safe_rerun():
    """Ask Streamlit to rerun the script, degrading to a refresh hint.

    The stable ``st.rerun`` API is preferred; ``st.experimental_rerun`` is
    used when only the legacy name is available. If neither can be called,
    an informational message asks the user to refresh the page. When
    Streamlit itself is unusable (e.g. the module is imported outside a
    Streamlit session) the function does nothing.

    Returns:
        None.
    """
    try:
        # Look the attributes up dynamically so a missing name on either old
        # or new Streamlit releases is handled by the fallback below.
        rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
        rerun()
    except Exception:
        try:
            # No usable rerun API: tell the user how to see the changes.
            st.info("Please refresh the page to apply changes.")
        except Exception:
            # Silently ignore when not running in Streamlit
            pass

save_articles_config(config, config_path='config/articles.yml')

Save articles configuration.

Source code in nancy_brain/admin_ui.py
def save_articles_config(config: dict, config_path: str = "config/articles.yml"):
    """Save articles configuration.

    Args:
        config: Mapping of category name to a list of article entries.
        config_path: Destination YAML file. Missing parent directories are
            created; a bare filename is written to the current directory.

    Raises:
        RuntimeError: If the directory cannot be created or the file cannot
            be written; the original exception is chained as the cause.
    """
    try:
        parent_dir = os.path.dirname(config_path)
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(config_path, "w") as f:
            # sort_keys=False keeps the category/field order the user sees in the UI.
            yaml.dump(config, f, default_flow_style=False, sort_keys=False)
    except Exception as e:
        raise RuntimeError(f"Failed to save articles config to {config_path}: {e}") from e

save_config(config, config_path='config/repositories.yml')

Save repository configuration.

Source code in nancy_brain/admin_ui.py
def save_config(config: dict, config_path: str = "config/repositories.yml"):
    """Save repository configuration.

    Args:
        config: Mapping of category name to a list of repository entries.
        config_path: Destination YAML file. Missing parent directories are
            created; a bare filename is written to the current directory.

    Raises:
        RuntimeError: If the directory cannot be created or the file cannot
            be written; the original exception is chained as the cause.
    """
    try:
        parent_dir = os.path.dirname(config_path)
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(config_path, "w") as f:
            # sort_keys=False matches the other writers of this file and the
            # articles config, so saving does not reshuffle categories/fields.
            yaml.dump(config, f, default_flow_style=False, sort_keys=False)
    except Exception as e:
        raise RuntimeError(f"Failed to save repositories config to {config_path}: {e}") from e

show_error(message, exc=None, hint=None)

Display a user-friendly error with optional exception details and a hint.

Source code in nancy_brain/admin_ui.py
def show_error(message: str, exc: Exception = None, hint: str = None):
    """Display a user-friendly error with optional exception details and a hint.

    Args:
        message: Text logged at error level and shown via ``st.error``.
        exc: Exception to describe. When given, its traceback is logged and
            shown inside an "Error details" expander.
        hint: Optional advice shown via ``st.info`` with a ``Hint:`` prefix.
    """
    # Log the exception that was handed in; fall back to whatever exception is
    # currently being handled when the caller did not pass one.
    logging.error(message, exc_info=exc if exc is not None else True)
    st.error(message)
    if hint:
        st.info(f"Hint: {hint}")
    if exc is not None:
        # Format the passed exception explicitly so the details are correct
        # even when this is called after the ``except`` block has ended.
        details = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
        with st.expander("Error details"):
            st.text(details)

CLI Interface

Command-line interface for Nancy Brain operations.

nancy_brain.cli

Nancy Brain CLI interface.

add_article(article_url, article_name, category, description)

Add a PDF article to the configuration.

Source code in nancy_brain/cli.py
@cli.command()
@click.argument("article_url")
@click.argument("article_name")
@click.option("--category", default="articles", help="Category to add article to")
@click.option("--description", help="Description of the article")
def add_article(article_url, article_name, category, description):
    """Add a PDF article to the configuration."""
    # Reads config/articles.yml (creating config/ when the file is absent),
    # appends {"name", "url"[, "description"]} under `category`, and writes the
    # file back. Problems are reported with click.echo and the command returns
    # without writing.
    config_file = Path("config/articles.yml")

    # Create articles config if it doesn't exist
    if not config_file.exists():
        config_file.parent.mkdir(parents=True, exist_ok=True)
        articles_config = {}
    else:
        try:
            with open(config_file, "r") as f:
                # An empty or comment-only file loads as None.
                articles_config = yaml.safe_load(f) or {}
        except Exception as e:
            click.echo(f"❌ Error reading {config_file}: {e}")
            return

    # Anything other than a mapping cannot hold categories; refuse to touch it
    # rather than crash or overwrite it.
    if not isinstance(articles_config, dict):
        click.echo(f"❌ Error reading {config_file}: expected a mapping of categories at the top level")
        return

    # Add category if it doesn't exist. A key written with no value
    # (e.g. "articles:") loads as None and is treated as an empty category.
    if articles_config.get(category) is None:
        articles_config[category] = []

    # Check if article already exists
    existing = [a for a in articles_config[category] if a.get("name") == article_name]
    if existing:
        click.echo(f"❌ Article '{article_name}' already exists in category '{category}'")
        return

    # Create article entry
    article_entry = {"name": article_name, "url": article_url}
    if description:
        article_entry["description"] = description

    # Add the new article
    articles_config[category].append(article_entry)

    # Write back to file
    try:
        with open(config_file, "w") as f:
            yaml.dump(articles_config, f, default_flow_style=False, sort_keys=False)

        click.echo(f"✅ Added article '{article_name}' to category '{category}'")
        click.echo(f"📝 Run 'nancy-brain build --articles-config {config_file}' to index the new article")

    except Exception as e:
        click.echo(f"❌ Error writing to {config_file}: {e}")

add_repo(repo_url, category)

Add a repository to the configuration.

Source code in nancy_brain/cli.py
@cli.command()
@click.argument("repo_url")
@click.option("--category", default="tools", help="Category to add repo to")
def add_repo(repo_url, category):
    """Add a repository to the configuration."""
    # Requires an existing config/repositories.yml. The repository name is the
    # last path segment of the URL without a trailing ".git". Problems are
    # reported with click.echo and the command returns without writing.
    config_file = Path("config/repositories.yml")
    if not config_file.exists():
        click.echo("❌ No config/repositories.yml found. Run 'nancy-brain init' first.")
        return

    # Parse repo name from URL. Trailing slashes are ignored, and only a
    # ".git" *suffix* is removed so names that merely contain ".git"
    # (e.g. "my.github-tools") are kept intact.
    repo_name = repo_url.rstrip("/").split("/")[-1]
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]
    if not repo_name:
        click.echo(f"❌ Could not determine a repository name from URL: {repo_url}")
        return

    # Load existing config
    try:
        with open(config_file, "r") as f:
            # An empty or comment-only file loads as None.
            config = yaml.safe_load(f) or {}
    except Exception as e:
        click.echo(f"❌ Error reading {config_file}: {e}")
        return

    # Anything other than a mapping cannot hold categories; refuse to touch it
    # rather than crash or overwrite it.
    if not isinstance(config, dict):
        click.echo(f"❌ Error reading {config_file}: expected a mapping of categories at the top level")
        return

    # Add category if it doesn't exist. A key written with no value
    # (e.g. "tools:") loads as None and is treated as an empty category.
    if config.get(category) is None:
        config[category] = []

    # Check if repo already exists
    existing = [r for r in config[category] if r.get("name") == repo_name]
    if existing:
        click.echo(f"❌ Repository '{repo_name}' already exists in category '{category}'")
        return

    # Add the new repository
    config[category].append({"name": repo_name, "url": repo_url})

    # Write back to file
    try:
        with open(config_file, "w") as f:
            yaml.dump(config, f, default_flow_style=False, sort_keys=False)

        click.echo(f"✅ Added {repo_name} to {category} category")
        click.echo("📝 Run 'nancy-brain build --force-update' to fetch the new repository")

    except Exception as e:
        click.echo(f"❌ Error writing to {config_file}: {e}")

build(config, articles_config, embeddings_path, force_update, dry_run, dirty, summaries, batch_size, max_docs, category)

Build the knowledge base from configured repositories.

The build command validates config/repositories.yml (and config/articles.yml if provided) before starting. If validation fails, the command prints detailed errors and exits with a non-zero status.

Source code in nancy_brain/cli.py
@cli.command()
@click.option("--config", default="config/repositories.yml", help="Repository config file")
@click.option("--articles-config", help="PDF articles config file")
@click.option(
    "--embeddings-path",
    default="knowledge_base/embeddings",
    help="Embeddings output path",
)
@click.option("--force-update", is_flag=True, help="Force update all repositories")
@click.option("--dry-run", is_flag=True, help="Show what would be done without executing the build")
@click.option("--dirty", is_flag=True, help="Leave raw repos and PDFs in place after build (don't cleanup)")
@click.option(
    "--summaries/--no-summaries",
    default=None,
    help="Generate Gemini summaries during build (defaults to ENABLE_DOC_SUMMARIES env)",
)
@click.option(
    "--batch-size",
    default=0,
    type=int,
    help="Index documents in batches (requires embeddings.upsert support). 0 = disable batching.",
)
@click.option(
    "--max-docs",
    default=0,
    type=int,
    help="Stop after indexing this many document chunks (for testing / limiting resource use). 0 = no limit.",
)
@click.option(
    "--category",
    help="Limit build to a single repository category (as defined in repositories.yml)",
)
def build(
    config, articles_config, embeddings_path, force_update, dry_run, dirty, summaries, batch_size, max_docs, category
):
    """Build the knowledge base from configured repositories.

    The build command validates `config/repositories.yml` (and `config/articles.yml`
    if provided) before starting. If validation fails, the command prints
    detailed errors and exits with a non-zero status.
    """

    def emit(text, color):
        # Colored status line: Rich markup when available, click styling otherwise.
        if RICH_AVAILABLE:
            _console.print(f"[{color}]{text}[/{color}]")
        else:
            click.echo(click.style(text, fg=color))

    def check_config(path, kind, filename, validator):
        # Exit with status 2 unless `path` exists, parses as YAML, and passes `validator`.
        if not path.exists():
            click.echo(f"❌ {kind.capitalize()} config not found: {path}")
            sys.exit(2)
        try:
            with open(path, "r", encoding="utf-8") as handle:
                loaded = yaml.safe_load(handle) or {}
        except Exception as exc:
            click.echo(f"❌ Failed to read {kind} config: {exc}")
            sys.exit(2)
        valid, problems = validator(loaded)
        if not valid:
            click.echo(f"❌ {filename} validation failed:")
            for problem in problems:
                click.echo(f"  - {problem}")
            sys.exit(2)

    click.echo("🏗️  Building knowledge base...")

    # Resolve user-supplied paths against the current working directory
    workdir = Path.cwd()
    config_path = workdir / config
    embeddings_path = workdir / embeddings_path

    # Fail fast on a missing or invalid repository config
    check_config(config_path, "repository", "repositories.yml", validate_repositories_config)

    # Base invocation of the pipeline script
    cmd = [
        sys.executable,
        str(package_root / "scripts" / "build_knowledge_base.py"),
        "--config",
        str(config_path),
        "--embeddings-path",
        str(embeddings_path),
    ]

    # The articles config is optional but gets the same validation when given
    if articles_config:
        articles_config_path = workdir / articles_config
        check_config(articles_config_path, "articles", "articles.yml", validate_articles_config)
        cmd.extend(["--articles-config", str(articles_config_path)])

    # Plain on/off switches
    for enabled, flag in ((force_update, "--force-update"), (dirty, "--dirty")):
        if enabled:
            cmd.append(flag)

    # Tri-state: None means "let the script decide" and adds nothing
    if summaries is True:
        cmd.append("--summaries")
    elif summaries is False:
        cmd.append("--no-summaries")

    # Numeric limits are forwarded only when positive
    for amount, flag in ((batch_size, "--batch-size"), (max_docs, "--max-docs")):
        if amount and amount > 0:
            cmd.extend([flag, str(amount)])

    if category:
        cmd.extend(["--category", category])

    # A dry run still goes through the pipeline script (with --dry-run) so the
    # preview comes from the same logic as a real build.
    if dry_run:
        cmd.append("--dry-run")
        emit("🔎 Dry run: executing pipeline in no-op mode", "yellow")

    # Run the pipeline script as a child process and mirror its exit status
    try:
        if RICH_AVAILABLE:
            with _console.status("Building knowledge base...", spinner="dots"):
                subprocess.run(cmd, check=True)
        else:
            subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as failure:
        emit(f"❌ Build failed with exit code {failure.returncode}", "red")
        sys.exit(failure.returncode)

    emit("✅ Knowledge base built successfully!", "green")

cli()

Nancy Brain - Turn GitHub repos into AI-searchable knowledge bases.

Source code in nancy_brain/cli.py
@click.group()
@click.version_option(version=__version__)
def cli():
    """Nancy Brain - Turn GitHub repos into AI-searchable knowledge bases."""
    # The group callback does no work itself; subcommands register via @cli.command().

explore(embeddings_path, config, weights, prefix, max_depth, max_entries)

Explore the knowledge base document tree structure.

Source code in nancy_brain/cli.py
@cli.command()
@click.option("--embeddings-path", default="knowledge_base/embeddings", help="Embeddings path")
@click.option("--config", default="config/repositories.yml", help="Config path")
@click.option("--weights", default="config/weights.yaml", help="Weights path")
@click.option("--prefix", default="", help="Path prefix to filter results")
@click.option("--max-depth", default=3, help="Maximum depth to traverse")
@click.option("--max-entries", default=100, help="Maximum number of entries to show")
def explore(embeddings_path, config, weights, prefix, max_depth, max_entries):
    """Explore the knowledge base document tree structure."""
    import asyncio

    index_tip = (
        "Tip: run 'nancy-brain build' to create the index, or use --embeddings-path to point to an existing one."
    )

    async def do_explore():
        # Resolve every path option against the current working directory
        workdir = Path.cwd()
        embeddings_path_abs = workdir / embeddings_path
        config_path_abs = workdir / config
        weights_path_abs = workdir / weights

        # Bail out before the heavy RAGService import when there is no index on disk
        try:
            if not embeddings_path_abs.exists() or not (embeddings_path_abs / "index").exists():
                click.echo("No documents found. Embeddings index missing.")
                click.echo(index_tip)
                return
        except Exception:
            click.echo("No documents found. Embeddings index missing or unreadable.")
            click.echo(index_tip)
            return

        # Imported lazily so --help and tests do not pull in the RAG dependencies
        try:
            from rag_core.service import RAGService
        except Exception:
            click.echo("No documents found.")
            return

        try:
            service = RAGService(
                embeddings_path=embeddings_path_abs,
                config_path=config_path_abs,
                weights_path=weights_path_abs,
            )
            if RICH_AVAILABLE:
                with _console.status("Loading document tree...", spinner="dots"):
                    results = await service.list_tree(prefix=prefix, depth=max_depth, max_entries=max_entries)
            else:
                results = await service.list_tree(prefix=prefix, depth=max_depth, max_entries=max_entries)
        except Exception:
            # Any failure to load or query the service is reported as an empty tree
            click.echo("No documents found.")
            return

        if not results:
            click.echo("No documents found.")
            return

        # Header, followed by a blank line
        heading = f"📁 Document tree (prefix: '{prefix}', depth: {max_depth}):"
        if RICH_AVAILABLE:
            _console.print(f"[bold]{heading}[/bold]")
            _console.print()
        else:
            click.echo(heading)
            click.echo()

        for entry in results:
            path = entry.get("path", "unknown")
            kind = entry.get("type")
            is_directory = kind == "directory"

            # Display name: last path segment, with a trailing slash for directories
            if "/" in path:
                name = path.split("/")[-1]
            else:
                name = path
            if is_directory:
                name += "/"
            icon = "📁" if is_directory else "📄"

            # Two spaces of indentation per slash in the path
            depth = 0 if path == "unknown" else path.count("/")
            indent = "  " * depth

            if RICH_AVAILABLE:
                _console.print(f"{indent}{icon} [bold]{name}[/bold]")
            else:
                click.echo(f"{indent}{icon} {name}")

            # For files, show the document ID only when it differs from the path
            if kind == "file" and "doc_id" in entry:
                doc_id = entry.get("doc_id")
                if doc_id != path:
                    if RICH_AVAILABLE:
                        _console.print(f"{indent}   → [cyan]{doc_id}[/cyan]")
                    else:
                        click.echo(f"{indent}{doc_id}")

    # Drive the coroutine; unexpected errors become a non-zero exit
    try:
        asyncio.run(do_explore())
    except Exception as e:
        click.echo(f"❌ Explore failed: {e}")
        sys.exit(1)

init(project_name)

Initialize a new Nancy Brain project.

This command creates a minimal config/ directory with a repositories.yml file to get you started. Edit the file and then run nancy-brain build.

Source code in nancy_brain/cli.py
@cli.command()
@click.argument("project_name")
def init(project_name):
    """Initialize a new Nancy Brain project.

    This command creates a minimal `config/` directory with a `repositories.yml`
    file to get you started. Edit the file and then run `nancy-brain build`.
    """
    project_path = Path(project_name)
    # parents=True lets nested targets such as "work/my-brain" be created in one go.
    project_path.mkdir(parents=True, exist_ok=True)

    # Create basic config structure
    config_dir = project_path / "config"
    config_dir.mkdir(exist_ok=True)

    # Basic repositories.yml. Written only when absent so that re-running
    # init on an existing project does not wipe the user's configuration.
    repos_config = config_dir / "repositories.yml"
    if not repos_config.exists():
        repos_config.write_text(
            """# Add your repositories here
# example_tools:
#   - name: example-repo
#     url: https://github.com/org/example-repo.git
"""
        )

    click.echo(f"✅ Initialized Nancy Brain project in {project_name}/")
    click.echo(f"📝 Edit {repos_config} to add repositories")
    click.echo("🏗️  Run 'nancy-brain build' to create the knowledge base")

search(query, limit, embeddings_path, config, weights)

Search the knowledge base.

Source code in nancy_brain/cli.py
@cli.command()
@click.argument("query")
@click.option("--limit", default=5, help="Number of results")
@click.option("--embeddings-path", default="knowledge_base/embeddings", help="Embeddings path")
@click.option("--config", default="config/repositories.yml", help="Config path")
@click.option("--weights", default="config/weights.yaml", help="Weights path")
def search(query, limit, embeddings_path, config, weights):
    """Search the knowledge base."""
    # NOTE: click renders the docstring above as this command's --help text,
    # so maintainer-facing notes are kept in comments instead.
    #
    # Prints up to `limit` results for `query`. When the index is missing,
    # the RAG service cannot be imported, or the search itself fails, the
    # command prints "No results found." and returns normally.
    import asyncio

    index_tip = (
        "Tip: run 'nancy-brain build' to create the index, or use --embeddings-path to point to an existing one."
    )

    async def do_search():
        # Resolve the user-supplied paths against the current working directory.
        # (Joining with an absolute path leaves that absolute path unchanged.)
        embeddings_path_abs = Path.cwd() / embeddings_path
        config_path_abs = Path.cwd() / config
        weights_path_abs = Path.cwd() / weights

        # If the embeddings index doesn't exist, short-circuit before importing
        # the heavy RAG dependencies (keeps --help and minimal environments fast).
        try:
            index_present = embeddings_path_abs.exists() and (embeddings_path_abs / "index").exists()
        except Exception:
            click.echo("No results found. Embeddings index missing or unreadable.")
            click.echo(index_tip)
            return
        if not index_present:
            click.echo("No results found. Embeddings index missing.")
            click.echo(index_tip)
            return

        try:
            from rag_core.service import RAGService
        except Exception:
            # If RAGService or its dependencies aren't available, behave like
            # an empty index: print no results and return success. This keeps
            # CLI tests stable in minimal environments.
            click.echo("No results found.")
            return

        try:
            service = RAGService(
                embeddings_path=embeddings_path_abs,
                config_path=config_path_abs,
                weights_path=weights_path_abs,
            )
            if RICH_AVAILABLE:
                with _console.status("Searching...", spinner="dots"):
                    results = await service.search_docs(query, limit=limit)
            else:
                results = await service.search_docs(query, limit=limit)
        except Exception:
            # Best-effort: a failing backend is reported the same way as an
            # empty result set rather than as a crash.
            click.echo("No results found.")
            return

        if not results:
            click.echo("No results found.")
            return

        # Present results using rich when available
        if RICH_AVAILABLE:
            table = Table(show_header=True, header_style="bold magenta")
            table.add_column("#", width=4)
            table.add_column("Document", overflow="fold")
            table.add_column("Score", width=8, justify="right")
            table.add_column("Snippet", overflow="fold")
            for i, result in enumerate(results, 1):
                doc_id = result.get("id", "<unknown>")
                score = result.get("score", 0.0)
                flat_text = (result.get("text", "") or "").strip().replace("\n", " ")
                snippet = flat_text[:300]
                # Only mark the snippet as truncated when text was actually cut.
                if len(flat_text) > 300:
                    snippet += "..."
                github_url = result.get("github_url") or None
                try:
                    if github_url:
                        # Show the GitHub blob URL as a clickable link. Rich's
                        # style syntax is "link <url>" (space separated).
                        doc_text = Text.assemble((doc_id, f"link {github_url}"))
                    else:
                        doc_text = Text(doc_id)
                except Exception:
                    doc_text = Text(doc_id)
                table.add_row(str(i), doc_text, f"{score:.3f}", snippet)
            _console.print(table)
        else:
            for i, result in enumerate(results, 1):
                # Use the same tolerant field access as the rich branch so a
                # result with a missing or None field does not abort the listing.
                doc_id = result.get("id", "<unknown>")
                score = result.get("score", 0.0)
                text = result.get("text", "") or ""
                ellipsis = "..." if len(text) > 200 else ""
                click.echo(f"\n{i}. {doc_id} (score: {score:.3f})")
                click.echo(f"   {text[:200]}{ellipsis}")

    # Run the async search
    try:
        asyncio.run(do_search())
    except Exception as e:
        click.echo(f"❌ Search failed: {e}")
        sys.exit(1)

serve(host, port)

Start the HTTP API server.

Source code in nancy_brain/cli.py
@cli.command()
@click.option("--host", default="127.0.0.1", help="Host to bind to")
@click.option("--port", default=8000, help="Port to bind to")
def serve(host, port):
    """Start the HTTP API server."""
    # uvicorn is imported lazily so the rest of the CLI works without it;
    # when it is missing we print an install hint and return.
    try:
        import uvicorn
    except ImportError:
        click.echo("❌ uvicorn not installed. Install with: pip install uvicorn")
    else:
        click.echo(f"🚀 Starting Nancy Brain server on {host}:{port}")

        # The app is handed to uvicorn as an import string, so the package
        # root has to be first on sys.path before uvicorn resolves it.
        root_dir = str(package_root)
        sys.path.insert(0, root_dir)

        uvicorn.run(
            "connectors.http_api.app:app",
            host=host,
            port=port,
            reload=False,
        )

ui(port)

Launch the web admin interface.

Source code in nancy_brain/cli.py
@cli.command()
@click.option("--port", default=8501, help="Port to run Streamlit on")
def ui(port):
    """Launch the web admin interface."""
    # NOTE: click renders the docstring above as this command's --help text,
    # so maintainer-facing notes are kept in comments instead.

    # Availability check only: the module itself is not used here, Streamlit
    # is started as a child process below.
    try:
        import streamlit  # noqa: F401
    except ImportError:
        click.echo("❌ Streamlit not installed. Install with: pip install streamlit")
        return

    ui_script = package_root / "nancy_brain" / "admin_ui.py"
    click.echo(f"🌐 Starting Nancy Brain Admin UI on port {port}")
    click.echo(f"🔗 Open http://localhost:{port} in your browser")

    # Launch Streamlit through the current interpreter ("python -m streamlit")
    # instead of whatever `streamlit` executable is on PATH. This guarantees
    # the child uses the same environment the import check above verified,
    # and works even when the environment's scripts directory is not on PATH.
    cmd = [
        sys.executable,
        "-m",
        "streamlit",
        "run",
        str(ui_script),
        "--server.port",
        str(port),
        "--server.headless",
        "true",
        "--browser.gatherUsageStats",
        "false",
    ]

    try:
        # Blocks until the Streamlit process exits; check=True turns a
        # non-zero exit status into CalledProcessError.
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        click.echo(f"❌ Failed to start Streamlit: {e}")
    except FileNotFoundError:
        # Raised when the executable in cmd[0] cannot be found.
        click.echo("❌ Streamlit command not found. Try: pip install streamlit")

Note: For detailed CLI command documentation, see CLI Commands, which includes an auto-generated command reference.

HTTP API

For HTTP API endpoints, please see:

- FastAPI app: connectors/http_api/app.py
- API schemas: connectors/http_api/schemas.py

MCP Server

For the Model Context Protocol server implementation, please see:

- MCP server: connectors/mcp_server/server.py

API documentation for connectors will be added in a future update.