#!/usr/bin/env python # -*- coding: utf-8 -*- # # Raspberry Pi Internet Radio playlist utility # $Id: create_m3u.py,v 1.6 2016/04/29 17:22:50 bob Exp $ # # Create playlist files from the following url formats # iPhone stream files (.asx) # Playlist files (.pls) # Extended M3U files (.m3u) # Direct media streams (no extension) # # See Raspberry PI Radio constructors manual for instructions # # Author : Bob Rathbone # Web site : http://www.bobrathbone.com # # # License: GNU V3, See https://www.gnu.org/copyleft/gpl.html # # Disclaimer: Software is provided as is and absolutly no warranties are implied or given. # The authors shall not be liable for any loss or damage however caused. # import os import re import sys import urllib2 from xml.dom.minidom import parseString # Output errors to STDERR stderr = sys.stderr.write; # File locations PlsDirectory = '/var/lib/mpd/playlists/' RadioDir = '/usr/share/radio/' RadioLibDir = '/var/lib/radiod/' StationList = RadioLibDir + 'stationlist' DistFile = RadioDir + 'station.urls' TempDir = '/tmp/radio_stream_files/' PlaylistsDir = RadioDir + 'playlists/' duplicateCount = 0 # Execute system command def execCommand(cmd): p = os.popen(cmd) return p.readline().rstrip('\n') # Create the initial list of files def createList(): if not os.path.isfile(StationList): print 'Creating ' + StationList + '\n' execCommand ("mkdir -p " + RadioLibDir ) print ("cp " + DistFile + ' ' + StationList ) execCommand ("cp " + DistFile + ' ' + StationList) print return # Create M3U output from the title and URL def createM3uOutput(title,url,filenumber): lines = [] title = title.rstrip() url = url.rstrip() lines.append('#EXTM3U') lines.append('#EXTINF:-1,%s' % title) lines.append(url + '#' + title) return lines # Create M3U or M3U file def createM3uFile(filename,output,nlines): global duplicateCount uniqueCount = 1 if len(filename) < 1: filename = 'unknown' outfile = TempDir + filename + '.m3u' # Create unique files exists = True while exists: if os.path.exists(outfile): print "Warning: " + outfile + ' already exists' outfile = TempDir + filename + '[' + str(uniqueCount) + '].m3u' uniqueCount += 1 duplicateCount += 1 else: exists = False try: print 'Creating ' + outfile + '\n' outfile = open(outfile,'w') for item in output: outstr = item.encode('utf8', 'replace') outfile.write(outstr + "\n") outfile.close() except: print "Failed to create",outfile return # Beautify HTML convert tags to lower case def parseHTML(data): lcdata = '' for line in data: lcline = '' line = line.rstrip() line = line.lstrip() line.replace('href =', 'href=') length = len(line) if length < 1: continue tag1right = line.find('>') if tag1right > 1: start_tag = line[0:tag1right+1] lcline = lcline + start_tag.lower() tag2left = line.find('<', tag1right+1) if tag2left > 1: end_tag = line[tag2left:length] parameter = line[tag1right+1:tag2left] lcline = lcline + parameter + end_tag.lower() lcdata = lcdata + lcline return lcdata # Get XML/HTML parameter def getParameter(line): tag1right = line.find('>') tag2left = line.find('<', tag1right+1) parameter = line[tag1right+1:tag2left] return parameter # Create a M3U file from an ASX(XML) file def parseAsx(title,url,data,filenumber): global errorCount global warningCount lcdata = parseHTML(data) try: dom = parseString(lcdata) except Exception,e: print "Error:",e print "Error: Could not parse XML data from,", url + '\n' errorCount += 1 return try: # If title undefined in the station list get it from the file if len(title) < 1: titleTag = dom.getElementsByTagName('title')[0].toxml() title = getParameter(titleTag) except: print "Warning: Title not found in", url pass finally: try: urlTag = dom.getElementsByTagName('ref')[0].toxml() url = urlTag.replace('','') urls = url.split('?') url = urls[0] title = title.rstrip() url = url.rstrip() print 'Title:',title m3ufile = title.replace(' ','_') output = createM3uOutput(title,url,filenumber) except IndexError,e: print "Error:",e print "Error parsing", url errorCount += 1 return "# DOM Error" return output # Create filename from URL def createFileName(title,url): if len(title) > 0: name = title name = name.replace('.',' ') name = name.replace(' ','_') else: try: urlparts = url.rsplit('/',1) site = urlparts[0] siteparts = site.split('/') name = siteparts[2] siteparts = name.split(':') name = siteparts[0] except: name = url name = name.replace('www.','') name = name.replace('.com','') name = name.replace('.','_') name = name.replace('__','_') return name # Create default title def createTitle(url): urlparts = url.rsplit('/',1) site = urlparts[0] siteparts = site.split('/') name = siteparts[2] siteparts = name.split(':') title = siteparts[0] return title # Direct radio stream (MP3 AAC etc) def parseDirect(title,url,filenumber): url = url.replace('(stream)', '') if len(title) < 1: title = createTitle(url) print "Title:",title output = createM3uOutput(title,url,filenumber) return output # Create M3U file from PLS in the temporary directory def parsePls(title,url,lines,filenumber): plstitle = '' plsurl = '' for line in lines: if line.startswith('Title1='): titleline = line.split('=') plstitle = titleline[1] if line.startswith('File1='): fileline = line.split('=') plsurl = fileline[1] # If title undefined in the station list get it from the file if len(title) < 1: if len(plstitle) > 1: title = plstitle else: title = createTitle(url) m3ufile = createFileName(title,url) print 'Title:',title m3ufile = title.replace(' ','_') output = createM3uOutput(title,plsurl,filenumber) return output # Parse M3U file to PLS output def parseM3u(title,lines,filenumber): info = 'Unknown' output = [] for line in lines: line = line.replace('\r','') line = line.replace('\n','') if line.startswith('http:'): url = line elif line.startswith('#EXTINF:'): info = line if len(title) < 1: title = info if len(title) < 1: filename = createFileName(title,url) else: filename = title.replace(' ','_') print 'Title:',title output.append('#EXTM3U') output.append('#EXTINF:-1,%s'% title) output.append('%s#%s'% (url,title)) return output # Usage message def usage(): stderr("\nUsage: %s [--delete_old] [--no_delete] [--help]\n" % sys.argv[0]) stderr("\tWhere: --delete_old Delete old playlists\n") stderr("\t --no_delete Don't delete old playlists\n") stderr("\t --help Display help message\n\n") return # Station definition help message def format(): stderr ("Start a playlist with the name between brackets. For example:\n") stderr ("(BBC Radio Stations)\n") stderr ("This will create a playlist called BBC_Radio_Stations.m3u\n") stderr ("\nThe above is followed by station definitions which take the following format:\n") stderr ("\t[title] http://\n") stderr ("\tExample:\n") stderr ("\t[BBC Radio 3] http://bbc.co.uk/radio/listen/live/r3.asx\n\n") stderr ("End a playlist by inserting a blank line at the end of the list of stations\n") stderr ("or start a new playlist definition.\n\n") return # Start of MAIN script if os.getuid() != 0: print "This program can only be run as root user or using sudo" sys.exit(1) deleteOld = False noDelete = False if len(sys.argv) > 2: stderr("\nError: you may not define more than one parameter at a time\n") usage() sys.exit(1) if len(sys.argv) > 1: param = sys.argv[1] if param == '--delete_old': deleteOld = True elif param == '--no_delete': noDelete = True elif param == '--help': usage() format() sys.exit(0) else: stderr("Invalid parameter %s\n" % param) usage() sys.exit(1) # Create station URL list createList() # Temporary directory - if it exists then delete all pls files from it execCommand ("mkdir -p " + TempDir ) execCommand ("rm -f " + TempDir + '*' ) # Open the list of URLs print "Creating M3U files from", StationList + '\n' lineCount = 0 # Line being processed (Including comments) errorCount = 0 # Errors duplicateCount = 0 # Duplicate file names warningCount = 0 # Warnings processedCount = 0 # Processed station count # Copy user stream files to temporary directory if os.path.exists(PlaylistsDir + '*.m3u'): print "Copying user M3U files from " + PlaylistsDir + " to " + TempDir + '\n' execCommand ("cp -f " + PlaylistsDir + '*.m3u ' + TempDir ) # Playlist file name filename = '' m3u_output = [] filenumber = 1 writeFile = False url = '' # Main processing loop for line in open(StationList,'r'): lineCount += 1 lines = [] newplaylist = '' # Set url types to False isASX = False isM3U = False isPLS = False # Skip commented out or blank lines line = line.rstrip() # Remove line feed if line[:1] == '#': continue # Handle playlist definition in () brackets elif line[:1] == '(': newplaylist = line.strip('(') # Remove left bracket newplaylist = newplaylist.strip(')') # Remove right bracket playlistname = newplaylist newplaylist = newplaylist.replace(' ', '_') if len(filename) > 0: writeFile = True else: print "Playlist:", playlistname filename = newplaylist filenumber = 1 continue if len(line) < 1 or writeFile: if len(filename) < 1 and len(url) > 0: filename = createFileName(title,url) if len(filename) > 0 and len(m3u_output) > 0: createM3uFile(filename,m3u_output,filenumber-1) filenumber = 1 m3u_output = [] filename = '' url = '' if len(newplaylist) > 0: filename = newplaylist continue if writeFile and len(line) > 0: writeFile = False else: continue # Check start of title defined elif line[:1] != '[': stderr("Error: Missing left bracket [ in line %s in %s\n" % (lineCount,StationList)) format() errorCount += 1 continue processedCount += 1 line = line.lstrip('[') # Get title and URL parts line = line.strip() lineparts = line.split(']') # Should be 2 parts (title and url) if len(lineparts) != 2: stderr("Error: Missing right bracket [ in line %s in %s\n" % (lineCount,StationList)) format() errorCount += 1 continue # Get title and URL from station definition title = lineparts[0].lstrip() url = lineparts[1].lstrip() # Get the published URL and determine its type print 'Processing line ' + str(lineCount) + ': ' + url # Extended M3U (MPEG 3 URL) format if url.endswith('.m3u'): isM3U = True # Advanced Stream Redirector (ASX) elif url.endswith('.asx'): isASX = True # Playlist format elif url.endswith('.pls'): isPLS = True # Advanced Audio Coding stream (Don't retrieve any URL) else: # Remove redundant (stream) parameter url = url.replace('(stream)', '') m3u_output += parseDirect(title,url,filenumber) if len(filename) < 1: filename = createFileName(title,url) writeFile = True filenumber += 1 continue # Set timeout depending on the parameter if deleteOld: timeo = 1 else: timeo = 10 # Get the published URL to the stream file try: file = urllib2.urlopen(url, timeout=timeo) data = file.read() file.close() # Creat list from data lines = data.split('\n') firstline = lines[0].rstrip() except: print "Error: Failed to retrieve ",title, url errorCount += 1 continue # process lines accoording to URL type if isPLS: m3u_output += parsePls(title,url,lines,filenumber) elif isM3U: m3u_output += parseM3u(title,lines,filenumber) elif isASX: if firstline.startswith(' 0 and len(m3u_output) > 0: createM3uFile(filename,m3u_output,filenumber-1) print ("Processed %s station URLs from %s" % (processedCount,StationList)) # Copy files from temporary directory to playlist directory oldfiles = len(os.listdir(PlsDirectory)) if oldfiles > 0: if not deleteOld and not noDelete: stderr("There are %s old playlist files in the %s directory.\n" % (oldfiles-2,PlsDirectory)) stderr("Do you wish to remove the old files y/n: ") answer = raw_input("") if answer == 'y': deleteOld = True if deleteOld: stderr ("\nRemoving old playlists from directory %s\n" % PlsDirectory) execCommand ("rm -f " + PlsDirectory + "*.pls" ) execCommand ("rm -f " + PlsDirectory + "*.m3u" ) else: print "Old playlist files not removed" copiedCount = len(os.listdir(TempDir)) print "Copying %s new playlist files to directory %s" % (copiedCount,PlsDirectory) execCommand ("cp -f " + TempDir + '* ' + PlsDirectory ) # Create summary report print "\nNew radio playlist files will be found in " + PlsDirectory if errorCount > 0: print str(errorCount) + " error(s)" if duplicateCount > 0: print str(duplicateCount) + " duplicate file name(s) found and renamed." warningCount += duplicateCount if warningCount > 0: print str(warningCount) + " warning(s)" # End of script