Implementing Pipelining in Python: A Step-by-Step Guide

1) Retrieve the treated frame from the output queue and transfer it to the priority queue if the output queue is not vacant.
2) Continue drawing frames until the output queue becomes empty.
3) It is anticipated that this process will be beneficial.
4) Read the frames and place them in the input queue along with their respective frame numbers.
5) Transfer the frames from the input queue to the output queue, ensuring that their corresponding frame numbers are preserved.


Solution 1:

Due to my lack of 50 reputations, I was unable to comment on it. Additionally, my knowledge on the subject is limited, but after conducting some research, I found a website that discusses real-time and video processing using the Multiprocessing library. I hope you find it helpful.

1) Retrieve frames and assign frame numbers to each, then place them in the input queue.

  # Check input queue is not full
  if not input_q.full():
     # Read frame and store in input queue
     ret, frame = vs.read()
      if ret:            
        input_q.put((int(vs.get(cv2.CAP_PROP_POS_FRAMES)),frame))

2) Transfer the frames from the input queue to the output queue, ensuring that their respective frame numbers are preserved.

while True:
  frame = input_q.get()
frame_rgb = cv2.cvtColor(frame[1], cv2.COLOR_BGR2RGB)
  output_q.put((frame[0], detect_objects(frame_rgb, sess, detection_graph)))

If the output queue is not empty, the treated frame is retrieved from the output queue and transferred to the priority queue for processing.

# Check output queue is not empty
if not output_q.empty():
  # Recover treated frame in output queue and feed priority queue
  output_pq.put(output_q.get())

Continue drawing frames until the output queue is depleted.

# Check output priority queue is not empty
  if not output_pq.empty():
    prior, output_frame = output_pq.get()
    if prior > countWriteFrame:
      output_pq.put((prior, output_frame))
    else: 
      countWriteFrame = countWriteFrame + 1    
      # Draw something with your frame

In conclusion, break if the input queue is empty to stop.

if((not ret) & input_q.empty() & 
    output_q.empty() & output_pq.empty()):
  break

Link can be found HERE


Solution 2:


With the assistance of r_e, I came across a useful toolkit called mpipe, which facilitates pipelining in Python.

During testing, I discovered that the import and display of images is significantly faster compared to converting, processing, and drawing the UI. Therefore, I have opted to utilize a 3-stage pipeline.

It is pretty easy to use:

def conversion(input_data):
    original, frame_counter = input_data
    ...
    return con.gbg(con.down_sample(con.copy_img(original))), frame_counter
def processing(input_data):
    image, frame_counter = input_data
    ...
    return markers, frame_counter
def ui(input_data):
    markers, frame_counter = input_data
    ...
    return image, frame_counter, triangle
def main():
    ...
    while True:
        stage1 = mpipe.OrderedStage(conversion, 3)
        stage2 = mpipe.OrderedStage(processing, 3)
        stage3 = mpipe.OrderedStage(ui, 3)
        pipe = mpipe.Pipeline(stage1.link(stage2.link(stage3)))
        images = []
        while len(images) < 3:
            ret = False
            while not ret:
                ret, image = cap.read()
            images.append(image)
        for i in images:
            t = (i, frame_counter)
            pipe.put(t)
        pipe.put(None)
        for result in pipe.results():
            image, frame_counter, triangle = result
            if not triangle:
                if t_count > 6:
                    Show.show_win("video", image)
                    t_count = 0
                else:
                    t_count += 1
            else:
                Show.show_win("video", image)


Solution 3:


I made a small effort in this regard, drawing heavily from your diagram. The approach involves implementing a 5-stage pipeline and utilizing multi-processing. Please begin reading towards the end.

def main():
    ...
    ...


#!/usr/bin/env python3
import logging
import numpy as np
from time import sleep
from multiprocessing import Process, Queue
class Stage1(Process):
    """Acquire frames as fast as possible and send to next stage"""
    def __init__(self, oqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.oqueue = oqueue      # output queue
    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage1 %(message)s',
                        filename='log-stage1.txt', filemode='w')
        logging.info('started')
        # Generate frames and send down pipeline
        for f in range(NFRAMES):
            logging.debug('Generating frame %d',f)
            # Generate frame of random stuff
            frame = np.random.randint(0,256,(480,640,3), dtype=np.uint8)
            logging.debug('Forwarding frame %d',f)
            self.oqueue.put(frame)
class Stage2(Process):
    """Read frames from previous stage as fast as possible, process and send to next stage"""
    def __init__(self, iqueue, oqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.iqueue = iqueue      # input queue
        self.oqueue = oqueue      # output queue
    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage2 %(message)s',
                        filename='log-stage2.txt', filemode='w')
        logging.info('started')
        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Received frame %d', f)
            # Process frame ...
            logging.debug('Forwarding frame %d', f)
            self.oqueue.put(frame)
