1use super::*;
2use crate::discovery::{INITIAL_PUBLISH_DELAY, REPUBLISH_DELAY};
3use bytes::Bytes;
4use iroh::protocol::ProtocolHandler;
5use iroh_blobs::store::Store;
6use iroh_docs::Author;
7use log::{error, info};
8#[cfg(feature = "fuse")]
9use miette::IntoDiagnostic;
10use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
11use std::path::PathBuf;
12#[cfg(feature = "fuse")]
13use tokio::runtime::Handle;
14use tokio::sync::watch::{self};
15
16impl OkuFs {
17 pub async fn get_author(&self) -> anyhow::Result<Author> {
23 let default_author_id = self.default_author().await;
24
25 self.docs
26 .client()
27 .authors()
28 .export(default_author_id)
29 .await
30 .ok()
31 .flatten()
32 .ok_or(anyhow::anyhow!(
33 "Missing private key for default author ({}).",
34 crate::fs::util::fmt_short(default_author_id)
35 ))
36 }
37
38 pub async fn start(#[cfg(feature = "fuse")] handle: &Handle) -> anyhow::Result<Self> {
49 let endpoint = iroh::Endpoint::builder()
50 .discovery_n0()
51 .discovery_dht()
52 .discovery_local_network()
53 .bind()
54 .await?;
55 let blobs = iroh_blobs::net_protocol::Blobs::persistent(NODE_PATH.clone())
56 .await?
57 .build(&endpoint);
58 let gossip = iroh_gossip::net::Gossip::builder()
59 .spawn(endpoint.clone())
60 .await?;
61 let docs = iroh_docs::protocol::Docs::persistent(NODE_PATH.clone())
62 .spawn(&blobs, &gossip)
63 .await?;
64
65 let router = iroh::protocol::Router::builder(endpoint.clone())
66 .accept(iroh_blobs::ALPN, blobs.clone())
67 .accept(iroh_gossip::ALPN, gossip.clone())
68 .accept(iroh_docs::ALPN, docs.clone())
69 .spawn()
70 .await?;
71 info!(
72 "Default author ID is {} … ",
73 crate::fs::util::fmt_short(docs.client().authors().default().await.unwrap_or_default())
74 );
75
76 let (replica_sender, _replica_receiver) = watch::channel(());
77 let (okunet_fetch_sender, _okunet_fetch_receiver) = watch::channel(false);
78
79 let oku_fs = Self {
80 endpoint,
81 blobs,
82 docs,
83 router,
84 replica_sender,
85 okunet_fetch_sender,
86 #[cfg(feature = "fuse")]
87 fuse_handler: DebugIgnore::from(Arc::new(DefaultFuseHandler::new())),
88 #[cfg(feature = "fuse")]
89 handle: handle.clone(),
90 dht: mainline::Dht::server()?.as_async(),
91 };
92 let oku_fs_clone = oku_fs.clone();
93 tokio::spawn(async move {
94 loop {
95 tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
96 match oku_fs_clone.announce_replicas().await {
97 Ok(_) => info!("Announced all replicas … "),
98 Err(e) => error!("{}", e),
99 }
100 match oku_fs_clone.refresh_users().await {
101 Ok(_) => info!("Refreshed OkuNet database … "),
102 Err(e) => error!("{}", e),
103 }
104 tokio::time::sleep(REPUBLISH_DELAY - INITIAL_PUBLISH_DELAY).await;
105 }
106 });
107 Ok(oku_fs.clone())
108 }
109
110 pub async fn shutdown(self) {
112 let _ = self.endpoint.close().await;
113 let _ = self.router.shutdown().await;
114 self.docs.shutdown().await;
115 self.blobs.shutdown().await;
116 self.blobs.store().shutdown().await;
117 }
118
119 pub async fn content_bytes(&self, entry: &iroh_docs::Entry) -> anyhow::Result<Bytes> {
129 self.content_bytes_by_hash(&entry.content_hash()).await
130 }
131
132 pub async fn content_bytes_by_hash(&self, hash: &iroh_blobs::Hash) -> anyhow::Result<Bytes> {
142 self.blobs.client().read_to_bytes(*hash).await
143 }
144
145 pub async fn get_oldest_timestamp(&self) -> miette::Result<u64> {
151 let replicas = self.list_replicas().await?;
152 let mut timestamps: Vec<u64> = Vec::new();
153 for (replica, _capability_kind) in replicas {
154 timestamps.push(
155 self.get_oldest_timestamp_in_folder(&replica, &PathBuf::from("/"))
156 .await?,
157 );
158 }
159 Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
160 }
161
162 pub async fn get_newest_timestamp(&self) -> miette::Result<u64> {
168 let replicas = self.list_replicas().await?;
169 let mut timestamps: Vec<u64> = Vec::new();
170 for (replica, _capability_kind) in replicas {
171 timestamps.push(
172 self.get_newest_timestamp_in_folder(&replica, &PathBuf::from("/"))
173 .await?,
174 );
175 }
176 Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
177 }
178
179 pub async fn get_size(&self) -> miette::Result<u64> {
185 let replicas = self.list_replicas().await?;
186 let mut size = 0;
187 for (replica, _capability_kind) in replicas {
188 size += self.get_folder_size(&replica, &PathBuf::from("/")).await?;
189 }
190 Ok(size)
191 }
192
193 #[cfg(feature = "fuse")]
194 pub fn mount(&self, path: PathBuf) -> miette::Result<fuser::BackgroundSession> {
204 easy_fuser::spawn_mount(self.clone(), path, &vec![], 4).into_diagnostic()
205 }
206}