区块链技术近年来发展迅速,不仅在金融领域有广泛应用,在其他行业也逐渐崭露头角。今天,我们就来探讨10个使用Python开发的区块链应用案例,帮助大家更好地理解和应用这项技术。
1. 简单的区块链实现
首先,我们从一个简单的区块链实现开始。这个例子将帮助你理解区块链的基本结构和工作原理。
import hashlib
import time
class Block:
def __init__(self, index, previous_hash, timestamp, data, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.data = data
self.hash = hash
def calculate_hash(index, previous_hash, timestamp, data):
value = str(index) + str(previous_hash) + str(timestamp) + str(data)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_block():
return Block(0, "0", int(time.time()), "Genesis Block", calculate_hash(0, "0", int(time.time()), "Genesis Block"))
def create_new_block(previous_block, data):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_hash(index, previous_block.hash, timestamp, data)
return Block(index, previous_block.hash, timestamp, data, hash)
# 创建创世块
blockchain = [create_genesis_block()]
previous_block = blockchain[0]
# 添加新的区块
num_of_blocks_to_add = 5
for i in range(num_of_blocks_to_add):
new_block_data = f"Block {i+1} data"
new_block = create_new_block(previous_block, new_block_data)
blockchain.append(new_block)
previous_block = new_block
print(f"Block #{new_block.index} has been added to the blockchain!")
print(f"Hash: {new_block.hash}\n")
2. 基于Flask的简单区块链API
接下来,我们使用Flask框架创建一个简单的区块链API,以便可以通过HTTP请求与区块链交互。
from flask import Flask, request, jsonify
import hashlib
import time
app = Flask(__name__)
class Block:
def __init__(self, index, previous_hash, timestamp, data, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.data = data
self.hash = hash
def calculate_hash(index, previous_hash, timestamp, data):
value = str(index) + str(previous_hash) + str(timestamp) + str(data)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_block():
return Block(0, "0", int(time.time()), "Genesis Block", calculate_hash(0, "0", int(time.time()), "Genesis Block"))
def create_new_block(previous_block, data):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_hash(index, previous_block.hash, timestamp, data)
return Block(index, previous_block.hash, timestamp, data, hash)
blockchain = [create_genesis_block()]
previous_block = blockchain[0]
@app.route('/mine', methods=['POST'])
def mine():
data = request.json['data']
new_block = create_new_block(previous_block, data)
blockchain.append(new_block)
global previous_block
previous_block = new_block
response = {
'index': new_block.index,
'previous_hash': new_block.previous_hash,
'timestamp': new_block.timestamp,
'data': new_block.data,
'hash': new_block.hash
}
return jsonify(response), 200
@app.route('/chain', methods=['GET'])
def full_chain():
response = {
'chain': [block.__dict__ for block in blockchain],
'length': len(blockchain),
}
return jsonify(response), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
3. 分布式区块链网络
为了实现分布式区块链网络,我们可以使用Peer-to-Peer (P2P)技术。这里我们使用socket
库来实现一个简单的P2P网络。
import socket
import threading
import json
# 定义节点类
class Node:
def __init__(self, host, port):
self.host = host
self.port = port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((host, port))
self.server.listen(5)
self.nodes = []
self.blockchain = []
def start(self):
print(f"Node started on {self.host}:{self.port}")
while True:
client_socket, client_address = self.server.accept()
self.nodes.append(client_socket)
threading.Thread(target=self.handle_client, args=(client_socket,)).start()
def handle_client(self, client_socket):
while True:
try:
message = client_socket.recv(1024).decode('utf-8')
if message:
print(f"Received: {message}")
self.broadcast(message)
except:
client_socket.close()
self.nodes.remove(client_socket)
break
def broadcast(self, message):
for node in self.nodes:
try:
node.sendall(message.encode('utf-8'))
except:
continue
def send_message(self, message):
for node in self.nodes:
try:
node.sendall(message.encode('utf-8'))
except:
continue
if __name__ == '__main__':
node = Node('127.0.0.1.txt', 5000)
threading.Thread(target=node.start).start()
# 模拟发送消息
time.sleep(2)
node.send_message(json.dumps({'type': 'block', 'data': 'Block 1.txt'}))
4. 区块链投票系统
区块链可以用于构建透明且不可篡改的投票系统。以下是一个简单的投票系统的实现。
class VoteBlock:
def __init__(self, index, previous_hash, timestamp, vote, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.vote = vote
self.hash = hash
def calculate_vote_hash(index, previous_hash, timestamp, vote):
value = str(index) + str(previous_hash) + str(timestamp) + str(vote)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_vote_block():
return VoteBlock(0, "0", int(time.time()), "Genesis Vote", calculate_vote_hash(0, "0", int(time.time()), "Genesis Vote"))
def create_new_vote_block(previous_block, vote):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_vote_hash(index, previous_block.hash, timestamp, vote)
return VoteBlock(index, previous_block.hash, timestamp, vote, hash)
vote_blockchain = [create_genesis_vote_block()]
previous_vote_block = vote_blockchain[0]
def add_vote(vote):
new_vote_block = create_new_vote_block(previous_vote_block, vote)
vote_blockchain.append(new_vote_block)
global previous_vote_block
previous_vote_block = new_vote_block
print(f"Vote Block #{new_vote_block.index} has been added to the blockchain!")
print(f"Hash: {new_vote_block.hash}\n")
# 测试投票
add_vote("Candidate A")
add_vote("Candidate B")
add_vote("Candidate A")
5. 区块链供应链管理
区块链可以用于提高供应链的透明度和安全性。以下是一个简单的供应链管理系统的实现。
class SupplyChainBlock:
def __init__(self, index, previous_hash, timestamp, product_info, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.product_info = product_info
self.hash = hash
def calculate_supply_chain_hash(index, previous_hash, timestamp, product_info):
value = str(index) + str(previous_hash) + str(timestamp) + str(product_info)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_supply_chain_block():
return SupplyChainBlock(0, "0", int(time.time()), "Genesis Product", calculate_supply_chain_hash(0, "0", int(time.time()), "Genesis Product"))
def create_new_supply_chain_block(previous_block, product_info):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_supply_chain_hash(index, previous_block.hash, timestamp, product_info)
return SupplyChainBlock(index, previous_block.hash, timestamp, product_info, hash)
supply_chain_blockchain = [create_genesis_supply_chain_block()]
previous_supply_chain_block = supply_chain_blockchain[0]
def add_product(product_info):
new_supply_chain_block = create_new_supply_chain_block(previous_supply_chain_block, product_info)
supply_chain_blockchain.append(new_supply_chain_block)
global previous_supply_chain_block
previous_supply_chain_block = new_supply_chain_block
print(f"Supply Chain Block #{new_supply_chain_block.index} has been added to the blockchain!")
print(f"Hash: {new_supply_chain_block.hash}\n")
# 测试添加产品
add_product({"product_id": 1, "name": "Apple", "origin": "China"})
add_product({"product_id": 2, "name": "Banana", "origin": "India"})
6. 区块链身份验证
区块链可以用于实现安全的身份验证系统。以下是一个简单的身份验证系统的实现。
class IdentityBlock:
def __init__(self, index, previous_hash, timestamp, identity_info, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.identity_info = identity_info
self.hash = hash
def calculate_identity_hash(index, previous_hash, timestamp, identity_info):
value = str(index) + str(previous_hash) + str(timestamp) + str(identity_info)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_identity_block():
return IdentityBlock(0, "0", int(time.time()), "Genesis Identity", calculate_identity_hash(0, "0", int(time.time()), "Genesis Identity"))
def create_new_identity_block(previous_block, identity_info):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_identity_hash(index, previous_block.hash, timestamp, identity_info)
return IdentityBlock(index, previous_block.hash, timestamp, identity_info, hash)
identity_blockchain = [create_genesis_identity_block()]
previous_identity_block = identity_blockchain[0]
def add_identity(identity_info):
new_identity_block = create_new_identity_block(previous_identity_block, identity_info)
identity_blockchain.append(new_identity_block)
global previous_identity_block
previous_identity_block = new_identity_block
print(f"Identity Block #{new_identity_block.index} has been added to the blockchain!")
print(f"Hash: {new_identity_block.hash}\n")
# 测试添加身份信息
add_identity({"user_id": 1, "name": "Alice", "email": "alice@example.com"})
add_identity({"user_id": 2, "name": "Bob", "email": "bob@example.com"})
7. 区块链数字版权管理
区块链可以用于保护数字版权,确保创作者的权益。以下是一个简单的数字版权管理系统的实现。
class DigitalRightBlock:
def __init__(self, index, previous_hash, timestamp, right_info, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.right_info = right_info
self.hash = hash
def calculate_digital_right_hash(index, previous_hash, timestamp, right_info):
value = str(index) + str(previous_hash) + str(timestamp) + str(right_info)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_digital_right_block():
return DigitalRightBlock(0, "0", int(time.time()), "Genesis Right", calculate_digital_right_hash(0, "0", int(time.time()), "Genesis Right"))
def create_new_digital_right_block(previous_block, right_info):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_digital_right_hash(index, previous_block.hash, timestamp, right_info)
return DigitalRightBlock(index, previous_block.hash, timestamp, right_info, hash)
digital_right_blockchain = [create_genesis_digital_right_block()]
previous_digital_right_block = digital_right_blockchain[0]
def add_right(right_info):
new_digital_right_block = create_new_digital_right_block(previous_digital_right_block, right_info)
digital_right_blockchain.append(new_digital_right_block)
global previous_digital_right_block
previous_digital_right_block = new_digital_right_block
print(f"Digital Right Block #{new_digital_right_block.index} has been added to the blockchain!")
print(f"Hash: {new_digital_right_block.hash}\n")
# 测试添加数字版权信息
add_right({"content_id": 1, "creator": "Alice", "rights": "All rights reserved"})
add_right({"content_id": 2, "creator": "Bob", "rights": "Creative Commons"})
8. 区块链医疗记录管理
区块链可以用于存储和管理医疗记录,确保数据的安全性和隐私性。以下是一个简单的医疗记录管理系统的实现。
class MedicalRecordBlock:
def __init__(self, index, previous_hash, timestamp, record_info, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.record_info = record_info
self.hash = hash
def calculate_medical_record_hash(index, previous_hash, timestamp, record_info):
value = str(index) + str(previous_hash) + str(timestamp) + str(record_info)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_medical_record_block():
return MedicalRecordBlock(0, "0", int(time.time()), "Genesis Record", calculate_medical_record_hash(0, "0", int(time.time()), "Genesis Record"))
def create_new_medical_record_block(previous_block, record_info):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_medical_record_hash(index, previous_block.hash, timestamp, record_info)
return MedicalRecordBlock(index, previous_block.hash, timestamp, record_info, hash)
medical_record_blockchain = [create_genesis_medical_record_block()]
previous_medical_record_block = medical_record_blockchain[0]
def add_record(record_info):
new_medical_record_block = create_new_medical_record_block(previous_medical_record_block, record_info)
medical_record_blockchain.append(new_medical_record_block)
global previous_medical_record_block
previous_medical_record_block = new_medical_record_block
print(f"Medical Record Block #{new_medical_record_block.index} has been added to the blockchain!")
print(f"Hash: {new_medical_record_block.hash}\n")
# 测试添加医疗记录
add_record({"patient_id": 1, "doctor": "Dr. Smith", "diagnosis": "Flu"})
add_record({"patient_id": 2, "doctor": "Dr. Johnson", "diagnosis": "Cold"})
9. 区块链房地产交易
区块链可以用于记录和验证房地产交易,确保交易的透明性和安全性。以下是一个简单的房地产交易系统的实现。
class RealEstateBlock:
def __init__(self, index, previous_hash, timestamp, transaction_info, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.transaction_info = transaction_info
self.hash = hash
def calculate_real_estate_hash(index, previous_hash, timestamp, transaction_info):
value = str(index) + str(previous_hash) + str(timestamp) + str(transaction_info)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_real_estate_block():
return RealEstateBlock(0, "0", int(time.time()), "Genesis Transaction", calculate_real_estate_hash(0, "0", int(time.time()), "Genesis Transaction"))
def create_new_real_estate_block(previous_block, transaction_info):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_real_estate_hash(index, previous_block.hash, timestamp, transaction_info)
return RealEstateBlock(index, previous_block.hash, timestamp, transaction_info, hash)
real_estate_blockchain = [create_genesis_real_estate_block()]
previous_real_estate_block = real_estate_blockchain[0]
def add_transaction(transaction_info):
new_real_estate_block = create_new_real_estate_block(previous_real_estate_block, transaction_info)
real_estate_blockchain.append(new_real_estate_block)
global previous_real_estate_block
previous_real_estate_block = new_real_estate_block
print(f"Real Estate Block #{new_real_estate_block.index} has been added to the blockchain!")
print(f"Hash: {new_real_estate_block.hash}\n")
# 测试添加房地产交易
add_transaction({"property_id": 1, "seller": "Alice", "buyer": "Bob", "price": 500000})
add_transaction({"property_id": 2, "seller": "Charlie", "buyer": "David", "price": 750000})
10. 区块链智能合约
智能合约是区块链的一个重要应用,可以自动执行合同条款。以下是一个简单的智能合约的实现。
class SmartContractBlock:
def __init__(self, index, previous_hash, timestamp, contract_info, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.contract_info = contract_info
self.hash = hash
def calculate_smart_contract_hash(index, previous_hash, timestamp, contract_info):
value = str(index) + str(previous_hash) + str(timestamp) + str(contract_info)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_smart_contract_block():
return SmartContractBlock(0, "0", int(time.time()), "Genesis Contract", calculate_smart_contract_hash(0, "0", int(time.time()), "Genesis Contract"))
def create_new_smart_contract_block(previous_block, contract_info):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_smart_contract_hash(index, previous_block.hash, timestamp, contract_info)
return SmartContractBlock(index, previous_block.hash, timestamp, contract_info, hash)
smart_contract_blockchain = [create_genesis_smart_contract_block()]
previous_smart_contract_block = smart_contract_blockchain[0]
def add_contract(contract_info):
new_smart_contract_block = create_new_smart_contract_block(previous_smart_contract_block, contract_info)
smart_contract_blockchain.append(new_smart_contract_block)
global previous_smart_contract_block
previous_smart_contract_block = new_smart_contract_block
print(f"Smart Contract Block #{new_smart_contract_block.index} has been added to the blockchain!")
print(f"Hash: {new_smart_contract_block.hash}\n")
# 测试添加智能合约
add_contract({"contract_id": 1, "parties": ["Alice", "Bob"], "terms": "Alice will pay Bob $1000 if the weather is sunny on 2023-10-01"})
add_contract({"contract_id": 2, "parties": ["Charlie", "David"], "terms": "Charlie will deliver a package to David by 2023-10-15"})
实战案例:基于区块链的投票系统
假设我们要为一个社区活动开发一个投票系统,确保投票过程的透明性和不可篡改性。我们将使用前面提到的投票系统代码,并增加一些额外的功能,如用户注册和投票验证。
import hashlib
import time
import json
class VoteBlock:
def __init__(self, index, previous_hash, timestamp, vote, hash):
self.index = index
self.previous_hash = previous_hash
self.timestamp = timestamp
self.vote = vote
self.hash = hash
def calculate_vote_hash(index, previous_hash, timestamp, vote):
value = str(index) + str(previous_hash) + str(timestamp) + str(vote)
return hashlib.sha256(value.encode('utf-8')).hexdigest()
def create_genesis_vote_block():
return VoteBlock(0, "0", int(time.time()), "Genesis Vote", calculate_vote_hash(0, "0", int(time.time()), "Genesis Vote"))
def create_new_vote_block(previous_block, vote):
index = previous_block.index + 1
timestamp = int(time.time())
hash = calculate_vote_hash(index, previous_block.hash, timestamp, vote)
return VoteBlock(index, previous_block.hash, timestamp, vote, hash)
vote_blockchain = [create_genesis_vote_block()]
previous_vote_block = vote_blockchain[0]
users = {}
def register_user(user_id, name):
if user_id not in users:
users[user_id] = {'name': name, 'voted': False}
print(f"User {name} registered with ID: {user_id}")
else:
print(f"User with ID {user_id} already exists.")
def add_vote(user_id, candidate):
if user_id in users and not users[user_id]['voted']:
new_vote_block = create_new_vote_block(previous_vote_block, {'user_id': user_id, 'candidate': candidate})
vote_blockchain.append(new_vote_block)
global previous_vote_block
previous_vote_block = new_vote_block
users[user_id]['voted'] = True
print(f"Vote Block #{new_vote_block.index} has been added to the blockchain!")
print(f"Hash: {new_vote_block.hash}\n")
else:
print(f"User with ID {user_id} has already voted or does not exist.")
# 注册用户
register_user(1, "Alice")
register_user(2, "Bob")
register_user(3, "Charlie")
# 用户投票
add_vote(1, "Candidate A")
add_vote(2, "Candidate B")
add_vote(3, "Candidate A")
# 查看区块链
for block in vote_blockchain:
print(f"Index: {block.index}, Hash: {block.hash}, Vote: {block.vote}")
总结
本文介绍了10个使用Python开发的区块链应用案例,包括简单的区块链实现、基于Flask的区块链API、分布式区块链网络、区块链投票系统、区块链供应链管理、区块链身份验证、区块链数字版权管理、区块链医疗记录管理、区块链房地产交易和区块链智能合约。每个案例都提供了详细的代码示例和解释,帮助你更好地理解和应用区块链技术。
好了,今天的分享就到这里了,我们下期见。如果本文对你有帮助,请动动你可爱的小手指点赞、转发、在看吧!
付费合集推荐
文末福利
公众号消息窗口回复“编程资料”,获取Python编程、人工智能、爬虫等100+本精品电子书。