class Stage3(Process):
    """Read frames from previous stage as fast as possible, process and send to next stage"""
    def __init__(self, iqueue, oqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.iqueue = iqueue      # input queue
        self.oqueue = oqueue      # output queue
    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage3 %(message)s',
                        filename='log-stage3.txt', filemode='w')
        logging.info('started')
        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Received frame %d', f)
            # Process frame ...
            logging.debug('Forwarding frame %d', f)
            self.oqueue.put(frame)
class Stage4(Process):
    """Read frames from previous stage as fast as possible, process and send to next stage"""
    def __init__(self, iqueue, oqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.iqueue = iqueue      # input queue
        self.oqueue = oqueue      # output queue
    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage4 %(message)s',
                        filename='log-stage4.txt', filemode='w')
        logging.info('started')
        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Received frame %d', f)
            # Process frame ...
            logging.debug('Forwarding frame %d', f)
            self.oqueue.put(frame)
class Stage5(Process):
    """Read frames from previous stage as fast as possible, and display"""
    def __init__(self, iqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.iqueue = iqueue      # input queue
    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage5 %(message)s',
                        filename='log-stage5.txt', filemode='w')
        logging.info('started')
        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Displaying frame %d', f)
            # Display frame ...
def main():
    # Create Queues to send data between pipeline stages
    q1_2 = Queue(5)    # queue between stages 1 and 2
    q2_3 = Queue(5)    # queue between stages 2 and 3
    q3_4 = Queue(5)    # queue between stages 3 and 4
    q4_5 = Queue(5)    # queue between stages 4 and 5
    # Create Processes for stages of pipeline
    stages = []
    stages.append(Stage1(q1_2))
    stages.append(Stage2(q1_2,q2_3))
    stages.append(Stage3(q2_3,q3_4))
    stages.append(Stage4(q3_4,q4_5))
    stages.append(Stage5(q4_5))
    # Start the stages
    for stage in stages:
        stage.start()
    # Wait for stages to finish
    for stage in stages:
        stage.join()
if __name__ == "__main__":
    NFRAMES = 1000
    main()

Currently, it produces a random noise frame and sends it through the pipeline. Each process is logged in a distinct file that is overwritten with each new program run due to
filemode='w'
. You can view the separate logs in the following manner:

-rw-r--r--  1 mark  staff  1097820 26 Jun 17:07 log-stage1.txt
-rw-r--r--  1 mark  staff  1077820 26 Jun 17:07 log-stage2.txt
-rw-r--r--  1 mark  staff  1077820 26 Jun 17:07 log-stage3.txt
-rw-r--r--  1 mark  staff  1077820 26 Jun 17:07 log-stage4.txt
-rw-r--r--  1 mark  staff   548930 26 Jun 17:07 log-stage5.txt

Afterward, you can observe the timestamps for when each process received and sent each frame.

more log-stage1.txt
1561565618.603456 [INFO] Stage1 started
1561565618.604812 [DEBUG] Stage1 Generating frame 0
1561565618.623938 [DEBUG] Stage1 Forwarding frame 0
1561565618.625659 [DEBUG] Stage1 Generating frame 1
1561565618.647139 [DEBUG] Stage1 Forwarding frame 1
1561565618.648173 [DEBUG] Stage1 Generating frame 2
1561565618.687316 [DEBUG] Stage1 Forwarding frame 2

Follow the progression of “frame 1” as it moves through the various stages.

pi@pi3:~ $ grep "frame 1$" log*
log-stage1.txt:1561565618.625659 [DEBUG] Stage1 Generating frame 1
log-stage1.txt:1561565618.647139 [DEBUG] Stage1 Forwarding frame 1
log-stage2.txt:1561565618.671272 [DEBUG] Stage2 Received frame 1
log-stage2.txt:1561565618.672272 [DEBUG] Stage2 Forwarding frame 1
log-stage3.txt:1561565618.713618 [DEBUG] Stage3 Received frame 1
log-stage3.txt:1561565618.715468 [DEBUG] Stage3 Forwarding frame 1
log-stage4.txt:1561565618.746488 [DEBUG] Stage4 Received frame 1
log-stage4.txt:1561565618.747617 [DEBUG] Stage4 Forwarding frame 1
log-stage5.txt:1561565618.790802 [DEBUG] Stage5 Displaying frame 1

Alternatively, you can merge all the logs in chronological order.

sort -g log*
1561565618.603456 [INFO] Stage1 started
1561565618.604812 [DEBUG] Stage1 Generating frame 0
1561565618.607765 [INFO] Stage2 started
1561565618.612311 [INFO] Stage3 started
1561565618.618425 [INFO] Stage4 started
1561565618.618785 [INFO] Stage5 started
1561565618.623938 [DEBUG] Stage1 Forwarding frame 0
1561565618.625659 [DEBUG] Stage1 Generating frame 1
1561565618.640585 [DEBUG] Stage2 Received frame 0
1561565618.642438 [DEBUG] Stage2 Forwarding frame 0

Frequently Asked Questions