2024-01-19 01:01:30 +00:00
|
|
|
use std::{collections::BTreeSet, iter, ops::RangeInclusive};
|
2024-01-12 16:23:17 +00:00
|
|
|
|
2024-01-19 01:01:30 +00:00
|
|
|
use either::Either;
|
2024-01-18 23:45:01 +00:00
|
|
|
use instruction::SocketCount;
|
2024-01-12 16:23:17 +00:00
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
2024-01-18 21:09:11 +00:00
|
|
|
pub mod id;
|
2024-01-12 16:23:17 +00:00
|
|
|
pub mod instruction;
|
2024-01-18 21:24:03 +00:00
|
|
|
pub mod semi_human;
|
2024-01-18 20:54:41 +00:00
|
|
|
|
2024-01-18 23:45:01 +00:00
|
|
|
/// Ordered map used for the graph IR's storage, so iteration order is deterministic.
pub type Map<K, V> = std::collections::BTreeMap<K, V>;

/// Ordered set used for the graph IR's storage, so iteration order is deterministic.
pub type Set<V> = BTreeSet<V>;
|
2024-01-12 16:23:17 +00:00
|
|
|
|
2024-01-18 20:39:19 +00:00
|
|
|
/// Gives you a super well typed graph IR for a given human-readable repr.
|
|
|
|
///
|
2024-01-18 21:24:03 +00:00
|
|
|
/// Look at [`semi_human::GraphIr`] and the test files in the repo at `testfiles/`
|
2024-01-18 20:39:19 +00:00
|
|
|
/// to see what the RON should look like.
|
|
|
|
/// No, we don't want you to write out [`GraphIr`] in full by hand.
|
|
|
|
/// That's something for the machines to do.
|
|
|
|
///
|
2024-01-12 16:23:17 +00:00
|
|
|
/// # Errors
|
|
|
|
///
|
2024-01-18 20:39:19 +00:00
|
|
|
/// Returns an error if the parsed source is not a valid human-readable graph IR.
|
2024-01-12 16:23:17 +00:00
|
|
|
pub fn from_ron(source: &str) -> ron::error::SpannedResult<GraphIr> {
|
2024-01-18 20:54:41 +00:00
|
|
|
let human_repr: semi_human::GraphIr = ron::from_str(source)?;
|
2024-01-18 20:39:19 +00:00
|
|
|
Ok(human_repr.into())
|
2024-01-12 16:23:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// The toplevel representation of a whole pipeline.
|
|
|
|
///
|
|
|
|
/// Pipelines may not be fully linear. They may branch out and recombine later on.
|
|
|
|
/// As such, the representation for them which is currently used is a
|
2024-01-18 21:24:03 +00:00
|
|
|
/// [**D**irected **A**cyclic **G**raph](https://en.wikipedia.org/wiki/Directed_acyclic_graph).
|
2024-01-12 16:23:17 +00:00
|
|
|
///
|
|
|
|
/// For those who are already familiar with graphs, a DAG is one, except that:
|
|
|
|
///
|
|
|
|
/// - It is **directed**: Edges have a direction they point to.
|
|
|
|
/// In this case, edges point from the outputs of streamers to inputs of consumers.
|
|
|
|
/// - It is **acyclic**: Those directed edges may not form loops.
|
|
|
|
/// In other words, if one follows edges only in their direction, it must be impossible
|
|
|
|
/// to come back to an already visited node.
|
|
|
|
///
|
|
|
|
/// Here, if an edge points from _A_ to _B_ (`A --> B`),
|
|
|
|
/// then _A_ is called a **dependency** of _B_,
|
|
|
|
/// and _B_ is called a **dependent** of _A_.
|
|
|
|
///
|
|
|
|
/// The DAG also enables another neat operation:
|
|
|
|
/// [Topological sorting](https://en.wikipedia.org/wiki/Topological_sorting).
|
|
|
|
/// This allows to put the entire graph into a linear list,
|
|
|
|
/// where it's guaranteed that once a vertex is visited,
|
|
|
|
/// all dependencies of it will have been visited already as well.
|
|
|
|
///
|
|
|
|
/// The representation used here in specific is a bit more complicated,
|
|
|
|
/// since **instructions** directly aren't just connected to one another,
|
|
|
|
/// but their **sockets** are instead.
|
|
|
|
///
|
|
|
|
/// So the vertices of the DAG are the **sockets**
|
|
|
|
/// (which are either [`id::Input`] or [`id::Output`] depending on the direction),
|
|
|
|
/// and each **socket** in turn belongs to an **instruction**.
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
|
|
|
|
pub struct GraphIr {
|
|
|
|
/// "Backbone" storage of all **instruction** IDs to
|
|
|
|
/// what **kind of instruction** they are.
|
|
|
|
instructions: Map<id::Instruction, instruction::Kind>,
|
|
|
|
|
|
|
|
/// How the data flows forward. **Dependencies** map to **dependents** here.
|
|
|
|
edges: Map<id::Output, Set<id::Input>>,
|
|
|
|
/// How the data flows backward. **Dependents** map to **dependencies** here.
|
2024-01-18 23:45:01 +00:00
|
|
|
rev_edges: Map<id::Input, id::Output>,
|
2024-01-12 16:23:17 +00:00
|
|
|
}
|
|
|
|
|
2024-01-18 23:45:01 +00:00
|
|
|
impl GraphIr {
|
2024-01-19 01:01:30 +00:00
|
|
|
/// Look "forwards" in the graph to see what other instructions this instruction feeds into.
|
|
|
|
///
|
|
|
|
/// The output slots represent the top-level iterator,
|
|
|
|
/// and each one's connections are emitted one level below.
|
|
|
|
///
|
|
|
|
/// Just [`Iterator::flatten`] if you are not interested in the slots.
|
|
|
|
///
|
|
|
|
/// The same caveats as for [`GraphIr::resolve`] apply.
|
|
|
|
#[must_use]
|
|
|
|
pub fn dependents(
|
|
|
|
&self,
|
|
|
|
subject: &id::Instruction,
|
|
|
|
) -> Option<impl Iterator<Item = impl Iterator<Item = &id::Instruction>> + '_> {
|
|
|
|
let (subject, kind) = self.instructions.get_key_value(subject)?;
|
|
|
|
let SocketCount { inputs, .. } = kind.socket_count();
|
|
|
|
|
|
|
|
Some((0..inputs).map(|idx| {
|
|
|
|
let output = id::Output(socket(subject, idx));
|
|
|
|
self.edges
|
|
|
|
.get(&output)
|
|
|
|
.map_or(Either::Right(iter::empty()), |targets| {
|
|
|
|
Either::Left(targets.iter().map(|input| &input.socket().belongs_to))
|
|
|
|
})
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Look "backwards" in the graph,
|
|
|
|
/// and find out what instructions need to be done before this one.
|
|
|
|
/// The input slots are visited in order.
|
|
|
|
///
|
|
|
|
/// - The iterator returns individually [`Some`]`(`[`None`]`)` if the corresponding slot is
|
|
|
|
/// not connected.
|
|
|
|
///
|
|
|
|
/// The same caveats as for [`GraphIr::resolve`] apply.
|
|
|
|
#[must_use]
|
|
|
|
pub fn dependencies(
|
|
|
|
&self,
|
|
|
|
subject: &id::Instruction,
|
|
|
|
) -> Option<impl Iterator<Item = Option<&id::Instruction>> + '_> {
|
|
|
|
let (subject, kind) = self.instructions.get_key_value(subject)?;
|
|
|
|
let SocketCount { inputs, .. } = kind.socket_count();
|
|
|
|
|
|
|
|
Some((0..inputs).map(|idx| {
|
|
|
|
let input = id::Input(socket(subject, idx));
|
|
|
|
self.rev_edges
|
|
|
|
.get(&input)
|
|
|
|
.map(|output| &output.socket().belongs_to)
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
|
2024-01-18 23:45:01 +00:00
|
|
|
// TODO: this function, but actually the whole module, screams for tests
|
|
|
|
/// Returns the instruction corresponding to the given ID.
|
|
|
|
/// Returns [`None`] if there is no such instruction in this graph IR.
|
|
|
|
///
|
|
|
|
/// Theoretically this could be fixed easily at the expense of some memory
|
|
|
|
/// by just incrementing and storing some global counter,
|
|
|
|
/// however, at the moment there's no compelling reason
|
|
|
|
/// to actually have multiple [`GraphIr`]s at one point in time.
|
|
|
|
/// Open an issue if that poses a problem for you.
|
|
|
|
#[must_use]
|
2024-01-19 01:01:30 +00:00
|
|
|
pub fn resolve<'ir>(&'ir self, subject: &id::Instruction) -> Option<Instruction<'ir>> {
|
|
|
|
let (id, kind) = self.instructions.get_key_value(subject)?;
|
2024-01-18 23:45:01 +00:00
|
|
|
|
|
|
|
// just try each slot and see if it's connected
|
|
|
|
// very crude, but it works for a proof of concept
|
|
|
|
let SocketCount { inputs, outputs } = kind.socket_count();
|
|
|
|
let socket = |id: &id::Instruction, idx| id::Socket {
|
|
|
|
belongs_to: id.clone(),
|
|
|
|
// impossible since the length is limited to a u16 already
|
|
|
|
#[allow(clippy::cast_possible_truncation)]
|
|
|
|
idx: id::SocketIdx(idx as u16),
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut inputs_from = vec![None; inputs.into()];
|
|
|
|
for (idx, slot) in inputs_from.iter_mut().enumerate() {
|
|
|
|
let input = id::Input(socket(id, idx));
|
|
|
|
*slot = self.rev_edges.get(&input);
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut outputs_to = vec![None; outputs.into()];
|
|
|
|
for (idx, slot) in outputs_to.iter_mut().enumerate() {
|
|
|
|
let output = id::Output(socket(id, idx));
|
|
|
|
*slot = self.edges.get(&output);
|
|
|
|
}
|
|
|
|
|
|
|
|
Some(Instruction {
|
|
|
|
id,
|
|
|
|
kind,
|
|
|
|
inputs_from,
|
|
|
|
outputs_to,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the instruction this input belongs to.
|
|
|
|
///
|
|
|
|
/// The same caveats as for [`GraphIr::resolve`] apply.
|
|
|
|
#[must_use]
|
|
|
|
pub fn owner_of_input<'ir>(&'ir self, input: &id::Input) -> Option<Instruction<'ir>> {
|
|
|
|
self.resolve(&input.socket().belongs_to)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the instruction this output belongs to.
|
|
|
|
///
|
|
|
|
/// The same caveats as for [`GraphIr::resolve`] apply.
|
|
|
|
#[must_use]
|
|
|
|
pub fn owner_of_output<'ir>(&'ir self, output: &id::Output) -> Option<Instruction<'ir>> {
|
|
|
|
self.resolve(&output.socket().belongs_to)
|
|
|
|
}
|
|
|
|
|
2024-01-19 01:01:30 +00:00
|
|
|
/// Returns the order in which the instructions could be visited
|
|
|
|
/// in order to ensure that all dependencies are resolved
|
|
|
|
/// before a vertex is visited.
|
|
|
|
///
|
|
|
|
/// # Panics
|
|
|
|
///
|
|
|
|
/// Panics if there are any cycles in the IR, as it needs to be a DAG.
|
2024-01-18 23:45:01 +00:00
|
|
|
#[must_use]
|
2024-01-19 01:01:30 +00:00
|
|
|
// yes, this function could actually return an iterator and be lazy
|
|
|
|
// no, not today
|
2024-01-18 23:45:01 +00:00
|
|
|
pub fn topological_sort(&self) -> Vec<Instruction> {
|
|
|
|
// count how many incoming edges each vertex has
|
2024-01-19 01:10:27 +00:00
|
|
|
let nonzero_input_counts: Map<_, usize> =
|
2024-01-19 01:01:30 +00:00
|
|
|
self.rev_edges
|
|
|
|
.iter()
|
|
|
|
.fold(Map::new(), |mut count, (input, _)| {
|
|
|
|
*count.entry(input.socket().belongs_to.clone()).or_default() += 1;
|
|
|
|
count
|
|
|
|
});
|
|
|
|
|
2024-01-19 01:10:27 +00:00
|
|
|
// are there any unconnected ones we could start with?
|
|
|
|
// TODO: experiment if a VecDeque with some ordering fun is digested better by the executor
|
|
|
|
let no_inputs: Vec<_> = {
|
|
|
|
let nonzero: Set<_> = nonzero_input_counts.keys().collect();
|
|
|
|
let all: Set<_> = self.instructions.keys().collect();
|
|
|
|
all.difference(&nonzero).copied().collect()
|
|
|
|
};
|
|
|
|
let mut active_queue = no_inputs;
|
2024-01-18 23:45:01 +00:00
|
|
|
|
2024-01-19 01:01:30 +00:00
|
|
|
// then let's find the order!
|
|
|
|
let mut order = Vec::new();
|
|
|
|
|
|
|
|
while let Some(current) = active_queue.pop() {
|
|
|
|
// now that this vertex is visited and resolved,
|
|
|
|
// make sure all dependents notice that
|
|
|
|
|
|
|
|
for dependent in self
|
2024-01-19 01:10:27 +00:00
|
|
|
.dependents(current)
|
2024-01-19 01:01:30 +00:00
|
|
|
.expect("graph to be consistent")
|
|
|
|
.flatten()
|
|
|
|
{
|
|
|
|
dbg!(dependent);
|
|
|
|
}
|
|
|
|
|
2024-01-19 01:10:27 +00:00
|
|
|
// TODO: check if this instruction is "well-fed", that is, has all the inputs it needs,
|
|
|
|
// and if not, panic
|
|
|
|
order.push(self.resolve(current).expect("graph to be consistent"));
|
2024-01-19 01:01:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
assert!(
|
2024-01-19 01:10:27 +00:00
|
|
|
!nonzero_input_counts.is_empty(),
|
2024-01-19 01:01:30 +00:00
|
|
|
concat!(
|
|
|
|
"topological sort didn't cover all instructions\n",
|
|
|
|
"either there are unconnected inputs, or there is a cycle\n",
|
|
|
|
"unresolved instructions:\n",
|
|
|
|
"{:#?}"
|
|
|
|
),
|
2024-01-19 01:10:27 +00:00
|
|
|
nonzero_input_counts,
|
2024-01-19 01:01:30 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
order
|
2024-01-18 23:45:01 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// A full instruction in context, with its inputs and outputs.
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
|
|
|
pub struct Instruction<'ir> {
|
|
|
|
pub id: &'ir id::Instruction,
|
|
|
|
pub kind: &'ir instruction::Kind,
|
|
|
|
|
|
|
|
// can't have these two public since then a user might corrupt their length
|
|
|
|
inputs_from: Vec<Option<&'ir id::Output>>,
|
|
|
|
outputs_to: Vec<Option<&'ir BTreeSet<id::Input>>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'ir> Instruction<'ir> {
|
|
|
|
/// Where this instruction gets its inputs from.
|
|
|
|
///
|
|
|
|
/// [`None`] means that this input is unfilled,
|
|
|
|
/// and must be filled before the instruction can be ran.
|
|
|
|
#[must_use]
|
|
|
|
pub fn inputs_from(&self) -> &[Option<&'ir id::Output>] {
|
|
|
|
&self.inputs_from
|
|
|
|
}
|
|
|
|
|
|
|
|
/// To whom outputs are sent. [`None`] means that this output is unused.
|
|
|
|
#[must_use]
|
|
|
|
pub fn outputs_to(&self) -> &[Option<&'ir BTreeSet<id::Input>>] {
|
|
|
|
&self.outputs_to
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Some part referred to in source code.
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
2024-01-12 16:23:17 +00:00
|
|
|
pub struct Span {
|
2024-01-18 23:45:01 +00:00
|
|
|
// would love to use an actual [`std::ops::RangeInclusive`], but those don't implement
|
|
|
|
// `PartialOrd` and `Ord` unfortunately
|
|
|
|
/// At which byte this span starts, inclusively.
|
|
|
|
pub from: usize,
|
|
|
|
/// At which byte this span ends, inclusively.
|
|
|
|
pub to: usize,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<RangeInclusive<usize>> for Span {
|
|
|
|
fn from(range: RangeInclusive<usize>) -> Self {
|
|
|
|
Self {
|
|
|
|
from: *range.start(),
|
|
|
|
to: *range.end(),
|
|
|
|
}
|
|
|
|
}
|
2024-01-12 16:23:17 +00:00
|
|
|
}
|
2024-01-19 01:01:30 +00:00
|
|
|
|
|
|
|
/// Constructs an [`id::Socket`] a bit more tersely.
|
|
|
|
fn socket(id: &id::Instruction, idx: u16) -> id::Socket {
|
|
|
|
id::Socket {
|
|
|
|
belongs_to: id.clone(),
|
|
|
|
idx: id::SocketIdx(idx),
|
|
|
|
}
|
|
|
|
}
|