1use super::*;
2use crate::config::OkuFsConfig;
3use bytes::Bytes;
4use iroh::protocol::ProtocolHandler;
5use iroh_blobs::{store::fs::FsStore, BlobsProtocol};
6use iroh_docs::Author;
7use iroh_gossip::Gossip;
8use log::{error, info};
9#[cfg(feature = "fuse")]
10use miette::IntoDiagnostic;
11use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
12use std::path::PathBuf;
13#[cfg(feature = "fuse")]
14use tokio::runtime::Handle;
15use tokio::sync::watch::{self};
16
17impl OkuFs {
18 pub async fn get_author(&self) -> anyhow::Result<Author> {
24 let default_author_id = self.default_author().await;
25
26 self.docs
27 .author_export(default_author_id)
28 .await
29 .ok()
30 .flatten()
31 .ok_or(anyhow::anyhow!(
32 "Missing private key for default author ({}).",
33 crate::fs::util::fmt_short(default_author_id)
34 ))
35 }
36
37 pub async fn start(#[cfg(feature = "fuse")] handle: &Handle) -> anyhow::Result<Self> {
48 let mdns = iroh::address_lookup::mdns::MdnsAddressLookup::builder();
49 let dht_discovery = iroh::address_lookup::DhtAddressLookup::builder();
50
51 let endpoint = iroh::Endpoint::builder()
52 .address_lookup(mdns)
53 .address_lookup(dht_discovery)
54 .bind()
55 .await?;
56
57 let store = FsStore::load(NODE_PATH.clone()).await?;
58
59 let blobs = BlobsProtocol::new(&store, None);
60
61 let gossip = Gossip::builder().spawn(endpoint.clone());
62 let docs = iroh_docs::protocol::Docs::persistent(NODE_PATH.clone())
63 .spawn(endpoint.clone(), store.into(), gossip.clone())
64 .await?;
65
66 let router = iroh::protocol::Router::builder(endpoint.clone())
67 .accept(iroh_blobs::ALPN, blobs.clone())
68 .accept(iroh_gossip::ALPN, gossip.clone())
69 .accept(iroh_docs::ALPN, docs.clone())
70 .spawn();
71 info!(
72 "Default author ID is {} … ",
73 crate::fs::util::fmt_short(docs.author_default().await.unwrap_or_default())
74 );
75
76 let (replica_sender, _replica_receiver) = watch::channel(());
77 let (okunet_fetch_sender, _okunet_fetch_receiver) = watch::channel(false);
78
79 let oku_fs = Self {
80 endpoint,
81 blobs,
82 docs,
83 router,
84 replica_sender,
85 okunet_fetch_sender,
86 #[cfg(feature = "fuse")]
87 fuse_handler: DebugIgnore::from(Arc::new(DefaultFuseHandler::new())),
88 #[cfg(feature = "fuse")]
89 handle: handle.clone(),
90 dht: mainline::Dht::server()?.as_async(),
91 };
92 let oku_fs_clone = oku_fs.clone();
93 let config = OkuFsConfig::load_or_create_config().unwrap_or_default();
94 let republish_delay = config.get_republish_delay();
95 let initial_publish_delay = config.get_initial_publish_delay();
96
97 tokio::spawn(async move {
98 tokio::time::sleep(initial_publish_delay).await;
99 loop {
100 match oku_fs_clone.announce_replicas().await {
101 Ok(_) => info!("Announced all replicas … "),
102 Err(e) => error!("{}", e),
103 }
104 match oku_fs_clone.refresh_users().await {
105 Ok(_) => info!("Refreshed OkuNet database … "),
106 Err(e) => error!("{}", e),
107 }
108 tokio::time::sleep(republish_delay).await;
109 }
110 });
111 Ok(oku_fs.clone())
112 }
113
    /// Shuts the node down, consuming it.
    ///
    /// Closes the endpoint, then shuts down the router, the docs protocol, the blobs
    /// protocol and finally the blob store, in that order. Failures reported by the router
    /// or the blob store are logged rather than returned.
    pub async fn shutdown(self) {
        info!("Node shutting down … ");
        self.endpoint.close().await;
        // Router shutdown errors are logged; teardown continues regardless.
        if let Err(e) = self.router.shutdown().await {
            error!("{e}");
        }
        self.docs.shutdown().await;
        self.blobs.shutdown().await;
        // The underlying store is shut down last, after the protocol wrapping it.
        if let Err(e) = self.blobs.store().shutdown().await {
            error!("{e}");
        }
    }
127
128 pub async fn content_bytes(&self, entry: &iroh_docs::Entry) -> anyhow::Result<Bytes> {
138 self.content_bytes_by_hash(&entry.content_hash()).await
139 }
140
141 pub async fn content_bytes_by_hash(&self, hash: &iroh_blobs::Hash) -> anyhow::Result<Bytes> {
151 self.blobs
152 .blobs()
153 .get_bytes(*hash)
154 .await
155 .map_err(|e| anyhow::anyhow!(e))
156 }
157
158 pub async fn get_oldest_timestamp(&self) -> miette::Result<u64> {
164 let replicas = self.list_replicas().await?;
165 let mut timestamps: Vec<u64> = Vec::new();
166 for (replica, _capability_kind, _is_home_replica) in replicas {
167 timestamps.push(
168 self.get_oldest_timestamp_in_folder(&replica, &PathBuf::from("/"))
169 .await?,
170 );
171 }
172 Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
173 }
174
175 pub async fn get_newest_timestamp(&self) -> miette::Result<u64> {
181 let replicas = self.list_replicas().await?;
182 let mut timestamps: Vec<u64> = Vec::new();
183 for (replica, _capability_kind, _is_home_replica) in replicas {
184 timestamps.push(
185 self.get_newest_timestamp_in_folder(&replica, &PathBuf::from("/"))
186 .await?,
187 );
188 }
189 Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
190 }
191
192 pub async fn get_size(&self) -> miette::Result<u64> {
198 let replicas = self.list_replicas().await?;
199 let mut size = 0;
200 for (replica, _capability_kind, _is_home_replica) in replicas {
201 size += self.get_folder_size(&replica, &PathBuf::from("/")).await?;
202 }
203 Ok(size)
204 }
205
206 #[cfg(feature = "fuse")]
207 pub fn mount(&self, path: PathBuf) -> miette::Result<fuser::BackgroundSession> {
217 easy_fuser::spawn_mount(self.clone(), path, &vec![], 4).into_diagnostic()
218 }
219}