use std::num::NonZeroUsize; use instruction::SocketCount; use serde::{Deserialize, Serialize}; pub mod id; pub mod instruction; pub mod semi_human; pub type Map = std::collections::BTreeMap; pub type Set = std::collections::BTreeSet; /// Gives you a super well typed graph IR for a given human-readable repr. /// /// Look at [`semi_human::GraphIr`] and the test files in the repo at `testfiles/` /// to see what the RON should look like. /// No, we don't want you to write out [`GraphIr`] in full by hand. /// That's something for the machines to do. /// /// # Errors /// /// Returns an error if the parsed source is not a valid human-readable graph IR. pub fn from_ron(source: &str) -> ron::error::SpannedResult { let human_repr: semi_human::GraphIr = ron::from_str(source)?; Ok(human_repr.into()) } /// The toplevel representation of a whole pipeline. /// /// # DAGs /// /// Pipelines may not be fully linear. They may branch out and recombine later on. /// As such, the representation for them which is currently used is a /// [**D**irected **A**cyclic **G**raph](https://en.wikipedia.org/wiki/Directed_acyclic_graph). /// /// For those who are already familiar with graphs, a DAG is one, except that: /// /// - It is **directed**: Edges have a direction they point to. /// In this case, edges point from the outputs of streamers to inputs of consumers. /// - It is **acyclic**: Those directed edges may not form loops. /// In other words, if one follows edges only in their direction, it must be impossible /// to come back to an already visited node. /// /// Here, if an edge points from _A_ to _B_ (`A --> B`), /// then _A_ is called a **dependency** or an **input source** of _B_, /// and _B_ is called a **dependent** or an **output target** of _A_. /// /// The DAG also enables another neat operation: /// [Topological sorting](https://en.wikipedia.org/wiki/Topological_sorting). /// This allows to put the entire graph into a linear list, /// where it's guaranteed that once a vertex is visited, /// all dependencies of it will have been visited already as well. /// /// The representation used here in specific is a bit more complicated, /// since **instructions** directly aren't just connected to one another, /// but their **sockets** are instead. /// /// So the vertices of the DAG are the **sockets** /// (which are either [`id::Input`] or [`id::Output`] depending on the direction), /// and each **socket** in turn belongs to an **instruction**. /// /// # Usage /// /// - If you want to build one from scratch, /// add a few helper methods like /// constructing an empty one, /// adding instructions and /// adding edges /// - If you want to construct one from an existing repr, /// maybe you want to use [`semi_human::GraphIr`]. /// /// # Storing additional data /// /// Chances are the graph IR seems somewhat fit to put metadata in it. /// However, most likely you're interacting in context of some other system, /// and also want to manage and index that data on your own. /// /// As such, consider using _secondary_ maps instead. /// That is, store in a data structure _you_ own a mapping /// from [`id`]s /// to whatever data you need. #[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)] pub struct GraphIr { /// "Backbone" storage of all **instruction** IDs to /// what **kind of instruction** they are. instructions: Map, /// How the data flows forward. **Dependencies** map to **dependents** here. edges: Map>, /// How the data flows backward. **Dependents** map to **dependencies** here. rev_edges: Map, } // TODO: this impl block, but actually the whole module, screams for tests impl GraphIr { /// Look "backwards" in the graph, /// and find out what instructions need to be done before this one. /// The input slots are visited in order. /// /// - The iterator returns individually [`Some`]`(`[`None`]`)` if the corresponding slot is /// not connected. /// /// The same caveats as for [`GraphIr::resolve`] apply. #[must_use] pub fn input_sources( &self, subject: &id::Instruction, ) -> Option> + '_> { let (subject, kind) = self.instructions.get_key_value(subject)?; let SocketCount { inputs, .. } = kind.socket_count(); Some((0..inputs).map(|idx| { let input = id::Input(socket(subject, idx)); self.rev_edges.get(&input) })) } /// Look "forwards" in the graph to see what other instructions this instruction feeds into. /// /// The output slots represent the top-level iterator, /// and each one's connections are emitted one level below. /// /// Just [`Iterator::flatten`] if you are not interested in the slots. /// /// The same caveats as for [`GraphIr::resolve`] apply. #[must_use] pub fn output_targets( &self, subject: &id::Instruction, ) -> Option>> + '_> { let (subject, kind) = self.instructions.get_key_value(subject)?; let SocketCount { outputs, .. } = kind.socket_count(); Some((0..outputs).map(|idx| { let output = id::Output(socket(subject, idx)); self.edges.get(&output) })) } /// Returns the instruction corresponding to the given ID. /// Returns [`None`] if there is no such instruction in this graph IR. /// /// Theoretically this could be fixed easily at the expense of some memory /// by just incrementing and storing some global counter, /// however, at the moment there's no compelling reason /// to actually have multiple [`GraphIr`]s at one point in time. /// Open an issue if that poses a problem for you. #[must_use] pub fn resolve<'ir>(&'ir self, subject: &id::Instruction) -> Option> { let (id, kind) = self.instructions.get_key_value(subject)?; let input_sources = self.input_sources(subject)?.collect(); let output_targets = self.output_targets(subject)?.collect(); Some(InstructionRef { id, kind, input_sources, output_targets, }) } /// Returns the instruction this input belongs to. /// /// The same caveats as for [`GraphIr::resolve`] apply. #[must_use] pub fn owner_of_input<'ir>(&'ir self, input: &id::Input) -> Option> { self.resolve(&input.socket().belongs_to) } /// Returns the instruction this output belongs to. /// /// The same caveats as for [`GraphIr::resolve`] apply. #[must_use] pub fn owner_of_output<'ir>(&'ir self, output: &id::Output) -> Option> { self.resolve(&output.socket().belongs_to) } /// Returns the order in which the instructions could be visited /// in order to ensure that all dependencies are resolved /// before a vertex is visited. /// /// # Panics /// /// Panics if there are any cycles in the IR, as it needs to be a DAG. #[must_use] // yes, this function could probably return an iterator and be lazy // no, not today pub fn topological_sort(&self) -> Vec { // count how many incoming edges each vertex has let mut nonzero_input_counts: Map<_, NonZeroUsize> = self.rev_edges .iter() .fold(Map::new(), |mut count, (input, _)| { let _ = *count .entry(input.socket().belongs_to.clone()) .and_modify(|count| *count = count.saturating_add(1)) .or_insert(NonZeroUsize::MIN); count }); // are there any unconnected ones we could start with? // TODO: experiment if a VecDeque with some ordering fun is digested better by the executor let no_inputs: Vec<_> = { let nonzero: Set<_> = nonzero_input_counts.keys().collect(); let all: Set<_> = self.instructions.keys().collect(); all.difference(&nonzero).copied().cloned().collect() }; // then let's find the order! let mut order = Vec::new(); let mut active_queue = no_inputs; while let Some(current) = active_queue.pop() { // now that this vertex is visited and resolved, // make sure all dependents notice that let dependents = self .output_targets(¤t) .expect("graph to be consistent") .flatten() .flatten(); for dependent_input in dependents { let dependent = &dependent_input.socket().belongs_to; // how many inputs are connected to this dependent without us? let count = nonzero_input_counts .get_mut(dependent) .expect("connected output must refer to non-zero input"); let new = NonZeroUsize::new(count.get() - 1); if let Some(new) = new { // aww, still some *count = new; continue; } // none, that means this one is free now! let's throw it onto the active queue then let (now_active, _) = nonzero_input_counts .remove_entry(dependent) .expect("connected output must refer to non-zero input"); active_queue.push(now_active); } // TODO: check if this instruction is "well-fed", that is, has all the inputs it needs, // and if not, panic order.push(self.resolve(¤t).expect("graph to be consistent")); } assert!( nonzero_input_counts.is_empty(), concat!( "topological sort didn't cover all instructions\n", "either there are unconnected inputs, or there is a cycle\n", "unresolved instructions:\n", "{:#?}" ), nonzero_input_counts, ); order } } /// Constructs an [`id::Socket`] a bit more tersely. fn socket(id: &id::Instruction, idx: u16) -> id::Socket { id::Socket { belongs_to: id.clone(), idx: id::SocketIdx(idx), } } /// A full instruction bundeled in context, with its inputs and outputs. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct Instruction { pub id: id::Instruction, pub kind: instruction::Kind, // can't have these two public since then a user might corrupt their length input_sources: Vec>, output_targets: Vec>, } impl Instruction { /// Where this instruction gets its inputs from. /// /// [`None`] means that this input is unfilled, /// and must be filled before the instruction can be ran. #[must_use] pub fn input_sources(&self) -> &[Option] { &self.input_sources } /// To whom outputs are sent. #[must_use] pub fn output_targets(&self) -> &[Set] { &self.output_targets } } /// [`Instruction`], but every single field is borrowed instead. /// See its docs. /// /// Use the [`From`] impl to handily convert into an [`Instruction`]. /// The other way around is unlikely to be wanted — since you already have an [`Instruction`], /// chances are you just want to take a reference (`&`) of it. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct InstructionRef<'ir> { pub id: &'ir id::Instruction, pub kind: &'ir instruction::Kind, input_sources: Vec>, output_targets: Vec>>, } impl<'ir> InstructionRef<'ir> { /// Where this instruction gets its inputs from. /// /// [`None`] means that this input is unfilled, /// and must be filled before the instruction can be ran. #[must_use] pub fn input_sources(&self) -> &[Option<&'ir id::Output>] { &self.input_sources } /// To whom outputs are sent. #[must_use] pub fn output_targets(&self) -> &[Option<&'ir Set>] { &self.output_targets } } // would love to use ToOwned but Rust has no specialization yet // and it'd hurt a blanket impl of ToOwned otherwise impl From> for Instruction { fn from(source: InstructionRef<'_>) -> Self { Self { id: source.id.clone(), kind: source.kind.clone(), input_sources: source .input_sources .into_iter() .map(Option::<&_>::cloned) .collect(), output_targets: source .output_targets .into_iter() .map(|outputs| outputs.map(Clone::clone).unwrap_or_default()) .collect(), } } }