1919
2020
2121class Pipeline [T ]:
22+ """Manages a data source and applies transformers to it.
23+
24+ A Pipeline provides a high-level interface for data processing by chaining
25+ transformers together. It automatically manages a multiprocessing-safe
26+ shared context that can be accessed by all transformers in the chain.
2227 """
23- Manages a data source and applies transformers to it.
24- Always uses a multiprocessing-safe shared context.
25- """
2628
27- def __init__ (self , * data : Iterable [T ]):
29+ def __init__ (self , * data : Iterable [T ]) -> None :
30+ """Initialize a pipeline with one or more data sources.
31+
32+ Args:
33+ *data: One or more iterable data sources. If multiple sources are
34+ provided, they will be chained together.
35+
36+ Raises:
37+ ValueError: If no data sources are provided.
38+ """
2839 if len (data ) == 0 :
2940 raise ValueError ("At least one data source must be provided to Pipeline." )
3041 self .data_source : Iterable [T ] = itertools .chain .from_iterable (data ) if len (data ) > 1 else data [0 ]
@@ -39,19 +50,25 @@ def __init__(self, *data: Iterable[T]):
3950 # Store reference to original context for final synchronization
4051 self ._original_context_ref : PipelineContext | None = None
4152
42- def __del__ (self ):
53+ def __del__ (self ) -> None :
4354 """Clean up the multiprocessing manager when the pipeline is destroyed."""
4455 try :
4556 self ._sync_context_back ()
4657 self ._manager .shutdown ()
4758 except Exception :
48- pass # Ignore errors during cleanup
59+ pass
4960
5061 def context (self , ctx : PipelineContext ) -> "Pipeline[T]" :
51- """
52- Updates the pipeline context and stores a reference to the original context.
62+ """Update the pipeline context and store a reference to the original context.
63+
5364 When the pipeline finishes processing, the original context will be updated
5465 with the final pipeline context data.
66+
67+ Args:
68+ ctx: The pipeline context to use for this pipeline execution.
69+
70+ Returns:
71+ The pipeline instance for method chaining.
5572 """
5673 # Store reference to the original context
5774 self ._original_context_ref = ctx
@@ -60,9 +77,10 @@ def context(self, ctx: PipelineContext) -> "Pipeline[T]":
6077 return self
6178
6279 def _sync_context_back (self ) -> None :
63- """
64- Synchronize the final pipeline context back to the original context reference.
65- This is called after processing is complete.
80+ """Synchronize the final pipeline context back to the original context reference.
81+
82+ This is called after processing is complete to update the original
83+ context with any changes made during pipeline execution.
6684 """
6785 if self ._original_context_ref is not None :
6886 # Copy the final context state back to the original context reference
@@ -72,15 +90,16 @@ def _sync_context_back(self) -> None:
7290 self ._original_context_ref .update (final_context_state )
7391
7492 def transform [U ](self , t : Callable [[Transformer [T , T ]], Transformer [T , U ]]) -> "Pipeline[U]" :
75- """
76- Shorthand method to apply a transformation using a lambda function.
93+ """Apply a transformation using a lambda function.
94+
7795 Creates a Transformer under the hood and applies it to the pipeline.
96+ This is a shorthand method for simple transformations.
7897
7998 Args:
80- t: A callable that takes a transformer and returns a transformed transformer
99+ t: A callable that takes a transformer and returns a transformed transformer.
81100
82101 Returns:
83- A new Pipeline with the transformed data
102+ A new Pipeline with the transformed data.
84103 """
85104 # Create a new transformer and apply the transformation function
86105 transformer = t (Transformer [T , T ]())
@@ -101,53 +120,90 @@ def apply[U](
101120 | Callable [[Iterable [T ]], Iterator [U ]]
102121 | Callable [[Iterable [T ], PipelineContext ], Iterator [U ]],
103122 ) -> "Pipeline[U]" :
104- """
105- Applies a transformer to the current data source. The pipeline's
106- managed context is passed down.
123+ """Apply a transformer to the current data source.
124+
125+ The pipeline's managed context is passed down to the transformer.
126+
127+ Args:
128+ transformer: Either a Transformer instance or a callable function
129+ that processes the data.
130+
131+ Returns:
132+ A new Pipeline with the transformed data.
133+
134+ Raises:
135+ TypeError: If the transformer is not a supported type.
107136 """
108137 match transformer :
109138 case Transformer ():
110- # The transformer is called with self.ctx, which is the
111- # shared mp.Manager.dict proxy when inside a 'with' block.
112139 self .processed_data = transformer (self .processed_data , self .ctx ) # type: ignore
113140 case _ if callable (transformer ):
114141 if is_context_aware (transformer ):
115- processed_transformer = transformer
142+ self . processed_data = transformer ( self . processed_data , self . ctx ) # type: ignore
116143 else :
117- processed_transformer = lambda data , ctx : transformer (data ) # type: ignore # noqa: E731
118- self .processed_data = processed_transformer (self .processed_data , self .ctx ) # type: ignore
144+ self .processed_data = transformer (self .processed_data ) # type: ignore
119145 case _:
120146 raise TypeError ("Transformer must be a Transformer instance or a callable function" )
121147
122148 return self # type: ignore
123149
def buffer(self, size: int) -> "Pipeline[T]":
    """Route the pipeline through a ``ThreadedTransformer``.

    Args:
        size: Passed to ``ThreadedTransformer`` as ``max_workers``.

    Returns:
        This pipeline instance, so further calls can be chained.
    """
    threaded_stage = ThreadedTransformer(max_workers=size)
    self.apply(threaded_stage)
    return self
127161
128162 def __iter__ (self ) -> Iterator [T ]:
129- """Allows the pipeline to be iterated over."""
163+ """Allow the pipeline to be iterated over.
164+
165+ Returns:
166+ An iterator over the processed data.
167+ """
130168 yield from self .processed_data
131169
def to_list(self) -> list[T]:
    """Run the pipeline to completion and collect the output.

    Returns:
        A new list holding every item of ``self.processed_data``.
    """
    return [*self.processed_data]
135177
def each(self, function: PipelineFunction[T]) -> None:
    """Call a function on every element (terminal operation).

    The function is invoked with the element as its only argument; its
    return value is discarded.

    Args:
        function: The callable to invoke for each element.
    """
    elements = iter(self.processed_data)
    for element in elements:
        function(element)
144186
def first(self, n: int = 1) -> list[T]:
    """Get the first n elements of the pipeline (terminal operation).

    Args:
        n: The number of elements to retrieve. Must be at least 1.

    Returns:
        A list containing up to the first n elements; shorter when the
        pipeline yields fewer than n items.

    Raises:
        AssertionError: If n is less than 1.
    """
    # Raise explicitly instead of using ``assert`` so the check still runs
    # under ``python -O``; the exception type and message are unchanged.
    if n < 1:
        raise AssertionError("n must be at least 1")
    return list(itertools.islice(self.processed_data, n))
149201
def consume(self) -> None:
    """Exhaust the pipeline, discarding every item.

    Use this to run the pipeline purely for its side effects when the
    results themselves are not needed.
    """
    exhausted = object()
    stream = iter(self.processed_data)
    # Pull items until the iterator reports it has nothing left.
    while next(stream, exhausted) is not exhausted:
        pass
0 commit comments