1use super::*;
2use crate::discovery::{INITIAL_PUBLISH_DELAY, REPUBLISH_DELAY};
3use bytes::Bytes;
4#[cfg(feature = "fuse")]
5use fuse_mt::spawn_mount;
6use iroh::protocol::ProtocolHandler;
7use iroh_blobs::store::Store;
8use iroh_docs::Author;
9use log::{error, info};
10#[cfg(feature = "fuse")]
11use miette::IntoDiagnostic;
12use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
13#[cfg(feature = "fuse")]
14use std::collections::HashMap;
15use std::path::PathBuf;
16#[cfg(feature = "fuse")]
17use std::sync::Arc;
18#[cfg(feature = "fuse")]
19use std::sync::RwLock;
20#[cfg(feature = "fuse")]
21use tokio::runtime::Handle;
22use tokio::sync::watch::{self};
23
24impl OkuFs {
25 pub async fn get_author(&self) -> anyhow::Result<Author> {
31 let default_author_id = self.default_author().await;
32
33 self.docs
34 .client()
35 .authors()
36 .export(default_author_id)
37 .await
38 .ok()
39 .flatten()
40 .ok_or(anyhow::anyhow!(
41 "Missing private key for default author ({}).",
42 crate::fs::util::fmt_short(default_author_id)
43 ))
44 }
45
46 pub async fn start(#[cfg(feature = "fuse")] handle: &Handle) -> anyhow::Result<Self> {
57 let endpoint = iroh::Endpoint::builder()
58 .discovery_n0()
59 .discovery_dht()
60 .discovery_local_network()
61 .bind()
62 .await?;
63 let blobs = iroh_blobs::net_protocol::Blobs::persistent(NODE_PATH.clone())
64 .await?
65 .build(&endpoint);
66 let gossip = iroh_gossip::net::Gossip::builder()
67 .spawn(endpoint.clone())
68 .await?;
69 let docs = iroh_docs::protocol::Docs::persistent(NODE_PATH.clone())
70 .spawn(&blobs, &gossip)
71 .await?;
72 let blobs_clone = blobs.clone();
73 let willow = iroh_willow::Engine::spawn(
74 endpoint.clone(),
75 move || {
76 iroh_willow::store::persistent::Store::new(
77 NODE_PATH.clone(),
78 blobs_clone.store().clone(),
79 )
80 .unwrap()
81 },
82 Default::default(),
83 );
84
85 let router = iroh::protocol::Router::builder(endpoint.clone())
86 .accept(iroh_blobs::ALPN, blobs.clone())
87 .accept(iroh_gossip::ALPN, gossip.clone())
88 .accept(iroh_docs::ALPN, docs.clone())
89 .spawn()
90 .await?;
91 info!(
92 "Default author ID is {} … ",
93 crate::fs::util::fmt_short(docs.client().authors().default().await.unwrap_or_default())
94 );
95
96 let (replica_sender, _replica_receiver) = watch::channel(());
97 let (okunet_fetch_sender, _okunet_fetch_receiver) = watch::channel(false);
98
99 let oku_fs = Self {
100 endpoint,
101 blobs,
102 docs,
103 willow,
104 router,
105 replica_sender,
106 okunet_fetch_sender,
107 #[cfg(feature = "fuse")]
108 fs_handles: Arc::new(RwLock::new(HashMap::new())),
109 #[cfg(feature = "fuse")]
110 newest_handle: Arc::new(RwLock::new(0)),
111 #[cfg(feature = "fuse")]
112 handle: handle.clone(),
113 dht: mainline::Dht::server()?.as_async(),
114 };
115 let oku_fs_clone = oku_fs.clone();
116 tokio::spawn(async move {
117 loop {
118 tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
119 match oku_fs_clone.announce_replicas().await {
120 Ok(_) => info!("Announced all replicas … "),
121 Err(e) => error!("{}", e),
122 }
123 match oku_fs_clone.refresh_users().await {
124 Ok(_) => info!("Refreshed OkuNet database … "),
125 Err(e) => error!("{}", e),
126 }
127 tokio::time::sleep(REPUBLISH_DELAY - INITIAL_PUBLISH_DELAY).await;
128 }
129 });
130 Ok(oku_fs.clone())
131 }
132
133 pub async fn shutdown(self) {
135 let _ = self.endpoint.close().await;
136 let _ = self.router.shutdown().await;
137 self.docs.shutdown().await;
138 self.blobs.shutdown().await;
139 self.blobs.store().shutdown().await;
140 }
141
142 pub async fn content_bytes(&self, entry: &iroh_docs::Entry) -> anyhow::Result<Bytes> {
152 self.content_bytes_by_hash(&entry.content_hash()).await
153 }
154
155 pub async fn content_bytes_by_hash(&self, hash: &iroh_blobs::Hash) -> anyhow::Result<Bytes> {
165 self.blobs.client().read_to_bytes(*hash).await
166 }
167
168 pub async fn get_oldest_timestamp(&self) -> miette::Result<u64> {
174 let replicas = self.list_replicas().await?;
175 let mut timestamps: Vec<u64> = Vec::new();
176 for (replica, _capability_kind) in replicas {
177 timestamps.push(
178 self.get_oldest_timestamp_in_folder(&replica, &PathBuf::from("/"))
179 .await?,
180 );
181 }
182 Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
183 }
184
185 pub async fn get_newest_timestamp(&self) -> miette::Result<u64> {
191 let replicas = self.list_replicas().await?;
192 let mut timestamps: Vec<u64> = Vec::new();
193 for (replica, _capability_kind) in replicas {
194 timestamps.push(
195 self.get_newest_timestamp_in_folder(&replica, &PathBuf::from("/"))
196 .await?,
197 );
198 }
199 Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
200 }
201
202 pub async fn get_size(&self) -> miette::Result<u64> {
208 let replicas = self.list_replicas().await?;
209 let mut size = 0;
210 for (replica, _capability_kind) in replicas {
211 size += self.get_folder_size(&replica, &PathBuf::from("/")).await?;
212 }
213 Ok(size)
214 }
215
216 #[cfg(feature = "fuse")]
217 pub fn mount(&self, path: PathBuf) -> miette::Result<fuser::BackgroundSession> {
227 spawn_mount(fuse_mt::FuseMT::new(self.clone(), 1), path, &[]).into_diagnostic()
228 }
229}