input_node_names.split(","), # an array of input nodes
output_node_names.split(","), # an array of output nodes
tf.float32.as_datatype_enum)
# Finally we serialize and dump the output graph_def to the filesystem with tf.gfile.GFile('frozen_model.pb', "wb") as f:
f.write(output_graph_def.SerializeToString())
freeze_graph(model_dir[0], 'Const_1', 'softmax_tensor')
As a result, the model artifact is transformed into the frozen protobuf format (frozen_model.pb) and saved to the notebook instance's home directory (model_dir[0]).
In the code above, you must specify the input and output nodes, namely, 'Const_1' and 'softmax_tensor'. For more details, see resnet_model_headpose.py.
When creating an AWS DeepLens project later, you'll need to add this frozen graph to the project. To do so, you must upload the protobuf file to an Amazon S3 folder. For this tutorial, you can use your SageMaker training job's output folder (s3://deeplens-sagemaker-models-<my-name>/headpose/TFartifacts/<sagemaker-job-name>/output) in S3.
However, the model is considered an externally trained model in AWS DeepLens.
d. To upload the frozen graph to your SageMaker training job's output folder, run the following Python code snippet in a code cell of the running notebook instance:
data = open('frozen_model.pb', "rb")
Once uploaded, the model is ready to be imported into your AWS DeepLens project. Before creating the project, we must create a Lambda function that performs inference based on this trained model (p. 99).
Create an Inference Lambda Function to Detect Head Poses
Before creating an AWS DeepLens project for deployment to your AWS DeepLens device for head pose detection, you must create and publish a Lambda function that makes inferences based on the trained model.
To create and publish the inference Lambda function, follow the instructions given in the section called “Create and Publish an Inference Lambda Function” (p. 82), but replace the code in the greengrassHelloWorld.py file with code similar to the following, which is used in the Head Pose Detection Sample project (p. 62).
""" This is a lambda function that demonstrates how one can use a deep learning network (ResNet) to detect the person's head pose. We use a shaded rectangle to indicate the region that the persons head is pointed towards. We also display a red ball that moves with the person's head.
Build and Run the Head Pose Detection Project
""" Class for facilitating the local display of inference results (as images). The class is designed to run on its own thread. In particular the class dumps the inference results into a FIFO located in the tmp directory (which lambda has access to). The results can be rendered using mplayer by typing:
mplayer -demuxer lavf -lavfdopts format=mjpeg:probesize=32 /tmp/results.mjpeg """
def __init__(self, resolution):
""" resolution - Desired resolution of the project stream"""
# Initialize the base class, so that the object can run on its own self.resolution = RESOLUTION[resolution]
# Initialize the default image to be a white canvas. Clients # will update the image when ready.
self.frame = cv2.imencode('.jpg', 255*np.ones([640, 480, 3]))[1]
self.stop_request = Event() def run(self):
""" Overridden method that continually dumps images to the desired FIFO file.
def set_frame_data(self, frame):
""" Method updates the image data. This currently encodes the
numpy array to jpg but can be modified to support other encodings.
frame - Numpy array containing the image data of the next frame in the project stream.
"""
ret, jpeg = cv2.imencode('.jpg', cv2.resize(frame, self.resolution)) if not ret:
raise Exception('Failed to set frame data') self.frame = jpeg
Build and Run the Head Pose Detection Project
def join(self):
self.stop_request.set() class HeadDetection():
""" Custom class that helps us post process the data. In particular it draws a ball that moves across the screen depending on the head pose.
It also draws a rectangle indicating the region that the person's head is pointing to. We divide the frame into 9 distinct regions.
"""
def __init__(self, circ_cent_x, circ_cent_y):
""" circ_cent_x - The x coordinate for the center of the frame circ_cent_y - The y coordinate for the center of the frame """
self.result_thread = LocalDisplay('480p') self.result_thread.start()
def update_coords(self, frame, change_x, change_y, label):
""" Helper method that draws a rectangle in the region the person is looking at.
Build and Run the Head Pose Detection Project
cv2.circle(frame, (self.circ_cent_x, self.circ_cent_y), 50, (0, 0, 255), -1) # Update the balls x-y coordinates. self.result_thread.set_frame_data(frame)
Build and Run the Head Pose Detection Project
def adjust_x_vel(self, velocity, pct_of_center):
""" Helper for computing the next x coordinate.
def adjust_y_vel(self, velocity, pct_of_center):
""" Helper for computing the next y coordinate.
def send_data(self, frame, parsed_results):
""" Method that handles all post processing and sending the results to a FIFO file.
frame - The frame that will be post processed.
self.update_coords(frame, self.adjust_x_vel(velocity, pct_of_center), velocity, label)
elif label == 4:
self.update_coords(frame, self.adjust_x_vel(velocity, pct_of_center), self.adjust_y_vel(velocity, pct_of_center), label)
elif label == 5:
self.update_coords(frame, self.adjust_x_vel(velocity, pct_of_center), -1*velocity, label)
elif label == 6:
self.update_coords(frame, -1*velocity, velocity, label) elif label == 7:
self.update_coords(frame, -1*velocity, self.adjust_y_vel(velocity, pct_of_center),
label) elif label == 8:
self.update_coords(frame, -1*velocity, -1*velocity, label) def get_results(self, parsed_results, output_map):
""" Method converts the user entered number of top inference labels and associated probabilities to json format.
Build and Run the Head Pose Detection Project
parsed_results - A dictionary containing the inference results.
output_map - A dictionary that maps the numerical labels returned the inference engine to human readable labels.
"""
if self.quadrants <= 0 or self.quadrants > len(parsed_results):
return json.dumps({"Error" : "Invalid"}) top_result = parsed_results[0:self.quadrants]
cloud_output = {}
for obj in top_result:
cloud_output[output_map[obj['label']]] = obj['prob']
return json.dumps(cloud_output) def head_detection():
""" This method serves as the main entry point of our lambda.
"""
# Creating a client to send messages via IoT MQTT to the cloud client = greengrasssdk.client('iot-data')
# This is the topic where we will publish our messages too
iot_topic = '$aws/things/{}/infer'.format(os.environ['AWS_IOT_THING_NAME']) try:
client.publish(topic=iot_topic, payload="Optimizing model")
ret, model_path = mo.optimize(model_name, input_width, input_height, platform='tf') # Send message to IoT via MQTT
client.publish(topic=iot_topic, payload="Model optimization complete") if ret is not 0:
raise Exception("Model optimization failed, error code: {}".format(ret)) # Send message to IoT via MQTT
client.publish(topic=iot_topic, payload="Loading model") # Load the model into cl-dnn
model = awscam.Model(model_path, {"GPU": 1}) # Send message to IoT via MQTT
client.publish(topic=iot_topic, payload="Model loaded")
# We need to sample a frame so that we can determine where the center of
head_pose_detection = HeadDetection(sample_frame.shape[1]/2, sample_frame.shape[0]/2)
Build and Run the Head Pose Detection Project
raise Exception("Failed to get frame from the stream") # Mirror Image
frame = cv2.flip(frame, 1)
# Draw the rectangle around the area that we expect the persons head to be.
cv2.rectangle(frame, (crop_upper_left_x, crop_upper_left_y), (crop_upper_left_x + crop_width, crop_upper_left_y + crop_height),
(255, 40, 0), 4)
# Crop the the frame so that we can do inference on the area of the frame where # expect the persons head to be.
frame_crop = frame[crop_upper_left_y:crop_upper_left_y + crop_height, crop_upper_left_x:crop_upper_left_x + crop_width]
# Down sample the image.
frame_resize = cv2.resize(frame_crop, (input_width, input_height)) # Renormalize the image so that the color magnitudes are between 0 and 1 frame_resize = frame_resize.astype(np.float32)/255.0
# Run the down sampled image through the inference engine.
infer_output = model.doInference(frame_resize)
# Parse the results so that we get back a more manageble data structure.
parsed_results = model.parseResult(model_type, infer_output)[model_type]
# Post process the image and send it to the FIFO file head_pose_detection.send_data(frame, parsed_results) # Send the results to MQTT in JSON format. client.publish(topic=iot_topic, payload=msg)
# Entry point of the lambda function.
head_detection()
The LocalDisplay class defined in the Lambda function above serializes processed images, in a dedicated thread, to a specified FIFO file that serves as the source to replay the inference results as the project stream.
The HeadDetection class handles post-processing of the parsed data, including calling out a detected head position and pose in the project stream's local display.
The head_detection function coordinates the whole process to infer head poses from frames captured from AWS DeepLens video feeds, including loading the model artifact deployed to your AWS DeepLens device. To load the model, you must specify the path to the model artifact. To get this path, you can call the mo.optimize method and specify "frozen_model" as the model_name input value. This model name corresponds to the file name, without the .pb extension, of the model artifact uploaded to Amazon S3.
Having trained your model and uploaded it to S3 and created the Lambda function in your account, you're now ready to create and deploy the head pose detection project (p. 105).