1use super::*;
2use crate::discovery::{INITIAL_PUBLISH_DELAY, REPUBLISH_DELAY};
3use bytes::Bytes;
4#[cfg(feature = "fuse")]
5use fuse_mt::spawn_mount;
6use iroh::protocol::ProtocolHandler;
7use iroh_blobs::store::Store;
8use iroh_docs::Author;
9use log::{error, info};
10#[cfg(feature = "fuse")]
11use miette::IntoDiagnostic;
12use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
13#[cfg(feature = "fuse")]
14use std::collections::HashMap;
15use std::path::PathBuf;
16#[cfg(feature = "fuse")]
17use std::sync::Arc;
18#[cfg(feature = "fuse")]
19use std::sync::RwLock;
20#[cfg(feature = "fuse")]
21use tokio::runtime::Handle;
22use tokio::sync::watch::{self};
23
24impl OkuFs {
25 pub async fn get_author(&self) -> anyhow::Result<Author> {
31 let default_author_id = self.default_author().await;
32
33 self.docs
34 .client()
35 .authors()
36 .export(default_author_id)
37 .await
38 .ok()
39 .flatten()
40 .ok_or(anyhow::anyhow!(
41 "Missing private key for default author ({}).",
42 crate::fs::util::fmt_short(default_author_id)
43 ))
44 }
45
46 pub async fn start(#[cfg(feature = "fuse")] handle: &Handle) -> anyhow::Result<Self> {
57 let endpoint = iroh::Endpoint::builder()
58 .discovery_n0()
59 .discovery_dht()
60 .discovery_local_network()
61 .bind()
62 .await?;
63 let blobs = iroh_blobs::net_protocol::Blobs::persistent(NODE_PATH.clone())
64 .await?
65 .build(&endpoint);
66 let gossip = iroh_gossip::net::Gossip::builder()
67 .spawn(endpoint.clone())
68 .await?;
69 let docs = iroh_docs::protocol::Docs::persistent(NODE_PATH.clone())
70 .spawn(&blobs, &gossip)
71 .await?;
72
73 let router = iroh::protocol::Router::builder(endpoint.clone())
74 .accept(iroh_blobs::ALPN, blobs.clone())
75 .accept(iroh_gossip::ALPN, gossip.clone())
76 .accept(iroh_docs::ALPN, docs.clone())
77 .spawn()
78 .await?;
79 info!(
80 "Default author ID is {} … ",
81 crate::fs::util::fmt_short(docs.client().authors().default().await.unwrap_or_default())
82 );
83
84 let (replica_sender, _replica_receiver) = watch::channel(());
85 let (okunet_fetch_sender, _okunet_fetch_receiver) = watch::channel(false);
86
87 let oku_fs = Self {
88 endpoint,
89 blobs,
90 docs,
91 router,
92 replica_sender,
93 okunet_fetch_sender,
94 #[cfg(feature = "fuse")]
95 fs_handles: Arc::new(RwLock::new(HashMap::new())),
96 #[cfg(feature = "fuse")]
97 newest_handle: Arc::new(RwLock::new(0)),
98 #[cfg(feature = "fuse")]
99 handle: handle.clone(),
100 dht: mainline::Dht::server()?.as_async(),
101 };
102 let oku_fs_clone = oku_fs.clone();
103 tokio::spawn(async move {
104 loop {
105 tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
106 match oku_fs_clone.announce_replicas().await {
107 Ok(_) => info!("Announced all replicas … "),
108 Err(e) => error!("{}", e),
109 }
110 match oku_fs_clone.refresh_users().await {
111 Ok(_) => info!("Refreshed OkuNet database … "),
112 Err(e) => error!("{}", e),
113 }
114 tokio::time::sleep(REPUBLISH_DELAY - INITIAL_PUBLISH_DELAY).await;
115 }
116 });
117 Ok(oku_fs.clone())
118 }
119
120 pub async fn shutdown(self) {
122 let _ = self.endpoint.close().await;
123 let _ = self.router.shutdown().await;
124 self.docs.shutdown().await;
125 self.blobs.shutdown().await;
126 self.blobs.store().shutdown().await;
127 }
128
129 pub async fn content_bytes(&self, entry: &iroh_docs::Entry) -> anyhow::Result<Bytes> {
139 self.content_bytes_by_hash(&entry.content_hash()).await
140 }
141
142 pub async fn content_bytes_by_hash(&self, hash: &iroh_blobs::Hash) -> anyhow::Result<Bytes> {
152 self.blobs.client().read_to_bytes(*hash).await
153 }
154
155 pub async fn get_oldest_timestamp(&self) -> miette::Result<u64> {
161 let replicas = self.list_replicas().await?;
162 let mut timestamps: Vec<u64> = Vec::new();
163 for (replica, _capability_kind) in replicas {
164 timestamps.push(
165 self.get_oldest_timestamp_in_folder(&replica, &PathBuf::from("/"))
166 .await?,
167 );
168 }
169 Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
170 }
171
172 pub async fn get_newest_timestamp(&self) -> miette::Result<u64> {
178 let replicas = self.list_replicas().await?;
179 let mut timestamps: Vec<u64> = Vec::new();
180 for (replica, _capability_kind) in replicas {
181 timestamps.push(
182 self.get_newest_timestamp_in_folder(&replica, &PathBuf::from("/"))
183 .await?,
184 );
185 }
186 Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
187 }
188
189 pub async fn get_size(&self) -> miette::Result<u64> {
195 let replicas = self.list_replicas().await?;
196 let mut size = 0;
197 for (replica, _capability_kind) in replicas {
198 size += self.get_folder_size(&replica, &PathBuf::from("/")).await?;
199 }
200 Ok(size)
201 }
202
203 #[cfg(feature = "fuse")]
204 pub fn mount(&self, path: PathBuf) -> miette::Result<fuser::BackgroundSession> {
214 spawn_mount(fuse_mt::FuseMT::new(self.clone(), 1), path, &[]).into_diagnostic()
215 }
216}