Intelligent Trash Can
AI Vision-based Trash Classification
Overview
This project began as a school science fair project. A couple of classmates and I noticed that our school had only one type of trash bin, which meant recyclable items were never sorted out and properly disposed of. I came up with the idea of a trash can that would sort trash by itself.
At home, we had a trash can with a motion-activated lid that had stopped working, which I thought would be perfect for this project. I fixed the lid by taking it apart and replacing the motor, which had corroded, probably from liquid in the bin. Now I could begin building.
Implementation
In order to sort the trash, the bin would first have to know what item it was dealing with. The best way was to use a camera equipped with object detection. A polycarbonate platform let the object sit in an isolated spot where it could be classified prior to sorting. I realized the platform could be dual-purpose: it could also tilt to release the item into either side of the bin, which I later separated into landfill and recycling sides. A 20 kg digital servo and some brackets I snagged from my robotics team supplied the rotational motion of the flap.
To control the servo from my Raspberry Pi directly, I would have needed external power and wiring via a breadboard. However, the Adafruit servo HAT has an external power supply socket, so it can power the servo directly. Adafruit also provides Python example code for it, which was very straightforward to follow.
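As a rough illustration of what that enables, here is a minimal sketch using Adafruit's CircuitPython ServoKit library; the channel number and tilt angles are placeholders for my particular setup, not values from Adafruit's docs:

import time
from adafruit_servokit import ServoKit

# The servo HAT is a 16-channel PCA9685 board; assume the flap servo sits on channel 0.
kit = ServoKit(channels=16)

def tilt_platform(angle, hold_seconds=1.0):
    # Tilt the flap to the given angle, wait for the item to slide off, then re-level.
    kit.servo[0].angle = angle
    time.sleep(hold_seconds)
    kit.servo[0].angle = 90  # assumed level position

tilt_platform(30)   # e.g., tip toward one side of the bin
tilt_platform(150)  # e.g., tip toward the other side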
As for the Pi, I considered several operating systems for this project. Raspbian Jessie was a good choice: it ships with Python 3.4 and is not too outdated.
Originally, my bin worked by snapping a photo of the trash item inside with an external Pi camera, sending it off to a deep learning computer vision service called Clarifai, and using the returned concepts to determine the type of trash. This worked well in terms of reliability and accuracy, so I stuck with it for a while. However, behind the scenes I was on their free trial plan, which allowed only 500 photo submissions per account, so between demos I was always busy creating loads of new accounts just to keep my bin working.
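For context, that original cloud workflow looked roughly like this sketch, written against the old Clarifai 2.x Python client; the API key and the keyword-to-bin mapping are placeholders, not the exact values I used:

from clarifai.rest import ClarifaiApp

app = ClarifaiApp(api_key='YOUR_API_KEY')  # placeholder trial key
model = app.public_models.general_model

# Ask Clarifai for the top concepts in the snapshot, then map them to a bin side.
response = model.predict_by_filename('pic.png')
concepts = response['outputs'][0]['data']['concepts']
labels = [c['name'] for c in concepts[:10]]

RECYCLABLE_HINTS = {'bottle', 'can', 'paper', 'cardboard', 'glass', 'plastic'}  # assumed mapping
side = 'recycling' if any(l in RECYCLABLE_HINTS for l in labels) else 'landfill'
print(labels, '->', side)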
Later, I found that I could use an NVIDIA Jetson TX1 as the brain of my bin instead of the Raspberry Pi.
Advantages:
- Training and inference run much faster, and instead of relying on someone else's API, I can use my own model.
- The servo can share the same hardware and Python library.
- Much better machine learning library support (e.g., Keras), which makes issues easier to debug.
- A better learning platform than the Raspberry Pi 3 for ML work, and good preparation for future projects such as autonomous vehicles and robots.
- Can use a CSI camera for fast and robust processing in the future.
Disadvantages:
- The servo HAT's pins are not compatible with the TX1 header, so the servo board has to be connected to the TX1 with jumper wires (a quick I2C check is sketched below).
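Since the HAT is an I2C device, once power, ground, SDA, and SCL are jumpered over, you can verify that the TX1 sees the board before driving servos. A minimal sketch, assuming Adafruit's Blinka compatibility layer is installed on the Jetson (the pin names and address below are library defaults, not measured values):

import board
import busio

# Open the default I2C bus exposed by Blinka on the Jetson.
i2c = busio.I2C(board.SCL, board.SDA)
while not i2c.try_lock():
    pass
# The PCA9685 on the servo HAT defaults to address 0x40.
print([hex(addr) for addr in i2c.scan()])
i2c.unlock()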
Here is the training code that runs on the TX1:
mobilenet_training.py
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
import argparse
import glob
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split # 0.18
from sklearn.metrics import confusion_matrix
from collections import Counter
from sklearn.utils import class_weight
from keras.applications.mobilenet import preprocess_input, MobileNet
from keras.models import Model
from keras.layers import Dense, GlobalAveragePooling2D
from keras.preprocessing import image
from keras.optimizers import SGD
IM_WIDTH, IM_HEIGHT = 224, 224
NB_EPOCHS = 1
BAT_SIZE = 32
def get_images(paths):
    """Load images from disk and preprocess them for MobileNet."""
images = []
for path in paths:
img = image.load_img(path, target_size=(224, 224))
x = image.img_to_array(img)
# x = np.expand_dims(x, axis=0)
x = preprocess_input(x)
images.append(x)
return np.asarray(images)
def one_hot_encoding(labels):
    """One-hot encode the string labels into 0/1 columns."""
labels = pd.Series(labels).str.get_dummies()
return labels
def split(files):
    """Hold out 20% for test, then 10% of the remaining train set for validation."""
X_train, X_test = train_test_split(files, test_size=0.20, random_state=42)
X_train, X_valid = train_test_split(
X_train, test_size=0.10, random_state=42)
return X_train, X_test, X_valid
def get_labels(data_paths):
    """Derive each image's label from its parent folder name."""
labels = []
for path in data_paths:
labels.append(os.path.basename(os.path.dirname(path)))
return labels
def fine_tune(model):
    """Unfreeze the top of the network and recompile with a small learning rate."""
for layer in model.layers[:95]:
layer.trainable = False
for layer in model.layers[95:]:
layer.trainable = True
model.compile(
optimizer=SGD(lr=0.0001, momentum=0.9),
loss='categorical_crossentropy',
metrics=['accuracy'])
def get_data_paths():
    """Collect per-class image paths under data/ and split them into train/test/valid."""
data_folders = glob.glob(os.path.join('data', '*'))
train_paths = []
test_paths = []
valid_paths = []
for folder in data_folders:
files = glob.glob(os.path.join(folder, '*.jpg'))
train, test, valid = split(files)
train_paths = train_paths + train
test_paths = test_paths + test
valid_paths = valid_paths + valid
np.random.shuffle(train_paths)
np.random.shuffle(test_paths)
np.random.shuffle(valid_paths)
return np.asarray(train_paths), np.asarray(test_paths), np.asarray(
valid_paths)
def add_new_last_layer(base_model, nb_classes):
"""Add last layer to the convnet
Args:
base_model: keras model excluding top
nb_classes: # of classes
Returns:
new keras model with last layer
"""
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(128, activation='relu')(x) # new FC layer, random init
x = Dense(32, activation='relu')(x) # new FC layer, random init
predictions = Dense(
nb_classes, activation='softmax')(x) # new softmax layer
model = Model(inputs=base_model.input, outputs=predictions)
return model
def setup_to_transfer_learn(model, base_model):
"""Freeze all layers and compile the model"""
for layer in base_model.layers:
layer.trainable = False
model.compile(
optimizer='adam',
loss='categorical_crossentropy',
metrics=['accuracy'])
def train(args):
nb_classes = 6
nb_epoch = int(args.nb_epoch)
batch_size = int(args.batch_size)
train_paths, test_paths, valid_paths = get_data_paths()
print(f"No. of Train samples = {len(train_paths)} \n")
print(f"No. of Test samples = {len(test_paths)} \n")
print(f"No. of Valid samples = {len(valid_paths)} \n")
train_labels = get_labels(train_paths)
print(f'For Train = {Counter(train_labels)} \n')
train_labels = np.asarray(one_hot_encoding(train_labels))
test_labels = get_labels(test_paths)
print(f'For Test = {Counter(test_labels)} \n')
test_labels = np.asarray(one_hot_encoding(test_labels))
valid_labels = get_labels(valid_paths)
print(f'For Valid = {Counter(valid_labels)} \n')
valid_labels = np.asarray(one_hot_encoding(valid_labels))
train_images = get_images(train_paths)
test_images = get_images(test_paths)
valid_images = get_images(valid_paths)
# setup model
base_model = MobileNet(input_shape=(224, 224, 3),
        weights='imagenet', include_top=False)  # not including the FC top layers
model = add_new_last_layer(base_model, nb_classes)
    # freeze the base model and compile for transfer learning
    setup_to_transfer_learn(model, base_model)
    # compute balanced class weights for all six classes (not just two)
    weight_train_labels = [np.argmax(r) for r in train_labels]
    weights = class_weight.compute_class_weight(
        'balanced', classes=np.unique(weight_train_labels), y=weight_train_labels)
    class_weights = dict(enumerate(weights))
    history = model.fit(
        x=train_images,
        y=train_labels,
        batch_size=batch_size,
        epochs=nb_epoch,
        verbose=1,
        shuffle=True,
        class_weight=class_weights,
        validation_data=(valid_images, valid_labels))
model.save(args.model)
y_pred_class = model.predict(test_images, verbose=1)
y_pred_class = [np.argmax(r) for r in y_pred_class]
test_y = [np.argmax(r) for r in test_labels]
print('Confusion matrix is \n', confusion_matrix(test_y, y_pred_class))
print(confusion_matrix(test_y, y_pred_class).ravel())
if args.ft:
ft_epochs = args.epoch_ft
fine_tune(model)
history = model.fit(
x=train_images,
y=train_labels,
batch_size=batch_size,
epochs=int(ft_epochs),
verbose=1,
shuffle=True,
validation_data=(valid_images, valid_labels))
model.save(args.model_ft)
print('_______Results After Fine Tuning____________')
y_pred_class = model.predict(test_images, verbose=1)
y_pred_class = [np.argmax(r) for r in y_pred_class]
test_y = [np.argmax(r) for r in test_labels]
print('Confusion matrix is \n', confusion_matrix(test_y, y_pred_class))
print(confusion_matrix(test_y, y_pred_class).ravel())
if __name__ == "__main__":
a = argparse.ArgumentParser()
a.add_argument(
"--nb_epoch",
default=NB_EPOCHS,
help='Number of epochs for Transfer Learning. Default = 1.')
a.add_argument(
"--batch_size",
default=BAT_SIZE,
help='Batch size for training. Default = 32.')
a.add_argument("--model", help='Path to save model to.')
a.add_argument("--model_ft", help='Path to save fine tuned model')
a.add_argument(
"--ft", action="store_true", help='Whether to fine tune model or not')
a.add_argument(
'--epoch_ft',
default=NB_EPOCHS,
help='Number of epochs for Fine-Tuning for model. Default = 1.')
args = a.parse_args()
if args.ft:
print("Please make sure that you have added fine tuning epochs value")
train(args)
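For reference, assuming the six class folders (cardboard, glass, metal, paper, plastic, trash) sit under data/, a typical run of the script looks something like this (the epoch counts are just examples):

python3 mobilenet_training.py --nb_epoch 10 --batch_size 32 --model models/model1_org.h5 --ft --epoch_ft 5 --model_ft models/model1_ft.h5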
Here is the inference code that ran on the Raspberry Pi:
pi_trash_classifier.py
import keras
#from picamera import PiCamera
#from picamera.array import PiRGBArray
from keras.models import load_model
from keras.applications import mobilenet
from keras.applications.mobilenet import preprocess_input
from keras.preprocessing import image
import numpy as np
import time
import cv2
import os
img = None
def pp_image():
    """Load the test image; keep an ndarray copy for annotation with OpenCV."""
    global img
    pil_img = image.load_img('data/trash/trash1.jpg', target_size=(224, 224))
    img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)  # cv2 needs a BGR ndarray, not a PIL image
    x = image.img_to_array(pil_img)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    return np.asarray(x)
prediction_list = ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']
model = load_model('models/model1_org.h5', custom_objects={'relu6': mobilenet.relu6})
#camera = PiCamera()
#rawCapture = PiRGBArray(camera)
for i in range(10):
time.sleep(0.5)
try:
#camera.capture(rawCapture, format='rgb')
#img=rawCapture.array
#cv2.imwrite('pic.png', img)
        pred_img = pp_image()
        probs = model.predict(pred_img)
        pred = prediction_list[np.argmax(probs)]
        # annotate with the predicted class; coordinates fit the 224x224 test image
        cv2.putText(img, pred, (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
        name = 'img' + str(i) + '.png'
        cv2.imwrite(os.path.join('prediction_images', name), img)
#rawCapture.truncate(0)
    except Exception as e:
        print('Could not perform prediction:', e)
#camera.stop_preview()  # only needed when the PiCamera path above is enabled
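On the bin itself, the predicted class then drives the tilt platform. Here is a minimal sketch of that glue logic, where the servo channel, the tilt angles, and the class-to-side mapping are my assumptions rather than fixed values:

import time
from adafruit_servokit import ServoKit

RECYCLING = {'cardboard', 'glass', 'metal', 'paper', 'plastic'}  # 'trash' goes to landfill

kit = ServoKit(channels=16)

def sort_item(pred):
    # Tip the platform toward the recycling or landfill side based on the label.
    angle = 30 if pred in RECYCLING else 150  # placeholder angles
    kit.servo[0].angle = angle
    time.sleep(1.0)
    kit.servo[0].angle = 90  # return to level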
Here is trash_classifier.py, which started out as the Clarifai cloud version; the version below grabs frames from a Basler camera via pypylon and runs the local model instead:
trash_classifier.py
import keras
from pypylon import pylon
from keras.models import load_model
from keras.applications import mobilenet
from keras.applications.mobilenet import preprocess_input
from keras.preprocessing import image
import numpy as np
import time
import cv2
import os
def pp_image(img):
    # Re-read the PNG just written by cv2.imwrite; the disk round trip also
    # converts the BGR frame to the RGB ordering that MobileNet expects.
    img = image.load_img('pic.png', target_size=(224, 224))
x = image.img_to_array(img)
x = np.expand_dims(x, axis=0)
x = preprocess_input(x)
return np.asarray(x)
prediction_list = ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']
model = load_model('models/model1_org.h5', custom_objects={'relu6': mobilenet.relu6})
camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
numberOfImagesToGrab = 100
camera.StartGrabbingMax(numberOfImagesToGrab)
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
i=0
while camera.IsGrabbing():
time.sleep(0.005)
grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grabResult.GrabSucceeded():
# Access the image data.
print("SizeX: ", grabResult.Width)
print("SizeY: ", grabResult.Height)
img = converter.Convert(grabResult).GetArray()
cv2.imwrite('pic.png', img)
        pred_img = pp_image(img)
        probs = model.predict(pred_img)
        pred = prediction_list[np.argmax(probs)]
        cv2.putText(img, pred, (10, 1000), cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 0, 0), 5)
        name = 'img' + str(i) + '.png'
        cv2.imwrite(os.path.join('prediction_images', name), img)
        i += 1
grabResult.Release()
if i==10:
break